From: John Snow <jsnow@redhat.com>
To: qemu-devel@nongnu.org
Cc: "Peter Maydell" <peter.maydell@linaro.org>,
	"Thomas Huth" <thuth@redhat.com>,
	"Daniel Berrange" <berrange@redhat.com>,
	"Eduardo Habkost" <ehabkost@redhat.com>,
	"Alex Bennée" <alex.bennee@linaro.org>,
	"Markus Armbruster" <armbru@redhat.com>,
	"Wainer dos Santos Moschetta" <wainersm@redhat.com>,
	"Philippe Mathieu-Daudé" <f4bug@amsat.org>,
	"Willian Rampazzo" <willianr@redhat.com>,
	"Cleber Rosa" <crosa@redhat.com>, "John Snow" <jsnow@redhat.com>
Subject: [PULL 19/32] python/aqmp: Add message routing to QMP protocol
Date: Mon, 27 Sep 2021 15:25:00 -0400
Message-ID: <20210927192513.744199-20-jsnow@redhat.com>
In-Reply-To: <20210927192513.744199-1-jsnow@redhat.com>

Add the ability to handle and route messages in qmp_client.py. The
interface for actually sending anything is not added until the next
commit.

Signed-off-by: John Snow <jsnow@redhat.com>
Message-id: 20210915162955.333025-20-jsnow@redhat.com
Signed-off-by: John Snow <jsnow@redhat.com>
---
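For readers skimming the diff, the routing decision that _on_message()
makes can be sketched in isolation. This is only an illustration under
stated assumptions, not the patch's code: Router, ParseFailure and the
"dropped" list are hypothetical names, and plain dicts stand in for
Message objects. Only the three outcomes (route by ID, raise on an
ID-less error, drop an unknown ID) come from the patch.

```python
import asyncio
import logging
from typing import Dict, List, Optional

logger = logging.getLogger("routing-sketch")


class ParseFailure(Exception):
    """Hypothetical stand-in for ServerParseError."""


class Router:
    """Hypothetical, stripped-down version of the routing logic."""

    def __init__(self) -> None:
        # One queue per in-flight execution, keyed by its ID.
        self.pending: Dict[Optional[str], "asyncio.Queue[dict]"] = {}
        # Kept only so the demo can inspect what was dropped.
        self.dropped: List[dict] = []

    async def on_message(self, msg: dict) -> None:
        exec_id = msg.get("id")

        # Case 1: someone is waiting on this ID; hand the reply over.
        if exec_id in self.pending:
            await self.pending[exec_id].put(msg)
            return

        # Case 2: an error without an ID cannot belong to any caller.
        if "error" in msg and "id" not in msg:
            raise ParseFailure("error response without an ID")

        # Case 3: unknown ID; log it and drop the message.
        logger.warning("Unknown ID '%s', message dropped.", exec_id)
        self.dropped.append(msg)


async def demo():
    router = Router()
    router.pending["cmd-1"] = asyncio.Queue(maxsize=1)

    await router.on_message({"return": {}, "id": "cmd-1"})
    routed = router.pending["cmd-1"].get_nowait()

    await router.on_message({"return": {}, "id": "unknown"})

    try:
        await router.on_message({"error": {"class": "GenericError"}})
        parse_failed = False
    except ParseFailure:
        parse_failed = True

    return routed, router.dropped, parse_failed


routed, dropped, parse_failed = asyncio.run(demo())
```

In the sketch, as in the patch, a known ID is checked first, so an
ID-less error is still delivered if a caller registered None as its key.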
 python/qemu/aqmp/qmp_client.py | 122 ++++++++++++++++++++++++++++++++-
 1 file changed, 120 insertions(+), 2 deletions(-)

diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
index 000ff59c7a7..fa0cc7c5ae5 100644
--- a/python/qemu/aqmp/qmp_client.py
+++ b/python/qemu/aqmp/qmp_client.py
@@ -7,15 +7,19 @@
 accept an incoming connection from that server.
 """
 
+# The import workarounds here are fixed in the next commit.
+import asyncio  # pylint: disable=unused-import # noqa
 import logging
 from typing import (
     Dict,
     List,
     Mapping,
     Optional,
+    Union,
+    cast,
 )
 
-from .error import ProtocolError
+from .error import AQMPError, ProtocolError
 from .events import Events
 from .message import Message
 from .models import Greeting
@@ -61,6 +65,53 @@ class NegotiationError(_WrappedProtocolError):
     """
 
 
+class ExecInterruptedError(AQMPError):
+    """
+    Exception raised when an RPC is interrupted.
+
+    This error is raised when an execute() statement could not be
+    completed.  This can occur because the connection itself was
+    terminated before a reply was received.
+
+    The true cause of the interruption will be available via `disconnect()`.
+    """
+
+
+class _MsgProtocolError(ProtocolError):
+    """
+    Abstract error class for protocol errors that have a `Message` object.
+
+    This Exception class is used for protocol errors where the `Message`
+    was mechanically understood, but was found to be inappropriate or
+    malformed.
+
+    :param error_message: Human-readable string describing the error.
+    :param msg: The QMP `Message` that caused the error.
+    """
+    def __init__(self, error_message: str, msg: Message):
+        super().__init__(error_message)
+        #: The received `Message` that caused the error.
+        self.msg: Message = msg
+
+    def __str__(self) -> str:
+        return "\n".join([
+            super().__str__(),
+            f"  Message was: {str(self.msg)}\n",
+        ])
+
+
+class ServerParseError(_MsgProtocolError):
+    """
+    The Server sent a `Message` indicating parsing failure.
+
+    i.e. a reply has arrived from the server, but it is missing the "id"
+    field, indicating a parsing error.
+
+    :param error_message: Human-readable string describing the error.
+    :param msg: The QMP `Message` that caused the error.
+    """
+
+
 class QMPClient(AsyncProtocol[Message], Events):
     """
     Implements a QMP client connection.
@@ -106,6 +157,9 @@ async def run(self, address='/tmp/qemu.socket'):
     # Read buffer limit; large enough to accept query-qmp-schema
     _limit = (256 * 1024)
 
+    # Type alias for pending execute() result items
+    _PendingT = Union[Message, ExecInterruptedError]
+
     def __init__(self, name: Optional[str] = None) -> None:
         super().__init__(name)
         Events.__init__(self)
@@ -120,6 +174,12 @@ def __init__(self, name: Optional[str] = None) -> None:
         # Cached Greeting, if one was awaited.
         self._greeting: Optional[Greeting] = None
 
+        # Incoming RPC reply messages.
+        self._pending: Dict[
+            Union[str, None],
+            'asyncio.Queue[QMPClient._PendingT]'
+        ] = {}
+
     @upper_half
     async def _establish_session(self) -> None:
         """
@@ -132,6 +192,9 @@ async def _establish_session(self) -> None:
         :raise EOFError: When the server unexpectedly hangs up.
         :raise OSError: For underlying stream errors.
         """
+        self._greeting = None
+        self._pending = {}
+
         if self.await_greeting or self.negotiate:
             self._greeting = await self._get_greeting()
 
@@ -203,10 +266,33 @@ async def _negotiate(self) -> None:
             self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
             raise
 
+    @bottom_half
+    async def _bh_disconnect(self) -> None:
+        try:
+            await super()._bh_disconnect()
+        finally:
+            if self._pending:
+                self.logger.debug("Cancelling pending executions")
+            keys = self._pending.keys()
+            for key in keys:
+                self.logger.debug("Cancelling execution '%s'", key)
+                self._pending[key].put_nowait(
+                    ExecInterruptedError("Disconnected")
+                )
+
+            self.logger.debug("QMP Disconnected.")
+
+    @upper_half
+    def _cleanup(self) -> None:
+        super()._cleanup()
+        assert not self._pending
+
     @bottom_half
     async def _on_message(self, msg: Message) -> None:
         """
         Add an incoming message to the appropriate queue/handler.
+
+        :raise ServerParseError: When Message indicates server parse failure.
         """
         # Incoming messages are not fully parsed/validated here;
         # do only light peeking to know how to route the messages.
@@ -216,7 +302,39 @@ async def _on_message(self, msg: Message) -> None:
             return
 
         # Below, we assume everything left is an execute/exec-oob response.
-        # ... Which we'll implement in the next commit!
+
+        exec_id = cast(Optional[str], msg.get('id'))
+
+        if exec_id in self._pending:
+            await self._pending[exec_id].put(msg)
+            return
+
+        # We have a message we can't route back to a caller.
+
+        is_error = 'error' in msg
+        has_id = 'id' in msg
+
+        if is_error and not has_id:
+            # This is very likely a server parsing error.
+            # It doesn't inherently belong to any pending execution.
+            # Instead of performing clever recovery, just terminate.
+            # See "NOTE" in qmp-spec.txt, section 2.4.2
+            raise ServerParseError(
+                ("Server sent an error response without an ID, "
+                 "but there are no ID-less executions pending. "
+                 "Assuming this is a server parser failure."),
+                msg
+            )
+
+        # qmp-spec.txt, section 2.4:
+        # 'Clients should drop all the responses
+        # that have an unknown "id" field.'
+        self.logger.log(
+            logging.ERROR if is_error else logging.WARNING,
+            "Unknown ID '%s', message dropped.",
+            exec_id,
+        )
+        self.logger.debug("Unroutable message: %s", str(msg))
 
     @upper_half
     @bottom_half
-- 
2.31.1
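
The _bh_disconnect() hunk above wakes every pending execution by
queueing an ExecInterruptedError. A rough sketch of how a waiter might
observe that follows. The consuming side is not part of this patch, so
wait_for_reply(), cancel_all() and the Interrupted class are all
hypothetical, as is the assumption that the waiter removes its own
entry from the pending table once it finishes.

```python
import asyncio
from typing import Dict, Optional, Union


class Interrupted(Exception):
    """Hypothetical stand-in for ExecInterruptedError."""


# A queue item is either a reply (a dict here) or an interruption.
PendingT = Union[dict, Interrupted]


async def wait_for_reply(queue: "asyncio.Queue[PendingT]") -> dict:
    """Hypothetical waiter: block until a reply or an interruption."""
    item = await queue.get()
    if isinstance(item, Interrupted):
        raise item
    return item


def cancel_all(pending: Dict[Optional[str], "asyncio.Queue[PendingT]"]) -> None:
    """Wake every waiter with an interruption, as the disconnect hunk does."""
    for queue in pending.values():
        queue.put_nowait(Interrupted("Disconnected"))


async def demo() -> str:
    pending: Dict[Optional[str], "asyncio.Queue[PendingT]"] = {
        "cmd-1": asyncio.Queue(maxsize=1),
    }
    waiter = asyncio.create_task(wait_for_reply(pending["cmd-1"]))
    await asyncio.sleep(0)  # give the waiter a chance to start waiting

    cancel_all(pending)  # simulate the connection going away

    try:
        await waiter
        outcome = "replied"
    except Interrupted as err:
        outcome = f"interrupted: {err}"
    finally:
        # Assumption: the waiter cleans up its own table entry.
        pending.pop("cmd-1")
    return outcome


outcome = asyncio.run(demo())
```

Because each queue holds a single item and nothing else has been queued,
put_nowait() cannot raise QueueFull in this sketch.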


