* [PATCH v4 0/5] Re-implement prserv on top of asyncrpc
@ 2021-07-27 2:37 Scott Murray
2021-07-27 2:37 ` [PATCH v4 1/5] asyncrpc: Wait on writers to close with Python 3.7+ Scott Murray
` (6 more replies)
0 siblings, 7 replies; 10+ messages in thread
From: Scott Murray @ 2021-07-27 2:37 UTC (permalink / raw)
To: bitbake-devel, Richard Purdie, Joshua Watt
These changes replace the old XML-based RPC system in prserv with the
new asyncrpc implementation originally used by hashserv, and add a
read-only mode to match the hash equivalency server's support.
Changes from v3:
* Scott Murray taking over upstreaming effort from Paul Barker.
* Dropped patches which are currently applied to master-next; this
series should be applied on top of the current master-next branch.
* Patches 2-4 updated by Scott Murray to rebase on top of 3983643
("bitbake: asyncrpc: Catch early SIGTERM").
* Read-only PR server support patch added to stack to get it into
the review process.
Paul Barker (5):
asyncrpc: Wait on writers to close with Python 3.7+
asyncrpc: Ensure that asyncio shutdown is clean
asyncrpc: Handle exceptions
prserv: Replace XML RPC with modern asyncrpc implementation
prserv: Add read-only mode
bin/bitbake-prserv | 4 +-
lib/bb/asyncrpc/client.py | 3 +
lib/bb/asyncrpc/serv.py | 34 ++++-
lib/prserv/db.py | 65 +++++++--
lib/prserv/serv.py | 286 ++++++++++++++++++++------------------
5 files changed, 239 insertions(+), 153 deletions(-)
--
2.31.1
* [PATCH v4 1/5] asyncrpc: Wait on writers to close with Python 3.7+
2021-07-27 2:37 [PATCH v4 0/5] Re-implement prserv on top of asyncrpc Scott Murray
@ 2021-07-27 2:37 ` Scott Murray
2021-07-27 2:37 ` [PATCH v4 2/5] asyncrpc: Ensure that asyncio shutdown is clean Scott Murray
` (5 subsequent siblings)
6 siblings, 0 replies; 10+ messages in thread
From: Scott Murray @ 2021-07-27 2:37 UTC (permalink / raw)
To: bitbake-devel, Richard Purdie, Joshua Watt
From: Paul Barker <pbarker@konsulko.com>
The close() method of an asyncio stream writer is not synchronous and so
may return before the writer has finished closing. In Python 3.7 the
wait_closed() method was added which can be used to ensure that a writer
has finished closing before continuing. Sadly in Python 3.6 and earlier
there is no way to guarantee that a writer has finished closing before
moving on.
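For illustration only (not part of the patch), the pattern described above can be sketched as a small standalone script. The helper name close_writer, the demo() coroutine, and the use of a loopback server on an OS-assigned port are assumptions made for this example.

```python
import asyncio
import sys


async def close_writer(writer):
    """Close a stream writer and, where supported, wait for it to finish."""
    writer.close()
    # StreamWriter.wait_closed() only exists on Python 3.7 and later.
    if sys.version_info >= (3, 7):
        await writer.wait_closed()


async def demo():
    async def on_connect(reader, writer):
        # Server side: read until the client disconnects, then close.
        await reader.read()
        await close_writer(writer)

    # Port 0 asks the OS for any free port (an assumption for this demo).
    server = await asyncio.start_server(on_connect, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    await close_writer(writer)
    closing = writer.transport.is_closing()

    server.close()
    await server.wait_closed()
    return closing


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

On Python 3.6 the guard simply skips the wait, which matches the limitation noted in the commit message.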
Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
lib/bb/asyncrpc/client.py | 3 +++
lib/bb/asyncrpc/serv.py | 4 ++++
2 files changed, 7 insertions(+)
diff --git a/lib/bb/asyncrpc/client.py b/lib/bb/asyncrpc/client.py
index 3eb4fdde..82b6068a 100644
--- a/lib/bb/asyncrpc/client.py
+++ b/lib/bb/asyncrpc/client.py
@@ -7,6 +7,7 @@ import asyncio
import json
import os
import socket
+import sys
from . import chunkify, DEFAULT_MAX_CHUNK
@@ -47,6 +48,8 @@ class AsyncClient(object):
if self.writer is not None:
self.writer.close()
+ if sys.version_info >= (3, 7):
+ await self.writer.wait_closed()
self.writer = None
async def _send_wrapper(self, proc):
diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
index 4084f300..2c219c1e 100644
--- a/lib/bb/asyncrpc/serv.py
+++ b/lib/bb/asyncrpc/serv.py
@@ -75,6 +75,8 @@ class AsyncServerConnection(object):
self.logger.error(str(e))
finally:
self.writer.close()
+ if sys.version_info >= (3, 7):
+ await self.writer.wait_closed()
async def dispatch_message(self, msg):
for k in self.handlers.keys():
@@ -193,6 +195,8 @@ class AsyncServer(object):
self.logger.error('Error from client: %s' % str(e), exc_info=True)
traceback.print_exc()
writer.close()
+ if sys.version_info >= (3, 7):
+ await writer.wait_closed()
self.logger.debug('Client disconnected')
def run_loop_forever(self):
--
2.31.1
* [PATCH v4 2/5] asyncrpc: Ensure that asyncio shutdown is clean
2021-07-27 2:37 [PATCH v4 0/5] Re-implement prserv on top of asyncrpc Scott Murray
2021-07-27 2:37 ` [PATCH v4 1/5] asyncrpc: Wait on writers to close with Python 3.7+ Scott Murray
@ 2021-07-27 2:37 ` Scott Murray
2021-07-27 2:37 ` [PATCH v4 3/5] asyncrpc: Handle exceptions Scott Murray
` (4 subsequent siblings)
6 siblings, 0 replies; 10+ messages in thread
From: Scott Murray @ 2021-07-27 2:37 UTC (permalink / raw)
To: bitbake-devel, Richard Purdie, Joshua Watt
From: Paul Barker <pbarker@konsulko.com>
We should ensure that all async tasks are cancelled and then allowed to
finish gracefully before closing the asyncio loop.
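As a rough sketch of the cancel-then-wait idea (not the patch itself), the following standalone example cancels every task except the one running the shutdown and then gathers them so each gets a chance to clean up. It uses the module-level asyncio.all_tasks() and asyncio.current_task() functions available from Python 3.7, which differ from the asyncio.Task classmethods used in the diff below; worker(), shutdown() and demo() are names invented for this example.

```python
import asyncio


async def worker(cleaned_up, name):
    try:
        # Stands in for a long-lived connection handler.
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # Per-task cleanup would go here before re-raising.
        cleaned_up.append(name)
        raise


async def shutdown():
    # Everything except the task that is running shutdown() itself.
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of raising.
    return await asyncio.gather(*tasks, return_exceptions=True)


async def demo():
    cleaned_up = []
    for name in ("a", "b"):
        asyncio.create_task(worker(cleaned_up, name))
    await asyncio.sleep(0)  # give the workers a chance to start
    outcomes = await shutdown()
    return sorted(cleaned_up), outcomes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Gathering after cancelling is what lets the cancelled tasks run their cleanup before the loop is stopped.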
Signed-off-by: Paul Barker <pbarker@konsulko.com>
[updated for asyncrpc SIGTERM handling changes]
Signed-off-by: Scott Murray <scott.murray@konsulko.com>
---
lib/bb/asyncrpc/serv.py | 24 ++++++++++++++++++------
1 file changed, 18 insertions(+), 6 deletions(-)
diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
index 2c219c1e..1b7b7a6d 100644
--- a/lib/bb/asyncrpc/serv.py
+++ b/lib/bb/asyncrpc/serv.py
@@ -205,21 +205,33 @@ class AsyncServer(object):
except KeyboardInterrupt:
pass
- def signal_handler(self):
+ def handle_signal(self):
self.logger.debug("Got exit signal")
+ self.loop.create_task(self.shutdown())
+
+ async def shutdown(self):
+ self.logger.debug('Server shutting down')
+
+ # Stop accepting connections
+ self.server.close()
+ await self.server.wait_closed()
+
+ # Cancel all active tasks
+ tasks = [t for t in asyncio.Task.all_tasks(self.loop)
+ if t is not asyncio.Task.current_task(self.loop)]
+ for task in tasks:
+ task.cancel()
+ await asyncio.gather(*tasks, return_exceptions=True)
self.loop.stop()
def serve_forever(self):
asyncio.set_event_loop(self.loop)
try:
- self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler)
+ self.loop.add_signal_handler(signal.SIGTERM, self.handle_signal)
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM])
+ self.loop.add_signal_handler(signal.SIGINT, self.handle_signal)
self.run_loop_forever()
- self.server.close()
-
- self.loop.run_until_complete(self.server.wait_closed())
- self.logger.debug('Server shutting down')
finally:
if self.close_loop:
if sys.version_info >= (3, 6):
--
2.31.1
* [PATCH v4 3/5] asyncrpc: Handle exceptions
2021-07-27 2:37 [PATCH v4 0/5] Re-implement prserv on top of asyncrpc Scott Murray
2021-07-27 2:37 ` [PATCH v4 1/5] asyncrpc: Wait on writers to close with Python 3.7+ Scott Murray
2021-07-27 2:37 ` [PATCH v4 2/5] asyncrpc: Ensure that asyncio shutdown is clean Scott Murray
@ 2021-07-27 2:37 ` Scott Murray
2021-07-27 2:37 ` [PATCH v4 4/5] prserv: Replace XML RPC with modern asyncrpc implementation Scott Murray
` (3 subsequent siblings)
6 siblings, 0 replies; 10+ messages in thread
From: Scott Murray @ 2021-07-27 2:37 UTC (permalink / raw)
To: bitbake-devel, Richard Purdie, Joshua Watt
From: Paul Barker <pbarker@konsulko.com>
If an async task raises an exception it may cause the asyncio loop to
hang or may otherwise leave our server in a bad state. To avoid these
issues we can add an exception handler to the asyncio loop which
attempts to gracefully shut down the server if an unhandled exception
occurs.
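A minimal standalone sketch of a loop-level exception handler, for illustration only: it stops the loop directly, whereas the patch schedules the server's shutdown() coroutine. The function names here are invented for the example. The failure is raised from a plain callback because asyncio reports those to the handler immediately.

```python
import asyncio


def run_with_handler():
    loop = asyncio.new_event_loop()
    seen = []

    def handle_exception(loop, context):
        # "message" is always present in the context; "exception" is optional.
        seen.append(context.get("exception", context["message"]))
        loop.stop()

    def faulty_callback():
        raise RuntimeError("boom")

    loop.set_exception_handler(handle_exception)
    loop.call_soon(faulty_callback)
    loop.run_forever()  # returns once the handler has called loop.stop()
    loop.close()
    return seen


if __name__ == "__main__":
    print(run_with_handler())
```

Note that if the shutdown path itself raises, the handler is invoked again, so the handler should avoid retriggering the same failure.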
Signed-off-by: Paul Barker <pbarker@konsulko.com>
[updated for asyncrpc SIGTERM handling changes]
Signed-off-by: Scott Murray <scott.murray@konsulko.com>
---
lib/bb/asyncrpc/serv.py | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/lib/bb/asyncrpc/serv.py b/lib/bb/asyncrpc/serv.py
index 1b7b7a6d..167b30c7 100644
--- a/lib/bb/asyncrpc/serv.py
+++ b/lib/bb/asyncrpc/serv.py
@@ -209,6 +209,11 @@ class AsyncServer(object):
self.logger.debug("Got exit signal")
self.loop.create_task(self.shutdown())
+ def handle_exception(self, loop, context):
+ msg = context.get("exception", context["message"])
+ self.logger.error("Caught exception: %s" % (msg))
+ loop.create_task(self.shutdown())
+
async def shutdown(self):
self.logger.debug('Server shutting down')
@@ -227,6 +232,7 @@ class AsyncServer(object):
def serve_forever(self):
asyncio.set_event_loop(self.loop)
try:
+ self.loop.set_exception_handler(self.handle_exception)
self.loop.add_signal_handler(signal.SIGTERM, self.handle_signal)
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM])
self.loop.add_signal_handler(signal.SIGINT, self.handle_signal)
--
2.31.1
* [PATCH v4 4/5] prserv: Replace XML RPC with modern asyncrpc implementation
2021-07-27 2:37 [PATCH v4 0/5] Re-implement prserv on top of asyncrpc Scott Murray
` (2 preceding siblings ...)
2021-07-27 2:37 ` [PATCH v4 3/5] asyncrpc: Handle exceptions Scott Murray
@ 2021-07-27 2:37 ` Scott Murray
2021-07-27 2:37 ` [PATCH v4 5/5] prserv: Add read-only mode Scott Murray
` (2 subsequent siblings)
6 siblings, 0 replies; 10+ messages in thread
From: Scott Murray @ 2021-07-27 2:37 UTC (permalink / raw)
To: bitbake-devel, Richard Purdie, Joshua Watt
From: Paul Barker <pbarker@konsulko.com>
Update the prserv client and server classes to use the modern json and
asyncio based RPC system implemented by the asyncrpc module.
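To give a feel for the message shape used in the diff below (a JSON object whose single key names the command, looked up in a handlers dictionary), here is a simplified, synchronous sketch. The Dispatcher class, its in-memory table, and the sample values are hypothetical and are not the asyncrpc API; how messages are framed on the wire is not shown.

```python
import json


class Dispatcher:
    """Hypothetical stand-in for a server connection; not the asyncrpc API."""

    def __init__(self):
        # Command name -> handler, mirroring the handlers.update() call below.
        self.handlers = {"get-pr": self.handle_get_pr}
        # Toy in-memory table keyed by (version, pkgarch, checksum).
        self.table = {("1.0", "core2", "abc"): 3}

    def handle_get_pr(self, request):
        key = (request["version"], request["pkgarch"], request["checksum"])
        value = self.table.get(key)
        return {"value": value} if value is not None else None

    def dispatch(self, text):
        msg = json.loads(text)
        for name, handler in self.handlers.items():
            if name in msg:
                return json.dumps(handler(msg[name]))
        raise ValueError("Unrecognized command %r" % (msg,))


if __name__ == "__main__":
    request = {"get-pr": {"version": "1.0", "pkgarch": "core2", "checksum": "abc"}}
    print(Dispatcher().dispatch(json.dumps(request)))
```

A missing entry is returned as JSON null here, which corresponds to the client methods in the diff returning None when the response is empty.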
Signed-off-by: Paul Barker <pbarker@konsulko.com>
[updated for asyncrpc SIGTERM handling changes]
Signed-off-by: Scott Murray <scott.murray@konsulko.com>
---
lib/prserv/serv.py | 265 +++++++++++++++++++++++----------------------
1 file changed, 136 insertions(+), 129 deletions(-)
diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 5e322bf8..2c572e0f 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -4,157 +4,158 @@
import os,sys,logging
import signal, time
-from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
import socket
import io
import sqlite3
-import bb.server.xmlrpcclient
import prserv
import prserv.db
import errno
-import multiprocessing
+import bb.asyncrpc
logger = logging.getLogger("BitBake.PRserv")
-class Handler(SimpleXMLRPCRequestHandler):
- def _dispatch(self,method,params):
- try:
- value=self.server.funcs[method](*params)
- except:
- import traceback
- traceback.print_exc()
- raise
- return value
-
PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
singleton = None
+class PRServerClient(bb.asyncrpc.AsyncServerConnection):
+ def __init__(self, reader, writer, table):
+ super().__init__(reader, writer, 'PRSERVICE', logger)
+ self.handlers.update({
+ 'get-pr': self.handle_get_pr,
+ 'import-one': self.handle_import_one,
+ 'export': self.handle_export,
+ })
+ self.table = table
-class PRServer(SimpleXMLRPCServer):
- def __init__(self, dbfile, logfile, interface):
- ''' constructor '''
- try:
- SimpleXMLRPCServer.__init__(self, interface,
- logRequests=False, allow_none=True)
- except socket.error:
- ip=socket.gethostbyname(interface[0])
- port=interface[1]
- msg="PR Server unable to bind to %s:%s\n" % (ip, port)
- sys.stderr.write(msg)
- raise PRServiceConfigError
-
- self.dbfile=dbfile
- self.logfile=logfile
- self.host, self.port = self.socket.getsockname()
-
- self.register_function(self.getPR, "getPR")
- self.register_function(self.ping, "ping")
- self.register_function(self.export, "export")
- self.register_function(self.importone, "importone")
- self.register_introspection_functions()
-
- self.iter_count = 0
- # 60 iterations between syncs or sync if dirty every ~30 seconds
- self.iterations_between_sync = 60
-
- def sigint_handler(self, signum, stack):
- if self.table:
- self.table.sync()
-
- def sigterm_handler(self, signum, stack):
- if self.table:
- self.table.sync()
- raise(SystemExit)
+ def validate_proto_version(self):
+ return (self.proto_version == (1, 0))
- def process_request(self, request, client_address):
- if request is None:
- return
+ async def dispatch_message(self, msg):
try:
- self.finish_request(request, client_address)
- self.shutdown_request(request)
- self.iter_count = (self.iter_count + 1) % self.iterations_between_sync
- if self.iter_count == 0:
- self.table.sync_if_dirty()
+ await super().dispatch_message(msg)
except:
- self.handle_error(request, client_address)
- self.shutdown_request(request)
self.table.sync()
- self.table.sync_if_dirty()
+ raise
- def serve_forever(self, poll_interval=0.5):
- signal.signal(signal.SIGINT, self.sigint_handler)
- signal.signal(signal.SIGTERM, self.sigterm_handler)
+ self.table.sync_if_dirty()
- self.db = prserv.db.PRData(self.dbfile)
- self.table = self.db["PRMAIN"]
- return super().serve_forever(poll_interval)
+ async def handle_get_pr(self, request):
+ version = request['version']
+ pkgarch = request['pkgarch']
+ checksum = request['checksum']
- def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
+ response = None
try:
- return self.table.export(version, pkgarch, checksum, colinfo)
+ value = self.table.getValue(version, pkgarch, checksum)
+ response = {'value': value}
+ except prserv.NotFoundError:
+ logger.error("can not find value for (%s, %s)",version, checksum)
except sqlite3.Error as exc:
logger.error(str(exc))
- return None
- def importone(self, version, pkgarch, checksum, value):
- return self.table.importone(version, pkgarch, checksum, value)
+ self.write_message(response)
- def ping(self):
- return True
+ async def handle_import_one(self, request):
+ version = request['version']
+ pkgarch = request['pkgarch']
+ checksum = request['checksum']
+ value = request['value']
+
+ value = self.table.importone(version, pkgarch, checksum, value)
+ if value is not None:
+ response = {'value': value}
+ else:
+ response = None
+ self.write_message(response)
- def getinfo(self):
- return (self.host, self.port)
+ async def handle_export(self, request):
+ version = request['version']
+ pkgarch = request['pkgarch']
+ checksum = request['checksum']
+ colinfo = request['colinfo']
- def getPR(self, version, pkgarch, checksum):
try:
- return self.table.getValue(version, pkgarch, checksum)
- except prserv.NotFoundError:
- logger.error("can not find value for (%s, %s)",version, checksum)
- return None
+ (metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo)
except sqlite3.Error as exc:
logger.error(str(exc))
- return None
+ metainfo = datainfo = None
-class PRServSingleton(object):
- def __init__(self, dbfile, logfile, interface):
- self.dbfile = dbfile
- self.logfile = logfile
- self.interface = interface
- self.host = None
- self.port = None
+ response = {'metainfo': metainfo, 'datainfo': datainfo}
+ self.write_message(response)
- def start(self):
- self.prserv = PRServer(self.dbfile, self.logfile, self.interface)
- self.process = multiprocessing.Process(target=self.prserv.serve_forever)
- self.process.start()
+class PRServer(bb.asyncrpc.AsyncServer):
+ def __init__(self, dbfile, loop=None):
+ super().__init__(logger, loop)
+ self.dbfile = dbfile
+ self.table = None
- self.host, self.port = self.prserv.getinfo()
+ def accept_client(self, reader, writer):
+ return PRServerClient(reader, writer, self.table)
- def getinfo(self):
- return (self.host, self.port)
+ def serve_forever(self):
+ self.db = prserv.db.PRData(self.dbfile)
+ self.table = self.db["PRMAIN"]
-class PRServerConnection(object):
- def __init__(self, host, port):
- if is_local_special(host, port):
- host, port = singleton.getinfo()
- self.host = host
- self.port = port
- self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port)
+ logger.debug("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
+ (self.dbfile, self.address, str(os.getpid())))
- def getPR(self, version, pkgarch, checksum):
- return self.connection.getPR(version, pkgarch, checksum)
+ super().serve_forever()
- def ping(self):
- return self.connection.ping()
+ self.table.sync_if_dirty()
+ self.db.disconnect()
- def export(self,version=None, pkgarch=None, checksum=None, colinfo=True):
- return self.connection.export(version, pkgarch, checksum, colinfo)
+ def handle_signal(self):
+ super().handle_signal()
+ if self.table:
+ self.table.sync()
- def importone(self, version, pkgarch, checksum, value):
- return self.connection.importone(version, pkgarch, checksum, value)
+class PRServSingleton(object):
+ def __init__(self, dbfile, logfile, host, port):
+ self.dbfile = dbfile
+ self.logfile = logfile
+ self.host = host
+ self.port = port
- def getinfo(self):
- return self.host, self.port
+ def start(self):
+ self.prserv = PRServer(self.dbfile)
+ self.prserv.start_tcp_server(self.host, self.port)
+ self.process = self.prserv.serve_as_process()
+
+ if not self.port:
+ self.port = int(self.prserv.address.rsplit(':', 1)[1])
+
+class PRAsyncClient(bb.asyncrpc.AsyncClient):
+ def __init__(self):
+ super().__init__('PRSERVICE', '1.0', logger)
+
+ async def getPR(self, version, pkgarch, checksum):
+ response = await self.send_message(
+ {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}}
+ )
+ if response:
+ return response['value']
+
+ async def importone(self, version, pkgarch, checksum, value):
+ response = await self.send_message(
+ {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}}
+ )
+ if response:
+ return response['value']
+
+ async def export(self, version, pkgarch, checksum, colinfo):
+ response = await self.send_message(
+ {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}}
+ )
+ if response:
+ return (response['metainfo'], response['datainfo'])
+
+class PRClient(bb.asyncrpc.Client):
+ def __init__(self):
+ super().__init__()
+ self._add_methods('getPR', 'importone', 'export')
+
+ def _get_async_client(self):
+ return PRAsyncClient()
def run_as_daemon(func, pidfile, logfile):
"""
@@ -240,15 +241,13 @@ def start_daemon(dbfile, host, port, logfile):
% pidfile)
return 1
- server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port))
- run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile))
+ dbfile = os.path.abspath(dbfile)
+ def daemon_main():
+ server = PRServer(dbfile)
+ server.start_tcp_server(host, port)
+ server.serve_forever()
- # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
- # the one the server actually is listening, so at least warn the user about it
- _,rport = server.getinfo()
- if port != rport:
- sys.stdout.write("Server is listening at port %s instead of %s\n"
- % (rport,port))
+ run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
return 0
def stop_daemon(host, port):
@@ -302,7 +301,7 @@ def is_running(pid):
return True
def is_local_special(host, port):
- if host.strip().upper() == 'localhost'.upper() and (not port):
+ if host.strip().lower() == 'localhost' and not port:
return True
else:
return False
@@ -340,20 +339,19 @@ def auto_start(d):
auto_shutdown()
if not singleton:
bb.utils.mkdirhier(cachedir)
- singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0))
+ singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), "localhost", 0)
singleton.start()
if singleton:
- host, port = singleton.getinfo()
+ host = singleton.host
+ port = singleton.port
else:
host = host_params[0]
port = int(host_params[1])
try:
- connection = PRServerConnection(host,port)
- connection.ping()
- realhost, realport = connection.getinfo()
- return str(realhost) + ":" + str(realport)
-
+ ping(host, port)
+ return str(host) + ":" + str(port)
+
except Exception:
logger.critical("PRservice %s:%d not available" % (host, port))
raise PRServiceConfigError
@@ -366,8 +364,17 @@ def auto_shutdown():
singleton = None
def ping(host, port):
- conn=PRServerConnection(host, port)
+ conn=PRClient()
+ conn.connect_tcp(host, port)
return conn.ping()
def connect(host, port):
- return PRServerConnection(host, port)
+ global singleton
+
+ if host.strip().lower() == 'localhost' and not port:
+ host = 'localhost'
+ port = singleton.port
+
+ conn = PRClient()
+ conn.connect_tcp(host, port)
+ return conn
--
2.31.1
* [PATCH v4 5/5] prserv: Add read-only mode
2021-07-27 2:37 [PATCH v4 0/5] Re-implement prserv on top of asyncrpc Scott Murray
` (3 preceding siblings ...)
2021-07-27 2:37 ` [PATCH v4 4/5] prserv: Replace XML RPC with modern asyncrpc implementation Scott Murray
@ 2021-07-27 2:37 ` Scott Murray
2021-07-27 12:26 ` [PATCH v4 0/5] Re-implement prserv on top of asyncrpc Richard Purdie
[not found] ` <1695A57BEF93A6B5.3345@lists.openembedded.org>
6 siblings, 0 replies; 10+ messages in thread
From: Scott Murray @ 2021-07-27 2:37 UTC (permalink / raw)
To: bitbake-devel, Richard Purdie, Joshua Watt
From: Paul Barker <pbarker@konsulko.com>
Signed-off-by: Paul Barker <pbarker@konsulko.com>
---
bin/bitbake-prserv | 4 ++-
lib/prserv/db.py | 65 ++++++++++++++++++++++++++++++++++++----------
lib/prserv/serv.py | 49 ++++++++++++++++++++++------------
3 files changed, 86 insertions(+), 32 deletions(-)
diff --git a/bin/bitbake-prserv b/bin/bitbake-prserv
index 1e9b6cbc..bef5ef68 100755
--- a/bin/bitbake-prserv
+++ b/bin/bitbake-prserv
@@ -36,12 +36,14 @@ def main():
dest="host", type="string", default=PRHOST_DEFAULT)
parser.add_option("--port", help="port number(default: 8585)", action="store",
dest="port", type="int", default=PRPORT_DEFAULT)
+ parser.add_option("-r", "--read-only", help="open database in read-only mode",
+ action="store_true")
options, args = parser.parse_args(sys.argv)
prserv.init_logger(os.path.abspath(options.logfile),options.loglevel)
if options.start:
- ret=prserv.serv.start_daemon(options.dbfile, options.host, options.port,os.path.abspath(options.logfile))
+ ret=prserv.serv.start_daemon(options.dbfile, options.host, options.port,os.path.abspath(options.logfile), options.read_only)
elif options.stop:
ret=prserv.serv.stop_daemon(options.host, options.port)
else:
diff --git a/lib/prserv/db.py b/lib/prserv/db.py
index cb2a2461..2710d4a2 100644
--- a/lib/prserv/db.py
+++ b/lib/prserv/db.py
@@ -30,21 +30,29 @@ if sqlversion[0] < 3 or (sqlversion[0] == 3 and sqlversion[1] < 3):
#
class PRTable(object):
- def __init__(self, conn, table, nohist):
+ def __init__(self, conn, table, nohist, read_only):
self.conn = conn
self.nohist = nohist
+ self.read_only = read_only
self.dirty = False
if nohist:
self.table = "%s_nohist" % table
else:
self.table = "%s_hist" % table
- self._execute("CREATE TABLE IF NOT EXISTS %s \
- (version TEXT NOT NULL, \
- pkgarch TEXT NOT NULL, \
- checksum TEXT NOT NULL, \
- value INTEGER, \
- PRIMARY KEY (version, pkgarch, checksum));" % self.table)
+ if self.read_only:
+ table_exists = self._execute(
+ "SELECT count(*) FROM sqlite_master \
+ WHERE type='table' AND name='%s'" % (self.table))
+ if not table_exists.fetchone()[0]:
+ raise prserv.NotFoundError
+ else:
+ self._execute("CREATE TABLE IF NOT EXISTS %s \
+ (version TEXT NOT NULL, \
+ pkgarch TEXT NOT NULL, \
+ checksum TEXT NOT NULL, \
+ value INTEGER, \
+ PRIMARY KEY (version, pkgarch, checksum));" % self.table)
def _execute(self, *query):
"""Execute a query, waiting to acquire a lock if necessary"""
@@ -59,8 +67,9 @@ class PRTable(object):
raise exc
def sync(self):
- self.conn.commit()
- self._execute("BEGIN EXCLUSIVE TRANSACTION")
+ if not self.read_only:
+ self.conn.commit()
+ self._execute("BEGIN EXCLUSIVE TRANSACTION")
def sync_if_dirty(self):
if self.dirty:
@@ -75,6 +84,15 @@ class PRTable(object):
return row[0]
else:
#no value found, try to insert
+ if self.read_only:
+ data = self._execute("SELECT ifnull(max(value)+1,0) FROM %s where version=? AND pkgarch=?;" % (self.table),
+ (version, pkgarch))
+ row = data.fetchone()
+ if row is not None:
+ return row[0]
+ else:
+ return 0
+
try:
self._execute("INSERT INTO %s VALUES (?, ?, ?, (select ifnull(max(value)+1,0) from %s where version=? AND pkgarch=?));"
% (self.table,self.table),
@@ -103,6 +121,15 @@ class PRTable(object):
return row[0]
else:
#no value found, try to insert
+ if self.read_only:
+ data = self._execute("SELECT ifnull(max(value)+1,0) FROM %s where version=? AND pkgarch=?;" % (self.table),
+ (version, pkgarch))
+ row = data.fetchone()
+ if row is not None:
+ return row[0]
+ else:
+ return 0
+
try:
self._execute("INSERT OR REPLACE INTO %s VALUES (?, ?, ?, (select ifnull(max(value)+1,0) from %s where version=? AND pkgarch=?));"
% (self.table,self.table),
@@ -128,6 +155,9 @@ class PRTable(object):
return self._getValueHist(version, pkgarch, checksum)
def _importHist(self, version, pkgarch, checksum, value):
+ if self.read_only:
+ return None
+
val = None
data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table,
(version, pkgarch, checksum))
@@ -152,6 +182,9 @@ class PRTable(object):
return val
def _importNohist(self, version, pkgarch, checksum, value):
+ if self.read_only:
+ return None
+
try:
#try to insert
self._execute("INSERT INTO %s VALUES (?, ?, ?, ?);" % (self.table),
@@ -245,19 +278,23 @@ class PRTable(object):
class PRData(object):
"""Object representing the PR database"""
- def __init__(self, filename, nohist=True):
+ def __init__(self, filename, nohist=True, read_only=False):
self.filename=os.path.abspath(filename)
self.nohist=nohist
+ self.read_only = read_only
#build directory hierarchy
try:
os.makedirs(os.path.dirname(self.filename))
except OSError as e:
if e.errno != errno.EEXIST:
raise e
- self.connection=sqlite3.connect(self.filename, isolation_level="EXCLUSIVE", check_same_thread = False)
+ uri = "file:%s%s" % (self.filename, "?mode=ro" if self.read_only else "")
+ logger.debug("Opening PRServ database '%s'" % (uri))
+ self.connection=sqlite3.connect(uri, uri=True, isolation_level="EXCLUSIVE", check_same_thread = False)
self.connection.row_factory=sqlite3.Row
- self.connection.execute("pragma synchronous = off;")
- self.connection.execute("PRAGMA journal_mode = MEMORY;")
+ if not self.read_only:
+ self.connection.execute("pragma synchronous = off;")
+ self.connection.execute("PRAGMA journal_mode = MEMORY;")
self._tables={}
def disconnect(self):
@@ -270,7 +307,7 @@ class PRData(object):
if tblname in self._tables:
return self._tables[tblname]
else:
- tableobj = self._tables[tblname] = PRTable(self.connection, tblname, self.nohist)
+ tableobj = self._tables[tblname] = PRTable(self.connection, tblname, self.nohist, self.read_only)
return tableobj
def __delitem__(self, tblname):
diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 2c572e0f..618bd452 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -18,14 +18,16 @@ PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
singleton = None
class PRServerClient(bb.asyncrpc.AsyncServerConnection):
- def __init__(self, reader, writer, table):
+ def __init__(self, reader, writer, table, read_only):
super().__init__(reader, writer, 'PRSERVICE', logger)
self.handlers.update({
'get-pr': self.handle_get_pr,
'import-one': self.handle_import_one,
'export': self.handle_export,
+ 'is-readonly': self.handle_is_readonly,
})
self.table = table
+ self.read_only = read_only
def validate_proto_version(self):
return (self.proto_version == (1, 0))
@@ -56,16 +58,17 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
self.write_message(response)
async def handle_import_one(self, request):
- version = request['version']
- pkgarch = request['pkgarch']
- checksum = request['checksum']
- value = request['value']
+ response = None
+ if not self.read_only:
+ version = request['version']
+ pkgarch = request['pkgarch']
+ checksum = request['checksum']
+ value = request['value']
+
+ value = self.table.importone(version, pkgarch, checksum, value)
+ if value is not None:
+ response = {'value': value}
- value = self.table.importone(version, pkgarch, checksum, value)
- if value is not None:
- response = {'value': value}
- else:
- response = None
self.write_message(response)
async def handle_export(self, request):
@@ -83,20 +86,25 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
response = {'metainfo': metainfo, 'datainfo': datainfo}
self.write_message(response)
+ async def handle_is_readonly(self, request):
+ response = {'readonly': self.read_only}
+ self.write_message(response)
+
class PRServer(bb.asyncrpc.AsyncServer):
- def __init__(self, dbfile, loop=None):
+ def __init__(self, dbfile, loop=None, read_only=False):
super().__init__(logger, loop)
self.dbfile = dbfile
self.table = None
+ self.read_only = read_only
def accept_client(self, reader, writer):
- return PRServerClient(reader, writer, self.table)
+ return PRServerClient(reader, writer, self.table, self.read_only)
def serve_forever(self):
- self.db = prserv.db.PRData(self.dbfile)
+ self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only)
self.table = self.db["PRMAIN"]
- logger.debug("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
+ logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
(self.dbfile, self.address, str(os.getpid())))
super().serve_forever()
@@ -149,10 +157,17 @@ class PRAsyncClient(bb.asyncrpc.AsyncClient):
if response:
return (response['metainfo'], response['datainfo'])
+ async def is_readonly(self):
+ response = await self.send_message(
+ {'is-readonly': {}}
+ )
+ if response:
+ return response['readonly']
+
class PRClient(bb.asyncrpc.Client):
def __init__(self):
super().__init__()
- self._add_methods('getPR', 'importone', 'export')
+ self._add_methods('getPR', 'importone', 'export', 'is_readonly')
def _get_async_client(self):
return PRAsyncClient()
@@ -227,7 +242,7 @@ def run_as_daemon(func, pidfile, logfile):
os.remove(pidfile)
os._exit(0)
-def start_daemon(dbfile, host, port, logfile):
+def start_daemon(dbfile, host, port, logfile, read_only=False):
ip = socket.gethostbyname(host)
pidfile = PIDPREFIX % (ip, port)
try:
@@ -243,7 +258,7 @@ def start_daemon(dbfile, host, port, logfile):
dbfile = os.path.abspath(dbfile)
def daemon_main():
- server = PRServer(dbfile)
+ server = PRServer(dbfile, read_only=read_only)
server.start_tcp_server(host, port)
server.serve_forever()
--
2.31.1
* Re: [PATCH v4 0/5] Re-implement prserv on top of asyncrpc
2021-07-27 2:37 [PATCH v4 0/5] Re-implement prserv on top of asyncrpc Scott Murray
` (4 preceding siblings ...)
2021-07-27 2:37 ` [PATCH v4 5/5] prserv: Add read-only mode Scott Murray
@ 2021-07-27 12:26 ` Richard Purdie
[not found] ` <1695A57BEF93A6B5.3345@lists.openembedded.org>
6 siblings, 0 replies; 10+ messages in thread
From: Richard Purdie @ 2021-07-27 12:26 UTC (permalink / raw)
To: Scott Murray, bitbake-devel, Joshua Watt
On Mon, 2021-07-26 at 22:37 -0400, Scott Murray wrote:
> These changes replace the old XML-based RPC system in prserv with the
> new asyncrpc implementation originally used by hashserv, and add a
> read-only mode to match the hash equivalency server's support.
>
> Changes from v3:
> * Scott Murray taking over upstreaming effort from Paul Barker.
>
> * Dropped patches which are currently applied to master-next, this
> series should be applied on top of the current master-next branch.
>
> * Patches 2-4 updated by Scott Murray to rebase on top of 3983643
> ("bitbake: asyncrpc: Catch early SIGTERM").
>
> * Read-only PR server support patch added to stack to get it into
> the review process.
>
> Paul Barker (5):
> asyncrpc: Wait on writers to close with Python 3.7+
> asyncrpc: Ensure that asyncio shutdown is clean
> asyncrpc: Handle exceptions
> prserv: Replace XML RPC with modern asyncrpc implementation
> prserv: Add read-only mode
There is an odd failure on the autobuilder which seems related to this.
https://autobuilder.yoctoproject.org/typhoon/#/builders/75/builds/3746
and looking at the bitbake-cookerdaemon.log:
1958 12:03:42.046562 Running command ['clientComplete']
1958 12:03:42.046659 Command Completed
1958 12:03:42.047003 Processing Client
1958 12:03:42.047061 Disconnecting Client
1958 12:03:42.047215 No timeout, exiting.
1958 12:03:42.147541 Exiting
Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
with this last line repeating a 'lot' (log is over 1GB in size).
Cheers,
Richard
* Re: [bitbake-devel] [PATCH v4 0/5] Re-implement prserv on top of asyncrpc
[not found] ` <1695A57BEF93A6B5.3345@lists.openembedded.org>
@ 2021-07-27 12:28 ` Richard Purdie
2021-07-27 13:13 ` Paul Barker
0 siblings, 1 reply; 10+ messages in thread
From: Richard Purdie @ 2021-07-27 12:28 UTC (permalink / raw)
To: Scott Murray, bitbake-devel, Joshua Watt
On Tue, 2021-07-27 at 13:26 +0100, Richard Purdie via lists.openembedded.org wrote:
> On Mon, 2021-07-26 at 22:37 -0400, Scott Murray wrote:
> > These changes replace the old XML-based RPC system in prserv with the
> > new asyncrpc implementation originally used by hashserv, and add a
> > read-only mode to match the hash equivalency server's support.
> >
> > Changes from v3:
> > * Scott Murray taking over upstreaming effort from Paul Barker.
> >
> > * Dropped patches which are currently applied to master-next, this
> > series should be applied on top of the current master-next branch.
> >
> > * Patches 2-4 updated by Scott Murray to rebase on top of 3983643
> > ("bitbake: asyncrpc: Catch early SIGTERM").
> >
> > * Read-only PR server support patch added to stack to get it into
> > the review process.
> >
> > Paul Barker (5):
> > asyncrpc: Wait on writers to close with Python 3.7+
> > asyncrpc: Ensure that asyncio shutdown is clean
> > asyncrpc: Handle exceptions
> > prserv: Replace XML RPC with modern asyncrpc implementation
> > prserv: Add read-only mode
>
> There is an odd failure on the autobuilder which seems related to this.
>
> https://autobuilder.yoctoproject.org/typhoon/#/builders/75/builds/3746
>
> and looking at the bitbake-cookerdaemon.log:
>
> 1958 12:03:42.046562 Running command ['clientComplete']
> 1958 12:03:42.046659 Command Completed
> 1958 12:03:42.047003 Processing Client
> 1958 12:03:42.047061 Disconnecting Client
> 1958 12:03:42.047215 No timeout, exiting.
> 1958 12:03:42.147541 Exiting
> Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
> Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
> Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
> Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
>
> with this last line repeating a 'lot' (log is over 1GB in size).
I should add, this is after the bitbake-layers command used to add the layer during
the setup of that build. The later command just times out as the server process
remains and is locked up.
Cheers,
Richard
* Re: [bitbake-devel] [PATCH v4 0/5] Re-implement prserv on top of asyncrpc
2021-07-27 12:28 ` [bitbake-devel] " Richard Purdie
@ 2021-07-27 13:13 ` Paul Barker
2021-07-27 14:46 ` Richard Purdie
0 siblings, 1 reply; 10+ messages in thread
From: Paul Barker @ 2021-07-27 13:13 UTC (permalink / raw)
To: Richard Purdie; +Cc: Scott Murray, bitbake-devel, Joshua Watt
On Tue, 27 Jul 2021 13:28:30 +0100
"Richard Purdie" <richard.purdie@linuxfoundation.org> wrote:
> On Tue, 2021-07-27 at 13:26 +0100, Richard Purdie via lists.openembedded.org wrote:
> > On Mon, 2021-07-26 at 22:37 -0400, Scott Murray wrote:
> > > These changes replace the old XML-based RPC system in prserv with the
> > > new asyncrpc implementation originally used by hashserv, and add a
> > > read-only mode to match the hash equivalency server's support.
> > >
> > > Changes from v3:
> > > * Scott Murray taking over upstreaming effort from Paul Barker.
> > >
> > > * Dropped patches which are currently applied to master-next, this
> > > series should be applied on top of the current master-next branch.
> > >
> > > * Patches 2-4 updated by Scott Murray to rebase on top of 3983643
> > > ("bitbake: asyncrpc: Catch early SIGTERM").
> > >
> > > * Read-only PR server support patch added to stack to get it into
> > > the review process.
> > >
> > > Paul Barker (5):
> > > asyncrpc: Wait on writers to close with Python 3.7+
> > > asyncrpc: Ensure that asyncio shutdown is clean
> > > asyncrpc: Handle exceptions
> > > prserv: Replace XML RPC with modern asyncrpc implementation
> > > prserv: Add read-only mode
> >
> > There is an odd failure on the autobuilder which seems related to this.
> >
> > https://autobuilder.yoctoproject.org/typhoon/#/builders/75/builds/3746
> >
> > and looking at the bitbake-cookerdaemon.log:
> >
> > 1958 12:03:42.046562 Running command ['clientComplete']
> > 1958 12:03:42.046659 Command Completed
> > 1958 12:03:42.047003 Processing Client
> > 1958 12:03:42.047061 Disconnecting Client
> > 1958 12:03:42.047215 No timeout, exiting.
> > 1958 12:03:42.147541 Exiting
> > Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
> > Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
> > Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
> > Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
> >
> > with this last line repeating a 'lot' (log is over 1GB in size).
>
> I should add, this is after the bitbake-layers command used to add the layer during
> the setup of that build. The later command just times out as the server process
> remains and is locked up.
What's the Python version there? I've just re-checked the docs for
asyncio.Task.all_tasks():
This method is deprecated and will be removed in Python 3.9. Use the
asyncio.all_tasks() function instead.
asyncio.all_tasks() was added in Python 3.7. So we need to use
asyncio.Task.all_tasks() before 3.7 and asyncio.all_tasks() from 3.7 onwards.
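A minimal sketch of that split, assuming a hypothetical helper named
all_tasks_compat (it is not part of the posted series). It checks for the
newer function directly rather than comparing version numbers, and
count_pending is only an illustrative demo:

```python
import asyncio


def all_tasks_compat(loop=None):
    """Return the set of unfinished tasks, whichever asyncio API is available."""
    if hasattr(asyncio, "all_tasks"):
        # Python 3.7+: module-level function.
        return asyncio.all_tasks(loop)
    # Python 3.6 and earlier: classmethod on Task (removed in 3.9).
    return asyncio.Task.all_tasks(loop)


async def count_pending():
    """Demo: one background task plus the task running this coroutine."""
    sleeper = asyncio.ensure_future(asyncio.sleep(10))
    pending = len(all_tasks_compat())
    # Cancel the background task and wait for it so the loop closes cleanly.
    sleeper.cancel()
    try:
        await sleeper
    except asyncio.CancelledError:
        pass
    return pending


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    try:
        print(loop.run_until_complete(count_pending()))
    finally:
        loop.close()
```

Feature detection is used here so the same code path works on any
interpreter without hard-coding a version threshold; a check on
sys.version_info would be an equally reasonable choice.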
--
Paul Barker
https://pbarker.dev/
* Re: [bitbake-devel] [PATCH v4 0/5] Re-implement prserv on top of asyncrpc
2021-07-27 13:13 ` Paul Barker
@ 2021-07-27 14:46 ` Richard Purdie
0 siblings, 0 replies; 10+ messages in thread
From: Richard Purdie @ 2021-07-27 14:46 UTC (permalink / raw)
To: Paul Barker; +Cc: Scott Murray, bitbake-devel, Joshua Watt
On Tue, 2021-07-27 at 14:13 +0100, Paul Barker wrote:
> On Tue, 27 Jul 2021 13:28:30 +0100
> "Richard Purdie" <richard.purdie@linuxfoundation.org> wrote:
>
> > On Tue, 2021-07-27 at 13:26 +0100, Richard Purdie via lists.openembedded.org wrote:
> > > On Mon, 2021-07-26 at 22:37 -0400, Scott Murray wrote:
> > > > These changes replace the old XML-based RPC system in prserv with the
> > > > new asyncrpc implementation originally used by hashserv, and add a
> > > > read-only mode to match the hash equivalency server's support.
> > > >
> > > > Changes from v3:
> > > > * Scott Murray taking over upstreaming effort from Paul Barker.
> > > >
> > > > * Dropped patches which are currently applied to master-next, this
> > > > series should be applied on top of the current master-next branch.
> > > >
> > > > * Patches 2-4 updated by Scott Murray to rebase on top of 3983643
> > > > ("bitbake: asyncrpc: Catch early SIGTERM").
> > > >
> > > > * Read-only PR server support patch added to stack to get it into
> > > > the review process.
> > > >
> > > > Paul Barker (5):
> > > > asyncrpc: Wait on writers to close with Python 3.7+
> > > > asyncrpc: Ensure that asyncio shutdown is clean
> > > > asyncrpc: Handle exceptions
> > > > prserv: Replace XML RPC with modern asyncrpc implementation
> > > > prserv: Add read-only mode
> > >
> > > There is an odd failure on the autobuilder which seems related to this.
> > >
> > > https://autobuilder.yoctoproject.org/typhoon/#/builders/75/builds/3746
> > >
> > > and looking at the bitbake-cookerdaemon.log:
> > >
> > > 1958 12:03:42.046562 Running command ['clientComplete']
> > > 1958 12:03:42.046659 Command Completed
> > > 1958 12:03:42.047003 Processing Client
> > > 1958 12:03:42.047061 Disconnecting Client
> > > 1958 12:03:42.047215 No timeout, exiting.
> > > 1958 12:03:42.147541 Exiting
> > > Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
> > > Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
> > > Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
> > > Caught exception: type object '_asyncio.Task' has no attribute 'all_tasks'
> > >
> > > with this last line repeating a 'lot' (log is over 1GB in size).
> >
> > I should add, this is after the bitbake-layers command used to add the layer during
> > the setup of that build. The later command just times out as the server process
> > remains and is locked up.
>
> What's the Python version there? I've just re-checked the docs for
> asyncio.Task.all_tasks():
>
> This method is deprecated and will be removed in Python 3.9. Use the
> asyncio.all_tasks() function instead.
>
> asyncio.all_tasks() was added in Python 3.7. So we need to use
> asyncio.Task.all_tasks() before 3.7 and asyncio.all_tasks() from 3.7 onwards.
That worker used buildtools tarball which would have 3.9 in it.
Cheers,
Richard
Thread overview: 10+ messages
2021-07-27 2:37 [PATCH v4 0/5] Re-implement prserv on top of asyncrpc Scott Murray
2021-07-27 2:37 ` [PATCH v4 1/5] asyncrpc: Wait on writers to close with Python 3.7+ Scott Murray
2021-07-27 2:37 ` [PATCH v4 2/5] asyncrpc: Ensure that asyncio shutdown is clean Scott Murray
2021-07-27 2:37 ` [PATCH v4 3/5] asyncrpc: Handle exceptions Scott Murray
2021-07-27 2:37 ` [PATCH v4 4/5] prserv: Replace XML RPC with modern asyncrpc implementation Scott Murray
2021-07-27 2:37 ` [PATCH v4 5/5] prserv: Add read-only mode Scott Murray
2021-07-27 12:26 ` [PATCH v4 0/5] Re-implement prserv on top of asyncrpc Richard Purdie
[not found] ` <1695A57BEF93A6B5.3345@lists.openembedded.org>
2021-07-27 12:28 ` [bitbake-devel] " Richard Purdie
2021-07-27 13:13 ` Paul Barker
2021-07-27 14:46 ` Richard Purdie