All of lore.kernel.org
 help / color / mirror / Atom feed
From: John Snow <jsnow@redhat.com>
To: qemu-devel@nongnu.org
Cc: Eduardo Habkost <ehabkost@redhat.com>,
	Eric Blake <eblake@redhat.com>,
	Stefan Hajnoczi <stefanha@redhat.com>,
	Markus Armbruster <armbru@redhat.com>,
	Wainer dos Santos Moschetta <wainersm@redhat.com>,
	"Niteesh G . S ." <niteesh.gs@gmail.com>,
	Willian Rampazzo <wrampazz@redhat.com>,
	Cleber Rosa <crosa@redhat.com>, John Snow <jsnow@redhat.com>
Subject: [PATCH v4 09/27] python/aqmp: add AsyncProtocol.accept() method
Date: Wed, 15 Sep 2021 12:29:37 -0400	[thread overview]
Message-ID: <20210915162955.333025-10-jsnow@redhat.com> (raw)
In-Reply-To: <20210915162955.333025-1-jsnow@redhat.com>

It's a little messier than connect, because it wasn't designed to accept
*precisely one* connection. Such is life.

Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
---
 python/qemu/aqmp/protocol.py | 89 ++++++++++++++++++++++++++++++++++--
 1 file changed, 85 insertions(+), 4 deletions(-)

diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index 1dfd12895d..62c26ede5a 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -243,6 +243,24 @@ async def runstate_changed(self) -> Runstate:
         await self._runstate_event.wait()
         return self.runstate
 
+    @upper_half
+    @require(Runstate.IDLE)
+    async def accept(self, address: Union[str, Tuple[str, int]],
+                     ssl: Optional[SSLContext] = None) -> None:
+        """
+        Accept a connection and begin processing message queues.
+
+        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+
+        :param address:
+            Address to listen to; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise StateError: When the `Runstate` is not `IDLE`.
+        :raise ConnectError: If a connection could not be accepted.
+        """
+        await self._new_session(address, ssl, accept=True)
+
     @upper_half
     @require(Runstate.IDLE)
     async def connect(self, address: Union[str, Tuple[str, int]],
@@ -308,7 +326,8 @@ def _set_state(self, state: Runstate) -> None:
     @upper_half
     async def _new_session(self,
                            address: Union[str, Tuple[str, int]],
-                           ssl: Optional[SSLContext] = None) -> None:
+                           ssl: Optional[SSLContext] = None,
+                           accept: bool = False) -> None:
         """
         Establish a new connection and initialize the session.
 
@@ -317,9 +336,10 @@ async def _new_session(self,
         to be set back to `IDLE`.
 
         :param address:
-            Address to connect to;
+            Address to connect to/listen on;
             UNIX socket path or TCP address/port.
         :param ssl: SSL context to use, if any.
+        :param accept: Accept a connection instead of connecting when `True`.
 
         :raise ConnectError:
             When a connection or session cannot be established.
@@ -333,7 +353,7 @@ async def _new_session(self,
 
         try:
             phase = "connection"
-            await self._establish_connection(address, ssl)
+            await self._establish_connection(address, ssl, accept)
 
             phase = "session"
             await self._establish_session()
@@ -367,6 +387,7 @@ async def _establish_connection(
             self,
             address: Union[str, Tuple[str, int]],
             ssl: Optional[SSLContext] = None,
+            accept: bool = False
     ) -> None:
         """
         Establish a new connection.
@@ -375,6 +396,7 @@ async def _establish_connection(
             Address to connect to/listen on;
             UNIX socket path or TCP address/port.
         :param ssl: SSL context to use, if any.
+        :param accept: Accept a connection instead of connecting when `True`.
         """
         assert self.runstate == Runstate.IDLE
         self._set_state(Runstate.CONNECTING)
@@ -384,7 +406,66 @@ async def _establish_connection(
         # otherwise yield.
         await asyncio.sleep(0)
 
-        await self._do_connect(address, ssl)
+        if accept:
+            await self._do_accept(address, ssl)
+        else:
+            await self._do_connect(address, ssl)
+
+    @upper_half
+    async def _do_accept(self, address: Union[str, Tuple[str, int]],
+                         ssl: Optional[SSLContext] = None) -> None:
+        """
+        Acting as the transport server, accept a single connection.
+
+        :param address:
+            Address to listen on; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise OSError: For stream-related errors.
+        """
+        self.logger.debug("Awaiting connection on %s ...", address)
+        connected = asyncio.Event()
+        server: Optional[asyncio.AbstractServer] = None
+
+        async def _client_connected_cb(reader: asyncio.StreamReader,
+                                       writer: asyncio.StreamWriter) -> None:
+            """Used to accept a single incoming connection, see below."""
+            nonlocal server
+            nonlocal connected
+
+            # A connection has been accepted; stop listening for new ones.
+            assert server is not None
+            server.close()
+            await server.wait_closed()
+            server = None
+
+            # Register this client as being connected
+            self._reader, self._writer = (reader, writer)
+
+            # Signal back: We've accepted a client!
+            connected.set()
+
+        if isinstance(address, tuple):
+            coro = asyncio.start_server(
+                _client_connected_cb,
+                host=address[0],
+                port=address[1],
+                ssl=ssl,
+                backlog=1,
+            )
+        else:
+            coro = asyncio.start_unix_server(
+                _client_connected_cb,
+                path=address,
+                ssl=ssl,
+                backlog=1,
+            )
+
+        server = await coro     # Starts listening
+        await connected.wait()  # Waits for the callback to fire (and finish)
+        assert server is None
+
+        self.logger.debug("Connection accepted.")
 
     @upper_half
     async def _do_connect(self, address: Union[str, Tuple[str, int]],
-- 
2.31.1



  parent reply	other threads:[~2021-09-15 16:43 UTC|newest]

Thread overview: 29+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-09-15 16:29 [PATCH v4 00/27] python: introduce Asynchronous QMP package John Snow
2021-09-15 16:29 ` [PATCH v4 01/27] python/aqmp: add asynchronous QMP (AQMP) subpackage John Snow
2021-09-15 16:29 ` [PATCH v4 02/27] python/aqmp: add error classes John Snow
2021-09-15 16:29 ` [PATCH v4 03/27] python/pylint: Add exception for TypeVar names ('T') John Snow
2021-09-15 16:29 ` [PATCH v4 04/27] python/aqmp: add asyncio compatibility wrappers John Snow
2021-09-15 16:29 ` [PATCH v4 05/27] python/aqmp: add generic async message-based protocol support John Snow
2021-09-15 16:29 ` [PATCH v4 06/27] python/aqmp: add runstate state machine to AsyncProtocol John Snow
2021-09-15 16:29 ` [PATCH v4 07/27] python/aqmp: Add logging utility helpers John Snow
2021-09-15 16:29 ` [PATCH v4 08/27] python/aqmp: add logging to AsyncProtocol John Snow
2021-09-15 16:29 ` John Snow [this message]
2021-09-15 16:29 ` [PATCH v4 10/27] python/aqmp: add configurable read buffer limit John Snow
2021-09-15 16:29 ` [PATCH v4 11/27] python/aqmp: add _cb_inbound and _cb_outbound logging hooks John Snow
2021-09-15 16:29 ` [PATCH v4 12/27] python/aqmp: add AsyncProtocol._readline() method John Snow
2021-09-15 16:29 ` [PATCH v4 13/27] python/aqmp: add QMP Message format John Snow
2021-09-15 16:29 ` [PATCH v4 14/27] python/aqmp: add well-known QMP object models John Snow
2021-09-15 16:29 ` [PATCH v4 15/27] python/aqmp: add QMP event support John Snow
2021-09-15 16:29 ` [PATCH v4 16/27] python/pylint: disable too-many-function-args John Snow
2021-09-15 16:29 ` [PATCH v4 17/27] python/aqmp: add QMP protocol support John Snow
2021-09-15 16:29 ` [PATCH v4 18/27] python/pylint: disable no-member check John Snow
2021-09-15 16:29 ` [PATCH v4 19/27] python/aqmp: Add message routing to QMP protocol John Snow
2021-09-15 16:29 ` [PATCH v4 20/27] python/aqmp: add execute() interfaces John Snow
2021-09-15 16:29 ` [PATCH v4 21/27] python/aqmp: add _raw() execution interface John Snow
2021-09-15 16:29 ` [PATCH v4 22/27] python/aqmp: add asyncio_run compatibility wrapper John Snow
2021-09-15 16:29 ` [PATCH v4 23/27] python/aqmp: add scary message John Snow
2021-09-15 16:29 ` [PATCH v4 24/27] python: bump avocado to v90.0 John Snow
2021-09-15 16:29 ` [PATCH v4 25/27] python/aqmp: add AsyncProtocol unit tests John Snow
2021-09-15 16:29 ` [PATCH v4 26/27] python/aqmp: add LineProtocol tests John Snow
2021-09-15 16:29 ` [PATCH v4 27/27] python/aqmp: Add Coverage.py support John Snow
2021-09-20 19:51 ` [PATCH v4 00/27] python: introduce Asynchronous QMP package John Snow

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210915162955.333025-10-jsnow@redhat.com \
    --to=jsnow@redhat.com \
    --cc=armbru@redhat.com \
    --cc=crosa@redhat.com \
    --cc=eblake@redhat.com \
    --cc=ehabkost@redhat.com \
    --cc=niteesh.gs@gmail.com \
    --cc=qemu-devel@nongnu.org \
    --cc=stefanha@redhat.com \
    --cc=wainersm@redhat.com \
    --cc=wrampazz@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.