* read/write on RADOS using external buffer
@ 2010-09-13 11:57 Takuya ASADA
  2010-09-14 18:44 ` Sage Weil
  0 siblings, 1 reply; 18+ messages in thread
From: Takuya ASADA @ 2010-09-13 11:57 UTC
  To: ceph-devel

Hi,

I want to use an external buffer with Rados::write() and Rados::read() in
my application. I figured out how to do it for write(), but not for
read().

I have "char *buf" as the buffer and its length "int len".
For write(), I can do it like this:
  rados.initialize(0, NULL);
  rados.open_pool(bucket, &pool);
  bl.push_front(ceph::buffer::create_static(len, buf));
  rados.write_full(pool, oid, bl);

But for read(), similar code fails:
  r = rados.initialize(0, NULL);
  r = rados.open_pool(bucket, &pool);
  bl.push_front(ceph::buffer::create_static(len, buf));
  rados.read(pool, oid, 0, bl, 0);

It seems a new buffer is allocated inside the read() operation, and the
data is written to that new buffer instead of "buf".
Where is that new buffer allocated?
And is there a way to use an external buffer instead of a new one?

* Re: read/write on RADOS using external buffer
  2010-09-13 11:57 read/write on RADOS using external buffer Takuya ASADA
@ 2010-09-14 18:44 ` Sage Weil
  2010-09-14 21:03   ` Takuya ASADA
  2010-10-19 22:27   ` Takuya ASADA
  0 siblings, 2 replies; 18+ messages in thread
From: Sage Weil @ 2010-09-14 18:44 UTC
  To: Takuya ASADA; +Cc: ceph-devel

Hi,

On Mon, 13 Sep 2010, Takuya ASADA wrote:
> I want to use an external buffer with Rados::write() and Rados::read() in
> my application. I figured out how to do it for write(), but not for
> read().
> 
> I have "char *buf" as the buffer and its length "int len".
> For write(), I can do it like this:
>   rados.initialize(0, NULL);
>   rados.open_pool(bucket, &pool);
>   bl.push_front(ceph::buffer::create_static(len, buf));
>   rados.write_full(pool, oid, bl);
> 
> But for read(), similar code fails:
>   r = rados.initialize(0, NULL);
>   r = rados.open_pool(bucket, &pool);
>   bl.push_front(ceph::buffer::create_static(len, buf));
>   rados.read(pool, oid, 0, bl, 0);
> 
> It seems a new buffer is allocated inside the read() operation, and the
> data is written to that new buffer instead of "buf".
> Where is that new buffer allocated?
> And is there a way to use an external buffer instead of a new one?

There currently isn't a way to use a preallocated buffer for reads.  The 
incoming data is read into a buffer allocated low in the stack by the 
messaging layer.  The bufferlist& ref is really a handle for the caller to 
get at the buffers; any prior contents are discarded.
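To make the "handle" point concrete, here is a minimal standalone model that assumes nothing about Ceph's real classes: `ToySegment`, `ToyBufferList` and `toy_read` are hypothetical stand-ins for bufferptr, bufferlist and the read path. The lower layer allocates its own storage and the caller's list is overwritten with it, so a static buffer pushed in beforehand is dropped rather than filled.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// One segment of a toy bufferlist. `owner` is null when the segment merely
// points at caller-owned memory (the analogue of buffer::create_static).
struct ToySegment {
  const char* data;
  std::size_t len;
  std::shared_ptr<std::vector<char>> owner;
};

struct ToyBufferList {
  std::vector<ToySegment> segments;

  std::size_t length() const {
    std::size_t n = 0;
    for (const ToySegment& s : segments) n += s.len;
    return n;
  }
};

// Hypothetical read path: the "messaging layer" allocates its own storage for
// the incoming payload; the caller's list is only a handle to receive it.
void toy_read(const std::string& payload, ToyBufferList& out) {
  auto storage = std::make_shared<std::vector<char>>(payload.begin(), payload.end());
  out.segments.clear();  // any prior contents (including static buffers) are discarded
  out.segments.push_back(ToySegment{storage->data(), storage->size(), storage});
}
```

Under this model the caller's array is never written; with the current interface the bytes would have to be copied out of the returned segments afterwards.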

This is something that could be changed, but it will require an additional 
messenger callback so that the upper layers can provide the buffer to read 
the data into.  (That is what the kernel implementation does.)
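A rough sketch of what such a callback could look like, with `ProvideBuffer`, `ReceivedData` and `read_payload` as invented names (not Ceph or kernel APIs) and a plain byte array standing in for the socket: the reader asks the upper layer for a destination keyed by transaction id before reading the payload, and allocates only when none is offered.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical callback: given a transaction id and payload length, return a
// destination of at least `len` bytes, or nullptr to let the reader allocate.
using ProvideBuffer = std::function<char*(std::uint64_t tid, std::size_t len)>;

struct ReceivedData {
  char* dest;                                // where the payload ended up
  std::unique_ptr<std::vector<char>> owned;  // set only when we had to allocate
};

// Sketch of a message reader. `wire` stands in for the socket; a real reader
// would call a tcp_read-style function instead of memcpy.
ReceivedData read_payload(std::uint64_t tid, const char* wire, std::size_t len,
                          const ProvideBuffer& provide) {
  ReceivedData out{nullptr, nullptr};
  if (provide)
    out.dest = provide(tid, len);  // upper layer may supply the buffer
  if (!out.dest) {
    // Fallback: the messaging layer allocates, as it does today.
    out.owned = std::make_unique<std::vector<char>>(len);
    out.dest = out.owned->data();
  }
  if (len)
    std::memcpy(out.dest, wire, len);
  return out;
}
```

Keeping the allocation as the fallback means messages nobody registered a buffer for are handled exactly as before.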

sage

* Re: read/write on RADOS using external buffer
  2010-09-14 18:44 ` Sage Weil
@ 2010-09-14 21:03   ` Takuya ASADA
  2010-09-14 21:10     ` Sage Weil
  2010-10-19 22:27   ` Takuya ASADA
  1 sibling, 1 reply; 18+ messages in thread
From: Takuya ASADA @ 2010-09-14 21:03 UTC
  To: Sage Weil; +Cc: ceph-devel

Hi,

> There currently isn't a way to use a preallocated buffer for reads.  The
> incoming data is read into a buffer allocated low in the stack by the
> messaging layer.  The bufferlist& ref is really a handle for the caller to
> get at the buffers; any prior contents are discarded.

I see.
So it's allocated inside SimpleMessenger::Pipe::read_message(), right?

> This is something that could be changed, but it will require an additional
> messenger callback so that the upper layers can provide the buffer to read
> the data into.  (That is what the kernel implementation does.)

Could you tell me where I can find that callback code?
Is it in fs/ceph/messenger.c?
I'd like to look into it.
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

* Re: read/write on RADOS using external buffer
  2010-09-14 21:03   ` Takuya ASADA
@ 2010-09-14 21:10     ` Sage Weil
  0 siblings, 0 replies; 18+ messages in thread
From: Sage Weil @ 2010-09-14 21:10 UTC
  To: Takuya ASADA; +Cc: ceph-devel

On Wed, 15 Sep 2010, Takuya ASADA wrote:
> Hi,
> 
> > There currently isn't a way to use a preallocated buffer for reads.  The
> > incoming data is read into a buffer allocated low in the stack by the
> > messaging layer.  The bufferlist& ref is really a handle for the caller to
> > get at the buffers; any prior contents are discarded.
> 
> I see.
> So it's allocated inside SimpleMessenger::Pipe::read_message(), right?
> 
> > This is something that could be changed, but it will require an additional
> > messenger callback so that the upper layers can provide the buffer to read
> > the data into.  (That is what the kernel implementation does.)
> 
> Could you tell me where I can find that callback code?
> Is it in fs/ceph/messenger.c?
> I'd like to look into it.

Yeah, it's the alloc_msg callback in struct ceph_connection_operations.  
You'll find implementations in {osd,mon,mds}_client.c.  The message header 
is passed as an argument, which includes message payload sizes as well as 
a tid (request/transaction id) to (say) look up the request the response 
is for.
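The lookup described here can be modeled as a tid-keyed table consulted with the header's tid and payload size. This is a hypothetical sketch: `MsgHeader` and `PendingReads` are invented for illustration and are not the kernel's `ceph_msg_header` or its alloc_msg implementations.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <unordered_map>

// Hypothetical header: just the two fields the lookup needs.
struct MsgHeader {
  std::uint64_t tid;
  std::uint32_t data_len;
};

// Registry of caller-supplied read buffers, keyed by request tid.
class PendingReads {
 public:
  void add(std::uint64_t tid, char* buf, std::size_t cap) {
    std::lock_guard<std::mutex> g(lock_);
    bufs_[tid] = Entry{buf, cap};
  }

  void remove(std::uint64_t tid) {
    std::lock_guard<std::mutex> g(lock_);
    bufs_.erase(tid);
  }

  // alloc_msg-style lookup: returns the registered buffer only if the
  // incoming payload fits; otherwise nullptr, meaning "allocate normally".
  char* lookup(const MsgHeader& h) {
    std::lock_guard<std::mutex> g(lock_);
    auto it = bufs_.find(h.tid);  // find(), so a miss inserts nothing
    if (it == bufs_.end() || h.data_len > it->second.cap)
      return nullptr;
    return it->second.buf;
  }

 private:
  struct Entry {
    char* buf;
    std::size_t cap;
  };
  std::mutex lock_;
  std::unordered_map<std::uint64_t, Entry> bufs_;
};
```

The mutex is there because the messenger's reader thread and the thread submitting the request both touch the table; the size check refuses a reply larger than the registered buffer.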

sage

* Re: read/write on RADOS using external buffer
  2010-09-14 18:44 ` Sage Weil
  2010-09-14 21:03   ` Takuya ASADA
@ 2010-10-19 22:27   ` Takuya ASADA
  2010-10-20  3:35     ` Colin McCabe
  2010-10-20  4:10     ` Gregory Farnum
  1 sibling, 2 replies; 18+ messages in thread
From: Takuya ASADA @ 2010-10-19 22:27 UTC
  To: Sage Weil; +Cc: ceph-devel

Hi,

I've implemented a patch to support external buffers in Rados::read() and Rados::write().

diff --git a/src/include/librados.hpp b/src/include/librados.hpp
index 06fa3b2..bfb0f5b 100644
--- a/src/include/librados.hpp
+++ b/src/include/librados.hpp
@@ -63,8 +63,10 @@ public:
   int create(pool_t pool, const std::string& oid, bool exclusive);
 
   int write(pool_t pool, const std::string& oid, off_t off, bufferlist& bl, size_t len);
+  int write(pool_t pool, const std::string& oid, off_t off, void *buf, size_t len);
   int write_full(pool_t pool, const std::string& oid, bufferlist& bl);
   int read(pool_t pool, const std::string& oid, off_t off, bufferlist& bl, size_t len);
+  int read(pool_t pool, const std::string& oid, off_t off, void *buf, size_t len);
   int remove(pool_t pool, const std::string& oid);
   int trunc(pool_t pool, const std::string& oid, size_t size);
 
@@ -135,4 +137,3 @@ public:
 }
 
 #endif
-
diff --git a/src/librados.cc b/src/librados.cc
index 4c8a464..91e6a27 100644
--- a/src/librados.cc
+++ b/src/librados.cc
@@ -72,11 +72,12 @@ class RadosClient : public Dispatcher
 
   Mutex lock;
   Cond cond;
+  static hash_map<tid_t, bufferptr*> buffer_map;
+  static bufferptr* fetch_buffer_func(tid_t tid);
 
- 
 public:
   RadosClient() : messenger(NULL), lock("radosclient") {
-    messenger = new SimpleMessenger();
+    messenger = new SimpleMessenger(&RadosClient::fetch_buffer_func);
   }
 
   ~RadosClient();
@@ -132,8 +133,10 @@ public:
   // io
   int create(PoolCtx& pool, const object_t& oid, bool exclusive);
   int write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
+  int write(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len);
   int write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl);
   int read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
+  int read(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len);
   int remove(PoolCtx& pool, const object_t& oid);
   int stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_t *pmtime);
   int trunc(PoolCtx& pool, const object_t& oid, size_t size);
@@ -870,6 +873,15 @@ int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist
   return len;
 }
 
+int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len)
+{
+  bufferptr bp = buffer::create_static(len, static_cast<char *>(buf));
+  bufferlist bl;
+
+  bl.push_back(bp);
+  return write(pool, oid, off, bl, len);
+}
+
 int RadosClient::write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl)
 {
   utime_t ut = g_clock.now();
@@ -1116,6 +1128,46 @@ int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist&
   return bl.length();
 }
 
+int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len)
+{
+  SnapContext snapc;
+
+  Mutex mylock("RadosClient::read::mylock");
+  Cond cond;
+  bool done;
+  int r;
+  Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+  bufferptr bp = buffer::create_static(len, static_cast<char *>(buf));
+  bufferlist bl;
+
+  bl.push_back(bp);
+  lock.Lock();
+  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid);
+  tid_t tid = objecter->get_tid();
+  buffer_map[tid] = &bp;
+  objecter->read_with_tid(oid, layout,
+	      off, len, pool.snap_seq, &bl, 0,
+	      onack, tid);
+  lock.Unlock();
+
+  mylock.Lock();
+  while (!done)
+    cond.Wait(mylock);
+  mylock.Unlock();
+  buffer_map.erase(tid);
+  dout(10) << "Objecter returned from read r=" << r << dendl;
+
+  if (r < 0)
+    return r;
+
+  if (bl.length() < len) {
+    dout(10) << "Returned length " << bl.length()
+	     << " less than original length "<< len << dendl;
+  }
+
+  return bl.length();
+}
+
 int RadosClient::stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_t *pmtime)
 {
   SnapContext snapc;
@@ -1251,6 +1303,13 @@ int RadosClient::getxattrs(PoolCtx& pool, const object_t& oid, map<std::string,
   return r;
 }
 
+hash_map<tid_t, bufferptr*> RadosClient::buffer_map;
+
+bufferptr* RadosClient::fetch_buffer_func(tid_t tid)
+{
+  return buffer_map[tid];
+}
+
 // ---------------------------------------------
 
 namespace librados {
@@ -1401,6 +1460,14 @@ int Rados::write(rados_pool_t pool, const string& o, off_t off, bufferlist& bl,
   return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
 }
 
+int Rados::write(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len)
+{
+  if (!client)
+    return -EINVAL;
+  object_t oid(o);
+  return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, buf, len);
+}
+
 int Rados::write_full(rados_pool_t pool, const string& o, bufferlist& bl)
 {
   if (!client)
@@ -1433,6 +1500,14 @@ int Rados::read(rados_pool_t pool, const string& o, off_t off, bufferlist& bl, s
   return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
 }
 
+int Rados::read(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len)
+{
+  if (!client)
+    return -EINVAL;
+  object_t oid(o);
+  return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, buf, len);
+}
+
 int Rados::getxattr(rados_pool_t pool, const string& o, const char *name, bufferlist& bl)
 {
   if (!client)
diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
index 4632267..75b4576 100644
--- a/src/msg/SimpleMessenger.cc
+++ b/src/msg/SimpleMessenger.cc
@@ -51,7 +51,7 @@ static ostream& _prefix(SimpleMessenger *messenger) {
 #define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl;
 #define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
 
-
+SimpleMessenger::fetch_buffer_callback_t SimpleMessenger::fetch_buffer_callback = 0;
 
 /********************************************
  * Accepter
@@ -1786,36 +1786,47 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
   data_len = le32_to_cpu(header.data_len);
   data_off = le32_to_cpu(header.data_off);
   if (data_len) {
-    int left = data_len;
-    if (data_off & ~PAGE_MASK) {
-      // head
-      int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
-		     (unsigned)left);
-      bufferptr bp = buffer::create(head);
-      if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
+    if (fetch_buffer_callback) {
+      bufferptr *bpp = fetch_buffer_callback(header.tid);
+      if (!bpp)
+	goto allocate_buffer;
+      if (tcp_read( sd, bpp->c_str(), data_len, messenger->timeout ) < 0)
 	goto out_dethrottle;
-      data.push_back(bp);
-      left -= head;
-      dout(20) << "reader got data head " << head << dendl;
-    }
+      data.push_back(*bpp);
+      dout(20) << "reader got data " << data_len << dendl;
+    }else{
+    allocate_buffer:
+      int left = data_len;
+      if (data_off & ~PAGE_MASK) {
+	// head
+	int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
+		       (unsigned)left);
+	bufferptr bp = buffer::create(head);
+	if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
+	  goto out_dethrottle;
+	data.push_back(bp);
+	left -= head;
+	dout(20) << "reader got data head " << head << dendl;
+      }
 
-    // middle
-    int middle = left & PAGE_MASK;
-    if (middle > 0) {
-      bufferptr bp = buffer::create_page_aligned(middle);
-      if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
-	goto out_dethrottle;
-      data.push_back(bp);
-      left -= middle;
-      dout(20) << "reader got data page-aligned middle " << middle << dendl;
-    }
+      // middle
+      int middle = left & PAGE_MASK;
+      if (middle > 0) {
+	bufferptr bp = buffer::create_page_aligned(middle);
+	if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
+	  goto out_dethrottle;
+	data.push_back(bp);
+	left -= middle;
+	dout(20) << "reader got data page-aligned middle " << middle << dendl;
+      }
 
-    if (left) {
-      bufferptr bp = buffer::create(left);
-      if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
-	goto out_dethrottle;
-      data.push_back(bp);
-      dout(20) << "reader got data tail " << left << dendl;
+      if (left) {
+	bufferptr bp = buffer::create(left);
+	if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
+	  goto out_dethrottle;
+	data.push_back(bp);
+	dout(20) << "reader got data tail " << left << dendl;
+      }
     }
   }
 
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index b4a0ef3..72a5a1b 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -567,6 +567,9 @@ private:
   SimpleMessenger *messenger; //hack to make dout macro work, will fix
   int timeout;
 
+  typedef bufferptr* (*fetch_buffer_callback_t) (tid_t);
+  static fetch_buffer_callback_t fetch_buffer_callback;
+
 public:
   SimpleMessenger() :
     Messenger(entity_name_t()),
@@ -580,6 +583,20 @@ public:
     // for local dmsg delivery
     dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
   }
+
+  SimpleMessenger(fetch_buffer_callback_t callback) :
+    Messenger(entity_name_t()),
+    accepter(this),
+    lock("SimpleMessenger::lock"), started(false), did_bind(false),
+    dispatch_throttler(g_conf.ms_dispatch_throttle_bytes), need_addr(true),
+    destination_stopped(true), my_type(-1),
+    global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
+    reaper_thread(this), reaper_started(false), reaper_stop(false), 
+    dispatch_thread(this), messenger(this) {
+    fetch_buffer_callback = callback;
+    // for local dmsg delivery
+    dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
+  }
   ~SimpleMessenger() {
     delete dispatch_queue.local_pipe;
   }
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index a34c0a9..fd43795 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -530,6 +530,21 @@ private:
     o->outbl = pbl;
     return op_submit(o);
   }
+  tid_t read_with_tid(const object_t& oid, ceph_object_layout ol, 
+	     uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
+	     Context *onfinish, tid_t tid) {
+    vector<OSDOp> ops(1);
+    ops[0].op.op = CEPH_OSD_OP_READ;
+    ops[0].op.extent.offset = off;
+    ops[0].op.extent.length = len;
+    ops[0].op.extent.truncate_size = 0;
+    ops[0].op.extent.truncate_seq = 0;
+    Op *o = new Op(oid, ol, ops, flags, onfinish, 0);
+    o->snapid = snap;
+    o->outbl = pbl;
+    o->tid = tid;
+    return op_submit(o);
+  }
   tid_t read_trunc(const object_t& oid, ceph_object_layout ol, 
 	     uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
 	     uint64_t trunc_size, __u32 trunc_seq,
@@ -725,6 +740,8 @@ private:
 
   void list_objects(ListContext *p, Context *onfinish);
 
+  tid_t get_tid(void) { return ++last_tid; }
+
   // -------------------------
   // pool ops
 private:
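For reference, the allocating path kept in the read_message() hunk splits a payload of data_len bytes at offset data_off into an unaligned head, a page-aligned middle and a tail. The standalone re-implementation of that arithmetic here is only for illustration; `split_data` and `DataSplit` are invented names and a 4096-byte page is assumed. The new external-buffer branch, by contrast, reads all data_len bytes into the supplied buffer with a single tcp_read().

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Byte counts of the three reads done by the allocating path: an unaligned
// head, a page-aligned middle, and a tail.
struct DataSplit {
  std::uint32_t head, middle, tail;
};

// page_size must be a power of two; page_mask mirrors PAGE_MASK = ~(PAGE_SIZE - 1).
DataSplit split_data(std::uint32_t data_off, std::uint32_t data_len,
                     std::uint32_t page_size = 4096) {
  assert(page_size != 0 && (page_size & (page_size - 1)) == 0);
  const std::uint32_t page_mask = ~(page_size - 1);
  std::uint32_t left = data_len;
  DataSplit s{0, 0, 0};
  if (data_off & ~page_mask) {
    // Read up to the next page boundary first.
    s.head = std::min(page_size - (data_off & ~page_mask), left);
    left -= s.head;
  }
  s.middle = left & page_mask;  // whole pages, read into a page-aligned buffer
  left -= s.middle;
  s.tail = left;                // whatever remains
  return s;
}
```

For example, data_off = 100 and data_len = 10000 give a 3996-byte head, a 4096-byte middle and a 1908-byte tail.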

* Re: read/write on RADOS using external buffer
  2010-10-19 22:27   ` Takuya ASADA
@ 2010-10-20  3:35     ` Colin McCabe
  2010-10-20  4:26       ` Sage Weil
  2010-10-20  4:10     ` Gregory Farnum
  1 sibling, 1 reply; 18+ messages in thread
From: Colin McCabe @ 2010-10-20  3:35 UTC
  To: Takuya ASADA; +Cc: Sage Weil, ceph-devel

Hi Takuya,

Thanks for looking at this!

Unfortunately I think you have taken the wrong approach here. There
isn't any need for thread-local data in the Objecter. It's not threads
that the Objecter cares about, it's clients. Also, I don't think a
change like this needs to involve SimpleMessenger and all that.

Take a look at rados_read in librados.cc. This function implements the
C API, which can already handle using external buffers. It may be that
all you need to do is add functions in the C++ header that call
rados_read and rados_write!
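A minimal sketch of that suggestion, under loud assumptions: `c_style_read` and `c_style_write` are hypothetical stand-ins for the C functions (their real signatures are not reproduced here), an in-memory map replaces the cluster, and `ToyRados` shows C++ overloads that just forward the caller's pointer.

```cpp
#include <algorithm>
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <string>

// In-memory stand-in for the object store, so the sketch runs on its own.
static std::map<std::string, std::string> g_objects;

// Hypothetical C-style calls that work on caller-owned memory.
int c_style_write(const char* oid, std::uint64_t off, const char* buf, std::size_t len) {
  std::string& obj = g_objects[oid];
  if (obj.size() < off + len)
    obj.resize(off + len, '\0');  // grow the object, zero-filling any gap
  std::memcpy(&obj[off], buf, len);
  return static_cast<int>(len);
}

int c_style_read(const char* oid, std::uint64_t off, char* buf, std::size_t len) {
  auto it = g_objects.find(oid);
  if (it == g_objects.end())
    return -ENOENT;
  const std::string& obj = it->second;
  if (off >= obj.size())
    return 0;  // reading past the end returns no data
  std::size_t n = std::min<std::size_t>(len, obj.size() - off);
  std::memcpy(buf, obj.data() + off, n);
  return static_cast<int>(n);
}

// Thin C++ overloads that hand the caller's buffer straight to the C calls.
struct ToyRados {
  int write(const std::string& oid, std::uint64_t off, const void* buf, std::size_t len) {
    return c_style_write(oid.c_str(), off, static_cast<const char*>(buf), len);
  }
  int read(const std::string& oid, std::uint64_t off, void* buf, std::size_t len) {
    return c_style_read(oid.c_str(), off, static_cast<char*>(buf), len);
  }
};
```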

I actually think that rados_read could be made slightly more
efficient. We could probably add a bufferlist constructor that starts
with an already allocated buffer, to avoid the call to bl.copy. I
haven't thought too hard about how to do this but in principle it
seems reasonable.
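One way to picture such a constructor, as a hypothetical model rather than a proposed bufferlist API: `PrefilledList` starts out pointing at caller-owned memory, appends fill that memory first, and only bytes that do not fit go to owned overflow storage.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical list whose first segment is caller-owned memory.
class PrefilledList {
 public:
  PrefilledList(char* external, std::size_t cap) : ext_(external), cap_(cap) {}

  // Fill the external buffer first; spill the remainder into owned storage.
  void append(const char* data, std::size_t len) {
    std::size_t room = cap_ - used_;
    std::size_t n = std::min(room, len);
    if (n)
      std::memcpy(ext_ + used_, data, n);
    used_ += n;
    overflow_.append(data + n, len - n);  // no-op when everything fit
  }

  std::size_t in_external() const { return used_; }
  const std::string& overflow() const { return overflow_; }

 private:
  char* ext_;
  std::size_t cap_;
  std::size_t used_ = 0;
  std::string overflow_;
};
```

In this model the incoming bytes are still copied once from wherever they were received, so it removes the separate bl.copy() step but not the messenger's own allocation.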

regards,
Colin McCabe


On Tue, Oct 19, 2010 at 3:27 PM, Takuya ASADA <syuu@dokukino.com> wrote:
> Hi,
>
> I've implemented a patch to support external buffers in Rados::read() and Rados::write().
> [...]

* Re: read/write on RADOS using external buffer
  2010-10-19 22:27   ` Takuya ASADA
  2010-10-20  3:35     ` Colin McCabe
@ 2010-10-20  4:10     ` Gregory Farnum
  1 sibling, 0 replies; 18+ messages in thread
From: Gregory Farnum @ 2010-10-20  4:10 UTC
  To: Takuya ASADA; +Cc: Sage Weil, ceph-devel

Takuya:
At first glance this looks okay, but we're going to want to review it
carefully given the changes to SimpleMessenger, and check its impact
on performance, etc., to decide if this is something we actually want to
merge, or if we'd like to implement this as a derivative messenger or
something. :)
Could you let us know how you've been testing this? :)
-Greg

On Tue, Oct 19, 2010 at 3:27 PM, Takuya ASADA <syuu@dokukino.com> wrote:
> Hi,
>
> I've implemented a patch to support external buffers in Rados::read() and Rados::write().
> [...]
>   return bl.length();
>  }
>
> +int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len)
> +{
> +  SnapContext snapc;
> +
> +  Mutex mylock("RadosClient::read::mylock");
> +  Cond cond;
> +  bool done;
> +  int r;
> +  Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
> +  bufferptr bp = buffer::create_static(len, static_cast<char *>(buf));
> +  bufferlist bl;
> +
> +  bl.push_back(bp);
> +  lock.Lock();
> +  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid);
> +  tid_t tid = objecter->get_tid();
> +  buffer_map[tid] = &bp;
> +  objecter->read_with_tid(oid, layout,
> +             off, len, pool.snap_seq, &bl, 0,
> +             onack, tid);
> +  lock.Unlock();
> +
> +  mylock.Lock();
> +  while (!done)
> +    cond.Wait(mylock);
> +  mylock.Unlock();
> +  buffer_map.erase(tid);
> +  dout(10) << "Objecter returned from read r=" << r << dendl;
> +
> +  if (r < 0)
> +    return r;
> +
> +  if (bl.length() < len) {
> +    dout(10) << "Returned length " << bl.length()
> +            << " less than original length "<< len << dendl;
> +  }
> +
> +  return bl.length();
> +}
> +
>  int RadosClient::stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_t *pmtime)
>  {
>   SnapContext snapc;
> @@ -1251,6 +1303,13 @@ int RadosClient::getxattrs(PoolCtx& pool, const object_t& oid, map<std::string,
>   return r;
>  }
>
> +hash_map<tid_t, bufferptr*> RadosClient::buffer_map;
> +
> +bufferptr* RadosClient::fetch_buffer_func(tid_t tid)
> +{
> +  return buffer_map[tid];
> +}
> +
>  // ---------------------------------------------
>
>  namespace librados {
> @@ -1401,6 +1460,14 @@ int Rados::write(rados_pool_t pool, const string& o, off_t off, bufferlist& bl,
>   return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
>  }
>
> +int Rados::write(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len)
> +{
> +  if (!client)
> +    return -EINVAL;
> +  object_t oid(o);
> +  return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, buf, len);
> +}
> +
>  int Rados::write_full(rados_pool_t pool, const string& o, bufferlist& bl)
>  {
>   if (!client)
> @@ -1433,6 +1500,14 @@ int Rados::read(rados_pool_t pool, const string& o, off_t off, bufferlist& bl, s
>   return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
>  }
>
> +int Rados::read(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len)
> +{
> +  if (!client)
> +    return -EINVAL;
> +  object_t oid(o);
> +  return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, buf, len);
> +}
> +
>  int Rados::getxattr(rados_pool_t pool, const string& o, const char *name, bufferlist& bl)
>  {
>   if (!client)
> diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
> index 4632267..75b4576 100644
> --- a/src/msg/SimpleMessenger.cc
> +++ b/src/msg/SimpleMessenger.cc
> @@ -51,7 +51,7 @@ static ostream& _prefix(SimpleMessenger *messenger) {
>  #define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl;
>  #define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
>
> -
> +SimpleMessenger::fetch_buffer_callback_t SimpleMessenger::fetch_buffer_callback = 0;
>
>  /********************************************
>  * Accepter
> @@ -1786,36 +1786,47 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
>   data_len = le32_to_cpu(header.data_len);
>   data_off = le32_to_cpu(header.data_off);
>   if (data_len) {
> -    int left = data_len;
> -    if (data_off & ~PAGE_MASK) {
> -      // head
> -      int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
> -                    (unsigned)left);
> -      bufferptr bp = buffer::create(head);
> -      if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
> +    if (fetch_buffer_callback) {
> +      bufferptr *bpp = fetch_buffer_callback(header.tid);
> +      if (!bpp)
> +       goto allocate_buffer;
> +      if (tcp_read( sd, bpp->c_str(), data_len, messenger->timeout ) < 0)
>        goto out_dethrottle;
> -      data.push_back(bp);
> -      left -= head;
> -      dout(20) << "reader got data head " << head << dendl;
> -    }
> +      data.push_back(*bpp);
> +      dout(20) << "reader got data " << data_len << dendl;
> +    }else{
> +    allocate_buffer:
> +      int left = data_len;
> +      if (data_off & ~PAGE_MASK) {
> +       // head
> +       int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
> +                      (unsigned)left);
> +       bufferptr bp = buffer::create(head);
> +       if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
> +         goto out_dethrottle;
> +       data.push_back(bp);
> +       left -= head;
> +       dout(20) << "reader got data head " << head << dendl;
> +      }
>
> -    // middle
> -    int middle = left & PAGE_MASK;
> -    if (middle > 0) {
> -      bufferptr bp = buffer::create_page_aligned(middle);
> -      if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
> -       goto out_dethrottle;
> -      data.push_back(bp);
> -      left -= middle;
> -      dout(20) << "reader got data page-aligned middle " << middle << dendl;
> -    }
> +      // middle
> +      int middle = left & PAGE_MASK;
> +      if (middle > 0) {
> +       bufferptr bp = buffer::create_page_aligned(middle);
> +       if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
> +         goto out_dethrottle;
> +       data.push_back(bp);
> +       left -= middle;
> +       dout(20) << "reader got data page-aligned middle " << middle << dendl;
> +      }
>
> -    if (left) {
> -      bufferptr bp = buffer::create(left);
> -      if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
> -       goto out_dethrottle;
> -      data.push_back(bp);
> -      dout(20) << "reader got data tail " << left << dendl;
> +      if (left) {
> +       bufferptr bp = buffer::create(left);
> +       if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
> +         goto out_dethrottle;
> +       data.push_back(bp);
> +       dout(20) << "reader got data tail " << left << dendl;
> +      }
>     }
>   }
>
> diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
> index b4a0ef3..72a5a1b 100644
> --- a/src/msg/SimpleMessenger.h
> +++ b/src/msg/SimpleMessenger.h
> @@ -567,6 +567,9 @@ private:
>   SimpleMessenger *messenger; //hack to make dout macro work, will fix
>   int timeout;
>
> +  typedef bufferptr* (*fetch_buffer_callback_t) (tid_t);
> +  static fetch_buffer_callback_t fetch_buffer_callback;
> +
>  public:
>   SimpleMessenger() :
>     Messenger(entity_name_t()),
> @@ -580,6 +583,20 @@ public:
>     // for local dmsg delivery
>     dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
>   }
> +
> +  SimpleMessenger(fetch_buffer_callback_t callback) :
> +    Messenger(entity_name_t()),
> +    accepter(this),
> +    lock("SimpleMessenger::lock"), started(false), did_bind(false),
> +    dispatch_throttler(g_conf.ms_dispatch_throttle_bytes), need_addr(true),
> +    destination_stopped(true), my_type(-1),
> +    global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
> +    reaper_thread(this), reaper_started(false), reaper_stop(false),
> +    dispatch_thread(this), messenger(this) {
> +    fetch_buffer_callback = callback;
> +    // for local dmsg delivery
> +    dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
> +  }
>   ~SimpleMessenger() {
>     delete dispatch_queue.local_pipe;
>   }
> diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
> index a34c0a9..fd43795 100644
> --- a/src/osdc/Objecter.h
> +++ b/src/osdc/Objecter.h
> @@ -530,6 +530,21 @@ private:
>     o->outbl = pbl;
>     return op_submit(o);
>   }
> +  tid_t read_with_tid(const object_t& oid, ceph_object_layout ol,
> +            uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
> +            Context *onfinish, tid_t tid) {
> +    vector<OSDOp> ops(1);
> +    ops[0].op.op = CEPH_OSD_OP_READ;
> +    ops[0].op.extent.offset = off;
> +    ops[0].op.extent.length = len;
> +    ops[0].op.extent.truncate_size = 0;
> +    ops[0].op.extent.truncate_seq = 0;
> +    Op *o = new Op(oid, ol, ops, flags, onfinish, 0);
> +    o->snapid = snap;
> +    o->outbl = pbl;
> +    o->tid = tid;
> +    return op_submit(o);
> +  }
>   tid_t read_trunc(const object_t& oid, ceph_object_layout ol,
>             uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
>             uint64_t trunc_size, __u32 trunc_seq,
> @@ -725,6 +740,8 @@ private:
>
>   void list_objects(ListContext *p, Context *onfinish);
>
> +  tid_t get_tid(void) { return ++last_tid; }
> +
>   // -------------------------
>   // pool ops
>  private:

* Re: read/write on RADOS using external buffer
  2010-10-20  3:35     ` Colin McCabe
@ 2010-10-20  4:26       ` Sage Weil
  2010-10-20  5:55         ` Colin McCabe
  0 siblings, 1 reply; 18+ messages in thread
From: Sage Weil @ 2010-10-20  4:26 UTC (permalink / raw)
  To: Takuya ASADA; +Cc: ceph-devel

On Tue, 19 Oct 2010, Colin McCabe wrote:
> Hi Takuya,
> 
> Thanks for looking at this!
> 
> Unfortunately I think you have taken the wrong approach here. There
> isn't any need for thread-local data in the Objecter. It's not threads
> that the Objecter cares about, it's clients. Also, I don't think a
> change like this needs to involve SimpleMessenger and all that.

The problem Takuya is solving is that the messenger currently always 
allocates _new_ buffers for any data it reads off the network, including 
the data payload of read operations.  His goal is to avoid later doing a 
memcpy into the user provided buffer.

In principle, I think this is the right approach.  There are a few things 
I think we should change with the implementation before merging anything:

 - We should make the fetch_buffer_func a Dispatcher method, like all of 
the existing ms_* callbacks.  That avoids the single callback instance for 
the whole SimpleMessenger, and the weird constructor argument.  A helper 
method Messenger::ms_deliver_get_data_buffer() will probably be needed 
too.  This keeps the interface consistent, and allows any 
registered Dispatcher to specify buffers to read into for messages it 
expects.

 - The ms_get_data_buffer() should just take the ceph_msg_header* as an 
argument. That has a tid in it _and_ the message type--everything we might 
need to determine which message (reply) we are receiving and thus which 
buffer we should read into.

I pushed a msgr_zerocopy_read branch to ceph.git that implements this 
part (patch below).

 - SimpleMessenger::read_message() just needs to call 
ms_deliver_get_data_buffer() if data_len > 0.  If it returns true, use the 
provided bufferptr.

 - The existing Objecter::read() method returns the tid, which librados 
can then store in a private map of tid -> bufferptrs.  When its 
ms_get_data_buffer() is called it can provide the bufferptr; no need to 
modify the Objecter at all!  Just make sure to verify that the entry 
exists in the map before using the operator[] (use find()).
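A minimal sketch of the librados-side lookup described in the last two points. It uses hypothetical stand-in types (`MsgHeader`, `UserBuffer`, `BufferRegistry`, `get_data_buffer()`) in place of Ceph's real `ceph_msg_header`, `bufferptr`, and Dispatcher interfaces. The lookup uses `find()` instead of `operator[]`, because `operator[]` would silently insert an empty entry for an unknown tid. The sketch also adds two assumptions of its own. First, a mutex, because reader threads and the thread calling read() both touch the map. Second, a length check, so an oversized payload falls back to a normal allocation instead of overrunning the user's buffer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>

// Hypothetical stand-in for the ceph_msg_header fields used here.
struct MsgHeader {
  uint64_t tid;       // transaction id assigned by the Objecter
  uint32_t data_len;  // length of the data payload on the wire
};

// Hypothetical stand-in for a bufferptr wrapping user-owned memory.
struct UserBuffer {
  char *data;
  size_t len;
};

// Hypothetical librados-side map of tid -> user buffer.
class BufferRegistry {
 public:
  // Called by read() once it knows the tid of the submitted op.
  void add(uint64_t tid, UserBuffer buf) {
    assert(buf.data != nullptr);
    std::lock_guard<std::mutex> l(lock_);
    buffers_[tid] = buf;
  }

  // Called by read() after the op has completed.
  void remove(uint64_t tid) {
    std::lock_guard<std::mutex> l(lock_);
    buffers_.erase(tid);
  }

  // Dispatcher-style hook the messenger would call when data_len > 0.
  // Returns true and fills *out only if a buffer is registered for this
  // tid and is large enough; otherwise the messenger allocates as before.
  bool get_data_buffer(const MsgHeader &hdr, UserBuffer *out) {
    std::lock_guard<std::mutex> l(lock_);
    auto it = buffers_.find(hdr.tid);  // find(), not operator[]
    if (it == buffers_.end() || it->second.len < hdr.data_len)
      return false;
    *out = it->second;
    return true;
  }

 private:
  std::mutex lock_;
  std::map<uint64_t, UserBuffer> buffers_;
};
```

The tid is only known after Objecter::read() submits the op, so a fast reply can arrive before add() runs. In this sketch the lookup then misses and the messenger allocates as usual. The cost is one extra memcpy, not an incorrect read.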


The main corner case is this: if there is some osd cluster change, it's 
theoretically possible we could have two threads reading read replies to 
the same request off the wire at the same time.  Even if we carefully 
define which reply "wins", we need to make sure there isn't a stray thread 
reading or accessing the buffer after the read() call returns.

I'm not sure what the solution to that one is yet.  The kclient doesn't 
have separate threads, and takes pains to yank the pages out from under 
an incoming message; that approach doesn't work well with the SimpleMessenger 
threading model.  :/
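One possible shape for the "no stray thread" guarantee is sketched below. It assumes each reader thread briefly pins the user buffer while copying into it. `PinnedBuffer`, `pin()`, `unpin()` and `revoke()` are hypothetical names, and the sketch uses standard C++ primitives rather than Ceph's `Mutex`/`Cond`. read() would call `revoke()` before returning. That call refuses new pins and blocks until every reader already inside the buffer has finished.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical wrapper around a caller-owned buffer that reader threads
// must pin before writing into it.
class PinnedBuffer {
 public:
  PinnedBuffer(char *data, size_t len) : data_(data), len_(len) {}

  // Reader side: returns the buffer, or nullptr once it has been revoked
  // (the reader must then read into a freshly allocated buffer instead).
  char *pin() {
    std::lock_guard<std::mutex> l(lock_);
    if (revoked_)
      return nullptr;
    ++pins_;
    return data_;
  }

  // Reader side: called when the reader is done touching the buffer.
  void unpin() {
    std::lock_guard<std::mutex> l(lock_);
    assert(pins_ > 0);
    if (--pins_ == 0)
      drained_.notify_all();
  }

  // Owner side: called before read() returns to the user. Refuses new
  // pins and waits until all existing pins are released.
  void revoke() {
    std::unique_lock<std::mutex> l(lock_);
    revoked_ = true;
    drained_.wait(l, [this] { return pins_ == 0; });
  }

  size_t size() const { return len_; }

 private:
  std::mutex lock_;
  std::condition_variable drained_;
  char *data_;
  size_t len_;
  int pins_ = 0;
  bool revoked_ = false;
};
```

The limitation is that `revoke()` cannot interrupt a reader that is blocked in a socket read while holding a pin. It simply waits until that read finishes or times out, which is the awkward interaction with the threaded SimpleMessenger described above.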

sage


* Re: read/write on RADOS using external buffer
  2010-10-20  4:26       ` Sage Weil
@ 2010-10-20  5:55         ` Colin McCabe
  2010-10-20 15:34           ` Sage Weil
  0 siblings, 1 reply; 18+ messages in thread
From: Colin McCabe @ 2010-10-20  5:55 UTC (permalink / raw)
  To: Sage Weil; +Cc: Takuya ASADA, ceph-devel

On Tue, Oct 19, 2010 at 9:26 PM, Sage Weil <sage@newdream.net> wrote:
>  - SimpleMessenger::read_message() just needs to call
> ms_deliver_get_data_buffer() if data_len > 0.  If it returns true, use the
> provided bufferptr.
>
>  - The existing Objecter::read() method returns the tid, which librados
> can then store in a private map of tid -> bufferptrs.  When its
> ms_get_data_buffer() is called it can provide the bufferptr; no need to
> modify the Objecter at all!  Just make sure to verify that the entry
> exists in the map before using the operator[] (use find()).

If we want thread-local data, we should at least use
pthread_setspecific / pthread_getspecific. No need for tids and STL
maps. gettid() is a syscall and it will be slow.

> The main corner case is this: if there is some osd cluster change, it's
> theoretically possible we could have two threads reading read replies to
> the same request off the wire at the same time.  Even if we carefully
> define which reply "wins", we need to make sure there isn't a stray thread
> reading or accessing the buffer after the read() call returns.

I just don't see why we have to care about threads at all. If we need
rados client-specific data, it should definitely be in class
RadosClient. Otherwise, we should just get rid of class RadosClient,
because it's useless.

> I'm not sure what the solution to that one is yet.  The kclient doesn't
> have separate threads, and takes pains to yank the pages out from under
> an incoming message; that approach doesn't work well with the SimpleMessenger
> threading model.  :/

I think I can propose a patch which solves this problem without rados
client-specific data, threads, tids, maps, thread-local data, or
changes to SimpleMessenger. It will just involve modifying bufferlist
so that it can use a buffer that's passed to it, rather than
allocating its own. Then we need something like a freeze operation
that will stop it from trying to reallocate or resize the buffer. This
solution will have much more generality and probably will be useful
elsewhere too.
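The "freeze" idea can be sketched roughly as follows. The sketch uses a hypothetical `ExternalBuffer` class, not any real `bufferlist` method. The buffer fills caller-supplied memory first and, while unfrozen, spills any excess into library-owned storage. After `freeze()`, an append that would need more room is refused instead. This covers only the buffer side; by itself it does not tell the messenger which buffer an incoming reply belongs to.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical wrapper over caller-owned memory (not Ceph's bufferlist).
class ExternalBuffer {
 public:
  ExternalBuffer(char *data, size_t capacity)
      : data_(data), capacity_(capacity) {}

  // After freeze(), appends must fit in the caller's memory; the buffer
  // never grows into library-owned storage.
  void freeze() { frozen_ = true; }

  // Appends n bytes. Fills the external memory first; any excess goes to
  // an internal spill area unless frozen, in which case the append is
  // refused and nothing is written.
  bool append(const char *src, size_t n) {
    assert(src != nullptr || n == 0);
    size_t room = capacity_ - len_;
    if (n > room && frozen_)
      return false;
    size_t direct = n < room ? n : room;
    if (direct > 0) {
      std::memcpy(data_ + len_, src, direct);
      len_ += direct;
    }
    overflow_.append(src + direct, n - direct);
    return true;
  }

  size_t length() const { return len_ + overflow_.size(); }
  bool spilled() const { return !overflow_.empty(); }

 private:
  char *data_;
  size_t capacity_;
  size_t len_ = 0;
  bool frozen_ = false;
  std::string overflow_;
};
```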

Colin

* Re: read/write on RADOS using external buffer
  2010-10-20  5:55         ` Colin McCabe
@ 2010-10-20 15:34           ` Sage Weil
  2010-10-20 15:46             ` Jim Schutt
  2010-10-20 16:32             ` Colin McCabe
  0 siblings, 2 replies; 18+ messages in thread
From: Sage Weil @ 2010-10-20 15:34 UTC (permalink / raw)
  To: Colin McCabe; +Cc: Takuya ASADA, ceph-devel

On Tue, 19 Oct 2010, Colin McCabe wrote:
> On Tue, Oct 19, 2010 at 9:26 PM, Sage Weil <sage@newdream.net> wrote:
> >  - SimpleMessenger::read_message() just needs to call
> > ms_deliver_get_data_buffer() if data_len > 0.  If it returns true, use the
> > provided bufferptr.
> >
> >  - The existing Objecter::read() method returns the tid, which librados
> > can then store in a private map of tid -> bufferptrs.  When its
> > ms_get_data_buffer() is called it can provide the bufferptr; no need to
> > modify the Objecter at all!  Just make sure to verify that the entry
> > exists in the map before using the operator[] (use find()).
> 
> If we want thread-local data, we should at least use
> pthread_setspecific / pthread_getspecific. No need for tids and STL
> maps. gettid() is a syscall and it will be slow.

There's no pthread specific data or gettid() syscalls here.  The 'tid' is 
a transaction id generated by the Objecter that matches up replies to 
requests.

> > The main corner case is this: if there is some osd cluster change, it's
> > theoretically possible we could have two threads reading read replies to
> > the same request off the wire at the same time.  Even if we carefully
> > define which reply "wins", we need to make sure there isn't a stray thread
> > reading or accessing the buffer after the read() call returns.
> 
> I just don't see why we have to care about threads at all.

The corner case is when we are reading two replies for the same request 
off of two different sockets from two different OSDs.  If they are 
both reading directly into the user supplied buffer, we need to make sure 
they both stop before returning success to the user; after that point 
we're no longer allowed to touch that memory.  That will probably require 
some locking and non-blocking IO, which can be tricky since the pthread 
stuff doesn't usually interact well with select/poll.

> I think I can propose a patch which solves this problem without rados
> client-specific data, threads, tids, maps, thread-local data, or
> changes to SimpleMessenger.

SimpleMessenger will change no matter what, because the core problem that 
we're trying to solve is that it allocates a new buffer when a message is 
being read off the wire.  See SimpleMessenger::read_message().  Somehow 
that needs to discover and use the user provided buffer instead.  That 
will invariably involve a callback of some sort to find the user buffer.  
And some means of yanking the buffer away to deal with the corner issue 
above.

> It will just involve modifying bufferlist
> so that it can use a buffer that's passed to it, rather than
> allocating its own.  Then we need something like a freeze operation
> that will stop it from trying to reallocate or resize the buffer. This
> solution will have much more generality and probably will be useful
> elsewhere too.

That sounds useful, but not in this particular case... :/

sage

* Re: read/write on RADOS using external buffer
  2010-10-20 15:34           ` Sage Weil
@ 2010-10-20 15:46             ` Jim Schutt
  2010-10-20 16:04               ` Sage Weil
  2010-10-20 16:32             ` Colin McCabe
  1 sibling, 1 reply; 18+ messages in thread
From: Jim Schutt @ 2010-10-20 15:46 UTC (permalink / raw)
  To: Sage Weil; +Cc: Colin McCabe, Takuya ASADA, ceph-devel


Hi,

On Wed, 2010-10-20 at 09:34 -0600, Sage Weil wrote:
> 
> > I think I can propose a patch which solves this problem without
> > rados client-specific data, threads, tids, maps, thread-local data, or
> > changes to SimpleMessenger.
> 
> SimpleMessenger will change no matter what, because the core problem
> that we're trying to solve is that it allocates a new buffer when a
> message is being read off the wire.  See SimpleMessenger::read_message().

FWIW, this buffer allocation is also an issue for trying to
implement an RDMA-based messenger. It would be nice to have
a pool of buffers to use that are already registered for RDMA,
to reduce the memory registration overhead.
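A minimal sketch of such a pool follows. It assumes a hypothetical `RegisterFn` hook in place of the real RDMA registration step; with libibverbs that step would be a call such as `ibv_reg_mr()`, which is not shown here. Buffers are allocated and registered once, when the pool is built. The messenger then takes and returns them per message, so the registration cost is not paid on every receive. This fits the case where any free buffer will do for a given message.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical pool of fixed-size buffers, registered once and recycled.
class RegisteredBufferPool {
 public:
  using RegisterFn = std::function<void(char *, size_t)>;

  RegisteredBufferPool(size_t count, size_t buf_size,
                       const RegisterFn &register_fn)
      : buf_size_(buf_size) {
    for (size_t i = 0; i < count; ++i) {
      storage_.emplace_back(new char[buf_size]);
      register_fn(storage_.back().get(), buf_size);  // one-time cost
      free_.push_back(storage_.back().get());
    }
  }

  // Returns a pre-registered buffer, or nullptr if the pool is exhausted;
  // a caller could then fall back to allocating and registering on demand.
  char *take() {
    std::lock_guard<std::mutex> l(lock_);
    if (free_.empty())
      return nullptr;
    char *b = free_.back();
    free_.pop_back();
    return b;
  }

  // Returns a buffer to the pool once the message using it is released.
  void give_back(char *b) {
    assert(b != nullptr);
    std::lock_guard<std::mutex> l(lock_);
    free_.push_back(b);
  }

  size_t buffer_size() const { return buf_size_; }

 private:
  size_t buf_size_;
  std::vector<std::unique_ptr<char[]>> storage_;  // owns the memory
  std::vector<char *> free_;                      // currently unused buffers
  std::mutex lock_;
};
```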

-- Jim




* Re: read/write on RADOS using external buffer
  2010-10-20 15:46             ` Jim Schutt
@ 2010-10-20 16:04               ` Sage Weil
  2010-10-20 16:47                 ` Jim Schutt
  0 siblings, 1 reply; 18+ messages in thread
From: Sage Weil @ 2010-10-20 16:04 UTC (permalink / raw)
  To: Jim Schutt; +Cc: Colin McCabe, Takuya ASADA, ceph-devel

On Wed, 20 Oct 2010, Jim Schutt wrote:
> Hi,
> 
> On Wed, 2010-10-20 at 09:34 -0600, Sage Weil wrote:
> > 
> > > I think I can propose a patch which solves this problem without
> > > rados client-specific data, threads, tids, maps, thread-local data, or
> > > changes to SimpleMessenger.
> > 
> > SimpleMessenger will change no matter what, because the core problem 
> > that we're trying to solve is that it allocates a new buffer when a 
> > message is being read off the wire.  See 
> > SimpleMessenger::read_message().
> 
> FWIW, this buffer allocation is also an issue for trying to
> implement an RDMA-based messenger. It would be nice to have
> a pool of buffers to use that are already registered for RDMA,
> to reduce the memory registration overhead.

That much is easy enough if it doesn't matter which buffer you use for a 
particular message.  If the result of a specific read needs to be read 
into a specific buffer, then something like the mechanism we're talking 
about will be needed.  Do you know if RDMA will require that?

That's what the kernel client does, incidentally, except the callback is 
to find/allocate the whole message struct, not just the data payload 
portion.  We could do that here as well, although the memory reservation 
requirements on the userspace side aren't as critical as in the kernel.

sage

* Re: read/write on RADOS using external buffer
  2010-10-20 15:34           ` Sage Weil
  2010-10-20 15:46             ` Jim Schutt
@ 2010-10-20 16:32             ` Colin McCabe
  2010-10-20 16:46               ` Gregory Farnum
  2010-10-20 16:53               ` Sage Weil
  1 sibling, 2 replies; 18+ messages in thread
From: Colin McCabe @ 2010-10-20 16:32 UTC (permalink / raw)
  To: Sage Weil; +Cc: Takuya ASADA, ceph-devel

On Wed, Oct 20, 2010 at 8:34 AM, Sage Weil <sage@newdream.net> wrote:
> There's no pthread specific data or gettid() syscalls here.  The 'tid' is
> a transaction id generated by the Objecter that matches up replies to
> requests.

Oh, ok. I misunderstood completely then :\

> SimpleMessenger will change no matter what, because the core problem that
> we're trying to solve is that it allocates a new buffer when a message is
> being read off the wire.  See SimpleMessenger::read_message().  Somehow
> that needs to discover and use the user provided buffer instead.  That
> will invariably involve a callback of some sort to find the user buffer.
> And some means of yanking the buffer away to deal with the corner issue
> above.

I can see now that we're always allocating a new bufferlist in
SimpleMessenger. I can see why we want this feature.

I wish we didn't have to have a callback just to get the buffer we're
trying to write into. It feels like that should somehow be grouped
with the rest of the transaction data. Why can't we add it to
Objecter::Op's constructor, for example. There must be some obvious
reason but I don't see it yet.

cheers,
Colin

* Re: read/write on RADOS using external buffer
  2010-10-20 16:32             ` Colin McCabe
@ 2010-10-20 16:46               ` Gregory Farnum
  2010-10-20 16:53               ` Sage Weil
  1 sibling, 0 replies; 18+ messages in thread
From: Gregory Farnum @ 2010-10-20 16:46 UTC (permalink / raw)
  To: Colin McCabe; +Cc: Sage Weil, Takuya ASADA, ceph-devel

On Wed, Oct 20, 2010 at 9:32 AM, Colin McCabe <cmccabe@alumni.cmu.edu> wrote:
> I wish we didn't have to have a callback just to get the buffer we're
> trying to write into. It feels like that should somehow be grouped
> with the rest of the transaction data. Why can't we add it to
> Objecter::Op's constructor, for example. There must be some obvious
> reason but I don't see it yet.
That wouldn't work if we were sending a message that didn't go through
the Objecter, like any messages to the MDS. So getting buffers from
out of the Messenger is going to require a callback in some form, and
about all we can guarantee is that we'll have a tid, since that's part
of the Messenger interface.
We could conceivably add a function pointer to the Message class, and
use it to obtain buffers for responses to that Message. This actually
might work out better since it allows finer-grained decisions about
memory allocation?
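A sketch of Greg's suggestion, using hypothetical types (`BufferProvider`, a stripped-down `Message`, and `ToyMessenger`) rather than Ceph's actual `Message` class or messenger. An outgoing request can carry an optional callback, which the messenger stores by tid. When the reply header arrives, the reader consults that callback to get a destination buffer. Messages that do not go through the Objecter, such as MDS requests, leave the callback empty and get the default allocation.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <vector>

// Hypothetical callback: given the reply's payload length, return memory
// to read into, or nullptr to decline.
using BufferProvider = std::function<char *(uint32_t data_len)>;

// Hypothetical, stripped-down message; not Ceph's Message class.
struct Message {
  uint64_t tid;
  BufferProvider reply_buffer;  // empty means "allocate as usual"
};

// Hypothetical messenger fragment; locking is omitted for brevity.
class ToyMessenger {
 public:
  void send(const Message &m) {
    if (m.reply_buffer)
      providers_[m.tid] = m.reply_buffer;
    // ... actual transmission omitted ...
  }

  // Called by the reader when a reply header arrives. The provider is
  // consulted at most once; a duplicate reply for the same tid falls back
  // to the messenger's own buffer.
  char *buffer_for_reply(uint64_t tid, uint32_t data_len,
                         std::vector<char> &fallback) {
    auto it = providers_.find(tid);
    if (it != providers_.end()) {
      BufferProvider provider = it->second;
      providers_.erase(it);
      if (char *p = provider(data_len))
        return p;
    }
    fallback.resize(data_len);
    return fallback.data();
  }

 private:
  std::map<uint64_t, BufferProvider> providers_;
};
```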
-Greg

* Re: read/write on RADOS using external buffer
  2010-10-20 16:04               ` Sage Weil
@ 2010-10-20 16:47                 ` Jim Schutt
  0 siblings, 0 replies; 18+ messages in thread
From: Jim Schutt @ 2010-10-20 16:47 UTC (permalink / raw)
  To: Sage Weil; +Cc: Colin McCabe, Takuya ASADA, ceph-devel


On Wed, 2010-10-20 at 10:04 -0600, Sage Weil wrote:
> On Wed, 20 Oct 2010, Jim Schutt wrote:
> > Hi,
> > 
> > On Wed, 2010-10-20 at 09:34 -0600, Sage Weil wrote:
> > > 
> > > > I think I can propose a patch which solves this problem without
> > > > rados client-specific data, threads, tids, maps, thread-local data, or
> > > > changes to SimpleMessenger.
> > > 
> > > SimpleMessenger will change no matter what, because the core problem 
> > > that we're trying to solve is that it allocates a new buffer when a 
> > > message is being read off the wire.  See 
> > > SimpleMessenger::read_message().
> > 
> > FWIW, this buffer allocation is also an issue for trying to
> > implement an RDMA-based messenger. It would be nice to have
> > a pool of buffers to use that are already registered for RDMA,
> > to reduce the memory registration overhead.
> 
> That much is easy enough if it doesn't matter which buffer you use for a 
> particular message.  If the result of a specific read needs to be read 
> into a specific buffer, then something like the mechanism we're talking 
> about will be needed.  Do you know if RDMA will require that?

No, I don't think that will be required.

> 
> That's what the kernel client does, incidentally, except the callback is 
> to find/allocate the whole message struct, not just the data payload 
> portion.  We could do that here as well, although the memory reservation 
> requirements on the userspace side aren't as critical as in the kernel.

I think it will be OK to have the data payload buffer separate:
for RDMA we'd need to receive the header information into a buffer
pre-posted on the receive queue.  It will contain a remote key 
we'd use when queuing the RDMA read operation for the payload.

-- Jim

> 
> sage
> 



* Re: read/write on RADOS using external buffer
  2010-10-20 16:32             ` Colin McCabe
  2010-10-20 16:46               ` Gregory Farnum
@ 2010-10-20 16:53               ` Sage Weil
  2010-10-20 17:04                 ` Colin McCabe
  1 sibling, 1 reply; 18+ messages in thread
From: Sage Weil @ 2010-10-20 16:53 UTC (permalink / raw)
  To: Colin McCabe; +Cc: Takuya ASADA, ceph-devel

On Wed, 20 Oct 2010, Colin McCabe wrote:
> > SimpleMessenger will change no matter what, because the core problem that
> > we're trying to solve is that it allocates a new buffer when a message is
> > being read off the wire.  See SimpleMessenger::read_message().  Somehow
> > that needs to discover and use the user provided buffer instead.  That
> > will invariably involve a callback of some sort to find the user buffer.
> > And some means of yanking the buffer away to deal with the corner issue
> > above.
> 
> I can see now that we're always allocating a new bufferlist in
> SimpleMessenger. I can see why we want this feature.
> 
> I wish we didn't have to have a callback just to get the buffer we're
> trying to write into. It feels like that should somehow be grouped
> with the rest of the transaction data. Why can't we add it to
> Objecter::Op's constructor, for example. There must be some obvious
> reason but I don't see it yet.

Well, we could push it into the Objecter.  We could also try to push it 
all the way down into the messenger, by maintaining a map of <type,tid> to 
buffer or something.  Mainly I want to avoid changing too much of the 
intervening interfaces if we can avoid it.  That may be possible, though.  
If the Objecter::read() is given a non-empty bufferlist* to read into, it 
could advertise that buffer to the messenger and the messenger could 
handle it all.

But the same problem comes up: we need to also have an interface to take 
the buffer back, which will mean some interesting interaction with the 
reader threads.

The kernel messenger avoids this, btw, because it is event driven: we wake 
up when there is socket activity and do non-blocking reads/writes, so we 
can lock the buffers while doing that IO.  The SimpleMessenger 
implementation uses threads, and IIRC the pthread locking  
and select/poll interfaces don't play well together.
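For contrast, here is a minimal sketch of the event-driven style, assuming POSIX `poll()` and a non-blocking descriptor. `UserBufferSlot`, `set_nonblocking()` and `read_payload_into()` are hypothetical names, and this is not how the kernel client is written. The slot lock is held only around each individual non-blocking `read()`, never across the `poll()` wait. An owner that sets `revoked` under the lock therefore knows that, once it releases the lock, no reader will touch the buffer again.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <fcntl.h>
#include <mutex>
#include <poll.h>
#include <unistd.h>

// Hypothetical slot describing where a reply payload should land.
struct UserBufferSlot {
  std::mutex lock;
  char *data = nullptr;
  size_t len = 0;
  bool revoked = false;  // set by the owner, under lock, to take it back
};

bool set_nonblocking(int fd) {
  assert(fd >= 0);
  int flags = fcntl(fd, F_GETFL, 0);
  return flags >= 0 && fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
}

// Reads `want` bytes from a non-blocking fd into the slot's buffer.
// The lock is held only around each read(), never while waiting in poll().
// Returns the byte count, or -1 on timeout, error, EOF, or revocation.
ssize_t read_payload_into(int fd, UserBufferSlot &slot, size_t want) {
  size_t got = 0;
  while (got < want) {
    struct pollfd pfd = {fd, POLLIN, 0};
    if (poll(&pfd, 1, 1000 /* ms */) <= 0)
      return -1;
    std::lock_guard<std::mutex> l(slot.lock);
    if (slot.revoked || want > slot.len)
      return -1;
    ssize_t r = read(fd, slot.data + got, want - got);
    if (r > 0)
      got += static_cast<size_t>(r);
    else if (r == 0)
      return -1;  // peer closed the connection
    else if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
      return -1;
  }
  return static_cast<ssize_t>(got);
}
```

After a revocation mid-payload, a real messenger would also need to drain the remaining bytes into its own buffer to keep the stream in sync. The sketch omits that step.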

sage
 

* Re: read/write on RADOS using external buffer
  2010-10-20 16:53               ` Sage Weil
@ 2010-10-20 17:04                 ` Colin McCabe
  2010-11-15 18:07                   ` Sage Weil
  0 siblings, 1 reply; 18+ messages in thread
From: Colin McCabe @ 2010-10-20 17:04 UTC (permalink / raw)
  To: Sage Weil; +Cc: Takuya ASADA, ceph-devel

On Wed, Oct 20, 2010 at 9:53 AM, Sage Weil <sage@newdream.net> wrote:
> On Wed, 20 Oct 2010, Colin McCabe wrote:
>> > SimpleMessenger will change no matter what, because the core problem that
>> > we're trying to solve is that it allocates a new buffer when a message is
>> > being read off the wire.  See SimpleMessenger::read_message().  Somehow
>> > that needs to discover and use the user provided buffer instead.  That
>> > will invariably involve a callback of some sort to find the user buffer.
>> > And some means of yanking the buffer away to deal with the corner issue
>> > above.
>>
>> I can see now that we're always allocating a new bufferlist in
>> SimpleMessenger. I can see why we want this feature.
>>
>> I wish we didn't have to have a callback just to get the buffer we're
>> trying to write into. It feels like that should somehow be grouped
>> with the rest of the transaction data. Why can't we add it to
>> Objecter::Op's constructor, for example. There must be some obvious
>> reason but I don't see it yet.
>
> Well, we could push it into the Objecter.  We could also try to push it
> all the way down into the messenger, my maintaining a map of <type,tid> to
> buffer or something.  Mainly I want to avoid changing too much of the
> intervening interfaces if we can avoid it.  That may be possible, though.
> If the Objecter::read() is given a non-empty bufferlist* to read into, it
> could advertise that buffer to the messenger and the messenger could
> handle it all.

I kind of like the idea of always having the Objecter pass in a
bufferlist. If the user wants to allocate a new buffer each time, he
knows how to call bufferlist::new().

> But the same problem comes up: we need to also have an interface to take
> the buffer back, which will mean some interesting interaction with the
> reader threads.

Yeah, we would need to know when it's safe for the client to read /
reuse the buffer they passed in.

Colin

> The kernel messenger avoids this, btw, because it is event driven: we wake
> up when there is socket activity and do non-blocking reads/writes, so we
> can lock the buffers while doing that IO.  The SimpleMessenger
> implementation uses threads, and IIRC the pthread locking
> and select/poll interfaces don't play well together.
>
> sage
>


* Re: read/write on RADOS using external buffer
  2010-10-20 17:04                 ` Colin McCabe
@ 2010-11-15 18:07                   ` Sage Weil
  0 siblings, 0 replies; 18+ messages in thread
From: Sage Weil @ 2010-11-15 18:07 UTC (permalink / raw)
  To: Takuya ASADA; +Cc: ceph-devel

I got around to implementing this and merged it into the unstable branch.  
See commit 7f38858c0c19db36c5ecf36cb4d333579981c811.  From my limited 
testing it seems to work fine.  For now it is only triggered via the
librados C interface (currently the only place where there is a
user-provided buffer).

Eventually libceph/Client.h should be updated to do the same, but it's a 
bit more involved.

sage


end of thread

Thread overview: 18+ messages
2010-09-13 11:57 read/write on RADOS using external buffer Takuya ASADA
2010-09-14 18:44 ` Sage Weil
2010-09-14 21:03   ` Takuya ASADA
2010-09-14 21:10     ` Sage Weil
2010-10-19 22:27   ` Takuya ASADA
2010-10-20  3:35     ` Colin McCabe
2010-10-20  4:26       ` Sage Weil
2010-10-20  5:55         ` Colin McCabe
2010-10-20 15:34           ` Sage Weil
2010-10-20 15:46             ` Jim Schutt
2010-10-20 16:04               ` Sage Weil
2010-10-20 16:47                 ` Jim Schutt
2010-10-20 16:32             ` Colin McCabe
2010-10-20 16:46               ` Gregory Farnum
2010-10-20 16:53               ` Sage Weil
2010-10-20 17:04                 ` Colin McCabe
2010-11-15 18:07                   ` Sage Weil
2010-10-20  4:10     ` Gregory Farnum
