From mboxrd@z Thu Jan 1 00:00:00 1970 From: Alexander Aring Date: Fri, 26 Mar 2021 13:33:32 -0400 Subject: [Cluster-devel] [PATCHv3 dlm/next 3/8] fs: dlm: make buffer handling per msg In-Reply-To: <20210326173337.44231-1-aahringo@redhat.com> References: <20210326173337.44231-1-aahringo@redhat.com> Message-ID: <20210326173337.44231-4-aahringo@redhat.com> List-Id: To: cluster-devel.redhat.com MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit This patch makes the void pointer handle for lowcomms functionality per message instead of per page allocation entry. Reference counting was added for the handle to keep the message alive until the user no longer needs it. There is now a per-message callback which is called when a new buffer is allocated. This callback is guaranteed to be called in the order of the send buffers, which the caller can use, for example, to increment a sequence number. Signed-off-by: Alexander Aring --- fs/dlm/lowcomms.c | 100 +++++++++++++++++++++++++++++++++++++++++----- fs/dlm/lowcomms.h | 5 ++- fs/dlm/midcomms.c | 8 +++- 3 files changed, 101 insertions(+), 12 deletions(-) diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 73cc1809050a..ba782ea84281 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -114,6 +114,17 @@ struct writequeue_entry { int end; int users; struct connection *con; + struct list_head msgs; + struct kref ref; +}; + +struct dlm_msg { + struct writequeue_entry *entry; + void *ppc; + int len; + + struct list_head list; + struct kref ref; }; struct dlm_node_addr { @@ -976,12 +987,36 @@ static int accept_from_sock(struct listen_connection *con) return result; } -static void free_entry(struct writequeue_entry *e) +static void dlm_page_release(struct kref *kref) { + struct writequeue_entry *e = container_of(kref, struct writequeue_entry, + ref); + __free_page(e->page); kfree(e); } +static void dlm_msg_release(struct kref *kref) +{ + struct dlm_msg *msg = 
container_of(kref, struct dlm_msg, ref); + + kref_put(&msg->entry->ref, dlm_page_release); + kfree(msg); +} + +static void free_entry(struct writequeue_entry *e) +{ + struct dlm_msg *msg, *tmp; + + list_for_each_entry_safe(msg, tmp, &e->msgs, list) { + list_del(&msg->list); + kref_put(&msg->ref, dlm_msg_release); + } + + list_del(&e->list); + kref_put(&e->ref, dlm_page_release); +} + /* * writequeue_entry_complete - try to delete and free write queue entry * @e: write queue entry to try to delete @@ -994,10 +1029,8 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed) e->offset += completed; e->len -= completed; - if (e->len == 0 && e->users == 0) { - list_del(&e->list); + if (e->len == 0 && e->users == 0) free_entry(e); - } } /* @@ -1363,12 +1396,16 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con, entry->con = con; entry->users = 1; + kref_init(&entry->ref); + INIT_LIST_HEAD(&entry->msgs); return entry; } static struct writequeue_entry *new_wq_entry(struct connection *con, int len, - gfp_t allocation, char **ppc) + gfp_t allocation, char **ppc, + void (*cb)(void *buf, void *priv), + void *priv) { struct writequeue_entry *e; @@ -1376,7 +1413,12 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len, if (!list_empty(&con->writequeue)) { e = list_last_entry(&con->writequeue, struct writequeue_entry, list); if (DLM_WQ_REMAIN_BYTES(e) >= len) { + kref_get(&e->ref); + *ppc = page_address(e->page) + e->end; + if (cb) + cb(*ppc, priv); + e->end += len; e->users++; spin_unlock(&con->writequeue_lock); @@ -1390,19 +1432,26 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len, if (!e) return NULL; + kref_get(&e->ref); *ppc = page_address(e->page); e->end += len; spin_lock(&con->writequeue_lock); + if (cb) + cb(*ppc, priv); + list_add_tail(&e->list, &con->writequeue); spin_unlock(&con->writequeue_lock); return e; }; -void *dlm_lowcomms_get_buffer(int nodeid, int 
len, gfp_t allocation, char **ppc) +void *dlm_lowcomms_new_buffer(int nodeid, int len, gfp_t allocation, char **ppc, + void (*cb)(void *buf, void *priv), void *priv) { + struct writequeue_entry *e; struct connection *con; + struct dlm_msg *msg; if (len > DEFAULT_BUFFER_SIZE || len < sizeof(struct dlm_header)) { @@ -1416,16 +1465,36 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) if (!con) return NULL; - return new_wq_entry(con, len, allocation, ppc); + msg = kzalloc(sizeof(*msg), allocation); + if (!msg) + return NULL; + + kref_init(&msg->ref); + + e = new_wq_entry(con, len, allocation, ppc, cb, priv); + if (!e) { + kfree(msg); + return NULL; + } + + msg->ppc = *ppc; + msg->len = len; + msg->entry = e; + + return msg; } void dlm_lowcomms_commit_buffer(void *mh) { - struct writequeue_entry *e = (struct writequeue_entry *)mh; + struct dlm_msg *msg = mh; + struct writequeue_entry *e = msg->entry; struct connection *con = e->con; int users; spin_lock(&con->writequeue_lock); + list_add(&msg->list, &e->msgs); + kref_get(&msg->ref); + users = --e->users; if (users) goto out; @@ -1441,6 +1510,20 @@ void dlm_lowcomms_commit_buffer(void *mh) return; } +void dlm_lowcomms_put_buffer(void *mh) +{ + struct dlm_msg *msg = mh; + + kref_put(&msg->ref, dlm_msg_release); +} + +void dlm_lowcomms_get_buffer(void *mh) +{ + struct dlm_msg *msg = mh; + + kref_get(&msg->ref); +} + /* Send a message */ static void send_to_sock(struct connection *con) { @@ -1519,7 +1602,6 @@ static void clean_one_writequeue(struct connection *con) spin_lock(&con->writequeue_lock); list_for_each_entry_safe(e, safe, &con->writequeue, list) { - list_del(&e->list); free_entry(e); } spin_unlock(&con->writequeue_lock); diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h index 48bbc4e18761..fa735497dad8 100644 --- a/fs/dlm/lowcomms.h +++ b/fs/dlm/lowcomms.h @@ -22,11 +22,14 @@ void dlm_lowcomms_shutdown(void); void dlm_lowcomms_stop(void); void dlm_lowcomms_exit(void); int 
dlm_lowcomms_close(int nodeid); -void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc); +void *dlm_lowcomms_new_buffer(int nodeid, int len, gfp_t allocation, char **ppc, + void (*cb)(void *buf, void *priv), void *priv); void dlm_lowcomms_commit_buffer(void *mh); int dlm_lowcomms_connect_node(int nodeid); int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark); int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); +void dlm_lowcomms_put_buffer(void *mh); +void dlm_lowcomms_get_buffer(void *mh); #endif /* __LOWCOMMS_DOT_H__ */ diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index bbcb242e6101..2ea0449a82ab 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -30,23 +30,27 @@ void *dlm_midcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) { - return dlm_lowcomms_get_buffer(nodeid, len, allocation, ppc); + return dlm_lowcomms_new_buffer(nodeid, len, allocation, ppc, NULL, + NULL); } void dlm_midcomms_commit_buffer(void *mh) { dlm_lowcomms_commit_buffer(mh); + dlm_lowcomms_put_buffer(mh); } void *dlm_midcomms_stateless_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) { - return dlm_lowcomms_get_buffer(nodeid, len, allocation, ppc); + return dlm_lowcomms_new_buffer(nodeid, len, allocation, ppc, NULL, + NULL); } void dlm_midcomms_stateless_commit_buffer(void *mh) { dlm_lowcomms_commit_buffer(mh); + dlm_lowcomms_put_buffer(mh); } void midcomms_add_member(int nodeid) -- 2.26.3