* [PATCH 0/6] Preparation for prserv upgrade to json-based asyncrpc
From: Paul Barker @ 2021-04-26  8:16 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

These patches lay the groundwork for the planned upgrades to prserv, which
will include moving to the new json-based asyncrpc system used by hashserv
and using the Python multiprocessing module where possible in the service
startup/shutdown code.
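
Roughly, the multiprocessing part of the plan would look something like the
sketch below. This is illustrative only: create_server() stands in for
whatever setup helper the real patches end up using, and host/port come
from the caller.

    # Illustrative sketch of multiprocessing-based startup/shutdown.
    import multiprocessing

    def _run_server(host, port):
        server = create_server(host, port)  # hypothetical setup helper
        server.serve_forever()

    proc = multiprocessing.Process(target=_run_server, args=(host, port))
    proc.start()      # startup
    proc.terminate()  # shutdown: sends SIGTERM, which stops the event loop
    proc.join()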

The common code which will be shared by both hashserv and prserv is moved to
a new asyncrpc module.

Obsolete and unused code in prserv is dropped.

A new connect() function is added to the prserv module to abstract away the
details of how the connection is started. This can then be used by the
prservice module in oe-core. As this patch series doesn't yet remove the
PRServerConnection class, the change in oe-core doesn't need to be made
immediately. I'll submit the required patch for oe-core after this series is
merged into bitbake.

These changes have been tested by running a few local builds with and
without hashserv enabled, and by running the full bitbake-selftest and
oe-selftest suites. A couple of failures were seen in oe-selftest, but these
are related to my host system configuration (socat not installed, firewall
blocking ports, etc.) so I'm fairly confident they aren't caused by this
patch series.

Paul Barker (6):
  hashserv: Use generic ConnectionError
  asyncrpc: Common implementation of RPC using json & asyncio
  hashserv: Refactor to use asyncrpc
  prserv: Drop obsolete python version check
  prserv: Drop unused dump_db method
  prserv: Add connect function

 lib/bb/asyncrpc/__init__.py |  31 +++++
 lib/bb/asyncrpc/client.py   | 145 ++++++++++++++++++++++++
 lib/bb/asyncrpc/serv.py     | 218 ++++++++++++++++++++++++++++++++++++
 lib/bb/siggen.py            |   6 +-
 lib/hashserv/client.py      | 147 ++++--------------------
 lib/hashserv/server.py      | 210 +++++-----------------------------
 lib/hashserv/tests.py       |   3 +-
 lib/prserv/serv.py          |  31 +----
 8 files changed, 445 insertions(+), 346 deletions(-)
 create mode 100644 lib/bb/asyncrpc/__init__.py
 create mode 100644 lib/bb/asyncrpc/client.py
 create mode 100644 lib/bb/asyncrpc/serv.py

-- 
2.26.2



* [PATCH 1/6] hashserv: Use generic ConnectionError
From: Paul Barker @ 2021-04-26  8:16 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

The Python built-in ConnectionError type can be used instead of the custom
HashConnectionError type. This will simplify the refactoring in the
following patches.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
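
A sketch of the resulting call-site pattern, for illustration (the client
and server variables are placeholders):

    # No hashserv-specific exception import is needed any more.
    try:
        unihash = client.get_unihash(method, taskhash)
    except ConnectionError as e:
        bb.warn('Error contacting Hash Equivalence Server %s: %s' % (server, str(e)))
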
 lib/bb/siggen.py       |  6 +++---
 lib/hashserv/client.py | 20 ++++++++------------
 lib/hashserv/tests.py  |  3 +--
 3 files changed, 12 insertions(+), 17 deletions(-)

diff --git a/lib/bb/siggen.py b/lib/bb/siggen.py
index 0d88c6ec6..f3fa3000f 100644
--- a/lib/bb/siggen.py
+++ b/lib/bb/siggen.py
@@ -542,7 +542,7 @@ class SignatureGeneratorUniHashMixIn(object):
                 hashequiv_logger.debug((1, 2)[unihash == taskhash], 'Found unihash %s in place of %s for %s from %s' % (unihash, taskhash, tid, self.server))
             else:
                 hashequiv_logger.debug2('No reported unihash for %s:%s from %s' % (tid, taskhash, self.server))
-        except hashserv.client.HashConnectionError as e:
+        except ConnectionError as e:
             bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e)))
 
         self.set_unihash(tid, unihash)
@@ -621,7 +621,7 @@ class SignatureGeneratorUniHashMixIn(object):
                     d.setVar('BB_UNIHASH', new_unihash)
                 else:
                     hashequiv_logger.debug('Reported task %s as unihash %s to %s' % (taskhash, unihash, self.server))
-            except hashserv.client.HashConnectionError as e:
+            except ConnectionError as e:
                 bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e)))
         finally:
             if sigfile:
@@ -661,7 +661,7 @@ class SignatureGeneratorUniHashMixIn(object):
                 # TODO: What to do here?
                 hashequiv_logger.verbose('Task %s unihash reported as unwanted hash %s' % (tid, finalunihash))
 
-        except hashserv.client.HashConnectionError as e:
+        except ConnectionError as e:
             bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e)))
 
         return False
diff --git a/lib/hashserv/client.py b/lib/hashserv/client.py
index e05c1eb56..f370cba63 100644
--- a/lib/hashserv/client.py
+++ b/lib/hashserv/client.py
@@ -14,10 +14,6 @@ from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client
 logger = logging.getLogger("hashserv.client")
 
 
-class HashConnectionError(Exception):
-    pass
-
-
 class AsyncClient(object):
     MODE_NORMAL = 0
     MODE_GET_STREAM = 1
@@ -66,14 +62,14 @@ class AsyncClient(object):
                 return await proc()
             except (
                 OSError,
-                HashConnectionError,
+                ConnectionError,
                 json.JSONDecodeError,
                 UnicodeDecodeError,
             ) as e:
                 logger.warning("Error talking to server: %s" % e)
                 if count >= 3:
-                    if not isinstance(e, HashConnectionError):
-                        raise HashConnectionError(str(e))
+                    if not isinstance(e, ConnectionError):
+                        raise ConnectionError(str(e))
                     raise e
                 await self.close()
                 count += 1
@@ -82,12 +78,12 @@ class AsyncClient(object):
         async def get_line():
             line = await self.reader.readline()
             if not line:
-                raise HashConnectionError("Connection closed")
+                raise ConnectionError("Connection closed")
 
             line = line.decode("utf-8")
 
             if not line.endswith("\n"):
-                raise HashConnectionError("Bad message %r" % message)
+                raise ConnectionError("Bad message %r" % message)
 
             return line
 
@@ -119,7 +115,7 @@ class AsyncClient(object):
             await self.writer.drain()
             l = await self.reader.readline()
             if not l:
-                raise HashConnectionError("Connection closed")
+                raise ConnectionError("Connection closed")
             return l.decode("utf-8").rstrip()
 
         return await self._send_wrapper(proc)
@@ -128,11 +124,11 @@ class AsyncClient(object):
         if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
             r = await self.send_stream("END")
             if r != "ok":
-                raise HashConnectionError("Bad response from server %r" % r)
+                raise ConnectionError("Bad response from server %r" % r)
         elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
             r = await self.send_message({"get-stream": None})
             if r != "ok":
-                raise HashConnectionError("Bad response from server %r" % r)
+                raise ConnectionError("Bad response from server %r" % r)
         elif new_mode != self.mode:
             raise Exception(
                 "Undefined mode transition %r -> %r" % (self.mode, new_mode)
diff --git a/lib/hashserv/tests.py b/lib/hashserv/tests.py
index 1a696481e..e2b762dbf 100644
--- a/lib/hashserv/tests.py
+++ b/lib/hashserv/tests.py
@@ -6,7 +6,6 @@
 #
 
 from . import create_server, create_client
-from .client import HashConnectionError
 import hashlib
 import logging
 import multiprocessing
@@ -277,7 +276,7 @@ class HashEquivalenceCommonTests(object):
         outhash2 = '3c979c3db45c569f51ab7626a4651074be3a9d11a84b1db076f5b14f7d39db44'
         unihash2 = '90e9bc1d1f094c51824adca7f8ea79a048d68824'
 
-        with self.assertRaises(HashConnectionError):
+        with self.assertRaises(ConnectionError):
             ro_client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2)
 
         # Ensure that the database was not modified
-- 
2.26.2



* [PATCH 2/6] asyncrpc: Common implementation of RPC using json & asyncio
From: Paul Barker @ 2021-04-26  8:16 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

The hashserv module implements a flexible RPC mechanism based on sending
json-formatted messages over unix or tcp sockets, and uses Python's asyncio
features to build an efficient message loop on both the client and server
side. Much of this implementation is not specific to the hash equivalence
service and can be extracted into a new module for easy re-use elsewhere in
bitbake.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
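
On the wire the protocol stays as hashserv defined it: the client greets
the server with '<proto_name> <proto_version>' followed by a blank line,
then both sides exchange newline-terminated json messages, with oversized
messages wrapped in a 'chunk-stream' sequence. A minimal sketch of a new
service built on the extracted module follows; the 'OEEXAMPLE' protocol,
the 'echo' command and the address are placeholders, not a real service.

    import logging
    import bb.asyncrpc

    logger = logging.getLogger('example')

    class ExampleConnection(bb.asyncrpc.AsyncServerConnection):
        def __init__(self, reader, writer):
            super().__init__(reader, writer, 'OEEXAMPLE', logger)
            # 'chunk-stream' is already registered by the base class
            self.handlers.update({'echo': self.handle_echo})

        def validate_proto_version(self):
            return self.proto_version == (1, 0)

        async def handle_echo(self, request):
            # Reply with a single json message (chunked if oversized)
            self.write_message({'echo': request})

    class ExampleServer(bb.asyncrpc.AsyncServer):
        def accept_client(self, reader, writer):
            return ExampleConnection(reader, writer)

    server = ExampleServer(logger)
    server.start_tcp_server('localhost', 8686)
    server.serve_forever()
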
 lib/bb/asyncrpc/__init__.py |  31 +++++
 lib/bb/asyncrpc/client.py   | 145 ++++++++++++++++++++++++
 lib/bb/asyncrpc/serv.py     | 218 ++++++++++++++++++++++++++++++++++++
 3 files changed, 394 insertions(+)
 create mode 100644 lib/bb/asyncrpc/__init__.py
 create mode 100644 lib/bb/asyncrpc/client.py
 create mode 100644 lib/bb/asyncrpc/serv.py

diff --git a/lib/bb/asyncrpc/__init__.py b/lib/bb/asyncrpc/__init__.py
new file mode 100644
index 000000000..b2bec31ab
--- /dev/null
+++ b/lib/bb/asyncrpc/__init__.py
@@ -0,0 +1,31 @@
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+
+import itertools
+import json
+
+# The Python async server defaults to a 64K receive buffer, so we hardcode our
+# maximum chunk size. It would be better if the client and server reported to
+# each other what the maximum chunk sizes were, but that will slow down the
+# connection setup with a round trip delay so I'd rather not do that unless it
+# is necessary
+DEFAULT_MAX_CHUNK = 32 * 1024
+
+
+def chunkify(msg, max_chunk):
+    if len(msg) < max_chunk - 1:
+        yield ''.join((msg, "\n"))
+    else:
+        yield ''.join((json.dumps({
+                'chunk-stream': None
+            }), "\n"))
+
+        args = [iter(msg)] * (max_chunk - 1)
+        for m in map(''.join, itertools.zip_longest(*args, fillvalue='')):
+            yield ''.join(itertools.chain(m, "\n"))
+        yield "\n"
+
+
+from .client import AsyncClient, Client
+from .serv import AsyncServer, AsyncServerConnection
diff --git a/lib/bb/asyncrpc/client.py b/lib/bb/asyncrpc/client.py
new file mode 100644
index 000000000..4cdad9ac3
--- /dev/null
+++ b/lib/bb/asyncrpc/client.py
@@ -0,0 +1,145 @@
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+
+import abc
+import asyncio
+import json
+import os
+import socket
+from . import chunkify, DEFAULT_MAX_CHUNK
+
+
+class AsyncClient(object):
+    def __init__(self, proto_name, proto_version, logger):
+        self.reader = None
+        self.writer = None
+        self.max_chunk = DEFAULT_MAX_CHUNK
+        self.proto_name = proto_name
+        self.proto_version = proto_version
+        self.logger = logger
+
+    async def connect_tcp(self, address, port):
+        async def connect_sock():
+            return await asyncio.open_connection(address, port)
+
+        self._connect_sock = connect_sock
+
+    async def connect_unix(self, path):
+        async def connect_sock():
+            return await asyncio.open_unix_connection(path)
+
+        self._connect_sock = connect_sock
+
+    async def setup_connection(self):
+        s = '%s %s\n\n' % (self.proto_name, self.proto_version)
+        self.writer.write(s.encode("utf-8"))
+        await self.writer.drain()
+
+    async def connect(self):
+        if self.reader is None or self.writer is None:
+            (self.reader, self.writer) = await self._connect_sock()
+            await self.setup_connection()
+
+    async def close(self):
+        self.reader = None
+
+        if self.writer is not None:
+            self.writer.close()
+            self.writer = None
+
+    async def _send_wrapper(self, proc):
+        count = 0
+        while True:
+            try:
+                await self.connect()
+                return await proc()
+            except (
+                OSError,
+                ConnectionError,
+                json.JSONDecodeError,
+                UnicodeDecodeError,
+            ) as e:
+                self.logger.warning("Error talking to server: %s" % e)
+                if count >= 3:
+                    if not isinstance(e, ConnectionError):
+                        raise ConnectionError(str(e))
+                    raise e
+                await self.close()
+                count += 1
+
+    async def send_message(self, msg):
+        async def get_line():
+            line = await self.reader.readline()
+            if not line:
+                raise ConnectionError("Connection closed")
+
+            line = line.decode("utf-8")
+
+            if not line.endswith("\n"):
+                raise ConnectionError("Bad message %r" % msg)
+
+            return line
+
+        async def proc():
+            for c in chunkify(json.dumps(msg), self.max_chunk):
+                self.writer.write(c.encode("utf-8"))
+            await self.writer.drain()
+
+            l = await get_line()
+
+            m = json.loads(l)
+            if m and "chunk-stream" in m:
+                lines = []
+                while True:
+                    l = (await get_line()).rstrip("\n")
+                    if not l:
+                        break
+                    lines.append(l)
+
+                m = json.loads("".join(lines))
+
+            return m
+
+        return await self._send_wrapper(proc)
+
+
+class Client(object):
+    def __init__(self):
+        self.client = self._get_async_client()
+        self.loop = asyncio.new_event_loop()
+
+        self._add_methods('connect_tcp', 'close')
+
+    @abc.abstractmethod
+    def _get_async_client(self):
+        pass
+
+    def _get_downcall_wrapper(self, downcall):
+        def wrapper(*args, **kwargs):
+            return self.loop.run_until_complete(downcall(*args, **kwargs))
+
+        return wrapper
+
+    def _add_methods(self, *methods):
+        for m in methods:
+            downcall = getattr(self.client, m)
+            setattr(self, m, self._get_downcall_wrapper(downcall))
+
+    def connect_unix(self, path):
+        # AF_UNIX has path length issues so chdir here to workaround
+        cwd = os.getcwd()
+        try:
+            os.chdir(os.path.dirname(path))
+            self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path)))
+            self.loop.run_until_complete(self.client.connect())
+        finally:
+            os.chdir(cwd)
+
+    @property
+    def max_chunk(self):
+        return self.client.max_chunk
+
+    @max_chunk.setter
+    def max_chunk(self, value):
+        self.client.max_chunk = value
diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
new file mode 100644
index 000000000..cb3384639
--- /dev/null
+++ b/lib/bb/asyncrpc/serv.py
@@ -0,0 +1,218 @@
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+
+import abc
+import asyncio
+import json
+import os
+import signal
+import socket
+import sys
+from . import chunkify, DEFAULT_MAX_CHUNK
+
+
+class ClientError(Exception):
+    pass
+
+
+class ServerError(Exception):
+    pass
+
+
+class AsyncServerConnection(object):
+    def __init__(self, reader, writer, proto_name, logger):
+        self.reader = reader
+        self.writer = writer
+        self.proto_name = proto_name
+        self.max_chunk = DEFAULT_MAX_CHUNK
+        self.handlers = {
+            'chunk-stream': self.handle_chunk,
+        }
+        self.logger = logger
+
+    async def process_requests(self):
+        try:
+            self.addr = self.writer.get_extra_info('peername')
+            self.logger.debug('Client %r connected' % (self.addr,))
+
+            # Read protocol and version
+            client_protocol = await self.reader.readline()
+            if client_protocol is None:
+                return
+
+            (client_proto_name, client_proto_version) = client_protocol.decode('utf-8').rstrip().split()
+            if client_proto_name != self.proto_name:
+                self.logger.debug('Rejecting invalid protocol %s' % (self.proto_name))
+                return
+
+            self.proto_version = tuple(int(v) for v in client_proto_version.split('.'))
+            if not self.validate_proto_version():
+                self.logger.debug('Rejecting invalid protocol version %s' % (client_proto_version))
+                return
+
+            # Read headers. Currently, no headers are implemented, so look for
+            # an empty line to signal the end of the headers
+            while True:
+                line = await self.reader.readline()
+                if line is None:
+                    return
+
+                line = line.decode('utf-8').rstrip()
+                if not line:
+                    break
+
+            # Handle messages
+            while True:
+                d = await self.read_message()
+                if d is None:
+                    break
+                await self.dispatch_message(d)
+                await self.writer.drain()
+        except ClientError as e:
+            self.logger.error(str(e))
+        finally:
+            self.writer.close()
+
+    async def dispatch_message(self, msg):
+        for k in self.handlers.keys():
+            if k in msg:
+                self.logger.debug('Handling %s' % k)
+                await self.handlers[k](msg[k])
+                return
+
+        raise ClientError("Unrecognized command %r" % msg)
+
+    def write_message(self, msg):
+        for c in chunkify(json.dumps(msg), self.max_chunk):
+            self.writer.write(c.encode('utf-8'))
+
+    async def read_message(self):
+        l = await self.reader.readline()
+        if not l:
+            return None
+
+        try:
+            message = l.decode('utf-8')
+
+            if not message.endswith('\n'):
+                return None
+
+            return json.loads(message)
+        except (json.JSONDecodeError, UnicodeDecodeError) as e:
+            self.logger.error('Bad message from client: %r' % message)
+            raise e
+
+    async def handle_chunk(self, request):
+        lines = []
+        try:
+            while True:
+                l = await self.reader.readline()
+                l = l.rstrip(b"\n").decode("utf-8")
+                if not l:
+                    break
+                lines.append(l)
+
+            msg = json.loads(''.join(lines))
+        except (json.JSONDecodeError, UnicodeDecodeError) as e:
+            self.logger.error('Bad message from client: %r' % lines)
+            raise e
+
+        if 'chunk-stream' in msg:
+            raise ClientError("Nested chunks are not allowed")
+
+        await self.dispatch_message(msg)
+
+
+class AsyncServer(object):
+    def __init__(self, logger, loop=None):
+        if loop is None:
+            self.loop = asyncio.new_event_loop()
+            self.close_loop = True
+        else:
+            self.loop = loop
+            self.close_loop = False
+
+        self._cleanup_socket = None
+        self.logger = logger
+
+    def start_tcp_server(self, host, port):
+        self.server = self.loop.run_until_complete(
+            asyncio.start_server(self.handle_client, host, port, loop=self.loop)
+        )
+
+        for s in self.server.sockets:
+            self.logger.info('Listening on %r' % (s.getsockname(),))
+            # Newer python does this automatically. Do it manually here for
+            # maximum compatibility
+            s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
+            s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
+
+        name = self.server.sockets[0].getsockname()
+        if self.server.sockets[0].family == socket.AF_INET6:
+            self.address = "[%s]:%d" % (name[0], name[1])
+        else:
+            self.address = "%s:%d" % (name[0], name[1])
+
+    def start_unix_server(self, path):
+        def cleanup():
+            os.unlink(path)
+
+        cwd = os.getcwd()
+        try:
+            # Work around path length limits in AF_UNIX
+            os.chdir(os.path.dirname(path))
+            self.server = self.loop.run_until_complete(
+                asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop)
+            )
+        finally:
+            os.chdir(cwd)
+
+        self.logger.info('Listening on %r' % path)
+
+        self._cleanup_socket = cleanup
+        self.address = "unix://%s" % os.path.abspath(path)
+
+    @abc.abstractmethod
+    def accept_client(self, reader, writer):
+        pass
+
+    async def handle_client(self, reader, writer):
+        # writer.transport.set_write_buffer_limits(0)
+        try:
+            client = self.accept_client(reader, writer)
+            await client.process_requests()
+        except Exception as e:
+            import traceback
+            self.logger.error('Error from client: %s' % str(e), exc_info=True)
+            traceback.print_exc()
+            writer.close()
+        self.logger.info('Client disconnected')
+
+    def run_loop_forever(self):
+        try:
+            self.loop.run_forever()
+        except KeyboardInterrupt:
+            pass
+
+    def signal_handler(self):
+        self.loop.stop()
+
+    def serve_forever(self):
+        asyncio.set_event_loop(self.loop)
+        try:
+            self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler)
+
+            self.run_loop_forever()
+            self.server.close()
+
+            self.loop.run_until_complete(self.server.wait_closed())
+            self.logger.info('Server shutting down')
+        finally:
+            if self.close_loop:
+                if sys.version_info >= (3, 6):
+                    self.loop.run_until_complete(self.loop.shutdown_asyncgens())
+                self.loop.close()
+
+            if self._cleanup_socket is not None:
+                self._cleanup_socket()
-- 
2.26.2



* [PATCH 3/6] hashserv: Refactor to use asyncrpc
From: Paul Barker @ 2021-04-26  8:16 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

The asyncrpc module can now be used to provide the json- and asyncio-based
RPC system used by hashserv.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
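
The client side follows the same pattern: subclass bb.asyncrpc.AsyncClient
for the async protocol methods and bb.asyncrpc.Client for the synchronous
wrapper. A sketch matching the placeholder 'echo' service from patch 2:

    import logging
    import bb.asyncrpc

    logger = logging.getLogger('example')

    class ExampleAsyncClient(bb.asyncrpc.AsyncClient):
        def __init__(self):
            # Name/version must match what the server validates
            super().__init__('OEEXAMPLE', '1.0', logger)

        async def echo(self, msg):
            return (await self.send_message({'echo': msg}))['echo']

    class ExampleClient(bb.asyncrpc.Client):
        def __init__(self):
            super().__init__()
            # connect_tcp and close are added by the base class
            self._add_methods('echo')

        def _get_async_client(self):
            return ExampleAsyncClient()

    client = ExampleClient()
    client.connect_tcp('localhost', 8686)  # placeholder address
    print(client.echo('hello'))
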
 lib/hashserv/client.py | 137 ++++-----------------------
 lib/hashserv/server.py | 210 +++++------------------------------------
 2 files changed, 41 insertions(+), 306 deletions(-)

diff --git a/lib/hashserv/client.py b/lib/hashserv/client.py
index f370cba63..531170967 100644
--- a/lib/hashserv/client.py
+++ b/lib/hashserv/client.py
@@ -8,106 +8,26 @@ import json
 import logging
 import socket
 import os
-from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client
+import bb.asyncrpc
+from . import create_async_client
 
 
 logger = logging.getLogger("hashserv.client")
 
 
-class AsyncClient(object):
+class AsyncClient(bb.asyncrpc.AsyncClient):
     MODE_NORMAL = 0
     MODE_GET_STREAM = 1
 
     def __init__(self):
-        self.reader = None
-        self.writer = None
+        super().__init__('OEHASHEQUIV', '1.1', logger)
         self.mode = self.MODE_NORMAL
-        self.max_chunk = DEFAULT_MAX_CHUNK
 
-    async def connect_tcp(self, address, port):
-        async def connect_sock():
-            return await asyncio.open_connection(address, port)
-
-        self._connect_sock = connect_sock
-
-    async def connect_unix(self, path):
-        async def connect_sock():
-            return await asyncio.open_unix_connection(path)
-
-        self._connect_sock = connect_sock
-
-    async def connect(self):
-        if self.reader is None or self.writer is None:
-            (self.reader, self.writer) = await self._connect_sock()
-
-            self.writer.write("OEHASHEQUIV 1.1\n\n".encode("utf-8"))
-            await self.writer.drain()
-
-            cur_mode = self.mode
-            self.mode = self.MODE_NORMAL
-            await self._set_mode(cur_mode)
-
-    async def close(self):
-        self.reader = None
-
-        if self.writer is not None:
-            self.writer.close()
-            self.writer = None
-
-    async def _send_wrapper(self, proc):
-        count = 0
-        while True:
-            try:
-                await self.connect()
-                return await proc()
-            except (
-                OSError,
-                ConnectionError,
-                json.JSONDecodeError,
-                UnicodeDecodeError,
-            ) as e:
-                logger.warning("Error talking to server: %s" % e)
-                if count >= 3:
-                    if not isinstance(e, ConnectionError):
-                        raise ConnectionError(str(e))
-                    raise e
-                await self.close()
-                count += 1
-
-    async def send_message(self, msg):
-        async def get_line():
-            line = await self.reader.readline()
-            if not line:
-                raise ConnectionError("Connection closed")
-
-            line = line.decode("utf-8")
-
-            if not line.endswith("\n"):
-                raise ConnectionError("Bad message %r" % message)
-
-            return line
-
-        async def proc():
-            for c in chunkify(json.dumps(msg), self.max_chunk):
-                self.writer.write(c.encode("utf-8"))
-            await self.writer.drain()
-
-            l = await get_line()
-
-            m = json.loads(l)
-            if m and "chunk-stream" in m:
-                lines = []
-                while True:
-                    l = (await get_line()).rstrip("\n")
-                    if not l:
-                        break
-                    lines.append(l)
-
-                m = json.loads("".join(lines))
-
-            return m
-
-        return await self._send_wrapper(proc)
+    async def setup_connection(self):
+        await super().setup_connection()
+        cur_mode = self.mode
+        self.mode = self.MODE_NORMAL
+        await self._set_mode(cur_mode)
 
     async def send_stream(self, msg):
         async def proc():
@@ -185,12 +105,10 @@ class AsyncClient(object):
         return (await self.send_message({"backfill-wait": None}))["tasks"]
 
 
-class Client(object):
+class Client(bb.asyncrpc.Client):
     def __init__(self):
-        self.client = AsyncClient()
-        self.loop = asyncio.new_event_loop()
-
-        for call in (
+        super().__init__()
+        self._add_methods(
             "connect_tcp",
             "close",
             "get_unihash",
@@ -200,30 +118,7 @@ class Client(object):
             "get_stats",
             "reset_stats",
             "backfill_wait",
-        ):
-            downcall = getattr(self.client, call)
-            setattr(self, call, self._get_downcall_wrapper(downcall))
-
-    def _get_downcall_wrapper(self, downcall):
-        def wrapper(*args, **kwargs):
-            return self.loop.run_until_complete(downcall(*args, **kwargs))
-
-        return wrapper
-
-    def connect_unix(self, path):
-        # AF_UNIX has path length issues so chdir here to workaround
-        cwd = os.getcwd()
-        try:
-            os.chdir(os.path.dirname(path))
-            self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path)))
-            self.loop.run_until_complete(self.client.connect())
-        finally:
-            os.chdir(cwd)
-
-    @property
-    def max_chunk(self):
-        return self.client.max_chunk
-
-    @max_chunk.setter
-    def max_chunk(self, value):
-        self.client.max_chunk = value
+        )
+
+    def _get_async_client(self):
+        return AsyncClient()
diff --git a/lib/hashserv/server.py b/lib/hashserv/server.py
index a0dc0c170..c941c0e9d 100644
--- a/lib/hashserv/server.py
+++ b/lib/hashserv/server.py
@@ -14,7 +14,9 @@ import signal
 import socket
 import sys
 import time
-from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client, TABLE_COLUMNS
+from . import create_async_client, TABLE_COLUMNS
+import bb.asyncrpc
+
 
 logger = logging.getLogger('hashserv.server')
 
@@ -109,12 +111,6 @@ class Stats(object):
         return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
 
 
-class ClientError(Exception):
-    pass
-
-class ServerError(Exception):
-    pass
-
 def insert_task(cursor, data, ignore=False):
     keys = sorted(data.keys())
     query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % (
@@ -149,7 +145,7 @@ async def copy_outhash_from_upstream(client, db, method, outhash, taskhash):
 
     return d
 
-class ServerClient(object):
+class ServerClient(bb.asyncrpc.AsyncServerConnection):
     FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
     ALL_QUERY =  'SELECT *                         FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
     OUTHASH_QUERY = '''
@@ -168,21 +164,19 @@ class ServerClient(object):
         '''
 
     def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only):
-        self.reader = reader
-        self.writer = writer
+        super().__init__(reader, writer, 'OEHASHEQUIV', logger)
         self.db = db
         self.request_stats = request_stats
-        self.max_chunk = DEFAULT_MAX_CHUNK
+        self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
         self.backfill_queue = backfill_queue
         self.upstream = upstream
 
-        self.handlers = {
+        self.handlers.update({
             'get': self.handle_get,
             'get-outhash': self.handle_get_outhash,
             'get-stream': self.handle_get_stream,
             'get-stats': self.handle_get_stats,
-            'chunk-stream': self.handle_chunk,
-        }
+        })
 
         if not read_only:
             self.handlers.update({
@@ -192,56 +186,19 @@ class ServerClient(object):
                 'backfill-wait': self.handle_backfill_wait,
             })
 
+    def validate_proto_version(self):
+        return (self.proto_version > (1, 0) and self.proto_version <= (1, 1))
+
     async def process_requests(self):
         if self.upstream is not None:
             self.upstream_client = await create_async_client(self.upstream)
         else:
             self.upstream_client = None
 
-        try:
-
-
-            self.addr = self.writer.get_extra_info('peername')
-            logger.debug('Client %r connected' % (self.addr,))
-
-            # Read protocol and version
-            protocol = await self.reader.readline()
-            if protocol is None:
-                return
-
-            (proto_name, proto_version) = protocol.decode('utf-8').rstrip().split()
-            if proto_name != 'OEHASHEQUIV':
-                return
-
-            proto_version = tuple(int(v) for v in proto_version.split('.'))
-            if proto_version < (1, 0) or proto_version > (1, 1):
-                return
-
-            # Read headers. Currently, no headers are implemented, so look for
-            # an empty line to signal the end of the headers
-            while True:
-                line = await self.reader.readline()
-                if line is None:
-                    return
+        await super().process_requests()
 
-                line = line.decode('utf-8').rstrip()
-                if not line:
-                    break
-
-            # Handle messages
-            while True:
-                d = await self.read_message()
-                if d is None:
-                    break
-                await self.dispatch_message(d)
-                await self.writer.drain()
-        except ClientError as e:
-            logger.error(str(e))
-        finally:
-            if self.upstream_client is not None:
-                await self.upstream_client.close()
-
-            self.writer.close()
+        if self.upstream_client is not None:
+            await self.upstream_client.close()
 
     async def dispatch_message(self, msg):
         for k in self.handlers.keys():
@@ -255,47 +212,7 @@ class ServerClient(object):
                         await self.handlers[k](msg[k])
                 return
 
-        raise ClientError("Unrecognized command %r" % msg)
-
-    def write_message(self, msg):
-        for c in chunkify(json.dumps(msg), self.max_chunk):
-            self.writer.write(c.encode('utf-8'))
-
-    async def read_message(self):
-        l = await self.reader.readline()
-        if not l:
-            return None
-
-        try:
-            message = l.decode('utf-8')
-
-            if not message.endswith('\n'):
-                return None
-
-            return json.loads(message)
-        except (json.JSONDecodeError, UnicodeDecodeError) as e:
-            logger.error('Bad message from client: %r' % message)
-            raise e
-
-    async def handle_chunk(self, request):
-        lines = []
-        try:
-            while True:
-                l = await self.reader.readline()
-                l = l.rstrip(b"\n").decode("utf-8")
-                if not l:
-                    break
-                lines.append(l)
-
-            msg = json.loads(''.join(lines))
-        except (json.JSONDecodeError, UnicodeDecodeError) as e:
-            logger.error('Bad message from client: %r' % message)
-            raise e
-
-        if 'chunk-stream' in msg:
-            raise ClientError("Nested chunks are not allowed")
-
-        await self.dispatch_message(msg)
+        raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
 
     async def handle_get(self, request):
         method = request['method']
@@ -499,74 +416,20 @@ class ServerClient(object):
             cursor.close()
 
 
-class Server(object):
+class Server(bb.asyncrpc.AsyncServer):
     def __init__(self, db, loop=None, upstream=None, read_only=False):
         if upstream and read_only:
-            raise ServerError("Read-only hashserv cannot pull from an upstream server")
+            raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server")
+
+        super().__init__(logger, loop)
 
         self.request_stats = Stats()
         self.db = db
-
-        if loop is None:
-            self.loop = asyncio.new_event_loop()
-            self.close_loop = True
-        else:
-            self.loop = loop
-            self.close_loop = False
-
         self.upstream = upstream
         self.read_only = read_only
 
-        self._cleanup_socket = None
-
-    def start_tcp_server(self, host, port):
-        self.server = self.loop.run_until_complete(
-            asyncio.start_server(self.handle_client, host, port, loop=self.loop)
-        )
-
-        for s in self.server.sockets:
-            logger.info('Listening on %r' % (s.getsockname(),))
-            # Newer python does this automatically. Do it manually here for
-            # maximum compatibility
-            s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
-            s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
-
-        name = self.server.sockets[0].getsockname()
-        if self.server.sockets[0].family == socket.AF_INET6:
-            self.address = "[%s]:%d" % (name[0], name[1])
-        else:
-            self.address = "%s:%d" % (name[0], name[1])
-
-    def start_unix_server(self, path):
-        def cleanup():
-            os.unlink(path)
-
-        cwd = os.getcwd()
-        try:
-            # Work around path length limits in AF_UNIX
-            os.chdir(os.path.dirname(path))
-            self.server = self.loop.run_until_complete(
-                asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop)
-            )
-        finally:
-            os.chdir(cwd)
-
-        logger.info('Listening on %r' % path)
-
-        self._cleanup_socket = cleanup
-        self.address = "unix://%s" % os.path.abspath(path)
-
-    async def handle_client(self, reader, writer):
-        # writer.transport.set_write_buffer_limits(0)
-        try:
-            client = ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
-            await client.process_requests()
-        except Exception as e:
-            import traceback
-            logger.error('Error from client: %s' % str(e), exc_info=True)
-            traceback.print_exc()
-            writer.close()
-        logger.info('Client disconnected')
+    def accept_client(self, reader, writer):
+        return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
 
     @contextmanager
     def _backfill_worker(self):
@@ -597,31 +460,8 @@ class Server(object):
         else:
             yield
 
-    def serve_forever(self):
-        def signal_handler():
-            self.loop.stop()
-
-        asyncio.set_event_loop(self.loop)
-        try:
-            self.backfill_queue = asyncio.Queue()
-
-            self.loop.add_signal_handler(signal.SIGTERM, signal_handler)
-
-            with self._backfill_worker():
-                try:
-                    self.loop.run_forever()
-                except KeyboardInterrupt:
-                    pass
-
-                self.server.close()
-
-            self.loop.run_until_complete(self.server.wait_closed())
-            logger.info('Server shutting down')
-        finally:
-            if self.close_loop:
-                if sys.version_info >= (3, 6):
-                    self.loop.run_until_complete(self.loop.shutdown_asyncgens())
-                self.loop.close()
+    def run_loop_forever(self):
+        self.backfill_queue = asyncio.Queue()
 
-            if self._cleanup_socket is not None:
-                self._cleanup_socket()
+        with self._backfill_worker():
+            super().run_loop_forever()
-- 
2.26.2



* [PATCH 4/6] prserv: Drop obsolete python version check
From: Paul Barker @ 2021-04-26  8:16 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

BitBake no longer supports Python 2, so this version check is obsolete.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/prserv/serv.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 25dcf8a0e..1cfbba864 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -18,10 +18,6 @@ import select
 
 logger = logging.getLogger("BitBake.PRserv")
 
-if sys.hexversion < 0x020600F0:
-    print("Sorry, python 2.6 or later is required.")
-    sys.exit(1)
-
 class Handler(SimpleXMLRPCRequestHandler):
     def _dispatch(self,method,params):
         try:
-- 
2.26.2



* [PATCH 5/6] prserv: Drop unused dump_db method
From: Paul Barker @ 2021-04-26  8:16 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/prserv/serv.py | 24 ------------------------
 1 file changed, 24 deletions(-)

diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 1cfbba864..6ccc7ee9e 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -56,7 +56,6 @@ class PRServer(SimpleXMLRPCServer):
         self.register_function(self.quit, "quit")
         self.register_function(self.ping, "ping")
         self.register_function(self.export, "export")
-        self.register_function(self.dump_db, "dump_db")
         self.register_function(self.importone, "importone")
         self.register_introspection_functions()
 
@@ -118,26 +117,6 @@ class PRServer(SimpleXMLRPCServer):
             logger.error(str(exc))
             return None
 
-    def dump_db(self):
-        """
-        Returns a script (string) that reconstructs the state of the
-        entire database at the time this function is called. The script
-        language is defined by the backing database engine, which is a
-        function of server configuration.
-        Returns None if the database engine does not support dumping to
-        script or if some other error is encountered in processing.
-        """
-        buff = io.StringIO()
-        try:
-            self.table.sync()
-            self.table.dump_db(buff)
-            return buff.getvalue()
-        except Exception as exc:
-            logger.error(str(exc))
-            return None
-        finally:
-            buff.close()
-
     def importone(self, version, pkgarch, checksum, value):
         return self.table.importone(version, pkgarch, checksum, value)
 
@@ -335,9 +314,6 @@ class PRServerConnection(object):
     def export(self,version=None, pkgarch=None, checksum=None, colinfo=True):
         return self.connection.export(version, pkgarch, checksum, colinfo)
 
-    def dump_db(self):
-        return self.connection.dump_db()
-
     def importone(self, version, pkgarch, checksum, value):
         return self.connection.importone(version, pkgarch, checksum, value)
 
-- 
2.26.2



* [PATCH 6/6] prserv: Add connect function
From: Paul Barker @ 2021-04-26  8:16 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

This function abstracts the setup of a PR service client connection so
that openembedded-core doesn't need to be updated whenever the connection
details change.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
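
For reference, the expected usage from oe-core looks like the sketch below
(host and port come from the caller; export() is shown with its defaults):

    import prserv.serv

    # connect() hides whether the returned object is a PRServerConnection
    # (as today) or an asyncrpc-based client (as planned).
    conn = prserv.serv.connect(host, port)
    conn.ping()
    data = conn.export(version=None, pkgarch=None, checksum=None, colinfo=True)
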
 lib/prserv/serv.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 6ccc7ee9e..4375d3e59 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -482,3 +482,6 @@ def auto_shutdown():
 def ping(host, port):
     conn=PRServerConnection(host, port)
     return conn.ping()
+
+def connect(host, port):
+    return PRServerConnection(host, port)
-- 
2.26.2


