From: John Snow <jsnow@redhat.com>
To: qemu-devel@nongnu.org
Cc: Willian Rampazzo <wrampazz@redhat.com>,
	Eric Blake <eblake@redhat.com>,
	Markus Armbruster <armbru@redhat.com>,
	Wainer dos Santos Moschetta <wainersm@redhat.com>,
	"Niteesh G . S ." <niteesh.gs@gmail.com>,
	Stefan Hajnoczi <stefanha@redhat.com>,
	Cleber Rosa <crosa@redhat.com>, John Snow <jsnow@redhat.com>,
	Eduardo Habkost <ehabkost@redhat.com>
Subject: [PATCH v3 20/25] python/aqmp: add execute() interfaces
Date: Tue,  3 Aug 2021 14:29:36 -0400
Message-ID: <20210803182941.504537-21-jsnow@redhat.com>
In-Reply-To: <20210803182941.504537-1-jsnow@redhat.com>

Add execute() and execute_msg().

_execute() is split into _issue() and _reply() halves so that
hypothetical subclasses of QMP that want to support different execution
paradigms can do so.

I anticipate that a synchronous interface may need to separate the send
and reply phases. However, I do not want to expose that split here, and I
want to actively discourage its use, so both halves remain private.

Signed-off-by: John Snow <jsnow@redhat.com>
---
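A minimal sketch of the issue/reply split described in the commit message,
not the code from this patch. It assumes a toy client whose only state is a
table of single-slot queues keyed by message ID, plus a stand-in coroutine
that plays the role of the reader task. The names ToyClient, route() and
fake_server() are hypothetical; only _issue(), _reply(), _pending and
_outgoing mirror names used in the diff below.

```python
import asyncio
from typing import Dict, Optional


class ToyClient:
    """Hypothetical stand-in for QMPClient; illustrates the split only."""

    def __init__(self) -> None:
        # One single-slot queue per in-flight message ID.
        self._pending: Dict[Optional[str], asyncio.Queue] = {}
        self._outgoing: asyncio.Queue = asyncio.Queue()

    async def _issue(self, msg: dict) -> Optional[str]:
        """Queue msg for sending and return its ID without waiting."""
        msg_id = msg.get('id')
        self._pending[msg_id] = asyncio.Queue(maxsize=1)
        await self._outgoing.put(msg)
        return msg_id

    async def _reply(self, msg_id: Optional[str]) -> dict:
        """Wait for the reply routed to msg_id, then forget the ID."""
        try:
            return await self._pending[msg_id].get()
        finally:
            del self._pending[msg_id]

    def route(self, reply: dict) -> None:
        """Hypothetical reader-side hook: deliver a reply by its ID."""
        self._pending[reply.get('id')].put_nowait(reply)


async def fake_server(client: ToyClient, count: int) -> None:
    """Answer `count` outgoing messages with an empty 'return'."""
    for _ in range(count):
        msg = await client._outgoing.get()
        client.route({'id': msg['id'], 'return': {}})


async def demo() -> list:
    client = ToyClient()
    server = asyncio.create_task(fake_server(client, 2))
    # Issue both commands first, then collect replies in reverse order.
    first = await client._issue({'id': 'a', 'execute': 'query-status'})
    second = await client._issue({'id': 'b', 'execute': 'query-version'})
    replies = [await client._reply(second), await client._reply(first)]
    await server
    assert not client._pending  # every ID was cleaned up
    return replies
```

Because each ID has its own queue, a caller can issue several messages and
await their replies in any order, which is the flexibility the split is
meant to leave open to subclasses.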
 python/qemu/aqmp/__init__.py   |   4 +-
 python/qemu/aqmp/qmp_client.py | 202 +++++++++++++++++++++++++++++++--
 2 files changed, 198 insertions(+), 8 deletions(-)

diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
index d975c752eaa..4b7df53e006 100644
--- a/python/qemu/aqmp/__init__.py
+++ b/python/qemu/aqmp/__init__.py
@@ -25,7 +25,7 @@
 from .events import EventListener
 from .message import Message
 from .protocol import ConnectError, Runstate, StateError
-from .qmp_client import QMPClient
+from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient
 
 
 # The order of these fields impact the Sphinx documentation order.
@@ -40,4 +40,6 @@
     'AQMPError',
     'StateError',
     'ConnectError',
+    'ExecuteError',
+    'ExecInterruptedError',
 )
diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
index fa0cc7c5ae5..879348feaaa 100644
--- a/python/qemu/aqmp/qmp_client.py
+++ b/python/qemu/aqmp/qmp_client.py
@@ -7,8 +7,7 @@
 accept an incoming connection from that server.
 """
 
-# The import workarounds here are fixed in the next commit.
-import asyncio  # pylint: disable=unused-import # noqa
+import asyncio
 import logging
 from typing import (
     Dict,
@@ -22,8 +21,8 @@
 from .error import AQMPError, ProtocolError
 from .events import Events
 from .message import Message
-from .models import Greeting
-from .protocol import AsyncProtocol
+from .models import ErrorResponse, Greeting
+from .protocol import AsyncProtocol, Runstate, require
 from .util import (
     bottom_half,
     exception_summary,
@@ -65,11 +64,32 @@ class NegotiationError(_WrappedProtocolError):
     """
 
 
+class ExecuteError(AQMPError):
+    """
+    Exception raised by `QMPClient.execute()` on RPC failure.
+
+    :param error_response: The RPC error response object.
+    :param sent: The sent RPC message that caused the failure.
+    :param received: The raw RPC error reply received.
+    """
+    def __init__(self, error_response: ErrorResponse,
+                 sent: Message, received: Message):
+        super().__init__(error_response.error.desc)
+        #: The sent `Message` that caused the failure
+        self.sent: Message = sent
+        #: The received `Message` that indicated failure
+        self.received: Message = received
+        #: The parsed error response
+        self.error: ErrorResponse = error_response
+        #: The QMP error class
+        self.error_class: str = error_response.error.class_
+
+
 class ExecInterruptedError(AQMPError):
     """
-    Exception raised when an RPC is interrupted.
+    Exception raised by `execute()` (et al) when an RPC is interrupted.
 
-    This error is raised when an execute() statement could not be
+    This error is raised when an `execute()` statement could not be
     completed.  This can occur because the connection itself was
     terminated before a reply was received.
 
@@ -112,6 +132,27 @@ class ServerParseError(_MsgProtocolError):
     """
 
 
+class BadReplyError(_MsgProtocolError):
+    """
+    An execution reply was successfully routed, but not understood.
+
+    If a QMP message is received with an 'id' field to allow it to be
+    routed, but is otherwise malformed, this exception will be raised.
+
+    A reply message is malformed if it is missing either the 'return' or
+    'error' keys, or if the 'error' value has missing keys or members of
+    the wrong type.
+
+    :param error_message: Human-readable string describing the error.
+    :param msg: The malformed reply that was received.
+    :param sent: The message that was sent that prompted the error.
+    """
+    def __init__(self, error_message: str, msg: Message, sent: Message):
+        super().__init__(error_message, msg)
+        #: The sent `Message` that caused the failure
+        self.sent = sent
+
+
 class QMPClient(AsyncProtocol[Message], Events):
     """
     Implements a QMP client connection.
@@ -174,6 +215,9 @@ def __init__(self, name: Optional[str] = None) -> None:
         # Cached Greeting, if one was awaited.
         self._greeting: Optional[Greeting] = None
 
+        # Command ID counter
+        self._execute_id = 0
+
         # Incoming RPC reply messages.
         self._pending: Dict[
             Union[str, None],
@@ -363,12 +407,135 @@ def _do_send(self, msg: Message) -> None:
         assert self._writer is not None
         self._writer.write(bytes(msg))
 
+    @upper_half
+    def _get_exec_id(self) -> str:
+        exec_id = f"__aqmp#{self._execute_id:05d}"
+        self._execute_id += 1
+        return exec_id
+
+    @upper_half
+    async def _issue(self, msg: Message) -> Union[None, str]:
+        """
+        Issue a QMP `Message` and do not wait for a reply.
+
+        :param msg: The QMP `Message` to send to the server.
+
+        :return: The ID of the `Message` sent.
+        """
+        msg_id: Optional[str] = None
+        if 'id' in msg:
+            assert isinstance(msg['id'], str)
+            msg_id = msg['id']
+
+        self._pending[msg_id] = asyncio.Queue(maxsize=1)
+        await self._outgoing.put(msg)
+
+        return msg_id
+
+    @upper_half
+    async def _reply(self, msg_id: Union[str, None]) -> Message:
+        """
+        Await a reply to a previously issued QMP message.
+
+        :param msg_id: The ID of the previously issued message.
+
+        :return: The reply from the server.
+        :raise ExecInterruptedError:
+            When the reply could not be retrieved because the connection
+            was lost, or some other problem.
+        """
+        queue = self._pending[msg_id]
+        result = await queue.get()
+
+        try:
+            if isinstance(result, ExecInterruptedError):
+                raise result
+            return result
+        finally:
+            del self._pending[msg_id]
+
+    @upper_half
+    async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
+        """
+        Send a QMP `Message` to the server and await a reply.
+
+        This method *assumes* you are sending some kind of an execute
+        statement that *will* receive a reply.
+
+        An execution ID will be assigned if assign_id is `True`. It can be
+        disabled, but this requires that an ID is manually assigned
+        instead. For manually assigned IDs, you must not use the string
+        '__aqmp#' anywhere in the ID.
+
+        :param msg: The QMP `Message` to execute.
+        :param assign_id: If True, assign a new execution ID.
+
+        :return: Execution reply from the server.
+        :raise ExecInterruptedError:
+            When the reply could not be retrieved because the connection
+            was lost, or some other problem.
+        """
+        if assign_id:
+            msg['id'] = self._get_exec_id()
+        elif 'id' in msg:
+            assert isinstance(msg['id'], str)
+            assert '__aqmp#' not in msg['id']
+
+        exec_id = await self._issue(msg)
+        return await self._reply(exec_id)
+
+    @upper_half
+    @require(Runstate.RUNNING)
+    async def execute_msg(self, msg: Message) -> object:
+        """
+        Execute a QMP command and return its value.
+
+        :param msg: The QMP `Message` to execute.
+
+        :return:
+            The command execution return value from the server. The type of
+            object returned depends on the command that was issued,
+            though most in QEMU return a `dict`.
+        :raise ValueError:
+            If the QMP `Message` does not have either the 'execute' or
+            'exec-oob' fields set.
+        :raise ExecuteError: When the server returns an error response.
+        :raise ExecInterruptedError: If the connection was terminated early.
+        """
+        if not ('execute' in msg or 'exec-oob' in msg):
+            raise ValueError("Requires 'execute' or 'exec-oob' message")
+
+        # Copy the Message so that the ID assigned by _execute() is
+        # local to this method; allowing the ID to be seen in raised
+        # Exceptions but without modifying the caller's held copy.
+        msg = Message(msg)
+        reply = await self._execute(msg)
+
+        if 'error' in reply:
+            try:
+                error_response = ErrorResponse(reply)
+            except (KeyError, TypeError) as err:
+                # Error response was malformed.
+                raise BadReplyError(
+                    "QMP error reply is malformed", reply, msg,
+                ) from err
+
+            raise ExecuteError(error_response, msg, reply)
+
+        if 'return' not in reply:
+            raise BadReplyError(
+                "QMP reply is missing an 'error' or 'return' member",
+                reply, msg,
+            )
+
+        return reply['return']
+
     @classmethod
     def make_execute_msg(cls, cmd: str,
                          arguments: Optional[Mapping[str, object]] = None,
                          oob: bool = False) -> Message:
         """
-        Create an executable message to be sent later.
+        Create an executable message to be sent by `execute_msg` later.
 
         :param cmd: QMP command name.
         :param arguments: Arguments (if any). Must be JSON-serializable.
@@ -380,3 +547,24 @@ def make_execute_msg(cls, cmd: str,
         if arguments is not None:
             msg['arguments'] = arguments
         return msg
+
+    @upper_half
+    async def execute(self, cmd: str,
+                      arguments: Optional[Mapping[str, object]] = None,
+                      oob: bool = False) -> object:
+        """
+        Execute a QMP command and return its value.
+
+        :param cmd: QMP command name.
+        :param arguments: Arguments (if any). Must be JSON-serializable.
+        :param oob: If `True`, execute "out of band".
+
+        :return:
+            The command execution return value from the server. The type of
+            object returned depends on the command that was issued,
+            though most in QEMU return a `dict`.
+        :raise ExecuteError: When the server returns an error response.
+        :raise ExecInterruptedError: If the connection was terminated early.
+        """
+        msg = self.make_execute_msg(cmd, arguments, oob=oob)
+        return await self.execute_msg(msg)
-- 
2.31.1
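
For readers skimming the diff, the reply handling in execute_msg() reduces
to a three-way classification: a well-formed error, a malformed reply, or a
return value. The following is a simplified sketch under stated assumptions,
not the patch's code: it works on plain dicts instead of Message and
ErrorResponse, and the names ToyExecuteError, ToyBadReplyError and
unwrap_reply() are hypothetical.

```python
from typing import Mapping


class ToyExecuteError(Exception):
    """Hypothetical analogue of ExecuteError: server reported failure."""

    def __init__(self, error_class: str, desc: str) -> None:
        super().__init__(desc)
        self.error_class = error_class


class ToyBadReplyError(Exception):
    """Hypothetical analogue of BadReplyError: reply was malformed."""


def unwrap_reply(reply: Mapping[str, object]) -> object:
    """Return the 'return' value, or raise for error/malformed replies."""
    if 'error' in reply:
        error = reply['error']
        try:
            # A non-dict 'error' raises TypeError; a missing key, KeyError.
            error_class, desc = error['class'], error['desc']
        except (KeyError, TypeError) as err:
            raise ToyBadReplyError("QMP error reply is malformed") from err
        if not (isinstance(error_class, str) and isinstance(desc, str)):
            raise ToyBadReplyError("QMP error reply is malformed")
        raise ToyExecuteError(error_class, desc)

    if 'return' not in reply:
        raise ToyBadReplyError(
            "QMP reply is missing an 'error' or 'return' member")

    return reply['return']
```

The 'error' check comes first so that a reply carrying both keys is still
reported as a failure, matching the order of checks in execute_msg().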
