On Fri, Jul 30, 2021 at 4:19 PM G S Niteesh Babu wrote: > Added a draft of AQMP TUI. > > Implements the follwing basic features: > 1) Command transmission/reception. > 2) Shows events asynchronously. > 3) Shows server status in the bottom status bar. > > Also added necessary pylint, mypy configurations > > Signed-off-by: G S Niteesh Babu > --- > python/qemu/aqmp/aqmp_tui.py | 333 +++++++++++++++++++++++++++++++++++ > python/setup.cfg | 16 +- > 2 files changed, 348 insertions(+), 1 deletion(-) > create mode 100644 python/qemu/aqmp/aqmp_tui.py > > diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py > new file mode 100644 > index 0000000000..ec9eba0aa7 > --- /dev/null > +++ b/python/qemu/aqmp/aqmp_tui.py > @@ -0,0 +1,333 @@ > +# Copyright (c) 2021 > +# > +# Authors: > +# Niteesh Babu G S > +# > +# This work is licensed under the terms of the GNU GPL, version 2 or > +# later. See the COPYING file in the top-level directory. > + > +import argparse > +import asyncio > +import logging > +from logging import Handler > +import signal > + > +import urwid > +import urwid_readline > + > +from ..qmp import QEMUMonitorProtocol, QMPBadPortError > +from .message import DeserializationError, Message, UnexpectedTypeError > +from .protocol import ConnectError > +from .qmp_client import ExecInterruptedError, QMPClient > +from .util import create_task, pretty_traceback > + > + > +UPDATE_MSG = 'UPDATE_MSG' > + > +# Using root logger to enable all loggers under qemu and asyncio > +LOGGER = logging.getLogger() > + > + > +def format_json(msg): > + """ > + Formats given multiline JSON message into a single line message. > + Converting into single line is more asthetically pleasing when looking > + along with error messages compared to multiline JSON. > + """ > + # FIXME: Use better formatting mechanism. Might break at more complex > JSON > + # data. > + msg = msg.replace('\n', '') > + words = msg.split(' ') > + words = [word for word in words if word != ''] > + return ' '.join(words) > + > + > +class App(QMPClient): > + def __init__(self, address): > + urwid.register_signal(type(self), UPDATE_MSG) > + self.window = Window(self) > + self.address = address > + self.aloop = None > + super().__init__() > + > + def add_to_history(self, msg): > + urwid.emit_signal(self, UPDATE_MSG, msg) > + > + def _cb_outbound(self, msg): > + # FIXME: I think the ideal way to omit these messages during > in-TUI > + # logging will be to add a filter to the logger. We can use regex > to > + # filter out messages starting with 'Request:' or 'Response:' but > I > + # think a better approach will be encapsulate the message in an > object > + # and filter based on the object. Encapsulation of the message > will > + # also be necessary when we want different formatting of messages > + # inside TUI. > + handler = LOGGER.handlers[0] > + if not isinstance(handler, TUILogHandler): > + LOGGER.debug('Request: %s', str(msg)) > + self.add_to_history('<-- ' + str(msg)) > + return msg > + > + def _cb_inbound(self, msg): > + handler = LOGGER.handlers[0] > + if not isinstance(handler, TUILogHandler): > + LOGGER.debug('Response: %s', str(msg)) > + self.add_to_history('--> ' + str(msg)) > + return msg > + > + async def wait_for_events(self): > + async for event in self.events: > + self.handle_event(event) > + > + async def _send_to_server(self, raw_msg): > + # FIXME: Format the raw_msg in history view to one line. It is not > + # pleasing to see multiple lines JSON object with an error > statement. > + try: > + msg = Message(bytes(raw_msg, encoding='utf-8')) > + # Format multiline json into a single line JSON, since it is > more > + # pleasing to look along with err message in TUI. > + raw_msg = self.format_json(raw_msg) > + await self._raw(msg, assign_id='id' not in msg) > + except (ValueError, TypeError) as err: > + LOGGER.info('Invalid message: %s', str(err)) > + self.add_to_history(f'{raw_msg}: {err}') > + except (DeserializationError, UnexpectedTypeError) as err: > + LOGGER.info('Invalid message: %s', err.error_message) > + self.add_to_history(f'{raw_msg}: {err.error_message}') > + except ExecInterruptedError: > + LOGGER.info('Error server disconnected before reply') > + urwid.emit_signal(self, UPDATE_MSG, > + '{"error": "Server disconnected before > reply"}') > + self._set_status("Server disconnected") > + except Exception as err: > + LOGGER.error('Exception from _send_to_server: %s', str(err)) > + raise err > + > + def cb_send_to_server(self, msg): > + create_task(self._send_to_server(msg)) > + > + def unhandled_input(self, key): > + if key == 'esc': > + self.kill_app() > + > + def kill_app(self): > + # TODO: Work on the disconnect logic > + create_task(self._kill_app()) > + > + async def _kill_app(self): > + # It is ok to call disconnect even in disconnect state > + try: > + await self.disconnect() > + LOGGER.debug('Disconnect finished. Exiting app') > + except Exception as err: > + LOGGER.info('_kill_app: %s', str(err)) > + # Let the app crash after providing a proper stack trace > + raise err > + raise urwid.ExitMainLoop() > + > + def handle_event(self, event): > + # FIXME: Consider all states present in qapi/run-state.json > + if event['event'] == 'SHUTDOWN': > + self._set_status('Server shutdown') > + > + def _set_status(self, msg: str) -> None: > + self.window.footer.set_text(msg) > + > + def _get_formatted_address(self) -> str: > + addr = f'{self.address}' > + if isinstance(self.address, tuple): > + host, port = self.address > + addr = f'{host}:{port}' > + return addr > + > + async def connect_server(self): > + try: > + await self.connect(self.address) > + addr = self._get_formatted_address() > + self._set_status(f'Connected to {addr}') > + except ConnectError as err: > + LOGGER.info('connect_server: ConnectError %s', str(err)) > + self._set_status('Server shutdown') > + > + def run(self, debug=False): > + self.aloop = asyncio.get_event_loop() > + self.aloop.set_debug(debug) > + > + # Gracefully handle SIGTERM and SIGINT signals > + cancel_signals = [signal.SIGTERM, signal.SIGINT] > + for sig in cancel_signals: > + self.aloop.add_signal_handler(sig, self.kill_app) > + > + event_loop = urwid.AsyncioEventLoop(loop=self.aloop) > + main_loop = urwid.MainLoop(urwid.AttrMap(self.window, > 'background'), > + unhandled_input=self.unhandled_input, > + handle_mouse=True, > + event_loop=event_loop) > + > + create_task(self.wait_for_events(), self.aloop) > + create_task(self.connect_server(), self.aloop) > + try: > + main_loop.run() > + except Exception as err: > + LOGGER.error('%s\n%s\n', str(err), pretty_traceback()) > + raise err > + > + > +class StatusBar(urwid.Text): > + """ > + A simple Text widget that currently only shows connection status. > + """ > + def __init__(self, text=''): > + super().__init__(text, align='right') > + > + > +class Editor(urwid_readline.ReadlineEdit): > + """ > + Support urwid_readline features along with > + history support which lacks in urwid_readline > + """ > + def __init__(self, master): > + super().__init__(caption='> ', multiline=True) > + self.master = master > + self.history = [] > + self.last_index = -1 > + self.show_history = False > + > + def keypress(self, size, key): > + # TODO: Add some logic for down key and clean up logic if > possible. > + # Returning None means the key has been handled by this widget > + # which otherwise is propogated to the parent widget to be > + # handled > + msg = self.get_edit_text() > + if key == 'up' and not msg: > + # Show the history when 'up arrow' is pressed with no input > text. > + # NOTE: The show_history logic is necessary because in > 'multiline' > + # mode (which we use) 'up arrow' is used to move between > lines. > + self.show_history = True > + last_msg = self.history[self.last_index] if self.history else > '' > + self.set_edit_text(last_msg) > + self.edit_pos = len(last_msg) > + self.last_index += 1 > + elif key == 'up' and self.show_history: > + if self.last_index < len(self.history): > + self.set_edit_text(self.history[self.last_index]) > + self.edit_pos = len(self.history[self.last_index]) > + self.last_index += 1 > + elif key == 'meta enter': > + # When using multiline, enter inserts a new line into the > editor > + # send the input to the server on alt + enter > + self.master.cb_send_to_server(msg) > cb_send_to_server calls create_task(App._send_to_server(msg)) at which point in the coroutine you finally do msg = Message(bytes(raw_msg, encoding='utf-8')). However, you can do your message conversion *first*, here in the Editor. That way, you can avoid erasing the editor bar when the message is garbled and the user has a chance to fix the corrupted message *first* before we dispatch to the callback-async-machine. That should reduce quite a lot of the error cases in _send_to_server and helps separate out the two kinds of errors we expect to see here: 1. The user typed something that's garbage, and we didn't actually even send it (because we couldn't), and 2. We sent (or tried to send) the message; but a failure occurred. > + self.history.insert(0, msg) > + self.set_edit_text('') > + self.last_index = 0 > + self.show_history = False > + else: > + self.show_history = False > + self.last_index = 0 > + return super().keypress(size, key) > + return None > + > + > +class EditorWidget(urwid.Filler): > + """ > + Wraps CustomEdit > + """ > + def __init__(self, master): > + super().__init__(Editor(master), valign='top') > + > + > +class HistoryBox(urwid.ListBox): > + """ > + Shows all the QMP message transmitted/received > + """ > + def __init__(self, master): > + self.master = master > + self.history = urwid.SimpleFocusListWalker([]) > + super().__init__(self.history) > + > + def add_to_history(self, history): > + self.history.append(urwid.Text(history)) > + if self.history: > + self.history.set_focus(len(self.history) - 1) > + > + > +class HistoryWindow(urwid.Frame): > + """ > + Composes the HistoryBox and EditorWidget > + """ > + def __init__(self, master): > + self.master = master > + self.editor_widget = EditorWidget(master) > + self.editor = urwid.LineBox(self.editor_widget) > + self.history = HistoryBox(master) > + self.body = urwid.Pile([('weight', 80, self.history), > + ('weight', 20, self.editor)]) > + super().__init__(self.body) > + urwid.connect_signal(self.master, UPDATE_MSG, > self.cb_add_to_history) > + > + def cb_add_to_history(self, msg): > + self.history.add_to_history(msg) > + > + > +class Window(urwid.Frame): > + """ > + This is going to be the main window that is going to compose other > + windows. In this stage it is unnecesssary but will be necessary in > + future when we will have multiple windows and want to the switch > between > + them and display overlays > + """ > + def __init__(self, master): > + self.master = master > + footer = StatusBar() > + body = HistoryWindow(master) > + super().__init__(body, footer=footer) > + > + > +class TUILogHandler(Handler): > + def __init__(self, tui): > + super().__init__() > + self.tui = tui > + > + def emit(self, record): > + level = record.levelname > + msg = record.getMessage() > + msg = f'[{level}]: {msg}' > + self.tui.add_to_history(msg) > + > + > +def main(): > + parser = argparse.ArgumentParser(description='AQMP TUI') > + parser.add_argument('qmp_server', help='Address of the QMP server' > + '< UNIX socket path | TCP addr:port >') > + parser.add_argument('--log-file', help='The Log file name') > + parser.add_argument('--log-level', default='WARNING', > + help='Log level > ') > + parser.add_argument('--asyncio-debug', action='store_true', > + help='Enable debug mode for asyncio loop' > + 'Generates lot of output, makes TUI unusable when' > + 'logs are logged in the TUI itself.' > + 'Use only when logging to a file') > + args = parser.parse_args() > + > + try: > + address = QEMUMonitorProtocol.parse_address(args.qmp_server) > + except QMPBadPortError as err: > + parser.error(err) > + > + app = App(address) > + > + if args.log_file: > + LOGGER.addHandler(logging.FileHandler(args.log_file)) > + else: > + LOGGER.addHandler(TUILogHandler(app)) > + > + log_level = logging.getLevelName(args.log_level) > + # getLevelName returns 'Level {log_level}' when a invalid level is > passed. > + if log_level == f'Level {args.log_level}': > + parser.error('Invalid log level') > + LOGGER.setLevel(log_level) > + > + app.run(args.asyncio_debug) > + > + > +if __name__ == '__main__': > + main() # type: ignore > diff --git a/python/setup.cfg b/python/setup.cfg > index d106a0ed7a..50f9894468 100644 > --- a/python/setup.cfg > +++ b/python/setup.cfg > @@ -81,8 +81,22 @@ namespace_packages = True > # fusepy has no type stubs: > allow_subclassing_any = True > > +[mypy-qemu.aqmp.aqmp_tui] > +disallow_untyped_defs = False > +disallow_incomplete_defs = False > +check_untyped_defs = False > +# urwid and urwid_readline have no type stubs: > +allow_subclassing_any = True > + > +# The following missing import directives are because these libraries do > not > +# provide type stubs. Allow them on an as-needed basis for mypy. > [mypy-fuse] > -# fusepy has no type stubs: > +ignore_missing_imports = True > + > +[mypy-urwid] > +ignore_missing_imports = True > + > +[mypy-urwid_readline] > ignore_missing_imports = True > > [pylint.messages control] > -- > 2.17.1 > >