From: Yuan Liu <yuan1.liu@intel.com>
To: peterx@redhat.com, farosas@suse.de
Cc: qemu-devel@nongnu.org, hao.xiang@bytedance.com,
	bryan.zhang@bytedance.com, yuan1.liu@intel.com,
	nanhai.zou@intel.com
Subject: [PATCH v5 6/7] migration/multifd: implement qpl compression and decompression
Date: Wed, 20 Mar 2024 00:45:26 +0800
Message-ID: <20240319164527.1873891-7-yuan1.liu@intel.com>
In-Reply-To: <20240319164527.1873891-1-yuan1.liu@intel.com>

Each qpl job is used to (de)compress a normal page and can be
processed independently by the IAA hardware. All qpl jobs are
submitted to the hardware at once, and then the sender waits for
all of them to complete.
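
As an illustration only (not part of this patch), below is a minimal
sketch of the same submit-then-wait pattern against the QPL low-level
C API. The allocation calls qpl_get_job_size()/qpl_init_job() and the
qpl_path_hardware path are assumptions about the QPL library rather
than code taken from this series:

    /*
     * Illustrative sketch: compress JOB_NUM pages by submitting one qpl
     * job per page and only then waiting, so the IAA hardware can work
     * on all pages in parallel.
     */
    #include <stdint.h>
    #include <stdlib.h>
    #include "qpl/qpl.h"

    #define JOB_NUM   4
    #define PAGE_SIZE 4096

    static int compress_pages(uint8_t *pages, uint8_t *zbuf)
    {
        qpl_job *jobs[JOB_NUM];
        uint32_t job_size = 0;
        qpl_status status;

        /* assumed QPL setup: query the job size, then init each job */
        qpl_get_job_size(qpl_path_hardware, &job_size);
        for (int i = 0; i < JOB_NUM; i++) {
            jobs[i] = malloc(job_size);
            qpl_init_job(qpl_path_hardware, jobs[i]);
        }

        /* submit every page as an independent compression job */
        for (int i = 0; i < JOB_NUM; i++) {
            qpl_job *job = jobs[i];

            job->op = qpl_op_compress;
            job->next_in_ptr = pages + i * PAGE_SIZE;
            job->available_in = PAGE_SIZE;
            job->next_out_ptr = zbuf + i * PAGE_SIZE;
            /* one byte short of a page, like the raw-data fallback trigger */
            job->available_out = PAGE_SIZE - 1;
            job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
            job->level = 1;
            do {
                status = qpl_submit_job(job);
            } while (status == QPL_STS_QUEUES_ARE_BUSY_ERR);
            if (status != QPL_STS_OK) {
                return -1;
            }
        }

        /* wait only after everything has been queued to the hardware */
        for (int i = 0; i < JOB_NUM; i++) {
            status = qpl_wait_job(jobs[i]);
            /*
             * The real patch falls back to sending the raw page on
             * QPL_STS_MORE_OUTPUT_NEEDED; this sketch simply fails.
             */
            if (status != QPL_STS_OK) {
                return -1;
            }
        }
        /* freeing the jobs is omitted from this sketch */
        return 0;
    }

The decompression side in run_decomp_jobs() follows the same shape,
with qpl_op_decompress and raw pages copied via memcpy() instead of
being submitted to the hardware.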

Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
---
 migration/multifd-qpl.c | 229 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 225 insertions(+), 4 deletions(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 6de65e9da7..479b051b24 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -13,6 +13,7 @@
 #include "qemu/osdep.h"
 #include "qemu/module.h"
 #include "qapi/error.h"
+#include "exec/ramblock.h"
 #include "migration.h"
 #include "multifd.h"
 #include "qpl/qpl.h"
@@ -171,6 +172,112 @@ static void qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
     p->compress_data = NULL;
 }
 
+static inline void prepare_job(qpl_job *job, uint8_t *input, uint32_t input_len,
+                               uint8_t *output, uint32_t output_len,
+                               bool is_compression)
+{
+    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
+    job->next_in_ptr = input;
+    job->next_out_ptr = output;
+    job->available_in = input_len;
+    job->available_out = output_len;
+    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
+    /* only supports one compression level */
+    job->level = 1;
+}
+
+/**
+ * set_raw_data_hdr: set the length of raw data
+ *
+ * If the length of the compressed output data is greater than or equal to
+ * the page size, then set the compressed data length to the data size and
+ * send raw data directly.
+ *
+ * @qpl: pointer to the QplData structure
+ * @index: the index of the compression job header
+ */
+static inline void set_raw_data_hdr(QplData *qpl, uint32_t index)
+{
+    assert(index < qpl->job_num);
+    qpl->zbuf_hdr[index] = cpu_to_be32(qpl->data_size);
+}
+
+/**
+ * is_raw_data: check if the data is raw data
+ *
+ * The raw data length is always equal to the data size, which is
+ * the size of one page.
+ *
+ * Returns true if the data is raw data, otherwise false
+ *
+ * @qpl: pointer to the QplData structure
+ * @index: the index of the decompression job header
+ */
+static inline bool is_raw_data(QplData *qpl, uint32_t index)
+{
+    assert(index < qpl->job_num);
+    return qpl->zbuf_hdr[index] == qpl->data_size;
+}
+
+static int run_comp_jobs(MultiFDSendParams *p, Error **errp)
+{
+    qpl_status status;
+    QplData *qpl = p->compress_data;
+    MultiFDPages_t *pages = p->pages;
+    uint32_t job_num = pages->normal_num;
+    qpl_job *job = NULL;
+    uint32_t off = 0;
+
+    assert(job_num <= qpl->job_num);
+    /* submit all compression jobs */
+    for (int i = 0; i < job_num; i++) {
+        job = qpl->job_array[i];
+        /* the compressed data size should be less than one page */
+        prepare_job(job, pages->block->host + pages->offset[i], qpl->data_size,
+                    qpl->zbuf + off, qpl->data_size - 1, true);
+retry:
+        status = qpl_submit_job(job);
+        if (status == QPL_STS_OK) {
+            off += qpl->data_size;
+        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
+            goto retry;
+        } else {
+            error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
+                       p->id, status);
+            return -1;
+        }
+    }
+
+    /* wait for all jobs to complete */
+    for (int i = 0; i < job_num; i++) {
+        job = qpl->job_array[i];
+        status = qpl_wait_job(job);
+        if (status == QPL_STS_OK) {
+            qpl->zbuf_hdr[i] = cpu_to_be32(job->total_out);
+            p->iov[p->iovs_num].iov_len = job->total_out;
+            p->iov[p->iovs_num].iov_base = qpl->zbuf + (qpl->data_size * i);
+            p->next_packet_size += job->total_out;
+        } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
+            /*
+             * The compression job did not fail; the output data is
+             * simply larger than the provided output buffer. In this
+             * case, send the raw page directly to the destination.
+             */
+            set_raw_data_hdr(qpl, i);
+            p->iov[p->iovs_num].iov_len = qpl->data_size;
+            p->iov[p->iovs_num].iov_base = pages->block->host +
+                                           pages->offset[i];
+            p->next_packet_size += qpl->data_size;
+        } else {
+            error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
+                       p->id, status);
+            return -1;
+        }
+        p->iovs_num++;
+    }
+    return 0;
+}
+
 /**
  * qpl_send_prepare: prepare data to be able to send
  *
@@ -184,8 +291,28 @@ static void qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
  */
 static int qpl_send_prepare(MultiFDSendParams *p, Error **errp)
 {
-    /* Implement in next patch */
-    return -1;
+    QplData *qpl = p->compress_data;
+    uint32_t hdr_size;
+
+    if (!multifd_send_prepare_common(p)) {
+        goto out;
+    }
+
+    assert(p->pages->normal_num <= qpl->job_num);
+    hdr_size = p->pages->normal_num * sizeof(uint32_t);
+    /* prepare the header that stores the lengths of all compressed data */
+    p->iov[1].iov_base = (uint8_t *) qpl->zbuf_hdr;
+    p->iov[1].iov_len = hdr_size;
+    p->iovs_num++;
+    p->next_packet_size += hdr_size;
+    if (run_comp_jobs(p, errp) != 0) {
+        return -1;
+    }
+
+out:
+    p->flags |= MULTIFD_FLAG_QPL;
+    multifd_send_fill_packet(p);
+    return 0;
 }
 
 /**
@@ -227,6 +354,60 @@ static void qpl_recv_cleanup(MultiFDRecvParams *p)
     p->compress_data = NULL;
 }
 
+static int run_decomp_jobs(MultiFDRecvParams *p, Error **errp)
+{
+    qpl_status status;
+    qpl_job *job;
+    QplData *qpl = p->compress_data;
+    uint32_t off = 0;
+    uint32_t job_num = p->normal_num;
+
+    assert(job_num <= qpl->job_num);
+    /* submit all decompression jobs */
+    for (int i = 0; i < job_num; i++) {
+        /* raw data, copy it to the destination page directly */
+        if (is_raw_data(qpl, i)) {
+            memcpy(p->host + p->normal[i], qpl->zbuf + off, qpl->data_size);
+            off += qpl->data_size;
+            continue;
+        }
+        job = qpl->job_array[i];
+        prepare_job(job, qpl->zbuf + off, qpl->zbuf_hdr[i],
+                    p->host + p->normal[i], qpl->data_size, false);
+retry:
+        status = qpl_submit_job(job);
+        if (status == QPL_STS_OK) {
+            off += qpl->zbuf_hdr[i];
+        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
+            goto retry;
+        } else {
+            error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
+                       p->id, status);
+            return -1;
+        }
+    }
+
+    /* wait for all jobs to complete */
+    for (int i = 0; i < job_num; i++) {
+        if (is_raw_data(qpl, i)) {
+            continue;
+        }
+        job = qpl->job_array[i];
+        status = qpl_wait_job(job);
+        if (status != QPL_STS_OK) {
+            error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
+                       p->id, status);
+            return -1;
+        }
+        if (job->total_out != qpl->data_size) {
+            error_setg(errp, "multifd %u: decompressed len %u, expected len %u",
+                       p->id, job->total_out, qpl->data_size);
+            return -1;
+        }
+    }
+    return 0;
+}
+
 /**
  * qpl_recv: read the data from the channel into actual pages
  *
@@ -240,8 +421,48 @@ static void qpl_recv_cleanup(MultiFDRecvParams *p)
  */
 static int qpl_recv(MultiFDRecvParams *p, Error **errp)
 {
-    /* Implement in next patch */
-    return -1;
+    QplData *qpl = p->compress_data;
+    uint32_t in_size = p->next_packet_size;
+    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
+    uint32_t hdr_len = p->normal_num * sizeof(uint32_t);
+    uint32_t data_len = 0;
+    int ret;
+
+    if (flags != MULTIFD_FLAG_QPL) {
+        error_setg(errp, "multifd %u: flags received %x flags expected %x",
+                   p->id, flags, MULTIFD_FLAG_QPL);
+        return -1;
+    }
+    multifd_recv_zero_page_process(p);
+    if (!p->normal_num) {
+        assert(in_size == 0);
+        return 0;
+    }
+
+    /* read compressed data lengths */
+    assert(hdr_len < in_size);
+    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf_hdr, hdr_len, errp);
+    if (ret != 0) {
+        return ret;
+    }
+    assert(p->normal_num <= qpl->job_num);
+    for (int i = 0; i < p->normal_num; i++) {
+        qpl->zbuf_hdr[i] = be32_to_cpu(qpl->zbuf_hdr[i]);
+        data_len += qpl->zbuf_hdr[i];
+        assert(qpl->zbuf_hdr[i] <= qpl->data_size);
+    }
+
+    /* read compressed data */
+    assert(in_size == hdr_len + data_len);
+    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, data_len, errp);
+    if (ret != 0) {
+        return ret;
+    }
+
+    if (run_decomp_jobs(p, errp) != 0) {
+        return -1;
+    }
+    return 0;
 }
 
 static MultiFDMethods multifd_qpl_ops = {
-- 
2.39.3


