All of lore.kernel.org
 help / color / mirror / Atom feed
From: majianpeng <majianpeng@gmail.com>
To: sage <sage@inktank.com>
Cc: "Yan, Zheng" <zheng.z.yan@intel.com>,
	ceph-devel <ceph-devel@vger.kernel.org>,
	linux-fsdevel <linux-fsdevel@vger.kernel.org>
Subject: [PATCH 2/2] ceph: Implement writev/pwritev for sync operation.
Date: Tue, 3 Sep 2013 16:52:14 +0800	[thread overview]
Message-ID: <201309031652122920661@gmail.com> (raw)

For a writev/pwritev sync operation, ceph only handled the first iov
and ignored the remaining iovs. Now implement support for all iovs.
I divided the sync write operation into two functions: one for
direct write, the other for non-direct sync write. This is because for
a non-direct sync write we can merge the iovs into one buffer, but for
direct write we cannot merge iovs.

Signed-off-by: Jianpeng Ma <majianpeng@gmail.com>
---
 fs/ceph/file.c | 328 +++++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 248 insertions(+), 80 deletions(-)

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 7d6a3ee..42c97b3 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -533,17 +533,19 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
 	}
 }
 
+
 /*
- * Synchronous write, straight from __user pointer or user pages (if
- * O_DIRECT).
+ * Synchronous write, straight from __user pointer or user pages.
  *
  * If write spans object boundary, just do multiple writes.  (For a
  * correct atomic write, we should e.g. take write locks on all
  * objects, rollback on failure, etc.)
  */
-static ssize_t ceph_sync_write(struct file *file, const char __user *data,
-			       size_t left, loff_t pos, loff_t *ppos)
+static ssize_t
+ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
+		       unsigned long nr_segs, size_t count)
 {
+	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
@@ -557,59 +559,55 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
 	int written = 0;
 	int flags;
 	int check_caps = 0;
-	int page_align, io_align;
-	unsigned long buf_align;
-	int ret;
+	int page_align;
+	int ret, i;
 	struct timespec mtime = CURRENT_TIME;
-	bool own_pages = false;
+	loff_t pos = iocb->ki_pos;
 
 	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
 		return -EROFS;
 
-	dout("sync_write on file %p %lld~%u %s\n", file, pos,
-	     (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
+	dout("sync_direct_write on file %p %lld~%u\n", file, pos,
+	     (unsigned)count);
 
-	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
+	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
 	if (ret < 0)
 		return ret;
 
 	ret = invalidate_inode_pages2_range(inode->i_mapping,
 					    pos >> PAGE_CACHE_SHIFT,
-					    (pos + left) >> PAGE_CACHE_SHIFT);
+					    (pos + count) >> PAGE_CACHE_SHIFT);
 	if (ret < 0)
 		dout("invalidate_inode_pages2_range returned %d\n", ret);
 
 	flags = CEPH_OSD_FLAG_ORDERSNAP |
 		CEPH_OSD_FLAG_ONDISK |
 		CEPH_OSD_FLAG_WRITE;
-	if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
-		flags |= CEPH_OSD_FLAG_ACK;
-	else
-		num_ops++;	/* Also include a 'startsync' command. */
+	num_ops++;	/* Also include a 'startsync' command. */
 
-	/*
-	 * we may need to do multiple writes here if we span an object
-	 * boundary.  this isn't atomic, unfortunately.  :(
-	 */
-more:
-	io_align = pos & ~PAGE_MASK;
-	buf_align = (unsigned long)data & ~PAGE_MASK;
-	len = left;
+	for (i = 0; i < nr_segs && count; i++) {
+		void __user *data = iov[i].iov_base;
+		size_t left;
 
-	snapc = ci->i_snap_realm->cached_context;
-	vino = ceph_vino(inode);
-	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-				    vino, pos, &len, num_ops,
-				    CEPH_OSD_OP_WRITE, flags, snapc,
-				    ci->i_truncate_seq, ci->i_truncate_size,
-				    false);
-	if (IS_ERR(req))
-		return PTR_ERR(req);
+		left = min(count, iov[i].iov_len);
+more:
+		page_align = (unsigned long)data & ~PAGE_MASK;
+		len = left;
+
+		snapc = ci->i_snap_realm->cached_context;
+		vino = ceph_vino(inode);
+		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+					    vino, pos, &len, num_ops,
+					    CEPH_OSD_OP_WRITE, flags, snapc,
+					    ci->i_truncate_seq,
+					    ci->i_truncate_size,
+					    false);
+		if (IS_ERR(req)) {
+			ret = PTR_ERR(req);
+			goto out;
+		}
 
-	/* write from beginning of first page, regardless of io alignment */
-	page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
-	num_pages = calc_pages_for(page_align, len);
-	if (file->f_flags & O_DIRECT) {
+		num_pages = calc_pages_for(page_align, len);
 		pages = ceph_get_direct_page_vector(data, num_pages, false);
 		if (IS_ERR(pages)) {
 			ret = PTR_ERR(pages);
@@ -621,61 +619,229 @@ more:
 		 * may block.
 		 */
 		truncate_inode_pages_range(inode->i_mapping, pos,
-					   (pos+len) | (PAGE_CACHE_SIZE-1));
-	} else {
+				   (pos+len) | (PAGE_CACHE_SIZE-1));
+		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
+						false, false);
+
+		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
+		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
+
+		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+		if (!ret)
+			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+
+		ceph_put_page_vector(pages, num_pages, false);
+
+out:
+		ceph_osdc_put_request(req);
+		if (ret == 0) {
+			pos += len;
+			written += len;
+			left -= len;
+			count -= len;
+			data += len;
+			if (left)
+				goto more;
+
+			ret = written;
+			if (pos > i_size_read(inode))
+				check_caps = ceph_inode_set_size(inode, pos);
+				if (check_caps)
+					ceph_check_caps(ceph_inode(inode),
+							CHECK_CAPS_AUTHONLY,
+							NULL);
+		} else {
+			if (ret != -EOLDSNAPC && written > 0)
+				ret = written;
+			break;
+		}
+	}
+
+	if (ret > 0)
+		iocb->ki_pos = pos;
+	return ret;
+}
+
+
+/*
+ * Synchronous write, straight from __user pointer or user pages.
+ *
+ * If write spans object boundary, just do multiple writes.  (For a
+ * correct atomic write, we should e.g. take write locks on all
+ * objects, rollback on failure, etc.)
+ */
+static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
+			       unsigned long nr_segs, size_t count)
+{
+	struct file *file = iocb->ki_filp;
+	struct inode *inode = file_inode(file);
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_snap_context *snapc;
+	struct ceph_vino vino;
+	struct ceph_osd_request *req;
+	int num_ops = 1;
+	struct page **pages;
+	int num_pages;
+	u64 len;
+	int written = 0;
+	int flags;
+	int check_caps = 0;
+	int ret, i;
+	struct timespec mtime = CURRENT_TIME;
+	loff_t pos = iocb->ki_pos;
+	struct iovec *iov_clone;
+
+	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
+		return -EROFS;
+
+	dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
+
+	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
+	if (ret < 0)
+		return ret;
+
+	ret = invalidate_inode_pages2_range(inode->i_mapping,
+					    pos >> PAGE_CACHE_SHIFT,
+					    (pos + count) >> PAGE_CACHE_SHIFT);
+	if (ret < 0)
+		dout("invalidate_inode_pages2_range returned %d\n", ret);
+
+	flags = CEPH_OSD_FLAG_ORDERSNAP |
+		CEPH_OSD_FLAG_ONDISK |
+		CEPH_OSD_FLAG_WRITE |
+		CEPH_OSD_FLAG_ACK;
+
+	iov_clone = kmalloc(nr_segs * sizeof(struct iovec), GFP_KERNEL);
+	if (iov_clone == NULL)
+		return -ENOMEM;
+	memcpy(iov_clone, iov, nr_segs * sizeof(struct iovec));
+
+	for (i = 0; i < nr_segs && count; i++) {
+		void __user *data;
+		size_t left;
+
+		left = count;
+more:
+		len = left;
+
+		snapc = ci->i_snap_realm->cached_context;
+		vino = ceph_vino(inode);
+		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+					    vino, pos, &len, num_ops,
+					    CEPH_OSD_OP_WRITE, flags, snapc,
+					    ci->i_truncate_seq,
+					    ci->i_truncate_size,
+					    false);
+		if (IS_ERR(req)) {
+			ret = PTR_ERR(req);
+			goto out;
+		}
+
+		/*
+		 * write from beginning of first page,
+		 * regardless of io alignment
+		 */
+		num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
+
 		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
 		if (IS_ERR(pages)) {
 			ret = PTR_ERR(pages);
 			goto out;
 		}
-		ret = ceph_copy_user_to_page_vector(pages, data, pos, len);
+
+		if (len <= iov_clone[i].iov_len) {
+			data = iov_clone[i].iov_base;
+			ret = ceph_copy_user_to_page_vector(pages,
+								data, 0, len);
+			if (ret > 0) {
+				iov_clone[i].iov_base += ret;
+				iov_clone[i].iov_len -= ret;
+			}
+		} else {
+			int j, l, k = 0, copyed = 0;
+			size_t tmp = len;
+
+			for (j = i; j < nr_segs && tmp; j++) {
+				data = iov_clone[j].iov_base;
+				l = iov_clone[j].iov_len;
+
+				if (tmp < l) {
+					ret = ceph_copy_user_to_page_vector(&pages[k],
+									    data,
+									    copyed,
+									    tmp);
+					iov_clone[j].iov_len -= ret;
+					iov_clone[j].iov_base += ret;
+					break;
+				} else if (l) {
+					ret = ceph_copy_user_to_page_vector(&pages[k],
+									    data,
+									    copyed,
+									    l);
+					if (ret < 0)
+						break;
+					iov_clone[j].iov_len = 0;
+					copyed += ret;
+					tmp -= ret;
+					k = calc_pages_for(0, copyed + 1) - 1;
+				}
+			}
+
+			/*
+			 * For this case,it will call for action.i will add one
+			 * But iov_clone[j].iov_len maybe not zero.
+			 */
+			if (left == len)
+				i = j - 1;
+		}
+
 		if (ret < 0) {
 			ceph_release_page_vector(pages, num_pages);
 			goto out;
 		}
 
-		if ((file->f_flags & O_SYNC) == 0) {
-			/* get a second commit callback */
-			req->r_unsafe_callback = ceph_sync_write_unsafe;
-			req->r_inode = inode;
-			own_pages = true;
-		}
-	}
-	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
-					false, own_pages);
+		/* get a second commit callback */
+		req->r_unsafe_callback = ceph_sync_write_unsafe;
+		req->r_inode = inode;
 
-	/* BUG_ON(vino.snap != CEPH_NOSNAP); */
-	ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
+		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
+						false, true);
 
-	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
-	if (!ret)
-		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
+		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
 
-	if (file->f_flags & O_DIRECT)
-		ceph_put_page_vector(pages, num_pages, false);
-	else if (file->f_flags & O_SYNC)
-		ceph_release_page_vector(pages, num_pages);
+		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+		if (!ret)
+			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
 out:
-	ceph_osdc_put_request(req);
-	if (ret == 0) {
-		pos += len;
-		written += len;
-		left -= len;
-		data += len;
-		if (left)
-			goto more;
-
-		ret = written;
-		*ppos = pos;
-		if (pos > i_size_read(inode))
-			check_caps = ceph_inode_set_size(inode, pos);
-		if (check_caps)
-			ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
-					NULL);
-	} else if (ret != -EOLDSNAPC && written > 0) {
-		ret = written;
+		ceph_osdc_put_request(req);
+		if (ret == 0) {
+			pos += len;
+			written += len;
+			left -= len;
+			count -= len;
+			if (left)
+				goto more;
+
+			ret = written;
+			if (pos > i_size_read(inode))
+				check_caps = ceph_inode_set_size(inode, pos);
+				if (check_caps)
+					ceph_check_caps(ceph_inode(inode),
+							CHECK_CAPS_AUTHONLY,
+							NULL);
+		} else {
+			if (ret != -EOLDSNAPC && written > 0)
+				ret = written;
+			break;
+		}
 	}
+
+	if (ret > 0)
+		iocb->ki_pos = pos;
+	kfree(iov_clone);
 	return ret;
 }
 
@@ -843,11 +1009,13 @@ retry_snap:
 	     inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
 
 	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
-	    (iocb->ki_filp->f_flags & O_DIRECT) ||
-	    (fi->flags & CEPH_F_SYNC)) {
+	    (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
 		mutex_unlock(&inode->i_mutex);
-		written = ceph_sync_write(file, iov->iov_base, count,
-					  pos, &iocb->ki_pos);
+		if (file->f_flags & O_DIRECT)
+			written = ceph_sync_direct_write(iocb, iov,
+							 nr_segs, count);
+		else
+			written = ceph_sync_write(iocb, iov, nr_segs, count);
 		if (written == -EOLDSNAPC) {
 			dout("aio_write %p %llx.%llx %llu~%u"
 				"got EOLDSNAPC, retrying\n",
-- 
1.8.1.2

             reply	other threads:[~2013-09-03  8:52 UTC|newest]

Thread overview: 5+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2013-09-03  8:52 majianpeng [this message]
2013-09-04 13:17 ` [PATCH 2/2] ceph: Implement writev/pwritev for sync operation Yan, Zheng
2013-09-04 13:20 ` Yan, Zheng
2013-09-06  0:46   ` majianpeng
2013-09-06  1:09     ` Yan, Zheng

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=201309031652122920661@gmail.com \
    --to=majianpeng@gmail.com \
    --cc=ceph-devel@vger.kernel.org \
    --cc=linux-fsdevel@vger.kernel.org \
    --cc=sage@inktank.com \
    --cc=zheng.z.yan@intel.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.