On Tue, Aug 17, 2021 at 10:21 AM John Snow wrote: > > > On Fri, Jul 30, 2021 at 4:19 PM G S Niteesh Babu > wrote: > >> Instead of manually connecting and disconnecting from the >> server. We now rely on the runstate to manage the QMP >> connection. >> >> Along with this the ability to reconnect on certain exceptions >> has also been added. >> >> Signed-off-by: G S Niteesh Babu >> --- >> python/qemu/aqmp/aqmp_tui.py | 109 ++++++++++++++++++++++++++++++----- >> 1 file changed, 94 insertions(+), 15 deletions(-) >> >> diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py >> index 0d5ec62cb7..ef91883fa5 100644 >> --- a/python/qemu/aqmp/aqmp_tui.py >> +++ b/python/qemu/aqmp/aqmp_tui.py >> @@ -25,8 +25,9 @@ >> import urwid_readline >> >> from ..qmp import QEMUMonitorProtocol, QMPBadPortError >> +from .error import ProtocolError >> from .message import DeserializationError, Message, UnexpectedTypeError >> -from .protocol import ConnectError >> +from .protocol import ConnectError, Runstate >> from .qmp_client import ExecInterruptedError, QMPClient >> from .util import create_task, pretty_traceback >> >> @@ -67,12 +68,24 @@ def format_json(msg: str) -> str: >> return ' '.join(words) >> >> >> +def type_name(mtype: Any) -> str: >> + """ >> + Returns the type name >> + """ >> + return type(mtype).__name__ >> > > This is a lot of lines for something that doesn't do very much -- do we > really need it? > No. This has been removed in v4. > > >> + >> + >> class App(QMPClient): >> - def __init__(self, address: Union[str, Tuple[str, int]]) -> None: >> + def __init__(self, address: Union[str, Tuple[str, int]], >> num_retries: int, >> + retry_delay: Optional[int]) -> None: >> urwid.register_signal(type(self), UPDATE_MSG) >> self.window = Window(self) >> self.address = address >> self.aloop: Optional[Any] = None # FIXME: Use more concrete >> type. 
>> + self.num_retries = num_retries >> + self.retry_delay = retry_delay >> + self.retry: bool = False >> + self.disconnecting: bool = False >> > > Why is this one needed again ? ... > A race condition occurs at protocol.py line 597. The reason is that two disconnect calls are initiated: the first via kill_app, and the second via manage_connection once the first call has set the state to DISCONNECTING. One of the calls sets the state to IDLE (protocol.py:584) after it has finished disconnecting. Meanwhile, the second call is still in the process of disconnecting and asserts that the state is DISCONNECTING (protocol.py:597). That assertion fails because the first call has already set the state to IDLE. If I don't guard against the second call, I get the following exception ------------------------------------------------------------------------------------------ Traceback (most recent call last): File "/home/niteesh/development/qemu/python/.venv/bin/aqmp-tui", line 33, in sys.exit(load_entry_point('qemu', 'console_scripts', 'aqmp-tui')()) File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line 695, in main app.run(args.asyncio_debug) File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line 444, in run raise err File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line 441, in run main_loop.run() File "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py", line 287, in run self._run() File "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py", line 385, in _run self.event_loop.run() File "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py", line 1494, in run reraise(*exc_info) File "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/compat.py", line 58, in reraise raise value File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line 391, in
manage_connection await self.disconnect() File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line 312, in disconnect raise err File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line 300, in disconnect await super().disconnect() File "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line 302, in disconnect await self._wait_disconnect() File "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line 583, in _wait_disconnect self._cleanup() File "/home/niteesh/development/qemu/python/qemu/aqmp/qmp_client.py", line 331, in _cleanup super()._cleanup() File "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line 597, in _cleanup assert self.runstate == Runstate.DISCONNECTING AssertionError ------------------------------------------------------------------------------------------- > >> super().__init__() >> >> def add_to_history(self, msg: str, level: Optional[str] = None) -> >> None: >> @@ -119,7 +132,7 @@ def _cb_inbound(self, msg: Message) -> Message: >> LOGGER.info('Error server disconnected before reply') >> urwid.emit_signal(self, UPDATE_MSG, >> '{"error": "Server disconnected before >> reply"}') >> - self._set_status("Server disconnected") >> + await self.disconnect() >> except Exception as err: >> LOGGER.error('Exception from _send_to_server: %s', str(err)) >> raise err >> @@ -136,15 +149,29 @@ def kill_app(self) -> None: >> create_task(self._kill_app()) >> >> async def _kill_app(self) -> None: >> - # It is ok to call disconnect even in disconnect state >> + await self.disconnect() >> + LOGGER.debug('Disconnect finished. Exiting app') >> + raise urwid.ExitMainLoop() >> + >> + async def disconnect(self) -> None: >> + if self.disconnecting: >> + return >> try: >> - await self.disconnect() >> - LOGGER.debug('Disconnect finished. 
Exiting app') >> + self.disconnecting = True >> + await super().disconnect() >> + self.retry = True >> + except EOFError as err: >> + LOGGER.info('disconnect: %s', type_name(err)) >> + self.retry = True >> + except ProtocolError as err: >> + LOGGER.info('disconnect: %s', type_name(err)) >> + self.retry = False >> except Exception as err: >> - LOGGER.info('_kill_app: %s', str(err)) >> - # Let the app crash after providing a proper stack trace >> + LOGGER.error('disconnect: Unhandled exception %s', str(err)) >> + self.retry = False >> raise err >> - raise urwid.ExitMainLoop() >> + finally: >> + self.disconnecting = False >> >> def handle_event(self, event: Message) -> None: >> # FIXME: Consider all states present in qapi/run-state.json >> @@ -161,14 +188,61 @@ def _get_formatted_address(self) -> str: >> addr = f'{host}:{port}' >> return addr >> >> - async def connect_server(self) -> None: >> + async def _retry_connection(self) -> Optional[str]: >> + current_retries = 0 >> + err = None >> + # Increase in power sequence of 2 if no delay is provided >> + cur_delay = 1 >> + inc_delay = 2 >> + if self.retry_delay: >> + inc_delay = 1 >> + cur_delay = self.retry_delay >> + # initial try >> + await self.connect_server() >> + while self.retry and current_retries < self.num_retries: >> + LOGGER.info('Connection Failed, retrying in %d', cur_delay) >> + status = f'[Retry #{current_retries} ({cur_delay}s)]' >> + self._set_status(status) >> + >> + await asyncio.sleep(cur_delay) >> + >> + err = await self.connect_server() >> + cur_delay *= inc_delay >> + # Cap delay to 5mins >> + cur_delay = min(cur_delay, 5 * 60) >> + current_retries += 1 >> + # If all retries failed report the last error >> + LOGGER.info('All retries failed: %s', str(err)) >> + return type_name(err) >> > > I had suggested something like an exponential backoff, but maybe a > constant delay would be a little cleaner to implement for right now without > getting too fancy over it. 
If you go with a simpler retry algorithm, do you > think you could clean up the logic in the retry loop here a bit more? > Yes, we can. I'll refactor it to constant delay. > Something like: > > for _ in range(num_retries): > try: > whatever_you_have_to_do_to_connect() > return > except ConnectError as err: > LOGGER.info(...etc) > await asyncio.sleep(whatever_the_delay_is) > # ran out of retries here, presumably the connection manager will just go > idle until the user interferes some other way. > > In particular, I think passing around the name of the exception is a > little dubious -- we should be logging with the actual Exception we've > received. > This has been fixed in V4. We pass the exception now instead of just passing around the name. > > >> + >> + async def manage_connection(self) -> None: >> + while True: >> + if self.runstate == Runstate.IDLE: >> + LOGGER.info('Trying to reconnect') >> > > But will this be true upon the very first boot? This message might not be > right. > Yes, it also occurs in the first boot. I'll fix this in the V3. > > >> + err = await self._retry_connection() >> > > This seems named oddly too, since it might be the initial attempt and not > necessarily a reconnection or a retry. > Will fix that. > > >> + # If retry is still true then, we have exhausted all our >> tries. >> + if self.retry: >> + self._set_status(f'Error: {err}') >> > + else: >> + addr = self._get_formatted_address() >> + self._set_status(f'[Connected {addr}]') >> + elif self.runstate == Runstate.DISCONNECTING: >> + self._set_status('[Disconnected]') >> + await self.disconnect() >> + # check if a retry is needed >> > > Is this required? I would have hoped that after calling disconnect that > the state would have again changed to IDLE and you wouldn't need this > clause here. > After you mentioned it I too felt it was redundant. But on removing it the whole app freezes when trying to exit. 
I logged the state after the call to disconnect, instead of being in the IDLE state, it is still in DISCONNECTING state. I suspect this results in the constant infinite looping which doesn't give other coroutines a chance to run and blocks the event loop thus resulting in the freezing of the app. But I am not sure why the state isn't changing to IDLE. > > >> + if self.runstate == Runstate.IDLE: >> + continue >> + await self.runstate_changed() >> + >> + async def connect_server(self) -> Optional[str]: >> try: >> await self.connect(self.address) >> - addr = self._get_formatted_address() >> - self._set_status(f'Connected to {addr}') >> + self.retry = False >> except ConnectError as err: >> LOGGER.info('connect_server: ConnectError %s', str(err)) >> - self._set_status('Server shutdown') >> + self.retry = True >> + return type_name(err) >> + return None >> >> def run(self, debug: bool = False) -> None: >> screen = urwid.raw_display.Screen() >> @@ -191,7 +265,7 @@ def run(self, debug: bool = False) -> None: >> event_loop=event_loop) >> >> create_task(self.wait_for_events(), self.aloop) >> - create_task(self.connect_server(), self.aloop) >> + create_task(self.manage_connection(), self.aloop) >> try: >> main_loop.run() >> except Exception as err: >> @@ -333,6 +407,11 @@ def main() -> None: >> parser = argparse.ArgumentParser(description='AQMP TUI') >> parser.add_argument('qmp_server', help='Address of the QMP server' >> '< UNIX socket path | TCP addr:port >') >> + parser.add_argument('--num-retries', type=int, default=10, >> + help='Number of times to reconnect before giving >> up') >> + parser.add_argument('--retry-delay', type=int, >> + help='Time(s) to wait before next retry.' 
>> + 'Default action is to increase delay in powers >> of 2') >> parser.add_argument('--log-file', help='The Log file name') >> parser.add_argument('--log-level', default='WARNING', >> help='Log level >> ') >> @@ -348,7 +427,7 @@ def main() -> None: >> except QMPBadPortError as err: >> parser.error(str(err)) >> >> - app = App(address) >> + app = App(address, args.num_retries, args.retry_delay) >> >> if args.log_file: >> LOGGER.addHandler(logging.FileHandler(args.log_file)) >> -- >> 2.17.1 >> >> > Right idea overall - possibly needs some polish and to be integrated with > an earlier patch to avoid the intermediate FIXMEs. > > Thanks, > --js >