* [RFC PATCH 00/11] Modernise prserv
From: Paul Barker @ 2021-04-12 11:41 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

The prserv module is converted from the old XML RPC mechanism to the more
modern json + asyncio RPC system used by hashserv. The common code now
shared between hashserv and prserv is moved to a new asyncrpc module. The
startup and shutdown code within prserv is also refactored and modernised,
using the Python multiprocessing module where possible.
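
As an illustration, once the series is applied a client of the PR service
uses the connect() helper added in patch 11 and the ping() method added in
patch 5 (a rough sketch; the host and port values here are examples only):

    import prserv.serv

    conn = prserv.serv.connect("localhost", 8585)
    if not conn.ping():
        raise Exception('service not available')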

These changes are not expected to land in the upcoming 3.3 release; they're
currently submitted as an RFC to get some early review.

The following change is needed in oe-core when testing these patches; it
can be submitted separately once these changes are past the RFC stage:

    diff --git a/meta/lib/oe/prservice.py b/meta/lib/oe/prservice.py
    index fcdbe66c19..15ce060ff6 100644
    --- a/meta/lib/oe/prservice.py
    +++ b/meta/lib/oe/prservice.py
    @@ -7,7 +7,7 @@ def prserv_make_conn(d, check = False):
        host_params = list([_f for _f in (d.getVar("PRSERV_HOST") or '').split(':') if _f])
        try:
            conn = None
    -        conn = prserv.serv.PRServerConnection(host_params[0], int(host_params[1]))
    +        conn = prserv.serv.connect(host_params[0], int(host_params[1]))
            if check:
                if not conn.ping():
                    raise Exception('service not available')

I'm also currently working on a follow-up patch which will add a read-only
mode to prserv.

Let me know if there are any questions :)

Paul Barker (11):
  hashserv: Use generic ConnectionError
  asyncrpc: Common implementation of RPC using json & asyncio
  hashserv: Refactor to use asyncrpc
  prserv: Drop obsolete python version check
  asyncrpc: Add ping method
  prserv: Use multiprocessing to auto start prserver
  prserv: Extract daemonization from PRServer class
  prserv: Handle requests in main thread
  prserv: Drop unused methods
  prserv: Replace XML RPC with modern asyncrpc implementation
  prserv: Add connect function

 lib/bb/asyncrpc/__init__.py |  31 ++
 lib/bb/asyncrpc/client.py   | 150 ++++++++++
 lib/bb/asyncrpc/serv.py     | 223 +++++++++++++++
 lib/bb/siggen.py            |   6 +-
 lib/hashserv/client.py      | 147 ++--------
 lib/hashserv/server.py      | 210 ++------------
 lib/hashserv/tests.py       |   3 +-
 lib/prserv/serv.py          | 550 ++++++++++++++----------------------
 8 files changed, 663 insertions(+), 657 deletions(-)
 create mode 100644 lib/bb/asyncrpc/__init__.py
 create mode 100644 lib/bb/asyncrpc/client.py
 create mode 100644 lib/bb/asyncrpc/serv.py

-- 
2.26.2


* [RFC PATCH 01/11] hashserv: Use generic ConnectionError
From: Paul Barker @ 2021-04-12 11:41 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

The Python built-in ConnectionError type can be used instead of a custom
HashConnectionError type. This will make code refactoring simpler.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/bb/siggen.py       |  6 +++---
 lib/hashserv/client.py | 20 ++++++++------------
 lib/hashserv/tests.py  |  3 +--
 3 files changed, 12 insertions(+), 17 deletions(-)

diff --git a/lib/bb/siggen.py b/lib/bb/siggen.py
index 0d88c6ec6..f3fa3000f 100644
--- a/lib/bb/siggen.py
+++ b/lib/bb/siggen.py
@@ -542,7 +542,7 @@ class SignatureGeneratorUniHashMixIn(object):
                 hashequiv_logger.debug((1, 2)[unihash == taskhash], 'Found unihash %s in place of %s for %s from %s' % (unihash, taskhash, tid, self.server))
             else:
                 hashequiv_logger.debug2('No reported unihash for %s:%s from %s' % (tid, taskhash, self.server))
-        except hashserv.client.HashConnectionError as e:
+        except ConnectionError as e:
             bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e)))
 
         self.set_unihash(tid, unihash)
@@ -621,7 +621,7 @@ class SignatureGeneratorUniHashMixIn(object):
                     d.setVar('BB_UNIHASH', new_unihash)
                 else:
                     hashequiv_logger.debug('Reported task %s as unihash %s to %s' % (taskhash, unihash, self.server))
-            except hashserv.client.HashConnectionError as e:
+            except ConnectionError as e:
                 bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e)))
         finally:
             if sigfile:
@@ -661,7 +661,7 @@ class SignatureGeneratorUniHashMixIn(object):
                 # TODO: What to do here?
                 hashequiv_logger.verbose('Task %s unihash reported as unwanted hash %s' % (tid, finalunihash))
 
-        except hashserv.client.HashConnectionError as e:
+        except ConnectionError as e:
             bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e)))
 
         return False
diff --git a/lib/hashserv/client.py b/lib/hashserv/client.py
index e05c1eb56..f370cba63 100644
--- a/lib/hashserv/client.py
+++ b/lib/hashserv/client.py
@@ -14,10 +14,6 @@ from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client
 logger = logging.getLogger("hashserv.client")
 
 
-class HashConnectionError(Exception):
-    pass
-
-
 class AsyncClient(object):
     MODE_NORMAL = 0
     MODE_GET_STREAM = 1
@@ -66,14 +62,14 @@ class AsyncClient(object):
                 return await proc()
             except (
                 OSError,
-                HashConnectionError,
+                ConnectionError,
                 json.JSONDecodeError,
                 UnicodeDecodeError,
             ) as e:
                 logger.warning("Error talking to server: %s" % e)
                 if count >= 3:
-                    if not isinstance(e, HashConnectionError):
-                        raise HashConnectionError(str(e))
+                    if not isinstance(e, ConnectionError):
+                        raise ConnectionError(str(e))
                     raise e
                 await self.close()
                 count += 1
@@ -82,12 +78,12 @@ class AsyncClient(object):
         async def get_line():
             line = await self.reader.readline()
             if not line:
-                raise HashConnectionError("Connection closed")
+                raise ConnectionError("Connection closed")
 
             line = line.decode("utf-8")
 
             if not line.endswith("\n"):
-                raise HashConnectionError("Bad message %r" % message)
+                raise ConnectionError("Bad message %r" % msg)
 
             return line
 
@@ -119,7 +115,7 @@ class AsyncClient(object):
             await self.writer.drain()
             l = await self.reader.readline()
             if not l:
-                raise HashConnectionError("Connection closed")
+                raise ConnectionError("Connection closed")
             return l.decode("utf-8").rstrip()
 
         return await self._send_wrapper(proc)
@@ -128,11 +124,11 @@ class AsyncClient(object):
         if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
             r = await self.send_stream("END")
             if r != "ok":
-                raise HashConnectionError("Bad response from server %r" % r)
+                raise ConnectionError("Bad response from server %r" % r)
         elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
             r = await self.send_message({"get-stream": None})
             if r != "ok":
-                raise HashConnectionError("Bad response from server %r" % r)
+                raise ConnectionError("Bad response from server %r" % r)
         elif new_mode != self.mode:
             raise Exception(
                 "Undefined mode transition %r -> %r" % (self.mode, new_mode)
diff --git a/lib/hashserv/tests.py b/lib/hashserv/tests.py
index 1a696481e..e2b762dbf 100644
--- a/lib/hashserv/tests.py
+++ b/lib/hashserv/tests.py
@@ -6,7 +6,6 @@
 #
 
 from . import create_server, create_client
-from .client import HashConnectionError
 import hashlib
 import logging
 import multiprocessing
@@ -277,7 +276,7 @@ class HashEquivalenceCommonTests(object):
         outhash2 = '3c979c3db45c569f51ab7626a4651074be3a9d11a84b1db076f5b14f7d39db44'
         unihash2 = '90e9bc1d1f094c51824adca7f8ea79a048d68824'
 
-        with self.assertRaises(HashConnectionError):
+        with self.assertRaises(ConnectionError):
             ro_client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2)
 
         # Ensure that the database was not modified
-- 
2.26.2


* [RFC PATCH 02/11] asyncrpc: Common implementation of RPC using json & asyncio
From: Paul Barker @ 2021-04-12 11:41 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

The hashserv module implements a flexible RPC mechanism based on sending
json formatted messages over unix or tcp sockets and uses Python's
asyncio features to build an efficient message loop on both the client
and server side. Much of this implementation is not specific to the
hash equivalence service and can be extracted into a new module for
easy re-use elsewhere in bitbake.
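
For reference, the wire format this moves is roughly the following (sketched
from the code below, using hashserv's "get" request as the example message):

    C: OEHASHEQUIV 1.1\n
    C: \n
    C: {"get": {"taskhash": "...", "method": "..."}}\n
    S: {"unihash": "...", ...}\n

A message larger than the maximum chunk size is preceded by a
{"chunk-stream": null} marker, sent as newline-terminated chunks and
terminated by an empty line.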

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/bb/asyncrpc/__init__.py |  31 +++++
 lib/bb/asyncrpc/client.py   | 145 ++++++++++++++++++++++++
 lib/bb/asyncrpc/serv.py     | 218 ++++++++++++++++++++++++++++++++++++
 3 files changed, 394 insertions(+)
 create mode 100644 lib/bb/asyncrpc/__init__.py
 create mode 100644 lib/bb/asyncrpc/client.py
 create mode 100644 lib/bb/asyncrpc/serv.py

diff --git a/lib/bb/asyncrpc/__init__.py b/lib/bb/asyncrpc/__init__.py
new file mode 100644
index 000000000..b2bec31ab
--- /dev/null
+++ b/lib/bb/asyncrpc/__init__.py
@@ -0,0 +1,31 @@
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+
+import itertools
+import json
+
+# The Python async server defaults to a 64K receive buffer, so we hardcode our
+# maximum chunk size. It would be better if the client and server reported to
+# each other what the maximum chunk sizes were, but that will slow down the
+# connection setup with a round trip delay so I'd rather not do that unless it
+# is necessary
+DEFAULT_MAX_CHUNK = 32 * 1024
+
+
+def chunkify(msg, max_chunk):
+    if len(msg) < max_chunk - 1:
+        yield ''.join((msg, "\n"))
+    else:
+        yield ''.join((json.dumps({
+                'chunk-stream': None
+            }), "\n"))
+
+        args = [iter(msg)] * (max_chunk - 1)
+        for m in map(''.join, itertools.zip_longest(*args, fillvalue='')):
+            yield ''.join(itertools.chain(m, "\n"))
+        yield "\n"
+
+
+from .client import AsyncClient, Client
+from .serv import AsyncServer, AsyncServerConnection
diff --git a/lib/bb/asyncrpc/client.py b/lib/bb/asyncrpc/client.py
new file mode 100644
index 000000000..4cdad9ac3
--- /dev/null
+++ b/lib/bb/asyncrpc/client.py
@@ -0,0 +1,145 @@
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+
+import abc
+import asyncio
+import json
+import os
+import socket
+from . import chunkify, DEFAULT_MAX_CHUNK
+
+
+class AsyncClient(object):
+    def __init__(self, proto_name, proto_version, logger):
+        self.reader = None
+        self.writer = None
+        self.max_chunk = DEFAULT_MAX_CHUNK
+        self.proto_name = proto_name
+        self.proto_version = proto_version
+        self.logger = logger
+
+    async def connect_tcp(self, address, port):
+        async def connect_sock():
+            return await asyncio.open_connection(address, port)
+
+        self._connect_sock = connect_sock
+
+    async def connect_unix(self, path):
+        async def connect_sock():
+            return await asyncio.open_unix_connection(path)
+
+        self._connect_sock = connect_sock
+
+    async def setup_connection(self):
+        s = '%s %s\n\n' % (self.proto_name, self.proto_version)
+        self.writer.write(s.encode("utf-8"))
+        await self.writer.drain()
+
+    async def connect(self):
+        if self.reader is None or self.writer is None:
+            (self.reader, self.writer) = await self._connect_sock()
+            await self.setup_connection()
+
+    async def close(self):
+        self.reader = None
+
+        if self.writer is not None:
+            self.writer.close()
+            self.writer = None
+
+    async def _send_wrapper(self, proc):
+        count = 0
+        while True:
+            try:
+                await self.connect()
+                return await proc()
+            except (
+                OSError,
+                ConnectionError,
+                json.JSONDecodeError,
+                UnicodeDecodeError,
+            ) as e:
+                self.logger.warning("Error talking to server: %s" % e)
+                if count >= 3:
+                    if not isinstance(e, ConnectionError):
+                        raise ConnectionError(str(e))
+                    raise e
+                await self.close()
+                count += 1
+
+    async def send_message(self, msg):
+        async def get_line():
+            line = await self.reader.readline()
+            if not line:
+                raise ConnectionError("Connection closed")
+
+            line = line.decode("utf-8")
+
+            if not line.endswith("\n"):
+                raise ConnectionError("Bad message %r" % msg)
+
+            return line
+
+        async def proc():
+            for c in chunkify(json.dumps(msg), self.max_chunk):
+                self.writer.write(c.encode("utf-8"))
+            await self.writer.drain()
+
+            l = await get_line()
+
+            m = json.loads(l)
+            if m and "chunk-stream" in m:
+                lines = []
+                while True:
+                    l = (await get_line()).rstrip("\n")
+                    if not l:
+                        break
+                    lines.append(l)
+
+                m = json.loads("".join(lines))
+
+            return m
+
+        return await self._send_wrapper(proc)
+
+
+class Client(object):
+    def __init__(self):
+        self.client = self._get_async_client()
+        self.loop = asyncio.new_event_loop()
+
+        self._add_methods('connect_tcp', 'close')
+
+    @abc.abstractmethod
+    def _get_async_client(self):
+        pass
+
+    def _get_downcall_wrapper(self, downcall):
+        def wrapper(*args, **kwargs):
+            return self.loop.run_until_complete(downcall(*args, **kwargs))
+
+        return wrapper
+
+    def _add_methods(self, *methods):
+        for m in methods:
+            downcall = getattr(self.client, m)
+            setattr(self, m, self._get_downcall_wrapper(downcall))
+
+    def connect_unix(self, path):
+        # AF_UNIX has path length issues so chdir here to workaround
+        cwd = os.getcwd()
+        try:
+            os.chdir(os.path.dirname(path))
+            self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path)))
+            self.loop.run_until_complete(self.client.connect())
+        finally:
+            os.chdir(cwd)
+
+    @property
+    def max_chunk(self):
+        return self.client.max_chunk
+
+    @max_chunk.setter
+    def max_chunk(self, value):
+        self.client.max_chunk = value
diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
new file mode 100644
index 000000000..cb3384639
--- /dev/null
+++ b/lib/bb/asyncrpc/serv.py
@@ -0,0 +1,218 @@
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+
+import abc
+import asyncio
+import json
+import os
+import signal
+import socket
+import sys
+from . import chunkify, DEFAULT_MAX_CHUNK
+
+
+class ClientError(Exception):
+    pass
+
+
+class ServerError(Exception):
+    pass
+
+
+class AsyncServerConnection(object):
+    def __init__(self, reader, writer, proto_name, logger):
+        self.reader = reader
+        self.writer = writer
+        self.proto_name = proto_name
+        self.max_chunk = DEFAULT_MAX_CHUNK
+        self.handlers = {
+            'chunk-stream': self.handle_chunk,
+        }
+        self.logger = logger
+
+    async def process_requests(self):
+        try:
+            self.addr = self.writer.get_extra_info('peername')
+            self.logger.debug('Client %r connected' % (self.addr,))
+
+            # Read protocol and version
+            client_protocol = await self.reader.readline()
+            if client_protocol is None:
+                return
+
+            (client_proto_name, client_proto_version) = client_protocol.decode('utf-8').rstrip().split()
+            if client_proto_name != self.proto_name:
+                self.logger.debug('Rejecting invalid protocol %s' % (client_proto_name))
+                return
+
+            self.proto_version = tuple(int(v) for v in client_proto_version.split('.'))
+            if not self.validate_proto_version():
+                self.logger.debug('Rejecting invalid protocol version %s' % (client_proto_version))
+                return
+
+            # Read headers. Currently, no headers are implemented, so look for
+            # an empty line to signal the end of the headers
+            while True:
+                line = await self.reader.readline()
+                if line is None:
+                    return
+
+                line = line.decode('utf-8').rstrip()
+                if not line:
+                    break
+
+            # Handle messages
+            while True:
+                d = await self.read_message()
+                if d is None:
+                    break
+                await self.dispatch_message(d)
+                await self.writer.drain()
+        except ClientError as e:
+            self.logger.error(str(e))
+        finally:
+            self.writer.close()
+
+    async def dispatch_message(self, msg):
+        for k in self.handlers.keys():
+            if k in msg:
+                self.logger.debug('Handling %s' % k)
+                await self.handlers[k](msg[k])
+                return
+
+        raise ClientError("Unrecognized command %r" % msg)
+
+    def write_message(self, msg):
+        for c in chunkify(json.dumps(msg), self.max_chunk):
+            self.writer.write(c.encode('utf-8'))
+
+    async def read_message(self):
+        l = await self.reader.readline()
+        if not l:
+            return None
+
+        try:
+            message = l.decode('utf-8')
+
+            if not message.endswith('\n'):
+                return None
+
+            return json.loads(message)
+        except (json.JSONDecodeError, UnicodeDecodeError) as e:
+            self.logger.error('Bad message from client: %r' % l)
+            raise e
+
+    async def handle_chunk(self, request):
+        lines = []
+        try:
+            while True:
+                l = await self.reader.readline()
+                l = l.rstrip(b"\n").decode("utf-8")
+                if not l:
+                    break
+                lines.append(l)
+
+            msg = json.loads(''.join(lines))
+        except (json.JSONDecodeError, UnicodeDecodeError) as e:
+            self.logger.error('Bad message from client: %r' % lines)
+            raise e
+
+        if 'chunk-stream' in msg:
+            raise ClientError("Nested chunks are not allowed")
+
+        await self.dispatch_message(msg)
+
+
+class AsyncServer(object):
+    def __init__(self, logger, loop=None):
+        if loop is None:
+            self.loop = asyncio.new_event_loop()
+            self.close_loop = True
+        else:
+            self.loop = loop
+            self.close_loop = False
+
+        self._cleanup_socket = None
+        self.logger = logger
+
+    def start_tcp_server(self, host, port):
+        self.server = self.loop.run_until_complete(
+            asyncio.start_server(self.handle_client, host, port, loop=self.loop)
+        )
+
+        for s in self.server.sockets:
+            self.logger.info('Listening on %r' % (s.getsockname(),))
+            # Newer python does this automatically. Do it manually here for
+            # maximum compatibility
+            s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
+            s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
+
+        name = self.server.sockets[0].getsockname()
+        if self.server.sockets[0].family == socket.AF_INET6:
+            self.address = "[%s]:%d" % (name[0], name[1])
+        else:
+            self.address = "%s:%d" % (name[0], name[1])
+
+    def start_unix_server(self, path):
+        def cleanup():
+            os.unlink(path)
+
+        cwd = os.getcwd()
+        try:
+            # Work around path length limits in AF_UNIX
+            os.chdir(os.path.dirname(path))
+            self.server = self.loop.run_until_complete(
+                asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop)
+            )
+        finally:
+            os.chdir(cwd)
+
+        self.logger.info('Listening on %r' % path)
+
+        self._cleanup_socket = cleanup
+        self.address = "unix://%s" % os.path.abspath(path)
+
+    @abc.abstractmethod
+    def accept_client(self, reader, writer):
+        pass
+
+    async def handle_client(self, reader, writer):
+        # writer.transport.set_write_buffer_limits(0)
+        try:
+            client = self.accept_client(reader, writer)
+            await client.process_requests()
+        except Exception as e:
+            import traceback
+            self.logger.error('Error from client: %s' % str(e), exc_info=True)
+            traceback.print_exc()
+            writer.close()
+        self.logger.info('Client disconnected')
+
+    def run_loop_forever(self):
+        try:
+            self.loop.run_forever()
+        except KeyboardInterrupt:
+            pass
+
+    def signal_handler(self):
+        self.loop.stop()
+
+    def serve_forever(self):
+        asyncio.set_event_loop(self.loop)
+        try:
+            self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler)
+
+            self.run_loop_forever()
+            self.server.close()
+
+            self.loop.run_until_complete(self.server.wait_closed())
+            self.logger.info('Server shutting down')
+        finally:
+            if self.close_loop:
+                if sys.version_info >= (3, 6):
+                    self.loop.run_until_complete(self.loop.shutdown_asyncgens())
+                self.loop.close()
+
+            if self._cleanup_socket is not None:
+                self._cleanup_socket()
-- 
2.26.2


* [RFC PATCH 03/11] hashserv: Refactor to use asyncrpc
From: Paul Barker @ 2021-04-12 11:41 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

The asyncrpc module can now be used to provide the json & asyncio based
RPC system used by hashserv.
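
The pattern each service follows after this refactor can be condensed from
the diff below (sketch only, with most methods omitted):

    import bb.asyncrpc

    class AsyncClient(bb.asyncrpc.AsyncClient):
        def __init__(self):
            # Protocol name and version are passed to the common base class
            super().__init__('OEHASHEQUIV', '1.1', logger)

    class Client(bb.asyncrpc.Client):
        def __init__(self):
            super().__init__()
            # Expose the async methods as blocking wrappers
            self._add_methods('connect_tcp', 'close', 'get_unihash')

        def _get_async_client(self):
            return AsyncClient()

On the server side, Server subclasses bb.asyncrpc.AsyncServer and implements
accept_client(), returning an AsyncServerConnection subclass which registers
its message handlers and implements validate_proto_version().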

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/hashserv/client.py | 137 ++++-----------------------
 lib/hashserv/server.py | 210 +++++------------------------------------
 2 files changed, 41 insertions(+), 306 deletions(-)

diff --git a/lib/hashserv/client.py b/lib/hashserv/client.py
index f370cba63..531170967 100644
--- a/lib/hashserv/client.py
+++ b/lib/hashserv/client.py
@@ -8,106 +8,26 @@ import json
 import logging
 import socket
 import os
-from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client
+import bb.asyncrpc
+from . import create_async_client
 
 
 logger = logging.getLogger("hashserv.client")
 
 
-class AsyncClient(object):
+class AsyncClient(bb.asyncrpc.AsyncClient):
     MODE_NORMAL = 0
     MODE_GET_STREAM = 1
 
     def __init__(self):
-        self.reader = None
-        self.writer = None
+        super().__init__('OEHASHEQUIV', '1.1', logger)
         self.mode = self.MODE_NORMAL
-        self.max_chunk = DEFAULT_MAX_CHUNK
 
-    async def connect_tcp(self, address, port):
-        async def connect_sock():
-            return await asyncio.open_connection(address, port)
-
-        self._connect_sock = connect_sock
-
-    async def connect_unix(self, path):
-        async def connect_sock():
-            return await asyncio.open_unix_connection(path)
-
-        self._connect_sock = connect_sock
-
-    async def connect(self):
-        if self.reader is None or self.writer is None:
-            (self.reader, self.writer) = await self._connect_sock()
-
-            self.writer.write("OEHASHEQUIV 1.1\n\n".encode("utf-8"))
-            await self.writer.drain()
-
-            cur_mode = self.mode
-            self.mode = self.MODE_NORMAL
-            await self._set_mode(cur_mode)
-
-    async def close(self):
-        self.reader = None
-
-        if self.writer is not None:
-            self.writer.close()
-            self.writer = None
-
-    async def _send_wrapper(self, proc):
-        count = 0
-        while True:
-            try:
-                await self.connect()
-                return await proc()
-            except (
-                OSError,
-                ConnectionError,
-                json.JSONDecodeError,
-                UnicodeDecodeError,
-            ) as e:
-                logger.warning("Error talking to server: %s" % e)
-                if count >= 3:
-                    if not isinstance(e, ConnectionError):
-                        raise ConnectionError(str(e))
-                    raise e
-                await self.close()
-                count += 1
-
-    async def send_message(self, msg):
-        async def get_line():
-            line = await self.reader.readline()
-            if not line:
-                raise ConnectionError("Connection closed")
-
-            line = line.decode("utf-8")
-
-            if not line.endswith("\n"):
-                raise ConnectionError("Bad message %r" % message)
-
-            return line
-
-        async def proc():
-            for c in chunkify(json.dumps(msg), self.max_chunk):
-                self.writer.write(c.encode("utf-8"))
-            await self.writer.drain()
-
-            l = await get_line()
-
-            m = json.loads(l)
-            if m and "chunk-stream" in m:
-                lines = []
-                while True:
-                    l = (await get_line()).rstrip("\n")
-                    if not l:
-                        break
-                    lines.append(l)
-
-                m = json.loads("".join(lines))
-
-            return m
-
-        return await self._send_wrapper(proc)
+    async def setup_connection(self):
+        await super().setup_connection()
+        cur_mode = self.mode
+        self.mode = self.MODE_NORMAL
+        await self._set_mode(cur_mode)
 
     async def send_stream(self, msg):
         async def proc():
@@ -185,12 +105,10 @@ class AsyncClient(object):
         return (await self.send_message({"backfill-wait": None}))["tasks"]
 
 
-class Client(object):
+class Client(bb.asyncrpc.Client):
     def __init__(self):
-        self.client = AsyncClient()
-        self.loop = asyncio.new_event_loop()
-
-        for call in (
+        super().__init__()
+        self._add_methods(
             "connect_tcp",
             "close",
             "get_unihash",
@@ -200,30 +118,7 @@ class Client(object):
             "get_stats",
             "reset_stats",
             "backfill_wait",
-        ):
-            downcall = getattr(self.client, call)
-            setattr(self, call, self._get_downcall_wrapper(downcall))
-
-    def _get_downcall_wrapper(self, downcall):
-        def wrapper(*args, **kwargs):
-            return self.loop.run_until_complete(downcall(*args, **kwargs))
-
-        return wrapper
-
-    def connect_unix(self, path):
-        # AF_UNIX has path length issues so chdir here to workaround
-        cwd = os.getcwd()
-        try:
-            os.chdir(os.path.dirname(path))
-            self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path)))
-            self.loop.run_until_complete(self.client.connect())
-        finally:
-            os.chdir(cwd)
-
-    @property
-    def max_chunk(self):
-        return self.client.max_chunk
-
-    @max_chunk.setter
-    def max_chunk(self, value):
-        self.client.max_chunk = value
+        )
+
+    def _get_async_client(self):
+        return AsyncClient()
diff --git a/lib/hashserv/server.py b/lib/hashserv/server.py
index a0dc0c170..c941c0e9d 100644
--- a/lib/hashserv/server.py
+++ b/lib/hashserv/server.py
@@ -14,7 +14,9 @@ import signal
 import socket
 import sys
 import time
-from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client, TABLE_COLUMNS
+from . import create_async_client, TABLE_COLUMNS
+import bb.asyncrpc
+
 
 logger = logging.getLogger('hashserv.server')
 
@@ -109,12 +111,6 @@ class Stats(object):
         return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
 
 
-class ClientError(Exception):
-    pass
-
-class ServerError(Exception):
-    pass
-
 def insert_task(cursor, data, ignore=False):
     keys = sorted(data.keys())
     query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % (
@@ -149,7 +145,7 @@ async def copy_outhash_from_upstream(client, db, method, outhash, taskhash):
 
     return d
 
-class ServerClient(object):
+class ServerClient(bb.asyncrpc.AsyncServerConnection):
     FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
     ALL_QUERY =  'SELECT *                         FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
     OUTHASH_QUERY = '''
@@ -168,21 +164,19 @@ class ServerClient(object):
         '''
 
     def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only):
-        self.reader = reader
-        self.writer = writer
+        super().__init__(reader, writer, 'OEHASHEQUIV', logger)
         self.db = db
         self.request_stats = request_stats
-        self.max_chunk = DEFAULT_MAX_CHUNK
+        self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
         self.backfill_queue = backfill_queue
         self.upstream = upstream
 
-        self.handlers = {
+        self.handlers.update({
             'get': self.handle_get,
             'get-outhash': self.handle_get_outhash,
             'get-stream': self.handle_get_stream,
             'get-stats': self.handle_get_stats,
-            'chunk-stream': self.handle_chunk,
-        }
+        })
 
         if not read_only:
             self.handlers.update({
@@ -192,56 +186,19 @@ class ServerClient(object):
                 'backfill-wait': self.handle_backfill_wait,
             })
 
+    def validate_proto_version(self):
+        return (self.proto_version > (1, 0) and self.proto_version <= (1, 1))
+
     async def process_requests(self):
         if self.upstream is not None:
             self.upstream_client = await create_async_client(self.upstream)
         else:
             self.upstream_client = None
 
-        try:
-
-
-            self.addr = self.writer.get_extra_info('peername')
-            logger.debug('Client %r connected' % (self.addr,))
-
-            # Read protocol and version
-            protocol = await self.reader.readline()
-            if protocol is None:
-                return
-
-            (proto_name, proto_version) = protocol.decode('utf-8').rstrip().split()
-            if proto_name != 'OEHASHEQUIV':
-                return
-
-            proto_version = tuple(int(v) for v in proto_version.split('.'))
-            if proto_version < (1, 0) or proto_version > (1, 1):
-                return
-
-            # Read headers. Currently, no headers are implemented, so look for
-            # an empty line to signal the end of the headers
-            while True:
-                line = await self.reader.readline()
-                if line is None:
-                    return
+        await super().process_requests()
 
-                line = line.decode('utf-8').rstrip()
-                if not line:
-                    break
-
-            # Handle messages
-            while True:
-                d = await self.read_message()
-                if d is None:
-                    break
-                await self.dispatch_message(d)
-                await self.writer.drain()
-        except ClientError as e:
-            logger.error(str(e))
-        finally:
-            if self.upstream_client is not None:
-                await self.upstream_client.close()
-
-            self.writer.close()
+        if self.upstream_client is not None:
+            await self.upstream_client.close()
 
     async def dispatch_message(self, msg):
         for k in self.handlers.keys():
@@ -255,47 +212,7 @@ class ServerClient(object):
                         await self.handlers[k](msg[k])
                 return
 
-        raise ClientError("Unrecognized command %r" % msg)
-
-    def write_message(self, msg):
-        for c in chunkify(json.dumps(msg), self.max_chunk):
-            self.writer.write(c.encode('utf-8'))
-
-    async def read_message(self):
-        l = await self.reader.readline()
-        if not l:
-            return None
-
-        try:
-            message = l.decode('utf-8')
-
-            if not message.endswith('\n'):
-                return None
-
-            return json.loads(message)
-        except (json.JSONDecodeError, UnicodeDecodeError) as e:
-            logger.error('Bad message from client: %r' % message)
-            raise e
-
-    async def handle_chunk(self, request):
-        lines = []
-        try:
-            while True:
-                l = await self.reader.readline()
-                l = l.rstrip(b"\n").decode("utf-8")
-                if not l:
-                    break
-                lines.append(l)
-
-            msg = json.loads(''.join(lines))
-        except (json.JSONDecodeError, UnicodeDecodeError) as e:
-            logger.error('Bad message from client: %r' % message)
-            raise e
-
-        if 'chunk-stream' in msg:
-            raise ClientError("Nested chunks are not allowed")
-
-        await self.dispatch_message(msg)
+        raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
 
     async def handle_get(self, request):
         method = request['method']
@@ -499,74 +416,20 @@ class ServerClient(object):
             cursor.close()
 
 
-class Server(object):
+class Server(bb.asyncrpc.AsyncServer):
     def __init__(self, db, loop=None, upstream=None, read_only=False):
         if upstream and read_only:
-            raise ServerError("Read-only hashserv cannot pull from an upstream server")
+            raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server")
+
+        super().__init__(logger, loop)
 
         self.request_stats = Stats()
         self.db = db
-
-        if loop is None:
-            self.loop = asyncio.new_event_loop()
-            self.close_loop = True
-        else:
-            self.loop = loop
-            self.close_loop = False
-
         self.upstream = upstream
         self.read_only = read_only
 
-        self._cleanup_socket = None
-
-    def start_tcp_server(self, host, port):
-        self.server = self.loop.run_until_complete(
-            asyncio.start_server(self.handle_client, host, port, loop=self.loop)
-        )
-
-        for s in self.server.sockets:
-            logger.info('Listening on %r' % (s.getsockname(),))
-            # Newer python does this automatically. Do it manually here for
-            # maximum compatibility
-            s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
-            s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
-
-        name = self.server.sockets[0].getsockname()
-        if self.server.sockets[0].family == socket.AF_INET6:
-            self.address = "[%s]:%d" % (name[0], name[1])
-        else:
-            self.address = "%s:%d" % (name[0], name[1])
-
-    def start_unix_server(self, path):
-        def cleanup():
-            os.unlink(path)
-
-        cwd = os.getcwd()
-        try:
-            # Work around path length limits in AF_UNIX
-            os.chdir(os.path.dirname(path))
-            self.server = self.loop.run_until_complete(
-                asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop)
-            )
-        finally:
-            os.chdir(cwd)
-
-        logger.info('Listening on %r' % path)
-
-        self._cleanup_socket = cleanup
-        self.address = "unix://%s" % os.path.abspath(path)
-
-    async def handle_client(self, reader, writer):
-        # writer.transport.set_write_buffer_limits(0)
-        try:
-            client = ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
-            await client.process_requests()
-        except Exception as e:
-            import traceback
-            logger.error('Error from client: %s' % str(e), exc_info=True)
-            traceback.print_exc()
-            writer.close()
-        logger.info('Client disconnected')
+    def accept_client(self, reader, writer):
+        return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
 
     @contextmanager
     def _backfill_worker(self):
@@ -597,31 +460,8 @@ class Server(object):
         else:
             yield
 
-    def serve_forever(self):
-        def signal_handler():
-            self.loop.stop()
-
-        asyncio.set_event_loop(self.loop)
-        try:
-            self.backfill_queue = asyncio.Queue()
-
-            self.loop.add_signal_handler(signal.SIGTERM, signal_handler)
-
-            with self._backfill_worker():
-                try:
-                    self.loop.run_forever()
-                except KeyboardInterrupt:
-                    pass
-
-                self.server.close()
-
-            self.loop.run_until_complete(self.server.wait_closed())
-            logger.info('Server shutting down')
-        finally:
-            if self.close_loop:
-                if sys.version_info >= (3, 6):
-                    self.loop.run_until_complete(self.loop.shutdown_asyncgens())
-                self.loop.close()
+    def run_loop_forever(self):
+        self.backfill_queue = asyncio.Queue()
 
-            if self._cleanup_socket is not None:
-                self._cleanup_socket()
+        with self._backfill_worker():
+            super().run_loop_forever()
-- 
2.26.2


* [RFC PATCH 04/11] prserv: Drop obsolete python version check
From: Paul Barker @ 2021-04-12 11:41 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

Bitbake no longer supports Python 2, so this version check is obsolete.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/prserv/serv.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 25dcf8a0e..1cfbba864 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -18,10 +18,6 @@ import select
 
 logger = logging.getLogger("BitBake.PRserv")
 
-if sys.hexversion < 0x020600F0:
-    print("Sorry, python 2.6 or later is required.")
-    sys.exit(1)
-
 class Handler(SimpleXMLRPCRequestHandler):
     def _dispatch(self,method,params):
         try:
-- 
2.26.2


* [RFC PATCH 05/11] asyncrpc: Add ping method
From: Paul Barker @ 2021-04-12 11:41 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

This method is needed to support startup of the prservice. As it is so
generic, we can add it to the common asyncrpc module.
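
On the wire this is a trivial exchange (sketch):

    C: {"ping": {}}\n
    S: {"alive": true}\n

Synchronous clients gain a blocking ping() wrapper via _add_methods(), which
the prserv startup code can then call to check that the server is up.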

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/bb/asyncrpc/client.py | 7 ++++++-
 lib/bb/asyncrpc/serv.py   | 5 +++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/lib/bb/asyncrpc/client.py b/lib/bb/asyncrpc/client.py
index 4cdad9ac3..79919c5be 100644
--- a/lib/bb/asyncrpc/client.py
+++ b/lib/bb/asyncrpc/client.py
@@ -103,13 +103,18 @@ class AsyncClient(object):
 
         return await self._send_wrapper(proc)
 
+    async def ping(self):
+        return await self.send_message(
+            {'ping': {}}
+        )
+
 
 class Client(object):
     def __init__(self):
         self.client = self._get_async_client()
         self.loop = asyncio.new_event_loop()
 
-        self._add_methods('connect_tcp', 'close')
+        self._add_methods('connect_tcp', 'close', 'ping')
 
     @abc.abstractmethod
     def _get_async_client(self):
diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
index cb3384639..fd91aa71a 100644
--- a/lib/bb/asyncrpc/serv.py
+++ b/lib/bb/asyncrpc/serv.py
@@ -28,6 +28,7 @@ class AsyncServerConnection(object):
         self.max_chunk = DEFAULT_MAX_CHUNK
         self.handlers = {
             'chunk-stream': self.handle_chunk,
+            'ping': self.handle_ping,
         }
         self.logger = logger
 
@@ -123,6 +124,10 @@ class AsyncServerConnection(object):
 
         await self.dispatch_message(msg)
 
+    async def handle_ping(self, request):
+        response = {'alive': True}
+        self.write_message(response)
+
 
 class AsyncServer(object):
     def __init__(self, logger, loop=None):
-- 
2.26.2


* [RFC PATCH 06/11] prserv: Use multiprocessing to auto start prserver
From: Paul Barker @ 2021-04-12 11:41 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

We can use the modern multiprocessing support in Python instead of
manually using fork to start the prserver process. To do this, we need
to set up the signal handlers for the prserver process in the
work_forever function (which is now used as the main function for this
process).

The old code to start the prserver process using fork is not removed in
this commit as it is tightly intertwined with the daemonization code,
which will be refactored in a following commit.
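
The resulting startup sequence in PRServSingleton is roughly:

    server = PRServer(dbfile, logfile, interface)
    process = multiprocessing.Process(target=server.work_forever)
    process.start()
    # signal handlers are installed inside work_forever, i.e. in the child
    # process, so the parent bitbake process keeps its own handlers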

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/prserv/serv.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 1cfbba864..d58b859d8 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -15,6 +15,7 @@ import prserv
 import prserv.db
 import errno
 import select
+import multiprocessing
 
 logger = logging.getLogger("BitBake.PRserv")
 
@@ -170,6 +171,9 @@ class PRServer(SimpleXMLRPCServer):
         # if there is data there.
         self.timeout = 0.01
 
+        signal.signal(signal.SIGINT, self.sigint_handler)
+        signal.signal(signal.SIGTERM, self.sigterm_handler)
+
         bb.utils.set_process_name("PRServ")
 
         # DB connection must be created after all forks
@@ -249,8 +253,6 @@ class PRServer(SimpleXMLRPCServer):
         os._exit(0)
 
     def cleanup_handles(self):
-        signal.signal(signal.SIGINT, self.sigint_handler)
-        signal.signal(signal.SIGTERM, self.sigterm_handler)
         os.chdir("/")
 
         sys.stdout.flush()
@@ -304,8 +306,10 @@ class PRServSingleton(object):
         self.port = None
 
     def start(self):
-        self.prserv = PRServer(self.dbfile, self.logfile, self.interface, daemon=False)
-        self.prserv.start()
+        self.prserv = PRServer(self.dbfile, self.logfile, self.interface)
+        self.process = multiprocessing.Process(target=self.prserv.work_forever)
+        self.process.start()
+
         self.host, self.port = self.prserv.getinfo()
 
     def getinfo(self):
-- 
2.26.2


* [RFC PATCH 07/11] prserv: Extract daemonization from PRServer class
From: Paul Barker @ 2021-04-12 11:41 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

The code to start the prservice process as a daemon is extracted from
the PRServer class and simplified. This makes the PRServer class easier
to modernise, as it no longer needs to worry about process management.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/prserv/serv.py | 175 +++++++++++++++++++--------------------------
 1 file changed, 72 insertions(+), 103 deletions(-)

diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index d58b859d8..74bfdc1bb 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -34,7 +34,7 @@ singleton = None
 
 
 class PRServer(SimpleXMLRPCServer):
-    def __init__(self, dbfile, logfile, interface, daemon=True):
+    def __init__(self, dbfile, logfile, interface):
         ''' constructor '''
         try:
             SimpleXMLRPCServer.__init__(self, interface,
@@ -47,7 +47,6 @@ class PRServer(SimpleXMLRPCServer):
             raise PRServiceConfigError
 
         self.dbfile=dbfile
-        self.daemon=daemon
         self.logfile=logfile
         self.working_thread=None
         self.host, self.port = self.socket.getsockname()
@@ -197,106 +196,6 @@ class PRServer(SimpleXMLRPCServer):
         os.close(self.quitpipein)
         return
 
-    def start(self):
-        if self.daemon:
-            pid = self.daemonize()
-        else:
-            pid = self.fork()
-            self.pid = pid
-
-        # Ensure both the parent sees this and the child from the work_forever log entry above
-        logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
-                     (self.dbfile, self.host, self.port, str(pid)))
-
-    def delpid(self):
-        os.remove(self.pidfile)
-
-    def daemonize(self):
-        """
-        See Advanced Programming in the UNIX, Sec 13.3
-        """
-        try:
-            pid = os.fork()
-            if pid > 0:
-                os.waitpid(pid, 0)
-                #parent return instead of exit to give control 
-                return pid
-        except OSError as e:
-            raise Exception("%s [%d]" % (e.strerror, e.errno))
-
-        os.setsid()
-        """
-        fork again to make sure the daemon is not session leader, 
-        which prevents it from acquiring controlling terminal
-        """
-        try:
-            pid = os.fork()
-            if pid > 0: #parent
-                os._exit(0)
-        except OSError as e:
-            raise Exception("%s [%d]" % (e.strerror, e.errno))
-
-        self.cleanup_handles()
-        os._exit(0)
-
-    def fork(self):
-        try:
-            pid = os.fork()
-            if pid > 0:
-                self.socket.close() # avoid ResourceWarning in parent
-                return pid
-        except OSError as e:
-            raise Exception("%s [%d]" % (e.strerror, e.errno))
-
-        bb.utils.signal_on_parent_exit("SIGTERM")
-        self.cleanup_handles()
-        os._exit(0)
-
-    def cleanup_handles(self):
-        os.chdir("/")
-
-        sys.stdout.flush()
-        sys.stderr.flush()
-
-        # We could be called from a python thread with io.StringIO as
-        # stdout/stderr or it could be 'real' unix fd forking where we need
-        # to physically close the fds to prevent the program launching us from
-        # potentially hanging on a pipe. Handle both cases.
-        si = open('/dev/null', 'r')
-        try:
-            os.dup2(si.fileno(),sys.stdin.fileno())
-        except (AttributeError, io.UnsupportedOperation):
-            sys.stdin = si
-        so = open(self.logfile, 'a+')
-        try:
-            os.dup2(so.fileno(),sys.stdout.fileno())
-        except (AttributeError, io.UnsupportedOperation):
-            sys.stdout = so
-        try:
-            os.dup2(so.fileno(),sys.stderr.fileno())
-        except (AttributeError, io.UnsupportedOperation):
-            sys.stderr = so
-
-        # Clear out all log handlers prior to the fork() to avoid calling
-        # event handlers not part of the PRserver
-        for logger_iter in logging.Logger.manager.loggerDict.keys():
-            logging.getLogger(logger_iter).handlers = []
-
-        # Ensure logging makes it to the logfile
-        streamhandler = logging.StreamHandler()
-        streamhandler.setLevel(logging.DEBUG)
-        formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
-        streamhandler.setFormatter(formatter)
-        logger.addHandler(streamhandler)
-
-        # write pidfile
-        pid = str(os.getpid())
-        with open(self.pidfile, 'w') as pf:
-            pf.write("%s\n" % pid)
-
-        self.work_forever()
-        self.delpid()
-
 class PRServSingleton(object):
     def __init__(self, dbfile, logfile, interface):
         self.dbfile = dbfile
@@ -348,6 +247,76 @@ class PRServerConnection(object):
     def getinfo(self):
         return self.host, self.port
 
+def run_as_daemon(func, pidfile, logfile):
+    """
+    See Advanced Programming in the UNIX, Sec 13.3
+    """
+    try:
+        pid = os.fork()
+        if pid > 0:
+            os.waitpid(pid, 0)
+            #parent return instead of exit to give control 
+            return pid
+    except OSError as e:
+        raise Exception("%s [%d]" % (e.strerror, e.errno))
+
+    os.setsid()
+    """
+    fork again to make sure the daemon is not session leader, 
+    which prevents it from acquiring controlling terminal
+    """
+    try:
+        pid = os.fork()
+        if pid > 0: #parent
+            os._exit(0)
+    except OSError as e:
+        raise Exception("%s [%d]" % (e.strerror, e.errno))
+
+    os.chdir("/")
+
+    sys.stdout.flush()
+    sys.stderr.flush()
+
+    # We could be called from a python thread with io.StringIO as
+    # stdout/stderr or it could be 'real' unix fd forking where we need
+    # to physically close the fds to prevent the program launching us from
+    # potentially hanging on a pipe. Handle both cases.
+    si = open('/dev/null', 'r')
+    try:
+        os.dup2(si.fileno(),sys.stdin.fileno())
+    except (AttributeError, io.UnsupportedOperation):
+        sys.stdin = si
+    so = open(logfile, 'a+')
+    try:
+        os.dup2(so.fileno(),sys.stdout.fileno())
+    except (AttributeError, io.UnsupportedOperation):
+        sys.stdout = so
+    try:
+        os.dup2(so.fileno(),sys.stderr.fileno())
+    except (AttributeError, io.UnsupportedOperation):
+        sys.stderr = so
+
+    # Clear out all log handlers prior to the fork() to avoid calling
+    # event handlers not part of the PRserver
+    for logger_iter in logging.Logger.manager.loggerDict.keys():
+        logging.getLogger(logger_iter).handlers = []
+
+    # Ensure logging makes it to the logfile
+    streamhandler = logging.StreamHandler()
+    streamhandler.setLevel(logging.DEBUG)
+    formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
+    streamhandler.setFormatter(formatter)
+    logger.addHandler(streamhandler)
+
+    # write pidfile
+    pid = str(os.getpid())
+    with open(pidfile, 'w') as pf:
+        pf.write("%s\n" % pid)
+
+    func()
+    os.remove(pidfile)
+    os._exit(0)
+
 def start_daemon(dbfile, host, port, logfile):
     ip = socket.gethostbyname(host)
     pidfile = PIDPREFIX % (ip, port)
@@ -363,7 +332,7 @@ def start_daemon(dbfile, host, port, logfile):
         return 1
 
     server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port))
-    server.start()
+    run_as_daemon(server.work_forever, pidfile, os.path.abspath(logfile))
 
     # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
     # the one the server actually is listening, so at least warn the user about it
-- 
2.26.2


* [RFC PATCH 08/11] prserv: Handle requests in main thread
From: Paul Barker @ 2021-04-12 11:41 UTC
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

The prserver process is cleanly separated from the main bitbake process,
so requests can be handled in the main thread. This removes the need for
a request queue and a separate request handling thread.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/prserv/serv.py | 159 ++++++++++-----------------------------------
 1 file changed, 36 insertions(+), 123 deletions(-)

diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 74bfdc1bb..0c61c6bf7 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -5,8 +5,6 @@
 import os,sys,logging
 import signal, time
 from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
-import threading
-import queue
 import socket
 import io
 import sqlite3
@@ -14,7 +12,6 @@ import bb.server.xmlrpcclient
 import prserv
 import prserv.db
 import errno
-import select
 import multiprocessing
 
 logger = logging.getLogger("BitBake.PRserv")
@@ -48,55 +45,18 @@ class PRServer(SimpleXMLRPCServer):
 
         self.dbfile=dbfile
         self.logfile=logfile
-        self.working_thread=None
         self.host, self.port = self.socket.getsockname()
-        self.pidfile=PIDPREFIX % (self.host, self.port)
 
         self.register_function(self.getPR, "getPR")
-        self.register_function(self.quit, "quit")
         self.register_function(self.ping, "ping")
         self.register_function(self.export, "export")
         self.register_function(self.dump_db, "dump_db")
         self.register_function(self.importone, "importone")
         self.register_introspection_functions()
 
-        self.quitpipein, self.quitpipeout = os.pipe()
-
-        self.requestqueue = queue.Queue()
-        self.handlerthread = threading.Thread(target = self.process_request_thread)
-        self.handlerthread.daemon = False
-
-    def process_request_thread(self):
-        """Same as in BaseServer but as a thread.
-
-        In addition, exception handling is done here.
-
-        """
-        iter_count = 1
+        self.iter_count = 0
         # 60 iterations between syncs or sync if dirty every ~30 seconds
-        iterations_between_sync = 60
-
-        bb.utils.set_process_name("PRServ Handler")
-
-        while not self.quitflag:
-            try:
-                (request, client_address) = self.requestqueue.get(True, 30)
-            except queue.Empty:
-                self.table.sync_if_dirty()
-                continue
-            if request is None:
-                continue
-            try:
-                self.finish_request(request, client_address)
-                self.shutdown_request(request)
-                iter_count = (iter_count + 1) % iterations_between_sync
-                if iter_count == 0:
-                    self.table.sync_if_dirty()
-            except:
-                self.handle_error(request, client_address)
-                self.shutdown_request(request)
-                self.table.sync()
-            self.table.sync_if_dirty()
+        self.iterations_between_sync = 60
 
     def sigint_handler(self, signum, stack):
         if self.table:
@@ -105,11 +65,30 @@ class PRServer(SimpleXMLRPCServer):
     def sigterm_handler(self, signum, stack):
         if self.table:
             self.table.sync()
-        self.quit()
-        self.requestqueue.put((None, None))
+        raise(SystemExit)
 
     def process_request(self, request, client_address):
-        self.requestqueue.put((request, client_address))
+        if request is None:
+            return
+        try:
+            self.finish_request(request, client_address)
+            self.shutdown_request(request)
+            self.iter_count = (self.iter_count + 1) % self.iterations_between_sync
+            if self.iter_count == 0:
+                self.table.sync_if_dirty()
+        except:
+            self.handle_error(request, client_address)
+            self.shutdown_request(request)
+            self.table.sync()
+        self.table.sync_if_dirty()
+
+    def serve_forever(self, poll_interval=0.5):
+        signal.signal(signal.SIGINT, self.sigint_handler)
+        signal.signal(signal.SIGTERM, self.sigterm_handler)
+
+        self.db = prserv.db.PRData(self.dbfile)
+        self.table = self.db["PRMAIN"]
+        return super().serve_forever(poll_interval)
 
     def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
         try:
@@ -142,7 +121,7 @@ class PRServer(SimpleXMLRPCServer):
         return self.table.importone(version, pkgarch, checksum, value)
 
     def ping(self):
-        return not self.quitflag
+        return True
 
     def getinfo(self):
         return (self.host, self.port)
@@ -157,45 +136,6 @@ class PRServer(SimpleXMLRPCServer):
             logger.error(str(exc))
             return None
 
-    def quit(self):
-        self.quitflag=True
-        os.write(self.quitpipeout, b"q")
-        os.close(self.quitpipeout)
-        return
-
-    def work_forever(self,):
-        self.quitflag = False
-        # This timeout applies to the poll in TCPServer, we need the select 
-        # below to wake on our quit pipe closing. We only ever call into handle_request
-        # if there is data there.
-        self.timeout = 0.01
-
-        signal.signal(signal.SIGINT, self.sigint_handler)
-        signal.signal(signal.SIGTERM, self.sigterm_handler)
-
-        bb.utils.set_process_name("PRServ")
-
-        # DB connection must be created after all forks
-        self.db = prserv.db.PRData(self.dbfile)
-        self.table = self.db["PRMAIN"]
-
-        logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
-                     (self.dbfile, self.host, self.port, str(os.getpid())))
-
-        self.handlerthread.start()
-        while not self.quitflag:
-            ready = select.select([self.fileno(), self.quitpipein], [], [], 30)
-            if self.quitflag:
-                break
-            if self.fileno() in ready[0]:
-                self.handle_request()
-        self.handlerthread.join()
-        self.db.disconnect()
-        logger.info("PRServer: stopping...")
-        self.server_close()
-        os.close(self.quitpipein)
-        return
-
 class PRServSingleton(object):
     def __init__(self, dbfile, logfile, interface):
         self.dbfile = dbfile
@@ -206,7 +146,7 @@ class PRServSingleton(object):
 
     def start(self):
         self.prserv = PRServer(self.dbfile, self.logfile, self.interface)
-        self.process = multiprocessing.Process(target=self.prserv.work_forever)
+        self.process = multiprocessing.Process(target=self.prserv.serve_forever)
         self.process.start()
 
         self.host, self.port = self.prserv.getinfo()
@@ -222,13 +162,6 @@ class PRServerConnection(object):
         self.port = port
         self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port)
 
-    def terminate(self):
-        try:
-            logger.info("Terminating PRServer...")
-            self.connection.quit()
-        except Exception as exc:
-            sys.stderr.write("%s\n" % str(exc))
-
     def getPR(self, version, pkgarch, checksum):
         return self.connection.getPR(version, pkgarch, checksum)
 
@@ -332,7 +265,7 @@ def start_daemon(dbfile, host, port, logfile):
         return 1
 
     server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port))
-    run_as_daemon(server.work_forever, pidfile, os.path.abspath(logfile))
+    run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile))
 
     # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
     # the one the server actually is listening, so at least warn the user about it
@@ -369,25 +302,13 @@ def stop_daemon(host, port):
         return 1
 
     try:
-        PRServerConnection(ip, port).terminate()
-    except:
-        logger.critical("Stop PRService %s:%d failed" % (host,port))
-
-    try:
-        if pid:
-            wait_timeout = 0
-            print("Waiting for pr-server to exit.")
-            while is_running(pid) and wait_timeout < 50:
-                time.sleep(0.1)
-                wait_timeout += 1
+        if is_running(pid):
+            print("Sending SIGTERM to pr-server.")
+            os.kill(pid, signal.SIGTERM)
+            time.sleep(0.1)
 
-            if is_running(pid):
-                print("Sending SIGTERM to pr-server.")
-                os.kill(pid,signal.SIGTERM)
-                time.sleep(0.1)
-
-            if os.path.exists(pidfile):
-                os.remove(pidfile)
+        if os.path.exists(pidfile):
+            os.remove(pidfile)
 
     except OSError as e:
         err = str(e)
@@ -463,17 +384,9 @@ def auto_start(d):
 
 def auto_shutdown():
     global singleton
-    if singleton:
-        host, port = singleton.getinfo()
-        try:
-            PRServerConnection(host, port).terminate()
-        except:
-            logger.critical("Stop PRService %s:%d failed" % (host,port))
-
-        try:
-            os.waitpid(singleton.prserv.pid, 0)
-        except ChildProcessError:
-            pass
+    if singleton and singleton.process:
+        singleton.process.terminate()
+        singleton.process.join()
         singleton = None
 
 def ping(host, port):
-- 
2.26.2


^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [RFC PATCH 09/11] prserv: Drop unused methods
  2021-04-12 11:41 [RFC PATCH 00/11] Modernise prserv Paul Barker
                   ` (7 preceding siblings ...)
  2021-04-12 11:41 ` [RFC PATCH 08/11] prserv: Handle requests in main thread Paul Barker
@ 2021-04-12 11:41 ` Paul Barker
  2021-04-12 11:41 ` [RFC PATCH 10/11] prserv: Replace XML RPC with modern asyncrpc implementation Paul Barker
                   ` (2 subsequent siblings)
  11 siblings, 0 replies; 14+ messages in thread
From: Paul Barker @ 2021-04-12 11:41 UTC (permalink / raw)
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

* The dump_db method is not used anywhere in the current bitbake or
  openembedded-core code.

* The getinfo request is handled entirely on the client side, so the
  server-side method isn't needed.

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/prserv/serv.py | 27 ---------------------------
 1 file changed, 27 deletions(-)

diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 0c61c6bf7..30f30e355 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -50,7 +50,6 @@ class PRServer(SimpleXMLRPCServer):
         self.register_function(self.getPR, "getPR")
         self.register_function(self.ping, "ping")
         self.register_function(self.export, "export")
-        self.register_function(self.dump_db, "dump_db")
         self.register_function(self.importone, "importone")
         self.register_introspection_functions()
 
@@ -97,35 +96,12 @@ class PRServer(SimpleXMLRPCServer):
             logger.error(str(exc))
             return None
 
-    def dump_db(self):
-        """
-        Returns a script (string) that reconstructs the state of the
-        entire database at the time this function is called. The script
-        language is defined by the backing database engine, which is a
-        function of server configuration.
-        Returns None if the database engine does not support dumping to
-        script or if some other error is encountered in processing.
-        """
-        buff = io.StringIO()
-        try:
-            self.table.sync()
-            self.table.dump_db(buff)
-            return buff.getvalue()
-        except Exception as exc:
-            logger.error(str(exc))
-            return None
-        finally:
-            buff.close()
-
     def importone(self, version, pkgarch, checksum, value):
         return self.table.importone(version, pkgarch, checksum, value)
 
     def ping(self):
         return True
 
-    def getinfo(self):
-        return (self.host, self.port)
-
     def getPR(self, version, pkgarch, checksum):
         try:
             return self.table.getValue(version, pkgarch, checksum)
@@ -171,9 +147,6 @@ class PRServerConnection(object):
     def export(self,version=None, pkgarch=None, checksum=None, colinfo=True):
         return self.connection.export(version, pkgarch, checksum, colinfo)
 
-    def dump_db(self):
-        return self.connection.dump_db()
-
     def importone(self, version, pkgarch, checksum, value):
         return self.connection.importone(version, pkgarch, checksum, value)
 
-- 
2.26.2


^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [RFC PATCH 10/11] prserv: Replace XML RPC with modern asyncrpc implementation
  2021-04-12 11:41 [RFC PATCH 00/11] Modernise prserv Paul Barker
                   ` (8 preceding siblings ...)
  2021-04-12 11:41 ` [RFC PATCH 09/11] prserv: Drop unused methods Paul Barker
@ 2021-04-12 11:41 ` Paul Barker
  2021-04-12 11:41 ` [RFC PATCH 11/11] prserv: Add connect function Paul Barker
  2021-04-15 16:48 ` [RFC PATCH 00/11] Modernise prserv Richard Purdie
  11 siblings, 0 replies; 14+ messages in thread
From: Paul Barker @ 2021-04-12 11:41 UTC (permalink / raw)
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

Update the prserv client and server classes to use the modern json- and
asyncio-based RPC system implemented by the asyncrpc module.
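
For illustration, a single exchange under the new protocol looks roughly
like the following (field names are taken from the handlers in the diff
below; the values shown are placeholders, and the actual framing and
version handshake live in lib/bb/asyncrpc and are glossed over here):

    # Hypothetical 'get-pr' exchange, written as plain Python dicts.
    request = {'get-pr': {'version': 'PV',
                          'pkgarch': 'core2-64',
                          'checksum': 'abc123'}}
    response = {'value': 5}  # None if the lookup fails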

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/prserv/serv.py | 252 +++++++++++++++++++++++----------------------
 1 file changed, 129 insertions(+), 123 deletions(-)

diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 30f30e355..70bb2b2fc 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -4,154 +4,160 @@
 
 import os,sys,logging
 import signal, time
-from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
 import socket
 import io
 import sqlite3
-import bb.server.xmlrpcclient
 import prserv
 import prserv.db
 import errno
 import multiprocessing
+import bb.asyncrpc
 
 logger = logging.getLogger("BitBake.PRserv")
 
-class Handler(SimpleXMLRPCRequestHandler):
-    def _dispatch(self,method,params):
-        try:
-            value=self.server.funcs[method](*params)
-        except:
-            import traceback
-            traceback.print_exc()
-            raise
-        return value
-
 PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
 singleton = None
 
+class PRServerClient(bb.asyncrpc.AsyncServerConnection):
+    def __init__(self, reader, writer, table):
+        super().__init__(reader, writer, 'PRSERVICE', logger)
+        self.handlers.update({
+            'get-pr': self.handle_get_pr,
+            'import-one': self.handle_import_one,
+            'export': self.handle_export,
+        })
+        self.table = table
 
-class PRServer(SimpleXMLRPCServer):
-    def __init__(self, dbfile, logfile, interface):
-        ''' constructor '''
-        try:
-            SimpleXMLRPCServer.__init__(self, interface,
-                                        logRequests=False, allow_none=True)
-        except socket.error:
-            ip=socket.gethostbyname(interface[0])
-            port=interface[1]
-            msg="PR Server unable to bind to %s:%s\n" % (ip, port)
-            sys.stderr.write(msg)
-            raise PRServiceConfigError
-
-        self.dbfile=dbfile
-        self.logfile=logfile
-        self.host, self.port = self.socket.getsockname()
+    def validate_proto_version(self):
+        return (self.proto_version == (1, 0))
 
-        self.register_function(self.getPR, "getPR")
-        self.register_function(self.ping, "ping")
-        self.register_function(self.export, "export")
-        self.register_function(self.importone, "importone")
-        self.register_introspection_functions()
-
-        self.iter_count = 0
-        # 60 iterations between syncs or sync if dirty every ~30 seconds
-        self.iterations_between_sync = 60
-
-    def sigint_handler(self, signum, stack):
-        if self.table:
-            self.table.sync()
-
-    def sigterm_handler(self, signum, stack):
-        if self.table:
-            self.table.sync()
-        raise(SystemExit)
-
-    def process_request(self, request, client_address):
-        if request is None:
-            return
+    async def dispatch_message(self, msg):
         try:
-            self.finish_request(request, client_address)
-            self.shutdown_request(request)
-            self.iter_count = (self.iter_count + 1) % self.iterations_between_sync
-            if self.iter_count == 0:
-                self.table.sync_if_dirty()
+            await super().dispatch_message(msg)
         except:
-            self.handle_error(request, client_address)
-            self.shutdown_request(request)
             self.table.sync()
-        self.table.sync_if_dirty()
+            raise
 
-    def serve_forever(self, poll_interval=0.5):
-        signal.signal(signal.SIGINT, self.sigint_handler)
-        signal.signal(signal.SIGTERM, self.sigterm_handler)
+        self.table.sync_if_dirty()
 
-        self.db = prserv.db.PRData(self.dbfile)
-        self.table = self.db["PRMAIN"]
-        return super().serve_forever(poll_interval)
+    async def handle_get_pr(self, request):
+        version = request['version']
+        pkgarch = request['pkgarch']
+        checksum = request['checksum']
 
-    def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
+        response = None
         try:
-            return self.table.export(version, pkgarch, checksum, colinfo)
+            value = self.table.getValue(version, pkgarch, checksum)
+            response = {'value': value}
+        except prserv.NotFoundError:
+            logger.error("can not find value for (%s, %s)",version, checksum)
         except sqlite3.Error as exc:
             logger.error(str(exc))
-            return None
 
-    def importone(self, version, pkgarch, checksum, value):
-        return self.table.importone(version, pkgarch, checksum, value)
+        self.write_message(response)
 
-    def ping(self):
-        return True
+    async def handle_import_one(self, request):
+        version = request['version']
+        pkgarch = request['pkgarch']
+        checksum = request['checksum']
+        value = request['value']
+
+        value = self.table.importone(version, pkgarch, checksum, value)
+        if value is not None:
+            response = {'value': value}
+        else:
+            response = None
+        self.write_message(response)
+
+    async def handle_export(self, request):
+        version = request['version']
+        pkgarch = request['pkgarch']
+        checksum = request['checksum']
+        colinfo = request['colinfo']
 
-    def getPR(self, version, pkgarch, checksum):
+        response = None
         try:
-            return self.table.getValue(version, pkgarch, checksum)
-        except prserv.NotFoundError:
-            logger.error("can not find value for (%s, %s)",version, checksum)
-            return None
+            (metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo)
+            response = {'metainfo': metainfo, 'datainfo': datainfo}
         except sqlite3.Error as exc:
             logger.error(str(exc))
-            return None
 
-class PRServSingleton(object):
-    def __init__(self, dbfile, logfile, interface):
+        self.write_message(response)
+
+class PRServer(bb.asyncrpc.AsyncServer):
+    def __init__(self, dbfile, loop=None):
+        super().__init__(logger, loop)
         self.dbfile = dbfile
-        self.logfile = logfile
-        self.interface = interface
-        self.host = None
-        self.port = None
+        self.table = None
 
-    def start(self):
-        self.prserv = PRServer(self.dbfile, self.logfile, self.interface)
-        self.process = multiprocessing.Process(target=self.prserv.serve_forever)
-        self.process.start()
+    def accept_client(self, reader, writer):
+        return PRServerClient(reader, writer, self.table)
 
-        self.host, self.port = self.prserv.getinfo()
+    def serve_forever(self):
+        self.db = prserv.db.PRData(self.dbfile)
+        self.table = self.db["PRMAIN"]
 
-    def getinfo(self):
-        return (self.host, self.port)
+        logger.debug("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
+                     (self.dbfile, self.address, str(os.getpid())))
 
-class PRServerConnection(object):
-    def __init__(self, host, port):
-        if is_local_special(host, port):
-            host, port = singleton.getinfo()
-        self.host = host
-        self.port = port
-        self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port)
+        super().serve_forever()
 
-    def getPR(self, version, pkgarch, checksum):
-        return self.connection.getPR(version, pkgarch, checksum)
+        self.table.sync_if_dirty()
+        self.db.disconnect()
 
-    def ping(self):
-        return self.connection.ping()
+    def signal_handler(self):
+        super().signal_handler()
+        if self.table:
+            self.table.sync()
 
-    def export(self,version=None, pkgarch=None, checksum=None, colinfo=True):
-        return self.connection.export(version, pkgarch, checksum, colinfo)
+class PRServSingleton(object):
+    def __init__(self, dbfile, logfile, host, port):
+        self.dbfile = dbfile
+        self.logfile = logfile
+        self.host = host
+        self.port = port
 
-    def importone(self, version, pkgarch, checksum, value):
-        return self.connection.importone(version, pkgarch, checksum, value)
+    def start(self):
+        self.prserv = PRServer(self.dbfile)
+        self.prserv.start_tcp_server(self.host, self.port)
+        self.process = multiprocessing.Process(target=self.prserv.serve_forever)
+        self.process.start()
 
-    def getinfo(self):
-        return self.host, self.port
+        if not self.port:
+            self.port = int(self.prserv.address.rsplit(':', 1)[1])
+
+class PRAsyncClient(bb.asyncrpc.AsyncClient):
+    def __init__(self):
+        super().__init__('PRSERVICE', '1.0', logger)
+
+    async def getPR(self, version, pkgarch, checksum):
+        response = await self.send_message(
+            {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}}
+        )
+        if response:
+            return response['value']
+
+    async def importone(self, version, pkgarch, checksum, value):
+        response = await self.send_message(
+            {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}}
+        )
+        if response:
+            return response['value']
+
+    async def export(self, version, pkgarch, checksum, colinfo):
+        response = await self.send_message(
+            {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}}
+        )
+        if response:
+            return (response['metainfo'], response['datainfo'])
+
+class PRClient(bb.asyncrpc.Client):
+    def __init__(self):
+        super().__init__()
+        self._add_methods('getPR', 'importone', 'export')
+
+    def _get_async_client(self):
+        return PRAsyncClient()
 
 def run_as_daemon(func, pidfile, logfile):
     """
@@ -237,15 +243,13 @@ def start_daemon(dbfile, host, port, logfile):
                             % pidfile)
         return 1
 
-    server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port))
-    run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile))
+    dbfile = os.path.abspath(dbfile)
+    def daemon_main():
+        server = PRServer(dbfile)
+        server.start_tcp_server(host, port)
+        server.serve_forever()
 
-    # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
-    # the one the server actually is listening, so at least warn the user about it
-    _,rport = server.getinfo()
-    if port != rport:
-        sys.stdout.write("Server is listening at port %s instead of %s\n"
-                         % (rport,port))
+    run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
     return 0
 
 def stop_daemon(host, port):
@@ -299,7 +303,7 @@ def is_running(pid):
     return True
 
 def is_local_special(host, port):
-    if host.strip().upper() == 'localhost'.upper() and (not port):
+    if host.strip().lower() == 'localhost' and not port:
         return True
     else:
         return False
@@ -337,20 +341,21 @@ def auto_start(d):
                auto_shutdown()
         if not singleton:
             bb.utils.mkdirhier(cachedir)
-            singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0))
+            singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), "localhost", 0)
             singleton.start()
     if singleton:
-        host, port = singleton.getinfo()
+        host = singleton.host
+        port = singleton.port
     else:
         host = host_params[0]
         port = int(host_params[1])
 
     try:
-        connection = PRServerConnection(host,port)
+        connection = PRClient()
+        connection.connect_tcp(host, port)
         connection.ping()
-        realhost, realport = connection.getinfo()
-        return str(realhost) + ":" + str(realport)
-        
+        return str(host) + ":" + str(port)
+
     except Exception:
         logger.critical("PRservice %s:%d not available" % (host, port))
         raise PRServiceConfigError
@@ -363,5 +368,6 @@ def auto_shutdown():
         singleton = None
 
 def ping(host, port):
-    conn=PRServerConnection(host, port)
+    conn=PRClient()
+    conn.connect_tcp(host, port)
     return conn.ping()
-- 
2.26.2


^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [RFC PATCH 11/11] prserv: Add connect function
  2021-04-12 11:41 [RFC PATCH 00/11] Modernise prserv Paul Barker
                   ` (9 preceding siblings ...)
  2021-04-12 11:41 ` [RFC PATCH 10/11] prserv: Replace XML RPC with modern asyncrpc implementation Paul Barker
@ 2021-04-12 11:41 ` Paul Barker
  2021-04-15 16:48 ` [RFC PATCH 00/11] Modernise prserv Richard Purdie
  11 siblings, 0 replies; 14+ messages in thread
From: Paul Barker @ 2021-04-12 11:41 UTC (permalink / raw)
  To: bitbake-devel, Richard Purdie, Joshua Watt; +Cc: Paul Barker

This function abstracts the setup of a PR service client connection so
that openembedded-core doesn't need to be updated whenever the
connection details change.
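
Caller-side usage then matches the oe-core hunk quoted in the cover
letter; a minimal sketch (the host and port values here are placeholders
and would normally be parsed from PRSERV_HOST):

    import prserv.serv

    conn = prserv.serv.connect("localhost", 12345)
    if not conn.ping():
        raise Exception('service not available')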

Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
 lib/prserv/serv.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 70bb2b2fc..055b18bd2 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -371,3 +371,14 @@ def ping(host, port):
     conn=PRClient()
     conn.connect_tcp(host, port)
     return conn.ping()
+
+def connect(host, port):
+    global singleton
+
+    if host.strip().lower() == 'localhost' and not port:
+        host = 'localhost'
+        port = singleton.port
+
+    conn = PRClient()
+    conn.connect_tcp(host, port)
+    return conn
-- 
2.26.2


^ permalink raw reply related	[flat|nested] 14+ messages in thread

* Re: [RFC PATCH 00/11] Modernise prserv
  2021-04-12 11:41 [RFC PATCH 00/11] Modernise prserv Paul Barker
                   ` (10 preceding siblings ...)
  2021-04-12 11:41 ` [RFC PATCH 11/11] prserv: Add connect function Paul Barker
@ 2021-04-15 16:48 ` Richard Purdie
  2021-04-26  7:54   ` Paul Barker
  11 siblings, 1 reply; 14+ messages in thread
From: Richard Purdie @ 2021-04-15 16:48 UTC (permalink / raw)
  To: Paul Barker, bitbake-devel, Joshua Watt

On Mon, 2021-04-12 at 12:41 +0100, Paul Barker wrote:
> The prserv module is converted from the old XML RPC mechanism to the more
> modern json + asyncio RPC system used by hashserv. The common code now
> shared between hashserv and prserv is moved to a new asyncrpc module. The
> startup and shutdown code within prserv is also refactored and modernised,
> using the Python multiprocessing module where possible.
> 
> These changes are not expected to land in the upcoming 3.3 release, they're
> currently submitted as an RFC to get some early review.
> 
> The following change is needed in oe-core when testing these patches, this
> can be submitted separately once these changes are past the RFC stage:
> 
>     diff --git a/meta/lib/oe/prservice.py b/meta/lib/oe/prservice.py
>     index fcdbe66c19..15ce060ff6 100644
>     --- a/meta/lib/oe/prservice.py
>     +++ b/meta/lib/oe/prservice.py
>     @@ -7,7 +7,7 @@ def prserv_make_conn(d, check = False):
>         host_params = list([_f for _f in (d.getVar("PRSERV_HOST") or '').split(':') if _f])
>         try:
>             conn = None
>     -        conn = prserv.serv.PRServerConnection(host_params[0], int(host_params[1]))
>     +        conn = prserv.serv.connect(host_params[0], int(host_params[1]))
>             if check:
>                 if not conn.ping():
>                     raise Exception('service not available')
> 
> I'm also currently working on a follow-up patch which will add a read-only
> mode to prserv.
> 
> Let me know if there are any other questions :)

These looked reasonable to me, so I thought I'd try some testing. I
can't prove it yet, but there were three hanging oe-selftest runs in the
build where these patches were included:

https://autobuilder.yoctoproject.org/typhoon/#/builders/79/builds/2009
https://autobuilder.yoctoproject.org/typhoon/#/builders/86/builds/1994
https://autobuilder.yoctoproject.org/typhoon/#/builders/87/builds/2026
(all running for 2+ days)

and one that failed in runtime_test.TestImage.test_testimage_dnf:

https://autobuilder.yoctoproject.org/typhoon/#/builders/87/builds/2026

I did a process trace on the centos8-ty-2 worker in case it hinted at
where it was stuck, and I see:

2161691 ?        S      0:00 bash /home/pokybuild/yocto-worker/oe-selftest-centos/build/scripts/bitbake-prserv-tool export /home/pokybuild/yocto-worker/oe-selftest-centos/build/build-st-1454506/export.inc
2167336 ?        Sl     0:37 python3 /home/pokybuild/yocto-worker/oe-selftest-centos/build/bitbake/bin/bitbake -R conf/prexport.conf -p

The other things I could see in ps were ones I'd have expected. On
Fedora, I saw:

2676512 ?        S      0:00 bash /home/pokybuild/yocto-worker/oe-selftest-fedora/build/scripts/bitbake-prserv-tool export /home/pokybuild/yocto-worker/oe-selftest-fedora/build/build-st-765595/export.inc
2679599 ?        Sl     0:19 python3 /home/pokybuild/yocto-worker/oe-selftest-fedora/build/bitbake/bin/bitbake -R conf/prexport.conf -p

and on Ubuntu too:

 743847 ?        S      0:00 /bin/sh -c bitbake-prserv-tool export /home/pokybuild/yocto-worker/oe-selftest-ubuntu/build/build-st-3944061/export.inc
 743851 ?        S      0:00 bash /home/pokybuild/yocto-worker/oe-selftest-ubuntu/build/scripts/bitbake-prserv-tool export /home/pokybuild/yocto-worker/oe-selftest-ubuntu/build/build-st-3944061/export.inc
 748863 ?        Sl     0:46 python3 /home/pokybuild/yocto-worker/oe-selftest-ubuntu/build/bitbake/bin/bitbake -R conf/prexport.conf -p

For 'fun' I killed the above Ubuntu processes, which leads to the log output in:

https://autobuilder.yoctoproject.org/typhoon/#/builders/87/builds/2026/steps/14/logs/stdio

which shows it was in prservice.BitbakePrTests.test_import_export_replace_db;
the build has now made more progress, failing a few other tests. I'm not
quite sure what it will do from here, but it seems related to this series.

Cheers,

Richard

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [RFC PATCH 00/11] Modernise prserv
  2021-04-15 16:48 ` [RFC PATCH 00/11] Modernise prserv Richard Purdie
@ 2021-04-26  7:54   ` Paul Barker
  0 siblings, 0 replies; 14+ messages in thread
From: Paul Barker @ 2021-04-26  7:54 UTC (permalink / raw)
  To: Richard Purdie; +Cc: bitbake-devel, Joshua Watt

On Thu, 15 Apr 2021 at 17:48, Richard Purdie
<richard.purdie@linuxfoundation.org> wrote:
>
> On Mon, 2021-04-12 at 12:41 +0100, Paul Barker wrote:
> > The prserv module is converted from the old XML RPC mechanism to the more
> > modern json + asyncio RPC system used by hashserv. The common code now
> > shared between hashserv and prserv is moved to a new asyncrpc module. The
> > startup and shutdown code within prserv is also refactored and modernised,
> > using the Python multiprocessing module where possible.
> >
> > These changes are not expected to land in the upcoming 3.3 release, they're
> > currently submitted as an RFC to get some early review.
> >
> > The following change is needed in oe-core when testing these patches, this
> > can be submitted separately once these changes are past the RFC stage:
> >
> >     diff --git a/meta/lib/oe/prservice.py b/meta/lib/oe/prservice.py
> >     index fcdbe66c19..15ce060ff6 100644
> >     --- a/meta/lib/oe/prservice.py
> >     +++ b/meta/lib/oe/prservice.py
> >     @@ -7,7 +7,7 @@ def prserv_make_conn(d, check = False):
> >         host_params = list([_f for _f in (d.getVar("PRSERV_HOST") or '').split(':') if _f])
> >         try:
> >             conn = None
> >     -        conn = prserv.serv.PRServerConnection(host_params[0], int(host_params[1]))
> >     +        conn = prserv.serv.connect(host_params[0], int(host_params[1]))
> >             if check:
> >                 if not conn.ping():
> >                     raise Exception('service not available')
> >
> > I'm also currently working on a follow-up patch which will add a read-only
> > mode to prserv.
> >
> > Let me know if there are any other questions :)
>
> These looked reasonable to me, I thought I'd try some testing. I can't
> prove it yet but there were three hanging oe-selftest runs in the build
> where these were included:
>
> https://autobuilder.yoctoproject.org/typhoon/#/builders/79/builds/2009
> https://autobuilder.yoctoproject.org/typhoon/#/builders/86/builds/1994
> https://autobuilder.yoctoproject.org/typhoon/#/builders/87/builds/2026
> (all running for 2+ days)
>
> and one that failed in runtime_test.TestImage.test_testimage_dnf:
>
> https://autobuilder.yoctoproject.org/typhoon/#/builders/87/builds/2026
>
> I did a process trace on the centos8-ty-2 worker in case it hinted where
> it was stuck and I see:
>
> 2161691 ?        S      0:00 bash /home/pokybuild/yocto-worker/oe-selftest-centos/build/scripts/bitbake-prserv-tool export /home/pokybuild/yocto-worker/oe-selftest-centos/build/build-st-1454506/export.inc
> 2167336 ?        Sl     0:37 python3 /home/pokybuild/yocto-worker/oe-selftest-centos/build/bitbake/bin/bitbake -R conf/prexport.conf -p
>
> the other things I could see in ps, I'd have expected. On fedora, I
> saw:
>
> 2676512 ?        S      0:00 bash /home/pokybuild/yocto-worker/oe-selftest-fedora/build/scripts/bitbake-prserv-tool export /home/pokybuild/yocto-worker/oe-selftest-fedora/build/build-st-765595/export.inc
> 2679599 ?        Sl     0:19 python3 /home/pokybuild/yocto-worker/oe-selftest-fedora/build/bitbake/bin/bitbake -R conf/prexport.conf -p
>
> and on ubuntu too:
>
>  743847 ?        S      0:00 /bin/sh -c bitbake-prserv-tool export /home/pokybuild/yocto-worker/oe-selftest-ubuntu/build/build-st-3944061/export.inc
>  743851 ?        S      0:00 bash /home/pokybuild/yocto-worker/oe-selftest-ubuntu/build/scripts/bitbake-prserv-tool export /home/pokybuild/yocto-worker/oe-selftest-ubuntu/build/build-st-3944061/export.inc
>  748863 ?        Sl     0:46 python3 /home/pokybuild/yocto-worker/oe-selftest-ubuntu/build/bitbake/bin/bitbake -R conf/prexport.conf -p
>
> For 'fun' I killed the above ubuntu processes which leads to the log output in:
>
> https://autobuilder.yoctoproject.org/typhoon/#/builders/87/builds/2026/steps/14/logs/stdio
>
> which shows it was in prservice.BitbakePrTests.test_import_export_replace_db
> and the build has now made more progress failing a few other tests. Not quite
> sure what it will do from here but it seems related to this series.

I've looked into this and tried to reproduce the hang by running the
full oe-selftest suite locally. Unfortunately everything seems to work
for me!

I think the best approach is to break this series apart. I'll send an
initial series today which includes preparation and initial
refactoring but doesn't actually change the prserv start/shutdown or
RPC mechanisms. Those initial changes should be safe to merge. After
that I'll send a patch series to change the start/shutdown methods for
prserv, and finally a patch series to change the RPC method.

When I get to the 2nd patch series (the start/shutdown changes), I will
add timeouts in case the hang occurs when we call .join() on the PR
service process.
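
Roughly along these lines (an assumed shape, not a posted patch; the
timeout value is illustrative):

    # Don't block forever if the PR server child fails to exit.
    # Assumes the module-level singleton and logger from prserv/serv.py.
    singleton.process.terminate()
    singleton.process.join(timeout=5)
    if singleton.process.is_alive():
        logger.warning("PR server did not exit cleanly, killing it")
        singleton.process.kill()
        singleton.process.join()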

Thanks,

-- 
Paul Barker
Konsulko Group

^ permalink raw reply	[flat|nested] 14+ messages in thread

end of thread, other threads:[~2021-04-26  7:54 UTC | newest]

Thread overview: 14+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-04-12 11:41 [RFC PATCH 00/11] Modernise prserv Paul Barker
2021-04-12 11:41 ` [RFC PATCH 01/11] hashserv: Use generic ConnectionError Paul Barker
2021-04-12 11:41 ` [RFC PATCH 02/11] asyncrpc: Common implementation of RPC using json & asyncio Paul Barker
2021-04-12 11:41 ` [RFC PATCH 03/11] hashserv: Refactor to use asyncrpc Paul Barker
2021-04-12 11:41 ` [RFC PATCH 04/11] prserv: Drop obsolete python version check Paul Barker
2021-04-12 11:41 ` [RFC PATCH 05/11] asyncrpc: Add ping method Paul Barker
2021-04-12 11:41 ` [RFC PATCH 06/11] prserv: Use multiprocessing to auto start prserver Paul Barker
2021-04-12 11:41 ` [RFC PATCH 07/11] prserv: Extract daemonization from PRServer class Paul Barker
2021-04-12 11:41 ` [RFC PATCH 08/11] prserv: Handle requests in main thread Paul Barker
2021-04-12 11:41 ` [RFC PATCH 09/11] prserv: Drop unused methods Paul Barker
2021-04-12 11:41 ` [RFC PATCH 10/11] prserv: Replace XML RPC with modern asyncrpc implementation Paul Barker
2021-04-12 11:41 ` [RFC PATCH 11/11] prserv: Add connect function Paul Barker
2021-04-15 16:48 ` [RFC PATCH 00/11] Modernise prserv Richard Purdie
2021-04-26  7:54   ` Paul Barker
