qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
* [PATCH RFC 0/7] RFC: Asynchronous QMP Draft
@ 2021-04-13 15:55 John Snow
  2021-04-13 15:55 ` [PATCH RFC 1/7] util: asyncio-related helpers John Snow
                   ` (7 more replies)
  0 siblings, 8 replies; 21+ messages in thread
From: John Snow @ 2021-04-13 15:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: crosa, John Snow, ehabkost, stefanha, armbru

(Does not apply to the QEMU tree; this is against a blank repository.)\r
\r
Hi! This is a Draft RFC for an asyncio-based Python library that\r
implements a QMP client. The intent is to eventually publish this\r
library directly to PyPI, so the design focus of this library is to be\r
"useful" instead of providing as low-level an interface as possible.\r
\r
I am sending this to solicit general, high-level design feedback on the\r
overal layout and approach. Many minor details are still left to be\r
implemented, and a lot of the docstrings and documentation need to be\r
audited to make sure they still apply as I've shuffled things around a\r
lot in the course of development.\r
\r
There are some pretty notable things missing still; in particular I need\r
to develop an Event API (there is a tiny stub added as a hack, but it's\r
very simplistic), and I also need to develop a sync bridge so that this\r
library could be used in existing iotests if we eventually expect to\r
replace the old QMP library with this one.\r
\r
Scattered throughout these files are "RFC" comments and other "FIXME"\r
and "TODO" items where I've tried to stub out some of the things I am\r
still unsure of.\r
\r
Thanks!\r
\r
John Snow (7):\r
  util: asyncio-related helpers\r
  error: Error classes and so on.\r
  protocol: generic async message-based protocol loop\r
  message: add QMP Message type\r
  models: Add well-known QMP objects\r
  qmp_protocol: add QMP client implementation\r
  linter config\r
\r
 .flake8         |   2 +\r
 error.py        | 163 +++++++++++\r
 message.py      | 196 ++++++++++++++\r
 models.py       | 177 ++++++++++++\r
 protocol.py     | 704 ++++++++++++++++++++++++++++++++++++++++++++++++\r
 pylintrc        |  53 ++++\r
 qmp_protocol.py | 420 +++++++++++++++++++++++++++++\r
 util.py         |  87 ++++++\r
 8 files changed, 1802 insertions(+)\r
 create mode 100644 .flake8\r
 create mode 100644 error.py\r
 create mode 100644 message.py\r
 create mode 100644 models.py\r
 create mode 100644 protocol.py\r
 create mode 100644 pylintrc\r
 create mode 100644 qmp_protocol.py\r
 create mode 100644 util.py\r
\r
-- \r
2.30.2\r
\r



^ permalink raw reply	[flat|nested] 21+ messages in thread

* [PATCH RFC 1/7] util: asyncio-related helpers
  2021-04-13 15:55 [PATCH RFC 0/7] RFC: Asynchronous QMP Draft John Snow
@ 2021-04-13 15:55 ` John Snow
  2021-04-13 15:55 ` [PATCH RFC 2/7] error: Error classes and so on John Snow
                   ` (6 subsequent siblings)
  7 siblings, 0 replies; 21+ messages in thread
From: John Snow @ 2021-04-13 15:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: crosa, John Snow, ehabkost, stefanha, armbru

Nothing too interesting design-wise here; mostly asyncio-related helpers
designed to make writing Python 3.6-compliant code a little nicer to
read.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 util.py | 87 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 87 insertions(+)
 create mode 100644 util.py

diff --git a/util.py b/util.py
new file mode 100644
index 0000000..2640f82
--- /dev/null
+++ b/util.py
@@ -0,0 +1,87 @@
+"""
+Misc. utils and helper functions
+"""
+
+import asyncio
+import traceback
+import sys
+from typing import (
+    Any,
+    Coroutine,
+    TypeVar,
+)
+
+
+T = TypeVar('T')
+
+
+def create_task(coro: Coroutine[Any, Any, T]) -> 'asyncio.Future[T]':
+    """
+    Python 3.6-compatible create_task() wrapper.
+    """
+    if hasattr(asyncio, 'create_task'):
+        # Python 3.7+
+        return asyncio.create_task(coro)
+
+    # Python 3.6
+    return asyncio.ensure_future(coro)
+
+
+async def wait_closed(writer: asyncio.StreamWriter) -> None:
+    """
+    Python 3.6-compatible StreamWriter.wait_closed() wrapper.
+    """
+    if hasattr(writer, 'wait_closed'):
+        # Python 3.7+
+        await writer.wait_closed()
+    else:
+        # Python 3.6
+        transport = writer.transport
+        assert isinstance(transport, asyncio.WriteTransport)
+
+        while not transport.is_closing():
+            await asyncio.sleep(0.0)
+        while transport.get_write_buffer_size() > 0:
+            await asyncio.sleep(0.0)
+
+
+def asyncio_run(coro: Coroutine[Any, Any, T]) -> T:
+    """
+    Python 3.6-compatible asyncio.run() wrapper.
+    """
+    # Python 3.7+
+    if hasattr(asyncio, 'run'):
+        return asyncio.run(coro)
+
+    # Python 3.6
+    loop = asyncio.get_event_loop()
+    ret = loop.run_until_complete(coro)
+    loop.close()
+
+    return ret
+
+
+def pretty_traceback() -> str:
+    """
+    Print the current traceback, but indented to provide visual distinction.
+
+    This is useful for printing a traceback within a traceback for
+    debugging purposes when encapsulating errors to deliver them up the
+    stack; when those errors are printed, this helps provide a nice
+    visual grouping to quickly identify the parts of the error that
+    belong to the inner exception.
+
+    :returns: A string, formatted something like the following::
+
+      | Traceback (most recent call last):
+      |   File "foobar.py", line 42, in arbitrary_example
+      |     foo.baz()
+      | ArbitraryError: [Errno 42] Something bad happened!
+
+    """
+    exc_lines = []
+    for chunk in traceback.format_exception(*sys.exc_info()):
+        for line in chunk.split("\n"):
+            if line:
+                exc_lines.append(f"  | {line}")
+    return "\n".join(exc_lines)
-- 
2.30.2



^ permalink raw reply	[flat|nested] 21+ messages in thread

* [PATCH RFC 2/7] error: Error classes and so on.
  2021-04-13 15:55 [PATCH RFC 0/7] RFC: Asynchronous QMP Draft John Snow
  2021-04-13 15:55 ` [PATCH RFC 1/7] util: asyncio-related helpers John Snow
@ 2021-04-13 15:55 ` John Snow
  2021-04-13 15:55 ` [PATCH RFC 3/7] protocol: generic async message-based protocol loop John Snow
                   ` (5 subsequent siblings)
  7 siblings, 0 replies; 21+ messages in thread
From: John Snow @ 2021-04-13 15:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: crosa, John Snow, ehabkost, stefanha, armbru

May be somewhat hard to make sense of until you see how these classes
are used later on. Notably, although I have split QMP's functionality
into a "protocol" class and a "QMP" class, representing a separation of
the loop mechanisms and the QMP protocol itself, this file was written
prior to that split and contains both "generic" and "QMP-specific" error
classes.

It will have to be split out later, but for the purposes of an RFC where
I wanted a quick eyeball on design, I thought it wasn't necessary to
clean that up just yet.

The MultiException class might warrant a closer inspection, it's the
"weirdest" thing here. It's intended to be used internally by the
module, but as with all best laid plans, there is always the ability it
will somehow leak out into the caller's space through some unforseen
mechanism.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 error.py | 163 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 163 insertions(+)
 create mode 100644 error.py

diff --git a/error.py b/error.py
new file mode 100644
index 0000000..f19f8e0
--- /dev/null
+++ b/error.py
@@ -0,0 +1,163 @@
+"""Generic error classes.
+
+This module seeks to provide semantic error classes that are intended to
+be used directly by clients when they would like to handle particular
+semantic failures (e.g. "failed to connect") without needing to know the
+enumeration of possible reasons for that failure.
+
+AQMPError serves as the ancestor for almost all exceptions raised by
+this package, and is suitable for use in handling semantic errors from
+this library. In most cases, individual methods will attempt to catch
+and re-encapsulate various exceptions to provide a semantic
+error-handling interface, though this is not necessarily true of
+internal interfaces.
+
+Some errors are not defined here in this module, but exist alongside
+more specific error domains in other modules. They are listed here for
+convenience anyway.
+
+The error inheritance tree is as follows::
+
+  MultiException
+  AQMPError
+    ProtocolError
+      RawProtocolError
+        DeserializationError
+        UnexpectedTypeError
+      GreetingError
+      NegotiationError
+      MsgProtocolError   (message.py)
+        ObjectTypeError  (message.py)
+        OrphanedError    (message.py)
+        ServerParseError (message.py)
+    ConnectError
+    DisconnectedError
+    StateError
+
+The only exception that is not an `AQMPError` is `MultiException`. It is
+special, and used to encapsulate one-or-more exceptions of an arbitrary
+kind; this exception MAY be raised on disconnect() when there are two or
+more exceptions from the AQMP event loop to report back to the caller.
+
+(The bottom half is designed in such a way that exceptions are attempted
+to be handled internally, but in cases of catastrophic failure, it may
+still occur.)
+
+See `MultiException` and `AsyncProtocol.disconnect()` for more details.
+
+"""
+
+from typing import Iterable, Iterator
+
+
+class AQMPError(Exception):
+    # Don't use this directly: create a subclass.
+    """Base failure for all errors raised by AQMP."""
+
+
+class ProtocolError(AQMPError):
+    """Abstract error class for protocol failures."""
+    def __init__(self, error_message: str):
+        super().__init__()
+        self.error_message = error_message
+
+    def __str__(self) -> str:
+        return f"QMP protocol error: {self.error_message}"
+
+
+class RawProtocolError(ProtocolError):
+    """
+    Abstract error class for low-level parsing failures.
+    """
+    def __init__(self, error_message: str, raw: bytes):
+        super().__init__(error_message)
+        self.raw = raw
+
+    def __str__(self) -> str:
+        return "\n".join([
+            super().__str__(),
+            f"  raw bytes were: {str(self.raw)}",
+        ])
+
+
+class DeserializationError(RawProtocolError):
+    """Incoming message was not understood as JSON."""
+
+
+class UnexpectedTypeError(RawProtocolError):
+    """Incoming message was JSON, but not a JSON object."""
+
+
+class ConnectError(AQMPError):
+    """
+    Initial connection process failed.
+    Always wraps a "root cause" exception that can be interrogated for info.
+    """
+
+
+class GreetingError(ProtocolError):
+    """An exception occurred during the Greeting phase."""
+    def __init__(self, error_message: str, exc: Exception):
+        super().__init__(error_message)
+        self.exc = exc
+
+    def __str__(self) -> str:
+        return (
+            f"QMP protocol error: {self.error_message}\n"
+            f"  Cause: {self.exc!s}\n"
+        )
+
+
+class NegotiationError(ProtocolError):
+    """An exception occurred during the Negotiation phase."""
+    def __init__(self, error_message: str, exc: Exception):
+        super().__init__(error_message)
+        self.exc = exc
+
+    def __str__(self) -> str:
+        return (
+            f"QMP protocol error: {self.error_message}\n"
+            f"  Cause: {self.exc!s}\n"
+        )
+
+
+class DisconnectedError(AQMPError):
+    """
+    Command was not able to be completed; we have been Disconnected.
+
+    This error is raised in response to a pending execution when the
+    back-end is unable to process responses any more.
+    """
+
+
+class StateError(AQMPError):
+    """
+    An API command (connect, execute, etc) was issued at an inappropriate time.
+
+    (e.g. execute() while disconnected; connect() while connected; etc.)
+    """
+
+
+class MultiException(Exception):
+    """
+    Used for multiplexing exceptions.
+
+    This exception is used in the case that errors were encountered in both the
+    Reader and Writer tasks, and we must raise more than one.
+    """
+    def __init__(self, exceptions: Iterable[BaseException]):
+        super().__init__(exceptions)
+        self.exceptions = list(exceptions)
+
+    def __str__(self) -> str:
+        ret = "------------------------------\n"
+        ret += "Multiple Exceptions occurred:\n"
+        ret += "\n"
+        for i, exc in enumerate(self.exceptions):
+            ret += f"{i}) {str(exc)}\n"
+            ret += "\n"
+        ret += "-----------------------------\n"
+        return ret
+
+    def __iter__(self) -> Iterator[BaseException]:
+        return iter(self.exceptions)
-- 
2.30.2



^ permalink raw reply	[flat|nested] 21+ messages in thread

* [PATCH RFC 3/7] protocol: generic async message-based protocol loop
  2021-04-13 15:55 [PATCH RFC 0/7] RFC: Asynchronous QMP Draft John Snow
  2021-04-13 15:55 ` [PATCH RFC 1/7] util: asyncio-related helpers John Snow
  2021-04-13 15:55 ` [PATCH RFC 2/7] error: Error classes and so on John Snow
@ 2021-04-13 15:55 ` John Snow
  2021-04-13 20:00   ` Stefan Hajnoczi
  2021-04-13 15:55 ` [PATCH RFC 4/7] message: add QMP Message type John Snow
                   ` (4 subsequent siblings)
  7 siblings, 1 reply; 21+ messages in thread
From: John Snow @ 2021-04-13 15:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: crosa, John Snow, ehabkost, stefanha, armbru

This module provides the protocol-agnostic framework upon which QMP will
be built. I also have (not included in this series) a qtest
implementation that uses this same framework, which is why it is split
into two portions like this.

The design uses two independent tasks in the "bottol half", a writer and
a reader. These tasks run for the duration of the connection and
independently send and receive messages, respectively.

A third task, disconnect, is scheduled whenever an error occurs and
facilitates coalescing of the other two tasks. MultiException is used in
this case if *both* tasks should have Exceptions that need to be
reported, though at the time of writing, I think this circumstance might
only be a theoretical concern.

The generic model here does not provide execute(), but the model for QMP
is informative for how this class is laid out. Below, QMP's execute()
function deposits a message into the outbound queue. The writer task
wakes up to process the queue and deposits information in the write
buffer, where the message is finally dispatched. Meanwhile, the
execute() call is expected to block on an RPC mailbox waiting for a
reply from the server.

On the return trip, the reader wakes up when data arrives in the
buffer. The message is deserialized and handed off to the protocol layer
to route accordingly. QMP will route this message into either the Event
queue or one of the pending RPC mailboxes.

Upon this message being routed to the correct RPC mailbox, execute()
will be woken up and allowed to process the reply and deliver it back to
the caller.

The reason for separating the inbound and outbound tasks to such an
extreme degree is to allow for designs and extensions where this
asynchronous loop may be launched in a separate thread. In this model,
it is possible to use a synchronous, thread-safe function to deposit new
messages into the outbound queue; this was seen as a viable way to offer
solid synchronous bindings while still allowing events to be processed
truly asynchronously.

Separating it this way also allows us to fairly easily support
Out-of-band executions with little additional effort; essentially all
commands are treated as out-of-band.

The execute graph:

                       +---------+
                       | caller  |
                       +---------+
                            |
                            v
                       +---------+
     +---------------- |execute()| <----------+
     |                 +---------+            |
     |                                        |
-----------------------------------------------------------
     v                                        |
+----+----+    +-----------+           +------+-------+
|Mailboxes|    |Event Queue|           |Outbound Queue|
+----+----+    +------+----+           +------+-------+
     |                |                       ^
     v                v                       |
  +--+----------------+---+       +-----------+-----------+
  | Reader Task/Coroutine |       | Writer Task/Coroutine |
  +-----------+-----------+       +-----------+-----------+
              |                               ^
              v                               |
        +-----+------+                  +-----+------+
        |StreamReader|                  |StreamWriter|
        +------------+                  +------------+

Signed-off-by: John Snow <jsnow@redhat.com>
---
 protocol.py | 704 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 704 insertions(+)
 create mode 100644 protocol.py

diff --git a/protocol.py b/protocol.py
new file mode 100644
index 0000000..27d1558
--- /dev/null
+++ b/protocol.py
@@ -0,0 +1,704 @@
+"""
+Async message-based protocol support.
+
+This module provides a generic framework for sending and receiving
+messages over an asyncio stream.
+
+`AsyncProtocol` is an abstract class that implements the core mechanisms
+of a simple send/receive protocol, and is designed to be extended.
+
+`AsyncTasks` provides a container class that aggregates tasks that make
+up the loop used by `AsyncProtocol`.
+"""
+
+import asyncio
+from asyncio import StreamReader, StreamWriter
+import logging
+from ssl import SSLContext
+from typing import (
+    Any,
+    Awaitable,
+    Callable,
+    Coroutine,
+    Iterator,
+    List,
+    Generic,
+    Optional,
+    Tuple,
+    TypeVar,
+    Union,
+)
+
+from error import (
+    ConnectError,
+    MultiException,
+    StateError,
+)
+from util import create_task, pretty_traceback, wait_closed
+
+
+T = TypeVar('T')
+_TaskFN = Callable[[], Awaitable[None]]  # aka ``async def func() -> None``
+_FutureT = TypeVar('_FutureT', bound=Optional['asyncio.Future[Any]'])
+_GatherRet = List[Optional[BaseException]]
+
+
+class AsyncTasks:
+    """
+    AsyncTasks is a collection of bottom half tasks designed to run forever.
+
+    This is a convenience wrapper to make calls from `AsyncProtocol` simpler to
+    follow by behaving as a simple aggregate of two or more tasks, such that
+    a higher-level connection manager can simply refer to "the bottom half"
+    as one coherent entity instead of several.
+
+    The general flow is:
+
+    1. ``tasks = AsyncTasks(logger_for_my_client)``
+    2. ``tasks.start(my_reader, my_writer)``
+    3. ``...``
+    4. ``await tasks.cancel()``
+    5. ``tasks.result()``
+
+    :param logger: A logger to use for debugging messages. Useful to
+                   associate messages with a particular server context.
+    """
+
+    logger = logging.getLogger(__name__)
+
+    def __init__(self, logger: Optional[logging.Logger] = None):
+        if logger is not None:
+            self.logger = logger
+
+        # Named tasks
+        self.reader: Optional['asyncio.Future[None]'] = None
+        self.writer: Optional['asyncio.Future[None]'] = None
+
+        # Internal aggregate of all of the above tasks.
+        self._all: Optional['asyncio.Future[_GatherRet]'] = None
+
+    def _all_tasks(self) -> Iterator[Optional['asyncio.Future[None]']]:
+        """Yields all tasks, defined or not, in ideal cancellation order."""
+        yield self.writer
+        yield self.reader
+
+    def __iter__(self) -> Iterator['asyncio.Future[None]']:
+        """Yields all defined tasks, in ideal cancellation order."""
+        for task in self._all_tasks():
+            if task is not None:
+                yield task
+
+    @property
+    def _all_tasks_defined(self) -> bool:
+        """Returns True if all tasks are defined."""
+        return all(map(lambda task: task is not None, self._all_tasks()))
+
+    @property
+    def _some_tasks_done(self) -> bool:
+        """Returns True if any defined tasks are done executing."""
+        return any(map(lambda task: task.done(), iter(self)))
+
+    def __bool__(self) -> bool:
+        """Returns True when any tasks are defined at all."""
+        return bool(tuple(iter(self)))
+
+    @property
+    def running(self) -> bool:
+        """Returns True if all tasks are defined and still running."""
+        return self._all_tasks_defined and not self._some_tasks_done
+
+    def start(self,
+              reader_coro: Coroutine[Any, Any, None],
+              writer_coro: Coroutine[Any, Any, None]) -> None:
+        """
+        Starts executing tasks in the current async context.
+
+        :param reader_coro: Coroutine, message reader task.
+        :param writer_coro: Coroutine, message writer task.
+        """
+        self.reader = create_task(reader_coro)
+        self.writer = create_task(writer_coro)
+
+        # Uses extensible self-iterator.
+        self._all = asyncio.gather(*iter(self), return_exceptions=True)
+
+    async def cancel(self) -> None:
+        """
+        Cancels all tasks and awaits full cancellation.
+
+        Exceptions, if any, can be obtained by calling `result()`.
+        """
+        for task in self:
+            if task and not task.done():
+                self.logger.debug("cancelling task %s", str(task))
+                task.cancel()
+
+        if self._all:
+            self.logger.debug("Awaiting all tasks to finish ...")
+            await self._all
+
+    def _cleanup(self) -> None:
+        """
+        Erase all task handles; asserts that no tasks are running.
+        """
+        def _paranoid_task_erase(task: _FutureT) -> Optional[_FutureT]:
+            assert (task is None) or task.done()
+            return None if (task and task.done()) else task
+
+        self.reader = _paranoid_task_erase(self.reader)
+        self.writer = _paranoid_task_erase(self.writer)
+        self._all = _paranoid_task_erase(self._all)
+
+    def result(self) -> None:
+        """
+        Raises exception(s) from the finished tasks, if any.
+
+        Called to fully quiesce this task group. asyncio.CancelledError is
+        never raised; in the event of an intentional cancellation this
+        function will not raise any errors.
+
+        If an exception in one bottom half caused an unscheduled disconnect,
+        that exception will be raised.
+
+        :raise: `Exception`      Arbitrary exceptions re-raised on behalf of
+                                 the bottom half.
+        :raise: `MultiException` Iterable Exception used to multiplex multiple
+                                 exceptions when multiple threads failed.
+        """
+        exceptions: List[BaseException] = []
+        results = self._all.result() if self._all else ()
+        self._cleanup()
+
+        for result in results:
+            if result is None:
+                continue
+            if not isinstance(result, asyncio.CancelledError):
+                exceptions.append(result)
+
+        if len(exceptions) == 1:
+            raise exceptions.pop()
+        if len(exceptions) > 1:
+            raise MultiException(exceptions)
+
+
+class AsyncProtocol(Generic[T]):
+    """AsyncProtocol implements a generic async message-based protocol.
+
+    This protocol assumes the basic unit of information transfer between
+    client and server is a "message", the details of which are left up
+    to the implementation. It assumes the sending and receiving of these
+    messages is full-duplex and not necessarily correlated; i.e. it
+    supports asynchronous inbound messages.
+
+    It is designed to be extended by a specific protocol which provides
+    the implementations for how to read and send messages. These must be
+    defined in `_do_recv()` and `_do_send()`, respectively.
+
+    Other callbacks that have a default implemention, but may be
+    extended or overridden:
+     - _on_connect: Actions performed prior to loop start.
+     - _on_start:   Actions performed immediately after loop start.
+     - _on_message: Actions performed when a message is received.
+                    The default implementation does nothing at all.
+     - _cb_outbound: Log/Filter outgoing messages.
+     - _cb_inbound: Log/Filter incoming messages.
+
+    :param name: Name used for logging messages, if any.
+    """
+    #: Logger object for debugging messages
+    logger = logging.getLogger(__name__)
+
+    # -------------------------
+    # Section: Public interface
+    # -------------------------
+
+    def __init__(self, name: Optional[str] = None) -> None:
+        self.name = name
+        if self.name is not None:
+            self.logger = self.logger.getChild(self.name)
+
+        # stream I/O
+        self._reader: Optional[StreamReader] = None
+        self._writer: Optional[StreamWriter] = None
+
+        # I/O queues
+        self._outgoing: asyncio.Queue[T] = asyncio.Queue()
+
+        # I/O tasks (message reader, message writer)
+        self._tasks = AsyncTasks(self.logger)
+
+        # Disconnect task; separate from the core loop.
+        self._dc_task: Optional[asyncio.Future[None]] = None
+
+    @property
+    def running(self) -> bool:
+        """
+        Return True when the loop is currently connected and running.
+        """
+        if self.disconnecting:
+            return False
+        return self._tasks.running
+
+    @property
+    def disconnecting(self) -> bool:
+        """
+        Return True when the loop is disconnecting, or disconnected.
+        """
+        return bool(self._dc_task)
+
+    @property
+    def unconnected(self) -> bool:
+        """
+        Return True when the loop is fully idle and quiesced.
+
+        Returns True specifically when the loop is neither `running`
+        nor `disconnecting`. A call to `disconnect()` is required
+        to transition from `disconnecting` to `unconnected`.
+        """
+        return not (self.running or self.disconnecting)
+
+    async def accept(self, address: Union[str, Tuple[str, int]],
+                     ssl: Optional[SSLContext] = None) -> None:
+        """
+        Accept a connection and begin processing message queues.
+
+        :param address: Address to connect to;
+                        UNIX socket path or TCP address/port.
+        :param ssl:     SSL context to use, if any.
+
+        :raise: `StateError`   (loop is running or disconnecting.)
+        :raise: `ConnectError` (Connection was not successful.)
+        """
+        if self.disconnecting:
+            raise StateError("Client is disconnecting/disconnected."
+                             " Call disconnect() to fully disconnect.")
+        if self.running:
+            raise StateError("Client is already connected and running.")
+        assert self.unconnected
+
+        try:
+            await self._new_session(self._do_accept(address, ssl))
+        except Exception as err:
+            emsg = "Failed to accept incoming connection"
+            self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
+            raise ConnectError(f"{emsg}: {err!s}") from err
+
+    async def connect(self, address: Union[str, Tuple[str, int]],
+                      ssl: Optional[SSLContext] = None) -> None:
+        """
+        Connect to the server and begin processing message queues.
+
+        :param address: Address to connect to;
+                        UNIX socket path or TCP address/port.
+        :param ssl:     SSL context to use, if any.
+
+        :raise: `StateError`   (loop is running or disconnecting.)
+        :raise: `ConnectError` (Connection was not successful.)
+        """
+        if self.disconnecting:
+            raise StateError("Client is disconnecting/disconnected."
+                             " Call disconnect() to fully disconnect.")
+        if self.running:
+            raise StateError("Client is already connected and running.")
+        assert self.unconnected
+
+        try:
+            await self._new_session(self._do_connect(address, ssl))
+        except Exception as err:
+            emsg = "Failed to connect to server"
+            self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
+            raise ConnectError(f"{emsg}: {err!s}") from err
+
+    async def disconnect(self) -> None:
+        """
+        Disconnect and wait for all tasks to fully stop.
+
+        If there were exceptions that caused the bottom half to terminate
+        prematurely, they will be raised here.
+
+        :raise: `Exception`      Arbitrary exceptions re-raised on behalf of
+                                 the bottom half.
+        :raise: `MultiException` Iterable Exception used to multiplex multiple
+                                 exceptions when multiple tasks failed.
+        """
+        self._schedule_disconnect()
+        await self._wait_disconnect()
+
+    # -----------------------------
+    # Section: Connection machinery
+    # -----------------------------
+
+    async def _register_streams(self,
+                                reader: asyncio.StreamReader,
+                                writer: asyncio.StreamWriter) -> None:
+        """Register the Reader/Writer streams."""
+        self._reader = reader
+        self._writer = writer
+
+    async def _new_session(self, coro: Awaitable[None]) -> None:
+        """
+        Create a new session.
+
+        This is called for both `accept()` and `connect()` pathways.
+
+        :param coro: An awaitable that will perform either connect or accept.
+        """
+        assert self._reader is None
+        assert self._writer is None
+
+        # NB: If a previous session had stale messages, they are dropped here.
+        self._outgoing = asyncio.Queue()
+
+        # Connect / Await Connection
+        await coro
+        assert self._reader is not None
+        assert self._writer is not None
+
+        await self._on_connect()
+
+        reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
+        writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
+        self._tasks.start(reader_coro, writer_coro)
+
+        await self._on_start()
+
+    async def _do_accept(self, address: Union[str, Tuple[str, int]],
+                         ssl: Optional[SSLContext] = None) -> None:
+        """
+        Acting as the protocol server, accept a single connection.
+
+        Used as the awaitable callback to `_new_session()`.
+        """
+        self.logger.debug("Awaiting connection ...")
+        connected = asyncio.Event()
+        server: Optional[asyncio.AbstractServer] = None
+
+        async def _client_connected_cb(reader: asyncio.StreamReader,
+                                       writer: asyncio.StreamWriter) -> None:
+            """Used to accept a single incoming connection, see below."""
+            nonlocal server
+            nonlocal connected
+
+            # A connection has been accepted; stop listening for new ones.
+            assert server is not None
+            server.close()
+            await server.wait_closed()
+            server = None
+
+            # Register this client as being connected
+            await self._register_streams(reader, writer)
+
+            # Signal back: We've accepted a client!
+            connected.set()
+
+        if isinstance(address, tuple):
+            coro = asyncio.start_server(
+                _client_connected_cb,
+                host=address[0],
+                port=address[1],
+                ssl=ssl,
+                backlog=1,
+            )
+        else:
+            coro = asyncio.start_unix_server(
+                _client_connected_cb,
+                path=address,
+                ssl=ssl,
+                backlog=1,
+            )
+
+        server = await coro     # Starts listening
+        await connected.wait()  # Waits for the callback to fire (and finish)
+        assert server is None
+
+        self.logger.debug("Connection accepted")
+
+    async def _do_connect(self, address: Union[str, Tuple[str, int]],
+                          ssl: Optional[SSLContext] = None) -> None:
+        self.logger.debug("Connecting ...")
+
+        if isinstance(address, tuple):
+            connect = asyncio.open_connection(address[0], address[1], ssl=ssl)
+        else:
+            connect = asyncio.open_unix_connection(path=address, ssl=ssl)
+        reader, writer = await(connect)
+        await self._register_streams(reader, writer)
+
+        self.logger.debug("Connected")
+
+    async def _on_connect(self) -> None:
+        """
+        Async callback invoked after connection, but prior to loop start.
+
+        This callback is invoked after the stream is opened, but prior to
+        starting the reader/writer tasks. Use this callback to handle
+        handshakes, greetings, &c to avoid having special edge cases in the
+        generic message handler.
+        """
+        # Nothing to do in the general case.
+
+    async def _on_start(self) -> None:
+        """
+        Async callback invoked after connection and loop start.
+
+        This callback is invoked after the stream is opened AND after
+        the reader/writer tasks have been started. Use this callback to
+        auto-perform certain tasks during the connect() call.
+        """
+        # Nothing to do in the general case.
+
+    def _schedule_disconnect(self) -> None:
+        """
+        Initiate a disconnect; idempotent.
+
+        This is called by the reader/writer tasks upon exceptions,
+        or directly by a user call to `disconnect()`.
+        """
+        if not self._dc_task:
+            self._dc_task = create_task(self._bh_disconnect())
+
+    async def _wait_disconnect(self) -> None:
+        """
+        _wait_disconnect waits for a scheduled disconnect to finish.
+
+        This function will gather any bottom half exceptions and re-raise them;
+        so it is intended to be used in the upper half call chain.
+
+        If a single exception is encountered, it will be re-raised faithfully.
+        If multiple are found, they will be multiplexed into a MultiException.
+
+        :raise: `Exception`      Many kinds; anything the bottom half raises.
+        :raise: `MultiException` When the Reader/Writer both have exceptions.
+        """
+        assert self._dc_task
+        await self._dc_task
+        self._dc_task = None
+
+        try:
+            self._tasks.result()
+        finally:
+            self._cleanup()
+
+    def _cleanup(self) -> None:
+        """
+        Fully reset this object to a clean state.
+        """
+        assert not self.running
+        assert self._dc_task is None
+        # _tasks.result() called in _wait_disconnect does _tasks cleanup, so:
+        assert not self._tasks
+
+        self._reader = None
+        self._writer = None
+
+    # ------------------------------
+    # Section: Bottom Half functions
+    # ------------------------------
+
+    async def _bh_disconnect(self) -> None:
+        """
+        Disconnect and cancel all outstanding tasks.
+
+        It is designed to be called from its task context, self._dc_task.
+        """
+        # RFC: Maybe I shot myself in the foot by trying too hard to
+        # group the tasks together as one unit. I suspect the ideal
+        # cancellation order here is actually: MessageWriter,
+        # StreamWriter, MessageReader
+
+        # What I have here instead is MessageWriter, MessageReader,
+        # StreamWriter
+
+        # Cancel the the message reader/writer.
+        await self._tasks.cancel()
+
+        # Handle the stream writer itself, now.
+        if self._writer:
+            if not self._writer.is_closing():
+                self.logger.debug("Writer is open; draining")
+                await self._writer.drain()
+                self.logger.debug("Closing writer")
+                self._writer.close()
+            self.logger.debug("Awaiting writer to fully close")
+            await wait_closed(self._writer)
+            self.logger.debug("Fully closed.")
+
+        # TODO: Add a hook for higher-level protocol cancellations here?
+        #       (Otherwise, the disconnected logging event happens too soon.)
+
+        self.logger.debug("Protocol Disconnected.")
+
+    async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
+        """
+        Run one of the bottom-half functions in a loop forever.
+
+        If the bottom half ever raises any exception, schedule a disconnect.
+        """
+        try:
+            while True:
+                await async_fn()
+        except asyncio.CancelledError as err:
+            # We are cancelled (by _bh_disconnect), so no need to call it.
+            self.logger.debug("Task.%s: cancelled: %s.",
+                              name, type(err).__name__)
+            raise
+        except:
+            self.logger.error("Task.%s: failure:\n%s\n", name,
+                              pretty_traceback())
+            self.logger.debug("Task.%s: scheduling disconnect.", name)
+            self._schedule_disconnect()
+            raise
+        finally:
+            self.logger.debug("Task.%s: exiting.", name)
+
+    async def _bh_send_message(self) -> None:
+        """
+        Wait for an outgoing message, then send it.
+        """
+        self.logger.log(5, "Waiting for message in outgoing queue to send ...")
+        msg = await self._outgoing.get()
+        try:
+            self.logger.log(5, "Got outgoing message, sending ...")
+            await self._send(msg)
+        finally:
+            self._outgoing.task_done()
+            self.logger.log(5, "Outgoing message sent.")
+
+    async def _bh_recv_message(self) -> None:
+        """
+        Wait for an incoming message and call `_on_message` to route it.
+
+        Exceptions seen may be from `_recv` or from `_on_message`.
+        """
+        self.logger.log(5, "Waiting to receive incoming message ...")
+        msg = await self._recv()
+        self.logger.log(5, "Routing message ...")
+        await self._on_message(msg)
+        self.logger.log(5, "Message routed.")
+
+    # ---------------------
+    # Section: Datagram I/O
+    # ---------------------
+
+    def _cb_outbound(self, msg: T) -> T:
+        """
+        Callback: outbound message hook.
+
+        This is intended for subclasses to be able to add arbitrary hooks to
+        filter or manipulate outgoing messages. The base implementation
+        does nothing but log the message without any manipulation of the
+        message. It is designed for you to invoke super() at the tail of
+        any overridden method.
+
+        :param msg: raw outbound message
+        :return: final outbound message
+        """
+        self.logger.debug("--> %s", str(msg))
+        return msg
+
+    def _cb_inbound(self, msg: T) -> T:
+        """
+        Callback: inbound message hook.
+
+        This is intended for subclasses to be able to add arbitrary hooks to
+        filter or manipulate incoming messages. The base implementation
+        does nothing but log the message without any manipulation of the
+        message. It is designed for you to invoke super() at the head of
+        any overridden method.
+
+        This method does not "handle" incoming messages; it is a filter.
+        The actual "endpoint" for incoming messages is `_on_message()`.
+
+        :param msg: raw inbound message
+        :return: processed inbound message
+        """
+        self.logger.debug("<-- %s", str(msg))
+        return msg
+
+    async def _readline(self) -> bytes:
+        """
+        Wait for a newline from the incoming reader.
+
+        This method is provided as a convenience for upper-layer
+        protocols, as many will be line-based.
+
+        This function *may* return a sequence of bytes without a
+        trailing newline if EOF occurs, but *some* bytes were
+        received. In this case, the next call will raise EOF.
+
+        :raise OSError: Stream-related errors.
+        :raise EOFError: If the reader stream is at EOF and there
+                         are no bytes to return.
+        """
+        assert self._reader is not None
+        msg_bytes = await self._reader.readline()
+        self.logger.log(5, "Read %d bytes", len(msg_bytes))
+
+        if not msg_bytes:
+            if self._reader.at_eof():
+                self.logger.debug("EOF")
+                raise EOFError()
+
+        return msg_bytes
+
+    async def _do_recv(self) -> T:
+        """
+        Abstract: Read from the stream and return a message.
+
+        Very low-level; intended to only be called by `_recv()`.
+        """
+        raise NotImplementedError
+
+    async def _recv(self) -> T:
+        """
+        Read an arbitrary protocol message. (WARNING: Extremely low-level.)
+
+        This function is intended primarily for _bh_recv_message to use
+        in an asynchronous task loop. Using it outside of this loop will
+        "steal" messages from the normal routing mechanism. It is safe to
+        use during `_on_connect()`, but should not be used otherwise.
+
+        This function uses `_do_recv()` to retrieve the raw message, and
+        then transforms it using `_cb_inbound()`.
+
+        Errors raised may be any of those from either method implementation.
+
+        :return: A single (filtered, processed) protocol message.
+        """
+        message = await self._do_recv()
+        return self._cb_inbound(message)
+
+    def _do_send(self, msg: T) -> None:
+        """
+        Abstract: Write a message to the stream.
+
+        Very low-level; intended to only be called by `_send()`.
+        """
+        raise NotImplementedError
+
+    async def _send(self, msg: T) -> None:
+        """
+        Send an arbitrary protocol message. (WARNING: Low-level.)
+
+        Like `_read()`, this function is intended to be called by the writer
+        task loop that processes outgoing messages. This function will
+        transform any outgoing messages according to `_cb_outbound()`.
+
+        :raise: OSError - Various stream errors.
+        """
+        assert self._writer is not None
+        msg = self._cb_outbound(msg)
+        self._do_send(msg)
+
+    async def _on_message(self, msg: T) -> None:
+        """
+        Called when a new message is received.
+
+        Executed from within the reader loop BH, so be advised that waiting
+        on other asynchronous tasks may be risky, depending. Additionally,
+        any errors raised here will directly cause the loop to halt; limit
+        error checking to what is strictly necessary for message routing.
+
+        :param msg: The incoming message, already logged/filtered.
+        """
+        # Nothing to do in the abstract case.
-- 
2.30.2



^ permalink raw reply	[flat|nested] 21+ messages in thread

* [PATCH RFC 4/7] message: add QMP Message type
  2021-04-13 15:55 [PATCH RFC 0/7] RFC: Asynchronous QMP Draft John Snow
                   ` (2 preceding siblings ...)
  2021-04-13 15:55 ` [PATCH RFC 3/7] protocol: generic async message-based protocol loop John Snow
@ 2021-04-13 15:55 ` John Snow
  2021-04-13 20:07   ` Stefan Hajnoczi
  2021-04-13 15:55 ` [PATCH RFC 5/7] models: Add well-known QMP objects John Snow
                   ` (3 subsequent siblings)
  7 siblings, 1 reply; 21+ messages in thread
From: John Snow @ 2021-04-13 15:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: crosa, John Snow, ehabkost, stefanha, armbru

This is an abstraction that represents a single message either sent to
or received from the server. It is used to subclass the
AsyncProtocol(Generic[T]) type.

It was written such that it can be populated by either raw data or by a
dict, with the other form being generated on-demand, as-needed.

It behaves almost exactly like a dict, but has some extra methods and a
special constructor. (It should quack fairly convincingly.)

Signed-off-by: John Snow <jsnow@redhat.com>
---
 message.py | 196 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 196 insertions(+)
 create mode 100644 message.py

diff --git a/message.py b/message.py
new file mode 100644
index 0000000..5c7e828
--- /dev/null
+++ b/message.py
@@ -0,0 +1,196 @@
+"""
+QMP Message format and errors.
+
+This module provides the `Message` class, which represents a single QMP
+message sent to or from the server. Several error-classes that depend on
+knowing the format of this message are also included here.
+"""
+
+import json
+from json import JSONDecodeError
+from typing import (
+    Dict,
+    ItemsView,
+    Iterable,
+    KeysView,
+    Optional,
+    Union,
+    ValuesView,
+)
+
+from error import (
+    DeserializationError,
+    ProtocolError,
+    UnexpectedTypeError,
+)
+
+
+class Message:
+    """
+    Represents a single QMP protocol message.
+
+    QMP uses JSON objects as its basic communicative unit; so this
+    object behaves like a MutableMapping. It may be instantiated from
+    either another mapping (like a dict), or from raw bytes that still
+    need to be deserialized.
+
+    :param value: Initial value, if any.
+    :param eager: When true, attempt to serialize (or deserialize) the
+                  initial value immediately, such that conversion exceptions
+                  are raised during the call to the initialization method.
+    """
+    # TODO: make Message properly a MutableMapping so it can be typed as such?
+    def __init__(self,
+                 value: Union[bytes, Dict[str, object]] = b'', *,
+                 eager: bool = True):
+        self._data: Optional[bytes] = None
+        self._obj: Optional[Dict[str, object]] = None
+
+        if isinstance(value, bytes):
+            self._data = value
+            if eager:
+                self._obj = self._deserialize(self._data)
+        else:
+            self._obj = value
+            if eager:
+                self._data = self._serialize(self._obj)
+
+    @classmethod
+    def _serialize(cls, value: object) -> bytes:
+        """
+        Serialize a JSON object as bytes.
+
+        :raises: ValueError, TypeError from the json library.
+        """
+        return json.dumps(value, separators=(',', ':')).encode('utf-8')
+
+    @classmethod
+    def _deserialize(cls, data: bytes) -> Dict[str, object]:
+        """
+        Deserialize JSON bytes into a native python dict.
+
+        :raises: DeserializationError if JSON deserialization
+                 fails for any reason.
+        :raises: UnexpectedTypeError if data does not represent
+                 a JSON object.
+        """
+        try:
+            obj = json.loads(data)
+        except JSONDecodeError as err:
+            emsg = "Failed to deserialize QMP message."
+            raise DeserializationError(emsg, data) from err
+        if not isinstance(obj, dict):
+            raise UnexpectedTypeError(
+                "Incoming QMP message is not a JSON object.",
+                data
+            )
+        return obj
+
+    @property
+    def data(self) -> bytes:
+        """
+        bytes representing this QMP message.
+
+        Generated on-demand if required.
+        """
+        if self._data is None:
+            self._data = self._serialize(self._obj or {})
+        return self._data
+
+    @property
+    def _object(self) -> Dict[str, object]:
+        """
+        dict representing this QMP message.
+
+        Generated on-demand if required; Private because it returns an
+        object that could be used to validate the internal state of the
+        Message object.
+        """
+        if self._obj is None:
+            self._obj = self._deserialize(self._data or b'')
+        return self._obj
+
+    def __str__(self) -> str:
+        """Pretty-printed representation of this QMP message."""
+        return json.dumps(self._object, indent=2)
+
+    def __bytes__(self) -> bytes:
+        return self.data
+
+    def __contains__(self, item: str) -> bool:  # Container, Collection
+        return item in self._object
+
+    def __iter__(self) -> Iterable[str]:  # Iterable, Collection, Mapping
+        return iter(self._object)
+
+    def __len__(self) -> int:  # Sized, Collection, Mapping
+        return len(self._object)
+
+    def __getitem__(self, key: str) -> object:  # Mapping
+        return self._object[key]
+
+    def __setitem__(self, key: str, value: object) -> None:  # MutableMapping
+        self._object[key] = value
+        self._data = None
+
+    def __delitem__(self, key: str) -> None:  # MutableMapping
+        del self._object[key]
+        self._data = None
+
+    def keys(self) -> KeysView[str]:
+        """Return a KeysView object containing all field names."""
+        return self._object.keys()
+
+    def items(self) -> ItemsView[str, object]:
+        """Return an ItemsView object containing all key:value pairs."""
+        return self._object.items()
+
+    def values(self) -> ValuesView[object]:
+        """Return a ValuesView object containing all field values."""
+        return self._object.values()
+
+    def get(self, key: str,
+            default: Optional[object] = None) -> Optional[object]:
+        """Get the value for a single key."""
+        return self._object.get(key, default)
+
+
+class MsgProtocolError(ProtocolError):
+    """Abstract error class for protocol errors that have a JSON object."""
+    def __init__(self, error_message: str, msg: Message):
+        super().__init__(error_message)
+        self.msg = msg
+
+    def __str__(self) -> str:
+        return "\n".join([
+            super().__str__(),
+            f"  Message was: {str(self.msg)}\n",
+        ])
+
+
+class ObjectTypeError(MsgProtocolError):
+    """
+    Incoming message was a JSON object, but has an unexpected data shape.
+
+    e.g.: A malformed greeting may cause this error.
+    """
+
+
+# FIXME: Remove this? Current draft simply trashes these replies.
+
+# class OrphanedError(MsgProtocolError):
+#     """
+#     Received message, but had no queue to deliver it to.
+#
+#     e.g.: A reply arrives from the server, but the ID does not match any
+#     pending execution requests we are aware of.
+#     """
+
+
+class ServerParseError(MsgProtocolError):
+    """
+    Server sent a `ParsingError` message.
+
+    e.g. A reply arrives from the server, but it is missing the "ID"
+    field, which indicates a parsing error on behalf of the server.
+    """
-- 
2.30.2



^ permalink raw reply	[flat|nested] 21+ messages in thread

* [PATCH RFC 5/7] models: Add well-known QMP objects
  2021-04-13 15:55 [PATCH RFC 0/7] RFC: Asynchronous QMP Draft John Snow
                   ` (3 preceding siblings ...)
  2021-04-13 15:55 ` [PATCH RFC 4/7] message: add QMP Message type John Snow
@ 2021-04-13 15:55 ` John Snow
  2021-04-13 15:55 ` [PATCH RFC 6/7] qmp_protocol: add QMP client implementation John Snow
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 21+ messages in thread
From: John Snow @ 2021-04-13 15:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: crosa, John Snow, ehabkost, stefanha, armbru

This uses the third-party pydantic library to provide grammatical
validation of various JSON objects used in the QMP protocol, along with
documentation that references where these objects are defined.

This is done both to ensure that objects conform to the standard set
forth in the QMP specification, and to provide a strict type-safe
interface that can be used to access information sent by the server in a
type-safe way.

If you've not run into pydantic before, you define objects by creating
classes that inherit from BaseModel. Then, similar to Python's own
@dataclass format, you declare the fields (and their types) that you
expect to see in this object. Pydantic will then automatically generate
a parser/validator for this object, and the end result is a strictly
typed, native Python object that is guaranteed to have the fields
specified.

NOTE: Pydantic does not, by default, ensure that *extra* fields are not
present in the model. This is intentional, as it allows backwards
compatibility if new fields should be added to the specification in the future.

This strictness feature, however, *can* be added. A debug/strict mode
could be added (but is not present in this RFC) to enable that
strictness on-demand, but for a general-purpose client it's likely best
to leave that disabled.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 models.py | 177 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 177 insertions(+)
 create mode 100644 models.py

diff --git a/models.py b/models.py
new file mode 100644
index 0000000..7c42d47
--- /dev/null
+++ b/models.py
@@ -0,0 +1,177 @@
+"""
+QMP message models.
+
+This module provides definitions for several well-defined JSON object
+types that are seen in the QMP wire protocol. Using pydantic, these
+models also handle the parsing and validation of these objects in order
+to provide strict typing guarantees elsewhere in the library.
+
+Notably, it provides these object models:
+
+- `Greeting`: the standard QMP greeting message (and nested children)
+- Three types of server RPC response messages:
+  - `ErrorResponse`: A failed-execution reply. (Application-level failure)
+  - `SuccessResponse`: A successful execution reply.
+  - `ParsingError`: A reply indicating the RPC message was not understood.
+                  (Library-level failure, or worse.)
+- A special pydantic form of the above three; `ServerResponse`,
+  used to parse incoming messages.
+- `AsynchronousEvent`: A generic event message.
+"""
+
+from typing import (
+    Any,
+    Dict,
+    List,
+    Type,
+    TypeVar,
+    Union,
+)
+
+from pydantic import BaseModel, Field, root_validator, ValidationError
+
+
+from message import Message, ObjectTypeError
+
+
+class MessageBase(BaseModel):
+    """
+    An abstract pydantic model that represents any QMP object.
+
+    It does not define any fields, so it isn't very useful as a type.
+    However, it provides a strictly typed parsing helper that allows
+    us to convert from a QMP `Message` object into a specific model,
+    so long as that model inherits from this class.
+    """
+    _T = TypeVar('_T', bound='MessageBase')
+
+    @classmethod
+    def parse_msg(cls: Type[_T], obj: Message) -> _T:
+        """
+        Convert a `Message` into a strictly typed Python object.
+
+        For Messages that do not pass validation, pydantic validation
+        errors are encapsulated using the `ValidationError` class.
+
+        :raises: ValidationError when the given Message cannot be
+                 validated (and converted into) as an instance of this class.
+        """
+        try:
+            return cls.parse_obj(obj)
+        except ValidationError as err:
+            raise ObjectTypeError("Message failed validation.", obj) from err
+
+
+class VersionTriple(BaseModel):
+    """
+    Mirrors qapi/control.json VersionTriple structure.
+    """
+    major: int
+    minor: int
+    micro: int
+
+
+class VersionInfo(BaseModel):
+    """
+    Mirrors qapi/control.json VersionInfo structure.
+    """
+    qemu: VersionTriple
+    package: str
+
+
+class QMPGreeting(BaseModel):
+    """
+    'QMP' subsection of the protocol greeting.
+
+    Defined in qmp-spec.txt, section 2.2, "Server Greeting".
+    """
+    version: VersionInfo
+    capabilities: List[str]
+
+
+class Greeting(MessageBase):
+    """
+    QMP protocol greeting message.
+
+    Defined in qmp-spec.txt, section 2.2, "Server Greeting".
+    """
+    QMP: QMPGreeting
+
+
+class ErrorInfo(BaseModel):
+    """
+    Error field inside of an error response.
+
+    Defined in qmp-spec.txt, section 2.4.2, "error".
+    """
+    class_: str = Field(None, alias='class')
+    desc: str
+
+
+class ParsingError(MessageBase):
+    """
+    Parsing error from QMP that omits ID due to failure.
+
+    Implicitly defined in qmp-spec.txt, section 2.4.2, "error".
+    """
+    error: ErrorInfo
+
+
+class SuccessResponse(MessageBase):
+    """
+    Successful execution response.
+
+    Defined in qmp-spec.txt, section 2.4.1, "success".
+    """
+    return_: Any = Field(None, alias='return')
+    id: str  # NB: The spec allows ANY object here. AQMP does not!
+
+    @root_validator(pre=True)
+    @classmethod
+    def check_return_value(cls,
+                           values: Dict[str, object]) -> Dict[str, object]:
+        """Enforce that the 'return' key is present, even if it is None."""
+        # To pydantic, 'Any' means 'Optional'; force its presence:
+        if 'return' not in values:
+            raise TypeError("'return' key not present in object.")
+        return values
+
+
+class ErrorResponse(MessageBase):
+    """
+    Unsuccessful execution response.
+
+    Defined in qmp-spec.txt, section 2.4.2, "error".
+    """
+    error: ErrorInfo
+    id: str  # NB: The spec allows ANY object here. AQMP does not!
+
+
+class ServerResponse(MessageBase):
+    """
+    Union type: This object can be any one of the component messages.
+
+    Implicitly defined in qmp-spec.txt, section 2.4, "Commands Responses".
+    """
+    __root__: Union[SuccessResponse, ErrorResponse, ParsingError]
+
+
+class EventTimestamp(BaseModel):
+    """
+    Timestamp field of QMP event, see `AsynchronousEvent`.
+
+    Defined in qmp-spec.txt, section 2.5, "Asynchronous events".
+    """
+    seconds: int
+    microseconds: int
+
+
+class AsynchronousEvent(BaseModel):
+    """
+    Asynchronous event message.
+
+    Defined in qmp-spec.txt, section 2.5, "Asynchronous events".
+    """
+    event: str
+    data: Union[List[Any], Dict[str, Any], str, int, float]
+    timestamp: EventTimestamp
-- 
2.30.2



^ permalink raw reply	[flat|nested] 21+ messages in thread

* [PATCH RFC 6/7] qmp_protocol: add QMP client implementation
  2021-04-13 15:55 [PATCH RFC 0/7] RFC: Asynchronous QMP Draft John Snow
                   ` (4 preceding siblings ...)
  2021-04-13 15:55 ` [PATCH RFC 5/7] models: Add well-known QMP objects John Snow
@ 2021-04-13 15:55 ` John Snow
  2021-04-14  5:44   ` Stefan Hajnoczi
  2021-04-13 15:55 ` [PATCH RFC 7/7] linter config John Snow
  2021-04-14  6:38 ` [PATCH RFC 0/7] RFC: Asynchronous QMP Draft Stefan Hajnoczi
  7 siblings, 1 reply; 21+ messages in thread
From: John Snow @ 2021-04-13 15:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: crosa, John Snow, ehabkost, stefanha, armbru

Using everything added so far, add the QMP client itself.

So far, this QMP object cannot actually pretend to be a server; it only
implements the client logic (receiving events and sending commands.)
Future work may involve implementing the ability to send events and
receive RPC commands, so that we can create a QMP test server for unit
test purposes.

(It can, however, both connect to or receive a connection from QEMU so
that it can be used to instrument iotests.)

Note: the event handling is a total hack; I need to figure out the most
delightful way to create an interface to consume these easily, as I
think it's one of the biggest shortcomings of the synchronous library so
far. Consider that part very much a work-in-progress.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 qmp_protocol.py | 420 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 420 insertions(+)
 create mode 100644 qmp_protocol.py

diff --git a/qmp_protocol.py b/qmp_protocol.py
new file mode 100644
index 0000000..6e6ac25
--- /dev/null
+++ b/qmp_protocol.py
@@ -0,0 +1,420 @@
+"""
+QMP Client Implementation
+
+This module provides the QMP class, which can be used to connect and
+send commands to a QMP server such as QEMU. The QMP class can be used to
+either connect to a listening server, or used to listen and accept an
+incoming connection from the server.
+"""
+
+import asyncio
+import logging
+from typing import (
+    Awaitable,
+    Callable,
+    Dict,
+    List,
+    Mapping,
+    Optional,
+    Tuple,
+    cast,
+)
+
+from error import (
+    AQMPError,
+    DisconnectedError,
+    DeserializationError,
+    GreetingError,
+    NegotiationError,
+    StateError,
+    UnexpectedTypeError,
+)
+from message import (
+    Message,
+    ObjectTypeError,
+    ServerParseError,
+)
+from models import (
+    ErrorInfo,
+    ErrorResponse,
+    Greeting,
+    ParsingError,
+    ServerResponse,
+    SuccessResponse,
+)
+from protocol import AsyncProtocol
+from util import create_task, pretty_traceback
+
+
+class ExecuteError(AQMPError):
+    """Execution statement returned failure."""
+    def __init__(self,
+                 sent: Message,
+                 received: Message,
+                 error: ErrorInfo):
+        super().__init__()
+        self.sent = sent
+        self.received = received
+        self.error = error
+
+    def __str__(self) -> str:
+        return self.error.desc
+
+
+_EventCallbackFn = Callable[['QMP', Message], Awaitable[None]]
+
+
+class QMP(AsyncProtocol[Message]):
+    """
+    Implements a QMP connection to/from the server.
+
+    Basic usage looks like this::
+
+      qmp = QMP('my_virtual_machine_name')
+      await qmp.connect(('127.0.0.1', 1234))
+      ...
+      res = await qmp.execute('block-query')
+      ...
+      await qmp.disconnect()
+
+    :param name: Optional nickname for the connection, used for logging.
+    """
+    #: Logger object for debugging messages
+    logger = logging.getLogger(__name__)
+
+    def __init__(self, name: Optional[str] = None) -> None:
+        super().__init__(name)
+
+        # Greeting
+        self.await_greeting = True
+        self._greeting: Optional[Greeting] = None
+        self.greeting_timeout = 5  # (In seconds)
+
+        # RFC: Do I even want to use any timeouts internally? They're
+        # not defined in the protocol itself. Theoretically, a client
+        # could simply use asyncio.wait_for(qmp.connect(...), timeout=5)
+        # and then I don't have to support this interface at all.
+        #
+        # We don't need to support any timeouts so long as we never initiate
+        # any long-term wait that wasn't in direct response to a user action.
+
+        # Command ID counter
+        self._execute_id = 0
+
+        # Event handling
+        self._event_queue: asyncio.Queue[Message] = asyncio.Queue()
+        self._event_callbacks: List[_EventCallbackFn] = []
+
+        # Incoming RPC reply messages
+        self._pending: Dict[str, Tuple[
+            asyncio.Future[object],
+            asyncio.Queue[Message]]] = {}
+
+    def on_event(self, func: _EventCallbackFn) -> _EventCallbackFn:
+        """
+        FIXME: Quick hack: decorator to register event handlers.
+
+        Use it like this::
+
+          @qmp.on_event
+          async def my_event_handler(qmp, event: Message) -> None:
+            print(f"Received event: {event['event']}")
+
+        RFC: What kind of event handler would be the most useful in
+        practical terms? In tests, we are usually waiting for an
+        event with some criteria to occur; maybe it would be useful
+        to allow "coroutine" style functions where we can block
+        until a certain event shows up?
+        """
+        if func not in self._event_callbacks:
+            self._event_callbacks.append(func)
+        return func
+
+    async def _new_session(self, coro: Awaitable[None]) -> None:
+        self._event_queue = asyncio.Queue()
+        await super()._new_session(coro)
+
+    async def _on_connect(self) -> None:
+        """
+        Wait for the QMP greeting prior to the engagement of the full loop.
+
+        :raise: GreetingError when the greeting is not understood.
+        """
+        if self.await_greeting:
+            self._greeting = await self._get_greeting()
+
+    async def _on_start(self) -> None:
+        """
+        Perform QMP negotiation right after the loop starts.
+
+        Negotiation is performed afterwards so that the implementation
+        can simply use `execute()`, which relies on the loop machinery
+        to be running.
+
+        :raise: NegotiationError if the negotiation fails in some way.
+        """
+        await self._negotiate()
+
+    async def _get_greeting(self) -> Greeting:
+        """
+        :raise: GreetingError  (Many causes.)
+        """
+        self.logger.debug("Awaiting greeting ...")
+        try:
+            msg = await asyncio.wait_for(self._recv(), self.greeting_timeout)
+            return Greeting.parse_msg(msg)
+        except Exception as err:
+            if isinstance(err, (asyncio.TimeoutError, OSError, EOFError)):
+                emsg = "Failed to receive Greeting"
+            elif isinstance(err, (DeserializationError, UnexpectedTypeError)):
+                emsg = "Failed to understand Greeting"
+            elif isinstance(err, ObjectTypeError):
+                emsg = "Failed to validate Greeting"
+            else:
+                emsg = "Unknown failure acquiring Greeting"
+
+            self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
+            raise GreetingError(emsg, err) from err
+
+    async def _negotiate(self) -> None:
+        """
+        :raise: NegotiationError  (Many causes.)
+        """
+        self.logger.debug("Negotiating capabilities ...")
+        arguments: Dict[str, List[str]] = {'enable': []}
+        if self._greeting and 'oob' in self._greeting.QMP.capabilities:
+            arguments['enable'].append('oob')
+        try:
+            await self.execute('qmp_capabilities', arguments=arguments)
+        except Exception as err:
+            # FIXME: what exceptions do we actually expect execute to raise?
+            emsg = "Failure negotiating capabilities"
+            self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
+            raise NegotiationError(emsg, err) from err
+
+    async def _bh_disconnect(self) -> None:
+        # See AsyncProtocol._bh_disconnect().
+        await super()._bh_disconnect()
+
+        if self._pending:
+            self.logger.debug("Cancelling pending executions")
+        for key in self._pending:
+            self.logger.debug("Cancelling execution %s", key)
+            # NB: This signals cancellation, but doesn't fully quiesce;
+            # it merely requests the cancellation; it will be thrown into
+            # that tasks's context on the next event loop cycle.
+            #
+            # This task is being awaited on by `_execute()`, which will
+            # exist in the user's callstack in the upper-half. Since
+            # we're here, we know it isn't running! It won't have a
+            # chance to run again except to receive a cancellation.
+            #
+            # NB: Python 3.9 adds a msg= parameter to cancel that would
+            # be useful for debugging the 'cause' of cancellations.
+            self._pending[key][0].cancel()
+
+        self.logger.debug("QMP Disconnected.")
+
+    async def _on_message(self, msg: Message) -> None:
+        """
+        Add an incoming message to the appropriate queue/handler.
+
+        :raise: RawProtocolError     (`_recv` via `Message._deserialize`)
+        :raise: ServerParseError     (Message has no 'event' nor 'id' field)
+        """
+        # Incoming messages are not fully parsed/validated here;
+        # do only light peeking to know how to route the messages.
+
+        if 'event' in msg:
+            await self._event_queue.put(msg)
+            # FIXME: quick hack; event queue handling.
+            for func in self._event_callbacks:
+                await func(self, msg)
+            return
+
+        # Below, we assume everything left is an execute/exec-oob response.
+
+        if 'id' in msg:
+            exec_id = str(msg['id'])
+            if exec_id not in self._pending:
+                # qmp-spec.txt, section 2.4:
+                # 'Clients should drop all the responses
+                #  that have an unknown "id" field.'
+                self.logger.warning("Unknown ID '%s', response dropped.",
+                                    exec_id)
+                return
+        else:
+            # This is a server parsing error;
+            # It inherently does not "belong" to any pending execution.
+            # Instead of performing clever recovery, just terminate.
+            raise ServerParseError(
+                "Server sent a message without an ID,"
+                " indicating parse failure.", msg)
+
+        _, queue = self._pending[exec_id]
+        await queue.put(msg)
+
+    async def _do_recv(self) -> Message:
+        """
+        :raise: OSError            (Stream errors)
+        :raise: `EOFError`         (When the stream is at EOF)
+        :raise: `RawProtocolError` (via `Message._deserialize`)
+
+        :return: A single QMP `Message`.
+        """
+        msg_bytes = await self._readline()
+        msg = Message(msg_bytes, eager=True)
+        return msg
+
+    def _do_send(self, msg: Message) -> None:
+        """
+        :raise: ValueError  (JSON serialization failure)
+        :raise: TypeError   (JSON serialization failure)
+        :raise: OSError     (Stream errors)
+        """
+        assert self._writer is not None
+        self._writer.write(bytes(msg))
+
+    def _cleanup(self) -> None:
+        super()._cleanup()
+        self._greeting = None
+        assert self._pending == {}
+        self._event_queue = asyncio.Queue()
+
+    @classmethod
+    def make_execute_msg(cls, cmd: str,
+                         arguments: Optional[Mapping[str, object]] = None,
+                         oob: bool = False) -> Message:
+        """
+        Create an executable message to be sent by `execute_msg` later.
+
+        :param cmd: QMP command name.
+        :param arguments: Arguments (if any). Must be JSON-serializable.
+        :param oob: If true, execute "out of band".
+
+        :return: An executable QMP message.
+        """
+        msg = Message({'exec-oob' if oob else 'execute': cmd})
+        if arguments is not None:
+            msg['arguments'] = arguments
+        return msg
+
+    async def _bh_execute(self, msg: Message,
+                          queue: 'asyncio.Queue[Message]') -> object:
+        """
+        Execute a QMP Message and wait for the result.
+
+        :param msg: Message to execute.
+        :param queue: The queue we should expect to see a reply delivered to.
+
+        :return: Execution result from the server.
+                 The type depends on the command sent.
+        """
+        if not self.running:
+            raise StateError("QMP is not running.")
+        assert self._outgoing
+
+        self._outgoing.put_nowait(msg)
+        reply_msg = await queue.get()
+
+        # May raise ObjectTypeError (Unlikely - only if it has missing keys.)
+        reply = ServerResponse.parse_msg(reply_msg).__root__
+        assert not isinstance(reply, ParsingError)  # Handled by BH
+
+        if isinstance(reply, ErrorResponse):
+            # Server indicated execution failure.
+            raise ExecuteError(msg, reply_msg, reply.error)
+
+        assert isinstance(reply, SuccessResponse)
+        return reply.return_
+
+    async def _execute(self, msg: Message) -> object:
+        """
+        The same as `execute_msg()`, but without safety mechanisms.
+
+        Does not assign an execution ID and does not check that the form
+        of the message being sent is valid.
+
+        This method *Requires* an 'id' parameter to be set on the
+        message, it will not set one for you like `execute()` or
+        `execute_msg()`.
+
+        Do not use "__aqmp#00000" style IDs, use something else to avoid
+        potential clashes. If this ID clashes with an ID presently
+        in-use or otherwise clashes with the auto-generated IDs, the
+        response routing mechanisms in _on_message may very well fail
+        loudly enough to cause the entire loop to crash.
+
+        The ID should be a str; or at least something JSON
+        serializable. It *must* be hashable.
+        """
+        exec_id = cast(str, msg['id'])
+        self.logger.debug("Execute(%s): '%s'", exec_id,
+                          msg.get('execute', msg.get('exec-oob')))
+
+        queue: asyncio.Queue[Message] = asyncio.Queue(maxsize=1)
+        task = create_task(self._bh_execute(msg, queue))
+        self._pending[exec_id] = (task, queue)
+
+        try:
+            result = await task
+        except asyncio.CancelledError as err:
+            raise DisconnectedError("Disconnected") from err
+        finally:
+            del self._pending[exec_id]
+
+        return result
+
+    async def execute_msg(self, msg: Message) -> object:
+        """
+        Execute a QMP message and return the response.
+
+        :param msg: The QMP `Message` to execute.
+        :raises: ValueError if the QMP `Message` does not have either the
+                 'execute' or 'exec-oob' fields set.
+        :raises: ExecuteError if the server returns an error response.
+        :raises: DisconnectedError if the connection was terminated early.
+
+        :return: Execution response from the server. The type of object depends
+                 on the command that was issued, though most return a dict.
+        """
+        if not ('execute' in msg or 'exec-oob' in msg):
+            raise ValueError("Requires 'execute' or 'exec-oob' message")
+        if self.disconnecting:
+            raise StateError("QMP is disconnecting/disconnected."
+                             " Call disconnect() to fully disconnect.")
+
+        # FIXME: Copy the message here, to avoid leaking the ID back out.
+
+        exec_id = f"__aqmp#{self._execute_id:05d}"
+        msg['id'] = exec_id
+        self._execute_id += 1
+
+        return await self._execute(msg)
+
+    async def execute(self, cmd: str,
+                      arguments: Optional[Mapping[str, object]] = None,
+                      oob: bool = False) -> object:
+        """
+        Execute a QMP command and return the response.
+
+        :param cmd: QMP command name.
+        :param arguments: Arguments (if any). Must be JSON-serializable.
+        :param oob: If true, execute "out of band".
+
+        :raise: ExecuteError if the server returns an error response.
+        :raise: DisconnectedError if the connection was terminated early.
+
+        :return: Execution response from the server. The type of object depends
+                 on the command that was issued, though most return a dict.
+        """
+        # Note: I designed arguments to be its own argument instead of
+        # kwparams so that we are able to add other modifiers that
+        # change execution parameters later on. A theoretical
+        # higher-level API that is generated against a particular QAPI
+        # Schema should generate function signatures the way we want at
+        # that point; modifying those commands to behave differently
+        # could be performed using context managers that alter the QMP
+        # loop for any commands that occur within that block.
+        msg = self.make_execute_msg(cmd, arguments, oob=oob)
+        return await self.execute_msg(msg)
-- 
2.30.2



^ permalink raw reply	[flat|nested] 21+ messages in thread

* [PATCH RFC 7/7] linter config
  2021-04-13 15:55 [PATCH RFC 0/7] RFC: Asynchronous QMP Draft John Snow
                   ` (5 preceding siblings ...)
  2021-04-13 15:55 ` [PATCH RFC 6/7] qmp_protocol: add QMP client implementation John Snow
@ 2021-04-13 15:55 ` John Snow
  2021-04-14  6:38 ` [PATCH RFC 0/7] RFC: Asynchronous QMP Draft Stefan Hajnoczi
  7 siblings, 0 replies; 21+ messages in thread
From: John Snow @ 2021-04-13 15:55 UTC (permalink / raw)
  To: qemu-devel; +Cc: crosa, John Snow, ehabkost, stefanha, armbru

Everything in this series should pass with flake8, pylint, and mypy; but
there are a few bits of dust swept under the rug with these config
files.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 .flake8  |  2 ++
 pylintrc | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+)
 create mode 100644 .flake8
 create mode 100644 pylintrc

diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..45d8146
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,2 @@
+[flake8]
+extend-ignore = E722  # Pylint handles this, but smarter.
\ No newline at end of file
diff --git a/pylintrc b/pylintrc
new file mode 100644
index 0000000..7cf16c0
--- /dev/null
+++ b/pylintrc
@@ -0,0 +1,53 @@
+[MASTER]
+
+extension-pkg-allow-list=pydantic
+
+[MESSAGES CONTROL]
+
+# disable=
+
+[REPORTS]
+
+[REFACTORING]
+
+[MISCELLANEOUS]
+
+[LOGGING]
+
+[BASIC]
+
+# Good variable names which should always be accepted, separated by a comma.
+good-names=i,
+           j,
+           k,
+           ex,
+           Run,
+           _,
+           fd,
+           c,
+           ns,
+           rc,
+           T,
+
+[VARIABLES]
+
+[STRING]
+
+[SPELLING]
+
+[FORMAT]
+
+[SIMILARITIES]
+
+# Ignore imports when computing similarities.
+ignore-imports=yes
+
+[TYPECHECK]
+
+[CLASSES]
+
+[IMPORTS]
+
+[DESIGN]
+
+[EXCEPTIONS]
-- 
2.30.2



^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH RFC 3/7] protocol: generic async message-based protocol loop
  2021-04-13 15:55 ` [PATCH RFC 3/7] protocol: generic async message-based protocol loop John Snow
@ 2021-04-13 20:00   ` Stefan Hajnoczi
  2021-04-14 17:29     ` John Snow
  0 siblings, 1 reply; 21+ messages in thread
From: Stefan Hajnoczi @ 2021-04-13 20:00 UTC (permalink / raw)
  To: John Snow; +Cc: armbru, crosa, qemu-devel, ehabkost

[-- Attachment #1: Type: text/plain, Size: 32929 bytes --]

On Tue, Apr 13, 2021 at 11:55:49AM -0400, John Snow wrote:
> This module provides the protocol-agnostic framework upon which QMP will
> be built. I also have (not included in this series) a qtest
> implementation that uses this same framework, which is why it is split
> into two portions like this.
> 
> The design uses two independent tasks in the "bottol half", a writer and
> a reader. These tasks run for the duration of the connection and
> independently send and receive messages, respectively.
> 
> A third task, disconnect, is scheduled whenever an error occurs and
> facilitates coalescing of the other two tasks. MultiException is used in
> this case if *both* tasks should have Exceptions that need to be
> reported, though at the time of writing, I think this circumstance might
> only be a theoretical concern.
> 
> The generic model here does not provide execute(), but the model for QMP
> is informative for how this class is laid out. Below, QMP's execute()
> function deposits a message into the outbound queue. The writer task
> wakes up to process the queue and deposits information in the write
> buffer, where the message is finally dispatched. Meanwhile, the
> execute() call is expected to block on an RPC mailbox waiting for a
> reply from the server.
> 
> On the return trip, the reader wakes up when data arrives in the
> buffer. The message is deserialized and handed off to the protocol layer
> to route accordingly. QMP will route this message into either the Event
> queue or one of the pending RPC mailboxes.
> 
> Upon this message being routed to the correct RPC mailbox, execute()
> will be woken up and allowed to process the reply and deliver it back to
> the caller.
> 
> The reason for separating the inbound and outbound tasks to such an
> extreme degree is to allow for designs and extensions where this
> asynchronous loop may be launched in a separate thread. In this model,
> it is possible to use a synchronous, thread-safe function to deposit new
> messages into the outbound queue; this was seen as a viable way to offer
> solid synchronous bindings while still allowing events to be processed
> truly asynchronously.
> 
> Separating it this way also allows us to fairly easily support
> Out-of-band executions with little additional effort; essentially all
> commands are treated as out-of-band.
> 
> The execute graph:
> 
>                        +---------+
>                        | caller  |
>                        +---------+
>                             |
>                             v
>                        +---------+
>      +---------------- |execute()| <----------+
>      |                 +---------+            |
>      |                                        |
> -----------------------------------------------------------
>      v                                        |
> +----+----+    +-----------+           +------+-------+
> |Mailboxes|    |Event Queue|           |Outbound Queue|
> +----+----+    +------+----+           +------+-------+
>      |                |                       ^
>      v                v                       |
>   +--+----------------+---+       +-----------+-----------+
>   | Reader Task/Coroutine |       | Writer Task/Coroutine |
>   +-----------+-----------+       +-----------+-----------+
>               |                               ^
>               v                               |
>         +-----+------+                  +-----+------+
>         |StreamReader|                  |StreamWriter|
>         +------------+                  +------------+

The arrow directions confuse me. I don't understand what they convey.

> 
> Signed-off-by: John Snow <jsnow@redhat.com>
> ---
>  protocol.py | 704 ++++++++++++++++++++++++++++++++++++++++++++++++++++

Yikes, this is complex. I'm not sure the abstractions are worth the
cost. Hopefully everything will be tied up with a simple high-level API
later in the series.

>  1 file changed, 704 insertions(+)
>  create mode 100644 protocol.py
> 
> diff --git a/protocol.py b/protocol.py
> new file mode 100644
> index 0000000..27d1558
> --- /dev/null
> +++ b/protocol.py
> @@ -0,0 +1,704 @@
> +"""
> +Async message-based protocol support.
> +
> +This module provides a generic framework for sending and receiving
> +messages over an asyncio stream.
> +
> +`AsyncProtocol` is an abstract class that implements the core mechanisms
> +of a simple send/receive protocol, and is designed to be extended.
> +
> +`AsyncTasks` provides a container class that aggregates tasks that make
> +up the loop used by `AsyncProtocol`.
> +"""
> +
> +import asyncio
> +from asyncio import StreamReader, StreamWriter
> +import logging
> +from ssl import SSLContext
> +from typing import (
> +    Any,
> +    Awaitable,
> +    Callable,
> +    Coroutine,
> +    Iterator,
> +    List,
> +    Generic,
> +    Optional,
> +    Tuple,
> +    TypeVar,
> +    Union,
> +)
> +
> +from error import (
> +    ConnectError,
> +    MultiException,
> +    StateError,
> +)
> +from util import create_task, pretty_traceback, wait_closed
> +
> +
> +T = TypeVar('T')
> +_TaskFN = Callable[[], Awaitable[None]]  # aka ``async def func() -> None``
> +_FutureT = TypeVar('_FutureT', bound=Optional['asyncio.Future[Any]'])
> +_GatherRet = List[Optional[BaseException]]
> +
> +
> +class AsyncTasks:
> +    """
> +    AsyncTasks is a collection of bottom half tasks designed to run forever.
> +
> +    This is a convenience wrapper to make calls from `AsyncProtocol` simpler to
> +    follow by behaving as a simple aggregate of two or more tasks, such that
> +    a higher-level connection manager can simply refer to "the bottom half"
> +    as one coherent entity instead of several.
> +
> +    The general flow is:
> +
> +    1. ``tasks = AsyncTasks(logger_for_my_client)``
> +    2. ``tasks.start(my_reader, my_writer)``
> +    3. ``...``
> +    4. ``await tasks.cancel()``
> +    5. ``tasks.result()``
> +
> +    :param logger: A logger to use for debugging messages. Useful to
> +                   associate messages with a particular server context.
> +    """
> +
> +    logger = logging.getLogger(__name__)
> +
> +    def __init__(self, logger: Optional[logging.Logger] = None):
> +        if logger is not None:
> +            self.logger = logger
> +
> +        # Named tasks
> +        self.reader: Optional['asyncio.Future[None]'] = None
> +        self.writer: Optional['asyncio.Future[None]'] = None
> +
> +        # Internal aggregate of all of the above tasks.
> +        self._all: Optional['asyncio.Future[_GatherRet]'] = None
> +
> +    def _all_tasks(self) -> Iterator[Optional['asyncio.Future[None]']]:
> +        """Yields all tasks, defined or not, in ideal cancellation order."""
> +        yield self.writer
> +        yield self.reader
> +
> +    def __iter__(self) -> Iterator['asyncio.Future[None]']:
> +        """Yields all defined tasks, in ideal cancellation order."""
> +        for task in self._all_tasks():
> +            if task is not None:
> +                yield task
> +
> +    @property
> +    def _all_tasks_defined(self) -> bool:
> +        """Returns True if all tasks are defined."""
> +        return all(map(lambda task: task is not None, self._all_tasks()))
> +
> +    @property
> +    def _some_tasks_done(self) -> bool:
> +        """Returns True if any defined tasks are done executing."""
> +        return any(map(lambda task: task.done(), iter(self)))
> +
> +    def __bool__(self) -> bool:
> +        """Returns True when any tasks are defined at all."""
> +        return bool(tuple(iter(self)))
> +
> +    @property
> +    def running(self) -> bool:
> +        """Returns True if all tasks are defined and still running."""
> +        return self._all_tasks_defined and not self._some_tasks_done
> +
> +    def start(self,
> +              reader_coro: Coroutine[Any, Any, None],
> +              writer_coro: Coroutine[Any, Any, None]) -> None:
> +        """
> +        Starts executing tasks in the current async context.
> +
> +        :param reader_coro: Coroutine, message reader task.
> +        :param writer_coro: Coroutine, message writer task.
> +        """
> +        self.reader = create_task(reader_coro)
> +        self.writer = create_task(writer_coro)
> +
> +        # Uses extensible self-iterator.
> +        self._all = asyncio.gather(*iter(self), return_exceptions=True)
> +
> +    async def cancel(self) -> None:
> +        """
> +        Cancels all tasks and awaits full cancellation.
> +
> +        Exceptions, if any, can be obtained by calling `result()`.
> +        """
> +        for task in self:
> +            if task and not task.done():
> +                self.logger.debug("cancelling task %s", str(task))
> +                task.cancel()
> +
> +        if self._all:
> +            self.logger.debug("Awaiting all tasks to finish ...")
> +            await self._all
> +
> +    def _cleanup(self) -> None:
> +        """
> +        Erase all task handles; asserts that no tasks are running.
> +        """
> +        def _paranoid_task_erase(task: _FutureT) -> Optional[_FutureT]:
> +            assert (task is None) or task.done()
> +            return None if (task and task.done()) else task
> +
> +        self.reader = _paranoid_task_erase(self.reader)
> +        self.writer = _paranoid_task_erase(self.writer)
> +        self._all = _paranoid_task_erase(self._all)
> +
> +    def result(self) -> None:
> +        """
> +        Raises exception(s) from the finished tasks, if any.
> +
> +        Called to fully quiesce this task group. asyncio.CancelledError is
> +        never raised; in the event of an intentional cancellation this
> +        function will not raise any errors.
> +
> +        If an exception in one bottom half caused an unscheduled disconnect,
> +        that exception will be raised.
> +
> +        :raise: `Exception`      Arbitrary exceptions re-raised on behalf of
> +                                 the bottom half.
> +        :raise: `MultiException` Iterable Exception used to multiplex multiple
> +                                 exceptions when multiple threads failed.
> +        """
> +        exceptions: List[BaseException] = []
> +        results = self._all.result() if self._all else ()
> +        self._cleanup()
> +
> +        for result in results:
> +            if result is None:
> +                continue
> +            if not isinstance(result, asyncio.CancelledError):
> +                exceptions.append(result)
> +
> +        if len(exceptions) == 1:
> +            raise exceptions.pop()
> +        if len(exceptions) > 1:
> +            raise MultiException(exceptions)
> +
> +
> +class AsyncProtocol(Generic[T]):
> +    """AsyncProtocol implements a generic async message-based protocol.
> +
> +    This protocol assumes the basic unit of information transfer between
> +    client and server is a "message", the details of which are left up
> +    to the implementation. It assumes the sending and receiving of these
> +    messages is full-duplex and not necessarily correlated; i.e. it
> +    supports asynchronous inbound messages.
> +
> +    It is designed to be extended by a specific protocol which provides
> +    the implementations for how to read and send messages. These must be
> +    defined in `_do_recv()` and `_do_send()`, respectively.
> +
> +    Other callbacks that have a default implemention, but may be
> +    extended or overridden:
> +     - _on_connect: Actions performed prior to loop start.
> +     - _on_start:   Actions performed immediately after loop start.
> +     - _on_message: Actions performed when a message is received.
> +                    The default implementation does nothing at all.
> +     - _cb_outbound: Log/Filter outgoing messages.
> +     - _cb_inbound: Log/Filter incoming messages.

This reminds me of asyncio.protocols and twisted.internet.protocol.

> +
> +    :param name: Name used for logging messages, if any.
> +    """
> +    #: Logger object for debugging messages
> +    logger = logging.getLogger(__name__)
> +
> +    # -------------------------
> +    # Section: Public interface
> +    # -------------------------
> +
> +    def __init__(self, name: Optional[str] = None) -> None:
> +        self.name = name
> +        if self.name is not None:
> +            self.logger = self.logger.getChild(self.name)
> +
> +        # stream I/O
> +        self._reader: Optional[StreamReader] = None
> +        self._writer: Optional[StreamWriter] = None
> +
> +        # I/O queues
> +        self._outgoing: asyncio.Queue[T] = asyncio.Queue()
> +
> +        # I/O tasks (message reader, message writer)
> +        self._tasks = AsyncTasks(self.logger)
> +
> +        # Disconnect task; separate from the core loop.
> +        self._dc_task: Optional[asyncio.Future[None]] = None
> +
> +    @property
> +    def running(self) -> bool:
> +        """
> +        Return True when the loop is currently connected and running.
> +        """
> +        if self.disconnecting:
> +            return False
> +        return self._tasks.running
> +
> +    @property
> +    def disconnecting(self) -> bool:
> +        """
> +        Return True when the loop is disconnecting, or disconnected.
> +        """
> +        return bool(self._dc_task)
> +
> +    @property
> +    def unconnected(self) -> bool:
> +        """
> +        Return True when the loop is fully idle and quiesced.
> +
> +        Returns True specifically when the loop is neither `running`
> +        nor `disconnecting`. A call to `disconnect()` is required
> +        to transition from `disconnecting` to `unconnected`.
> +        """
> +        return not (self.running or self.disconnecting)
> +
> +    async def accept(self, address: Union[str, Tuple[str, int]],
> +                     ssl: Optional[SSLContext] = None) -> None:
> +        """
> +        Accept a connection and begin processing message queues.
> +
> +        :param address: Address to connect to;
> +                        UNIX socket path or TCP address/port.
> +        :param ssl:     SSL context to use, if any.
> +
> +        :raise: `StateError`   (loop is running or disconnecting.)
> +        :raise: `ConnectError` (Connection was not successful.)
> +        """
> +        if self.disconnecting:
> +            raise StateError("Client is disconnecting/disconnected."
> +                             " Call disconnect() to fully disconnect.")
> +        if self.running:
> +            raise StateError("Client is already connected and running.")
> +        assert self.unconnected
> +
> +        try:
> +            await self._new_session(self._do_accept(address, ssl))
> +        except Exception as err:
> +            emsg = "Failed to accept incoming connection"
> +            self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
> +            raise ConnectError(f"{emsg}: {err!s}") from err

Wrapping the exception in ConnectError() obfuscates what's going on IMO.

> +
> +    async def connect(self, address: Union[str, Tuple[str, int]],
> +                      ssl: Optional[SSLContext] = None) -> None:
> +        """
> +        Connect to the server and begin processing message queues.
> +
> +        :param address: Address to connect to;
> +                        UNIX socket path or TCP address/port.
> +        :param ssl:     SSL context to use, if any.
> +
> +        :raise: `StateError`   (loop is running or disconnecting.)
> +        :raise: `ConnectError` (Connection was not successful.)
> +        """
> +        if self.disconnecting:
> +            raise StateError("Client is disconnecting/disconnected."
> +                             " Call disconnect() to fully disconnect.")
> +        if self.running:
> +            raise StateError("Client is already connected and running.")
> +        assert self.unconnected
> +
> +        try:
> +            await self._new_session(self._do_connect(address, ssl))
> +        except Exception as err:
> +            emsg = "Failed to connect to server"
> +            self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
> +            raise ConnectError(f"{emsg}: {err!s}") from err
> +
> +    async def disconnect(self) -> None:
> +        """
> +        Disconnect and wait for all tasks to fully stop.
> +
> +        If there were exceptions that caused the bottom half to terminate
> +        prematurely, they will be raised here.
> +
> +        :raise: `Exception`      Arbitrary exceptions re-raised on behalf of
> +                                 the bottom half.
> +        :raise: `MultiException` Iterable Exception used to multiplex multiple
> +                                 exceptions when multiple tasks failed.
> +        """
> +        self._schedule_disconnect()
> +        await self._wait_disconnect()
> +
> +    # -----------------------------
> +    # Section: Connection machinery
> +    # -----------------------------
> +
> +    async def _register_streams(self,
> +                                reader: asyncio.StreamReader,
> +                                writer: asyncio.StreamWriter) -> None:
> +        """Register the Reader/Writer streams."""
> +        self._reader = reader
> +        self._writer = writer
> +
> +    async def _new_session(self, coro: Awaitable[None]) -> None:
> +        """
> +        Create a new session.
> +
> +        This is called for both `accept()` and `connect()` pathways.
> +
> +        :param coro: An awaitable that will perform either connect or accept.
> +        """
> +        assert self._reader is None
> +        assert self._writer is None
> +
> +        # NB: If a previous session had stale messages, they are dropped here.
> +        self._outgoing = asyncio.Queue()
> +
> +        # Connect / Await Connection
> +        await coro
> +        assert self._reader is not None
> +        assert self._writer is not None
> +
> +        await self._on_connect()
> +
> +        reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
> +        writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
> +        self._tasks.start(reader_coro, writer_coro)
> +
> +        await self._on_start()
> +
> +    async def _do_accept(self, address: Union[str, Tuple[str, int]],
> +                         ssl: Optional[SSLContext] = None) -> None:
> +        """
> +        Acting as the protocol server, accept a single connection.
> +
> +        Used as the awaitable callback to `_new_session()`.
> +        """
> +        self.logger.debug("Awaiting connection ...")
> +        connected = asyncio.Event()
> +        server: Optional[asyncio.AbstractServer] = None
> +
> +        async def _client_connected_cb(reader: asyncio.StreamReader,
> +                                       writer: asyncio.StreamWriter) -> None:
> +            """Used to accept a single incoming connection, see below."""
> +            nonlocal server
> +            nonlocal connected
> +
> +            # A connection has been accepted; stop listening for new ones.
> +            assert server is not None
> +            server.close()
> +            await server.wait_closed()
> +            server = None
> +
> +            # Register this client as being connected
> +            await self._register_streams(reader, writer)
> +
> +            # Signal back: We've accepted a client!
> +            connected.set()
> +
> +        if isinstance(address, tuple):
> +            coro = asyncio.start_server(
> +                _client_connected_cb,
> +                host=address[0],
> +                port=address[1],
> +                ssl=ssl,
> +                backlog=1,
> +            )
> +        else:
> +            coro = asyncio.start_unix_server(
> +                _client_connected_cb,
> +                path=address,
> +                ssl=ssl,
> +                backlog=1,
> +            )
> +
> +        server = await coro     # Starts listening
> +        await connected.wait()  # Waits for the callback to fire (and finish)
> +        assert server is None

Async callbacks defeat the readability advantages of coroutines :(.

asyncio.start_server() is designed for spawning client connections in
separate tasks but we just want to accept a client connection in the
current coroutine. It might be possible to eliminate the callback
business by not using the server and instead doing:

  conn, addr = await loop.sock_accept(listen_sock)

Whether that ends up being simpler, I'm not sure because you may need to
unwrap/wrap the listen_sock and conn sockets into asyncio classes to
interface with the rest of the code.

> +
> +        self.logger.debug("Connection accepted")
> +
> +    async def _do_connect(self, address: Union[str, Tuple[str, int]],
> +                          ssl: Optional[SSLContext] = None) -> None:
> +        self.logger.debug("Connecting ...")
> +
> +        if isinstance(address, tuple):
> +            connect = asyncio.open_connection(address[0], address[1], ssl=ssl)
> +        else:
> +            connect = asyncio.open_unix_connection(path=address, ssl=ssl)
> +        reader, writer = await(connect)
> +        await self._register_streams(reader, writer)
> +
> +        self.logger.debug("Connected")
> +
> +    async def _on_connect(self) -> None:
> +        """
> +        Async callback invoked after connection, but prior to loop start.
> +
> +        This callback is invoked after the stream is opened, but prior to
> +        starting the reader/writer tasks. Use this callback to handle
> +        handshakes, greetings, &c to avoid having special edge cases in the
> +        generic message handler.
> +        """
> +        # Nothing to do in the general case.
> +
> +    async def _on_start(self) -> None:
> +        """
> +        Async callback invoked after connection and loop start.
> +
> +        This callback is invoked after the stream is opened AND after
> +        the reader/writer tasks have been started. Use this callback to
> +        auto-perform certain tasks during the connect() call.
> +        """
> +        # Nothing to do in the general case.
> +
> +    def _schedule_disconnect(self) -> None:
> +        """
> +        Initiate a disconnect; idempotent.
> +
> +        This is called by the reader/writer tasks upon exceptions,
> +        or directly by a user call to `disconnect()`.
> +        """
> +        if not self._dc_task:
> +            self._dc_task = create_task(self._bh_disconnect())
> +
> +    async def _wait_disconnect(self) -> None:
> +        """
> +        _wait_disconnect waits for a scheduled disconnect to finish.
> +
> +        This function will gather any bottom half exceptions and re-raise them;
> +        so it is intended to be used in the upper half call chain.
> +
> +        If a single exception is encountered, it will be re-raised faithfully.
> +        If multiple are found, they will be multiplexed into a MultiException.
> +
> +        :raise: `Exception`      Many kinds; anything the bottom half raises.
> +        :raise: `MultiException` When the Reader/Writer both have exceptions.
> +        """
> +        assert self._dc_task
> +        await self._dc_task
> +        self._dc_task = None
> +
> +        try:
> +            self._tasks.result()
> +        finally:
> +            self._cleanup()
> +
> +    def _cleanup(self) -> None:
> +        """
> +        Fully reset this object to a clean state.
> +        """
> +        assert not self.running
> +        assert self._dc_task is None
> +        # _tasks.result() called in _wait_disconnect does _tasks cleanup, so:
> +        assert not self._tasks
> +
> +        self._reader = None
> +        self._writer = None
> +
> +    # ------------------------------
> +    # Section: Bottom Half functions
> +    # ------------------------------
> +
> +    async def _bh_disconnect(self) -> None:
> +        """
> +        Disconnect and cancel all outstanding tasks.
> +
> +        It is designed to be called from its task context, self._dc_task.
> +        """
> +        # RFC: Maybe I shot myself in the foot by trying too hard to
> +        # group the tasks together as one unit. I suspect the ideal
> +        # cancellation order here is actually: MessageWriter,
> +        # StreamWriter, MessageReader
> +
> +        # What I have here instead is MessageWriter, MessageReader,
> +        # StreamWriter
> +
> +        # Cancel the the message reader/writer.
> +        await self._tasks.cancel()
> +
> +        # Handle the stream writer itself, now.
> +        if self._writer:
> +            if not self._writer.is_closing():
> +                self.logger.debug("Writer is open; draining")
> +                await self._writer.drain()
> +                self.logger.debug("Closing writer")
> +                self._writer.close()
> +            self.logger.debug("Awaiting writer to fully close")
> +            await wait_closed(self._writer)
> +            self.logger.debug("Fully closed.")
> +
> +        # TODO: Add a hook for higher-level protocol cancellations here?
> +        #       (Otherwise, the disconnected logging event happens too soon.)
> +
> +        self.logger.debug("Protocol Disconnected.")
> +
> +    async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
> +        """
> +        Run one of the bottom-half functions in a loop forever.
> +
> +        If the bottom half ever raises any exception, schedule a disconnect.
> +        """
> +        try:
> +            while True:
> +                await async_fn()
> +        except asyncio.CancelledError as err:
> +            # We are cancelled (by _bh_disconnect), so no need to call it.
> +            self.logger.debug("Task.%s: cancelled: %s.",
> +                              name, type(err).__name__)
> +            raise
> +        except:
> +            self.logger.error("Task.%s: failure:\n%s\n", name,
> +                              pretty_traceback())
> +            self.logger.debug("Task.%s: scheduling disconnect.", name)
> +            self._schedule_disconnect()
> +            raise
> +        finally:
> +            self.logger.debug("Task.%s: exiting.", name)
> +
> +    async def _bh_send_message(self) -> None:
> +        """
> +        Wait for an outgoing message, then send it.
> +        """
> +        self.logger.log(5, "Waiting for message in outgoing queue to send ...")
> +        msg = await self._outgoing.get()
> +        try:
> +            self.logger.log(5, "Got outgoing message, sending ...")
> +            await self._send(msg)
> +        finally:
> +            self._outgoing.task_done()
> +            self.logger.log(5, "Outgoing message sent.")
> +
> +    async def _bh_recv_message(self) -> None:
> +        """
> +        Wait for an incoming message and call `_on_message` to route it.
> +
> +        Exceptions seen may be from `_recv` or from `_on_message`.
> +        """
> +        self.logger.log(5, "Waiting to receive incoming message ...")
> +        msg = await self._recv()
> +        self.logger.log(5, "Routing message ...")
> +        await self._on_message(msg)
> +        self.logger.log(5, "Message routed.")
> +
> +    # ---------------------
> +    # Section: Datagram I/O
> +    # ---------------------
> +
> +    def _cb_outbound(self, msg: T) -> T:
> +        """
> +        Callback: outbound message hook.
> +
> +        This is intended for subclasses to be able to add arbitrary hooks to
> +        filter or manipulate outgoing messages. The base implementation
> +        does nothing but log the message without any manipulation of the
> +        message. It is designed for you to invoke super() at the tail of
> +        any overridden method.
> +
> +        :param msg: raw outbound message
> +        :return: final outbound message
> +        """
> +        self.logger.debug("--> %s", str(msg))
> +        return msg
> +
> +    def _cb_inbound(self, msg: T) -> T:
> +        """
> +        Callback: inbound message hook.
> +
> +        This is intended for subclasses to be able to add arbitrary hooks to
> +        filter or manipulate incoming messages. The base implementation
> +        does nothing but log the message without any manipulation of the
> +        message. It is designed for you to invoke super() at the head of
> +        any overridden method.
> +
> +        This method does not "handle" incoming messages; it is a filter.
> +        The actual "endpoint" for incoming messages is `_on_message()`.
> +
> +        :param msg: raw inbound message
> +        :return: processed inbound message
> +        """
> +        self.logger.debug("<-- %s", str(msg))
> +        return msg
> +
> +    async def _readline(self) -> bytes:
> +        """
> +        Wait for a newline from the incoming reader.
> +
> +        This method is provided as a convenience for upper-layer
> +        protocols, as many will be line-based.
> +
> +        This function *may* return a sequence of bytes without a
> +        trailing newline if EOF occurs, but *some* bytes were
> +        received. In this case, the next call will raise EOF.
> +
> +        :raise OSError: Stream-related errors.
> +        :raise EOFError: If the reader stream is at EOF and there
> +                         are no bytes to return.
> +        """
> +        assert self._reader is not None
> +        msg_bytes = await self._reader.readline()
> +        self.logger.log(5, "Read %d bytes", len(msg_bytes))
> +
> +        if not msg_bytes:
> +            if self._reader.at_eof():
> +                self.logger.debug("EOF")
> +                raise EOFError()
> +
> +        return msg_bytes
> +
> +    async def _do_recv(self) -> T:
> +        """
> +        Abstract: Read from the stream and return a message.
> +
> +        Very low-level; intended to only be called by `_recv()`.
> +        """
> +        raise NotImplementedError
> +
> +    async def _recv(self) -> T:
> +        """
> +        Read an arbitrary protocol message. (WARNING: Extremely low-level.)
> +
> +        This function is intended primarily for _bh_recv_message to use
> +        in an asynchronous task loop. Using it outside of this loop will
> +        "steal" messages from the normal routing mechanism. It is safe to
> +        use during `_on_connect()`, but should not be used otherwise.
> +
> +        This function uses `_do_recv()` to retrieve the raw message, and
> +        then transforms it using `_cb_inbound()`.
> +
> +        Errors raised may be any of those from either method implementation.
> +
> +        :return: A single (filtered, processed) protocol message.
> +        """
> +        message = await self._do_recv()
> +        return self._cb_inbound(message)
> +
> +    def _do_send(self, msg: T) -> None:
> +        """
> +        Abstract: Write a message to the stream.
> +
> +        Very low-level; intended to only be called by `_send()`.
> +        """
> +        raise NotImplementedError
> +
> +    async def _send(self, msg: T) -> None:
> +        """
> +        Send an arbitrary protocol message. (WARNING: Low-level.)
> +
> +        Like `_read()`, this function is intended to be called by the writer
> +        task loop that processes outgoing messages. This function will
> +        transform any outgoing messages according to `_cb_outbound()`.
> +
> +        :raise: OSError - Various stream errors.
> +        """
> +        assert self._writer is not None
> +        msg = self._cb_outbound(msg)
> +        self._do_send(msg)
> +
> +    async def _on_message(self, msg: T) -> None:
> +        """
> +        Called when a new message is received.
> +
> +        Executed from within the reader loop BH, so be advised that waiting
> +        on other asynchronous tasks may be risky, depending. Additionally,
> +        any errors raised here will directly cause the loop to halt; limit
> +        error checking to what is strictly necessary for message routing.
> +
> +        :param msg: The incoming message, already logged/filtered.
> +        """
> +        # Nothing to do in the abstract case.
> -- 
> 2.30.2
> 

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH RFC 4/7] message: add QMP Message type
  2021-04-13 15:55 ` [PATCH RFC 4/7] message: add QMP Message type John Snow
@ 2021-04-13 20:07   ` Stefan Hajnoczi
  2021-04-14 17:39     ` John Snow
  0 siblings, 1 reply; 21+ messages in thread
From: Stefan Hajnoczi @ 2021-04-13 20:07 UTC (permalink / raw)
  To: John Snow; +Cc: armbru, crosa, qemu-devel, ehabkost

[-- Attachment #1: Type: text/plain, Size: 2233 bytes --]

On Tue, Apr 13, 2021 at 11:55:50AM -0400, John Snow wrote:
> This is an abstraction that represents a single message either sent to
> or received from the server. It is used to subclass the
> AsyncProtocol(Generic[T]) type.
> 
> It was written such that it can be populated by either raw data or by a
> dict, with the other form being generated on-demand, as-needed.
> 
> It behaves almost exactly like a dict, but has some extra methods and a
> special constructor. (It should quack fairly convincingly.)
> 
> Signed-off-by: John Snow <jsnow@redhat.com>
> ---
>  message.py | 196 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 196 insertions(+)
>  create mode 100644 message.py
> 
> diff --git a/message.py b/message.py
> new file mode 100644
> index 0000000..5c7e828
> --- /dev/null
> +++ b/message.py
> @@ -0,0 +1,196 @@
> +"""
> +QMP Message format and errors.
> +
> +This module provides the `Message` class, which represents a single QMP
> +message sent to or from the server. Several error-classes that depend on
> +knowing the format of this message are also included here.
> +"""
> +
> +import json
> +from json import JSONDecodeError
> +from typing import (
> +    Dict,
> +    ItemsView,
> +    Iterable,
> +    KeysView,
> +    Optional,
> +    Union,
> +    ValuesView,
> +)
> +
> +from error import (
> +    DeserializationError,
> +    ProtocolError,
> +    UnexpectedTypeError,
> +)
> +
> +
> +class Message:
> +    """
> +    Represents a single QMP protocol message.
> +
> +    QMP uses JSON objects as its basic communicative unit; so this
> +    object behaves like a MutableMapping. It may be instantiated from
> +    either another mapping (like a dict), or from raw bytes that still
> +    need to be deserialized.
> +
> +    :param value: Initial value, if any.
> +    :param eager: When true, attempt to serialize (or deserialize) the
> +                  initial value immediately, such that conversion exceptions
> +                  are raised during the call to the initialization method.
> +    """

Why define this class instead of using dicts? It's a very fancy way of
calling json.dumps() and json.loads().

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH RFC 6/7] qmp_protocol: add QMP client implementation
  2021-04-13 15:55 ` [PATCH RFC 6/7] qmp_protocol: add QMP client implementation John Snow
@ 2021-04-14  5:44   ` Stefan Hajnoczi
  2021-04-14 17:50     ` John Snow
  0 siblings, 1 reply; 21+ messages in thread
From: Stefan Hajnoczi @ 2021-04-14  5:44 UTC (permalink / raw)
  To: John Snow; +Cc: armbru, crosa, qemu-devel, ehabkost

[-- Attachment #1: Type: text/plain, Size: 1564 bytes --]

On Tue, Apr 13, 2021 at 11:55:52AM -0400, John Snow wrote:
> +    async def _execute(self, msg: Message) -> object:
> +        """
> +        The same as `execute_msg()`, but without safety mechanisms.
> +
> +        Does not assign an execution ID and does not check that the form
> +        of the message being sent is valid.
> +
> +        This method *Requires* an 'id' parameter to be set on the
> +        message, it will not set one for you like `execute()` or
> +        `execute_msg()`.
> +
> +        Do not use "__aqmp#00000" style IDs, use something else to avoid
> +        potential clashes. If this ID clashes with an ID presently
> +        in-use or otherwise clashes with the auto-generated IDs, the
> +        response routing mechanisms in _on_message may very well fail
> +        loudly enough to cause the entire loop to crash.
> +
> +        The ID should be a str; or at least something JSON
> +        serializable. It *must* be hashable.
> +        """
> +        exec_id = cast(str, msg['id'])
> +        self.logger.debug("Execute(%s): '%s'", exec_id,
> +                          msg.get('execute', msg.get('exec-oob')))
> +
> +        queue: asyncio.Queue[Message] = asyncio.Queue(maxsize=1)
> +        task = create_task(self._bh_execute(msg, queue))

We're already in a coroutine, can we await queue.get() ourselves instead
of creating a new task?

I guess this is done in order to use Task.cancel() in _bh_disconnect()
but it seems simpler to use queue both for success and cancellation.
Fewer tasks are easier to reason about.

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH RFC 0/7] RFC: Asynchronous QMP Draft
  2021-04-13 15:55 [PATCH RFC 0/7] RFC: Asynchronous QMP Draft John Snow
                   ` (6 preceding siblings ...)
  2021-04-13 15:55 ` [PATCH RFC 7/7] linter config John Snow
@ 2021-04-14  6:38 ` Stefan Hajnoczi
  2021-04-14 19:17   ` John Snow
  7 siblings, 1 reply; 21+ messages in thread
From: Stefan Hajnoczi @ 2021-04-14  6:38 UTC (permalink / raw)
  To: John Snow; +Cc: armbru, crosa, qemu-devel, ehabkost

[-- Attachment #1: Type: text/plain, Size: 13229 bytes --]

Below are the API docs that I found helpful for understanding the big
picture.

The QMP.execute() API is nice.

Regarding QMP events, I can think of two approaches:
1. Callbacks
2. An async get_event(name=Optional[str]) -> object API
   (plus get_event_nowait(name=Optional[str]) -> object)

(There's probably a third approach using async iterators but it's
similar to get_event().)

Both approaches are useful. The first is good in larger asynchronous
applications that perform many tasks concurrently. The second is good
when there is just one specific thing to do, like waiting for a block
job to complete.

My general impression is that the public API is nice and usable but the
implementation is complex and risks discouraging other people from
hacking on the code. There are too many abstractions and while it's
highly structured, there is a cost to having all this infrastructure. I
think simplifying it would make it easier for others to understand and
contribute to the code.

Ideas: open code or inline simple things instead of defining
abstractions that only have 1 user, drop the pydantic models, drop
classes that just wrap things like Message and the exception hierarchy,
combine protocol and qmp_protocol.

Things that might be worth adding:
1. File descriptor passing support.
2. Introspection support to easily check if a command/feature is
   available. Users can do this manually by sending QMP commands and
   interpreting the response, but this may be common enough to warrant a
   friendly API.

Help on module qmp.qmp_protocol in qmp:

NAME
    qmp.qmp_protocol - QMP Client Implementation

DESCRIPTION
    This module provides the QMP class, which can be used to connect and
    send commands to a QMP server such as QEMU. The QMP class can be used to
    either connect to a listening server, or used to listen and accept an
    incoming connection from the server.

CLASSES
    qmp.error.AQMPError(builtins.Exception)
        ExecuteError
    qmp.protocol.AsyncProtocol(typing.Generic)
        QMP
    
    class ExecuteError(qmp.error.AQMPError)
     |  ExecuteError(sent: qmp.message.Message, received: qmp.message.Message, error: qmp.models.ErrorInfo)
     |  
     |  Execution statement returned failure.
     |  
     |  Method resolution order:
     |      ExecuteError
     |      qmp.error.AQMPError
     |      builtins.Exception
     |      builtins.BaseException
     |      builtins.object
     |  
     |  Methods defined here:
     |  
     |  __init__(self, sent: qmp.message.Message, received: qmp.message.Message, error: qmp.models.ErrorInfo)
     |      Initialize self.  See help(type(self)) for accurate signature.
     |  
     |  __str__(self) -> str
     |      Return str(self).
     |  
     |  ----------------------------------------------------------------------
     |  Data descriptors inherited from qmp.error.AQMPError:
     |  
     |  __weakref__
     |      list of weak references to the object (if defined)
     |  
     |  ----------------------------------------------------------------------
     |  Static methods inherited from builtins.Exception:
     |  
     |  __new__(*args, **kwargs) from builtins.type
     |      Create and return a new object.  See help(type) for accurate signature.
     |  
     |  ----------------------------------------------------------------------
     |  Methods inherited from builtins.BaseException:
     |  
     |  __delattr__(self, name, /)
     |      Implement delattr(self, name).
     |  
     |  __getattribute__(self, name, /)
     |      Return getattr(self, name).
     |  
     |  __reduce__(...)
     |      Helper for pickle.
     |  
     |  __repr__(self, /)
     |      Return repr(self).
     |  
     |  __setattr__(self, name, value, /)
     |      Implement setattr(self, name, value).
     |  
     |  __setstate__(...)
     |  
     |  with_traceback(...)
     |      Exception.with_traceback(tb) --
     |      set self.__traceback__ to tb and return self.
     |  
     |  ----------------------------------------------------------------------
     |  Data descriptors inherited from builtins.BaseException:
     |  
     |  __cause__
     |      exception cause
     |  
     |  __context__
     |      exception context
     |  
     |  __dict__
     |  
     |  __suppress_context__
     |  
     |  __traceback__
     |  
     |  args
    
    class QMP(qmp.protocol.AsyncProtocol)
     |  QMP(name: Optional[str] = None) -> None
     |  
     |  Implements a QMP connection to/from the server.
     |  
     |  Basic usage looks like this::
     |  
     |    qmp = QMP('my_virtual_machine_name')
     |    await qmp.connect(('127.0.0.1', 1234))
     |    ...
     |    res = await qmp.execute('block-query')
     |    ...
     |    await qmp.disconnect()
     |  
     |  :param name: Optional nickname for the connection, used for logging.
     |  
     |  Method resolution order:
     |      QMP
     |      qmp.protocol.AsyncProtocol
     |      typing.Generic
     |      builtins.object
     |  
     |  Methods defined here:
     |  
     |  __init__(self, name: Optional[str] = None) -> None
     |      Initialize self.  See help(type(self)) for accurate signature.
     |  
     |  async execute(self, cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) -> object
     |      Execute a QMP command and return the response.
     |      
     |      :param cmd: QMP command name.
     |      :param arguments: Arguments (if any). Must be JSON-serializable.
     |      :param oob: If true, execute "out of band".
     |      
     |      :raise: ExecuteError if the server returns an error response.
     |      :raise: DisconnectedError if the connection was terminated early.
     |      
     |      :return: Execution response from the server. The type of object depends
     |               on the command that was issued, though most return a dict.
     |  
     |  async execute_msg(self, msg: qmp.message.Message) -> object
     |      Execute a QMP message and return the response.
     |      
     |      :param msg: The QMP `Message` to execute.
     |      :raises: ValueError if the QMP `Message` does not have either the
     |               'execute' or 'exec-oob' fields set.
     |      :raises: ExecuteError if the server returns an error response.
     |      :raises: DisconnectedError if the connection was terminated early.
     |      
     |      :return: Execution response from the server. The type of object depends
     |               on the command that was issued, though most return a dict.
     |  
     |  on_event(self, func: Callable[[ForwardRef('QMP'), qmp.message.Message], Awaitable[NoneType]]) -> Callable[[ForwardRef('QMP'), qmp.message.Message], Awaitable[NoneType]]
     |      FIXME: Quick hack: decorator to register event handlers.
     |      
     |      Use it like this::
     |      
     |        @qmp.on_event
     |        async def my_event_handler(qmp, event: Message) -> None:
     |          print(f"Received event: {event['event']}")
     |      
     |      RFC: What kind of event handler would be the most useful in
     |      practical terms? In tests, we are usually waiting for an
     |      event with some criteria to occur; maybe it would be useful
     |      to allow "coroutine" style functions where we can block
     |      until a certain event shows up?
     |  
     |  ----------------------------------------------------------------------
     |  Class methods defined here:
     |  
     |  make_execute_msg(cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) -> qmp.message.Message from builtins.type
     |      Create an executable message to be sent by `execute_msg` later.
     |      
     |      :param cmd: QMP command name.
     |      :param arguments: Arguments (if any). Must be JSON-serializable.
     |      :param oob: If true, execute "out of band".
     |      
     |      :return: An executable QMP message.
     |  
     |  ----------------------------------------------------------------------
     |  Data and other attributes defined here:
     |  
     |  __orig_bases__ = (qmp.protocol.AsyncProtocol[qmp.message.Message],)
     |  
     |  __parameters__ = ()
     |  
     |  logger = <Logger qmp.qmp_protocol (WARNING)>
     |  
     |  ----------------------------------------------------------------------
     |  Methods inherited from qmp.protocol.AsyncProtocol:
     |  
     |  async accept(self, address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) -> None
     |      Accept a connection and begin processing message queues.
     |      
     |      :param address: Address to connect to;
     |                      UNIX socket path or TCP address/port.
     |      :param ssl:     SSL context to use, if any.
     |      
     |      :raise: `StateError`   (loop is running or disconnecting.)
     |      :raise: `ConnectError` (Connection was not successful.)
     |  
     |  async connect(self, address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) -> None
     |      Connect to the server and begin processing message queues.
     |      
     |      :param address: Address to connect to;
     |                      UNIX socket path or TCP address/port.
     |      :param ssl:     SSL context to use, if any.
     |      
     |      :raise: `StateError`   (loop is running or disconnecting.)
     |      :raise: `ConnectError` (Connection was not successful.)
     |  
     |  async disconnect(self) -> None
     |      Disconnect and wait for all tasks to fully stop.
     |      
     |      If there were exceptions that caused the bottom half to terminate
     |      prematurely, they will be raised here.
     |      
     |      :raise: `Exception`      Arbitrary exceptions re-raised on behalf of
     |                               the bottom half.
     |      :raise: `MultiException` Iterable Exception used to multiplex multiple
     |                               exceptions when multiple tasks failed.
     |  
     |  ----------------------------------------------------------------------
     |  Readonly properties inherited from qmp.protocol.AsyncProtocol:
     |  
     |  disconnecting
     |      Return True when the loop is disconnecting, or disconnected.
     |  
     |  running
     |      Return True when the loop is currently connected and running.
     |  
     |  unconnected
     |      Return True when the loop is fully idle and quiesced.
     |      
     |      Returns True specifically when the loop is neither `running`
     |      nor `disconnecting`. A call to `disconnect()` is required
     |      to transition from `disconnecting` to `unconnected`.
     |  
     |  ----------------------------------------------------------------------
     |  Data descriptors inherited from qmp.protocol.AsyncProtocol:
     |  
     |  __dict__
     |      dictionary for instance variables (if defined)
     |  
     |  __weakref__
     |      list of weak references to the object (if defined)
     |  
     |  ----------------------------------------------------------------------
     |  Class methods inherited from typing.Generic:
     |  
     |  __class_getitem__(params) from builtins.type
     |  
     |  __init_subclass__(*args, **kwargs) from builtins.type
     |      This method is called when a class is subclassed.
     |      
     |      The default implementation does nothing. It may be
     |      overridden to extend subclasses.

DATA
    Awaitable = typing.Awaitable
        A generic version of collections.abc.Awaitable.
    
    Callable = typing.Callable
        Callable type; Callable[[int], str] is a function of (int) -> str.
        
        The subscription syntax must always be used with exactly two
        values: the argument list and the return type.  The argument list
        must be a list of types or ellipsis; the return type must be a single type.
        
        There is no syntax to indicate optional or keyword arguments,
        such function types are rarely used as callback types.
    
    Dict = typing.Dict
        A generic version of dict.
    
    List = typing.List
        A generic version of list.
    
    Mapping = typing.Mapping
        A generic version of collections.abc.Mapping.
    
    Optional = typing.Optional
        Optional type.
        
        Optional[X] is equivalent to Union[X, None].
    
    Tuple = typing.Tuple
        Tuple type; Tuple[X, Y] is the cross-product type of X and Y.
        
        Example: Tuple[T1, T2] is a tuple of two elements corresponding
        to type variables T1 and T2.  Tuple[int, float, str] is a tuple
        of an int, a float and a string.
        
        To specify a variable-length tuple of homogeneous type, use Tuple[T, ...].

FILE
    /tmp/foo/qmp/qmp_protocol.py



[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH RFC 3/7] protocol: generic async message-based protocol loop
  2021-04-13 20:00   ` Stefan Hajnoczi
@ 2021-04-14 17:29     ` John Snow
  2021-04-15  9:14       ` Stefan Hajnoczi
  0 siblings, 1 reply; 21+ messages in thread
From: John Snow @ 2021-04-14 17:29 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: armbru, crosa, qemu-devel, ehabkost

On 4/13/21 4:00 PM, Stefan Hajnoczi wrote:
> On Tue, Apr 13, 2021 at 11:55:49AM -0400, John Snow wrote:
>> This module provides the protocol-agnostic framework upon which QMP will
>> be built. I also have (not included in this series) a qtest
>> implementation that uses this same framework, which is why it is split
>> into two portions like this.
>>
>> The design uses two independent tasks in the "bottol half", a writer and
>> a reader. These tasks run for the duration of the connection and
>> independently send and receive messages, respectively.
>>
>> A third task, disconnect, is scheduled whenever an error occurs and
>> facilitates coalescing of the other two tasks. MultiException is used in
>> this case if *both* tasks should have Exceptions that need to be
>> reported, though at the time of writing, I think this circumstance might
>> only be a theoretical concern.
>>
>> The generic model here does not provide execute(), but the model for QMP
>> is informative for how this class is laid out. Below, QMP's execute()
>> function deposits a message into the outbound queue. The writer task
>> wakes up to process the queue and deposits information in the write
>> buffer, where the message is finally dispatched. Meanwhile, the
>> execute() call is expected to block on an RPC mailbox waiting for a
>> reply from the server.
>>
>> On the return trip, the reader wakes up when data arrives in the
>> buffer. The message is deserialized and handed off to the protocol layer
>> to route accordingly. QMP will route this message into either the Event
>> queue or one of the pending RPC mailboxes.
>>
>> Upon this message being routed to the correct RPC mailbox, execute()
>> will be woken up and allowed to process the reply and deliver it back to
>> the caller.
>>
>> The reason for separating the inbound and outbound tasks to such an
>> extreme degree is to allow for designs and extensions where this
>> asynchronous loop may be launched in a separate thread. In this model,
>> it is possible to use a synchronous, thread-safe function to deposit new
>> messages into the outbound queue; this was seen as a viable way to offer
>> solid synchronous bindings while still allowing events to be processed
>> truly asynchronously.
>>
>> Separating it this way also allows us to fairly easily support
>> Out-of-band executions with little additional effort; essentially all
>> commands are treated as out-of-band.
>>
>> The execute graph:
>>
>>                         +---------+
>>                         | caller  |
>>                         +---------+
>>                              |
>>                              v
>>                         +---------+
>>       +---------------- |execute()| <----------+
>>       |                 +---------+            |
>>       |                                        |
>> -----------------------------------------------------------
>>       v                                        |
>> +----+----+    +-----------+           +------+-------+
>> |Mailboxes|    |Event Queue|           |Outbound Queue|
>> +----+----+    +------+----+           +------+-------+
>>       |                |                       ^
>>       v                v                       |
>>    +--+----------------+---+       +-----------+-----------+
>>    | Reader Task/Coroutine |       | Writer Task/Coroutine |
>>    +-----------+-----------+       +-----------+-----------+
>>                |                               ^
>>                v                               |
>>          +-----+------+                  +-----+------+
>>          |StreamReader|                  |StreamWriter|
>>          +------------+                  +------------+
> 
> The arrow directions confuse me. I don't understand what they convey.
> 

I meant to say "blocks on" or "awaits". The StreamWriter waits on the 
Writer task, the Writer task waits on the outbound queue. The outbound 
queue waits (ultimately) on execute() depositing something into the 
queue, and so on.

>>
>> Signed-off-by: John Snow <jsnow@redhat.com>
>> ---
>>   protocol.py | 704 ++++++++++++++++++++++++++++++++++++++++++++++++++++
> 
> Yikes, this is complex. I'm not sure the abstractions are worth the
> cost. Hopefully everything will be tied up with a simple high-level API
> later in the series.
> 

Ah, don't despair!

It's a lot of docstrings and a lot of tiny little methods and 
boilerplate. I thought it helped keep the resulting QMP-specific bits 
looking much simpler and easy to digest.

One of the reasons it's split out here like this is because I also wrote 
a qtest protocol that uses the same infrastructure. I tried to keep both 
of those looking as simple as possible.

I thought it was difficult to get the underlying machinery operating 
smoothly, and I didn't like the idea of repeating so much code to 
implement two things. So, this was my attempt to share common code as 
best as I could manage it.

>>   1 file changed, 704 insertions(+)
>>   create mode 100644 protocol.py
>>
>> diff --git a/protocol.py b/protocol.py
>> new file mode 100644
>> index 0000000..27d1558
>> --- /dev/null
>> +++ b/protocol.py
>> @@ -0,0 +1,704 @@
>> +"""
>> +Async message-based protocol support.
>> +
>> +This module provides a generic framework for sending and receiving
>> +messages over an asyncio stream.
>> +
>> +`AsyncProtocol` is an abstract class that implements the core mechanisms
>> +of a simple send/receive protocol, and is designed to be extended.
>> +
>> +`AsyncTasks` provides a container class that aggregates tasks that make
>> +up the loop used by `AsyncProtocol`.
>> +"""
>> +
>> +import asyncio
>> +from asyncio import StreamReader, StreamWriter
>> +import logging
>> +from ssl import SSLContext
>> +from typing import (
>> +    Any,
>> +    Awaitable,
>> +    Callable,
>> +    Coroutine,
>> +    Iterator,
>> +    List,
>> +    Generic,
>> +    Optional,
>> +    Tuple,
>> +    TypeVar,
>> +    Union,
>> +)
>> +
>> +from error import (
>> +    ConnectError,
>> +    MultiException,
>> +    StateError,
>> +)
>> +from util import create_task, pretty_traceback, wait_closed
>> +
>> +
>> +T = TypeVar('T')
>> +_TaskFN = Callable[[], Awaitable[None]]  # aka ``async def func() -> None``
>> +_FutureT = TypeVar('_FutureT', bound=Optional['asyncio.Future[Any]'])
>> +_GatherRet = List[Optional[BaseException]]
>> +
>> +
>> +class AsyncTasks:
>> +    """
>> +    AsyncTasks is a collection of bottom half tasks designed to run forever.
>> +
>> +    This is a convenience wrapper to make calls from `AsyncProtocol` simpler to
>> +    follow by behaving as a simple aggregate of two or more tasks, such that
>> +    a higher-level connection manager can simply refer to "the bottom half"
>> +    as one coherent entity instead of several.
>> +
>> +    The general flow is:
>> +
>> +    1. ``tasks = AsyncTasks(logger_for_my_client)``
>> +    2. ``tasks.start(my_reader, my_writer)``
>> +    3. ``...``
>> +    4. ``await tasks.cancel()``
>> +    5. ``tasks.result()``
>> +
>> +    :param logger: A logger to use for debugging messages. Useful to
>> +                   associate messages with a particular server context.
>> +    """
>> +
>> +    logger = logging.getLogger(__name__)
>> +
>> +    def __init__(self, logger: Optional[logging.Logger] = None):
>> +        if logger is not None:
>> +            self.logger = logger
>> +
>> +        # Named tasks
>> +        self.reader: Optional['asyncio.Future[None]'] = None
>> +        self.writer: Optional['asyncio.Future[None]'] = None
>> +
>> +        # Internal aggregate of all of the above tasks.
>> +        self._all: Optional['asyncio.Future[_GatherRet]'] = None
>> +
>> +    def _all_tasks(self) -> Iterator[Optional['asyncio.Future[None]']]:
>> +        """Yields all tasks, defined or not, in ideal cancellation order."""
>> +        yield self.writer
>> +        yield self.reader
>> +
>> +    def __iter__(self) -> Iterator['asyncio.Future[None]']:
>> +        """Yields all defined tasks, in ideal cancellation order."""
>> +        for task in self._all_tasks():
>> +            if task is not None:
>> +                yield task
>> +
>> +    @property
>> +    def _all_tasks_defined(self) -> bool:
>> +        """Returns True if all tasks are defined."""
>> +        return all(map(lambda task: task is not None, self._all_tasks()))
>> +
>> +    @property
>> +    def _some_tasks_done(self) -> bool:
>> +        """Returns True if any defined tasks are done executing."""
>> +        return any(map(lambda task: task.done(), iter(self)))
>> +
>> +    def __bool__(self) -> bool:
>> +        """Returns True when any tasks are defined at all."""
>> +        return bool(tuple(iter(self)))
>> +
>> +    @property
>> +    def running(self) -> bool:
>> +        """Returns True if all tasks are defined and still running."""
>> +        return self._all_tasks_defined and not self._some_tasks_done
>> +
>> +    def start(self,
>> +              reader_coro: Coroutine[Any, Any, None],
>> +              writer_coro: Coroutine[Any, Any, None]) -> None:
>> +        """
>> +        Starts executing tasks in the current async context.
>> +
>> +        :param reader_coro: Coroutine, message reader task.
>> +        :param writer_coro: Coroutine, message writer task.
>> +        """
>> +        self.reader = create_task(reader_coro)
>> +        self.writer = create_task(writer_coro)
>> +
>> +        # Uses extensible self-iterator.
>> +        self._all = asyncio.gather(*iter(self), return_exceptions=True)
>> +
>> +    async def cancel(self) -> None:
>> +        """
>> +        Cancels all tasks and awaits full cancellation.
>> +
>> +        Exceptions, if any, can be obtained by calling `result()`.
>> +        """
>> +        for task in self:
>> +            if task and not task.done():
>> +                self.logger.debug("cancelling task %s", str(task))
>> +                task.cancel()
>> +
>> +        if self._all:
>> +            self.logger.debug("Awaiting all tasks to finish ...")
>> +            await self._all
>> +
>> +    def _cleanup(self) -> None:
>> +        """
>> +        Erase all task handles; asserts that no tasks are running.
>> +        """
>> +        def _paranoid_task_erase(task: _FutureT) -> Optional[_FutureT]:
>> +            assert (task is None) or task.done()
>> +            return None if (task and task.done()) else task
>> +
>> +        self.reader = _paranoid_task_erase(self.reader)
>> +        self.writer = _paranoid_task_erase(self.writer)
>> +        self._all = _paranoid_task_erase(self._all)
>> +
>> +    def result(self) -> None:
>> +        """
>> +        Raises exception(s) from the finished tasks, if any.
>> +
>> +        Called to fully quiesce this task group. asyncio.CancelledError is
>> +        never raised; in the event of an intentional cancellation this
>> +        function will not raise any errors.
>> +
>> +        If an exception in one bottom half caused an unscheduled disconnect,
>> +        that exception will be raised.
>> +
>> +        :raise: `Exception`      Arbitrary exceptions re-raised on behalf of
>> +                                 the bottom half.
>> +        :raise: `MultiException` Iterable Exception used to multiplex multiple
>> +                                 exceptions when multiple threads failed.
>> +        """
>> +        exceptions: List[BaseException] = []
>> +        results = self._all.result() if self._all else ()
>> +        self._cleanup()
>> +
>> +        for result in results:
>> +            if result is None:
>> +                continue
>> +            if not isinstance(result, asyncio.CancelledError):
>> +                exceptions.append(result)
>> +
>> +        if len(exceptions) == 1:
>> +            raise exceptions.pop()
>> +        if len(exceptions) > 1:
>> +            raise MultiException(exceptions)
>> +
>> +
>> +class AsyncProtocol(Generic[T]):
>> +    """AsyncProtocol implements a generic async message-based protocol.
>> +
>> +    This protocol assumes the basic unit of information transfer between
>> +    client and server is a "message", the details of which are left up
>> +    to the implementation. It assumes the sending and receiving of these
>> +    messages is full-duplex and not necessarily correlated; i.e. it
>> +    supports asynchronous inbound messages.
>> +
>> +    It is designed to be extended by a specific protocol which provides
>> +    the implementations for how to read and send messages. These must be
>> +    defined in `_do_recv()` and `_do_send()`, respectively.
>> +
>> +    Other callbacks that have a default implemention, but may be
>> +    extended or overridden:
>> +     - _on_connect: Actions performed prior to loop start.
>> +     - _on_start:   Actions performed immediately after loop start.
>> +     - _on_message: Actions performed when a message is received.
>> +                    The default implementation does nothing at all.
>> +     - _cb_outbound: Log/Filter outgoing messages.
>> +     - _cb_inbound: Log/Filter incoming messages.

(FWIW: I intended these callback hooks to be used by iotests 
specifically, for logging and filtering test messages. It solves a 
problem I ran into when creating a general-purpose Job-running framework 
where a different "context" was "consuming" QMP messages, but they still 
needed to be logged and filtered in a centralized manner.)

> 
> This reminds me of asyncio.protocols and twisted.internet.protocol.
> 

I tried looking at asyncio.protocols, but it seemed like it was meant 
for something quite a bit lower level. Maybe I misunderstood those docs? 
(As in, it seemed like it was meant for a protocol in the sense of "TCP" 
or "UDP" and not an OSI layers 5-7 kind of thing.)

I think I'd like to avoid twisted if I can, but maybe that's misguided. 
I had an impression that it was primarily meant for Python2-based code, 
and I wanted to try a "native" implementation.

I realize that does mean I reinvent the wheel somewhat, which adds its 
own complexities in testing and maintainability. I must admit that the 
asyncio stuff is very new and has some fairly rough edges in places, 
especially in 3.6. I am trying to protect client code from it as much as 
I can.

(Admittedly, the discord.py infrastructure is a major inspiration to me 
here. It is very complex in its underbelly, but I have seen it be 
successfully used by very amateur coders to do some very interesting 
things, so I hold it in high regard.)

>> +
>> +    :param name: Name used for logging messages, if any.
>> +    """
>> +    #: Logger object for debugging messages
>> +    logger = logging.getLogger(__name__)
>> +
>> +    # -------------------------
>> +    # Section: Public interface
>> +    # -------------------------
>> +
>> +    def __init__(self, name: Optional[str] = None) -> None:
>> +        self.name = name
>> +        if self.name is not None:
>> +            self.logger = self.logger.getChild(self.name)
>> +
>> +        # stream I/O
>> +        self._reader: Optional[StreamReader] = None
>> +        self._writer: Optional[StreamWriter] = None
>> +
>> +        # I/O queues
>> +        self._outgoing: asyncio.Queue[T] = asyncio.Queue()
>> +
>> +        # I/O tasks (message reader, message writer)
>> +        self._tasks = AsyncTasks(self.logger)
>> +
>> +        # Disconnect task; separate from the core loop.
>> +        self._dc_task: Optional[asyncio.Future[None]] = None
>> +
>> +    @property
>> +    def running(self) -> bool:
>> +        """
>> +        Return True when the loop is currently connected and running.
>> +        """
>> +        if self.disconnecting:
>> +            return False
>> +        return self._tasks.running
>> +
>> +    @property
>> +    def disconnecting(self) -> bool:
>> +        """
>> +        Return True when the loop is disconnecting, or disconnected.
>> +        """
>> +        return bool(self._dc_task)
>> +
>> +    @property
>> +    def unconnected(self) -> bool:
>> +        """
>> +        Return True when the loop is fully idle and quiesced.
>> +
>> +        Returns True specifically when the loop is neither `running`
>> +        nor `disconnecting`. A call to `disconnect()` is required
>> +        to transition from `disconnecting` to `unconnected`.
>> +        """
>> +        return not (self.running or self.disconnecting)
>> +
>> +    async def accept(self, address: Union[str, Tuple[str, int]],
>> +                     ssl: Optional[SSLContext] = None) -> None:
>> +        """
>> +        Accept a connection and begin processing message queues.
>> +
>> +        :param address: Address to connect to;
>> +                        UNIX socket path or TCP address/port.
>> +        :param ssl:     SSL context to use, if any.
>> +
>> +        :raise: `StateError`   (loop is running or disconnecting.)
>> +        :raise: `ConnectError` (Connection was not successful.)
>> +        """
>> +        if self.disconnecting:
>> +            raise StateError("Client is disconnecting/disconnected."
>> +                             " Call disconnect() to fully disconnect.")
>> +        if self.running:
>> +            raise StateError("Client is already connected and running.")
>> +        assert self.unconnected
>> +
>> +        try:
>> +            await self._new_session(self._do_accept(address, ssl))
>> +        except Exception as err:
>> +            emsg = "Failed to accept incoming connection"
>> +            self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
>> +            raise ConnectError(f"{emsg}: {err!s}") from err
> 
> Wrapping the exception in ConnectError() obfuscates what's going on IMO.
> 

It can. I will admit to you that the reason I did it was so that at a 
higher level it was possible to write things like:

try:
     await qmp.connect(('127.0.0.1', 1234))
except ConnectError:
     print("Oh no! ...")
     handle_connect_problem()

while still allowing for other problems to "bubble up", for instance, 
keyboard interrupts are BaseException and won't be caught by that wrapper.

I adhere to this pattern fairly regularly in this draft; using 
"container" exceptions to declare a semantic problem regardless of the 
actual underlying cause, assuming that a high-level user will (probably) 
be unable to make sense of the internal details anyway.

Part of the reason I do this is so that I could write in the docstrings 
what exceptions we actually expect to be raised and under what 
circumstances.

I suppose what I don't document, though, is how to get at the "root" 
exception in these cases. You can do it:

try:
     ...
except Foo as err:
     root_err = err.__cause__
     ...


I suppose the specific problem I wanted to solve is this: When we fail 
to connect, what exception will we see? How many types of exceptions? 
Which ones should I try to catch, and which ones should I not? Which 
error classes merit a retry, and which do not?

It seemed almost impossible to enumerate, so writing informed client 
code seemed difficult or impossible.

Any thoughts on this perception?

>> +
>> +    async def connect(self, address: Union[str, Tuple[str, int]],
>> +                      ssl: Optional[SSLContext] = None) -> None:
>> +        """
>> +        Connect to the server and begin processing message queues.
>> +
>> +        :param address: Address to connect to;
>> +                        UNIX socket path or TCP address/port.
>> +        :param ssl:     SSL context to use, if any.
>> +
>> +        :raise: `StateError`   (loop is running or disconnecting.)
>> +        :raise: `ConnectError` (Connection was not successful.)
>> +        """
>> +        if self.disconnecting:
>> +            raise StateError("Client is disconnecting/disconnected."
>> +                             " Call disconnect() to fully disconnect.")
>> +        if self.running:
>> +            raise StateError("Client is already connected and running.")
>> +        assert self.unconnected
>> +
>> +        try:
>> +            await self._new_session(self._do_connect(address, ssl))
>> +        except Exception as err:
>> +            emsg = "Failed to connect to server"
>> +            self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
>> +            raise ConnectError(f"{emsg}: {err!s}") from err
>> +
>> +    async def disconnect(self) -> None:
>> +        """
>> +        Disconnect and wait for all tasks to fully stop.
>> +
>> +        If there were exceptions that caused the bottom half to terminate
>> +        prematurely, they will be raised here.
>> +
>> +        :raise: `Exception`      Arbitrary exceptions re-raised on behalf of
>> +                                 the bottom half.
>> +        :raise: `MultiException` Iterable Exception used to multiplex multiple
>> +                                 exceptions when multiple tasks failed.
>> +        """
>> +        self._schedule_disconnect()
>> +        await self._wait_disconnect()
>> +
>> +    # -----------------------------
>> +    # Section: Connection machinery
>> +    # -----------------------------
>> +
>> +    async def _register_streams(self,
>> +                                reader: asyncio.StreamReader,
>> +                                writer: asyncio.StreamWriter) -> None:
>> +        """Register the Reader/Writer streams."""
>> +        self._reader = reader
>> +        self._writer = writer
>> +
>> +    async def _new_session(self, coro: Awaitable[None]) -> None:
>> +        """
>> +        Create a new session.
>> +
>> +        This is called for both `accept()` and `connect()` pathways.
>> +
>> +        :param coro: An awaitable that will perform either connect or accept.
>> +        """
>> +        assert self._reader is None
>> +        assert self._writer is None
>> +
>> +        # NB: If a previous session had stale messages, they are dropped here.
>> +        self._outgoing = asyncio.Queue()
>> +
>> +        # Connect / Await Connection
>> +        await coro
>> +        assert self._reader is not None
>> +        assert self._writer is not None
>> +
>> +        await self._on_connect()
>> +
>> +        reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
>> +        writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
>> +        self._tasks.start(reader_coro, writer_coro)
>> +
>> +        await self._on_start()
>> +
>> +    async def _do_accept(self, address: Union[str, Tuple[str, int]],
>> +                         ssl: Optional[SSLContext] = None) -> None:
>> +        """
>> +        Acting as the protocol server, accept a single connection.
>> +
>> +        Used as the awaitable callback to `_new_session()`.
>> +        """
>> +        self.logger.debug("Awaiting connection ...")
>> +        connected = asyncio.Event()
>> +        server: Optional[asyncio.AbstractServer] = None
>> +
>> +        async def _client_connected_cb(reader: asyncio.StreamReader,
>> +                                       writer: asyncio.StreamWriter) -> None:
>> +            """Used to accept a single incoming connection, see below."""
>> +            nonlocal server
>> +            nonlocal connected
>> +
>> +            # A connection has been accepted; stop listening for new ones.
>> +            assert server is not None
>> +            server.close()
>> +            await server.wait_closed()
>> +            server = None
>> +
>> +            # Register this client as being connected
>> +            await self._register_streams(reader, writer)
>> +
>> +            # Signal back: We've accepted a client!
>> +            connected.set()
>> +
>> +        if isinstance(address, tuple):
>> +            coro = asyncio.start_server(
>> +                _client_connected_cb,
>> +                host=address[0],
>> +                port=address[1],
>> +                ssl=ssl,
>> +                backlog=1,
>> +            )
>> +        else:
>> +            coro = asyncio.start_unix_server(
>> +                _client_connected_cb,
>> +                path=address,
>> +                ssl=ssl,
>> +                backlog=1,
>> +            )
>> +
>> +        server = await coro     # Starts listening
>> +        await connected.wait()  # Waits for the callback to fire (and finish)
>> +        assert server is None
> 
> Async callbacks defeat the readability advantages of coroutines :(.
> 

Yes. That's the interface I was given, though, ... :(

> asyncio.start_server() is designed for spawning client connections in
> separate tasks but we just want to accept a client connection in the
> current coroutine. It might be possible to eliminate the callback
> business by not using the server and instead doing:
> 
>    conn, addr = await loop.sock_accept(listen_sock)
> 
> Whether that ends up being simpler, I'm not sure because you may need to
> unwrap/wrap the listen_sock and conn sockets into asyncio classes to
> interface with the rest of the code.
> 

Yeah. I am definitely using machinery here that was not designed to be 
used in this way. I fully admit that the accept() call here is a hack. 
As you say, the server is designed to accept clients and then the 
callback will be used to construct instances and register instances to 
some other machinery.

Instead, I have tried to flatten it in-line so that we can keep the 
upper-interface simple, advertising only accept() and connect().

I can look into finding a way to use lower-level primitives and 
stitching it back into the asyncio high-level APIs, but I think it's a 
fairly low priority for me.

I had earlier drafts that were far, far uglier :)

>> +
>> +        self.logger.debug("Connection accepted")
>> +
>> +    async def _do_connect(self, address: Union[str, Tuple[str, int]],
>> +                          ssl: Optional[SSLContext] = None) -> None:
>> +        self.logger.debug("Connecting ...")
>> +
>> +        if isinstance(address, tuple):
>> +            connect = asyncio.open_connection(address[0], address[1], ssl=ssl)
>> +        else:
>> +            connect = asyncio.open_unix_connection(path=address, ssl=ssl)
>> +        reader, writer = await(connect)
>> +        await self._register_streams(reader, writer)
>> +
>> +        self.logger.debug("Connected")
>> +
>> +    async def _on_connect(self) -> None:
>> +        """
>> +        Async callback invoked after connection, but prior to loop start.
>> +
>> +        This callback is invoked after the stream is opened, but prior to
>> +        starting the reader/writer tasks. Use this callback to handle
>> +        handshakes, greetings, &c to avoid having special edge cases in the
>> +        generic message handler.
>> +        """
>> +        # Nothing to do in the general case.
>> +
>> +    async def _on_start(self) -> None:
>> +        """
>> +        Async callback invoked after connection and loop start.
>> +
>> +        This callback is invoked after the stream is opened AND after
>> +        the reader/writer tasks have been started. Use this callback to
>> +        auto-perform certain tasks during the connect() call.
>> +        """
>> +        # Nothing to do in the general case.
>> +
>> +    def _schedule_disconnect(self) -> None:
>> +        """
>> +        Initiate a disconnect; idempotent.
>> +
>> +        This is called by the reader/writer tasks upon exceptions,
>> +        or directly by a user call to `disconnect()`.
>> +        """
>> +        if not self._dc_task:
>> +            self._dc_task = create_task(self._bh_disconnect())
>> +
>> +    async def _wait_disconnect(self) -> None:
>> +        """
>> +        _wait_disconnect waits for a scheduled disconnect to finish.
>> +
>> +        This function will gather any bottom half exceptions and re-raise them;
>> +        so it is intended to be used in the upper half call chain.
>> +
>> +        If a single exception is encountered, it will be re-raised faithfully.
>> +        If multiple are found, they will be multiplexed into a MultiException.
>> +
>> +        :raise: `Exception`      Many kinds; anything the bottom half raises.
>> +        :raise: `MultiException` When the Reader/Writer both have exceptions.
>> +        """
>> +        assert self._dc_task
>> +        await self._dc_task
>> +        self._dc_task = None
>> +
>> +        try:
>> +            self._tasks.result()
>> +        finally:
>> +            self._cleanup()
>> +
>> +    def _cleanup(self) -> None:
>> +        """
>> +        Fully reset this object to a clean state.
>> +        """
>> +        assert not self.running
>> +        assert self._dc_task is None
>> +        # _tasks.result() called in _wait_disconnect does _tasks cleanup, so:
>> +        assert not self._tasks
>> +
>> +        self._reader = None
>> +        self._writer = None
>> +
>> +    # ------------------------------
>> +    # Section: Bottom Half functions
>> +    # ------------------------------
>> +
>> +    async def _bh_disconnect(self) -> None:
>> +        """
>> +        Disconnect and cancel all outstanding tasks.
>> +
>> +        It is designed to be called from its task context, self._dc_task.
>> +        """
>> +        # RFC: Maybe I shot myself in the foot by trying too hard to
>> +        # group the tasks together as one unit. I suspect the ideal
>> +        # cancellation order here is actually: MessageWriter,
>> +        # StreamWriter, MessageReader
>> +
>> +        # What I have here instead is MessageWriter, MessageReader,
>> +        # StreamWriter
>> +
>> +        # Cancel the the message reader/writer.
>> +        await self._tasks.cancel()
>> +
>> +        # Handle the stream writer itself, now.
>> +        if self._writer:
>> +            if not self._writer.is_closing():
>> +                self.logger.debug("Writer is open; draining")
>> +                await self._writer.drain()
>> +                self.logger.debug("Closing writer")
>> +                self._writer.close()
>> +            self.logger.debug("Awaiting writer to fully close")
>> +            await wait_closed(self._writer)
>> +            self.logger.debug("Fully closed.")
>> +
>> +        # TODO: Add a hook for higher-level protocol cancellations here?
>> +        #       (Otherwise, the disconnected logging event happens too soon.)
>> +
>> +        self.logger.debug("Protocol Disconnected.")
>> +
>> +    async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
>> +        """
>> +        Run one of the bottom-half functions in a loop forever.
>> +
>> +        If the bottom half ever raises any exception, schedule a disconnect.
>> +        """
>> +        try:
>> +            while True:
>> +                await async_fn()
>> +        except asyncio.CancelledError as err:
>> +            # We are cancelled (by _bh_disconnect), so no need to call it.
>> +            self.logger.debug("Task.%s: cancelled: %s.",
>> +                              name, type(err).__name__)
>> +            raise
>> +        except:
>> +            self.logger.error("Task.%s: failure:\n%s\n", name,
>> +                              pretty_traceback())
>> +            self.logger.debug("Task.%s: scheduling disconnect.", name)
>> +            self._schedule_disconnect()
>> +            raise
>> +        finally:
>> +            self.logger.debug("Task.%s: exiting.", name)
>> +
>> +    async def _bh_send_message(self) -> None:
>> +        """
>> +        Wait for an outgoing message, then send it.
>> +        """
>> +        self.logger.log(5, "Waiting for message in outgoing queue to send ...")
>> +        msg = await self._outgoing.get()
>> +        try:
>> +            self.logger.log(5, "Got outgoing message, sending ...")
>> +            await self._send(msg)
>> +        finally:
>> +            self._outgoing.task_done()
>> +            self.logger.log(5, "Outgoing message sent.")
>> +
>> +    async def _bh_recv_message(self) -> None:
>> +        """
>> +        Wait for an incoming message and call `_on_message` to route it.
>> +
>> +        Exceptions seen may be from `_recv` or from `_on_message`.
>> +        """
>> +        self.logger.log(5, "Waiting to receive incoming message ...")
>> +        msg = await self._recv()
>> +        self.logger.log(5, "Routing message ...")
>> +        await self._on_message(msg)
>> +        self.logger.log(5, "Message routed.")
>> +
>> +    # ---------------------
>> +    # Section: Datagram I/O
>> +    # ---------------------
>> +
>> +    def _cb_outbound(self, msg: T) -> T:
>> +        """
>> +        Callback: outbound message hook.
>> +
>> +        This is intended for subclasses to be able to add arbitrary hooks to
>> +        filter or manipulate outgoing messages. The base implementation
>> +        does nothing but log the message without any manipulation of the
>> +        message. It is designed for you to invoke super() at the tail of
>> +        any overridden method.
>> +
>> +        :param msg: raw outbound message
>> +        :return: final outbound message
>> +        """
>> +        self.logger.debug("--> %s", str(msg))
>> +        return msg
>> +
>> +    def _cb_inbound(self, msg: T) -> T:
>> +        """
>> +        Callback: inbound message hook.
>> +
>> +        This is intended for subclasses to be able to add arbitrary hooks to
>> +        filter or manipulate incoming messages. The base implementation
>> +        does nothing but log the message without any manipulation of the
>> +        message. It is designed for you to invoke super() at the head of
>> +        any overridden method.
>> +
>> +        This method does not "handle" incoming messages; it is a filter.
>> +        The actual "endpoint" for incoming messages is `_on_message()`.
>> +
>> +        :param msg: raw inbound message
>> +        :return: processed inbound message
>> +        """
>> +        self.logger.debug("<-- %s", str(msg))
>> +        return msg
>> +
>> +    async def _readline(self) -> bytes:
>> +        """
>> +        Wait for a newline from the incoming reader.
>> +
>> +        This method is provided as a convenience for upper-layer
>> +        protocols, as many will be line-based.
>> +
>> +        This function *may* return a sequence of bytes without a
>> +        trailing newline if EOF occurs, but *some* bytes were
>> +        received. In this case, the next call will raise EOF.
>> +
>> +        :raise OSError: Stream-related errors.
>> +        :raise EOFError: If the reader stream is at EOF and there
>> +                         are no bytes to return.
>> +        """
>> +        assert self._reader is not None
>> +        msg_bytes = await self._reader.readline()
>> +        self.logger.log(5, "Read %d bytes", len(msg_bytes))
>> +
>> +        if not msg_bytes:
>> +            if self._reader.at_eof():
>> +                self.logger.debug("EOF")
>> +                raise EOFError()
>> +
>> +        return msg_bytes
>> +
>> +    async def _do_recv(self) -> T:
>> +        """
>> +        Abstract: Read from the stream and return a message.
>> +
>> +        Very low-level; intended to only be called by `_recv()`.
>> +        """
>> +        raise NotImplementedError
>> +
>> +    async def _recv(self) -> T:
>> +        """
>> +        Read an arbitrary protocol message. (WARNING: Extremely low-level.)
>> +
>> +        This function is intended primarily for _bh_recv_message to use
>> +        in an asynchronous task loop. Using it outside of this loop will
>> +        "steal" messages from the normal routing mechanism. It is safe to
>> +        use during `_on_connect()`, but should not be used otherwise.
>> +
>> +        This function uses `_do_recv()` to retrieve the raw message, and
>> +        then transforms it using `_cb_inbound()`.
>> +
>> +        Errors raised may be any of those from either method implementation.
>> +
>> +        :return: A single (filtered, processed) protocol message.
>> +        """
>> +        message = await self._do_recv()
>> +        return self._cb_inbound(message)
>> +
>> +    def _do_send(self, msg: T) -> None:
>> +        """
>> +        Abstract: Write a message to the stream.
>> +
>> +        Very low-level; intended to only be called by `_send()`.
>> +        """
>> +        raise NotImplementedError
>> +
>> +    async def _send(self, msg: T) -> None:
>> +        """
>> +        Send an arbitrary protocol message. (WARNING: Low-level.)
>> +
>> +        Like `_read()`, this function is intended to be called by the writer
>> +        task loop that processes outgoing messages. This function will
>> +        transform any outgoing messages according to `_cb_outbound()`.
>> +
>> +        :raise: OSError - Various stream errors.
>> +        """
>> +        assert self._writer is not None
>> +        msg = self._cb_outbound(msg)
>> +        self._do_send(msg)
>> +
>> +    async def _on_message(self, msg: T) -> None:
>> +        """
>> +        Called when a new message is received.
>> +
>> +        Executed from within the reader loop BH, so be advised that waiting
>> +        on other asynchronous tasks may be risky, depending. Additionally,
>> +        any errors raised here will directly cause the loop to halt; limit
>> +        error checking to what is strictly necessary for message routing.
>> +
>> +        :param msg: The incoming message, already logged/filtered.
>> +        """
>> +        # Nothing to do in the abstract case.
>> -- 
>> 2.30.2
>>



^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH RFC 4/7] message: add QMP Message type
  2021-04-13 20:07   ` Stefan Hajnoczi
@ 2021-04-14 17:39     ` John Snow
  0 siblings, 0 replies; 21+ messages in thread
From: John Snow @ 2021-04-14 17:39 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: armbru, crosa, qemu-devel, ehabkost

On 4/13/21 4:07 PM, Stefan Hajnoczi wrote:
> On Tue, Apr 13, 2021 at 11:55:50AM -0400, John Snow wrote:
>> This is an abstraction that represents a single message either sent to
>> or received from the server. It is used to subclass the
>> AsyncProtocol(Generic[T]) type.
>>
>> It was written such that it can be populated by either raw data or by a
>> dict, with the other form being generated on-demand, as-needed.
>>
>> It behaves almost exactly like a dict, but has some extra methods and a
>> special constructor. (It should quack fairly convincingly.)
>>
>> Signed-off-by: John Snow <jsnow@redhat.com>
>> ---
>>   message.py | 196 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   1 file changed, 196 insertions(+)
>>   create mode 100644 message.py
>>
>> diff --git a/message.py b/message.py
>> new file mode 100644
>> index 0000000..5c7e828
>> --- /dev/null
>> +++ b/message.py
>> @@ -0,0 +1,196 @@
>> +"""
>> +QMP Message format and errors.
>> +
>> +This module provides the `Message` class, which represents a single QMP
>> +message sent to or from the server. Several error-classes that depend on
>> +knowing the format of this message are also included here.
>> +"""
>> +
>> +import json
>> +from json import JSONDecodeError
>> +from typing import (
>> +    Dict,
>> +    ItemsView,
>> +    Iterable,
>> +    KeysView,
>> +    Optional,
>> +    Union,
>> +    ValuesView,
>> +)
>> +
>> +from error import (
>> +    DeserializationError,
>> +    ProtocolError,
>> +    UnexpectedTypeError,
>> +)
>> +
>> +
>> +class Message:
>> +    """
>> +    Represents a single QMP protocol message.
>> +
>> +    QMP uses JSON objects as its basic communicative unit; so this
>> +    object behaves like a MutableMapping. It may be instantiated from
>> +    either another mapping (like a dict), or from raw bytes that still
>> +    need to be deserialized.
>> +
>> +    :param value: Initial value, if any.
>> +    :param eager: When true, attempt to serialize (or deserialize) the
>> +                  initial value immediately, such that conversion exceptions
>> +                  are raised during the call to the initialization method.
>> +    """
> 
> Why define this class instead of using dicts? It's a very fancy way of
> calling json.dumps() and json.loads().
> 

Mostly just to associate the de/serialization methods of the 
unit-message with that data type, and it's nice for strict typing.

It does repeat a lot of boilerplate to just re-implement the 
dict-quacking; but I think I might actually be able to get around that 
by inheriting from MutableMapping to get all of that boilerplate "for free".

I'll see. I'll put it high on the list for the chopping block.

--js



^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH RFC 6/7] qmp_protocol: add QMP client implementation
  2021-04-14  5:44   ` Stefan Hajnoczi
@ 2021-04-14 17:50     ` John Snow
  2021-04-15  9:23       ` Stefan Hajnoczi
  0 siblings, 1 reply; 21+ messages in thread
From: John Snow @ 2021-04-14 17:50 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: armbru, crosa, qemu-devel, ehabkost

On 4/14/21 1:44 AM, Stefan Hajnoczi wrote:
> On Tue, Apr 13, 2021 at 11:55:52AM -0400, John Snow wrote:
>> +    async def _execute(self, msg: Message) -> object:
>> +        """
>> +        The same as `execute_msg()`, but without safety mechanisms.
>> +
>> +        Does not assign an execution ID and does not check that the form
>> +        of the message being sent is valid.
>> +
>> +        This method *Requires* an 'id' parameter to be set on the
>> +        message, it will not set one for you like `execute()` or
>> +        `execute_msg()`.
>> +
>> +        Do not use "__aqmp#00000" style IDs, use something else to avoid
>> +        potential clashes. If this ID clashes with an ID presently
>> +        in-use or otherwise clashes with the auto-generated IDs, the
>> +        response routing mechanisms in _on_message may very well fail
>> +        loudly enough to cause the entire loop to crash.
>> +
>> +        The ID should be a str; or at least something JSON
>> +        serializable. It *must* be hashable.
>> +        """
>> +        exec_id = cast(str, msg['id'])
>> +        self.logger.debug("Execute(%s): '%s'", exec_id,
>> +                          msg.get('execute', msg.get('exec-oob')))
>> +
>> +        queue: asyncio.Queue[Message] = asyncio.Queue(maxsize=1)
>> +        task = create_task(self._bh_execute(msg, queue))
> 
> We're already in a coroutine, can we await queue.get() ourselves instead
> of creating a new task?
> 
> I guess this is done in order to use Task.cancel() in _bh_disconnect()
> but it seems simpler to use queue both for success and cancellation.
> Fewer tasks are easier to reason about.
> 

...queues do not have a cancellation signal :( :( :( :(

There's no way to "cancel" a queue:
https://docs.python.org/3/library/asyncio-queue.html#queue

You *could* craft a special message and inject an exception into the 
queue to notify the reader that the message will never arrive, but it 
feels like working against the intended mechanism of that primitive. It 
really feels like it wants to be wrapped in a *task*.

An earlier draft used an approach where it crafted a special "mailbox" 
object, comprised of message, event, and error fields. The waiter sets 
up a mailbox and then blocks on the event. Upon being notified of an 
event, the caller checks to see if the message OR the error field was 
filled.

I wound up removing it, because I felt it added too much custom 
machinery/terminology and instead went with Tasks and a queue with a 
depth of one.

Both feel like they are working against the intended mechanisms to a 
degree. I am open to suggestions here!

(It's also worth noting that iotests will want the ability to separate 
the queueing of a message and the waiting for that message. The current 
design only allows for send-and-wait, and not separate send-then-wait 
semantics. Tasks do provide a rather convenient handle if I want to 
split that mechanism out.)

All of the above options are a little hacky to me. Any thoughts or 
preferences?

--js



^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH RFC 0/7] RFC: Asynchronous QMP Draft
  2021-04-14  6:38 ` [PATCH RFC 0/7] RFC: Asynchronous QMP Draft Stefan Hajnoczi
@ 2021-04-14 19:17   ` John Snow
  2021-04-15  9:52     ` Stefan Hajnoczi
  0 siblings, 1 reply; 21+ messages in thread
From: John Snow @ 2021-04-14 19:17 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: armbru, crosa, qemu-devel, ehabkost

First and foremost, thank you for reviewing this! It is very helpful to 
me to see what others think of this pet project I've been growing in the 
privacy of my own mind.

On 4/14/21 2:38 AM, Stefan Hajnoczi wrote:
> Below are the API docs that I found helpful for understanding the big
> picture.
> 
> The QMP.execute() API is nice.
> 

Yes. It mimics (sync) qmp.command(), which I believe Eduardo Habkost 
wrote. I think it's the correct idea for a generic (QAPI-schema 
ignorant) QMP client library meant to be "used".

I think raising RPC in-band execution errors as exceptions is a nice 
"pythonic" way to do it.

(And, if desired, it is possible to use the QAPI generator to generate 
wrappers around this interface using type-safe arguments in a low-level 
SDK layer. I think that would be pretty swell. We are not there yet, 
though, and I'll focus on this layer first.)

> Regarding QMP events, I can think of two approaches:
> 1. Callbacks
> 2. An async get_event(name=Optional[str]) -> object API
>     (plus get_event_nowait(name=Optional[str]) -> object)
> 
> (There's probably a third approach using async iterators but it's
> similar to get_event().)
> 
> Both approaches are useful. The first is good in larger asynchronous
> applications that perform many tasks concurrently. The second is good
> when there is just one specific thing to do, like waiting for a block
> job to complete.
> 
(1) On callbacks:

Callbacks are what I meagerly mocked up; discord.py has a "cute" little 
hack that works like this:

bot = commands.bot(...)

@bot.event
async def on_ready():
     print("Logged in as")
     print(bot.user.name)
     ...

(See 
https://github.com/Rapptz/discord.py/blob/master/examples/basic_bot.py )

I find this to be extremely cute: the framework uses the name of the 
callback to determine which event you are registering, and uses the 
decorator to merely register the callback.

This makes a nice, uncomplicated way to plug coroutines into the state 
machine of the client loop in the most basic cases.

I thought it might be nice to try and mimic that design, by perhaps 
using the names of QMP events as their own 'queues', and then 
dispatching user callbacks as desired. (Possibly with one mega-queue 
that exists for ALL callbacks.)

For instance, something like this:

@qmp.event
async def job_status_block_job_ready(qmp, event):
     ...

or more generally,

@qmp.event_handler
async def my_own_event_handler(qmp, event):
     ...

I didn't spend much time on the actual queue or dispatch mechanism in my 
draft, though, but it could be "bolstered" into a more full-fledged API 
if desired.

One nice thing about this design is that events aren't "consumed" by a 
caller, they are just dispatched to anyone waiting on an event of that type.

As I recall, events getting "eaten" at the wrong time was a major burden 
when writing iotests that exercised multiple jobs, transactions, etc.

(A side note: a higher-level VM class that uses QMP may wish to capture 
certain events to record state changes, such that the state can be 
queried at an arbitrary point by any number of callers without needing 
to have witnessed the state change event personally. That's not really 
important here in the protocol library, though, which will pretend not 
to know which events exist -- but it's a consideration for making sure 
the design that IS chosen can be extensible to support that kind of thing.)


(2) On get_event or async iterators:

This is likely a good ad-hoc feature. Should it only work for events 
that are delivered from that moment in time, or should there be a 
"backlog" of events to deliver?

Should waiting on events in this manner "consume" the event from the 
backlog, if we have one?

My concern::

   await qmp.execute('blockdev-backup', {...etc...})
   async for event in qmp.get_events():
       ...


It's possible that an event we'd like to see has already occurred by the 
time we get around to invoking the async iterator. You'd really want to 
start checking for events *before* you issue the job request, but that 
involves tasks, and the code doesn't "flow" well anymore.

I don't have ideas, at-present, for how to make things like iotests 
"flow" well in a linear co-routine sense...

...although, maybe it's worth creating something like an Event Listener 
object that, from its creation, stashes events from that point forward. 
How about::

   async with qmp.event_listener() as events:
       await qmp.execute('blockdev-backup', {...})
       async for event in events:
           ...

Actually, that seems pretty cool. What do you think? I think it's fairly 
elegant for ad-hoc use. We could even extend the constructor to accept 
filtering criteria if we wanted to, later.

Possibly we could also augment the Event Listener object to support a 
few methods to facilitate blocking until a certain event occurs, like::

   async with qmp.event_listener() as events:
       await qmp.execute('blockdev-backup', {...})
       await events.event('JOB_STATUS_CHANGE', status="pending")
       await qmp.execute('job-finalize', {...})
       ...


I think that's pretty workable, actually! And it could co-exist 
perfectly well alongside event callback handlers.


> My general impression is that the public API is nice and usable but the
> implementation is complex and risks discouraging other people from
> hacking on the code. There are too many abstractions and while it's
> highly structured, there is a cost to having all this infrastructure. I
> think simplifying it would make it easier for others to understand and
> contribute to the code.
> 

It's a fair point. I am hoping that the protocol.py layers won't need to 
be touched quite so much. (Famous last words) and that most of the 
interesting work can happen at the qmp_protocol.py level and above, though.

> Ideas: open code or inline simple things instead of defining
> abstractions that only have 1 user, drop the pydantic models, drop
> classes that just wrap things like Message and the exception hierarchy,
> combine protocol and qmp_protocol.
> 

(1) On models:

Pydantic models are definitely optional at this stage, but I am floating 
them here to prepare people for the idea that I might try to get more 
mileage out of them in the future to offer a type-safe, QAPI-aware SDK 
layer.

They're definitely only a mild benefit here, for now, as the strict 
typing they help provide is not floated upwards or exposed to the user.


(2) On the Message class:

I'll try a draft where I try to drop or simplify the Message class. It 
seems like a good candidate, but I think I'm subjectively afraid I won't 
like the inlining. We'll see!


(3) On the Exception hierarchy:

Let's talk about error handling design a little bit more. I want to make 
sure that the errors that can happen and in which circumstances are 
obvious and have good names, but there might be better approaches to 
managing that complexity.


(4) On combining protocol and qmp_protocol:

Maybe. Do you want to look at the qtest implementation? It's somewhat 
ancillary to this project, but felt it would make a nice companion 
library. It doesn't benefit as strongly as QMP (As it does not offer 
anything like OOB), but it does have async messages it can send, so it 
can re-use the same infrastructure.

(Fully admit that the first draft, of course, did feature a combined 
protocol/qmp_protocol class. It was split out later.)

> Things that might be worth adding:
> 1. File descriptor passing support.

Do you have an example workflow that I can use to test this? This is a 
weak spot in my knowledge.

> 2. Introspection support to easily check if a command/feature is
>     available. Users can do this manually by sending QMP commands and
>     interpreting the response, but this may be common enough to warrant a
>     friendly API.
> 

I think this treads into QAPI-specific domain knowledge, and I might 
leave such features to a higher-level object.

The QMP spec itself does not define a mechanism by which the QMP 
protocol itself will reveal the valid commands, and so it might be up to 
a machine.py-based extension/capsulation of qmp_protocol to provide that.

(Though, I do agree; I want this feature somewhere. We do have such a 
thing coded into the existing qmp-shell tool, using the query-commands 
command. Maybe I can offer a subclass that offers some of these 
convenience features using a best-effort guess-and-check style 
introspection. Please forgive me if I focus on shoring up the design of 
the core implementation first.)

> Help on module qmp.qmp_protocol in qmp:
> 
> NAME
>      qmp.qmp_protocol - QMP Client Implementation
> 
> DESCRIPTION
>      This module provides the QMP class, which can be used to connect and
>      send commands to a QMP server such as QEMU. The QMP class can be used to
>      either connect to a listening server, or used to listen and accept an
>      incoming connection from the server.
> 
> CLASSES
>      qmp.error.AQMPError(builtins.Exception)
>          ExecuteError
>      qmp.protocol.AsyncProtocol(typing.Generic)
>          QMP
>      
>      class ExecuteError(qmp.error.AQMPError)
>       |  ExecuteError(sent: qmp.message.Message, received: qmp.message.Message, error: qmp.models.ErrorInfo)
>       |
>       |  Execution statement returned failure.
>       |
>       |  Method resolution order:
>       |      ExecuteError
>       |      qmp.error.AQMPError
>       |      builtins.Exception
>       |      builtins.BaseException
>       |      builtins.object
>       |
>       |  Methods defined here:
>       |
>       |  __init__(self, sent: qmp.message.Message, received: qmp.message.Message, error: qmp.models.ErrorInfo)
>       |      Initialize self.  See help(type(self)) for accurate signature.
>       |
>       |  __str__(self) -> str
>       |      Return str(self).
>       |
>       |  ----------------------------------------------------------------------
>       |  Data descriptors inherited from qmp.error.AQMPError:
>       |
>       |  __weakref__
>       |      list of weak references to the object (if defined)
>       |
>       |  ----------------------------------------------------------------------
>       |  Static methods inherited from builtins.Exception:
>       |
>       |  __new__(*args, **kwargs) from builtins.type
>       |      Create and return a new object.  See help(type) for accurate signature.
>       |
>       |  ----------------------------------------------------------------------
>       |  Methods inherited from builtins.BaseException:
>       |
>       |  __delattr__(self, name, /)
>       |      Implement delattr(self, name).
>       |
>       |  __getattribute__(self, name, /)
>       |      Return getattr(self, name).
>       |
>       |  __reduce__(...)
>       |      Helper for pickle.
>       |
>       |  __repr__(self, /)
>       |      Return repr(self).
>       |
>       |  __setattr__(self, name, value, /)
>       |      Implement setattr(self, name, value).
>       |
>       |  __setstate__(...)
>       |
>       |  with_traceback(...)
>       |      Exception.with_traceback(tb) --
>       |      set self.__traceback__ to tb and return self.
>       |
>       |  ----------------------------------------------------------------------
>       |  Data descriptors inherited from builtins.BaseException:
>       |
>       |  __cause__
>       |      exception cause
>       |
>       |  __context__
>       |      exception context
>       |
>       |  __dict__
>       |
>       |  __suppress_context__
>       |
>       |  __traceback__
>       |
>       |  args
>      
>      class QMP(qmp.protocol.AsyncProtocol)
>       |  QMP(name: Optional[str] = None) -> None
>       |
>       |  Implements a QMP connection to/from the server.
>       |
>       |  Basic usage looks like this::
>       |
>       |    qmp = QMP('my_virtual_machine_name')
>       |    await qmp.connect(('127.0.0.1', 1234))
>       |    ...
>       |    res = await qmp.execute('block-query')
>       |    ...
>       |    await qmp.disconnect()
>       |

This reminds me.

I was briefly considering the idea that the QMP object could be 
converted into something like a QMP session factory instead, and you 
could use a context manager.

e.g.

qmp = QMP(session_factory_settings)
async with qmp.connect(...) as session:
     ...
     ...

with disconnect() being implicitly called upon the destruction of the 
session object when it goes out of scope.

I could also simply mock this up without creating a factory/session 
split, just by having the context manager simply return 'self':

proto = QMP(various_settings)
async with proto.connect(...) as qmp:
     ...
     ...

Though it's a little hacky, as with-expressions are expected to return 
*something*, and we actually already have a handle to that object.

>       |  :param name: Optional nickname for the connection, used for logging.
>       |
>       |  Method resolution order:
>       |      QMP
>       |      qmp.protocol.AsyncProtocol
>       |      typing.Generic
>       |      builtins.object
>       |
>       |  Methods defined here:
>       |
>       |  __init__(self, name: Optional[str] = None) -> None
>       |      Initialize self.  See help(type(self)) for accurate signature.
>       |
>       |  async execute(self, cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) -> object
>       |      Execute a QMP command and return the response.
>       |
>       |      :param cmd: QMP command name.
>       |      :param arguments: Arguments (if any). Must be JSON-serializable.
>       |      :param oob: If true, execute "out of band".
>       |
>       |      :raise: ExecuteError if the server returns an error response.
>       |      :raise: DisconnectedError if the connection was terminated early.
>       |
>       |      :return: Execution response from the server. The type of object depends
>       |               on the command that was issued, though most return a dict.
>       |
>       |  async execute_msg(self, msg: qmp.message.Message) -> object
>       |      Execute a QMP message and return the response.
>       |
>       |      :param msg: The QMP `Message` to execute.
>       |      :raises: ValueError if the QMP `Message` does not have either the
>       |               'execute' or 'exec-oob' fields set.
>       |      :raises: ExecuteError if the server returns an error response.
>       |      :raises: DisconnectedError if the connection was terminated early.
>       |
>       |      :return: Execution response from the server. The type of object depends
>       |               on the command that was issued, though most return a dict.
>       |
>       |  on_event(self, func: Callable[[ForwardRef('QMP'), qmp.message.Message], Awaitable[NoneType]]) -> Callable[[ForwardRef('QMP'), qmp.message.Message], Awaitable[NoneType]]
>       |      FIXME: Quick hack: decorator to register event handlers.
>       |
>       |      Use it like this::
>       |
>       |        @qmp.on_event
>       |        async def my_event_handler(qmp, event: Message) -> None:
>       |          print(f"Received event: {event['event']}")
>       |
>       |      RFC: What kind of event handler would be the most useful in
>       |      practical terms? In tests, we are usually waiting for an
>       |      event with some criteria to occur; maybe it would be useful
>       |      to allow "coroutine" style functions where we can block
>       |      until a certain event shows up?
>       |
>       |  ----------------------------------------------------------------------
>       |  Class methods defined here:
>       |
>       |  make_execute_msg(cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) -> qmp.message.Message from builtins.type
>       |      Create an executable message to be sent by `execute_msg` later.
>       |
>       |      :param cmd: QMP command name.
>       |      :param arguments: Arguments (if any). Must be JSON-serializable.
>       |      :param oob: If true, execute "out of band".
>       |
>       |      :return: An executable QMP message.
>       |
>       |  ----------------------------------------------------------------------
>       |  Data and other attributes defined here:
>       |
>       |  __orig_bases__ = (qmp.protocol.AsyncProtocol[qmp.message.Message],)
>       |
>       |  __parameters__ = ()
>       |
>       |  logger = <Logger qmp.qmp_protocol (WARNING)>
>       |
>       |  ----------------------------------------------------------------------
>       |  Methods inherited from qmp.protocol.AsyncProtocol:
>       |
>       |  async accept(self, address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) -> None
>       |      Accept a connection and begin processing message queues.
>       |
>       |      :param address: Address to connect to;
>       |                      UNIX socket path or TCP address/port.
>       |      :param ssl:     SSL context to use, if any.
>       |
>       |      :raise: `StateError`   (loop is running or disconnecting.)
>       |      :raise: `ConnectError` (Connection was not successful.)
>       |
>       |  async connect(self, address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) -> None
>       |      Connect to the server and begin processing message queues.
>       |
>       |      :param address: Address to connect to;
>       |                      UNIX socket path or TCP address/port.
>       |      :param ssl:     SSL context to use, if any.
>       |
>       |      :raise: `StateError`   (loop is running or disconnecting.)
>       |      :raise: `ConnectError` (Connection was not successful.)
>       |
>       |  async disconnect(self) -> None
>       |      Disconnect and wait for all tasks to fully stop.
>       |
>       |      If there were exceptions that caused the bottom half to terminate
>       |      prematurely, they will be raised here.
>       |
>       |      :raise: `Exception`      Arbitrary exceptions re-raised on behalf of
>       |                               the bottom half.
>       |      :raise: `MultiException` Iterable Exception used to multiplex multiple
>       |                               exceptions when multiple tasks failed.
>       |
>       |  ----------------------------------------------------------------------
>       |  Readonly properties inherited from qmp.protocol.AsyncProtocol:
>       |
>       |  disconnecting
>       |      Return True when the loop is disconnecting, or disconnected.
>       |
>       |  running
>       |      Return True when the loop is currently connected and running.
>       |
>       |  unconnected
>       |      Return True when the loop is fully idle and quiesced.
>       |
>       |      Returns True specifically when the loop is neither `running`
>       |      nor `disconnecting`. A call to `disconnect()` is required
>       |      to transition from `disconnecting` to `unconnected`.

Any thoughts on these? I think I accidentally created some landmines 
here, actually.

disconnecting: This one is, I think, consistent with other primitives in 
Python like "is_closing", where it also does include the "fully closed" 
state. It hopefully adheres to the principle of least surprise.


running: This one is maybe bad, though. I mean this to be "fully 
running", i.e. the loop is fully open and nothing is wrong. I believe it 
is not true until sometime "in the middle" of the accept() or connect() 
calls; i.e. after the Reader/Writer tasks are started. I worry that this 
is a confusing point *within* this library.

Maybe it ought to report "true" as soon as we start trying to build a 
session; and I may need other internal helpers for testing more specific 
conditions inside the loop.


unconnected: I think this name is just genuinely bad, but I needed some 
kind of state to represent "fully quisced, not connected, and idle." 
i.e. that we were prepared and able to issue another accept() or 
connect() call.

"quiesced"? (Kind of jargon-y.)

"disconnected"? (Viable, but introduces ambiguity between 
fully-disconnected but waiting for the user to collect loop 
status/exceptions.)

"idle"? (Might be confused with a loop that's open.)


... Maybe it doesn't need to be externally exposed at all, anyway. The 
primary purpose for it is to safe-guard calls to connect() and accept() 
such that if the loop is engaged or not-yet-cleaned up, that these calls 
will bark an error back at the user.

Perhaps an internal property would suffice in that case.

_has_session True/False might suffice for this concept.

Thoughts?

>       |
>       |  ----------------------------------------------------------------------
>       |  Data descriptors inherited from qmp.protocol.AsyncProtocol:
>       |
>       |  __dict__
>       |      dictionary for instance variables (if defined)
>       |
>       |  __weakref__
>       |      list of weak references to the object (if defined)
>       |
>       |  ----------------------------------------------------------------------
>       |  Class methods inherited from typing.Generic:
>       |
>       |  __class_getitem__(params) from builtins.type
>       |
>       |  __init_subclass__(*args, **kwargs) from builtins.type
>       |      This method is called when a class is subclassed.
>       |
>       |      The default implementation does nothing. It may be
>       |      overridden to extend subclasses.
> 

 From here down, ...

> DATA
>      Awaitable = typing.Awaitable
>          A generic version of collections.abc.Awaitable.
>      
>      Callable = typing.Callable
>          Callable type; Callable[[int], str] is a function of (int) -> str.
>          
>          The subscription syntax must always be used with exactly two
>          values: the argument list and the return type.  The argument list
>          must be a list of types or ellipsis; the return type must be a single type.
>          
>          There is no syntax to indicate optional or keyword arguments,
>          such function types are rarely used as callback types.
>      
>      Dict = typing.Dict
>          A generic version of dict.
>      
>      List = typing.List
>          A generic version of list.
>      
>      Mapping = typing.Mapping
>          A generic version of collections.abc.Mapping.
>      
>      Optional = typing.Optional
>          Optional type.
>          
>          Optional[X] is equivalent to Union[X, None].
>      
>      Tuple = typing.Tuple
>          Tuple type; Tuple[X, Y] is the cross-product type of X and Y.
>          
>          Example: Tuple[T1, T2] is a tuple of two elements corresponding
>          to type variables T1 and T2.  Tuple[int, float, str] is a tuple
>          of an int, a float and a string.
>          
>          To specify a variable-length tuple of homogeneous type, use Tuple[T, ...].
> 

... Oops, I need to "hide" some more of those things from help, I think 
by specifying __all__ in that module I can restrict some of these things 
that aren't interesting to see in the help menu.

> FILE
>      /tmp/foo/qmp/qmp_protocol.py
> 
> 

In general, do you feel this design is roughly serviceable and worth 
pursuing cleanups for? I realize it's a bit "much" but as the audience 
extends beyond our castle walls, I wanted to be quite thorough. It's a 
design that's likely overkill for iotests, but hopefully just about 
correct for external users to prototype toy management scripts with.

At some point, I might try to get it checked in to the QEMU tree as an 
"alpha" library so that iterations on the design can be debated on their 
own merit instead of trying to update a giant new-code-blob. I am not 
sure if it's ready for that just yet, but I think it's close to that 
point where it needs to not live primarily in a separate repo anymore.

Thanks again,
-- John



^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH RFC 3/7] protocol: generic async message-based protocol loop
  2021-04-14 17:29     ` John Snow
@ 2021-04-15  9:14       ` Stefan Hajnoczi
  0 siblings, 0 replies; 21+ messages in thread
From: Stefan Hajnoczi @ 2021-04-15  9:14 UTC (permalink / raw)
  To: John Snow; +Cc: armbru, crosa, qemu-devel, ehabkost

[-- Attachment #1: Type: text/plain, Size: 3505 bytes --]

On Wed, Apr 14, 2021 at 01:29:40PM -0400, John Snow wrote:
> On 4/13/21 4:00 PM, Stefan Hajnoczi wrote:
> > On Tue, Apr 13, 2021 at 11:55:49AM -0400, John Snow wrote:
> One of the reasons it's split out here like this is because I also wrote a
> qtest protocol that uses the same infrastructure. I tried to keep both of
> those looking as simple as possible.

If infrastructure is needed, let's add it when it's needed but not
before then. Reviewers can't take into account qtest requirements
without seeing that code.

> > > +        try:
> > > +            await self._new_session(self._do_accept(address, ssl))
> > > +        except Exception as err:
> > > +            emsg = "Failed to accept incoming connection"
> > > +            self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
> > > +            raise ConnectError(f"{emsg}: {err!s}") from err
> > 
> > Wrapping the exception in ConnectError() obfuscates what's going on IMO.
> > 
> 
> It can. I will admit to you that the reason I did it was so that at a higher
> level it was possible to write things like:
> 
> try:
>     await qmp.connect(('127.0.0.1', 1234))
> except ConnectError:
>     print("Oh no! ...")
>     handle_connect_problem()
> 
> while still allowing for other problems to "bubble up", for instance,
> keyboard interrupts are BaseException and won't be caught by that wrapper.

That's what "except Exception as err" is for. It allows system exiting
exceptions through.

> I adhere to this pattern fairly regularly in this draft; using "container"
> exceptions to declare a semantic problem regardless of the actual underlying
> cause, assuming that a high-level user will (probably) be unable to make
> sense of the internal details anyway.
> 
> Part of the reason I do this is so that I could write in the docstrings what
> exceptions we actually expect to be raised and under what circumstances.
> 
> I suppose what I don't document, though, is how to get at the "root"
> exception in these cases. You can do it:
> 
> try:
>     ...
> except Foo as err:
>     root_err = err.__cause__
>     ...
> 
> 
> I suppose the specific problem I wanted to solve is this: When we fail to
> connect, what exception will we see? How many types of exceptions? Which
> ones should I try to catch, and which ones should I not? Which error classes
> merit a retry, and which do not?
> 
> It seemed almost impossible to enumerate, so writing informed client code
> seemed difficult or impossible.
> 
> Any thoughts on this perception?

I reviewed error.py again. I was assuming the other exceptions are like
ConnectError, which would have been bad but they seem to be more
actionable. I think ConnectError is okay as a catch-all here.

requests is similar, its exceptions consist mostly of actionable
exceptions but it does have catch-all exceptions for general networking
errors:
https://docs.python-requests.org/en/latest/api/#exceptions

MultiException and the 4-level exception inheritance hierarchy in
error.py still seem complex. Is it really necessary to have
ProtocolError, RawProtocolError, and MsgProtocolError abstract classes?
Having them forces the user to make extra decisions about how to catch
exceptions. If error handling is involved, more mistakes will be made.

The requests package doesn't document the exception hierarchy. They keep
it simple with a flat list of exceptions. (RequestException is the base
class though.)

Stefan

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH RFC 6/7] qmp_protocol: add QMP client implementation
  2021-04-14 17:50     ` John Snow
@ 2021-04-15  9:23       ` Stefan Hajnoczi
  0 siblings, 0 replies; 21+ messages in thread
From: Stefan Hajnoczi @ 2021-04-15  9:23 UTC (permalink / raw)
  To: John Snow; +Cc: armbru, crosa, qemu-devel, ehabkost

[-- Attachment #1: Type: text/plain, Size: 2469 bytes --]

On Wed, Apr 14, 2021 at 01:50:37PM -0400, John Snow wrote:
> On 4/14/21 1:44 AM, Stefan Hajnoczi wrote:
> > On Tue, Apr 13, 2021 at 11:55:52AM -0400, John Snow wrote:
> > > +    async def _execute(self, msg: Message) -> object:
> > > +        """
> > > +        The same as `execute_msg()`, but without safety mechanisms.
> > > +
> > > +        Does not assign an execution ID and does not check that the form
> > > +        of the message being sent is valid.
> > > +
> > > +        This method *Requires* an 'id' parameter to be set on the
> > > +        message, it will not set one for you like `execute()` or
> > > +        `execute_msg()`.
> > > +
> > > +        Do not use "__aqmp#00000" style IDs, use something else to avoid
> > > +        potential clashes. If this ID clashes with an ID presently
> > > +        in-use or otherwise clashes with the auto-generated IDs, the
> > > +        response routing mechanisms in _on_message may very well fail
> > > +        loudly enough to cause the entire loop to crash.
> > > +
> > > +        The ID should be a str; or at least something JSON
> > > +        serializable. It *must* be hashable.
> > > +        """
> > > +        exec_id = cast(str, msg['id'])
> > > +        self.logger.debug("Execute(%s): '%s'", exec_id,
> > > +                          msg.get('execute', msg.get('exec-oob')))
> > > +
> > > +        queue: asyncio.Queue[Message] = asyncio.Queue(maxsize=1)
> > > +        task = create_task(self._bh_execute(msg, queue))
> > 
> > We're already in a coroutine, can we await queue.get() ourselves instead
> > of creating a new task?
> > 
> > I guess this is done in order to use Task.cancel() in _bh_disconnect()
> > but it seems simpler to use queue both for success and cancellation.
> > Fewer tasks are easier to reason about.
> > 
> 
> ...queues do not have a cancellation signal :( :( :( :(
> 
> There's no way to "cancel" a queue:
> https://docs.python.org/3/library/asyncio-queue.html#queue
> 
> You *could* craft a special message and inject an exception into the queue
> to notify the reader that the message will never arrive, but it feels like
> working against the intended mechanism of that primitive. It really feels
> like it wants to be wrapped in a *task*.

That's what I meant by "it seems simpler to use the queue both for
success and cancellation". Just queue a message that says the execution
has been cancelled.

Stefan

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH RFC 0/7] RFC: Asynchronous QMP Draft
  2021-04-14 19:17   ` John Snow
@ 2021-04-15  9:52     ` Stefan Hajnoczi
  2021-04-20  2:26       ` John Snow
  0 siblings, 1 reply; 21+ messages in thread
From: Stefan Hajnoczi @ 2021-04-15  9:52 UTC (permalink / raw)
  To: John Snow; +Cc: armbru, crosa, qemu-devel, ehabkost

[-- Attachment #1: Type: text/plain, Size: 9770 bytes --]

On Wed, Apr 14, 2021 at 03:17:48PM -0400, John Snow wrote:
> First and foremost, thank you for reviewing this! It is very helpful to me
> to see what others think of this pet project I've been growing in the
> privacy of my own mind.
> 
> On 4/14/21 2:38 AM, Stefan Hajnoczi wrote:
> > Below are the API docs that I found helpful for understanding the big
> > picture.
> > 
> > The QMP.execute() API is nice.
> > 
> 
> Yes. It mimics (sync) qmp.command(), which I believe Eduardo Habkost wrote.
> I think it's the correct idea for a generic (QAPI-schema ignorant) QMP
> client library meant to be "used".
> 
> I think raising RPC in-band execution errors as exceptions is a nice
> "pythonic" way to do it.
> 
> (And, if desired, it is possible to use the QAPI generator to generate
> wrappers around this interface using type-safe arguments in a low-level SDK
> layer. I think that would be pretty swell. We are not there yet, though, and
> I'll focus on this layer first.)
> 
> > Regarding QMP events, I can think of two approaches:
> > 1. Callbacks
> > 2. An async get_event(name=Optional[str]) -> object API
> >     (plus get_event_nowait(name=Optional[str]) -> object)
> > 
> > (There's probably a third approach using async iterators but it's
> > similar to get_event().)
> > 
> > Both approaches are useful. The first is good in larger asynchronous
> > applications that perform many tasks concurrently. The second is good
> > when there is just one specific thing to do, like waiting for a block
> > job to complete.
> > 
> (1) On callbacks:
> 
> Callbacks are what I meagerly mocked up; discord.py has a "cute" little hack
> that works like this:
> 
> bot = commands.bot(...)
> 
> @bot.event
> async def on_ready():
>     print("Logged in as")
>     print(bot.user.name)
>     ...
> 
> (See https://github.com/Rapptz/discord.py/blob/master/examples/basic_bot.py
> )
> 
> I find this to be extremely cute: the framework uses the name of the
> callback to determine which event you are registering, and uses the
> decorator to merely register the callback.
> 
> This makes a nice, uncomplicated way to plug coroutines into the state
> machine of the client loop in the most basic cases.
> 
> I thought it might be nice to try and mimic that design, by perhaps using
> the names of QMP events as their own 'queues', and then dispatching user
> callbacks as desired. (Possibly with one mega-queue that exists for ALL
> callbacks.)
> 
> For instance, something like this:
> 
> @qmp.event
> async def job_status_block_job_ready(qmp, event):
>     ...
> 
> or more generally,
> 
> @qmp.event_handler
> async def my_own_event_handler(qmp, event):
>     ...
> 
> I didn't spend much time on the actual queue or dispatch mechanism in my
> draft, though, but it could be "bolstered" into a more full-fledged API if
> desired.
> 
> One nice thing about this design is that events aren't "consumed" by a
> caller, they are just dispatched to anyone waiting on an event of that type.
> 
> As I recall, events getting "eaten" at the wrong time was a major burden
> when writing iotests that exercised multiple jobs, transactions, etc.
> 
> (A side note: a higher-level VM class that uses QMP may wish to capture
> certain events to record state changes, such that the state can be queried
> at an arbitrary point by any number of callers without needing to have
> witnessed the state change event personally. That's not really important
> here in the protocol library, though, which will pretend not to know which
> events exist -- but it's a consideration for making sure the design that IS
> chosen can be extensible to support that kind of thing.)
> 
> 
> (2) On get_event or async iterators:
> 
> This is likely a good ad-hoc feature. Should it only work for events that
> are delivered from that moment in time, or should there be a "backlog" of
> events to deliver?
> 
> Should waiting on events in this manner "consume" the event from the
> backlog, if we have one?
> 
> My concern::
> 
>   await qmp.execute('blockdev-backup', {...etc...})
>   async for event in qmp.get_events():
>       ...
> 
> 
> It's possible that an event we'd like to see has already occurred by the
> time we get around to invoking the async iterator. You'd really want to
> start checking for events *before* you issue the job request, but that
> involves tasks, and the code doesn't "flow" well anymore.
> 
> I don't have ideas, at-present, for how to make things like iotests "flow"
> well in a linear co-routine sense...
> 
> ...although, maybe it's worth creating something like an Event Listener
> object that, from its creation, stashes events from that point forward. How
> about::
> 
>   async with qmp.event_listener() as events:
>       await qmp.execute('blockdev-backup', {...})
>       async for event in events:
>           ...
> 
> Actually, that seems pretty cool. What do you think? I think it's fairly
> elegant for ad-hoc use. We could even extend the constructor to accept
> filtering criteria if we wanted to, later.

Yeah, it seems very nice for allowing multiple event listeners that
don't steal each other's events. I like it.

qmp.event_listener() could take a sequence of QMP event names to trigger
on. If the sequence is empty then all QMP events will be reported.

> 
> Possibly we could also augment the Event Listener object to support a few
> methods to facilitate blocking until a certain event occurs, like::
> 
>   async with qmp.event_listener() as events:
>       await qmp.execute('blockdev-backup', {...})
>       await events.event('JOB_STATUS_CHANGE', status="pending")
>       await qmp.execute('job-finalize', {...})
>       ...
> 
> 
> I think that's pretty workable, actually! And it could co-exist perfectly
> well alongside event callback handlers.

Callbacks and async iterators are equivalent since a callback is
basically a Task with an event_listener() loop that invokes the callback
function. If the boilerplate for setting that up is minimal then there
might be no need to provide both interfaces.

> Pydantic models are definitely optional at this stage, but I am floating
> them here to prepare people for the idea that I might try to get more
> mileage out of them in the future to offer a type-safe, QAPI-aware SDK
> layer.
> 
> They're definitely only a mild benefit here, for now, as the strict typing
> they help provide is not floated upwards or exposed to the user.

Yes, I can see the benefits for programs that need a lot of data
validation and have complex schemas.

Since this library is oblivious to the QMP schema it's probably not
needed.

An example of why I suggested dropping pydantic: I was trying to figure
out what happens if QMP is extended with new response fields. Will
pydantic raise an exception when it encounters an unexpected field? It's
not obvious from the code so I needed to go study pydantic to find the
answer.

> (4) On combining protocol and qmp_protocol:
> 
> Maybe. Do you want to look at the qtest implementation? It's somewhat
> ancillary to this project, but felt it would make a nice companion library.
> It doesn't benefit as strongly as QMP (As it does not offer anything like
> OOB), but it does have async messages it can send, so it can re-use the same
> infrastructure.
> 
> (Fully admit that the first draft, of course, did feature a combined
> protocol/qmp_protocol class. It was split out later.)

Sure, it would be interesting to see the qtest code.

> > Things that might be worth adding:
> > 1. File descriptor passing support.
> 
> Do you have an example workflow that I can use to test this? This is a weak
> spot in my knowledge.

The add-fd QMP command. I guess this patch series cannot execute that
command successfully since it doesn't support fd passing.

It should be easy to do:

  qmp.execute('add-fd', pass_fds=[fobj])

where pass_fds is an optional sequence of file descriptors. The file
descriptors can either be int or file-like objects that support the
fileno() method.

I'm not sure if QMP commands also send file descriptors back to the
client in responses.

> > 2. Introspection support to easily check if a command/feature is
> >     available. Users can do this manually by sending QMP commands and
> >     interpreting the response, but this may be common enough to warrant a
> >     friendly API.
> > 
> 
> I think this treads into QAPI-specific domain knowledge, and I might leave
> such features to a higher-level object.
> 
> The QMP spec itself does not define a mechanism by which the QMP protocol
> itself will reveal the valid commands, and so it might be up to a
> machine.py-based extension/capsulation of qmp_protocol to provide that.
> 
> (Though, I do agree; I want this feature somewhere. We do have such a thing
> coded into the existing qmp-shell tool, using the query-commands command.
> Maybe I can offer a subclass that offers some of these convenience features
> using a best-effort guess-and-check style introspection. Please forgive me
> if I focus on shoring up the design of the core implementation first.)

Okay.

> In general, do you feel this design is roughly serviceable and worth
> pursuing cleanups for? I realize it's a bit "much" but as the audience
> extends beyond our castle walls, I wanted to be quite thorough. It's a

I see the complexity mostly as accidental complexity, not essential
complexity. IMO it's not that the current approach is overkill now but
could be necessary later. I think it will always be unnecessarily
complex because there are simpler ways to do it :D.

Stefan

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH RFC 0/7] RFC: Asynchronous QMP Draft
  2021-04-15  9:52     ` Stefan Hajnoczi
@ 2021-04-20  2:26       ` John Snow
  2021-04-20  2:47         ` John Snow
  0 siblings, 1 reply; 21+ messages in thread
From: John Snow @ 2021-04-20  2:26 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: armbru, crosa, qemu-devel, ehabkost

On 4/15/21 5:52 AM, Stefan Hajnoczi wrote:
> Yeah, it seems very nice for allowing multiple event listeners that
> don't steal each other's events. I like it.
> 
> qmp.event_listener() could take a sequence of QMP event names to trigger
> on. If the sequence is empty then all QMP events will be reported.

I made something like this:


# Example 1
with qmp.listener('STOP') as listener:
     await qmp.execute('stop')
     await listener.get()


# Example 2
with qmp.listener('JOB_STATUS_CHANGE') as listener:
     await qmp.execute('blockdev-create', ...)
     async for event in listener:
         if event['data']['status'] == 'concluded':
             break
     await qmp.execute('job-dismiss', ...)


# Example 3 - all events
with qmp.listener() as events:
     async for event in events:
         print(f"got '{event['event']}' event!")


# Example 4 - several events on one listener
job_events = (
     'JOB_STATUS_CHANGE', 'BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
     'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', 'BLOCK_JOB_PENDING'
)
with qmp.listener(job_events) as events:
     ...


There is a *post-filtering* syntax available to EventListener.get(). It 
will filter events out using a very simplistic syntax.


# Example 5 -- a short-hand form of Example 2.
with qmp.listener('JOB_STATUS_CHANGE') as job_events:
     await qmp.execute('blockdev-create', ...)
     await job_events.get(status='concluded')
     await qmp.execute('job-dismiss', ...)



A shortcoming with this interface is that it's easy to create a listener 
that hears multiple events, but it's not easy to create *several 
listeners*. I am not sure what syntax will be the nicest for this, but I 
tried by allowing the manual creation of listeners:


# Example 6
listener1 = EventListener('JOB_STATUS_CHANGE')
listener2 = EventListener(job_events)

# Note the use of listen() instead of listener()
with qmp.listen(listener1, listener2) as (ev1, ev2):
     # listeners are now active.
     ...
# listeners are now inactive.
# The context manager clears any stale events in the listener(s).


I thought this might be nicer than trying to extend the listener syntax:

with qmp.listeners(
     'JOB_STATUS_CHANGE',
     (job_events)
) as (
     listener1,
     listener2,
):
     ...

especially because it might get confusing when trying to separate "one 
listener with multiple events" vs "several listeners with one event 
each, and it makes things a little ambiguous:

with qmp.listeners('STOP') as (stop_events,):
     ...

And this isn't any prettier, and also likely to confuse:

with qmp.listeners('STOP', 'RESUME') as (stops, resumes):
     ...

because it's only so very subtly different from this:

with qmp.listeners(('STOP', 'RESUME')) as (runstate_events,):
     ...

This also doesn't begin to address one of the worst headaches of writing 
iotests where transactions are involved: accidentally eating events 
meant for other jobs.

I prototyped something where it's possible to create an EventListener 
with an optional pre-filter, but it's a little bulky:


# Example 7
listener = EventListener('JOB_STATUS_CHANGE',
                          lambda e: e['data']['id'] == 'job0')

with qmp.listen(listener):
     await qmp.execute('blockdev-create', arguments={'job-id': 'job0'})
     await listener.get(status='created')
     ...


Some thoughts on this:
- Pre-filters are powerful, but involve a lot of boilerplate.
- Accepting two kinds of parameters, name(s) and filter both, makes it 
even trickier to write concise context blocks; especially with multiple 
jobs.


Here's a final example of something you may very well want to do in 
iotest code:


# Example 8

def job_filter(job_id: str) -> EventFilter:
     def filter(event: Message) -> bool:
         return event.get('data', {}).get('id') == job_id
     return filter

listener1 = EventListener('JOB_STATUS_CHANGE', job_filter('job0'))
listener2 = EventListener('JOB_STATUS_CHANGE', job_filter('job1'))

with qmp.listen(listener1, listener2) as (job0, job1):
     await asyncio.gather(
         qmp.execute('blockdev-create', arguments={'job-id': 'job0'}),
         qmp.execute('blockdev-create', arguments={'job-id': 'job1'}),
         job0.get(status='concluded'),
         job1.get(status='concluded')
     )

(Note: gather isn't required here. You could write the execute and get 
statements individually and in whichever order you wanted, as long as 
the execute statement for a given job appears prior to the corresponding 
wait!)

The difficulty I have here is extending that backwards to the "create 
listener on the fly" syntax, for the reasons stated above with making it 
ambiguous as to whether we're creating one or two listeners, etc. Trying 
to minimize boilerplate while leaving the interfaces generic and 
powerful is tough.

I'm still playing around with different options and solutions, but your 
feedback/input is welcome.

--js



^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH RFC 0/7] RFC: Asynchronous QMP Draft
  2021-04-20  2:26       ` John Snow
@ 2021-04-20  2:47         ` John Snow
  0 siblings, 0 replies; 21+ messages in thread
From: John Snow @ 2021-04-20  2:47 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: armbru, crosa, qemu-devel, ehabkost

On 4/19/21 10:26 PM, John Snow wrote:
> On 4/15/21 5:52 AM, Stefan Hajnoczi wrote:
>> Yeah, it seems very nice for allowing multiple event listeners that
>> don't steal each other's events. I like it.
>>
>> qmp.event_listener() could take a sequence of QMP event names to trigger
>> on. If the sequence is empty then all QMP events will be reported.
> 
> I made something like this:
> 
> 
> # Example 1
> with qmp.listener('STOP') as listener:
>      await qmp.execute('stop')
>      await listener.get()
> 
> 
> # Example 2
> with qmp.listener('JOB_STATUS_CHANGE') as listener:
>      await qmp.execute('blockdev-create', ...)
>      async for event in listener:
>          if event['data']['status'] == 'concluded':
>              break
>      await qmp.execute('job-dismiss', ...)
> 
> 
> # Example 3 - all events
> with qmp.listener() as events:
>      async for event in events:
>          print(f"got '{event['event']}' event!")
> 
> 
> # Example 4 - several events on one listener
> job_events = (
>      'JOB_STATUS_CHANGE', 'BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
>      'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', 'BLOCK_JOB_PENDING'
> )
> with qmp.listener(job_events) as events:
>      ...
> 
> 
> There is a *post-filtering* syntax available to EventListener.get(). It 
> will filter events out using a very simplistic syntax.
> 
> 
> # Example 5 -- a short-hand form of Example 2.
> with qmp.listener('JOB_STATUS_CHANGE') as job_events:
>      await qmp.execute('blockdev-create', ...)
>      await job_events.get(status='concluded')
>      await qmp.execute('job-dismiss', ...)
> 
> 
> 
> A shortcoming with this interface is that it's easy to create a listener 
> that hears multiple events, but it's not easy to create *several 
> listeners*. I am not sure what syntax will be the nicest for this, but I 
> tried by allowing the manual creation of listeners:
> 
> 
> # Example 6
> listener1 = EventListener('JOB_STATUS_CHANGE')
> listener2 = EventListener(job_events)
> 
> # Note the use of listen() instead of listener()
> with qmp.listen(listener1, listener2) as (ev1, ev2):
>      # listeners are now active.
>      ...
> # listeners are now inactive.
> # The context manager clears any stale events in the listener(s).
> 
> 
> I thought this might be nicer than trying to extend the listener syntax:
> 
> with qmp.listeners(
>      'JOB_STATUS_CHANGE',
>      (job_events)
> ) as (
>      listener1,
>      listener2,
> ):
>      ...
> 
> especially because it might get confusing when trying to separate "one 
> listener with multiple events" vs "several listeners with one event 
> each, and it makes things a little ambiguous:
> 
> with qmp.listeners('STOP') as (stop_events,):
>      ...
> 
> And this isn't any prettier, and also likely to confuse:
> 
> with qmp.listeners('STOP', 'RESUME') as (stops, resumes):
>      ...
> 
> because it's only so very subtly different from this:
> 
> with qmp.listeners(('STOP', 'RESUME')) as (runstate_events,):
>      ...
> 
> This also doesn't begin to address one of the worst headaches of writing 
> iotests where transactions are involved: accidentally eating events 
> meant for other jobs.
> 
> I prototyped something where it's possible to create an EventListener 
> with an optional pre-filter, but it's a little bulky:
> 
> 
> # Example 7
> listener = EventListener('JOB_STATUS_CHANGE',
>                           lambda e: e['data']['id'] == 'job0')
> 
> with qmp.listen(listener):
>      await qmp.execute('blockdev-create', arguments={'job-id': 'job0'})
>      await listener.get(status='created')
>      ...
> 
> 
> Some thoughts on this:
> - Pre-filters are powerful, but involve a lot of boilerplate.
> - Accepting two kinds of parameters, name(s) and filter both, makes it 
> even trickier to write concise context blocks; especially with multiple 
> jobs.
> 
> 
> Here's a final example of something you may very well want to do in 
> iotest code:
> 
> 
> # Example 8
> 
> def job_filter(job_id: str) -> EventFilter:
>      def filter(event: Message) -> bool:
>          return event.get('data', {}).get('id') == job_id
>      return filter
> 
> listener1 = EventListener('JOB_STATUS_CHANGE', job_filter('job0'))
> listener2 = EventListener('JOB_STATUS_CHANGE', job_filter('job1'))
> 
> with qmp.listen(listener1, listener2) as (job0, job1):
>      await asyncio.gather(
>          qmp.execute('blockdev-create', arguments={'job-id': 'job0'}),
>          qmp.execute('blockdev-create', arguments={'job-id': 'job1'}),
>          job0.get(status='concluded'),
>          job1.get(status='concluded')
>      )
> 
> (Note: gather isn't required here. You could write the execute and get 
> statements individually and in whichever order you wanted, as long as 
> the execute statement for a given job appears prior to the corresponding 
> wait!)
> 
> The difficulty I have here is extending that backwards to the "create 
> listener on the fly" syntax, for the reasons stated above with making it 
> ambiguous as to whether we're creating one or two listeners, etc. Trying 
> to minimize boilerplate while leaving the interfaces generic and 
> powerful is tough.
> 
> I'm still playing around with different options and solutions, but your 
> feedback/input is welcome.
> 
> --js


Oh, though of course, the moment I sent this, I realized there is 
actually a somewhat nicer way to do this in non-test code that doesn't 
care about ordering, but still wouldn't work for QMP transactions; but 
it's nice to look at:

# Example 9 -- Multiple jobs without a transaction:

async def blockdev_create(qmp, job_id: str, options: Dict[str, Any]):
     with qmp.listener('JOB_STATUS_CHANGE') as listener:
         await qmp.execute('blockdev-create', arguments={
             'job-id': job_id,
             'options': options,
         })
         await listener.get(id=job_id, status='concluded')
         await qmp.execute('job-dismiss', arguments={'id': job_id})
         await listener.get(id=job_id, status='null')

await asyncio.gather(
     blockdev_create(qmp, 'job2', {...}),
     blockdev_create(qmp, 'job3', {...}),
)

It won't work for transactions because we spawn multiple IDs with a 
single command in a single context. You could remedy it by creating 
multiple listeners and just being very careful to always use just one 
per each job, but that's likely prone to failure and hard to catch on 
reviews, etc.

--js



^ permalink raw reply	[flat|nested] 21+ messages in thread

end of thread, other threads:[~2021-04-20  2:48 UTC | newest]

Thread overview: 21+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-04-13 15:55 [PATCH RFC 0/7] RFC: Asynchronous QMP Draft John Snow
2021-04-13 15:55 ` [PATCH RFC 1/7] util: asyncio-related helpers John Snow
2021-04-13 15:55 ` [PATCH RFC 2/7] error: Error classes and so on John Snow
2021-04-13 15:55 ` [PATCH RFC 3/7] protocol: generic async message-based protocol loop John Snow
2021-04-13 20:00   ` Stefan Hajnoczi
2021-04-14 17:29     ` John Snow
2021-04-15  9:14       ` Stefan Hajnoczi
2021-04-13 15:55 ` [PATCH RFC 4/7] message: add QMP Message type John Snow
2021-04-13 20:07   ` Stefan Hajnoczi
2021-04-14 17:39     ` John Snow
2021-04-13 15:55 ` [PATCH RFC 5/7] models: Add well-known QMP objects John Snow
2021-04-13 15:55 ` [PATCH RFC 6/7] qmp_protocol: add QMP client implementation John Snow
2021-04-14  5:44   ` Stefan Hajnoczi
2021-04-14 17:50     ` John Snow
2021-04-15  9:23       ` Stefan Hajnoczi
2021-04-13 15:55 ` [PATCH RFC 7/7] linter config John Snow
2021-04-14  6:38 ` [PATCH RFC 0/7] RFC: Asynchronous QMP Draft Stefan Hajnoczi
2021-04-14 19:17   ` John Snow
2021-04-15  9:52     ` Stefan Hajnoczi
2021-04-20  2:26       ` John Snow
2021-04-20  2:47         ` John Snow

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).