* [PATCH v2 0/4] Re-implement prserv on top of asyncrpc
@ 2021-06-07 15:03 Paul Barker
  2021-06-07 15:03 ` [PATCH v2 1/4] asyncrpc: Fix bad message error in client Paul Barker
                   ` (3 more replies)
  0 siblings, 4 replies; 5+ messages in thread
From: Paul Barker @ 2021-06-07 15:03 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

These changes replace the old XML-based RPC system in prserv with the
new asyncrpc implementation originally used by hashserv.

Changes from v1:
  * Drop the obsolete getPR function in PRServer; it was replaced by
    handle_get_pr().

  * Add timeouts to catch the bitbake hangs seen on the autobuilder. This
    should turn these hangs into errors that are easier to investigate.

  * Add a minor error message fix I spotted.

Paul Barker (4):
  asyncrpc: Fix bad message error in client
  asyncrpc: Set timeout when waiting for reply from server
  prserv: Add 15s timeout to auto shutdown
  prserv: Replace XML RPC with modern asyncrpc implementation

 lib/bb/asyncrpc/client.py |  11 +-
 lib/prserv/serv.py        | 267 ++++++++++++++++++++------------------
 2 files changed, 147 insertions(+), 131 deletions(-)

-- 
2.26.2



* [PATCH v2 1/4] asyncrpc: Fix bad message error in client
From: Paul Barker @ 2021-06-07 15:03 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

If the reply from the server is badly formatted, the error message should
print that reply line. Printing the message which the client sent tells us
nothing about what went wrong.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
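Reader's note: a minimal sketch of the check this patch touches, not the
actual client code. The helper name check_reply_line is hypothetical; only
the "Bad message %r" format string is taken from the diff.

```python
def check_reply_line(raw: bytes) -> str:
    """Decode one reply line and insist that it is newline-terminated."""
    line = raw.decode("utf-8")
    if not line.endswith("\n"):
        # Report what the server actually sent, not what the client sent.
        raise ConnectionError("Bad message %r" % (line,))
    return line
```

With this shape, a truncated reply shows up verbatim in the exception text,
which is the information needed to debug a malformed response.
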
 lib/bb/asyncrpc/client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/bb/asyncrpc/client.py b/lib/bb/asyncrpc/client.py
index 79919c5be..d917ab597 100644
--- a/lib/bb/asyncrpc/client.py
+++ b/lib/bb/asyncrpc/client.py
@@ -77,7 +77,7 @@ class AsyncClient(object):
             line = line.decode("utf-8")
 
             if not line.endswith("\n"):
-                raise ConnectionError("Bad message %r" % msg)
+                raise ConnectionError("Bad message %r" % (line))
 
             return line
 
-- 
2.26.2



* [PATCH v2 2/4] asyncrpc: Set timeout when waiting for reply from server
From: Paul Barker @ 2021-06-07 15:03 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
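Reader's note: a minimal, self-contained sketch of the timeout pattern this
patch applies, assuming a bare asyncio.StreamReader as a stand-in for the
server connection. The names read_reply and demo are hypothetical; the
error strings match the ones in the diff.

```python
import asyncio


async def read_reply(reader: asyncio.StreamReader, timeout: float) -> bytes:
    """Wait up to `timeout` seconds for one line from the server."""
    try:
        line = await asyncio.wait_for(reader.readline(), timeout)
    except asyncio.TimeoutError:
        # Turn a silent hang into an error the caller can report.
        raise ConnectionError("Timed out waiting for server")
    if not line:
        raise ConnectionError("Connection closed")
    return line


async def demo(feed: bytes, timeout: float = 0.05) -> bytes:
    # Assumption: nothing else is ever written to this reader, so an empty
    # `feed` simulates a server that never replies.
    reader = asyncio.StreamReader()
    if feed:
        reader.feed_data(feed)
    return await read_reply(reader, timeout)
```

Calling asyncio.run(demo(b"")) raises ConnectionError after the timeout
instead of blocking forever, which is the behaviour the series relies on
to surface autobuilder hangs.
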
 lib/bb/asyncrpc/client.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/lib/bb/asyncrpc/client.py b/lib/bb/asyncrpc/client.py
index d917ab597..3eb4fdde8 100644
--- a/lib/bb/asyncrpc/client.py
+++ b/lib/bb/asyncrpc/client.py
@@ -11,13 +11,14 @@ from . import chunkify, DEFAULT_MAX_CHUNK
 
 
 class AsyncClient(object):
-    def __init__(self, proto_name, proto_version, logger):
+    def __init__(self, proto_name, proto_version, logger, timeout=30):
         self.reader = None
         self.writer = None
         self.max_chunk = DEFAULT_MAX_CHUNK
         self.proto_name = proto_name
         self.proto_version = proto_version
         self.logger = logger
+        self.timeout = timeout
 
     async def connect_tcp(self, address, port):
         async def connect_sock():
@@ -70,7 +71,11 @@ class AsyncClient(object):
 
     async def send_message(self, msg):
         async def get_line():
-            line = await self.reader.readline()
+            try:
+                line = await asyncio.wait_for(self.reader.readline(), self.timeout)
+            except asyncio.TimeoutError:
+                raise ConnectionError("Timed out waiting for server")
+
             if not line:
                 raise ConnectionError("Connection closed")
 
-- 
2.26.2



* [PATCH v2 3/4] prserv: Add 15s timeout to auto shutdown
From: Paul Barker @ 2021-06-07 15:03 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

This timeout prevents bitbake from hanging if the prservice process
fails to terminate within a reasonable time period.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
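Reader's note: a minimal sketch of the bounded-join pattern, assuming an
object that behaves like multiprocessing.Process. The helper name
stop_process and the logger name are hypothetical; the 15 second default
comes from the patch.

```python
import logging

logger = logging.getLogger("example")


def stop_process(process, timeout=15):
    """Terminate `process` and wait at most `timeout` seconds for it to exit.

    `process` is expected to behave like multiprocessing.Process.
    Returns True if the process exited in time, False otherwise.
    """
    process.terminate()
    process.join(timeout)
    # join() returns silently on timeout; exitcode stays None while the
    # process is still running.
    if process.exitcode is None:
        logger.error("shutdown timed out after %ss", timeout)
        return False
    return True
```

Checking exitcode after join() is what distinguishes a clean exit from a
timeout, since join() itself does not report which one happened.
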
 lib/prserv/serv.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 5e322bf83..50d7cb694 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -362,7 +362,9 @@ def auto_shutdown():
     global singleton
     if singleton and singleton.process:
         singleton.process.terminate()
-        singleton.process.join()
+        singleton.process.join(15)
+        if singleton.process.exitcode is None:
+            logger.error("PRservice shutdown timed out")
         singleton = None
 
 def ping(host, port):
-- 
2.26.2



* [PATCH v2 4/4] prserv: Replace XML RPC with modern asyncrpc implementation
From: Paul Barker @ 2021-06-07 15:03 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

Update the prserv client and server classes to use the modern JSON- and
asyncio-based RPC system implemented by the asyncrpc module.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
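Reader's note: a toy sketch of the request shape and handler table used in
the diff, where each request is a one-key dict such as {'get-pr': {...}}.
TinyConnection is a hypothetical stand-in and is not the bb.asyncrpc API;
as a simplification it returns the response instead of writing it to a
stream, and it uses a plain dict in place of the PR database table.

```python
import asyncio


class TinyConnection:
    """Toy stand-in for a server-side connection (not bb.asyncrpc)."""

    def __init__(self, table):
        # Assumption: maps (version, pkgarch, checksum) -> value.
        self.table = table
        self.handlers = {"get-pr": self.handle_get_pr}

    async def dispatch_message(self, msg):
        # Each request is a one-key dict: {command: arguments}.
        for command, handler in self.handlers.items():
            if command in msg:
                return await handler(msg[command])
        raise ValueError("Unrecognized command %r" % (msg,))

    async def handle_get_pr(self, request):
        key = (request["version"], request["pkgarch"], request["checksum"])
        value = self.table.get(key)
        # Like the patch, reply with None when no value is available.
        return {"value": value} if value is not None else None
```

A client-side helper would then unwrap response['value'] only when the
response is not None, as PRAsyncClient.getPR does in the diff.
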
 lib/prserv/serv.py | 263 +++++++++++++++++++++++----------------------
 1 file changed, 136 insertions(+), 127 deletions(-)

diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 50d7cb694..106040556 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -4,157 +4,160 @@
 
 import os,sys,logging
 import signal, time
-from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
 import socket
 import io
 import sqlite3
-import bb.server.xmlrpcclient
 import prserv
 import prserv.db
 import errno
 import multiprocessing
+import bb.asyncrpc
 
 logger = logging.getLogger("BitBake.PRserv")
 
-class Handler(SimpleXMLRPCRequestHandler):
-    def _dispatch(self,method,params):
-        try:
-            value=self.server.funcs[method](*params)
-        except:
-            import traceback
-            traceback.print_exc()
-            raise
-        return value
-
 PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
 singleton = None
 
+class PRServerClient(bb.asyncrpc.AsyncServerConnection):
+    def __init__(self, reader, writer, table):
+        super().__init__(reader, writer, 'PRSERVICE', logger)
+        self.handlers.update({
+            'get-pr': self.handle_get_pr,
+            'import-one': self.handle_import_one,
+            'export': self.handle_export,
+        })
+        self.table = table
 
-class PRServer(SimpleXMLRPCServer):
-    def __init__(self, dbfile, logfile, interface):
-        ''' constructor '''
-        try:
-            SimpleXMLRPCServer.__init__(self, interface,
-                                        logRequests=False, allow_none=True)
-        except socket.error:
-            ip=socket.gethostbyname(interface[0])
-            port=interface[1]
-            msg="PR Server unable to bind to %s:%s\n" % (ip, port)
-            sys.stderr.write(msg)
-            raise PRServiceConfigError
-
-        self.dbfile=dbfile
-        self.logfile=logfile
-        self.host, self.port = self.socket.getsockname()
-
-        self.register_function(self.getPR, "getPR")
-        self.register_function(self.ping, "ping")
-        self.register_function(self.export, "export")
-        self.register_function(self.importone, "importone")
-        self.register_introspection_functions()
-
-        self.iter_count = 0
-        # 60 iterations between syncs or sync if dirty every ~30 seconds
-        self.iterations_between_sync = 60
-
-    def sigint_handler(self, signum, stack):
-        if self.table:
-            self.table.sync()
-
-    def sigterm_handler(self, signum, stack):
-        if self.table:
-            self.table.sync()
-        raise(SystemExit)
+    def validate_proto_version(self):
+        return (self.proto_version == (1, 0))
 
-    def process_request(self, request, client_address):
-        if request is None:
-            return
+    async def dispatch_message(self, msg):
         try:
-            self.finish_request(request, client_address)
-            self.shutdown_request(request)
-            self.iter_count = (self.iter_count + 1) % self.iterations_between_sync
-            if self.iter_count == 0:
-                self.table.sync_if_dirty()
+            await super().dispatch_message(msg)
         except:
-            self.handle_error(request, client_address)
-            self.shutdown_request(request)
             self.table.sync()
-        self.table.sync_if_dirty()
+            raise
 
-    def serve_forever(self, poll_interval=0.5):
-        signal.signal(signal.SIGINT, self.sigint_handler)
-        signal.signal(signal.SIGTERM, self.sigterm_handler)
+        self.table.sync_if_dirty()
 
-        self.db = prserv.db.PRData(self.dbfile)
-        self.table = self.db["PRMAIN"]
-        return super().serve_forever(poll_interval)
+    async def handle_get_pr(self, request):
+        version = request['version']
+        pkgarch = request['pkgarch']
+        checksum = request['checksum']
 
-    def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
+        response = None
         try:
-            return self.table.export(version, pkgarch, checksum, colinfo)
+            value = self.table.getValue(version, pkgarch, checksum)
+            response = {'value': value}
+        except prserv.NotFoundError:
+            logger.error("can not find value for (%s, %s)",version, checksum)
         except sqlite3.Error as exc:
             logger.error(str(exc))
-            return None
 
-    def importone(self, version, pkgarch, checksum, value):
-        return self.table.importone(version, pkgarch, checksum, value)
+        self.write_message(response)
 
-    def ping(self):
-        return True
+    async def handle_import_one(self, request):
+        version = request['version']
+        pkgarch = request['pkgarch']
+        checksum = request['checksum']
+        value = request['value']
 
-    def getinfo(self):
-        return (self.host, self.port)
+        value = self.table.importone(version, pkgarch, checksum, value)
+        if value is not None:
+            response = {'value': value}
+        else:
+            response = None
+        self.write_message(response)
+
+    async def handle_export(self, request):
+        version = request['version']
+        pkgarch = request['pkgarch']
+        checksum = request['checksum']
+        colinfo = request['colinfo']
 
-    def getPR(self, version, pkgarch, checksum):
         try:
-            return self.table.getValue(version, pkgarch, checksum)
-        except prserv.NotFoundError:
-            logger.error("can not find value for (%s, %s)",version, checksum)
-            return None
+            (metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo)
         except sqlite3.Error as exc:
             logger.error(str(exc))
-            return None
+            metainfo = datainfo = None
 
-class PRServSingleton(object):
-    def __init__(self, dbfile, logfile, interface):
+        response = {'metainfo': metainfo, 'datainfo': datainfo}
+        self.write_message(response)
+
+class PRServer(bb.asyncrpc.AsyncServer):
+    def __init__(self, dbfile, loop=None):
+        super().__init__(logger, loop)
         self.dbfile = dbfile
-        self.logfile = logfile
-        self.interface = interface
-        self.host = None
-        self.port = None
+        self.table = None
 
-    def start(self):
-        self.prserv = PRServer(self.dbfile, self.logfile, self.interface)
-        self.process = multiprocessing.Process(target=self.prserv.serve_forever)
-        self.process.start()
+    def accept_client(self, reader, writer):
+        return PRServerClient(reader, writer, self.table)
 
-        self.host, self.port = self.prserv.getinfo()
+    def serve_forever(self):
+        self.db = prserv.db.PRData(self.dbfile)
+        self.table = self.db["PRMAIN"]
 
-    def getinfo(self):
-        return (self.host, self.port)
+        logger.debug("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
+                     (self.dbfile, self.address, str(os.getpid())))
 
-class PRServerConnection(object):
-    def __init__(self, host, port):
-        if is_local_special(host, port):
-            host, port = singleton.getinfo()
-        self.host = host
-        self.port = port
-        self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port)
+        super().serve_forever()
 
-    def getPR(self, version, pkgarch, checksum):
-        return self.connection.getPR(version, pkgarch, checksum)
+        self.table.sync_if_dirty()
+        self.db.disconnect()
 
-    def ping(self):
-        return self.connection.ping()
+    def signal_handler(self):
+        super().signal_handler()
+        if self.table:
+            self.table.sync()
 
-    def export(self,version=None, pkgarch=None, checksum=None, colinfo=True):
-        return self.connection.export(version, pkgarch, checksum, colinfo)
+class PRServSingleton(object):
+    def __init__(self, dbfile, logfile, host, port):
+        self.dbfile = dbfile
+        self.logfile = logfile
+        self.host = host
+        self.port = port
 
-    def importone(self, version, pkgarch, checksum, value):
-        return self.connection.importone(version, pkgarch, checksum, value)
+    def start(self):
+        self.prserv = PRServer(self.dbfile)
+        self.prserv.start_tcp_server(self.host, self.port)
+        self.process = multiprocessing.Process(target=self.prserv.serve_forever)
+        self.process.start()
 
-    def getinfo(self):
-        return self.host, self.port
+        if not self.port:
+            self.port = int(self.prserv.address.rsplit(':', 1)[1])
+
+class PRAsyncClient(bb.asyncrpc.AsyncClient):
+    def __init__(self):
+        super().__init__('PRSERVICE', '1.0', logger)
+
+    async def getPR(self, version, pkgarch, checksum):
+        response = await self.send_message(
+            {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}}
+        )
+        if response:
+            return response['value']
+
+    async def importone(self, version, pkgarch, checksum, value):
+        response = await self.send_message(
+            {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}}
+        )
+        if response:
+            return response['value']
+
+    async def export(self, version, pkgarch, checksum, colinfo):
+        response = await self.send_message(
+            {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}}
+        )
+        if response:
+            return (response['metainfo'], response['datainfo'])
+
+class PRClient(bb.asyncrpc.Client):
+    def __init__(self):
+        super().__init__()
+        self._add_methods('getPR', 'importone', 'export')
+
+    def _get_async_client(self):
+        return PRAsyncClient()
 
 def run_as_daemon(func, pidfile, logfile):
     """
@@ -240,15 +243,13 @@ def start_daemon(dbfile, host, port, logfile):
                             % pidfile)
         return 1
 
-    server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port))
-    run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile))
+    dbfile = os.path.abspath(dbfile)
+    def daemon_main():
+        server = PRServer(dbfile)
+        server.start_tcp_server(host, port)
+        server.serve_forever()
 
-    # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
-    # the one the server actually is listening, so at least warn the user about it
-    _,rport = server.getinfo()
-    if port != rport:
-        sys.stdout.write("Server is listening at port %s instead of %s\n"
-                         % (rport,port))
+    run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
     return 0
 
 def stop_daemon(host, port):
@@ -302,7 +303,7 @@ def is_running(pid):
     return True
 
 def is_local_special(host, port):
-    if host.strip().upper() == 'localhost'.upper() and (not port):
+    if host.strip().lower() == 'localhost' and not port:
         return True
     else:
         return False
@@ -340,20 +341,19 @@ def auto_start(d):
                auto_shutdown()
         if not singleton:
             bb.utils.mkdirhier(cachedir)
-            singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0))
+            singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), "localhost", 0)
             singleton.start()
     if singleton:
-        host, port = singleton.getinfo()
+        host = singleton.host
+        port = singleton.port
     else:
         host = host_params[0]
         port = int(host_params[1])
 
     try:
-        connection = PRServerConnection(host,port)
-        connection.ping()
-        realhost, realport = connection.getinfo()
-        return str(realhost) + ":" + str(realport)
-        
+        ping(host, port)
+        return str(host) + ":" + str(port)
+
     except Exception:
         logger.critical("PRservice %s:%d not available" % (host, port))
         raise PRServiceConfigError
@@ -368,8 +368,17 @@ def auto_shutdown():
         singleton = None
 
 def ping(host, port):
-    conn=PRServerConnection(host, port)
+    conn=PRClient()
+    conn.connect_tcp(host, port)
     return conn.ping()
 
 def connect(host, port):
-    return PRServerConnection(host, port)
+    global singleton
+
+    if host.strip().lower() == 'localhost' and not port:
+        host = 'localhost'
+        port = singleton.port
+
+    conn = PRClient()
+    conn.connect_tcp(host, port)
+    return conn
-- 
2.26.2

