* [PATCH v3 0/4] Re-implement prserv on top of asyncrpc
@ 2021-06-28  4:13 Paul Barker
  2021-06-28  4:13 ` [PATCH v3 1/4] asyncrpc: Wait on writers to close with Python 3.7+ Paul Barker
                   ` (3 more replies)
  0 siblings, 4 replies; 6+ messages in thread
From: Paul Barker @ 2021-06-28  4:13 UTC (permalink / raw)
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

These changes replace the old XML-based RPC system in prserv with the
new asyncrpc implementation originally used by hashserv.

Changes from v2:
  * Dropped patches which are currently applied to master-next. This
    series should be applied on top of the current master-next branch.

  * Improved asyncio usage based on a trawl through the docs and various
    other online resources. The asyncio tutorials from Lynn Root on
    graceful shutdowns [1] and exception handling [2] were excellent and
    should probably be required reading for anyone using asyncio in
    anger.

    As can be seen in the first patch, Python 3.7+ is required for us to
    guarantee good behaviour from asyncio stream readers & writers.
    However, these changes have been written to support Python 3.5+ in
    line with bitbake's supported Python versions and I've tested this
    on Python 3.6 (OpenSUSE 15.3).

    Hopefully this should resolve the hangs we've seen on the
    autobuilder or at least bring to light any exceptions which may have
    been occurring in asyncrpc/prserv.

    [1]: https://www.roguelynn.com/words/asyncio-graceful-shutdowns/

    [2]: https://www.roguelynn.com/words/asyncio-exception-handling/

Changes from v1:
  * Drop the obsolete getPR function in PRServer; it was replaced by
    handle_get_pr()

  * Add timeouts to catch bitbake hangs seen on the autobuilder. This should
    turn these hangs into errors which can be more reasonably investigated.

  * Add a minor error message fix I spotted.

Paul Barker (4):
  asyncrpc: Wait on writers to close with Python 3.7+
  asyncrpc: Ensure that asyncio shutdown is clean
  asyncrpc: Handle exceptions
  prserv: Replace XML RPC with modern asyncrpc implementation

 lib/bb/asyncrpc/client.py |   3 +
 lib/bb/asyncrpc/serv.py   |  41 ++++--
 lib/prserv/serv.py        | 263 ++++++++++++++++++++------------------
 3 files changed, 169 insertions(+), 138 deletions(-)

-- 
2.26.2



* [PATCH v3 1/4] asyncrpc: Wait on writers to close with Python 3.7+
  2021-06-28  4:13 [PATCH v3 0/4] Re-implement prserv on top of asyncrpc Paul Barker
@ 2021-06-28  4:13 ` Paul Barker
  2021-06-28  4:13 ` [PATCH v3 2/4] asyncrpc: Ensure that asyncio shutdown is clean Paul Barker
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 6+ messages in thread
From: Paul Barker @ 2021-06-28  4:13 UTC (permalink / raw)
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

The close() method of an asyncio stream writer is not synchronous and so
may return before the writer has finished closing. In Python 3.7 the
wait_closed() method was added which can be used to ensure that a writer
has finished closing before continuing. Sadly in Python 3.6 and earlier
there is no way to guarantee that a writer has finished closing before
moving on.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/bb/asyncrpc/client.py | 3 +++
 lib/bb/asyncrpc/serv.py   | 4 ++++
 2 files changed, 7 insertions(+)

diff --git a/lib/bb/asyncrpc/client.py b/lib/bb/asyncrpc/client.py
index 3eb4fdde8..82b6068ab 100644
--- a/lib/bb/asyncrpc/client.py
+++ b/lib/bb/asyncrpc/client.py
@@ -7,6 +7,7 @@ import asyncio
 import json
 import os
 import socket
+import sys
 from . import chunkify, DEFAULT_MAX_CHUNK
 
 
@@ -47,6 +48,8 @@ class AsyncClient(object):
 
         if self.writer is not None:
             self.writer.close()
+            if sys.version_info >= (3, 7):
+                await self.writer.wait_closed()
             self.writer = None
 
     async def _send_wrapper(self, proc):
diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
index ef20cb71d..d6e3df2c5 100644
--- a/lib/bb/asyncrpc/serv.py
+++ b/lib/bb/asyncrpc/serv.py
@@ -74,6 +74,8 @@ class AsyncServerConnection(object):
             self.logger.error(str(e))
         finally:
             self.writer.close()
+            if sys.version_info >= (3, 7):
+                await self.writer.wait_closed()
 
     async def dispatch_message(self, msg):
         for k in self.handlers.keys():
@@ -192,6 +194,8 @@ class AsyncServer(object):
             self.logger.error('Error from client: %s' % str(e), exc_info=True)
             traceback.print_exc()
             writer.close()
+            if sys.version_info >= (3, 7):
+                await writer.wait_closed()
         self.logger.debug('Client disconnected')
 
     def run_loop_forever(self):
-- 
2.26.2
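
The version guard described in the commit message above can be tried
outside bitbake. The following is a minimal sketch, not the asyncrpc
code itself: close_writer() and demo_close() are hypothetical names, and
a connected socket pair stands in for a real client/server connection.

```python
import asyncio
import socket
import sys


async def close_writer(writer):
    """Close a StreamWriter and wait for the close to finish if possible."""
    writer.close()
    # StreamWriter.wait_closed() was added in Python 3.7; on older
    # interpreters close() is all we can do.
    if sys.version_info >= (3, 7):
        await writer.wait_closed()


async def demo_close():
    # A connected socket pair avoids the need for a listening server.
    sock_a, sock_b = socket.socketpair()
    reader_a, writer_a = await asyncio.open_connection(sock=sock_a)
    reader_b, writer_b = await asyncio.open_connection(sock=sock_b)

    await close_writer(writer_a)
    # Once side A has closed, side B reads EOF (an empty bytes object).
    data = await reader_b.read()
    await close_writer(writer_b)
    return writer_a.is_closing(), data
```

Running demo_close() with asyncio.run() on Python 3.7+ returns once both
writers have actually finished closing, which is the ordering guarantee
the patch is after.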



* [PATCH v3 2/4] asyncrpc: Ensure that asyncio shutdown is clean
  2021-06-28  4:13 [PATCH v3 0/4] Re-implement prserv on top of asyncrpc Paul Barker
  2021-06-28  4:13 ` [PATCH v3 1/4] asyncrpc: Wait on writers to close with Python 3.7+ Paul Barker
@ 2021-06-28  4:13 ` Paul Barker
  2021-07-01 15:59   ` Joshua Watt
  2021-06-28  4:13 ` [PATCH v3 3/4] asyncrpc: Handle exceptions Paul Barker
  2021-06-28  4:13 ` [PATCH v3 4/4] prserv: Replace XML RPC with modern asyncrpc implementation Paul Barker
  3 siblings, 1 reply; 6+ messages in thread
From: Paul Barker @ 2021-06-28  4:13 UTC (permalink / raw)
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

We should ensure that all async tasks are cancelled and then allowed to
finish gracefully before closing the asyncio loop.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/bb/asyncrpc/serv.py | 33 +++++++++++++++++++++------------
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
index d6e3df2c5..0cdae5327 100644
--- a/lib/bb/asyncrpc/serv.py
+++ b/lib/bb/asyncrpc/serv.py
@@ -198,25 +198,34 @@ class AsyncServer(object):
                 await writer.wait_closed()
         self.logger.debug('Client disconnected')
 
-    def run_loop_forever(self):
-        try:
-            self.loop.run_forever()
-        except KeyboardInterrupt:
-            pass
-
-    def signal_handler(self):
+    def handle_signal(self):
+        self.loop.create_task(self.shutdown())
+
+    async def shutdown(self):
+        self.logger.debug('Server shutting down')
+
+        # Stop accepting connections
+        self.server.close()
+        await self.server.wait_closed()
+
+        # Cancel all active tasks
+        tasks = [t for t in asyncio.Task.all_tasks(self.loop)
+                 if t is not asyncio.Task.current_task(self.loop)]
+        for task in tasks:
+            task.cancel()
+        await asyncio.gather(*tasks, return_exceptions=True)
         self.loop.stop()
 
+    def run_loop_forever(self):
+        self.loop.run_forever()
+
     def serve_forever(self):
         asyncio.set_event_loop(self.loop)
         try:
-            self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler)
+            self.loop.add_signal_handler(signal.SIGTERM, self.handle_signal)
+            self.loop.add_signal_handler(signal.SIGINT, self.handle_signal)
 
             self.run_loop_forever()
-            self.server.close()
-
-            self.loop.run_until_complete(self.server.wait_closed())
-            self.logger.debug('Server shutting down')
         finally:
             if self.close_loop:
                 if sys.version_info >= (3, 6):
-- 
2.26.2
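
The cancel-then-gather pattern used by shutdown() above can be sketched
in isolation. One point to be aware of: the asyncio.Task.all_tasks() and
asyncio.Task.current_task() class methods were deprecated in Python 3.7
and removed in Python 3.9, where the module-level asyncio.all_tasks()
and asyncio.current_task() replace them. The helper names below
(other_tasks, cancel_all, demo_shutdown) are hypothetical and only
illustrate one way to cover both API generations.

```python
import asyncio


def other_tasks(loop):
    """Return every unfinished task on `loop` except the running one."""
    if hasattr(asyncio, "all_tasks"):
        # Python 3.7+: module-level functions.
        tasks = asyncio.all_tasks(loop)
        current = asyncio.current_task(loop)
    else:
        # Python 3.5/3.6: class methods, removed in Python 3.9.
        tasks = asyncio.Task.all_tasks(loop)
        current = asyncio.Task.current_task(loop)
    return [t for t in tasks if t is not current]


async def cancel_all(loop):
    tasks = other_tasks(loop)
    for task in tasks:
        task.cancel()
    # return_exceptions=True hands back each CancelledError as a result
    # instead of raising the first one into the caller.
    return await asyncio.gather(*tasks, return_exceptions=True)


async def demo_shutdown():
    loop = asyncio.get_running_loop()
    workers = [loop.create_task(asyncio.sleep(3600)) for _ in range(3)]
    await asyncio.sleep(0)  # give the workers a chance to start
    results = await cancel_all(loop)
    return workers, results
```

Excluding the current task matters: without that filter the shutdown
coroutine would cancel itself before it reached the gather() call.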



* [PATCH v3 3/4] asyncrpc: Handle exceptions
  2021-06-28  4:13 [PATCH v3 0/4] Re-implement prserv on top of asyncrpc Paul Barker
  2021-06-28  4:13 ` [PATCH v3 1/4] asyncrpc: Wait on writers to close with Python 3.7+ Paul Barker
  2021-06-28  4:13 ` [PATCH v3 2/4] asyncrpc: Ensure that asyncio shutdown is clean Paul Barker
@ 2021-06-28  4:13 ` Paul Barker
  2021-06-28  4:13 ` [PATCH v3 4/4] prserv: Replace XML RPC with modern asyncrpc implementation Paul Barker
  3 siblings, 0 replies; 6+ messages in thread
From: Paul Barker @ 2021-06-28  4:13 UTC (permalink / raw)
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

If an async task raises an exception it may cause the asyncio loop to
hang or may otherwise leave our server in a bad state. To avoid these
issues we can add an exception handler to the asyncio loop which
attempts to gracefully shut down the server if an unhandled exception
occurs.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/bb/asyncrpc/serv.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
index 0cdae5327..b4dd6d080 100644
--- a/lib/bb/asyncrpc/serv.py
+++ b/lib/bb/asyncrpc/serv.py
@@ -198,6 +198,11 @@ class AsyncServer(object):
                 await writer.wait_closed()
         self.logger.debug('Client disconnected')
 
+    def handle_exception(self, loop, context):
+        msg = context.get("exception", context["message"])
+        self.logger.error("Caught exception: %s" % (msg))
+        loop.create_task(self.shutdown())
+
     def handle_signal(self):
         self.loop.create_task(self.shutdown())
 
@@ -222,6 +227,7 @@ class AsyncServer(object):
     def serve_forever(self):
         asyncio.set_event_loop(self.loop)
         try:
+            self.loop.set_exception_handler(self.handle_exception)
             self.loop.add_signal_handler(signal.SIGTERM, self.handle_signal)
             self.loop.add_signal_handler(signal.SIGINT, self.handle_signal)
 
-- 
2.26.2
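
For readers unfamiliar with loop.set_exception_handler(), here is a
small standalone sketch of the mechanism the patch above relies on. The
handler receives the loop and a context dict that always has a "message"
key and may have an "exception" key. The names make_handler and
demo_handler are hypothetical, and the sketch records the error rather
than shutting a server down.

```python
import asyncio


def make_handler(seen, done):
    def handle_exception(loop, context):
        # Prefer the exception object; fall back to the text message.
        msg = context.get("exception", context["message"])
        seen.append(msg)
        done.set()
    return handle_exception


async def demo_handler():
    loop = asyncio.get_running_loop()
    seen = []
    done = asyncio.Event()
    loop.set_exception_handler(make_handler(seen, done))

    def bad_callback():
        raise RuntimeError("boom")

    # An exception escaping a plain callback is routed to the handler.
    loop.call_soon(bad_callback)
    await asyncio.wait_for(done.wait(), timeout=5)
    return seen
```

Note that the loop handler only sees exceptions nobody else consumed,
such as those escaping callbacks or left unretrieved on a task. An
exception from a task that is awaited propagates to the awaiter instead.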



* [PATCH v3 4/4] prserv: Replace XML RPC with modern asyncrpc implementation
  2021-06-28  4:13 [PATCH v3 0/4] Re-implement prserv on top of asyncrpc Paul Barker
                   ` (2 preceding siblings ...)
  2021-06-28  4:13 ` [PATCH v3 3/4] asyncrpc: Handle exceptions Paul Barker
@ 2021-06-28  4:13 ` Paul Barker
  3 siblings, 0 replies; 6+ messages in thread
From: Paul Barker @ 2021-06-28  4:13 UTC (permalink / raw)
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

Update the prserv client and server classes to use the modern JSON- and
asyncio-based RPC system implemented by the asyncrpc module.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/prserv/serv.py | 263 +++++++++++++++++++++++----------------------
 1 file changed, 136 insertions(+), 127 deletions(-)

diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 5e322bf83..124d6b1fc 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -4,157 +4,160 @@
 
 import os,sys,logging
 import signal, time
-from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
 import socket
 import io
 import sqlite3
-import bb.server.xmlrpcclient
 import prserv
 import prserv.db
 import errno
 import multiprocessing
+import bb.asyncrpc
 
 logger = logging.getLogger("BitBake.PRserv")
 
-class Handler(SimpleXMLRPCRequestHandler):
-    def _dispatch(self,method,params):
-        try:
-            value=self.server.funcs[method](*params)
-        except:
-            import traceback
-            traceback.print_exc()
-            raise
-        return value
-
 PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
 singleton = None
 
+class PRServerClient(bb.asyncrpc.AsyncServerConnection):
+    def __init__(self, reader, writer, table):
+        super().__init__(reader, writer, 'PRSERVICE', logger)
+        self.handlers.update({
+            'get-pr': self.handle_get_pr,
+            'import-one': self.handle_import_one,
+            'export': self.handle_export,
+        })
+        self.table = table
 
-class PRServer(SimpleXMLRPCServer):
-    def __init__(self, dbfile, logfile, interface):
-        ''' constructor '''
-        try:
-            SimpleXMLRPCServer.__init__(self, interface,
-                                        logRequests=False, allow_none=True)
-        except socket.error:
-            ip=socket.gethostbyname(interface[0])
-            port=interface[1]
-            msg="PR Server unable to bind to %s:%s\n" % (ip, port)
-            sys.stderr.write(msg)
-            raise PRServiceConfigError
-
-        self.dbfile=dbfile
-        self.logfile=logfile
-        self.host, self.port = self.socket.getsockname()
-
-        self.register_function(self.getPR, "getPR")
-        self.register_function(self.ping, "ping")
-        self.register_function(self.export, "export")
-        self.register_function(self.importone, "importone")
-        self.register_introspection_functions()
-
-        self.iter_count = 0
-        # 60 iterations between syncs or sync if dirty every ~30 seconds
-        self.iterations_between_sync = 60
-
-    def sigint_handler(self, signum, stack):
-        if self.table:
-            self.table.sync()
-
-    def sigterm_handler(self, signum, stack):
-        if self.table:
-            self.table.sync()
-        raise(SystemExit)
+    def validate_proto_version(self):
+        return (self.proto_version == (1, 0))
 
-    def process_request(self, request, client_address):
-        if request is None:
-            return
+    async def dispatch_message(self, msg):
         try:
-            self.finish_request(request, client_address)
-            self.shutdown_request(request)
-            self.iter_count = (self.iter_count + 1) % self.iterations_between_sync
-            if self.iter_count == 0:
-                self.table.sync_if_dirty()
+            await super().dispatch_message(msg)
         except:
-            self.handle_error(request, client_address)
-            self.shutdown_request(request)
             self.table.sync()
-        self.table.sync_if_dirty()
+            raise
 
-    def serve_forever(self, poll_interval=0.5):
-        signal.signal(signal.SIGINT, self.sigint_handler)
-        signal.signal(signal.SIGTERM, self.sigterm_handler)
+        self.table.sync_if_dirty()
 
-        self.db = prserv.db.PRData(self.dbfile)
-        self.table = self.db["PRMAIN"]
-        return super().serve_forever(poll_interval)
+    async def handle_get_pr(self, request):
+        version = request['version']
+        pkgarch = request['pkgarch']
+        checksum = request['checksum']
 
-    def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
+        response = None
         try:
-            return self.table.export(version, pkgarch, checksum, colinfo)
+            value = self.table.getValue(version, pkgarch, checksum)
+            response = {'value': value}
+        except prserv.NotFoundError:
+            logger.error("can not find value for (%s, %s)",version, checksum)
         except sqlite3.Error as exc:
             logger.error(str(exc))
-            return None
 
-    def importone(self, version, pkgarch, checksum, value):
-        return self.table.importone(version, pkgarch, checksum, value)
+        self.write_message(response)
 
-    def ping(self):
-        return True
+    async def handle_import_one(self, request):
+        version = request['version']
+        pkgarch = request['pkgarch']
+        checksum = request['checksum']
+        value = request['value']
 
-    def getinfo(self):
-        return (self.host, self.port)
+        value = self.table.importone(version, pkgarch, checksum, value)
+        if value is not None:
+            response = {'value': value}
+        else:
+            response = None
+        self.write_message(response)
+
+    async def handle_export(self, request):
+        version = request['version']
+        pkgarch = request['pkgarch']
+        checksum = request['checksum']
+        colinfo = request['colinfo']
 
-    def getPR(self, version, pkgarch, checksum):
         try:
-            return self.table.getValue(version, pkgarch, checksum)
-        except prserv.NotFoundError:
-            logger.error("can not find value for (%s, %s)",version, checksum)
-            return None
+            (metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo)
         except sqlite3.Error as exc:
             logger.error(str(exc))
-            return None
+            metainfo = datainfo = None
 
-class PRServSingleton(object):
-    def __init__(self, dbfile, logfile, interface):
+        response = {'metainfo': metainfo, 'datainfo': datainfo}
+        self.write_message(response)
+
+class PRServer(bb.asyncrpc.AsyncServer):
+    def __init__(self, dbfile, loop=None):
+        super().__init__(logger, loop)
         self.dbfile = dbfile
-        self.logfile = logfile
-        self.interface = interface
-        self.host = None
-        self.port = None
+        self.table = None
 
-    def start(self):
-        self.prserv = PRServer(self.dbfile, self.logfile, self.interface)
-        self.process = multiprocessing.Process(target=self.prserv.serve_forever)
-        self.process.start()
+    def accept_client(self, reader, writer):
+        return PRServerClient(reader, writer, self.table)
 
-        self.host, self.port = self.prserv.getinfo()
+    def serve_forever(self):
+        self.db = prserv.db.PRData(self.dbfile)
+        self.table = self.db["PRMAIN"]
 
-    def getinfo(self):
-        return (self.host, self.port)
+        logger.debug("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
+                     (self.dbfile, self.address, str(os.getpid())))
 
-class PRServerConnection(object):
-    def __init__(self, host, port):
-        if is_local_special(host, port):
-            host, port = singleton.getinfo()
-        self.host = host
-        self.port = port
-        self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port)
+        super().serve_forever()
 
-    def getPR(self, version, pkgarch, checksum):
-        return self.connection.getPR(version, pkgarch, checksum)
+        self.table.sync_if_dirty()
+        self.db.disconnect()
 
-    def ping(self):
-        return self.connection.ping()
+    def handle_signal(self):
+        super().handle_signal()
+        if self.table:
+            self.table.sync()
 
-    def export(self,version=None, pkgarch=None, checksum=None, colinfo=True):
-        return self.connection.export(version, pkgarch, checksum, colinfo)
+class PRServSingleton(object):
+    def __init__(self, dbfile, logfile, host, port):
+        self.dbfile = dbfile
+        self.logfile = logfile
+        self.host = host
+        self.port = port
 
-    def importone(self, version, pkgarch, checksum, value):
-        return self.connection.importone(version, pkgarch, checksum, value)
+    def start(self):
+        self.prserv = PRServer(self.dbfile)
+        self.prserv.start_tcp_server(self.host, self.port)
+        self.process = multiprocessing.Process(target=self.prserv.serve_forever)
+        self.process.start()
 
-    def getinfo(self):
-        return self.host, self.port
+        if not self.port:
+            self.port = int(self.prserv.address.rsplit(':', 1)[1])
+
+class PRAsyncClient(bb.asyncrpc.AsyncClient):
+    def __init__(self):
+        super().__init__('PRSERVICE', '1.0', logger)
+
+    async def getPR(self, version, pkgarch, checksum):
+        response = await self.send_message(
+            {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}}
+        )
+        if response:
+            return response['value']
+
+    async def importone(self, version, pkgarch, checksum, value):
+        response = await self.send_message(
+            {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}}
+        )
+        if response:
+            return response['value']
+
+    async def export(self, version, pkgarch, checksum, colinfo):
+        response = await self.send_message(
+            {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}}
+        )
+        if response:
+            return (response['metainfo'], response['datainfo'])
+
+class PRClient(bb.asyncrpc.Client):
+    def __init__(self):
+        super().__init__()
+        self._add_methods('getPR', 'importone', 'export')
+
+    def _get_async_client(self):
+        return PRAsyncClient()
 
 def run_as_daemon(func, pidfile, logfile):
     """
@@ -240,15 +243,13 @@ def start_daemon(dbfile, host, port, logfile):
                             % pidfile)
         return 1
 
-    server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port))
-    run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile))
+    dbfile = os.path.abspath(dbfile)
+    def daemon_main():
+        server = PRServer(dbfile)
+        server.start_tcp_server(host, port)
+        server.serve_forever()
 
-    # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
-    # the one the server actually is listening, so at least warn the user about it
-    _,rport = server.getinfo()
-    if port != rport:
-        sys.stdout.write("Server is listening at port %s instead of %s\n"
-                         % (rport,port))
+    run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
     return 0
 
 def stop_daemon(host, port):
@@ -302,7 +303,7 @@ def is_running(pid):
     return True
 
 def is_local_special(host, port):
-    if host.strip().upper() == 'localhost'.upper() and (not port):
+    if host.strip().lower() == 'localhost' and not port:
         return True
     else:
         return False
@@ -340,20 +341,19 @@ def auto_start(d):
                auto_shutdown()
         if not singleton:
             bb.utils.mkdirhier(cachedir)
-            singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0))
+            singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), "localhost", 0)
             singleton.start()
     if singleton:
-        host, port = singleton.getinfo()
+        host = singleton.host
+        port = singleton.port
     else:
         host = host_params[0]
         port = int(host_params[1])
 
     try:
-        connection = PRServerConnection(host,port)
-        connection.ping()
-        realhost, realport = connection.getinfo()
-        return str(realhost) + ":" + str(realport)
-        
+        ping(host, port)
+        return str(host) + ":" + str(port)
+
     except Exception:
         logger.critical("PRservice %s:%d not available" % (host, port))
         raise PRServiceConfigError
@@ -366,8 +366,17 @@ def auto_shutdown():
         singleton = None
 
 def ping(host, port):
-    conn=PRServerConnection(host, port)
+    conn=PRClient()
+    conn.connect_tcp(host, port)
     return conn.ping()
 
 def connect(host, port):
-    return PRServerConnection(host, port)
+    global singleton
+
+    if host.strip().lower() == 'localhost' and not port:
+        host = 'localhost'
+        port = singleton.port
+
+    conn = PRClient()
+    conn.connect_tcp(host, port)
+    return conn
-- 
2.26.2
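
The request shape used by the client above is a one-key dict whose key
names the command (for example 'get-pr') and whose value carries the
arguments. The sketch below shows that key-based dispatch idea without
any bitbake imports. It is an illustration under stated assumptions, not
the asyncrpc implementation: MiniConnection, its dict-backed table and
the `sent` list (standing in for write_message()) are all hypothetical.

```python
import asyncio


class MiniConnection:
    def __init__(self, table):
        self.table = table  # hypothetical stand-in for the PR database table
        self.sent = []      # records responses instead of writing to a socket
        self.handlers = {"get-pr": self.handle_get_pr}

    async def dispatch_message(self, msg):
        # Run the first handler whose command key appears in the message.
        for key, handler in self.handlers.items():
            if key in msg:
                await handler(msg[key])
                return
        raise ValueError("Unrecognized command %r" % (msg,))

    async def handle_get_pr(self, request):
        lookup = (request["version"], request["pkgarch"], request["checksum"])
        value = self.table.get(lookup)
        # Mirror the patch: a None response means no value was found.
        self.sent.append(None if value is None else {"value": value})


async def demo_dispatch():
    conn = MiniConnection({("1.0", "x86_64", "abc123"): 3})
    await conn.dispatch_message(
        {"get-pr": {"version": "1.0", "pkgarch": "x86_64", "checksum": "abc123"}}
    )
    await conn.dispatch_message(
        {"get-pr": {"version": "9.9", "pkgarch": "x86_64", "checksum": "nope"}}
    )
    return conn.sent
```

Keeping the command name as the dict key lets a subclass add commands by
updating the handlers table, which is what PRServerClient does in its
constructor.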



* Re: [PATCH v3 2/4] asyncrpc: Ensure that asyncio shutdown is clean
  2021-06-28  4:13 ` [PATCH v3 2/4] asyncrpc: Ensure that asyncio shutdown is clean Paul Barker
@ 2021-07-01 15:59   ` Joshua Watt
  0 siblings, 0 replies; 6+ messages in thread
From: Joshua Watt @ 2021-07-01 15:59 UTC (permalink / raw)
  To: Paul Barker, bitbake-devel, Richard Purdie


On 6/27/21 11:13 PM, Paul Barker wrote:
> We should ensure that all async tasks are cancelled and then allowed to
> finish gracefully before closing the asyncio loop.
>
> Signed-off-by: Paul Barker <pbarker@konsulko.com>
> ---
>   lib/bb/asyncrpc/serv.py | 33 +++++++++++++++++++++------------
>   1 file changed, 21 insertions(+), 12 deletions(-)
>
> diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
> index d6e3df2c5..0cdae5327 100644
> --- a/lib/bb/asyncrpc/serv.py
> +++ b/lib/bb/asyncrpc/serv.py
> @@ -198,25 +198,34 @@ class AsyncServer(object):
>                   await writer.wait_closed()
>           self.logger.debug('Client disconnected')
>   
> -    def run_loop_forever(self):
> -        try:
> -            self.loop.run_forever()
> -        except KeyboardInterrupt:
> -            pass
> -
> -    def signal_handler(self):
> +    def handle_signal(self):
> +        self.loop.create_task(self.shutdown())
> +
> +    async def shutdown(self):
> +        self.logger.debug('Server shutting down')
> +
> +        # Stop accepting connections
> +        self.server.close()
> +        await self.server.wait_closed()
> +
> +        # Cancel all active tasks
> +        tasks = [t for t in asyncio.Task.all_tasks(self.loop)
> +                 if t is not asyncio.Task.current_task(self.loop)]
> +        for task in tasks:
> +            task.cancel()
> +        await asyncio.gather(*tasks, return_exceptions=True)

Do we need to clean up all the client sockets here with .close() 
.wait_closed() like we do with the server socket?


>           self.loop.stop()
>   
> +    def run_loop_forever(self):
> +        self.loop.run_forever()
> +
>       def serve_forever(self):
>           asyncio.set_event_loop(self.loop)
>           try:
> -            self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler)
> +            self.loop.add_signal_handler(signal.SIGTERM, self.handle_signal)
> +            self.loop.add_signal_handler(signal.SIGINT, self.handle_signal)
>   
>               self.run_loop_forever()
> -            self.server.close()
> -
> -            self.loop.run_until_complete(self.server.wait_closed())
> -            self.logger.debug('Server shutting down')
>           finally:
>               if self.close_loop:
>                   if sys.version_info >= (3, 6):


