qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
* [PATCH v6 0/7] Live Migration With IAA
@ 2024-05-05 16:57 Yuan Liu
  2024-05-05 16:57 ` [PATCH v6 1/7] docs/migration: add qpl compression feature Yuan Liu
                   ` (6 more replies)
  0 siblings, 7 replies; 23+ messages in thread
From: Yuan Liu @ 2024-05-05 16:57 UTC (permalink / raw)
  To: peterx, farosas; +Cc: qemu-devel, yuan1.liu, nanhai.zou

I am writing to submit a code change aimed at enhancing live migration
acceleration by leveraging the compression capability of the Intel
In-Memory Analytics Accelerator (IAA).

The implementation of the IAA (de)compression code is based on Intel Query
Processing Library (QPL), an open-source software project designed for
high-performance query processing operations on Intel CPUs.
https://github.com/intel/qpl

I would like to summarize the progress so far
1. QPL will be used as an independent compression method like ZLIB and ZSTD,
   For the summary of issues compatible with ZLIB, please refer to
   docs/devel/migration/qpl-compression.rst

2. The QPL method supports both software path and hardware path (use IAA device)
   for multifd migration compression and decompression.
   The hardware path is always used first, if the hardware path is unavailable,
   it automatically falls back to the software path.

3. Compression accelerator related patches are removed from this patch set and
   will be added to the QAT patch set, we will submit separate patches to use
   QAT to accelerate ZLIB and ZSTD.

4. Advantages of using IAA accelerator include:
   a. Compared with the non-compression method, it can improve downtime
      performance without adding additional host resources (both CPU and
      network).
   b. Compared with using software compression methods (ZSTD/ZLIB), it can
      provide high data compression ratio and save a lot of CPU resources
      used for compression.

Test condition:
  1. Host CPUs are based on Sapphire Rapids
  2. VM type, 16 vCPU and 64G memory
  3. The source and destination respectively use 4 IAA devices.
  4. The workload in the VM
    a. all vCPUs are idle state
    b. 90% of the virtual machine's memory is used, use silesia to fill
       the memory.
       The introduction of silesia:
       https://sun.aei.polsl.pl//~sdeor/index.php?page=silesia
  5. Set "--mem-prealloc" boot parameter on the destination, this parameter
     can make IAA performance better and related introduction is added here.
     docs/devel/migration/qpl-compression.rst
  6. Source migration configuration commands
     a. migrate_set_capability multifd on
     b. migrate_set_parameter multifd-channels 2/4/8
     c. migrate_set_parameter downtime-limit 300
     f. migrate_set_parameter max-bandwidth 100G/1G
     d. migrate_set_parameter multifd-compression none/qpl/zstd
  7. Destination migration configuration commands
     a. migrate_set_capability multifd on
     b. migrate_set_parameter multifd-channels 2/4/8
     c. migrate_set_parameter multifd-compression none/qpl/zstd

Early migration result, each result is the average of three tests

 +--------+-------------+--------+--------+---------+----------+------|
 |        | The number  |total   |downtime|network  |pages per | CPU  |
 | None   | of channels |time(ms)|(ms)    |bandwidth|second    | Util |
 | Comp   |             |        |        |(mbps)   |          |      |
 |        +-------------+-----------------+---------+----------+------+
 |Network |            2|    8571|      69|    58391|   1896525|  256%|
 |BW:100G +-------------+--------+--------+---------+----------+------+
 |        |            4|    7180|      92|    69736|   1865640|  300%|
 |        +-------------+--------+--------+---------+----------+------+
 |        |            8|    7090|     121|    70562|   2174060|  307%|
 +--------+-------------+--------+--------+---------+----------+------+

 +--------+-------------+--------+--------+---------+----------+------|
 |        | The number  |total   |downtime|network  |pages per | CPU  |
 |QPL(IAA)| of channels |time(ms)|(ms)    |bandwidth|second    | Util |
 | Comp   |             |        |        |(mbps)   |          |      |
 |        +-------------+-----------------+---------+----------+------+
 |Network |            2|    8413|      34|    30067|   1732411|  230%|
 |BW:100G +-------------+--------+--------+---------+----------+------+
 |        |            4|    6559|      32|    38804|   1689954|  450%|
 |        +-------------+--------+--------+---------+----------+------+
 |        |            8|    6623|      37|    38745|   1566507|  790%|
 +--------+-------------+--------+--------+---------+----------+------+

 +--------+-------------+--------+--------+---------+----------+------|
 |        | The number  |total   |downtime|network  |pages per | CPU  |
 | QPL(SW)| of channels |time(ms)|(ms)    |bandwidth|second    | Util |
 | Comp   |             |        |        |(mbps)   |          |      |
 |        +-------------+-----------------+---------+----------+------+
 |Network |            2|   93776|      30|     3050|    500815|  210%|
 |BW:100G +-------------+--------+--------+---------+----------+------+
 |        |            4|   47263|      57|     6068|   1376627|  403%|
 |        +-------------+--------+--------+---------+----------+------+
 |        |            8|   24474|      37|    11828|   1951372|  800%|
 +--------+-------------+--------+--------+---------+----------+------+

 +--------+-------------+--------+--------+---------+----------+------|
 |        | The number  |total   |downtime|network  |pages per | CPU  |
 | ZSTD   | of channels |time(ms)|(ms)    |bandwidth|second    | Util |
 | Comp   |             |        |        |(mbps)   |          |      |
 |        +-------------+-----------------+---------+----------+------+
 |Network |            2|   95846|      24|     1800|    521829|  203%|
 |BW:100G +-------------+--------+--------+---------+----------+------+
 |        |            4|   49004|      24|     3529|    890532|  403%|
 |        +-------------+--------+--------+---------+----------+------+
 |        |            8|   25574|      32|     6782|   1762222|  800%|
 +--------+-------------+--------+--------+---------+----------+------+

When network bandwidth resource is sufficient, QPL can improve downtime
by 2x compared to no compression. In this scenario, with 4/8 channels,
IAA hardware resources are fully used, so adding more channels will not
gain more benefits.


 +--------+-------------+--------+--------+---------+----------+------|
 |        | The number  |total   |downtime|network  |pages per | CPU  |
 | None   | of channels |time(ms)|(ms)    |bandwidth|second    | Util |
 | Comp   |             |        |        |(mbps)   |          |      |
 |        +-------------+-----------------+---------+----------+------+
 |Network |            2|   57758|      66|     8643|    264617|   34%|
 |BW:  1G +-------------+--------+--------+---------+----------+------+
 |        |            4|   57216|      58|     8726|    266773|   34%|
 |        +-------------+--------+--------+---------+----------+------+
 |        |            8|   56708|      53|     8804|    270223|   33%|
 +--------+-------------+--------+--------+---------+----------+------+

 +--------+-------------+--------+--------+---------+----------+------|
 |        | The number  |total   |downtime|network  |pages per | CPU  |
 |QPL(IAA)| of channels |time(ms)|(ms)    |bandwidth|second    | Util |
 | Comp   |             |        |        |(mbps)   |          |      |
 |        +-------------+-----------------+---------+----------+------+
 |Network |            2|   30129|      34|     8345|   2224761|   54%|
 |BW:  1G +-------------+--------+--------+---------+----------+------+
 |        |            4|   30317|      39|     8300|   2025220|   73%|
 |        +-------------+--------+--------+---------+----------+------+
 |        |            8|   29615|      35|     8514|   2250122|  131%|
 +--------+-------------+--------+--------+---------+----------+------+

 +--------+-------------+--------+--------+---------+----------+------|
 |        | The number  |total   |downtime|network  |pages per | CPU  |
 | QPL(SW)| of channels |time(ms)|(ms)    |bandwidth|second    | Util |
 | Comp   |             |        |        |(mbps)   |          |      |
 |        +-------------+-----------------+---------+----------+------+
 |Network |            2|   93358|      41|     3064|    534588|  202%|
 |BW:  1G +-------------+--------+--------+---------+----------+------+
 |        |            4|   47266|      52|     6067|   1392941|  402%|
 |        +-------------+--------+--------+---------+----------+------+
 |        |            8|   33134|      45|     8706|   2433242|  580%|
 +--------+-------------+--------+--------+---------+----------+------+

 +--------+-------------+--------+--------+---------+----------+------|
 |        | The number  |total   |downtime|network  |pages per | CPU  |
 | ZSTD   | of channels |time(ms)|(ms)    |bandwidth|second    | Util |
 | Comp   |             |        |        |(mbps)   |          |      |
 |        +-------------+-----------------+---------+----------+------+
 |Network |            2|   95750|      24|     1802|    477236|  202%|
 |BW:  1G +-------------+--------+--------+---------+----------+------+
 |        |            4|   48907|      24|     3536|   1002142|  404%|
 |        +-------------+--------+--------+---------+----------+------+
 |        |            8|   25568|      32|     6783|   1696437|  800%|
 +--------+-------------+--------+--------+---------+----------+------+

When network bandwidth resource is limited, the "page perf second" metric
decreases for none compression, the success rate of migration will reduce.
Comparison of QPL and ZSTD compression methods, QPL can save a lot of CPU
resources used for compression.

v2:
  - add support for multifd compression accelerator
  - add support for the QPL accelerator in the multifd
    compression accelerator
  - fixed the issue that QPL was compiled into the migration
    module by default

v3:
  - use Meson instead of pkg-config to resolve QPL build
    dependency issue
  - fix coding style
  - fix a CI issue for get_multifd_ops function in multifd.c file

v4:
  - patch based on commit: da96ad4a6a Merge tag 'hw-misc-20240215' of
    https://github.com/philmd/qemu into staging
  - remove the compression accelerator implementation patches, the patches
    will be placed in the QAT accelerator implementation.
  - introduce QPL as a new compression method
  - add QPL compression documentation
  - add QPL compression migration test
  - fix zlib/zstd compression level issue

v5:
  - patch based on v9.0.0-rc0 (c62d54d0a8)
  - use pkgconfig to check libaccel-config, libaccel-config is already
    in many distributions.
  - initialize the IOV of the sender by the specific compression method
  - refine the coding style
  - remove the zlib/zstd compression level not working patch, the issue
    has been solved

v6:
  - rebase to commit id 248f6f62df, Merge tag 'pull-axp-20240504' of
    https://gitlab.com/rth7680/qemu into staging.
  - add qpl software path, if the hardware path(IAA) is unavailable, the
    qpl will fall back to software path automatically.
  - add pkgconfig to check qpl, qpl version 1.5 already supports pkgconfig,
    users can install the qpl library and qpl.pc file through source code.
  - remove libaccel-config library detection, if there is no the library,
    the qpl will automatically switch to the software path.
  - use g_malloc0 instead of mmap to apply for memory.
  - add the more introduction of IAA device management, including usage
    permission, resource configuration, etc in qpl-compression.rst.
  - modified unit test to use software path to complete testing when
    hardware path is unavailable.

Yuan Liu (7):
  docs/migration: add qpl compression feature
  migration/multifd: put IOV initialization into compression method
  configure: add --enable-qpl build option
  migration/multifd: add qpl compression method
  migration/multifd: implement initialization of qpl compression
  migration/multifd: implement qpl compression and decompression
  tests/migration-test: add qpl compression test

Yuan Liu (7):
  docs/migration: add qpl compression feature
  migration/multifd: put IOV initialization into compression method
  configure: add --enable-qpl build option
  migration/multifd: add qpl compression method
  migration/multifd: implement initialization of qpl compression
  migration/multifd: implement qpl compression and decompression
  tests/migration-test: add qpl compression test

 docs/devel/migration/features.rst        |   1 +
 docs/devel/migration/qpl-compression.rst | 262 +++++++++++
 hw/core/qdev-properties-system.c         |   2 +-
 meson.build                              |   8 +
 meson_options.txt                        |   2 +
 migration/meson.build                    |   1 +
 migration/multifd-qpl.c                  | 566 +++++++++++++++++++++++
 migration/multifd-zlib.c                 |   7 +
 migration/multifd-zstd.c                 |   8 +-
 migration/multifd.c                      |  22 +-
 migration/multifd.h                      |   1 +
 qapi/migration.json                      |   7 +-
 scripts/meson-buildoptions.sh            |   3 +
 tests/qtest/migration-test.c             |  24 +
 14 files changed, 901 insertions(+), 13 deletions(-)
 create mode 100644 docs/devel/migration/qpl-compression.rst
 create mode 100644 migration/multifd-qpl.c

-- 
2.39.3



^ permalink raw reply	[flat|nested] 23+ messages in thread

* [PATCH v6 1/7] docs/migration: add qpl compression feature
  2024-05-05 16:57 [PATCH v6 0/7] Live Migration With IAA Yuan Liu
@ 2024-05-05 16:57 ` Yuan Liu
  2024-05-05 16:57 ` [PATCH v6 2/7] migration/multifd: put IOV initialization into compression method Yuan Liu
                   ` (5 subsequent siblings)
  6 siblings, 0 replies; 23+ messages in thread
From: Yuan Liu @ 2024-05-05 16:57 UTC (permalink / raw)
  To: peterx, farosas; +Cc: qemu-devel, yuan1.liu, nanhai.zou

add Intel Query Processing Library (QPL) compression method
introduction

Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
---
 docs/devel/migration/features.rst        |   1 +
 docs/devel/migration/qpl-compression.rst | 262 +++++++++++++++++++++++
 2 files changed, 263 insertions(+)
 create mode 100644 docs/devel/migration/qpl-compression.rst

diff --git a/docs/devel/migration/features.rst b/docs/devel/migration/features.rst
index d5ca7b86d5..bc98b65075 100644
--- a/docs/devel/migration/features.rst
+++ b/docs/devel/migration/features.rst
@@ -12,3 +12,4 @@ Migration has plenty of features to support different use cases.
    virtio
    mapped-ram
    CPR
+   qpl-compression
diff --git a/docs/devel/migration/qpl-compression.rst b/docs/devel/migration/qpl-compression.rst
new file mode 100644
index 0000000000..13fb7a67b1
--- /dev/null
+++ b/docs/devel/migration/qpl-compression.rst
@@ -0,0 +1,262 @@
+===============
+QPL Compression
+===============
+The Intel Query Processing Library (Intel ``QPL``) is an open-source library to
+provide compression and decompression features and it is based on deflate
+compression algorithm (RFC 1951).
+
+The ``QPL`` compression relies on Intel In-Memory Analytics Accelerator(``IAA``)
+and Shared Virtual Memory(``SVM``) technology, they are new features supported
+from Intel 4th Gen Intel Xeon Scalable processors, codenamed Sapphire Rapids
+processor(``SPR``).
+
+For more ``QPL`` introduction, please refer to `QPL Introduction
+<https://intel.github.io/qpl/documentation/introduction_docs/introduction.html>`_
+
+QPL Compression Framework
+=========================
+
+::
+
+  +----------------+       +------------------+
+  | MultiFD Thread |       |accel-config tool |
+  +-------+--------+       +--------+---------+
+          |                         |
+          |                         |
+          |compress/decompress      |
+  +-------+--------+                | Setup IAA
+  |  QPL library   |                | Resources
+  +-------+---+----+                |
+          |   |                     |
+          |   +-------------+-------+
+          |   Open IAA      |
+          |   Devices +-----+-----+
+          |           |idxd driver|
+          |           +-----+-----+
+          |                 |
+          |                 |
+          |           +-----+-----+
+          +-----------+IAA Devices|
+      Submit jobs     +-----------+
+      via enqcmd
+
+
+QPL Build And Installation
+--------------------------
+
+.. code-block:: shell
+
+  $git clone --recursive https://github.com/intel/qpl.git qpl
+  $mkdir qpl/build
+  $cd qpl/build
+  $cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/usr -DQPL_LIBRARY_TYPE=SHARED ..
+  $sudo cmake --build . --target install
+
+For more details about ``QPL`` installation, please refer to `QPL Installation
+<https://intel.github.io/qpl/documentation/get_started_docs/installation.html>`_
+
+IAA Device Management
+---------------------
+
+The number of ``IAA`` devices will vary depending on the Xeon product model.
+On a ``SPR`` server, there can be a maximum of 8 ``IAA`` devices, with up to
+4 devices per socket.
+
+By default, all ``IAA`` devices are disabled and need to be configured and
+enabled by users manually.
+
+Check the number of devices through the following command
+
+.. code-block:: shell
+
+  #lspci -d 8086:0cfe
+  6a:02.0 System peripheral: Intel Corporation Device 0cfe
+  6f:02.0 System peripheral: Intel Corporation Device 0cfe
+  74:02.0 System peripheral: Intel Corporation Device 0cfe
+  79:02.0 System peripheral: Intel Corporation Device 0cfe
+  e7:02.0 System peripheral: Intel Corporation Device 0cfe
+  ec:02.0 System peripheral: Intel Corporation Device 0cfe
+  f1:02.0 System peripheral: Intel Corporation Device 0cfe
+  f6:02.0 System peripheral: Intel Corporation Device 0cfe
+
+IAA Device Configuration And Enabling
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The ``accel-config`` tool is used to enable ``IAA`` devices and configure
+``IAA`` hardware resources(work queues and engines). One ``IAA`` device
+has 8 work queues and 8 processing engines, multiple engines can be assigned
+to a work queue via ``group`` attribute.
+
+For ``accel-config`` installation, please refer to `accel-config installation
+<https://github.com/intel/idxd-config>`_
+
+One example of configuring and enabling an ``IAA`` device.
+
+.. code-block:: shell
+
+  #accel-config config-engine iax1/engine1.0 -g 0
+  #accel-config config-engine iax1/engine1.1 -g 0
+  #accel-config config-engine iax1/engine1.2 -g 0
+  #accel-config config-engine iax1/engine1.3 -g 0
+  #accel-config config-engine iax1/engine1.4 -g 0
+  #accel-config config-engine iax1/engine1.5 -g 0
+  #accel-config config-engine iax1/engine1.6 -g 0
+  #accel-config config-engine iax1/engine1.7 -g 0
+  #accel-config config-wq iax1/wq1.0 -g 0 -s 128 -p 10 -b 1 -t 128 -m shared -y user -n app1 -d user
+  #accel-config enable-device iax1
+  #accel-config enable-wq iax1/wq1.0
+
+.. note::
+   IAX is an early name for IAA
+
+- The ``IAA`` device index is 1, use ``ls -lh /sys/bus/dsa/devices/iax*``
+  command to query the ``IAA`` device index.
+
+- 8 engines and 1 work queue are configured in group 0, so all compression jobs
+  submitted to this work queue can be processed by all engines at the same time.
+
+- Set work queue attributes including the work mode, work queue size and so on.
+
+- Enable the ``IAA1`` device and work queue 1.0
+
+.. note::
+
+  Set work queue mode to shared mode, since ``QPL`` library only supports
+  shared mode
+
+For more detailed configuration, please refer to `IAA Configuration Samples
+<https://github.com/intel/idxd-config/tree/stable/Documentation/accfg>`_
+
+IAA Unit Test
+^^^^^^^^^^^^^
+
+- Enabling ``IAA`` devices for Xeon platform, please refer to `IAA User Guide
+  <https://www.intel.com/content/www/us/en/content-details/780887/intel-in-memory-analytics-accelerator-intel-iaa.html>`_
+
+- ``IAA`` device driver is Intel Data Accelerator Driver (idxd), it is
+  recommended that the minimum version of Linux kernel is 5.18.
+
+- Add ``"intel_iommu=on,sm_on"`` parameter to kernel command line
+  for ``SVM`` feature enabling.
+
+Here is an easy way to verify ``IAA`` device driver and ``SVM`` with `iaa_test
+<https://github.com/intel/idxd-config/tree/stable/test>`_
+
+.. code-block:: shell
+
+  #./test/iaa_test
+   [ info] alloc wq 0 shared size 128 addr 0x7f26cebe5000 batch sz 0xfffffffe xfer sz 0x80000000
+   [ info] test noop: tflags 0x1 num_desc 1
+   [ info] preparing descriptor for noop
+   [ info] Submitted all noop jobs
+   [ info] verifying task result for 0x16f7e20
+   [ info] test with op 0 passed
+
+
+IAA Resources Allocation For Migration
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+There is no ``IAA`` resource configuration parameters for migration and
+``accel-config`` tool configuration cannot directly specify the ``IAA``
+resources used for migration.
+
+The multifd migration with ``QPL`` compression method  will use all work
+queues that are enabled and shared mode.
+
+.. note::
+
+  Accessing IAA resources requires ``sudo`` command or ``root`` privileges
+  by default. Administrators can modify the IAA device node ownership
+  so that Qemu can use IAA with specified user permissions.
+
+  For example
+
+  #chown -R Qemu /dev/iax
+
+
+Shared Virtual Memory(SVM) Introduction
+=======================================
+
+An ability for an accelerator I/O device to operate in the same virtual
+memory space of applications on host processors. It also implies the
+ability to operate from pageable memory, avoiding functional requirements
+to pin memory for DMA operations.
+
+When using ``SVM`` technology, users do not need to reserve memory for the
+``IAA`` device and perform pin memory operation. The ``IAA`` device can
+directly access data using the virtual address of the process.
+
+For more ``SVM`` technology, please refer to
+`Shared Virtual Addressing (SVA) with ENQCMD
+<https://docs.kernel.org/next/x86/sva.html>`_
+
+
+How To Use QPL Compression In Migration
+=======================================
+
+1 - Installation of ``QPL`` library and ``accel-config`` library if using IAA
+
+2 - Configure and enable ``IAA`` devices and work queues via ``accel-config``
+
+3 - Build ``Qemu`` with ``--enable-qpl`` parameter
+
+  E.g. configure --target-list=x86_64-softmmu --enable-kvm ``--enable-qpl``
+
+4 - Enable ``QPL`` compression during migration
+
+  Set ``migrate_set_parameter multifd-compression qpl`` when migrating, the
+  ``QPL`` compression does not support configuring the compression level, it
+  only supports one compression level.
+
+The Difference Between QPL And ZLIB
+===================================
+
+Although both ``QPL`` and ``ZLIB`` are based on the deflate compression
+algorithm, and ``QPL`` can support the header and tail of ``ZLIB``, ``QPL``
+is still not fully compatible with the ``ZLIB`` compression in the migration.
+
+``QPL`` only supports 4K history buffer, and ``ZLIB`` is 32K by default. The
+``ZLIB`` compressed data that ``QPL`` may not decompress correctly and
+vice versa.
+
+``QPL`` does not support the ``Z_SYNC_FLUSH`` operation in ``ZLIB`` streaming
+compression, current ``ZLIB`` implementation uses ``Z_SYNC_FLUSH``, so each
+``multifd`` thread has a ``ZLIB`` streaming context, and all page compression
+and decompression are based on this stream. ``QPL`` cannot decompress such data
+and vice versa.
+
+The introduction for ``Z_SYNC_FLUSH``, please refer to `Zlib Manual
+<https://www.zlib.net/manual.html>`_
+
+The Best Practices
+==================
+When user enables the IAA device for ``QPL`` compression, it is recommended
+to add ``-mem-prealloc`` parameter to the destination boot parameters. This
+parameter can avoid the occurrence of I/O page fault and reduce the overhead
+of IAA compression and decompression.
+
+The example of booting with ``-mem-prealloc`` parameter
+
+.. code-block:: shell
+
+   $qemu-system-x86_64 --enable-kvm -cpu host --mem-prealloc ...
+
+
+An example about I/O page fault measurement of destination without
+``-mem-prealloc``, the ``svm_prq`` indicates the number of I/O page fault
+occurrences and processing time.
+
+.. code-block:: shell
+
+  #echo 1 > /sys/kernel/debug/iommu/intel/dmar_perf_latency
+  #echo 2 > /sys/kernel/debug/iommu/intel/dmar_perf_latency
+  #echo 3 > /sys/kernel/debug/iommu/intel/dmar_perf_latency
+  #echo 4 > /sys/kernel/debug/iommu/intel/dmar_perf_latency
+  #cat /sys/kernel/debug/iommu/intel/dmar_perf_latency
+  IOMMU: dmar18 Register Base Address: c87fc000
+                  <0.1us   0.1us-1us    1us-10us  10us-100us   100us-1ms    1ms-10ms      >=10ms     min(us)     max(us) average(us)
+   inv_iotlb           0         286         123           0           0           0           0           0           1           0
+  inv_devtlb           0         276         133           0           0           0           0           0           2           0
+     inv_iec           0           0           0           0           0           0           0           0           0           0
+     svm_prq           0           0       25206         364         395           0           0           1         556           9
+
-- 
2.39.3



^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH v6 2/7] migration/multifd: put IOV initialization into compression method
  2024-05-05 16:57 [PATCH v6 0/7] Live Migration With IAA Yuan Liu
  2024-05-05 16:57 ` [PATCH v6 1/7] docs/migration: add qpl compression feature Yuan Liu
@ 2024-05-05 16:57 ` Yuan Liu
  2024-05-10 20:21   ` Fabiano Rosas
  2024-05-27 20:50   ` Peter Xu
  2024-05-05 16:57 ` [PATCH v6 3/7] configure: add --enable-qpl build option Yuan Liu
                   ` (4 subsequent siblings)
  6 siblings, 2 replies; 23+ messages in thread
From: Yuan Liu @ 2024-05-05 16:57 UTC (permalink / raw)
  To: peterx, farosas; +Cc: qemu-devel, yuan1.liu, nanhai.zou

Different compression methods may require different numbers of IOVs.
Based on streaming compression of zlib and zstd, all pages will be
compressed to a data block, so two IOVs are needed for packet header
and compressed data block.

Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
---
 migration/multifd-zlib.c |  7 +++++++
 migration/multifd-zstd.c |  8 +++++++-
 migration/multifd.c      | 22 ++++++++++++----------
 3 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
index 737a9645d2..2ced69487e 100644
--- a/migration/multifd-zlib.c
+++ b/migration/multifd-zlib.c
@@ -70,6 +70,10 @@ static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
         goto err_free_zbuff;
     }
     p->compress_data = z;
+
+    /* Needs 2 IOVs, one for packet header and one for compressed data */
+    p->iov = g_new0(struct iovec, 2);
+
     return 0;
 
 err_free_zbuff:
@@ -101,6 +105,9 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
     z->buf = NULL;
     g_free(p->compress_data);
     p->compress_data = NULL;
+
+    g_free(p->iov);
+    p->iov = NULL;
 }
 
 /**
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
index 256858df0a..ca17b7e310 100644
--- a/migration/multifd-zstd.c
+++ b/migration/multifd-zstd.c
@@ -52,7 +52,6 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
     struct zstd_data *z = g_new0(struct zstd_data, 1);
     int res;
 
-    p->compress_data = z;
     z->zcs = ZSTD_createCStream();
     if (!z->zcs) {
         g_free(z);
@@ -77,6 +76,10 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
         error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
         return -1;
     }
+    p->compress_data = z;
+
+    /* Needs 2 IOVs, one for packet header and one for compressed data */
+    p->iov = g_new0(struct iovec, 2);
     return 0;
 }
 
@@ -98,6 +101,9 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
     z->zbuff = NULL;
     g_free(p->compress_data);
     p->compress_data = NULL;
+
+    g_free(p->iov);
+    p->iov = NULL;
 }
 
 /**
diff --git a/migration/multifd.c b/migration/multifd.c
index f317bff077..d82885fdbb 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -137,6 +137,13 @@ static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
         p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
     }
 
+    if (multifd_use_packets()) {
+        /* We need one extra place for the packet header */
+        p->iov = g_new0(struct iovec, p->page_count + 1);
+    } else {
+        p->iov = g_new0(struct iovec, p->page_count);
+    }
+
     return 0;
 }
 
@@ -150,6 +157,8 @@ static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
  */
 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
 {
+    g_free(p->iov);
+    p->iov = NULL;
     return;
 }
 
@@ -228,6 +237,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
  */
 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
 {
+    p->iov = g_new0(struct iovec, p->page_count);
     return 0;
 }
 
@@ -240,6 +250,8 @@ static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
  */
 static void nocomp_recv_cleanup(MultiFDRecvParams *p)
 {
+    g_free(p->iov);
+    p->iov = NULL;
 }
 
 /**
@@ -783,8 +795,6 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
     p->packet_len = 0;
     g_free(p->packet);
     p->packet = NULL;
-    g_free(p->iov);
-    p->iov = NULL;
     multifd_send_state->ops->send_cleanup(p, errp);
 
     return *errp == NULL;
@@ -1179,11 +1189,6 @@ bool multifd_send_setup(void)
             p->packet = g_malloc0(p->packet_len);
             p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
             p->packet->version = cpu_to_be32(MULTIFD_VERSION);
-
-            /* We need one extra place for the packet header */
-            p->iov = g_new0(struct iovec, page_count + 1);
-        } else {
-            p->iov = g_new0(struct iovec, page_count);
         }
         p->name = g_strdup_printf("multifdsend_%d", i);
         p->page_size = qemu_target_page_size();
@@ -1353,8 +1358,6 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
     p->packet_len = 0;
     g_free(p->packet);
     p->packet = NULL;
-    g_free(p->iov);
-    p->iov = NULL;
     g_free(p->normal);
     p->normal = NULL;
     g_free(p->zero);
@@ -1602,7 +1605,6 @@ int multifd_recv_setup(Error **errp)
             p->packet = g_malloc0(p->packet_len);
         }
         p->name = g_strdup_printf("multifdrecv_%d", i);
-        p->iov = g_new0(struct iovec, page_count);
         p->normal = g_new0(ram_addr_t, page_count);
         p->zero = g_new0(ram_addr_t, page_count);
         p->page_count = page_count;
-- 
2.39.3



^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH v6 3/7] configure: add --enable-qpl build option
  2024-05-05 16:57 [PATCH v6 0/7] Live Migration With IAA Yuan Liu
  2024-05-05 16:57 ` [PATCH v6 1/7] docs/migration: add qpl compression feature Yuan Liu
  2024-05-05 16:57 ` [PATCH v6 2/7] migration/multifd: put IOV initialization into compression method Yuan Liu
@ 2024-05-05 16:57 ` Yuan Liu
  2024-05-05 16:57 ` [PATCH v6 4/7] migration/multifd: add qpl compression method Yuan Liu
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 23+ messages in thread
From: Yuan Liu @ 2024-05-05 16:57 UTC (permalink / raw)
  To: peterx, farosas; +Cc: qemu-devel, yuan1.liu, nanhai.zou

add --enable-qpl and --disable-qpl options to enable and disable
the QPL compression method for multifd migration.

The Query Processing Library (QPL) is an open-source library
that supports data compression and decompression features. It
is based on the deflate compression algorithm and use Intel
In-Memory Analytics Accelerator(IAA) hardware for compression
and decompression acceleration.

For more live migration with IAA, please refer to the document
docs/devel/migration/qpl-compression.rst

Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
---
 meson.build                   | 8 ++++++++
 meson_options.txt             | 2 ++
 scripts/meson-buildoptions.sh | 3 +++
 3 files changed, 13 insertions(+)

diff --git a/meson.build b/meson.build
index 5db2dbc12e..2a8d8385fe 100644
--- a/meson.build
+++ b/meson.build
@@ -1204,6 +1204,12 @@ if not get_option('zstd').auto() or have_block
                     required: get_option('zstd'),
                     method: 'pkg-config')
 endif
+qpl = not_found
+if not get_option('qpl').auto() or have_system
+  qpl = dependency('qpl', version: '>=1.5.0',
+                    required: get_option('qpl'),
+                    method: 'pkg-config')
+endif
 virgl = not_found
 
 have_vhost_user_gpu = have_tools and host_os == 'linux' and pixman.found()
@@ -2309,6 +2315,7 @@ config_host_data.set('CONFIG_MALLOC_TRIM', has_malloc_trim)
 config_host_data.set('CONFIG_STATX', has_statx)
 config_host_data.set('CONFIG_STATX_MNT_ID', has_statx_mnt_id)
 config_host_data.set('CONFIG_ZSTD', zstd.found())
+config_host_data.set('CONFIG_QPL', qpl.found())
 config_host_data.set('CONFIG_FUSE', fuse.found())
 config_host_data.set('CONFIG_FUSE_LSEEK', fuse_lseek.found())
 config_host_data.set('CONFIG_SPICE_PROTOCOL', spice_protocol.found())
@@ -4436,6 +4443,7 @@ summary_info += {'snappy support':    snappy}
 summary_info += {'bzip2 support':     libbzip2}
 summary_info += {'lzfse support':     liblzfse}
 summary_info += {'zstd support':      zstd}
+summary_info += {'Query Processing Library support': qpl}
 summary_info += {'NUMA host support': numa}
 summary_info += {'capstone':          capstone}
 summary_info += {'libpmem support':   libpmem}
diff --git a/meson_options.txt b/meson_options.txt
index adc77bae0c..562db29ab4 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -259,6 +259,8 @@ option('xkbcommon', type : 'feature', value : 'auto',
        description: 'xkbcommon support')
 option('zstd', type : 'feature', value : 'auto',
        description: 'zstd compression support')
+option('qpl', type : 'feature', value : 'auto',
+       description: 'Query Processing Library support')
 option('fuse', type: 'feature', value: 'auto',
        description: 'FUSE block device export')
 option('fuse_lseek', type : 'feature', value : 'auto',
diff --git a/scripts/meson-buildoptions.sh b/scripts/meson-buildoptions.sh
index 0a29d35fdb..26bf9e21fd 100644
--- a/scripts/meson-buildoptions.sh
+++ b/scripts/meson-buildoptions.sh
@@ -222,6 +222,7 @@ meson_options_help() {
   printf "%s\n" '                  Xen PCI passthrough support'
   printf "%s\n" '  xkbcommon       xkbcommon support'
   printf "%s\n" '  zstd            zstd compression support'
+  printf "%s\n" '  qpl             Query Processing Library support'
 }
 _meson_option_parse() {
   case $1 in
@@ -562,6 +563,8 @@ _meson_option_parse() {
     --disable-xkbcommon) printf "%s" -Dxkbcommon=disabled ;;
     --enable-zstd) printf "%s" -Dzstd=enabled ;;
     --disable-zstd) printf "%s" -Dzstd=disabled ;;
+    --enable-qpl) printf "%s" -Dqpl=enabled ;;
+    --disable-qpl) printf "%s" -Dqpl=disabled ;;
     *) return 1 ;;
   esac
 }
-- 
2.39.3



^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH v6 4/7] migration/multifd: add qpl compression method
  2024-05-05 16:57 [PATCH v6 0/7] Live Migration With IAA Yuan Liu
                   ` (2 preceding siblings ...)
  2024-05-05 16:57 ` [PATCH v6 3/7] configure: add --enable-qpl build option Yuan Liu
@ 2024-05-05 16:57 ` Yuan Liu
  2024-05-10 14:12   ` Fabiano Rosas
  2024-05-05 16:57 ` [PATCH v6 5/7] migration/multifd: implement initialization of qpl compression Yuan Liu
                   ` (2 subsequent siblings)
  6 siblings, 1 reply; 23+ messages in thread
From: Yuan Liu @ 2024-05-05 16:57 UTC (permalink / raw)
  To: peterx, farosas; +Cc: qemu-devel, yuan1.liu, nanhai.zou

add the Query Processing Library (QPL) compression method

Introduce the qpl as a new multifd migration compression method, it can
use In-Memory Analytics Accelerator(IAA) to accelerate compression and
decompression, which can not only reduce network bandwidth requirement
but also reduce host compression and decompression CPU overhead.

How to enable qpl compression during migration:
migrate_set_parameter multifd-compression qpl

The qpl method only supports one compression level, there is no qpl
compression level parameter added, users do not need to specify the
qpl compression level.

Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
---
 hw/core/qdev-properties-system.c |  2 +-
 migration/meson.build            |  1 +
 migration/multifd-qpl.c          | 20 ++++++++++++++++++++
 migration/multifd.h              |  1 +
 qapi/migration.json              |  7 ++++++-
 5 files changed, 29 insertions(+), 2 deletions(-)
 create mode 100644 migration/multifd-qpl.c

diff --git a/hw/core/qdev-properties-system.c b/hw/core/qdev-properties-system.c
index d79d6f4b53..6ccd7224f6 100644
--- a/hw/core/qdev-properties-system.c
+++ b/hw/core/qdev-properties-system.c
@@ -659,7 +659,7 @@ const PropertyInfo qdev_prop_fdc_drive_type = {
 const PropertyInfo qdev_prop_multifd_compression = {
     .name = "MultiFDCompression",
     .description = "multifd_compression values, "
-                   "none/zlib/zstd",
+                   "none/zlib/zstd/qpl",
     .enum_table = &MultiFDCompression_lookup,
     .get = qdev_propinfo_get_enum,
     .set = qdev_propinfo_set_enum,
diff --git a/migration/meson.build b/migration/meson.build
index f76b1ba328..1d432d5328 100644
--- a/migration/meson.build
+++ b/migration/meson.build
@@ -43,6 +43,7 @@ if get_option('live_block_migration').allowed()
   system_ss.add(files('block.c'))
 endif
 system_ss.add(when: zstd, if_true: files('multifd-zstd.c'))
+system_ss.add(when: qpl, if_true: files('multifd-qpl.c'))
 
 specific_ss.add(when: 'CONFIG_SYSTEM_ONLY',
                 if_true: files('ram.c',
diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
new file mode 100644
index 0000000000..056a68a060
--- /dev/null
+++ b/migration/multifd-qpl.c
@@ -0,0 +1,20 @@
+/*
+ * Multifd qpl compression accelerator implementation
+ *
+ * Copyright (c) 2023 Intel Corporation
+ *
+ * Authors:
+ *  Yuan Liu<yuan1.liu@intel.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#include "qemu/osdep.h"
+#include "qemu/module.h"
+
+static void multifd_qpl_register(void)
+{
+    /* noop */
+}
+
+migration_init(multifd_qpl_register);
diff --git a/migration/multifd.h b/migration/multifd.h
index c9d9b09239..5b7d9b15f8 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -40,6 +40,7 @@ MultiFDRecvData *multifd_get_recv_data(void);
 #define MULTIFD_FLAG_NOCOMP (0 << 1)
 #define MULTIFD_FLAG_ZLIB (1 << 1)
 #define MULTIFD_FLAG_ZSTD (2 << 1)
+#define MULTIFD_FLAG_QPL (4 << 1)
 
 /* This value needs to be a multiple of qemu_target_page_size() */
 #define MULTIFD_PACKET_SIZE (512 * 1024)
diff --git a/qapi/migration.json b/qapi/migration.json
index 8c65b90328..854e8609bd 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -628,11 +628,16 @@
 #
 # @zstd: use zstd compression method.
 #
+# @qpl: use qpl compression method. Query Processing Library(qpl) is based on
+#       the deflate compression algorithm and use the Intel In-Memory Analytics
+#       Accelerator(IAA) accelerated compression and decompression. (Since 9.1)
+#
 # Since: 5.0
 ##
 { 'enum': 'MultiFDCompression',
   'data': [ 'none', 'zlib',
-            { 'name': 'zstd', 'if': 'CONFIG_ZSTD' } ] }
+            { 'name': 'zstd', 'if': 'CONFIG_ZSTD' },
+            { 'name': 'qpl', 'if': 'CONFIG_QPL' } ] }
 
 ##
 # @MigMode:
-- 
2.39.3



^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH v6 5/7] migration/multifd: implement initialization of qpl compression
  2024-05-05 16:57 [PATCH v6 0/7] Live Migration With IAA Yuan Liu
                   ` (3 preceding siblings ...)
  2024-05-05 16:57 ` [PATCH v6 4/7] migration/multifd: add qpl compression method Yuan Liu
@ 2024-05-05 16:57 ` Yuan Liu
  2024-05-10 20:45   ` Fabiano Rosas
  2024-05-10 20:52   ` Fabiano Rosas
  2024-05-05 16:57 ` [PATCH v6 6/7] migration/multifd: implement qpl compression and decompression Yuan Liu
  2024-05-05 16:57 ` [PATCH v6 7/7] tests/migration-test: add qpl compression test Yuan Liu
  6 siblings, 2 replies; 23+ messages in thread
From: Yuan Liu @ 2024-05-05 16:57 UTC (permalink / raw)
  To: peterx, farosas; +Cc: qemu-devel, yuan1.liu, nanhai.zou

the qpl initialization includes memory allocation for compressed
data and the qpl job initialization.

the qpl job initialization will check if the In-Memory Analytics
Accelerator(IAA) device is available and use the IAA device first.
If the platform does not have IAA device or the IAA device is not
available, the qpl compression will fallback to the software path.

Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
---
 migration/multifd-qpl.c | 272 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 271 insertions(+), 1 deletion(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 056a68a060..89fa51091a 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -9,12 +9,282 @@
  * This work is licensed under the terms of the GNU GPL, version 2 or later.
  * See the COPYING file in the top-level directory.
  */
+
 #include "qemu/osdep.h"
 #include "qemu/module.h"
+#include "qapi/error.h"
+#include "migration.h"
+#include "multifd.h"
+#include "qpl/qpl.h"
+
+typedef struct {
+    qpl_job **job_array;
+    /* the number of allocated jobs */
+    uint32_t total_job_num;
+    /* compressed data buffer */
+    uint8_t *zbuf;
+    /* the length of compressed data */
+    uint32_t *zbuf_hdr;
+    /* the status of IAA device */
+    bool iaa_avail;
+} QplData;
+
+/**
+ * check_iaa_avail: check if IAA device is available
+ *
+ * If the system does not have an IAA device, the IAA device is
+ * not enabled or the IAA work queue is not configured as a shared
+ * mode, the QPL hardware path initialization will fail.
+ *
+ * Returns true if IAA device is available, otherwise false.
+ */
+static bool check_iaa_avail(void)
+{
+    qpl_job *job = NULL;
+    uint32_t job_size = 0;
+    qpl_path_t path = qpl_path_hardware;
+
+    if (qpl_get_job_size(path, &job_size) != QPL_STS_OK) {
+        return false;
+    }
+    job = g_malloc0(job_size);
+    if (qpl_init_job(path, job) != QPL_STS_OK) {
+        g_free(job);
+        return false;
+    }
+    g_free(job);
+    return true;
+}
+
+/**
+ * multifd_qpl_free_jobs: cleanup jobs
+ *
+ * Free all job resources.
+ *
+ * @qpl: pointer to the QplData structure
+ */
+static void multifd_qpl_free_jobs(QplData *qpl)
+{
+    assert(qpl != NULL);
+    for (int i = 0; i < qpl->total_job_num; i++) {
+        qpl_fini_job(qpl->job_array[i]);
+        g_free(qpl->job_array[i]);
+        qpl->job_array[i] = NULL;
+    }
+    g_free(qpl->job_array);
+    qpl->job_array = NULL;
+}
+
+/**
+ * multifd_qpl_init_jobs: initialize jobs
+ *
+ * Initialize all jobs
+ *
+ * @qpl: pointer to the QplData structure
+ * @chan_id: multifd channel number
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_init_jobs(QplData *qpl, uint8_t chan_id, Error **errp)
+{
+    qpl_path_t path;
+    qpl_status status;
+    uint32_t job_size = 0;
+    qpl_job *job = NULL;
+
+    path = qpl->iaa_avail ? qpl_path_hardware : qpl_path_software;
+    status = qpl_get_job_size(path, &job_size);
+    if (status != QPL_STS_OK) {
+        error_setg(errp, "multifd: %u: qpl_get_job_size failed with error %d",
+                   chan_id, status);
+        return -1;
+    }
+    qpl->job_array = g_new0(qpl_job *, qpl->total_job_num);
+    for (int i = 0; i < qpl->total_job_num; i++) {
+        job = g_malloc0(job_size);
+        status = qpl_init_job(path, job);
+        if (status != QPL_STS_OK) {
+            error_setg(errp, "multifd: %u: qpl_init_job failed with error %d",
+                       chan_id, status);
+            multifd_qpl_free_jobs(qpl);
+            return -1;
+        }
+        qpl->job_array[i] = job;
+    }
+    return 0;
+}
+
+/**
+ * multifd_qpl_init: initialize QplData structure
+ *
+ * Allocate and initialize a QplData structure
+ *
+ * Returns QplData pointer for success or NULL for error
+ *
+ * @job_num: pointer to the QplData structure
+ * @job_size: the buffer size of the job
+ * @chan_id: multifd channel number
+ * @errp: pointer to an error
+ */
+static QplData *multifd_qpl_init(uint32_t job_num, uint32_t job_size,
+                                 uint8_t chan_id, Error **errp)
+{
+    QplData *qpl;
+
+    qpl = g_new0(QplData, 1);
+    qpl->total_job_num = job_num;
+    qpl->iaa_avail = check_iaa_avail();
+    if (multifd_qpl_init_jobs(qpl, chan_id, errp) != 0) {
+        g_free(qpl);
+        return NULL;
+    }
+    qpl->zbuf = g_malloc0(job_size * job_num);
+    qpl->zbuf_hdr = g_new0(uint32_t, job_num);
+    return qpl;
+}
+
+/**
+ * multifd_qpl_deinit: cleanup QplData structure
+ *
+ * Free jobs, comprssed buffers and QplData structure
+ *
+ * @qpl: pointer to the QplData structure
+ */
+static void multifd_qpl_deinit(QplData *qpl)
+{
+    if (qpl != NULL) {
+        multifd_qpl_free_jobs(qpl);
+        g_free(qpl->zbuf_hdr);
+        g_free(qpl->zbuf);
+        g_free(qpl);
+    }
+}
+
+/**
+ * multifd_qpl_send_setup: setup send side
+ *
+ * Setup each channel with QPL compression.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_send_setup(MultiFDSendParams *p, Error **errp)
+{
+    QplData *qpl;
+
+    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
+    if (!qpl) {
+        return -1;
+    }
+    p->compress_data = qpl;
+
+    /*
+     * Each page will be compressed independently and sent using an IOV. The
+     * additional two IOVs are used to store packet header and compressed data
+     * length
+     */
+    p->iov = g_new0(struct iovec, p->page_count + 2);
+    return 0;
+}
+
+/**
+ * multifd_qpl_send_cleanup: cleanup send side
+ *
+ * Close the channel and return memory.
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
+{
+    multifd_qpl_deinit(p->compress_data);
+    p->compress_data = NULL;
+    g_free(p->iov);
+    p->iov = NULL;
+}
+
+/**
+ * multifd_qpl_send_prepare: prepare data to be able to send
+ *
+ * Create a compressed buffer with all the pages that we are going to
+ * send.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
+{
+    /* Implement in next patch */
+    return -1;
+}
+
+/**
+ * multifd_qpl_recv_setup: setup receive side
+ *
+ * Create the compressed channel and buffer.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
+{
+    QplData *qpl;
+
+    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
+    if (!qpl) {
+        return -1;
+    }
+    p->compress_data = qpl;
+    return 0;
+}
+
+/**
+ * multifd_qpl_recv_cleanup: setup receive side
+ *
+ * Close the channel and return memory.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
+{
+    multifd_qpl_deinit(p->compress_data);
+    p->compress_data = NULL;
+}
+
+/**
+ * multifd_qpl_recv: read the data from the channel into actual pages
+ *
+ * Read the compressed buffer, and uncompress it into the actual
+ * pages.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
+{
+    /* Implement in next patch */
+    return -1;
+}
+
+static MultiFDMethods multifd_qpl_ops = {
+    .send_setup = multifd_qpl_send_setup,
+    .send_cleanup = multifd_qpl_send_cleanup,
+    .send_prepare = multifd_qpl_send_prepare,
+    .recv_setup = multifd_qpl_recv_setup,
+    .recv_cleanup = multifd_qpl_recv_cleanup,
+    .recv = multifd_qpl_recv,
+};
 
 static void multifd_qpl_register(void)
 {
-    /* noop */
+    multifd_register_ops(MULTIFD_COMPRESSION_QPL, &multifd_qpl_ops);
 }
 
 migration_init(multifd_qpl_register);
-- 
2.39.3



^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH v6 6/7] migration/multifd: implement qpl compression and decompression
  2024-05-05 16:57 [PATCH v6 0/7] Live Migration With IAA Yuan Liu
                   ` (4 preceding siblings ...)
  2024-05-05 16:57 ` [PATCH v6 5/7] migration/multifd: implement initialization of qpl compression Yuan Liu
@ 2024-05-05 16:57 ` Yuan Liu
  2024-05-13 15:13   ` Fabiano Rosas
  2024-05-05 16:57 ` [PATCH v6 7/7] tests/migration-test: add qpl compression test Yuan Liu
  6 siblings, 1 reply; 23+ messages in thread
From: Yuan Liu @ 2024-05-05 16:57 UTC (permalink / raw)
  To: peterx, farosas; +Cc: qemu-devel, yuan1.liu, nanhai.zou

each qpl job is used to (de)compress a normal page and it can
be processed independently by the IAA hardware. All qpl jobs
are submitted to the hardware at once, and wait for all jobs
completion. If hardware path(IAA) is not available, use software
for compression and decompression.

Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
---
 migration/multifd-qpl.c | 284 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 280 insertions(+), 4 deletions(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 89fa51091a..9a1fddbdd0 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -13,6 +13,7 @@
 #include "qemu/osdep.h"
 #include "qemu/module.h"
 #include "qapi/error.h"
+#include "exec/ramblock.h"
 #include "migration.h"
 #include "multifd.h"
 #include "qpl/qpl.h"
@@ -204,6 +205,139 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
     p->iov = NULL;
 }
 
+/**
+ * multifd_qpl_prepare_job: prepare a compression or decompression job
+ *
+ * Prepare a compression or decompression job and configure job attributes
+ * including job compression level and flags.
+ *
+ * @job: pointer to the QplData structure
+ * @is_compression: compression or decompression indication
+ * @input: pointer to the input data buffer
+ * @input_len: the length of the input data
+ * @output: pointer to the output data buffer
+ * @output_len: the size of the output data buffer
+ */
+static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
+                                    uint8_t *input, uint32_t input_len,
+                                    uint8_t *output, uint32_t output_len)
+{
+    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
+    job->next_in_ptr = input;
+    job->next_out_ptr = output;
+    job->available_in = input_len;
+    job->available_out = output_len;
+    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
+    /* only supports one compression level */
+    job->level = 1;
+}
+
+/**
+ * multifd_qpl_build_packet: build a qpl compressed data packet
+ *
+ * The qpl compressed data packet consists of two parts, one part stores
+ * the compressed length of each page, and the other part is the compressed
+ * data of each page. The zbuf_hdr stores the compressed length of all pages,
+ * and use a separate IOV to store the compressed data of each page.
+ *
+ * @qpl: pointer to the QplData structure
+ * @p: Params for the channel that we are using
+ * @idx: The index of the compressed length array
+ * @addr: pointer to the compressed data
+ * @len: The length of the compressed data
+ */
+static void multifd_qpl_build_packet(QplData *qpl, MultiFDSendParams *p,
+                                     uint32_t idx, uint8_t *addr, uint32_t len)
+{
+    qpl->zbuf_hdr[idx] = cpu_to_be32(len);
+    p->iov[p->iovs_num].iov_base = addr;
+    p->iov[p->iovs_num].iov_len = len;
+    p->iovs_num++;
+    p->next_packet_size += len;
+}
+
+/**
+ * multifd_qpl_compress_pages: compress normal pages
+ *
+ * Each normal page will be compressed independently, and the compression jobs
+ * will be submitted to the IAA hardware in non-blocking mode, waiting for all
+ * jobs to be completed and filling the compressed length and data into the
+ * sending IOVs. If IAA device is not available, the software path is used.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_compress_pages(MultiFDSendParams *p, Error **errp)
+{
+    qpl_status status;
+    QplData *qpl = p->compress_data;
+    MultiFDPages_t *pages = p->pages;
+    uint8_t *zbuf = qpl->zbuf;
+    uint8_t *host = pages->block->host;
+    uint32_t job_num = pages->normal_num;
+    qpl_job *job = NULL;
+
+    assert(job_num <= qpl->total_job_num);
+    /* submit all compression jobs */
+    for (int i = 0; i < job_num; i++) {
+        job = qpl->job_array[i];
+        multifd_qpl_prepare_job(job, true, host + pages->offset[i],
+                                p->page_size, zbuf, p->page_size - 1);
+        /* if hardware path(IAA) is unavailable, call the software path */
+        if (!qpl->iaa_avail) {
+            status = qpl_execute_job(job);
+            if (status == QPL_STS_OK) {
+                multifd_qpl_build_packet(qpl, p, i, zbuf, job->total_out);
+            } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
+                /* compressed length exceeds page size, send page directly */
+                multifd_qpl_build_packet(qpl, p, i, host + pages->offset[i],
+                                         p->page_size);
+            } else {
+                error_setg(errp, "multifd %u: qpl_execute_job error %d",
+                           p->id, status);
+                return -1;
+            }
+            zbuf += p->page_size;
+            continue;
+        }
+retry:
+        status = qpl_submit_job(job);
+        if (status == QPL_STS_OK) {
+            zbuf += p->page_size;
+        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
+            goto retry;
+        } else {
+            error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
+                       p->id, status);
+            return -1;
+        }
+    }
+    if (!qpl->iaa_avail) {
+        goto done;
+    }
+    /* wait all jobs to complete for hardware(IAA) path */
+    for (int i = 0; i < job_num; i++) {
+        job = qpl->job_array[i];
+        status = qpl_wait_job(job);
+        if (status == QPL_STS_OK) {
+            multifd_qpl_build_packet(qpl, p, i, qpl->zbuf + (p->page_size * i),
+                                     job->total_out);
+        } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
+            /* compressed data length exceeds page size, send page directly */
+            multifd_qpl_build_packet(qpl, p, i, host + pages->offset[i],
+                                     p->page_size);
+        } else {
+            error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
+                       p->id, status);
+            return -1;
+        }
+    }
+done:
+    return 0;
+}
+
 /**
  * multifd_qpl_send_prepare: prepare data to be able to send
  *
@@ -217,8 +351,28 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
  */
 static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
 {
-    /* Implement in next patch */
-    return -1;
+    QplData *qpl = p->compress_data;
+    uint32_t hdr_size;
+
+    if (!multifd_send_prepare_common(p)) {
+        goto out;
+    }
+
+    assert(p->pages->normal_num <= qpl->total_job_num);
+    hdr_size = p->pages->normal_num * sizeof(uint32_t);
+    /* prepare the header that stores the lengths of all compressed data */
+    p->iov[1].iov_base = (uint8_t *) qpl->zbuf_hdr;
+    p->iov[1].iov_len = hdr_size;
+    p->iovs_num++;
+    p->next_packet_size += hdr_size;
+    if (multifd_qpl_compress_pages(p, errp) != 0) {
+        return -1;
+    }
+
+out:
+    p->flags |= MULTIFD_FLAG_QPL;
+    multifd_send_fill_packet(p);
+    return 0;
 }
 
 /**
@@ -256,6 +410,88 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
     p->compress_data = NULL;
 }
 
+/**
+ * multifd_qpl_decompress_pages: decompress normal pages
+ *
+ * Each compressed page will be decompressed independently, and the
+ * decompression jobs will be submitted to the IAA hardware in non-blocking
+ * mode, waiting for all jobs to be completed and loading the decompressed
+ * data into guest memory. If IAA device is not available, the software path
+ * is used.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_decompress_pages(MultiFDRecvParams *p, Error **errp)
+{
+    qpl_status status;
+    qpl_job *job;
+    QplData *qpl = p->compress_data;
+    uint32_t job_num = p->normal_num;
+    uint32_t off = 0;
+
+    assert(job_num <= qpl->total_job_num);
+    /* submit all decompression jobs */
+    for (int i = 0; i < job_num; i++) {
+        /* if the data size is the same as the page size, load it directly */
+        if (qpl->zbuf_hdr[i] == p->page_size) {
+            memcpy(p->host + p->normal[i], qpl->zbuf + off, p->page_size);
+            off += p->page_size;
+            continue;
+        }
+        job = qpl->job_array[i];
+        multifd_qpl_prepare_job(job, false, qpl->zbuf + off, qpl->zbuf_hdr[i],
+                                p->host + p->normal[i], p->page_size);
+        /* if hardware path(IAA) is unavailable, call the software path */
+        if (!qpl->iaa_avail) {
+            status = qpl_execute_job(job);
+            if (status == QPL_STS_OK) {
+                off += qpl->zbuf_hdr[i];
+                continue;
+            }
+            error_setg(errp, "multifd %u: qpl_execute_job failed with error %d",
+                       p->id, status);
+            return -1;
+        }
+retry:
+        status = qpl_submit_job(job);
+        if (status == QPL_STS_OK) {
+            off += qpl->zbuf_hdr[i];
+        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
+            goto retry;
+        } else {
+            error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
+                       p->id, status);
+            return -1;
+        }
+    }
+    if (!qpl->iaa_avail) {
+        goto done;
+    }
+    /* wait all jobs to complete for hardware(IAA) path */
+    for (int i = 0; i < job_num; i++) {
+        if (qpl->zbuf_hdr[i] == p->page_size) {
+            continue;
+        }
+        job = qpl->job_array[i];
+        status = qpl_wait_job(job);
+        if (status != QPL_STS_OK) {
+            error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
+                       p->id, status);
+            return -1;
+        }
+        if (job->total_out != p->page_size) {
+            error_setg(errp, "multifd %u: decompressed len %u, expected len %u",
+                       p->id, job->total_out, p->page_size);
+            return -1;
+        }
+    }
+done:
+    return 0;
+}
+
 /**
  * multifd_qpl_recv: read the data from the channel into actual pages
  *
@@ -269,8 +505,48 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
  */
 static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
 {
-    /* Implement in next patch */
-    return -1;
+    QplData *qpl = p->compress_data;
+    uint32_t in_size = p->next_packet_size;
+    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
+    uint32_t hdr_len = p->normal_num * sizeof(uint32_t);
+    uint32_t data_len = 0;
+    int ret;
+
+    if (flags != MULTIFD_FLAG_QPL) {
+        error_setg(errp, "multifd %u: flags received %x flags expected %x",
+                   p->id, flags, MULTIFD_FLAG_QPL);
+        return -1;
+    }
+    multifd_recv_zero_page_process(p);
+    if (!p->normal_num) {
+        assert(in_size == 0);
+        return 0;
+    }
+
+    /* read compressed data lengths */
+    assert(hdr_len < in_size);
+    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf_hdr, hdr_len, errp);
+    if (ret != 0) {
+        return ret;
+    }
+    assert(p->normal_num <= qpl->total_job_num);
+    for (int i = 0; i < p->normal_num; i++) {
+        qpl->zbuf_hdr[i] = be32_to_cpu(qpl->zbuf_hdr[i]);
+        data_len += qpl->zbuf_hdr[i];
+        assert(qpl->zbuf_hdr[i] <= p->page_size);
+    }
+
+    /* read compressed data */
+    assert(in_size == hdr_len + data_len);
+    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, data_len, errp);
+    if (ret != 0) {
+        return ret;
+    }
+
+    if (multifd_qpl_decompress_pages(p, errp) != 0) {
+        return -1;
+    }
+    return 0;
 }
 
 static MultiFDMethods multifd_qpl_ops = {
-- 
2.39.3



^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH v6 7/7] tests/migration-test: add qpl compression test
  2024-05-05 16:57 [PATCH v6 0/7] Live Migration With IAA Yuan Liu
                   ` (5 preceding siblings ...)
  2024-05-05 16:57 ` [PATCH v6 6/7] migration/multifd: implement qpl compression and decompression Yuan Liu
@ 2024-05-05 16:57 ` Yuan Liu
  2024-05-27 20:56   ` Peter Xu
  6 siblings, 1 reply; 23+ messages in thread
From: Yuan Liu @ 2024-05-05 16:57 UTC (permalink / raw)
  To: peterx, farosas; +Cc: qemu-devel, yuan1.liu, nanhai.zou

add qpl to compression method test for multifd migration

the qpl compression supports software path and hardware
path(IAA device), and the hardware path is used first by
default. If the hardware path is unavailable, it will
automatically fallback to the software path for testing.

Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
---
 tests/qtest/migration-test.c | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c
index 5d6d8cd634..0f75ed7c49 100644
--- a/tests/qtest/migration-test.c
+++ b/tests/qtest/migration-test.c
@@ -2777,6 +2777,15 @@ test_migrate_precopy_tcp_multifd_zstd_start(QTestState *from,
 }
 #endif /* CONFIG_ZSTD */
 
+#ifdef CONFIG_QPL
+static void *
+test_migrate_precopy_tcp_multifd_qpl_start(QTestState *from,
+                                            QTestState *to)
+{
+    return test_migrate_precopy_tcp_multifd_start_common(from, to, "qpl");
+}
+#endif /* CONFIG_QPL */
+
 static void test_multifd_tcp_uri_none(void)
 {
     MigrateCommon args = {
@@ -2857,6 +2866,17 @@ static void test_multifd_tcp_zstd(void)
 }
 #endif
 
+#ifdef CONFIG_QPL
+static void test_multifd_tcp_qpl(void)
+{
+    MigrateCommon args = {
+        .listen_uri = "defer",
+        .start_hook = test_migrate_precopy_tcp_multifd_qpl_start,
+    };
+    test_precopy_common(&args);
+}
+#endif
+
 #ifdef CONFIG_GNUTLS
 static void *
 test_migrate_multifd_tcp_tls_psk_start_match(QTestState *from,
@@ -3760,6 +3780,10 @@ int main(int argc, char **argv)
     migration_test_add("/migration/multifd/tcp/plain/zstd",
                        test_multifd_tcp_zstd);
 #endif
+#ifdef CONFIG_QPL
+    migration_test_add("/migration/multifd/tcp/plain/qpl",
+                       test_multifd_tcp_qpl);
+#endif
 #ifdef CONFIG_GNUTLS
     migration_test_add("/migration/multifd/tcp/tls/psk/match",
                        test_multifd_tcp_tls_psk_match);
-- 
2.39.3



^ permalink raw reply related	[flat|nested] 23+ messages in thread

* Re: [PATCH v6 4/7] migration/multifd: add qpl compression method
  2024-05-05 16:57 ` [PATCH v6 4/7] migration/multifd: add qpl compression method Yuan Liu
@ 2024-05-10 14:12   ` Fabiano Rosas
  2024-05-10 14:23     ` Liu, Yuan1
  0 siblings, 1 reply; 23+ messages in thread
From: Fabiano Rosas @ 2024-05-10 14:12 UTC (permalink / raw)
  To: Yuan Liu, peterx; +Cc: qemu-devel, yuan1.liu, nanhai.zou

Yuan Liu <yuan1.liu@intel.com> writes:

> add the Query Processing Library (QPL) compression method
>
> Introduce the qpl as a new multifd migration compression method, it can
> use In-Memory Analytics Accelerator(IAA) to accelerate compression and
> decompression, which can not only reduce network bandwidth requirement
> but also reduce host compression and decompression CPU overhead.
>
> How to enable qpl compression during migration:
> migrate_set_parameter multifd-compression qpl
>
> The qpl method only supports one compression level, there is no qpl
> compression level parameter added, users do not need to specify the
> qpl compression level.
>
> Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>

There's an r-b from Peter that you forgot to bring along in this version
of the series.



^ permalink raw reply	[flat|nested] 23+ messages in thread

* RE: [PATCH v6 4/7] migration/multifd: add qpl compression method
  2024-05-10 14:12   ` Fabiano Rosas
@ 2024-05-10 14:23     ` Liu, Yuan1
  0 siblings, 0 replies; 23+ messages in thread
From: Liu, Yuan1 @ 2024-05-10 14:23 UTC (permalink / raw)
  To: Fabiano Rosas, peterx; +Cc: qemu-devel, Zou, Nanhai

> -----Original Message-----
> From: Fabiano Rosas <farosas@suse.de>
> Sent: Friday, May 10, 2024 10:12 PM
> To: Liu, Yuan1 <yuan1.liu@intel.com>; peterx@redhat.com
> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1.liu@intel.com>; Zou, Nanhai
> <nanhai.zou@intel.com>
> Subject: Re: [PATCH v6 4/7] migration/multifd: add qpl compression method
> 
> Yuan Liu <yuan1.liu@intel.com> writes:
> 
> > add the Query Processing Library (QPL) compression method
> >
> > Introduce the qpl as a new multifd migration compression method, it can
> > use In-Memory Analytics Accelerator(IAA) to accelerate compression and
> > decompression, which can not only reduce network bandwidth requirement
> > but also reduce host compression and decompression CPU overhead.
> >
> > How to enable qpl compression during migration:
> > migrate_set_parameter multifd-compression qpl
> >
> > The qpl method only supports one compression level, there is no qpl
> > compression level parameter added, users do not need to specify the
> > qpl compression level.
> >
> > Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> > Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> 
> There's an r-b from Peter that you forgot to bring along in this version
> of the series.

Yes, this patch has received r-b from Peter Xu in the previous version.
Sorry, I forgot this, I will add it next time.
Thank you very much for the reminder.



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v6 2/7] migration/multifd: put IOV initialization into compression method
  2024-05-05 16:57 ` [PATCH v6 2/7] migration/multifd: put IOV initialization into compression method Yuan Liu
@ 2024-05-10 20:21   ` Fabiano Rosas
  2024-05-27 20:50   ` Peter Xu
  1 sibling, 0 replies; 23+ messages in thread
From: Fabiano Rosas @ 2024-05-10 20:21 UTC (permalink / raw)
  To: Yuan Liu, peterx; +Cc: qemu-devel, yuan1.liu, nanhai.zou

Yuan Liu <yuan1.liu@intel.com> writes:

> Different compression methods may require different numbers of IOVs.
> Based on streaming compression of zlib and zstd, all pages will be
> compressed to a data block, so two IOVs are needed for packet header
> and compressed data block.
>
> Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>

Reviewed-by: Fabiano Rosas <farosas@suse.de>


^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v6 5/7] migration/multifd: implement initialization of qpl compression
  2024-05-05 16:57 ` [PATCH v6 5/7] migration/multifd: implement initialization of qpl compression Yuan Liu
@ 2024-05-10 20:45   ` Fabiano Rosas
  2024-05-11 12:55     ` Liu, Yuan1
  2024-05-10 20:52   ` Fabiano Rosas
  1 sibling, 1 reply; 23+ messages in thread
From: Fabiano Rosas @ 2024-05-10 20:45 UTC (permalink / raw)
  To: Yuan Liu, peterx; +Cc: qemu-devel, yuan1.liu, nanhai.zou

Yuan Liu <yuan1.liu@intel.com> writes:

> the qpl initialization includes memory allocation for compressed
> data and the qpl job initialization.
>
> the qpl job initialization will check if the In-Memory Analytics
> Accelerator(IAA) device is available and use the IAA device first.
> If the platform does not have IAA device or the IAA device is not
> available, the qpl compression will fallback to the software path.
>
> Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>

Looks good, just some nits below.

> ---
>  migration/multifd-qpl.c | 272 +++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 271 insertions(+), 1 deletion(-)
>
> diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> index 056a68a060..89fa51091a 100644
> --- a/migration/multifd-qpl.c
> +++ b/migration/multifd-qpl.c
> @@ -9,12 +9,282 @@
>   * This work is licensed under the terms of the GNU GPL, version 2 or later.
>   * See the COPYING file in the top-level directory.
>   */
> +
>  #include "qemu/osdep.h"
>  #include "qemu/module.h"
> +#include "qapi/error.h"
> +#include "migration.h"
> +#include "multifd.h"
> +#include "qpl/qpl.h"
> +
> +typedef struct {
> +    qpl_job **job_array;
> +    /* the number of allocated jobs */
> +    uint32_t total_job_num;
> +    /* compressed data buffer */
> +    uint8_t *zbuf;
> +    /* the length of compressed data */

array of lenghts

> +    uint32_t *zbuf_hdr;

Why the _hdr suffix if the lengths are the only data stored here?

> +    /* the status of IAA device */
> +    bool iaa_avail;
> +} QplData;
> +
> +/**
> + * check_iaa_avail: check if IAA device is available
> + *
> + * If the system does not have an IAA device, the IAA device is
> + * not enabled or the IAA work queue is not configured as a shared
> + * mode, the QPL hardware path initialization will fail.
> + *
> + * Returns true if IAA device is available, otherwise false.
> + */
> +static bool check_iaa_avail(void)
> +{
> +    qpl_job *job = NULL;
> +    uint32_t job_size = 0;
> +    qpl_path_t path = qpl_path_hardware;
> +
> +    if (qpl_get_job_size(path, &job_size) != QPL_STS_OK) {
> +        return false;
> +    }
> +    job = g_malloc0(job_size);
> +    if (qpl_init_job(path, job) != QPL_STS_OK) {
> +        g_free(job);
> +        return false;
> +    }
> +    g_free(job);
> +    return true;
> +}
> +
> +/**
> + * multifd_qpl_free_jobs: cleanup jobs
> + *
> + * Free all job resources.
> + *
> + * @qpl: pointer to the QplData structure
> + */
> +static void multifd_qpl_free_jobs(QplData *qpl)
> +{
> +    assert(qpl != NULL);
> +    for (int i = 0; i < qpl->total_job_num; i++) {
> +        qpl_fini_job(qpl->job_array[i]);
> +        g_free(qpl->job_array[i]);
> +        qpl->job_array[i] = NULL;
> +    }
> +    g_free(qpl->job_array);
> +    qpl->job_array = NULL;
> +}
> +
> +/**
> + * multifd_qpl_init_jobs: initialize jobs
> + *
> + * Initialize all jobs
> + *
> + * @qpl: pointer to the QplData structure
> + * @chan_id: multifd channel number
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_init_jobs(QplData *qpl, uint8_t chan_id, Error **errp)
> +{
> +    qpl_path_t path;
> +    qpl_status status;
> +    uint32_t job_size = 0;
> +    qpl_job *job = NULL;
> +
> +    path = qpl->iaa_avail ? qpl_path_hardware : qpl_path_software;
> +    status = qpl_get_job_size(path, &job_size);
> +    if (status != QPL_STS_OK) {
> +        error_setg(errp, "multifd: %u: qpl_get_job_size failed with error %d",
> +                   chan_id, status);
> +        return -1;
> +    }
> +    qpl->job_array = g_new0(qpl_job *, qpl->total_job_num);
> +    for (int i = 0; i < qpl->total_job_num; i++) {
> +        job = g_malloc0(job_size);
> +        status = qpl_init_job(path, job);
> +        if (status != QPL_STS_OK) {
> +            error_setg(errp, "multifd: %u: qpl_init_job failed with error %d",
> +                       chan_id, status);
> +            multifd_qpl_free_jobs(qpl);
> +            return -1;
> +        }
> +        qpl->job_array[i] = job;
> +    }
> +    return 0;
> +}
> +
> +/**
> + * multifd_qpl_init: initialize QplData structure
> + *
> + * Allocate and initialize a QplData structure
> + *
> + * Returns QplData pointer for success or NULL for error
> + *
> + * @job_num: pointer to the QplData structure

This needs updating^

> + * @job_size: the buffer size of the job
> + * @chan_id: multifd channel number
> + * @errp: pointer to an error
> + */
> +static QplData *multifd_qpl_init(uint32_t job_num, uint32_t job_size,
> +                                 uint8_t chan_id, Error **errp)
> +{
> +    QplData *qpl;
> +
> +    qpl = g_new0(QplData, 1);
> +    qpl->total_job_num = job_num;
> +    qpl->iaa_avail = check_iaa_avail();
> +    if (multifd_qpl_init_jobs(qpl, chan_id, errp) != 0) {
> +        g_free(qpl);
> +        return NULL;
> +    }
> +    qpl->zbuf = g_malloc0(job_size * job_num);
> +    qpl->zbuf_hdr = g_new0(uint32_t, job_num);
> +    return qpl;
> +}
> +
> +/**
> + * multifd_qpl_deinit: cleanup QplData structure
> + *
> + * Free jobs, comprssed buffers and QplData structure

compressed

> + *
> + * @qpl: pointer to the QplData structure
> + */
> +static void multifd_qpl_deinit(QplData *qpl)
> +{
> +    if (qpl != NULL) {
> +        multifd_qpl_free_jobs(qpl);
> +        g_free(qpl->zbuf_hdr);
> +        g_free(qpl->zbuf);
> +        g_free(qpl);
> +    }
> +}
> +
> +/**
> + * multifd_qpl_send_setup: setup send side
> + *
> + * Setup each channel with QPL compression.

s/each/the/

> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_send_setup(MultiFDSendParams *p, Error **errp)
> +{
> +    QplData *qpl;
> +
> +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> +    if (!qpl) {
> +        return -1;
> +    }
> +    p->compress_data = qpl;
> +
> +    /*
> +     * Each page will be compressed independently and sent using an IOV. The
> +     * additional two IOVs are used to store packet header and compressed data
> +     * length
> +     */
> +    p->iov = g_new0(struct iovec, p->page_count + 2);
> +    return 0;
> +}
> +
> +/**
> + * multifd_qpl_send_cleanup: cleanup send side
> + *
> + * Close the channel and return memory.
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> +{
> +    multifd_qpl_deinit(p->compress_data);
> +    p->compress_data = NULL;
> +    g_free(p->iov);
> +    p->iov = NULL;
> +}
> +
> +/**
> + * multifd_qpl_send_prepare: prepare data to be able to send
> + *
> + * Create a compressed buffer with all the pages that we are going to
> + * send.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
> +{
> +    /* Implement in next patch */
> +    return -1;
> +}
> +
> +/**
> + * multifd_qpl_recv_setup: setup receive side
> + *
> + * Create the compressed channel and buffer.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
> +{
> +    QplData *qpl;
> +
> +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> +    if (!qpl) {
> +        return -1;
> +    }
> +    p->compress_data = qpl;
> +    return 0;
> +}
> +
> +/**
> + * multifd_qpl_recv_cleanup: setup receive side
> + *
> + * Close the channel and return memory.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> +{
> +    multifd_qpl_deinit(p->compress_data);
> +    p->compress_data = NULL;
> +}
> +
> +/**
> + * multifd_qpl_recv: read the data from the channel into actual pages
> + *
> + * Read the compressed buffer, and uncompress it into the actual
> + * pages.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
> +{
> +    /* Implement in next patch */
> +    return -1;
> +}
> +
> +static MultiFDMethods multifd_qpl_ops = {
> +    .send_setup = multifd_qpl_send_setup,
> +    .send_cleanup = multifd_qpl_send_cleanup,
> +    .send_prepare = multifd_qpl_send_prepare,
> +    .recv_setup = multifd_qpl_recv_setup,
> +    .recv_cleanup = multifd_qpl_recv_cleanup,
> +    .recv = multifd_qpl_recv,
> +};
>  
>  static void multifd_qpl_register(void)
>  {
> -    /* noop */
> +    multifd_register_ops(MULTIFD_COMPRESSION_QPL, &multifd_qpl_ops);
>  }
>  
>  migration_init(multifd_qpl_register);


^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v6 5/7] migration/multifd: implement initialization of qpl compression
  2024-05-05 16:57 ` [PATCH v6 5/7] migration/multifd: implement initialization of qpl compression Yuan Liu
  2024-05-10 20:45   ` Fabiano Rosas
@ 2024-05-10 20:52   ` Fabiano Rosas
  2024-05-11 12:39     ` Liu, Yuan1
  1 sibling, 1 reply; 23+ messages in thread
From: Fabiano Rosas @ 2024-05-10 20:52 UTC (permalink / raw)
  To: Yuan Liu, peterx; +Cc: qemu-devel, yuan1.liu, nanhai.zou

Yuan Liu <yuan1.liu@intel.com> writes:

> the qpl initialization includes memory allocation for compressed
> data and the qpl job initialization.
>
> the qpl job initialization will check if the In-Memory Analytics
> Accelerator(IAA) device is available and use the IAA device first.
> If the platform does not have IAA device or the IAA device is not
> available, the qpl compression will fallback to the software path.
>
> Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> ---
>  migration/multifd-qpl.c | 272 +++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 271 insertions(+), 1 deletion(-)
>
> diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> index 056a68a060..89fa51091a 100644
> --- a/migration/multifd-qpl.c
> +++ b/migration/multifd-qpl.c
> @@ -9,12 +9,282 @@
>   * This work is licensed under the terms of the GNU GPL, version 2 or later.
>   * See the COPYING file in the top-level directory.
>   */
> +
>  #include "qemu/osdep.h"
>  #include "qemu/module.h"
> +#include "qapi/error.h"
> +#include "migration.h"

Where is this used? I think you only need qapi/qapi-types-migration.h

> +#include "multifd.h"
> +#include "qpl/qpl.h"
> +
> +typedef struct {
> +    qpl_job **job_array;
> +    /* the number of allocated jobs */
> +    uint32_t total_job_num;
> +    /* compressed data buffer */
> +    uint8_t *zbuf;
> +    /* the length of compressed data */
> +    uint32_t *zbuf_hdr;
> +    /* the status of IAA device */
> +    bool iaa_avail;
> +} QplData;
> +
> +/**
> + * check_iaa_avail: check if IAA device is available
> + *
> + * If the system does not have an IAA device, the IAA device is
> + * not enabled or the IAA work queue is not configured as a shared
> + * mode, the QPL hardware path initialization will fail.
> + *
> + * Returns true if IAA device is available, otherwise false.
> + */
> +static bool check_iaa_avail(void)
> +{
> +    qpl_job *job = NULL;
> +    uint32_t job_size = 0;
> +    qpl_path_t path = qpl_path_hardware;
> +
> +    if (qpl_get_job_size(path, &job_size) != QPL_STS_OK) {
> +        return false;
> +    }
> +    job = g_malloc0(job_size);
> +    if (qpl_init_job(path, job) != QPL_STS_OK) {
> +        g_free(job);
> +        return false;
> +    }
> +    g_free(job);
> +    return true;
> +}
> +
> +/**
> + * multifd_qpl_free_jobs: cleanup jobs
> + *
> + * Free all job resources.
> + *
> + * @qpl: pointer to the QplData structure
> + */
> +static void multifd_qpl_free_jobs(QplData *qpl)
> +{
> +    assert(qpl != NULL);
> +    for (int i = 0; i < qpl->total_job_num; i++) {
> +        qpl_fini_job(qpl->job_array[i]);
> +        g_free(qpl->job_array[i]);
> +        qpl->job_array[i] = NULL;
> +    }
> +    g_free(qpl->job_array);
> +    qpl->job_array = NULL;
> +}
> +
> +/**
> + * multifd_qpl_init_jobs: initialize jobs
> + *
> + * Initialize all jobs
> + *
> + * @qpl: pointer to the QplData structure
> + * @chan_id: multifd channel number
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_init_jobs(QplData *qpl, uint8_t chan_id, Error **errp)
> +{
> +    qpl_path_t path;
> +    qpl_status status;
> +    uint32_t job_size = 0;
> +    qpl_job *job = NULL;
> +
> +    path = qpl->iaa_avail ? qpl_path_hardware : qpl_path_software;
> +    status = qpl_get_job_size(path, &job_size);
> +    if (status != QPL_STS_OK) {
> +        error_setg(errp, "multifd: %u: qpl_get_job_size failed with error %d",
> +                   chan_id, status);
> +        return -1;
> +    }
> +    qpl->job_array = g_new0(qpl_job *, qpl->total_job_num);
> +    for (int i = 0; i < qpl->total_job_num; i++) {
> +        job = g_malloc0(job_size);
> +        status = qpl_init_job(path, job);
> +        if (status != QPL_STS_OK) {
> +            error_setg(errp, "multifd: %u: qpl_init_job failed with error %d",
> +                       chan_id, status);
> +            multifd_qpl_free_jobs(qpl);
> +            return -1;
> +        }
> +        qpl->job_array[i] = job;
> +    }
> +    return 0;
> +}
> +
> +/**
> + * multifd_qpl_init: initialize QplData structure
> + *
> + * Allocate and initialize a QplData structure
> + *
> + * Returns QplData pointer for success or NULL for error
> + *
> + * @job_num: pointer to the QplData structure
> + * @job_size: the buffer size of the job
> + * @chan_id: multifd channel number
> + * @errp: pointer to an error
> + */
> +static QplData *multifd_qpl_init(uint32_t job_num, uint32_t job_size,
> +                                 uint8_t chan_id, Error **errp)
> +{
> +    QplData *qpl;
> +
> +    qpl = g_new0(QplData, 1);
> +    qpl->total_job_num = job_num;
> +    qpl->iaa_avail = check_iaa_avail();
> +    if (multifd_qpl_init_jobs(qpl, chan_id, errp) != 0) {
> +        g_free(qpl);
> +        return NULL;
> +    }
> +    qpl->zbuf = g_malloc0(job_size * job_num);
> +    qpl->zbuf_hdr = g_new0(uint32_t, job_num);
> +    return qpl;
> +}
> +
> +/**
> + * multifd_qpl_deinit: cleanup QplData structure
> + *
> + * Free jobs, comprssed buffers and QplData structure
> + *
> + * @qpl: pointer to the QplData structure
> + */
> +static void multifd_qpl_deinit(QplData *qpl)
> +{
> +    if (qpl != NULL) {
> +        multifd_qpl_free_jobs(qpl);
> +        g_free(qpl->zbuf_hdr);
> +        g_free(qpl->zbuf);
> +        g_free(qpl);
> +    }
> +}
> +
> +/**
> + * multifd_qpl_send_setup: setup send side
> + *
> + * Setup each channel with QPL compression.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_send_setup(MultiFDSendParams *p, Error **errp)
> +{
> +    QplData *qpl;
> +
> +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> +    if (!qpl) {
> +        return -1;
> +    }
> +    p->compress_data = qpl;
> +
> +    /*
> +     * Each page will be compressed independently and sent using an IOV. The
> +     * additional two IOVs are used to store packet header and compressed data
> +     * length
> +     */
> +    p->iov = g_new0(struct iovec, p->page_count + 2);
> +    return 0;
> +}
> +
> +/**
> + * multifd_qpl_send_cleanup: cleanup send side
> + *
> + * Close the channel and return memory.
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> +{
> +    multifd_qpl_deinit(p->compress_data);
> +    p->compress_data = NULL;
> +    g_free(p->iov);
> +    p->iov = NULL;
> +}
> +
> +/**
> + * multifd_qpl_send_prepare: prepare data to be able to send
> + *
> + * Create a compressed buffer with all the pages that we are going to
> + * send.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
> +{
> +    /* Implement in next patch */
> +    return -1;
> +}
> +
> +/**
> + * multifd_qpl_recv_setup: setup receive side
> + *
> + * Create the compressed channel and buffer.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
> +{
> +    QplData *qpl;
> +
> +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> +    if (!qpl) {
> +        return -1;
> +    }
> +    p->compress_data = qpl;
> +    return 0;
> +}
> +
> +/**
> + * multifd_qpl_recv_cleanup: setup receive side
> + *
> + * Close the channel and return memory.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> +{
> +    multifd_qpl_deinit(p->compress_data);
> +    p->compress_data = NULL;
> +}
> +
> +/**
> + * multifd_qpl_recv: read the data from the channel into actual pages
> + *
> + * Read the compressed buffer, and uncompress it into the actual
> + * pages.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
> +{
> +    /* Implement in next patch */
> +    return -1;
> +}
> +
> +static MultiFDMethods multifd_qpl_ops = {
> +    .send_setup = multifd_qpl_send_setup,
> +    .send_cleanup = multifd_qpl_send_cleanup,
> +    .send_prepare = multifd_qpl_send_prepare,
> +    .recv_setup = multifd_qpl_recv_setup,
> +    .recv_cleanup = multifd_qpl_recv_cleanup,
> +    .recv = multifd_qpl_recv,
> +};
>  
>  static void multifd_qpl_register(void)
>  {
> -    /* noop */
> +    multifd_register_ops(MULTIFD_COMPRESSION_QPL, &multifd_qpl_ops);
>  }
>  
>  migration_init(multifd_qpl_register);


^ permalink raw reply	[flat|nested] 23+ messages in thread

* RE: [PATCH v6 5/7] migration/multifd: implement initialization of qpl compression
  2024-05-10 20:52   ` Fabiano Rosas
@ 2024-05-11 12:39     ` Liu, Yuan1
  0 siblings, 0 replies; 23+ messages in thread
From: Liu, Yuan1 @ 2024-05-11 12:39 UTC (permalink / raw)
  To: Fabiano Rosas, peterx; +Cc: qemu-devel, Zou, Nanhai

> -----Original Message-----
> From: Fabiano Rosas <farosas@suse.de>
> Sent: Saturday, May 11, 2024 4:53 AM
> To: Liu, Yuan1 <yuan1.liu@intel.com>; peterx@redhat.com
> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1.liu@intel.com>; Zou, Nanhai
> <nanhai.zou@intel.com>
> Subject: Re: [PATCH v6 5/7] migration/multifd: implement initialization of
> qpl compression
> 
> Yuan Liu <yuan1.liu@intel.com> writes:
> 
> > the qpl initialization includes memory allocation for compressed
> > data and the qpl job initialization.
> >
> > the qpl job initialization will check if the In-Memory Analytics
> > Accelerator(IAA) device is available and use the IAA device first.
> > If the platform does not have IAA device or the IAA device is not
> > available, the qpl compression will fallback to the software path.
> >
> > Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> > Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> > ---
> >  migration/multifd-qpl.c | 272 +++++++++++++++++++++++++++++++++++++++-
> >  1 file changed, 271 insertions(+), 1 deletion(-)
> >
> > diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> > index 056a68a060..89fa51091a 100644
> > --- a/migration/multifd-qpl.c
> > +++ b/migration/multifd-qpl.c
> > @@ -9,12 +9,282 @@
> >   * This work is licensed under the terms of the GNU GPL, version 2 or
> later.
> >   * See the COPYING file in the top-level directory.
> >   */
> > +
> >  #include "qemu/osdep.h"
> >  #include "qemu/module.h"
> > +#include "qapi/error.h"
> > +#include "migration.h"
> 
> Where is this used? I think you only need qapi/qapi-types-migration.h

Yes, the qapi/qapi-types-migration.h is enough, no need to include migration.h
I will fix this next version.

> > +#include "multifd.h"
> > +#include "qpl/qpl.h"
> > +
> > +typedef struct {
> > +    qpl_job **job_array;
> > +    /* the number of allocated jobs */
> > +    uint32_t total_job_num;
> > +    /* compressed data buffer */
> > +    uint8_t *zbuf;
> > +    /* the length of compressed data */
> > +    uint32_t *zbuf_hdr;
> > +    /* the status of IAA device */
> > +    bool iaa_avail;
> > +} QplData;
> > +
> > +/**
> > + * check_iaa_avail: check if IAA device is available
> > + *
> > + * If the system does not have an IAA device, the IAA device is
> > + * not enabled or the IAA work queue is not configured as a shared
> > + * mode, the QPL hardware path initialization will fail.
> > + *
> > + * Returns true if IAA device is available, otherwise false.
> > + */
> > +static bool check_iaa_avail(void)
> > +{
> > +    qpl_job *job = NULL;
> > +    uint32_t job_size = 0;
> > +    qpl_path_t path = qpl_path_hardware;
> > +
> > +    if (qpl_get_job_size(path, &job_size) != QPL_STS_OK) {
> > +        return false;
> > +    }
> > +    job = g_malloc0(job_size);
> > +    if (qpl_init_job(path, job) != QPL_STS_OK) {
> > +        g_free(job);
> > +        return false;
> > +    }
> > +    g_free(job);
> > +    return true;
> > +}
> > +
> > +/**
> > + * multifd_qpl_free_jobs: cleanup jobs
> > + *
> > + * Free all job resources.
> > + *
> > + * @qpl: pointer to the QplData structure
> > + */
> > +static void multifd_qpl_free_jobs(QplData *qpl)
> > +{
> > +    assert(qpl != NULL);
> > +    for (int i = 0; i < qpl->total_job_num; i++) {
> > +        qpl_fini_job(qpl->job_array[i]);
> > +        g_free(qpl->job_array[i]);
> > +        qpl->job_array[i] = NULL;
> > +    }
> > +    g_free(qpl->job_array);
> > +    qpl->job_array = NULL;
> > +}
> > +
> > +/**
> > + * multifd_qpl_init_jobs: initialize jobs
> > + *
> > + * Initialize all jobs
> > + *
> > + * @qpl: pointer to the QplData structure
> > + * @chan_id: multifd channel number
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_init_jobs(QplData *qpl, uint8_t chan_id, Error
> **errp)
> > +{
> > +    qpl_path_t path;
> > +    qpl_status status;
> > +    uint32_t job_size = 0;
> > +    qpl_job *job = NULL;
> > +
> > +    path = qpl->iaa_avail ? qpl_path_hardware : qpl_path_software;
> > +    status = qpl_get_job_size(path, &job_size);
> > +    if (status != QPL_STS_OK) {
> > +        error_setg(errp, "multifd: %u: qpl_get_job_size failed with
> error %d",
> > +                   chan_id, status);
> > +        return -1;
> > +    }
> > +    qpl->job_array = g_new0(qpl_job *, qpl->total_job_num);
> > +    for (int i = 0; i < qpl->total_job_num; i++) {
> > +        job = g_malloc0(job_size);
> > +        status = qpl_init_job(path, job);
> > +        if (status != QPL_STS_OK) {
> > +            error_setg(errp, "multifd: %u: qpl_init_job failed with
> error %d",
> > +                       chan_id, status);
> > +            multifd_qpl_free_jobs(qpl);
> > +            return -1;
> > +        }
> > +        qpl->job_array[i] = job;
> > +    }
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_init: initialize QplData structure
> > + *
> > + * Allocate and initialize a QplData structure
> > + *
> > + * Returns QplData pointer for success or NULL for error
> > + *
> > + * @job_num: pointer to the QplData structure
> > + * @job_size: the buffer size of the job
> > + * @chan_id: multifd channel number
> > + * @errp: pointer to an error
> > + */
> > +static QplData *multifd_qpl_init(uint32_t job_num, uint32_t job_size,
> > +                                 uint8_t chan_id, Error **errp)
> > +{
> > +    QplData *qpl;
> > +
> > +    qpl = g_new0(QplData, 1);
> > +    qpl->total_job_num = job_num;
> > +    qpl->iaa_avail = check_iaa_avail();
> > +    if (multifd_qpl_init_jobs(qpl, chan_id, errp) != 0) {
> > +        g_free(qpl);
> > +        return NULL;
> > +    }
> > +    qpl->zbuf = g_malloc0(job_size * job_num);
> > +    qpl->zbuf_hdr = g_new0(uint32_t, job_num);
> > +    return qpl;
> > +}
> > +
> > +/**
> > + * multifd_qpl_deinit: cleanup QplData structure
> > + *
> > + * Free jobs, comprssed buffers and QplData structure
> > + *
> > + * @qpl: pointer to the QplData structure
> > + */
> > +static void multifd_qpl_deinit(QplData *qpl)
> > +{
> > +    if (qpl != NULL) {
> > +        multifd_qpl_free_jobs(qpl);
> > +        g_free(qpl->zbuf_hdr);
> > +        g_free(qpl->zbuf);
> > +        g_free(qpl);
> > +    }
> > +}
> > +
> > +/**
> > + * multifd_qpl_send_setup: setup send side
> > + *
> > + * Setup each channel with QPL compression.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_send_setup(MultiFDSendParams *p, Error **errp)
> > +{
> > +    QplData *qpl;
> > +
> > +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> > +    if (!qpl) {
> > +        return -1;
> > +    }
> > +    p->compress_data = qpl;
> > +
> > +    /*
> > +     * Each page will be compressed independently and sent using an
> IOV. The
> > +     * additional two IOVs are used to store packet header and
> compressed data
> > +     * length
> > +     */
> > +    p->iov = g_new0(struct iovec, p->page_count + 2);
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_send_cleanup: cleanup send side
> > + *
> > + * Close the channel and return memory.
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error
> **errp)
> > +{
> > +    multifd_qpl_deinit(p->compress_data);
> > +    p->compress_data = NULL;
> > +    g_free(p->iov);
> > +    p->iov = NULL;
> > +}
> > +
> > +/**
> > + * multifd_qpl_send_prepare: prepare data to be able to send
> > + *
> > + * Create a compressed buffer with all the pages that we are going to
> > + * send.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
> > +{
> > +    /* Implement in next patch */
> > +    return -1;
> > +}
> > +
> > +/**
> > + * multifd_qpl_recv_setup: setup receive side
> > + *
> > + * Create the compressed channel and buffer.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
> > +{
> > +    QplData *qpl;
> > +
> > +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> > +    if (!qpl) {
> > +        return -1;
> > +    }
> > +    p->compress_data = qpl;
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_recv_cleanup: setup receive side
> > + *
> > + * Close the channel and return memory.
> > + *
> > + * @p: Params for the channel that we are using
> > + */
> > +static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> > +{
> > +    multifd_qpl_deinit(p->compress_data);
> > +    p->compress_data = NULL;
> > +}
> > +
> > +/**
> > + * multifd_qpl_recv: read the data from the channel into actual pages
> > + *
> > + * Read the compressed buffer, and uncompress it into the actual
> > + * pages.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
> > +{
> > +    /* Implement in next patch */
> > +    return -1;
> > +}
> > +
> > +static MultiFDMethods multifd_qpl_ops = {
> > +    .send_setup = multifd_qpl_send_setup,
> > +    .send_cleanup = multifd_qpl_send_cleanup,
> > +    .send_prepare = multifd_qpl_send_prepare,
> > +    .recv_setup = multifd_qpl_recv_setup,
> > +    .recv_cleanup = multifd_qpl_recv_cleanup,
> > +    .recv = multifd_qpl_recv,
> > +};
> >
> >  static void multifd_qpl_register(void)
> >  {
> > -    /* noop */
> > +    multifd_register_ops(MULTIFD_COMPRESSION_QPL, &multifd_qpl_ops);
> >  }
> >
> >  migration_init(multifd_qpl_register);


^ permalink raw reply	[flat|nested] 23+ messages in thread

* RE: [PATCH v6 5/7] migration/multifd: implement initialization of qpl compression
  2024-05-10 20:45   ` Fabiano Rosas
@ 2024-05-11 12:55     ` Liu, Yuan1
  0 siblings, 0 replies; 23+ messages in thread
From: Liu, Yuan1 @ 2024-05-11 12:55 UTC (permalink / raw)
  To: Fabiano Rosas, peterx; +Cc: qemu-devel, Zou, Nanhai

> -----Original Message-----
> From: Fabiano Rosas <farosas@suse.de>
> Sent: Saturday, May 11, 2024 4:45 AM
> To: Liu, Yuan1 <yuan1.liu@intel.com>; peterx@redhat.com
> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1.liu@intel.com>; Zou, Nanhai
> <nanhai.zou@intel.com>
> Subject: Re: [PATCH v6 5/7] migration/multifd: implement initialization of
> qpl compression
> 
> Yuan Liu <yuan1.liu@intel.com> writes:
> 
> > the qpl initialization includes memory allocation for compressed
> > data and the qpl job initialization.
> >
> > the qpl job initialization will check if the In-Memory Analytics
> > Accelerator(IAA) device is available and use the IAA device first.
> > If the platform does not have IAA device or the IAA device is not
> > available, the qpl compression will fallback to the software path.
> >
> > Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> > Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> 
> Looks good, just some nits below.
> 
> > ---
> >  migration/multifd-qpl.c | 272 +++++++++++++++++++++++++++++++++++++++-
> >  1 file changed, 271 insertions(+), 1 deletion(-)
> >
> > diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> > index 056a68a060..89fa51091a 100644
> > --- a/migration/multifd-qpl.c
> > +++ b/migration/multifd-qpl.c
> > @@ -9,12 +9,282 @@
> >   * This work is licensed under the terms of the GNU GPL, version 2 or
> later.
> >   * See the COPYING file in the top-level directory.
> >   */
> > +
> >  #include "qemu/osdep.h"
> >  #include "qemu/module.h"
> > +#include "qapi/error.h"
> > +#include "migration.h"
> > +#include "multifd.h"
> > +#include "qpl/qpl.h"
> > +
> > +typedef struct {
> > +    qpl_job **job_array;
> > +    /* the number of allocated jobs */
> > +    uint32_t total_job_num;
> > +    /* compressed data buffer */
> > +    uint8_t *zbuf;
> > +    /* the length of compressed data */
> 
> array of lenghts

Thanks for the comment, I will improve the comment next version.

> > +    uint32_t *zbuf_hdr;
> 
> Why the _hdr suffix if the lengths are the only data stored here?

This zbuf_hdr is confusing, I will use lens instead of zbuf_hdr next version

> > +    /* the status of IAA device */
> > +    bool iaa_avail;
> > +} QplData;
> > +
> > +/**
> > + * check_iaa_avail: check if IAA device is available
> > + *
> > + * If the system does not have an IAA device, the IAA device is
> > + * not enabled or the IAA work queue is not configured as a shared
> > + * mode, the QPL hardware path initialization will fail.
> > + *
> > + * Returns true if IAA device is available, otherwise false.
> > + */
> > +static bool check_iaa_avail(void)
> > +{
> > +    qpl_job *job = NULL;
> > +    uint32_t job_size = 0;
> > +    qpl_path_t path = qpl_path_hardware;
> > +
> > +    if (qpl_get_job_size(path, &job_size) != QPL_STS_OK) {
> > +        return false;
> > +    }
> > +    job = g_malloc0(job_size);
> > +    if (qpl_init_job(path, job) != QPL_STS_OK) {
> > +        g_free(job);
> > +        return false;
> > +    }
> > +    g_free(job);
> > +    return true;
> > +}
> > +
> > +/**
> > + * multifd_qpl_free_jobs: cleanup jobs
> > + *
> > + * Free all job resources.
> > + *
> > + * @qpl: pointer to the QplData structure
> > + */
> > +static void multifd_qpl_free_jobs(QplData *qpl)
> > +{
> > +    assert(qpl != NULL);
> > +    for (int i = 0; i < qpl->total_job_num; i++) {
> > +        qpl_fini_job(qpl->job_array[i]);
> > +        g_free(qpl->job_array[i]);
> > +        qpl->job_array[i] = NULL;
> > +    }
> > +    g_free(qpl->job_array);
> > +    qpl->job_array = NULL;
> > +}
> > +
> > +/**
> > + * multifd_qpl_init_jobs: initialize jobs
> > + *
> > + * Initialize all jobs
> > + *
> > + * @qpl: pointer to the QplData structure
> > + * @chan_id: multifd channel number
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_init_jobs(QplData *qpl, uint8_t chan_id, Error
> **errp)
> > +{
> > +    qpl_path_t path;
> > +    qpl_status status;
> > +    uint32_t job_size = 0;
> > +    qpl_job *job = NULL;
> > +
> > +    path = qpl->iaa_avail ? qpl_path_hardware : qpl_path_software;
> > +    status = qpl_get_job_size(path, &job_size);
> > +    if (status != QPL_STS_OK) {
> > +        error_setg(errp, "multifd: %u: qpl_get_job_size failed with
> error %d",
> > +                   chan_id, status);
> > +        return -1;
> > +    }
> > +    qpl->job_array = g_new0(qpl_job *, qpl->total_job_num);
> > +    for (int i = 0; i < qpl->total_job_num; i++) {
> > +        job = g_malloc0(job_size);
> > +        status = qpl_init_job(path, job);
> > +        if (status != QPL_STS_OK) {
> > +            error_setg(errp, "multifd: %u: qpl_init_job failed with
> error %d",
> > +                       chan_id, status);
> > +            multifd_qpl_free_jobs(qpl);
> > +            return -1;
> > +        }
> > +        qpl->job_array[i] = job;
> > +    }
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_init: initialize QplData structure
> > + *
> > + * Allocate and initialize a QplData structure
> > + *
> > + * Returns QplData pointer for success or NULL for error
> > + *
> > + * @job_num: pointer to the QplData structure
> 
> This needs updating^

Yes, thanks for the comment, I will fix it next version

> > + * @job_size: the buffer size of the job
> > + * @chan_id: multifd channel number
> > + * @errp: pointer to an error
> > + */
> > +static QplData *multifd_qpl_init(uint32_t job_num, uint32_t job_size,
> > +                                 uint8_t chan_id, Error **errp)
> > +{
> > +    QplData *qpl;
> > +
> > +    qpl = g_new0(QplData, 1);
> > +    qpl->total_job_num = job_num;
> > +    qpl->iaa_avail = check_iaa_avail();
> > +    if (multifd_qpl_init_jobs(qpl, chan_id, errp) != 0) {
> > +        g_free(qpl);
> > +        return NULL;
> > +    }
> > +    qpl->zbuf = g_malloc0(job_size * job_num);
> > +    qpl->zbuf_hdr = g_new0(uint32_t, job_num);
> > +    return qpl;
> > +}
> > +
> > +/**
> > + * multifd_qpl_deinit: cleanup QplData structure
> > + *
> > + * Free jobs, comprssed buffers and QplData structure
> 
> compressed

Thanks for the comment, I will fix the typo next version

> > + *
> > + * @qpl: pointer to the QplData structure
> > + */
> > +static void multifd_qpl_deinit(QplData *qpl)
> > +{
> > +    if (qpl != NULL) {
> > +        multifd_qpl_free_jobs(qpl);
> > +        g_free(qpl->zbuf_hdr);
> > +        g_free(qpl->zbuf);
> > +        g_free(qpl);
> > +    }
> > +}
> > +
> > +/**
> > + * multifd_qpl_send_setup: setup send side
> > + *
> > + * Setup each channel with QPL compression.
> 
> s/each/the/

Sure, I will refine the comment next version

> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_send_setup(MultiFDSendParams *p, Error **errp)
> > +{
> > +    QplData *qpl;
> > +
> > +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> > +    if (!qpl) {
> > +        return -1;
> > +    }
> > +    p->compress_data = qpl;
> > +
> > +    /*
> > +     * Each page will be compressed independently and sent using an
> IOV. The
> > +     * additional two IOVs are used to store packet header and
> compressed data
> > +     * length
> > +     */
> > +    p->iov = g_new0(struct iovec, p->page_count + 2);
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_send_cleanup: cleanup send side
> > + *
> > + * Close the channel and return memory.
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error
> **errp)
> > +{
> > +    multifd_qpl_deinit(p->compress_data);
> > +    p->compress_data = NULL;
> > +    g_free(p->iov);
> > +    p->iov = NULL;
> > +}
> > +
> > +/**
> > + * multifd_qpl_send_prepare: prepare data to be able to send
> > + *
> > + * Create a compressed buffer with all the pages that we are going to
> > + * send.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
> > +{
> > +    /* Implement in next patch */
> > +    return -1;
> > +}
> > +
> > +/**
> > + * multifd_qpl_recv_setup: setup receive side
> > + *
> > + * Create the compressed channel and buffer.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
> > +{
> > +    QplData *qpl;
> > +
> > +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> > +    if (!qpl) {
> > +        return -1;
> > +    }
> > +    p->compress_data = qpl;
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_recv_cleanup: setup receive side
> > + *
> > + * Close the channel and return memory.
> > + *
> > + * @p: Params for the channel that we are using
> > + */
> > +static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> > +{
> > +    multifd_qpl_deinit(p->compress_data);
> > +    p->compress_data = NULL;
> > +}
> > +
> > +/**
> > + * multifd_qpl_recv: read the data from the channel into actual pages
> > + *
> > + * Read the compressed buffer, and uncompress it into the actual
> > + * pages.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
> > +{
> > +    /* Implement in next patch */
> > +    return -1;
> > +}
> > +
> > +static MultiFDMethods multifd_qpl_ops = {
> > +    .send_setup = multifd_qpl_send_setup,
> > +    .send_cleanup = multifd_qpl_send_cleanup,
> > +    .send_prepare = multifd_qpl_send_prepare,
> > +    .recv_setup = multifd_qpl_recv_setup,
> > +    .recv_cleanup = multifd_qpl_recv_cleanup,
> > +    .recv = multifd_qpl_recv,
> > +};
> >
> >  static void multifd_qpl_register(void)
> >  {
> > -    /* noop */
> > +    multifd_register_ops(MULTIFD_COMPRESSION_QPL, &multifd_qpl_ops);
> >  }
> >
> >  migration_init(multifd_qpl_register);


^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v6 6/7] migration/multifd: implement qpl compression and decompression
  2024-05-05 16:57 ` [PATCH v6 6/7] migration/multifd: implement qpl compression and decompression Yuan Liu
@ 2024-05-13 15:13   ` Fabiano Rosas
  2024-05-14  6:30     ` Liu, Yuan1
  0 siblings, 1 reply; 23+ messages in thread
From: Fabiano Rosas @ 2024-05-13 15:13 UTC (permalink / raw)
  To: Yuan Liu, peterx; +Cc: qemu-devel, yuan1.liu, nanhai.zou

Yuan Liu <yuan1.liu@intel.com> writes:

> each qpl job is used to (de)compress a normal page and it can
> be processed independently by the IAA hardware. All qpl jobs
> are submitted to the hardware at once, and wait for all jobs
> completion. If hardware path(IAA) is not available, use software
> for compression and decompression.
>
> Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> ---
>  migration/multifd-qpl.c | 284 +++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 280 insertions(+), 4 deletions(-)
>
> diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> index 89fa51091a..9a1fddbdd0 100644
> --- a/migration/multifd-qpl.c
> +++ b/migration/multifd-qpl.c
> @@ -13,6 +13,7 @@
>  #include "qemu/osdep.h"
>  #include "qemu/module.h"
>  #include "qapi/error.h"
> +#include "exec/ramblock.h"
>  #include "migration.h"
>  #include "multifd.h"
>  #include "qpl/qpl.h"
> @@ -204,6 +205,139 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
>      p->iov = NULL;
>  }
>  
> +/**
> + * multifd_qpl_prepare_job: prepare a compression or decompression job
> + *
> + * Prepare a compression or decompression job and configure job attributes
> + * including job compression level and flags.
> + *
> + * @job: pointer to the QplData structure

qpl_job structure

> + * @is_compression: compression or decompression indication
> + * @input: pointer to the input data buffer
> + * @input_len: the length of the input data
> + * @output: pointer to the output data buffer
> + * @output_len: the size of the output data buffer
> + */
> +static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
> +                                    uint8_t *input, uint32_t input_len,
> +                                    uint8_t *output, uint32_t output_len)
> +{
> +    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
> +    job->next_in_ptr = input;
> +    job->next_out_ptr = output;
> +    job->available_in = input_len;
> +    job->available_out = output_len;
> +    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
> +    /* only supports one compression level */
> +    job->level = 1;
> +}
> +
> +/**
> + * multifd_qpl_build_packet: build a qpl compressed data packet
> + *
> + * The qpl compressed data packet consists of two parts, one part stores
> + * the compressed length of each page, and the other part is the compressed
> + * data of each page. The zbuf_hdr stores the compressed length of all pages,
> + * and use a separate IOV to store the compressed data of each page.
> + *
> + * @qpl: pointer to the QplData structure
> + * @p: Params for the channel that we are using
> + * @idx: The index of the compressed length array
> + * @addr: pointer to the compressed data
> + * @len: The length of the compressed data
> + */
> +static void multifd_qpl_build_packet(QplData *qpl, MultiFDSendParams *p,
> +                                     uint32_t idx, uint8_t *addr, uint32_t len)
> +{
> +    qpl->zbuf_hdr[idx] = cpu_to_be32(len);
> +    p->iov[p->iovs_num].iov_base = addr;
> +    p->iov[p->iovs_num].iov_len = len;
> +    p->iovs_num++;
> +    p->next_packet_size += len;
> +}
> +
> +/**
> + * multifd_qpl_compress_pages: compress normal pages
> + *
> + * Each normal page will be compressed independently, and the compression jobs
> + * will be submitted to the IAA hardware in non-blocking mode, waiting for all
> + * jobs to be completed and filling the compressed length and data into the
> + * sending IOVs. If IAA device is not available, the software path is used.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_compress_pages(MultiFDSendParams *p, Error **errp)
> +{
> +    qpl_status status;
> +    QplData *qpl = p->compress_data;
> +    MultiFDPages_t *pages = p->pages;
> +    uint8_t *zbuf = qpl->zbuf;
> +    uint8_t *host = pages->block->host;
> +    uint32_t job_num = pages->normal_num;

A bit misleading because job_num is used in the previous patch as a
synonym for page_count. We could change the previous patch to:
multifd_qpl_init(uint32_t page_count, ...

> +    qpl_job *job = NULL;
> +
> +    assert(job_num <= qpl->total_job_num);
> +    /* submit all compression jobs */
> +    for (int i = 0; i < job_num; i++) {
> +        job = qpl->job_array[i];
> +        multifd_qpl_prepare_job(job, true, host + pages->offset[i],
> +                                p->page_size, zbuf, p->page_size - 1);

Isn't the output buffer size == page size, why the -1?

> +        /* if hardware path(IAA) is unavailable, call the software path */

If we're doing the fallback automatically, isn't that what qpl_path_auto
does already? What's the difference betweeen the two approaches?

> +        if (!qpl->iaa_avail) {

This function got a bit convoluted, it's probably worth a check at the
start and a branch to different multifd_qpl_compress_pages_slow()
routine altogether.

> +            status = qpl_execute_job(job);
> +            if (status == QPL_STS_OK) {
> +                multifd_qpl_build_packet(qpl, p, i, zbuf, job->total_out);
> +            } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
> +                /* compressed length exceeds page size, send page directly */
> +                multifd_qpl_build_packet(qpl, p, i, host + pages->offset[i],
> +                                         p->page_size);
> +            } else {
> +                error_setg(errp, "multifd %u: qpl_execute_job error %d",
> +                           p->id, status);
> +                return -1;
> +            }
> +            zbuf += p->page_size;
> +            continue;
> +        }
> +retry:
> +        status = qpl_submit_job(job);
> +        if (status == QPL_STS_OK) {
> +            zbuf += p->page_size;
> +        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
> +            goto retry;

A retry count here would be nice.

> +        } else {
> +            error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
> +                       p->id, status);
> +            return -1;
> +        }
> +    }
> +    if (!qpl->iaa_avail) {
> +        goto done;
> +    }
> +    /* wait all jobs to complete for hardware(IAA) path */
> +    for (int i = 0; i < job_num; i++) {
> +        job = qpl->job_array[i];
> +        status = qpl_wait_job(job);
> +        if (status == QPL_STS_OK) {
> +            multifd_qpl_build_packet(qpl, p, i, qpl->zbuf + (p->page_size * i),
> +                                     job->total_out);
> +        } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
> +            /* compressed data length exceeds page size, send page directly */
> +            multifd_qpl_build_packet(qpl, p, i, host + pages->offset[i],
> +                                     p->page_size);
> +        } else {
> +            error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
> +                       p->id, status);
> +            return -1;
> +        }
> +    }
> +done:
> +    return 0;
> +}
> +
>  /**
>   * multifd_qpl_send_prepare: prepare data to be able to send
>   *
> @@ -217,8 +351,28 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
>   */
>  static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
>  {
> -    /* Implement in next patch */
> -    return -1;
> +    QplData *qpl = p->compress_data;
> +    uint32_t hdr_size;
> +
> +    if (!multifd_send_prepare_common(p)) {
> +        goto out;
> +    }
> +
> +    assert(p->pages->normal_num <= qpl->total_job_num);
> +    hdr_size = p->pages->normal_num * sizeof(uint32_t);
> +    /* prepare the header that stores the lengths of all compressed data */
> +    p->iov[1].iov_base = (uint8_t *) qpl->zbuf_hdr;
> +    p->iov[1].iov_len = hdr_size;

Better use p->iovs_num here in case we ever decide to add more stuff to
the front of the array.

> +    p->iovs_num++;
> +    p->next_packet_size += hdr_size;

Here's the first time we're setting this value, right? So just a regular
attribution(=).

> +    if (multifd_qpl_compress_pages(p, errp) != 0) {
> +        return -1;
> +    }
> +
> +out:
> +    p->flags |= MULTIFD_FLAG_QPL;
> +    multifd_send_fill_packet(p);
> +    return 0;
>  }
>  
>  /**
> @@ -256,6 +410,88 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
>      p->compress_data = NULL;
>  }
>  
> +/**
> + * multifd_qpl_decompress_pages: decompress normal pages
> + *
> + * Each compressed page will be decompressed independently, and the
> + * decompression jobs will be submitted to the IAA hardware in non-blocking
> + * mode, waiting for all jobs to be completed and loading the decompressed
> + * data into guest memory. If IAA device is not available, the software path
> + * is used.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_decompress_pages(MultiFDRecvParams *p, Error **errp)
> +{
> +    qpl_status status;
> +    qpl_job *job;
> +    QplData *qpl = p->compress_data;
> +    uint32_t job_num = p->normal_num;
> +    uint32_t off = 0;
> +
> +    assert(job_num <= qpl->total_job_num);
> +    /* submit all decompression jobs */
> +    for (int i = 0; i < job_num; i++) {
> +        /* if the data size is the same as the page size, load it directly */
> +        if (qpl->zbuf_hdr[i] == p->page_size) {
> +            memcpy(p->host + p->normal[i], qpl->zbuf + off, p->page_size);
> +            off += p->page_size;
> +            continue;
> +        }
> +        job = qpl->job_array[i];
> +        multifd_qpl_prepare_job(job, false, qpl->zbuf + off, qpl->zbuf_hdr[i],
> +                                p->host + p->normal[i], p->page_size);
> +        /* if hardware path(IAA) is unavailable, call the software path */
> +        if (!qpl->iaa_avail) {
> +            status = qpl_execute_job(job);
> +            if (status == QPL_STS_OK) {
> +                off += qpl->zbuf_hdr[i];
> +                continue;
> +            }
> +            error_setg(errp, "multifd %u: qpl_execute_job failed with error %d",
> +                       p->id, status);
> +            return -1;
> +        }
> +retry:
> +        status = qpl_submit_job(job);
> +        if (status == QPL_STS_OK) {
> +            off += qpl->zbuf_hdr[i];
> +        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
> +            goto retry;
> +        } else {
> +            error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
> +                       p->id, status);
> +            return -1;
> +        }
> +    }
> +    if (!qpl->iaa_avail) {
> +        goto done;
> +    }
> +    /* wait all jobs to complete for hardware(IAA) path */
> +    for (int i = 0; i < job_num; i++) {
> +        if (qpl->zbuf_hdr[i] == p->page_size) {
> +            continue;
> +        }
> +        job = qpl->job_array[i];
> +        status = qpl_wait_job(job);
> +        if (status != QPL_STS_OK) {
> +            error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
> +                       p->id, status);
> +            return -1;
> +        }
> +        if (job->total_out != p->page_size) {
> +            error_setg(errp, "multifd %u: decompressed len %u, expected len %u",
> +                       p->id, job->total_out, p->page_size);
> +            return -1;
> +        }
> +    }
> +done:
> +    return 0;
> +}
> +
>  /**
>   * multifd_qpl_recv: read the data from the channel into actual pages
>   *
> @@ -269,8 +505,48 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
>   */
>  static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
>  {
> -    /* Implement in next patch */
> -    return -1;
> +    QplData *qpl = p->compress_data;
> +    uint32_t in_size = p->next_packet_size;
> +    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> +    uint32_t hdr_len = p->normal_num * sizeof(uint32_t);
> +    uint32_t data_len = 0;
> +    int ret;
> +
> +    if (flags != MULTIFD_FLAG_QPL) {
> +        error_setg(errp, "multifd %u: flags received %x flags expected %x",
> +                   p->id, flags, MULTIFD_FLAG_QPL);
> +        return -1;
> +    }
> +    multifd_recv_zero_page_process(p);
> +    if (!p->normal_num) {
> +        assert(in_size == 0);
> +        return 0;
> +    }
> +
> +    /* read compressed data lengths */
> +    assert(hdr_len < in_size);
> +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf_hdr, hdr_len, errp);
> +    if (ret != 0) {
> +        return ret;
> +    }
> +    assert(p->normal_num <= qpl->total_job_num);

I'm still in doubt whether we should use p->page_count directly all
over. It's nice to move the concept into the QPL domain space, but it
makes less sense in these functions that take MultiFD*Params as
argument.

> +    for (int i = 0; i < p->normal_num; i++) {
> +        qpl->zbuf_hdr[i] = be32_to_cpu(qpl->zbuf_hdr[i]);
> +        data_len += qpl->zbuf_hdr[i];
> +        assert(qpl->zbuf_hdr[i] <= p->page_size);
> +    }
> +
> +    /* read compressed data */
> +    assert(in_size == hdr_len + data_len);
> +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, data_len, errp);
> +    if (ret != 0) {
> +        return ret;
> +    }
> +
> +    if (multifd_qpl_decompress_pages(p, errp) != 0) {
> +        return -1;
> +    }
> +    return 0;
>  }
>  
>  static MultiFDMethods multifd_qpl_ops = {


^ permalink raw reply	[flat|nested] 23+ messages in thread

* RE: [PATCH v6 6/7] migration/multifd: implement qpl compression and decompression
  2024-05-13 15:13   ` Fabiano Rosas
@ 2024-05-14  6:30     ` Liu, Yuan1
  2024-05-14 14:08       ` Fabiano Rosas
  0 siblings, 1 reply; 23+ messages in thread
From: Liu, Yuan1 @ 2024-05-14  6:30 UTC (permalink / raw)
  To: Fabiano Rosas, peterx; +Cc: qemu-devel, Zou, Nanhai

> -----Original Message-----
> From: Fabiano Rosas <farosas@suse.de>
> Sent: Monday, May 13, 2024 11:14 PM
> To: Liu, Yuan1 <yuan1.liu@intel.com>; peterx@redhat.com
> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1.liu@intel.com>; Zou, Nanhai
> <nanhai.zou@intel.com>
> Subject: Re: [PATCH v6 6/7] migration/multifd: implement qpl compression
> and decompression
> 
> Yuan Liu <yuan1.liu@intel.com> writes:
> 
> > each qpl job is used to (de)compress a normal page and it can
> > be processed independently by the IAA hardware. All qpl jobs
> > are submitted to the hardware at once, and wait for all jobs
> > completion. If hardware path(IAA) is not available, use software
> > for compression and decompression.
> >
> > Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> > Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> > ---
> >  migration/multifd-qpl.c | 284 +++++++++++++++++++++++++++++++++++++++-
> >  1 file changed, 280 insertions(+), 4 deletions(-)
> >
> > diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> > index 89fa51091a..9a1fddbdd0 100644
> > --- a/migration/multifd-qpl.c
> > +++ b/migration/multifd-qpl.c
> > @@ -13,6 +13,7 @@
> >  #include "qemu/osdep.h"
> >  #include "qemu/module.h"
> >  #include "qapi/error.h"
> > +#include "exec/ramblock.h"
> >  #include "migration.h"
> >  #include "multifd.h"
> >  #include "qpl/qpl.h"
> > @@ -204,6 +205,139 @@ static void
> multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> >      p->iov = NULL;
> >  }
> >
> > +/**
> > + * multifd_qpl_prepare_job: prepare a compression or decompression job
> > + *
> > + * Prepare a compression or decompression job and configure job
> attributes
> > + * including job compression level and flags.
> > + *
> > + * @job: pointer to the QplData structure
> 
> qpl_job structure

Thanks for the comment, I will fix this next version.

> > + * @is_compression: compression or decompression indication
> > + * @input: pointer to the input data buffer
> > + * @input_len: the length of the input data
> > + * @output: pointer to the output data buffer
> > + * @output_len: the size of the output data buffer
> > + */
> > +static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
> > +                                    uint8_t *input, uint32_t input_len,
> > +                                    uint8_t *output, uint32_t
> output_len)
> > +{
> > +    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
> > +    job->next_in_ptr = input;
> > +    job->next_out_ptr = output;
> > +    job->available_in = input_len;
> > +    job->available_out = output_len;
> > +    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
> > +    /* only supports one compression level */
> > +    job->level = 1;
> > +}
> > +
> > +/**
> > + * multifd_qpl_build_packet: build a qpl compressed data packet
> > + *
> > + * The qpl compressed data packet consists of two parts, one part
> stores
> > + * the compressed length of each page, and the other part is the
> compressed
> > + * data of each page. The zbuf_hdr stores the compressed length of all
> pages,
> > + * and use a separate IOV to store the compressed data of each page.
> > + *
> > + * @qpl: pointer to the QplData structure
> > + * @p: Params for the channel that we are using
> > + * @idx: The index of the compressed length array
> > + * @addr: pointer to the compressed data
> > + * @len: The length of the compressed data
> > + */
> > +static void multifd_qpl_build_packet(QplData *qpl, MultiFDSendParams
> *p,
> > +                                     uint32_t idx, uint8_t *addr,
> uint32_t len)
> > +{
> > +    qpl->zbuf_hdr[idx] = cpu_to_be32(len);
> > +    p->iov[p->iovs_num].iov_base = addr;
> > +    p->iov[p->iovs_num].iov_len = len;
> > +    p->iovs_num++;
> > +    p->next_packet_size += len;
> > +}
> > +
> > +/**
> > + * multifd_qpl_compress_pages: compress normal pages
> > + *
> > + * Each normal page will be compressed independently, and the
> compression jobs
> > + * will be submitted to the IAA hardware in non-blocking mode, waiting
> for all
> > + * jobs to be completed and filling the compressed length and data into
> the
> > + * sending IOVs. If IAA device is not available, the software path is
> used.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_compress_pages(MultiFDSendParams *p, Error
> **errp)
> > +{
> > +    qpl_status status;
> > +    QplData *qpl = p->compress_data;
> > +    MultiFDPages_t *pages = p->pages;
> > +    uint8_t *zbuf = qpl->zbuf;
> > +    uint8_t *host = pages->block->host;
> > +    uint32_t job_num = pages->normal_num;
> 
> A bit misleading because job_num is used in the previous patch as a
> synonym for page_count. We could change the previous patch to:
> multifd_qpl_init(uint32_t page_count, ...

Yes, I will replace job_num with page_count for multifd_qpl_init next version.

> > +    qpl_job *job = NULL;
> > +
> > +    assert(job_num <= qpl->total_job_num);
> > +    /* submit all compression jobs */
> > +    for (int i = 0; i < job_num; i++) {
> > +        job = qpl->job_array[i];
> > +        multifd_qpl_prepare_job(job, true, host + pages->offset[i],
> > +                                p->page_size, zbuf, p->page_size - 1);
> 
> Isn't the output buffer size == page size, why the -1?

I think the compressed data should be smaller than a normal page. 
If the compressed data size is equal to a normal page, send the original 
page directly to avoid decompression operation.

If the output buffer size is set to p->page_size, the compressed data size 
may be greater than or equal to a normal page, then there are two conditions for
sending a normal page, one is job status == QPL_STS_OK and job->total_out == p->page_size,
another is job status == QPL_STS_MORE_OUTPUT_NEEDED.

If the output buffer size is p->page_size - 1, only check QPL_STS_MORE_OUTPUT_NEEDED is ok. 
I will add comments for this in the next version.

> > +        /* if hardware path(IAA) is unavailable, call the software path
> */
> 
> If we're doing the fallback automatically, isn't that what qpl_path_auto
> does already? What's the difference between the two approaches?

The qpl_path_auto feature currently has some limitations.

Limitation 1: it does not detect IAA device status before job submission, which means
I need to use qpl_init_job(qpl_path_hardware, ...) first to make sure the hardware path
is available, then qpl_path_auto can work for software fallback when submitting a job to 
the IAA fails.

Limitation 2: The job submission API has changed
For the qpl_path_hardware, the qpl_submit_job is a non-blocking function, only submitting
a job to IAA work queues, use qpl_wait_job to get the job result.

For qpl_path_software/auto, the qpl_submit_job is a blocking function and returns the job
result directly,  qpl_wait_job can't get the job result.

In short, the qpl_submit_job/qpl_wait_job APIs do not well support the qpl_path_auto feature.

> > +        if (!qpl->iaa_avail) {
> 
> This function got a bit convoluted, it's probably worth a check at the
> start and a branch to different multifd_qpl_compress_pages_slow()
> routine altogether.

I agree that using multifd_qpl_compress_pages_slow is a better choice.

In my previous thoughts, the QPl software path(or slow path) is only used
for the unit test and I did not consider fallback to the software path when
migration start because the QPL software path has no advantage over zstd.
So when the work queue is full, always try to resubmit the job instead of 
fallback software path.

I now realize that when IAA hardware throughput is the bottleneck(the work queue is full),
switching to software may also increase performance.

I will implement the automatically falling back to the software when IAA work
queue is full into the next version and update the performance data. With the
increase in mutlfd migration threads, the performance will be better than now.
Currently, 4 threads will reach the IAA device throughput bottleneck, then increasing
The number of threads does not increase any performance.

> > +            status = qpl_execute_job(job);
> > +            if (status == QPL_STS_OK) {
> > +                multifd_qpl_build_packet(qpl, p, i, zbuf, job-
> >total_out);
> > +            } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
> > +                /* compressed length exceeds page size, send page
> directly */
> > +                multifd_qpl_build_packet(qpl, p, i, host + pages-
> >offset[i],
> > +                                         p->page_size);
> > +            } else {
> > +                error_setg(errp, "multifd %u: qpl_execute_job
> error %d",
> > +                           p->id, status);
> > +                return -1;
> > +            }
> > +            zbuf += p->page_size;
> > +            continue;
> > +        }
> > +retry:
> > +        status = qpl_submit_job(job);
> > +        if (status == QPL_STS_OK) {
> > +            zbuf += p->page_size;
> > +        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
> > +            goto retry;
> 
> A retry count here would be nice.

As the previous comment, I will switch to multifd_qpl_compress_pages_slow
When the work queue is full, I will give a performance comparison between
hardware only and software fallback next version.

> > +        } else {
> > +            error_setg(errp, "multifd %u: qpl_submit_job failed with
> error %d",
> > +                       p->id, status);
> > +            return -1;
> > +        }
> > +    }
> > +    if (!qpl->iaa_avail) {
> > +        goto done;
> > +    }
> > +    /* wait all jobs to complete for hardware(IAA) path */
> > +    for (int i = 0; i < job_num; i++) {
> > +        job = qpl->job_array[i];
> > +        status = qpl_wait_job(job);
> > +        if (status == QPL_STS_OK) {
> > +            multifd_qpl_build_packet(qpl, p, i, qpl->zbuf + (p-
> >page_size * i),
> > +                                     job->total_out);
> > +        } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
> > +            /* compressed data length exceeds page size, send page
> directly */
> > +            multifd_qpl_build_packet(qpl, p, i, host + pages-
> >offset[i],
> > +                                     p->page_size);
> > +        } else {
> > +            error_setg(errp, "multifd %u: qpl_wait_job failed with
> error %d",
> > +                       p->id, status);
> > +            return -1;
> > +        }
> > +    }
> > +done:
> > +    return 0;
> > +}
> > +
> >  /**
> >   * multifd_qpl_send_prepare: prepare data to be able to send
> >   *
> > @@ -217,8 +351,28 @@ static void
> multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> >   */
> >  static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
> >  {
> > -    /* Implement in next patch */
> > -    return -1;
> > +    QplData *qpl = p->compress_data;
> > +    uint32_t hdr_size;
> > +
> > +    if (!multifd_send_prepare_common(p)) {
> > +        goto out;
> > +    }
> > +
> > +    assert(p->pages->normal_num <= qpl->total_job_num);
> > +    hdr_size = p->pages->normal_num * sizeof(uint32_t);
> > +    /* prepare the header that stores the lengths of all compressed
> data */
> > +    p->iov[1].iov_base = (uint8_t *) qpl->zbuf_hdr;
> > +    p->iov[1].iov_len = hdr_size;
> 
> Better use p->iovs_num here in case we ever decide to add more stuff to
> the front of the array.

Get it, I will fix this next version.
 
> > +    p->iovs_num++;
> > +    p->next_packet_size += hdr_size;
> 
> Here's the first time we're setting this value, right? So just a regular
> attribution(=).

Yes, I will fix this next version.

> > +    if (multifd_qpl_compress_pages(p, errp) != 0) {
> > +        return -1;
> > +    }
> > +
> > +out:
> > +    p->flags |= MULTIFD_FLAG_QPL;
> > +    multifd_send_fill_packet(p);
> > +    return 0;
> >  }
> >
> >  /**
> > @@ -256,6 +410,88 @@ static void
> multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> >      p->compress_data = NULL;
> >  }
> >
> > +/**
> > + * multifd_qpl_decompress_pages: decompress normal pages
> > + *
> > + * Each compressed page will be decompressed independently, and the
> > + * decompression jobs will be submitted to the IAA hardware in non-
> blocking
> > + * mode, waiting for all jobs to be completed and loading the
> decompressed
> > + * data into guest memory. If IAA device is not available, the software
> path
> > + * is used.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_decompress_pages(MultiFDRecvParams *p, Error
> **errp)
> > +{
> > +    qpl_status status;
> > +    qpl_job *job;
> > +    QplData *qpl = p->compress_data;
> > +    uint32_t job_num = p->normal_num;
> > +    uint32_t off = 0;
> > +
> > +    assert(job_num <= qpl->total_job_num);
> > +    /* submit all decompression jobs */
> > +    for (int i = 0; i < job_num; i++) {
> > +        /* if the data size is the same as the page size, load it
> directly */
> > +        if (qpl->zbuf_hdr[i] == p->page_size) {
> > +            memcpy(p->host + p->normal[i], qpl->zbuf + off, p-
> >page_size);
> > +            off += p->page_size;
> > +            continue;
> > +        }
> > +        job = qpl->job_array[i];
> > +        multifd_qpl_prepare_job(job, false, qpl->zbuf + off, qpl-
> >zbuf_hdr[i],
> > +                                p->host + p->normal[i], p->page_size);
> > +        /* if hardware path(IAA) is unavailable, call the software path
> */
> > +        if (!qpl->iaa_avail) {
> > +            status = qpl_execute_job(job);
> > +            if (status == QPL_STS_OK) {
> > +                off += qpl->zbuf_hdr[i];
> > +                continue;
> > +            }
> > +            error_setg(errp, "multifd %u: qpl_execute_job failed with
> error %d",
> > +                       p->id, status);
> > +            return -1;
> > +        }
> > +retry:
> > +        status = qpl_submit_job(job);
> > +        if (status == QPL_STS_OK) {
> > +            off += qpl->zbuf_hdr[i];
> > +        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
> > +            goto retry;
> > +        } else {
> > +            error_setg(errp, "multifd %u: qpl_submit_job failed with
> error %d",
> > +                       p->id, status);
> > +            return -1;
> > +        }
> > +    }
> > +    if (!qpl->iaa_avail) {
> > +        goto done;
> > +    }
> > +    /* wait all jobs to complete for hardware(IAA) path */
> > +    for (int i = 0; i < job_num; i++) {
> > +        if (qpl->zbuf_hdr[i] == p->page_size) {
> > +            continue;
> > +        }
> > +        job = qpl->job_array[i];
> > +        status = qpl_wait_job(job);
> > +        if (status != QPL_STS_OK) {
> > +            error_setg(errp, "multifd %u: qpl_wait_job failed with
> error %d",
> > +                       p->id, status);
> > +            return -1;
> > +        }
> > +        if (job->total_out != p->page_size) {
> > +            error_setg(errp, "multifd %u: decompressed len %u, expected
> len %u",
> > +                       p->id, job->total_out, p->page_size);
> > +            return -1;
> > +        }
> > +    }
> > +done:
> > +    return 0;
> > +}
> > +
> >  /**
> >   * multifd_qpl_recv: read the data from the channel into actual pages
> >   *
> > @@ -269,8 +505,48 @@ static void
> multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> >   */
> >  static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
> >  {
> > -    /* Implement in next patch */
> > -    return -1;
> > +    QplData *qpl = p->compress_data;
> > +    uint32_t in_size = p->next_packet_size;
> > +    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> > +    uint32_t hdr_len = p->normal_num * sizeof(uint32_t);
> > +    uint32_t data_len = 0;
> > +    int ret;
> > +
> > +    if (flags != MULTIFD_FLAG_QPL) {
> > +        error_setg(errp, "multifd %u: flags received %x flags
> expected %x",
> > +                   p->id, flags, MULTIFD_FLAG_QPL);
> > +        return -1;
> > +    }
> > +    multifd_recv_zero_page_process(p);
> > +    if (!p->normal_num) {
> > +        assert(in_size == 0);
> > +        return 0;
> > +    }
> > +
> > +    /* read compressed data lengths */
> > +    assert(hdr_len < in_size);
> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf_hdr, hdr_len,
> errp);
> > +    if (ret != 0) {
> > +        return ret;
> > +    }
> > +    assert(p->normal_num <= qpl->total_job_num);
> 
> I'm still in doubt whether we should use p->page_count directly all
> over. It's nice to move the concept into the QPL domain space, but it
> makes less sense in these functions that take MultiFD*Params as
> argument.

I don't understand what you mean here. Do you plan to remove page_count from MultiFD*Params
and provide a new API to get the migrated page count?

> > +    for (int i = 0; i < p->normal_num; i++) {
> > +        qpl->zbuf_hdr[i] = be32_to_cpu(qpl->zbuf_hdr[i]);
> > +        data_len += qpl->zbuf_hdr[i];
> > +        assert(qpl->zbuf_hdr[i] <= p->page_size);
> > +    }
> > +
> > +    /* read compressed data */
> > +    assert(in_size == hdr_len + data_len);
> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, data_len,
> errp);
> > +    if (ret != 0) {
> > +        return ret;
> > +    }
> > +
> > +    if (multifd_qpl_decompress_pages(p, errp) != 0) {
> > +        return -1;
> > +    }
> > +    return 0;
> >  }
> >
> >  static MultiFDMethods multifd_qpl_ops = {


^ permalink raw reply	[flat|nested] 23+ messages in thread

* RE: [PATCH v6 6/7] migration/multifd: implement qpl compression and decompression
  2024-05-14  6:30     ` Liu, Yuan1
@ 2024-05-14 14:08       ` Fabiano Rosas
  2024-05-15  6:36         ` Liu, Yuan1
  0 siblings, 1 reply; 23+ messages in thread
From: Fabiano Rosas @ 2024-05-14 14:08 UTC (permalink / raw)
  To: Liu, Yuan1, peterx; +Cc: qemu-devel, Zou, Nanhai

"Liu, Yuan1" <yuan1.liu@intel.com> writes:

>> -----Original Message-----
>> From: Fabiano Rosas <farosas@suse.de>
>> Sent: Monday, May 13, 2024 11:14 PM
>> To: Liu, Yuan1 <yuan1.liu@intel.com>; peterx@redhat.com
>> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1.liu@intel.com>; Zou, Nanhai
>> <nanhai.zou@intel.com>
>> Subject: Re: [PATCH v6 6/7] migration/multifd: implement qpl compression
>> and decompression
>> 
>> Yuan Liu <yuan1.liu@intel.com> writes:
>> 
>> > each qpl job is used to (de)compress a normal page and it can
>> > be processed independently by the IAA hardware. All qpl jobs
>> > are submitted to the hardware at once, and wait for all jobs
>> > completion. If hardware path(IAA) is not available, use software
>> > for compression and decompression.
>> >
>> > Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
>> > Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
>> > ---
>> >  migration/multifd-qpl.c | 284 +++++++++++++++++++++++++++++++++++++++-
>> >  1 file changed, 280 insertions(+), 4 deletions(-)
>> >
>> > diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
>> > index 89fa51091a..9a1fddbdd0 100644
>> > --- a/migration/multifd-qpl.c
>> > +++ b/migration/multifd-qpl.c
>> > @@ -13,6 +13,7 @@
>> >  #include "qemu/osdep.h"
>> >  #include "qemu/module.h"
>> >  #include "qapi/error.h"
>> > +#include "exec/ramblock.h"
>> >  #include "migration.h"
>> >  #include "multifd.h"
>> >  #include "qpl/qpl.h"
>> > @@ -204,6 +205,139 @@ static void
>> multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
>> >      p->iov = NULL;
>> >  }
>> >
>> > +/**
>> > + * multifd_qpl_prepare_job: prepare a compression or decompression job
>> > + *
>> > + * Prepare a compression or decompression job and configure job
>> attributes
>> > + * including job compression level and flags.
>> > + *
>> > + * @job: pointer to the QplData structure
>> 
>> qpl_job structure
>
> Thanks for the comment, I will fix this next version.
>
>> > + * @is_compression: compression or decompression indication
>> > + * @input: pointer to the input data buffer
>> > + * @input_len: the length of the input data
>> > + * @output: pointer to the output data buffer
>> > + * @output_len: the size of the output data buffer
>> > + */
>> > +static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
>> > +                                    uint8_t *input, uint32_t input_len,
>> > +                                    uint8_t *output, uint32_t
>> output_len)
>> > +{
>> > +    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
>> > +    job->next_in_ptr = input;
>> > +    job->next_out_ptr = output;
>> > +    job->available_in = input_len;
>> > +    job->available_out = output_len;
>> > +    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
>> > +    /* only supports one compression level */
>> > +    job->level = 1;
>> > +}
>> > +
>> > +/**
>> > + * multifd_qpl_build_packet: build a qpl compressed data packet
>> > + *
>> > + * The qpl compressed data packet consists of two parts, one part
>> stores
>> > + * the compressed length of each page, and the other part is the
>> compressed
>> > + * data of each page. The zbuf_hdr stores the compressed length of all
>> pages,
>> > + * and use a separate IOV to store the compressed data of each page.
>> > + *
>> > + * @qpl: pointer to the QplData structure
>> > + * @p: Params for the channel that we are using
>> > + * @idx: The index of the compressed length array
>> > + * @addr: pointer to the compressed data
>> > + * @len: The length of the compressed data
>> > + */
>> > +static void multifd_qpl_build_packet(QplData *qpl, MultiFDSendParams
>> *p,
>> > +                                     uint32_t idx, uint8_t *addr,
>> uint32_t len)
>> > +{
>> > +    qpl->zbuf_hdr[idx] = cpu_to_be32(len);
>> > +    p->iov[p->iovs_num].iov_base = addr;
>> > +    p->iov[p->iovs_num].iov_len = len;
>> > +    p->iovs_num++;
>> > +    p->next_packet_size += len;
>> > +}
>> > +
>> > +/**
>> > + * multifd_qpl_compress_pages: compress normal pages
>> > + *
>> > + * Each normal page will be compressed independently, and the
>> compression jobs
>> > + * will be submitted to the IAA hardware in non-blocking mode, waiting
>> for all
>> > + * jobs to be completed and filling the compressed length and data into
>> the
>> > + * sending IOVs. If IAA device is not available, the software path is
>> used.
>> > + *
>> > + * Returns 0 for success or -1 for error
>> > + *
>> > + * @p: Params for the channel that we are using
>> > + * @errp: pointer to an error
>> > + */
>> > +static int multifd_qpl_compress_pages(MultiFDSendParams *p, Error
>> **errp)
>> > +{
>> > +    qpl_status status;
>> > +    QplData *qpl = p->compress_data;
>> > +    MultiFDPages_t *pages = p->pages;
>> > +    uint8_t *zbuf = qpl->zbuf;
>> > +    uint8_t *host = pages->block->host;
>> > +    uint32_t job_num = pages->normal_num;
>> 
>> A bit misleading because job_num is used in the previous patch as a
>> synonym for page_count. We could change the previous patch to:
>> multifd_qpl_init(uint32_t page_count, ...
>
> Yes, I will replace job_num with page_count for multifd_qpl_init next version.
>
>> > +    qpl_job *job = NULL;
>> > +
>> > +    assert(job_num <= qpl->total_job_num);
>> > +    /* submit all compression jobs */
>> > +    for (int i = 0; i < job_num; i++) {
>> > +        job = qpl->job_array[i];
>> > +        multifd_qpl_prepare_job(job, true, host + pages->offset[i],
>> > +                                p->page_size, zbuf, p->page_size - 1);
>> 
>> Isn't the output buffer size == page size, why the -1?
>
> I think the compressed data should be smaller than a normal page. 
> If the compressed data size is equal to a normal page, send the original 
> page directly to avoid decompression operation.
>
> If the output buffer size is set to p->page_size, the compressed data size 
> may be greater than or equal to a normal page, then there are two conditions for
> sending a normal page, one is job status == QPL_STS_OK and job->total_out == p->page_size,
> another is job status == QPL_STS_MORE_OUTPUT_NEEDED.
>
> If the output buffer size is p->page_size - 1, only check QPL_STS_MORE_OUTPUT_NEEDED is ok. 
> I will add comments for this in the next version.
>

Thanks.

>> > +        /* if hardware path(IAA) is unavailable, call the software path
>> */
>> 
>> If we're doing the fallback automatically, isn't that what qpl_path_auto
>> does already? What's the difference between the two approaches?
>
> The qpl_path_auto feature currently has some limitations.
>
> Limitation 1: it does not detect IAA device status before job submission, which means
> I need to use qpl_init_job(qpl_path_hardware, ...) first to make sure the hardware path
> is available, then qpl_path_auto can work for software fallback when submitting a job to 
> the IAA fails.
>
> Limitation 2: The job submission API has changed
> For the qpl_path_hardware, the qpl_submit_job is a non-blocking function, only submitting
> a job to IAA work queues, use qpl_wait_job to get the job result.
>
> For qpl_path_software/auto, the qpl_submit_job is a blocking function and returns the job
> result directly,  qpl_wait_job can't get the job result.
>
> In short, the qpl_submit_job/qpl_wait_job APIs do not well support the qpl_path_auto feature.

Please put a summary of this in the commit message so in the future we
can evaluate whether to continue checking for ourselves.

>
>> > +        if (!qpl->iaa_avail) {
>> 
>> This function got a bit convoluted, it's probably worth a check at the
>> start and a branch to different multifd_qpl_compress_pages_slow()
>> routine altogether.
>
> I agree that using multifd_qpl_compress_pages_slow is a better choice.
>
> In my previous thoughts, the QPl software path(or slow path) is only used
> for the unit test and I did not consider fallback to the software path when
> migration start because the QPL software path has no advantage over zstd.
> So when the work queue is full, always try to resubmit the job instead of 
> fallback software path.
>
> I now realize that when IAA hardware throughput is the bottleneck(the work queue is full),
> switching to software may also increase performance.
>
> I will implement the automatically falling back to the software when IAA work
> queue is full into the next version and update the performance data. With the
> increase in mutlfd migration threads, the performance will be better than now.
> Currently, 4 threads will reach the IAA device throughput bottleneck, then increasing
> The number of threads does not increase any performance.
>
>> > +            status = qpl_execute_job(job);
>> > +            if (status == QPL_STS_OK) {
>> > +                multifd_qpl_build_packet(qpl, p, i, zbuf, job-
>> >total_out);
>> > +            } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
>> > +                /* compressed length exceeds page size, send page
>> directly */
>> > +                multifd_qpl_build_packet(qpl, p, i, host + pages-
>> >offset[i],
>> > +                                         p->page_size);
>> > +            } else {
>> > +                error_setg(errp, "multifd %u: qpl_execute_job
>> error %d",
>> > +                           p->id, status);
>> > +                return -1;
>> > +            }
>> > +            zbuf += p->page_size;
>> > +            continue;
>> > +        }
>> > +retry:
>> > +        status = qpl_submit_job(job);
>> > +        if (status == QPL_STS_OK) {
>> > +            zbuf += p->page_size;
>> > +        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
>> > +            goto retry;
>> 
>> A retry count here would be nice.
>
> As the previous comment, I will switch to multifd_qpl_compress_pages_slow
> When the work queue is full, I will give a performance comparison between
> hardware only and software fallback next version.
>
>> > +        } else {
>> > +            error_setg(errp, "multifd %u: qpl_submit_job failed with
>> error %d",
>> > +                       p->id, status);
>> > +            return -1;
>> > +        }
>> > +    }
>> > +    if (!qpl->iaa_avail) {
>> > +        goto done;
>> > +    }
>> > +    /* wait all jobs to complete for hardware(IAA) path */
>> > +    for (int i = 0; i < job_num; i++) {
>> > +        job = qpl->job_array[i];
>> > +        status = qpl_wait_job(job);
>> > +        if (status == QPL_STS_OK) {
>> > +            multifd_qpl_build_packet(qpl, p, i, qpl->zbuf + (p-
>> >page_size * i),
>> > +                                     job->total_out);
>> > +        } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
>> > +            /* compressed data length exceeds page size, send page
>> directly */
>> > +            multifd_qpl_build_packet(qpl, p, i, host + pages-
>> >offset[i],
>> > +                                     p->page_size);
>> > +        } else {
>> > +            error_setg(errp, "multifd %u: qpl_wait_job failed with
>> error %d",
>> > +                       p->id, status);
>> > +            return -1;
>> > +        }
>> > +    }
>> > +done:
>> > +    return 0;
>> > +}
>> > +
>> >  /**
>> >   * multifd_qpl_send_prepare: prepare data to be able to send
>> >   *
>> > @@ -217,8 +351,28 @@ static void
>> multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
>> >   */
>> >  static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
>> >  {
>> > -    /* Implement in next patch */
>> > -    return -1;
>> > +    QplData *qpl = p->compress_data;
>> > +    uint32_t hdr_size;
>> > +
>> > +    if (!multifd_send_prepare_common(p)) {
>> > +        goto out;
>> > +    }
>> > +
>> > +    assert(p->pages->normal_num <= qpl->total_job_num);
>> > +    hdr_size = p->pages->normal_num * sizeof(uint32_t);
>> > +    /* prepare the header that stores the lengths of all compressed
>> data */
>> > +    p->iov[1].iov_base = (uint8_t *) qpl->zbuf_hdr;
>> > +    p->iov[1].iov_len = hdr_size;
>> 
>> Better use p->iovs_num here in case we ever decide to add more stuff to
>> the front of the array.
>
> Get it, I will fix this next version.
>  
>> > +    p->iovs_num++;
>> > +    p->next_packet_size += hdr_size;
>> 
>> Here's the first time we're setting this value, right? So just a regular
>> attribution(=).
>
> Yes, I will fix this next version.
>
>> > +    if (multifd_qpl_compress_pages(p, errp) != 0) {
>> > +        return -1;
>> > +    }
>> > +
>> > +out:
>> > +    p->flags |= MULTIFD_FLAG_QPL;
>> > +    multifd_send_fill_packet(p);
>> > +    return 0;
>> >  }
>> >
>> >  /**
>> > @@ -256,6 +410,88 @@ static void
>> multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
>> >      p->compress_data = NULL;
>> >  }
>> >
>> > +/**
>> > + * multifd_qpl_decompress_pages: decompress normal pages
>> > + *
>> > + * Each compressed page will be decompressed independently, and the
>> > + * decompression jobs will be submitted to the IAA hardware in non-
>> blocking
>> > + * mode, waiting for all jobs to be completed and loading the
>> decompressed
>> > + * data into guest memory. If IAA device is not available, the software
>> path
>> > + * is used.
>> > + *
>> > + * Returns 0 for success or -1 for error
>> > + *
>> > + * @p: Params for the channel that we are using
>> > + * @errp: pointer to an error
>> > + */
>> > +static int multifd_qpl_decompress_pages(MultiFDRecvParams *p, Error
>> **errp)
>> > +{
>> > +    qpl_status status;
>> > +    qpl_job *job;
>> > +    QplData *qpl = p->compress_data;
>> > +    uint32_t job_num = p->normal_num;
>> > +    uint32_t off = 0;
>> > +
>> > +    assert(job_num <= qpl->total_job_num);
>> > +    /* submit all decompression jobs */
>> > +    for (int i = 0; i < job_num; i++) {
>> > +        /* if the data size is the same as the page size, load it
>> directly */
>> > +        if (qpl->zbuf_hdr[i] == p->page_size) {
>> > +            memcpy(p->host + p->normal[i], qpl->zbuf + off, p-
>> >page_size);
>> > +            off += p->page_size;
>> > +            continue;
>> > +        }
>> > +        job = qpl->job_array[i];
>> > +        multifd_qpl_prepare_job(job, false, qpl->zbuf + off, qpl-
>> >zbuf_hdr[i],
>> > +                                p->host + p->normal[i], p->page_size);
>> > +        /* if hardware path(IAA) is unavailable, call the software path
>> */
>> > +        if (!qpl->iaa_avail) {
>> > +            status = qpl_execute_job(job);
>> > +            if (status == QPL_STS_OK) {
>> > +                off += qpl->zbuf_hdr[i];
>> > +                continue;
>> > +            }
>> > +            error_setg(errp, "multifd %u: qpl_execute_job failed with
>> error %d",
>> > +                       p->id, status);
>> > +            return -1;
>> > +        }
>> > +retry:
>> > +        status = qpl_submit_job(job);
>> > +        if (status == QPL_STS_OK) {
>> > +            off += qpl->zbuf_hdr[i];
>> > +        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
>> > +            goto retry;
>> > +        } else {
>> > +            error_setg(errp, "multifd %u: qpl_submit_job failed with
>> error %d",
>> > +                       p->id, status);
>> > +            return -1;
>> > +        }
>> > +    }
>> > +    if (!qpl->iaa_avail) {
>> > +        goto done;
>> > +    }
>> > +    /* wait all jobs to complete for hardware(IAA) path */
>> > +    for (int i = 0; i < job_num; i++) {
>> > +        if (qpl->zbuf_hdr[i] == p->page_size) {
>> > +            continue;
>> > +        }
>> > +        job = qpl->job_array[i];
>> > +        status = qpl_wait_job(job);
>> > +        if (status != QPL_STS_OK) {
>> > +            error_setg(errp, "multifd %u: qpl_wait_job failed with
>> error %d",
>> > +                       p->id, status);
>> > +            return -1;
>> > +        }
>> > +        if (job->total_out != p->page_size) {
>> > +            error_setg(errp, "multifd %u: decompressed len %u, expected
>> len %u",
>> > +                       p->id, job->total_out, p->page_size);
>> > +            return -1;
>> > +        }
>> > +    }
>> > +done:
>> > +    return 0;
>> > +}
>> > +
>> >  /**
>> >   * multifd_qpl_recv: read the data from the channel into actual pages
>> >   *
>> > @@ -269,8 +505,48 @@ static void
>> multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
>> >   */
>> >  static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
>> >  {
>> > -    /* Implement in next patch */
>> > -    return -1;
>> > +    QplData *qpl = p->compress_data;
>> > +    uint32_t in_size = p->next_packet_size;
>> > +    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
>> > +    uint32_t hdr_len = p->normal_num * sizeof(uint32_t);
>> > +    uint32_t data_len = 0;
>> > +    int ret;
>> > +
>> > +    if (flags != MULTIFD_FLAG_QPL) {
>> > +        error_setg(errp, "multifd %u: flags received %x flags
>> expected %x",
>> > +                   p->id, flags, MULTIFD_FLAG_QPL);
>> > +        return -1;
>> > +    }
>> > +    multifd_recv_zero_page_process(p);
>> > +    if (!p->normal_num) {
>> > +        assert(in_size == 0);
>> > +        return 0;
>> > +    }
>> > +
>> > +    /* read compressed data lengths */
>> > +    assert(hdr_len < in_size);
>> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf_hdr, hdr_len,
>> errp);
>> > +    if (ret != 0) {
>> > +        return ret;
>> > +    }
>> > +    assert(p->normal_num <= qpl->total_job_num);
>> 
>> I'm still in doubt whether we should use p->page_count directly all
>> over. It's nice to move the concept into the QPL domain space, but it
>> makes less sense in these functions that take MultiFD*Params as
>> argument.
>
> I don't understand what you mean here. Do you plan to remove page_count from MultiFD*Params
> and provide a new API to get the migrated page count?
>

No, I mean that qpl->total_job_num == p->page_count, so we could use
p->page_count in the functions that have MultiFDParams available. Maybe
even drop total_job_num altogether. But I'm debating if it is worth it,
because that makes the code more coupled to multifd and we may not want
that. Let's leave it for now.

>> > +    for (int i = 0; i < p->normal_num; i++) {
>> > +        qpl->zbuf_hdr[i] = be32_to_cpu(qpl->zbuf_hdr[i]);
>> > +        data_len += qpl->zbuf_hdr[i];
>> > +        assert(qpl->zbuf_hdr[i] <= p->page_size);
>> > +    }
>> > +
>> > +    /* read compressed data */
>> > +    assert(in_size == hdr_len + data_len);
>> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, data_len,
>> errp);
>> > +    if (ret != 0) {
>> > +        return ret;
>> > +    }
>> > +
>> > +    if (multifd_qpl_decompress_pages(p, errp) != 0) {
>> > +        return -1;
>> > +    }
>> > +    return 0;
>> >  }
>> >
>> >  static MultiFDMethods multifd_qpl_ops = {


^ permalink raw reply	[flat|nested] 23+ messages in thread

* RE: [PATCH v6 6/7] migration/multifd: implement qpl compression and decompression
  2024-05-14 14:08       ` Fabiano Rosas
@ 2024-05-15  6:36         ` Liu, Yuan1
  0 siblings, 0 replies; 23+ messages in thread
From: Liu, Yuan1 @ 2024-05-15  6:36 UTC (permalink / raw)
  To: Fabiano Rosas, peterx; +Cc: qemu-devel, Zou, Nanhai

> -----Original Message-----
> From: Fabiano Rosas <farosas@suse.de>
> Sent: Tuesday, May 14, 2024 10:08 PM
> To: Liu, Yuan1 <yuan1.liu@intel.com>; peterx@redhat.com
> Cc: qemu-devel@nongnu.org; Zou, Nanhai <nanhai.zou@intel.com>
> Subject: RE: [PATCH v6 6/7] migration/multifd: implement qpl compression
> and decompression
> 
> "Liu, Yuan1" <yuan1.liu@intel.com> writes:
> 
> >> -----Original Message-----
> >> From: Fabiano Rosas <farosas@suse.de>
> >> Sent: Monday, May 13, 2024 11:14 PM
> >> To: Liu, Yuan1 <yuan1.liu@intel.com>; peterx@redhat.com
> >> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1.liu@intel.com>; Zou,
> Nanhai
> >> <nanhai.zou@intel.com>
> >> Subject: Re: [PATCH v6 6/7] migration/multifd: implement qpl
> compression
> >> and decompression
> >>
> >> Yuan Liu <yuan1.liu@intel.com> writes:
> >>
> >> > each qpl job is used to (de)compress a normal page and it can
> >> > be processed independently by the IAA hardware. All qpl jobs
> >> > are submitted to the hardware at once, and wait for all jobs
> >> > completion. If hardware path(IAA) is not available, use software
> >> > for compression and decompression.
> >> >
> >> > Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> >> > Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> >> > ---
> >> >  migration/multifd-qpl.c | 284
> +++++++++++++++++++++++++++++++++++++++-
> >> >  1 file changed, 280 insertions(+), 4 deletions(-)
> >> >
> >> > diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> >> > index 89fa51091a..9a1fddbdd0 100644
> >> > --- a/migration/multifd-qpl.c
> >> > +++ b/migration/multifd-qpl.c
> >> > @@ -13,6 +13,7 @@
> >> >  #include "qemu/osdep.h"
> >> >  #include "qemu/module.h"
> >> >  #include "qapi/error.h"
> >> > +#include "exec/ramblock.h"
> >> >  #include "migration.h"
> >> >  #include "multifd.h"
> >> >  #include "qpl/qpl.h"
> >> > @@ -204,6 +205,139 @@ static void
> >> multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> >> >      p->iov = NULL;
> >> >  }
> >> >
> >> > +/**
> >> > + * multifd_qpl_prepare_job: prepare a compression or decompression
> job
> >> > + *
> >> > + * Prepare a compression or decompression job and configure job
> >> attributes
> >> > + * including job compression level and flags.
> >> > + *
> >> > + * @job: pointer to the QplData structure
> >>
> >> qpl_job structure
> >
> > Thanks for the comment, I will fix this next version.
> >
> >> > + * @is_compression: compression or decompression indication
> >> > + * @input: pointer to the input data buffer
> >> > + * @input_len: the length of the input data
> >> > + * @output: pointer to the output data buffer
> >> > + * @output_len: the size of the output data buffer
> >> > + */
> >> > +static void multifd_qpl_prepare_job(qpl_job *job, bool
> is_compression,
> >> > +                                    uint8_t *input, uint32_t
> input_len,
> >> > +                                    uint8_t *output, uint32_t
> >> output_len)
> >> > +{
> >> > +    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
> >> > +    job->next_in_ptr = input;
> >> > +    job->next_out_ptr = output;
> >> > +    job->available_in = input_len;
> >> > +    job->available_out = output_len;
> >> > +    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST |
> QPL_FLAG_OMIT_VERIFY;
> >> > +    /* only supports one compression level */
> >> > +    job->level = 1;
> >> > +}
> >> > +
> >> > +/**
> >> > + * multifd_qpl_build_packet: build a qpl compressed data packet
> >> > + *
> >> > + * The qpl compressed data packet consists of two parts, one part
> >> stores
> >> > + * the compressed length of each page, and the other part is the
> >> compressed
> >> > + * data of each page. The zbuf_hdr stores the compressed length of
> all
> >> pages,
> >> > + * and use a separate IOV to store the compressed data of each page.
> >> > + *
> >> > + * @qpl: pointer to the QplData structure
> >> > + * @p: Params for the channel that we are using
> >> > + * @idx: The index of the compressed length array
> >> > + * @addr: pointer to the compressed data
> >> > + * @len: The length of the compressed data
> >> > + */
> >> > +static void multifd_qpl_build_packet(QplData *qpl, MultiFDSendParams
> >> *p,
> >> > +                                     uint32_t idx, uint8_t *addr,
> >> uint32_t len)
> >> > +{
> >> > +    qpl->zbuf_hdr[idx] = cpu_to_be32(len);
> >> > +    p->iov[p->iovs_num].iov_base = addr;
> >> > +    p->iov[p->iovs_num].iov_len = len;
> >> > +    p->iovs_num++;
> >> > +    p->next_packet_size += len;
> >> > +}
> >> > +
> >> > +/**
> >> > + * multifd_qpl_compress_pages: compress normal pages
> >> > + *
> >> > + * Each normal page will be compressed independently, and the
> >> compression jobs
> >> > + * will be submitted to the IAA hardware in non-blocking mode,
> waiting
> >> for all
> >> > + * jobs to be completed and filling the compressed length and data
> into
> >> the
> >> > + * sending IOVs. If IAA device is not available, the software path
> is
> >> used.
> >> > + *
> >> > + * Returns 0 for success or -1 for error
> >> > + *
> >> > + * @p: Params for the channel that we are using
> >> > + * @errp: pointer to an error
> >> > + */
> >> > +static int multifd_qpl_compress_pages(MultiFDSendParams *p, Error
> >> **errp)
> >> > +{
> >> > +    qpl_status status;
> >> > +    QplData *qpl = p->compress_data;
> >> > +    MultiFDPages_t *pages = p->pages;
> >> > +    uint8_t *zbuf = qpl->zbuf;
> >> > +    uint8_t *host = pages->block->host;
> >> > +    uint32_t job_num = pages->normal_num;
> >>
> >> A bit misleading because job_num is used in the previous patch as a
> >> synonym for page_count. We could change the previous patch to:
> >> multifd_qpl_init(uint32_t page_count, ...
> >
> > Yes, I will replace job_num with page_count for multifd_qpl_init next
> version.
> >
> >> > +    qpl_job *job = NULL;
> >> > +
> >> > +    assert(job_num <= qpl->total_job_num);
> >> > +    /* submit all compression jobs */
> >> > +    for (int i = 0; i < job_num; i++) {
> >> > +        job = qpl->job_array[i];
> >> > +        multifd_qpl_prepare_job(job, true, host + pages->offset[i],
> >> > +                                p->page_size, zbuf, p->page_size -
> 1);
> >>
> >> Isn't the output buffer size == page size, why the -1?
> >
> > I think the compressed data should be smaller than a normal page.
> > If the compressed data size is equal to a normal page, send the original
> > page directly to avoid decompression operation.
> >
> > If the output buffer size is set to p->page_size, the compressed data
> size
> > may be greater than or equal to a normal page, then there are two
> conditions for
> > sending a normal page, one is job status == QPL_STS_OK and job-
> >total_out == p->page_size,
> > another is job status == QPL_STS_MORE_OUTPUT_NEEDED.
> >
> > If the output buffer size is p->page_size - 1, only check
> QPL_STS_MORE_OUTPUT_NEEDED is ok.
> > I will add comments for this in the next version.
> >
> 
> Thanks.
> 
> >> > +        /* if hardware path(IAA) is unavailable, call the software
> path
> >> */
> >>
> >> If we're doing the fallback automatically, isn't that what
> qpl_path_auto
> >> does already? What's the difference between the two approaches?
> >
> > The qpl_path_auto feature currently has some limitations.
> >
> > Limitation 1: it does not detect IAA device status before job
> submission, which means
> > I need to use qpl_init_job(qpl_path_hardware, ...) first to make sure
> the hardware path
> > is available, then qpl_path_auto can work for software fallback when
> submitting a job to
> > the IAA fails.
> >
> > Limitation 2: The job submission API has changed
> > For the qpl_path_hardware, the qpl_submit_job is a non-blocking
> function, only submitting
> > a job to IAA work queues, use qpl_wait_job to get the job result.
> >
> > For qpl_path_software/auto, the qpl_submit_job is a blocking function
> and returns the job
> > result directly,  qpl_wait_job can't get the job result.
> >
> > In short, the qpl_submit_job/qpl_wait_job APIs do not well support the
> qpl_path_auto feature.
> 
> Please put a summary of this in the commit message so in the future we
> can evaluate whether to continue checking for ourselves.

Sure, I will add a summary in the commit message.
I confirmed with the QPL development team that, sw fallback for
qpl_submit_job/qpl_wait_job APIs will be ready next QPL release. So I will
keep the current design next version, the software path only for the QPL unit test.

After the next version of QPL is released, I will update the sw fallback code
so that sw fallback can be used both in the QPL unit test and migration with IAA.

> >> > +        if (!qpl->iaa_avail) {
> >>
> >> This function got a bit convoluted, it's probably worth a check at the
> >> start and a branch to different multifd_qpl_compress_pages_slow()
> >> routine altogether.
> >
> > I agree that using multifd_qpl_compress_pages_slow is a better choice.
> >
> > In my previous thoughts, the QPl software path(or slow path) is only
> used
> > for the unit test and I did not consider fallback to the software path
> when
> > migration start because the QPL software path has no advantage over
> zstd.
> > So when the work queue is full, always try to resubmit the job instead
> of
> > fallback software path.
> >
> > I now realize that when IAA hardware throughput is the bottleneck(the
> work queue is full),
> > switching to software may also increase performance.
> >
> > I will implement the automatically falling back to the software when IAA
> work
> > queue is full into the next version and update the performance data.
> With the
> > increase in mutlfd migration threads, the performance will be better
> than now.
> > Currently, 4 threads will reach the IAA device throughput bottleneck,
> then increasing
> > The number of threads does not increase any performance.
> >
> >> > +            status = qpl_execute_job(job);
> >> > +            if (status == QPL_STS_OK) {
> >> > +                multifd_qpl_build_packet(qpl, p, i, zbuf, job-
> >> >total_out);
> >> > +            } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
> >> > +                /* compressed length exceeds page size, send page
> >> directly */
> >> > +                multifd_qpl_build_packet(qpl, p, i, host + pages-
> >> >offset[i],
> >> > +                                         p->page_size);
> >> > +            } else {
> >> > +                error_setg(errp, "multifd %u: qpl_execute_job
> >> error %d",
> >> > +                           p->id, status);
> >> > +                return -1;
> >> > +            }
> >> > +            zbuf += p->page_size;
> >> > +            continue;
> >> > +        }
> >> > +retry:
> >> > +        status = qpl_submit_job(job);
> >> > +        if (status == QPL_STS_OK) {
> >> > +            zbuf += p->page_size;
> >> > +        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
> >> > +            goto retry;
> >>
> >> A retry count here would be nice.
> >
> > As the previous comment, I will switch to
> multifd_qpl_compress_pages_slow
> > When the work queue is full, I will give a performance comparison
> between
> > hardware only and software fallback next version.
> >
> >> > +        } else {
> >> > +            error_setg(errp, "multifd %u: qpl_submit_job failed with
> >> error %d",
> >> > +                       p->id, status);
> >> > +            return -1;
> >> > +        }
> >> > +    }
> >> > +    if (!qpl->iaa_avail) {
> >> > +        goto done;
> >> > +    }
> >> > +    /* wait all jobs to complete for hardware(IAA) path */
> >> > +    for (int i = 0; i < job_num; i++) {
> >> > +        job = qpl->job_array[i];
> >> > +        status = qpl_wait_job(job);
> >> > +        if (status == QPL_STS_OK) {
> >> > +            multifd_qpl_build_packet(qpl, p, i, qpl->zbuf + (p-
> >> >page_size * i),
> >> > +                                     job->total_out);
> >> > +        } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
> >> > +            /* compressed data length exceeds page size, send page
> >> directly */
> >> > +            multifd_qpl_build_packet(qpl, p, i, host + pages-
> >> >offset[i],
> >> > +                                     p->page_size);
> >> > +        } else {
> >> > +            error_setg(errp, "multifd %u: qpl_wait_job failed with
> >> error %d",
> >> > +                       p->id, status);
> >> > +            return -1;
> >> > +        }
> >> > +    }
> >> > +done:
> >> > +    return 0;
> >> > +}
> >> > +
> >> >  /**
> >> >   * multifd_qpl_send_prepare: prepare data to be able to send
> >> >   *
> >> > @@ -217,8 +351,28 @@ static void
> >> multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> >> >   */
> >> >  static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error
> **errp)
> >> >  {
> >> > -    /* Implement in next patch */
> >> > -    return -1;
> >> > +    QplData *qpl = p->compress_data;
> >> > +    uint32_t hdr_size;
> >> > +
> >> > +    if (!multifd_send_prepare_common(p)) {
> >> > +        goto out;
> >> > +    }
> >> > +
> >> > +    assert(p->pages->normal_num <= qpl->total_job_num);
> >> > +    hdr_size = p->pages->normal_num * sizeof(uint32_t);
> >> > +    /* prepare the header that stores the lengths of all compressed
> >> data */
> >> > +    p->iov[1].iov_base = (uint8_t *) qpl->zbuf_hdr;
> >> > +    p->iov[1].iov_len = hdr_size;
> >>
> >> Better use p->iovs_num here in case we ever decide to add more stuff to
> >> the front of the array.
> >
> > Get it, I will fix this next version.
> >
> >> > +    p->iovs_num++;
> >> > +    p->next_packet_size += hdr_size;
> >>
> >> Here's the first time we're setting this value, right? So just a
> regular
> >> attribution(=).
> >
> > Yes, I will fix this next version.
> >
> >> > +    if (multifd_qpl_compress_pages(p, errp) != 0) {
> >> > +        return -1;
> >> > +    }
> >> > +
> >> > +out:
> >> > +    p->flags |= MULTIFD_FLAG_QPL;
> >> > +    multifd_send_fill_packet(p);
> >> > +    return 0;
> >> >  }
> >> >
> >> >  /**
> >> > @@ -256,6 +410,88 @@ static void
> >> multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> >> >      p->compress_data = NULL;
> >> >  }
> >> >
> >> > +/**
> >> > + * multifd_qpl_decompress_pages: decompress normal pages
> >> > + *
> >> > + * Each compressed page will be decompressed independently, and the
> >> > + * decompression jobs will be submitted to the IAA hardware in non-
> >> blocking
> >> > + * mode, waiting for all jobs to be completed and loading the
> >> decompressed
> >> > + * data into guest memory. If IAA device is not available, the
> software
> >> path
> >> > + * is used.
> >> > + *
> >> > + * Returns 0 for success or -1 for error
> >> > + *
> >> > + * @p: Params for the channel that we are using
> >> > + * @errp: pointer to an error
> >> > + */
> >> > +static int multifd_qpl_decompress_pages(MultiFDRecvParams *p, Error
> >> **errp)
> >> > +{
> >> > +    qpl_status status;
> >> > +    qpl_job *job;
> >> > +    QplData *qpl = p->compress_data;
> >> > +    uint32_t job_num = p->normal_num;
> >> > +    uint32_t off = 0;
> >> > +
> >> > +    assert(job_num <= qpl->total_job_num);
> >> > +    /* submit all decompression jobs */
> >> > +    for (int i = 0; i < job_num; i++) {
> >> > +        /* if the data size is the same as the page size, load it
> >> directly */
> >> > +        if (qpl->zbuf_hdr[i] == p->page_size) {
> >> > +            memcpy(p->host + p->normal[i], qpl->zbuf + off, p-
> >> >page_size);
> >> > +            off += p->page_size;
> >> > +            continue;
> >> > +        }
> >> > +        job = qpl->job_array[i];
> >> > +        multifd_qpl_prepare_job(job, false, qpl->zbuf + off, qpl-
> >> >zbuf_hdr[i],
> >> > +                                p->host + p->normal[i], p-
> >page_size);
> >> > +        /* if hardware path(IAA) is unavailable, call the software
> path
> >> */
> >> > +        if (!qpl->iaa_avail) {
> >> > +            status = qpl_execute_job(job);
> >> > +            if (status == QPL_STS_OK) {
> >> > +                off += qpl->zbuf_hdr[i];
> >> > +                continue;
> >> > +            }
> >> > +            error_setg(errp, "multifd %u: qpl_execute_job failed
> with
> >> error %d",
> >> > +                       p->id, status);
> >> > +            return -1;
> >> > +        }
> >> > +retry:
> >> > +        status = qpl_submit_job(job);
> >> > +        if (status == QPL_STS_OK) {
> >> > +            off += qpl->zbuf_hdr[i];
> >> > +        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
> >> > +            goto retry;
> >> > +        } else {
> >> > +            error_setg(errp, "multifd %u: qpl_submit_job failed with
> >> error %d",
> >> > +                       p->id, status);
> >> > +            return -1;
> >> > +        }
> >> > +    }
> >> > +    if (!qpl->iaa_avail) {
> >> > +        goto done;
> >> > +    }
> >> > +    /* wait all jobs to complete for hardware(IAA) path */
> >> > +    for (int i = 0; i < job_num; i++) {
> >> > +        if (qpl->zbuf_hdr[i] == p->page_size) {
> >> > +            continue;
> >> > +        }
> >> > +        job = qpl->job_array[i];
> >> > +        status = qpl_wait_job(job);
> >> > +        if (status != QPL_STS_OK) {
> >> > +            error_setg(errp, "multifd %u: qpl_wait_job failed with
> >> error %d",
> >> > +                       p->id, status);
> >> > +            return -1;
> >> > +        }
> >> > +        if (job->total_out != p->page_size) {
> >> > +            error_setg(errp, "multifd %u: decompressed len %u,
> expected
> >> len %u",
> >> > +                       p->id, job->total_out, p->page_size);
> >> > +            return -1;
> >> > +        }
> >> > +    }
> >> > +done:
> >> > +    return 0;
> >> > +}
> >> > +
> >> >  /**
> >> >   * multifd_qpl_recv: read the data from the channel into actual
> pages
> >> >   *
> >> > @@ -269,8 +505,48 @@ static void
> >> multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> >> >   */
> >> >  static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
> >> >  {
> >> > -    /* Implement in next patch */
> >> > -    return -1;
> >> > +    QplData *qpl = p->compress_data;
> >> > +    uint32_t in_size = p->next_packet_size;
> >> > +    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> >> > +    uint32_t hdr_len = p->normal_num * sizeof(uint32_t);
> >> > +    uint32_t data_len = 0;
> >> > +    int ret;
> >> > +
> >> > +    if (flags != MULTIFD_FLAG_QPL) {
> >> > +        error_setg(errp, "multifd %u: flags received %x flags
> >> expected %x",
> >> > +                   p->id, flags, MULTIFD_FLAG_QPL);
> >> > +        return -1;
> >> > +    }
> >> > +    multifd_recv_zero_page_process(p);
> >> > +    if (!p->normal_num) {
> >> > +        assert(in_size == 0);
> >> > +        return 0;
> >> > +    }
> >> > +
> >> > +    /* read compressed data lengths */
> >> > +    assert(hdr_len < in_size);
> >> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf_hdr,
> hdr_len,
> >> errp);
> >> > +    if (ret != 0) {
> >> > +        return ret;
> >> > +    }
> >> > +    assert(p->normal_num <= qpl->total_job_num);
> >>
> >> I'm still in doubt whether we should use p->page_count directly all
> >> over. It's nice to move the concept into the QPL domain space, but it
> >> makes less sense in these functions that take MultiFD*Params as
> >> argument.
> >
> > I don't understand what you mean here. Do you plan to remove page_count
> from MultiFD*Params
> > and provide a new API to get the migrated page count?
> >
> 
> No, I mean that qpl->total_job_num == p->page_count, so we could use
> p->page_count in the functions that have MultiFDParams available. Maybe
> even drop total_job_num altogether. But I'm debating if it is worth it,
> because that makes the code more coupled to multifd and we may not want
> that. Let's leave it for now.

Get it, thanks for the comment

> >> > +    for (int i = 0; i < p->normal_num; i++) {
> >> > +        qpl->zbuf_hdr[i] = be32_to_cpu(qpl->zbuf_hdr[i]);
> >> > +        data_len += qpl->zbuf_hdr[i];
> >> > +        assert(qpl->zbuf_hdr[i] <= p->page_size);
> >> > +    }
> >> > +
> >> > +    /* read compressed data */
> >> > +    assert(in_size == hdr_len + data_len);
> >> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, data_len,
> >> errp);
> >> > +    if (ret != 0) {
> >> > +        return ret;
> >> > +    }
> >> > +
> >> > +    if (multifd_qpl_decompress_pages(p, errp) != 0) {
> >> > +        return -1;
> >> > +    }
> >> > +    return 0;
> >> >  }
> >> >
> >> >  static MultiFDMethods multifd_qpl_ops = {


^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v6 2/7] migration/multifd: put IOV initialization into compression method
  2024-05-05 16:57 ` [PATCH v6 2/7] migration/multifd: put IOV initialization into compression method Yuan Liu
  2024-05-10 20:21   ` Fabiano Rosas
@ 2024-05-27 20:50   ` Peter Xu
  2024-05-28 13:36     ` Liu, Yuan1
  1 sibling, 1 reply; 23+ messages in thread
From: Peter Xu @ 2024-05-27 20:50 UTC (permalink / raw)
  To: Yuan Liu; +Cc: farosas, qemu-devel, nanhai.zou

On Mon, May 06, 2024 at 12:57:46AM +0800, Yuan Liu wrote:
> Different compression methods may require different numbers of IOVs.
> Based on streaming compression of zlib and zstd, all pages will be
> compressed to a data block, so two IOVs are needed for packet header
> and compressed data block.
> 
> Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> ---
>  migration/multifd-zlib.c |  7 +++++++
>  migration/multifd-zstd.c |  8 +++++++-
>  migration/multifd.c      | 22 ++++++++++++----------
>  3 files changed, 26 insertions(+), 11 deletions(-)
> 
> diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
> index 737a9645d2..2ced69487e 100644
> --- a/migration/multifd-zlib.c
> +++ b/migration/multifd-zlib.c
> @@ -70,6 +70,10 @@ static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
>          goto err_free_zbuff;
>      }
>      p->compress_data = z;
> +
> +    /* Needs 2 IOVs, one for packet header and one for compressed data */
> +    p->iov = g_new0(struct iovec, 2);
> +
>      return 0;
>  
>  err_free_zbuff:
> @@ -101,6 +105,9 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
>      z->buf = NULL;
>      g_free(p->compress_data);
>      p->compress_data = NULL;
> +
> +    g_free(p->iov);
> +    p->iov = NULL;
>  }
>  
>  /**
> diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
> index 256858df0a..ca17b7e310 100644
> --- a/migration/multifd-zstd.c
> +++ b/migration/multifd-zstd.c
> @@ -52,7 +52,6 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
>      struct zstd_data *z = g_new0(struct zstd_data, 1);
>      int res;
>  
> -    p->compress_data = z;
>      z->zcs = ZSTD_createCStream();
>      if (!z->zcs) {
>          g_free(z);
> @@ -77,6 +76,10 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
>          error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
>          return -1;
>      }
> +    p->compress_data = z;
> +
> +    /* Needs 2 IOVs, one for packet header and one for compressed data */
> +    p->iov = g_new0(struct iovec, 2);
>      return 0;
>  }
>  
> @@ -98,6 +101,9 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
>      z->zbuff = NULL;
>      g_free(p->compress_data);
>      p->compress_data = NULL;
> +
> +    g_free(p->iov);
> +    p->iov = NULL;
>  }
>  
>  /**
> diff --git a/migration/multifd.c b/migration/multifd.c
> index f317bff077..d82885fdbb 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -137,6 +137,13 @@ static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
>          p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
>      }
>  
> +    if (multifd_use_packets()) {
> +        /* We need one extra place for the packet header */
> +        p->iov = g_new0(struct iovec, p->page_count + 1);
> +    } else {
> +        p->iov = g_new0(struct iovec, p->page_count);
> +    }
> +
>      return 0;
>  }
>  
> @@ -150,6 +157,8 @@ static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
>   */
>  static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
>  {
> +    g_free(p->iov);
> +    p->iov = NULL;
>      return;
>  }
>  
> @@ -228,6 +237,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
>   */
>  static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
>  {
> +    p->iov = g_new0(struct iovec, p->page_count);
>      return 0;
>  }
>  
> @@ -240,6 +250,8 @@ static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
>   */
>  static void nocomp_recv_cleanup(MultiFDRecvParams *p)
>  {
> +    g_free(p->iov);
> +    p->iov = NULL;
>  }

Are recv_setup()/recv_cleanup() for zstd/zlib missing?

If the iov will be managed by the compressors, I'm wondering whether we
should start assert(p->iov) after send|recv_setup(), and assert(!p->iov)
after send|recv_cleanup().  That'll guard all these.

>  
>  /**
> @@ -783,8 +795,6 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
>      p->packet_len = 0;
>      g_free(p->packet);
>      p->packet = NULL;
> -    g_free(p->iov);
> -    p->iov = NULL;
>      multifd_send_state->ops->send_cleanup(p, errp);
>  
>      return *errp == NULL;
> @@ -1179,11 +1189,6 @@ bool multifd_send_setup(void)
>              p->packet = g_malloc0(p->packet_len);
>              p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
>              p->packet->version = cpu_to_be32(MULTIFD_VERSION);
> -
> -            /* We need one extra place for the packet header */
> -            p->iov = g_new0(struct iovec, page_count + 1);
> -        } else {
> -            p->iov = g_new0(struct iovec, page_count);
>          }
>          p->name = g_strdup_printf("multifdsend_%d", i);
>          p->page_size = qemu_target_page_size();
> @@ -1353,8 +1358,6 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
>      p->packet_len = 0;
>      g_free(p->packet);
>      p->packet = NULL;
> -    g_free(p->iov);
> -    p->iov = NULL;
>      g_free(p->normal);
>      p->normal = NULL;
>      g_free(p->zero);
> @@ -1602,7 +1605,6 @@ int multifd_recv_setup(Error **errp)
>              p->packet = g_malloc0(p->packet_len);
>          }
>          p->name = g_strdup_printf("multifdrecv_%d", i);
> -        p->iov = g_new0(struct iovec, page_count);
>          p->normal = g_new0(ram_addr_t, page_count);
>          p->zero = g_new0(ram_addr_t, page_count);
>          p->page_count = page_count;
> -- 
> 2.39.3
> 

-- 
Peter Xu



^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v6 7/7] tests/migration-test: add qpl compression test
  2024-05-05 16:57 ` [PATCH v6 7/7] tests/migration-test: add qpl compression test Yuan Liu
@ 2024-05-27 20:56   ` Peter Xu
  0 siblings, 0 replies; 23+ messages in thread
From: Peter Xu @ 2024-05-27 20:56 UTC (permalink / raw)
  To: Yuan Liu; +Cc: farosas, qemu-devel, nanhai.zou

On Mon, May 06, 2024 at 12:57:51AM +0800, Yuan Liu wrote:
> add qpl to compression method test for multifd migration
> 
> the qpl compression supports software path and hardware
> path(IAA device), and the hardware path is used first by
> default. If the hardware path is unavailable, it will
> automatically fallback to the software path for testing.
> 
> Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>

Reviewed-by: Peter Xu <peterx@redhat.com>

-- 
Peter Xu



^ permalink raw reply	[flat|nested] 23+ messages in thread

* RE: [PATCH v6 2/7] migration/multifd: put IOV initialization into compression method
  2024-05-27 20:50   ` Peter Xu
@ 2024-05-28 13:36     ` Liu, Yuan1
  2024-05-28 15:32       ` Peter Xu
  0 siblings, 1 reply; 23+ messages in thread
From: Liu, Yuan1 @ 2024-05-28 13:36 UTC (permalink / raw)
  To: Peter Xu; +Cc: farosas, qemu-devel, Zou, Nanhai

> -----Original Message-----
> From: Peter Xu <peterx@redhat.com>
> Sent: Tuesday, May 28, 2024 4:51 AM
> To: Liu, Yuan1 <yuan1.liu@intel.com>
> Cc: farosas@suse.de; qemu-devel@nongnu.org; Zou, Nanhai
> <nanhai.zou@intel.com>
> Subject: Re: [PATCH v6 2/7] migration/multifd: put IOV initialization into
> compression method
> 
> On Mon, May 06, 2024 at 12:57:46AM +0800, Yuan Liu wrote:
> > Different compression methods may require different numbers of IOVs.
> > Based on streaming compression of zlib and zstd, all pages will be
> > compressed to a data block, so two IOVs are needed for packet header
> > and compressed data block.
> >
> > Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> > Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> > ---
> >  migration/multifd-zlib.c |  7 +++++++
> >  migration/multifd-zstd.c |  8 +++++++-
> >  migration/multifd.c      | 22 ++++++++++++----------
> >  3 files changed, 26 insertions(+), 11 deletions(-)
> >
> > diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
> > index 737a9645d2..2ced69487e 100644
> > --- a/migration/multifd-zlib.c
> > +++ b/migration/multifd-zlib.c
> > @@ -70,6 +70,10 @@ static int zlib_send_setup(MultiFDSendParams *p,
> Error **errp)
> >          goto err_free_zbuff;
> >      }
> >      p->compress_data = z;
> > +
> > +    /* Needs 2 IOVs, one for packet header and one for compressed data
> */
> > +    p->iov = g_new0(struct iovec, 2);
> > +
> >      return 0;
> >
> >  err_free_zbuff:
> > @@ -101,6 +105,9 @@ static void zlib_send_cleanup(MultiFDSendParams *p,
> Error **errp)
> >      z->buf = NULL;
> >      g_free(p->compress_data);
> >      p->compress_data = NULL;
> > +
> > +    g_free(p->iov);
> > +    p->iov = NULL;
> >  }
> >
> >  /**
> > diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
> > index 256858df0a..ca17b7e310 100644
> > --- a/migration/multifd-zstd.c
> > +++ b/migration/multifd-zstd.c
> > @@ -52,7 +52,6 @@ static int zstd_send_setup(MultiFDSendParams *p, Error
> **errp)
> >      struct zstd_data *z = g_new0(struct zstd_data, 1);
> >      int res;
> >
> > -    p->compress_data = z;
> >      z->zcs = ZSTD_createCStream();
> >      if (!z->zcs) {
> >          g_free(z);
> > @@ -77,6 +76,10 @@ static int zstd_send_setup(MultiFDSendParams *p,
> Error **errp)
> >          error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
> >          return -1;
> >      }
> > +    p->compress_data = z;
> > +
> > +    /* Needs 2 IOVs, one for packet header and one for compressed data
> */
> > +    p->iov = g_new0(struct iovec, 2);
> >      return 0;
> >  }
> >
> > @@ -98,6 +101,9 @@ static void zstd_send_cleanup(MultiFDSendParams *p,
> Error **errp)
> >      z->zbuff = NULL;
> >      g_free(p->compress_data);
> >      p->compress_data = NULL;
> > +
> > +    g_free(p->iov);
> > +    p->iov = NULL;
> >  }
> >
> >  /**
> > diff --git a/migration/multifd.c b/migration/multifd.c
> > index f317bff077..d82885fdbb 100644
> > --- a/migration/multifd.c
> > +++ b/migration/multifd.c
> > @@ -137,6 +137,13 @@ static int nocomp_send_setup(MultiFDSendParams *p,
> Error **errp)
> >          p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
> >      }
> >
> > +    if (multifd_use_packets()) {
> > +        /* We need one extra place for the packet header */
> > +        p->iov = g_new0(struct iovec, p->page_count + 1);
> > +    } else {
> > +        p->iov = g_new0(struct iovec, p->page_count);
> > +    }
> > +
> >      return 0;
> >  }
> >
> > @@ -150,6 +157,8 @@ static int nocomp_send_setup(MultiFDSendParams *p,
> Error **errp)
> >   */
> >  static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
> >  {
> > +    g_free(p->iov);
> > +    p->iov = NULL;
> >      return;
> >  }
> >
> > @@ -228,6 +237,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p,
> Error **errp)
> >   */
> >  static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
> >  {
> > +    p->iov = g_new0(struct iovec, p->page_count);
> >      return 0;
> >  }
> >
> > @@ -240,6 +250,8 @@ static int nocomp_recv_setup(MultiFDRecvParams *p,
> Error **errp)
> >   */
> >  static void nocomp_recv_cleanup(MultiFDRecvParams *p)
> >  {
> > +    g_free(p->iov);
> > +    p->iov = NULL;
> >  }
> 
> Are recv_setup()/recv_cleanup() for zstd/zlib missing?

Zstd/zlib does not request the IOVs on the receiving side.
The zstd/zlib reads the compressed data into the buffer directly

qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);

> If the iov will be managed by the compressors, I'm wondering whether we
> should start assert(p->iov) after send|recv_setup(), and assert(!p->iov)
> after send|recv_cleanup().  That'll guard all these.

Yes, I agree with adding a check of IOV in multifd.c since different compressors 
may have different requirements for IOV.

We can add assert(p->iov) after send_setup, not recv_setup. The sending side always
uses IOV to send pages by qio_channel_writev_full_all in multifd.c, but the receiving
side may not require the IOV for reading pages.

It is better to add assert(!p->iov) after send|recv_cleanup()

> >
> >  /**
> > @@ -783,8 +795,6 @@ static bool
> multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
> >      p->packet_len = 0;
> >      g_free(p->packet);
> >      p->packet = NULL;
> > -    g_free(p->iov);
> > -    p->iov = NULL;
> >      multifd_send_state->ops->send_cleanup(p, errp);
> >
> >      return *errp == NULL;
> > @@ -1179,11 +1189,6 @@ bool multifd_send_setup(void)
> >              p->packet = g_malloc0(p->packet_len);
> >              p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
> >              p->packet->version = cpu_to_be32(MULTIFD_VERSION);
> > -
> > -            /* We need one extra place for the packet header */
> > -            p->iov = g_new0(struct iovec, page_count + 1);
> > -        } else {
> > -            p->iov = g_new0(struct iovec, page_count);
> >          }
> >          p->name = g_strdup_printf("multifdsend_%d", i);
> >          p->page_size = qemu_target_page_size();
> > @@ -1353,8 +1358,6 @@ static void
> multifd_recv_cleanup_channel(MultiFDRecvParams *p)
> >      p->packet_len = 0;
> >      g_free(p->packet);
> >      p->packet = NULL;
> > -    g_free(p->iov);
> > -    p->iov = NULL;
> >      g_free(p->normal);
> >      p->normal = NULL;
> >      g_free(p->zero);
> > @@ -1602,7 +1605,6 @@ int multifd_recv_setup(Error **errp)
> >              p->packet = g_malloc0(p->packet_len);
> >          }
> >          p->name = g_strdup_printf("multifdrecv_%d", i);
> > -        p->iov = g_new0(struct iovec, page_count);
> >          p->normal = g_new0(ram_addr_t, page_count);
> >          p->zero = g_new0(ram_addr_t, page_count);
> >          p->page_count = page_count;
> > --
> > 2.39.3
> >
> 
> --
> Peter Xu


^ permalink raw reply	[flat|nested] 23+ messages in thread

* Re: [PATCH v6 2/7] migration/multifd: put IOV initialization into compression method
  2024-05-28 13:36     ` Liu, Yuan1
@ 2024-05-28 15:32       ` Peter Xu
  0 siblings, 0 replies; 23+ messages in thread
From: Peter Xu @ 2024-05-28 15:32 UTC (permalink / raw)
  To: Liu, Yuan1; +Cc: farosas, qemu-devel, Zou, Nanhai

On Tue, May 28, 2024 at 01:36:38PM +0000, Liu, Yuan1 wrote:
> > -----Original Message-----
> > From: Peter Xu <peterx@redhat.com>
> > Sent: Tuesday, May 28, 2024 4:51 AM
> > To: Liu, Yuan1 <yuan1.liu@intel.com>
> > Cc: farosas@suse.de; qemu-devel@nongnu.org; Zou, Nanhai
> > <nanhai.zou@intel.com>
> > Subject: Re: [PATCH v6 2/7] migration/multifd: put IOV initialization into
> > compression method
> > 
> > On Mon, May 06, 2024 at 12:57:46AM +0800, Yuan Liu wrote:
> > > Different compression methods may require different numbers of IOVs.
> > > Based on streaming compression of zlib and zstd, all pages will be
> > > compressed to a data block, so two IOVs are needed for packet header
> > > and compressed data block.
> > >
> > > Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> > > Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> > > ---
> > >  migration/multifd-zlib.c |  7 +++++++
> > >  migration/multifd-zstd.c |  8 +++++++-
> > >  migration/multifd.c      | 22 ++++++++++++----------
> > >  3 files changed, 26 insertions(+), 11 deletions(-)
> > >
> > > diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
> > > index 737a9645d2..2ced69487e 100644
> > > --- a/migration/multifd-zlib.c
> > > +++ b/migration/multifd-zlib.c
> > > @@ -70,6 +70,10 @@ static int zlib_send_setup(MultiFDSendParams *p,
> > Error **errp)
> > >          goto err_free_zbuff;
> > >      }
> > >      p->compress_data = z;
> > > +
> > > +    /* Needs 2 IOVs, one for packet header and one for compressed data
> > */
> > > +    p->iov = g_new0(struct iovec, 2);
> > > +
> > >      return 0;
> > >
> > >  err_free_zbuff:
> > > @@ -101,6 +105,9 @@ static void zlib_send_cleanup(MultiFDSendParams *p,
> > Error **errp)
> > >      z->buf = NULL;
> > >      g_free(p->compress_data);
> > >      p->compress_data = NULL;
> > > +
> > > +    g_free(p->iov);
> > > +    p->iov = NULL;
> > >  }
> > >
> > >  /**
> > > diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
> > > index 256858df0a..ca17b7e310 100644
> > > --- a/migration/multifd-zstd.c
> > > +++ b/migration/multifd-zstd.c
> > > @@ -52,7 +52,6 @@ static int zstd_send_setup(MultiFDSendParams *p, Error
> > **errp)
> > >      struct zstd_data *z = g_new0(struct zstd_data, 1);
> > >      int res;
> > >
> > > -    p->compress_data = z;
> > >      z->zcs = ZSTD_createCStream();
> > >      if (!z->zcs) {
> > >          g_free(z);
> > > @@ -77,6 +76,10 @@ static int zstd_send_setup(MultiFDSendParams *p,
> > Error **errp)
> > >          error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
> > >          return -1;
> > >      }
> > > +    p->compress_data = z;
> > > +
> > > +    /* Needs 2 IOVs, one for packet header and one for compressed data
> > */
> > > +    p->iov = g_new0(struct iovec, 2);
> > >      return 0;
> > >  }
> > >
> > > @@ -98,6 +101,9 @@ static void zstd_send_cleanup(MultiFDSendParams *p,
> > Error **errp)
> > >      z->zbuff = NULL;
> > >      g_free(p->compress_data);
> > >      p->compress_data = NULL;
> > > +
> > > +    g_free(p->iov);
> > > +    p->iov = NULL;
> > >  }
> > >
> > >  /**
> > > diff --git a/migration/multifd.c b/migration/multifd.c
> > > index f317bff077..d82885fdbb 100644
> > > --- a/migration/multifd.c
> > > +++ b/migration/multifd.c
> > > @@ -137,6 +137,13 @@ static int nocomp_send_setup(MultiFDSendParams *p,
> > Error **errp)
> > >          p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
> > >      }
> > >
> > > +    if (multifd_use_packets()) {
> > > +        /* We need one extra place for the packet header */
> > > +        p->iov = g_new0(struct iovec, p->page_count + 1);
> > > +    } else {
> > > +        p->iov = g_new0(struct iovec, p->page_count);
> > > +    }
> > > +
> > >      return 0;
> > >  }
> > >
> > > @@ -150,6 +157,8 @@ static int nocomp_send_setup(MultiFDSendParams *p,
> > Error **errp)
> > >   */
> > >  static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
> > >  {
> > > +    g_free(p->iov);
> > > +    p->iov = NULL;
> > >      return;
> > >  }
> > >
> > > @@ -228,6 +237,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p,
> > Error **errp)
> > >   */
> > >  static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
> > >  {
> > > +    p->iov = g_new0(struct iovec, p->page_count);
> > >      return 0;
> > >  }
> > >
> > > @@ -240,6 +250,8 @@ static int nocomp_recv_setup(MultiFDRecvParams *p,
> > Error **errp)
> > >   */
> > >  static void nocomp_recv_cleanup(MultiFDRecvParams *p)
> > >  {
> > > +    g_free(p->iov);
> > > +    p->iov = NULL;
> > >  }
> > 
> > Are recv_setup()/recv_cleanup() for zstd/zlib missing?
> 
> Zstd/zlib does not request the IOVs on the receiving side.
> The zstd/zlib reads the compressed data into the buffer directly
> 
> qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);

Hmm indeed, thanks.

> 
> > If the iov will be managed by the compressors, I'm wondering whether we
> > should start assert(p->iov) after send|recv_setup(), and assert(!p->iov)
> > after send|recv_cleanup().  That'll guard all these.
> 
> Yes, I agree with adding a check of IOV in multifd.c since different compressors 
> may have different requirements for IOV.
> 
> We can add assert(p->iov) after send_setup, not recv_setup. The sending side always
> uses IOV to send pages by qio_channel_writev_full_all in multifd.c, but the receiving
> side may not require the IOV for reading pages.
> 
> It is better to add assert(!p->iov) after send|recv_cleanup()

In that case maybe we should at some time move iov out of MultiFDRecvParams
as that's not a shared object, making MultiFDSendParams.compress_data
points to iov for nocomp case.  But we can leave that for later.

Then I've no question on this patch, feel free to take (with/without the
conditional assertions):

Reviewed-by: Peter Xu <peterx@redhat.com>

Thanks,

> 
> > >
> > >  /**
> > > @@ -783,8 +795,6 @@ static bool
> > multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
> > >      p->packet_len = 0;
> > >      g_free(p->packet);
> > >      p->packet = NULL;
> > > -    g_free(p->iov);
> > > -    p->iov = NULL;
> > >      multifd_send_state->ops->send_cleanup(p, errp);
> > >
> > >      return *errp == NULL;
> > > @@ -1179,11 +1189,6 @@ bool multifd_send_setup(void)
> > >              p->packet = g_malloc0(p->packet_len);
> > >              p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
> > >              p->packet->version = cpu_to_be32(MULTIFD_VERSION);
> > > -
> > > -            /* We need one extra place for the packet header */
> > > -            p->iov = g_new0(struct iovec, page_count + 1);
> > > -        } else {
> > > -            p->iov = g_new0(struct iovec, page_count);
> > >          }
> > >          p->name = g_strdup_printf("multifdsend_%d", i);
> > >          p->page_size = qemu_target_page_size();
> > > @@ -1353,8 +1358,6 @@ static void
> > multifd_recv_cleanup_channel(MultiFDRecvParams *p)
> > >      p->packet_len = 0;
> > >      g_free(p->packet);
> > >      p->packet = NULL;
> > > -    g_free(p->iov);
> > > -    p->iov = NULL;
> > >      g_free(p->normal);
> > >      p->normal = NULL;
> > >      g_free(p->zero);
> > > @@ -1602,7 +1605,6 @@ int multifd_recv_setup(Error **errp)
> > >              p->packet = g_malloc0(p->packet_len);
> > >          }
> > >          p->name = g_strdup_printf("multifdrecv_%d", i);
> > > -        p->iov = g_new0(struct iovec, page_count);
> > >          p->normal = g_new0(ram_addr_t, page_count);
> > >          p->zero = g_new0(ram_addr_t, page_count);
> > >          p->page_count = page_count;
> > > --
> > > 2.39.3
> > >
> > 
> > --
> > Peter Xu
> 

-- 
Peter Xu



^ permalink raw reply	[flat|nested] 23+ messages in thread

end of thread, other threads:[~2024-05-28 15:33 UTC | newest]

Thread overview: 23+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-05-05 16:57 [PATCH v6 0/7] Live Migration With IAA Yuan Liu
2024-05-05 16:57 ` [PATCH v6 1/7] docs/migration: add qpl compression feature Yuan Liu
2024-05-05 16:57 ` [PATCH v6 2/7] migration/multifd: put IOV initialization into compression method Yuan Liu
2024-05-10 20:21   ` Fabiano Rosas
2024-05-27 20:50   ` Peter Xu
2024-05-28 13:36     ` Liu, Yuan1
2024-05-28 15:32       ` Peter Xu
2024-05-05 16:57 ` [PATCH v6 3/7] configure: add --enable-qpl build option Yuan Liu
2024-05-05 16:57 ` [PATCH v6 4/7] migration/multifd: add qpl compression method Yuan Liu
2024-05-10 14:12   ` Fabiano Rosas
2024-05-10 14:23     ` Liu, Yuan1
2024-05-05 16:57 ` [PATCH v6 5/7] migration/multifd: implement initialization of qpl compression Yuan Liu
2024-05-10 20:45   ` Fabiano Rosas
2024-05-11 12:55     ` Liu, Yuan1
2024-05-10 20:52   ` Fabiano Rosas
2024-05-11 12:39     ` Liu, Yuan1
2024-05-05 16:57 ` [PATCH v6 6/7] migration/multifd: implement qpl compression and decompression Yuan Liu
2024-05-13 15:13   ` Fabiano Rosas
2024-05-14  6:30     ` Liu, Yuan1
2024-05-14 14:08       ` Fabiano Rosas
2024-05-15  6:36         ` Liu, Yuan1
2024-05-05 16:57 ` [PATCH v6 7/7] tests/migration-test: add qpl compression test Yuan Liu
2024-05-27 20:56   ` Peter Xu

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).