All of lore.kernel.org
 help / color / mirror / Atom feed
* [bitbake-devel][PATCH] bitbake: asyncrpc: Catch early SIGTERM
@ 2021-07-22 16:19 Joshua Watt
  2021-07-23 17:05 ` Scott Murray
  0 siblings, 1 reply; 2+ messages in thread
From: Joshua Watt @ 2021-07-22 16:19 UTC (permalink / raw)
  To: bitbake-devel; +Cc: Scott Murray, richard.purdie, Joshua Watt

If the SIGTERM signal is sent to an asyncrpc server before it has
installed the SIGTERM handler in the main loop, it may miss the signal
(for example, when the forked server still runs a non-exiting SIGTERM
handler inherited from its parent), which can cause the calling process
to wait forever on the join().
To resolve this, the calling process should mask off SIGTERM before
forking the server process, and the server should unmask the signal only
after the handler is installed. To simplify the usage of the server, a
new helper function called serve_as_process() is added to do this
automatically and correctly.

Thanks: Scott Murray <scott.murray@konsulko.com> for helping debug
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
---
 bitbake/lib/bb/asyncrpc/serv.py | 21 +++++++++++++
 bitbake/lib/bb/cooker.py        |  3 +-
 bitbake/lib/hashserv/tests.py   | 54 +++++++++++++++++++++++++--------
 3 files changed, 64 insertions(+), 14 deletions(-)

diff --git a/bitbake/lib/bb/asyncrpc/serv.py b/bitbake/lib/bb/asyncrpc/serv.py
index ef20cb71df..4084f300df 100644
--- a/bitbake/lib/bb/asyncrpc/serv.py
+++ b/bitbake/lib/bb/asyncrpc/serv.py
@@ -9,6 +9,7 @@ import os
 import signal
 import socket
 import sys
+import multiprocessing
 from . import chunkify, DEFAULT_MAX_CHUNK
 
 
@@ -201,12 +202,14 @@ class AsyncServer(object):
             pass
 
     def signal_handler(self):
+        self.logger.debug("Got exit signal")
         self.loop.stop()
 
     def serve_forever(self):
         asyncio.set_event_loop(self.loop)
         try:
             self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler)
+            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM])
 
             self.run_loop_forever()
             self.server.close()
@@ -221,3 +224,21 @@ class AsyncServer(object):
 
             if self._cleanup_socket is not None:
                 self._cleanup_socket()
+
+    def serve_as_process(self, *, prefunc=None, args=()):
+        def run():
+            if prefunc is not None:
+                prefunc(self, *args)
+            self.serve_forever()
+
+        # Temporarily block SIGTERM. The server process will inherit this
+        # block which will ensure it doesn't receive the SIGTERM until the
+        # handler is ready for it
+        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGTERM])
+        try:
+            self.process = multiprocessing.Process(target=run)
+            self.process.start()
+
+            return self.process
+        finally:
+            signal.pthread_sigmask(signal.SIG_SETMASK, mask)
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py
index 39e10e6133..b2d69c28cf 100644
--- a/bitbake/lib/bb/cooker.py
+++ b/bitbake/lib/bb/cooker.py
@@ -390,8 +390,7 @@ class BBCooker:
                 dbfile = (self.data.getVar("PERSISTENT_DIR") or self.data.getVar("CACHE")) + "/hashserv.db"
                 self.hashservaddr = "unix://%s/hashserve.sock" % self.data.getVar("TOPDIR")
                 self.hashserv = hashserv.create_server(self.hashservaddr, dbfile, sync=False)
-                self.hashserv.process = multiprocessing.Process(target=self.hashserv.serve_forever)
-                self.hashserv.process.start()
+                self.hashserv.serve_as_process()
             self.data.setVar("BB_HASHSERVE", self.hashservaddr)
             self.databuilder.origdata.setVar("BB_HASHSERVE", self.hashservaddr)
             self.databuilder.data.setVar("BB_HASHSERVE", self.hashservaddr)
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index e2b762dbf0..e851535c59 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -15,28 +15,32 @@ import tempfile
 import threading
 import unittest
 import socket
+import time
+import signal
 
-def _run_server(server, idx):
-    # logging.basicConfig(level=logging.DEBUG, filename='bbhashserv.log', filemode='w',
-    #                     format='%(levelname)s %(filename)s:%(lineno)d %(message)s')
+def server_prefunc(server, idx):
+    logging.basicConfig(level=logging.DEBUG, filename='bbhashserv.log', filemode='w',
+                        format='%(levelname)s %(filename)s:%(lineno)d %(message)s')
+    server.logger.debug("Running server %d" % idx)
     sys.stdout = open('bbhashserv-%d.log' % idx, 'w')
     sys.stderr = sys.stdout
-    server.serve_forever()
-
 
 class HashEquivalenceTestSetup(object):
     METHOD = 'TestMethod'
 
     server_index = 0
 
-    def start_server(self, dbpath=None, upstream=None, read_only=False):
+    def start_server(self, dbpath=None, upstream=None, read_only=False, prefunc=server_prefunc):
         self.server_index += 1
         if dbpath is None:
             dbpath = os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index)
 
-        def cleanup_thread(thread):
-            thread.terminate()
-            thread.join()
+        def cleanup_server(server):
+            if server.process.exitcode is not None:
+                return
+
+            server.process.terminate()
+            server.process.join()
 
         server = create_server(self.get_server_addr(self.server_index),
                                dbpath,
@@ -44,9 +48,8 @@ class HashEquivalenceTestSetup(object):
                                read_only=read_only)
         server.dbpath = dbpath
 
-        server.thread = multiprocessing.Process(target=_run_server, args=(server, self.server_index))
-        server.thread.start()
-        self.addCleanup(cleanup_thread, server.thread)
+        server.serve_as_process(prefunc=prefunc, args=(self.server_index,))
+        self.addCleanup(cleanup_server, server)
 
         def cleanup_client(client):
             client.close()
@@ -283,6 +286,33 @@ class HashEquivalenceCommonTests(object):
         self.assertClientGetHash(self.client, taskhash2, None)
 
 
+    def test_slow_server_start(self):
+        """
+        Ensures that the server will exit correctly even if it gets a SIGTERM
+        before entering the main loop
+        """
+
+        event = multiprocessing.Event()
+
+        def prefunc(server, idx):
+            nonlocal event
+            server_prefunc(server, idx)
+            event.wait()
+
+        def do_nothing(signum, frame):
+            pass
+
+        old_signal = signal.signal(signal.SIGTERM, do_nothing)
+        self.addCleanup(signal.signal, signal.SIGTERM, old_signal)
+
+        _, server = self.start_server(prefunc=prefunc)
+        server.process.terminate()
+        time.sleep(30)
+        event.set()
+        server.process.join(300)
+        self.assertIsNotNone(server.process.exitcode, "Server did not exit in a timely manner!")
+
+
 class TestHashEquivalenceUnixServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
     def get_server_addr(self, server_idx):
         return "unix://" + os.path.join(self.temp_dir.name, 'sock%d' % server_idx)
-- 
2.32.0


^ permalink raw reply related	[flat|nested] 2+ messages in thread

* Re: [bitbake-devel][PATCH] bitbake: asyncrpc: Catch early SIGTERM
  2021-07-22 16:19 [bitbake-devel][PATCH] bitbake: asyncrpc: Catch early SIGTERM Joshua Watt
@ 2021-07-23 17:05 ` Scott Murray
  0 siblings, 0 replies; 2+ messages in thread
From: Scott Murray @ 2021-07-23 17:05 UTC (permalink / raw)
  To: Joshua Watt; +Cc: bitbake-devel, richard.purdie

On Thu, 22 Jul 2021, Joshua Watt wrote:

> If the SIGTERM signal is sent to an asyncrpc server before it has
> installed the SIGTERM handler in the main loop, it may miss the signal,
> which can cause the calling process to wait forever on the join().
> To resolve this, the calling process should mask off SIGTERM before
> forking the server process, and the server should unmask the signal only
> after the handler is installed. To simplify the usage of the server, a
> new helper function called serve_as_process() is added to do this
> automatically and correctly.
>
> Thanks: Scott Murray <scott.murray@konsulko.com> for helping debug
> Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>

I did some runs of parallelized oe-selftest with stock poky (so hash equiv
server in auto mode) with this change added, and it did clear up the test
failures I was previously seeing from stuck bitbake-server processes, so
also:

Tested-by: Scott Murray <scott.murray@konsulko.com>

I'm going to rebase the PR server rework on top and do some test runs.

> ---
>  bitbake/lib/bb/asyncrpc/serv.py | 21 +++++++++++++
>  bitbake/lib/bb/cooker.py        |  3 +-
>  bitbake/lib/hashserv/tests.py   | 54 +++++++++++++++++++++++++--------
>  3 files changed, 64 insertions(+), 14 deletions(-)
>
> diff --git a/bitbake/lib/bb/asyncrpc/serv.py b/bitbake/lib/bb/asyncrpc/serv.py
> index ef20cb71df..4084f300df 100644
> --- a/bitbake/lib/bb/asyncrpc/serv.py
> +++ b/bitbake/lib/bb/asyncrpc/serv.py
> @@ -9,6 +9,7 @@ import os
>  import signal
>  import socket
>  import sys
> +import multiprocessing
>  from . import chunkify, DEFAULT_MAX_CHUNK
>
>
> @@ -201,12 +202,14 @@ class AsyncServer(object):
>              pass
>
>      def signal_handler(self):
> +        self.logger.debug("Got exit signal")
>          self.loop.stop()
>
>      def serve_forever(self):
>          asyncio.set_event_loop(self.loop)
>          try:
>              self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler)
> +            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM])
>
>              self.run_loop_forever()
>              self.server.close()
> @@ -221,3 +224,21 @@ class AsyncServer(object):
>
>              if self._cleanup_socket is not None:
>                  self._cleanup_socket()
> +
> +    def serve_as_process(self, *, prefunc=None, args=()):
> +        def run():
> +            if prefunc is not None:
> +                prefunc(self, *args)
> +            self.serve_forever()
> +
> +        # Temporarily block SIGTERM. The server process will inherit this
> +        # block which will ensure it doesn't receive the SIGTERM until the
> +        # handler is ready for it
> +        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGTERM])
> +        try:
> +            self.process = multiprocessing.Process(target=run)
> +            self.process.start()
> +
> +            return self.process
> +        finally:
> +            signal.pthread_sigmask(signal.SIG_SETMASK, mask)
> diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py
> index 39e10e6133..b2d69c28cf 100644
> --- a/bitbake/lib/bb/cooker.py
> +++ b/bitbake/lib/bb/cooker.py
> @@ -390,8 +390,7 @@ class BBCooker:
>                  dbfile = (self.data.getVar("PERSISTENT_DIR") or self.data.getVar("CACHE")) + "/hashserv.db"
>                  self.hashservaddr = "unix://%s/hashserve.sock" % self.data.getVar("TOPDIR")
>                  self.hashserv = hashserv.create_server(self.hashservaddr, dbfile, sync=False)
> -                self.hashserv.process = multiprocessing.Process(target=self.hashserv.serve_forever)
> -                self.hashserv.process.start()
> +                self.hashserv.serve_as_process()
>              self.data.setVar("BB_HASHSERVE", self.hashservaddr)
>              self.databuilder.origdata.setVar("BB_HASHSERVE", self.hashservaddr)
>              self.databuilder.data.setVar("BB_HASHSERVE", self.hashservaddr)
> diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
> index e2b762dbf0..e851535c59 100644
> --- a/bitbake/lib/hashserv/tests.py
> +++ b/bitbake/lib/hashserv/tests.py
> @@ -15,28 +15,32 @@ import tempfile
>  import threading
>  import unittest
>  import socket
> +import time
> +import signal
>
> -def _run_server(server, idx):
> -    # logging.basicConfig(level=logging.DEBUG, filename='bbhashserv.log', filemode='w',
> -    #                     format='%(levelname)s %(filename)s:%(lineno)d %(message)s')
> +def server_prefunc(server, idx):
> +    logging.basicConfig(level=logging.DEBUG, filename='bbhashserv.log', filemode='w',
> +                        format='%(levelname)s %(filename)s:%(lineno)d %(message)s')
> +    server.logger.debug("Running server %d" % idx)
>      sys.stdout = open('bbhashserv-%d.log' % idx, 'w')
>      sys.stderr = sys.stdout
> -    server.serve_forever()
> -
>
>  class HashEquivalenceTestSetup(object):
>      METHOD = 'TestMethod'
>
>      server_index = 0
>
> -    def start_server(self, dbpath=None, upstream=None, read_only=False):
> +    def start_server(self, dbpath=None, upstream=None, read_only=False, prefunc=server_prefunc):
>          self.server_index += 1
>          if dbpath is None:
>              dbpath = os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index)
>
> -        def cleanup_thread(thread):
> -            thread.terminate()
> -            thread.join()
> +        def cleanup_server(server):
> +            if server.process.exitcode is not None:
> +                return
> +
> +            server.process.terminate()
> +            server.process.join()
>
>          server = create_server(self.get_server_addr(self.server_index),
>                                 dbpath,
> @@ -44,9 +48,8 @@ class HashEquivalenceTestSetup(object):
>                                 read_only=read_only)
>          server.dbpath = dbpath
>
> -        server.thread = multiprocessing.Process(target=_run_server, args=(server, self.server_index))
> -        server.thread.start()
> -        self.addCleanup(cleanup_thread, server.thread)
> +        server.serve_as_process(prefunc=prefunc, args=(self.server_index,))
> +        self.addCleanup(cleanup_server, server)
>
>          def cleanup_client(client):
>              client.close()
> @@ -283,6 +286,33 @@ class HashEquivalenceCommonTests(object):
>          self.assertClientGetHash(self.client, taskhash2, None)
>
>
> +    def test_slow_server_start(self):
> +        """
> +        Ensures that the server will exit correctly even if it gets a SIGTERM
> +        before entering the main loop
> +        """
> +
> +        event = multiprocessing.Event()
> +
> +        def prefunc(server, idx):
> +            nonlocal event
> +            server_prefunc(server, idx)
> +            event.wait()
> +
> +        def do_nothing(signum, frame):
> +            pass
> +
> +        old_signal = signal.signal(signal.SIGTERM, do_nothing)
> +        self.addCleanup(signal.signal, signal.SIGTERM, old_signal)
> +
> +        _, server = self.start_server(prefunc=prefunc)
> +        server.process.terminate()
> +        time.sleep(30)
> +        event.set()
> +        server.process.join(300)
> +        self.assertIsNotNone(server.process.exitcode, "Server did not exit in a timely manner!")
> +
> +
>  class TestHashEquivalenceUnixServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
>      def get_server_addr(self, server_idx):
>          return "unix://" + os.path.join(self.temp_dir.name, 'sock%d' % server_idx)
>

^ permalink raw reply	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2021-07-23 17:05 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-07-22 16:19 [bitbake-devel][PATCH] bitbake: asyncrpc: Catch early SIGTERM Joshua Watt
2021-07-23 17:05 ` Scott Murray

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.