On Fri, Jul 30, 2021 at 4:19 PM G S Niteesh Babu wrote: > Instead of manually connecting and disconnecting from the > server. We now rely on the runstate to manage the QMP > connection. > > Along with this the ability to reconnect on certain exceptions > has also been added. > > Signed-off-by: G S Niteesh Babu > --- > python/qemu/aqmp/aqmp_tui.py | 109 ++++++++++++++++++++++++++++++----- > 1 file changed, 94 insertions(+), 15 deletions(-) > > diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py > index 0d5ec62cb7..ef91883fa5 100644 > --- a/python/qemu/aqmp/aqmp_tui.py > +++ b/python/qemu/aqmp/aqmp_tui.py > @@ -25,8 +25,9 @@ > import urwid_readline > > from ..qmp import QEMUMonitorProtocol, QMPBadPortError > +from .error import ProtocolError > from .message import DeserializationError, Message, UnexpectedTypeError > -from .protocol import ConnectError > +from .protocol import ConnectError, Runstate > from .qmp_client import ExecInterruptedError, QMPClient > from .util import create_task, pretty_traceback > > @@ -67,12 +68,24 @@ def format_json(msg: str) -> str: > return ' '.join(words) > > > +def type_name(mtype: Any) -> str: > + """ > + Returns the type name > + """ > + return type(mtype).__name__ > This is a lot of lines for something that doesn't do very much -- do we really need it? > + > + > class App(QMPClient): > - def __init__(self, address: Union[str, Tuple[str, int]]) -> None: > + def __init__(self, address: Union[str, Tuple[str, int]], num_retries: > int, > + retry_delay: Optional[int]) -> None: > urwid.register_signal(type(self), UPDATE_MSG) > self.window = Window(self) > self.address = address > self.aloop: Optional[Any] = None # FIXME: Use more concrete type. > + self.num_retries = num_retries > + self.retry_delay = retry_delay > + self.retry: bool = False > + self.disconnecting: bool = False > Why is this one needed again ? ... 
> super().__init__() > > def add_to_history(self, msg: str, level: Optional[str] = None) -> > None: > @@ -119,7 +132,7 @@ def _cb_inbound(self, msg: Message) -> Message: > LOGGER.info('Error server disconnected before reply') > urwid.emit_signal(self, UPDATE_MSG, > '{"error": "Server disconnected before > reply"}') > - self._set_status("Server disconnected") > + await self.disconnect() > except Exception as err: > LOGGER.error('Exception from _send_to_server: %s', str(err)) > raise err > @@ -136,15 +149,29 @@ def kill_app(self) -> None: > create_task(self._kill_app()) > > async def _kill_app(self) -> None: > - # It is ok to call disconnect even in disconnect state > + await self.disconnect() > + LOGGER.debug('Disconnect finished. Exiting app') > + raise urwid.ExitMainLoop() > + > + async def disconnect(self) -> None: > + if self.disconnecting: > + return > try: > - await self.disconnect() > - LOGGER.debug('Disconnect finished. Exiting app') > + self.disconnecting = True > + await super().disconnect() > + self.retry = True > + except EOFError as err: > + LOGGER.info('disconnect: %s', type_name(err)) > + self.retry = True > + except ProtocolError as err: > + LOGGER.info('disconnect: %s', type_name(err)) > + self.retry = False > except Exception as err: > - LOGGER.info('_kill_app: %s', str(err)) > - # Let the app crash after providing a proper stack trace > + LOGGER.error('disconnect: Unhandled exception %s', str(err)) > + self.retry = False > raise err > - raise urwid.ExitMainLoop() > + finally: > + self.disconnecting = False > > def handle_event(self, event: Message) -> None: > # FIXME: Consider all states present in qapi/run-state.json > @@ -161,14 +188,61 @@ def _get_formatted_address(self) -> str: > addr = f'{host}:{port}' > return addr > > - async def connect_server(self) -> None: > + async def _retry_connection(self) -> Optional[str]: > + current_retries = 0 > + err = None > + # Increase in power sequence of 2 if no delay is provided > + cur_delay = 1 > + 
inc_delay = 2 > + if self.retry_delay: > + inc_delay = 1 > + cur_delay = self.retry_delay > + # initial try > + await self.connect_server() > + while self.retry and current_retries < self.num_retries: > + LOGGER.info('Connection Failed, retrying in %d', cur_delay) > + status = f'[Retry #{current_retries} ({cur_delay}s)]' > + self._set_status(status) > + > + await asyncio.sleep(cur_delay) > + > + err = await self.connect_server() > + cur_delay *= inc_delay > + # Cap delay to 5mins > + cur_delay = min(cur_delay, 5 * 60) > + current_retries += 1 > + # If all retries failed report the last error > + LOGGER.info('All retries failed: %s', str(err)) > + return type_name(err) > I had suggested something like an exponential backoff, but maybe a constant delay would be a little cleaner to implement for right now without getting too fancy over it. If you go with a simpler retry algorithm, do you think you could clean up the logic in the retry loop here a bit more? Something like: for _ in range(num_retries): try: whatever_you_have_to_do_to_connect() return except ConnectError as err: LOGGER.info(...etc) await asyncio.sleep(whatever_the_delay_is) # ran out of retries here, presumably the connection manager will just go idle until the user interferes some other way. In particular, I think passing around the name of the exception is a little dubious -- we should be logging with the actual Exception we've received. > + > + async def manage_connection(self) -> None: > + while True: > + if self.runstate == Runstate.IDLE: > + LOGGER.info('Trying to reconnect') > But will this be true upon the very first boot? This message might not be right. > + err = await self._retry_connection() > This seems named oddly too, since it might be the initial attempt and not necessarily a reconnection or a retry. > + # If retry is still true then, we have exhausted all our > tries. 
> + if self.retry: > + self._set_status(f'Error: {err}') > + else: > + addr = self._get_formatted_address() > + self._set_status(f'[Connected {addr}]') > + elif self.runstate == Runstate.DISCONNECTING: > + self._set_status('[Disconnected]') > + await self.disconnect() > + # check if a retry is needed > Is this required? I would have hoped that, after calling disconnect, the state would have changed back to IDLE and you wouldn't need this clause here. > + if self.runstate == Runstate.IDLE: > + continue > + await self.runstate_changed() > + > + async def connect_server(self) -> Optional[str]: > try: > await self.connect(self.address) > - addr = self._get_formatted_address() > - self._set_status(f'Connected to {addr}') > + self.retry = False > except ConnectError as err: > LOGGER.info('connect_server: ConnectError %s', str(err)) > - self._set_status('Server shutdown') > + self.retry = True > + return type_name(err) > + return None > > def run(self, debug: bool = False) -> None: > screen = urwid.raw_display.Screen() > @@ -191,7 +265,7 @@ def run(self, debug: bool = False) -> None: > event_loop=event_loop) > > create_task(self.wait_for_events(), self.aloop) > - create_task(self.connect_server(), self.aloop) > + create_task(self.manage_connection(), self.aloop) > try: > main_loop.run() > except Exception as err: > @@ -333,6 +407,11 @@ def main() -> None: > parser = argparse.ArgumentParser(description='AQMP TUI') > parser.add_argument('qmp_server', help='Address of the QMP server' > '< UNIX socket path | TCP addr:port >') > + parser.add_argument('--num-retries', type=int, default=10, > + help='Number of times to reconnect before giving > up') > + parser.add_argument('--retry-delay', type=int, > + help='Time(s) to wait before next retry.'
> + 'Default action is to increase delay in powers of > 2') > parser.add_argument('--log-file', help='The Log file name') > parser.add_argument('--log-level', default='WARNING', > help='Log level > ') > @@ -348,7 +427,7 @@ def main() -> None: > except QMPBadPortError as err: > parser.error(str(err)) > > - app = App(address) > + app = App(address, args.num_retries, args.retry_delay) > > if args.log_file: > LOGGER.addHandler(logging.FileHandler(args.log_file)) > -- > 2.17.1 > > Right idea overall - possibly needs some polish and to be integrated with an earlier patch to avoid the intermediate FIXMEs. Thanks, --js