All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH v2 00/24] python: introduce Asynchronous QMP package
@ 2021-07-17  0:32 John Snow
  2021-07-17  0:32 ` [PATCH v2 01/24] python/aqmp: add asynchronous QMP (AQMP) subpackage John Snow
                   ` (24 more replies)
  0 siblings, 25 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

GitLab: https://gitlab.com/jsnow/qemu/-/commits/python-async-qmp-aqmp
CI: https://gitlab.com/jsnow/qemu/-/pipelines/338508045
Docs: https://people.redhat.com/~jsnow/sphinx/html/qemu.aqmp.html

Hi!

This patch series adds an Asynchronous QMP package to the Python
library. It offers a few improvements over the previous library:

- out-of-band support
- true asynchronous event support
- avoids undocumented interfaces abusing non-blocking sockets
- unit tests!
- documentation!

This library serves as the basis for a new qmp-shell program that will
offer improved reconnection support, true asynchronous display of
events, VM and job status update notifiers, and so on.

My intent is to eventually publish this library directly to PyPI as a
standalone package. I would like to phase out our usage of the old QMP
library over time; eventually replacing it entirely with this one.

This series looks big by line count, but it's *mostly*
docstrings. Seriously!

This package has *no* external dependencies whatsoever.

Notes & Design
==============

Here are some notes on the design of how the library works, to serve as
a primer for review; however I also **highly recommend** browsing the
generated Sphinx documentation for this series.

Here's that link again:
https://people.redhat.com/~jsnow/sphinx/html/qemu.aqmp.html

The core machinery is split between the AsyncProtocol and QMP
classes. AsyncProtocol provides the generic machinery, while QMP
provides the QMP-specific details.

The design uses two independent coroutines that act as the "bottom
half", a writer task and a reader task. These tasks run for the duration
of the connection and independently send and receive messages,
respectively.

A third task, disconnect, is scheduled asynchronously whenever an
unrecoverable error occurs and facilitates coalescing of the other two
tasks.

This diagram for how execute() operates may be helpful for understanding
how AsyncProtocol is laid out. The arrows indicate the direction of a
QMP message; the long horizontal dash indicates the separation between
the upper and lower half of the event loop. The queue mechanisms between
both dashes serve as the intermediaries between the upper and lower
half.

                       +---------+
                       | caller  |
                       +---------+
                           ^ |
                           | v
                       +---------+
     +---------------> |execute()| -----------+
     |                 +---------+            |
     |                                        |
[-----------------------------------------------------------]
     |                                        |
     |                                        v
+----+------+    +----------------+    +------+-------+
| ExecQueue |    | EventListeners |    |Outbound Queue|
+----+------+    +----+-----------+    +------+-------+
     ^                ^                       |
     |                |                       |
[-----------------------------------------------------------]
     |                |                       |
     |                |                       v
  +--+----------------+---+       +-----------+-----------+
  | Reader Task/Coroutine |       | Writer Task/Coroutine |
  +-----------+-----------+       +-----------+-----------+
              ^                               |
              |                               v
        +-----+------+                  +-----+------+
        |StreamReader|                  |StreamWriter|
        +------------+                  +------------+

The caller will invoke execute(), which in turn will deposit a message
in the outbound send queue. This will wake up the writer task, which
well send the message over the wire.

The execute() method will then yield to wait for a reply delivered to an
execution queue created solely for that execute statement.

When a message arrives, the Reader task will unblock and route the
message either to the EventListener subsystem, or place it in the
appropriate pending execution queue.

Once a message is placed in the pending execution queue, execute() will
unblock and the execution will conclude, returning the result of the RPC
call to the caller.

Patch Layout
============

Patches 1-4   add tiny pre-requisites, utilities, etc.
Patches 5-12  add a generic async message-based protocol class,
              AsyncProtocol. They are split fairly small and should
              be reasonably self-contained.
Patches 13-15 check in more QMP-centric components.
Patches 16-21 add qmp_client.py, with a new 'QMPClient()' class.
              They're split into reasonably tiny pieces here.
Patches 22-23 add a few finishing touches, they are small patches.
Patch 24      adds unit tests. They're maybe a little messy yet, but
              they've been quite helpful to me so far. Coverage of
              protocol.py is at about 86%.

Future Work
===========

These items are in progress:

- A synchronous QMP wrapper that allows this library to be easily used
  from non-async code; this will also allow me to prove it works well by
  demoing its replacement throughout iotests. I have all of iotests
  passing locally, but I am still seeing some failures on gitlab CI I
  need to diagnose, possibly a race condition somewhere.

- A QMP server class; to facilitate writing of unit tests. It's done,
  but needs some polish and tests.

- More unit tests for qmp_client.py, qmp_server.py and other modules.

Changelog
=========

V2:

Renamed classes/methods:

- Renamed qmp_protocol.py to qmp_client.py
- Renamed 'QMP' class to 'QMPClient'
- Renamed _begin_new_session() to _establish_session()
- Split _establish_connection() out from _new_session().
- Removed _results() method

Bugfixes:

- Suppress duplicate Exceptions when attempting to drain the
  StreamWriter
- Delay initialization of asyncio.Queue and asyncio.Event variables to
  _new_session or later -- they must not be created outside of the loop,
  even if they are not async functions.
- Rework runstate_changed events to guarantee visibility of events to
  waiters
- Improve connect()/accept() cleanup to work with
  asyncio.CancelledError, asyncio.TimeoutError
- No-argument form of Message() now succeeds properly.
- flush utility will correctly yield when data is below the "high water
  mark", giving the stream a chance to actually flush.
- Increase read buffer size to accommodate query-qmp-schema (Thanks
  Niteesh)

Ugly bits from V1 removed:

- Remove tertiary filtering from EventListener (for now), accompanying
  documentation removed from events.py
- Use asyncio.wait() instead of custom wait_task_done()
- MultiException is removed in favor of just raising the first Exception
  that occurs in the bottom half; other Exceptions if any are logged
  instead.

Improvements:

- QMPClient now allows ID-less execution statements via the _raw()
  interface.
- Add tests that grant ~86% coverage of protocol.py to the avocado test
  suite.
- Removed 'force' parameter from _bh_disconnect; the disconnection
  routine determines for itself if we are in the error pathway or not
  instead now.  This removes any chance of duplicate calls to
  _schedule_disconnect accidentally dropping the 'force' setting.

Debugging/Testing changes:

- Add debug: bool parameter to asyncio_run utility wrapper
- Improve error messages for '@require' decorator
- Add debugging message for state change events
- Avoid flushing the StreamWriter if we don't have one (This
  circumstance only arises in testing, but it's helpful.)
- Improved __repr__ method for AsyncProtocol, and removed __str__
  method.  enforcing eval(__repr__(x)) == x does not make sense for
  AsyncProtocol.
- Misc logging message changes
- Add a suite of fancy Task debugging utilities.
- Most tracebacks now log at the DEBUG level instead of
  CRITICAL/ERROR/WARNING; In those error cases, a one-line summary is
  logged instead.

Misc. aesthetic changes:

- Misc docstring fixes, whitespace, etc.
- Reordered the definition of some methods to try and keep similar
  methods near each other (Moved _cleanup near _bh_disconnect in
  QMPClient.)

~ Shucks Howdy, Gee Golly!

John Snow (24):
  python/aqmp: add asynchronous QMP (AQMP) subpackage
  python/aqmp: add error classes
  python/pylint: Add exception for TypeVar names ('T')
  python/aqmp: add asyncio compatibility wrappers
  python/aqmp: add generic async message-based protocol support
  python/aqmp: add runstate state machine to AsyncProtocol
  python/aqmp: Add logging utility helpers
  python/aqmp: add logging to AsyncProtocol
  python/aqmp: add AsyncProtocol.accept() method
  python/aqmp: add configurable read buffer limit
  python/aqmp: add _cb_inbound and _cb_inbound logging hooks
  python/aqmp: add AsyncProtocol._readline() method
  python/aqmp: add QMP Message format
  python/aqmp: add well-known QMP object models
  python/aqmp: add QMP event support
  python/pylint: disable too-many-function-args
  python/aqmp: add QMP protocol support
  python/pylint: disable no-member check
  python/aqmp: Add message routing to QMP protocol
  python/aqmp: add execute() interfaces
  python/aqmp: add _raw() execution interface
  python/aqmp: add asyncio_run compatibility wrapper
  python/aqmp: add scary message
  python/aqmp: add AsyncProtocol unit tests

 python/qemu/aqmp/__init__.py   |  58 +++
 python/qemu/aqmp/error.py      |  50 ++
 python/qemu/aqmp/events.py     | 706 ++++++++++++++++++++++++++
 python/qemu/aqmp/message.py    | 209 ++++++++
 python/qemu/aqmp/models.py     | 133 +++++
 python/qemu/aqmp/protocol.py   | 882 +++++++++++++++++++++++++++++++++
 python/qemu/aqmp/py.typed      |   0
 python/qemu/aqmp/qmp_client.py | 621 +++++++++++++++++++++++
 python/qemu/aqmp/util.py       | 207 ++++++++
 python/setup.cfg               |   5 +-
 python/tests/null_proto.py     |  67 +++
 python/tests/protocol.py       | 458 +++++++++++++++++
 12 files changed, 3395 insertions(+), 1 deletion(-)
 create mode 100644 python/qemu/aqmp/__init__.py
 create mode 100644 python/qemu/aqmp/error.py
 create mode 100644 python/qemu/aqmp/events.py
 create mode 100644 python/qemu/aqmp/message.py
 create mode 100644 python/qemu/aqmp/models.py
 create mode 100644 python/qemu/aqmp/protocol.py
 create mode 100644 python/qemu/aqmp/py.typed
 create mode 100644 python/qemu/aqmp/qmp_client.py
 create mode 100644 python/qemu/aqmp/util.py
 create mode 100644 python/tests/null_proto.py
 create mode 100644 python/tests/protocol.py

-- 
2.31.1




^ permalink raw reply	[flat|nested] 36+ messages in thread

* [PATCH v2 01/24] python/aqmp: add asynchronous QMP (AQMP) subpackage
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 02/24] python/aqmp: add error classes John Snow
                   ` (23 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

For now, it's empty! Soon, it won't be.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/__init__.py | 27 +++++++++++++++++++++++++++
 python/qemu/aqmp/py.typed    |  0
 python/setup.cfg             |  1 +
 3 files changed, 28 insertions(+)
 create mode 100644 python/qemu/aqmp/__init__.py
 create mode 100644 python/qemu/aqmp/py.typed

diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
new file mode 100644
index 00000000000..391141c9484
--- /dev/null
+++ b/python/qemu/aqmp/__init__.py
@@ -0,0 +1,27 @@
+"""
+QEMU Monitor Protocol (QMP) development library & tooling.
+
+This package provides a fairly low-level class for communicating
+asynchronously with QMP protocol servers, as implemented by QEMU, the
+QEMU Guest Agent, and the QEMU Storage Daemon.
+
+`QMPClient` provides the main functionality of this package. All errors
+raised by this library dervive from `AQMPError`, see `aqmp.error` for
+additional detail. See `aqmp.events` for an in-depth tutorial on
+managing QMP events.
+"""
+
+# Copyright (C) 2020, 2021 John Snow for Red Hat, Inc.
+#
+# Authors:
+#  John Snow <jsnow@redhat.com>
+#
+# Based on earlier work by Luiz Capitulino <lcapitulino@redhat.com>.
+#
+# This work is licensed under the terms of the GNU GPL, version 2.  See
+# the COPYING file in the top-level directory.
+
+
+# The order of these fields impact the Sphinx documentation order.
+__all__ = (
+)
diff --git a/python/qemu/aqmp/py.typed b/python/qemu/aqmp/py.typed
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/python/setup.cfg b/python/setup.cfg
index 14bab902883..ffb754fa9e5 100644
--- a/python/setup.cfg
+++ b/python/setup.cfg
@@ -27,6 +27,7 @@ packages =
     qemu.qmp
     qemu.machine
     qemu.utils
+    qemu.aqmp
 
 [options.package_data]
 * = py.typed
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 02/24] python/aqmp: add error classes
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
  2021-07-17  0:32 ` [PATCH v2 01/24] python/aqmp: add asynchronous QMP (AQMP) subpackage John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-08-03 16:01   ` Eric Blake
  2021-07-17  0:32 ` [PATCH v2 03/24] python/pylint: Add exception for TypeVar names ('T') John Snow
                   ` (22 subsequent siblings)
  24 siblings, 1 reply; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/__init__.py |  4 +++
 python/qemu/aqmp/error.py    | 50 ++++++++++++++++++++++++++++++++++++
 2 files changed, 54 insertions(+)
 create mode 100644 python/qemu/aqmp/error.py

diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
index 391141c9484..c97be950bf4 100644
--- a/python/qemu/aqmp/__init__.py
+++ b/python/qemu/aqmp/__init__.py
@@ -21,7 +21,11 @@
 # This work is licensed under the terms of the GNU GPL, version 2.  See
 # the COPYING file in the top-level directory.
 
+from .error import AQMPError
+
 
 # The order of these fields impact the Sphinx documentation order.
 __all__ = (
+    # Exceptions
+    'AQMPError',
 )
diff --git a/python/qemu/aqmp/error.py b/python/qemu/aqmp/error.py
new file mode 100644
index 00000000000..5bdfdbfbda4
--- /dev/null
+++ b/python/qemu/aqmp/error.py
@@ -0,0 +1,50 @@
+"""
+AQMP Error Classes
+
+This package seeks to provide semantic error classes that are intended
+to be used directly by clients when they would like to handle particular
+semantic failures (e.g. "failed to connect") without needing to know the
+enumeration of possible reasons for that failure.
+
+AQMPError serves as the ancestor for all exceptions raised by this
+package, and is suitable for use in handling semantic errors from this
+library. In most cases, individual public methods will attempt to catch
+and re-encapsulate various exceptions to provide a semantic
+error-handling interface.
+
+.. admonition:: AQMP Exception Hierarchy Reference
+
+ |   `Exception`
+ |    +-- `AQMPError`
+ |         +-- `ConnectError`
+ |         +-- `StateError`
+ |         +-- `ExecInterruptedError`
+ |         +-- `ExecuteError`
+ |         +-- `ListenerError`
+ |         +-- `ProtocolError`
+ |              +-- `DeserializationError`
+ |              +-- `UnexpectedTypeError`
+ |              +-- `ServerParseError`
+ |              +-- `BadReplyError`
+ |              +-- `GreetingError`
+ |              +-- `NegotiationError`
+"""
+
+
+class AQMPError(Exception):
+    """Abstract error class for all errors originating from this package."""
+
+
+class ProtocolError(AQMPError):
+    """
+    Abstract error class for protocol failures.
+
+    Semantically, these errors are generally the fault of either the
+    protocol server or as a result of a bug in this this library.
+
+    :param error_message: Human-readable string describing the error.
+    """
+    def __init__(self, error_message: str):
+        super().__init__(error_message)
+        #: Human-readable error message, without any prefix.
+        self.error_message: str = error_message
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 03/24] python/pylint: Add exception for TypeVar names ('T')
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
  2021-07-17  0:32 ` [PATCH v2 01/24] python/aqmp: add asynchronous QMP (AQMP) subpackage John Snow
  2021-07-17  0:32 ` [PATCH v2 02/24] python/aqmp: add error classes John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 04/24] python/aqmp: add asyncio compatibility wrappers John Snow
                   ` (21 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

'T' is a common TypeVar name, allow its use.

See also https://github.com/PyCQA/pylint/issues/3401 -- In the future,
we might be able to have a separate list of acceptable names for
TypeVars exclusively.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/setup.cfg | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/setup.cfg b/python/setup.cfg
index ffb754fa9e5..f87e32177ab 100644
--- a/python/setup.cfg
+++ b/python/setup.cfg
@@ -101,6 +101,7 @@ good-names=i,
            fh,  # fh = open(...)
            fd,  # fd = os.open(...)
            c,   # for c in string: ...
+           T,   # for TypeVars. See pylint#3401
 
 [pylint.similarities]
 # Ignore imports when computing similarities.
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 04/24] python/aqmp: add asyncio compatibility wrappers
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (2 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 03/24] python/pylint: Add exception for TypeVar names ('T') John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 05/24] python/aqmp: add generic async message-based protocol support John Snow
                   ` (20 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

Python 3.6 does not have all of the goodies that Python 3.7 does, and I
need to support both. Add some compatibility wrappers needed for this
purpose.

(Note: Python 3.6 is EOL December 2021.)

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/util.py | 106 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 106 insertions(+)
 create mode 100644 python/qemu/aqmp/util.py

diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py
new file mode 100644
index 00000000000..7e7c2584d2b
--- /dev/null
+++ b/python/qemu/aqmp/util.py
@@ -0,0 +1,106 @@
+"""
+Miscellaneous Utilities
+
+This module provides asyncio utilities and compatibility wrappers for
+Python 3.6 to provide some features that otherwise become available in
+Python 3.7+.
+"""
+
+import asyncio
+import sys
+from typing import (
+    Any,
+    Coroutine,
+    Optional,
+    TypeVar,
+    cast,
+)
+
+
+T = TypeVar('T')
+
+
+# --------------------------
+# Section: Utility Functions
+# --------------------------
+
+
+async def flush(writer: asyncio.StreamWriter) -> None:
+    """
+    Utility function to ensure a StreamWriter is *fully* drained.
+
+    `asyncio.StreamWriter.drain` only promises we will return to below
+    the "high-water mark". This function ensures we flush the entire
+    buffer -- by setting the high water mark to 0 and then calling
+    drain. The flow control limits are restored after the call is
+    completed.
+    """
+    transport = cast(asyncio.WriteTransport, writer.transport)
+
+    # https://github.com/python/typeshed/issues/5779
+    low, high = transport.get_write_buffer_limits()  # type: ignore
+    transport.set_write_buffer_limits(0, 0)
+    try:
+        await writer.drain()
+    finally:
+        transport.set_write_buffer_limits(high, low)
+
+
+# -------------------------------
+# Section: Compatibility Wrappers
+# -------------------------------
+
+
+def create_task(coro: Coroutine[Any, Any, T],
+                loop: Optional[asyncio.AbstractEventLoop] = None
+                ) -> 'asyncio.Future[T]':
+    """
+    Python 3.6-compatible `asyncio.create_task` wrapper.
+
+    :param coro: The coroutine to execute in a task.
+    :param loop: Optionally, the loop to create the task in.
+
+    :return: An `asyncio.Future` object.
+    """
+    if sys.version_info >= (3, 7):
+        if loop is not None:
+            return loop.create_task(coro)
+        return asyncio.create_task(coro)  # pylint: disable=no-member
+
+    # Python 3.6:
+    return asyncio.ensure_future(coro, loop=loop)
+
+
+def is_closing(writer: asyncio.StreamWriter) -> bool:
+    """
+    Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
+
+    :param writer: The `asyncio.StreamWriter` object.
+    :return: `True` if the writer is closing, or closed.
+    """
+    if sys.version_info >= (3, 7):
+        return writer.is_closing()
+
+    # Python 3.6:
+    transport = writer.transport
+    assert isinstance(transport, asyncio.WriteTransport)
+    return transport.is_closing()
+
+
+async def wait_closed(writer: asyncio.StreamWriter) -> None:
+    """
+    Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
+
+    :param writer: The `asyncio.StreamWriter` to wait on.
+    """
+    if sys.version_info >= (3, 7):
+        await writer.wait_closed()
+        return
+
+    # Python 3.6
+    transport = writer.transport
+    assert isinstance(transport, asyncio.WriteTransport)
+
+    while not transport.is_closing():
+        await asyncio.sleep(0)
+    await flush(writer)
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 05/24] python/aqmp: add generic async message-based protocol support
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (3 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 04/24] python/aqmp: add asyncio compatibility wrappers John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 06/24] python/aqmp: add runstate state machine to AsyncProtocol John Snow
                   ` (19 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

This is the bare minimum that you need to establish a full-duplex async
message-based protocol with Python's asyncio.

The features to be added in forthcoming commits are:

- Runstate tracking
- Logging
- Support for incoming connections via accept()
- _cb_outbound, _cb_inbound message hooks
- _readline() method

Some of the docstrings have dangling references, but they will resolve
themselves within the next few commits, and have been tested at the
conclusion of this series.

A note on the broad-except in _bh_disconnect:

I'm not sure if there's a more elegant solution here, but the problem is
that if an Exception occurred in the underlying
StreamReader/StreamWriter and causes one of the tasks to fail and
schedule a disconnect, the disconnect method itself will re-stumble
across the exact same Exception when attempting to close/flush the
stream.

Ignoring this Exception means we *might* miss a brand new Exception that
we didn't see already, but as we are in the process of tearing down the
connection anyway, I don't believe it matters. This stanza only really
cares that the writer is flushed and closed -- If it raises an
Exception, we know for sure it is not active and running.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/__init__.py |   4 +-
 python/qemu/aqmp/protocol.py | 508 +++++++++++++++++++++++++++++++++++
 python/qemu/aqmp/util.py     |  26 ++
 3 files changed, 537 insertions(+), 1 deletion(-)
 create mode 100644 python/qemu/aqmp/protocol.py

diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
index c97be950bf4..5c0de72672d 100644
--- a/python/qemu/aqmp/__init__.py
+++ b/python/qemu/aqmp/__init__.py
@@ -22,10 +22,12 @@
 # the COPYING file in the top-level directory.
 
 from .error import AQMPError
+from .protocol import ConnectError
 
 
 # The order of these fields impact the Sphinx documentation order.
 __all__ = (
-    # Exceptions
+    # Exceptions, most generic to most explicit
     'AQMPError',
+    'ConnectError',
 )
diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
new file mode 100644
index 00000000000..5413f4e50c1
--- /dev/null
+++ b/python/qemu/aqmp/protocol.py
@@ -0,0 +1,508 @@
+"""
+Generic Asynchronous Message-based Protocol Support
+
+This module provides a generic framework for sending and receiving
+messages over an asyncio stream. `AsyncProtocol` is an abstract class
+that implements the core mechanisms of a simple send/receive protocol,
+and is designed to be extended.
+
+In this package, it is used as the implementation for the `QMPClient`
+class.
+"""
+
+import asyncio
+from asyncio import StreamReader, StreamWriter
+from ssl import SSLContext
+# import exceptions will be removed in a forthcoming commit.
+# The problem stems from pylint/flake8 believing that 'Any'
+# is unused because of its only use in a string-quoted type.
+from typing import (  # pylint: disable=unused-import # noqa
+    Any,
+    Awaitable,
+    Callable,
+    Generic,
+    Optional,
+    Tuple,
+    TypeVar,
+    Union,
+)
+
+from .error import AQMPError
+from .util import (
+    bottom_half,
+    create_task,
+    flush,
+    is_closing,
+    upper_half,
+    wait_closed,
+)
+
+
+T = TypeVar('T')
+_TaskFN = Callable[[], Awaitable[None]]  # aka ``async def func() -> None``
+_FutureT = TypeVar('_FutureT', bound=Optional['asyncio.Future[Any]'])
+
+
+class ConnectError(AQMPError):
+    """
+    Raised when the initial connection process has failed.
+
+    This Exception always wraps a "root cause" exception that can be
+    interrogated for additional information.
+
+    :param error_message: Human-readable string describing the error.
+    :param exc: The root-cause exception.
+    """
+    def __init__(self, error_message: str, exc: Exception):
+        super().__init__(error_message)
+        #: Human-readable error string
+        self.error_message: str = error_message
+        #: Wrapped root cause exception
+        self.exc: Exception = exc
+
+    def __str__(self) -> str:
+        return f"{self.error_message}: {self.exc!s}"
+
+
+class AsyncProtocol(Generic[T]):
+    """
+    AsyncProtocol implements a generic async message-based protocol.
+
+    This protocol assumes the basic unit of information transfer between
+    client and server is a "message", the details of which are left up
+    to the implementation. It assumes the sending and receiving of these
+    messages is full-duplex and not necessarily correlated; i.e. it
+    supports asynchronous inbound messages.
+
+    It is designed to be extended by a specific protocol which provides
+    the implementations for how to read and send messages. These must be
+    defined in `_do_recv()` and `_do_send()`, respectively.
+
+    Other callbacks have a default implementation, but are intended to be
+    either extended or overridden:
+
+     - `_establish_session`:
+         The base implementation starts the reader/writer tasks.
+         A protocol implementation can override this call, inserting
+         actions to be taken prior to starting the reader/writer tasks
+         before the super() call; actions needing to occur afterwards
+         can be written after the super() call.
+     - `_on_message`:
+         Actions to be performed when a message is received.
+    """
+    # pylint: disable=too-many-instance-attributes
+
+    # -------------------------
+    # Section: Public interface
+    # -------------------------
+
+    def __init__(self) -> None:
+        # stream I/O
+        self._reader: Optional[StreamReader] = None
+        self._writer: Optional[StreamWriter] = None
+
+        # Outbound Message queue
+        self._outgoing: asyncio.Queue[T]
+
+        # Special, long-running tasks:
+        self._reader_task: Optional[asyncio.Future[None]] = None
+        self._writer_task: Optional[asyncio.Future[None]] = None
+
+        # Aggregate of the above two tasks, used for Exception management.
+        self._bh_tasks: Optional[asyncio.Future[Tuple[
+            Optional[BaseException],
+            Optional[BaseException],
+        ]]] = None
+
+        #: Disconnect task. The disconnect implementation runs in a task
+        #: so that asynchronous disconnects (initiated by the
+        #: reader/writer) are allowed to wait for the reader/writers to
+        #: exit.
+        self._dc_task: Optional[asyncio.Future[None]] = None
+
+    @upper_half
+    async def connect(self, address: Union[str, Tuple[str, int]],
+                      ssl: Optional[SSLContext] = None) -> None:
+        """
+        Connect to the server and begin processing message queues.
+
+        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+
+        :param address:
+            Address to connect to; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise StateError: When the `Runstate` is not `IDLE`.
+        :raise ConnectError: If a connection cannot be made to the server.
+        """
+        await self._new_session(address, ssl)
+
+    @upper_half
+    async def disconnect(self) -> None:
+        """
+        Disconnect and wait for all tasks to fully stop.
+
+        If there were was an exception that caused the reader/writers to
+        terminate prematurely, it will be raised here.
+
+        :raise Exception: When the reader or writer terminate unexpectedly.
+        """
+        self._schedule_disconnect()
+        await self._wait_disconnect()
+
+    # --------------------------
+    # Section: Session machinery
+    # --------------------------
+
+    @upper_half
+    async def _new_session(self,
+                           address: Union[str, Tuple[str, int]],
+                           ssl: Optional[SSLContext] = None) -> None:
+        """
+        Establish a new connection and initialize the session.
+
+        Connect or accept a new connection, then begin the protocol
+        session machinery. If this call fails, `runstate` is guaranteed
+        to be set back to `IDLE`.
+
+        :param address:
+            Address to connect to;
+            UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise ConnectError:
+            When a connection or session cannot be established.
+
+            This exception will wrap a more concrete one. In most cases,
+            the wrapped exception will be `OSError` or `EOFError`. If a
+            protocol-level failure occurs while establishing a new
+            session, the wrapped error may also be an `AQMPError`.
+        """
+        try:
+            phase = "connection"
+            await self._establish_connection(address, ssl)
+
+            phase = "session"
+            await self._establish_session()
+
+        except BaseException as err:
+            emsg = f"Failed to establish {phase}"
+            await self.disconnect()
+
+            # NB: CancelledError is not a BaseException before Python 3.8
+            if isinstance(err, asyncio.CancelledError):
+                raise
+
+            if isinstance(err, Exception):
+                raise ConnectError(emsg, err) from err
+
+            # Raise BaseExceptions un-wrapped, they're more important.
+            raise
+
+    @upper_half
+    async def _establish_connection(
+            self,
+            address: Union[str, Tuple[str, int]],
+            ssl: Optional[SSLContext] = None,
+    ) -> None:
+        """
+        Establish a new connection.
+
+        :param address:
+            Address to connect to/listen on;
+            UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+        """
+        await self._do_connect(address, ssl)
+
+    @upper_half
+    async def _do_connect(self, address: Union[str, Tuple[str, int]],
+                          ssl: Optional[SSLContext] = None) -> None:
+        """
+        Acting as the transport client, initiate a connection to a server.
+
+        :param address:
+            Address to connect to; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise OSError: For stream-related errors.
+        """
+        if isinstance(address, tuple):
+            connect = asyncio.open_connection(address[0], address[1], ssl=ssl)
+        else:
+            connect = asyncio.open_unix_connection(path=address, ssl=ssl)
+        self._reader, self._writer = await connect
+
+    @upper_half
+    async def _establish_session(self) -> None:
+        """
+        Establish a new session.
+
+        Starts the readers/writer tasks; subclasses may perform their
+        own negotiations here. The Runstate will be RUNNING upon
+        successful conclusion.
+        """
+        self._outgoing = asyncio.Queue()
+
+        reader_coro = self._bh_loop_forever(self._bh_recv_message)
+        writer_coro = self._bh_loop_forever(self._bh_send_message)
+
+        self._reader_task = create_task(reader_coro)
+        self._writer_task = create_task(writer_coro)
+
+        self._bh_tasks = asyncio.gather(
+            self._reader_task,
+            self._writer_task,
+        )
+
+    @upper_half
+    @bottom_half
+    def _schedule_disconnect(self) -> None:
+        """
+        Initiate a disconnect; idempotent.
+
+        This method is used both in the upper-half as a direct
+        consequence of `disconnect()`, and in the bottom-half in the
+        case of unhandled exceptions in the reader/writer tasks.
+
+        It can be invoked no matter what the `runstate` is.
+        """
+        if not self._dc_task:
+            self._dc_task = create_task(self._bh_disconnect())
+
+    @upper_half
+    async def _wait_disconnect(self) -> None:
+        """
+        Waits for a previously scheduled disconnect to finish.
+
+        This method will gather any bottom half exceptions and re-raise
+        the one that occurred first; presuming it to be the root cause
+        of any subsequent Exceptions. It is intended to be used in the
+        upper half of the call chain.
+
+        :raise Exception:
+            Arbitrary exception re-raised on behalf of the reader/writer.
+        """
+        assert self._dc_task
+
+        try:
+            await self._dc_task
+            if self._bh_tasks:
+                await self._bh_tasks  # Raise exception from reader/writer
+        finally:
+            self._cleanup()
+
+    @upper_half
+    def _cleanup(self) -> None:
+        """
+        Fully reset this object to a clean state and return to `IDLE`.
+        """
+        def _paranoid_task_erase(task: _FutureT) -> Optional[_FutureT]:
+            # Help to erase a task, ENSURING it is fully quiesced first.
+            assert (task is None) or task.done()
+            return None if (task and task.done()) else task
+
+        self._dc_task = _paranoid_task_erase(self._dc_task)
+        self._reader_task = _paranoid_task_erase(self._reader_task)
+        self._writer_task = _paranoid_task_erase(self._writer_task)
+        self._bh_tasks = _paranoid_task_erase(self._bh_tasks)
+
+        self._reader = None
+        self._writer = None
+
+    # ----------------------------
+    # Section: Bottom Half methods
+    # ----------------------------
+
+    @bottom_half
+    async def _bh_disconnect(self) -> None:
+        """
+        Disconnect and cancel all outstanding tasks.
+
+        It is designed to be called from its task context,
+        :py:obj:`~AsyncProtocol._dc_task`. By running in its own task,
+        it is free to wait on any pending actions that may still need to
+        occur in either the reader or writer tasks.
+        """
+
+        def _done(task: Optional['asyncio.Future[Any]']) -> bool:
+            return task is not None and task.done()
+
+        def _exception(
+                task: Optional['asyncio.Future[Any]']
+        ) -> Optional[BaseException]:
+            if task is None or not task.done():
+                return None
+            return task.exception()
+
+        # NB: We can't rely on _bh_tasks being done() here, it may not
+        #     yet have had a chance to run and gather itself.
+        error_pathway = _done(self._reader_task) or _done(self._writer_task)
+
+        await self._bh_stop_writer(error_pathway)
+        await self._bh_stop_reader()
+
+        # Next, close the writer stream itself.
+        # This implicitly closes the reader, too.
+        if self._writer:
+            if not is_closing(self._writer):
+                self._writer.close()
+
+            try:
+                await wait_closed(self._writer)
+            except Exception as err:  # pylint: disable=broad-except
+                # Waiting for the stream to close if the underlying transport
+                # has already raised an Exception will unfortunately re-raise
+                # that Exception, which it shares with the StreamReader.
+                if all((err is not _exception(task) for task in (
+                        self._reader_task, self._writer_task))):
+                    raise
+
+    @bottom_half
+    async def _bh_stop_writer(self, force: bool = False) -> None:
+        if not self._writer_task or self._writer_task.done():
+            return
+
+        # If we're not in a hurry, drain the outbound queue.
+        if not force:
+            await self._outgoing.join()
+            if self._writer is not None:
+                await flush(self._writer)
+
+        self._writer_task.cancel()
+        # Waits for the writer to finish but does NOT raise its exception.
+        await asyncio.wait((self._writer_task,))
+
+    @bottom_half
+    async def _bh_stop_reader(self) -> None:
+        if not self._reader_task or self._reader_task.done():
+            return
+
+        self._reader_task.cancel()
+        # Waits for the reader to finish but does NOT raise its exception.
+        await asyncio.wait((self._reader_task,))
+
+    @bottom_half
+    async def _bh_loop_forever(self, async_fn: _TaskFN) -> None:
+        """
+        Run one of the bottom-half methods in a loop forever.
+
+        If the bottom half ever raises any exception, schedule a
+        disconnect that will terminate the entire loop.
+
+        :param async_fn: The bottom-half method to run in a loop.
+        """
+        try:
+            while True:
+                await async_fn()
+        except asyncio.CancelledError:
+            # We have been cancelled by _bh_disconnect, exit gracefully.
+            return
+        except BaseException:
+            self._schedule_disconnect()
+            raise
+
+    @bottom_half
+    async def _bh_send_message(self) -> None:
+        """
+        Wait for an outgoing message, then send it.
+
+        Designed to be run in `_bh_loop_forever()`.
+        """
+        msg = await self._outgoing.get()
+        try:
+            await self._send(msg)
+        finally:
+            self._outgoing.task_done()
+
+    @bottom_half
+    async def _bh_recv_message(self) -> None:
+        """
+        Wait for an incoming message and call `_on_message` to route it.
+
+        Designed to be run in `_bh_loop_forever()`.
+        """
+        msg = await self._recv()
+        await self._on_message(msg)
+
+    # --------------------
+    # Section: Message I/O
+    # --------------------
+
+    @upper_half
+    @bottom_half
+    async def _do_recv(self) -> T:
+        """
+        Abstract: Read from the stream and return a message.
+
+        Very low-level; intended to only be called by `_recv()`.
+        """
+        raise NotImplementedError
+
+    @upper_half
+    @bottom_half
+    async def _recv(self) -> T:
+        """
+        Read an arbitrary protocol message.
+
+        .. warning::
+            This method is intended primarily for `_bh_recv_message()`
+            to use in an asynchronous task loop. Using it outside of
+            this loop will "steal" messages from the normal routing
+            mechanism. It is safe to use prior to `_establish_session()`,
+            but should not be used otherwise.
+
+        This method uses `_do_recv()` to retrieve the raw message, and
+        then transforms it using `_cb_inbound()`.
+
+        :return: A single (filtered, processed) protocol message.
+        """
+        # A forthcoming commit makes this method less trivial.
+        return await self._do_recv()
+
+    @upper_half
+    @bottom_half
+    def _do_send(self, msg: T) -> None:
+        """
+        Abstract: Write a message to the stream.
+
+        Very low-level; intended to only be called by `_send()`.
+        """
+        raise NotImplementedError
+
+    @upper_half
+    @bottom_half
+    async def _send(self, msg: T) -> None:
+        """
+        Send an arbitrary protocol message.
+
+        This method will transform any outgoing messages according to
+        `_cb_outbound()`.
+
+        .. warning::
+            Like `_recv()`, this method is intended to be called by
+            the writer task loop that processes outgoing
+            messages. Calling it directly may circumvent logic
+            implemented by the caller meant to correlate outgoing and
+            incoming messages.
+
+        :raise OSError: For problems with the underlying stream.
+        """
+        # A forthcoming commit makes this method less trivial.
+        self._do_send(msg)
+
+    @bottom_half
+    async def _on_message(self, msg: T) -> None:
+        """
+        Called to handle the receipt of a new message.
+
+        .. caution::
+            This is executed from within the reader loop, so be advised
+            that waiting on either the reader or writer task will lead
+            to deadlock. Additionally, any unhandled exceptions will
+            directly cause the loop to halt, so logic may be best-kept
+            to a minimum if at all possible.
+
+        :param msg: The incoming message
+        """
+        # Nothing to do in the abstract case.
diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py
index 7e7c2584d2b..88abfc9eb22 100644
--- a/python/qemu/aqmp/util.py
+++ b/python/qemu/aqmp/util.py
@@ -46,6 +46,32 @@ async def flush(writer: asyncio.StreamWriter) -> None:
         transport.set_write_buffer_limits(high, low)
 
 
+def upper_half(func: T) -> T:
+    """
+    Do-nothing decorator that annotates a method as an "upper-half" method.
+
+    These methods must not call bottom-half functions directly, but can
+    schedule them to run.
+    """
+    return func
+
+
+def bottom_half(func: T) -> T:
+    """
+    Do-nothing decorator that annotates a method as a "bottom-half" method.
+
+    These methods must take great care to handle their own exceptions whenever
+    possible. If they go unhandled, they will cause termination of the loop.
+
+    These methods do not, in general, have the ability to directly
+    report information to a caller’s context and will usually be
+    collected as a Task result instead.
+
+    They must not call upper-half functions directly.
+    """
+    return func
+
+
 # -------------------------------
 # Section: Compatibility Wrappers
 # -------------------------------
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 06/24] python/aqmp: add runstate state machine to AsyncProtocol
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (4 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 05/24] python/aqmp: add generic async message-based protocol support John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 07/24] python/aqmp: Add logging utility helpers John Snow
                   ` (18 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

This serves a few purposes:

1. Protect interfaces when it's not safe to call them (via @require)

2. Add an interface by which an async client can determine if the state
has changed, for the purposes of connection management.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/__init__.py |   5 +-
 python/qemu/aqmp/protocol.py | 159 ++++++++++++++++++++++++++++++++++-
 2 files changed, 159 insertions(+), 5 deletions(-)

diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
index 5c0de72672d..ed65913c83e 100644
--- a/python/qemu/aqmp/__init__.py
+++ b/python/qemu/aqmp/__init__.py
@@ -22,11 +22,14 @@
 # the COPYING file in the top-level directory.
 
 from .error import AQMPError
-from .protocol import ConnectError
+from .protocol import ConnectError, Runstate
 
 
 # The order of these fields impact the Sphinx documentation order.
 __all__ = (
+    # Classes
+    'Runstate',
+
     # Exceptions, most generic to most explicit
     'AQMPError',
     'ConnectError',
diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index 5413f4e50c1..247b60c31a6 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -12,11 +12,10 @@
 
 import asyncio
 from asyncio import StreamReader, StreamWriter
+from enum import Enum
+from functools import wraps
 from ssl import SSLContext
-# import exceptions will be removed in a forthcoming commit.
-# The problem stems from pylint/flake8 believing that 'Any'
-# is unused because of its only use in a string-quoted type.
-from typing import (  # pylint: disable=unused-import # noqa
+from typing import (
     Any,
     Awaitable,
     Callable,
@@ -25,6 +24,7 @@
     Tuple,
     TypeVar,
     Union,
+    cast,
 )
 
 from .error import AQMPError
@@ -43,6 +43,20 @@
 _FutureT = TypeVar('_FutureT', bound=Optional['asyncio.Future[Any]'])
 
 
+class Runstate(Enum):
+    """Protocol session runstate."""
+
+    #: Fully quiesced and disconnected.
+    IDLE = 0
+    #: In the process of connecting or establishing a session.
+    CONNECTING = 1
+    #: Fully connected and active session.
+    RUNNING = 2
+    #: In the process of disconnecting.
+    #: Runstate may be returned to `IDLE` by calling `disconnect()`.
+    DISCONNECTING = 3
+
+
 class ConnectError(AQMPError):
     """
     Raised when the initial connection process has failed.
@@ -64,6 +78,76 @@ def __str__(self) -> str:
         return f"{self.error_message}: {self.exc!s}"
 
 
+class StateError(AQMPError):
+    """
+    An API command (connect, execute, etc) was issued at an inappropriate time.
+
+    This error is raised when a command like
+    :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate
+    time.
+
+    :param error_message: Human-readable string describing the state violation.
+    :param state: The actual `Runstate` seen at the time of the violation.
+    :param required: The `Runstate` required to process this command.
+    """
+    def __init__(self, error_message: str,
+                 state: Runstate, required: Runstate):
+        super().__init__(error_message)
+        self.error_message = error_message
+        self.state = state
+        self.required = required
+
+
+F = TypeVar('F', bound=Callable[..., Any])  # pylint: disable=invalid-name
+
+
+# Don't Panic.
+def require(required_state: Runstate) -> Callable[[F], F]:
+    """
+    Decorator: protect a method so it can only be run in a certain `Runstate`.
+
+    :param required_state: The `Runstate` required to invoke this method.
+    :raise StateError: When the required `Runstate` is not met.
+    """
+    def _decorator(func: F) -> F:
+        # _decorator is the decorator that is built by calling the
+        # require() decorator factory; e.g.:
+        #
+        # @require(Runstate.IDLE) def # foo(): ...
+        # will replace 'foo' with the result of '_decorator(foo)'.
+
+        @wraps(func)
+        def _wrapper(proto: 'AsyncProtocol[Any]',
+                     *args: Any, **kwargs: Any) -> Any:
+            # _wrapper is the function that gets executed prior to the
+            # decorated method.
+
+            name = type(proto).__name__
+
+            if proto.runstate != required_state:
+                if proto.runstate == Runstate.CONNECTING:
+                    emsg = f"{name} is currently connecting."
+                elif proto.runstate == Runstate.DISCONNECTING:
+                    emsg = (f"{name} is disconnecting."
+                            " Call disconnect() to return to IDLE state.")
+                elif proto.runstate == Runstate.RUNNING:
+                    emsg = f"{name} is already connected and running."
+                elif proto.runstate == Runstate.IDLE:
+                    emsg = f"{name} is disconnected and idle."
+                else:
+                    assert False
+                raise StateError(emsg, proto.runstate, required_state)
+            # No StateError, so call the wrapped method.
+            return func(proto, *args, **kwargs)
+
+        # Return the decorated method;
+        # Transforming Func to Decorated[Func].
+        return cast(F, _wrapper)
+
+    # Return the decorator instance from the decorator factory. Phew!
+    return _decorator
+
+
 class AsyncProtocol(Generic[T]):
     """
     AsyncProtocol implements a generic async message-based protocol.
@@ -120,7 +204,24 @@ def __init__(self) -> None:
         #: exit.
         self._dc_task: Optional[asyncio.Future[None]] = None
 
+        self._runstate = Runstate.IDLE
+        self._runstate_changed: Optional[asyncio.Event] = None
+
+    @property  # @upper_half
+    def runstate(self) -> Runstate:
+        """The current `Runstate` of the connection."""
+        return self._runstate
+
     @upper_half
+    async def runstate_changed(self) -> Runstate:
+        """
+        Wait for the `runstate` to change, then return that runstate.
+        """
+        await self._runstate_event.wait()
+        return self.runstate
+
+    @upper_half
+    @require(Runstate.IDLE)
     async def connect(self, address: Union[str, Tuple[str, int]],
                       ssl: Optional[SSLContext] = None) -> None:
         """
@@ -154,6 +255,30 @@ async def disconnect(self) -> None:
     # Section: Session machinery
     # --------------------------
 
+    @property
+    def _runstate_event(self) -> asyncio.Event:
+        # asyncio.Event() objects should not be created prior to entrance into
+        # an event loop, so we can ensure we create it in the correct context.
+        # Create it on-demand *only* at the behest of an 'async def' method.
+        if not self._runstate_changed:
+            self._runstate_changed = asyncio.Event()
+        return self._runstate_changed
+
+    @upper_half
+    @bottom_half
+    def _set_state(self, state: Runstate) -> None:
+        """
+        Change the `Runstate` of the protocol connection.
+
+        Signals the `runstate_changed` event.
+        """
+        if state == self._runstate:
+            return
+
+        self._runstate = state
+        self._runstate_event.set()
+        self._runstate_event.clear()
+
     @upper_half
     async def _new_session(self,
                            address: Union[str, Tuple[str, int]],
@@ -178,6 +303,8 @@ async def _new_session(self,
             protocol-level failure occurs while establishing a new
             session, the wrapped error may also be an `AQMPError`.
         """
+        assert self.runstate == Runstate.IDLE
+
         try:
             phase = "connection"
             await self._establish_connection(address, ssl)
@@ -187,6 +314,7 @@ async def _new_session(self,
 
         except BaseException as err:
             emsg = f"Failed to establish {phase}"
+            # Reset from CONNECTING back to IDLE.
             await self.disconnect()
 
             # NB: CancelledError is not a BaseException before Python 3.8
@@ -199,6 +327,8 @@ async def _new_session(self,
             # Raise BaseExceptions un-wrapped, they're more important.
             raise
 
+        assert self.runstate == Runstate.RUNNING
+
     @upper_half
     async def _establish_connection(
             self,
@@ -213,6 +343,14 @@ async def _establish_connection(
             UNIX socket path or TCP address/port.
         :param ssl: SSL context to use, if any.
         """
+        assert self.runstate == Runstate.IDLE
+        self._set_state(Runstate.CONNECTING)
+
+        # Allow runstate watchers to witness 'CONNECTING' state; some
+        # failures in the streaming layer are synchronous and will not
+        # otherwise yield.
+        await asyncio.sleep(0)
+
         await self._do_connect(address, ssl)
 
     @upper_half
@@ -242,6 +380,8 @@ async def _establish_session(self) -> None:
         own negotiations here. The Runstate will be RUNNING upon
         successful conclusion.
         """
+        assert self.runstate == Runstate.CONNECTING
+
         self._outgoing = asyncio.Queue()
 
         reader_coro = self._bh_loop_forever(self._bh_recv_message)
@@ -255,6 +395,9 @@ async def _establish_session(self) -> None:
             self._writer_task,
         )
 
+        self._set_state(Runstate.RUNNING)
+        await asyncio.sleep(0)  # Allow runstate_event to process
+
     @upper_half
     @bottom_half
     def _schedule_disconnect(self) -> None:
@@ -268,6 +411,7 @@ def _schedule_disconnect(self) -> None:
         It can be invoked no matter what the `runstate` is.
         """
         if not self._dc_task:
+            self._set_state(Runstate.DISCONNECTING)
             self._dc_task = create_task(self._bh_disconnect())
 
     @upper_half
@@ -283,6 +427,7 @@ async def _wait_disconnect(self) -> None:
         :raise Exception:
             Arbitrary exception re-raised on behalf of the reader/writer.
         """
+        assert self.runstate == Runstate.DISCONNECTING
         assert self._dc_task
 
         try:
@@ -291,6 +436,7 @@ async def _wait_disconnect(self) -> None:
                 await self._bh_tasks  # Raise exception from reader/writer
         finally:
             self._cleanup()
+            self._set_state(Runstate.IDLE)
 
     @upper_half
     def _cleanup(self) -> None:
@@ -302,6 +448,7 @@ def _paranoid_task_erase(task: _FutureT) -> Optional[_FutureT]:
             assert (task is None) or task.done()
             return None if (task and task.done()) else task
 
+        assert self.runstate == Runstate.DISCONNECTING
         self._dc_task = _paranoid_task_erase(self._dc_task)
         self._reader_task = _paranoid_task_erase(self._reader_task)
         self._writer_task = _paranoid_task_erase(self._writer_task)
@@ -310,6 +457,9 @@ def _paranoid_task_erase(task: _FutureT) -> Optional[_FutureT]:
         self._reader = None
         self._writer = None
 
+        # NB: _runstate_changed cannot be cleared because we still need it to
+        # send the final runstate changed event ...!
+
     # ----------------------------
     # Section: Bottom Half methods
     # ----------------------------
@@ -324,6 +474,7 @@ async def _bh_disconnect(self) -> None:
         it is free to wait on any pending actions that may still need to
         occur in either the reader or writer tasks.
         """
+        assert self.runstate == Runstate.DISCONNECTING
 
         def _done(task: Optional['asyncio.Future[Any]']) -> bool:
             return task is not None and task.done()
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 07/24] python/aqmp: Add logging utility helpers
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (5 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 06/24] python/aqmp: add runstate state machine to AsyncProtocol John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 08/24] python/aqmp: add logging to AsyncProtocol John Snow
                   ` (17 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/util.py | 56 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py
index 88abfc9eb22..70ef94ad600 100644
--- a/python/qemu/aqmp/util.py
+++ b/python/qemu/aqmp/util.py
@@ -4,10 +4,15 @@
 This module provides asyncio utilities and compatibility wrappers for
 Python 3.6 to provide some features that otherwise become available in
 Python 3.7+.
+
+Various logging and debugging utilities are also provided, such as
+`exception_summary()` and `pretty_traceback()`, used primarily for
+adding information into the logging stream.
 """
 
 import asyncio
 import sys
+import traceback
 from typing import (
     Any,
     Coroutine,
@@ -130,3 +135,54 @@ async def wait_closed(writer: asyncio.StreamWriter) -> None:
     while not transport.is_closing():
         await asyncio.sleep(0)
     await flush(writer)
+
+
+# ----------------------------
+# Section: Logging & Debugging
+# ----------------------------
+
+
+def exception_summary(exc: BaseException) -> str:
+    """
+    Return a summary string of an arbitrary exception.
+
+    It will be of the form "ExceptionType: Error Message", if the error
+    string is non-empty, and just "ExceptionType" otherwise.
+    """
+    name = type(exc).__qualname__
+    smod = type(exc).__module__
+    if smod not in ("__main__", "builtins"):
+        name = smod + '.' + name
+
+    error = str(exc)
+    if error:
+        return f"{name}: {error}"
+    return name
+
+
+def pretty_traceback(prefix: str = "  | ") -> str:
+    """
+    Formats the current traceback, indented to provide visual distinction.
+
+    This is useful for printing a traceback within a traceback for
+    debugging purposes when encapsulating errors to deliver them up the
+    stack; when those errors are printed, this helps provide a nice
+    visual grouping to quickly identify the parts of the error that
+    belong to the inner exception.
+
+    :param prefix: The prefix to append to each line of the traceback.
+    :return: A string, formatted something like the following::
+
+      | Traceback (most recent call last):
+      |   File "foobar.py", line 42, in arbitrary_example
+      |     foo.baz()
+      | ArbitraryError: [Errno 42] Something bad happened!
+    """
+    output = "".join(traceback.format_exception(*sys.exc_info()))
+
+    exc_lines = []
+    for line in output.split('\n'):
+        exc_lines.append(prefix + line)
+
+    # The last line is always empty, omit it
+    return "\n".join(exc_lines[:-1])
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 08/24] python/aqmp: add logging to AsyncProtocol
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (6 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 07/24] python/aqmp: Add logging utility helpers John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 09/24] python/aqmp: add AsyncProtocol.accept() method John Snow
                   ` (16 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

Give the connection and the reader/writer tasks nicknames, and add
logging statements throughout.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/protocol.py | 71 ++++++++++++++++++++++++++++++++----
 1 file changed, 64 insertions(+), 7 deletions(-)

diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index 247b60c31a6..f9295546cda 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -14,6 +14,7 @@
 from asyncio import StreamReader, StreamWriter
 from enum import Enum
 from functools import wraps
+import logging
 from ssl import SSLContext
 from typing import (
     Any,
@@ -31,8 +32,10 @@
 from .util import (
     bottom_half,
     create_task,
+    exception_summary,
     flush,
     is_closing,
+    pretty_traceback,
     upper_half,
     wait_closed,
 )
@@ -173,14 +176,28 @@ class AsyncProtocol(Generic[T]):
          can be written after the super() call.
      - `_on_message`:
          Actions to be performed when a message is received.
+
+    :param name:
+        Name used for logging messages, if any. By default, messages
+        will log to 'qemu.aqmp.protocol', but each individual connection
+        can be given its own logger by giving it a name; messages will
+        then log to 'qemu.aqmp.protocol.${name}'.
     """
     # pylint: disable=too-many-instance-attributes
 
+    #: Logger object for debugging messages from this connection.
+    logger = logging.getLogger(__name__)
+
     # -------------------------
     # Section: Public interface
     # -------------------------
 
-    def __init__(self) -> None:
+    def __init__(self, name: Optional[str] = None) -> None:
+        #: The nickname for this connection, if any.
+        self.name: Optional[str] = name
+        if self.name is not None:
+            self.logger = self.logger.getChild(self.name)
+
         # stream I/O
         self._reader: Optional[StreamReader] = None
         self._writer: Optional[StreamWriter] = None
@@ -207,6 +224,14 @@ def __init__(self) -> None:
         self._runstate = Runstate.IDLE
         self._runstate_changed: Optional[asyncio.Event] = None
 
+    def __repr__(self) -> str:
+        cls_name = type(self).__name__
+        tokens = []
+        if self.name is not None:
+            tokens.append(f"name={self.name!r}")
+        tokens.append(f"runstate={self.runstate.name}")
+        return f"<{cls_name} {' '.join(tokens)}>"
+
     @property  # @upper_half
     def runstate(self) -> Runstate:
         """The current `Runstate` of the connection."""
@@ -275,6 +300,8 @@ def _set_state(self, state: Runstate) -> None:
         if state == self._runstate:
             return
 
+        self.logger.debug("Transitioning from '%s' to '%s'.",
+                          str(self._runstate), str(state))
         self._runstate = state
         self._runstate_event.set()
         self._runstate_event.clear()
@@ -314,8 +341,15 @@ async def _new_session(self,
 
         except BaseException as err:
             emsg = f"Failed to establish {phase}"
-            # Reset from CONNECTING back to IDLE.
-            await self.disconnect()
+            self.logger.error("%s: %s", emsg, exception_summary(err))
+            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+            try:
+                # Reset from CONNECTING back to IDLE.
+                await self.disconnect()
+            except:
+                emsg = "Unexpected bottom half exception"
+                self.logger.critical("%s:\n%s\n", emsg, pretty_traceback())
+                raise
 
             # NB: CancelledError is not a BaseException before Python 3.8
             if isinstance(err, asyncio.CancelledError):
@@ -365,12 +399,16 @@ async def _do_connect(self, address: Union[str, Tuple[str, int]],
 
         :raise OSError: For stream-related errors.
         """
+        self.logger.debug("Connecting to %s ...", address)
+
         if isinstance(address, tuple):
             connect = asyncio.open_connection(address[0], address[1], ssl=ssl)
         else:
             connect = asyncio.open_unix_connection(path=address, ssl=ssl)
         self._reader, self._writer = await connect
 
+        self.logger.debug("Connected.")
+
     @upper_half
     async def _establish_session(self) -> None:
         """
@@ -384,8 +422,8 @@ async def _establish_session(self) -> None:
 
         self._outgoing = asyncio.Queue()
 
-        reader_coro = self._bh_loop_forever(self._bh_recv_message)
-        writer_coro = self._bh_loop_forever(self._bh_send_message)
+        reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
+        writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
 
         self._reader_task = create_task(reader_coro)
         self._writer_task = create_task(writer_coro)
@@ -412,6 +450,7 @@ def _schedule_disconnect(self) -> None:
         """
         if not self._dc_task:
             self._set_state(Runstate.DISCONNECTING)
+            self.logger.debug("Scheduling disconnect.")
             self._dc_task = create_task(self._bh_disconnect())
 
     @upper_half
@@ -497,8 +536,10 @@ def _exception(
         # This implicitly closes the reader, too.
         if self._writer:
             if not is_closing(self._writer):
+                self.logger.debug("Closing StreamWriter.")
                 self._writer.close()
 
+            self.logger.debug("Waiting for StreamWriter to close ...")
             try:
                 await wait_closed(self._writer)
             except Exception as err:  # pylint: disable=broad-except
@@ -509,6 +550,10 @@ def _exception(
                         self._reader_task, self._writer_task))):
                     raise
 
+            self.logger.debug("StreamWriter closed.")
+
+        self.logger.debug("Disconnected.")
+
     @bottom_half
     async def _bh_stop_writer(self, force: bool = False) -> None:
         if not self._writer_task or self._writer_task.done():
@@ -516,10 +561,13 @@ async def _bh_stop_writer(self, force: bool = False) -> None:
 
         # If we're not in a hurry, drain the outbound queue.
         if not force:
+            self.logger.debug("Draining the outbound queue ...")
             await self._outgoing.join()
             if self._writer is not None:
+                self.logger.debug("Flushing the StreamWriter ...")
                 await flush(self._writer)
 
+        self.logger.debug("Cancelling writer task ...")
         self._writer_task.cancel()
         # Waits for the writer to finish but does NOT raise its exception.
         await asyncio.wait((self._writer_task,))
@@ -529,12 +577,13 @@ async def _bh_stop_reader(self) -> None:
         if not self._reader_task or self._reader_task.done():
             return
 
+        self.logger.debug("Cancelling reader task ...")
         self._reader_task.cancel()
         # Waits for the reader to finish but does NOT raise its exception.
         await asyncio.wait((self._reader_task,))
 
     @bottom_half
-    async def _bh_loop_forever(self, async_fn: _TaskFN) -> None:
+    async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
         """
         Run one of the bottom-half methods in a loop forever.
 
@@ -542,16 +591,24 @@ async def _bh_loop_forever(self, async_fn: _TaskFN) -> None:
         disconnect that will terminate the entire loop.
 
         :param async_fn: The bottom-half method to run in a loop.
+        :param name: The name of this task, used for logging.
         """
         try:
             while True:
                 await async_fn()
         except asyncio.CancelledError:
             # We have been cancelled by _bh_disconnect, exit gracefully.
+            self.logger.debug("Task.%s: cancelled.", name)
             return
-        except BaseException:
+        except BaseException as err:
+            self.logger.error("Task.%s: %s",
+                              name, exception_summary(err))
+            self.logger.debug("Task.%s: failure:\n%s\n",
+                              name, pretty_traceback())
             self._schedule_disconnect()
             raise
+        finally:
+            self.logger.debug("Task.%s: exiting.", name)
 
     @bottom_half
     async def _bh_send_message(self) -> None:
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 09/24] python/aqmp: add AsyncProtocol.accept() method
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (7 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 08/24] python/aqmp: add logging to AsyncProtocol John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 10/24] python/aqmp: add configurable read buffer limit John Snow
                   ` (15 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

It's a little messier than connect, because it wasn't designed to accept
*precisely one* connection. Such is life.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/protocol.py | 89 ++++++++++++++++++++++++++++++++++--
 1 file changed, 85 insertions(+), 4 deletions(-)

diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index f9295546cda..99b9614ba94 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -245,6 +245,24 @@ async def runstate_changed(self) -> Runstate:
         await self._runstate_event.wait()
         return self.runstate
 
+    @upper_half
+    @require(Runstate.IDLE)
+    async def accept(self, address: Union[str, Tuple[str, int]],
+                     ssl: Optional[SSLContext] = None) -> None:
+        """
+        Accept a connection and begin processing message queues.
+
+        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+
+        :param address:
+            Address to listen to; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise StateError: When the `Runstate` is not `IDLE`.
+        :raise ConnectError: If a connection could not be accepted.
+        """
+        await self._new_session(address, ssl, accept=True)
+
     @upper_half
     @require(Runstate.IDLE)
     async def connect(self, address: Union[str, Tuple[str, int]],
@@ -309,7 +327,8 @@ def _set_state(self, state: Runstate) -> None:
     @upper_half
     async def _new_session(self,
                            address: Union[str, Tuple[str, int]],
-                           ssl: Optional[SSLContext] = None) -> None:
+                           ssl: Optional[SSLContext] = None,
+                           accept: bool = False) -> None:
         """
         Establish a new connection and initialize the session.
 
@@ -318,9 +337,10 @@ async def _new_session(self,
         to be set back to `IDLE`.
 
         :param address:
-            Address to connect to;
+            Address to connect to/listen on;
             UNIX socket path or TCP address/port.
         :param ssl: SSL context to use, if any.
+        :param accept: Accept a connection instead of connecting when `True`.
 
         :raise ConnectError:
             When a connection or session cannot be established.
@@ -334,7 +354,7 @@ async def _new_session(self,
 
         try:
             phase = "connection"
-            await self._establish_connection(address, ssl)
+            await self._establish_connection(address, ssl, accept)
 
             phase = "session"
             await self._establish_session()
@@ -368,6 +388,7 @@ async def _establish_connection(
             self,
             address: Union[str, Tuple[str, int]],
             ssl: Optional[SSLContext] = None,
+            accept: bool = False
     ) -> None:
         """
         Establish a new connection.
@@ -376,6 +397,7 @@ async def _establish_connection(
             Address to connect to/listen on;
             UNIX socket path or TCP address/port.
         :param ssl: SSL context to use, if any.
+        :param accept: Accept a connection instead of connecting when `True`.
         """
         assert self.runstate == Runstate.IDLE
         self._set_state(Runstate.CONNECTING)
@@ -385,7 +407,66 @@ async def _establish_connection(
         # otherwise yield.
         await asyncio.sleep(0)
 
-        await self._do_connect(address, ssl)
+        if accept:
+            await self._do_accept(address, ssl)
+        else:
+            await self._do_connect(address, ssl)
+
+    @upper_half
+    async def _do_accept(self, address: Union[str, Tuple[str, int]],
+                         ssl: Optional[SSLContext] = None) -> None:
+        """
+        Acting as the transport server, accept a single connection.
+
+        :param address:
+            Address to listen on; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise OSError: For stream-related errors.
+        """
+        self.logger.debug("Awaiting connection on %s ...", address)
+        connected = asyncio.Event()
+        server: Optional[asyncio.AbstractServer] = None
+
+        async def _client_connected_cb(reader: asyncio.StreamReader,
+                                       writer: asyncio.StreamWriter) -> None:
+            """Used to accept a single incoming connection, see below."""
+            nonlocal server
+            nonlocal connected
+
+            # A connection has been accepted; stop listening for new ones.
+            assert server is not None
+            server.close()
+            await server.wait_closed()
+            server = None
+
+            # Register this client as being connected
+            self._reader, self._writer = (reader, writer)
+
+            # Signal back: We've accepted a client!
+            connected.set()
+
+        if isinstance(address, tuple):
+            coro = asyncio.start_server(
+                _client_connected_cb,
+                host=address[0],
+                port=address[1],
+                ssl=ssl,
+                backlog=1,
+            )
+        else:
+            coro = asyncio.start_unix_server(
+                _client_connected_cb,
+                path=address,
+                ssl=ssl,
+                backlog=1,
+            )
+
+        server = await coro     # Starts listening
+        await connected.wait()  # Waits for the callback to fire (and finish)
+        assert server is None
+
+        self.logger.debug("Connection accepted.")
 
     @upper_half
     async def _do_connect(self, address: Union[str, Tuple[str, int]],
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 10/24] python/aqmp: add configurable read buffer limit
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (8 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 09/24] python/aqmp: add AsyncProtocol.accept() method John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 11/24] python/aqmp: add _cb_inbound and _cb_inbound logging hooks John Snow
                   ` (14 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

QMP can transmit some pretty big messages, and the default limit of 64KB
isn't sufficient. Make sure that we can configure it.

Reported-by: G S Niteesh Babu <niteesh.gs@gmail.com>
Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/protocol.py | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index 99b9614ba94..86002a52654 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -188,6 +188,9 @@ class AsyncProtocol(Generic[T]):
     #: Logger object for debugging messages from this connection.
     logger = logging.getLogger(__name__)
 
+    # Maximum allowable size of read buffer
+    _limit = (64 * 1024)
+
     # -------------------------
     # Section: Public interface
     # -------------------------
@@ -453,6 +456,7 @@ async def _client_connected_cb(reader: asyncio.StreamReader,
                 port=address[1],
                 ssl=ssl,
                 backlog=1,
+                limit=self._limit,
             )
         else:
             coro = asyncio.start_unix_server(
@@ -460,6 +464,7 @@ async def _client_connected_cb(reader: asyncio.StreamReader,
                 path=address,
                 ssl=ssl,
                 backlog=1,
+                limit=self._limit,
             )
 
         server = await coro     # Starts listening
@@ -483,9 +488,18 @@ async def _do_connect(self, address: Union[str, Tuple[str, int]],
         self.logger.debug("Connecting to %s ...", address)
 
         if isinstance(address, tuple):
-            connect = asyncio.open_connection(address[0], address[1], ssl=ssl)
+            connect = asyncio.open_connection(
+                address[0],
+                address[1],
+                ssl=ssl,
+                limit=self._limit,
+            )
         else:
-            connect = asyncio.open_unix_connection(path=address, ssl=ssl)
+            connect = asyncio.open_unix_connection(
+                path=address,
+                ssl=ssl,
+                limit=self._limit,
+            )
         self._reader, self._writer = await connect
 
         self.logger.debug("Connected.")
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 11/24] python/aqmp: add _cb_inbound and _cb_inbound logging hooks
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (9 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 10/24] python/aqmp: add configurable read buffer limit John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-20 18:51   ` Niteesh G. S.
  2021-07-17  0:32 ` [PATCH v2 12/24] python/aqmp: add AsyncProtocol._readline() method John Snow
                   ` (13 subsequent siblings)
  24 siblings, 1 reply; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

Add hooks designed to log/filter incoming/outgoing messages. The primary
intent for these is to be able to support iotests which may want to log
messages with specific filters for reproducible output.

Another use is for plugging into Urwid frameworks; all messages in/out
can be automatically added to a rendering list for the purposes of a
qmp-shell like tool.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/protocol.py | 50 +++++++++++++++++++++++++++++++++---
 1 file changed, 46 insertions(+), 4 deletions(-)

diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index 86002a52654..6f83d3e3922 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -176,6 +176,11 @@ class AsyncProtocol(Generic[T]):
          can be written after the super() call.
      - `_on_message`:
          Actions to be performed when a message is received.
+     - `_cb_outbound`:
+         Logging/Filtering hook for all outbound messages.
+     - `_cb_inbound`:
+         Logging/Filtering hook for all inbound messages.
+         This hook runs *before* `_on_message()`.
 
     :param name:
         Name used for logging messages, if any. By default, messages
@@ -732,6 +737,43 @@ async def _bh_recv_message(self) -> None:
     # Section: Message I/O
     # --------------------
 
+    @upper_half
+    @bottom_half
+    def _cb_outbound(self, msg: T) -> T:
+        """
+        Callback: outbound message hook.
+
+        This is intended for subclasses to be able to add arbitrary
+        hooks to filter or manipulate outgoing messages. The base
+        implementation does nothing but log the message without any
+        manipulation of the message.
+
+        :param msg: raw outbound message
+        :return: final outbound message
+        """
+        self.logger.debug("--> %s", str(msg))
+        return msg
+
+    @upper_half
+    @bottom_half
+    def _cb_inbound(self, msg: T) -> T:
+        """
+        Callback: inbound message hook.
+
+        This is intended for subclasses to be able to add arbitrary
+        hooks to filter or manipulate incoming messages. The base
+        implementation does nothing but log the message without any
+        manipulation of the message.
+
+        This method does not "handle" incoming messages; it is a filter.
+        The actual "endpoint" for incoming messages is `_on_message()`.
+
+        :param msg: raw inbound message
+        :return: processed inbound message
+        """
+        self.logger.debug("<-- %s", str(msg))
+        return msg
+
     @upper_half
     @bottom_half
     async def _do_recv(self) -> T:
@@ -760,8 +802,8 @@ async def _recv(self) -> T:
 
         :return: A single (filtered, processed) protocol message.
         """
-        # A forthcoming commit makes this method less trivial.
-        return await self._do_recv()
+        message = await self._do_recv()
+        return self._cb_inbound(message)
 
     @upper_half
     @bottom_half
@@ -791,7 +833,7 @@ async def _send(self, msg: T) -> None:
 
         :raise OSError: For problems with the underlying stream.
         """
-        # A forthcoming commit makes this method less trivial.
+        msg = self._cb_outbound(msg)
         self._do_send(msg)
 
     @bottom_half
@@ -806,6 +848,6 @@ async def _on_message(self, msg: T) -> None:
             directly cause the loop to halt, so logic may be best-kept
             to a minimum if at all possible.
 
-        :param msg: The incoming message
+        :param msg: The incoming message, already logged/filtered.
         """
         # Nothing to do in the abstract case.
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 12/24] python/aqmp: add AsyncProtocol._readline() method
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (10 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 11/24] python/aqmp: add _cb_inbound and _cb_inbound logging hooks John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 13/24] python/aqmp: add QMP Message format John Snow
                   ` (12 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

This is added as a courtesy: many protocols are line-based, including
QMP. Putting it in AsyncProtocol lets us keep the QMP class
implementation just a pinch more abstract.

(And, if we decide to add a QTEST implementation later, it will need
this, too. (Yes, I have a QTEST implementation.))

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/protocol.py | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index 6f83d3e3922..28cd5d9f4fd 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -774,6 +774,35 @@ def _cb_inbound(self, msg: T) -> T:
         self.logger.debug("<-- %s", str(msg))
         return msg
 
+    @upper_half
+    @bottom_half
+    async def _readline(self) -> bytes:
+        """
+        Wait for a newline from the incoming reader.
+
+        This method is provided as a convenience for upper-layer
+        protocols, as many are line-based.
+
+        This method *may* return a sequence of bytes without a trailing
+        newline if EOF occurs, but *some* bytes were received. In this
+        case, the next call will raise `EOFError`. It is assumed that
+        the layer 5 protocol will decide if there is anything meaningful
+        to be done with a partial message.
+
+        :raise OSError: For stream-related errors.
+        :raise EOFError:
+            If the reader stream is at EOF and there are no bytes to return.
+        :return: bytes, including the newline.
+        """
+        assert self._reader is not None
+        msg_bytes = await self._reader.readline()
+
+        if not msg_bytes:
+            if self._reader.at_eof():
+                raise EOFError
+
+        return msg_bytes
+
     @upper_half
     @bottom_half
     async def _do_recv(self) -> T:
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 13/24] python/aqmp: add QMP Message format
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (11 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 12/24] python/aqmp: add AsyncProtocol._readline() method John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 14/24] python/aqmp: add well-known QMP object models John Snow
                   ` (11 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

The Message class is here primarily to serve as a solid type to use for
mypy static typing for unambiguous annotation and documentation.

We can also stuff JSON serialization and deserialization into this class
itself so it can be re-used even outside this infrastructure.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/__init__.py |   4 +-
 python/qemu/aqmp/message.py  | 209 +++++++++++++++++++++++++++++++++++
 2 files changed, 212 insertions(+), 1 deletion(-)
 create mode 100644 python/qemu/aqmp/message.py

diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
index ed65913c83e..035987c756c 100644
--- a/python/qemu/aqmp/__init__.py
+++ b/python/qemu/aqmp/__init__.py
@@ -22,12 +22,14 @@
 # the COPYING file in the top-level directory.
 
 from .error import AQMPError
+from .message import Message
 from .protocol import ConnectError, Runstate
 
 
 # The order of these fields impact the Sphinx documentation order.
 __all__ = (
-    # Classes
+    # Classes, most to least important
+    'Message',
     'Runstate',
 
     # Exceptions, most generic to most explicit
diff --git a/python/qemu/aqmp/message.py b/python/qemu/aqmp/message.py
new file mode 100644
index 00000000000..f76ccc90746
--- /dev/null
+++ b/python/qemu/aqmp/message.py
@@ -0,0 +1,209 @@
+"""
+QMP Message Format
+
+This module provides the `Message` class, which represents a single QMP
+message sent to or from the server.
+"""
+
+import json
+from json import JSONDecodeError
+from typing import (
+    Dict,
+    Iterator,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Union,
+)
+
+from .error import ProtocolError
+
+
+class Message(MutableMapping[str, object]):
+    """
+    Represents a single QMP protocol message.
+
+    QMP uses JSON objects as its basic communicative unit; so this
+    Python object is a :py:obj:`~collections.abc.MutableMapping`. It may
+    be instantiated from either another mapping (like a `dict`), or from
+    raw `bytes` that still need to be deserialized.
+
+    Once instantiated, it may be treated like any other MutableMapping::
+
+        >>> msg = Message(b'{"hello": "world"}')
+        >>> assert msg['hello'] == 'world'
+        >>> msg['id'] = 'foobar'
+        >>> print(msg)
+        {
+          "hello": "world",
+          "id": "foobar"
+        }
+
+    It can be converted to `bytes`::
+
+        >>> msg = Message({"hello": "world"})
+        >>> print(bytes(msg))
+        b'{"hello":"world","id":"foobar"}'
+
+    Or back into a garden-variety `dict`::
+
+       >>> dict(msg)
+       {'hello': 'world'}
+
+
+    :param value: Initial value, if any.
+    :param eager:
+        When `True`, attempt to serialize or deserialize the initial value
+        immediately, so that conversion exceptions are raised during
+        the call to ``__init__()``.
+    """
+    # pylint: disable=too-many-ancestors
+
+    def __init__(self,
+                 value: Union[bytes, Mapping[str, object]] = b'{}', *,
+                 eager: bool = True):
+        self._data: Optional[bytes] = None
+        self._obj: Optional[Dict[str, object]] = None
+
+        if isinstance(value, bytes):
+            self._data = value
+            if eager:
+                self._obj = self._deserialize(self._data)
+        else:
+            self._obj = dict(value)
+            if eager:
+                self._data = self._serialize(self._obj)
+
+    # Methods necessary to implement the MutableMapping interface, see:
+    # https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping
+
+    # We get pop, popitem, clear, update, setdefault, __contains__,
+    # keys, items, values, get, __eq__ and __ne__ for free.
+
+    def __getitem__(self, key: str) -> object:
+        return self._object[key]
+
+    def __setitem__(self, key: str, value: object) -> None:
+        self._object[key] = value
+        self._data = None
+
+    def __delitem__(self, key: str) -> None:
+        del self._object[key]
+        self._data = None
+
+    def __iter__(self) -> Iterator[str]:
+        return iter(self._object)
+
+    def __len__(self) -> int:
+        return len(self._object)
+
+    # Dunder methods not related to MutableMapping:
+
+    def __repr__(self) -> str:
+        if self._obj is not None:
+            return f"Message({self._object!r})"
+        return f"Message({bytes(self)!r})"
+
+    def __str__(self) -> str:
+        """Pretty-printed representation of this QMP message."""
+        return json.dumps(self._object, indent=2)
+
+    def __bytes__(self) -> bytes:
+        """bytes representing this QMP message."""
+        if self._data is None:
+            self._data = self._serialize(self._obj or {})
+        return self._data
+
+    # Conversion Methods
+
+    @property
+    def _object(self) -> Dict[str, object]:
+        """
+        A `dict` representing this QMP message.
+
+        Generated on-demand, if required. This property is private
+        because it returns an object that could be used to invalidate
+        the internal state of the `Message` object.
+        """
+        if self._obj is None:
+            self._obj = self._deserialize(self._data or b'{}')
+        return self._obj
+
+    @classmethod
+    def _serialize(cls, value: object) -> bytes:
+        """
+        Serialize a JSON object as `bytes`.
+
+        :raise ValueError: When the object cannot be serialized.
+        :raise TypeError: When the object cannot be serialized.
+
+        :return: `bytes` ready to be sent over the wire.
+        """
+        return json.dumps(value, separators=(',', ':')).encode('utf-8')
+
+    @classmethod
+    def _deserialize(cls, data: bytes) -> Dict[str, object]:
+        """
+        Deserialize JSON `bytes` into a native Python `dict`.
+
+        :raise DeserializationError:
+            If JSON deserialization fails for any reason.
+        :raise UnexpectedTypeError:
+            If the data does not represent a JSON object.
+
+        :return: A `dict` representing this QMP message.
+        """
+        try:
+            obj = json.loads(data)
+        except JSONDecodeError as err:
+            emsg = "Failed to deserialize QMP message."
+            raise DeserializationError(emsg, data) from err
+        if not isinstance(obj, dict):
+            raise UnexpectedTypeError(
+                "QMP message is not a JSON object.",
+                obj
+            )
+        return obj
+
+
+class DeserializationError(ProtocolError):
+    """
+    A QMP message was not understood as JSON.
+
+    When this Exception is raised, ``__cause__`` will be set to the
+    `json.JSONDecodeError` Exception, which can be interrogated for
+    further details.
+
+    :param error_message: Human-readable string describing the error.
+    :param raw: The raw `bytes` that prompted the failure.
+    """
+    def __init__(self, error_message: str, raw: bytes):
+        super().__init__(error_message)
+        #: The raw `bytes` that were not understood as JSON.
+        self.raw: bytes = raw
+
+    def __str__(self) -> str:
+        return "\n".join([
+            super().__str__(),
+            f"  raw bytes were: {str(self.raw)}",
+        ])
+
+
+class UnexpectedTypeError(ProtocolError):
+    """
+    A QMP message was JSON, but not a JSON object.
+
+    :param error_message: Human-readable string describing the error.
+    :param value: The deserialized JSON value that wasn't an object.
+    """
+    def __init__(self, error_message: str, value: object):
+        super().__init__(error_message)
+        #: The JSON value that was expected to be an object.
+        self.value: object = value
+
+    def __str__(self) -> str:
+        strval = json.dumps(self.value, indent=2)
+        return "\n".join([
+            super().__str__(),
+            f"  json value was: {strval}",
+        ])
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 14/24] python/aqmp: add well-known QMP object models
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (12 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 13/24] python/aqmp: add QMP Message format John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 15/24] python/aqmp: add QMP event support John Snow
                   ` (10 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

The QMP spec doesn't define very many objects that are iron-clad in
their format, but there are a few. This module makes it trivial to
validate them without relying on an external third-party library.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/models.py | 133 +++++++++++++++++++++++++++++++++++++
 1 file changed, 133 insertions(+)
 create mode 100644 python/qemu/aqmp/models.py

diff --git a/python/qemu/aqmp/models.py b/python/qemu/aqmp/models.py
new file mode 100644
index 00000000000..24c94123ac0
--- /dev/null
+++ b/python/qemu/aqmp/models.py
@@ -0,0 +1,133 @@
+"""
+QMP Data Models
+
+This module provides simplistic data classes that represent the few
+structures that the QMP spec mandates; they are used to verify incoming
+data to make sure it conforms to spec.
+"""
+# pylint: disable=too-few-public-methods
+
+from collections import abc
+from typing import (
+    Any,
+    Mapping,
+    Optional,
+    Sequence,
+)
+
+
+class Model:
+    """
+    Abstract data model, representing some QMP object of some kind.
+
+    :param raw: The raw object to be validated.
+    :raise KeyError: If any required fields are absent.
+    :raise TypeError: If any required fields have the wrong type.
+    """
+    def __init__(self, raw: Mapping[str, Any]):
+        self._raw = raw
+
+    def _check_key(self, key: str) -> None:
+        if key not in self._raw:
+            raise KeyError(f"'{self._name}' object requires '{key}' member")
+
+    def _check_value(self, key: str, type_: type, typestr: str) -> None:
+        assert key in self._raw
+        if not isinstance(self._raw[key], type_):
+            raise TypeError(
+                f"'{self._name}' member '{key}' must be a {typestr}"
+            )
+
+    def _check_member(self, key: str, type_: type, typestr: str) -> None:
+        self._check_key(key)
+        self._check_value(key, type_, typestr)
+
+    @property
+    def _name(self) -> str:
+        return type(self).__name__
+
+    def __repr__(self) -> str:
+        return f"{self._name}({self._raw!r})"
+
+
+class Greeting(Model):
+    """
+    Defined in qmp-spec.txt, section 2.2, "Server Greeting".
+
+    :param raw: The raw Greeting object.
+    :raise KeyError: If any required fields are absent.
+    :raise TypeError: If any required fields have the wrong type.
+    """
+    def __init__(self, raw: Mapping[str, Any]):
+        super().__init__(raw)
+        #: 'QMP' member
+        self.QMP: QMPGreeting  # pylint: disable=invalid-name
+
+        self._check_member('QMP', abc.Mapping, "JSON object")
+        self.QMP = QMPGreeting(self._raw['QMP'])
+
+
+class QMPGreeting(Model):
+    """
+    Defined in qmp-spec.txt, section 2.2, "Server Greeting".
+
+    :param raw: The raw QMPGreeting object.
+    :raise KeyError: If any required fields are absent.
+    :raise TypeError: If any required fields have the wrong type.
+    """
+    def __init__(self, raw: Mapping[str, Any]):
+        super().__init__(raw)
+        #: 'version' member
+        self.version: Mapping[str, object]
+        #: 'capabilities' member
+        self.capabilities: Sequence[object]
+
+        self._check_member('version', abc.Mapping, "JSON object")
+        self.version = self._raw['version']
+
+        self._check_member('capabilities', abc.Sequence, "JSON array")
+        self.capabilities = self._raw['capabilities']
+
+
+class ErrorResponse(Model):
+    """
+    Defined in qmp-spec.txt, section 2.4.2, "error".
+
+    :param raw: The raw ErrorResponse object.
+    :raise KeyError: If any required fields are absent.
+    :raise TypeError: If any required fields have the wrong type.
+    """
+    def __init__(self, raw: Mapping[str, Any]):
+        super().__init__(raw)
+        #: 'error' member
+        self.error: ErrorInfo
+        #: 'id' member
+        self.id: Optional[object] = None  # pylint: disable=invalid-name
+
+        self._check_member('error', abc.Mapping, "JSON object")
+        self.error = ErrorInfo(self._raw['error'])
+
+        if 'id' in raw:
+            self.id = raw['id']
+
+
+class ErrorInfo(Model):
+    """
+    Defined in qmp-spec.txt, section 2.4.2, "error".
+
+    :param raw: The raw ErrorInfo object.
+    :raise KeyError: If any required fields are absent.
+    :raise TypeError: If any required fields have the wrong type.
+    """
+    def __init__(self, raw: Mapping[str, Any]):
+        super().__init__(raw)
+        #: 'class' member, with an underscore to avoid conflicts in Python.
+        self.class_: str
+        #: 'desc' member
+        self.desc: str
+
+        self._check_member('class', str, "string")
+        self.class_ = self._raw['class']
+
+        self._check_member('desc', str, "string")
+        self.desc = self._raw['desc']
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 15/24] python/aqmp: add QMP event support
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (13 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 14/24] python/aqmp: add well-known QMP object models John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 16/24] python/pylint: disable too-many-function-args John Snow
                   ` (9 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

This class was designed as a "mix-in" primarily so that the feature
could be given its own treatment in its own python module.

It gets quite a bit too long otherwise.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/__init__.py |   2 +
 python/qemu/aqmp/events.py   | 706 +++++++++++++++++++++++++++++++++++
 2 files changed, 708 insertions(+)
 create mode 100644 python/qemu/aqmp/events.py

diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
index 035987c756c..084eb5ab3e8 100644
--- a/python/qemu/aqmp/__init__.py
+++ b/python/qemu/aqmp/__init__.py
@@ -22,6 +22,7 @@
 # the COPYING file in the top-level directory.
 
 from .error import AQMPError
+from .events import EventListener
 from .message import Message
 from .protocol import ConnectError, Runstate
 
@@ -30,6 +31,7 @@
 __all__ = (
     # Classes, most to least important
     'Message',
+    'EventListener',
     'Runstate',
 
     # Exceptions, most generic to most explicit
diff --git a/python/qemu/aqmp/events.py b/python/qemu/aqmp/events.py
new file mode 100644
index 00000000000..fb81d216102
--- /dev/null
+++ b/python/qemu/aqmp/events.py
@@ -0,0 +1,706 @@
+"""
+AQMP Events and EventListeners
+
+Asynchronous QMP uses `EventListener` objects to listen for events. An
+`EventListener` is a FIFO event queue that can be pre-filtered to listen
+for only specific events. Each `EventListener` instance receives its own
+copy of events that it hears, so events may be consumed without fear or
+worry for depriving other listeners of events they need to hear.
+
+
+EventListener Tutorial
+----------------------
+
+In all of the following examples, we assume that we have a `QMPClient`
+instantiated named ``qmp`` that is already connected.
+
+
+`listener()` context blocks with one name
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The most basic usage is by using the `listener()` context manager to
+construct them:
+
+.. code:: python
+
+   with qmp.listener('STOP') as listener:
+       await qmp.execute('stop')
+       await listener.get()
+
+The listener is active only for the duration of the ‘with’ block. This
+instance listens only for ‘STOP’ events.
+
+
+`listener()` context blocks with two or more names
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Multiple events can be selected for by providing any ``Iterable[str]``:
+
+.. code:: python
+
+   with qmp.listener(('STOP', 'RESUME')) as listener:
+       await qmp.execute('stop')
+       event = await listener.get()
+       assert event['event'] == 'STOP'
+
+       await qmp.execute('cont')
+       event = await listener.get()
+       assert event['event'] == 'RESUME'
+
+
+`listener()` context blocks with no names
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+By omitting names entirely, you can listen to ALL events.
+
+.. code:: python
+
+   with qmp.listener() as listener:
+       await qmp.execute('stop')
+       event = await listener.get()
+       assert event['event'] == 'STOP'
+
+This isn’t a very good use case for this feature: In a non-trivial
+running system, we may not know what event will arrive next. Grabbing
+the top of a FIFO queue returning multiple kinds of events may be prone
+to error.
+
+
+Using async iterators to retrieve events
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If you’d like to simply watch what events happen to arrive, you can use
+the listener as an async iterator:
+
+.. code:: python
+
+   with qmp.listener() as listener:
+       async for event in listener:
+           print(f"Event arrived: {event['event']}")
+
+This is analogous to the following code:
+
+.. code:: python
+
+   with qmp.listener() as listener:
+       while True:
+           event = listener.get()
+           print(f"Event arrived: {event['event']}")
+
+This event stream will never end, so these blocks will never terminate.
+
+
+Using asyncio.Task to concurrently retrieve events
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Since a listener’s event stream will never terminate, it is not likely
+useful to use that form in a script. For longer-running clients, we can
+create event handlers by using `asyncio.Task` to create concurrent
+coroutines:
+
+.. code:: python
+
+   async def print_events(listener):
+       try:
+           async for event in listener:
+               print(f"Event arrived: {event['event']}")
+       except asyncio.CancelledError:
+           return
+
+   with qmp.listener() as listener:
+       task = asyncio.Task(print_events(listener))
+       await qmp.execute('stop')
+       await qmp.execute('cont')
+       task.cancel()
+       await task
+
+However, there is no guarantee that these events will be received by the
+time we leave this context block. Once the context block is exited, the
+listener will cease to hear any new events, and becomes inert.
+
+Be mindful of the timing: the above example will *probably*– but does
+not *guarantee*– that both STOP/RESUMED events will be printed. The
+example below outlines how to use listeners outside of a context block.
+
+
+Using `register_listener()` and `remove_listener()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+To create a listener with a longer lifetime, beyond the scope of a
+single block, create a listener and then call `register_listener()`:
+
+.. code:: python
+
+   class MyClient:
+       def __init__(self, qmp):
+           self.qmp = qmp
+           self.listener = EventListener()
+
+       async def print_events(self):
+           try:
+               async for event in self.listener:
+                   print(f"Event arrived: {event['event']}")
+           except asyncio.CancelledError:
+               return
+
+       async def run(self):
+           self.task = asyncio.Task(self.print_events)
+           self.qmp.register_listener(self.listener)
+           await qmp.execute('stop')
+           await qmp.execute('cont')
+
+       async def stop(self):
+           self.task.cancel()
+           await self.task
+           self.qmp.remove_listener(self.listener)
+
+The listener can be deactivated by using `remove_listener()`. When it is
+removed, any possible pending events are cleared and it can be
+re-registered at a later time.
+
+
+Using the built-in all events listener
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The `QMPClient` object creates its own default listener named
+:py:obj:`~Events.events` that can be used for the same purpose without
+having to create your own:
+
+.. code:: python
+
+   async def print_events(listener):
+       try:
+           async for event in listener:
+               print(f"Event arrived: {event['event']}")
+       except asyncio.CancelledError:
+           return
+
+   task = asyncio.Task(print_events(qmp.events))
+
+   await qmp.execute('stop')
+   await qmp.execute('cont')
+
+   task.cancel()
+   await task
+
+
+Using both .get() and async iterators
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The async iterator and `get()` methods pull events from the same FIFO
+queue. If you mix the usage of both, be aware: Events are emitted
+precisely once per listener.
+
+If multiple contexts try to pull events from the same listener instance,
+events are still emitted only precisely once.
+
+This restriction can be lifted by creating additional listeners.
+
+
+Creating multiple listeners
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Additional `EventListener` objects can be created at-will. Each one
+receives its own copy of events, with separate FIFO event queues.
+
+.. code:: python
+
+   my_listener = EventListener()
+   qmp.register_listener(my_listener)
+
+   await qmp.execute('stop')
+   copy1 = await my_listener.get()
+   copy2 = await qmp.events.get()
+
+   assert copy1 == copy2
+
+In this example, we await an event from both a user-created
+`EventListener` and the built-in events listener. Both receive the same
+event.
+
+
+Clearing listeners
+~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects can be cleared, clearing all events seen thus far:
+
+.. code:: python
+
+   await qmp.execute('stop')
+   qmp.events.clear()
+   await qmp.execute('cont')
+   event = await qmp.events.get()
+   assert event['event'] == 'RESUME'
+
+`EventListener` objects are FIFO queues. If events are not consumed,
+they will remain in the queue until they are witnessed or discarded via
+`clear()`. FIFO queues will be drained automatically upon leaving a
+context block, or when calling `remove_listener()`.
+
+
+Accessing listener history
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects record their history. Even after being cleared,
+you can obtain a record of all events seen so far:
+
+.. code:: python
+
+   await qmp.execute('stop')
+   await qmp.execute('cont')
+   qmp.events.clear()
+
+   assert len(qmp.events.history) == 2
+   assert qmp.events.history[0]['event'] == 'STOP'
+   assert qmp.events.history[1]['event'] == 'RESUME'
+
+The history is updated immediately and does not require the event to be
+witnessed first.
+
+
+Using event filters
+~~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects can be given complex filtering criteria if names
+are not sufficient:
+
+.. code:: python
+
+   def job1_filter(event) -> bool:
+       event_data = event.get('data', {})
+       event_job_id = event_data.get('id')
+       return event_job_id == "job1"
+
+   with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
+       await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
+       async for event in listener:
+           if event['data']['status'] == 'concluded':
+               break
+
+These filters might be most useful when parameterized. `EventListener`
+objects expect a function that takes only a single argument (the raw
+event, as a `Message`) and returns a bool; True if the event should be
+accepted into the stream. You can create a function that adapts this
+signature to accept configuration parameters:
+
+.. code:: python
+
+   def job_filter(job_id: str) -> EventFilter:
+       def filter(event: Message) -> bool:
+           return event['data']['id'] == job_id
+       return filter
+
+   with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
+       await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
+       async for event in listener:
+           if event['data']['status'] == 'concluded':
+               break
+
+
+Activating an existing listener with `listen()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Listeners with complex, long configurations can also be created manually
+and activated temporarily by using `listen()` instead of `listener()`:
+
+.. code:: python
+
+   listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
+                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
+                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
+
+   with qmp.listen(listener):
+       await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
+       async for event in listener:
+           print(event)
+           if event['event'] == 'BLOCK_JOB_COMPLETED':
+               break
+
+Any events that are not witnessed by the time the block is left will be
+cleared from the queue; entering the block is an implicit
+`register_listener()` and leaving the block is an implicit
+`remove_listener()`.
+
+
+Activating multiple existing listeners with `listen()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+While `listener()` is only capable of creating a single listener,
+`listen()` is capable of activating multiple listeners simultaneously:
+
+.. code:: python
+
+   def job_filter(job_id: str) -> EventFilter:
+       def filter(event: Message) -> bool:
+           return event['data']['id'] == job_id
+       return filter
+
+   jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
+   jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))
+
+   with qmp.listen(jobA, jobB):
+       qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
+       qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})
+
+       async for event in jobA.get():
+           if event['data']['status'] == 'concluded':
+               break
+       async for event in jobB.get():
+           if event['data']['status'] == 'concluded':
+               break
+
+
+Extending the `EventListener` class
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+In the case that a more specialized `EventListener` is desired to
+provide either more functionality or more compact syntax for specialized
+cases, it can be extended.
+
+One of the key methods to extend or override is
+:py:meth:`~EventListener.accept()`. The default implementation checks an
+incoming message for:
+
+1. A qualifying name, if any :py:obj:`~EventListener.names` were
+   specified at initialization time
+2. That :py:obj:`~EventListener.event_filter()` returns True.
+
+This can be modified however you see fit to change the criteria for
+inclusion in the stream.
+
+For convenience, a ``JobListener`` class could be created that simply
+bakes in configuration so it does not need to be repeated:
+
+.. code:: python
+
+   class JobListener(EventListener):
+       def __init__(self, job_id: str):
+           super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
+                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
+                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
+           self.job_id = job_id
+
+       def accept(self, event) -> bool:
+           if not super().accept(event):
+               return False
+           if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
+               return event['data']['id'] == job_id
+           return event['data']['device'] == job_id
+
+From here on out, you can conjure up a custom-purpose listener that
+listens only for job-related events for a specific job-id easily:
+
+.. code:: python
+
+   listener = JobListener('job4')
+   with qmp.listener(listener):
+       await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
+       async for event in listener:
+           print(event)
+           if event['event'] == 'BLOCK_JOB_COMPLETED':
+               break
+
+
+Experimental Interfaces & Design Issues
+---------------------------------------
+
+These interfaces are not ones I am sure I will keep or otherwise modify
+heavily.
+
+qmp.listener()’s type signature
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+`listener()` does not return anything, because it was assumed the caller
+already had a handle to the listener. However, for
+``qmp.listener(EventListener())`` forms, the caller will not have saved
+a handle to the listener.
+
+Because this function can accept *many* listeners, I found it hard to
+accurately type in a way where it could be used in both “one” or “many”
+forms conveniently and in a statically type-safe manner.
+
+Ultimately, I removed the return altogether, but perhaps with more time
+I can work out a way to re-add it.
+
+
+API Reference
+-------------
+
+"""
+
+import asyncio
+from contextlib import contextmanager
+import logging
+from typing import (
+    AsyncIterator,
+    Callable,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+)
+
+from .error import AQMPError
+from .message import Message
+
+
+EventNames = Union[str, Iterable[str], None]
+EventFilter = Callable[[Message], bool]
+
+
+class ListenerError(AQMPError):
+    """
+    Generic error class for `EventListener`-related problems.
+    """
+
+
+class EventListener:
+    """
+    Selectively listens for events with runtime configurable filtering.
+
+    This class is designed to be directly usable for the most common cases,
+    but it can be extended to provide more rigorous control.
+
+    :param names:
+        One or more names of events to listen for.
+        When not provided, listen for ALL events.
+    :param event_filter:
+        An optional event filtering function.
+        When names are also provided, this acts as a secondary filter.
+
+    When ``names`` and ``event_filter`` are both provided, the names
+    will be filtered first, and then the filter function will be called
+    second. The event filter function can assume that the format of the
+    event is a known format.
+    """
+    def __init__(
+        self,
+        names: EventNames = None,
+        event_filter: Optional[EventFilter] = None,
+    ):
+        # Queue of 'heard' events yet to be witnessed by a caller.
+        self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()
+
+        # Intended as a historical record, NOT a processing queue or backlog.
+        self._history: List[Message] = []
+
+        #: Primary event filter, based on one or more event names.
+        self.names: Set[str] = set()
+        if isinstance(names, str):
+            self.names.add(names)
+        elif names is not None:
+            self.names.update(names)
+
+        #: Optional, secondary event filter.
+        self.event_filter: Optional[EventFilter] = event_filter
+
+    @property
+    def history(self) -> Tuple[Message, ...]:
+        """
+        A read-only history of all events seen so far.
+
+        This represents *every* event, including those not yet witnessed
+        via `get()` or ``async for``. It persists between `clear()`
+        calls and is immutable.
+        """
+        return tuple(self._history)
+
+    def accept(self, event: Message) -> bool:
+        """
+        Determine if this listener accepts this event.
+
+        This method determines which events will appear in the stream.
+        The default implementation simply checks the event against the
+        list of names and the event_filter to decide if this
+        `EventListener` accepts a given event. It can be
+        overridden/extended to provide custom listener behavior.
+
+        User code is not expected to need to invoke this method.
+
+        :param event: The event under consideration.
+        :return: `True`, if this listener accepts this event.
+        """
+        name_ok = (not self.names) or (event['event'] in self.names)
+        return name_ok and (
+            (not self.event_filter) or self.event_filter(event)
+        )
+
+    async def put(self, event: Message) -> None:
+        """
+        Conditionally put a new event into the FIFO queue.
+
+        This method is not designed to be invoked from user code, and it
+        should not need to be overridden. It is a public interface so
+        that `QMPClient` has an interface by which it can inform
+        registered listeners of new events.
+
+        The event will be put into the queue if
+        :py:meth:`~EventListener.accept()` returns `True`.
+
+        :param event: The new event to put into the FIFO queue.
+        """
+        if not self.accept(event):
+            return
+
+        self._history.append(event)
+        await self._queue.put(event)
+
+    async def get(self) -> Message:
+        """
+        Wait for the very next event in this stream.
+
+        If one is already available, return that one.
+        """
+        return await self._queue.get()
+
+    def clear(self) -> None:
+        """
+        Clear this listener of all pending events.
+
+        Called when an `EventListener` is being unregistered, this clears the
+        pending FIFO queue synchronously. It can be also be used to
+        manually clear any pending events, if desired.
+
+        .. warning::
+            Take care when discarding events. Cleared events will be
+            silently tossed on the floor. All events that were ever
+            accepted by this listener are visible in `history()`.
+        """
+        while True:
+            try:
+                self._queue.get_nowait()
+            except asyncio.QueueEmpty:
+                break
+
+    def __aiter__(self) -> AsyncIterator[Message]:
+        return self
+
+    async def __anext__(self) -> Message:
+        """
+        Enables the `EventListener` to function as an async iterator.
+
+        It may be used like this:
+
+        .. code:: python
+
+            async for event in listener:
+                print(event)
+
+        These iterators will never terminate of their own accord; you
+        must provide break conditions or otherwise prepare to run them
+        in an `asyncio.Task` that can be cancelled.
+        """
+        return await self.get()
+
+
+class Events:
+    """
+    Events is a mix-in class that adds event functionality to the QMP class.
+
+    It's designed specifically as a mix-in for `QMPClient`, and it
+    relies upon the class it is being mixed into having a 'logger'
+    property.
+    """
+    def __init__(self) -> None:
+        self._listeners: List[EventListener] = []
+
+        #: Default, all-events `EventListener`.
+        self.events: EventListener = EventListener()
+        self.register_listener(self.events)
+
+        # Parent class needs to have a logger
+        self.logger: logging.Logger
+
+    async def _event_dispatch(self, msg: Message) -> None:
+        """
+        Given a new event, propagate it to all of the active listeners.
+
+        :param msg: The event to propagate.
+        """
+        for listener in self._listeners:
+            await listener.put(msg)
+
+    def register_listener(self, listener: EventListener) -> None:
+        """
+        Register and activate an `EventListener`.
+
+        :param listener: The listener to activate.
+        :raise ListenerError: If the given listener is already registered.
+        """
+        if listener in self._listeners:
+            raise ListenerError("Attempted to re-register existing listener")
+        self.logger.debug("Registering %s.", str(listener))
+        self._listeners.append(listener)
+
+    def remove_listener(self, listener: EventListener) -> None:
+        """
+        Unregister and deactivate an `EventListener`.
+
+        The removed listener will have its pending events cleared via
+        `clear()`. The listener can be re-registered later when
+        desired.
+
+        :param listener: The listener to deactivate.
+        :raise ListenerError: If the given listener is not registered.
+        """
+        if listener == self.events:
+            raise ListenerError("Cannot remove the default listener.")
+        self.logger.debug("Removing %s.", str(listener))
+        listener.clear()
+        self._listeners.remove(listener)
+
+    @contextmanager
+    def listen(self, *listeners: EventListener) -> Iterator[None]:
+        r"""
+        Context manager: Temporarily listen with an `EventListener`.
+
+        Accepts one or more `EventListener` objects and registers them,
+        activating them for the duration of the context block.
+
+        `EventListener` objects will have any pending events in their
+        FIFO queue cleared upon exiting the context block, when they are
+        deactivated.
+
+        :param \*listeners: One or more EventListeners to activate.
+        :raise ListenerError: If the given listener(s) are already active.
+        """
+        _added = []
+
+        try:
+            for listener in listeners:
+                self.register_listener(listener)
+                _added.append(listener)
+
+            yield
+
+        finally:
+            for listener in _added:
+                self.remove_listener(listener)
+
+    @contextmanager
+    def listener(
+        self,
+        names: EventNames = (),
+        event_filter: Optional[EventFilter] = None
+    ) -> Iterator[EventListener]:
+        """
+        Context manager: Temporarily listen with a new `EventListener`.
+
+        Creates an `EventListener` object and registers it, activating
+        it for the duration of the context block.
+
+        :param names:
+            One or more names of events to listen for.
+            When not provided, listen for ALL events.
+        :param event_filter:
+            An optional event filtering function.
+            When names are also provided, this acts as a secondary filter.
+
+        :return: The newly created and active `EventListener`.
+        """
+        listener = EventListener(names, event_filter)
+        with self.listen(listener):
+            yield listener
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 16/24] python/pylint: disable too-many-function-args
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (14 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 15/24] python/aqmp: add QMP event support John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 17/24] python/aqmp: add QMP protocol support John Snow
                   ` (8 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

too-many-function-args seems prone to failure when considering
things like Method Resolution Order, which mypy gets correct. When
dealing with multiple inheritance, pylint doesn't seem to understand
which method will actually get called, while mypy does.

Remove the less powerful, redundant check.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/setup.cfg | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/setup.cfg b/python/setup.cfg
index f87e32177ab..19d5e154630 100644
--- a/python/setup.cfg
+++ b/python/setup.cfg
@@ -88,7 +88,7 @@ ignore_missing_imports = True
 # --enable=similarities". If you want to run only the classes checker, but have
 # no Warning level messages displayed, use "--disable=all --enable=classes
 # --disable=W".
-disable=
+disable=too-many-function-args,  # mypy handles this with less false positives.
 
 [pylint.basic]
 # Good variable names which should always be accepted, separated by a comma.
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 17/24] python/aqmp: add QMP protocol support
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (15 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 16/24] python/pylint: disable too-many-function-args John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 18/24] python/pylint: disable no-member check John Snow
                   ` (7 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

The star of our show!

Add most of the QMP protocol, sans support for actually executing
commands. No problem, that happens in the next several commits.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/__init__.py   |   2 +
 python/qemu/aqmp/qmp_client.py | 264 +++++++++++++++++++++++++++++++++
 2 files changed, 266 insertions(+)
 create mode 100644 python/qemu/aqmp/qmp_client.py

diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
index 084eb5ab3e8..7bd66a48bed 100644
--- a/python/qemu/aqmp/__init__.py
+++ b/python/qemu/aqmp/__init__.py
@@ -25,11 +25,13 @@
 from .events import EventListener
 from .message import Message
 from .protocol import ConnectError, Runstate
+from .qmp_client import QMPClient
 
 
 # The order of these fields impact the Sphinx documentation order.
 __all__ = (
     # Classes, most to least important
+    'QMPClient',
     'Message',
     'EventListener',
     'Runstate',
diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
new file mode 100644
index 00000000000..000ff59c7a7
--- /dev/null
+++ b/python/qemu/aqmp/qmp_client.py
@@ -0,0 +1,264 @@
+"""
+QMP Protocol Implementation
+
+This module provides the `QMPClient` class, which can be used to connect
+and send commands to a QMP server such as QEMU. The QMP class can be
+used to either connect to a listening server, or used to listen and
+accept an incoming connection from that server.
+"""
+
+import logging
+from typing import (
+    Dict,
+    List,
+    Mapping,
+    Optional,
+)
+
+from .error import ProtocolError
+from .events import Events
+from .message import Message
+from .models import Greeting
+from .protocol import AsyncProtocol
+from .util import (
+    bottom_half,
+    exception_summary,
+    pretty_traceback,
+    upper_half,
+)
+
+
+class _WrappedProtocolError(ProtocolError):
+    """
+    Abstract exception class for Protocol errors that wrap an Exception.
+
+    :param error_message: Human-readable string describing the error.
+    :param exc: The root-cause exception.
+    """
+    def __init__(self, error_message: str, exc: Exception):
+        super().__init__(error_message)
+        self.exc = exc
+
+    def __str__(self) -> str:
+        return f"{self.error_message}: {self.exc!s}"
+
+
+class GreetingError(_WrappedProtocolError):
+    """
+    An exception occurred during the Greeting phase.
+
+    :param error_message: Human-readable string describing the error.
+    :param exc: The root-cause exception.
+    """
+
+
+class NegotiationError(_WrappedProtocolError):
+    """
+    An exception occurred during the Negotiation phase.
+
+    :param error_message: Human-readable string describing the error.
+    :param exc: The root-cause exception.
+    """
+
+
+class QMPClient(AsyncProtocol[Message], Events):
+    """
+    Implements a QMP client connection.
+
+    QMP can be used to establish a connection as either the transport
+    client or server, though this class always acts as the QMP client.
+
+    :param name: Optional nickname for the connection, used for logging.
+
+    Basic script-style usage looks like this::
+
+      qmp = QMPClient('my_virtual_machine_name')
+      await qmp.connect(('127.0.0.1', 1234))
+      ...
+      res = await qmp.execute('block-query')
+      ...
+      await qmp.disconnect()
+
+    Basic async client-style usage looks like this::
+
+      class Client:
+          def __init__(self, name: str):
+              self.qmp = QMPClient(name)
+
+          async def watch_events(self):
+              try:
+                  async for event in self.qmp.events:
+                      print(f"Event: {event['event']}")
+              except asyncio.CancelledError:
+                  return
+
+          async def run(self, address='/tmp/qemu.socket'):
+              await self.qmp.connect(address)
+              asyncio.create_task(self.watch_events())
+              await self.qmp.runstate_changed.wait()
+              await self.disconnect()
+
+    See `aqmp.events` for more detail on event handling patterns.
+    """
+    #: Logger object used for debugging messages.
+    logger = logging.getLogger(__name__)
+
+    # Read buffer limit; large enough to accept query-qmp-schema
+    _limit = (256 * 1024)
+
+    def __init__(self, name: Optional[str] = None) -> None:
+        super().__init__(name)
+        Events.__init__(self)
+
+        #: Whether or not to await a greeting after establishing a connection.
+        self.await_greeting: bool = True
+
+        #: Whether or not to perform capabilities negotiation upon connection.
+        #: Implies `await_greeting`.
+        self.negotiate: bool = True
+
+        # Cached Greeting, if one was awaited.
+        self._greeting: Optional[Greeting] = None
+
+    @upper_half
+    async def _establish_session(self) -> None:
+        """
+        Initiate the QMP session.
+
+        Wait for the QMP greeting and perform capabilities negotiation.
+
+        :raise GreetingError: When the greeting is not understood.
+        :raise NegotiationError: If the negotiation fails.
+        :raise EOFError: When the server unexpectedly hangs up.
+        :raise OSError: For underlying stream errors.
+        """
+        if self.await_greeting or self.negotiate:
+            self._greeting = await self._get_greeting()
+
+        if self.negotiate:
+            await self._negotiate()
+
+        # This will start the reader/writers:
+        await super()._establish_session()
+
+    @upper_half
+    async def _get_greeting(self) -> Greeting:
+        """
+        :raise GreetingError: When the greeting is not understood.
+        :raise EOFError: When the server unexpectedly hangs up.
+        :raise OSError: For underlying stream errors.
+
+        :return: the Greeting object given by the server.
+        """
+        self.logger.debug("Awaiting greeting ...")
+
+        try:
+            msg = await self._recv()
+            return Greeting(msg)
+        except (ProtocolError, KeyError, TypeError) as err:
+            emsg = "Did not understand Greeting"
+            self.logger.error("%s: %s", emsg, exception_summary(err))
+            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+            raise GreetingError(emsg, err) from err
+        except BaseException as err:
+            # EOFError, OSError, or something unexpected.
+            emsg = "Failed to receive Greeting"
+            self.logger.error("%s: %s", emsg, exception_summary(err))
+            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+            raise
+
+    @upper_half
+    async def _negotiate(self) -> None:
+        """
+        Perform QMP capabilities negotiation.
+
+        :raise NegotiationError: When negotiation fails.
+        :raise EOFError: When the server unexpectedly hangs up.
+        :raise OSError: For underlying stream errors.
+        """
+        self.logger.debug("Negotiating capabilities ...")
+
+        arguments: Dict[str, List[str]] = {'enable': []}
+        if self._greeting and 'oob' in self._greeting.QMP.capabilities:
+            arguments['enable'].append('oob')
+        msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
+
+        # It's not safe to use execute() here, because the reader/writers
+        # aren't running. AsyncProtocol *requires* that a new session
+        # does not fail after the reader/writers are running!
+        try:
+            await self._send(msg)
+            reply = await self._recv()
+            assert 'return' in reply
+            assert 'error' not in reply
+        except (ProtocolError, AssertionError) as err:
+            emsg = "Negotiation failed"
+            self.logger.error("%s: %s", emsg, exception_summary(err))
+            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+            raise NegotiationError(emsg, err) from err
+        except BaseException as err:
+            # EOFError, OSError, or something unexpected.
+            emsg = "Negotiation failed"
+            self.logger.error("%s: %s", emsg, exception_summary(err))
+            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+            raise
+
+    @bottom_half
+    async def _on_message(self, msg: Message) -> None:
+        """
+        Add an incoming message to the appropriate queue/handler.
+        """
+        # Incoming messages are not fully parsed/validated here;
+        # do only light peeking to know how to route the messages.
+
+        if 'event' in msg:
+            await self._event_dispatch(msg)
+            return
+
+        # Below, we assume everything left is an execute/exec-oob response.
+        # ... Which we'll implement in the next commit!
+
+    @upper_half
+    @bottom_half
+    async def _do_recv(self) -> Message:
+        """
+        :raise OSError: When a stream error is encountered.
+        :raise EOFError: When the stream is at EOF.
+        :raise ProtocolError:
+            When the Message is not understood.
+            See also `Message._deserialize`.
+
+        :return: A single QMP `Message`.
+        """
+        msg_bytes = await self._readline()
+        msg = Message(msg_bytes, eager=True)
+        return msg
+
+    @upper_half
+    @bottom_half
+    def _do_send(self, msg: Message) -> None:
+        """
+        :raise ValueError: JSON serialization failure
+        :raise TypeError: JSON serialization failure
+        :raise OSError: When a stream error is encountered.
+        """
+        assert self._writer is not None
+        self._writer.write(bytes(msg))
+
+    @classmethod
+    def make_execute_msg(cls, cmd: str,
+                         arguments: Optional[Mapping[str, object]] = None,
+                         oob: bool = False) -> Message:
+        """
+        Create an executable message to be sent later.
+
+        :param cmd: QMP command name.
+        :param arguments: Arguments (if any). Must be JSON-serializable.
+        :param oob: If `True`, execute "out of band".
+
+        :return: An executable QMP `Message`.
+        """
+        msg = Message({'exec-oob' if oob else 'execute': cmd})
+        if arguments is not None:
+            msg['arguments'] = arguments
+        return msg
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 18/24] python/pylint: disable no-member check
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (16 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 17/24] python/aqmp: add QMP protocol support John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 19/24] python/aqmp: Add message routing to QMP protocol John Snow
                   ` (6 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

mypy handles this better -- but we only need the workaround because
pylint under Python 3.6 does not understand that a MutableMapping really
does have a .get() method attached.

We could remove this again once 3.7 is our minimum.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/setup.cfg | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/setup.cfg b/python/setup.cfg
index 19d5e154630..2573cd7bfb3 100644
--- a/python/setup.cfg
+++ b/python/setup.cfg
@@ -89,6 +89,7 @@ ignore_missing_imports = True
 # no Warning level messages displayed, use "--disable=all --enable=classes
 # --disable=W".
 disable=too-many-function-args,  # mypy handles this with less false positives.
+        no-member,  # mypy also handles this better.
 
 [pylint.basic]
 # Good variable names which should always be accepted, separated by a comma.
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 19/24] python/aqmp: Add message routing to QMP protocol
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (17 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 18/24] python/pylint: disable no-member check John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 20/24] python/aqmp: add execute() interfaces John Snow
                   ` (5 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

Add the ability to handle and route messages in qmp_protocol.py. The
interface for actually sending anything still isn't added until next
commit.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/qmp_client.py | 122 ++++++++++++++++++++++++++++++++-
 1 file changed, 120 insertions(+), 2 deletions(-)

diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
index 000ff59c7a7..fa0cc7c5ae5 100644
--- a/python/qemu/aqmp/qmp_client.py
+++ b/python/qemu/aqmp/qmp_client.py
@@ -7,15 +7,19 @@
 accept an incoming connection from that server.
 """
 
+# The import workarounds here are fixed in the next commit.
+import asyncio  # pylint: disable=unused-import # noqa
 import logging
 from typing import (
     Dict,
     List,
     Mapping,
     Optional,
+    Union,
+    cast,
 )
 
-from .error import ProtocolError
+from .error import AQMPError, ProtocolError
 from .events import Events
 from .message import Message
 from .models import Greeting
@@ -61,6 +65,53 @@ class NegotiationError(_WrappedProtocolError):
     """
 
 
+class ExecInterruptedError(AQMPError):
+    """
+    Exception raised when an RPC is interrupted.
+
+    This error is raised when an execute() statement could not be
+    completed.  This can occur because the connection itself was
+    terminated before a reply was received.
+
+    The true cause of the interruption will be available via `disconnect()`.
+    """
+
+
+class _MsgProtocolError(ProtocolError):
+    """
+    Abstract error class for protocol errors that have a `Message` object.
+
+    This Exception class is used for protocol errors where the `Message`
+    was mechanically understood, but was found to be inappropriate or
+    malformed.
+
+    :param error_message: Human-readable string describing the error.
+    :param msg: The QMP `Message` that caused the error.
+    """
+    def __init__(self, error_message: str, msg: Message):
+        super().__init__(error_message)
+        #: The received `Message` that caused the error.
+        self.msg: Message = msg
+
+    def __str__(self) -> str:
+        return "\n".join([
+            super().__str__(),
+            f"  Message was: {str(self.msg)}\n",
+        ])
+
+
+class ServerParseError(_MsgProtocolError):
+    """
+    The Server sent a `Message` indicating parsing failure.
+
+    i.e. A reply has arrived from the server, but it is missing the "ID"
+    field, indicating a parsing error.
+
+    :param error_message: Human-readable string describing the error.
+    :param msg: The QMP `Message` that caused the error.
+    """
+
+
 class QMPClient(AsyncProtocol[Message], Events):
     """
     Implements a QMP client connection.
@@ -106,6 +157,9 @@ async def run(self, address='/tmp/qemu.socket'):
     # Read buffer limit; large enough to accept query-qmp-schema
     _limit = (256 * 1024)
 
+    # Type alias for pending execute() result items
+    _PendingT = Union[Message, ExecInterruptedError]
+
     def __init__(self, name: Optional[str] = None) -> None:
         super().__init__(name)
         Events.__init__(self)
@@ -120,6 +174,12 @@ def __init__(self, name: Optional[str] = None) -> None:
         # Cached Greeting, if one was awaited.
         self._greeting: Optional[Greeting] = None
 
+        # Incoming RPC reply messages.
+        self._pending: Dict[
+            Union[str, None],
+            'asyncio.Queue[QMPClient._PendingT]'
+        ] = {}
+
     @upper_half
     async def _establish_session(self) -> None:
         """
@@ -132,6 +192,9 @@ async def _establish_session(self) -> None:
         :raise EOFError: When the server unexpectedly hangs up.
         :raise OSError: For underlying stream errors.
         """
+        self._greeting = None
+        self._pending = {}
+
         if self.await_greeting or self.negotiate:
             self._greeting = await self._get_greeting()
 
@@ -203,10 +266,33 @@ async def _negotiate(self) -> None:
             self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
             raise
 
+    @bottom_half
+    async def _bh_disconnect(self) -> None:
+        try:
+            await super()._bh_disconnect()
+        finally:
+            if self._pending:
+                self.logger.debug("Cancelling pending executions")
+            keys = self._pending.keys()
+            for key in keys:
+                self.logger.debug("Cancelling execution '%s'", key)
+                self._pending[key].put_nowait(
+                    ExecInterruptedError("Disconnected")
+                )
+
+            self.logger.debug("QMP Disconnected.")
+
+    @upper_half
+    def _cleanup(self) -> None:
+        super()._cleanup()
+        assert not self._pending
+
     @bottom_half
     async def _on_message(self, msg: Message) -> None:
         """
         Add an incoming message to the appropriate queue/handler.
+
+        :raise ServerParseError: When Message indicates server parse failure.
         """
         # Incoming messages are not fully parsed/validated here;
         # do only light peeking to know how to route the messages.
@@ -216,7 +302,39 @@ async def _on_message(self, msg: Message) -> None:
             return
 
         # Below, we assume everything left is an execute/exec-oob response.
-        # ... Which we'll implement in the next commit!
+
+        exec_id = cast(Optional[str], msg.get('id'))
+
+        if exec_id in self._pending:
+            await self._pending[exec_id].put(msg)
+            return
+
+        # We have a message we can't route back to a caller.
+
+        is_error = 'error' in msg
+        has_id = 'id' in msg
+
+        if is_error and not has_id:
+            # This is very likely a server parsing error.
+            # It doesn't inherently belong to any pending execution.
+            # Instead of performing clever recovery, just terminate.
+            # See "NOTE" in qmp-spec.txt, section 2.4.2
+            raise ServerParseError(
+                ("Server sent an error response without an ID, "
+                 "but there are no ID-less executions pending. "
+                 "Assuming this is a server parser failure."),
+                msg
+            )
+
+        # qmp-spec.txt, section 2.4:
+        # 'Clients should drop all the responses
+        # that have an unknown "id" field.'
+        self.logger.log(
+            logging.ERROR if is_error else logging.WARNING,
+            "Unknown ID '%s', message dropped.",
+            exec_id,
+        )
+        self.logger.debug("Unroutable message: %s", str(msg))
 
     @upper_half
     @bottom_half
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 20/24] python/aqmp: add execute() interfaces
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (18 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 19/24] python/aqmp: Add message routing to QMP protocol John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 21/24] python/aqmp: add _raw() execution interface John Snow
                   ` (4 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

Add execute() and execute_msg().

_execute() is split into _issue() and _reply() halves so that
hypothetical subclasses of QMP that want to support different execution
paradigms can do so.

I anticipate a synchronous interface may have need of separating the
send/reply phases. However, I do not wish to expose that interface here
and want to actively discourage it, so they remain private interfaces.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/__init__.py   |   4 +-
 python/qemu/aqmp/qmp_client.py | 202 +++++++++++++++++++++++++++++++--
 2 files changed, 198 insertions(+), 8 deletions(-)

diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
index 7bd66a48bed..ef2903fa7fc 100644
--- a/python/qemu/aqmp/__init__.py
+++ b/python/qemu/aqmp/__init__.py
@@ -25,7 +25,7 @@
 from .events import EventListener
 from .message import Message
 from .protocol import ConnectError, Runstate
-from .qmp_client import QMPClient
+from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient
 
 
 # The order of these fields impact the Sphinx documentation order.
@@ -39,4 +39,6 @@
     # Exceptions, most generic to most explicit
     'AQMPError',
     'ConnectError',
+    'ExecuteError',
+    'ExecInterruptedError',
 )
diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
index fa0cc7c5ae5..879348feaaa 100644
--- a/python/qemu/aqmp/qmp_client.py
+++ b/python/qemu/aqmp/qmp_client.py
@@ -7,8 +7,7 @@
 accept an incoming connection from that server.
 """
 
-# The import workarounds here are fixed in the next commit.
-import asyncio  # pylint: disable=unused-import # noqa
+import asyncio
 import logging
 from typing import (
     Dict,
@@ -22,8 +21,8 @@
 from .error import AQMPError, ProtocolError
 from .events import Events
 from .message import Message
-from .models import Greeting
-from .protocol import AsyncProtocol
+from .models import ErrorResponse, Greeting
+from .protocol import AsyncProtocol, Runstate, require
 from .util import (
     bottom_half,
     exception_summary,
@@ -65,11 +64,32 @@ class NegotiationError(_WrappedProtocolError):
     """
 
 
+class ExecuteError(AQMPError):
+    """
+    Exception raised by `QMPClient.execute()` on RPC failure.
+
+    :param error_response: The RPC error response object.
+    :param sent: The sent RPC message that caused the failure.
+    :param received: The raw RPC error reply received.
+    """
+    def __init__(self, error_response: ErrorResponse,
+                 sent: Message, received: Message):
+        super().__init__(error_response.error.desc)
+        #: The sent `Message` that caused the failure
+        self.sent: Message = sent
+        #: The received `Message` that indicated failure
+        self.received: Message = received
+        #: The parsed error response
+        self.error: ErrorResponse = error_response
+        #: The QMP error class
+        self.error_class: str = error_response.error.class_
+
+
 class ExecInterruptedError(AQMPError):
     """
-    Exception raised when an RPC is interrupted.
+    Exception raised by `execute()` (et al) when an RPC is interrupted.
 
-    This error is raised when an execute() statement could not be
+    This error is raised when an `execute()` statement could not be
     completed.  This can occur because the connection itself was
     terminated before a reply was received.
 
@@ -112,6 +132,27 @@ class ServerParseError(_MsgProtocolError):
     """
 
 
+class BadReplyError(_MsgProtocolError):
+    """
+    An execution reply was successfully routed, but not understood.
+
+    If a QMP message is received with an 'id' field to allow it to be
+    routed, but is otherwise malformed, this exception will be raised.
+
+    A reply message is malformed if it is missing either the 'return' or
+    'error' keys, or if the 'error' value has missing keys or members of
+    the wrong type.
+
+    :param error_message: Human-readable string describing the error.
+    :param msg: The malformed reply that was received.
+    :param sent: The message that was sent that prompted the error.
+    """
+    def __init__(self, error_message: str, msg: Message, sent: Message):
+        super().__init__(error_message, msg)
+        #: The sent `Message` that caused the failure
+        self.sent = sent
+
+
 class QMPClient(AsyncProtocol[Message], Events):
     """
     Implements a QMP client connection.
@@ -174,6 +215,9 @@ def __init__(self, name: Optional[str] = None) -> None:
         # Cached Greeting, if one was awaited.
         self._greeting: Optional[Greeting] = None
 
+        # Command ID counter
+        self._execute_id = 0
+
         # Incoming RPC reply messages.
         self._pending: Dict[
             Union[str, None],
@@ -363,12 +407,135 @@ def _do_send(self, msg: Message) -> None:
         assert self._writer is not None
         self._writer.write(bytes(msg))
 
+    @upper_half
+    def _get_exec_id(self) -> str:
+        exec_id = f"__aqmp#{self._execute_id:05d}"
+        self._execute_id += 1
+        return exec_id
+
+    @upper_half
+    async def _issue(self, msg: Message) -> Union[None, str]:
+        """
+        Issue a QMP `Message` and do not wait for a reply.
+
+        :param msg: The QMP `Message` to send to the server.
+
+        :return: The ID of the `Message` sent.
+        """
+        msg_id: Optional[str] = None
+        if 'id' in msg:
+            assert isinstance(msg['id'], str)
+            msg_id = msg['id']
+
+        self._pending[msg_id] = asyncio.Queue(maxsize=1)
+        await self._outgoing.put(msg)
+
+        return msg_id
+
+    @upper_half
+    async def _reply(self, msg_id: Union[str, None]) -> Message:
+        """
+        Await a reply to a previously issued QMP message.
+
+        :param msg_id: The ID of the previously issued message.
+
+        :return: The reply from the server.
+        :raise ExecInterruptedError:
+            When the reply could not be retrieved because the connection
+            was lost, or some other problem.
+        """
+        queue = self._pending[msg_id]
+        result = await queue.get()
+
+        try:
+            if isinstance(result, ExecInterruptedError):
+                raise result
+            return result
+        finally:
+            del self._pending[msg_id]
+
+    @upper_half
+    async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
+        """
+        Send a QMP `Message` to the server and await a reply.
+
+        This method *assumes* you are sending some kind of an execute
+        statement that *will* receive a reply.
+
+        An execution ID will be assigned if assign_id is `True`. It can be
+        disabled, but this requires that an ID is manually assigned
+        instead. For manually assigned IDs, you must not use the string
+        '__aqmp#' anywhere in the ID.
+
+        :param msg: The QMP `Message` to execute.
+        :param assign_id: If True, assign a new execution ID.
+
+        :return: Execution reply from the server.
+        :raise ExecInterruptedError:
+            When the reply could not be retrieved because the connection
+            was lost, or some other problem.
+        """
+        if assign_id:
+            msg['id'] = self._get_exec_id()
+        elif 'id' in msg:
+            assert isinstance(msg['id'], str)
+            assert '__aqmp#' not in msg['id']
+
+        exec_id = await self._issue(msg)
+        return await self._reply(exec_id)
+
+    @upper_half
+    @require(Runstate.RUNNING)
+    async def execute_msg(self, msg: Message) -> object:
+        """
+        Execute a QMP command and return its value.
+
+        :param msg: The QMP `Message` to execute.
+
+        :return:
+            The command execution return value from the server. The type of
+            object returned depends on the command that was issued,
+            though most in QEMU return a `dict`.
+        :raise ValueError:
+            If the QMP `Message` does not have either the 'execute' or
+            'exec-oob' fields set.
+        :raise ExecuteError: When the server returns an error response.
+        :raise ExecInterruptedError: if the connection was terminated early.
+        """
+        if not ('execute' in msg or 'exec-oob' in msg):
+            raise ValueError("Requires 'execute' or 'exec-oob' message")
+
+        # Copy the Message so that the ID assigned by _execute() is
+        # local to this method; allowing the ID to be seen in raised
+        # Exceptions but without modifying the caller's held copy.
+        msg = Message(msg)
+        reply = await self._execute(msg)
+
+        if 'error' in reply:
+            try:
+                error_response = ErrorResponse(reply)
+            except (KeyError, TypeError) as err:
+                # Error response was malformed.
+                raise BadReplyError(
+                    "QMP error reply is malformed", reply, msg,
+                ) from err
+
+            raise ExecuteError(error_response, msg, reply)
+
+        if 'return' not in reply:
+            raise BadReplyError(
+                "QMP reply is missing a 'error' or 'return' member",
+                reply, msg,
+            )
+
+        return reply['return']
+
     @classmethod
     def make_execute_msg(cls, cmd: str,
                          arguments: Optional[Mapping[str, object]] = None,
                          oob: bool = False) -> Message:
         """
-        Create an executable message to be sent later.
+        Create an executable message to be sent by `execute_msg` later.
 
         :param cmd: QMP command name.
         :param arguments: Arguments (if any). Must be JSON-serializable.
@@ -380,3 +547,24 @@ def make_execute_msg(cls, cmd: str,
         if arguments is not None:
             msg['arguments'] = arguments
         return msg
+
+    @upper_half
+    async def execute(self, cmd: str,
+                      arguments: Optional[Mapping[str, object]] = None,
+                      oob: bool = False) -> object:
+        """
+        Execute a QMP command and return its value.
+
+        :param cmd: QMP command name.
+        :param arguments: Arguments (if any). Must be JSON-serializable.
+        :param oob: If `True`, execute "out of band".
+
+        :return:
+            The command execution return value from the server. The type of
+            object returned depends on the command that was issued,
+            though most in QEMU return a `dict`.
+        :raise ExecuteError: When the server returns an error response.
+        :raise ExecInterruptedError: if the connection was terminated early.
+        """
+        msg = self.make_execute_msg(cmd, arguments, oob=oob)
+        return await self.execute_msg(msg)
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 21/24] python/aqmp: add _raw() execution interface
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (19 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 20/24] python/aqmp: add execute() interfaces John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 22/24] python/aqmp: add asyncio_run compatibility wrapper John Snow
                   ` (3 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

This is added in anticipation of wanting it for a synchronous wrapper
for the iotest interface. Normally, execute() and execute_msg() both
raise QMP errors in the form of Python exceptions.

Many iotests expect the entire reply as-is. To reduce churn there, add a
private execution interface that will ease transition churn. However, I
do not wish to encourage its use, so it will remain a private interface.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/qmp_client.py | 51 ++++++++++++++++++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
index 879348feaaa..82e9dab124c 100644
--- a/python/qemu/aqmp/qmp_client.py
+++ b/python/qemu/aqmp/qmp_client.py
@@ -484,6 +484,57 @@ async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
         exec_id = await self._issue(msg)
         return await self._reply(exec_id)
 
+    @upper_half
+    @require(Runstate.RUNNING)
+    async def _raw(
+            self,
+            msg: Union[Message, Mapping[str, object], bytes],
+            assign_id: bool = True,
+    ) -> Message:
+        """
+        Issue a raw `Message` to the QMP server and await a reply.
+
+        :param msg:
+            A Message to send to the server. It may be a `Message`, any
+            Mapping (including Dict), or raw bytes.
+        :param assign_id:
+            Assign an arbitrary execution ID to this message. If
+            `False`, the existing id must either be absent (and no other
+            such pending execution may omit an ID) or a string. If it is
+            a string, it must not start with '__aqmp#' and no other such
+            pending execution may currently be using that ID.
+
+        :return: Execution reply from the server.
+
+        :raise ExecInterruptedError:
+            When the reply could not be retrieved because the connection
+            was lost, or some other problem.
+        :raise TypeError:
+            When assign_id is `False`, an ID is given, and it is not a string.
+        :raise ValueError:
+            When assign_id is `False`, but the ID is not usable;
+            Either because it starts with '__aqmp#' or it is already in-use.
+        """
+        # 1. convert generic Mapping or bytes to a QMP Message
+        # 2. copy Message objects so that we assign an ID only to the copy.
+        msg = Message(msg)
+
+        exec_id = msg.get('id')
+        if not assign_id and 'id' in msg:
+            if not isinstance(exec_id, str):
+                raise TypeError(f"ID ('{exec_id}') must be a string.")
+            if exec_id.startswith('__aqmp#'):
+                raise ValueError(
+                    f"ID ('{exec_id}') must not start with '__aqmp#'."
+                )
+
+        if not assign_id and exec_id in self._pending:
+            raise ValueError(
+                f"ID '{exec_id}' is in-use and cannot be used."
+            )
+
+        return await self._execute(msg, assign_id=assign_id)
+
     @upper_half
     @require(Runstate.RUNNING)
     async def execute_msg(self, msg: Message) -> object:
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 22/24] python/aqmp: add asyncio_run compatibility wrapper
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (20 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 21/24] python/aqmp: add _raw() execution interface John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 23/24] python/aqmp: add scary message John Snow
                   ` (2 subsequent siblings)
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

As a convenience. It isn't used by the library itself, but it is used by
the test suite. It will also come in handy for users of the library
still on Python 3.6.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/util.py | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py
index 70ef94ad600..de0df44cbd7 100644
--- a/python/qemu/aqmp/util.py
+++ b/python/qemu/aqmp/util.py
@@ -137,6 +137,25 @@ async def wait_closed(writer: asyncio.StreamWriter) -> None:
     await flush(writer)
 
 
+def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
+    """
+    Python 3.6-compatible `asyncio.run` wrapper.
+
+    :param coro: A coroutine to execute now.
+    :return: The return value from the coroutine.
+    """
+    if sys.version_info >= (3, 7):
+        return asyncio.run(coro, debug=debug)
+
+    # Python 3.6
+    loop = asyncio.get_event_loop()
+    loop.set_debug(debug)
+    ret = loop.run_until_complete(coro)
+    loop.close()
+
+    return ret
+
+
 # ----------------------------
 # Section: Logging & Debugging
 # ----------------------------
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 23/24] python/aqmp: add scary message
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (21 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 22/24] python/aqmp: add asyncio_run compatibility wrapper John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-17  0:32 ` [PATCH v2 24/24] python/aqmp: add AsyncProtocol unit tests John Snow
  2021-07-21 17:03 ` [PATCH v2 00/24] python: introduce Asynchronous QMP package Niteesh G. S.
  24 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

Add a warning whenever AQMP is used to steer people gently away from
using it for the time-being.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/__init__.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
index ef2903fa7fc..321ea5c5c4b 100644
--- a/python/qemu/aqmp/__init__.py
+++ b/python/qemu/aqmp/__init__.py
@@ -21,6 +21,8 @@
 # This work is licensed under the terms of the GNU GPL, version 2.  See
 # the COPYING file in the top-level directory.
 
+import warnings
+
 from .error import AQMPError
 from .events import EventListener
 from .message import Message
@@ -28,6 +30,18 @@
 from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient
 
 
+_WMSG = """
+
+The Asynchronous QMP library is currently in development and its API
+should be considered highly fluid and subject to change. It should
+not be used by any other scripts checked into the QEMU tree.
+
+Proceed with caution!
+"""
+
+warnings.warn(_WMSG, FutureWarning)
+
+
 # The order of these fields impact the Sphinx documentation order.
 __all__ = (
     # Classes, most to least important
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* [PATCH v2 24/24] python/aqmp: add AsyncProtocol unit tests
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (22 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 23/24] python/aqmp: add scary message John Snow
@ 2021-07-17  0:32 ` John Snow
  2021-07-20 20:34   ` Beraldo Leal
  2021-07-21 17:03 ` [PATCH v2 00/24] python: introduce Asynchronous QMP package Niteesh G. S.
  24 siblings, 1 reply; 36+ messages in thread
From: John Snow @ 2021-07-17  0:32 UTC (permalink / raw)
  To: qemu-devel
  Cc: Eduardo Habkost, Eric Blake, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa, John Snow

This tests most of protocol.py -- From a hacked up Coverage.py run, it's
at about 86%. There's a few error cases that aren't very well tested
yet, they're hard to induce artificially so far. I'm working on it.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/tests/null_proto.py |  67 ++++++
 python/tests/protocol.py   | 458 +++++++++++++++++++++++++++++++++++++
 2 files changed, 525 insertions(+)
 create mode 100644 python/tests/null_proto.py
 create mode 100644 python/tests/protocol.py

diff --git a/python/tests/null_proto.py b/python/tests/null_proto.py
new file mode 100644
index 00000000000..c697efc0001
--- /dev/null
+++ b/python/tests/null_proto.py
@@ -0,0 +1,67 @@
+import asyncio
+
+from qemu.aqmp.protocol import AsyncProtocol
+
+
+class NullProtocol(AsyncProtocol[None]):
+    """
+    NullProtocol is a test mockup of an AsyncProtocol implementation.
+
+    It adds a fake_session instance variable that enables a code path
+    that bypasses the actual connection logic, but still allows the
+    reader/writers to start.
+
+    Because the message type is defined as None, an asyncio.Event named
+    'trigger_input' is created that prohibits the reader from
+    incessantly being able to yield None; this input can be poked to
+    simulate an incoming message.
+
+    For testing symmetry with do_recv, an interface is added to "send" a
+    Null message.
+
+    For testing purposes, a "simulate_disconnection" method is also
+    added which allows us to trigger a bottom half disconnect without
+    injecting any real errors into the reader/writer loops; in essence
+    it performs exactly half of what disconnect() normally does.
+    """
+    def __init__(self, name=None):
+        self.fake_session = False
+        self.trigger_input: asyncio.Event
+        super().__init__(name)
+
+    async def _establish_session(self):
+        self.trigger_input = asyncio.Event()
+        await super()._establish_session()
+
+    async def _do_accept(self, address, ssl=None):
+        if not self.fake_session:
+            await super()._do_accept(address, ssl)
+
+    async def _do_connect(self, address, ssl=None):
+        if not self.fake_session:
+            await super()._do_connect(address, ssl)
+
+    async def _do_recv(self) -> None:
+        await self.trigger_input.wait()
+        self.trigger_input.clear()
+
+    def _do_send(self, msg: None) -> None:
+        pass
+
+    async def send_msg(self) -> None:
+        await self._outgoing.put(None)
+
+    async def simulate_disconnect(self) -> None:
+        # Simulates a bottom half disconnect, e.g. schedules a
+        # disconnection but does not wait for it to complete. This is
+        # used to put the loop into the DISCONNECTING state without
+        # fully quiescing it back to IDLE; this is normally something
+        # you cannot coax AsyncProtocol to do on purpose, but it will be
+        # similar to what happens with an unhandled Exception in the
+        # reader/writer.
+        #
+        # Under normal circumstances, the library design requires you to
+        # await on disconnect(), which awaits the disconnect task and
+        # returns bottom half errors as a pre-condition to allowing the
+        # loop to return back to IDLE.
+        self._schedule_disconnect()
diff --git a/python/tests/protocol.py b/python/tests/protocol.py
new file mode 100644
index 00000000000..2374d01365e
--- /dev/null
+++ b/python/tests/protocol.py
@@ -0,0 +1,458 @@
+import asyncio
+from contextlib import contextmanager
+import os
+import socket
+from tempfile import TemporaryDirectory
+
+import avocado
+
+from qemu.aqmp import ConnectError, Runstate
+from qemu.aqmp.protocol import StateError
+from qemu.aqmp.util import asyncio_run, create_task
+
+# An Avocado bug prevents us from defining this testing class in-line here:
+from null_proto import NullProtocol
+
+
+def run_as_task(coro, allow_cancellation=False):
+    # This helper runs a given coroutine as a task, wrapping it in a
+    # try...except that allows it to be cancelled gracefully.
+    async def _runner():
+        try:
+            await coro
+        except asyncio.CancelledError:
+            if allow_cancellation:
+                return
+            raise
+    return create_task(_runner())
+
+
+@contextmanager
+def jammed_socket():
+    # This method opens up a random TCP port on localhost, then jams it.
+    socks = []
+
+    try:
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        sock.bind(('127.0.0.1', 0))
+        sock.listen(1)
+        address = sock.getsockname()
+
+        socks.append(sock)
+
+        # I don't *fully* understand why, but it takes *two* un-accepted
+        # connections to start jamming the socket.
+        for _ in range(2):
+            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            sock.connect(address)
+            socks.append(sock)
+
+        yield address
+
+    finally:
+        for sock in socks:
+            sock.close()
+
+
+class Smoke(avocado.Test):
+
+    def setUp(self):
+        self.proto = NullProtocol()
+
+    def test__repr__(self):
+        self.assertEqual(
+            repr(self.proto),
+            "<NullProtocol runstate=IDLE>"
+        )
+
+    def testRunstate(self):
+        self.assertEqual(
+            self.proto.runstate,
+            Runstate.IDLE
+        )
+
+    def testDefaultName(self):
+        self.assertEqual(
+            self.proto.name,
+            None
+        )
+
+    def testLogger(self):
+        self.assertEqual(
+            self.proto.logger.name,
+            'qemu.aqmp.protocol'
+        )
+
+    def testName(self):
+        self.proto = NullProtocol('Steve')
+
+        self.assertEqual(
+            self.proto.name,
+            'Steve'
+        )
+
+        self.assertEqual(
+            self.proto.logger.name,
+            'qemu.aqmp.protocol.Steve'
+        )
+
+        self.assertEqual(
+            repr(self.proto),
+            "<NullProtocol name='Steve' runstate=IDLE>"
+        )
+
+
+class TestBase(avocado.Test):
+
+    def setUp(self):
+        self.proto = NullProtocol(type(self).__name__)
+        self.assertEqual(self.proto.runstate, Runstate.IDLE)
+        self.runstate_watcher = None
+
+    def tearDown(self):
+        self.assertEqual(self.proto.runstate, Runstate.IDLE)
+
+    async def _asyncSetUp(self):
+        pass
+
+    async def _asyncTearDown(self):
+        if self.runstate_watcher:
+            await self.runstate_watcher
+
+    def _asyncRunner(self, test_coroutine):
+        async def coroutine():
+            await self._asyncSetUp()
+            await test_coroutine
+            await self._asyncTearDown()
+
+        asyncio_run(coroutine(), debug=True)
+
+    # Definitions
+
+    # The states we expect a "bad" connect/accept attempt to transition through
+    BAD_CONNECTION_STATES = (
+        Runstate.CONNECTING,
+        Runstate.DISCONNECTING,
+        Runstate.IDLE,
+    )
+
+    # The states we expect a "good" session to transition through
+    GOOD_CONNECTION_STATES = (
+        Runstate.CONNECTING,
+        Runstate.RUNNING,
+        Runstate.DISCONNECTING,
+        Runstate.IDLE,
+    )
+
+    # Helpers
+
+    async def _watch_runstates(self, *states):
+        # This launches a task alongside most tests below to confirm that the
+        # sequence of runstate changes is exactly as anticipated.
+
+        async def _watcher():
+            for state in states:
+                new_state = await self.proto.runstate_changed()
+                self.assertEqual(
+                    new_state,
+                    state,
+                    msg=f"Expected state '{state.name}'",
+                )
+
+        self.runstate_watcher = create_task(_watcher())
+        # Kick the loop and force the task to block on the event.
+        await asyncio.sleep(0)
+
+
+class State(TestBase):
+
+    async def testSuperfluousDisconnect_(self):
+        await self._watch_runstates(
+            Runstate.DISCONNECTING,
+            Runstate.IDLE,
+        )
+        await self.proto.disconnect()
+
+    def testSuperfluousDisconnect(self):
+        self._asyncRunner(self.testSuperfluousDisconnect_())
+
+
+class Connect(TestBase):
+
+    async def _bad_connection(self, family: str):
+        assert family in ('INET', 'UNIX')
+
+        if family == 'INET':
+            await self.proto.connect(('127.0.0.1', 0))
+        elif family == 'UNIX':
+            await self.proto.connect('/dev/null')
+
+    async def _hanging_connection(self):
+        with jammed_socket() as addr:
+            await self.proto.connect(addr)
+
+    async def _bad_connection_test(self, family: str):
+        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+
+        with self.assertRaises(ConnectError) as context:
+            await self._bad_connection(family)
+
+        self.assertIsInstance(context.exception.exc, OSError)
+        self.assertEqual(
+            context.exception.error_message,
+            "Failed to establish connection"
+        )
+
+    def testBadINET(self):
+        self._asyncRunner(self._bad_connection_test('INET'))
+        # self.assertIsInstance(err.exc, ConnectionRefusedError)
+
+    def testBadUNIX(self):
+        self._asyncRunner(self._bad_connection_test('UNIX'))
+        # self.assertIsInstance(err.exc, ConnectionRefusedError)
+
+    async def testCancellation_(self):
+        # Note that accept() cannot be cancelled outright, as it isn't a task.
+        # However, we can wrap it in a task and cancel *that*.
+        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
+
+        state = await self.proto.runstate_changed()
+        self.assertEqual(state, Runstate.CONNECTING)
+
+        # This is insider baseball, but the connection attempt has
+        # yielded *just* before the actual connection attempt, so kick
+        # the loop to make sure it's truly wedged.
+        await asyncio.sleep(0)
+
+        task.cancel()
+        await task
+
+    def testCancellation(self):
+        self._asyncRunner(self.testCancellation_())
+
+    async def testTimeout_(self):
+        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+        task = run_as_task(self._hanging_connection())
+
+        # More insider baseball: to improve the speed of this test while
+        # guaranteeing that the connection even gets a chance to start,
+        # verify that the connection hangs *first*, then await the
+        # result of the task with a nearly-zero timeout.
+
+        state = await self.proto.runstate_changed()
+        self.assertEqual(state, Runstate.CONNECTING)
+        await asyncio.sleep(0)
+
+        with self.assertRaises(asyncio.TimeoutError):
+            await asyncio.wait_for(task, timeout=0)
+
+    def testTimeout(self):
+        self._asyncRunner(self.testTimeout_())
+
+    async def testRequire_(self):
+        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
+
+        state = await self.proto.runstate_changed()
+        self.assertEqual(state, Runstate.CONNECTING)
+
+        with self.assertRaises(StateError) as context:
+            await self._bad_connection('UNIX')
+
+        self.assertEqual(
+            context.exception.error_message,
+            "NullProtocol is currently connecting."
+        )
+        self.assertEqual(context.exception.state, Runstate.CONNECTING)
+        self.assertEqual(context.exception.required, Runstate.IDLE)
+
+        task.cancel()
+        await task
+
+    def testRequire(self):
+        self._asyncRunner(self.testRequire_())
+
+    async def testImplicitRunstateInit_(self):
+        # This tests what happens if we do not wait on the
+        # runstate until AFTER we connect, i.e., connect()/accept()
+        # themselves initialize the runstate event. All of the above
+        # tests force the initialization by waiting on the runstate
+        # *first*.
+        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
+
+        # Kick the loop to coerce the state change
+        await asyncio.sleep(0)
+        assert self.proto.runstate == Runstate.CONNECTING
+
+        # We already missed the transition to CONNECTING
+        await self._watch_runstates(Runstate.DISCONNECTING, Runstate.IDLE)
+
+        task.cancel()
+        await task
+
+    def testImplicitRunstateInit(self):
+        self._asyncRunner(self.testImplicitRunstateInit_())
+
+
+class Accept(Connect):
+
+    async def _bad_connection(self, family: str):
+        assert family in ('INET', 'UNIX')
+
+        if family == 'INET':
+            await self.proto.accept(('example.com', 1))
+        elif family == 'UNIX':
+            await self.proto.accept('/dev/null')
+
+    async def _hanging_connection(self):
+        with TemporaryDirectory(suffix='.aqmp') as tmpdir:
+            sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
+            await self.proto.accept(sock)
+
+
+class FakeSession(TestBase):
+
+    def setUp(self):
+        super().setUp()
+        self.proto.fake_session = True
+
+    async def _asyncSetUp(self):
+        await super()._asyncSetUp()
+        await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
+
+    async def _asyncTearDown(self):
+        await self.proto.disconnect()
+        await super()._asyncTearDown()
+
+    ####
+
+    async def testFakeConnect_(self):
+        await self.proto.connect('/not/a/real/path')
+        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
+
+    def testFakeConnect(self):
+        """Test the full state lifecycle (via connect) with a no-op session."""
+        self._asyncRunner(self.testFakeConnect_())
+
+    async def testFakeAccept_(self):
+        await self.proto.accept('/not/a/real/path')
+        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
+
+    def testFakeAccept(self):
+        """Test the full state lifecycle (via accept) with a no-op session."""
+        self._asyncRunner(self.testFakeAccept_())
+
+    async def testFakeRecv_(self):
+        await self.proto.accept('/not/a/real/path')
+
+        logname = self.proto.logger.name
+        with self.assertLogs(logname, level='DEBUG') as context:
+            self.proto.trigger_input.set()
+            self.proto.trigger_input.clear()
+            await asyncio.sleep(0)  # Kick reader.
+
+        self.assertEqual(
+            context.output,
+            [f"DEBUG:{logname}:<-- None"],
+        )
+
+    def testFakeRecv(self):
+        """Test receiving a fake/null message."""
+        self._asyncRunner(self.testFakeRecv_())
+
+    async def testFakeSend_(self):
+        await self.proto.accept('/not/a/real/path')
+
+        logname = self.proto.logger.name
+        with self.assertLogs(logname, level='DEBUG') as context:
+            # Cheat: Send a Null message to nobody.
+            await self.proto.send_msg()
+            # Kick writer; awaiting on a queue.put isn't sufficient to yield.
+            await asyncio.sleep(0)
+
+        self.assertEqual(
+            context.output,
+            [f"DEBUG:{logname}:--> None"],
+        )
+
+    def testFakeSend(self):
+        """Test sending a fake/null message."""
+        self._asyncRunner(self.testFakeSend_())
+
+    async def _prod_session_api(
+            self,
+            current_state: Runstate,
+            error_message: str,
+            accept: bool = True
+    ):
+        with self.assertRaises(StateError) as context:
+            if accept:
+                await self.proto.accept('/not/a/real/path')
+            else:
+                await self.proto.connect('/not/a/real/path')
+
+        self.assertEqual(context.exception.error_message, error_message)
+        self.assertEqual(context.exception.state, current_state)
+        self.assertEqual(context.exception.required, Runstate.IDLE)
+
+    async def testAcceptRequireRunning_(self):
+        await self.proto.accept('/not/a/real/path')
+
+        await self._prod_session_api(
+            Runstate.RUNNING,
+            "NullProtocol is already connected and running.",
+            accept=True,
+        )
+
+    def testAcceptRequireRunning(self):
+        """Test that accept() cannot be called when Runstate=RUNNING"""
+        self._asyncRunner(self.testAcceptRequireRunning_())
+
+    async def testConnectRequireRunning_(self):
+        await self.proto.accept('/not/a/real/path')
+
+        await self._prod_session_api(
+            Runstate.RUNNING,
+            "NullProtocol is already connected and running.",
+            accept=False,
+        )
+
+    def testConnectRequireRunning(self):
+        """Test that connect() cannot be called when Runstate=RUNNING"""
+        self._asyncRunner(self.testConnectRequireRunning_())
+
+    async def testAcceptRequireDisconnecting_(self):
+        await self.proto.accept('/not/a/real/path')
+
+        # Cheat: force a disconnect.
+        await self.proto.simulate_disconnect()
+
+        await self._prod_session_api(
+            Runstate.DISCONNECTING,
+            ("NullProtocol is disconnecting."
+             " Call disconnect() to return to IDLE state."),
+            accept=True,
+        )
+
+    def testAcceptRequireDisconnecting(self):
+        """Test that accept() cannot be called when Runstate=DISCONNECTING"""
+        self._asyncRunner(self.testAcceptRequireDisconnecting_())
+
+    async def testConnectRequireDisconnecting_(self):
+        await self.proto.accept('/not/a/real/path')
+
+        # Cheat: force a disconnect.
+        await self.proto.simulate_disconnect()
+
+        await self._prod_session_api(
+            Runstate.DISCONNECTING,
+            ("NullProtocol is disconnecting."
+             " Call disconnect() to return to IDLE state."),
+            accept=False,
+        )
+
+    def testConnectRequireDisconnecting(self):
+        """Test that connect() cannot be called when Runstate=DISCONNECTING"""
+        self._asyncRunner(self.testConnectRequireDisconnecting_())
-- 
2.31.1



^ permalink raw reply related	[flat|nested] 36+ messages in thread

* Re: [PATCH v2 11/24] python/aqmp: add _cb_inbound and _cb_inbound logging hooks
  2021-07-17  0:32 ` [PATCH v2 11/24] python/aqmp: add _cb_inbound and _cb_inbound logging hooks John Snow
@ 2021-07-20 18:51   ` Niteesh G. S.
  2021-07-20 19:13     ` John Snow
  0 siblings, 1 reply; 36+ messages in thread
From: Niteesh G. S. @ 2021-07-20 18:51 UTC (permalink / raw)
  To: John Snow
  Cc: Eduardo Habkost, Stefan Hajnoczi, qemu-devel,
	Wainer dos Santos Moschetta, Markus Armbruster, Willian Rampazzo,
	Cleber Rosa, Eric Blake

[-- Attachment #1: Type: text/plain, Size: 3983 bytes --]

I think there's a typo in your commit message subject.

Thanks,
Niteesh.

On Sat, Jul 17, 2021 at 6:03 AM John Snow <jsnow@redhat.com> wrote:

> Add hooks designed to log/filter incoming/outgoing messages. The primary
> intent for these is to be able to support iotests which may want to log
> messages with specific filters for reproducible output.
>
> Another use is for plugging into Urwid frameworks; all messages in/out
> can be automatically added to a rendering list for the purposes of a
> qmp-shell like tool.
>
> Signed-off-by: John Snow <jsnow@redhat.com>
> ---
>  python/qemu/aqmp/protocol.py | 50 +++++++++++++++++++++++++++++++++---
>  1 file changed, 46 insertions(+), 4 deletions(-)
>
> diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
> index 86002a52654..6f83d3e3922 100644
> --- a/python/qemu/aqmp/protocol.py
> +++ b/python/qemu/aqmp/protocol.py
> @@ -176,6 +176,11 @@ class AsyncProtocol(Generic[T]):
>           can be written after the super() call.
>       - `_on_message`:
>           Actions to be performed when a message is received.
> +     - `_cb_outbound`:
> +         Logging/Filtering hook for all outbound messages.
> +     - `_cb_inbound`:
> +         Logging/Filtering hook for all inbound messages.
> +         This hook runs *before* `_on_message()`.
>
>      :param name:
>          Name used for logging messages, if any. By default, messages
> @@ -732,6 +737,43 @@ async def _bh_recv_message(self) -> None:
>      # Section: Message I/O
>      # --------------------
>
> +    @upper_half
> +    @bottom_half
> +    def _cb_outbound(self, msg: T) -> T:
> +        """
> +        Callback: outbound message hook.
> +
> +        This is intended for subclasses to be able to add arbitrary
> +        hooks to filter or manipulate outgoing messages. The base
> +        implementation does nothing but log the message without any
> +        manipulation of the message.
> +
> +        :param msg: raw outbound message
> +        :return: final outbound message
> +        """
> +        self.logger.debug("--> %s", str(msg))
> +        return msg
> +
> +    @upper_half
> +    @bottom_half
> +    def _cb_inbound(self, msg: T) -> T:
> +        """
> +        Callback: inbound message hook.
> +
> +        This is intended for subclasses to be able to add arbitrary
> +        hooks to filter or manipulate incoming messages. The base
> +        implementation does nothing but log the message without any
> +        manipulation of the message.
> +
> +        This method does not "handle" incoming messages; it is a filter.
> +        The actual "endpoint" for incoming messages is `_on_message()`.
> +
> +        :param msg: raw inbound message
> +        :return: processed inbound message
> +        """
> +        self.logger.debug("<-- %s", str(msg))
> +        return msg
> +
>      @upper_half
>      @bottom_half
>      async def _do_recv(self) -> T:
> @@ -760,8 +802,8 @@ async def _recv(self) -> T:
>
>          :return: A single (filtered, processed) protocol message.
>          """
> -        # A forthcoming commit makes this method less trivial.
> -        return await self._do_recv()
> +        message = await self._do_recv()
> +        return self._cb_inbound(message)
>
>      @upper_half
>      @bottom_half
> @@ -791,7 +833,7 @@ async def _send(self, msg: T) -> None:
>
>          :raise OSError: For problems with the underlying stream.
>          """
> -        # A forthcoming commit makes this method less trivial.
> +        msg = self._cb_outbound(msg)
>          self._do_send(msg)
>
>      @bottom_half
> @@ -806,6 +848,6 @@ async def _on_message(self, msg: T) -> None:
>              directly cause the loop to halt, so logic may be best-kept
>              to a minimum if at all possible.
>
> -        :param msg: The incoming message
> +        :param msg: The incoming message, already logged/filtered.
>          """
>          # Nothing to do in the abstract case.
> --
> 2.31.1
>
>

[-- Attachment #2: Type: text/html, Size: 5324 bytes --]

^ permalink raw reply	[flat|nested] 36+ messages in thread

* Re: [PATCH v2 11/24] python/aqmp: add _cb_inbound and _cb_inbound logging hooks
  2021-07-20 18:51   ` Niteesh G. S.
@ 2021-07-20 19:13     ` John Snow
  0 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-07-20 19:13 UTC (permalink / raw)
  To: Niteesh G. S.
  Cc: Eduardo Habkost, Stefan Hajnoczi, qemu-devel,
	Wainer dos Santos Moschetta, Markus Armbruster, Willian Rampazzo,
	Cleber Rosa, Eric Blake

[-- Attachment #1: Type: text/plain, Size: 213 bytes --]

On Tue, Jul 20, 2021 at 2:52 PM Niteesh G. S. <niteesh.gs@gmail.com> wrote:

> I think there's a typo in your commit message subject.
>
> Thanks,
> Niteesh.
>
>
Whoops, there sure is. Fixed locally, thanks!

--js

[-- Attachment #2: Type: text/html, Size: 761 bytes --]

^ permalink raw reply	[flat|nested] 36+ messages in thread

* Re: [PATCH v2 24/24] python/aqmp: add AsyncProtocol unit tests
  2021-07-17  0:32 ` [PATCH v2 24/24] python/aqmp: add AsyncProtocol unit tests John Snow
@ 2021-07-20 20:34   ` Beraldo Leal
  2021-08-02 17:24     ` John Snow
  0 siblings, 1 reply; 36+ messages in thread
From: Beraldo Leal @ 2021-07-20 20:34 UTC (permalink / raw)
  To: John Snow
  Cc: Willian Rampazzo, Eduardo Habkost, Markus Armbruster,
	Wainer dos Santos Moschetta, qemu-devel, Niteesh G . S .,
	Stefan Hajnoczi, Cleber Rosa, Eric Blake

On Fri, Jul 16, 2021 at 08:32:53PM -0400, John Snow wrote:
> This tests most of protocol.py -- From a hacked up Coverage.py run, it's
> at about 86%. There's a few error cases that aren't very well tested
> yet, they're hard to induce artificially so far. I'm working on it.
> 
> Signed-off-by: John Snow <jsnow@redhat.com>
> ---
>  python/tests/null_proto.py |  67 ++++++
>  python/tests/protocol.py   | 458 +++++++++++++++++++++++++++++++++++++
>  2 files changed, 525 insertions(+)
>  create mode 100644 python/tests/null_proto.py
>  create mode 100644 python/tests/protocol.py
> 
> diff --git a/python/tests/null_proto.py b/python/tests/null_proto.py
> new file mode 100644
> index 00000000000..c697efc0001
> --- /dev/null
> +++ b/python/tests/null_proto.py
> @@ -0,0 +1,67 @@
> +import asyncio
> +
> +from qemu.aqmp.protocol import AsyncProtocol
> +
> +
> +class NullProtocol(AsyncProtocol[None]):
> +    """
> +    NullProtocol is a test mockup of an AsyncProtocol implementation.
> +
> +    It adds a fake_session instance variable that enables a code path
> +    that bypasses the actual connection logic, but still allows the
> +    reader/writers to start.
> +
> +    Because the message type is defined as None, an asyncio.Event named
> +    'trigger_input' is created that prohibits the reader from
> +    incessantly being able to yield None; this input can be poked to
> +    simulate an incoming message.
> +
> +    For testing symmetry with do_recv, an interface is added to "send" a
> +    Null message.
> +
> +    For testing purposes, a "simulate_disconnection" method is also
> +    added which allows us to trigger a bottom half disconnect without
> +    injecting any real errors into the reader/writer loops; in essence
> +    it performs exactly half of what disconnect() normally does.
> +    """
> +    def __init__(self, name=None):
> +        self.fake_session = False
> +        self.trigger_input: asyncio.Event
> +        super().__init__(name)
> +
> +    async def _establish_session(self):
> +        self.trigger_input = asyncio.Event()
> +        await super()._establish_session()
> +
> +    async def _do_accept(self, address, ssl=None):
> +        if not self.fake_session:
> +            await super()._do_accept(address, ssl)
> +
> +    async def _do_connect(self, address, ssl=None):
> +        if not self.fake_session:
> +            await super()._do_connect(address, ssl)
> +
> +    async def _do_recv(self) -> None:
> +        await self.trigger_input.wait()
> +        self.trigger_input.clear()
> +
> +    def _do_send(self, msg: None) -> None:
> +        pass
> +
> +    async def send_msg(self) -> None:
> +        await self._outgoing.put(None)
> +
> +    async def simulate_disconnect(self) -> None:
> +        # Simulates a bottom half disconnect, e.g. schedules a
> +        # disconnection but does not wait for it to complete. This is
> +        # used to put the loop into the DISCONNECTING state without
> +        # fully quiescing it back to IDLE; this is normally something
> +        # you cannot coax AsyncProtocol to do on purpose, but it will be
> +        # similar to what happens with an unhandled Exception in the
> +        # reader/writer.
> +        #
> +        # Under normal circumstances, the library design requires you to
> +        # await on disconnect(), which awaits the disconnect task and
> +        # returns bottom half errors as a pre-condition to allowing the
> +        # loop to return back to IDLE.
> +        self._schedule_disconnect()

Nitpick: Any reason for not using a docstring? I wouldn't mind if it was
a docstring instead. ;)

> diff --git a/python/tests/protocol.py b/python/tests/protocol.py
> new file mode 100644
> index 00000000000..2374d01365e
> --- /dev/null
> +++ b/python/tests/protocol.py
> @@ -0,0 +1,458 @@
> +import asyncio
> +from contextlib import contextmanager
> +import os
> +import socket
> +from tempfile import TemporaryDirectory
> +
> +import avocado
> +
> +from qemu.aqmp import ConnectError, Runstate
> +from qemu.aqmp.protocol import StateError
> +from qemu.aqmp.util import asyncio_run, create_task

Nitpick: Maybe an isort?

> +# An Avocado bug prevents us from defining this testing class in-line here:
> +from null_proto import NullProtocol

Is this what you are looking for?

https://github.com/avocado-framework/avocado/pull/4764

If not, can you point to the right issue, please?

> +@contextmanager
> +def jammed_socket():
> +    # This method opens up a random TCP port on localhost, then jams it.
> +    socks = []
> +
> +    try:
> +        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> +        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
> +        sock.bind(('127.0.0.1', 0))
> +        sock.listen(1)
> +        address = sock.getsockname()
> +
> +        socks.append(sock)
> +
> +        # I don't *fully* understand why, but it takes *two* un-accepted
> +        # connections to start jamming the socket.
> +        for _ in range(2):
> +            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> +            sock.connect(address)
> +            socks.append(sock)
> +
> +        yield address
> +
> +    finally:
> +        for sock in socks:
> +            sock.close()
> +
> +
> +class Smoke(avocado.Test):
> +
> +    def setUp(self):
> +        self.proto = NullProtocol()
> +
> +    def test__repr__(self):
> +        self.assertEqual(
> +            repr(self.proto),
> +            "<NullProtocol runstate=IDLE>"
> +        )
> +
> +    def testRunstate(self):
> +        self.assertEqual(
> +            self.proto.runstate,
> +            Runstate.IDLE
> +        )
> +
> +    def testDefaultName(self):
> +        self.assertEqual(
> +            self.proto.name,
> +            None
> +        )
> +
> +    def testLogger(self):
> +        self.assertEqual(
> +            self.proto.logger.name,
> +            'qemu.aqmp.protocol'
> +        )
> +
> +    def testName(self):
> +        self.proto = NullProtocol('Steve')
> +
> +        self.assertEqual(
> +            self.proto.name,
> +            'Steve'
> +        )
> +
> +        self.assertEqual(
> +            self.proto.logger.name,
> +            'qemu.aqmp.protocol.Steve'
> +        )
> +
> +        self.assertEqual(
> +            repr(self.proto),
> +            "<NullProtocol name='Steve' runstate=IDLE>"
> +        )
> +
> +
> +class TestBase(avocado.Test):
> +
> +    def setUp(self):
> +        self.proto = NullProtocol(type(self).__name__)
> +        self.assertEqual(self.proto.runstate, Runstate.IDLE)
> +        self.runstate_watcher = None
> +
> +    def tearDown(self):
> +        self.assertEqual(self.proto.runstate, Runstate.IDLE)
> +
> +    async def _asyncSetUp(self):
> +        pass
> +
> +    async def _asyncTearDown(self):
> +        if self.runstate_watcher:
> +            await self.runstate_watcher
> +
> +    def _asyncRunner(self, test_coroutine):
> +        async def coroutine():
> +            await self._asyncSetUp()
> +            await test_coroutine
> +            await self._asyncTearDown()
> +
> +        asyncio_run(coroutine(), debug=True)
> +
> +    # Definitions
> +
> +    # The states we expect a "bad" connect/accept attempt to transition through
> +    BAD_CONNECTION_STATES = (
> +        Runstate.CONNECTING,
> +        Runstate.DISCONNECTING,
> +        Runstate.IDLE,
> +    )
> +
> +    # The states we expect a "good" session to transition through
> +    GOOD_CONNECTION_STATES = (
> +        Runstate.CONNECTING,
> +        Runstate.RUNNING,
> +        Runstate.DISCONNECTING,
> +        Runstate.IDLE,
> +    )
> +
> +    # Helpers
> +
> +    async def _watch_runstates(self, *states):
> +        # This launches a task alongside most tests below to confirm that the
> +        # sequence of runstate changes is exactly as anticipated.
> +
> +        async def _watcher():
> +            for state in states:
> +                new_state = await self.proto.runstate_changed()
> +                self.assertEqual(
> +                    new_state,
> +                    state,
> +                    msg=f"Expected state '{state.name}'",
> +                )
> +
> +        self.runstate_watcher = create_task(_watcher())
> +        # Kick the loop and force the task to block on the event.
> +        await asyncio.sleep(0)
> +
> +
> +class State(TestBase):
> +
> +    async def testSuperfluousDisconnect_(self):
> +        await self._watch_runstates(
> +            Runstate.DISCONNECTING,
> +            Runstate.IDLE,
> +        )
> +        await self.proto.disconnect()
> +
> +    def testSuperfluousDisconnect(self):
> +        self._asyncRunner(self.testSuperfluousDisconnect_())
> +
> +
> +class Connect(TestBase):
> +
> +    async def _bad_connection(self, family: str):
> +        assert family in ('INET', 'UNIX')
> +
> +        if family == 'INET':
> +            await self.proto.connect(('127.0.0.1', 0))
> +        elif family == 'UNIX':
> +            await self.proto.connect('/dev/null')
> +
> +    async def _hanging_connection(self):
> +        with jammed_socket() as addr:
> +            await self.proto.connect(addr)
> +
> +    async def _bad_connection_test(self, family: str):
> +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> +
> +        with self.assertRaises(ConnectError) as context:
> +            await self._bad_connection(family)
> +
> +        self.assertIsInstance(context.exception.exc, OSError)
> +        self.assertEqual(
> +            context.exception.error_message,
> +            "Failed to establish connection"
> +        )
> +
> +    def testBadINET(self):
> +        self._asyncRunner(self._bad_connection_test('INET'))
> +        # self.assertIsInstance(err.exc, ConnectionRefusedError)
> +
> +    def testBadUNIX(self):
> +        self._asyncRunner(self._bad_connection_test('UNIX'))
> +        # self.assertIsInstance(err.exc, ConnectionRefusedError)
> +
> +    async def testCancellation_(self):
> +        # Note that accept() cannot be cancelled outright, as it isn't a task.
> +        # However, we can wrap it in a task and cancel *that*.
> +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> +        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
> +
> +        state = await self.proto.runstate_changed()
> +        self.assertEqual(state, Runstate.CONNECTING)
> +
> +        # This is insider baseball, but the connection attempt has
> +        # yielded *just* before the actual connection attempt, so kick
> +        # the loop to make sure it's truly wedged.
> +        await asyncio.sleep(0)
> +
> +        task.cancel()
> +        await task
> +
> +    def testCancellation(self):
> +        self._asyncRunner(self.testCancellation_())
> +
> +    async def testTimeout_(self):
> +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> +        task = run_as_task(self._hanging_connection())
> +
> +        # More insider baseball: to improve the speed of this test while
> +        # guaranteeing that the connection even gets a chance to start,
> +        # verify that the connection hangs *first*, then await the
> +        # result of the task with a nearly-zero timeout.
> +
> +        state = await self.proto.runstate_changed()
> +        self.assertEqual(state, Runstate.CONNECTING)
> +        await asyncio.sleep(0)
> +
> +        with self.assertRaises(asyncio.TimeoutError):
> +            await asyncio.wait_for(task, timeout=0)
> +
> +    def testTimeout(self):
> +        self._asyncRunner(self.testTimeout_())
> +
> +    async def testRequire_(self):
> +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> +        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
> +
> +        state = await self.proto.runstate_changed()
> +        self.assertEqual(state, Runstate.CONNECTING)
> +
> +        with self.assertRaises(StateError) as context:
> +            await self._bad_connection('UNIX')
> +
> +        self.assertEqual(
> +            context.exception.error_message,
> +            "NullProtocol is currently connecting."
> +        )
> +        self.assertEqual(context.exception.state, Runstate.CONNECTING)
> +        self.assertEqual(context.exception.required, Runstate.IDLE)
> +
> +        task.cancel()
> +        await task
> +
> +    def testRequire(self):
> +        self._asyncRunner(self.testRequire_())
> +
> +    async def testImplicitRunstateInit_(self):
> +        # This tests what happens if we do not wait on the
> +        # runstate until AFTER we connect, i.e., connect()/accept()
> +        # themselves initialize the runstate event. All of the above
> +        # tests force the initialization by waiting on the runstate
> +        # *first*.
> +        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
> +
> +        # Kick the loop to coerce the state change
> +        await asyncio.sleep(0)
> +        assert self.proto.runstate == Runstate.CONNECTING
> +
> +        # We already missed the transition to CONNECTING
> +        await self._watch_runstates(Runstate.DISCONNECTING, Runstate.IDLE)
> +
> +        task.cancel()
> +        await task
> +
> +    def testImplicitRunstateInit(self):
> +        self._asyncRunner(self.testImplicitRunstateInit_())
> +
> +
> +class Accept(Connect):
> +
> +    async def _bad_connection(self, family: str):
> +        assert family in ('INET', 'UNIX')
> +
> +        if family == 'INET':
> +            await self.proto.accept(('example.com', 1))
> +        elif family == 'UNIX':
> +            await self.proto.accept('/dev/null')
> +
> +    async def _hanging_connection(self):
> +        with TemporaryDirectory(suffix='.aqmp') as tmpdir:
> +            sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
> +            await self.proto.accept(sock)
> +
> +
> +class FakeSession(TestBase):
> +
> +    def setUp(self):
> +        super().setUp()
> +        self.proto.fake_session = True
> +
> +    async def _asyncSetUp(self):
> +        await super()._asyncSetUp()
> +        await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
> +
> +    async def _asyncTearDown(self):
> +        await self.proto.disconnect()
> +        await super()._asyncTearDown()
> +
> +    ####
> +
> +    async def testFakeConnect_(self):
> +        await self.proto.connect('/not/a/real/path')
> +        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
> +
> +    def testFakeConnect(self):
> +        """Test the full state lifecycle (via connect) with a no-op session."""
> +        self._asyncRunner(self.testFakeConnect_())
> +
> +    async def testFakeAccept_(self):
> +        await self.proto.accept('/not/a/real/path')
> +        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
> +
> +    def testFakeAccept(self):
> +        """Test the full state lifecycle (via accept) with a no-op session."""
> +        self._asyncRunner(self.testFakeAccept_())
> +
> +    async def testFakeRecv_(self):
> +        await self.proto.accept('/not/a/real/path')
> +
> +        logname = self.proto.logger.name
> +        with self.assertLogs(logname, level='DEBUG') as context:
> +            self.proto.trigger_input.set()
> +            self.proto.trigger_input.clear()
> +            await asyncio.sleep(0)  # Kick reader.
> +
> +        self.assertEqual(
> +            context.output,
> +            [f"DEBUG:{logname}:<-- None"],
> +        )
> +
> +    def testFakeRecv(self):
> +        """Test receiving a fake/null message."""
> +        self._asyncRunner(self.testFakeRecv_())
> +
> +    async def testFakeSend_(self):
> +        await self.proto.accept('/not/a/real/path')
> +
> +        logname = self.proto.logger.name
> +        with self.assertLogs(logname, level='DEBUG') as context:
> +            # Cheat: Send a Null message to nobody.
> +            await self.proto.send_msg()
> +            # Kick writer; awaiting on a queue.put isn't sufficient to yield.
> +            await asyncio.sleep(0)
> +
> +        self.assertEqual(
> +            context.output,
> +            [f"DEBUG:{logname}:--> None"],
> +        )
> +
> +    def testFakeSend(self):
> +        """Test sending a fake/null message."""
> +        self._asyncRunner(self.testFakeSend_())
> +
> +    async def _prod_session_api(
> +            self,
> +            current_state: Runstate,
> +            error_message: str,
> +            accept: bool = True
> +    ):
> +        with self.assertRaises(StateError) as context:
> +            if accept:
> +                await self.proto.accept('/not/a/real/path')
> +            else:
> +                await self.proto.connect('/not/a/real/path')
> +
> +        self.assertEqual(context.exception.error_message, error_message)
> +        self.assertEqual(context.exception.state, current_state)
> +        self.assertEqual(context.exception.required, Runstate.IDLE)
> +
> +    async def testAcceptRequireRunning_(self):
> +        await self.proto.accept('/not/a/real/path')
> +
> +        await self._prod_session_api(
> +            Runstate.RUNNING,
> +            "NullProtocol is already connected and running.",
> +            accept=True,
> +        )
> +
> +    def testAcceptRequireRunning(self):
> +        """Test that accept() cannot be called when Runstate=RUNNING"""
> +        self._asyncRunner(self.testAcceptRequireRunning_())
> +
> +    async def testConnectRequireRunning_(self):
> +        await self.proto.accept('/not/a/real/path')
> +
> +        await self._prod_session_api(
> +            Runstate.RUNNING,
> +            "NullProtocol is already connected and running.",
> +            accept=False,
> +        )
> +
> +    def testConnectRequireRunning(self):
> +        """Test that connect() cannot be called when Runstate=RUNNING"""
> +        self._asyncRunner(self.testConnectRequireRunning_())
> +
> +    async def testAcceptRequireDisconnecting_(self):
> +        await self.proto.accept('/not/a/real/path')
> +
> +        # Cheat: force a disconnect.
> +        await self.proto.simulate_disconnect()
> +
> +        await self._prod_session_api(
> +            Runstate.DISCONNECTING,
> +            ("NullProtocol is disconnecting."
> +             " Call disconnect() to return to IDLE state."),
> +            accept=True,
> +        )
> +
> +    def testAcceptRequireDisconnecting(self):
> +        """Test that accept() cannot be called when Runstate=DISCONNECTING"""
> +        self._asyncRunner(self.testAcceptRequireDisconnecting_())
> +
> +    async def testConnectRequireDisconnecting_(self):
> +        await self.proto.accept('/not/a/real/path')
> +
> +        # Cheat: force a disconnect.
> +        await self.proto.simulate_disconnect()
> +
> +        await self._prod_session_api(
> +            Runstate.DISCONNECTING,
> +            ("NullProtocol is disconnecting."
> +             " Call disconnect() to return to IDLE state."),
> +            accept=False,
> +        )
> +
> +    def testConnectRequireDisconnecting(self):
> +        """Test that connect() cannot be called when Runstate=DISCONNECTING"""
> +        self._asyncRunner(self.testConnectRequireDisconnecting_())
> -- 
> 2.31.1

Besides that, I just would like to bring to the table that Avocado has
now a basic support for coroutines as tests that might help here. IIUC,
some of the boilerplate code (and duplicated methods) could be removed
with this:

https://github.com/avocado-framework/avocado/pull/4788

In any case, I understand if the latest version is not an option here,
so:

Reviewed-by: Beraldo Leal <bleal@redhat.com>

Thanks,
--
Beraldo



^ permalink raw reply	[flat|nested] 36+ messages in thread

* Re: [PATCH v2 00/24] python: introduce Asynchronous QMP package
  2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
                   ` (23 preceding siblings ...)
  2021-07-17  0:32 ` [PATCH v2 24/24] python/aqmp: add AsyncProtocol unit tests John Snow
@ 2021-07-21 17:03 ` Niteesh G. S.
       [not found]   ` <CAFn=p-YciuuRySs1F82ZyP_QGed=fbRZmzH3v7VNtdV-xM-XaA@mail.gmail.com>
  24 siblings, 1 reply; 36+ messages in thread
From: Niteesh G. S. @ 2021-07-21 17:03 UTC (permalink / raw)
  To: John Snow
  Cc: Eduardo Habkost, Stefan Hajnoczi, qemu-devel,
	Wainer dos Santos Moschetta, Markus Armbruster, Willian Rampazzo,
	Cleber Rosa, Eric Blake

[-- Attachment #1: Type: text/plain, Size: 15055 bytes --]

Hello all,

I recently rebased(incrementally) my TUI on this V2 patch and faced an
issue.
https://gitlab.com/niteesh.gs/qemu/-/commits/aqmp-tui-prototype-v3
I decided to rebase incrementally so that I can address some of the
comments posted
in my patch series. While testing out, the initial draft of TUI
which worked fine in the V1
version of AQMP failed in this version.

Disconnecting from a fully connected state doesn't exit cleanly.
---------------------------------------------------------------------------------
To reproduce the issue:
1) Initiate a QMP server
2) Connect the TUI to the server using aqmp-tui localhost:1234 --log-file
log.txt
3) Once the TUI is connected and running, press 'Esc' to exit the app. This
should result
in the following exception.
--------------------------------------------------------------------------------------------------------------------------------------------
Transitioning from 'Runstate.IDLE' to 'Runstate.CONNECTING'.
Connecting to ('localhost', 1234) ...
Connected.
Awaiting greeting ...
Response: {
  "QMP": {
    .......... Skipping
  }
}
Negotiating capabilities ...
Request: {
  "execute": "qmp_capabilities",
    .......... Skipping
  }
}
Response: {
  "return": {}
}
Transitioning from 'Runstate.CONNECTING' to 'Runstate.RUNNING'.
Transitioning from 'Runstate.RUNNING' to 'Runstate.DISCONNECTING'.
Scheduling disconnect.
Draining the outbound queue ...
Flushing the StreamWriter ...
Cancelling writer task ...
Task.Writer: cancelled.
Task.Writer: exiting.
Cancelling reader task ...
Task.Reader: cancelled.
Task.Reader: exiting.
Closing StreamWriter.
Waiting for StreamWriter to close ...
QMP Disconnected.
Transitioning from 'Runstate.DISCONNECTING' to 'Runstate.IDLE'.
_kill_app: Connection lost
Connection lost
  | Traceback (most recent call last):
  |   File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py",
line 246, in run
  |     main_loop.run()
  |   File
"/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
line 287, in run
  |     self._run()
  |   File
"/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
line 385, in _run
  |     self.event_loop.run()
  |   File
"/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
line 1494, in run
  |     reraise(*exc_info)
  |   File
"/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/compat.py",
line 58, in reraise
  |     raise value
  |   File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py",
line 206, in _kill_app
  |     raise err
  |   File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py",
line 201, in _kill_app
  |     await self.disconnect()
  |   File "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py",
line 303, in disconnect
  |     await self._wait_disconnect()
  |   File "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py",
line 573, in _wait_disconnect
  |     await self._dc_task
  |   File "/home/niteesh/development/qemu/python/qemu/aqmp/qmp_client.py",
line 316, in _bh_disconnect
  |     await super()._bh_disconnect()
  |   File "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py",
line 644, in _bh_disconnect
  |     await wait_closed(self._writer)
  |   File "/home/niteesh/development/qemu/python/qemu/aqmp/util.py", line
137, in wait_closed
  |     await flush(writer)
  |   File "/home/niteesh/development/qemu/python/qemu/aqmp/util.py", line
49, in flush
  |     await writer.drain()
  |   File "/usr/lib/python3.6/asyncio/streams.py", line 339, in drain
  |     yield from self._protocol._drain_helper()
  |   File "/usr/lib/python3.6/asyncio/streams.py", line 210, in
_drain_helper
  |     raise ConnectionResetError('Connection lost')
  | ConnectionResetError: Connection lost
--------------------------------------------------------------------------------------------------------------------------------------------


On Sat, Jul 17, 2021 at 6:03 AM John Snow <jsnow@redhat.com> wrote:

> GitLab: https://gitlab.com/jsnow/qemu/-/commits/python-async-qmp-aqmp
> CI: https://gitlab.com/jsnow/qemu/-/pipelines/338508045
> Docs: https://people.redhat.com/~jsnow/sphinx/html/qemu.aqmp.html
>
> Hi!
>
> This patch series adds an Asynchronous QMP package to the Python
> library. It offers a few improvements over the previous library:
>
> - out-of-band support
> - true asynchronous event support
> - avoids undocumented interfaces abusing non-blocking sockets
> - unit tests!
> - documentation!
>
> This library serves as the basis for a new qmp-shell program that will
> offer improved reconnection support, true asynchronous display of
> events, VM and job status update notifiers, and so on.
>
> My intent is to eventually publish this library directly to PyPI as a
> standalone package. I would like to phase out our usage of the old QMP
> library over time; eventually replacing it entirely with this one.
>
> This series looks big by line count, but it's *mostly*
> docstrings. Seriously!
>
> This package has *no* external dependencies whatsoever.
>
> Notes & Design
> ==============
>
> Here are some notes on the design of how the library works, to serve as
> a primer for review; however I also **highly recommend** browsing the
> generated Sphinx documentation for this series.
>
> Here's that link again:
> https://people.redhat.com/~jsnow/sphinx/html/qemu.aqmp.html
>
> The core machinery is split between the AsyncProtocol and QMP
> classes. AsyncProtocol provides the generic machinery, while QMP
> provides the QMP-specific details.
>
> The design uses two independent coroutines that act as the "bottom
> half", a writer task and a reader task. These tasks run for the duration
> of the connection and independently send and receive messages,
> respectively.
>
> A third task, disconnect, is scheduled asynchronously whenever an
> unrecoverable error occurs and facilitates coalescing of the other two
> tasks.
>
> This diagram for how execute() operates may be helpful for understanding
> how AsyncProtocol is laid out. The arrows indicate the direction of a
> QMP message; the long horizontal dash indicates the separation between
> the upper and lower half of the event loop. The queue mechanisms between
> both dashes serve as the intermediaries between the upper and lower
> half.
>
>                        +---------+
>                        | caller  |
>                        +---------+
>                            ^ |
>                            | v
>                        +---------+
>      +---------------> |execute()| -----------+
>      |                 +---------+            |
>      |                                        |
> [-----------------------------------------------------------]
>      |                                        |
>      |                                        v
> +----+------+    +----------------+    +------+-------+
> | ExecQueue |    | EventListeners |    |Outbound Queue|
> +----+------+    +----+-----------+    +------+-------+
>      ^                ^                       |
>      |                |                       |
> [-----------------------------------------------------------]
>      |                |                       |
>      |                |                       v
>   +--+----------------+---+       +-----------+-----------+
>   | Reader Task/Coroutine |       | Writer Task/Coroutine |
>   +-----------+-----------+       +-----------+-----------+
>               ^                               |
>               |                               v
>         +-----+------+                  +-----+------+
>         |StreamReader|                  |StreamWriter|
>         +------------+                  +------------+
>
> The caller will invoke execute(), which in turn will deposit a message
> in the outbound send queue. This will wake up the writer task, which
> well send the message over the wire.
>
> The execute() method will then yield to wait for a reply delivered to an
> execution queue created solely for that execute statement.
>
> When a message arrives, the Reader task will unblock and route the
> message either to the EventListener subsystem, or place it in the
> appropriate pending execution queue.
>
> Once a message is placed in the pending execution queue, execute() will
> unblock and the execution will conclude, returning the result of the RPC
> call to the caller.
>
> Patch Layout
> ============
>
> Patches 1-4   add tiny pre-requisites, utilities, etc.
> Patches 5-12  add a generic async message-based protocol class,
>               AsyncProtocol. They are split fairly small and should
>               be reasonably self-contained.
> Patches 13-15 check in more QMP-centric components.
> Patches 16-21 add qmp_client.py, with a new 'QMPClient()' class.
>               They're split into reasonably tiny pieces here.
> Patches 22-23 add a few finishing touches, they are small patches.
> Patch 24      adds unit tests. They're maybe a little messy yet, but
>               they've been quite helpful to me so far. Coverage of
>               protocol.py is at about 86%.
>
> Future Work
> ===========
>
> These items are in progress:
>
> - A synchronous QMP wrapper that allows this library to be easily used
>   from non-async code; this will also allow me to prove it works well by
>   demoing its replacement throughout iotests. I have all of iotests
>   passing locally, but I am still seeing some failures on gitlab CI I
>   need to diagnose, possibly a race condition somewhere.
>
> - A QMP server class; to facilitate writing of unit tests. It's done,
>   but needs some polish and tests.
>
> - More unit tests for qmp_client.py, qmp_server.py and other modules.
>
> Changelog
> =========
>
> V2:
>
> Renamed classes/methods:
>
> - Renamed qmp_protocol.py to qmp_client.py
> - Renamed 'QMP' class to 'QMPClient'
> - Renamed _begin_new_session() to _establish_session()
> - Split _establish_connection() out from _new_session().
> - Removed _results() method
>
> Bugfixes:
>
> - Suppress duplicate Exceptions when attempting to drain the
>   StreamWriter
> - Delay initialization of asyncio.Queue and asyncio.Event variables to
>   _new_session or later -- they must not be created outside of the loop,
>   even if they are not async functions.
> - Rework runstate_changed events to guarantee visibility of events to
>   waiters
> - Improve connect()/accept() cleanup to work with
>   asyncio.CancelledError, asyncio.TimeoutError
> - No-argument form of Message() now succeeds properly.
> - flush utility will correctly yield when data is below the "high water
>   mark", giving the stream a chance to actually flush.
> - Increase read buffer size to accommodate query-qmp-schema (Thanks
>   Niteesh)
>
> Ugly bits from V1 removed:
>
> - Remove tertiary filtering from EventListener (for now), accompanying
>   documentation removed from events.py
> - Use asyncio.wait() instead of custom wait_task_done()
> - MultiException is removed in favor of just raising the first Exception
>   that occurs in the bottom half; other Exceptions if any are logged
>   instead.
>
> Improvements:
>
> - QMPClient now allows ID-less execution statements via the _raw()
>   interface.
> - Add tests that grant ~86% coverage of protocol.py to the avocado test
>   suite.
> - Removed 'force' parameter from _bh_disconnect; the disconnection
>   routine determines for itself if we are in the error pathway or not
>   instead now.  This removes any chance of duplicate calls to
>   _schedule_disconnect accidentally dropping the 'force' setting.
>
> Debugging/Testing changes:
>
> - Add debug: bool parameter to asyncio_run utility wrapper
> - Improve error messages for '@require' decorator
> - Add debugging message for state change events
> - Avoid flushing the StreamWriter if we don't have one (This
>   circumstance only arises in testing, but it's helpful.)
> - Improved __repr__ method for AsyncProtocol, and removed __str__
>   method.  enforcing eval(__repr__(x)) == x does not make sense for
>   AsyncProtocol.
> - Misc logging message changes
> - Add a suite of fancy Task debugging utilities.
> - Most tracebacks now log at the DEBUG level instead of
>   CRITICAL/ERROR/WARNING; In those error cases, a one-line summary is
>   logged instead.
>
> Misc. aesthetic changes:
>
> - Misc docstring fixes, whitespace, etc.
> - Reordered the definition of some methods to try and keep similar
>   methods near each other (Moved _cleanup near _bh_disconnect in
>   QMPClient.)
>
> ~ Shucks Howdy, Gee Golly!
>
> John Snow (24):
>   python/aqmp: add asynchronous QMP (AQMP) subpackage
>   python/aqmp: add error classes
>   python/pylint: Add exception for TypeVar names ('T')
>   python/aqmp: add asyncio compatibility wrappers
>   python/aqmp: add generic async message-based protocol support
>   python/aqmp: add runstate state machine to AsyncProtocol
>   python/aqmp: Add logging utility helpers
>   python/aqmp: add logging to AsyncProtocol
>   python/aqmp: add AsyncProtocol.accept() method
>   python/aqmp: add configurable read buffer limit
>   python/aqmp: add _cb_inbound and _cb_inbound logging hooks
>   python/aqmp: add AsyncProtocol._readline() method
>   python/aqmp: add QMP Message format
>   python/aqmp: add well-known QMP object models
>   python/aqmp: add QMP event support
>   python/pylint: disable too-many-function-args
>   python/aqmp: add QMP protocol support
>   python/pylint: disable no-member check
>   python/aqmp: Add message routing to QMP protocol
>   python/aqmp: add execute() interfaces
>   python/aqmp: add _raw() execution interface
>   python/aqmp: add asyncio_run compatibility wrapper
>   python/aqmp: add scary message
>   python/aqmp: add AsyncProtocol unit tests
>
>  python/qemu/aqmp/__init__.py   |  58 +++
>  python/qemu/aqmp/error.py      |  50 ++
>  python/qemu/aqmp/events.py     | 706 ++++++++++++++++++++++++++
>  python/qemu/aqmp/message.py    | 209 ++++++++
>  python/qemu/aqmp/models.py     | 133 +++++
>  python/qemu/aqmp/protocol.py   | 882 +++++++++++++++++++++++++++++++++
>  python/qemu/aqmp/py.typed      |   0
>  python/qemu/aqmp/qmp_client.py | 621 +++++++++++++++++++++++
>  python/qemu/aqmp/util.py       | 207 ++++++++
>  python/setup.cfg               |   5 +-
>  python/tests/null_proto.py     |  67 +++
>  python/tests/protocol.py       | 458 +++++++++++++++++
>  12 files changed, 3395 insertions(+), 1 deletion(-)
>  create mode 100644 python/qemu/aqmp/__init__.py
>  create mode 100644 python/qemu/aqmp/error.py
>  create mode 100644 python/qemu/aqmp/events.py
>  create mode 100644 python/qemu/aqmp/message.py
>  create mode 100644 python/qemu/aqmp/models.py
>  create mode 100644 python/qemu/aqmp/protocol.py
>  create mode 100644 python/qemu/aqmp/py.typed
>  create mode 100644 python/qemu/aqmp/qmp_client.py
>  create mode 100644 python/qemu/aqmp/util.py
>  create mode 100644 python/tests/null_proto.py
>  create mode 100644 python/tests/protocol.py
>
> --
> 2.31.1
>
>
>

[-- Attachment #2: Type: text/html, Size: 18889 bytes --]

^ permalink raw reply	[flat|nested] 36+ messages in thread

* Re: [PATCH v2 00/24] python: introduce Asynchronous QMP package
       [not found]     ` <CAN6ztm-LKWMZTURfE_q0bWpoXVKGMoqmm2jj4_CTb_kj-kEjYg@mail.gmail.com>
@ 2021-07-21 19:55       ` John Snow
  2021-07-21 20:02         ` Niteesh G. S.
  0 siblings, 1 reply; 36+ messages in thread
From: John Snow @ 2021-07-21 19:55 UTC (permalink / raw)
  To: Niteesh G. S.
  Cc: Eduardo Habkost, Stefan Hajnoczi, qemu-devel,
	Wainer dos Santos Moschetta, Markus Armbruster, Willian Rampazzo,
	Cleber Rosa, Eric Blake

[-- Attachment #1: Type: text/plain, Size: 7027 bytes --]

Looping qemu-devel back in: I removed them by accident by not hitting
reply-all :(

On Wed, Jul 21, 2021 at 2:06 PM Niteesh G. S. <niteesh.gs@gmail.com> wrote:

>
>
> On Wed, Jul 21, 2021 at 11:03 PM John Snow <jsnow@redhat.com> wrote:
>
>>
>>
>> On Wed, Jul 21, 2021 at 1:04 PM Niteesh G. S. <niteesh.gs@gmail.com>
>> wrote:
>>
>>> Hello all,
>>>
>>> I recently rebased(incrementally) my TUI on this V2 patch and faced an
>>> issue.
>>> https://gitlab.com/niteesh.gs/qemu/-/commits/aqmp-tui-prototype-v3
>>> I decided to rebase incrementally so that I can address some of the
>>> comments posted
>>> in my patch series. While testing out, the initial draft of TUI
>>> which worked fine in the V1
>>> version of AQMP failed in this version.
>>>
>>> Disconnecting from a fully connected state doesn't exit cleanly.
>>>
>>> ---------------------------------------------------------------------------------
>>> To reproduce the issue:
>>> 1) Initiate a QMP server
>>>
>>
>> Please provide the command line.
>>
> qemu-system-x86_64 -qmp tcp:localhost:1234,server,wait=on
>
>>
>>
>>> 2) Connect the TUI to the server using aqmp-tui localhost:1234
>>> --log-file log.txt
>>>
>>
>> The entry point isn't defined yet in your series, so I will assume
>> "python3 -m qemu.aqmp.aqmp_tui localhost:1234" should work here.
>>
> Yup, sorry about that. I realized this later when recreated the venv.
>
>>
>>
>>> 3) Once the TUI is connected and running, press 'Esc' to exit the app.
>>> This should result
>>> in the following exception.
>>>
>>> --------------------------------------------------------------------------------------------------------------------------------------------
>>> Transitioning from 'Runstate.IDLE' to 'Runstate.CONNECTING'.
>>> Connecting to ('localhost', 1234) ...
>>> Connected.
>>> Awaiting greeting ...
>>> Response: {
>>>   "QMP": {
>>>     .......... Skipping
>>>   }
>>> }
>>> Negotiating capabilities ...
>>> Request: {
>>>   "execute": "qmp_capabilities",
>>>     .......... Skipping
>>>   }
>>> }
>>> Response: {
>>>   "return": {}
>>> }
>>> Transitioning from 'Runstate.CONNECTING' to 'Runstate.RUNNING'.
>>> Transitioning from 'Runstate.RUNNING' to 'Runstate.DISCONNECTING'.
>>> Scheduling disconnect.
>>> Draining the outbound queue ...
>>> Flushing the StreamWriter ...
>>> Cancelling writer task ...
>>> Task.Writer: cancelled.
>>> Task.Writer: exiting.
>>> Cancelling reader task ...
>>> Task.Reader: cancelled.
>>> Task.Reader: exiting.
>>> Closing StreamWriter.
>>> Waiting for StreamWriter to close ...
>>> QMP Disconnected.
>>> Transitioning from 'Runstate.DISCONNECTING' to 'Runstate.IDLE'.
>>> _kill_app: Connection lost
>>> Connection lost
>>>   | Traceback (most recent call last):
>>>   |   File
>>> "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line 246, in
>>> run
>>>   |     main_loop.run()
>>>   |   File
>>> "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
>>> line 287, in run
>>>   |     self._run()
>>>   |   File
>>> "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
>>> line 385, in _run
>>>   |     self.event_loop.run()
>>>   |   File
>>> "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
>>> line 1494, in run
>>>   |     reraise(*exc_info)
>>>   |   File
>>> "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/compat.py",
>>> line 58, in reraise
>>>   |     raise value
>>>   |   File
>>> "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line 206, in
>>> _kill_app
>>>   |     raise err
>>>   |   File
>>> "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line 201, in
>>> _kill_app
>>>   |     await self.disconnect()
>>>   |   File
>>> "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line 303, in
>>> disconnect
>>>   |     await self._wait_disconnect()
>>>   |   File
>>> "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line 573, in
>>> _wait_disconnect
>>>   |     await self._dc_task
>>>   |   File
>>> "/home/niteesh/development/qemu/python/qemu/aqmp/qmp_client.py", line 316,
>>> in _bh_disconnect
>>>   |     await super()._bh_disconnect()
>>>   |   File
>>> "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line 644, in
>>> _bh_disconnect
>>>   |     await wait_closed(self._writer)
>>>   |   File "/home/niteesh/development/qemu/python/qemu/aqmp/util.py",
>>> line 137, in wait_closed
>>>   |     await flush(writer)
>>>   |   File "/home/niteesh/development/qemu/python/qemu/aqmp/util.py",
>>> line 49, in flush
>>>   |     await writer.drain()
>>>   |   File "/usr/lib/python3.6/asyncio/streams.py", line 339, in drain
>>>   |     yield from self._protocol._drain_helper()
>>>   |   File "/usr/lib/python3.6/asyncio/streams.py", line 210, in
>>> _drain_helper
>>>   |     raise ConnectionResetError('Connection lost')
>>>   | ConnectionResetError: Connection lost
>>>
>>> --------------------------------------------------------------------------------------------------------------------------------------------
>>>
>>>
>> I can't reproduce in Python 3.9, but I *can* reproduce in python 3.6
>> using the pipenv environment; i.e.
>>
>> > make check-pipenv
>> > pipenv shell
>> > python3 -m qemu.aqmp.aqmp_tui 127.0.0.1:1234
>>
>> What python version are you using to see this failure? Is it 3.6 ?
>>
> Yes, I was using python 3.6. I just tried it on 3.8 and I don't face this
> issue.
>
>>
>> It seems like the wait_closed() wrapper I wrote isn't quite compatible
>> with Python 3.6, it looks like it's not really safe to try and flush a
>> closing socket. I was doing so in an attempt to tell when the socket had
>> finished closing out its buffer (expecting it to normally be a no-op) but
>> in this case even a no-op drain in 3.6 seems to raise an error if we
>> attempt it after we've asked for the socket to close.
>>
>
>
>> wait_closed() was added in Python 3.7 and we just don't have access to it
>> here ... I'm not sure if there's something else we can do here to serve as
>> a workaround for not having this function.
>>
>> --js
>>
>>
I can't find a *nice* workaround, but I found one that should probably work
in most universes. We can remove this ugly code when we support 3.7 as a
minimum. However, please try this patch as a fixup:

diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py
index de0df44cbd7..eaa5fc7d5f9 100644
--- a/python/qemu/aqmp/util.py
+++ b/python/qemu/aqmp/util.py
@@ -134,7 +134,17 @@ async def wait_closed(writer: asyncio.StreamWriter) ->
None:

     while not transport.is_closing():
         await asyncio.sleep(0)
-    await flush(writer)
+
+    # This is an ugly workaround, but it's the best I can come up with.
+    sock = transport.get_extra_info('socket')
+
+    if sock is None:
+        # Our transport doesn't have a socket? ...
+        # Nothing we can reasonably do.
+        return
+
+    while sock.fileno() != -1:
+        await asyncio.sleep(0)

[-- Attachment #2: Type: text/html, Size: 10539 bytes --]

^ permalink raw reply related	[flat|nested] 36+ messages in thread

* Re: [PATCH v2 00/24] python: introduce Asynchronous QMP package
  2021-07-21 19:55       ` John Snow
@ 2021-07-21 20:02         ` Niteesh G. S.
  0 siblings, 0 replies; 36+ messages in thread
From: Niteesh G. S. @ 2021-07-21 20:02 UTC (permalink / raw)
  To: John Snow
  Cc: Eduardo Habkost, Stefan Hajnoczi, qemu-devel,
	Wainer dos Santos Moschetta, Markus Armbruster, Willian Rampazzo,
	Cleber Rosa, Eric Blake

[-- Attachment #1: Type: text/plain, Size: 7434 bytes --]

On Thu, Jul 22, 2021 at 1:25 AM John Snow <jsnow@redhat.com> wrote:

> Looping qemu-devel back in: I removed them by accident by not hitting
> reply-all :(
>
> On Wed, Jul 21, 2021 at 2:06 PM Niteesh G. S. <niteesh.gs@gmail.com>
> wrote:
>
>>
>>
>> On Wed, Jul 21, 2021 at 11:03 PM John Snow <jsnow@redhat.com> wrote:
>>
>>>
>>>
>>> On Wed, Jul 21, 2021 at 1:04 PM Niteesh G. S. <niteesh.gs@gmail.com>
>>> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I recently rebased(incrementally) my TUI on this V2 patch and faced an
>>>> issue.
>>>> https://gitlab.com/niteesh.gs/qemu/-/commits/aqmp-tui-prototype-v3
>>>> I decided to rebase incrementally so that I can address some of the
>>>> comments posted
>>>> in my patch series. While testing out, the initial draft of TUI
>>>> which worked fine in the V1
>>>> version of AQMP failed in this version.
>>>>
>>>> Disconnecting from a fully connected state doesn't exit cleanly.
>>>>
>>>> ---------------------------------------------------------------------------------
>>>> To reproduce the issue:
>>>> 1) Initiate a QMP server
>>>>
>>>
>>> Please provide the command line.
>>>
>> qemu-system-x86_64 -qmp tcp:localhost:1234,server,wait=on
>>
>>>
>>>
>>>> 2) Connect the TUI to the server using aqmp-tui localhost:1234
>>>> --log-file log.txt
>>>>
>>>
>>> The entry point isn't defined yet in your series, so I will assume
>>> "python3 -m qemu.aqmp.aqmp_tui localhost:1234" should work here.
>>>
>> Yup, sorry about that. I realized this later when recreated the venv.
>>
>>>
>>>
>>>> 3) Once the TUI is connected and running, press 'Esc' to exit the app.
>>>> This should result
>>>> in the following exception.
>>>>
>>>> --------------------------------------------------------------------------------------------------------------------------------------------
>>>> Transitioning from 'Runstate.IDLE' to 'Runstate.CONNECTING'.
>>>> Connecting to ('localhost', 1234) ...
>>>> Connected.
>>>> Awaiting greeting ...
>>>> Response: {
>>>>   "QMP": {
>>>>     .......... Skipping
>>>>   }
>>>> }
>>>> Negotiating capabilities ...
>>>> Request: {
>>>>   "execute": "qmp_capabilities",
>>>>     .......... Skipping
>>>>   }
>>>> }
>>>> Response: {
>>>>   "return": {}
>>>> }
>>>> Transitioning from 'Runstate.CONNECTING' to 'Runstate.RUNNING'.
>>>> Transitioning from 'Runstate.RUNNING' to 'Runstate.DISCONNECTING'.
>>>> Scheduling disconnect.
>>>> Draining the outbound queue ...
>>>> Flushing the StreamWriter ...
>>>> Cancelling writer task ...
>>>> Task.Writer: cancelled.
>>>> Task.Writer: exiting.
>>>> Cancelling reader task ...
>>>> Task.Reader: cancelled.
>>>> Task.Reader: exiting.
>>>> Closing StreamWriter.
>>>> Waiting for StreamWriter to close ...
>>>> QMP Disconnected.
>>>> Transitioning from 'Runstate.DISCONNECTING' to 'Runstate.IDLE'.
>>>> _kill_app: Connection lost
>>>> Connection lost
>>>>   | Traceback (most recent call last):
>>>>   |   File
>>>> "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line 246, in
>>>> run
>>>>   |     main_loop.run()
>>>>   |   File
>>>> "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
>>>> line 287, in run
>>>>   |     self._run()
>>>>   |   File
>>>> "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
>>>> line 385, in _run
>>>>   |     self.event_loop.run()
>>>>   |   File
>>>> "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
>>>> line 1494, in run
>>>>   |     reraise(*exc_info)
>>>>   |   File
>>>> "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/compat.py",
>>>> line 58, in reraise
>>>>   |     raise value
>>>>   |   File
>>>> "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line 206, in
>>>> _kill_app
>>>>   |     raise err
>>>>   |   File
>>>> "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line 201, in
>>>> _kill_app
>>>>   |     await self.disconnect()
>>>>   |   File
>>>> "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line 303, in
>>>> disconnect
>>>>   |     await self._wait_disconnect()
>>>>   |   File
>>>> "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line 573, in
>>>> _wait_disconnect
>>>>   |     await self._dc_task
>>>>   |   File
>>>> "/home/niteesh/development/qemu/python/qemu/aqmp/qmp_client.py", line 316,
>>>> in _bh_disconnect
>>>>   |     await super()._bh_disconnect()
>>>>   |   File
>>>> "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line 644, in
>>>> _bh_disconnect
>>>>   |     await wait_closed(self._writer)
>>>>   |   File "/home/niteesh/development/qemu/python/qemu/aqmp/util.py",
>>>> line 137, in wait_closed
>>>>   |     await flush(writer)
>>>>   |   File "/home/niteesh/development/qemu/python/qemu/aqmp/util.py",
>>>> line 49, in flush
>>>>   |     await writer.drain()
>>>>   |   File "/usr/lib/python3.6/asyncio/streams.py", line 339, in drain
>>>>   |     yield from self._protocol._drain_helper()
>>>>   |   File "/usr/lib/python3.6/asyncio/streams.py", line 210, in
>>>> _drain_helper
>>>>   |     raise ConnectionResetError('Connection lost')
>>>>   | ConnectionResetError: Connection lost
>>>>
>>>> --------------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>>
>>> I can't reproduce in Python 3.9, but I *can* reproduce in python 3.6
>>> using the pipenv environment; i.e.
>>>
>>> > make check-pipenv
>>> > pipenv shell
>>> > python3 -m qemu.aqmp.aqmp_tui 127.0.0.1:1234
>>>
>>> What python version are you using to see this failure? Is it 3.6 ?
>>>
>> Yes, I was using python 3.6. I just tried it on 3.8 and I don't face this
>> issue.
>>
>>>
>>> It seems like the wait_closed() wrapper I wrote isn't quite compatible
>>> with Python 3.6, it looks like it's not really safe to try and flush a
>>> closing socket. I was doing so in an attempt to tell when the socket had
>>> finished closing out its buffer (expecting it to normally be a no-op) but
>>> in this case even a no-op drain in 3.6 seems to raise an error if we
>>> attempt it after we've asked for the socket to close.
>>>
>>
>>
>>> wait_closed() was added in Python 3.7 and we just don't have access to
>>> it here ... I'm not sure if there's something else we can do here to serve
>>> as a workaround for not having this function.
>>>
>>> --js
>>>
>>>
> I can't find a *nice* workaround, but I found one that should probably
> work in most universes. We can remove this ugly code when we support 3.7 as
> a minimum. However, please try this patch as a fixup:
>
> diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py
> index de0df44cbd7..eaa5fc7d5f9 100644
> --- a/python/qemu/aqmp/util.py
> +++ b/python/qemu/aqmp/util.py
> @@ -134,7 +134,17 @@ async def wait_closed(writer: asyncio.StreamWriter)
> -> None:
>
>      while not transport.is_closing():
>          await asyncio.sleep(0)
> -    await flush(writer)
> +
> +    # This is an ugly workaround, but it's the best I can come up with.
> +    sock = transport.get_extra_info('socket')
> +
> +    if sock is None:
> +        # Our transport doesn't have a socket? ...
> +        # Nothing we can reasonably do.
> +        return
> +
> +    while sock.fileno() != -1:
> +        await asyncio.sleep(0)
>
Thanks for the patch. I am now able to disconnect/quit without any
exceptions.

Thanks,
Niteesh.

>
>
>

[-- Attachment #2: Type: text/html, Size: 11530 bytes --]

^ permalink raw reply	[flat|nested] 36+ messages in thread

* Re: [PATCH v2 24/24] python/aqmp: add AsyncProtocol unit tests
  2021-07-20 20:34   ` Beraldo Leal
@ 2021-08-02 17:24     ` John Snow
  0 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-08-02 17:24 UTC (permalink / raw)
  To: Beraldo Leal
  Cc: Willian Rampazzo, Eduardo Habkost, Markus Armbruster,
	Wainer dos Santos Moschetta, qemu-devel, Niteesh G . S .,
	Stefan Hajnoczi, Cleber Rosa, Eric Blake

[-- Attachment #1: Type: text/plain, Size: 21312 bytes --]

On Tue, Jul 20, 2021 at 4:34 PM Beraldo Leal <bleal@redhat.com> wrote:

> On Fri, Jul 16, 2021 at 08:32:53PM -0400, John Snow wrote:
> > This tests most of protocol.py -- From a hacked up Coverage.py run, it's
> > at about 86%. There's a few error cases that aren't very well tested
> > yet, they're hard to induce artificially so far. I'm working on it.
> >
> > Signed-off-by: John Snow <jsnow@redhat.com>
> > ---
> >  python/tests/null_proto.py |  67 ++++++
> >  python/tests/protocol.py   | 458 +++++++++++++++++++++++++++++++++++++
> >  2 files changed, 525 insertions(+)
> >  create mode 100644 python/tests/null_proto.py
> >  create mode 100644 python/tests/protocol.py
> >
> > diff --git a/python/tests/null_proto.py b/python/tests/null_proto.py
> > new file mode 100644
> > index 00000000000..c697efc0001
> > --- /dev/null
> > +++ b/python/tests/null_proto.py
> > @@ -0,0 +1,67 @@
> > +import asyncio
> > +
> > +from qemu.aqmp.protocol import AsyncProtocol
> > +
> > +
> > +class NullProtocol(AsyncProtocol[None]):
> > +    """
> > +    NullProtocol is a test mockup of an AsyncProtocol implementation.
> > +
> > +    It adds a fake_session instance variable that enables a code path
> > +    that bypasses the actual connection logic, but still allows the
> > +    reader/writers to start.
> > +
> > +    Because the message type is defined as None, an asyncio.Event named
> > +    'trigger_input' is created that prohibits the reader from
> > +    incessantly being able to yield None; this input can be poked to
> > +    simulate an incoming message.
> > +
> > +    For testing symmetry with do_recv, an interface is added to "send" a
> > +    Null message.
> > +
> > +    For testing purposes, a "simulate_disconnection" method is also
> > +    added which allows us to trigger a bottom half disconnect without
> > +    injecting any real errors into the reader/writer loops; in essence
> > +    it performs exactly half of what disconnect() normally does.
> > +    """
> > +    def __init__(self, name=None):
> > +        self.fake_session = False
> > +        self.trigger_input: asyncio.Event
> > +        super().__init__(name)
> > +
> > +    async def _establish_session(self):
> > +        self.trigger_input = asyncio.Event()
> > +        await super()._establish_session()
> > +
> > +    async def _do_accept(self, address, ssl=None):
> > +        if not self.fake_session:
> > +            await super()._do_accept(address, ssl)
> > +
> > +    async def _do_connect(self, address, ssl=None):
> > +        if not self.fake_session:
> > +            await super()._do_connect(address, ssl)
> > +
> > +    async def _do_recv(self) -> None:
> > +        await self.trigger_input.wait()
> > +        self.trigger_input.clear()
> > +
> > +    def _do_send(self, msg: None) -> None:
> > +        pass
> > +
> > +    async def send_msg(self) -> None:
> > +        await self._outgoing.put(None)
> > +
> > +    async def simulate_disconnect(self) -> None:
> > +        # Simulates a bottom half disconnect, e.g. schedules a
> > +        # disconnection but does not wait for it to complete. This is
> > +        # used to put the loop into the DISCONNECTING state without
> > +        # fully quiescing it back to IDLE; this is normally something
> > +        # you cannot coax AsyncProtocol to do on purpose, but it will be
> > +        # similar to what happens with an unhandled Exception in the
> > +        # reader/writer.
> > +        #
> > +        # Under normal circumstances, the library design requires you to
> > +        # await on disconnect(), which awaits the disconnect task and
> > +        # returns bottom half errors as a pre-condition to allowing the
> > +        # loop to return back to IDLE.
> > +        self._schedule_disconnect()
>
> Nitpick: Any reason for not using a docstring? I wouldn't mind if it was
> a docstring instead. ;)
>
>
Nope. I've changed it.


> > diff --git a/python/tests/protocol.py b/python/tests/protocol.py
> > new file mode 100644
> > index 00000000000..2374d01365e
> > --- /dev/null
> > +++ b/python/tests/protocol.py
> > @@ -0,0 +1,458 @@
> > +import asyncio
> > +from contextlib import contextmanager
> > +import os
> > +import socket
> > +from tempfile import TemporaryDirectory
> > +
> > +import avocado
> > +
> > +from qemu.aqmp import ConnectError, Runstate
> > +from qemu.aqmp.protocol import StateError
> > +from qemu.aqmp.util import asyncio_run, create_task
>
> Nitpick: Maybe an isort?
>
>
It actually is isorted, just using some different settings than you're used
to seeing in Avocado.


> > +# An Avocado bug prevents us from defining this testing class in-line
> here:
> > +from null_proto import NullProtocol
>
> Is this what you are looking for?
>
> https://github.com/avocado-framework/avocado/pull/4764
>
> If not, can you point to the right issue, please?
>
>
That's the one. I don't have time to update to v90 right now, so I'm going
to leave it as a todo item, please forgive me! I'll update the comment,
though.


> > +@contextmanager
> > +def jammed_socket():
> > +    # This method opens up a random TCP port on localhost, then jams it.
> > +    socks = []
> > +
> > +    try:
> > +        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > +        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
> > +        sock.bind(('127.0.0.1', 0))
> > +        sock.listen(1)
> > +        address = sock.getsockname()
> > +
> > +        socks.append(sock)
> > +
> > +        # I don't *fully* understand why, but it takes *two* un-accepted
> > +        # connections to start jamming the socket.
> > +        for _ in range(2):
> > +            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > +            sock.connect(address)
> > +            socks.append(sock)
> > +
> > +        yield address
> > +
> > +    finally:
> > +        for sock in socks:
> > +            sock.close()
> > +
> > +
> > +class Smoke(avocado.Test):
> > +
> > +    def setUp(self):
> > +        self.proto = NullProtocol()
> > +
> > +    def test__repr__(self):
> > +        self.assertEqual(
> > +            repr(self.proto),
> > +            "<NullProtocol runstate=IDLE>"
> > +        )
> > +
> > +    def testRunstate(self):
> > +        self.assertEqual(
> > +            self.proto.runstate,
> > +            Runstate.IDLE
> > +        )
> > +
> > +    def testDefaultName(self):
> > +        self.assertEqual(
> > +            self.proto.name,
> > +            None
> > +        )
> > +
> > +    def testLogger(self):
> > +        self.assertEqual(
> > +            self.proto.logger.name,
> > +            'qemu.aqmp.protocol'
> > +        )
> > +
> > +    def testName(self):
> > +        self.proto = NullProtocol('Steve')
> > +
> > +        self.assertEqual(
> > +            self.proto.name,
> > +            'Steve'
> > +        )
> > +
> > +        self.assertEqual(
> > +            self.proto.logger.name,
> > +            'qemu.aqmp.protocol.Steve'
> > +        )
> > +
> > +        self.assertEqual(
> > +            repr(self.proto),
> > +            "<NullProtocol name='Steve' runstate=IDLE>"
> > +        )
> > +
> > +
> > +class TestBase(avocado.Test):
> > +
> > +    def setUp(self):
> > +        self.proto = NullProtocol(type(self).__name__)
> > +        self.assertEqual(self.proto.runstate, Runstate.IDLE)
> > +        self.runstate_watcher = None
> > +
> > +    def tearDown(self):
> > +        self.assertEqual(self.proto.runstate, Runstate.IDLE)
> > +
> > +    async def _asyncSetUp(self):
> > +        pass
> > +
> > +    async def _asyncTearDown(self):
> > +        if self.runstate_watcher:
> > +            await self.runstate_watcher
> > +
> > +    def _asyncRunner(self, test_coroutine):
> > +        async def coroutine():
> > +            await self._asyncSetUp()
> > +            await test_coroutine
> > +            await self._asyncTearDown()
> > +
> > +        asyncio_run(coroutine(), debug=True)
> > +
> > +    # Definitions
> > +
> > +    # The states we expect a "bad" connect/accept attempt to transition
> through
> > +    BAD_CONNECTION_STATES = (
> > +        Runstate.CONNECTING,
> > +        Runstate.DISCONNECTING,
> > +        Runstate.IDLE,
> > +    )
> > +
> > +    # The states we expect a "good" session to transition through
> > +    GOOD_CONNECTION_STATES = (
> > +        Runstate.CONNECTING,
> > +        Runstate.RUNNING,
> > +        Runstate.DISCONNECTING,
> > +        Runstate.IDLE,
> > +    )
> > +
> > +    # Helpers
> > +
> > +    async def _watch_runstates(self, *states):
> > +        # This launches a task alongside most tests below to confirm
> that the
> > +        # sequence of runstate changes is exactly as anticipated.
> > +
> > +        async def _watcher():
> > +            for state in states:
> > +                new_state = await self.proto.runstate_changed()
> > +                self.assertEqual(
> > +                    new_state,
> > +                    state,
> > +                    msg=f"Expected state '{state.name}'",
> > +                )
> > +
> > +        self.runstate_watcher = create_task(_watcher())
> > +        # Kick the loop and force the task to block on the event.
> > +        await asyncio.sleep(0)
> > +
> > +
> > +class State(TestBase):
> > +
> > +    async def testSuperfluousDisconnect_(self):
> > +        await self._watch_runstates(
> > +            Runstate.DISCONNECTING,
> > +            Runstate.IDLE,
> > +        )
> > +        await self.proto.disconnect()
> > +
> > +    def testSuperfluousDisconnect(self):
> > +        self._asyncRunner(self.testSuperfluousDisconnect_())
> > +
> > +
> > +class Connect(TestBase):
> > +
> > +    async def _bad_connection(self, family: str):
> > +        assert family in ('INET', 'UNIX')
> > +
> > +        if family == 'INET':
> > +            await self.proto.connect(('127.0.0.1', 0))
> > +        elif family == 'UNIX':
> > +            await self.proto.connect('/dev/null')
> > +
> > +    async def _hanging_connection(self):
> > +        with jammed_socket() as addr:
> > +            await self.proto.connect(addr)
> > +
> > +    async def _bad_connection_test(self, family: str):
> > +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> > +
> > +        with self.assertRaises(ConnectError) as context:
> > +            await self._bad_connection(family)
> > +
> > +        self.assertIsInstance(context.exception.exc, OSError)
> > +        self.assertEqual(
> > +            context.exception.error_message,
> > +            "Failed to establish connection"
> > +        )
> > +
> > +    def testBadINET(self):
> > +        self._asyncRunner(self._bad_connection_test('INET'))
> > +        # self.assertIsInstance(err.exc, ConnectionRefusedError)
> > +
> > +    def testBadUNIX(self):
> > +        self._asyncRunner(self._bad_connection_test('UNIX'))
> > +        # self.assertIsInstance(err.exc, ConnectionRefusedError)
> > +
> > +    async def testCancellation_(self):
> > +        # Note that accept() cannot be cancelled outright, as it isn't
> a task.
> > +        # However, we can wrap it in a task and cancel *that*.
> > +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> > +        task = run_as_task(self._hanging_connection(),
> allow_cancellation=True)
> > +
> > +        state = await self.proto.runstate_changed()
> > +        self.assertEqual(state, Runstate.CONNECTING)
> > +
> > +        # This is insider baseball, but the connection attempt has
> > +        # yielded *just* before the actual connection attempt, so kick
> > +        # the loop to make sure it's truly wedged.
> > +        await asyncio.sleep(0)
> > +
> > +        task.cancel()
> > +        await task
> > +
> > +    def testCancellation(self):
> > +        self._asyncRunner(self.testCancellation_())
> > +
> > +    async def testTimeout_(self):
> > +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> > +        task = run_as_task(self._hanging_connection())
> > +
> > +        # More insider baseball: to improve the speed of this test while
> > +        # guaranteeing that the connection even gets a chance to start,
> > +        # verify that the connection hangs *first*, then await the
> > +        # result of the task with a nearly-zero timeout.
> > +
> > +        state = await self.proto.runstate_changed()
> > +        self.assertEqual(state, Runstate.CONNECTING)
> > +        await asyncio.sleep(0)
> > +
> > +        with self.assertRaises(asyncio.TimeoutError):
> > +            await asyncio.wait_for(task, timeout=0)
> > +
> > +    def testTimeout(self):
> > +        self._asyncRunner(self.testTimeout_())
> > +
> > +    async def testRequire_(self):
> > +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> > +        task = run_as_task(self._hanging_connection(),
> allow_cancellation=True)
> > +
> > +        state = await self.proto.runstate_changed()
> > +        self.assertEqual(state, Runstate.CONNECTING)
> > +
> > +        with self.assertRaises(StateError) as context:
> > +            await self._bad_connection('UNIX')
> > +
> > +        self.assertEqual(
> > +            context.exception.error_message,
> > +            "NullProtocol is currently connecting."
> > +        )
> > +        self.assertEqual(context.exception.state, Runstate.CONNECTING)
> > +        self.assertEqual(context.exception.required, Runstate.IDLE)
> > +
> > +        task.cancel()
> > +        await task
> > +
> > +    def testRequire(self):
> > +        self._asyncRunner(self.testRequire_())
> > +
> > +    async def testImplicitRunstateInit_(self):
> > +        # This tests what happens if we do not wait on the
> > +        # runstate until AFTER we connect, i.e., connect()/accept()
> > +        # themselves initialize the runstate event. All of the above
> > +        # tests force the initialization by waiting on the runstate
> > +        # *first*.
> > +        task = run_as_task(self._hanging_connection(),
> allow_cancellation=True)
> > +
> > +        # Kick the loop to coerce the state change
> > +        await asyncio.sleep(0)
> > +        assert self.proto.runstate == Runstate.CONNECTING
> > +
> > +        # We already missed the transition to CONNECTING
> > +        await self._watch_runstates(Runstate.DISCONNECTING,
> Runstate.IDLE)
> > +
> > +        task.cancel()
> > +        await task
> > +
> > +    def testImplicitRunstateInit(self):
> > +        self._asyncRunner(self.testImplicitRunstateInit_())
> > +
> > +
> > +class Accept(Connect):
> > +
> > +    async def _bad_connection(self, family: str):
> > +        assert family in ('INET', 'UNIX')
> > +
> > +        if family == 'INET':
> > +            await self.proto.accept(('example.com', 1))
> > +        elif family == 'UNIX':
> > +            await self.proto.accept('/dev/null')
> > +
> > +    async def _hanging_connection(self):
> > +        with TemporaryDirectory(suffix='.aqmp') as tmpdir:
> > +            sock = os.path.join(tmpdir, type(self.proto).__name__ +
> ".sock")
> > +            await self.proto.accept(sock)
> > +
> > +
> > +class FakeSession(TestBase):
> > +
> > +    def setUp(self):
> > +        super().setUp()
> > +        self.proto.fake_session = True
> > +
> > +    async def _asyncSetUp(self):
> > +        await super()._asyncSetUp()
> > +        await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
> > +
> > +    async def _asyncTearDown(self):
> > +        await self.proto.disconnect()
> > +        await super()._asyncTearDown()
> > +
> > +    ####
> > +
> > +    async def testFakeConnect_(self):
> > +        await self.proto.connect('/not/a/real/path')
> > +        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
> > +
> > +    def testFakeConnect(self):
> > +        """Test the full state lifecycle (via connect) with a no-op
> session."""
> > +        self._asyncRunner(self.testFakeConnect_())
> > +
> > +    async def testFakeAccept_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
> > +
> > +    def testFakeAccept(self):
> > +        """Test the full state lifecycle (via accept) with a no-op
> session."""
> > +        self._asyncRunner(self.testFakeAccept_())
> > +
> > +    async def testFakeRecv_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +
> > +        logname = self.proto.logger.name
> > +        with self.assertLogs(logname, level='DEBUG') as context:
> > +            self.proto.trigger_input.set()
> > +            self.proto.trigger_input.clear()
> > +            await asyncio.sleep(0)  # Kick reader.
> > +
> > +        self.assertEqual(
> > +            context.output,
> > +            [f"DEBUG:{logname}:<-- None"],
> > +        )
> > +
> > +    def testFakeRecv(self):
> > +        """Test receiving a fake/null message."""
> > +        self._asyncRunner(self.testFakeRecv_())
> > +
> > +    async def testFakeSend_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +
> > +        logname = self.proto.logger.name
> > +        with self.assertLogs(logname, level='DEBUG') as context:
> > +            # Cheat: Send a Null message to nobody.
> > +            await self.proto.send_msg()
> > +            # Kick writer; awaiting on a queue.put isn't sufficient to
> yield.
> > +            await asyncio.sleep(0)
> > +
> > +        self.assertEqual(
> > +            context.output,
> > +            [f"DEBUG:{logname}:--> None"],
> > +        )
> > +
> > +    def testFakeSend(self):
> > +        """Test sending a fake/null message."""
> > +        self._asyncRunner(self.testFakeSend_())
> > +
> > +    async def _prod_session_api(
> > +            self,
> > +            current_state: Runstate,
> > +            error_message: str,
> > +            accept: bool = True
> > +    ):
> > +        with self.assertRaises(StateError) as context:
> > +            if accept:
> > +                await self.proto.accept('/not/a/real/path')
> > +            else:
> > +                await self.proto.connect('/not/a/real/path')
> > +
> > +        self.assertEqual(context.exception.error_message, error_message)
> > +        self.assertEqual(context.exception.state, current_state)
> > +        self.assertEqual(context.exception.required, Runstate.IDLE)
> > +
> > +    async def testAcceptRequireRunning_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +
> > +        await self._prod_session_api(
> > +            Runstate.RUNNING,
> > +            "NullProtocol is already connected and running.",
> > +            accept=True,
> > +        )
> > +
> > +    def testAcceptRequireRunning(self):
> > +        """Test that accept() cannot be called when Runstate=RUNNING"""
> > +        self._asyncRunner(self.testAcceptRequireRunning_())
> > +
> > +    async def testConnectRequireRunning_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +
> > +        await self._prod_session_api(
> > +            Runstate.RUNNING,
> > +            "NullProtocol is already connected and running.",
> > +            accept=False,
> > +        )
> > +
> > +    def testConnectRequireRunning(self):
> > +        """Test that connect() cannot be called when Runstate=RUNNING"""
> > +        self._asyncRunner(self.testConnectRequireRunning_())
> > +
> > +    async def testAcceptRequireDisconnecting_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +
> > +        # Cheat: force a disconnect.
> > +        await self.proto.simulate_disconnect()
> > +
> > +        await self._prod_session_api(
> > +            Runstate.DISCONNECTING,
> > +            ("NullProtocol is disconnecting."
> > +             " Call disconnect() to return to IDLE state."),
> > +            accept=True,
> > +        )
> > +
> > +    def testAcceptRequireDisconnecting(self):
> > +        """Test that accept() cannot be called when
> Runstate=DISCONNECTING"""
> > +        self._asyncRunner(self.testAcceptRequireDisconnecting_())
> > +
> > +    async def testConnectRequireDisconnecting_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +
> > +        # Cheat: force a disconnect.
> > +        await self.proto.simulate_disconnect()
> > +
> > +        await self._prod_session_api(
> > +            Runstate.DISCONNECTING,
> > +            ("NullProtocol is disconnecting."
> > +             " Call disconnect() to return to IDLE state."),
> > +            accept=False,
> > +        )
> > +
> > +    def testConnectRequireDisconnecting(self):
> > +        """Test that connect() cannot be called when
> Runstate=DISCONNECTING"""
> > +        self._asyncRunner(self.testConnectRequireDisconnecting_())
> > --
> > 2.31.1
>
> Besides that, I just would like to bring to the table that Avocado has
> now a basic support for coroutines as tests that might help here. IIUC,
> some of the boilerplate code (and duplicated methods) could be removed
> with this:
>
> https://github.com/avocado-framework/avocado/pull/4788
>
> In any case, I understand if the latest version is not an option here,
> so:
>
>
It's an option, it's time that is the harsh master.


> Reviewed-by: Beraldo Leal <bleal@redhat.com>
>
>
Thanks! I updated a few bits and pieces and added the other items to a list
of things to do "later".


> Thanks,
> --
> Beraldo
>
>
--js

[-- Attachment #2: Type: text/html, Size: 29032 bytes --]

^ permalink raw reply	[flat|nested] 36+ messages in thread

* Re: [PATCH v2 02/24] python/aqmp: add error classes
  2021-07-17  0:32 ` [PATCH v2 02/24] python/aqmp: add error classes John Snow
@ 2021-08-03 16:01   ` Eric Blake
  2021-08-03 17:34     ` John Snow
  0 siblings, 1 reply; 36+ messages in thread
From: Eric Blake @ 2021-08-03 16:01 UTC (permalink / raw)
  To: John Snow
  Cc: Eduardo Habkost, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, qemu-devel, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa

On Fri, Jul 16, 2021 at 08:32:31PM -0400, John Snow wrote:
> Signed-off-by: John Snow <jsnow@redhat.com>
> ---
>  python/qemu/aqmp/__init__.py |  4 +++
>  python/qemu/aqmp/error.py    | 50 ++++++++++++++++++++++++++++++++++++
>  2 files changed, 54 insertions(+)
>  create mode 100644 python/qemu/aqmp/error.py

> +++ b/python/qemu/aqmp/error.py
> @@ -0,0 +1,50 @@

> +
> +class ProtocolError(AQMPError):
> +    """
> +    Abstract error class for protocol failures.
> +
> +    Semantically, these errors are generally the fault of either the
> +    protocol server or as a result of a bug in this this library.

duplicate 'this'

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org



^ permalink raw reply	[flat|nested] 36+ messages in thread

* Re: [PATCH v2 02/24] python/aqmp: add error classes
  2021-08-03 16:01   ` Eric Blake
@ 2021-08-03 17:34     ` John Snow
  2021-08-03 17:40       ` Eric Blake
  0 siblings, 1 reply; 36+ messages in thread
From: John Snow @ 2021-08-03 17:34 UTC (permalink / raw)
  To: Eric Blake
  Cc: Eduardo Habkost, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, qemu-devel, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa

[-- Attachment #1: Type: text/plain, Size: 1115 bytes --]

Got it. I was *just* about to send a refreshed version of this patchset
because I found a new bug while on my way to making a sync compatibility
shim for iotests -- Do you have more feedback cooking, or should I hit the
send button?

--js

On Tue, Aug 3, 2021 at 12:02 PM Eric Blake <eblake@redhat.com> wrote:

> On Fri, Jul 16, 2021 at 08:32:31PM -0400, John Snow wrote:
> > Signed-off-by: John Snow <jsnow@redhat.com>
> > ---
> >  python/qemu/aqmp/__init__.py |  4 +++
> >  python/qemu/aqmp/error.py    | 50 ++++++++++++++++++++++++++++++++++++
> >  2 files changed, 54 insertions(+)
> >  create mode 100644 python/qemu/aqmp/error.py
>
> > +++ b/python/qemu/aqmp/error.py
> > @@ -0,0 +1,50 @@
>
> > +
> > +class ProtocolError(AQMPError):
> > +    """
> > +    Abstract error class for protocol failures.
> > +
> > +    Semantically, these errors are generally the fault of either the
> > +    protocol server or as a result of a bug in this this library.
>
> duplicate 'this'
>
> --
> Eric Blake, Principal Software Engineer
> Red Hat, Inc.           +1-919-301-3266
> Virtualization:  qemu.org | libvirt.org
>
>

[-- Attachment #2: Type: text/html, Size: 1792 bytes --]

^ permalink raw reply	[flat|nested] 36+ messages in thread

* Re: [PATCH v2 02/24] python/aqmp: add error classes
  2021-08-03 17:34     ` John Snow
@ 2021-08-03 17:40       ` Eric Blake
  2021-08-03 18:01         ` John Snow
  0 siblings, 1 reply; 36+ messages in thread
From: Eric Blake @ 2021-08-03 17:40 UTC (permalink / raw)
  To: John Snow
  Cc: Eduardo Habkost, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, qemu-devel, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa

On Tue, Aug 03, 2021 at 01:34:32PM -0400, John Snow wrote:
> Got it. I was *just* about to send a refreshed version of this patchset
> because I found a new bug while on my way to making a sync compatibility
> shim for iotests -- Do you have more feedback cooking, or should I hit the
> send button?

I spotted another typo while browsing the web page (disconnect() "If
there were was an exception"), but I'm fine if you re-send, and I'll
resume looking at the series on the updated v3.  For 1-6, you can add:

Reviewed-by: Eric Blake <eblake@redhat.com>

although my python is weak enough that you may want another set of
eyes as well.

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org



^ permalink raw reply	[flat|nested] 36+ messages in thread

* Re: [PATCH v2 02/24] python/aqmp: add error classes
  2021-08-03 17:40       ` Eric Blake
@ 2021-08-03 18:01         ` John Snow
  0 siblings, 0 replies; 36+ messages in thread
From: John Snow @ 2021-08-03 18:01 UTC (permalink / raw)
  To: Eric Blake
  Cc: Eduardo Habkost, Stefan Hajnoczi, Markus Armbruster,
	Wainer dos Santos Moschetta, qemu-devel, Niteesh G . S .,
	Willian Rampazzo, Cleber Rosa

[-- Attachment #1: Type: text/plain, Size: 1235 bytes --]

On Tue, Aug 3, 2021 at 1:40 PM Eric Blake <eblake@redhat.com> wrote:

> On Tue, Aug 03, 2021 at 01:34:32PM -0400, John Snow wrote:
> > Got it. I was *just* about to send a refreshed version of this patchset
> > because I found a new bug while on my way to making a sync compatibility
> > shim for iotests -- Do you have more feedback cooking, or should I hit
> the
> > send button?
>
> I spotted another typo while browsing the web page (disconnect() "If
> there were was an exception"), but I'm fine if you re-send, and I'll
>

Thanks for spotting that. Your proofreading ability is admired and
appreciated :)


> resume looking at the series on the updated v3.  For 1-6, you can add:
>
> Reviewed-by: Eric Blake <eblake@redhat.com>
>
> although my python is weak enough that you may want another set of
> eyes as well.
>
>
Thanks! Review on overall design, documentation, layout, organization and
presentation is plenty helpful even if you aren't necessarily eagle-eyed on
minutiae of Python. (Maybe especially if?)
I've written quite a few tests and have used this library to run our entire
iotests suite, plus Niteesh has been banging the bits pretty hard while
working on aqmp-shell, so I am not too fearful of mechanical errors.

[-- Attachment #2: Type: text/html, Size: 1888 bytes --]

^ permalink raw reply	[flat|nested] 36+ messages in thread

end of thread, other threads:[~2021-08-03 18:02 UTC | newest]

Thread overview: 36+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-07-17  0:32 [PATCH v2 00/24] python: introduce Asynchronous QMP package John Snow
2021-07-17  0:32 ` [PATCH v2 01/24] python/aqmp: add asynchronous QMP (AQMP) subpackage John Snow
2021-07-17  0:32 ` [PATCH v2 02/24] python/aqmp: add error classes John Snow
2021-08-03 16:01   ` Eric Blake
2021-08-03 17:34     ` John Snow
2021-08-03 17:40       ` Eric Blake
2021-08-03 18:01         ` John Snow
2021-07-17  0:32 ` [PATCH v2 03/24] python/pylint: Add exception for TypeVar names ('T') John Snow
2021-07-17  0:32 ` [PATCH v2 04/24] python/aqmp: add asyncio compatibility wrappers John Snow
2021-07-17  0:32 ` [PATCH v2 05/24] python/aqmp: add generic async message-based protocol support John Snow
2021-07-17  0:32 ` [PATCH v2 06/24] python/aqmp: add runstate state machine to AsyncProtocol John Snow
2021-07-17  0:32 ` [PATCH v2 07/24] python/aqmp: Add logging utility helpers John Snow
2021-07-17  0:32 ` [PATCH v2 08/24] python/aqmp: add logging to AsyncProtocol John Snow
2021-07-17  0:32 ` [PATCH v2 09/24] python/aqmp: add AsyncProtocol.accept() method John Snow
2021-07-17  0:32 ` [PATCH v2 10/24] python/aqmp: add configurable read buffer limit John Snow
2021-07-17  0:32 ` [PATCH v2 11/24] python/aqmp: add _cb_inbound and _cb_inbound logging hooks John Snow
2021-07-20 18:51   ` Niteesh G. S.
2021-07-20 19:13     ` John Snow
2021-07-17  0:32 ` [PATCH v2 12/24] python/aqmp: add AsyncProtocol._readline() method John Snow
2021-07-17  0:32 ` [PATCH v2 13/24] python/aqmp: add QMP Message format John Snow
2021-07-17  0:32 ` [PATCH v2 14/24] python/aqmp: add well-known QMP object models John Snow
2021-07-17  0:32 ` [PATCH v2 15/24] python/aqmp: add QMP event support John Snow
2021-07-17  0:32 ` [PATCH v2 16/24] python/pylint: disable too-many-function-args John Snow
2021-07-17  0:32 ` [PATCH v2 17/24] python/aqmp: add QMP protocol support John Snow
2021-07-17  0:32 ` [PATCH v2 18/24] python/pylint: disable no-member check John Snow
2021-07-17  0:32 ` [PATCH v2 19/24] python/aqmp: Add message routing to QMP protocol John Snow
2021-07-17  0:32 ` [PATCH v2 20/24] python/aqmp: add execute() interfaces John Snow
2021-07-17  0:32 ` [PATCH v2 21/24] python/aqmp: add _raw() execution interface John Snow
2021-07-17  0:32 ` [PATCH v2 22/24] python/aqmp: add asyncio_run compatibility wrapper John Snow
2021-07-17  0:32 ` [PATCH v2 23/24] python/aqmp: add scary message John Snow
2021-07-17  0:32 ` [PATCH v2 24/24] python/aqmp: add AsyncProtocol unit tests John Snow
2021-07-20 20:34   ` Beraldo Leal
2021-08-02 17:24     ` John Snow
2021-07-21 17:03 ` [PATCH v2 00/24] python: introduce Asynchronous QMP package Niteesh G. S.
     [not found]   ` <CAFn=p-YciuuRySs1F82ZyP_QGed=fbRZmzH3v7VNtdV-xM-XaA@mail.gmail.com>
     [not found]     ` <CAN6ztm-LKWMZTURfE_q0bWpoXVKGMoqmm2jj4_CTb_kj-kEjYg@mail.gmail.com>
2021-07-21 19:55       ` John Snow
2021-07-21 20:02         ` Niteesh G. S.

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.