All of lore.kernel.org
 help / color / mirror / Atom feed
From: John Snow <jsnow@redhat.com>
To: qemu-devel@nongnu.org
Cc: Eduardo Habkost <ehabkost@redhat.com>,
	Eric Blake <eblake@redhat.com>,
	Stefan Hajnoczi <stefanha@redhat.com>,
	Markus Armbruster <armbru@redhat.com>,
	Wainer dos Santos Moschetta <wainersm@redhat.com>,
	"Niteesh G . S ." <niteesh.gs@gmail.com>,
	Willian Rampazzo <wrampazz@redhat.com>,
	Cleber Rosa <crosa@redhat.com>, John Snow <jsnow@redhat.com>
Subject: [PATCH 16/20] python/aqmp: Add message routing to QMP protocol
Date: Thu,  1 Jul 2021 00:13:09 -0400	[thread overview]
Message-ID: <20210701041313.1696009-17-jsnow@redhat.com> (raw)
In-Reply-To: <20210701041313.1696009-1-jsnow@redhat.com>

Add the ability to handle and route messages in qmp_protocol.py. The
interface for actually sending anything still isn't added until next
commit.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/qmp_protocol.py | 98 +++++++++++++++++++++++++++++++-
 1 file changed, 96 insertions(+), 2 deletions(-)

diff --git a/python/qemu/aqmp/qmp_protocol.py b/python/qemu/aqmp/qmp_protocol.py
index 5872bfc017..04c8a8cb54 100644
--- a/python/qemu/aqmp/qmp_protocol.py
+++ b/python/qemu/aqmp/qmp_protocol.py
@@ -7,15 +7,18 @@
 incoming connection from that server.
 """
 
+# The import workarounds here are fixed in the next commit.
+import asyncio  # pylint: disable=unused-import # noqa
 import logging
 from typing import (
     Dict,
     List,
     Mapping,
     Optional,
+    Union,
 )
 
-from .error import ProtocolError
+from .error import AQMPError, ProtocolError
 from .events import Events
 from .message import Message
 from .models import Greeting
@@ -56,6 +59,53 @@ class NegotiationError(_WrappedProtocolError):
     """
 
 
+class ExecInterruptedError(AQMPError):
+    """
+    Exception raised when an RPC is interrupted.
+
+    This error is raised when an execute() statement could not be
+    completed.  This can occur because the connection itself was
+    terminated before a reply was received.
+
+    The true cause of the interruption will be available via `disconnect()`.
+    """
+
+
+class _MsgProtocolError(ProtocolError):
+    """
+    Abstract error class for protocol errors that have a `Message` object.
+
+    This Exception class is used for protocol errors where the `Message`
+    was mechanically understood, but was found to be inappropriate or
+    malformed.
+
+    :param error_message: Human-readable string describing the error.
+    :param msg: The QMP `Message` that caused the error.
+    """
+    def __init__(self, error_message: str, msg: Message):
+        super().__init__(error_message)
+        #: The received `Message` that caused the error.
+        self.msg: Message = msg
+
+    def __str__(self) -> str:
+        return "\n".join([
+            super().__str__(),
+            f"  Message was: {str(self.msg)}\n",
+        ])
+
+
+class ServerParseError(_MsgProtocolError):
+    """
+    The Server sent a `Message` indicating parsing failure.
+
+    i.e. A reply has arrived from the server, but it is missing the "ID"
+    field, indicating a parsing error.
+
+    :param error_message: Human-readable string describing the error.
+    :param msg: The QMP `Message` that caused the error.
+    """
+
+
 class QMP(AsyncProtocol[Message], Events):
     """
     Implements a QMP client connection.
@@ -98,6 +148,9 @@ async def run(self, address='/tmp/qemu.socket'):
     #: Logger object used for debugging messages.
     logger = logging.getLogger(__name__)
 
+    # Type alias for pending execute() result items
+    _PendingT = Union[Message, ExecInterruptedError]
+
     def __init__(self, name: Optional[str] = None) -> None:
         super().__init__(name)
         Events.__init__(self)
@@ -112,6 +165,9 @@ def __init__(self, name: Optional[str] = None) -> None:
         # Cached Greeting, if one was awaited.
         self._greeting: Optional[Greeting] = None
 
+        # Incoming RPC reply messages
+        self._pending: Dict[str, 'asyncio.Queue[QMP._PendingT]'] = {}
+
     @upper_half
     async def _begin_new_session(self) -> None:
         """
@@ -191,10 +247,27 @@ async def _negotiate(self) -> None:
             self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
             raise
 
+    @bottom_half
+    async def _bh_disconnect(self, force: bool = False) -> None:
+        await super()._bh_disconnect(force)
+
+        if self._pending:
+            self.logger.debug("Cancelling pending executions")
+        keys = self._pending.keys()
+        for key in keys:
+            self.logger.debug("Cancelling execution '%s'", key)
+            self._pending[key].put_nowait(
+                ExecInterruptedError("Disconnected")
+            )
+
+        self.logger.debug("QMP Disconnected.")
+
     @bottom_half
     async def _on_message(self, msg: Message) -> None:
         """
         Add an incoming message to the appropriate queue/handler.
+
+        :raise ServerParseError: When Message has no 'event' nor 'id' member
         """
         # Incoming messages are not fully parsed/validated here;
         # do only light peeking to know how to route the messages.
@@ -204,7 +277,27 @@ async def _on_message(self, msg: Message) -> None:
             return
 
         # Below, we assume everything left is an execute/exec-oob response.
-        # ... Which we'll implement in the next commit!
+
+        if 'id' not in msg:
+            # This is (very likely) a server parsing error.
+            # It doesn't inherently belong to any pending execution.
+            # Instead of performing clever recovery, just terminate.
+            # See "NOTE" in qmp-spec.txt, section 2.4.2
+            raise ServerParseError("Server sent a message without an ID,"
+                                   " indicating parse failure.", msg)
+
+        assert 'id' in msg
+        exec_id = str(msg['id'])
+
+        if exec_id not in self._pending:
+            # qmp-spec.txt, section 2.4:
+            # 'Clients should drop all the responses
+            # that have an unknown "id" field.'
+            self.logger.warning("Unknown ID '%s', message dropped.", exec_id)
+            self.logger.debug("Unroutable message: %s", str(msg))
+            return
+
+        await self._pending[exec_id].put(msg)
 
     @upper_half
     @bottom_half
@@ -237,6 +330,7 @@ def _do_send(self, msg: Message) -> None:
     def _cleanup(self) -> None:
         super()._cleanup()
         self._greeting = None
+        assert not self._pending
 
     @classmethod
     def make_execute_msg(cls, cmd: str,
-- 
2.31.1



  parent reply	other threads:[~2021-07-01  4:23 UTC|newest]

Thread overview: 25+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-07-01  4:12 [PATCH 00/20] python: introduce Asynchronous QMP package John Snow
2021-07-01  4:12 ` [PATCH 01/20] python/pylint: Add exception for TypeVar names ('T') John Snow
2021-07-01  4:12 ` [PATCH 02/20] python/pylint: disable too-many-function-args John Snow
2021-07-01  4:12 ` [PATCH 03/20] python/aqmp: add asynchronous QMP (AQMP) subpackage John Snow
2021-07-01  4:12 ` [PATCH 04/20] python/aqmp: add error classes John Snow
2021-07-01  4:12 ` [PATCH 05/20] python/aqmp: add asyncio compatibility wrappers John Snow
2021-07-01  4:12 ` [PATCH 06/20] python/aqmp: add generic async message-based protocol support John Snow
2021-07-01  4:13 ` [PATCH 07/20] python/aqmp: add runstate state machine to AsyncProtocol John Snow
2021-07-01  4:13 ` [PATCH 08/20] python/aqmp: add logging " John Snow
2021-07-01  4:13 ` [PATCH 09/20] python/aqmp: add AsyncProtocol.accept() method John Snow
2021-07-01  4:13 ` [PATCH 10/20] python/aqmp: add _cb_inbound and _cb_inbound logging hooks John Snow
2021-07-01  4:13 ` [PATCH 11/20] python/aqmp: add AsyncProtocol._readline() method John Snow
2021-07-01  4:13 ` [PATCH 12/20] python/aqmp: add QMP Message format John Snow
2021-07-07 14:52   ` Niteesh G. S.
2021-07-08 16:50     ` John Snow
2021-07-01  4:13 ` [PATCH 13/20] python/aqmp: add well-known QMP object models John Snow
2021-07-01  4:13 ` [PATCH 14/20] python/aqmp: add QMP event support John Snow
2021-07-01  4:13 ` [PATCH 15/20] python/aqmp: add QMP protocol support John Snow
2021-07-01  4:13 ` John Snow [this message]
2021-07-01  4:13 ` [PATCH 17/20] python/aqmp: add execute() interfaces John Snow
2021-07-01  4:13 ` [PATCH 18/20] python/aqmp: add _raw() execution interface John Snow
2021-07-01  4:13 ` [PATCH 19/20] python/aqmp: add asyncio_run compatibility wrapper John Snow
2021-07-01  4:13 ` [PATCH 20/20] python/aqmp: add scary message John Snow
2021-07-05 13:19 ` [PATCH 00/20] python: introduce Asynchronous QMP package Stefan Hajnoczi
2021-07-08 13:24   ` John Snow

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210701041313.1696009-17-jsnow@redhat.com \
    --to=jsnow@redhat.com \
    --cc=armbru@redhat.com \
    --cc=crosa@redhat.com \
    --cc=eblake@redhat.com \
    --cc=ehabkost@redhat.com \
    --cc=niteesh.gs@gmail.com \
    --cc=qemu-devel@nongnu.org \
    --cc=stefanha@redhat.com \
    --cc=wainersm@redhat.com \
    --cc=wrampazz@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.