All of lore.kernel.org
 help / color / mirror / Atom feed
From: "Scott Murray" <scott.murray@konsulko.com>
To: bitbake-devel@lists.openembedded.org,
	Richard Purdie <richard.purdie@linuxfoundation.org>,
	Joshua Watt <JPEWhacker@gmail.com>,
	Paul Barker <paul@pbarker.dev>
Subject: [PATCH v6 1/4] bitbake: asyncrpc: Defer all asyncio to child process
Date: Thu, 19 Aug 2021 12:46:41 -0400	[thread overview]
Message-ID: <22a4e6fc5913fdf78f5c93893b16472f0641ff29.1629388054.git.scott.murray@konsulko.com> (raw)
In-Reply-To: <cover.1629388054.git.scott.murray@konsulko.com>

From: Joshua Watt <JPEWhacker@gmail.com>

Reworks the async I/O API so that the async loop is only created in the
child process. This requires deferring the creation of the server until
the child process has started, and adding a queue to transfer the bound
address back to the parent process.

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
[small loop -> self.loop fix in serv.py]
Signed-off-by: Scott Murray <scott.murray@konsulko.com>
---
 lib/bb/asyncrpc/serv.py | 118 ++++++++++++++++++++++++----------------
 lib/hashserv/server.py  |   4 +-
 2 files changed, 74 insertions(+), 48 deletions(-)

diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
index 4084f300..45628698 100644
--- a/lib/bb/asyncrpc/serv.py
+++ b/lib/bb/asyncrpc/serv.py
@@ -131,53 +131,58 @@ class AsyncServerConnection(object):
 
 
 class AsyncServer(object):
-    def __init__(self, logger, loop=None):
-        if loop is None:
-            self.loop = asyncio.new_event_loop()
-            self.close_loop = True
-        else:
-            self.loop = loop
-            self.close_loop = False
-
+    def __init__(self, logger):
         self._cleanup_socket = None
         self.logger = logger
+        self.start = None
+        self.address = None
+
+    @property
+    def loop(self):
+        return asyncio.get_event_loop()
 
     def start_tcp_server(self, host, port):
-        self.server = self.loop.run_until_complete(
-            asyncio.start_server(self.handle_client, host, port, loop=self.loop)
-        )
-
-        for s in self.server.sockets:
-            self.logger.debug('Listening on %r' % (s.getsockname(),))
-            # Newer python does this automatically. Do it manually here for
-            # maximum compatibility
-            s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
-            s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
-
-        name = self.server.sockets[0].getsockname()
-        if self.server.sockets[0].family == socket.AF_INET6:
-            self.address = "[%s]:%d" % (name[0], name[1])
-        else:
-            self.address = "%s:%d" % (name[0], name[1])
+        def start_tcp():
+            self.server = self.loop.run_until_complete(
+                asyncio.start_server(self.handle_client, host, port)
+            )
+
+            for s in self.server.sockets:
+                self.logger.debug('Listening on %r' % (s.getsockname(),))
+                # Newer python does this automatically. Do it manually here for
+                # maximum compatibility
+                s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
+                s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
+
+            name = self.server.sockets[0].getsockname()
+            if self.server.sockets[0].family == socket.AF_INET6:
+                self.address = "[%s]:%d" % (name[0], name[1])
+            else:
+                self.address = "%s:%d" % (name[0], name[1])
+
+        self.start = start_tcp
 
     def start_unix_server(self, path):
         def cleanup():
             os.unlink(path)
 
-        cwd = os.getcwd()
-        try:
-            # Work around path length limits in AF_UNIX
-            os.chdir(os.path.dirname(path))
-            self.server = self.loop.run_until_complete(
-                asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop)
-            )
-        finally:
-            os.chdir(cwd)
+        def start_unix():
+            cwd = os.getcwd()
+            try:
+                # Work around path length limits in AF_UNIX
+                os.chdir(os.path.dirname(path))
+                self.server = self.loop.run_until_complete(
+                    asyncio.start_unix_server(self.handle_client, os.path.basename(path))
+                )
+            finally:
+                os.chdir(cwd)
 
-        self.logger.debug('Listening on %r' % path)
+            self.logger.debug('Listening on %r' % path)
 
-        self._cleanup_socket = cleanup
-        self.address = "unix://%s" % os.path.abspath(path)
+            self._cleanup_socket = cleanup
+            self.address = "unix://%s" % os.path.abspath(path)
+
+        self.start = start_unix
 
     @abc.abstractmethod
     def accept_client(self, reader, writer):
@@ -205,8 +210,7 @@ class AsyncServer(object):
         self.logger.debug("Got exit signal")
         self.loop.stop()
 
-    def serve_forever(self):
-        asyncio.set_event_loop(self.loop)
+    def _serve_forever(self):
         try:
             self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler)
             signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM])
@@ -217,28 +221,50 @@ class AsyncServer(object):
             self.loop.run_until_complete(self.server.wait_closed())
             self.logger.debug('Server shutting down')
         finally:
-            if self.close_loop:
-                if sys.version_info >= (3, 6):
-                    self.loop.run_until_complete(self.loop.shutdown_asyncgens())
-                self.loop.close()
-
             if self._cleanup_socket is not None:
                 self._cleanup_socket()
 
+    def serve_forever(self):
+        """
+        Serve requests in the current process
+        """
+        self.start()
+        self._serve_forever()
+
     def serve_as_process(self, *, prefunc=None, args=()):
-        def run():
+        """
+        Serve requests in a child process
+        """
+        def run(queue):
+            try:
+                self.start()
+            finally:
+                queue.put(self.address)
+                queue.close()
+
             if prefunc is not None:
                 prefunc(self, *args)
-            self.serve_forever()
+
+            self._serve_forever()
+
+            if sys.version_info >= (3, 6):
+                self.loop.run_until_complete(self.loop.shutdown_asyncgens())
+            self.loop.close()
+
+        queue = multiprocessing.Queue()
 
         # Temporarily block SIGTERM. The server process will inherit this
         # block which will ensure it doesn't receive the SIGTERM until the
         # handler is ready for it
         mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGTERM])
         try:
-            self.process = multiprocessing.Process(target=run)
+            self.process = multiprocessing.Process(target=run, args=(queue,))
             self.process.start()
 
+            self.address = queue.get()
+            queue.close()
+            queue.join_thread()
+
             return self.process
         finally:
             signal.pthread_sigmask(signal.SIG_SETMASK, mask)
diff --git a/lib/hashserv/server.py b/lib/hashserv/server.py
index 8e849897..a059e521 100644
--- a/lib/hashserv/server.py
+++ b/lib/hashserv/server.py
@@ -410,11 +410,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
 
 
 class Server(bb.asyncrpc.AsyncServer):
-    def __init__(self, db, loop=None, upstream=None, read_only=False):
+    def __init__(self, db, upstream=None, read_only=False):
         if upstream and read_only:
             raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server")
 
-        super().__init__(logger, loop)
+        super().__init__(logger)
 
         self.request_stats = Stats()
         self.db = db
-- 
2.31.1


  reply	other threads:[~2021-08-19 16:46 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-08-19 16:46 [PATCH v6 0/4] Re-implement prserv on top of asyncrpc Scott Murray
2021-08-19 16:46 ` Scott Murray [this message]
2021-08-19 16:46 ` [PATCH v6 2/4] bitbake: asyncrpc: always create new asyncio loops Scott Murray
2021-08-19 19:58   ` Joshua Watt
2021-08-19 16:46 ` [PATCH v6 3/4] prserv: Replace XML RPC with modern asyncrpc implementation Scott Murray
2021-08-26 16:02   ` [bitbake-devel] " Martin Jansa
2021-08-26 19:17     ` Scott Murray
2021-08-19 16:46 ` [PATCH v6 4/4] prserv: Add read-only mode Scott Murray

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=22a4e6fc5913fdf78f5c93893b16472f0641ff29.1629388054.git.scott.murray@konsulko.com \
    --to=scott.murray@konsulko.com \
    --cc=JPEWhacker@gmail.com \
    --cc=bitbake-devel@lists.openembedded.org \
    --cc=paul@pbarker.dev \
    --cc=richard.purdie@linuxfoundation.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.