* [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache
@ 2016-08-29 17:09 Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 01/22] block/pcache: empty pcache driver filter Pavel Butsykin
                   ` (23 more replies)
  0 siblings, 24 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:09 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

The prefetch cache (pcache) aims to improve the performance of sequential reads.
Of most interest here are small sequential read requests: such requests can be
optimized by extending them and moving the data into the prefetch cache.
However, there are two issues:
 - In aggregate, only a small portion of requests is sequential, so the delays
   caused by reading larger volumes of data would lead to an overall decrease
   in performance.
 - With a large number of random requests, the cache memory fills up with
   redundant data.
This pcache implementation solves these and other problems of prefetching data.
The pcache algorithm can be summarised by the following main steps.

1. Monitor I/O requests to identify typical sequences.
This implementation of the prefetch cache works at the storage system level and
has information only about the physical block addresses of I/O requests.
Statistics are collected only from read requests up to a maximum size of 32KB
(by default); each request that matches this criterion falls into a pool of
requests. The request statistics are stored in an rb-tree (lreq.tree), a simple
but, for this purpose, quite efficient data structure.
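
To make the keying concrete, here is a small self-contained sketch of how a
read request can be filtered into the pool and keyed by its sector range. The
RbNodeKey layout and pcache_key_cmp() follow patch 05 of this series; the
pcache_candidate() helper and the sector constant are illustrative only.

#include <stdint.h>
#include <stdbool.h>

#define PCACHE_MAX_AIO_SECTORS 64   /* 32KB (default) in 512-byte sectors */

typedef struct RbNodeKey {
    uint64_t num;    /* first sector of the request */
    uint32_t size;   /* request length in sectors */
} RbNodeKey;

/* Overlapping ranges compare as equal, so a single tree lookup finds
 * any node that intersects the request (as in patch 05). */
static int pcache_key_cmp(const RbNodeKey *key1, const RbNodeKey *key2)
{
    if (key1->num >= key2->num + key2->size) {
        return 1;
    }
    if (key1->num + key1->size <= key2->num) {
        return -1;
    }
    return 0;
}

/* Only small reads are tracked; larger reads are already efficient
 * and would only pollute the statistics pool. */
static bool pcache_candidate(uint32_t nb_sectors)
{
    return nb_sectors <= PCACHE_MAX_AIO_SECTORS;
}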

2. Identify sequential I/O streams.
For each read request, an attempt is made to pick up from lreq.tree a chain of
requests of which the current request would be the next sequential element. The
search key for consecutive requests is the area of sectors preceding the
current request; this area should not be too small, to avoid false readahead.
Sequential streams can be identified even among a large number of random
requests. For example, given accesses to blocks 100, 1157, 27520, 4, 101, 312,
1337, 102, processing request 102 will identify the chain of sequential
requests 100, 101, 102, and a decision to do readahead can then be made. A
situation may also arise where multiple applications A, B, C simultaneously
perform sequential reads. Each application by itself reads sequentially,
A(100, 101, 102), B(300, 301, 302), C(700, 701, 702), but to the block device
this may look like random reading: 100, 300, 700, 101, 301, 701, 102, 302, 702.
In this case the sequential streams will still be recognised, because the
placement of the requests in the rb-tree makes it possible to separate the
sequential I/O streams.
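
The following toy model illustrates the detection idea. It is not the patch
code: the real implementation walks lreq.tree, a flat array stands in for the
tree here, and the lookback window size is a made-up value.

#include <stdint.h>
#include <stdbool.h>
#include <stdio.h>

typedef struct { uint64_t num; uint32_t size; } Req;

/* Hypothetical lookback window; too small a window causes false
 * readahead on accidentally adjacent requests. */
#define LOOKBACK_SECTORS 16

/* True if [start, end) is fully covered by earlier tracked requests. */
static bool area_covered(const Req *pool, int n, uint64_t start, uint64_t end)
{
    uint64_t pos = start;

    while (pos < end) {
        bool hit = false;
        for (int i = 0; i < n; i++) {
            if (pool[i].num <= pos && pos < pool[i].num + pool[i].size) {
                pos = pool[i].num + pool[i].size; /* jump past this request */
                hit = true;
                break;
            }
        }
        if (!hit) {
            return false;
        }
    }
    return true;
}

static bool is_sequential(const Req *pool, int n, uint64_t sector_num)
{
    uint64_t start = sector_num > LOOKBACK_SECTORS
                     ? sector_num - LOOKBACK_SECTORS : 0;
    return area_covered(pool, n, start, sector_num);
}

int main(void)
{
    /* A mixed stream: two 8-sector reads at 100 and 108 among noise. */
    Req pool[] = { {100, 8}, {1157, 8}, {27520, 8}, {4, 8}, {108, 8}, {312, 8} };

    printf("request at 116 sequential: %d\n",
           is_sequential(pool, 6, 116)); /* sectors 100..115 covered -> 1 */
    return 0;
}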

3. Do readahead into the cache for recognized sequential data streams.
Once a sequential stream has been detected, larger requests are needed to bring
data into the cache. This implementation uses readahead instead of extending
the original request, so the request itself goes through unchanged. There is no
reason to put data into the cache that will never be picked up, but that would
always happen if requests were extended. The areas of cached blocks are also
stored in an rb-tree (pcache.tree), again a simple but, for this purpose, quite
efficient data structure.
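
In outline the readahead decision looks like the sketch below. The helper
names are hypothetical stand-ins, not functions from this series; the real
code searches pcache.tree for already-cached areas.

#include <stdint.h>
#include <stdbool.h>

#define PCACHE_READAHEAD_SECTORS 256   /* 128KB default, in sectors */

/* Stand-in for the real pcache.tree lookup. */
static bool pcache_range_cached(uint64_t num, uint32_t size)
{
    return false;
}

/* Stand-in for submitting an asynchronous readahead request. */
static void pcache_submit_readahead(uint64_t num, uint32_t size)
{
}

static void pcache_try_readahead(uint64_t sector_num, uint32_t nb_sectors,
                                 bool sequential)
{
    uint64_t ra_num = sector_num + nb_sectors; /* area following the request */

    if (!sequential) {
        return; /* random reads pass through untouched */
    }
    /* The guest request itself goes to the backing file unchanged;
     * only the following area is brought into the cache. */
    if (!pcache_range_cached(ra_num, PCACHE_READAHEAD_SECTORS)) {
        pcache_submit_readahead(ra_num, PCACHE_READAHEAD_SECTORS);
    }
}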

4. Control the size of the prefetch cache pool and the request statistics pool.
The boundary of the request statistics pool is controlled simply: request data
is placed and replaced according to the FIFO principle. The boundary of the
cache memory is controlled with an LRU list, which limits the maximum amount of
memory that pcache can allocate. However, the LRU exists mainly to prevent
displacement of cache blocks that have been read only partially. The main path
is to release memory immediately after use: as soon as a chunk of cache memory
has been completely read, it is freed, since the probability of the same
request being repeated is very low. Cases where one and the same portion of
cache memory is read several times are not optimized; they are not among the
cases pcache can optimize. Thus, using a cache memory of small volume, and by
optimizing the readahead and memory-release operations, we can read entire
volumes of data while providing a 100% cache hit rate, and the performance of
random read requests does not decrease.
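
A minimal sketch of that eviction rule, with illustrative names (the real
accounting lives in the pcache node structures added later in the series):

#include <stdint.h>
#include <stdbool.h>

typedef struct PCNodeSketch {
    uint32_t size;         /* bytes of cached data */
    uint32_t bytes_read;   /* bytes already copied out to the guest */
} PCNodeSketch;

/* Returns true when the node can be dropped: once every byte has been
 * picked up, a repeat hit is very unlikely, so the memory is released
 * immediately instead of waiting for LRU displacement. Partially read
 * nodes stay and are protected by the LRU list. */
static bool pcache_node_consumed(PCNodeSketch *node, uint32_t nbytes)
{
    node->bytes_read += nbytes;
    return node->bytes_read >= node->size;
}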

PCache is implemented as a qemu block filter driver and has several
configurable parameters: the total cache size, the readahead size, and the
maximum size of a request that can be processed.

Several test cases with different sequential and random read patterns on an
SSD disk were used for performance evaluation. Here are the test results and
the qemu parameters:

qemu parameters: 
-M pc-i440fx-2.4 --enable-kvm -smp 4 -m 1024 
-drive file=centos7.qcow2,if=none,id=drive-virtio-disk0,format=qcow2,cache=none,
       aio=native,pcache-full-size=4MB,pcache-readahead-size=128KB,
       pcache-max-aio-size=32KB
-device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x8,drive=drive-virtio-disk0,
        id=virtio-disk0
(-set device.virtio-disk0.x-data-plane=on)

********************************************************************************
* Testcase                        * Results in iops                            *
*                                 **********************************************
*                                 * clean qemu   * pcache       * x-data-plane *
********************************************************************************
* Create/open 16 file(s) of total * 25514 req/s  * 85659 req/s  * 28249 req/s  *
* size 2048.00 MB named           * 25692 req/s  * 89064 req/s  * 27950 req/s  *
* /tmp/tmp.tmp, start 4 thread(s) * 25836 req/s  * 84142 req/s  * 28120 req/s  *
* and do uncached sequential read *              *              *              *
* by 4KB blocks                   *              *              *              *
********************************************************************************
* Create/open 16 file(s) of total * 56006 req/s  * 92137 req/s  * 56992 req/s  *
* size 2048.00 MB named           * 55335 req/s  * 92269 req/s  * 57023 req/s  *
* /tmp/tmp.tmp, start 4 thread(s) * 55731 req/s  * 98722 req/s  * 56593 req/s  *
* and do uncached sequential read *              *              *              *
* by 4KB blocks with constant     *              *              *              *
* queue len 32                    *              *              *              *
********************************************************************************
* Create/open 16 file(s) of total * 14104 req/s  * 14164 req/s  * 13914 req/s  *
* size 2048.00 MB named           * 14130 req/s  * 14232 req/s  * 13613 req/s  *
* /tmp/tmp.tmp, start 4 thread(s) * 14183 req/s  * 14080 req/s  * 13374 req/s  *
* and do uncached random read by  *              *              *              *
* 4KB blocks                      *              *              *              *
********************************************************************************
* Create/open 16 file(s) of total * 23480 req/s  * 23483 req/s  * 20887 req/s  *
* size 2048.00 MB named           * 23070 req/s  * 22432 req/s  * 21127 req/s  *
* /tmp/tmp.tmp, start 4 thread(s) * 24090 req/s  * 23499 req/s  * 23415 req/s  *
* and do uncached random read by  *              *              *              *
* 4KB blocks with constant queue  *              *              *              *
* len 32                          *              *              *              *
********************************************************************************

TODO list:
- add tracepoints
- add migration support 
- add more explanations in the commit messages
- get rid of the additional allocation in pcache_node_find_and_create() and
  pcache_aio_readv()

Changes from v1:
- Fix failed automatic build test (11)

Pavel Butsykin (22):
  block/pcache: empty pcache driver filter
  block/pcache: add own AIOCB block
  util/rbtree: add rbtree from linux kernel
  block/pcache: add pcache debug build
  block/pcache: add aio requests into cache
  block/pcache: restrict cache size
  block/pcache: introduce LRU as method of memory
  block/pcache: implement pickup parts of the cache
  block/pcache: separation AIOCB on requests
  block/pcache: add check node leak
  add QEMU style defines for __sync_add_and_fetch
  block/pcache: implement read cache to qiov and drop node during aio
    write
  block/pcache: add generic request complete
  block/pcache: add support for rescheduling requests
  block/pcache: simple readahead one chunk forward
  block/pcache: pcache readahead node around
  block/pcache: skip readahead for non-sequential requests
  block/pcache: add pcache skip large aio read
  block/pcache: add pcache node assert
  block/pcache: implement pcache error handling of aio cb
  block/pcache: add write through node
  block/pcache: drop used pcache node

 block/Makefile.objs             |    1 +
 block/pcache.c                  | 1224 +++++++++++++++++++++++++++++++++++++++
 include/qemu/atomic.h           |    8 +
 include/qemu/rbtree.h           |  109 ++++
 include/qemu/rbtree_augmented.h |  237 ++++++++
 util/Makefile.objs              |    1 +
 util/rbtree.c                   |  570 ++++++++++++++++++
 7 files changed, 2150 insertions(+)
 create mode 100644 block/pcache.c
 create mode 100644 include/qemu/rbtree.h
 create mode 100644 include/qemu/rbtree_augmented.h
 create mode 100644 util/rbtree.c

-- 
2.8.3


* [Qemu-devel] [PATCH RFC v2 01/22] block/pcache: empty pcache driver filter
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-09-01 14:31   ` Kevin Wolf
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 02/22] block/pcache: add own AIOCB block Pavel Butsykin
                   ` (22 subsequent siblings)
  23 siblings, 1 reply; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

A basic version of the pcache driver, to simplify preparation of the patch set.

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/Makefile.objs |   1 +
 block/pcache.c      | 156 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 157 insertions(+)
 create mode 100644 block/pcache.c

diff --git a/block/Makefile.objs b/block/Makefile.objs
index 2593a2f..7c588ac 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -4,6 +4,7 @@ block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-obj-y += qed-check.o
 block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
 block-obj-y += quorum.o
+block-obj-y += pcache.o
 block-obj-y += parallels.o blkdebug.o blkverify.o blkreplay.o
 block-obj-y += block-backend.o snapshot.o qapi.o
 block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
diff --git a/block/pcache.c b/block/pcache.c
new file mode 100644
index 0000000..770bbc0
--- /dev/null
+++ b/block/pcache.c
@@ -0,0 +1,156 @@
+/*
+ * Prefetch cache driver filter
+ *
+ * Copyright (c) 2016 Pavel Butsykin <pbutsykin@virtuozzo.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "qemu/osdep.h"
+#include "block/block_int.h"
+#include "qapi/error.h"
+#include "qapi/qmp/qstring.h"
+
+
+static const AIOCBInfo pcache_aiocb_info = {
+    .aiocb_size = sizeof(BlockAIOCB),
+};
+
+static QemuOptsList runtime_opts = {
+    .name = "pcache",
+    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
+    .desc = {
+        {
+            .name = "x-image",
+            .type = QEMU_OPT_STRING,
+            .help = "[internal use only, will be removed]",
+        },
+        { /* end of list */ }
+    },
+};
+
+static void pcache_aio_cb(void *opaque, int ret)
+{
+
+    BlockAIOCB *acb = opaque;
+
+    acb->cb(acb->opaque, ret);
+
+    qemu_aio_unref(acb);
+}
+
+static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
+                                    int64_t sector_num,
+                                    QEMUIOVector *qiov,
+                                    int nb_sectors,
+                                    BlockCompletionFunc *cb,
+                                    void *opaque)
+{
+    BlockAIOCB *acb = qemu_aio_get(&pcache_aiocb_info, bs, cb, opaque);
+
+    bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
+                   pcache_aio_cb, acb);
+    return acb;
+}
+
+static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
+                                     int64_t sector_num,
+                                     QEMUIOVector *qiov,
+                                     int nb_sectors,
+                                     BlockCompletionFunc *cb,
+                                     void *opaque)
+{
+    BlockAIOCB *acb = qemu_aio_get(&pcache_aiocb_info, bs, cb, opaque);
+
+    bdrv_aio_writev(bs->file, sector_num, qiov, nb_sectors,
+                    pcache_aio_cb, acb);
+    return acb;
+}
+
+static int pcache_file_open(BlockDriverState *bs, QDict *options, int flags,
+                            Error **errp)
+{
+    QemuOpts *opts;
+    Error *local_err = NULL;
+    int ret = 0;
+
+    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
+    qemu_opts_absorb_qdict(opts, options, &local_err);
+    if (local_err) {
+        error_propagate(errp, local_err);
+        ret = -EINVAL;
+        goto fail;
+    }
+
+    assert(bs->file == NULL);
+    bs->file = bdrv_open_child(qemu_opt_get(opts, "x-image"), options,
+                               "image", bs, &child_format, false, &local_err);
+    if (local_err) {
+        ret = -EINVAL;
+        error_propagate(errp, local_err);
+    }
+fail:
+    qemu_opts_del(opts);
+    return ret;
+}
+
+static void pcache_close(BlockDriverState *bs)
+{
+}
+
+static void pcache_parse_filename(const char *filename, QDict *options,
+                                  Error **errp)
+{
+    qdict_put(options, "x-image", qstring_from_str(filename));
+}
+
+static int64_t pcache_getlength(BlockDriverState *bs)
+{
+    return bdrv_getlength(bs->file->bs);
+}
+
+static bool pcache_recurse_is_first_non_filter(BlockDriverState *bs,
+                                               BlockDriverState *candidate)
+{
+    return bdrv_recurse_is_first_non_filter(bs->file->bs, candidate);
+}
+
+static BlockDriver bdrv_pcache = {
+    .format_name                        = "pcache",
+    .protocol_name                      = "pcache",
+    .instance_size                      = 0,
+
+    .bdrv_parse_filename                = pcache_parse_filename,
+    .bdrv_file_open                     = pcache_file_open,
+    .bdrv_close                         = pcache_close,
+    .bdrv_getlength                     = pcache_getlength,
+
+    .bdrv_aio_readv                     = pcache_aio_readv,
+    .bdrv_aio_writev                    = pcache_aio_writev,
+
+    .is_filter                          = true,
+    .bdrv_recurse_is_first_non_filter   = pcache_recurse_is_first_non_filter,
+};
+
+static void bdrv_cache_init(void)
+{
+    bdrv_register(&bdrv_pcache);
+}
+
+block_init(bdrv_cache_init);
-- 
2.8.3


* [Qemu-devel] [PATCH RFC v2 02/22] block/pcache: add own AIOCB block
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 01/22] block/pcache: empty pcache driver filter Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 03/22] util/rbtree: add rbtree from linux kernel Pavel Butsykin
                   ` (21 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 43 +++++++++++++++++++++++++++++++++++--------
 1 file changed, 35 insertions(+), 8 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index 770bbc0..74a4bc4 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -24,12 +24,22 @@
 
 #include "qemu/osdep.h"
 #include "block/block_int.h"
+#include "block/raw-aio.h"
 #include "qapi/error.h"
 #include "qapi/qmp/qstring.h"
 
+typedef struct PrefCacheAIOCB {
+    BlockAIOCB common;
+
+    QEMUIOVector *qiov;
+    uint64_t sector_num;
+    uint32_t nb_sectors;
+    int      aio_type;
+    int      ret;
+} PrefCacheAIOCB;
 
 static const AIOCBInfo pcache_aiocb_info = {
-    .aiocb_size = sizeof(BlockAIOCB),
+    .aiocb_size = sizeof(PrefCacheAIOCB),
 };
 
 static QemuOptsList runtime_opts = {
@@ -47,14 +57,29 @@ static QemuOptsList runtime_opts = {
 
 static void pcache_aio_cb(void *opaque, int ret)
 {
+    PrefCacheAIOCB *acb = opaque;
 
-    BlockAIOCB *acb = opaque;
-
-    acb->cb(acb->opaque, ret);
+    acb->common.cb(acb->common.opaque, ret);
 
     qemu_aio_unref(acb);
 }
 
+static PrefCacheAIOCB *pcache_aio_get(BlockDriverState *bs, int64_t sector_num,
+                                      QEMUIOVector *qiov, int nb_sectors,
+                                      BlockCompletionFunc *cb, void *opaque,
+                                      int type)
+{
+    PrefCacheAIOCB *acb = qemu_aio_get(&pcache_aiocb_info, bs, cb, opaque);
+
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+    acb->qiov = qiov;
+    acb->aio_type = type;
+    acb->ret = 0;
+
+    return acb;
+}
+
 static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
                                     int64_t sector_num,
                                     QEMUIOVector *qiov,
@@ -62,11 +87,12 @@ static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
                                     BlockCompletionFunc *cb,
                                     void *opaque)
 {
-    BlockAIOCB *acb = qemu_aio_get(&pcache_aiocb_info, bs, cb, opaque);
+    PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
+                                         opaque, QEMU_AIO_READ);
 
     bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
                    pcache_aio_cb, acb);
-    return acb;
+    return &acb->common;
 }
 
 static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
@@ -76,11 +102,12 @@ static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
                                      BlockCompletionFunc *cb,
                                      void *opaque)
 {
-    BlockAIOCB *acb = qemu_aio_get(&pcache_aiocb_info, bs, cb, opaque);
+    PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
+                                         opaque, QEMU_AIO_WRITE);
 
     bdrv_aio_writev(bs->file, sector_num, qiov, nb_sectors,
                     pcache_aio_cb, acb);
-    return acb;
+    return &acb->common;
 }
 
 static int pcache_file_open(BlockDriverState *bs, QDict *options, int flags,
-- 
2.8.3


* [Qemu-devel] [PATCH RFC v2 03/22] util/rbtree: add rbtree from linux kernel
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 01/22] block/pcache: empty pcache driver filter Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 02/22] block/pcache: add own AIOCB block Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-09-01 14:37   ` Kevin Wolf
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 04/22] block/pcache: add pcache debug build Pavel Butsykin
                   ` (20 subsequent siblings)
  23 siblings, 1 reply; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

Why don't we use the rbtree from glib? Because we need a pointer to the parent
node. For an optimal implementation of storing cached chunks in the rbtree we
need to get the next and the previous node, and having the parent pointer in
the node makes an effective implementation of these functions possible. In
this rbtree implementation (unlike glib's) the node contains a pointer to its
parent node. Moreover, this rbtree leaves more flexibility to the algorithm,
because to use these rbtrees you have to implement your own insert and search
cores. This avoids callbacks and the dramatic loss of performance they would
cause.
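
For illustration, a minimal pair of insert/search cores written against the
API added by this patch might look as follows (MyNode and both helpers are
hypothetical, not part of the patch):

#include "qemu/osdep.h"
#include "qemu/rbtree.h"

typedef struct MyNode {
    struct RbNode rb;
    uint64_t key;
} MyNode;

static MyNode *my_search(struct RbRoot *root, uint64_t key)
{
    struct RbNode *n = root->rb_node;

    while (n) {
        MyNode *e = RB_ENTRY(n, MyNode, rb);
        if (key < e->key) {
            n = n->rb_left;
        } else if (key > e->key) {
            n = n->rb_right;
        } else {
            return e;
        }
    }
    return NULL;
}

static void my_insert(struct RbRoot *root, MyNode *node)
{
    struct RbNode **p = &root->rb_node, *parent = NULL;

    while (*p) {
        MyNode *e = RB_ENTRY(*p, MyNode, rb);
        parent = *p;
        p = node->key < e->key ? &(*p)->rb_left : &(*p)->rb_right;
    }
    rb_link_node(&node->rb, parent, p);   /* link at the found slot */
    rb_insert_color(&node->rb, root);     /* then rebalance */
}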

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 include/qemu/rbtree.h           | 109 ++++++++
 include/qemu/rbtree_augmented.h | 237 +++++++++++++++++
 util/Makefile.objs              |   1 +
 util/rbtree.c                   | 570 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 917 insertions(+)
 create mode 100644 include/qemu/rbtree.h
 create mode 100644 include/qemu/rbtree_augmented.h
 create mode 100644 util/rbtree.c

diff --git a/include/qemu/rbtree.h b/include/qemu/rbtree.h
new file mode 100644
index 0000000..c87a46f
--- /dev/null
+++ b/include/qemu/rbtree.h
@@ -0,0 +1,109 @@
+/*
+  Red Black Trees
+  (C) 1999  Andrea Arcangeli <andrea@suse.de>
+
+  This program is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; either version 2 of the License, or
+  (at your option) any later version.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+  linux/include/linux/rbtree.h
+
+  To use rbtrees you'll have to implement your own insert and search cores.
+  This will avoid us to use callbacks and to drop drammatically performances.
+  I know it's not the cleaner way,  but in C (not in C++) to get
+  performances and genericity...
+
+  See Documentation/rbtree.txt for documentation and samples.
+*/
+
+#ifndef QEMU_RBTREE_H
+#define QEMU_RBTREE_H
+
+#include <unistd.h>
+#include <stdint.h>
+#include <stdbool.h>
+
+struct RbNode {
+    uintptr_t __rb_parent_color;
+    struct RbNode *rb_right;
+    struct RbNode *rb_left;
+} __attribute__((aligned(sizeof(uintptr_t))));
+    /* The alignment might seem pointless, but allegedly CRIS needs it */
+
+struct RbRoot {
+    struct RbNode *rb_node;
+};
+
+
+#define RB_PARENT(r) ((struct RbNode *)((r)->__rb_parent_color & ~3))
+
+#define RB_ROOT (struct RbRoot) { NULL, }
+#define RB_ENTRY(ptr, type, member) container_of(ptr, type, member)
+
+#define RB_EMPTY_ROOT(root) ((root)->rb_node == NULL)
+
+/* 'empty' nodes are nodes that are known not to be inserted in an rbtree */
+#define RB_EMPTY_NODE(node)  \
+    ((node)->__rb_parent_color == (uintptr_t)(node))
+#define RB_CLEAR_NODE(node)  \
+    ((node)->__rb_parent_color = (uintptr_t)(node))
+
+
+extern void rb_insert_color(struct RbNode *, struct RbRoot *);
+extern void rb_erase(struct RbNode *, struct RbRoot *);
+
+
+/* Find logical next and previous nodes in a tree */
+extern struct RbNode *rb_next(const struct RbNode *);
+extern struct RbNode *rb_prev(const struct RbNode *);
+extern struct RbNode *rb_first(const struct RbRoot *);
+extern struct RbNode *rb_last(const struct RbRoot *);
+
+/* Postorder iteration - always visit the parent after its children */
+extern struct RbNode *rb_first_postorder(const struct RbRoot *);
+extern struct RbNode *rb_next_postorder(const struct RbNode *);
+
+/* Fast replacement of a single node without remove/rebalance/add/rebalance */
+extern void rb_replace_node(struct RbNode *victim, struct RbNode *new,
+                            struct RbRoot *root);
+
+static inline void rb_link_node(struct RbNode *node, struct RbNode *parent,
+                                struct RbNode **rb_link)
+{
+    node->__rb_parent_color = (uintptr_t)parent;
+    node->rb_left = node->rb_right = NULL;
+
+    *rb_link = node;
+}
+
+#define RB_ENTRY_SAFE(ptr, type, member)                 \
+    ({ typeof(ptr) ____ptr = (ptr);                      \
+       ____ptr ? RB_ENTRY(____ptr, type, member) : NULL; \
+    })
+
+/**
+ * rbtree_postorder_for_each_entry_safe - iterate over rb_root in post order of
+ * given type safe against removal of rb_node entry
+ *
+ * @pos:   the 'type *' to use as a loop cursor.
+ * @n:     another 'type *' to use as temporary storage
+ * @root:  'rb_root *' of the rbtree.
+ * @field: the name of the rb_node field within 'type'.
+ */
+#define RBTREE_POSTORDER_FOR_EACH_ENTRY_SAFE(pos, n, root, field)            \
+    for (pos = RB_ENTRY_SAFE(rb_first_postorder(root), typeof(*pos), field); \
+         pos && ({ n = RB_ENTRY_SAFE(rb_next_postorder(&pos->field),         \
+         typeof(*pos), field); 1; });                                        \
+         pos = n)
+
+#endif  /* QEMU_RBTREE_H */
diff --git a/include/qemu/rbtree_augmented.h b/include/qemu/rbtree_augmented.h
new file mode 100644
index 0000000..e880387
--- /dev/null
+++ b/include/qemu/rbtree_augmented.h
@@ -0,0 +1,237 @@
+/*
+  Red Black Trees
+  (C) 1999  Andrea Arcangeli <andrea@suse.de>
+  (C) 2002  David Woodhouse <dwmw2@infradead.org>
+  (C) 2012  Michel Lespinasse <walken@google.com>
+
+  This program is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; either version 2 of the License, or
+  (at your option) any later version.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+  linux/include/linux/rbtree_augmented.h
+*/
+
+#ifndef QEMU_RBTREE_AUGMENTED_H
+#define QEMU_RBTREE_AUGMENTED_H
+
+#include "qemu/compiler.h"
+#include "qemu/rbtree.h"
+
+/*
+ * Please note - only struct RbAugmentCallbacks and the prototypes for
+ * rb_insert_augmented() and rb_erase_augmented() are intended to be public.
+ * The rest are implementation details you are not expected to depend on.
+ *
+ * See Documentation/rbtree.txt for documentation and samples.
+ */
+
+struct RbAugmentCallbacks {
+    void (*propagate)(struct RbNode *node, struct RbNode *stop);
+    void (*copy)(struct RbNode *old, struct RbNode *new);
+    void (*rotate)(struct RbNode *old, struct RbNode *new);
+};
+
+extern void __rb_insert_augmented(struct RbNode *node, struct RbRoot *root,
+    void (*augment_rotate)(struct RbNode *old, struct RbNode *new));
+/*
+ * Fixup the rbtree and update the augmented information when rebalancing.
+ *
+ * On insertion, the user must update the augmented information on the path
+ * leading to the inserted node, then call rb_link_node() as usual and
+ * rb_augment_inserted() instead of the usual rb_insert_color() call.
+ * If rb_augment_inserted() rebalances the rbtree, it will callback into
+ * a user provided function to update the augmented information on the
+ * affected subtrees.
+ */
+static inline void
+rb_insert_augmented(struct RbNode *node, struct RbRoot *root,
+                    const struct RbAugmentCallbacks *augment)
+{
+    __rb_insert_augmented(node, root, augment->rotate);
+}
+
+#define RB_DECLARE_CALLBACKS(rbstatic, rbname, rbstruct, rbfield, \
+                             rbtype, rbaugmented, rbcompute)      \
+static inline void                                                \
+rbname ## _propagate(struct RbNode *rb, struct RbNode *stop)      \
+{                                                                 \
+    while (rb != stop) {                                          \
+        rbstruct *node = RB_ENTRY(rb, rbstruct, rbfield);         \
+        rbtype augmented = rbcompute(node);                       \
+        if (node->rbaugmented == augmented) {                     \
+            break;                                                \
+        }                                                         \
+        node->rbaugmented = augmented;                            \
+        rb = RB_PARENT(&node->rbfield);                           \
+    }                                                             \
+}                                                                 \
+static inline void                                                \
+rbname ## _copy(struct RbNode *rb_old, struct RbNode *rb_new)     \
+{                                                                 \
+    rbstruct *old = RB_ENTRY(rb_old, rbstruct, rbfield);          \
+    rbstruct *new = RB_ENTRY(rb_new, rbstruct, rbfield);          \
+    new->rbaugmented = old->rbaugmented;                          \
+}                                                                 \
+static void                                                       \
+rbname ## _rotate(struct RbNode *rb_old, struct RbNode *rb_new)   \
+{                                                                 \
+    rbstruct *old = RB_ENTRY(rb_old, rbstruct, rbfield);          \
+    rbstruct *new = RB_ENTRY(rb_new, rbstruct, rbfield);          \
+    new->rbaugmented = old->rbaugmented;                          \
+    old->rbaugmented = rbcompute(old);                            \
+}                                                                 \
+rbstatic const struct RbAugmentCallbacks rbname = {               \
+    rbname ## _propagate, rbname ## _copy, rbname ## _rotate      \
+};
+
+
+#define RB_RED   0
+#define RB_BLACK 1
+
+#define __RB_PARENT(pc)    ((struct RbNode *)(pc & ~3))
+
+#define __RB_COLOR(pc)     ((pc) & 1)
+#define __RB_IS_BLACK(pc)  __RB_COLOR(pc)
+#define __RB_IS_RED(pc)    (!__RB_COLOR(pc))
+#define RB_COLOR(rb)       __RB_COLOR((rb)->__rb_parent_color)
+#define RB_IS_RED(rb)      __RB_IS_RED((rb)->__rb_parent_color)
+#define RB_IS_BLACK(rb)    __RB_IS_BLACK((rb)->__rb_parent_color)
+
+static inline void rb_set_parent(struct RbNode *rb, struct RbNode *p)
+{
+    rb->__rb_parent_color = RB_COLOR(rb) | (uintptr_t)p;
+}
+
+static inline void rb_set_parent_color(struct RbNode *rb,
+                                       struct RbNode *p, int color)
+{
+    rb->__rb_parent_color = (uintptr_t)p | color;
+}
+
+static inline void
+__rb_change_child(struct RbNode *old, struct RbNode *new,
+                  struct RbNode *parent, struct RbRoot *root)
+{
+    if (parent) {
+        if (parent->rb_left == old) {
+            parent->rb_left = new;
+        } else {
+            parent->rb_right = new;
+        }
+    } else {
+        root->rb_node = new;
+    }
+}
+
+extern void __rb_erase_color(struct RbNode *parent, struct RbRoot *root,
+    void (*augment_rotate)(struct RbNode *old, struct RbNode *new));
+
+static inline struct RbNode *
+__rb_erase_augmented(struct RbNode *node, struct RbRoot *root,
+                     const struct RbAugmentCallbacks *augment)
+{
+    struct RbNode *child = node->rb_right, *tmp = node->rb_left;
+    struct RbNode *parent, *rebalance;
+    uintptr_t pc;
+
+    if (!tmp) {
+        /*
+         * Case 1: node to erase has no more than 1 child (easy!)
+         *
+         * Note that if there is one child it must be red due to 5)
+         * and node must be black due to 4). We adjust colors locally
+         * so as to bypass __rb_erase_color() later on.
+         */
+        pc = node->__rb_parent_color;
+        parent = __RB_PARENT(pc);
+        __rb_change_child(node, child, parent, root);
+        if (child) {
+            child->__rb_parent_color = pc;
+            rebalance = NULL;
+        } else {
+            rebalance = __RB_IS_BLACK(pc) ? parent : NULL;
+        }
+        tmp = parent;
+    } else if (!child) {
+        /* Still case 1, but this time the child is node->rb_left */
+        tmp->__rb_parent_color = pc = node->__rb_parent_color;
+        parent = __RB_PARENT(pc);
+        __rb_change_child(node, tmp, parent, root);
+        rebalance = NULL;
+        tmp = parent;
+    } else {
+        struct RbNode *successor = child, *child2;
+        tmp = child->rb_left;
+        if (!tmp) {
+            /*
+             * Case 2: node's successor is its right child
+             *
+             *    (n)          (s)
+             *    / \          / \
+             *  (x) (s)  ->  (x) (c)
+             *        \
+             *        (c)
+             */
+            parent = successor;
+            child2 = successor->rb_right;
+            augment->copy(node, successor);
+        } else {
+            /*
+             * Case 3: node's successor is leftmost under
+             * node's right child subtree
+             *
+             *    (n)          (s)
+             *    / \          / \
+             *  (x) (y)  ->  (x) (y)
+             *      /            /
+             *    (p)          (p)
+             *    /            /
+             *  (s)          (c)
+             *    \
+             *    (c)
+             */
+            do {
+                parent = successor;
+                successor = tmp;
+                tmp = tmp->rb_left;
+            } while (tmp);
+            parent->rb_left = child2 = successor->rb_right;
+            successor->rb_right = child;
+            rb_set_parent(child, successor);
+            augment->copy(node, successor);
+            augment->propagate(parent, successor);
+        }
+
+        successor->rb_left = tmp = node->rb_left;
+        rb_set_parent(tmp, successor);
+
+        pc = node->__rb_parent_color;
+        tmp = __RB_PARENT(pc);
+        __rb_change_child(node, successor, tmp, root);
+        if (child2) {
+            successor->__rb_parent_color = pc;
+            rb_set_parent_color(child2, parent, RB_BLACK);
+            rebalance = NULL;
+        } else {
+            uintptr_t pc2 = successor->__rb_parent_color;
+            successor->__rb_parent_color = pc;
+            rebalance = __RB_IS_BLACK(pc2) ? parent : NULL;
+        }
+        tmp = successor;
+    }
+
+    augment->propagate(tmp, NULL);
+    return rebalance;
+}
+
+#endif /* QEMU_RBTREE_AUGMENTED_H */
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 96cb1e0..5b4b790 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -35,3 +35,4 @@ util-obj-y += log.o
 util-obj-y += qdist.o
 util-obj-y += qht.o
 util-obj-y += range.o
+util-obj-y += rbtree.o
diff --git a/util/rbtree.c b/util/rbtree.c
new file mode 100644
index 0000000..704dcea
--- /dev/null
+++ b/util/rbtree.c
@@ -0,0 +1,570 @@
+/*
+  Red Black Trees
+  (C) 1999  Andrea Arcangeli <andrea@suse.de>
+  (C) 2002  David Woodhouse <dwmw2@infradead.org>
+  (C) 2012  Michel Lespinasse <walken@google.com>
+
+  This program is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; either version 2 of the License, or
+  (at your option) any later version.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#include "qemu/rbtree_augmented.h"
+
+/*
+ * red-black trees properties:  http://en.wikipedia.org/wiki/Rbtree
+ *
+ *  1) A node is either red or black
+ *  2) The root is black
+ *  3) All leaves (NULL) are black
+ *  4) Both children of every red node are black
+ *  5) Every simple path from root to leaves contains the same number
+ *     of black nodes.
+ *
+ *  4 and 5 give the O(log n) guarantee, since 4 implies you cannot have two
+ *  consecutive red nodes in a path and every red node is therefore followed by
+ *  a black. So if B is the number of black nodes on every simple path (as per
+ *  5), then the longest possible path due to 4 is 2B.
+ *
+ *  We shall indicate color with case, where black nodes are uppercase and red
+ *  nodes will be lowercase. Unknown color nodes shall be drawn as red within
+ *  parentheses and have some accompanying text comment.
+ */
+
+static inline void rb_set_black(struct RbNode *rb)
+{
+    rb->__rb_parent_color |= RB_BLACK;
+}
+
+static inline struct RbNode *rb_red_parent(struct RbNode *red)
+{
+    return (struct RbNode *)red->__rb_parent_color;
+}
+
+/*
+ * Helper function for rotations:
+ * - old's parent and color get assigned to new
+ * - old gets assigned new as a parent and 'color' as a color.
+ */
+static inline void
+__rb_rotate_set_parents(struct RbNode *old, struct RbNode *new,
+                        struct RbRoot *root, int color)
+{
+    struct RbNode *parent = RB_PARENT(old);
+    new->__rb_parent_color = old->__rb_parent_color;
+    rb_set_parent_color(old, new, color);
+    __rb_change_child(old, new, parent, root);
+}
+
+static inline void
+__rb_insert(struct RbNode *node, struct RbRoot *root,
+            void (*augment_rotate)(struct RbNode *old, struct RbNode *new))
+{
+    struct RbNode *parent = rb_red_parent(node), *gparent, *tmp;
+
+    while (true) {
+        /*
+        * Loop invariant: node is red
+        *
+        * If there is a black parent, we are done.
+        * Otherwise, take some corrective action as we don't
+        * want a red root or two consecutive red nodes.
+        */
+        if (!parent) {
+            rb_set_parent_color(node, NULL, RB_BLACK);
+            break;
+        } else if (RB_IS_BLACK(parent)) {
+            break;
+        }
+
+        gparent = rb_red_parent(parent);
+
+        tmp = gparent->rb_right;
+        if (parent != tmp) {    /* parent == gparent->rb_left */
+            if (tmp && RB_IS_RED(tmp)) {
+                /*
+                 * Case 1 - color flips
+                 *
+                 *       G            g
+                 *      / \          / \
+                 *     p   u  -->   P   U
+                 *    /            /
+                 *   n            n
+                 *
+                 * However, since g's parent might be red, and
+                 * 4) does not allow this, we need to recurse
+                 * at g.
+                 */
+                rb_set_parent_color(tmp, gparent, RB_BLACK);
+                rb_set_parent_color(parent, gparent, RB_BLACK);
+                node = gparent;
+                parent = RB_PARENT(node);
+                rb_set_parent_color(node, parent, RB_RED);
+                continue;
+            }
+
+            tmp = parent->rb_right;
+            if (node == tmp) {
+                /*
+                 * Case 2 - left rotate at parent
+                 *
+                 *      G             G
+                 *     / \           / \
+                 *    p   U  -->    n   U
+                 *     \           /
+                 *      n         p
+                 *
+                 * This still leaves us in violation of 4), the
+                 * continuation into Case 3 will fix that.
+                 */
+                parent->rb_right = tmp = node->rb_left;
+                node->rb_left = parent;
+                if (tmp) {
+                    rb_set_parent_color(tmp, parent, RB_BLACK);
+                }
+                rb_set_parent_color(parent, node, RB_RED);
+                augment_rotate(parent, node);
+                parent = node;
+                tmp = node->rb_right;
+            }
+
+            /*
+             * Case 3 - right rotate at gparent
+             *
+             *        G           P
+             *       / \         / \
+             *      p   U  -->  n   g
+             *     /                 \
+             *    n                   U
+             */
+            gparent->rb_left = tmp;  /* == parent->rb_right */
+            parent->rb_right = gparent;
+            if (tmp) {
+                rb_set_parent_color(tmp, gparent, RB_BLACK);
+            }
+            __rb_rotate_set_parents(gparent, parent, root, RB_RED);
+            augment_rotate(gparent, parent);
+            break;
+        } else {
+            tmp = gparent->rb_left;
+            if (tmp && RB_IS_RED(tmp)) {
+                /* Case 1 - color flips */
+                rb_set_parent_color(tmp, gparent, RB_BLACK);
+                rb_set_parent_color(parent, gparent, RB_BLACK);
+                node = gparent;
+                parent = RB_PARENT(node);
+                rb_set_parent_color(node, parent, RB_RED);
+                continue;
+            }
+
+            tmp = parent->rb_left;
+            if (node == tmp) {
+                /* Case 2 - right rotate at parent */
+                parent->rb_left = tmp = node->rb_right;
+                node->rb_right = parent;
+                if (tmp) {
+                    rb_set_parent_color(tmp, parent, RB_BLACK);
+                }
+                rb_set_parent_color(parent, node, RB_RED);
+                augment_rotate(parent, node);
+                parent = node;
+                tmp = node->rb_left;
+            }
+
+            /* Case 3 - left rotate at gparent */
+            gparent->rb_right = tmp;  /* == parent->rb_left */
+            parent->rb_left = gparent;
+            if (tmp) {
+                rb_set_parent_color(tmp, gparent, RB_BLACK);
+            }
+            __rb_rotate_set_parents(gparent, parent, root, RB_RED);
+            augment_rotate(gparent, parent);
+            break;
+        }
+    }
+}
+
+/*
+ * Inline version for rb_erase() use - we want to be able to inline
+ * and eliminate the dummy_rotate callback there
+ */
+static inline void
+____rb_erase_color(struct RbNode *parent, struct RbRoot *root,
+                   void (*augment_rotate)(struct RbNode *old,
+                                          struct RbNode *new))
+{
+    struct RbNode *node = NULL, *sibling, *tmp1, *tmp2;
+
+    while (true) {
+        /*
+         * Loop invariants:
+         * - node is black (or NULL on first iteration)
+         * - node is not the root (parent is not NULL)
+         * - All leaf paths going through parent and node have a
+         *   black node count that is 1 lower than other leaf paths.
+         */
+        sibling = parent->rb_right;
+        if (node != sibling) {    /* node == parent->rb_left */
+            if (RB_IS_RED(sibling)) {
+                /*
+                 * Case 1 - left rotate at parent
+                 *
+                 *     P               S
+                 *    / \             / \
+                 *   N   s    -->    p   Sr
+                 *      / \         / \
+                 *     Sl  Sr      N   Sl
+                 */
+                parent->rb_right = tmp1 = sibling->rb_left;
+                sibling->rb_left = parent;
+                rb_set_parent_color(tmp1, parent, RB_BLACK);
+                __rb_rotate_set_parents(parent, sibling, root,
+                            RB_RED);
+                augment_rotate(parent, sibling);
+                sibling = tmp1;
+            }
+            tmp1 = sibling->rb_right;
+            if (!tmp1 || RB_IS_BLACK(tmp1)) {
+                tmp2 = sibling->rb_left;
+                if (!tmp2 || RB_IS_BLACK(tmp2)) {
+                    /*
+                     * Case 2 - sibling color flip
+                     * (p could be either color here)
+                     *
+                     *    (p)           (p)
+                     *    / \           / \
+                     *   N   S    -->  N   s
+                     *      / \           / \
+                     *     Sl  Sr        Sl  Sr
+                     *
+                     * This leaves us violating 5) which
+                     * can be fixed by flipping p to black
+                     * if it was red, or by recursing at p.
+                     * p is red when coming from Case 1.
+                     */
+                    rb_set_parent_color(sibling, parent,
+                                RB_RED);
+                    if (RB_IS_RED(parent)) {
+                        rb_set_black(parent);
+                    } else {
+                        node = parent;
+                        parent = RB_PARENT(node);
+                        if (parent) {
+                            continue;
+                        }
+                    }
+                    break;
+                }
+                /*
+                 * Case 3 - right rotate at sibling
+                 * (p could be either color here)
+                 *
+                 *   (p)           (p)
+                 *   / \           / \
+                 *  N   S    -->  N   Sl
+                 *     / \             \
+                 *    sl  Sr            s
+                 *                       \
+                 *                        Sr
+                 */
+                sibling->rb_left = tmp1 = tmp2->rb_right;
+                tmp2->rb_right = sibling;
+                parent->rb_right = tmp2;
+                if (tmp1) {
+                    rb_set_parent_color(tmp1, sibling, RB_BLACK);
+                }
+                augment_rotate(sibling, tmp2);
+                tmp1 = sibling;
+                sibling = tmp2;
+            }
+            /*
+             * Case 4 - left rotate at parent + color flips
+             * (p and sl could be either color here.
+             *  After rotation, p becomes black, s acquires
+             *  p's color, and sl keeps its color)
+             *
+             *      (p)             (s)
+             *      / \             / \
+             *     N   S     -->   P   Sr
+             *        / \         / \
+             *      (sl) sr      N  (sl)
+             */
+            parent->rb_right = tmp2 = sibling->rb_left;
+            sibling->rb_left = parent;
+            rb_set_parent_color(tmp1, sibling, RB_BLACK);
+            if (tmp2) {
+                rb_set_parent(tmp2, parent);
+            }
+            __rb_rotate_set_parents(parent, sibling, root,
+                        RB_BLACK);
+            augment_rotate(parent, sibling);
+            break;
+        } else {
+            sibling = parent->rb_left;
+            if (RB_IS_RED(sibling)) {
+                /* Case 1 - right rotate at parent */
+                parent->rb_left = tmp1 = sibling->rb_right;
+                sibling->rb_right = parent;
+                rb_set_parent_color(tmp1, parent, RB_BLACK);
+                __rb_rotate_set_parents(parent, sibling, root,
+                            RB_RED);
+                augment_rotate(parent, sibling);
+                sibling = tmp1;
+            }
+            tmp1 = sibling->rb_left;
+            if (!tmp1 || RB_IS_BLACK(tmp1)) {
+                tmp2 = sibling->rb_right;
+                if (!tmp2 || RB_IS_BLACK(tmp2)) {
+                    /* Case 2 - sibling color flip */
+                    rb_set_parent_color(sibling, parent,
+                                RB_RED);
+                    if (RB_IS_RED(parent)) {
+                        rb_set_black(parent);
+                    } else {
+                        node = parent;
+                        parent = RB_PARENT(node);
+                        if (parent) {
+                            continue;
+                        }
+                    }
+                    break;
+                }
+                /* Case 3 - right rotate at sibling */
+                sibling->rb_right = tmp1 = tmp2->rb_left;
+                tmp2->rb_left = sibling;
+                parent->rb_left = tmp2;
+                if (tmp1) {
+                    rb_set_parent_color(tmp1, sibling,
+                                RB_BLACK);
+                }
+                augment_rotate(sibling, tmp2);
+                tmp1 = sibling;
+                sibling = tmp2;
+            }
+            /* Case 4 - left rotate at parent + color flips */
+            parent->rb_left = tmp2 = sibling->rb_right;
+            sibling->rb_right = parent;
+            rb_set_parent_color(tmp1, sibling, RB_BLACK);
+            if (tmp2) {
+                rb_set_parent(tmp2, parent);
+            }
+            __rb_rotate_set_parents(parent, sibling, root,
+                        RB_BLACK);
+            augment_rotate(parent, sibling);
+            break;
+        }
+    }
+}
+
+/* Non-inline version for rb_erase_augmented() use */
+void __rb_erase_color(struct RbNode *parent, struct RbRoot *root,
+                      void (*augment_rotate)(struct RbNode *old,
+                                             struct RbNode *new))
+{
+    ____rb_erase_color(parent, root, augment_rotate);
+}
+
+/*
+ * Non-augmented rbtree manipulation functions.
+ *
+ * We use dummy augmented callbacks here, and have the compiler optimize them
+ * out of the rb_insert_color() and rb_erase() function definitions.
+ */
+
+static inline void dummy_propagate(struct RbNode *node, struct RbNode *stop) {}
+static inline void dummy_copy(struct RbNode *old, struct RbNode *new) {}
+static inline void dummy_rotate(struct RbNode *old, struct RbNode *new) {}
+
+static const struct RbAugmentCallbacks dummy_callbacks = {
+    dummy_propagate, dummy_copy, dummy_rotate
+};
+
+void rb_insert_color(struct RbNode *node, struct RbRoot *root)
+{
+    __rb_insert(node, root, dummy_rotate);
+}
+
+void rb_erase(struct RbNode *node, struct RbRoot *root)
+{
+    struct RbNode *rebalance;
+    rebalance = __rb_erase_augmented(node, root, &dummy_callbacks);
+    if (rebalance) {
+        ____rb_erase_color(rebalance, root, dummy_rotate);
+    }
+}
+
+/*
+ * Augmented rbtree manipulation functions.
+ *
+ * This instantiates the same __always_inline functions as in the non-augmented
+ * case, but this time with user-defined callbacks.
+ */
+
+void __rb_insert_augmented(struct RbNode *node, struct RbRoot *root,
+                           void (*augment_rotate)(struct RbNode *old,
+                                                  struct RbNode *new))
+{
+    __rb_insert(node, root, augment_rotate);
+}
+
+/*
+ * This function returns the first node (in sort order) of the tree.
+ */
+struct RbNode *rb_first(const struct RbRoot *root)
+{
+    struct RbNode    *n;
+
+    n = root->rb_node;
+    if (!n) {
+        return NULL;
+    }
+    while (n->rb_left) {
+        n = n->rb_left;
+    }
+    return n;
+}
+
+struct RbNode *rb_last(const struct RbRoot *root)
+{
+    struct RbNode    *n;
+
+    n = root->rb_node;
+    if (!n) {
+        return NULL;
+    }
+    while (n->rb_right) {
+        n = n->rb_right;
+    }
+    return n;
+}
+
+struct RbNode *rb_next(const struct RbNode *node)
+{
+    struct RbNode *parent;
+
+    if (RB_EMPTY_NODE(node)) {
+        return NULL;
+    }
+
+    /*
+     * If we have a right-hand child, go down and then left as far
+     * as we can.
+     */
+    if (node->rb_right) {
+        node = node->rb_right;
+        while (node->rb_left) {
+            node = node->rb_left;
+        }
+        return (struct RbNode *)node;
+    }
+
+    /*
+     * No right-hand children. Everything down and left is smaller than us,
+     * so any 'next' node must be in the general direction of our parent.
+     * Go up the tree; any time the ancestor is a right-hand child of its
+     * parent, keep going up. First time it's a left-hand child of its
+     * parent, said parent is our 'next' node.
+     */
+    while ((parent = RB_PARENT(node)) && node == parent->rb_right) {
+        node = parent;
+    }
+    return parent;
+}
+
+struct RbNode *rb_prev(const struct RbNode *node)
+{
+    struct RbNode *parent;
+
+    if (RB_EMPTY_NODE(node)) {
+        return NULL;
+    }
+
+    /*
+     * If we have a left-hand child, go down and then right as far
+     * as we can.
+     */
+    if (node->rb_left) {
+        node = node->rb_left;
+        while (node->rb_right) {
+            node = node->rb_right;
+        }
+        return (struct RbNode *)node;
+    }
+
+    /*
+     * No left-hand children. Go up till we find an ancestor which
+     * is a right-hand child of its parent.
+     */
+    while ((parent = RB_PARENT(node)) && node == parent->rb_left) {
+        node = parent;
+    }
+    return parent;
+}
+
+void rb_replace_node(struct RbNode *victim, struct RbNode *new,
+             struct RbRoot *root)
+{
+    struct RbNode *parent = RB_PARENT(victim);
+
+    /* Set the surrounding nodes to point to the replacement */
+    __rb_change_child(victim, new, parent, root);
+    if (victim->rb_left) {
+        rb_set_parent(victim->rb_left, new);
+    }
+    if (victim->rb_right) {
+        rb_set_parent(victim->rb_right, new);
+    }
+    /* Copy the pointers/colour from the victim to the replacement */
+    *new = *victim;
+}
+
+static struct RbNode *rb_left_deepest_node(const struct RbNode *node)
+{
+    for (;;) {
+        if (node->rb_left) {
+            node = node->rb_left;
+        } else if (node->rb_right) {
+            node = node->rb_right;
+        } else {
+            return (struct RbNode *)node;
+        }
+    }
+}
+
+struct RbNode *rb_next_postorder(const struct RbNode *node)
+{
+    const struct RbNode *parent;
+    if (!node) {
+        return NULL;
+    }
+    parent = RB_PARENT(node);
+
+    /* If we're sitting on node, we've already seen our children */
+    if (parent && node == parent->rb_left && parent->rb_right) {
+        /* If we are the parent's left node, go to the parent's right
+         * node then all the way down to the left */
+        return rb_left_deepest_node(parent->rb_right);
+    } else {
+        /* Otherwise we are the parent's right node, and the parent
+         * should be next */
+        return (struct RbNode *)parent;
+    }
+}
+
+struct RbNode *rb_first_postorder(const struct RbRoot *root)
+{
+    if (!root->rb_node) {
+        return NULL;
+    }
+    return rb_left_deepest_node(root->rb_node);
+}
-- 
2.8.3


* [Qemu-devel] [PATCH RFC v2 04/22] block/pcache: add pcache debug build
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (2 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 03/22] util/rbtree: add rbtree from linux kernel Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-09-08 15:11   ` Eric Blake
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 05/22] block/pcache: add aio requests into cache Pavel Butsykin
                   ` (19 subsequent siblings)
  23 siblings, 1 reply; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/block/pcache.c b/block/pcache.c
index 74a4bc4..7f221d6 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -28,6 +28,15 @@
 #include "qapi/error.h"
 #include "qapi/qmp/qstring.h"
 
+#define PCACHE_DEBUG
+
+#ifdef PCACHE_DEBUG
+#define DPRINTF(fmt, ...) \
+        printf("%s:%s:%d "fmt, __FILE__, __func__, __LINE__, ## __VA_ARGS__)
+#else
+#define DPRINTF(fmt, ...) do { } while (0)
+#endif
+
 typedef struct PrefCacheAIOCB {
     BlockAIOCB common;
 
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 05/22] block/pcache: add aio requests into cache
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (3 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 04/22] block/pcache: add pcache debug build Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-09-01 15:28   ` Kevin Wolf
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 06/22] block/pcache: restrict cache size Pavel Butsykin
                   ` (18 subsequent siblings)
  23 siblings, 1 reply; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

Requests are stored in an rbtree; this patch adds the basic operations on
the rbtree for working with cache nodes.
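
To illustrate the comparison semantics introduced below (an editorial
sketch, not from the patch, assuming the RbNodeKey type and
pcache_key_cmp() from the diff plus <assert.h>): two keys compare equal
whenever their sector ranges overlap, so a tree search for a request finds
any cached node it intersects.

  static void example_key_cmp(void)
  {
      RbNodeKey cached = { .num = 98,  .size = 8 }; /* sectors [98, 106)  */
      RbNodeKey req    = { .num = 100, .size = 4 }; /* sectors [100, 104) */
      RbNodeKey after  = { .num = 106, .size = 2 }; /* sectors [106, 108) */

      assert(pcache_key_cmp(&req, &cached) == 0); /* overlap => "equal"  */
      assert(pcache_key_cmp(&req, &after) < 0);   /* wholly before => -1 */
  }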

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 190 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 189 insertions(+), 1 deletion(-)

diff --git a/block/pcache.c b/block/pcache.c
index 7f221d6..f5022f9 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -27,6 +27,7 @@
 #include "block/raw-aio.h"
 #include "qapi/error.h"
 #include "qapi/qmp/qstring.h"
+#include "qemu/rbtree.h"
 
 #define PCACHE_DEBUG
 
@@ -37,9 +38,53 @@
 #define DPRINTF(fmt, ...) do { } while (0)
 #endif
 
+typedef struct RbNodeKey {
+    uint64_t    num;
+    uint32_t    size;
+} RbNodeKey;
+
+typedef struct BlockNode {
+    struct RbNode               rb_node;
+    union {
+        RbNodeKey               key;
+        struct {
+            uint64_t            sector_num;
+            uint32_t            nb_sectors;
+        };
+    };
+    QTAILQ_ENTRY(BlockNode) entry;
+} BlockNode;
+
+typedef struct PCNode {
+    BlockNode cm;
+
+    uint8_t                  *data;
+} PCNode;
+
+typedef struct ReqStor {
+    struct {
+        struct RbRoot root;
+        CoMutex       lock;
+    } tree;
+
+    uint32_t curr_size;
+} ReqStor;
+
+typedef struct BDRVPCacheState {
+    BlockDriverState **bs;
+
+    ReqStor pcache;
+
+    struct {
+        QTAILQ_HEAD(pcache_head, BlockNode) head;
+        CoMutex lock;
+    } list;
+} BDRVPCacheState;
+
 typedef struct PrefCacheAIOCB {
     BlockAIOCB common;
 
+    BDRVPCacheState *s;
     QEMUIOVector *qiov;
     uint64_t sector_num;
     uint32_t nb_sectors;
@@ -64,6 +109,124 @@ static QemuOptsList runtime_opts = {
     },
 };
 
+#define PCNODE(_n) ((PCNode *)(_n))
+
+static int pcache_key_cmp(const RbNodeKey *key1, const RbNodeKey *key2)
+{
+    assert(key1 != NULL);
+    assert(key2 != NULL);
+
+    if (key1->num >= key2->num + key2->size) {
+        return 1;
+    }
+    if (key1->num + key1->size <= key2->num) {
+        return -1;
+    }
+
+    return 0;
+}
+
+static void *node_insert(struct RbRoot *root, BlockNode *node)
+{
+    struct RbNode **new = &(root->rb_node), *parent = NULL;
+
+    /* Figure out where to put new node */
+    while (*new) {
+        BlockNode *this = container_of(*new, BlockNode, rb_node);
+        int result = pcache_key_cmp(&node->key, &this->key);
+        if (result == 0) {
+            return this;
+        }
+        parent = *new;
+        new = result < 0 ? &((*new)->rb_left) : &((*new)->rb_right);
+    }
+    /* Add new node and rebalance tree. */
+    rb_link_node(&node->rb_node, parent, new);
+    rb_insert_color(&node->rb_node, root);
+
+    return node;
+}
+
+static inline PCNode *pcache_node_insert(struct RbRoot *root, PCNode *node)
+{
+    return node_insert(root, &node->cm);
+}
+
+static inline void pcache_node_free(PCNode *node)
+{
+    g_free(node->data);
+    g_slice_free1(sizeof(*node), node);
+}
+
+static inline void *pcache_node_alloc(RbNodeKey* key)
+{
+    PCNode *node = g_slice_alloc(sizeof(*node));
+
+    node->cm.sector_num = key->num;
+    node->cm.nb_sectors = key->size;
+    node->data = g_malloc(node->cm.nb_sectors << BDRV_SECTOR_BITS);
+
+    return node;
+}
+
+static bool pcache_node_find_and_create(PrefCacheAIOCB *acb, RbNodeKey *key,
+                                        PCNode **out_node)
+{
+    BDRVPCacheState *s = acb->s;
+    PCNode *new_node = pcache_node_alloc(key);
+    PCNode *found;
+
+    qemu_co_mutex_lock(&s->pcache.tree.lock);
+    found = pcache_node_insert(&s->pcache.tree.root, new_node);
+    qemu_co_mutex_unlock(&s->pcache.tree.lock);
+    if (found != new_node) {
+        pcache_node_free(new_node);
+        *out_node = found;
+        return false;
+    }
+    atomic_add(&s->pcache.curr_size, new_node->cm.nb_sectors);
+
+    qemu_co_mutex_lock(&s->list.lock);
+    QTAILQ_INSERT_HEAD(&s->list.head, &new_node->cm, entry);
+    qemu_co_mutex_unlock(&s->list.lock);
+
+    *out_node = new_node;
+    return true;
+}
+
+static inline void prefetch_init_key(PrefCacheAIOCB *acb, RbNodeKey* key)
+{
+    key->num = acb->sector_num;
+    key->size = acb->nb_sectors;
+}
+
+enum {
+    PREFETCH_NEW_NODE  = 0,
+    PREFETCH_FULL_UP   = 1,
+    PREFETCH_PART_UP   = 2
+};
+
+static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
+{
+    RbNodeKey key;
+    PCNode *node = NULL;
+
+    prefetch_init_key(acb, &key);
+    if (pcache_node_find_and_create(acb, &key, &node)) {
+        return PREFETCH_NEW_NODE;
+    }
+
+    /* Node covers the whole request */
+    if (node->cm.sector_num <= acb->sector_num &&
+        node->cm.sector_num + node->cm.nb_sectors >= acb->sector_num +
+                                                     acb->nb_sectors)
+    {
+        return PREFETCH_FULL_UP;
+    }
+
+    return PREFETCH_PART_UP;
+}
+
 static void pcache_aio_cb(void *opaque, int ret)
 {
     PrefCacheAIOCB *acb = opaque;
@@ -80,6 +243,7 @@ static PrefCacheAIOCB *pcache_aio_get(BlockDriverState *bs, int64_t sector_num,
 {
     PrefCacheAIOCB *acb = qemu_aio_get(&pcache_aiocb_info, bs, cb, opaque);
 
+    acb->s = bs->opaque;
     acb->sector_num = sector_num;
     acb->nb_sectors = nb_sectors;
     acb->qiov = qiov;
@@ -99,6 +263,8 @@ static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
     PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
                                          opaque, QEMU_AIO_READ);
 
+    pcache_prefetch(acb);
+
     bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
                    pcache_aio_cb, acb);
     return &acb->common;
@@ -119,6 +285,17 @@ static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
     return &acb->common;
 }
 
+static void pcache_state_init(QemuOpts *opts, BDRVPCacheState *s)
+{
+    DPRINTF("pcache configure:\n");
+
+    s->pcache.tree.root = RB_ROOT;
+    qemu_co_mutex_init(&s->pcache.tree.lock);
+    QTAILQ_INIT(&s->list.head);
+    qemu_co_mutex_init(&s->list.lock);
+    s->pcache.curr_size = 0;
+}
+
 static int pcache_file_open(BlockDriverState *bs, QDict *options, int flags,
                             Error **errp)
 {
@@ -140,7 +317,9 @@ static int pcache_file_open(BlockDriverState *bs, QDict *options, int flags,
     if (local_err) {
         ret = -EINVAL;
         error_propagate(errp, local_err);
+        goto fail;
     }
+    pcache_state_init(opts, bs->opaque);
 fail:
     qemu_opts_del(opts);
     return ret;
@@ -148,6 +327,15 @@ fail:
 
 static void pcache_close(BlockDriverState *bs)
 {
+    uint32_t cnt = 0;
+    BDRVPCacheState *s = bs->opaque;
+    BlockNode *node, *next;
+    QTAILQ_FOREACH_SAFE(node, &s->list.head, entry, next) {
+        QTAILQ_REMOVE(&s->list.head, node, entry);
+        pcache_node_free(PCNODE(node));
+        cnt++;
+    }
+    DPRINTF("used %d nodes\n", cnt);
 }
 
 static void pcache_parse_filename(const char *filename, QDict *options,
@@ -170,7 +358,7 @@ static bool pcache_recurse_is_first_non_filter(BlockDriverState *bs,
 static BlockDriver bdrv_pcache = {
     .format_name                        = "pcache",
     .protocol_name                      = "pcache",
-    .instance_size                      = 0,
+    .instance_size                      = sizeof(BDRVPCacheState),
 
     .bdrv_parse_filename                = pcache_parse_filename,
     .bdrv_file_open                     = pcache_file_open,
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 06/22] block/pcache: restrict cache size
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (4 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 05/22] block/pcache: add aio requests into cache Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 07/22] block/pcache: introduce LRU as method of memory Pavel Butsykin
                   ` (17 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

Add pcache-full-size open parameter.

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/block/pcache.c b/block/pcache.c
index f5022f9..54d4526 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -79,6 +79,8 @@ typedef struct BDRVPCacheState {
         QTAILQ_HEAD(pcache_head, BlockNode) head;
         CoMutex lock;
     } list;
+
+    uint32_t cfg_cache_size;
 } BDRVPCacheState;
 
 typedef struct PrefCacheAIOCB {
@@ -96,6 +98,8 @@ static const AIOCBInfo pcache_aiocb_info = {
     .aiocb_size = sizeof(PrefCacheAIOCB),
 };
 
+#define PCACHE_OPT_CACHE_SIZE "pcache-full-size"
+
 static QemuOptsList runtime_opts = {
     .name = "pcache",
     .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
@@ -105,10 +109,19 @@ static QemuOptsList runtime_opts = {
             .type = QEMU_OPT_STRING,
             .help = "[internal use only, will be removed]",
         },
+        {
+            .name = PCACHE_OPT_CACHE_SIZE,
+            .type = QEMU_OPT_SIZE,
+            .help = "Total cache size",
+        },
         { /* end of list */ }
     },
 };
 
+#define KB_BITS 10
+#define MB_BITS 20
+#define PCACHE_DEFAULT_CACHE_SIZE (4 << MB_BITS)
+
 #define PCNODE(_n) ((PCNode *)(_n))
 
 static int pcache_key_cmp(const RbNodeKey *key1, const RbNodeKey *key2)
@@ -263,7 +276,9 @@ static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
     PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
                                          opaque, QEMU_AIO_READ);
 
-    pcache_prefetch(acb);
+    if (acb->s->pcache.curr_size < acb->s->cfg_cache_size) {
+        pcache_prefetch(acb);
+    }
 
     bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
                    pcache_aio_cb, acb);
@@ -287,13 +302,18 @@ static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
 
 static void pcache_state_init(QemuOpts *opts, BDRVPCacheState *s)
 {
+    uint64_t cache_size = qemu_opt_get_size(opts, PCACHE_OPT_CACHE_SIZE,
+                                            PCACHE_DEFAULT_CACHE_SIZE);
     DPRINTF("pcache configure:\n");
+    DPRINTF("pcache-full-size = %jd\n", cache_size);
 
     s->pcache.tree.root = RB_ROOT;
     qemu_co_mutex_init(&s->pcache.tree.lock);
     QTAILQ_INIT(&s->list.head);
     qemu_co_mutex_init(&s->list.lock);
     s->pcache.curr_size = 0;
+
+    s->cfg_cache_size = cache_size >> BDRV_SECTOR_BITS;
 }
 
 static int pcache_file_open(BlockDriverState *bs, QDict *options, int flags,
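
An editorial note on units, since it is easy to miss in the diff: the
option is parsed in bytes, but cfg_cache_size (like pcache.curr_size) is
kept in sectors, hence the final shift by BDRV_SECTOR_BITS. With the
defaults:

  uint64_t cache_bytes   = 4 << MB_BITS;                    /* 4 MiB */
  uint32_t cache_sectors = cache_bytes >> BDRV_SECTOR_BITS; /* 8192 sectors
                                                               of 512 bytes */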
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 07/22] block/pcache: introduce LRU as method of memory
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (5 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 06/22] block/pcache: restrict cache size Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-09-02  8:49   ` Kevin Wolf
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 08/22] block/pcache: implement pickup parts of the cache Pavel Butsykin
                   ` (16 subsequent siblings)
  23 siblings, 1 reply; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

This is a simple solution to the problem of cache memory eviction. An LRU
list helps avoid evicting nodes that have only been partially read.
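
The resulting eviction order, sketched with a hypothetical access sequence
(editorial illustration):

  /* insert A, insert B, insert C    -> list head..tail: C B A
   * read hit on A (moved to head)   -> list head..tail: A C B
   * pcache_try_shrink() over budget -> evicts B (the tail), then C,
   *                                    and only then the recently used A */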

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 74 +++++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 58 insertions(+), 16 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index 54d4526..7504db8 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -67,6 +67,11 @@ typedef struct ReqStor {
         CoMutex       lock;
     } tree;
 
+    struct {
+        QTAILQ_HEAD(lru_head, BlockNode) list;
+        CoMutex lock;
+    } lru;
+
     uint32_t curr_size;
 } ReqStor;
 
@@ -75,12 +80,11 @@ typedef struct BDRVPCacheState {
 
     ReqStor pcache;
 
-    struct {
-        QTAILQ_HEAD(pcache_head, BlockNode) head;
-        CoMutex lock;
-    } list;
-
     uint32_t cfg_cache_size;
+
+#ifdef PCACHE_DEBUG
+    uint64_t shrink_cnt_node;
+#endif
 } BDRVPCacheState;
 
 typedef struct PrefCacheAIOCB {
@@ -182,6 +186,44 @@ static inline void *pcache_node_alloc(RbNodeKey* key)
     return node;
 }
 
+static void pcache_node_drop(BDRVPCacheState *s, PCNode *node)
+{
+    atomic_sub(&s->pcache.curr_size, node->cm.nb_sectors);
+
+    qemu_co_mutex_lock(&s->pcache.lru.lock);
+    QTAILQ_REMOVE(&s->pcache.lru.list, &node->cm, entry);
+    qemu_co_mutex_unlock(&s->pcache.lru.lock);
+
+    qemu_co_mutex_lock(&s->pcache.tree.lock);
+    rb_erase(&node->cm.rb_node, &s->pcache.tree.root);
+    qemu_co_mutex_unlock(&s->pcache.tree.lock);
+
+    pcache_node_free(node);
+}
+
+static void pcache_try_shrink(BDRVPCacheState *s)
+{
+    while (s->pcache.curr_size > s->cfg_cache_size) {
+        qemu_co_mutex_lock(&s->pcache.lru.lock);
+        assert(!QTAILQ_EMPTY(&s->pcache.lru.list));
+        PCNode *rmv_node = PCNODE(QTAILQ_LAST(&s->pcache.lru.list, lru_head));
+        qemu_co_mutex_unlock(&s->pcache.lru.lock);
+
+        pcache_node_drop(s, rmv_node);
+#ifdef PCACHE_DEBUG
+        atomic_inc(&s->shrink_cnt_node);
+#endif
+    }
+}
+
+static inline void pcache_lru_node_up(BDRVPCacheState *s, PCNode *node)
+{
+    qemu_co_mutex_lock(&s->pcache.lru.lock);
+    QTAILQ_REMOVE(&s->pcache.lru.list, &node->cm, entry);
+    QTAILQ_INSERT_HEAD(&s->pcache.lru.list, &node->cm, entry);
+    qemu_co_mutex_unlock(&s->pcache.lru.lock);
+}
+
 static bool pcache_node_find_and_create(PrefCacheAIOCB *acb, RbNodeKey *key,
                                         PCNode **out_node)
 {
@@ -194,14 +236,17 @@ static bool pcache_node_find_and_create(PrefCacheAIOCB *acb, RbNodeKey *key,
     qemu_co_mutex_unlock(&s->pcache.tree.lock);
     if (found != new_node) {
         pcache_node_free(new_node);
+        pcache_lru_node_up(s, found);
         *out_node = found;
         return false;
     }
     atomic_add(&s->pcache.curr_size, new_node->cm.nb_sectors);
 
-    qemu_co_mutex_lock(&s->list.lock);
-    QTAILQ_INSERT_HEAD(&s->list.head, &new_node->cm, entry);
-    qemu_co_mutex_unlock(&s->list.lock);
+    qemu_co_mutex_lock(&s->pcache.lru.lock);
+    QTAILQ_INSERT_HEAD(&s->pcache.lru.list, &new_node->cm, entry);
+    qemu_co_mutex_unlock(&s->pcache.lru.lock);
+
+    pcache_try_shrink(s);
 
     *out_node = new_node;
     return true;
@@ -275,10 +320,7 @@ static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
 {
     PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
                                          opaque, QEMU_AIO_READ);
-
-    if (acb->s->pcache.curr_size < acb->s->cfg_cache_size) {
-        pcache_prefetch(acb);
-    }
+    pcache_prefetch(acb);
 
     bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
                    pcache_aio_cb, acb);
@@ -309,8 +351,8 @@ static void pcache_state_init(QemuOpts *opts, BDRVPCacheState *s)
 
     s->pcache.tree.root = RB_ROOT;
     qemu_co_mutex_init(&s->pcache.tree.lock);
-    QTAILQ_INIT(&s->list.head);
-    qemu_co_mutex_init(&s->list.lock);
+    QTAILQ_INIT(&s->pcache.lru.list);
+    qemu_co_mutex_init(&s->pcache.lru.lock);
     s->pcache.curr_size = 0;
 
     s->cfg_cache_size = cache_size >> BDRV_SECTOR_BITS;
@@ -350,8 +392,8 @@ static void pcache_close(BlockDriverState *bs)
     uint32_t cnt = 0;
     BDRVPCacheState *s = bs->opaque;
     BlockNode *node, *next;
-    QTAILQ_FOREACH_SAFE(node, &s->list.head, entry, next) {
-        QTAILQ_REMOVE(&s->list.head, node, entry);
+    QTAILQ_FOREACH_SAFE(node, &s->pcache.lru.list, entry, next) {
+        QTAILQ_REMOVE(&s->pcache.lru.list, node, entry);
         pcache_node_free(PCNODE(node));
         cnt++;
     }
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 08/22] block/pcache: implement pickup parts of the cache
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (6 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 07/22] block/pcache: introduce LRU as method of memory Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-09-02  8:59   ` Kevin Wolf
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 09/22] block/pcache: separation AIOCB on requests Pavel Butsykin
                   ` (15 subsequent siblings)
  23 siblings, 1 reply; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

Implement picking up the cache fragments that belong to a single request
area. This makes it possible to handle the case when a request only
partially hits the cache.
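
A worked example of the splitting loop (editorial, hypothetical sector
numbers):

  /* Request covers sectors [100, 116); the cache already holds a node
   * N1 = [104, 108).  pcache_pickup_parts_of_cache() iterates:
   *
   *   1. num=100 < N1.sector_num=104: create a node for the left gap
   *      [100, 104), then account for N1's range [104, 108) (the actual
   *      data copy is still an XXX stub at this point in the series);
   *      advance to num=108 with 8 sectors remaining.
   *   2. no cached node covers [108, 116): create a node for it; done.
   *
   * The request ends up backed by three nodes: two freshly allocated
   * and one picked up from the cache. */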

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 59 insertions(+), 1 deletion(-)

diff --git a/block/pcache.c b/block/pcache.c
index 7504db8..28bd056 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -143,6 +143,24 @@ static int pcache_key_cmp(const RbNodeKey *key1, const RbNodeKey *key2)
     return 0;
 }
 
+static BlockNode *pcache_node_prev(BlockNode* node, RbNodeKey *key)
+{
+    while (node) {
+        struct RbNode *prev_rb_node = rb_prev(&node->rb_node);
+        BlockNode *prev_node;
+        if (prev_rb_node == NULL) {
+            break;
+        }
+        prev_node = container_of(prev_rb_node, BlockNode, rb_node);
+        if (prev_node->sector_num + prev_node->nb_sectors <= key->num) {
+            break;
+        }
+        node = prev_node;
+    }
+
+    return node;
+}
+
 static void *node_insert(struct RbRoot *root, BlockNode *node)
 {
     struct RbNode **new = &(root->rb_node), *parent = NULL;
@@ -152,7 +170,7 @@ static void *node_insert(struct RbRoot *root, BlockNode *node)
         BlockNode *this = container_of(*new, BlockNode, rb_node);
         int result = pcache_key_cmp(&node->key, &this->key);
         if (result == 0) {
-            return this;
+            return pcache_node_prev(this, &node->key);
         }
         parent = *new;
         new = result < 0 ? &((*new)->rb_left) : &((*new)->rb_right);
@@ -258,6 +276,45 @@ static inline void prefetch_init_key(PrefCacheAIOCB *acb, RbNodeKey* key)
     key->size = acb->nb_sectors;
 }
 
+static void pcache_pickup_parts_of_cache(PrefCacheAIOCB *acb, PCNode *node,
+                                         uint64_t num, uint32_t size)
+{
+    uint32_t up_size;
+
+    do {
+        if (num < node->cm.sector_num) {
+            PCNode *new_node;
+            RbNodeKey lc_key = {
+                .num = num,
+                .size = node->cm.sector_num - num,
+            };
+            up_size = lc_key.size;
+
+            if (!pcache_node_find_and_create(acb, &lc_key, &new_node)) {
+                node = new_node;
+                continue;
+            }
+            size -= up_size;
+            num += up_size;
+        }
+        /* XXX: node read */
+        up_size = MIN(node->cm.sector_num + node->cm.nb_sectors - num, size);
+
+        size -= up_size;
+        num += up_size;
+        if (size != 0) {
+            RbNodeKey lc_key = {
+                .num = num,
+                .size = size,
+            };
+            if (pcache_node_find_and_create(acb, &lc_key, &node)) {
+                size -= lc_key.size;
+                assert(size == 0);
+            }
+        }
+    } while (size);
+}
+
 enum {
     PREFETCH_NEW_NODE  = 0,
     PREFETCH_FULL_UP   = 1,
@@ -281,6 +338,7 @@ static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
     {
         return PREFETCH_FULL_UP;
     }
+    pcache_pickup_parts_of_cache(acb, node, key.num, key.size);
 
     return PREFETCH_PART_UP;
 }
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 09/22] block/pcache: separation AIOCB on requests
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (7 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 08/22] block/pcache: implement pickup parts of the cache Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-09-02  9:10   ` Kevin Wolf
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 10/22] block/pcache: add check node leak Pavel Butsykin
                   ` (14 subsequent siblings)
  23 siblings, 1 reply; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

For the case when the cache partially covers a request, part of the request
is filled from the cache and the other part is read from disk. Also add
reference counting for nodes, as a step towards multithreading support.

There is still no full synchronization in multithreaded mode.
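
An editorial summary of the node states this patch introduces:

  /* NODE_WAIT_STATUS    - just allocated; its data is not yet valid
   * NODE_SUCCESS_STATUS - data filled in; node lives on the LRU list
   * NODE_REMOVE_STATUS  - dropped from the tree; awaiting the last unref
   * NODE_GHOST_STATUS   - freed (debug-only marker)
   *
   * pcache_node_drop() contributes the final pcache_node_unref(), so the
   * memory is released by whichever unref runs last: the dropper, or an
   * in-flight reader that still holds a reference. */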

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 155 insertions(+), 14 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index 28bd056..6114289 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -58,7 +58,10 @@ typedef struct BlockNode {
 typedef struct PCNode {
     BlockNode cm;
 
+    uint32_t                 status;
+    uint32_t                 ref;
     uint8_t                  *data;
+    CoMutex                  lock;
 } PCNode;
 
 typedef struct ReqStor {
@@ -95,9 +98,23 @@ typedef struct PrefCacheAIOCB {
     uint64_t sector_num;
     uint32_t nb_sectors;
     int      aio_type;
+    struct {
+        QTAILQ_HEAD(req_head, PrefCachePartReq) list;
+        CoMutex lock;
+    } requests;
     int      ret;
 } PrefCacheAIOCB;
 
+typedef struct PrefCachePartReq {
+    uint64_t sector_num;
+    uint32_t nb_sectors;
+
+    QEMUIOVector qiov;
+    PCNode *node;
+    PrefCacheAIOCB *acb;
+    QTAILQ_ENTRY(PrefCachePartReq) entry;
+} PrefCachePartReq;
+
 static const AIOCBInfo pcache_aiocb_info = {
     .aiocb_size = sizeof(PrefCacheAIOCB),
 };
@@ -126,8 +143,39 @@ static QemuOptsList runtime_opts = {
 #define MB_BITS 20
 #define PCACHE_DEFAULT_CACHE_SIZE (4 << MB_BITS)
 
+enum {
+    NODE_SUCCESS_STATUS = 0,
+    NODE_WAIT_STATUS    = 1,
+    NODE_REMOVE_STATUS  = 2,
+    NODE_GHOST_STATUS   = 3 /* only for debugging */
+};
+
 #define PCNODE(_n) ((PCNode *)(_n))
 
+static inline void pcache_node_unref(PCNode *node)
+{
+    assert(node->status == NODE_SUCCESS_STATUS ||
+           node->status == NODE_REMOVE_STATUS);
+
+    if (atomic_fetch_dec(&node->ref) == 0) {
+        assert(node->status == NODE_REMOVE_STATUS);
+
+        node->status = NODE_GHOST_STATUS;
+        g_free(node->data);
+        g_slice_free1(sizeof(*node), node);
+    }
+}
+
+static inline PCNode *pcache_node_ref(PCNode *node)
+{
+    assert(node->status == NODE_SUCCESS_STATUS ||
+           node->status == NODE_WAIT_STATUS);
+    assert(atomic_read(&node->ref) == 0);/* XXX: only for sequential requests */
+    atomic_inc(&node->ref);
+
+    return node;
+}
+
 static int pcache_key_cmp(const RbNodeKey *key1, const RbNodeKey *key2)
 {
     assert(key1 != NULL);
@@ -184,13 +232,7 @@ static void *node_insert(struct RbRoot *root, BlockNode *node)
 
 static inline PCNode *pcache_node_insert(struct RbRoot *root, PCNode *node)
 {
-    return node_insert(root, &node->cm);
-}
-
-static inline void pcache_node_free(PCNode *node)
-{
-    g_free(node->data);
-    g_slice_free1(sizeof(*node), node);
+    return pcache_node_ref(node_insert(root, &node->cm));
 }
 
 static inline void *pcache_node_alloc(RbNodeKey* key)
@@ -199,6 +241,9 @@ static inline void *pcache_node_alloc(RbNodeKey* key)
 
     node->cm.sector_num = key->num;
     node->cm.nb_sectors = key->size;
+    node->ref = 0;
+    node->status = NODE_WAIT_STATUS;
+    qemu_co_mutex_init(&node->lock);
     node->data = g_malloc(node->cm.nb_sectors << BDRV_SECTOR_BITS);
 
     return node;
@@ -206,6 +251,12 @@ static inline void *pcache_node_alloc(RbNodeKey* key)
 
 static void pcache_node_drop(BDRVPCacheState *s, PCNode *node)
 {
+    uint32_t prev_status = atomic_xchg(&node->status, NODE_REMOVE_STATUS);
+    if (prev_status == NODE_REMOVE_STATUS) {
+        return;
+    }
+    assert(prev_status != NODE_GHOST_STATUS);
+
     atomic_sub(&s->pcache.curr_size, node->cm.nb_sectors);
 
     qemu_co_mutex_lock(&s->pcache.lru.lock);
@@ -216,7 +267,7 @@ static void pcache_node_drop(BDRVPCacheState *s, PCNode *node)
     rb_erase(&node->cm.rb_node, &s->pcache.tree.root);
     qemu_co_mutex_unlock(&s->pcache.tree.lock);
 
-    pcache_node_free(node);
+    pcache_node_unref(node);
 }
 
 static void pcache_try_shrink(BDRVPCacheState *s)
@@ -234,6 +285,30 @@ static void pcache_try_shrink(BDRVPCacheState *s)
     }
 }
 
+static PrefCachePartReq *pcache_req_get(PrefCacheAIOCB *acb, PCNode *node)
+{
+    PrefCachePartReq *req = g_slice_alloc(sizeof(*req));
+
+    req->nb_sectors = node->cm.nb_sectors;
+    req->sector_num = node->cm.sector_num;
+    req->node = node;
+    req->acb = acb;
+
+    assert(acb->sector_num <= node->cm.sector_num + node->cm.nb_sectors);
+
+    qemu_iovec_init(&req->qiov, 1);
+    qemu_iovec_add(&req->qiov, node->data,
+                   node->cm.nb_sectors << BDRV_SECTOR_BITS);
+    return req;
+}
+
+static inline void push_node_request(PrefCacheAIOCB *acb, PCNode *node)
+{
+    PrefCachePartReq *req = pcache_req_get(acb, node);
+
+    QTAILQ_INSERT_HEAD(&acb->requests.list, req, entry);
+}
+
 static inline void pcache_lru_node_up(BDRVPCacheState *s, PCNode *node)
 {
     qemu_co_mutex_lock(&s->pcache.lru.lock);
@@ -253,16 +328,17 @@ static bool pcache_node_find_and_create(PrefCacheAIOCB *acb, RbNodeKey *key,
     found = pcache_node_insert(&s->pcache.tree.root, new_node);
     qemu_co_mutex_unlock(&s->pcache.tree.lock);
     if (found != new_node) {
-        pcache_node_free(new_node);
-        pcache_lru_node_up(s, found);
+        g_free(new_node->data);
+        g_slice_free1(sizeof(*new_node), new_node);
+        if (found->status == NODE_SUCCESS_STATUS) {
+            pcache_lru_node_up(s, found);
+        }
         *out_node = found;
         return false;
     }
     atomic_add(&s->pcache.curr_size, new_node->cm.nb_sectors);
 
-    qemu_co_mutex_lock(&s->pcache.lru.lock);
-    QTAILQ_INSERT_HEAD(&s->pcache.lru.list, &new_node->cm, entry);
-    qemu_co_mutex_unlock(&s->pcache.lru.lock);
+    push_node_request(acb, new_node);
 
     pcache_try_shrink(s);
 
@@ -291,6 +367,7 @@ static void pcache_pickup_parts_of_cache(PrefCacheAIOCB *acb, PCNode *node,
             up_size = lc_key.size;
 
             if (!pcache_node_find_and_create(acb, &lc_key, &new_node)) {
+                pcache_node_unref(node);
                 node = new_node;
                 continue;
             }
@@ -300,6 +377,8 @@ static void pcache_pickup_parts_of_cache(PrefCacheAIOCB *acb, PCNode *node,
         /* XXX: node read */
         up_size = MIN(node->cm.sector_num + node->cm.nb_sectors - num, size);
 
+        pcache_node_unref(node);
+
         size -= up_size;
         num += up_size;
         if (size != 0) {
@@ -336,6 +415,8 @@ static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
         node->cm.sector_num + node->cm.nb_sectors >= acb->sector_num +
                                                      acb->nb_sectors)
     {
+        /* XXX: node read */
+        pcache_node_unref(node);
         return PREFETCH_FULL_UP;
     }
     pcache_pickup_parts_of_cache(acb, node, key.num, key.size);
@@ -343,10 +424,56 @@ static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
     return PREFETCH_PART_UP;
 }
 
+static void pcache_node_submit(PrefCachePartReq *req)
+{
+    PCNode *node = req->node;
+    BDRVPCacheState *s = req->acb->s;
+
+    assert(node != NULL);
+    assert(atomic_read(&node->ref) != 0);
+    assert(node->data != NULL);
+
+    qemu_co_mutex_lock(&node->lock);
+    if (node->status == NODE_WAIT_STATUS) {
+        qemu_co_mutex_lock(&s->pcache.lru.lock);
+        QTAILQ_INSERT_HEAD(&s->pcache.lru.list, &node->cm, entry);
+        qemu_co_mutex_unlock(&s->pcache.lru.lock);
+
+        node->status = NODE_SUCCESS_STATUS;
+    }
+    qemu_co_mutex_unlock(&node->lock);
+}
+
+static void pcache_merge_requests(PrefCacheAIOCB *acb)
+{
+    PrefCachePartReq *req, *next;
+
+    qemu_co_mutex_lock(&acb->requests.lock);
+    QTAILQ_FOREACH_SAFE(req, &acb->requests.list, entry, next) {
+        QTAILQ_REMOVE(&acb->requests.list, req, entry);
+
+        assert(req != NULL);
+        assert(req->node->status == NODE_WAIT_STATUS);
+
+        pcache_node_submit(req);
+
+        /* XXX: pcache read */
+
+        pcache_node_unref(req->node);
+
+        g_slice_free1(sizeof(*req), req);
+    }
+    qemu_co_mutex_unlock(&acb->requests.lock);
+}
+
 static void pcache_aio_cb(void *opaque, int ret)
 {
     PrefCacheAIOCB *acb = opaque;
 
+    if (acb->aio_type & QEMU_AIO_READ) {
+        pcache_merge_requests(acb);
+    }
+
     acb->common.cb(acb->common.opaque, ret);
 
     qemu_aio_unref(acb);
@@ -366,6 +493,9 @@ static PrefCacheAIOCB *pcache_aio_get(BlockDriverState *bs, int64_t sector_num,
     acb->aio_type = type;
     acb->ret = 0;
 
+    QTAILQ_INIT(&acb->requests.list);
+    qemu_co_mutex_init(&acb->requests.lock);
+
     return acb;
 }
 
@@ -445,6 +575,17 @@ fail:
     return ret;
 }
 
+static void pcache_node_check_and_free(BDRVPCacheState *s, PCNode *node)
+{
+    assert(node->status == NODE_SUCCESS_STATUS);
+    assert(node->ref == 0);
+
+    node->status = NODE_REMOVE_STATUS;
+    rb_erase(&node->cm.rb_node, &s->pcache.tree.root);
+    g_free(node->data);
+    g_slice_free1(sizeof(*node), node);
+}
+
 static void pcache_close(BlockDriverState *bs)
 {
     uint32_t cnt = 0;
@@ -452,7 +593,7 @@ static void pcache_close(BlockDriverState *bs)
     BlockNode *node, *next;
     QTAILQ_FOREACH_SAFE(node, &s->pcache.lru.list, entry, next) {
         QTAILQ_REMOVE(&s->pcache.lru.list, node, entry);
-        pcache_node_free(PCNODE(node));
+        pcache_node_check_and_free(s, PCNODE(node));
         cnt++;
     }
     DPRINTF("used %d nodes\n", cnt);
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 10/22] block/pcache: add check node leak
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (8 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 09/22] block/pcache: separation AIOCB on requests Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 11/22] add QEMU style defines for __sync_add_and_fetch Pavel Butsykin
                   ` (13 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

If the pcache has a node reference-counting bug, s->death_node_list can
help detect it.

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 45 +++++++++++++++++++++++++++++++++++++++------
 1 file changed, 39 insertions(+), 6 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index 6114289..a8a57e3 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -87,6 +87,8 @@ typedef struct BDRVPCacheState {
 
 #ifdef PCACHE_DEBUG
     uint64_t shrink_cnt_node;
+    QTAILQ_HEAD(death_node_head, BlockNode) death_node_list;
+    CoMutex                                 death_node_lock;
 #endif
 } BDRVPCacheState;
 
@@ -152,7 +154,7 @@ enum {
 
 #define PCNODE(_n) ((PCNode *)(_n))
 
-static inline void pcache_node_unref(PCNode *node)
+static inline void pcache_node_unref(BDRVPCacheState *s, PCNode *node)
 {
     assert(node->status == NODE_SUCCESS_STATUS ||
            node->status == NODE_REMOVE_STATUS);
@@ -161,6 +163,12 @@ static inline void pcache_node_unref(PCNode *node)
         assert(node->status == NODE_REMOVE_STATUS);
 
         node->status = NODE_GHOST_STATUS;
+
+#ifdef PCACHE_DEBUG
+        qemu_co_mutex_lock(&s->death_node_lock);
+        QTAILQ_REMOVE(&s->death_node_list, &node->cm, entry);
+        qemu_co_mutex_unlock(&s->death_node_lock);
+#endif
         g_free(node->data);
         g_slice_free1(sizeof(*node), node);
     }
@@ -263,11 +271,17 @@ static void pcache_node_drop(BDRVPCacheState *s, PCNode *node)
     QTAILQ_REMOVE(&s->pcache.lru.list, &node->cm, entry);
     qemu_co_mutex_unlock(&s->pcache.lru.lock);
 
+#ifdef PCACHE_DEBUG
+    qemu_co_mutex_lock(&s->death_node_lock);
+    QTAILQ_INSERT_HEAD(&s->death_node_list, &node->cm, entry);
+    qemu_co_mutex_unlock(&s->death_node_lock);
+#endif
+
     qemu_co_mutex_lock(&s->pcache.tree.lock);
     rb_erase(&node->cm.rb_node, &s->pcache.tree.root);
     qemu_co_mutex_unlock(&s->pcache.tree.lock);
 
-    pcache_node_unref(node);
+    pcache_node_unref(s, node);
 }
 
 static void pcache_try_shrink(BDRVPCacheState *s)
@@ -367,7 +381,7 @@ static void pcache_pickup_parts_of_cache(PrefCacheAIOCB *acb, PCNode *node,
             up_size = lc_key.size;
 
             if (!pcache_node_find_and_create(acb, &lc_key, &new_node)) {
-                pcache_node_unref(node);
+                pcache_node_unref(acb->s, node);
                 node = new_node;
                 continue;
             }
@@ -377,7 +391,7 @@ static void pcache_pickup_parts_of_cache(PrefCacheAIOCB *acb, PCNode *node,
         /* XXX: node read */
         up_size = MIN(node->cm.sector_num + node->cm.nb_sectors - num, size);
 
-        pcache_node_unref(node);
+        pcache_node_unref(acb->s, node);
 
         size -= up_size;
         num += up_size;
@@ -416,7 +430,7 @@ static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
                                                      acb->nb_sectors)
     {
         /* XXX: node read */
-        pcache_node_unref(node);
+        pcache_node_unref(acb->s, node);
         return PREFETCH_FULL_UP;
     }
     pcache_pickup_parts_of_cache(acb, node, key.num, key.size);
@@ -459,7 +473,7 @@ static void pcache_merge_requests(PrefCacheAIOCB *acb)
 
         /* XXX: pcache read */
 
-        pcache_node_unref(req->node);
+        pcache_node_unref(acb->s, req->node);
 
         g_slice_free1(sizeof(*req), req);
     }
@@ -544,6 +558,11 @@ static void pcache_state_init(QemuOpts *opts, BDRVPCacheState *s)
     s->pcache.curr_size = 0;
 
     s->cfg_cache_size = cache_size >> BDRV_SECTOR_BITS;
+
+#ifdef PCACHE_DEBUG
+    QTAILQ_INIT(&s->death_node_list);
+    qemu_co_mutex_init(&s->death_node_lock);
+#endif
 }
 
 static int pcache_file_open(BlockDriverState *bs, QDict *options, int flags,
@@ -597,6 +616,20 @@ static void pcache_close(BlockDriverState *bs)
         cnt++;
     }
     DPRINTF("used %d nodes\n", cnt);
+
+#ifdef PCACHE_DEBUG
+    if (!QTAILQ_EMPTY(&s->death_node_list)) {
+        cnt = 0;
+        DPRINTF("warning: death node list contains of node\n");
+        QTAILQ_FOREACH_SAFE(node, &s->death_node_list, entry, next) {
+            QTAILQ_REMOVE(&s->death_node_list, node, entry);
+            g_free(PCNODE(node)->data);
+            g_slice_free1(sizeof(*node), node);
+            cnt++;
+        }
+        DPRINTF("death nodes: %d", cnt);
+    }
+#endif
 }
 
 static void pcache_parse_filename(const char *filename, QDict *options,
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 11/22] add QEMU style defines for __sync_add_and_fetch
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (9 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 10/22] block/pcache: add check node leak Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 12/22] block/pcache: implement read cache to qiov and drop node during aio write Pavel Butsykin
                   ` (12 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 include/qemu/atomic.h | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/include/qemu/atomic.h b/include/qemu/atomic.h
index 7e13fca..7087d0f 100644
--- a/include/qemu/atomic.h
+++ b/include/qemu/atomic.h
@@ -152,6 +152,10 @@
     _old;                                                               \
     })
 
+/* Provide shorter names for GCC atomic builtins, return new value */
+#define atomic_inc_fetch(ptr)  __atomic_add_fetch(ptr, 1, __ATOMIC_SEQ_CST)
+#define atomic_dec_fetch(ptr)  __atomic_sub_fetch(ptr, 1, __ATOMIC_SEQ_CST)
+
 /* Provide shorter names for GCC atomic builtins, return old value */
 #define atomic_fetch_inc(ptr)  __atomic_fetch_add(ptr, 1, __ATOMIC_SEQ_CST)
 #define atomic_fetch_dec(ptr)  __atomic_fetch_sub(ptr, 1, __ATOMIC_SEQ_CST)
@@ -346,6 +350,10 @@
 #endif
 #endif
 
+/* Provide shorter names for GCC atomic builtins, return new value */
+#define atomic_inc_fetch(ptr)  __sync_add_and_fetch(ptr, 1)
+#define atomic_dec_fetch(ptr)  __sync_add_and_fetch(ptr, -1)
+
 /* Provide shorter names for GCC atomic builtins.  */
 #define atomic_fetch_inc(ptr)  __sync_fetch_and_add(ptr, 1)
 #define atomic_fetch_dec(ptr)  __sync_fetch_and_add(ptr, -1)
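
For clarity (editorial note), the difference between the existing
fetch-first macros and the new *_fetch variants, with hypothetical values:

  int v = 1;
  int old = atomic_fetch_dec(&v); /* returns 1 (the old value), v == 0 */
  v = 1;
  int new = atomic_dec_fetch(&v); /* returns 0 (the new value), v == 0 */
  /* pcache wants the new value so that "refcount just hit zero" is a
   * single test on the return value. */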
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 12/22] block/pcache: implement read cache to qiov and drop node during aio write
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (10 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 11/22] add QEMU style defines for __sync_add_and_fetch Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 13/22] block/pcache: add generic request complete Pavel Butsykin
                   ` (11 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

After applying this patch, aio reads can pick up data from the pcache.

Three things are added for the driver to work correctly:
1. Filling the qiov from the cache (the offset arithmetic is sketched
   below).
2. Request completion, to inform the upper layer that the data for the
   request has already been obtained.
3. Cache invalidation on aio write requests, as a simple way to keep the
   cache up-to-date.
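
The offset arithmetic of item 1, sketched with editorial example numbers:

  /* pcache_node_read(): request [100, 116) against node [104, 120)
   * (sector numbers, BDRV_SECTOR_BITS == 9):
   *   acb->sector_num (100) < node->cm.sector_num (104)
   *     -> qiov_offs = (104 - 100) << 9   (skip 4 request sectors)
   *     -> node_offs = 0
   *   overlap = min(116, 120) - max(100, 104) = 12 sectors
   *   -> 12 << 9 bytes copied from node->data into the request qiov */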

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 126 +++++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 118 insertions(+), 8 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index a8a57e3..435f2b4 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -103,7 +103,9 @@ typedef struct PrefCacheAIOCB {
     struct {
         QTAILQ_HEAD(req_head, PrefCachePartReq) list;
         CoMutex lock;
+        uint32_t cnt;
     } requests;
+    QEMUBH   *bh;
     int      ret;
 } PrefCacheAIOCB;
 
@@ -217,6 +219,27 @@ static BlockNode *pcache_node_prev(BlockNode* node, RbNodeKey *key)
     return node;
 }
 
+static void *node_search(struct RbRoot *root, RbNodeKey *key)
+{
+    struct RbNode *rb_node = root->rb_node;
+
+    while (rb_node) {
+        BlockNode *node = container_of(rb_node, BlockNode, rb_node);
+        int32_t result = pcache_key_cmp(key, &node->key);
+        if (result == 0) {
+            return pcache_node_prev(node, key);
+        }
+        rb_node = result < 0 ? rb_node->rb_left : rb_node->rb_right;
+    }
+    return NULL;
+}
+
+static PCNode *pcache_node_search(struct RbRoot *root, RbNodeKey *key)
+{
+    PCNode *node = node_search(root, key);
+    return node == NULL ? NULL : pcache_node_ref(node);
+}
+
 static void *node_insert(struct RbRoot *root, BlockNode *node)
 {
     struct RbNode **new = &(root->rb_node), *parent = NULL;
@@ -320,6 +343,8 @@ static inline void push_node_request(PrefCacheAIOCB *acb, PCNode *node)
 {
     PrefCachePartReq *req = pcache_req_get(acb, node);
 
+    acb->requests.cnt++;
+
     QTAILQ_INSERT_HEAD(&acb->requests.list, req, entry);
 }
 
@@ -360,6 +385,38 @@ static bool pcache_node_find_and_create(PrefCacheAIOCB *acb, RbNodeKey *key,
     return true;
 }
 
+static uint64_t ranges_overlap_size(uint64_t node1, uint32_t size1,
+                                    uint64_t node2, uint32_t size2)
+{
+    return MIN(node1 + size1, node2 + size2) - MAX(node1, node2);
+}
+
+static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
+{
+    uint64_t qiov_offs = 0, node_offs = 0;
+    uint32_t size;
+    uint32_t copy;
+
+    if (acb->sector_num < node->cm.sector_num) {
+        qiov_offs = (node->cm.sector_num - acb->sector_num) << BDRV_SECTOR_BITS;
+    } else {
+        node_offs = (acb->sector_num - node->cm.sector_num) << BDRV_SECTOR_BITS;
+    }
+    size = ranges_overlap_size(acb->sector_num, acb->nb_sectors,
+                               node->cm.sector_num, node->cm.nb_sectors)
+           << BDRV_SECTOR_BITS;
+
+    assert(node->status == NODE_SUCCESS_STATUS ||
+           node->status == NODE_REMOVE_STATUS);
+    assert(node->data != NULL);
+
+    qemu_co_mutex_lock(&node->lock);
+    copy = \
+        qemu_iovec_from_buf(acb->qiov, qiov_offs, node->data + node_offs, size);
+    assert(copy == size);
+    qemu_co_mutex_unlock(&node->lock);
+}
+
 static inline void prefetch_init_key(PrefCacheAIOCB *acb, RbNodeKey* key)
 {
     key->num = acb->sector_num;
@@ -388,7 +445,7 @@ static void pcache_pickup_parts_of_cache(PrefCacheAIOCB *acb, PCNode *node,
             size -= up_size;
             num += up_size;
         }
-        /* XXX: node read */
+        pcache_node_read(acb, node);
         up_size = MIN(node->cm.sector_num + node->cm.nb_sectors - num, size);
 
         pcache_node_unref(acb->s, node);
@@ -429,13 +486,28 @@ static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
         node->cm.sector_num + node->cm.nb_sectors >= acb->sector_num +
                                                      acb->nb_sectors)
     {
-        /* XXX: node read */
+        pcache_node_read(acb, node);
         pcache_node_unref(acb->s, node);
         return PREFETCH_FULL_UP;
     }
     pcache_pickup_parts_of_cache(acb, node, key.num, key.size);
 
-    return PREFETCH_PART_UP;
+    return acb->requests.cnt == 0 ? PREFETCH_FULL_UP : PREFETCH_PART_UP;
+}
+
+static void pcache_aio_bh(void *opaque)
+{
+    PrefCacheAIOCB *acb = opaque;
+    qemu_bh_delete(acb->bh);
+    acb->common.cb(acb->common.opaque, 0);
+    qemu_aio_unref(acb);
+}
+
+static void complete_aio_request(PrefCacheAIOCB *acb)
+{
+    acb->bh = aio_bh_new(bdrv_get_aio_context(acb->common.bs),
+                         pcache_aio_bh, acb);
+    qemu_bh_schedule(acb->bh);
 }
 
 static void pcache_node_submit(PrefCachePartReq *req)
@@ -471,7 +543,7 @@ static void pcache_merge_requests(PrefCacheAIOCB *acb)
 
         pcache_node_submit(req);
 
-        /* XXX: pcache read */
+        pcache_node_read(acb, req->node);
 
         pcache_node_unref(acb->s, req->node);
 
@@ -480,11 +552,36 @@ static void pcache_merge_requests(PrefCacheAIOCB *acb)
     qemu_co_mutex_unlock(&acb->requests.lock);
 }
 
+static void pcache_try_node_drop(PrefCacheAIOCB *acb)
+{
+    BDRVPCacheState *s = acb->s;
+    RbNodeKey key;
+
+    prefetch_init_key(acb, &key);
+
+    do {
+        PCNode *node;
+        qemu_co_mutex_lock(&s->pcache.tree.lock);
+        node = pcache_node_search(&s->pcache.tree.root, &key);
+        qemu_co_mutex_unlock(&s->pcache.tree.lock);
+        if (node == NULL) {
+            break;
+        }
+
+        pcache_node_drop(s, node);
+
+        pcache_node_unref(s, node);
+    } while (true);
+}
+
 static void pcache_aio_cb(void *opaque, int ret)
 {
     PrefCacheAIOCB *acb = opaque;
 
     if (acb->aio_type & QEMU_AIO_READ) {
+        if (atomic_dec_fetch(&acb->requests.cnt) > 0) {
+            return;
+        }
         pcache_merge_requests(acb);
     }
 
@@ -503,6 +600,7 @@ static PrefCacheAIOCB *pcache_aio_get(BlockDriverState *bs, int64_t sector_num,
     acb->s = bs->opaque;
     acb->sector_num = sector_num;
     acb->nb_sectors = nb_sectors;
+    acb->requests.cnt = 0;
     acb->qiov = qiov;
     acb->aio_type = type;
     acb->ret = 0;
@@ -522,10 +620,21 @@ static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
 {
     PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
                                          opaque, QEMU_AIO_READ);
-    pcache_prefetch(acb);
-
-    bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
-                   pcache_aio_cb, acb);
+    int32_t status = pcache_prefetch(acb);
+    if (status == PREFETCH_FULL_UP) {
+        assert(acb->requests.cnt == 0);
+        complete_aio_request(acb);
+    } else {
+        PrefCachePartReq *req;
+        assert(acb->requests.cnt != 0);
+
+        qemu_co_mutex_lock(&acb->requests.lock);
+        QTAILQ_FOREACH(req, &acb->requests.list, entry) {
+            bdrv_aio_readv(bs->file, req->sector_num, &req->qiov,
+                           req->nb_sectors, pcache_aio_cb, acb);
+        }
+        qemu_co_mutex_unlock(&acb->requests.lock);
+    }
     return &acb->common;
 }
 
@@ -538,6 +647,7 @@ static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
 {
     PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
                                          opaque, QEMU_AIO_WRITE);
+    pcache_try_node_drop(acb); /* XXX: use write through */
 
     bdrv_aio_writev(bs->file, sector_num, qiov, nb_sectors,
                     pcache_aio_cb, acb);
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 13/22] block/pcache: add generic request complete
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (11 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 12/22] block/pcache: implement read cache to qiov and drop node during aio write Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 14/22] block/pcache: add support for rescheduling requests Pavel Butsykin
                   ` (10 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

This change allows us to generalize the completion of all requests.
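
An editorial sketch of the resulting accounting:

  /* acb->ref starts at 1: the submitter's reference.  Any code path that
   * defers work on the acb takes another reference.  complete_aio_request()
   * schedules the completion BH only when atomic_dec_fetch(&acb->ref)
   * returns 0, i.e. on release of the last reference, so the upper layer
   * sees exactly one completion per request. */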

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index 435f2b4..1ff4c6a 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -105,6 +105,7 @@ typedef struct PrefCacheAIOCB {
         CoMutex lock;
         uint32_t cnt;
     } requests;
+    uint32_t ref;
     QEMUBH   *bh;
     int      ret;
 } PrefCacheAIOCB;
@@ -505,9 +506,11 @@ static void pcache_aio_bh(void *opaque)
 
 static void complete_aio_request(PrefCacheAIOCB *acb)
 {
-    acb->bh = aio_bh_new(bdrv_get_aio_context(acb->common.bs),
-                         pcache_aio_bh, acb);
-    qemu_bh_schedule(acb->bh);
+    if (atomic_dec_fetch(&acb->ref) == 0) {
+        acb->bh = aio_bh_new(bdrv_get_aio_context(acb->common.bs),
+                             pcache_aio_bh, acb);
+        qemu_bh_schedule(acb->bh);
+    }
 }
 
 static void pcache_node_submit(PrefCachePartReq *req)
@@ -585,9 +588,7 @@ static void pcache_aio_cb(void *opaque, int ret)
         pcache_merge_requests(acb);
     }
 
-    acb->common.cb(acb->common.opaque, ret);
-
-    qemu_aio_unref(acb);
+    complete_aio_request(acb);
 }
 
 static PrefCacheAIOCB *pcache_aio_get(BlockDriverState *bs, int64_t sector_num,
@@ -603,6 +604,7 @@ static PrefCacheAIOCB *pcache_aio_get(BlockDriverState *bs, int64_t sector_num,
     acb->requests.cnt = 0;
     acb->qiov = qiov;
     acb->aio_type = type;
+    acb->ref = 1;
     acb->ret = 0;
 
     QTAILQ_INIT(&acb->requests.list);
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 14/22] block/pcache: add support for rescheduling requests
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (12 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 13/22] block/pcache: add generic request complete Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 15/22] block/pcache: simple readahead one chunk forward Pavel Butsykin
                   ` (9 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

Now we can't drop nodes until the aio write request has completed, because
there is no guarantee that in the interval between the start of the request
and its completion an overlapping chunk of blocks won't be cached, which
would leave irrelevant data in the cache.

It also becomes possible for an aio write to correspond to a PCNode with
status NODE_WAIT_STATUS. If we drop the nodes in the aio callback, such
nodes can be skipped, because there is a guarantee that by the time the aio
read for the pending node is processed, the data on the disk is still
relevant.
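
An editorial sketch of how a read that hits a pending node is handled
after this patch:

  /* reader A allocates node N (NODE_WAIT_STATUS) and issues the disk read;
   * reader B hits N while the read is still in flight:
   *   pcache_node_read() sees NODE_WAIT_STATUS
   *     -> pcache_node_read_wait(): links B's acb into N->wait.list and
   *        takes an extra acb reference instead of copying stale data;
   * A's disk read completes:
   *   pcache_merge_requests() -> pcache_node_submit() marks N successful,
   *   then pcache_complete_acb_wait_queue() copies N->data into every
   *   waiting acb and completes them. */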

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 136 +++++++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 112 insertions(+), 24 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index 1ff4c6a..cb5f884 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -43,6 +43,11 @@ typedef struct RbNodeKey {
     uint32_t    size;
 } RbNodeKey;
 
+typedef struct ACBEntryLink {
+    QTAILQ_ENTRY(ACBEntryLink) entry;
+    struct PrefCacheAIOCB     *acb;
+} ACBEntryLink;
+
 typedef struct BlockNode {
     struct RbNode               rb_node;
     union {
@@ -58,6 +63,10 @@ typedef struct BlockNode {
 typedef struct PCNode {
     BlockNode cm;
 
+    struct {
+        QTAILQ_HEAD(acb_head, ACBEntryLink) list;
+        uint32_t cnt;
+    } wait;
     uint32_t                 status;
     uint32_t                 ref;
     uint8_t                  *data;
@@ -181,7 +190,6 @@ static inline PCNode *pcache_node_ref(PCNode *node)
 {
     assert(node->status == NODE_SUCCESS_STATUS ||
            node->status == NODE_WAIT_STATUS);
-    assert(atomic_read(&node->ref) == 0);/* XXX: only for sequential requests */
     atomic_inc(&node->ref);
 
     return node;
@@ -277,6 +285,8 @@ static inline void *pcache_node_alloc(RbNodeKey* key)
     node->status = NODE_WAIT_STATUS;
     qemu_co_mutex_init(&node->lock);
     node->data = g_malloc(node->cm.nb_sectors << BDRV_SECTOR_BITS);
+    node->wait.cnt = 0;
+    QTAILQ_INIT(&node->wait.list);
 
     return node;
 }
@@ -308,15 +318,33 @@ static void pcache_node_drop(BDRVPCacheState *s, PCNode *node)
     pcache_node_unref(s, node);
 }
 
+static inline PCNode *pcache_get_most_unused_node(BDRVPCacheState *s)
+{
+    PCNode *node;
+    assert(!QTAILQ_EMPTY(&s->pcache.lru.list));
+
+    qemu_co_mutex_lock(&s->pcache.lru.lock);
+    node = PCNODE(QTAILQ_LAST(&s->pcache.lru.list, lru_head));
+    pcache_node_ref(node);
+    qemu_co_mutex_unlock(&s->pcache.lru.lock);
+
+    return node;
+}
+
 static void pcache_try_shrink(BDRVPCacheState *s)
 {
     while (s->pcache.curr_size > s->cfg_cache_size) {
-        qemu_co_mutex_lock(&s->pcache.lru.lock);
-        assert(!QTAILQ_EMPTY(&s->pcache.lru.list));
-        PCNode *rmv_node = PCNODE(QTAILQ_LAST(&s->pcache.lru.list, lru_head));
-        qemu_co_mutex_unlock(&s->pcache.lru.lock);
+        PCNode *rmv_node;
+        /* it can happen if all nodes are waiting */
+        if (QTAILQ_EMPTY(&s->pcache.lru.list)) {
+            DPRINTF("lru list is empty, but curr_size: %d\n",
+                    s->pcache.curr_size);
+            break;
+        }
+        rmv_node = pcache_get_most_unused_node(s);
 
         pcache_node_drop(s, rmv_node);
+        pcache_node_unref(s, rmv_node);
 #ifdef PCACHE_DEBUG
         atomic_inc(&s->shrink_cnt_node);
 #endif
@@ -392,7 +420,7 @@ static uint64_t ranges_overlap_size(uint64_t node1, uint32_t size1,
     return MIN(node1 + size1, node2 + size2) - MAX(node1, node2);
 }
 
-static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
+static inline void pcache_node_read_buf(PrefCacheAIOCB *acb, PCNode* node)
 {
     uint64_t qiov_offs = 0, node_offs = 0;
     uint32_t size;
@@ -407,15 +435,41 @@ static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
                                node->cm.sector_num, node->cm.nb_sectors)
            << BDRV_SECTOR_BITS;
 
+    qemu_co_mutex_lock(&node->lock); /* XXX: use rw lock */
+    copy = \
+        qemu_iovec_from_buf(acb->qiov, qiov_offs, node->data + node_offs, size);
+    qemu_co_mutex_unlock(&node->lock);
+    assert(copy == size);
+}
+
+static inline void pcache_node_read_wait(PrefCacheAIOCB *acb, PCNode *node)
+{
+    ACBEntryLink *link = g_slice_alloc(sizeof(*link));
+    link->acb = acb;
+
+    atomic_inc(&node->wait.cnt);
+    QTAILQ_INSERT_HEAD(&node->wait.list, link, entry);
+    acb->ref++;
+}
+
+static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
+{
     assert(node->status == NODE_SUCCESS_STATUS ||
+           node->status == NODE_WAIT_STATUS    ||
            node->status == NODE_REMOVE_STATUS);
     assert(node->data != NULL);
 
     qemu_co_mutex_lock(&node->lock);
-    copy = \
-        qemu_iovec_from_buf(acb->qiov, qiov_offs, node->data + node_offs, size);
-    assert(copy == size);
+    if (node->status == NODE_WAIT_STATUS) {
+        pcache_node_read_wait(acb, node);
+        qemu_co_mutex_unlock(&node->lock);
+
+        return;
+    }
     qemu_co_mutex_unlock(&node->lock);
+
+    pcache_node_read_buf(acb, node);
+    pcache_node_unref(acb->s, node);
 }
 
 static inline void prefetch_init_key(PrefCacheAIOCB *acb, RbNodeKey* key)
@@ -446,10 +500,11 @@ static void pcache_pickup_parts_of_cache(PrefCacheAIOCB *acb, PCNode *node,
             size -= up_size;
             num += up_size;
         }
-        pcache_node_read(acb, node);
         up_size = MIN(node->cm.sector_num + node->cm.nb_sectors - num, size);
-
-        pcache_node_unref(acb->s, node);
+        pcache_node_read(acb, node); /* don't use node after pcache_node_read,
+                                      * node maybe free.
+                                      */
+        node = NULL;
 
         size -= up_size;
         num += up_size;
@@ -488,7 +543,6 @@ static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
                                                      acb->nb_sectors)
     {
         pcache_node_read(acb, node);
-        pcache_node_unref(acb->s, node);
         return PREFETCH_FULL_UP;
     }
     pcache_pickup_parts_of_cache(acb, node, key.num, key.size);
@@ -513,6 +567,31 @@ static void complete_aio_request(PrefCacheAIOCB *acb)
     }
 }
 
+static void pcache_complete_acb_wait_queue(BDRVPCacheState *s, PCNode *node)
+{
+    ACBEntryLink *link, *next;
+
+    if (atomic_read(&node->wait.cnt) == 0) {
+        return;
+    }
+
+    QTAILQ_FOREACH_SAFE(link, &node->wait.list, entry, next) {
+        PrefCacheAIOCB *wait_acb = link->acb;
+
+        QTAILQ_REMOVE(&node->wait.list, link, entry);
+        g_slice_free1(sizeof(*link), link);
+
+        pcache_node_read_buf(wait_acb, node);
+
+        assert(node->ref != 0);
+        pcache_node_unref(s, node);
+
+        complete_aio_request(wait_acb);
+        atomic_dec(&node->wait.cnt);
+    }
+    assert(atomic_read(&node->wait.cnt) == 0);
+}
+
 static void pcache_node_submit(PrefCachePartReq *req)
 {
     PCNode *node = req->node;
@@ -539,14 +618,17 @@ static void pcache_merge_requests(PrefCacheAIOCB *acb)
 
     qemu_co_mutex_lock(&acb->requests.lock);
     QTAILQ_FOREACH_SAFE(req, &acb->requests.list, entry, next) {
+        PCNode *node = req->node;
         QTAILQ_REMOVE(&acb->requests.list, req, entry);
 
         assert(req != NULL);
-        assert(req->node->status == NODE_WAIT_STATUS);
+        assert(node->status == NODE_WAIT_STATUS);
 
         pcache_node_submit(req);
 
-        pcache_node_read(acb, req->node);
+        pcache_node_read_buf(acb, node);
+
+        pcache_complete_acb_wait_queue(acb->s, node);
 
         pcache_node_unref(acb->s, req->node);
 
@@ -559,22 +641,27 @@ static void pcache_try_node_drop(PrefCacheAIOCB *acb)
 {
     BDRVPCacheState *s = acb->s;
     RbNodeKey key;
+    PCNode *node;
+    uint64_t end_offs = acb->sector_num + acb->nb_sectors;
 
-    prefetch_init_key(acb, &key);
-
+    key.num = acb->sector_num;
     do {
-        PCNode *node;
-        qemu_co_mutex_lock(&s->pcache.tree.lock);
+        key.size = end_offs - key.num;
+
+        qemu_co_mutex_lock(&s->pcache.tree.lock); /* XXX: use get_next_node */
         node = pcache_node_search(&s->pcache.tree.root, &key);
         qemu_co_mutex_unlock(&s->pcache.tree.lock);
         if (node == NULL) {
-            break;
+            return;
         }
-
-        pcache_node_drop(s, node);
+        if (node->status != NODE_WAIT_STATUS) {
+            assert(node->status == NODE_SUCCESS_STATUS);
+            pcache_node_drop(s, node);
+        }
+        key.num = node->cm.sector_num + node->cm.nb_sectors;
 
         pcache_node_unref(s, node);
-    } while (true);
+    } while (end_offs > key.num);
 }
 
 static void pcache_aio_cb(void *opaque, int ret)
@@ -586,6 +673,8 @@ static void pcache_aio_cb(void *opaque, int ret)
             return;
         }
         pcache_merge_requests(acb);
+    } else {        /* QEMU_AIO_WRITE */
+        pcache_try_node_drop(acb); /* XXX: use write through */
     }
 
     complete_aio_request(acb);
@@ -649,7 +738,6 @@ static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
 {
     PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
                                          opaque, QEMU_AIO_WRITE);
-    pcache_try_node_drop(acb); /* XXX: use write through */
 
     bdrv_aio_writev(bs->file, sector_num, qiov, nb_sectors,
                     pcache_aio_cb, acb);
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 15/22] block/pcache: simple readahead one chunk forward
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (13 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 14/22] block/pcache: add support for rescheduling requests Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 16/22] block/pcache: pcache readahead node around Pavel Butsykin
                   ` (8 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

Most of the performance improvement comes from this part. The advantage is
gained because readahead requests can be queued earlier than the
corresponding requests from the Guest OS.

Add pcache-readahead-size open parameter.
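
For illustration, one possible drive configuration setting the new
parameter (the image name is made up; 128KB matches the built-in default):

    -drive file=image.qcow2,if=none,id=drive-virtio-disk0,format=qcow2,
           cache=none,aio=native,pcache-readahead-size=128KB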

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 118 ++++++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 100 insertions(+), 18 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index cb5f884..90b3f85 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -92,7 +92,10 @@ typedef struct BDRVPCacheState {
 
     ReqStor pcache;
 
-    uint32_t cfg_cache_size;
+    struct {
+        uint32_t cache_size;
+        uint32_t readahead_size;
+    } cfg;
 
 #ifdef PCACHE_DEBUG
     uint64_t shrink_cnt_node;
@@ -134,6 +137,7 @@ static const AIOCBInfo pcache_aiocb_info = {
 };
 
 #define PCACHE_OPT_CACHE_SIZE "pcache-full-size"
+#define PCACHE_OPT_READAHEAD_SIZE "pcache-readahead-size"
 
 static QemuOptsList runtime_opts = {
     .name = "pcache",
@@ -149,6 +153,11 @@ static QemuOptsList runtime_opts = {
             .type = QEMU_OPT_SIZE,
             .help = "Total cache size",
         },
+        {
+            .name = PCACHE_OPT_READAHEAD_SIZE,
+            .type = QEMU_OPT_SIZE,
+            .help = "Prefetch cache readahead size",
+        },
         { /* end of list */ }
     },
 };
@@ -156,6 +165,7 @@ static QemuOptsList runtime_opts = {
 #define KB_BITS 10
 #define MB_BITS 20
 #define PCACHE_DEFAULT_CACHE_SIZE (4 << MB_BITS)
+#define PCACHE_DEFAULT_READAHEAD_SIZE (128 << KB_BITS)
 
 enum {
     NODE_SUCCESS_STATUS = 0,
@@ -164,13 +174,16 @@ enum {
     NODE_GHOST_STATUS   = 3 /* only for debugging */
 };
 
+enum {
+    PCACHE_AIO_READ      = 1,
+    PCACHE_AIO_WRITE     = 2,
+    PCACHE_AIO_READAHEAD = 4
+};
+
 #define PCNODE(_n) ((PCNode *)(_n))
 
 static inline void pcache_node_unref(BDRVPCacheState *s, PCNode *node)
 {
-    assert(node->status == NODE_SUCCESS_STATUS ||
-           node->status == NODE_REMOVE_STATUS);
-
     if (atomic_fetch_dec(&node->ref) == 0) {
         assert(node->status == NODE_REMOVE_STATUS);
 
@@ -333,7 +346,7 @@ static inline PCNode *pcache_get_most_unused_node(BDRVPCacheState *s)
 
 static void pcache_try_shrink(BDRVPCacheState *s)
 {
-    while (s->pcache.curr_size > s->cfg_cache_size) {
+    while (s->pcache.curr_size > s->cfg.cache_size) {
         PCNode *rmv_node;
                 /* it can happen if all nodes are waiting */
         if (QTAILQ_EMPTY(&s->pcache.lru.list)) {
@@ -626,7 +639,9 @@ static void pcache_merge_requests(PrefCacheAIOCB *acb)
 
         pcache_node_submit(req);
 
-        pcache_node_read_buf(acb, node);
+        if (!(acb->aio_type & PCACHE_AIO_READAHEAD)) {
+            pcache_node_read_buf(acb, node);
+        }
 
         pcache_complete_acb_wait_queue(acb->s, node);
 
@@ -668,12 +683,16 @@ static void pcache_aio_cb(void *opaque, int ret)
 {
     PrefCacheAIOCB *acb = opaque;
 
-    if (acb->aio_type & QEMU_AIO_READ) {
+    if (acb->aio_type & PCACHE_AIO_READ) {
         if (atomic_dec_fetch(&acb->requests.cnt) > 0) {
             return;
         }
         pcache_merge_requests(acb);
-    } else {        /* QEMU_AIO_WRITE */
+        if (acb->aio_type & PCACHE_AIO_READAHEAD) {
+            qemu_aio_unref(acb);
+            return;
+        }
+    } else {        /* PCACHE_AIO_WRITE */
         pcache_try_node_drop(acb); /* XXX: use write through */
     }
 
@@ -702,6 +721,69 @@ static PrefCacheAIOCB *pcache_aio_get(BlockDriverState *bs, int64_t sector_num,
     return acb;
 }
 
+static void pcache_send_acb_request_list(BlockDriverState *bs,
+                                         PrefCacheAIOCB *acb)
+{
+    PrefCachePartReq *req;
+
+    assert(acb->requests.cnt != 0);
+    qemu_co_mutex_lock(&acb->requests.lock);
+    QTAILQ_FOREACH(req, &acb->requests.list, entry) {
+        bdrv_aio_readv(bs->file, req->sector_num, &req->qiov,
+                       req->nb_sectors, pcache_aio_cb, acb);
+    }
+    qemu_co_mutex_unlock(&acb->requests.lock);
+}
+
+static bool check_allocated_blocks(BlockDriverState *bs, int64_t sector_num,
+                                   int32_t nb_sectors)
+{
+    int ret, num;
+
+    do {
+        ret = bdrv_is_allocated(bs, sector_num, nb_sectors, &num);
+        if (ret <= 0) {
+            return false;
+        }
+        sector_num += num;
+        nb_sectors -= num;
+
+    } while (nb_sectors);
+
+    return true;
+}
+
+static void pcache_readahead_request(BlockDriverState *bs, PrefCacheAIOCB *acb)
+{
+    BDRVPCacheState *s = acb->s;
+    PrefCacheAIOCB *acb_readahead;
+    RbNodeKey key;
+    uint64_t total_sectors = bdrv_nb_sectors(bs);
+    PCNode *node = NULL;
+
+    prefetch_init_key(acb, &key);
+
+    key.num = key.num + key.size;
+    if (total_sectors <= key.num + s->cfg.readahead_size) {
+        return; /* readahead too small or beyond end of disk */
+    }
+    key.size = s->cfg.readahead_size;
+
+    if (!check_allocated_blocks(bs->file->bs, key.num, key.size)) {
+        return;
+    }
+
+    acb_readahead = pcache_aio_get(bs, key.num, NULL, key.size, acb->common.cb,
+                                   acb->common.opaque, PCACHE_AIO_READ |
+                                                       PCACHE_AIO_READAHEAD);
+    if (!pcache_node_find_and_create(acb_readahead, &key, &node)) {
+        pcache_node_unref(s, node);
+        qemu_aio_unref(acb_readahead);
+        return;
+    }
+    pcache_send_acb_request_list(bs, acb_readahead);
+}
+
 static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
                                     int64_t sector_num,
                                     QEMUIOVector *qiov,
@@ -710,22 +792,18 @@ static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
                                     void *opaque)
 {
     PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
-                                         opaque, QEMU_AIO_READ);
+                                         opaque, PCACHE_AIO_READ);
     int32_t status = pcache_prefetch(acb);
     if (status == PREFETCH_FULL_UP) {
         assert(acb->requests.cnt == 0);
         complete_aio_request(acb);
     } else {
-        PrefCachePartReq *req;
         assert(acb->requests.cnt != 0);
 
-        qemu_co_mutex_lock(&acb->requests.lock);
-        QTAILQ_FOREACH(req, &acb->requests.list, entry) {
-            bdrv_aio_readv(bs->file, req->sector_num, &req->qiov,
-                           req->nb_sectors, pcache_aio_cb, acb);
-        }
-        qemu_co_mutex_unlock(&acb->requests.lock);
+        pcache_send_acb_request_list(bs, acb);
     }
+    pcache_readahead_request(bs, acb);
+
     return &acb->common;
 }
 
@@ -737,7 +815,7 @@ static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
                                      void *opaque)
 {
     PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
-                                         opaque, QEMU_AIO_WRITE);
+                                         opaque, PCACHE_AIO_WRITE);
 
     bdrv_aio_writev(bs->file, sector_num, qiov, nb_sectors,
                     pcache_aio_cb, acb);
@@ -748,8 +826,11 @@ static void pcache_state_init(QemuOpts *opts, BDRVPCacheState *s)
 {
     uint64_t cache_size = qemu_opt_get_size(opts, PCACHE_OPT_CACHE_SIZE,
                                             PCACHE_DEFAULT_CACHE_SIZE);
+    uint64_t readahead_size = qemu_opt_get_size(opts, PCACHE_OPT_READAHEAD_SIZE,
+                                                PCACHE_DEFAULT_READAHEAD_SIZE);
     DPRINTF("pcache configure:\n");
     DPRINTF("pcache-full-size = %jd\n", cache_size);
+    DPRINTF("readahead_size = %jd\n", readahead_size);
 
     s->pcache.tree.root = RB_ROOT;
     qemu_co_mutex_init(&s->pcache.tree.lock);
@@ -757,7 +838,8 @@ static void pcache_state_init(QemuOpts *opts, BDRVPCacheState *s)
     qemu_co_mutex_init(&s->pcache.lru.lock);
     s->pcache.curr_size = 0;
 
-    s->cfg_cache_size = cache_size >> BDRV_SECTOR_BITS;
+    s->cfg.cache_size = cache_size >> BDRV_SECTOR_BITS;
+    s->cfg.readahead_size = readahead_size >> BDRV_SECTOR_BITS;
 
 #ifdef PCACHE_DEBUG
     QTAILQ_INIT(&s->death_node_list);
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 16/22] block/pcache: pcache readahead node around
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (14 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 15/22] block/pcache: simple readahead one chunk forward Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 17/22] block/pcache: skip readahead for non-sequential requests Pavel Butsykin
                   ` (7 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

If the next block is already cached, we still need to check the size of the
cached node to ensure a full readahead around it, as in the worked example
below.
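
A worked example with made-up sector numbers, illustrating the
readahead_node_prev()/readahead_node_next() logic in the diff (assuming the
disk is large enough):

    readahead window requested:  [1000, 1256), readahead_size = 256
    cached node found:           [1100, 1150)
    readahead_node_prev() adds   [1000, 1100)  - the gap before the node
    readahead_node_next() adds   [1150, 1406)  - a new chunk after the node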

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 51 +++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 49 insertions(+), 2 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index 90b3f85..ae7ac8d 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -427,6 +427,16 @@ static bool pcache_node_find_and_create(PrefCacheAIOCB *acb, RbNodeKey *key,
     return true;
 }
 
+static inline PCNode *pcache_node_add(PrefCacheAIOCB *acb, RbNodeKey *key)
+{
+    PCNode *node = NULL;
+    if (!pcache_node_find_and_create(acb, key, &node)) {
+        pcache_node_unref(acb->s, node);
+        return NULL;
+    }
+    return node;
+}
+
 static uint64_t ranges_overlap_size(uint64_t node1, uint32_t size1,
                                     uint64_t node2, uint32_t size2)
 {
@@ -735,6 +745,38 @@ static void pcache_send_acb_request_list(BlockDriverState *bs,
     qemu_co_mutex_unlock(&acb->requests.lock);
 }
 
+static void readahead_node_prev(PrefCacheAIOCB *acb, PCNode *node,
+                                RbNodeKey *key)
+{
+    RbNodeKey lc_key;
+    if (node->cm.key.num <= key->num) {
+        return;
+    }
+
+    lc_key.num = key->num;
+    lc_key.size = node->cm.key.num - key->num;
+
+    pcache_node_add(acb, &lc_key);
+}
+
+static void readahead_node_next(PrefCacheAIOCB *acb, PCNode *node,
+                                RbNodeKey *key, uint64_t total_sectors)
+{
+    BDRVPCacheState *s;
+    RbNodeKey lc_key;
+    if (node->cm.key.num + node->cm.key.size >= key->num + key->size) {
+        return;
+    }
+    s = acb->s;
+
+    lc_key.num = node->cm.key.num + node->cm.key.size;
+    lc_key.size = s->cfg.readahead_size;
+    if (total_sectors <= lc_key.num + lc_key.size) {
+        return;
+    }
+    pcache_node_add(acb, &lc_key);
+}
+
 static bool check_allocated_blocks(BlockDriverState *bs, int64_t sector_num,
                                    int32_t nb_sectors)
 {
@@ -777,9 +819,14 @@ static void pcache_readahead_request(BlockDriverState *bs, PrefCacheAIOCB *acb)
                                    acb->common.opaque, PCACHE_AIO_READ |
                                                        PCACHE_AIO_READAHEAD);
     if (!pcache_node_find_and_create(acb_readahead, &key, &node)) {
+        readahead_node_prev(acb_readahead, node, &key);
+        readahead_node_next(acb_readahead, node, &key, total_sectors);
+
         pcache_node_unref(s, node);
-        qemu_aio_unref(acb_readahead);
-        return;
+        if (acb_readahead->requests.cnt == 0) {
+            qemu_aio_unref(acb_readahead);
+            return;
+        }
     }
     pcache_send_acb_request_list(bs, acb_readahead);
 }
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 17/22] block/pcache: skip readahead for non-sequential requests
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (15 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 16/22] block/pcache: pcache readahead node around Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 18/22] block/pcache: add pcache skip large aio read Pavel Butsykin
                   ` (6 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

Random reads would trigger a lot of readahead, which would result in an
overall loss of performance. To avoid this, check that the preceding
requests form a sequence before doing the readahead. It also makes no sense
to cache such random requests, because a cache hit on this data is very
unlikely.
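
A worked example with made-up sector numbers (the readahead size of 256
sectors is used as the cache line, as in check_lreq_sequence() below):

    incoming read at sector 1256 -> look for past requests in [1000, 1256)
    lreq.tree has a node starting at or before 1000 -> sequential, readahead
    no such node                                    -> skip the readahead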

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 141 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 138 insertions(+), 3 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index ae7ac8d..7a317fc 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -73,6 +73,10 @@ typedef struct PCNode {
     CoMutex                  lock;
 } PCNode;
 
+typedef struct LRNode {
+    BlockNode cm;
+} LRNode;
+
 typedef struct ReqStor {
     struct {
         struct RbRoot root;
@@ -91,10 +95,12 @@ typedef struct BDRVPCacheState {
     BlockDriverState **bs;
 
     ReqStor pcache;
+    ReqStor lreq;
 
     struct {
         uint32_t cache_size;
         uint32_t readahead_size;
+        uint32_t lreq_pool_size;
     } cfg;
 
 #ifdef PCACHE_DEBUG
@@ -166,6 +172,7 @@ static QemuOptsList runtime_opts = {
 #define MB_BITS 20
 #define PCACHE_DEFAULT_CACHE_SIZE (4 << MB_BITS)
 #define PCACHE_DEFAULT_READAHEAD_SIZE (128 << KB_BITS)
+#define PCACHE_DEFAULT_POOL_STAT_SIZE (1 << MB_BITS)
 
 enum {
     NODE_SUCCESS_STATUS = 0,
@@ -181,6 +188,7 @@ enum {
 };
 
 #define PCNODE(_n) ((PCNode *)(_n))
+#define LRNODE(_n) ((LRNode *)(_n))
 
 static inline void pcache_node_unref(BDRVPCacheState *s, PCNode *node)
 {
@@ -262,6 +270,11 @@ static PCNode *pcache_node_search(struct RbRoot *root, RbNodeKey *key)
     return node == NULL ? NULL : pcache_node_ref(node);
 }
 
+static inline LRNode *lreq_node_search(struct RbRoot *root, RbNodeKey *key)
+{
+    return node_search(root, key);
+}
+
 static void *node_insert(struct RbRoot *root, BlockNode *node)
 {
     struct RbNode **new = &(root->rb_node), *parent = NULL;
@@ -288,6 +301,11 @@ static inline PCNode *pcache_node_insert(struct RbRoot *root, PCNode *node)
     return pcache_node_ref(node_insert(root, &node->cm));
 }
 
+static inline LRNode *lreq_node_insert(struct RbRoot *root, LRNode *node)
+{
+    return node_insert(root, &node->cm);
+}
+
 static inline void *pcache_node_alloc(RbNodeKey* key)
 {
     PCNode *node = g_slice_alloc(sizeof(*node));
@@ -364,6 +382,34 @@ static void pcache_try_shrink(BDRVPCacheState *s)
     }
 }
 
+static void lreq_try_shrink(BDRVPCacheState *s)
+{
+    while (s->lreq.curr_size > s->cfg.lreq_pool_size) {
+        LRNode *rmv_node;
+            /* XXX: need to filter large requests */
+        if (QTAILQ_EMPTY(&s->lreq.lru.list)) {
+            DPRINTF("lru lreq list is empty, but curr_size: %d\n",
+                    s->lreq.curr_size);
+            break;
+        }
+
+        qemu_co_mutex_lock(&s->lreq.lru.lock);
+        rmv_node = LRNODE(QTAILQ_LAST(&s->lreq.lru.list, lru_head));
+        qemu_co_mutex_unlock(&s->lreq.lru.lock);
+
+        atomic_sub(&s->lreq.curr_size, rmv_node->cm.nb_sectors);
+
+        qemu_co_mutex_lock(&s->lreq.lru.lock);
+        QTAILQ_REMOVE(&s->lreq.lru.list, &rmv_node->cm, entry);
+        qemu_co_mutex_unlock(&s->lreq.lru.lock);
+
+        qemu_co_mutex_lock(&s->lreq.tree.lock);
+        rb_erase(&rmv_node->cm.rb_node, &s->lreq.tree.root);
+        qemu_co_mutex_unlock(&s->lreq.tree.lock);
+        g_slice_free1(sizeof(*rmv_node), rmv_node);
+    }
+}
+
 static PrefCachePartReq *pcache_req_get(PrefCacheAIOCB *acb, PCNode *node)
 {
     PrefCachePartReq *req = g_slice_alloc(sizeof(*req));
@@ -437,6 +483,34 @@ static inline PCNode *pcache_node_add(PrefCacheAIOCB *acb, RbNodeKey *key)
     return node;
 }
 
+static LRNode *lreq_node_add(PrefCacheAIOCB *acb, RbNodeKey *key)
+{
+    BDRVPCacheState *s = acb->s;
+    LRNode *new_node = g_slice_alloc(sizeof(*new_node));
+    LRNode *found;
+
+    new_node->cm.sector_num = key->num;
+    new_node->cm.nb_sectors = key->size;
+
+    qemu_co_mutex_lock(&s->lreq.tree.lock);
+    found = lreq_node_insert(&s->lreq.tree.root, new_node);
+    qemu_co_mutex_unlock(&s->lreq.tree.lock);
+    if (found != new_node) {
+        g_slice_free1(sizeof(*new_node), new_node);
+        return NULL;
+    }
+
+    atomic_add(&s->lreq.curr_size, new_node->cm.nb_sectors);
+
+    lreq_try_shrink(s);
+
+    qemu_co_mutex_lock(&s->lreq.lru.lock);
+    QTAILQ_INSERT_HEAD(&s->lreq.lru.list, &new_node->cm, entry);
+    qemu_co_mutex_unlock(&s->lreq.lru.lock);
+
+    return new_node;
+}
+
 static uint64_t ranges_overlap_size(uint64_t node1, uint32_t size1,
                                     uint64_t node2, uint32_t size2)
 {
@@ -552,13 +626,24 @@ enum {
 
 static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
 {
+    BDRVPCacheState *s = acb->s;
     RbNodeKey key;
-    PCNode *node = NULL;
+    PCNode *node;
 
     prefetch_init_key(acb, &key);
-    if (pcache_node_find_and_create(acb, &key, &node)) {
+
+    /* add request statistics */
+    lreq_node_add(acb, &key);
+
+    qemu_co_mutex_lock(&s->pcache.tree.lock); /* XXX: use get_next_node */
+    node = pcache_node_search(&s->pcache.tree.root, &key);
+    qemu_co_mutex_unlock(&s->pcache.tree.lock);
+    if (node == NULL) {
         return PREFETCH_NEW_NODE;
     }
+    if (node->status == NODE_SUCCESS_STATUS) {
+        pcache_lru_node_up(s, node);
+    }
 
     /* Node covers the whole request */
     if (node->cm.sector_num <= acb->sector_num &&
@@ -795,6 +880,30 @@ static bool check_allocated_blocks(BlockDriverState *bs, int64_t sector_num,
     return true;
 }
 
+static bool check_lreq_sequence(BDRVPCacheState *s, uint64_t sector_num)
+{
+    RbNodeKey key;
+    LRNode *node;
+    uint32_t cache_line_sz = s->cfg.readahead_size;
+
+    if (sector_num <= cache_line_sz) {
+        return false;
+    }
+    /* check whether the preceding cache-line-sized block was requested */
+    key.num = sector_num - cache_line_sz;
+    key.size = cache_line_sz;
+
+    qemu_co_mutex_lock(&s->lreq.tree.lock);
+    node = lreq_node_search(&s->lreq.tree.root, &key);
+    qemu_co_mutex_unlock(&s->lreq.tree.lock);
+    if (node == NULL) { /* requests aren't sequential,
+                         * so readahead most likely makes no sense.
+                         */
+        return false;
+    }
+    return node->cm.sector_num > key.num ? false : true;
+}
+
 static void pcache_readahead_request(BlockDriverState *bs, PrefCacheAIOCB *acb)
 {
     BDRVPCacheState *s = acb->s;
@@ -803,6 +912,9 @@ static void pcache_readahead_request(BlockDriverState *bs, PrefCacheAIOCB *acb)
     uint64_t total_sectors = bdrv_nb_sectors(bs);
     PCNode *node = NULL;
 
+    if (!check_lreq_sequence(acb->s, acb->sector_num)) {
+        return;
+    }
     prefetch_init_key(acb, &key);
 
     key.num = key.num + key.size;
@@ -841,7 +953,13 @@ static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
     PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
                                          opaque, PCACHE_AIO_READ);
     int32_t status = pcache_prefetch(acb);
-    if (status == PREFETCH_FULL_UP) {
+    if (status == PREFETCH_NEW_NODE) {
+        BlockAIOCB *ret = bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
+                                         cb, opaque);
+        pcache_readahead_request(bs, acb);
+        qemu_aio_unref(acb); /* XXX: fix superfluous alloc */
+        return ret;
+    } else if (status == PREFETCH_FULL_UP) {
         assert(acb->requests.cnt == 0);
         complete_aio_request(acb);
     } else {
@@ -885,8 +1003,15 @@ static void pcache_state_init(QemuOpts *opts, BDRVPCacheState *s)
     qemu_co_mutex_init(&s->pcache.lru.lock);
     s->pcache.curr_size = 0;
 
+    s->lreq.tree.root = RB_ROOT;
+    qemu_co_mutex_init(&s->lreq.tree.lock);
+    QTAILQ_INIT(&s->lreq.lru.list);
+    qemu_co_mutex_init(&s->lreq.lru.lock);
+    s->lreq.curr_size = 0;
+
     s->cfg.cache_size = cache_size >> BDRV_SECTOR_BITS;
     s->cfg.readahead_size = readahead_size >> BDRV_SECTOR_BITS;
+    s->cfg.lreq_pool_size = PCACHE_DEFAULT_POOL_STAT_SIZE >> BDRV_SECTOR_BITS;
 
 #ifdef PCACHE_DEBUG
     QTAILQ_INIT(&s->death_node_list);
@@ -946,6 +1071,16 @@ static void pcache_close(BlockDriverState *bs)
     }
     DPRINTF("used %d nodes\n", cnt);
 
+    cnt = 0;
+    if (!QTAILQ_EMPTY(&s->lreq.lru.list)) {
+        QTAILQ_FOREACH_SAFE(node, &s->lreq.lru.list, entry, next) {
+            QTAILQ_REMOVE(&s->lreq.lru.list, node, entry);
+            g_slice_free1(sizeof(*node), node);
+            cnt++;
+        }
+    }
+    DPRINTF("used %d lreq nodes\n", cnt);
+
 #ifdef PCACHE_DEBUG
     if (!QTAILQ_EMPTY(&s->death_node_list)) {
         cnt = 0;
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 18/22] block/pcache: add pcache skip large aio read
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (16 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 17/22] block/pcache: skip readahead for non-sequential requests Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 19/22] block/pcache: add pcache node assert Pavel Butsykin
                   ` (5 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

This change allows more efficient use of cache memory and filters out a
case for which pcache isn't effective. Requests that don't benefit from the
optimization now bypass the cache, which reduces the number of unnecessary
readaheads.

Add pcache-max-aio-size open parameter.
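
For illustration, a possible invocation (the image name is made up; 32KB
matches the built-in default, so reads larger than that bypass the cache):

    -drive file=image.qcow2,if=none,id=drive-virtio-disk0,format=qcow2,
           cache=none,aio=native,pcache-max-aio-size=32KB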

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 49 ++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 40 insertions(+), 9 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index 7a317fc..287156a 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -100,6 +100,7 @@ typedef struct BDRVPCacheState {
     struct {
         uint32_t cache_size;
         uint32_t readahead_size;
+        uint32_t max_aio_size;
         uint32_t lreq_pool_size;
     } cfg;
 
@@ -144,6 +145,7 @@ static const AIOCBInfo pcache_aiocb_info = {
 
 #define PCACHE_OPT_CACHE_SIZE "pcache-full-size"
 #define PCACHE_OPT_READAHEAD_SIZE "pcache-readahead-size"
+#define PCACHE_OPT_MAX_AIO_SIZE "pcache-max-aio-size"
 
 static QemuOptsList runtime_opts = {
     .name = "pcache",
@@ -164,6 +166,11 @@ static QemuOptsList runtime_opts = {
             .type = QEMU_OPT_SIZE,
             .help = "Prefetch cache readahead size",
         },
+        {
+            .name = PCACHE_OPT_MAX_AIO_SIZE,
+            .type = QEMU_OPT_SIZE,
+            .help = "Maximum size of aio which is handled by pcache",
+        },
         { /* end of list */ }
     },
 };
@@ -173,6 +180,7 @@ static QemuOptsList runtime_opts = {
 #define PCACHE_DEFAULT_CACHE_SIZE (4 << MB_BITS)
 #define PCACHE_DEFAULT_READAHEAD_SIZE (128 << KB_BITS)
 #define PCACHE_DEFAULT_POOL_STAT_SIZE (1 << MB_BITS)
+#define PCACHE_DEFAULT_MAX_AIO_SIZE (32 << KB_BITS)
 
 enum {
     NODE_SUCCESS_STATUS = 0,
@@ -386,12 +394,7 @@ static void lreq_try_shrink(BDRVPCacheState *s)
 {
     while (s->lreq.curr_size > s->cfg.lreq_pool_size) {
         LRNode *rmv_node;
-            /* XXX: need to filter large requests */
-        if (QTAILQ_EMPTY(&s->lreq.lru.list)) {
-            DPRINTF("lru lreq list is empty, but curr_size: %d\n",
-                    s->lreq.curr_size);
-            break;
-        }
+        assert(!QTAILQ_EMPTY(&s->lreq.lru.list));
 
         qemu_co_mutex_lock(&s->lreq.lru.lock);
         rmv_node = LRNODE(QTAILQ_LAST(&s->lreq.lru.list, lru_head));
@@ -943,6 +946,23 @@ static void pcache_readahead_request(BlockDriverState *bs, PrefCacheAIOCB *acb)
     pcache_send_acb_request_list(bs, acb_readahead);
 }
 
+static inline bool pcache_skip_aio_read(BlockDriverState *bs,
+                                        uint64_t sector_num,
+                                        uint32_t nb_sectors)
+{
+    BDRVPCacheState *s = bs->opaque;
+
+    if (nb_sectors > s->cfg.max_aio_size) {
+        return true;
+    }
+
+    if (bdrv_nb_sectors(bs) < sector_num + nb_sectors) {
+        return true;
+    }
+
+    return false;
+}
+
 static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
                                     int64_t sector_num,
                                     QEMUIOVector *qiov,
@@ -950,9 +970,16 @@ static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
                                     BlockCompletionFunc *cb,
                                     void *opaque)
 {
-    PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
-                                         opaque, PCACHE_AIO_READ);
-    int32_t status = pcache_prefetch(acb);
+    PrefCacheAIOCB *acb;
+    int32_t status;
+
+    if (pcache_skip_aio_read(bs, sector_num, nb_sectors)) {
+        return bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
+                              cb, opaque);
+    }
+    acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
+                         opaque, PCACHE_AIO_READ);
+    status = pcache_prefetch(acb);
     if (status == PREFETCH_NEW_NODE) {
         BlockAIOCB *ret = bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
                                          cb, opaque);
@@ -993,9 +1020,12 @@ static void pcache_state_init(QemuOpts *opts, BDRVPCacheState *s)
                                             PCACHE_DEFAULT_CACHE_SIZE);
     uint64_t readahead_size = qemu_opt_get_size(opts, PCACHE_OPT_READAHEAD_SIZE,
                                                 PCACHE_DEFAULT_READAHEAD_SIZE);
+    uint64_t max_aio_size = qemu_opt_get_size(opts, PCACHE_OPT_MAX_AIO_SIZE,
+                                              PCACHE_DEFAULT_MAX_AIO_SIZE);
     DPRINTF("pcache configure:\n");
     DPRINTF("pcache-full-size = %jd\n", cache_size);
     DPRINTF("readahead_size = %jd\n", readahead_size);
+    DPRINTF("max_aio_size = %jd\n", max_aio_size);
 
     s->pcache.tree.root = RB_ROOT;
     qemu_co_mutex_init(&s->pcache.tree.lock);
@@ -1012,6 +1042,7 @@ static void pcache_state_init(QemuOpts *opts, BDRVPCacheState *s)
     s->cfg.cache_size = cache_size >> BDRV_SECTOR_BITS;
     s->cfg.readahead_size = readahead_size >> BDRV_SECTOR_BITS;
     s->cfg.lreq_pool_size = PCACHE_DEFAULT_POOL_STAT_SIZE >> BDRV_SECTOR_BITS;
+    s->cfg.max_aio_size = max_aio_size >> BDRV_SECTOR_BITS;
 
 #ifdef PCACHE_DEBUG
     QTAILQ_INIT(&s->death_node_list);
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 19/22] block/pcache: add pcache node assert
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (17 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 18/22] block/pcache: add pcache skip large aio read Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 20/22] block/pcache: implement pcache error handling of aio cb Pavel Butsykin
                   ` (4 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

When a node assertion fails, print the fields of the pcache node; this can
be useful for catching bugs.

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 52 +++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 35 insertions(+), 17 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index 287156a..7b4a9a9 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -38,6 +38,24 @@
 #define DPRINTF(fmt, ...) do { } while (0)
 #endif
 
+#define NODE_PRINT(_node) \
+    printf("node:\n"      \
+           "num: %jd size: %d\n"   \
+           "ref: %d\nstatus: %d\n" \
+           "node_wait_cnt: %d\n"   \
+           "data: %p\nlock %u\n",  \
+           (_node)->cm.sector_num, (_node)->cm.nb_sectors,    \
+           (_node)->ref, (_node)->status, (_node)->wait.cnt,  \
+           (_node)->data, (_node)->lock.locked)
+
+#define NODE_ASSERT(_assert, _node) \
+    do {                            \
+        if (!(_assert)) {           \
+            NODE_PRINT(_node);      \
+            assert(_assert);        \
+        }                           \
+    } while (0)
+
 typedef struct RbNodeKey {
     uint64_t    num;
     uint32_t    size;
@@ -201,7 +219,7 @@ enum {
 static inline void pcache_node_unref(BDRVPCacheState *s, PCNode *node)
 {
     if (atomic_fetch_dec(&node->ref) == 0) {
-        assert(node->status == NODE_REMOVE_STATUS);
+        NODE_ASSERT(node->status == NODE_REMOVE_STATUS, node);
 
         node->status = NODE_GHOST_STATUS;
 
@@ -217,8 +235,8 @@ static inline void pcache_node_unref(BDRVPCacheState *s, PCNode *node)
 
 static inline PCNode *pcache_node_ref(PCNode *node)
 {
-    assert(node->status == NODE_SUCCESS_STATUS ||
-           node->status == NODE_WAIT_STATUS);
+    NODE_ASSERT(node->status == NODE_SUCCESS_STATUS ||
+                node->status == NODE_WAIT_STATUS, node);
     atomic_inc(&node->ref);
 
     return node;
@@ -422,8 +440,8 @@ static PrefCachePartReq *pcache_req_get(PrefCacheAIOCB *acb, PCNode *node)
     req->node = node;
     req->acb = acb;
 
-    assert(acb->sector_num <= node->cm.sector_num + node->cm.nb_sectors);
-
+    NODE_ASSERT(acb->sector_num <= node->cm.sector_num + node->cm.nb_sectors,
+                node);
     qemu_iovec_init(&req->qiov, 1);
     qemu_iovec_add(&req->qiov, node->data,
                    node->cm.nb_sectors << BDRV_SECTOR_BITS);
@@ -554,10 +572,10 @@ static inline void pcache_node_read_wait(PrefCacheAIOCB *acb, PCNode *node)
 
 static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
 {
-    assert(node->status == NODE_SUCCESS_STATUS ||
-           node->status == NODE_WAIT_STATUS    ||
-           node->status == NODE_REMOVE_STATUS);
-    assert(node->data != NULL);
+    NODE_ASSERT(node->status == NODE_SUCCESS_STATUS ||
+                node->status == NODE_WAIT_STATUS    ||
+                node->status == NODE_REMOVE_STATUS, node);
+    NODE_ASSERT(node->data != NULL, node);
 
     qemu_co_mutex_lock(&node->lock);
     if (node->status == NODE_WAIT_STATUS) {
@@ -694,13 +712,13 @@ static void pcache_complete_acb_wait_queue(BDRVPCacheState *s, PCNode *node)
 
         pcache_node_read_buf(wait_acb, node);
 
-        assert(node->ref != 0);
+        NODE_ASSERT(node->ref != 0, node);
         pcache_node_unref(s, node);
 
         complete_aio_request(wait_acb);
         atomic_dec(&node->wait.cnt);
     }
-    assert(atomic_read(&node->wait.cnt) == 0);
+    NODE_ASSERT(atomic_read(&node->wait.cnt) == 0, node);
 }
 
 static void pcache_node_submit(PrefCachePartReq *req)
@@ -709,8 +727,8 @@ static void pcache_node_submit(PrefCachePartReq *req)
     BDRVPCacheState *s = req->acb->s;
 
     assert(node != NULL);
-    assert(atomic_read(&node->ref) != 0);
-    assert(node->data != NULL);
+    NODE_ASSERT(atomic_read(&node->ref) != 0, node);
+    NODE_ASSERT(node->data != NULL, node);
 
     qemu_co_mutex_lock(&node->lock);
     if (node->status == NODE_WAIT_STATUS) {
@@ -733,7 +751,7 @@ static void pcache_merge_requests(PrefCacheAIOCB *acb)
         QTAILQ_REMOVE(&acb->requests.list, req, entry);
 
         assert(req != NULL);
-        assert(node->status == NODE_WAIT_STATUS);
+        NODE_ASSERT(node->status == NODE_WAIT_STATUS, node);
 
         pcache_node_submit(req);
 
@@ -768,7 +786,7 @@ static void pcache_try_node_drop(PrefCacheAIOCB *acb)
             return;
         }
         if (node->status != NODE_WAIT_STATUS) {
-            assert(node->status == NODE_SUCCESS_STATUS);
+            NODE_ASSERT(node->status == NODE_SUCCESS_STATUS, node);
             pcache_node_drop(s, node);
         }
         key.num = node->cm.sector_num + node->cm.nb_sectors;
@@ -1081,8 +1099,8 @@ fail:
 
 static void pcache_node_check_and_free(BDRVPCacheState *s, PCNode *node)
 {
-    assert(node->status == NODE_SUCCESS_STATUS);
-    assert(node->ref == 0);
+    NODE_ASSERT(node->status == NODE_SUCCESS_STATUS, node);
+    NODE_ASSERT(node->ref == 0, node);
 
     node->status = NODE_REMOVE_STATUS;
     rb_erase(&node->cm.rb_node, &s->pcache.tree.root);
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 20/22] block/pcache: implement pcache error handling of aio cb
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (18 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 19/22] block/pcache: add pcache node assert Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 21/22] block/pcache: add write through node Pavel Butsykin
                   ` (3 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

Add error handling of aio requests to the pcache driver. If a request
fails, all requests pending on it fail as well.

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 33 ++++++++++++++++++++++-----------
 1 file changed, 22 insertions(+), 11 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index 7b4a9a9..c5fe689 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -683,7 +683,7 @@ static void pcache_aio_bh(void *opaque)
 {
     PrefCacheAIOCB *acb = opaque;
     qemu_bh_delete(acb->bh);
-    acb->common.cb(acb->common.opaque, 0);
+    acb->common.cb(acb->common.opaque, acb->ret);
     qemu_aio_unref(acb);
 }
 
@@ -696,7 +696,8 @@ static void complete_aio_request(PrefCacheAIOCB *acb)
     }
 }
 
-static void pcache_complete_acb_wait_queue(BDRVPCacheState *s, PCNode *node)
+static void pcache_complete_acb_wait_queue(BDRVPCacheState *s, PCNode *node,
+                                           int ret)
 {
     ACBEntryLink *link, *next;
 
@@ -710,7 +711,11 @@ static void pcache_complete_acb_wait_queue(BDRVPCacheState *s, PCNode *node)
         QTAILQ_REMOVE(&node->wait.list, link, entry);
         g_slice_free1(sizeof(*link), link);
 
-        pcache_node_read_buf(wait_acb, node);
+        if (ret == 0) {
+            pcache_node_read_buf(wait_acb, node);
+        } else {  /* write only fail, because next request can rewrite error */
+            wait_acb->ret = ret;
+        }
 
         NODE_ASSERT(node->ref != 0, node);
         pcache_node_unref(s, node);
@@ -753,16 +758,17 @@ static void pcache_merge_requests(PrefCacheAIOCB *acb)
         assert(req != NULL);
         NODE_ASSERT(node->status == NODE_WAIT_STATUS, node);
 
-        pcache_node_submit(req);
-
-        if (!(acb->aio_type & PCACHE_AIO_READAHEAD)) {
-            pcache_node_read_buf(acb, node);
+        if (acb->ret == 0) {
+            pcache_node_submit(req);
+            if (!(acb->aio_type & PCACHE_AIO_READAHEAD)) {
+                pcache_node_read_buf(acb, node);
+            }
+        } else {
+            pcache_node_drop(acb->s, node);
         }
+        pcache_complete_acb_wait_queue(acb->s, node, acb->ret);
 
-        pcache_complete_acb_wait_queue(acb->s, node);
-
-        pcache_node_unref(acb->s, req->node);
-
+        pcache_node_unref(acb->s, node);
         g_slice_free1(sizeof(*req), req);
     }
     qemu_co_mutex_unlock(&acb->requests.lock);
@@ -799,6 +805,11 @@ static void pcache_aio_cb(void *opaque, int ret)
 {
     PrefCacheAIOCB *acb = opaque;
 
+    if (ret != 0) {
+        acb->ret = ret;
+        DPRINTF("pcache aio_cb(num: %jd nb: %d) err: %d",
+                acb->sector_num, acb->nb_sectors, ret);
+    }
     if (acb->aio_type & PCACHE_AIO_READ) {
         if (atomic_dec_fetch(&acb->requests.cnt) > 0) {
             return;
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 21/22] block/pcache: add write through node
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (19 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 20/22] block/pcache: implement pcache error handling of aio cb Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 22/22] block/pcache: drop used pcache node Pavel Butsykin
                   ` (2 subsequent siblings)
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

Write-through is another way to keep the cache up to date. Even if this
case is rare, writing into the node's buffer is simpler than dropping the
node.
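
A worked example with made-up sector numbers: a guest write that overlaps a
cached node now updates the node's buffer instead of invalidating it:

    cached node:   [1000, 1256)
    guest write:   [1100, 1150)  -> data copied into the node's buffer
    later read:    [1000, 1256)  -> served from cache with up-to-date data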

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 28 ++++++++++++++++++++--------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index c5fe689..2b2edf5 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -200,6 +200,8 @@ static QemuOptsList runtime_opts = {
 #define PCACHE_DEFAULT_POOL_STAT_SIZE (1 << MB_BITS)
 #define PCACHE_DEFAULT_MAX_AIO_SIZE (32 << KB_BITS)
 
+#define PCACHE_WRITE_THROUGH_NODE TRUE
+
 enum {
     NODE_SUCCESS_STATUS = 0,
     NODE_WAIT_STATUS    = 1,
@@ -538,7 +540,12 @@ static uint64_t ranges_overlap_size(uint64_t node1, uint32_t size1,
     return MIN(node1 + size1, node2 + size2) - MAX(node1, node2);
 }
 
-static inline void pcache_node_read_buf(PrefCacheAIOCB *acb, PCNode* node)
+enum {
+    NODE_READ_BUF  = 1,
+    NODE_WRITE_BUF = 2
+};
+
+static void pcache_node_rw_buf(PrefCacheAIOCB *acb, PCNode* node, uint32_t type)
 {
     uint64_t qiov_offs = 0, node_offs = 0;
     uint32_t size;
@@ -554,8 +561,9 @@ static inline void pcache_node_read_buf(PrefCacheAIOCB *acb, PCNode* node)
            << BDRV_SECTOR_BITS;
 
     qemu_co_mutex_lock(&node->lock); /* XXX: use rw lock */
-    copy = \
-        qemu_iovec_from_buf(acb->qiov, qiov_offs, node->data + node_offs, size);
+    copy = type & NODE_READ_BUF ?
+        qemu_iovec_from_buf(acb->qiov, qiov_offs, node->data + node_offs, size)
+        : qemu_iovec_to_buf(acb->qiov, qiov_offs, node->data + node_offs, size);
     qemu_co_mutex_unlock(&node->lock);
     assert(copy == size);
 }
@@ -586,7 +594,7 @@ static void pcache_node_read(PrefCacheAIOCB *acb, PCNode* node)
     }
     qemu_co_mutex_unlock(&node->lock);
 
-    pcache_node_read_buf(acb, node);
+    pcache_node_rw_buf(acb, node, NODE_READ_BUF);
     pcache_node_unref(acb->s, node);
 }
 
@@ -712,7 +720,7 @@ static void pcache_complete_acb_wait_queue(BDRVPCacheState *s, PCNode *node,
         g_slice_free1(sizeof(*link), link);
 
         if (ret == 0) {
-            pcache_node_read_buf(wait_acb, node);
+            pcache_node_rw_buf(wait_acb, node, NODE_READ_BUF);
         } else {  /* write only fail, because next request can rewrite error */
             wait_acb->ret = ret;
         }
@@ -761,7 +769,7 @@ static void pcache_merge_requests(PrefCacheAIOCB *acb)
         if (acb->ret == 0) {
             pcache_node_submit(req);
             if (!(acb->aio_type & PCACHE_AIO_READAHEAD)) {
-                pcache_node_read_buf(acb, node);
+                pcache_node_rw_buf(acb, node, NODE_READ_BUF);
             }
         } else {
             pcache_node_drop(acb->s, node);
@@ -774,7 +782,7 @@ static void pcache_merge_requests(PrefCacheAIOCB *acb)
     qemu_co_mutex_unlock(&acb->requests.lock);
 }
 
-static void pcache_try_node_drop(PrefCacheAIOCB *acb)
+static void pcache_update_node_state(PrefCacheAIOCB *acb)
 {
     BDRVPCacheState *s = acb->s;
     RbNodeKey key;
@@ -793,7 +801,11 @@ static void pcache_try_node_drop(PrefCacheAIOCB *acb)
         }
         if (node->status != NODE_WAIT_STATUS) {
             NODE_ASSERT(node->status == NODE_SUCCESS_STATUS, node);
+#if PCACHE_WRITE_THROUGH_NODE
+            pcache_node_rw_buf(acb, node, NODE_WRITE_BUF);
+#else
             pcache_node_drop(s, node);
+#endif
         }
         key.num = node->cm.sector_num + node->cm.nb_sectors;
 
@@ -820,7 +832,7 @@ static void pcache_aio_cb(void *opaque, int ret)
             return;
         }
     } else {        /* PCACHE_AIO_WRITE */
-        pcache_try_node_drop(acb); /* XXX: use write through */
+        pcache_update_node_state(acb);
     }
 
     complete_aio_request(acb);
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* [Qemu-devel] [PATCH RFC v2 22/22] block/pcache: drop used pcache node
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (20 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 21/22] block/pcache: add write through node Pavel Butsykin
@ 2016-08-29 17:10 ` Pavel Butsykin
  2016-09-01 14:17 ` [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Kevin Wolf
  2016-09-01 15:26 ` Avi Kivity
  23 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-08-29 17:10 UTC (permalink / raw)
  To: qemu-block, qemu-devel; +Cc: kwolf, mreitz, stefanha, den, jsnow, eblake, famz

The pcache is targeted at one specific situation: sequential reads. This
allows us to drop parts of the cache that have already been read, which
reduces the cache size and the number of evicted nodes.
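
A worked example with made-up sector numbers, based on the rdcnt counter
added below:

    cached node:  [2000, 2256), nb_sectors = 256, rdcnt = 0
    read hits     [2000, 2128)  -> rdcnt = 128, node kept
    read hits     [2128, 2256)  -> rdcnt = 256 >= nb_sectors, node dropped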

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/block/pcache.c b/block/pcache.c
index 2b2edf5..3fad4ca 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -88,6 +88,7 @@ typedef struct PCNode {
     uint32_t                 status;
     uint32_t                 ref;
     uint8_t                  *data;
+    uint32_t                 rdcnt;
     CoMutex                  lock;
 } PCNode;
 
@@ -342,6 +343,7 @@ static inline void *pcache_node_alloc(RbNodeKey* key)
     node->cm.nb_sectors = key->size;
     node->ref = 0;
     node->status = NODE_WAIT_STATUS;
+    node->rdcnt = 0;
     qemu_co_mutex_init(&node->lock);
     node->data = g_malloc(node->cm.nb_sectors << BDRV_SECTOR_BITS);
     node->wait.cnt = 0;
@@ -560,11 +562,23 @@ static void pcache_node_rw_buf(PrefCacheAIOCB *acb, PCNode* node, uint32_t type)
                                node->cm.sector_num, node->cm.nb_sectors)
            << BDRV_SECTOR_BITS;
 
-    qemu_co_mutex_lock(&node->lock); /* XXX: use rw lock */
-    copy = type & NODE_READ_BUF ?
-        qemu_iovec_from_buf(acb->qiov, qiov_offs, node->data + node_offs, size)
-        : qemu_iovec_to_buf(acb->qiov, qiov_offs, node->data + node_offs, size);
-    qemu_co_mutex_unlock(&node->lock);
+    if (type & NODE_READ_BUF) {
+        qemu_co_mutex_lock(&node->lock); /* XXX: use rw lock */
+        copy = qemu_iovec_from_buf(acb->qiov, qiov_offs,
+                                   node->data + node_offs, size);
+        qemu_co_mutex_unlock(&node->lock);
+
+        /* pcache node is no longer needed, when it was all read */
+        atomic_add(&node->rdcnt, size >> BDRV_SECTOR_BITS);
+        if (node->rdcnt >= node->cm.nb_sectors) {
+            pcache_node_drop(acb->s, node);
+        }
+    } else {
+        qemu_co_mutex_lock(&node->lock); /* XXX: use rw lock */
+        copy = qemu_iovec_to_buf(acb->qiov, qiov_offs,
+                                 node->data + node_offs, size);
+        qemu_co_mutex_unlock(&node->lock);
+    }
     assert(copy == size);
 }
 
-- 
2.8.3

^ permalink raw reply related	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (21 preceding siblings ...)
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 22/22] block/pcache: drop used pcache node Pavel Butsykin
@ 2016-09-01 14:17 ` Kevin Wolf
  2016-09-06 12:36   ` Pavel Butsykin
  2016-09-01 15:26 ` Avi Kivity
  23 siblings, 1 reply; 43+ messages in thread
From: Kevin Wolf @ 2016-09-01 14:17 UTC (permalink / raw)
  To: Pavel Butsykin
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

Am 29.08.2016 um 19:09 hat Pavel Butsykin geschrieben:
> The prefetch cache aims to improve the performance of sequential read data.
> Of most interest here are the requests of a small size of data for sequential
> read, such requests can be optimized by extending them and moving into 
> the prefetch cache.
> [...]

Before I start actually looking into your code, I read both this cover
letter and your KVM Forum slides, and as far as I can say, the
fundamental idea and your design look sound to me. It was a good read,
too, so thanks for writing all the explanations!

One thing that came to mind is that we have more caches elsewhere, most
notably the qcow2 metadata cache, and I still have that private branch
that adds a qcow2 data cache, too (for merging small allocating writes,
if you remember my talk from last year). However, the existing
Qcow2Cache has a few problems like being tied to the cluster size.

So I wondered how hard you think it would be to split pcache into a
reusable cache core that just manages the contents based on calls like
"allocate/drop/get cached memory for bytes x...y", and the actual
pcache code that implements the read-ahead policy. Then qcow2 could
reuse the core and use its own policy about what metadata to cache etc.
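
A hypothetical sketch of such a core interface, just to make the idea
concrete (every name here is invented):

    typedef struct BlockCache BlockCache;

    BlockCache *block_cache_new(uint64_t max_size);
    /* return cached memory covering bytes [offset, offset + bytes), or NULL */
    void *block_cache_get(BlockCache *c, uint64_t offset, uint64_t bytes);
    /* allocate and account cache memory for the given byte range */
    void *block_cache_alloc(BlockCache *c, uint64_t offset, uint64_t bytes);
    /* drop the given byte range from the cache */
    void block_cache_drop(BlockCache *c, uint64_t offset, uint64_t bytes);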

Of course, this can be done incrementally on top and should by no means
block the inclusion of your code, but if it's possible, it might be an
interesting thought to keep in mind.

Kevin

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 01/22] block/pcache: empty pcache driver filter
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 01/22] block/pcache: empty pcache driver filter Pavel Butsykin
@ 2016-09-01 14:31   ` Kevin Wolf
  2016-09-06 15:20     ` Pavel Butsykin
  0 siblings, 1 reply; 43+ messages in thread
From: Kevin Wolf @ 2016-09-01 14:31 UTC (permalink / raw)
  To: Pavel Butsykin
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

Am 29.08.2016 um 19:10 hat Pavel Butsykin geschrieben:
> The basic version of pcache driver for easy preparation of a patch set.
> 
> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>

> +    .bdrv_aio_readv                     = pcache_aio_readv,
> +    .bdrv_aio_writev                    = pcache_aio_writev,

Can you please use .bdrv_co_preadv/pwritev instead and make everything
based on bytes rather than sectors?

Internally you can still spawn AIO requests to achieve the same
parallelism as you have now (we'll just need new byte-based
bdrv_co_aio_prw_vector() wrappers, but the functionality is there) and I
don't think making the outer layer coroutine based would be too hard. In
fact it might even simplify some code.
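
For reference, a rough sketch of the byte-based entry points this would
mean for pcache (pcache_co_preadv/pcache_co_pwritev are illustrative names;
the prototypes follow .bdrv_co_preadv/.bdrv_co_pwritev):

    static coroutine_fn int pcache_co_preadv(BlockDriverState *bs,
                                             uint64_t offset, uint64_t bytes,
                                             QEMUIOVector *qiov, int flags);
    static coroutine_fn int pcache_co_pwritev(BlockDriverState *bs,
                                              uint64_t offset, uint64_t bytes,
                                              QEMUIOVector *qiov, int flags);

    .bdrv_co_preadv  = pcache_co_preadv,
    .bdrv_co_pwritev = pcache_co_pwritev,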

Kevin

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 03/22] util/rbtree: add rbtree from linux kernel
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 03/22] util/rbtree: add rbtree from linux kernel Pavel Butsykin
@ 2016-09-01 14:37   ` Kevin Wolf
  0 siblings, 0 replies; 43+ messages in thread
From: Kevin Wolf @ 2016-09-01 14:37 UTC (permalink / raw)
  To: Pavel Butsykin
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

Am 29.08.2016 um 19:10 hat Pavel Butsykin geschrieben:
> Why don't we use the rbtree from glib? We need a pointer to the parent
> node. An optimal implementation of storing cached chunks in the rbtree
> needs to fetch the next and previous nodes, and access to the parent node
> is very useful for implementing these functions efficiently. In this
> rbtree implementation (unlike the one in glib) the node contains a pointer
> to its parent node. Moreover, this rbtree allows more flexibility, because
> to use it you have to implement your own insert and search cores. This
> lets us avoid callbacks and the dramatic performance drop they would
> cause.
> 
> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>

General note (only having looked at the diffstat): We need to make sure
that new files that this series adds are covered by MAINTAINERS entries.

Kevin

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache
  2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
                   ` (22 preceding siblings ...)
  2016-09-01 14:17 ` [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Kevin Wolf
@ 2016-09-01 15:26 ` Avi Kivity
  2016-09-06 12:40   ` Pavel Butsykin
  23 siblings, 1 reply; 43+ messages in thread
From: Avi Kivity @ 2016-09-01 15:26 UTC (permalink / raw)
  To: Pavel Butsykin, qemu-block, qemu-devel
  Cc: kwolf, famz, mreitz, stefanha, den, jsnow

On 08/29/2016 08:09 PM, Pavel Butsykin wrote:
> The prefetch cache aims to improve the performance of sequential read data.
> Of most interest here are the requests of a small size of data for sequential
> read, such requests can be optimized by extending them and moving into
> the prefetch cache. However, there are 2 issues:
>   - In aggregate only a small portion of requests is sequential, so delays caused
>     by the need to read more volumes of data will lead to an overall decrease
>     in performance.
>   - The presence of redundant data in the cache memory with a large number of
>     random requests.
> This pcache implementation solves the above and other problems prefetching data.
> The pcache algorithm can be summarised by the following main steps.
>
> 1. Monitor I/O requests to identify typical sequences.
> This implementation of prefetch cache works at the storage system level and has
> information only about the physical block addresses of I/O requests. Statistics
> are collected only from read requests to a maximum size of 32kb(by default),
> each request that matches the criteria falls into a pool of requests. In order
> to store requests statistic used by the rb-tree(lreq.tree), it's simple but for
> this issue a quite efficient data structure.
>
> 2. Identifying sequential I/O streams.
> For each read request to be carried out attempting to lift the chain sequence
> from lreq.tree, where this request will be element of a sequential chain of
> requests. The key to search for consecutive requests is the area of sectors
> preceding the current request. The size of this area should not be too small to
> avoid false readahead. The sequential stream data requests can be identified
> even when a large number of random requests. For example, if there is access to
> the blocks 100, 1157, 27520, 4, 101, 312, 1337, 102, in the context of request
> processing 102 will be identified the chain of sequential requests 100, 101. 102
> and then should a decision be made to do readahead. Also a situation may arise
> when multiple applications A, B, C simultaneously perform sequential read of
> data. For each separate application that will be sequential read data
> A(100, 101, 102), B(300, 301, 302), C(700, 701, 702), but for block devices it
> may look like a random data reading: 100,300,700,101,301,701,102,302,702.
> In this case, the sequential streams will also be recognised because location
> requests in the rb-tree will allow to separate the sequential I/O streams.
>
> 3. Do the readahead into the cache for recognized sequential data streams.
> After the issue of the detection of pcache case was resolved, need using larger
> requests to bring data into the cache. In this implementation the pcache used
> readahead instead of the extension request, therefore the request goes as is.
> There is not any reason to put data in the cache that will never be picked up,
> but this will always happen in the case of extension requests. In order to store
> areas of cached blocks is also used by the rb-tree(pcache.tree), it's simple but
> for this issue a quite efficient data structure.
>
> 4. Control size of the prefetch cache pool and the requests statistic pool
> For control the border of the pool statistic of requests, the data of requests
> are placed and replaced according to the FIFO principle, everything is simple.
> For control the boundaries of the memory cache used LRU list, it allows to limit
> the max amount memory that we can allocate for pcache. But the LRU is there
> mainly to prevent displacement of the cache blocks that was read partially.
> The main way the memory is pushed out immediately after use, as soon as a chunk
> of memory from the cache has been completely read, since the probability of
> repetition of the request is very low. Cases when one and the same portion of
> the cache memory has been read several times are not optimized and do not apply
> to the cases that can optimize the pcache. Thus, using a cache memory of small
> volume, by the optimization of the operations read-ahead and clear memory, we
> can read entire volumes of data, providing a 100% cache hit. Also does not
> decrease the effectiveness of random read requests.
>
> PCache is implemented as a qemu block filter driver, has some configurable
> parameters, such as: total cache size, readahead size, maximum size of block
> that can be processed.
>
> For performance evaluation has been used several test cases with different
> sequential and random read data on SSD disk. Here are the results of tests and
> qemu parameters:
>
> qemu parameters:
> -M pc-i440fx-2.4 --enable-kvm -smp 4 -m 1024
> -drive file=centos7.qcow2,if=none,id=drive-virtio-disk0,format=qcow2,cache=none,
>         aio=native,pcache-full-size=4MB,pcache-readahead-size=128KB,
>         pcache-max-aio-size=32KB
> -device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x8,drive=drive-virtio-disk0,
>          id=virtio-disk0
> (-set device.virtio-disk0.x-data-plane=on)
>
> ********************************************************************************
> * Testcase                        * Results in iops                            *
> *                                 **********************************************
> *                                 * clean qemu   * pcache       * x-data-plane *
> ********************************************************************************
> * Create/open 16 file(s) of total * 25514 req/s  * 85659 req/s  * 28249 req/s  *
> * size 2048.00 MB named           * 25692 req/s  * 89064 req/s  * 27950 req/s  *
> * /tmp/tmp.tmp, start 4 thread(s) * 25836 req/s  * 84142 req/s  * 28120 req/s  *
> * and do uncached sequential read *              *              *              *
> * by 4KB blocks                   *              *              *              *
> ********************************************************************************
> * Create/open 16 file(s) of total * 56006 req/s  * 92137 req/s  * 56992 req/s  *
> * size 2048.00 MB named           * 55335 req/s  * 92269 req/s  * 57023 req/s  *
> * /tmp/tmp.tmp, start 4 thread(s) * 55731 req/s  * 98722 req/s  * 56593 req/s  *
> * and do uncached sequential read *              *              *              *
> * by 4KB blocks with constant     *              *              *              *
> ********************************************************************************
> * Create/open 16 file(s) of total * 14104 req/s  * 14164 req/s  * 13914 req/s  *
> * size 2048.00 MB named           * 14130 req/s  * 14232 req/s  * 13613 req/s  *
> * /tmp/tmp.tmp, start 4 thread(s) * 14183 req/s  * 14080 req/s  * 13374 req/s  *
> * and do uncached random read by  *              *              *              *
> * 4KB blocks                      *              *              *              *
> ********************************************************************************
> * Create/open 16 file(s) of total * 23480 req/s  * 23483 req/s  * 20887 req/s  *
> * size 2048.00 MB named           * 23070 req/s  * 22432 req/s  * 21127 req/s  *
> * /tmp/tmp.tmp, start 4 thread(s) * 24090 req/s  * 23499 req/s  * 23415 req/s  *
> * and do uncached random read by  *              *              *              *
> * 4KB blocks with constant queue  *              *              *              *
> * len 32                          *              *              *              *
> ********************************************************************************


I note, in your tests, you use uncached sequential reads.  But are 
uncached sequential reads with a small block size common?

Consider the case of cached sequential reads.  Here, the guest OS will 
issue read-aheads.  pcache will detect them and issue its own 
read-aheads, both layers will read ahead more than necessary, so pcache 
is adding extra I/O and memory copies here.

So I'm wondering about the use case.  Guest userspace applications which 
do uncached reads will typically manage their own read-ahead; and cached 
reads have the kernel reading ahead for them, with the benefit of 
knowing the file layout.  That leaves dd iflag=direct, but is it such an 
important application?

> TODO list:
> - add tracepoints
> - add migration support
> - add more explanations in the commit messages
> - get rid of the additional allocation in pcache_node_find_and_create() and
>    pcache_aio_readv()
>
> Changes from v1:
> - Fix failed automatic build test (11)
>
> Pavel Butsykin (22):
>    block/pcache: empty pcache driver filter
>    block/pcache: add own AIOCB block
>    util/rbtree: add rbtree from linux kernel
>    block/pcache: add pcache debug build
>    block/pcache: add aio requests into cache
>    block/pcache: restrict cache size
>    block/pcache: introduce LRU as method of memory
>    block/pcache: implement pickup parts of the cache
>    block/pcache: separation AIOCB on requests
>    block/pcache: add check node leak
>    add QEMU style defines for __sync_add_and_fetch
>    block/pcache: implement read cache to qiov and drop node during aio
>      write
>    block/pcache: add generic request complete
>    block/pcache: add support for rescheduling requests
>    block/pcache: simple readahead one chunk forward
>    block/pcache: pcache readahead node around
>    block/pcache: skip readahead for non-sequential requests
>    block/pcache: add pcache skip large aio read
>    block/pcache: add pcache node assert
>    block/pcache: implement pcache error handling of aio cb
>    block/pcache: add write through node
>    block/pcache: drop used pcache node
>
>   block/Makefile.objs             |    1 +
>   block/pcache.c                  | 1224 +++++++++++++++++++++++++++++++++++++++
>   include/qemu/atomic.h           |    8 +
>   include/qemu/rbtree.h           |  109 ++++
>   include/qemu/rbtree_augmented.h |  237 ++++++++
>   util/Makefile.objs              |    1 +
>   util/rbtree.c                   |  570 ++++++++++++++++++
>   7 files changed, 2150 insertions(+)
>   create mode 100644 block/pcache.c
>   create mode 100644 include/qemu/rbtree.h
>   create mode 100644 include/qemu/rbtree_augmented.h
>   create mode 100644 util/rbtree.c
>

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 05/22] block/pcache: add aio requests into cache
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 05/22] block/pcache: add aio requests into cache Pavel Butsykin
@ 2016-09-01 15:28   ` Kevin Wolf
  2016-09-06 16:54     ` Pavel Butsykin
  0 siblings, 1 reply; 43+ messages in thread
From: Kevin Wolf @ 2016-09-01 15:28 UTC (permalink / raw)
  To: Pavel Butsykin
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

Am 29.08.2016 um 19:10 hat Pavel Butsykin geschrieben:
> For storing requests use an rbtree, here are add basic operations on the
> rbtree to work with  cache nodes.
> 
> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
> ---
>  block/pcache.c | 190 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 189 insertions(+), 1 deletion(-)
> 
> diff --git a/block/pcache.c b/block/pcache.c
> index 7f221d6..f5022f9 100644
> --- a/block/pcache.c
> +++ b/block/pcache.c
> @@ -27,6 +27,7 @@
>  #include "block/raw-aio.h"
>  #include "qapi/error.h"
>  #include "qapi/qmp/qstring.h"
> +#include "qemu/rbtree.h"
>  
>  #define PCACHE_DEBUG
>  
> @@ -37,9 +38,53 @@
>  #define DPRINTF(fmt, ...) do { } while (0)
>  #endif
>  
> +typedef struct RbNodeKey {
> +    uint64_t    num;
> +    uint32_t    size;
> +} RbNodeKey;
> +
> +typedef struct BlockNode {
> +    struct RbNode               rb_node;
> +    union {
> +        RbNodeKey               key;
> +        struct {
> +            uint64_t            sector_num;
> +            uint32_t            nb_sectors;
> +        };
> +    };

What's the deal with this union?

It just adds an alias, and 'sector_num'/'nb_sectors' are actually longer
to type than 'key.num' and 'key.size', so the only advantage that I
could imagine doesn't really apply.

But it brings problems: We always have to be careful to keep the two
structs in sync, and grepping for the field name will only bring up half
of the users because the other half uses the other alias.
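
Something like this would avoid the aliasing (just a sketch based on the
fields above):

    typedef struct BlockNode {
        struct RbNode           rb_node;
        RbNodeKey               key;    /* key.num / key.size, one name each */
        QTAILQ_ENTRY(BlockNode) entry;
    } BlockNode;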

> +    QTAILQ_ENTRY(BlockNode) entry;
> +} BlockNode;
> +
> +typedef struct PCNode {
> +    BlockNode cm;
> +
> +    uint8_t                  *data;
> +} PCNode;

What do 'PC' and 'cm' mean?

> +typedef struct ReqStor {
> +    struct {
> +        struct RbRoot root;
> +        CoMutex       lock;
> +    } tree;
> +
> +    uint32_t curr_size;
> +} ReqStor;

Same question for ReqStor. For an identifier that is used only three
times, it could be a bit more descriptive.

What unit has curr_size or what does it count? The nodes in the tree?
Also, cur_ seems to be more common as a prefix than curr_.

> +typedef struct BDRVPCacheState {
> +    BlockDriverState **bs;

This is unused. (Good thing, it looks weird.)

> +    ReqStor pcache;
> +
> +    struct {
> +        QTAILQ_HEAD(pcache_head, BlockNode) head;
> +        CoMutex lock;
> +    } list;
> +} BDRVPCacheState;
> +
>  typedef struct PrefCacheAIOCB {
>      BlockAIOCB common;
>  
> +    BDRVPCacheState *s;

Not really needed, you already have acb->common.bs.

>      QEMUIOVector *qiov;
>      uint64_t sector_num;
>      uint32_t nb_sectors;
> @@ -64,6 +109,124 @@ static QemuOptsList runtime_opts = {
>      },
>  };
>  
> +#define PCNODE(_n) ((PCNode *)(_n))

container_of() would be preferable for type safety.
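
Untested sketch of what that could look like, reusing the 'cm' member
from this patch:

    #define PCNODE(_n) container_of(_n, PCNode, cm)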

> +static int pcache_key_cmp(const RbNodeKey *key1, const RbNodeKey *key2)
> +{
> +    assert(key1 != NULL);
> +    assert(key2 != NULL);
> +
> +    if (key1->num >= key2->num + key2->size) {
> +        return 1;
> +    }
> +    if (key1->num + key1->size <= key2->num) {
> +        return -1;
> +    }
> +
> +    return 0;
> +}
> +
> +static void *node_insert(struct RbRoot *root, BlockNode *node)
> +{
> +    struct RbNode **new = &(root->rb_node), *parent = NULL;
> +
> +    /* Figure out where to put new node */
> +    while (*new) {
> +        BlockNode *this = container_of(*new, BlockNode, rb_node);
> +        int result = pcache_key_cmp(&node->key, &this->key);
> +        if (result == 0) {
> +            return this;
> +        }
> +        parent = *new;
> +        new = result < 0 ? &((*new)->rb_left) : &((*new)->rb_right);
> +    }
> +    /* Add new node and rebalance tree. */
> +    rb_link_node(&node->rb_node, parent, new);
> +    rb_insert_color(&node->rb_node, root);
> +
> +    return node;
> +}
> +
> +static inline PCNode *pcache_node_insert(struct RbRoot *root, PCNode *node)
> +{
> +    return node_insert(root, &node->cm);
> +}
> +
> +static inline void pcache_node_free(PCNode *node)
> +{
> +    g_free(node->data);
> +    g_slice_free1(sizeof(*node), node);
> +}

We moved away from g_slice_* because it turned out that it hurt more
than it helped.
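
I.e. plain allocation instead, roughly:

    g_slice_alloc(sizeof(*node))        ->  g_new(PCNode, 1)
    g_slice_free1(sizeof(*node), node)  ->  g_free(node)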

> +static inline void *pcache_node_alloc(RbNodeKey* key)
> +{
> +    PCNode *node = g_slice_alloc(sizeof(*node));
> +
> +    node->cm.sector_num = key->num;
> +    node->cm.nb_sectors = key->size;

In other words, node->cm.key = *key;

> +    node->data = g_malloc(node->cm.nb_sectors << BDRV_SECTOR_BITS);
> +
> +    return node;
> +}
> +
> +static bool pcache_node_find_and_create(PrefCacheAIOCB *acb, RbNodeKey *key,
> +                                        PCNode **out_node)
> +{
> +    BDRVPCacheState *s = acb->s;
> +    PCNode *new_node = pcache_node_alloc(key);
> +    PCNode *found;
> +
> +    qemu_co_mutex_lock(&s->pcache.tree.lock);
> +    found = pcache_node_insert(&s->pcache.tree.root, new_node);
> +    qemu_co_mutex_unlock(&s->pcache.tree.lock);

pcache_node_insert() doesn't yield, so the CoMutex is unnecessary.

> +    if (found != new_node) {
> +        pcache_node_free(new_node);

Isn't it a bit wasteful to allocate a new node just in case and then
immediately free it again if it turns out that we don't need it?
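
An alternative would be to look up first and allocate only on a miss,
roughly like this (sketch, with a hypothetical node_lookup() helper that
walks the tree without inserting):

    PCNode *node = node_lookup(&s->pcache.tree.root, key);
    if (node == NULL) {
        node = pcache_node_alloc(key);
        node_insert(&s->pcache.tree.root, &node->cm);
    }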

> +        *out_node = found;
> +        return false;
> +    }
> +    atomic_add(&s->pcache.curr_size, new_node->cm.nb_sectors);

atomic_add() implies that you have concurrent threads. I don't see any.
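
A plain update would be enough here:

    s->pcache.curr_size += new_node->cm.nb_sectors;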

> +    qemu_co_mutex_lock(&s->list.lock);
> +    QTAILQ_INSERT_HEAD(&s->list.head, &new_node->cm, entry);
> +    qemu_co_mutex_unlock(&s->list.lock);

Same here as above, QTAILQ_INSERT_HEAD doesn't yield.

> +    *out_node = new_node;
> +    return true;
> +}
> +
> +static inline void prefetch_init_key(PrefCacheAIOCB *acb, RbNodeKey* key)
> +{
> +    key->num = acb->sector_num;
> +    key->size = acb->nb_sectors;
> +}
> +
> +enum {
> +    PREFETCH_NEW_NODE  = 0,
> +    PREFETCH_FULL_UP   = 1,
> +    PREFETCH_PART_UP   = 2
> +};
> +
> +static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
> +{
> +    RbNodeKey key;
> +    PCNode *node = NULL;
> +
> +    prefetch_init_key(acb, &key);
> +    if (pcache_node_find_and_create(acb, &key, &node)) {
> +        return PREFETCH_NEW_NODE;
> +    }
> +
> +    /* Node covers the whole request */
> +    if (node->cm.sector_num <= acb->sector_num &&
> +        node->cm.sector_num + node->cm.nb_sectors >= acb->sector_num +
> +                                                     acb->nb_sectors)
> +    {
> +        return PREFETCH_FULL_UP;
> +    }
> +
> +    return PREFETCH_PART_UP;
> +}
> +
>  static void pcache_aio_cb(void *opaque, int ret)
>  {
>      PrefCacheAIOCB *acb = opaque;
> @@ -80,6 +243,7 @@ static PrefCacheAIOCB *pcache_aio_get(BlockDriverState *bs, int64_t sector_num,
>  {
>      PrefCacheAIOCB *acb = qemu_aio_get(&pcache_aiocb_info, bs, cb, opaque);
>  
> +    acb->s = bs->opaque;
>      acb->sector_num = sector_num;
>      acb->nb_sectors = nb_sectors;
>      acb->qiov = qiov;
> @@ -99,6 +263,8 @@ static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
>      PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
>                                           opaque, QEMU_AIO_READ);
>  
> +    pcache_prefetch(acb);
> +
>      bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
>                     pcache_aio_cb, acb);
>      return &acb->common;
> @@ -119,6 +285,17 @@ static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
>      return &acb->common;
>  }
>  
> +static void pcache_state_init(QemuOpts *opts, BDRVPCacheState *s)
> +{
> +    DPRINTF("pcache configure:\n");
> +
> +    s->pcache.tree.root = RB_ROOT;
> +    qemu_co_mutex_init(&s->pcache.tree.lock);
> +    QTAILQ_INIT(&s->list.head);
> +    qemu_co_mutex_init(&s->list.lock);

QTAILQ_INIT() doesn't yield.

> +    s->pcache.curr_size = 0;
> +}
> +

Kevin

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 07/22] block/pcache: introduce LRU as method of memory
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 07/22] block/pcache: introduce LRU as method of memory Pavel Butsykin
@ 2016-09-02  8:49   ` Kevin Wolf
  0 siblings, 0 replies; 43+ messages in thread
From: Kevin Wolf @ 2016-09-02  8:49 UTC (permalink / raw)
  To: Pavel Butsykin
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

Am 29.08.2016 um 19:10 hat Pavel Butsykin geschrieben:
> This is a simple solution to the problem of displacement of cache memory.
> The LRU can be useful to avoid the displacement of the nodes, which have
> been partially read.
> 
> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
> ---
>  block/pcache.c | 74 +++++++++++++++++++++++++++++++++++++++++++++-------------
>  1 file changed, 58 insertions(+), 16 deletions(-)
> 
> diff --git a/block/pcache.c b/block/pcache.c
> index 54d4526..7504db8 100644
> --- a/block/pcache.c
> +++ b/block/pcache.c
> @@ -67,6 +67,11 @@ typedef struct ReqStor {
>          CoMutex       lock;
>      } tree;
>  
> +    struct {
> +        QTAILQ_HEAD(lru_head, BlockNode) list;
> +        CoMutex lock;

This is another lock that doesn't do anything.

> +    } lru;
> +
>      uint32_t curr_size;
>  } ReqStor;

Kevin

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 08/22] block/pcache: implement pickup parts of the cache
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 08/22] block/pcache: implement pickup parts of the cache Pavel Butsykin
@ 2016-09-02  8:59   ` Kevin Wolf
  2016-09-08 12:29     ` Pavel Butsykin
  0 siblings, 1 reply; 43+ messages in thread
From: Kevin Wolf @ 2016-09-02  8:59 UTC (permalink / raw)
  To: Pavel Butsykin
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

Am 29.08.2016 um 19:10 hat Pavel Butsykin geschrieben:
> Implementation of obtaining fragments of the cache belonging to one area
> of request. This will allow to handle the case when a request is partially
> hits the cache.
> 
> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>

> +static void pcache_pickup_parts_of_cache(PrefCacheAIOCB *acb, PCNode *node,
> +                                         uint64_t num, uint32_t size)
> +{
> +    uint32_t up_size;
> +
> +    do {
> +        if (num < node->cm.sector_num) {
> +            PCNode *new_node;
> +            RbNodeKey lc_key = {
> +                .num = num,
> +                .size = node->cm.sector_num - num,
> +            };
> +            up_size = lc_key.size;
> +
> +            if (!pcache_node_find_and_create(acb, &lc_key, &new_node)) {
> +                node = new_node;
> +                continue;
> +            }

We're creating additional nodes here; and we need them because they have
their own status. But once the read has completed, wouldn't it make
sense to merge all adjacent nodes in NODE_SUCCESS_STATUS?

Kevin

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 09/22] block/pcache: separation AIOCB on requests
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 09/22] block/pcache: separation AIOCB on requests Pavel Butsykin
@ 2016-09-02  9:10   ` Kevin Wolf
  2016-09-08 15:47     ` Pavel Butsykin
  0 siblings, 1 reply; 43+ messages in thread
From: Kevin Wolf @ 2016-09-02  9:10 UTC (permalink / raw)
  To: Pavel Butsykin
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

Am 29.08.2016 um 19:10 hat Pavel Butsykin geschrieben:
> for case when the cache partially covers request we are part of the request
> is filled from the cache, and the other part request from disk. Also add
> reference counting for nodes, as way to maintain multithreading.
> 
> There is still no full synchronization in multithreaded mode.
> 
> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
> ---
>  block/pcache.c | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++++-----
>  1 file changed, 155 insertions(+), 14 deletions(-)
> 
> diff --git a/block/pcache.c b/block/pcache.c
> index 28bd056..6114289 100644
> --- a/block/pcache.c
> +++ b/block/pcache.c
> @@ -58,7 +58,10 @@ typedef struct BlockNode {
>  typedef struct PCNode {
>      BlockNode cm;
>  
> +    uint32_t                 status;

I guess this is NODE_*_STATUS. Make it a named enum then instead of
uint32_t so that it's obvious what this field means.
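
For example, with the values introduced further down in this patch:

    typedef enum NodeStatus {
        NODE_SUCCESS_STATUS = 0,
        NODE_WAIT_STATUS    = 1,
        NODE_REMOVE_STATUS  = 2,
        NODE_GHOST_STATUS   = 3  /* only for debugging */
    } NodeStatus;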

> +    uint32_t                 ref;
>      uint8_t                  *data;
> +    CoMutex                  lock;
>  } PCNode;
>  
>  typedef struct ReqStor {
> @@ -95,9 +98,23 @@ typedef struct PrefCacheAIOCB {
>      uint64_t sector_num;
>      uint32_t nb_sectors;
>      int      aio_type;
> +    struct {
> +        QTAILQ_HEAD(req_head, PrefCachePartReq) list;
> +        CoMutex lock;
> +    } requests;
>      int      ret;
>  } PrefCacheAIOCB;
>  
> +typedef struct PrefCachePartReq {
> +    uint64_t sector_num;
> +    uint32_t nb_sectors;

Should be byte-based, like everything.
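
I.e. something like:

    uint64_t offset;  /* in bytes */
    uint64_t bytes;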

> +    QEMUIOVector qiov;
> +    PCNode *node;
> +    PrefCacheAIOCB *acb;
> +    QTAILQ_ENTRY(PrefCachePartReq) entry;
> +} PrefCachePartReq;
> +
>  static const AIOCBInfo pcache_aiocb_info = {
>      .aiocb_size = sizeof(PrefCacheAIOCB),
>  };
> @@ -126,8 +143,39 @@ static QemuOptsList runtime_opts = {
>  #define MB_BITS 20
>  #define PCACHE_DEFAULT_CACHE_SIZE (4 << MB_BITS)
>  
> +enum {
> +    NODE_SUCCESS_STATUS = 0,
> +    NODE_WAIT_STATUS    = 1,
> +    NODE_REMOVE_STATUS  = 2,
> +    NODE_GHOST_STATUS   = 3 /* only for debugging */

NODE_DELETED_STATUS?

> +};
> +
>  #define PCNODE(_n) ((PCNode *)(_n))
>  
> +static inline void pcache_node_unref(PCNode *node)
> +{
> +    assert(node->status == NODE_SUCCESS_STATUS ||
> +           node->status == NODE_REMOVE_STATUS);
> +
> +    if (atomic_fetch_dec(&node->ref) == 0) {

Atomics imply concurrency, which we don't have.

> +        assert(node->status == NODE_REMOVE_STATUS);
> +
> +        node->status = NODE_GHOST_STATUS;
> +        g_free(node->data);
> +        g_slice_free1(sizeof(*node), node);

When you switch to plain g_malloc(), this needs to be updated.

> +    }
> +}
> +
> +static inline PCNode *pcache_node_ref(PCNode *node)
> +{
> +    assert(node->status == NODE_SUCCESS_STATUS ||
> +           node->status == NODE_WAIT_STATUS);
> +    assert(atomic_read(&node->ref) == 0);/* XXX: only for sequential requests */
> +    atomic_inc(&node->ref);

Do you expect concurrent accesses or not? Because if you don't, there is
no need for atomics, but if you do, this is buggy because each of the
lines is atomic for itself, but the assertion isn't atomic with the
refcount increment.

A ref() function that can take only a single reference feels odd anyway
and this restriction seems to be lifted later. Why have it here?
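
Without concurrency, a plain version would do (the same function, just
without the atomics and without the single-reference assertion):

    static inline PCNode *pcache_node_ref(PCNode *node)
    {
        assert(node->status == NODE_SUCCESS_STATUS ||
               node->status == NODE_WAIT_STATUS);
        node->ref++;
        return node;
    }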

> +
> +    return node;
> +}

Kevin

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache
  2016-09-01 14:17 ` [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Kevin Wolf
@ 2016-09-06 12:36   ` Pavel Butsykin
  0 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-09-06 12:36 UTC (permalink / raw)
  To: Kevin Wolf
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

On 01.09.2016 17:17, Kevin Wolf wrote:
> Am 29.08.2016 um 19:09 hat Pavel Butsykin geschrieben:
>> The prefetch cache aims to improve the performance of sequential read data.
>> Of most interest here are the requests of a small size of data for sequential
>> read, such requests can be optimized by extending them and moving into
>> the prefetch cache.
>> [...]
>
> Before I start actually looking into your code, I read both this cover
> letter and your KVM Forum slides, and as far as I can say, the
> fundamental idea and your design look sound to me. It was a good read,
> too, so thanks for writing all the explanations!
>
Thank you!

> One thing that came to mind is that we have more caches elsewhere, most
> notably the qcow2 metadata cache, and I still have that private branch
> that adds a qcow2 data cache, too (for merging small allocating writes,
> if you remember my talk from last year). However, the existing
> Qcow2Cache has a few problems like being tied to the cluster size.
>
> So I wondered how hard you think it would be to split pcache into a
> reusable cache core that just manages the contents based on calls like
> "allocate/drop/get cached memory for bytes x...y", and the actual
> pcache code that implements the read-ahead policy. Then qcow2 could
> reuse the core and use its own policy about what metadata to cache etc.
>
I think it's a good idea, and at first glance it seems achievable. But
to think it through, I need a little more information about the
interfaces that your private cache branch could use. Probably, for all
operations that increase or decrease the reference count on a memory
object, we will need to provide interfaces sufficient to implement the
read-ahead policy. But in general, separating the memory resources from
the different policies over the cache memory seems like the right
decision.
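
For example, going by your "allocate/drop/get cached memory for bytes
x...y" description, the core interface might look roughly like this
(hypothetical names, just to make the discussion concrete):

    typedef struct CacheCore CacheCore;

    CacheCore *cache_core_new(uint64_t max_bytes);
    /* get a referenced region covering bytes [offset, offset + bytes) */
    void *cache_core_get(CacheCore *c, uint64_t offset, uint64_t bytes);
    void cache_core_put(CacheCore *c, void *region);  /* drop a reference */
    void cache_core_drop(CacheCore *c, uint64_t offset, uint64_t bytes);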

> Of course, this can be done incrementally on top and should by no means
> block the inclusion of your code, but if it's possible, it might be an
> interesting thought to keep in mind.
>
I like the idea, so I'm ready to work on a more effective solution. For
a start, it would be nice to define the interfaces of the cache core. I
could then build the next version based on the anticipated interfaces.
I could also propose these interfaces in the next version of pcache;
your requirements for the interfaces would help me with this.

> Kevin
>

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache
  2016-09-01 15:26 ` Avi Kivity
@ 2016-09-06 12:40   ` Pavel Butsykin
  0 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-09-06 12:40 UTC (permalink / raw)
  To: Avi Kivity, qemu-block, qemu-devel
  Cc: kwolf, famz, mreitz, stefanha, den, jsnow

On 01.09.2016 18:26, Avi Kivity wrote:
> On 08/29/2016 08:09 PM, Pavel Butsykin wrote:
>> The prefetch cache aims to improve the performance of sequential read
>> data.
>> [...]
>
>
> I note, in your tests, you use uncached sequential reads.  But are
> uncached sequential reads with a small block size common?
>
> Consider the case of cached sequential reads.  Here, the guest OS will
> issue read-aheads.  pcache will detect them and issue its own
> read-aheads, both layers will read ahead more than necessary, so pcache
> is adding extra I/O and memory copies here.
>
Yes, guests can have their own read-ahead cache, but pcache doesn't
cause excessive activity in this case, because the first guest
read-ahead request hits the pcache memory and the subsequent read-ahead
requests are filtered out on the pcache side. This only works for
read-ahead windows of the same size; if the window sizes differ, a
matching concurrent read-ahead request will never happen. And even if a
simultaneous read-ahead request can lead to extra I/O, that is only a
problem of the pcache implementation.

> So I'm wondering about the use case.  Guest userspace applications which
> do uncached reads will typically manage their own read-ahead; and cached
> reads have the kernel reading ahead for them, with the benefit of
> knowing the file layout.  That leaves dd iflag=direct, but is it such an
> important application?
>
It helps with live workloads on Windows. A simple example: a Windows
boot (win8.1, 1024 MB RAM), even with the Windows Prefetcher enabled,
leads to reading about 300MB from pcache memory. It should be understood
that pcache is designed to optimize the guest's behaviour as a whole,
not any particular application inside it. Guest read-ahead is tied to an
fd and is aimed at optimizing a userspace application, but pcache is
several levels above that, which allows us to cover other cases. Another
example is walking a directory tree. This effect appears because, when
traversing a directory tree, there is a good chance that some fs blocks
are placed sequentially. But in general, pcache helps to reduce latency
under high load for Windows VMs.

>> [...]
>

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 01/22] block/pcache: empty pcache driver filter
  2016-09-01 14:31   ` Kevin Wolf
@ 2016-09-06 15:20     ` Pavel Butsykin
  0 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-09-06 15:20 UTC (permalink / raw)
  To: Kevin Wolf
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

On 01.09.2016 17:31, Kevin Wolf wrote:
> Am 29.08.2016 um 19:10 hat Pavel Butsykin geschrieben:
>> The basic version of pcache driver for easy preparation of a patch set.
>>
>> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
>
>> +    .bdrv_aio_readv                     = pcache_aio_readv,
>> +    .bdrv_aio_writev                    = pcache_aio_writev,
>
> Can you please use .bdrv_co_preadv/pwritev instead and make everything
> based on bytes rather than sectors?
>
> Internally you can still spawn AIO requests to achieve the same
> parallelism as you have now (we'll just need new byte-based
> bdrv_co_aio_prw_vector() wrappers, but the functionality is there) and I
> don't think making the outer layer coroutine based would be too hard. In
> fact it might even simplify some code.
>
Of course. I just wrote pcache when the new byte-based interfaces had
not yet been implemented.
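
For reference, the byte-based coroutine callback has roughly this shape
(double-check the exact signature against the current block layer):

    static coroutine_fn int pcache_co_preadv(BlockDriverState *bs,
                                             uint64_t offset, uint64_t bytes,
                                             QEMUIOVector *qiov, int flags);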

> Kevin
>

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 05/22] block/pcache: add aio requests into cache
  2016-09-01 15:28   ` Kevin Wolf
@ 2016-09-06 16:54     ` Pavel Butsykin
  2016-09-06 17:07       ` Kevin Wolf
  0 siblings, 1 reply; 43+ messages in thread
From: Pavel Butsykin @ 2016-09-06 16:54 UTC (permalink / raw)
  To: Kevin Wolf
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

On 01.09.2016 18:28, Kevin Wolf wrote:
> Am 29.08.2016 um 19:10 hat Pavel Butsykin geschrieben:
>> For storing requests use an rbtree, here are add basic operations on the
>> rbtree to work with  cache nodes.
>>
>> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
>> ---
>>   block/pcache.c | 190 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>>   1 file changed, 189 insertions(+), 1 deletion(-)
>>
>> diff --git a/block/pcache.c b/block/pcache.c
>> index 7f221d6..f5022f9 100644
>> --- a/block/pcache.c
>> +++ b/block/pcache.c
>> @@ -27,6 +27,7 @@
>>   #include "block/raw-aio.h"
>>   #include "qapi/error.h"
>>   #include "qapi/qmp/qstring.h"
>> +#include "qemu/rbtree.h"
>>
>>   #define PCACHE_DEBUG
>>
>> @@ -37,9 +38,53 @@
>>   #define DPRINTF(fmt, ...) do { } while (0)
>>   #endif
>>
>> +typedef struct RbNodeKey {
>> +    uint64_t    num;
>> +    uint32_t    size;
>> +} RbNodeKey;
>> +
>> +typedef struct BlockNode {
>> +    struct RbNode               rb_node;
>> +    union {
>> +        RbNodeKey               key;
>> +        struct {
>> +            uint64_t            sector_num;
>> +            uint32_t            nb_sectors;
>> +        };
>> +    };
>
> What's the deal with this union?
>
> It just adds an alias, and 'sector_num'/'nb_sectors' are actually longer
> to type than 'key.num' and 'key.size', so the only advantage that I
> could imagine doesn't really apply.
>
Yes, you're right. Initially this was done to shorten the code in places
where the alias would not be confusing. But later it turned out that
there are few such places.

> But it brings problems: We always have to be careful to keep the two
> structs in sync, and grepping for the field name will only bring up half
> of the users because the other half uses the other alias.
>
Yep, relative to the rest of the QEMU code it looks unusual, and given
the small benefit it needs to be removed :)

>> +    QTAILQ_ENTRY(BlockNode) entry;
>> +} BlockNode;
>> +
>> +typedef struct PCNode {
>> +    BlockNode cm;
>> +
>> +    uint8_t                  *data;
>> +} PCNode;
>
> What do 'PC' and 'cm' mean?
>
'PC' stands for PrefetchCache, and 'cm' for common.

But I think it would be better to fix it: rename PCNode to
PrefCacheNode, and spell 'cm' out as 'common'.

>> +typedef struct ReqStor {
>> +    struct {
>> +        struct RbRoot root;
>> +        CoMutex       lock;
>> +    } tree;
>> +
>> +    uint32_t curr_size;
>> +} ReqStor;
>
> Same question for ReqStor. For an identifier that is used only three
> times, it could be a bit more descriptive.
>
It means storage of requests.

> What unit has curr_size or what does it count? The nodes in the tree?

It is the current total size of the stored requests, counted in sectors
(each node's nb_sectors is added to it).

> Also, cur_ seems to be more common as a prefix than curr_.
>
OK

>> +typedef struct BDRVPCacheState {
>> +    BlockDriverState **bs;
>
> This is unused. (Good thing, it looks weird.)
>
Apparently I forgot to remove it.

>> +    ReqStor pcache;
>> +
>> +    struct {
>> +        QTAILQ_HEAD(pcache_head, BlockNode) head;
>> +        CoMutex lock;
>> +    } list;
>> +} BDRVPCacheState;
>> +
>>   typedef struct PrefCacheAIOCB {
>>       BlockAIOCB common;
>>
>> +    BDRVPCacheState *s;
>
> Not really needed, you already have acb->common.bs.
>
>>       QEMUIOVector *qiov;
>>       uint64_t sector_num;
>>       uint32_t nb_sectors;
>> @@ -64,6 +109,124 @@ static QemuOptsList runtime_opts = {
>>       },
>>   };
>>
>> +#define PCNODE(_n) ((PCNode *)(_n))
>
> container_of() would be preferable for type safety.
>
OK

>> +static int pcache_key_cmp(const RbNodeKey *key1, const RbNodeKey *key2)
>> +{
>> +    assert(key1 != NULL);
>> +    assert(key2 != NULL);
>> +
>> +    if (key1->num >= key2->num + key2->size) {
>> +        return 1;
>> +    }
>> +    if (key1->num + key1->size <= key2->num) {
>> +        return -1;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static void *node_insert(struct RbRoot *root, BlockNode *node)
>> +{
>> +    struct RbNode **new = &(root->rb_node), *parent = NULL;
>> +
>> +    /* Figure out where to put new node */
>> +    while (*new) {
>> +        BlockNode *this = container_of(*new, BlockNode, rb_node);
>> +        int result = pcache_key_cmp(&node->key, &this->key);
>> +        if (result == 0) {
>> +            return this;
>> +        }
>> +        parent = *new;
>> +        new = result < 0 ? &((*new)->rb_left) : &((*new)->rb_right);
>> +    }
>> +    /* Add new node and rebalance tree. */
>> +    rb_link_node(&node->rb_node, parent, new);
>> +    rb_insert_color(&node->rb_node, root);
>> +
>> +    return node;
>> +}
>> +
>> +static inline PCNode *pcache_node_insert(struct RbRoot *root, PCNode *node)
>> +{
>> +    return node_insert(root, &node->cm);
>> +}
>> +
>> +static inline void pcache_node_free(PCNode *node)
>> +{
>> +    g_free(node->data);
>> +    g_slice_free1(sizeof(*node), node);
>> +}
>
> We moved away from g_slice_* because it turned out that it hurt more
> than it helped.
>
I saw that somewhere, thanks.

>> +static inline void *pcache_node_alloc(RbNodeKey* key)
>> +{
>> +    PCNode *node = g_slice_alloc(sizeof(*node));
>> +
>> +    node->cm.sector_num = key->num;
>> +    node->cm.nb_sectors = key->size;
>
> In other words, node->cm.key = *key;
>
Yep :)

>> +    node->data = g_malloc(node->cm.nb_sectors << BDRV_SECTOR_BITS);
>> +
>> +    return node;
>> +}
>> +
>> +static bool pcache_node_find_and_create(PrefCacheAIOCB *acb, RbNodeKey *key,
>> +                                        PCNode **out_node)
>> +{
>> +    BDRVPCacheState *s = acb->s;
>> +    PCNode *new_node = pcache_node_alloc(key);
>> +    PCNode *found;
>> +
>> +    qemu_co_mutex_lock(&s->pcache.tree.lock);
>> +    found = pcache_node_insert(&s->pcache.tree.root, new_node);
>> +    qemu_co_mutex_unlock(&s->pcache.tree.lock);
>
> pcache_node_insert() doesn't yield, so the CoMutex is unnecessary.
>
>> +    if (found != new_node) {
>> +        pcache_node_free(new_node);
>
> Isn't it a bit wasteful to allocate a new node just in case and then
> immediately free it again if it turns out that we don't need it?
>
Yes, I know about this problem. I mentioned it in the TODO list, but not
in much detail.

>> +        *out_node = found;
>> +        return false;
>> +    }
>> +    atomic_add(&s->pcache.curr_size, new_node->cm.nb_sectors);
>
> atomic_add() implies that you have concurrent threads. I don't see any.
>
Yeah, but what about an iothread? Doesn't the presence of an iothread
lead to parallel handling of requests in multiple threads?

>> [...]
>
> Kevin
>

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 05/22] block/pcache: add aio requests into cache
  2016-09-06 16:54     ` Pavel Butsykin
@ 2016-09-06 17:07       ` Kevin Wolf
  2016-09-07 16:21         ` Pavel Butsykin
  0 siblings, 1 reply; 43+ messages in thread
From: Kevin Wolf @ 2016-09-06 17:07 UTC (permalink / raw)
  To: Pavel Butsykin
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

Am 06.09.2016 um 18:54 hat Pavel Butsykin geschrieben:
> >>+        *out_node = found;
> >>+        return false;
> >>+    }
> >>+    atomic_add(&s->pcache.curr_size, new_node->cm.nb_sectors);
> >
> >atomic_add() implies that you have concurrent threads. I don't see any.
> >
> Yeah, but what about iothread? Doesn't presence of iothread lead to
> parallel handling of requests in multiple threads?

No, it doesn't. We're running under the AioContext lock.

If we were actually running in parallel threads, your locks might also
be needed, but then they would have to be real thread locks instead of
coroutine locks. But it doesn't happen today, so I think it's best to
ignore.

Kevin

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 05/22] block/pcache: add aio requests into cache
  2016-09-06 17:07       ` Kevin Wolf
@ 2016-09-07 16:21         ` Pavel Butsykin
  0 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-09-07 16:21 UTC (permalink / raw)
  To: Kevin Wolf
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

On 06.09.2016 20:07, Kevin Wolf wrote:
> Am 06.09.2016 um 18:54 hat Pavel Butsykin geschrieben:
>>>> +        *out_node = found;
>>>> +        return false;
>>>> +    }
>>>> +    atomic_add(&s->pcache.curr_size, new_node->cm.nb_sectors);
>>>
>>> atomic_add() implies that you have concurrent threads. I don't see any.
>>>
>> Yeah, but what about iothread? Doesn't presence of iothread lead to
>> parallel handling of requests in multiple threads?
>
> No, it doesn't. We're running under the AioContext lock.

Yes, you're right. For some reason I thought there was such a
possibility :)

> If we were actually running in parallel threads, your locks might also
> be needed, but then they would have to be real thread locks instead of
> coroutine locks. But it doesn't happen today, so I think it's best to
> ignore.

I agree, using coroutine locks to protect the data was a bad idea.

> Kevin
>

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 08/22] block/pcache: implement pickup parts of the cache
  2016-09-02  8:59   ` Kevin Wolf
@ 2016-09-08 12:29     ` Pavel Butsykin
  0 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-09-08 12:29 UTC (permalink / raw)
  To: Kevin Wolf
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

On 02.09.2016 11:59, Kevin Wolf wrote:
> Am 29.08.2016 um 19:10 hat Pavel Butsykin geschrieben:
>> Implementation of obtaining fragments of the cache belonging to one area
>> of request. This will allow to handle the case when a request is partially
>> hits the cache.
>>
>> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
>
>> +static void pcache_pickup_parts_of_cache(PrefCacheAIOCB *acb, PCNode *node,
>> +                                         uint64_t num, uint32_t size)
>> +{
>> +    uint32_t up_size;
>> +
>> +    do {
>> +        if (num < node->cm.sector_num) {
>> +            PCNode *new_node;
>> +            RbNodeKey lc_key = {
>> +                .num = num,
>> +                .size = node->cm.sector_num - num,
>> +            };
>> +            up_size = lc_key.size;
>> +
>> +            if (!pcache_node_find_and_create(acb, &lc_key, &new_node)) {
>> +                node = new_node;
>> +                continue;
>> +            }
>
> We're creating additional nodes here; and we need them because they have
> their own status. But once the read has completed, wouldn't it make
> sense to merge all adjacent nodes in NODE_SUCCESS_STATUS?

This can be done, but first we need to consider whether it's worth it.
If we merge nodes, the tree will have fewer nodes and the search will be
faster, which is good. But we would have to re-allocate memory, and the
merging could produce larger nodes, which is not good for eviction,
because it increases the chance of evicting unread areas of the cache
memory.

I think we do not need to cache the missing areas, because that is
contrary to the read-ahead policy (an area of the disk that has been
read once will not be read again). Also, considering the 22nd patch
([PATCH RFC v2 22/22] block/pcache: drop used pcache node), it just
makes no sense to move the missing pieces of data into the cache memory.

> Kevin
>

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 04/22] block/pcache: add pcache debug build
  2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 04/22] block/pcache: add pcache debug build Pavel Butsykin
@ 2016-09-08 15:11   ` Eric Blake
  2016-09-08 15:49     ` Pavel Butsykin
  0 siblings, 1 reply; 43+ messages in thread
From: Eric Blake @ 2016-09-08 15:11 UTC (permalink / raw)
  To: Pavel Butsykin, qemu-block, qemu-devel
  Cc: kwolf, mreitz, stefanha, den, jsnow, famz

On 08/29/2016 12:10 PM, Pavel Butsykin wrote:
> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
> ---
>  block/pcache.c | 9 +++++++++
>  1 file changed, 9 insertions(+)
> 
> diff --git a/block/pcache.c b/block/pcache.c
> index 74a4bc4..7f221d6 100644
> --- a/block/pcache.c
> +++ b/block/pcache.c
> @@ -28,6 +28,15 @@
>  #include "qapi/error.h"
>  #include "qapi/qmp/qstring.h"
>  
> +#define PCACHE_DEBUG

Are you sure you want this left enabled?

> +
> +#ifdef PCACHE_DEBUG
> +#define DPRINTF(fmt, ...) \
> +        printf("%s:%s:%d "fmt, __FILE__, __func__, __LINE__, ## __VA_ARGS__)
> +#else
> +#define DPRINTF(fmt, ...) do { } while (0)
> +#endif

NACK.  This leads to bitrot when PCACHE_DEBUG is not defined.  Also, we
typically send debug to stderr, not stdout.  Instead, please follow the
lead of many other debug places, which do something similar to this (off
the top of my head, therefore untested):

#ifdef PCACHE_DEBUG
# define PCACHE_DEBUG_PRINT 1
#else
# define PCACHE_DEBUG_PRINT 0
#endif
#define DPRINTF(fmt, ...) \
    do { \
        if (PCACHE_DEBUG_PRINT) { \
            fprintf(stderr, fmt, ## __VA_ARGS__); \
        } \
    } while (0)

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org


^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 09/22] block/pcache: separation AIOCB on requests
  2016-09-02  9:10   ` Kevin Wolf
@ 2016-09-08 15:47     ` Pavel Butsykin
  0 siblings, 0 replies; 43+ messages in thread
From: Pavel Butsykin @ 2016-09-08 15:47 UTC (permalink / raw)
  To: Kevin Wolf
  Cc: qemu-block, qemu-devel, mreitz, stefanha, den, jsnow, eblake, famz

On 02.09.2016 12:10, Kevin Wolf wrote:
> Am 29.08.2016 um 19:10 hat Pavel Butsykin geschrieben:
>> for case when the cache partially covers request we are part of the request
>> is filled from the cache, and the other part request from disk. Also add
>> reference counting for nodes, as way to maintain multithreading.
>>
>> There is still no full synchronization in multithreaded mode.
>>
>> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
>> ---
>>   block/pcache.c | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++++-----
>>   1 file changed, 155 insertions(+), 14 deletions(-)
>>
>> diff --git a/block/pcache.c b/block/pcache.c
>> index 28bd056..6114289 100644
>> --- a/block/pcache.c
>> +++ b/block/pcache.c
>> @@ -58,7 +58,10 @@ typedef struct BlockNode {
>>   typedef struct PCNode {
>>       BlockNode cm;
>>
>> +    uint32_t                 status;
>
> I guess this is NODE_*_STATUS. Make it a named enum then instead of
> uint32_t so that it's obvious what this field means.

OK

>> +    uint32_t                 ref;
>>       uint8_t                  *data;
>> +    CoMutex                  lock;
>>   } PCNode;
>>
>>   typedef struct ReqStor {
>> @@ -95,9 +98,23 @@ typedef struct PrefCacheAIOCB {
>>       uint64_t sector_num;
>>       uint32_t nb_sectors;
>>       int      aio_type;
>> +    struct {
>> +        QTAILQ_HEAD(req_head, PrefCachePartReq) list;
>> +        CoMutex lock;
>> +    } requests;
>>       int      ret;
>>   } PrefCacheAIOCB;
>>
>> +typedef struct PrefCachePartReq {
>> +    uint64_t sector_num;
>> +    uint32_t nb_sectors;
>
> Should be byte-based, like everything.
>
>> +    QEMUIOVector qiov;
>> +    PCNode *node;
>> +    PrefCacheAIOCB *acb;
>> +    QTAILQ_ENTRY(PrefCachePartReq) entry;
>> +} PrefCachePartReq;
>> +
>>   static const AIOCBInfo pcache_aiocb_info = {
>>       .aiocb_size = sizeof(PrefCacheAIOCB),
>>   };
>> @@ -126,8 +143,39 @@ static QemuOptsList runtime_opts = {
>>   #define MB_BITS 20
>>   #define PCACHE_DEFAULT_CACHE_SIZE (4 << MB_BITS)
>>
>> +enum {
>> +    NODE_SUCCESS_STATUS = 0,
>> +    NODE_WAIT_STATUS    = 1,
>> +    NODE_REMOVE_STATUS  = 2,
>> +    NODE_GHOST_STATUS   = 3 /* only for debugging */
>
> NODE_DELETED_STATUS?

Yes :)

>> +};
>> +
>>   #define PCNODE(_n) ((PCNode *)(_n))
>>
>> +static inline void pcache_node_unref(PCNode *node)
>> +{
>> +    assert(node->status == NODE_SUCCESS_STATUS ||
>> +           node->status == NODE_REMOVE_STATUS);
>> +
>> +    if (atomic_fetch_dec(&node->ref) == 0) {
>
> Atomics imply concurrency, which we don't have.
>
>> +        assert(node->status == NODE_REMOVE_STATUS);
>> +
>> +        node->status = NODE_GHOST_STATUS;
>> +        g_free(node->data);
>> +        g_slice_free1(sizeof(*node), node);
>
> When you switch to plain g_malloc(), this needs to be updated.
>
>> +    }
>> +}
>> +
>> +static inline PCNode *pcache_node_ref(PCNode *node)
>> +{
>> +    assert(node->status == NODE_SUCCESS_STATUS ||
>> +           node->status == NODE_WAIT_STATUS);
>> +    assert(atomic_read(&node->ref) == 0);/* XXX: only for sequential requests */
>> +    atomic_inc(&node->ref);
>
> Do you expect concurrent accesses or not? Because if you don't, there is
> no need for atomics, but if you do, this is buggy because each of the
> lines is atomic for itself, but the assertion isn't atomic with the
> refcount increment.

Well, we've already settled the question of concurrent accesses above.
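
Just to illustrate, the plain variant without atomics would look
something like this (sketch only, assuming the usual convention of
freeing the node on the last unref, and the switch from g_slice_free1()
to plain g_malloc()/g_free() you asked for earlier):

static inline void pcache_node_unref(PCNode *node)
{
    assert(node->status == NODE_SUCCESS_STATUS ||
           node->status == NODE_REMOVE_STATUS);

    if (--node->ref == 0) {
        assert(node->status == NODE_REMOVE_STATUS);

        node->status = NODE_GHOST_STATUS;
        g_free(node->data);
        g_free(node);
    }
}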

> A ref() function that can take only a single reference feels odd anyway
> and this restriction seems to be lifted later. Why have it here?

No, this is a temporary assert(). Strictly speaking it is not necessary,
but it helps to verify correct behaviour at this point in the series,
because reading of nodes and rescheduling of requests are not implemented
yet.

>> +
>> +    return node;
>> +}
>
> Kevin
>

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 04/22] block/pcache: add pcache debug build
  2016-09-08 15:11   ` Eric Blake
@ 2016-09-08 15:49     ` Pavel Butsykin
  2016-09-08 16:05       ` Pavel Butsykin
  0 siblings, 1 reply; 43+ messages in thread
From: Pavel Butsykin @ 2016-09-08 15:49 UTC (permalink / raw)
  To: Eric Blake, qemu-block, qemu-devel
  Cc: kwolf, mreitz, stefanha, den, jsnow, famz

On 08.09.2016 18:11, Eric Blake wrote:
> On 08/29/2016 12:10 PM, Pavel Butsykin wrote:
>> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
>> ---
>>   block/pcache.c | 9 +++++++++
>>   1 file changed, 9 insertions(+)
>>
>> diff --git a/block/pcache.c b/block/pcache.c
>> index 74a4bc4..7f221d6 100644
>> --- a/block/pcache.c
>> +++ b/block/pcache.c
>> @@ -28,6 +28,15 @@
>>   #include "qapi/error.h"
>>   #include "qapi/qmp/qstring.h"
>>
>> +#define PCACHE_DEBUG
>
> Are you sure you want this left enabled?

No.

>> +
>> +#ifdef PCACHE_DEBUG
>> +#define DPRINTF(fmt, ...) \
>> +        printf("%s:%s:%d "fmt, __FILE__, __func__, __LINE__, ## __VA_ARGS__)
>> +#else
>> +#define DPRINTF(fmt, ...) do { } while (0)
>> +#endif
>
> NACK.  This leads to bitrot when PCACHE_DEBUG is not defined.  Also, we
> typically send debug to stderr, not stdout.  Instead, please follow the
> lead of many other debug places, which do something similar to this (off
> the top of my head, therefore untested):
>
> #ifdef PCACHE_DEBUG
> # define PCACHE_DEBUG_PRINT 1
> #else
> # define PCACHE_DEBUG_PRINT 0
> #endif
> #define DPRINTF(fmt, ...) \
>      do { \
>          if (PCACHE_DEBUG_PRINT) { \
>              fprintf(stderr, fmt, ## __VA_ARGS__); \
>          } \
>      } while (0)
>

OK, thanks!

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 04/22] block/pcache: add pcache debug build
  2016-09-08 15:49     ` Pavel Butsykin
@ 2016-09-08 16:05       ` Pavel Butsykin
  2016-09-08 18:42         ` Eric Blake
  0 siblings, 1 reply; 43+ messages in thread
From: Pavel Butsykin @ 2016-09-08 16:05 UTC (permalink / raw)
  To: Eric Blake, qemu-block, qemu-devel
  Cc: kwolf, mreitz, stefanha, den, jsnow, famz

On 08.09.2016 18:49, Pavel Butsykin wrote:
> On 08.09.2016 18:11, Eric Blake wrote:
>> On 08/29/2016 12:10 PM, Pavel Butsykin wrote:
>>> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
>>> ---
>>>   block/pcache.c | 9 +++++++++
>>>   1 file changed, 9 insertions(+)
>>>
>>> diff --git a/block/pcache.c b/block/pcache.c
>>> index 74a4bc4..7f221d6 100644
>>> --- a/block/pcache.c
>>> +++ b/block/pcache.c
>>> @@ -28,6 +28,15 @@
>>>   #include "qapi/error.h"
>>>   #include "qapi/qmp/qstring.h"
>>>
>>> +#define PCACHE_DEBUG
>>
>> Are you sure you want this left enabled?
>
> No.
>
>>> +
>>> +#ifdef PCACHE_DEBUG
>>> +#define DPRINTF(fmt, ...) \
>>> +        printf("%s:%s:%d "fmt, __FILE__, __func__, __LINE__, ##
>>> __VA_ARGS__)
>>> +#else
>>> +#define DPRINTF(fmt, ...) do { } while (0)
>>> +#endif
>>
>> NACK.  This leads to bitrot when PCACHE_DEBUG is not defined.  Also, we
>> typically send debug to stderr, not stdout.  Instead, please follow the
>> lead of many other debug places, which do something similar to this (off
>> the top of my head, therefore untested):
>>
>> #ifdef PCACHE_DEBUG
>> # define PCACHE_DEBUG_PRINT 1
>> #else
>> # define PCACHE_DEBUG_PRINT 0
>> #endif
>> #define DPRINTF(fmt, ...) \
>>      do { \
>>          if (PCACHE_DEBUG_PRINT) { \
>>          fprintf(stderr, fmt, ## __VA_ARGS__); \
>>          } \
>>      } while (0)
>>
>
> OK, thanks!

Can I replace the DPRINTFs with tracepoints?

^ permalink raw reply	[flat|nested] 43+ messages in thread

* Re: [Qemu-devel] [PATCH RFC v2 04/22] block/pcache: add pcache debug build
  2016-09-08 16:05       ` Pavel Butsykin
@ 2016-09-08 18:42         ` Eric Blake
  0 siblings, 0 replies; 43+ messages in thread
From: Eric Blake @ 2016-09-08 18:42 UTC (permalink / raw)
  To: Pavel Butsykin, qemu-block, qemu-devel
  Cc: kwolf, mreitz, stefanha, den, jsnow, famz

[-- Attachment #1: Type: text/plain, Size: 776 bytes --]

On 09/08/2016 11:05 AM, Pavel Butsykin wrote:
>>>
>>> #ifdef PCACHE_DEBUG
>>> # define PCACHE_DEBUG_PRINT 1
>>> #else
>>> # define PCACHE_DEBUG_PRINT 0
>>> #endif
>>> #define DPRINTF(fmt, ...) \
>>>      do { \
>>>          if (PCACHE_DEBUG_PRINT) { \
>>>              fprintf(stderr, fmt, ## __VA_ARGS__); \
>>>          } \
>>>      } while (0)
>>>
>>
>> OK, thanks!
> 
> Can I replace DPRINTFs on tracepoints?
> 

Yes, tracepoints are even better than conditional printfs. They are a
bit trickier to set up, but more powerful in the end.  And they are
equally immune to the bitrot that I was trying to prevent with your
definition of DPRINTF.
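
For pcache it would look something like this (from memory, so check
docs/tracing.txt; the event name here is made up).  First an entry in
the trace-events file:

pcache_aio_readv(uint64_t sector_num, uint32_t nb_sectors) "sector_num %"PRIu64" nb_sectors %u"

and then at the call site in block/pcache.c:

#include "trace.h"
...
    trace_pcache_aio_readv(acb->sector_num, acb->nb_sectors);

One definition, and it works with whichever trace backends were enabled
at configure time (log, simpletrace, ust, and so on).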

-- 
Eric Blake   eblake redhat com    +1-919-301-3266
Libvirt virtualization library http://libvirt.org


[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 604 bytes --]

^ permalink raw reply	[flat|nested] 43+ messages in thread

end of thread, other threads:[~2016-09-08 21:04 UTC | newest]

Thread overview: 43+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2016-08-29 17:09 [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 01/22] block/pcache: empty pcache driver filter Pavel Butsykin
2016-09-01 14:31   ` Kevin Wolf
2016-09-06 15:20     ` Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 02/22] block/pcache: add own AIOCB block Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 03/22] util/rbtree: add rbtree from linux kernel Pavel Butsykin
2016-09-01 14:37   ` Kevin Wolf
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 04/22] block/pcache: add pcache debug build Pavel Butsykin
2016-09-08 15:11   ` Eric Blake
2016-09-08 15:49     ` Pavel Butsykin
2016-09-08 16:05       ` Pavel Butsykin
2016-09-08 18:42         ` Eric Blake
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 05/22] block/pcache: add aio requests into cache Pavel Butsykin
2016-09-01 15:28   ` Kevin Wolf
2016-09-06 16:54     ` Pavel Butsykin
2016-09-06 17:07       ` Kevin Wolf
2016-09-07 16:21         ` Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 06/22] block/pcache: restrict cache size Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 07/22] block/pcache: introduce LRU as method of memory Pavel Butsykin
2016-09-02  8:49   ` Kevin Wolf
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 08/22] block/pcache: implement pickup parts of the cache Pavel Butsykin
2016-09-02  8:59   ` Kevin Wolf
2016-09-08 12:29     ` Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 09/22] block/pcache: separation AIOCB on requests Pavel Butsykin
2016-09-02  9:10   ` Kevin Wolf
2016-09-08 15:47     ` Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 10/22] block/pcache: add check node leak Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 11/22] add QEMU style defines for __sync_add_and_fetch Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 12/22] block/pcache: implement read cache to qiov and drop node during aio write Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 13/22] block/pcache: add generic request complete Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 14/22] block/pcache: add support for rescheduling requests Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 15/22] block/pcache: simple readahead one chunk forward Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 16/22] block/pcache: pcache readahead node around Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 17/22] block/pcache: skip readahead for non-sequential requests Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 18/22] block/pcache: add pcache skip large aio read Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 19/22] block/pcache: add pcache node assert Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 20/22] block/pcache: implement pcache error handling of aio cb Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 21/22] block/pcache: add write through node Pavel Butsykin
2016-08-29 17:10 ` [Qemu-devel] [PATCH RFC v2 22/22] block/pcache: drop used pcache node Pavel Butsykin
2016-09-01 14:17 ` [Qemu-devel] [PATCH RFC v2 00/22] I/O prefetch cache Kevin Wolf
2016-09-06 12:36   ` Pavel Butsykin
2016-09-01 15:26 ` Avi Kivity
2016-09-06 12:40   ` Pavel Butsykin

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.