From: Jack Wang <jinpu.wang@profitbricks.com>
To: linux-block@vger.kernel.org, linux-rdma@vger.kernel.org
Cc: dledford@redhat.com, axboe@kernel.dk, hch@lst.de,
	mail@fholler.de, Milind.dumbare@gmail.com,
	yun.wang@profitbricks.com,
	Jack Wang <jinpu.wang@profitbricks.com>,
	Kleber Souza <kleber.souza@profitbricks.com>,
	Danil Kipnis <danil.kipnis@profitbricks.com>,
	Roman Pen <roman.penyaev@profitbricks.com>
Subject: [PATCH 10/28] ibtrs_srv: add main functionality for ibtrs_server
Date: Fri, 24 Mar 2017 11:45:25 +0100	[thread overview]
Message-ID: <1490352343-20075-11-git-send-email-jinpu.wangl@profitbricks.com> (raw)
In-Reply-To: <1490352343-20075-1-git-send-email-jinpu.wangl@profitbricks.com>

From: Jack Wang <jinpu.wang@profitbricks.com>

The service accepts connection requests from clients and reserves
memory for them.

It executes RDMA transfers and hands received data over to ibnbd_server.
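
The coupling to the user module (ibnbd_server) is callback based; a
rough sketch, with my_* as placeholder names (the mandatory ops are
the ones checked by srv_ops_are_valid() below):

	static const struct ibtrs_srv_ops my_ops = {
		.sess_ev = my_sess_ev,	/* session state changes */
		.rdma_ev = my_rdma_ev,	/* incoming I/O requests */
		.recv	 = my_recv,	/* user messages */
	};

	/* once the I/O handed over via .rdma_ev completes, the user
	 * module answers with:
	 */
	err = ibtrs_srv_resp_rdma(id, status);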

Signed-off-by: Jack Wang <jinpu.wang@profitbricks.com>
Signed-off-by: Kleber Souza <kleber.souza@profitbricks.com>
Signed-off-by: Danil Kipnis <danil.kipnis@profitbricks.com>
Signed-off-by: Roman Pen <roman.penyaev@profitbricks.com>
---
 drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c | 3744 +++++++++++++++++++++++
 1 file changed, 3744 insertions(+)
 create mode 100644 drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c

diff --git a/drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c b/drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c
new file mode 100644
index 0000000..513e90a
--- /dev/null
+++ b/drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c
@@ -0,0 +1,3744 @@
+/*
+ * InfiniBand Transport Layer
+ *
+ * Copyright (c) 2014 - 2017 ProfitBricks GmbH. All rights reserved.
+ * Authors: Fabian Holler <mail@fholler.de>
+ *          Jack Wang <jinpu.wang@profitbricks.com>
+ *          Kleber Souza <kleber.souza@profitbricks.com>
+ *          Danil Kipnis <danil.kipnis@profitbricks.com>
+ *          Roman Pen <roman.penyaev@profitbricks.com>
+ *          Milind Dumbare <Milind.dumbare@gmail.com>
+ *
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions, and the following disclaimer,
+ *    without modification.
+ * 2. Redistributions in binary form must reproduce at minimum a disclaimer
+ *    substantially similar to the "NO WARRANTY" disclaimer below
+ *    ("Disclaimer") and any redistribution must be conditioned upon
+ *    including a substantially similar Disclaimer requirement for further
+ *    binary redistribution.
+ * 3. Neither the names of the above-listed copyright holders nor the names
+ *    of any contributors may be used to endorse or promote products derived
+ *    from this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * NO WARRANTY
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * HOLDERS OR CONTRIBUTORS BE LIABLE FOR SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+ * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGES.
+ *
+ */
+
+#include <linux/module.h>
+#include <linux/sizes.h>
+#include <linux/utsname.h>
+#include <linux/cpumask.h>
+#include <linux/debugfs.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <rdma/ib.h>
+
+#include <rdma/ibtrs_srv.h>
+#include "ibtrs_srv_sysfs.h"
+#include "ibtrs_srv_internal.h"
+#include <rdma/ibtrs.h>
+#include <rdma/ibtrs_log.h>
+
+MODULE_AUTHOR("ibnbd@profitbricks.com");
+MODULE_DESCRIPTION("InfiniBand Transport Server");
+MODULE_VERSION(__stringify(IBTRS_VER));
+MODULE_LICENSE("GPL");
+
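+/*
+ * Each receive buffer must fit the largest I/O payload (max_io_size)
+ * plus one inline request message (MAX_REQ_SIZE), see rcv_buf_size below.
+ */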
+#define DEFAULT_MAX_IO_SIZE_KB 128
+#define DEFAULT_MAX_IO_SIZE (DEFAULT_MAX_IO_SIZE_KB * 1024)
+static int max_io_size = DEFAULT_MAX_IO_SIZE;
+#define MAX_REQ_SIZE PAGE_SIZE
+static int rcv_buf_size = DEFAULT_MAX_IO_SIZE + MAX_REQ_SIZE;
+
+static int max_io_size_set(const char *val, const struct kernel_param *kp)
+{
+	int err, ival;
+
+	err = kstrtoint(val, 0, &ival);
+	if (err)
+		return err;
+
+	if (ival < 4096 || ival + MAX_REQ_SIZE > (4096 * 1024) ||
+	    (ival + MAX_REQ_SIZE) % 512 != 0) {
+		ERR_NP("Invalid max io size value %d, has to be"
+		       " > %d, < %d\n", ival, 4096, 4194304);
+		return -EINVAL;
+	}
+
+	max_io_size = ival;
+	rcv_buf_size = max_io_size + MAX_REQ_SIZE;
+	INFO_NP("max io size changed to %d\n", ival);
+
+	return 0;
+}
+
+static const struct kernel_param_ops max_io_size_ops = {
+	.set		= max_io_size_set,
+	.get		= param_get_int,
+};
+module_param_cb(max_io_size, &max_io_size_ops, &max_io_size, 0444);
+MODULE_PARM_DESC(max_io_size,
+		 "Max size of each IO request in bytes"
+		 " (default: " __stringify(DEFAULT_MAX_IO_SIZE_KB) "KB)");
+
+#define DEFAULT_SESS_QUEUE_DEPTH 512
+static int sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
+module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
+MODULE_PARM_DESC(sess_queue_depth,
+		 "Number of buffers for pending I/O requests to allocate"
+		 " per session. Maximum: " __stringify(MAX_SESS_QUEUE_DEPTH)
+		 " (default: " __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
+
+#define DEFAULT_INIT_POOL_SIZE 10
+static int init_pool_size = DEFAULT_INIT_POOL_SIZE;
+module_param_named(init_pool_size, init_pool_size, int, 0444);
+MODULE_PARM_DESC(init_pool_size,
+		 "Maximum size of the RDMA buffers pool to pre-allocate on"
+		 " module load, in number of sessions. (default: "
+		 __stringify(DEFAULT_INIT_POOL_SIZE) ")");
+
+#define DEFAULT_POOL_SIZE_HI_WM 100
+static int pool_size_hi_wm = DEFAULT_POOL_SIZE_HI_WM;
+module_param_named(pool_size_hi_wm, pool_size_hi_wm, int, 0444);
+MODULE_PARM_DESC(pool_size_hi_wm,
+		 "High watermark value for the size of RDMA buffers pool"
+		 " (in number of sessions). Newly allocated buffers will be"
+		 " added to the pool until pool_size_hi_wm is reached."
+		 " (default: " __stringify(DEFAULT_POOL_SIZE_HI_WM) ")");
+
+static int retry_count = 7;
+
+static int retry_count_set(const char *val, const struct kernel_param *kp)
+{
+	int err, ival;
+
+	err = kstrtoint(val, 0, &ival);
+	if (err)
+		return err;
+
+	if (ival < MIN_RTR_CNT || ival > MAX_RTR_CNT) {
+		ERR_NP("Invalid retry count value %d, has to be"
+		       " > %d, < %d\n", ival, MIN_RTR_CNT, MAX_RTR_CNT);
+		return -EINVAL;
+	}
+
+	retry_count = ival;
+	INFO_NP("QP retry count changed to %d\n", ival);
+
+	return 0;
+}
+
+static const struct kernel_param_ops retry_count_ops = {
+	.set		= retry_count_set,
+	.get		= param_get_int,
+};
+module_param_cb(retry_count, &retry_count_ops, &retry_count, 0644);
+
+MODULE_PARM_DESC(retry_count, "Number of times to send the message if the"
+		 " remote side didn't respond with Ack or Nack (default: 7,"
+		 " min: " __stringify(MIN_RTR_CNT) ", max: "
+		 __stringify(MAX_RTR_CNT) ")");
+
+static int default_heartbeat_timeout_ms = DEFAULT_HEARTBEAT_TIMEOUT_MS;
+
+static int default_heartbeat_timeout_set(const char *val,
+					 const struct kernel_param *kp)
+{
+	int ret, ival;
+
+	ret = kstrtoint(val, 0, &ival);
+	if (ret)
+		return ret;
+
+	ret = ibtrs_heartbeat_timeout_validate(ival);
+	if (ret)
+		return ret;
+
+	default_heartbeat_timeout_ms = ival;
+	INFO_NP("Default heartbeat timeout changed to %d\n", ival);
+
+	return 0;
+}
+
+static const struct kernel_param_ops heartbeat_timeout_ops = {
+	.set		= default_heartbeat_timeout_set,
+	.get		= param_get_int,
+};
+
+module_param_cb(default_heartbeat_timeout_ms, &heartbeat_timeout_ops,
+		&default_heartbeat_timeout_ms, 0644);
+MODULE_PARM_DESC(default_heartbeat_timeout_ms, "Default heartbeat timeout in"
+		 " ms, min: " __stringify(MIN_HEARTBEAT_TIMEOUT_MS)
+		 " (default: " __stringify(DEFAULT_HEARTBEAT_TIMEOUT_MS) ")");
+
+static char cq_affinity_list[256] = "";
+static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };
+
+static void init_cq_affinity(void)
+{
+	sprintf(cq_affinity_list, "0-%d", nr_cpu_ids - 1);
+}
+
+static int cq_affinity_list_set(const char *val, const struct kernel_param *kp)
+{
+	int ret = 0, len = strlen(val);
+	cpumask_var_t new_value;
+
+	if (!strlen(cq_affinity_list))
+		init_cq_affinity();
+
+	if (len >= sizeof(cq_affinity_list))
+		return -EINVAL;
+	if (!alloc_cpumask_var(&new_value, GFP_KERNEL))
+		return -ENOMEM;
+
+	ret = cpulist_parse(val, new_value);
+	if (ret) {
+		ERR_NP("Can't set cq_affinity_list \"%s\": %d\n", val, ret);
+		goto free_cpumask;
+	}
+
+	strlcpy(cq_affinity_list, val, sizeof(cq_affinity_list));
+	*strchrnul(cq_affinity_list, '\n') = '\0';
+	cpumask_copy(&cq_affinity_mask, new_value);
+
+	INFO_NP("cq_affinity_list changed to %*pbl\n",
+		cpumask_pr_args(&cq_affinity_mask));
+free_cpumask:
+	free_cpumask_var(new_value);
+	return ret;
+}
+
+static struct kparam_string cq_affinity_list_kparam_str = {
+	.maxlen	= sizeof(cq_affinity_list),
+	.string	= cq_affinity_list
+};
+
+static const struct kernel_param_ops cq_affinity_list_ops = {
+	.set	= cq_affinity_list_set,
+	.get	= param_get_string,
+};
+
+module_param_cb(cq_affinity_list, &cq_affinity_list_ops,
+		&cq_affinity_list_kparam_str, 0644);
+MODULE_PARM_DESC(cq_affinity_list, "Sets the list of cpus to use as cq vectors"
+				   " (default: use all possible CPUs)");
+
+static char hostname[MAXHOSTNAMELEN] = "";
+
+static int hostname_set(const char *val, const struct kernel_param *kp)
+{
+	int ret = 0, len = strlen(val);
+
+	if (len >= sizeof(hostname))
+		return -EINVAL;
+	strlcpy(hostname, val, sizeof(hostname));
+	*strchrnul(hostname, '\n') = '\0';
+
+	INFO_NP("hostname changed to %s\n", hostname);
+	return ret;
+}
+
+static struct kparam_string hostname_kparam_str = {
+	.maxlen	= sizeof(hostname),
+	.string	= hostname
+};
+
+static const struct kernel_param_ops hostname_ops = {
+	.set	= hostname_set,
+	.get	= param_get_string,
+};
+
+module_param_cb(hostname, &hostname_ops,
+		&hostname_kparam_str, 0644);
+MODULE_PARM_DESC(hostname, "Sets the hostname of the local server; if set, it"
+		 " is sent to the other side and displayed together with the"
+		 " addr (default: empty)");
+
+static struct dentry *ibtrs_srv_debugfs_dir;
+static struct dentry *mempool_debugfs_dir;
+
+static struct rdma_cm_id	*cm_id_ip;
+static struct rdma_cm_id	*cm_id_ib;
+static DEFINE_MUTEX(sess_mutex);
+static LIST_HEAD(sess_list);
+static DECLARE_WAIT_QUEUE_HEAD(sess_list_waitq);
+static struct workqueue_struct *destroy_wq;
+
+static LIST_HEAD(device_list);
+static DEFINE_MUTEX(device_list_mutex);
+
+static DEFINE_MUTEX(buf_pool_mutex);
+static LIST_HEAD(free_buf_pool_list);
+static int nr_free_buf_pool;
+static int nr_total_buf_pool;
+static int nr_active_sessions;
+
+static const struct ibtrs_srv_ops *srv_ops;
+enum ssm_ev {
+	SSM_EV_CON_DISCONNECTED,
+	SSM_EV_CON_EST_ERR,
+	SSM_EV_CON_CONNECTED,
+	SSM_EV_SESS_CLOSE,
+	SSM_EV_SYSFS_DISCONNECT
+};
+
+static inline const char *ssm_ev_str(enum ssm_ev ev)
+{
+	switch (ev) {
+	case SSM_EV_CON_DISCONNECTED:
+		return "SSM_EV_CON_DISCONNECTED";
+	case SSM_EV_CON_EST_ERR:
+		return "SSM_EV_CON_EST_ERR";
+	case SSM_EV_CON_CONNECTED:
+		return "SSM_EV_CON_CONNECTED";
+	case SSM_EV_SESS_CLOSE:
+		return "SSM_EV_SESS_CLOSE";
+	case SSM_EV_SYSFS_DISCONNECT:
+		return "SSM_EV_SYSFS_DISCONNECT";
+	default:
+		return "UNKNOWN";
+	}
+}
+
+static const char *ssm_state_str(enum ssm_state state)
+{
+	switch (state) {
+	case SSM_STATE_IDLE:
+		return "SSM_STATE_IDLE";
+	case SSM_STATE_CONNECTED:
+		return "SSM_STATE_CONNECTED";
+	case SSM_STATE_CLOSING:
+		return "SSM_STATE_CLOSING";
+	case SSM_STATE_CLOSED:
+		return "SSM_STATE_CLOSED";
+	default:
+		return "UNKNOWN";
+	}
+}
+
+enum csm_state {
+	CSM_STATE_REQUESTED,
+	CSM_STATE_CONNECTED,
+	CSM_STATE_CLOSING,
+	CSM_STATE_FLUSHING,
+	CSM_STATE_CLOSED
+};
+
+static inline const char *csm_state_str(enum csm_state s)
+{
+	switch (s) {
+	case CSM_STATE_REQUESTED:
+		return "CSM_STATE_REQUESTED";
+	case CSM_STATE_CONNECTED:
+		return "CSM_STATE_CONNECTED";
+	case CSM_STATE_CLOSING:
+		return "CSM_STATE_CLOSING";
+	case CSM_STATE_FLUSHING:
+		return "CSM_STATE_FLUSHING";
+	case CSM_STATE_CLOSED:
+		return "CSM_STATE_CLOSED";
+	default:
+		return "UNKNOWN";
+	}
+}
+
+enum csm_ev {
+	CSM_EV_CON_REQUEST,
+	CSM_EV_CON_ESTABLISHED,
+	CSM_EV_CON_ERROR,
+	CSM_EV_DEVICE_REMOVAL,
+	CSM_EV_SESS_CLOSING,
+	CSM_EV_CON_DISCONNECTED,
+	CSM_EV_BEACON_COMPLETED
+};
+
+static inline const char *csm_ev_str(enum csm_ev ev)
+{
+	switch (ev) {
+	case CSM_EV_CON_REQUEST:
+		return "CSM_EV_CON_REQUEST";
+	case CSM_EV_CON_ESTABLISHED:
+		return "CSM_EV_CON_ESTABLISHED";
+	case CSM_EV_CON_ERROR:
+		return "CSM_EV_CON_ERROR";
+	case CSM_EV_DEVICE_REMOVAL:
+		return "CSM_EV_DEVICE_REMOVAL";
+	case CSM_EV_SESS_CLOSING:
+		return "CSM_EV_SESS_CLOSING";
+	case CSM_EV_CON_DISCONNECTED:
+		return "CSM_EV_CON_DISCONNECTED";
+	case CSM_EV_BEACON_COMPLETED:
+		return "CSM_EV_BEACON_COMPLETED";
+	default:
+		return "UNKNOWN";
+	}
+}
+
+struct sess_put_work {
+	struct ibtrs_session	*sess;
+	struct work_struct	work;
+};
+
+struct ibtrs_srv_sysfs_put_work {
+	struct work_struct	work;
+	struct ibtrs_session	*sess;
+};
+
+struct ssm_create_con_work {
+	struct ibtrs_session	*sess;
+	struct rdma_cm_id	*cm_id;
+	struct work_struct	work;
+	bool			user; /* true if con is for user msg only */
+};
+
+struct ssm_work {
+	struct ibtrs_session	*sess;
+	enum ssm_ev		ev;
+	struct work_struct	work;
+};
+
+struct ibtrs_con {
+	/* list for ibtrs_session->con_list */
+	struct list_head	list;
+	enum csm_state		state;
+	/* true if con is for user msg only */
+	bool			user;
+	bool			failover_enabled;
+	struct ib_con		ib_con;
+	atomic_t		wr_cnt;
+	struct rdma_cm_id	*cm_id;
+	int			cq_vector;
+	struct ibtrs_session	*sess;
+	struct work_struct	cq_work;
+	struct workqueue_struct *cq_wq;
+	struct workqueue_struct *rdma_resp_wq;
+	struct ib_wc		wcs[WC_ARRAY_SIZE];
+	bool			device_being_removed;
+};
+
+struct csm_work {
+	struct ibtrs_con	*con;
+	enum csm_ev		ev;
+	struct work_struct	work;
+};
+
+struct msg_work {
+	struct work_struct	work;
+	struct ibtrs_con	*con;
+	void                    *msg;
+};
+
+struct ibtrs_device {
+	struct list_head	entry;
+	struct ib_device	*device;
+	struct ib_session	ib_sess;
+	struct completion	*ib_sess_destroy_completion;
+	struct kref		ref;
+};
+
+struct ibtrs_ops_id {
+	struct ibtrs_con		*con;
+	u32				msg_id;
+	u8				dir;
+	u64				data_dma_addr;
+	struct ibtrs_msg_req_rdma_write *req;
+	struct ib_rdma_wr		*tx_wr;
+	struct ib_sge			*tx_sg;
+	int				status;
+	struct work_struct		work;
+} ____cacheline_aligned;
+
+static void csm_set_state(struct ibtrs_con *con, enum csm_state s)
+{
+	if (con->state != s) {
+		DEB("changing con %p csm state from %s to %s\n", con,
+		    csm_state_str(con->state), csm_state_str(s));
+		con->state = s;
+	}
+}
+
+static void ssm_set_state(struct ibtrs_session *sess, enum ssm_state state)
+{
+	if (sess->state != state) {
+		DEB("changing sess %p ssm state from %s to %s\n", sess,
+		    ssm_state_str(sess->state), ssm_state_str(state));
+		sess->state = state;
+	}
+}
+
+static struct ibtrs_con *ibtrs_srv_get_user_con(struct ibtrs_session *sess)
+{
+	struct ibtrs_con *con;
+
+	if (sess->est_cnt > 0) {
+		list_for_each_entry(con, &sess->con_list, list) {
+			if (con->user && con->state == CSM_STATE_CONNECTED)
+				return con;
+		}
+	}
+	return NULL;
+}
+
+static void csm_init(struct ibtrs_con *con);
+static void csm_schedule_event(struct ibtrs_con *con, enum csm_ev ev);
+static int ssm_init(struct ibtrs_session *sess);
+static int ssm_schedule_event(struct ibtrs_session *sess, enum ssm_ev ev);
+
+static int ibtrs_srv_get_sess_current_port_num(struct ibtrs_session *sess)
+{
+	struct ibtrs_con *con, *next;
+	struct ibtrs_con *ucon = ibtrs_srv_get_user_con(sess);
+
+	if (sess->state != SSM_STATE_CONNECTED || !ucon)
+		return -ECOMM;
+
+	mutex_lock(&sess->lock);
+	if (WARN_ON(!sess->cm_id)) {
+		mutex_unlock(&sess->lock);
+		return -ENODEV;
+	}
+	list_for_each_entry_safe(con, next, &sess->con_list, list) {
+		if (unlikely(con->state != CSM_STATE_CONNECTED)) {
+			mutex_unlock(&sess->lock);
+			return -ECOMM;
+		}
+		if (con->cm_id->port_num != sess->cm_id->port_num) {
+			mutex_unlock(&sess->lock);
+			return 0;
+		}
+	}
+	mutex_unlock(&sess->lock);
+	return sess->cm_id->port_num;
+}
+
+int ibtrs_srv_current_hca_port_to_str(struct ibtrs_session *sess,
+				      char *buf, size_t len)
+{
+	int port = ibtrs_srv_get_sess_current_port_num(sess);
+
+	if (!port)
+		return scnprintf(buf, len, "migrating\n");
+	if (port < 0)
+		return port;
+
+	return scnprintf(buf, len, "%u\n", port);
+}
+
+inline const char *ibtrs_srv_get_sess_hca_name(struct ibtrs_session *sess)
+{
+	struct ibtrs_con *con = ibtrs_srv_get_user_con(sess);
+
+	if (con)
+		return sess->dev->device->name;
+	return "n/a";
+}
+
+static void ibtrs_srv_update_rdma_stats(struct ibtrs_srv_stats *s,
+					size_t size, bool read)
+{
+	int inflight;
+
+	if (read) {
+		atomic64_inc(&s->rdma_stats.cnt_read);
+		atomic64_add(size, &s->rdma_stats.size_total_read);
+	} else {
+		atomic64_inc(&s->rdma_stats.cnt_write);
+		atomic64_add(size, &s->rdma_stats.size_total_write);
+	}
+
+	inflight = atomic_inc_return(&s->rdma_stats.inflight);
+	atomic64_add(inflight, &s->rdma_stats.inflight_total);
+}
+
+static inline void ibtrs_srv_stats_dec_inflight(struct ibtrs_session *sess)
+{
+	if (!atomic_dec_return(&sess->stats.rdma_stats.inflight))
+		wake_up(&sess->bufs_wait);
+}
+
+int ibtrs_srv_reset_rdma_stats(struct ibtrs_session *sess, bool enable)
+{
+	if (enable) {
+		struct ibtrs_srv_stats_rdma_stats *r = &sess->stats.rdma_stats;
+
+		/*
+		 * TODO: inflight is used for flow control
+		 * we can't memset the whole structure, so reset each member
+		 */
+		atomic64_set(&r->cnt_read, 0);
+		atomic64_set(&r->size_total_read, 0);
+		atomic64_set(&r->cnt_write, 0);
+		atomic64_set(&r->size_total_write, 0);
+		atomic64_set(&r->inflight_total, 0);
+		return 0;
+	} else {
+		return -EINVAL;
+	}
+}
+
+ssize_t ibtrs_srv_stats_rdma_to_str(struct ibtrs_session *sess,
+				    char *page, size_t len)
+{
+	struct ibtrs_srv_stats_rdma_stats *r = &sess->stats.rdma_stats;
+
+	return scnprintf(page, len, "%ld %ld %ld %ld %u %ld\n",
+			 atomic64_read(&r->cnt_read),
+			 atomic64_read(&r->size_total_read),
+			 atomic64_read(&r->cnt_write),
+			 atomic64_read(&r->size_total_write),
+			 atomic_read(&r->inflight),
+			 (atomic64_read(&r->cnt_read) +
+			  atomic64_read(&r->cnt_write)) ?
+			 atomic64_read(&r->inflight_total) /
+			 (atomic64_read(&r->cnt_read) +
+			  atomic64_read(&r->cnt_write)) : 0);
+}
+
+int ibtrs_srv_reset_user_ib_msgs_stats(struct ibtrs_session *sess, bool enable)
+{
+	if (enable) {
+		memset(&sess->stats.user_ib_msgs, 0,
+		       sizeof(sess->stats.user_ib_msgs));
+		return 0;
+	} else {
+		return -EINVAL;
+	}
+}
+
+int ibtrs_srv_stats_user_ib_msgs_to_str(struct ibtrs_session *sess, char *buf,
+					size_t len)
+{
+	return scnprintf(buf, len, "%ld %ld %ld %ld\n",
+			atomic64_read(&sess->stats.user_ib_msgs.recv_msg_cnt),
+			atomic64_read(&sess->stats.user_ib_msgs.recv_size),
+			atomic64_read(&sess->stats.user_ib_msgs.sent_msg_cnt),
+			atomic64_read(&sess->stats.user_ib_msgs.sent_size));
+}
+
+int ibtrs_srv_reset_wc_completion_stats(struct ibtrs_session *sess, bool enable)
+{
+	if (enable) {
+		memset(&sess->stats.wc_comp, 0, sizeof(sess->stats.wc_comp));
+		return 0;
+	} else {
+		return -EINVAL;
+	}
+}
+
+int ibtrs_srv_stats_wc_completion_to_str(struct ibtrs_session *sess, char *buf,
+					 size_t len)
+{
+	return scnprintf(buf, len, "%d %ld %ld\n",
+			atomic_read(&sess->stats.wc_comp.max_wc_cnt),
+			atomic64_read(&sess->stats.wc_comp.total_wc_cnt),
+			atomic64_read(&sess->stats.wc_comp.calls));
+}
+
+ssize_t ibtrs_srv_reset_all_help(struct ibtrs_session *sess,
+				 char *page, size_t len)
+{
+	return scnprintf(page, len, "echo 1 to reset all statistics\n");
+}
+
+int ibtrs_srv_reset_all_stats(struct ibtrs_session *sess, bool enable)
+{
+	if (enable) {
+		ibtrs_srv_reset_wc_completion_stats(sess, enable);
+		ibtrs_srv_reset_user_ib_msgs_stats(sess, enable);
+		ibtrs_srv_reset_rdma_stats(sess, enable);
+		return 0;
+	} else {
+		return -EINVAL;
+	}
+}
+
+static inline bool srv_ops_are_valid(const struct ibtrs_srv_ops *ops)
+{
+	return ops && ops->sess_ev && ops->rdma_ev && ops->recv;
+}
+
+static int ibtrs_srv_sess_ev(struct ibtrs_session *sess,
+			     enum ibtrs_srv_sess_ev ev)
+{
+	if (!sess->session_announced_to_user &&
+	    ev != IBTRS_SRV_SESS_EV_CONNECTED)
+		return 0;
+
+	if (ev == IBTRS_SRV_SESS_EV_CONNECTED)
+		sess->session_announced_to_user = true;
+
+	return srv_ops->sess_ev(sess, ev, sess->priv);
+}
+
+static void free_id(struct ibtrs_ops_id *id)
+{
+	if (!id)
+		return;
+	kfree(id->tx_wr);
+	kfree(id->tx_sg);
+	kvfree(id);
+}
+
+static void free_sess_tx_bufs(struct ibtrs_session *sess)
+{
+	struct ibtrs_iu *e, *next;
+	int i;
+
+	if (sess->rdma_info_iu) {
+		ibtrs_iu_free(sess->rdma_info_iu, DMA_TO_DEVICE,
+			      sess->dev->device);
+		sess->rdma_info_iu = NULL;
+	}
+
+	WARN_ON(sess->tx_bufs_used);
+	list_for_each_entry_safe(e, next, &sess->tx_bufs, list) {
+		list_del(&e->list);
+		ibtrs_iu_free(e, DMA_TO_DEVICE, sess->dev->device);
+	}
+
+	if (sess->ops_ids) {
+		for (i = 0; i < sess->queue_depth; i++)
+			free_id(sess->ops_ids[i]);
+		kfree(sess->ops_ids);
+		sess->ops_ids = NULL;
+	}
+}
+
+static void put_tx_iu(struct ibtrs_session *sess, struct ibtrs_iu *iu)
+{
+	spin_lock(&sess->tx_bufs_lock);
+	ibtrs_iu_put(&sess->tx_bufs, iu);
+	sess->tx_bufs_used--;
+	spin_unlock(&sess->tx_bufs_lock);
+}
+
+static struct ibtrs_iu *get_tx_iu(struct ibtrs_session *sess)
+{
+	struct ibtrs_iu *iu;
+
+	spin_lock(&sess->tx_bufs_lock);
+	iu = ibtrs_iu_get(&sess->tx_bufs);
+	if (iu)
+		sess->tx_bufs_used++;
+	spin_unlock(&sess->tx_bufs_lock);
+
+	return iu;
+}
+
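+/*
+ * rdma_write_sg() posts one chained RDMA-Write WR per scatter/gather
+ * element of the client request; the last WR is an RDMA-Write-with-imm
+ * whose immediate carries the msg_id, which tells the client which
+ * request completed.
+ */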
+static int rdma_write_sg(struct ibtrs_ops_id *id)
+{
+	int err, i, offset;
+	struct ib_send_wr *bad_wr;
+	struct ib_rdma_wr *wr = NULL;
+	struct ibtrs_session *sess = id->con->sess;
+
+	if (unlikely(id->req->sg_cnt == 0))
+		return -EINVAL;
+
+	offset = 0;
+	for (i = 0; i < id->req->sg_cnt; i++) {
+		struct ib_sge *list;
+
+		wr		= &id->tx_wr[i];
+		list		= &id->tx_sg[i];
+		list->addr	= id->data_dma_addr + offset;
+		list->length	= id->req->desc[i].len;
+
+		/* WR will fail with length error
+		 * if this is 0
+		 */
+		if (unlikely(list->length == 0)) {
+			ERR(sess, "Invalid RDMA-Write sg list length 0\n");
+			return -EINVAL;
+		}
+
+		list->lkey = sess->dev->ib_sess.pd->local_dma_lkey;
+		offset += list->length;
+
+		wr->wr.wr_id		= (uintptr_t)id;
+		wr->wr.sg_list		= list;
+		wr->wr.num_sge		= 1;
+		wr->remote_addr	= id->req->desc[i].addr;
+		wr->rkey	= id->req->desc[i].key;
+
+		if (i < (id->req->sg_cnt - 1)) {
+			wr->wr.next	= &id->tx_wr[i + 1].wr;
+			wr->wr.opcode	= IB_WR_RDMA_WRITE;
+			wr->wr.ex.imm_data	= 0;
+			wr->wr.send_flags	= 0;
+		}
+	}
+
+	wr->wr.opcode	= IB_WR_RDMA_WRITE_WITH_IMM;
+	wr->wr.next	= NULL;
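+	/* Request a signaled completion only for every queue_depth-th WR to
+	 * keep CQ processing cheap; error completions are always generated,
+	 * so failures are still noticed.
+	 */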
+	wr->wr.send_flags	= atomic_inc_return(&id->con->wr_cnt) %
+				sess->queue_depth ? 0 : IB_SEND_SIGNALED;
+	wr->wr.ex.imm_data	= cpu_to_be32(id->msg_id << 16);
+
+	err = ib_post_send(id->con->ib_con.qp, &id->tx_wr[0].wr, &bad_wr);
+	if (unlikely(err))
+		ERR(sess,
+		    "Posting RDMA-Write-Request to QP failed, errno: %d\n",
+		    err);
+
+	return err;
+}
+
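+/*
+ * The 32-bit immediate of an I/O response encodes the msg_id in the high
+ * 16 bits and the errno of the request in the low 16 bits.
+ */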
+static int send_io_resp_imm(struct ibtrs_con *con, int msg_id, s16 errno)
+{
+	int err;
+
+	err = ibtrs_write_empty_imm(con->ib_con.qp, (msg_id << 16) | (u16)errno,
+				    atomic_inc_return(&con->wr_cnt) %
+				    con->sess->queue_depth ? 0 :
+				    IB_SEND_SIGNALED);
+	if (unlikely(err))
+		ERR_RL(con->sess, "Posting RDMA-Write-Request to QP failed,"
+		       " errno: %d\n", err);
+
+	return err;
+}
+
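+/*
+ * Heartbeats are empty RDMA writes with the reserved immediate value
+ * UINT_MAX (UINT_MAX - 1 is reserved for user message ACKs, see
+ * process_wcs()).
+ */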
+static int send_heartbeat_raw(struct ibtrs_con *con)
+{
+	int err;
+
+	err = ibtrs_write_empty_imm(con->ib_con.qp, UINT_MAX, IB_SEND_SIGNALED);
+	if (unlikely(err)) {
+		ERR(con->sess,
+		    "Sending heartbeat failed, posting msg to QP failed,"
+		    " errno: %d\n", err);
+		return err;
+	}
+
+	ibtrs_heartbeat_set_send_ts(&con->sess->heartbeat);
+	return err;
+}
+
+static int send_heartbeat(struct ibtrs_session *sess)
+{
+	struct ibtrs_con *con;
+
+	if (unlikely(list_empty(&sess->con_list)))
+		return -ENOENT;
+
+	con = list_first_entry(&sess->con_list, struct ibtrs_con, list);
+	WARN_ON(!con->user);
+
+	if (unlikely(con->state != CSM_STATE_CONNECTED))
+		return -ENOTCONN;
+
+	return send_heartbeat_raw(con);
+}
+
+static int ibtrs_srv_queue_resp_rdma(struct ibtrs_ops_id *id)
+{
+	if (unlikely(id->con->state != CSM_STATE_CONNECTED)) {
+		ERR_RL(id->con->sess, "Sending I/O response failed,"
+		       " session is disconnected, sess state %s,"
+		       " con state %s\n", ssm_state_str(id->con->sess->state),
+		       csm_state_str(id->con->state));
+		return -ECOMM;
+	}
+
+	if (WARN_ON(!queue_work(id->con->rdma_resp_wq, &id->work))) {
+		ERR_RL(id->con->sess, "Sending I/O response failed,"
+		       " couldn't queue work\n");
+		return -EPERM;
+	}
+
+	return 0;
+}
+
+static void ibtrs_srv_resp_rdma_worker(struct work_struct *work)
+{
+	struct ibtrs_ops_id *id;
+	int err;
+	struct ibtrs_session *sess;
+
+	id = container_of(work, struct ibtrs_ops_id, work);
+	sess = id->con->sess;
+
+	if (id->status || id->dir == WRITE) {
+		DEB("err or write msg_id=%d, status=%d, sending response\n",
+		    id->msg_id, id->status);
+
+		err = send_io_resp_imm(id->con, id->msg_id, id->status);
+		if (unlikely(err)) {
+			ERR_RL(sess, "Sending imm msg failed, errno: %d\n",
+			       err);
+			if (err == -ENOMEM && !ibtrs_srv_queue_resp_rdma(id))
+				return;
+			csm_schedule_event(id->con, CSM_EV_CON_ERROR);
+		}
+
+		ibtrs_heartbeat_set_send_ts(&sess->heartbeat);
+		ibtrs_srv_stats_dec_inflight(sess);
+		return;
+	}
+
+	DEB("read req msg_id=%d completed, sending data\n", id->msg_id);
+	err = rdma_write_sg(id);
+	if (unlikely(err)) {
+		ERR_RL(sess, "Sending I/O read response failed, errno: %d\n",
+		       err);
+		if (err == -ENOMEM && !ibtrs_srv_queue_resp_rdma(id))
+			return;
+		csm_schedule_event(id->con, CSM_EV_CON_ERROR);
+	}
+	ibtrs_heartbeat_set_send_ts(&sess->heartbeat);
+	ibtrs_srv_stats_dec_inflight(sess);
+}
+
+/*
+ * This function may be called from an interrupt context, e.g. from the
+ * bio_endio callback of the user module. Queue the real work on a workqueue
+ * so we don't need to hold an irq spinlock.
+ */
+int ibtrs_srv_resp_rdma(struct ibtrs_ops_id *id, int status)
+{
+	int err = 0;
+
+	if (unlikely(!id)) {
+		ERR_NP("Sending I/O response failed, I/O ops id NULL\n");
+		return -EINVAL;
+	}
+
+	id->status = status;
+	INIT_WORK(&id->work, ibtrs_srv_resp_rdma_worker);
+
+	err = ibtrs_srv_queue_resp_rdma(id);
+	if (err)
+		ibtrs_srv_stats_dec_inflight(id->con->sess);
+	return err;
+}
+EXPORT_SYMBOL(ibtrs_srv_resp_rdma);
+
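+/*
+ * peer_usr_msg_bufs is a credit counter for user messages: one credit is
+ * consumed per message sent and given back when the peer acknowledges it
+ * (see process_msg_user_ack()).
+ */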
+static bool ibtrs_srv_get_usr_msg_buf(struct ibtrs_session *sess)
+{
+	return atomic_dec_if_positive(&sess->peer_usr_msg_bufs) >= 0;
+}
+
+int ibtrs_srv_send(struct ibtrs_session *sess, const struct kvec *vec,
+		   size_t nr)
+{
+	struct ibtrs_iu *iu = NULL;
+	struct ibtrs_con *con;
+	struct ibtrs_msg_user *msg;
+	size_t len;
+	bool closed_st = false;
+	int err;
+
+	if (WARN_ONCE(list_empty(&sess->con_list),
+		      "Sending message failed, no connection available\n"))
+		return -ECOMM;
+	con = ibtrs_srv_get_user_con(sess);
+
+	if (unlikely(!con)) {
+		WRN(sess,
+		    "Sending message failed, no user connection exists\n");
+		return -ECOMM;
+	}
+
+	len = kvec_length(vec, nr);
+
+	if (unlikely(len + IBTRS_HDR_LEN > MAX_REQ_SIZE)) {
+		WRN_RL(sess, "Sending message failed, passed data too big,"
+		       " %zu > %lu\n", len, MAX_REQ_SIZE - IBTRS_HDR_LEN);
+		return -EMSGSIZE;
+	}
+
+	wait_event(sess->mu_buf_wait_q,
+		   (closed_st = (con->state != CSM_STATE_CONNECTED)) ||
+		   ibtrs_srv_get_usr_msg_buf(sess));
+
+	if (unlikely(closed_st)) {
+		ERR_RL(sess, "Sending message failed, not connected (state"
+		       " %s)\n", csm_state_str(con->state));
+		return -ECOMM;
+	}
+
+	wait_event(sess->mu_iu_wait_q,
+		   (closed_st = (con->state != CSM_STATE_CONNECTED)) ||
+		   (iu = get_tx_iu(sess)) != NULL);
+
+	if (unlikely(closed_st)) {
+		ERR_RL(sess, "Sending message failed, not connected (state"
+		       " %s)\n", csm_state_str(con->state));
+		err = -ECOMM;
+		goto err_iu;
+	}
+
+	msg		= iu->buf;
+	msg->hdr.type	= IBTRS_MSG_USER;
+	msg->hdr.tsize	= len + IBTRS_HDR_LEN;
+	copy_from_kvec(msg->payl, vec, len);
+
+	ibtrs_deb_msg_hdr("Sending: ", &msg->hdr);
+	err = ibtrs_post_send(con->ib_con.qp,
+			      con->sess->dev->ib_sess.pd->__internal_mr, iu,
+			      msg->hdr.tsize);
+	if (unlikely(err)) {
+		ERR_RL(sess, "Sending message failed, posting message to QP"
+		       " failed, errno: %d\n", err);
+		goto err_post_send;
+	}
+	ibtrs_heartbeat_set_send_ts(&sess->heartbeat);
+
+	atomic64_inc(&sess->stats.user_ib_msgs.sent_msg_cnt);
+	atomic64_add(len, &sess->stats.user_ib_msgs.sent_size);
+
+	return 0;
+
+err_post_send:
+	put_tx_iu(sess, iu);
+	wake_up(&con->sess->mu_iu_wait_q);
+err_iu:
+	atomic_inc(&sess->peer_usr_msg_bufs);
+	wake_up(&con->sess->mu_buf_wait_q);
+	return err;
+}
+EXPORT_SYMBOL(ibtrs_srv_send);
+
+inline void ibtrs_srv_set_sess_priv(struct ibtrs_session *sess, void *priv)
+{
+	sess->priv = priv;
+}
+EXPORT_SYMBOL(ibtrs_srv_set_sess_priv);
+
+static int ibtrs_post_recv(struct ibtrs_con *con, struct ibtrs_iu *iu)
+{
+	struct ib_recv_wr wr, *bad_wr;
+	struct ib_sge list;
+	int err;
+
+	list.addr   = iu->dma_addr;
+	list.length = iu->size;
+	list.lkey   = con->sess->dev->ib_sess.pd->local_dma_lkey;
+
+	if (unlikely(list.length == 0)) {
+		ERR_RL(con->sess, "Posting recv buffer failed, invalid sg list"
+		       " length 0\n");
+		return -EINVAL;
+	}
+
+	wr.next     = NULL;
+	wr.wr_id    = (uintptr_t)iu;
+	wr.sg_list  = &list;
+	wr.num_sge  = 1;
+
+	err = ib_post_recv(con->ib_con.qp, &wr, &bad_wr);
+	if (unlikely(err))
+		ERR_RL(con->sess, "Posting recv buffer failed, errno: %d\n",
+		       err);
+
+	return err;
+}
+
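+/*
+ * Allocate the receive buffers of one session: try high-order page
+ * allocations first so several rcv_buf's share one contiguous chunk, and
+ * fall back to exact-size allocations when higher orders are unavailable.
+ */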
+static struct ibtrs_rcv_buf_pool *alloc_rcv_buf_pool(void)
+{
+	struct ibtrs_rcv_buf_pool *pool;
+	struct page *cont_pages = NULL;
+	struct ibtrs_mem_chunk *mem_chunk;
+	int alloced_bufs = 0;
+	int rcv_buf_order = get_order(rcv_buf_size);
+	int max_order, alloc_order;
+	unsigned int alloced_size;
+
+	pool = kzalloc(sizeof(*pool), GFP_KERNEL);
+	if (!pool) {
+		ERR_NP("Failed to allocate memory for buffer pool struct\n");
+		return NULL;
+	}
+
+	pool->rcv_bufs = kcalloc(sess_queue_depth, sizeof(*pool->rcv_bufs),
+				 GFP_KERNEL);
+	if (!pool->rcv_bufs) {
+		ERR_NP("Failed to allocate array for receive buffers\n");
+		kfree(pool);
+		return NULL;
+	}
+	INIT_LIST_HEAD(&pool->chunk_list);
+
+	while (alloced_bufs < sess_queue_depth) {
+		mem_chunk = kzalloc(sizeof(*mem_chunk), GFP_KERNEL);
+		if (!mem_chunk) {
+			ERR_NP("Failed to allocate memory for memory chunk"
+			       " struct\n");
+			goto alloc_fail;
+		}
+
+		max_order = min(MAX_ORDER - 1,
+				get_order((sess_queue_depth - alloced_bufs) *
+					  rcv_buf_size));
+		for (alloc_order = max_order; alloc_order > rcv_buf_order;
+		     alloc_order--) {
+			cont_pages = alloc_pages(__GFP_NORETRY | __GFP_NOWARN |
+						 __GFP_ZERO, alloc_order);
+			if (cont_pages) {
+				DEB("Allocated order %d pages\n", alloc_order);
+				break;
+			}
+			DEB("Failed to allocate order %d pages\n", alloc_order);
+		}
+
+		if (cont_pages) {
+			void *recv_buf_start;
+
+			mem_chunk->order = alloc_order;
+			mem_chunk->addr = page_address(cont_pages);
+			list_add_tail(&mem_chunk->list, &pool->chunk_list);
+			alloced_size = (1 << alloc_order) * PAGE_SIZE;
+
+			DEB("Memory chunk size: %d, address: %p\n",
+			    alloced_size, mem_chunk->addr);
+
+			recv_buf_start = mem_chunk->addr;
+			while (alloced_size >= rcv_buf_size &&
+			       alloced_bufs < sess_queue_depth) {
+				pool->rcv_bufs[alloced_bufs].buf =
+					recv_buf_start;
+				alloced_bufs++;
+				recv_buf_start += rcv_buf_size;
+				alloced_size -= rcv_buf_size;
+			}
+		} else {
+			/* if allocation of pages to fit multiple rcv_buf's
+			 * failed we fall back to alloc'ing exact number of
+			 * pages
+			 */
+			gfp_t gfp_mask = (GFP_KERNEL | __GFP_REPEAT |
+					  __GFP_ZERO);
+			void *addr = alloc_pages_exact(rcv_buf_size, gfp_mask);
+
+			if (!addr) {
+				ERR_NP("Failed to allocate memory for "
+				       " receive buffer (size %dB)\n",
+				       rcv_buf_size);
+				goto alloc_fail;
+			}
+
+			DEB("Alloced pages exact at %p for rcv_bufs[%d]\n",
+			    addr, alloced_bufs);
+
+			mem_chunk->addr = addr;
+			mem_chunk->order = IBTRS_MEM_CHUNK_NOORDER;
+			list_add_tail(&mem_chunk->list, &pool->chunk_list);
+
+			pool->rcv_bufs[alloced_bufs].buf = addr;
+			alloced_bufs++;
+		}
+	}
+
+	return pool;
+
+alloc_fail:
+	if (!list_empty(&pool->chunk_list)) {
+		struct ibtrs_mem_chunk *tmp;
+
+		list_for_each_entry_safe(mem_chunk, tmp, &pool->chunk_list,
+					 list) {
+			if (mem_chunk->order != IBTRS_MEM_CHUNK_NOORDER)
+				free_pages((unsigned long)mem_chunk->addr,
+					   mem_chunk->order);
+			else
+				free_pages_exact(mem_chunk->addr, rcv_buf_size);
+			list_del(&mem_chunk->list);
+			kfree(mem_chunk);
+		}
+	}
+	kfree(pool->rcv_bufs);
+	kfree(pool);
+	return NULL;
+}
+
+static struct ibtrs_rcv_buf_pool *__get_pool_from_list(void)
+{
+	struct ibtrs_rcv_buf_pool *pool = NULL;
+
+	if (!list_empty(&free_buf_pool_list)) {
+		DEB("Getting buf pool from pre-allocated list\n");
+		pool = list_first_entry(&free_buf_pool_list,
+					struct ibtrs_rcv_buf_pool, list);
+		list_del(&pool->list);
+		nr_free_buf_pool--;
+	}
+
+	return pool;
+}
+
+static void __put_pool_on_list(struct ibtrs_rcv_buf_pool *pool)
+{
+	list_add(&pool->list, &free_buf_pool_list);
+	nr_free_buf_pool++;
+	DEB("Put buf pool back to the free list (nr_free_buf_pool: %d)\n",
+	    nr_free_buf_pool);
+}
+
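+/*
+ * Get a buffer pool for a new session: pools are cached on a free list up
+ * to pool_size_hi_wm sessions' worth; above the watermark, pools are
+ * allocated on demand and freed again when the session goes away.
+ */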
+static struct ibtrs_rcv_buf_pool *get_alloc_rcv_buf_pool(void)
+{
+	struct ibtrs_rcv_buf_pool *pool = NULL;
+
+	mutex_lock(&buf_pool_mutex);
+	if (nr_active_sessions >= pool_size_hi_wm) {
+		WARN_ON(nr_free_buf_pool || !list_empty(&free_buf_pool_list));
+		DEB("current nr_active_sessions (%d), pool_size_hi_wm (%d),"
+		    ", allocating.\n", nr_active_sessions, pool_size_hi_wm);
+		pool = alloc_rcv_buf_pool();
+	} else if (nr_total_buf_pool < pool_size_hi_wm) {
+		/* try to allocate a new pool while used+free is less than
+		 * the watermark
+		 */
+		DEB("nr_total_buf_pool (%d) smaller than pool_size_hi_wm (%d)"
+		    ", trying to allocate.\n", nr_total_buf_pool,
+		    pool_size_hi_wm);
+		pool = alloc_rcv_buf_pool();
+		if (pool)
+			nr_total_buf_pool++;
+		else
+			pool = __get_pool_from_list();
+	} else if (nr_total_buf_pool == pool_size_hi_wm) {
+		/* pool size has already reached watermark, check if there are
+		 * free pools on the list
+		 */
+		if (nr_free_buf_pool) {
+			pool = __get_pool_from_list();
+			WARN_ON(!pool);
+			DEB("Got pool from free list (nr_free_buf_pool: %d)\n",
+			    nr_free_buf_pool);
+		} else {
+			/* all pools are already being used */
+			DEB("No free pool on the list\n");
+			WARN_ON((nr_active_sessions != nr_total_buf_pool) ||
+				nr_free_buf_pool);
+			pool = alloc_rcv_buf_pool();
+		}
+	} else {
+		/* all possibilities should be covered */
+		WARN_ON(1);
+	}
+
+	if (pool)
+		nr_active_sessions++;
+
+	mutex_unlock(&buf_pool_mutex);
+
+	return pool;
+}
+
+static void free_recv_buf_pool(struct ibtrs_rcv_buf_pool *pool)
+{
+	struct ibtrs_mem_chunk *mem_chunk, *tmp;
+
+	DEB("Freeing memory chunks for %d receive buffers\n", sess_queue_depth);
+
+	list_for_each_entry_safe(mem_chunk, tmp, &pool->chunk_list, list) {
+		if (mem_chunk->order != IBTRS_MEM_CHUNK_NOORDER)
+			free_pages((unsigned long)mem_chunk->addr,
+				   mem_chunk->order);
+		else
+			free_pages_exact(mem_chunk->addr, rcv_buf_size);
+		list_del(&mem_chunk->list);
+		kfree(mem_chunk);
+	}
+
+	kfree(pool->rcv_bufs);
+	kfree(pool);
+}
+
+static void put_rcv_buf_pool(struct ibtrs_rcv_buf_pool *pool)
+{
+	mutex_lock(&buf_pool_mutex);
+	nr_active_sessions--;
+	if (nr_active_sessions >= pool_size_hi_wm) {
+		mutex_unlock(&buf_pool_mutex);
+		DEB("Freeing buf pool"
+		    " (nr_active_sessions: %d, pool_size_hi_wm: %d)\n",
+		    nr_active_sessions, pool_size_hi_wm);
+		free_recv_buf_pool(pool);
+	} else {
+		__put_pool_on_list(pool);
+		mutex_unlock(&buf_pool_mutex);
+	}
+}
+
+static void unreg_cont_bufs(struct ibtrs_session *sess)
+{
+	struct ibtrs_rcv_buf *buf;
+	int i;
+
+	DEB("Unregistering %d RDMA buffers\n", sess_queue_depth);
+	for (i = 0; i < sess_queue_depth; i++) {
+		buf = &sess->rcv_buf_pool->rcv_bufs[i];
+
+		ib_dma_unmap_single(sess->dev->device, buf->rdma_addr,
+				    rcv_buf_size, DMA_BIDIRECTIONAL);
+	}
+}
+
+static void release_cont_bufs(struct ibtrs_session *sess)
+{
+	unreg_cont_bufs(sess);
+	put_rcv_buf_pool(sess->rcv_buf_pool);
+	sess->rcv_buf_pool = NULL;
+}
+
+static int setup_cont_bufs(struct ibtrs_session *sess)
+{
+	struct ibtrs_rcv_buf *buf;
+	int i, err;
+
+	sess->rcv_buf_pool = get_alloc_rcv_buf_pool();
+	if (!sess->rcv_buf_pool) {
+		ERR(sess, "Failed to allocate receive buffers for session\n");
+		return -ENOMEM;
+	}
+
+	DEB("Mapping %d buffers for RDMA\n", sess->queue_depth);
+	for (i = 0; i < sess->queue_depth; i++) {
+		buf = &sess->rcv_buf_pool->rcv_bufs[i];
+
+		buf->rdma_addr = ib_dma_map_single(sess->dev->device, buf->buf,
+						   rcv_buf_size,
+						   DMA_BIDIRECTIONAL);
+		if (unlikely(ib_dma_mapping_error(sess->dev->device,
+						  buf->rdma_addr))) {
+			ERR_NP("Registering RDMA buf failed,"
+			       " DMA mapping failed\n");
+			err = -EIO;
+			goto err_map;
+		}
+	}
+
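+	/* Split the 32-bit immediate into a buffer id (high bits) and an
+	 * offset into that buffer (low off_len bits); process_wcs() does
+	 * the matching decode on the receive path.
+	 */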
+	sess->off_len = 31 - ilog2(sess->queue_depth - 1);
+	sess->off_mask = (1 << sess->off_len) - 1;
+
+	INFO(sess, "Allocated %d %dKB RDMA receive buffers, %dKB in total\n",
+	     sess->queue_depth, rcv_buf_size >> 10,
+	     sess->queue_depth * rcv_buf_size >> 10);
+
+	return 0;
+
+err_map:
+	for (i = 0; i < sess->queue_depth; i++) {
+		buf = &sess->rcv_buf_pool->rcv_bufs[i];
+
+		if (buf->rdma_addr &&
+		    !ib_dma_mapping_error(sess->dev->device, buf->rdma_addr))
+			ib_dma_unmap_single(sess->dev->device, buf->rdma_addr,
+					    rcv_buf_size, DMA_BIDIRECTIONAL);
+	}
+	return err;
+}
+
+static void fill_ibtrs_msg_sess_open_resp(struct ibtrs_msg_sess_open_resp *msg,
+					  struct ibtrs_con *con)
+{
+	int i;
+
+	msg->hdr.type   = IBTRS_MSG_SESS_OPEN_RESP;
+	msg->hdr.tsize  = IBTRS_MSG_SESS_OPEN_RESP_LEN(con->sess->queue_depth);
+
+	msg->ver = con->sess->ver;
+	strlcpy(msg->hostname, hostname, sizeof(msg->hostname));
+	msg->cnt = con->sess->queue_depth;
+	msg->rkey = con->sess->dev->ib_sess.pd->unsafe_global_rkey;
+	msg->max_inflight_msg = con->sess->queue_depth;
+	msg->max_io_size = max_io_size;
+	msg->max_req_size = MAX_REQ_SIZE;
+	for (i = 0; i < con->sess->queue_depth; i++)
+		msg->addr[i] = con->sess->rcv_buf_pool->rcv_bufs[i].rdma_addr;
+}
+
+static void free_sess_rx_bufs(struct ibtrs_session *sess)
+{
+	int i;
+
+	if (sess->dummy_rx_iu) {
+		ibtrs_iu_free(sess->dummy_rx_iu, DMA_FROM_DEVICE,
+			      sess->dev->device);
+		sess->dummy_rx_iu = NULL;
+	}
+
+	if (sess->usr_rx_ring) {
+		for (i = 0; i < USR_CON_BUF_SIZE; ++i)
+			if (sess->usr_rx_ring[i])
+				ibtrs_iu_free(sess->usr_rx_ring[i],
+					      DMA_FROM_DEVICE,
+					      sess->dev->device);
+		kfree(sess->usr_rx_ring);
+		sess->usr_rx_ring = NULL;
+	}
+}
+
+static int alloc_sess_tx_bufs(struct ibtrs_session *sess)
+{
+	struct ibtrs_iu *iu;
+	struct ibtrs_ops_id *id;
+	struct ib_device *ib_dev = sess->dev->device;
+	int i;
+
+	sess->rdma_info_iu =
+		ibtrs_iu_alloc(0, IBTRS_MSG_SESS_OPEN_RESP_LEN(
+			       sess->queue_depth), GFP_KERNEL, ib_dev,
+			       DMA_TO_DEVICE, true);
+	if (unlikely(!sess->rdma_info_iu)) {
+		ERR_RL(sess, "Can't allocate transfer buffer for "
+			     "sess open resp\n");
+		return -ENOMEM;
+	}
+
+	sess->ops_ids = kcalloc(sess->queue_depth, sizeof(*sess->ops_ids),
+				GFP_KERNEL);
+	if (unlikely(!sess->ops_ids)) {
+		ERR_RL(sess, "Can't alloc ops_ids for the session\n");
+		goto err;
+	}
+
+	for (i = 0; i < sess->queue_depth; ++i) {
+		id = ibtrs_zalloc(sizeof(*id));
+		if (unlikely(!id)) {
+			ERR_RL(sess, "Can't alloc ops id for session\n");
+			goto err;
+		}
+		sess->ops_ids[i] = id;
+	}
+
+	for (i = 0; i < USR_MSG_CNT; ++i) {
+		iu = ibtrs_iu_alloc(i, MAX_REQ_SIZE, GFP_KERNEL,
+				    ib_dev, DMA_TO_DEVICE, true);
+		if (!iu) {
+			ERR_RL(sess, "Can't alloc tx bufs for user msgs\n");
+			goto err;
+		}
+		list_add(&iu->list, &sess->tx_bufs);
+	}
+
+	return 0;
+
+err:
+	free_sess_tx_bufs(sess);
+	return -ENOMEM;
+}
+
+static int alloc_sess_rx_bufs(struct ibtrs_session *sess)
+{
+	int i;
+
+	sess->dummy_rx_iu =
+		ibtrs_iu_alloc(0, IBTRS_HDR_LEN, GFP_KERNEL, sess->dev->device,
+			       DMA_FROM_DEVICE, true);
+	if (!sess->dummy_rx_iu) {
+		ERR(sess, "Failed to allocate dummy IU to receive "
+			  "immediate messages on io connections\n");
+		goto err;
+	}
+
+	sess->usr_rx_ring = kcalloc(USR_CON_BUF_SIZE,
+				    sizeof(*sess->usr_rx_ring), GFP_KERNEL);
+	if (!sess->usr_rx_ring) {
+		ERR(sess, "Alloc usr_rx_ring for session failed\n");
+		goto err;
+	}
+
+	for (i = 0; i < USR_CON_BUF_SIZE; ++i) {
+		sess->usr_rx_ring[i] =
+			ibtrs_iu_alloc(i, MAX_REQ_SIZE, GFP_KERNEL,
+				       sess->dev->device, DMA_FROM_DEVICE,
+				       true);
+		if (!sess->usr_rx_ring[i]) {
+			ERR(sess, "Failed to allocate iu for usr_rx_ring\n");
+			goto err;
+		}
+	}
+
+	return 0;
+
+err:
+	free_sess_rx_bufs(sess);
+	return -ENOMEM;
+}
+
+static int alloc_sess_bufs(struct ibtrs_session *sess)
+{
+	int err;
+
+	err = alloc_sess_rx_bufs(sess);
+	if (err)
+		return err;
+
+	return alloc_sess_tx_bufs(sess);
+}
+
+static int post_io_con_recv(struct ibtrs_con *con)
+{
+	int i, ret;
+
+	for (i = 0; i < con->sess->queue_depth; i++) {
+		ret = ibtrs_post_recv(con, con->sess->dummy_rx_iu);
+		if (unlikely(ret))
+			return ret;
+	}
+
+	return 0;
+}
+
+static int post_user_con_recv(struct ibtrs_con *con)
+{
+	int i, ret;
+
+	for (i = 0; i < USR_CON_BUF_SIZE; i++) {
+		struct ibtrs_iu *iu = con->sess->usr_rx_ring[i];
+
+		ret = ibtrs_post_recv(con, iu);
+		if (unlikely(ret))
+			return ret;
+	}
+
+	return 0;
+}
+
+static int post_recv(struct ibtrs_con *con)
+{
+	if (con->user)
+		return post_user_con_recv(con);
+
+	return post_io_con_recv(con);
+}
+
+static void free_sess_bufs(struct ibtrs_session *sess)
+{
+	free_sess_rx_bufs(sess);
+	free_sess_tx_bufs(sess);
+}
+
+static int init_transfer_bufs(struct ibtrs_con *con)
+{
+	int err;
+	struct ibtrs_session *sess = con->sess;
+
+	if (con->user) {
+		err = alloc_sess_bufs(sess);
+		if (err) {
+			ERR(sess, "Alloc sess bufs failed: %d\n", err);
+			return err;
+		}
+	}
+
+	return post_recv(con);
+}
+
+static void process_rdma_write_req(struct ibtrs_con *con,
+				   struct ibtrs_msg_req_rdma_write *req,
+				   u32 buf_id, u32 off)
+{
+	int ret;
+	struct ibtrs_ops_id *id;
+	struct ibtrs_session *sess = con->sess;
+
+	if (unlikely(sess->state != SSM_STATE_CONNECTED ||
+		     con->state != CSM_STATE_CONNECTED)) {
+		ERR_RL(sess, "Processing RDMA-Write-Req request failed, "
+		       " session is disconnected, sess state %s,"
+		       " con state %s\n", ssm_state_str(sess->state),
+		       csm_state_str(con->state));
+		return;
+	}
+	ibtrs_srv_update_rdma_stats(&sess->stats, off, true);
+	id = sess->ops_ids[buf_id];
+	kfree(id->tx_wr);
+	kfree(id->tx_sg);
+	id->con		= con;
+	id->dir		= READ;
+	id->msg_id	= buf_id;
+	id->req		= req;
+	id->tx_wr	= kcalloc(req->sg_cnt, sizeof(*id->tx_wr), GFP_KERNEL);
+	id->tx_sg	= kcalloc(req->sg_cnt, sizeof(*id->tx_sg), GFP_KERNEL);
+	if (!id->tx_wr || !id->tx_sg) {
+		ERR_RL(sess, "Processing RDMA-Write-Req failed, work request "
+		       "or scatter gather allocation failed for msg_id %d\n",
+		       buf_id);
+		ret = -ENOMEM;
+		goto send_err_msg;
+	}
+
+	id->data_dma_addr = sess->rcv_buf_pool->rcv_bufs[buf_id].rdma_addr;
+	ret = srv_ops->rdma_ev(con->sess, sess->priv, id,
+			       IBTRS_SRV_RDMA_EV_WRITE_REQ,
+			       sess->rcv_buf_pool->rcv_bufs[buf_id].buf, off);
+
+	if (unlikely(ret)) {
+		ERR_RL(sess, "Processing RDMA-Write-Req failed, user "
+		       "module cb reported for msg_id %d, errno: %d\n",
+		       buf_id, ret);
+		goto send_err_msg;
+	}
+
+	return;
+
+send_err_msg:
+	ret = send_io_resp_imm(con, buf_id, ret);
+	if (ret < 0) {
+		ERR_RL(sess, "Sending err msg for failed RDMA-Write-Req"
+		       " failed, msg_id %d, errno: %d\n", buf_id, ret);
+		csm_schedule_event(con, CSM_EV_CON_ERROR);
+	}
+	ibtrs_srv_stats_dec_inflight(sess);
+}
+
+static void process_rdma_write(struct ibtrs_con *con,
+			       struct ibtrs_msg_rdma_write *req,
+			       u32 buf_id, u32 off)
+{
+	int ret;
+	struct ibtrs_ops_id *id;
+	struct ibtrs_session *sess = con->sess;
+
+	if (unlikely(sess->state != SSM_STATE_CONNECTED ||
+		     con->state != CSM_STATE_CONNECTED)) {
+		ERR_RL(sess, "Processing RDMA-Write request failed, "
+		       " session is disconnected, sess state %s,"
+		       " con state %s\n", ssm_state_str(sess->state),
+		       csm_state_str(con->state));
+		return;
+	}
+	ibtrs_srv_update_rdma_stats(&sess->stats, off, false);
+	id = con->sess->ops_ids[buf_id];
+	id->con    = con;
+	id->dir    = WRITE;
+	id->msg_id = buf_id;
+
+	ret = srv_ops->rdma_ev(sess, sess->priv, id, IBTRS_SRV_RDMA_EV_RECV,
+			       sess->rcv_buf_pool->rcv_bufs[buf_id].buf, off);
+	if (unlikely(ret)) {
+		ERR_RL(sess, "Processing RDMA-Write failed, user module"
+		       " callback reports errno: %d\n", ret);
+		goto send_err_msg;
+	}
+
+	return;
+
+send_err_msg:
+	ret = send_io_resp_imm(con, buf_id, ret);
+	if (ret < 0) {
+		ERR_RL(sess, "Processing RDMA-Write failed, sending I/O"
+		       " response failed, msg_id %d, errno: %d\n",
+		       buf_id, ret);
+		csm_schedule_event(con, CSM_EV_CON_ERROR);
+	}
+	ibtrs_srv_stats_dec_inflight(sess);
+}
+
+static int ibtrs_send_usr_msg_ack(struct ibtrs_con *con)
+{
+	struct ibtrs_session *sess;
+	int err;
+
+	sess = con->sess;
+
+	if (unlikely(con->state != CSM_STATE_CONNECTED)) {
+		ERR_RL(sess, "Sending user msg ack failed, disconnected"
+			" Connection state is %s\n", csm_state_str(con->state));
+		return -ECOMM;
+	}
+	DEB("Sending user message ack\n");
+	err = ibtrs_write_empty_imm(con->ib_con.qp, UINT_MAX - 1,
+				    IB_SEND_SIGNALED);
+	if (unlikely(err)) {
+		ERR_RL(sess, "Sending user Ack msg failed, errno: %d\n", err);
+		return err;
+	}
+
+	ibtrs_heartbeat_set_send_ts(&sess->heartbeat);
+	return 0;
+}
+
+static void process_msg_user(struct ibtrs_con *con,
+			     struct ibtrs_msg_user *msg)
+{
+	int len;
+	struct ibtrs_session *sess = con->sess;
+
+	len = msg->hdr.tsize - IBTRS_HDR_LEN;
+	if (unlikely(sess->state < SSM_STATE_CONNECTED || !sess->priv)) {
+		ERR_RL(sess, "Sending user msg failed, session isn't ready."
+			" Session state is %s\n", ssm_state_str(sess->state));
+		return;
+	}
+
+	srv_ops->recv(sess, sess->priv, msg->payl, len);
+
+	atomic64_inc(&sess->stats.user_ib_msgs.recv_msg_cnt);
+	atomic64_add(len, &sess->stats.user_ib_msgs.recv_size);
+}
+
+static void process_msg_user_ack(struct ibtrs_con *con)
+{
+	struct ibtrs_session *sess = con->sess;
+
+	atomic_inc(&sess->peer_usr_msg_bufs);
+	wake_up(&con->sess->mu_buf_wait_q);
+}
+
+static void ibtrs_handle_write(struct ibtrs_con *con, struct ibtrs_iu *iu,
+			       struct ibtrs_msg_hdr *hdr, u32 id, u32 off)
+{
+	struct ibtrs_session *sess = con->sess;
+	int ret;
+
+	if (unlikely(ibtrs_validate_message(sess->queue_depth, hdr))) {
+		ERR(sess,
+		    "Processing I/O failed, message validation failed\n");
+		ret = ibtrs_post_recv(con, iu);
+		if (unlikely(ret != 0))
+			ERR(sess,
+			    "Failed to post receive buffer to HCA, errno: %d\n",
+			    ret);
+		goto err;
+	}
+
+	DEB("recv completion, type 0x%02x, tag %u, id %u, off %u\n",
+	    hdr->type, iu->tag, id, off);
+	print_hex_dump_debug("", DUMP_PREFIX_OFFSET, 8, 1,
+			     hdr, IBTRS_HDR_LEN + 32, true);
+	ret = ibtrs_post_recv(con, iu);
+	if (unlikely(ret != 0)) {
+		ERR(sess, "Posting receive buffer to HCA failed, errno: %d\n",
+		    ret);
+		goto err;
+	}
+
+	switch (hdr->type) {
+	case IBTRS_MSG_RDMA_WRITE:
+		process_rdma_write(con, (struct ibtrs_msg_rdma_write *)hdr,
+				   id, off);
+		break;
+	case IBTRS_MSG_REQ_RDMA_WRITE:
+		process_rdma_write_req(con,
+				       (struct ibtrs_msg_req_rdma_write *)hdr,
+				       id, off);
+		break;
+	default:
+		ERR(sess, "Processing I/O request failed, "
+		    "unknown message type received: 0x%02x\n", hdr->type);
+		goto err;
+	}
+
+	return;
+
+err:
+	csm_schedule_event(con, CSM_EV_CON_ERROR);
+}
+
+static void msg_worker(struct work_struct *work)
+{
+	struct msg_work *w;
+	struct ibtrs_con *con;
+	struct ibtrs_msg_user *msg;
+
+	w = container_of(work, struct msg_work, work);
+	con = w->con;
+	msg = w->msg;
+	kvfree(w);
+	process_msg_user(con, msg);
+	kvfree(msg);
+}
+
+static int ibtrs_schedule_msg(struct ibtrs_con *con, struct ibtrs_msg_user *msg)
+{
+	struct msg_work *w;
+
+	w = ibtrs_malloc(sizeof(*w));
+	if (!w)
+		return -ENOMEM;
+
+	w->con = con;
+	w->msg = ibtrs_malloc(msg->hdr.tsize);
+	if (!w->msg) {
+		kvfree(w);
+		return -ENOMEM;
+	}
+	memcpy(w->msg, msg, msg->hdr.tsize);
+	INIT_WORK(&w->work, msg_worker);
+	queue_work(con->sess->msg_wq, &w->work);
+	return 0;
+}
+
+static void ibtrs_handle_recv(struct ibtrs_con *con,  struct ibtrs_iu *iu)
+{
+	struct ibtrs_msg_hdr *hdr;
+	struct ibtrs_msg_sess_info *req;
+	struct ibtrs_session *sess = con->sess;
+	int ret;
+	u8 type;
+
+	hdr = (struct ibtrs_msg_hdr *)iu->buf;
+	if (unlikely(ibtrs_validate_message(sess->queue_depth, hdr)))
+		goto err1;
+
+	type = hdr->type;
+
+	DEB("recv completion, type 0x%02x, tag %u\n",
+	    type, iu->tag);
+	print_hex_dump_debug("", DUMP_PREFIX_OFFSET, 8, 1,
+			     iu->buf, IBTRS_HDR_LEN, true);
+
+	switch (type) {
+	case IBTRS_MSG_USER:
+		ret = ibtrs_schedule_msg(con, iu->buf);
+		if (unlikely(ret)) {
+			ERR_RL(sess, "Scheduling worker of user message "
+			       "to user module failed, errno: %d\n", ret);
+			goto err1;
+		}
+		ret = ibtrs_post_recv(con, iu);
+		if (unlikely(ret)) {
+			ERR_RL(sess, "Posting receive buffer of user message "
+			       "to HCA failed, errno: %d\n", ret);
+			goto err2;
+		}
+		ret = ibtrs_send_usr_msg_ack(con);
+		if (unlikely(ret)) {
+			ERR_RL(sess, "Sending ACK for user message failed, "
+			       "errno: %d\n", ret);
+			goto err2;
+		}
+		return;
+	case IBTRS_MSG_SESS_INFO:
+		ret = ibtrs_post_recv(con, iu);
+		if (unlikely(ret)) {
+			ERR_RL(sess, "Posting receive buffer of sess info "
+			       "to HCA failed, errno: %d\n", ret);
+			goto err2;
+		}
+		req = (struct ibtrs_msg_sess_info *)hdr;
+		strlcpy(sess->hostname, req->hostname, sizeof(sess->hostname));
+		return;
+	default:
+		ERR(sess, "Processing received message failed, "
+		    "unknown type: 0x%02x\n", type);
+		goto err1;
+	}
+
+err1:
+	ibtrs_post_recv(con, iu);
+err2:
+	ERR(sess, "Failed to process IBTRS message\n");
+	csm_schedule_event(con, CSM_EV_CON_ERROR);
+}
+
+static void add_con_to_list(struct ibtrs_session *sess, struct ibtrs_con *con)
+{
+	mutex_lock(&sess->lock);
+	list_add_tail(&con->list, &sess->con_list);
+	mutex_unlock(&sess->lock);
+}
+
+static void remove_con_from_list(struct ibtrs_con *con)
+{
+	if (WARN_ON(!con->sess))
+		return;
+	mutex_lock(&con->sess->lock);
+	list_del(&con->list);
+	mutex_unlock(&con->sess->lock);
+}
+
+static void close_con(struct ibtrs_con *con)
+{
+	struct ibtrs_session *sess = con->sess;
+
+	DEB("Closing connection %p\n", con);
+
+	if (con->user)
+		cancel_delayed_work(&sess->send_heartbeat_dwork);
+
+	cancel_work_sync(&con->cq_work);
+	destroy_workqueue(con->rdma_resp_wq);
+
+	ib_con_destroy(&con->ib_con);
+	if (!con->user && !con->device_being_removed)
+		rdma_destroy_id(con->cm_id);
+
+	destroy_workqueue(con->cq_wq);
+
+	if (con->user) {
+		/* notify possible user msg ACK thread waiting for a tx iu or
+		 * user msg buffer so they can check the connection state, give
+		 * up waiting and put back any tx_iu reserved
+		 */
+		wake_up(&sess->mu_buf_wait_q);
+		wake_up(&sess->mu_iu_wait_q);
+		destroy_workqueue(sess->msg_wq);
+	}
+
+	con->sess->active_cnt--;
+}
+
+static void destroy_con(struct ibtrs_con *con)
+{
+	remove_con_from_list(con);
+	kvfree(con);
+}
+
+static void destroy_sess(struct kref *kref)
+{
+	struct ibtrs_session *sess = container_of(kref, struct ibtrs_session,
+						  kref);
+	struct ibtrs_con *con, *con_next;
+
+	if (sess->cm_id)
+		rdma_destroy_id(sess->cm_id);
+
+	destroy_workqueue(sess->sm_wq);
+
+	list_for_each_entry_safe(con, con_next, &sess->con_list, list)
+		destroy_con(con);
+
+	mutex_lock(&sess_mutex);
+	list_del(&sess->list);
+	mutex_unlock(&sess_mutex);
+	wake_up(&sess_list_waitq);
+
+	INFO(sess, "Session is closed\n");
+	kvfree(sess);
+}
+
+int ibtrs_srv_sess_get(struct ibtrs_session *sess)
+{
+	return kref_get_unless_zero(&sess->kref);
+}
+
+void ibtrs_srv_sess_put(struct ibtrs_session *sess)
+{
+	kref_put(&sess->kref, destroy_sess);
+}
+
+static void sess_put_worker(struct work_struct *work)
+{
+	struct sess_put_work *w = container_of(work, struct sess_put_work,
+					       work);
+
+	ibtrs_srv_sess_put(w->sess);
+	kvfree(w);
+}
+
+static void schedule_sess_put(struct ibtrs_session *sess)
+{
+	struct sess_put_work *w;
+
+	while (true) {
+		w = ibtrs_malloc(sizeof(*w));
+		if (w)
+			break;
+		cond_resched();
+	}
+
+	/* Since we can be closing this session from a session workqueue,
+	 * we need to schedule another work on the global workqueue to put the
+	 * session, which can destroy the session workqueue and free the
+	 * session.
+	 */
+	w->sess = sess;
+	INIT_WORK(&w->work, sess_put_worker);
+	queue_work(destroy_wq, &w->work);
+}
+
+static void ibtrs_srv_sysfs_put_worker(struct work_struct *work)
+{
+	struct ibtrs_srv_sysfs_put_work *w;
+
+	w = container_of(work, struct ibtrs_srv_sysfs_put_work, work);
+	kobject_put(&w->sess->kobj_stats);
+	kobject_put(&w->sess->kobj);
+
+	kvfree(w);
+}
+
+static void ibtrs_srv_schedule_sysfs_put(struct ibtrs_session *sess)
+{
+	struct ibtrs_srv_sysfs_put_work *w = ibtrs_malloc(sizeof(*w));
+
+	if (WARN_ON(!w))
+		return;
+
+	w->sess	= sess;
+
+	INIT_WORK(&w->work, ibtrs_srv_sysfs_put_worker);
+	queue_work(destroy_wq, &w->work);
+}
+
+static void ibtrs_free_dev(struct kref *ref)
+{
+	struct ibtrs_device *ndev =
+		container_of(ref, struct ibtrs_device, ref);
+
+	mutex_lock(&device_list_mutex);
+	list_del(&ndev->entry);
+	mutex_unlock(&device_list_mutex);
+	ib_session_destroy(&ndev->ib_sess);
+	if (ndev->ib_sess_destroy_completion)
+		complete_all(ndev->ib_sess_destroy_completion);
+	kfree(ndev);
+}
+
+static struct ibtrs_device *
+ibtrs_find_get_device(struct rdma_cm_id *cm_id)
+{
+	struct ibtrs_device *ndev;
+	int err;
+
+	mutex_lock(&device_list_mutex);
+	list_for_each_entry(ndev, &device_list, entry) {
+		if (ndev->device->node_guid == cm_id->device->node_guid &&
+		    kref_get_unless_zero(&ndev->ref))
+			goto out_unlock;
+	}
+
+	ndev = kzalloc(sizeof(*ndev), GFP_KERNEL);
+	if (!ndev)
+		goto out_err;
+
+	ndev->device = cm_id->device;
+	kref_init(&ndev->ref);
+
+	err = ib_session_init(cm_id->device, &ndev->ib_sess);
+	if (err)
+		goto out_free;
+
+	list_add(&ndev->entry, &device_list);
+	DEB("added %s.\n", ndev->device->name);
+out_unlock:
+	mutex_unlock(&device_list_mutex);
+	return ndev;
+
+out_free:
+	kfree(ndev);
+out_err:
+	mutex_unlock(&device_list_mutex);
+	return NULL;
+}
+
+static void ibtrs_srv_destroy_ib_session(struct ibtrs_session *sess)
+{
+	release_cont_bufs(sess);
+	free_sess_bufs(sess);
+	kref_put(&sess->dev->ref, ibtrs_free_dev);
+}
+
+static void process_err_wc(struct ibtrs_con *con, struct ib_wc *wc)
+{
+	struct ibtrs_iu *iu;
+
+	if (wc->wr_id == (uintptr_t)&con->ib_con.beacon) {
+		DEB("beacon received for con %p\n", con);
+		csm_schedule_event(con, CSM_EV_BEACON_COMPLETED);
+		return;
+	}
+
+	/* only wc->wr_id is ensured to be correct in erroneous WCs,
+	 * we can't rely on wc->opcode, use iu->direction to determine if it's
+	 * a tx or rx IU
+	 */
+	iu = (struct ibtrs_iu *)(uintptr_t)wc->wr_id;
+	if (iu && iu->direction == DMA_TO_DEVICE &&
+	    iu != con->sess->rdma_info_iu)
+		put_tx_iu(con->sess, iu);
+
+	if (wc->status != IB_WC_WR_FLUSH_ERR ||
+	    (con->state != CSM_STATE_CLOSING &&
+	     con->state != CSM_STATE_FLUSHING)) {
+		/* suppress flush errors when the connection has
+		 * just called rdma_disconnect() and is in
+		 * DISCONNECTING state waiting for the second
+		 * CM_DISCONNECTED event
+		 */
+		ERR_RL(con->sess, "%s (wr_id: 0x%llx,"
+		       " type: %s, vendor_err: 0x%x, len: %u)\n",
+		       ib_wc_status_msg(wc->status), wc->wr_id,
+		       ib_wc_opcode_str(wc->opcode),
+		       wc->vendor_err, wc->byte_len);
+	}
+	csm_schedule_event(con, CSM_EV_CON_ERROR);
+}
+
+static int process_wcs(struct ibtrs_con *con, struct ib_wc *wcs, size_t len)
+{
+	int i, ret;
+	struct ibtrs_iu *iu;
+	struct ibtrs_session *sess = con->sess;
+
+	for (i = 0; i < len; i++) {
+		struct ib_wc wc = wcs[i];
+
+		if (unlikely(wc.status != IB_WC_SUCCESS)) {
+			process_err_wc(con, &wc);
+			continue;
+		}
+
+		/* DEB("cq complete with wr_id 0x%llx, len %u "
+		 *  "status %d (%s) type %d (%s)\n", wc.wr_id,
+		 *  wc.byte_len, wc.status, ib_wc_status_msg(wc.status),
+		 *  wc.opcode, ib_wc_opcode_str(wc.opcode));
+		 */
+
+		switch (wc.opcode) {
+		case IB_WC_SEND:
+			iu = (struct ibtrs_iu *)(uintptr_t)wc.wr_id;
+			if (iu == con->sess->rdma_info_iu)
+				break;
+			put_tx_iu(sess, iu);
+			if (con->user)
+				wake_up(&sess->mu_iu_wait_q);
+			break;
+
+		case IB_WC_RECV_RDMA_WITH_IMM: {
+			u32 imm, id, off;
+			struct ibtrs_msg_hdr *hdr;
+
+			ibtrs_set_last_heartbeat(&sess->heartbeat);
+
+			iu = (struct ibtrs_iu *)(uintptr_t)wc.wr_id;
+			imm = be32_to_cpu(wc.ex.imm_data);
+			if (imm == UINT_MAX) {
+				ret = ibtrs_post_recv(con, iu);
+				if (unlikely(ret != 0)) {
+					ERR(sess, "post receive buffer failed,"
+					    " errno: %d\n", ret);
+					return ret;
+				}
+				break;
+			} else if (imm == UINT_MAX - 1) {
+				ret = ibtrs_post_recv(con, iu);
+				if (unlikely(ret))
+					ERR_RL(sess, "Posting receive buffer of"
+					       " user Ack msg to HCA failed,"
+					       " errno: %d\n", ret);
+				process_msg_user_ack(con);
+				break;
+			}
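+			/*
+			 * The immediate data packs the receive buffer id
+			 * into the bits above off_len and the offset of the
+			 * message header inside that buffer into the low
+			 * off_len bits (off_mask).
+			 */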
+			id = imm >> sess->off_len;
+			off = imm & sess->off_mask;
+
+			if (id >= sess->queue_depth || off >= rcv_buf_size) {
+				ERR(sess, "Processing I/O failed, contiguous "
+				    "buf addr is out of reserved area\n");
+				ret = ibtrs_post_recv(con, iu);
+				if (unlikely(ret != 0))
+					ERR(sess, "Processing I/O failed, "
+					    "post receive buffer failed, "
+					    "errno: %d\n", ret);
+				return -EIO;
+			}
+
+			hdr = (struct ibtrs_msg_hdr *)
+				(sess->rcv_buf_pool->rcv_bufs[id].buf + off);
+
+			ibtrs_handle_write(con, iu, hdr, id, off);
+			break;
+		}
+
+		case IB_WC_RDMA_WRITE:
+			break;
+
+		case IB_WC_RECV: {
+			struct ibtrs_msg_hdr *hdr;
+
+			ibtrs_set_last_heartbeat(&sess->heartbeat);
+			iu = (struct ibtrs_iu *)(uintptr_t)wc.wr_id;
+			hdr = (struct ibtrs_msg_hdr *)iu->buf;
+			ibtrs_deb_msg_hdr("Received: ", hdr);
+			ibtrs_handle_recv(con, iu);
+			break;
+		}
+
+		default:
+			ERR(sess, "Processing work completion failed,"
+			    " WC has unknown opcode: %s\n",
+			    ib_wc_opcode_str(wc.opcode));
+			return -EINVAL;
+		}
+	}
+	return 0;
+}
+
+static void ibtrs_srv_update_wc_stats(struct ibtrs_con *con, int cnt)
+{
+	int old_max = atomic_read(&con->sess->stats.wc_comp.max_wc_cnt);
+	int act_max;
+
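+	/*
+	 * Lock-free maximum update: retry the cmpxchg until either we
+	 * installed cnt as the new maximum or another CPU raced in with
+	 * an even larger value.
+	 */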
+	while (cnt > old_max) {
+		act_max = atomic_cmpxchg(&con->sess->stats.wc_comp.max_wc_cnt,
+					 old_max, cnt);
+		if (likely(act_max == old_max))
+			break;
+		old_max = act_max;
+	}
+
+	atomic64_inc(&con->sess->stats.wc_comp.calls);
+	atomic64_add(cnt, &con->sess->stats.wc_comp.total_wc_cnt);
+}
+
+static int get_process_wcs(struct ibtrs_con *con, int *total_cnt)
+{
+	int cnt, err;
+
+	do {
+		cnt = ib_poll_cq(con->ib_con.cq, ARRAY_SIZE(con->wcs),
+				 con->wcs);
+		if (unlikely(cnt < 0)) {
+			ERR(con->sess, "Polling completion queue failed, "
+			    "errno: %d\n", cnt);
+			return cnt;
+		}
+
+		if (likely(cnt > 0)) {
+			err = process_wcs(con, con->wcs, cnt);
+			*total_cnt += cnt;
+			if (unlikely(err))
+				return err;
+		}
+	} while (cnt > 0);
+
+	return 0;
+}
+
+static void wrapper_handle_cq_comp(struct work_struct *work)
+{
+	int err;
+	struct ibtrs_con *con = container_of(work, struct ibtrs_con, cq_work);
+	struct ibtrs_session *sess = con->sess;
+	int total_cnt = 0;
+
+	if (unlikely(con->state == CSM_STATE_CLOSED)) {
+		ERR(sess, "Retrieving work completions from completion"
+		    " queue failed, connection is disconnected\n");
+		goto error;
+	}
+
+	err = get_process_wcs(con, &total_cnt);
+	if (unlikely(err))
+		goto error;
+
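+	/*
+	 * Re-arm CQ notifications. With IB_CQ_REPORT_MISSED_EVENTS,
+	 * ib_req_notify_cq() returns a positive value if completions
+	 * arrived between the last poll and the re-arm, in which case
+	 * the CQ has to be drained again so no WC is lost.
+	 */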
+	while ((err = ib_req_notify_cq(con->ib_con.cq, IB_CQ_NEXT_COMP |
+				       IB_CQ_REPORT_MISSED_EVENTS)) > 0) {
+		DEB("Missed %d CQ notifications, processing missed WCs...\n",
+		    err);
+		err = get_process_wcs(con, &total_cnt);
+		if (unlikely(err))
+			goto error;
+	}
+
+	if (unlikely(err))
+		goto error;
+
+	ibtrs_srv_update_wc_stats(con, total_cnt);
+	return;
+
+error:
+	csm_schedule_event(con, CSM_EV_CON_ERROR);
+}
+
+static void cq_event_handler(struct ib_cq *cq, void *ctx)
+{
+	struct ibtrs_con *con = ctx;
+
+	/* queue_work() can return false here: the work may already be
+	 * queued when CQ notifications were activated and then re-armed
+	 * after the beacon was posted.
+	 */
+	if (con->state != CSM_STATE_CLOSED)
+		queue_work(con->cq_wq, &con->cq_work);
+}
+
+static int accept(struct ibtrs_con *con)
+{
+	struct rdma_conn_param conn_param;
+	int ret;
+	struct ibtrs_session *sess = con->sess;
+
+	memset(&conn_param, 0, sizeof(conn_param));
+	conn_param.retry_count = retry_count;
+
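+	/* 7 is the special value that makes the HCA retry after RNR NAKs
+	 * indefinitely on the user connection
+	 */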
+	if (con->user)
+		conn_param.rnr_retry_count = 7;
+
+	ret = rdma_accept(con->cm_id, &conn_param);
+	if (ret) {
+		ERR(sess, "Accepting RDMA connection request failed,"
+		    " errno: %d\n", ret);
+		return ret;
+	}
+
+	return 0;
+}
+
+static struct ibtrs_session *
+__create_sess(struct rdma_cm_id *cm_id, const struct ibtrs_msg_sess_open *req)
+{
+	struct ibtrs_session *sess;
+	int err;
+
+	sess = ibtrs_zalloc(sizeof(*sess));
+	if (!sess) {
+		err = -ENOMEM;
+		goto out;
+	}
+
+	err = ibtrs_addr_to_str(&cm_id->route.addr.dst_addr, sess->addr,
+				sizeof(sess->addr));
+	if (err < 0)
+		goto err1;
+
+	sess->est_cnt = 0;
+	sess->state_in_sysfs = false;
+	sess->cur_cq_vector = -1;
+	INIT_LIST_HEAD(&sess->con_list);
+	mutex_init(&sess->lock);
+
+	INIT_LIST_HEAD(&sess->tx_bufs);
+	spin_lock_init(&sess->tx_bufs_lock);
+
+	err = ib_get_max_wr_queue_size(cm_id->device);
+	if (err < 0)
+		goto err1;
+
+	sess->wq_size = err - 1;
+
+	sess->queue_depth		= sess_queue_depth;
+	sess->con_cnt			= req->con_cnt;
+	sess->ver			= min_t(u8, req->ver, IBTRS_VERSION);
+	sess->primary_port_num		= cm_id->port_num;
+
+	init_waitqueue_head(&sess->mu_iu_wait_q);
+	init_waitqueue_head(&sess->mu_buf_wait_q);
+	ibtrs_set_heartbeat_timeout(&sess->heartbeat,
+				    max_t(int, default_heartbeat_timeout_ms,
+					  MIN_HEARTBEAT_TIMEOUT_MS));
+	atomic64_set(&sess->heartbeat.send_ts_ms, 0);
+	atomic64_set(&sess->heartbeat.recv_ts_ms, 0);
+	sess->heartbeat.addr = sess->addr;
+	sess->heartbeat.hostname = sess->hostname;
+
+	atomic_set(&sess->peer_usr_msg_bufs, USR_MSG_CNT);
+	sess->dev = ibtrs_find_get_device(cm_id);
+	if (!sess->dev) {
+		err = -ENOMEM;
+		WRN(sess, "Failed to alloc ibtrs_device\n");
+		goto err1;
+	}
+	err = setup_cont_bufs(sess);
+	if (err)
+		goto err2;
+
+	memcpy(sess->uuid, req->uuid, IBTRS_UUID_SIZE);
+	err = ssm_init(sess);
+	if (err) {
+		WRN(sess, "Failed to initialize the session state machine\n");
+		goto err3;
+	}
+
+	kref_init(&sess->kref);
+	init_waitqueue_head(&sess->bufs_wait);
+
+	list_add(&sess->list, &sess_list);
+	INFO(sess, "IBTRS Session created (queue depth: %d)\n",
+	     sess->queue_depth);
+
+	return sess;
+
+err3:
+	release_cont_bufs(sess);
+err2:
+	kref_put(&sess->dev->ref, ibtrs_free_dev);
+err1:
+	kvfree(sess);
+out:
+	return ERR_PTR(err);
+}
+
+inline const char *ibtrs_srv_get_sess_hostname(struct ibtrs_session *sess)
+{
+	return sess->hostname;
+}
+EXPORT_SYMBOL(ibtrs_srv_get_sess_hostname);
+
+inline const char *ibtrs_srv_get_sess_addr(struct ibtrs_session *sess)
+{
+	return sess->addr;
+}
+EXPORT_SYMBOL(ibtrs_srv_get_sess_addr);
+
+inline int ibtrs_srv_get_sess_qdepth(struct ibtrs_session *sess)
+{
+	return sess->queue_depth;
+}
+EXPORT_SYMBOL(ibtrs_srv_get_sess_qdepth);
+
+static struct ibtrs_session *__find_active_sess(const char *uuid)
+{
+	struct ibtrs_session *n;
+
+	list_for_each_entry(n, &sess_list, list) {
+		if (!memcmp(n->uuid, uuid, sizeof(n->uuid)) &&
+		    n->state != SSM_STATE_CLOSING &&
+		    n->state != SSM_STATE_CLOSED)
+			return n;
+	}
+
+	return NULL;
+}
+
+static int rdma_con_reject(struct rdma_cm_id *cm_id, s16 errno)
+{
+	struct ibtrs_msg_error msg;
+	int ret;
+
+	memset(&msg, 0, sizeof(msg));
+	msg.hdr.type	= IBTRS_MSG_ERROR;
+	msg.hdr.tsize	= sizeof(msg);
+	msg.errno	= errno;
+
+	ret = rdma_reject(cm_id, &msg, sizeof(msg));
+	if (ret)
+		ERR_NP("Rejecting RDMA connection request failed, errno: %d\n",
+		       ret);
+
+	return ret;
+}
+
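+/*
+ * Walk cq_affinity_mask as a ring so that successive connections get
+ * their completion vectors spread round-robin over the allowed CPUs.
+ */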
+static int find_next_bit_ring(int cur)
+{
+	int v = cpumask_next(cur, &cq_affinity_mask);
+
+	if (v >= nr_cpu_ids)
+		v = cpumask_first(&cq_affinity_mask);
+	return v;
+}
+
+static int ibtrs_srv_get_next_cq_vector(struct ibtrs_session *sess)
+{
+	sess->cur_cq_vector = find_next_bit_ring(sess->cur_cq_vector);
+
+	return sess->cur_cq_vector;
+}
+
+static void ssm_create_con_worker(struct work_struct *work)
+{
+	struct ssm_create_con_work *ssm_w =
+			container_of(work, struct ssm_create_con_work, work);
+	struct ibtrs_session *sess = ssm_w->sess;
+	struct rdma_cm_id *cm_id = ssm_w->cm_id;
+	bool user = ssm_w->user;
+	struct ibtrs_con *con;
+	int ret;
+	u16 cq_size, wr_queue_size;
+
+	kvfree(ssm_w);
+
+	if (sess->state == SSM_STATE_CLOSING ||
+	    sess->state == SSM_STATE_CLOSED) {
+		WRN(sess, "Creating connection failed, "
+		    "session is being closed\n");
+		ret = -ECOMM;
+		goto err_reject;
+	}
+
+	con = ibtrs_zalloc(sizeof(*con));
+	if (!con) {
+		ERR(sess, "Creating connection failed, "
+		    "can't allocate memory for connection\n");
+		ret = -ENOMEM;
+		goto err_reject;
+	}
+
+	con->cm_id			= cm_id;
+	con->sess			= sess;
+	con->user			= user;
+	con->device_being_removed	= false;
+
+	atomic_set(&con->wr_cnt, 0);
+	if (con->user) {
+		cq_size		= USR_CON_BUF_SIZE + 1;
+		wr_queue_size	= USR_CON_BUF_SIZE + 1;
+	} else {
+		cq_size		= con->sess->queue_depth;
+		wr_queue_size	= sess->wq_size;
+	}
+
+	con->cq_vector = ibtrs_srv_get_next_cq_vector(sess);
+
+	con->ib_con.addr = sess->addr;
+	con->ib_con.hostname = sess->hostname;
+	ret = ib_con_init(&con->ib_con, con->cm_id,
+			  1, cq_event_handler, con, con->cq_vector, cq_size,
+			  wr_queue_size, &con->sess->dev->ib_sess);
+	if (ret)
+		goto err_init;
+
+	INIT_WORK(&con->cq_work, wrapper_handle_cq_comp);
+	if (con->user)
+		con->cq_wq = alloc_ordered_workqueue("%s",
+						     WQ_HIGHPRI,
+						     "ibtrs_srv_wq");
+	else
+		con->cq_wq = alloc_workqueue("%s",
+					     WQ_CPU_INTENSIVE | WQ_HIGHPRI, 0,
+					     "ibtrs_srv_wq");
+	if (!con->cq_wq) {
+		ERR(sess, "Creating connection failed, can't allocate "
+		    "work queue for completion queue\n");
+		ret = -ENOMEM;
+		goto err_wq1;
+	}
+
+	con->rdma_resp_wq = alloc_workqueue("%s", WQ_HIGHPRI, 0,
+					    "ibtrs_rdma_resp");
+
+	if (!con->rdma_resp_wq) {
+		ERR(sess, "Creating connection failed, can't allocate"
+		    " work queue for send response\n");
+		ret = -ENOMEM;
+		goto err_wq2;
+	}
+
+	ret = init_transfer_bufs(con);
+	if (ret) {
+		ERR(sess, "Creating connection failed, can't init"
+		    " transfer buffers, errno: %d\n", ret);
+		goto err_buf;
+	}
+
+	csm_init(con);
+	add_con_to_list(sess, con);
+
+	cm_id->context = con;
+	if (con->user) {
+		con->sess->msg_wq = alloc_ordered_workqueue("sess_msg_wq", 0);
+		if (!con->sess->msg_wq) {
+			ERR(con->sess, "Failed to create user message"
+			    " workqueue\n");
+			ret = -ENOMEM;
+			goto err_accept;
+		}
+	}
+
+	DEB("accept request\n");
+	ret = accept(con);
+	if (ret)
+		goto err_msg;
+
+	if (con->user)
+		con->sess->cm_id = cm_id;
+
+	con->sess->active_cnt++;
+
+	return;
+err_msg:
+	if (con->user)
+		destroy_workqueue(con->sess->msg_wq);
+err_accept:
+	cm_id->context = NULL;
+	remove_con_from_list(con);
+err_buf:
+	destroy_workqueue(con->rdma_resp_wq);
+err_wq2:
+	destroy_workqueue(con->cq_wq);
+err_wq1:
+	ib_con_destroy(&con->ib_con);
+err_init:
+	kvfree(con);
+err_reject:
+	rdma_destroy_id(cm_id);
+
+	ssm_schedule_event(sess, SSM_EV_CON_EST_ERR);
+}
+
+static int ssm_schedule_create_con(struct ibtrs_session *sess,
+				   struct rdma_cm_id *cm_id,
+				   bool user)
+{
+	struct ssm_create_con_work *w;
+
+	w = ibtrs_malloc(sizeof(*w));
+	if (!w)
+		return -ENOMEM;
+
+	w->sess		= sess;
+	w->cm_id	= cm_id;
+	w->user		= user;
+	INIT_WORK(&w->work, ssm_create_con_worker);
+	queue_work(sess->sm_wq, &w->work);
+
+	return 0;
+}
+
+static int rdma_con_establish(struct rdma_cm_id *cm_id, const void *data,
+			      size_t size)
+{
+	struct ibtrs_session *sess;
+	int ret;
+	const char *uuid = NULL;
+	const struct ibtrs_msg_hdr *hdr = data;
+	bool user = false;
+
+	if (unlikely(!srv_ops_are_valid(srv_ops))) {
+		ERR_NP("Establishing connection failed, "
+		       "no user module registered!\n");
+		ret = -ECOMM;
+		goto err_reject;
+	}
+
+	if (unlikely((size < sizeof(struct ibtrs_msg_con_open)) ||
+		     (size < sizeof(struct ibtrs_msg_sess_open)) ||
+		     ibtrs_validate_message(0, hdr))) {
+		ERR_NP("Establishing connection failed, "
+		       "connection request payload size unexpected "
+		       "%zu != %lu or %lu\n", size,
+		       sizeof(struct ibtrs_msg_con_open),
+		       sizeof(struct ibtrs_msg_sess_open));
+		ret = -EINVAL;
+		goto err_reject;
+	}
+
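+	/*
+	 * The first connection of a session is requested with a sess_open
+	 * message, every additional connection with a con_open message
+	 * carrying the uuid of the existing session.
+	 */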
+	if (hdr->type == IBTRS_MSG_SESS_OPEN)
+		uuid = ((struct ibtrs_msg_sess_open *)data)->uuid;
+	else if (hdr->type == IBTRS_MSG_CON_OPEN)
+		uuid = ((struct ibtrs_msg_con_open *)data)->uuid;
+
+	mutex_lock(&sess_mutex);
+	sess = __find_active_sess(uuid);
+	if (sess) {
+		if (unlikely(hdr->type == IBTRS_MSG_SESS_OPEN)) {
+			INFO(sess, "Connection request rejected, "
+			     "session already exists\n");
+			mutex_unlock(&sess_mutex);
+			ret = -EEXIST;
+			goto err_reject;
+		}
+		if (!ibtrs_srv_sess_get(sess)) {
+			INFO(sess, "Connection request rejected,"
+			     " session is being closed\n");
+			mutex_unlock(&sess_mutex);
+			ret = -EINVAL;
+			goto err_reject;
+		}
+	} else {
+		if (unlikely(hdr->type == IBTRS_MSG_CON_OPEN)) {
+			mutex_unlock(&sess_mutex);
+			INFO_NP("Connection request rejected,"
+				" received con_open msg but no active session"
+				" exists.\n");
+			ret = -EINVAL;
+			goto err_reject;
+		}
+
+		sess = __create_sess(cm_id, (struct ibtrs_msg_sess_open *)data);
+		if (IS_ERR(sess)) {
+			mutex_unlock(&sess_mutex);
+			ret = PTR_ERR(sess);
+			ERR_NP("Establishing connection failed, "
+			       "creating local session resource failed, errno:"
+			       " %d\n", ret);
+			goto err_reject;
+		}
+		ibtrs_srv_sess_get(sess);
+		user = true;
+	}
+
+	mutex_unlock(&sess_mutex);
+
+	ret = ssm_schedule_create_con(sess, cm_id, user);
+	if (ret) {
+		ERR(sess, "Unable to schedule creation of connection,"
+		    " session will be closed.\n");
+		goto err_close;
+	}
+
+	ibtrs_srv_sess_put(sess);
+	return 0;
+
+err_close:
+	ssm_schedule_event(sess, SSM_EV_CON_EST_ERR);
+	ibtrs_srv_sess_put(sess);
+err_reject:
+	rdma_con_reject(cm_id, ret);
+	return ret;
+}
+
+static int ibtrs_srv_rdma_cm_ev_handler(struct rdma_cm_id *cm_id,
+					struct rdma_cm_event *event)
+{
+	struct ibtrs_con *con = cm_id->context;
+	int ret = 0;
+
+	DEB("cma_event type %d cma_id %p(%s) on con: %p\n", event->event,
+	    cm_id, rdma_event_msg(event->event), con);
+	if (!con && event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
+		INFO_NP("Ignore cma_event type %d cma_id %p(%s)\n",
+			event->event, cm_id, rdma_event_msg(event->event));
+		return 0;
+	}
+
+	switch (event->event) {
+	case RDMA_CM_EVENT_CONNECT_REQUEST:
+		ret = rdma_con_establish(cm_id, event->param.conn.private_data,
+					 event->param.conn.private_data_len);
+		break;
+	case RDMA_CM_EVENT_ESTABLISHED:
+		csm_schedule_event(con, CSM_EV_CON_ESTABLISHED);
+		break;
+	case RDMA_CM_EVENT_DISCONNECTED:
+	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
+		csm_schedule_event(con, CSM_EV_CON_DISCONNECTED);
+		break;
+
+	case RDMA_CM_EVENT_DEVICE_REMOVAL: {
+		struct completion dc;
+
+		ERR_RL(con->sess,
+		       "IB Device was removed, disconnecting session.\n");
+
+		con->device_being_removed = true;
+		init_completion(&dc);
+		con->sess->dev->ib_sess_destroy_completion = &dc;
+
+		csm_schedule_event(con, CSM_EV_DEVICE_REMOVAL);
+		wait_for_completion(&dc);
+
+		/* If it's user connection, the cm_id will be destroyed by
+		 * destroy_sess(), so return 0 to signal that we will destroy
+		 * it later. Otherwise, return 1 so CMA will destroy it.
+		 */
+		if (con->user)
+			return 0;
+		else
+			return 1;
+	}
+	case RDMA_CM_EVENT_CONNECT_ERROR:
+	case RDMA_CM_EVENT_ROUTE_ERROR:
+	case RDMA_CM_EVENT_UNREACHABLE:
+	case RDMA_CM_EVENT_ADDR_CHANGE:
+		ERR_RL(con->sess, "CM error (CM event: %s, errno: %d)\n",
+		       rdma_event_msg(event->event), event->status);
+
+		csm_schedule_event(con, CSM_EV_CON_ERROR);
+		break;
+	case RDMA_CM_EVENT_REJECTED:
+		/* reject status is defined in enum, not errno */
+		ERR_RL(con->sess,
+		       "Connection rejected (CM event: %s, err: %s)\n",
+		       rdma_event_msg(event->event),
+		       rdma_reject_msg(cm_id, event->status));
+		csm_schedule_event(con, CSM_EV_CON_ERROR);
+		break;
+	default:
+		WRN(con->sess, "Ignoring unexpected CM event %s, errno %d\n",
+		    rdma_event_msg(event->event), event->status);
+		break;
+	}
+	return ret;
+}
+
+static int ibtrs_srv_cm_init(struct rdma_cm_id **cm_id, struct sockaddr *addr,
+			     enum rdma_port_space ps)
+{
+	int ret;
+
+	*cm_id = rdma_create_id(&init_net, ibtrs_srv_rdma_cm_ev_handler, NULL,
+				ps, IB_QPT_RC);
+	if (IS_ERR(*cm_id)) {
+		ret = PTR_ERR(*cm_id);
+		ERR_NP("Creating id for RDMA connection failed, errno: %d\n",
+		       ret);
+		goto err_out;
+	}
+	DEB("created cm_id %p\n", *cm_id);
+	ret = rdma_bind_addr(*cm_id, addr);
+	if (ret) {
+		ERR_NP("Binding RDMA address failed, errno: %d\n", ret);
+		goto err_cm;
+	}
+	DEB("rdma_bind_addr successful\n");
+	/* listen with a backlog of 64 pending connect requests */
+	ret = rdma_listen(*cm_id, 64);
+	if (ret) {
+		ERR_NP("Listening on RDMA connection failed, errno: %d\n", ret);
+		goto err_cm;
+	}
+
+	switch (addr->sa_family) {
+	case AF_INET:
+		DEB("listening on port %u\n",
+		    ntohs(((struct sockaddr_in *)addr)->sin_port));
+		break;
+	case AF_INET6:
+		DEB("listening on port %u\n",
+		    ntohs(((struct sockaddr_in6 *)addr)->sin6_port));
+		break;
+	case AF_IB:
+		DEB("listening on service id 0x%016llx\n",
+		    be64_to_cpu(rdma_get_service_id(*cm_id, addr)));
+		break;
+	default:
+		DEB("listening on address family %u\n", addr->sa_family);
+	}
+
+	return 0;
+
+err_cm:
+	rdma_destroy_id(*cm_id);
+err_out:
+	return ret;
+}
+
+static int ibtrs_srv_rdma_init(void)
+{
+	int ret = 0;
+	struct sockaddr_in6 sin = {
+		.sin6_family	= AF_INET6,
+		.sin6_addr	= IN6ADDR_ANY_INIT,
+		.sin6_port	= htons(IBTRS_SERVER_PORT),
+	};
+	struct sockaddr_ib sib = {
+		.sib_family			= AF_IB,
+		.sib_addr.sib_subnet_prefix	= 0ULL,
+		.sib_addr.sib_interface_id	= 0ULL,
+		.sib_sid	= cpu_to_be64(RDMA_IB_IP_PS_IB |
+					      IBTRS_SERVER_PORT),
+		.sib_sid_mask	= cpu_to_be64(0xffffffffffffffffULL),
+		.sib_pkey	= cpu_to_be16(0xffff),
+	};
+
+	/*
+	 * We accept both IPoIB and IB connections, so we need to keep
+	 * two cm_ids, one for each socket type and port space.
+	 * If the cm initialization of one of them fails, we abort
+	 * everything.
+	 */
+
+	ret = ibtrs_srv_cm_init(&cm_id_ip, (struct sockaddr *)&sin,
+				RDMA_PS_TCP);
+	if (ret)
+		return ret;
+
+	ret = ibtrs_srv_cm_init(&cm_id_ib, (struct sockaddr *)&sib, RDMA_PS_IB);
+	if (ret)
+		goto err_cm_ib;
+
+	return ret;
+
+err_cm_ib:
+	rdma_destroy_id(cm_id_ip);
+	return ret;
+}
+
+static void ibtrs_srv_destroy_buf_pool(void)
+{
+	struct ibtrs_rcv_buf_pool *pool, *pool_next;
+
+	mutex_lock(&buf_pool_mutex);
+	list_for_each_entry_safe(pool, pool_next, &free_buf_pool_list, list) {
+		list_del(&pool->list);
+		nr_free_buf_pool--;
+		free_recv_buf_pool(pool);
+	}
+	mutex_unlock(&buf_pool_mutex);
+}
+
+static void ibtrs_srv_alloc_ini_buf_pool(void)
+{
+	struct ibtrs_rcv_buf_pool *pool;
+	int i;
+
+	if (init_pool_size == 0)
+		return;
+
+	INFO_NP("Trying to allocate RDMA buffers pool for %d client(s)\n",
+		init_pool_size);
+	for (i = 0; i < init_pool_size; i++) {
+		pool = alloc_rcv_buf_pool();
+		if (!pool) {
+			ERR_NP("Failed to allocate initial RDMA buffer pool"
+			       " #%d\n", i + 1);
+			break;
+		}
+		mutex_lock(&buf_pool_mutex);
+		list_add(&pool->list, &free_buf_pool_list);
+		nr_free_buf_pool++;
+		nr_total_buf_pool++;
+		mutex_unlock(&buf_pool_mutex);
+		DEB("Allocated buffer pool #%d\n", i);
+	}
+
+	INFO_NP("Allocated RDMA buffers pool for %d client(s)\n", i);
+}
+
+int ibtrs_srv_register(const struct ibtrs_srv_ops *ops)
+{
+	int err;
+
+	if (srv_ops) {
+		ERR_NP("Registration failed, module %s already registered,"
+		       " only 1 user module supported\n",
+		srv_ops->owner->name);
+		return -ENOTSUPP;
+	}
+
+	if (unlikely(!srv_ops_are_valid(ops))) {
+		ERR_NP("Registration failed, user module supploed invalid ops"
+		       " parameter\n");
+		return -EFAULT;
+	}
+
+	ibtrs_srv_alloc_ini_buf_pool();
+
+	err = ibtrs_srv_rdma_init();
+	if (err) {
+		ERR_NP("Can't init RDMA resource, errno: %d\n", err);
+		return err;
+	}
+	srv_ops = ops;
+
+	return 0;
+}
+EXPORT_SYMBOL(ibtrs_srv_register);
+
+inline void ibtrs_srv_queue_close(struct ibtrs_session *sess)
+{
+	ssm_schedule_event(sess, SSM_EV_SYSFS_DISCONNECT);
+}
+
+static void close_sessions(void)
+{
+	struct ibtrs_session *sess;
+
+	mutex_lock(&sess_mutex);
+	list_for_each_entry(sess, &sess_list, list) {
+		if (!ibtrs_srv_sess_get(sess))
+			continue;
+		ssm_schedule_event(sess, SSM_EV_SESS_CLOSE);
+		ibtrs_srv_sess_put(sess);
+	}
+	mutex_unlock(&sess_mutex);
+
+	wait_event(sess_list_waitq, list_empty(&sess_list));
+}
+
+void ibtrs_srv_unregister(const struct ibtrs_srv_ops *ops)
+{
+	if (!srv_ops) {
+		WRN_NP("Nothing to unregister - srv_ops = NULL\n");
+		return;
+	}
+
+	/* TODO: in order to support registration of multiple modules,
+	 * introduce a list with srv_ops and search for the correct
+	 * one.
+	 */
+
+	if (srv_ops != ops) {
+		ERR_NP("Ops is not the ops we have registered\n");
+		return;
+	}
+
+	rdma_destroy_id(cm_id_ip);
+	cm_id_ip = NULL;
+	rdma_destroy_id(cm_id_ib);
+	cm_id_ib = NULL;
+	close_sessions();
+	flush_workqueue(destroy_wq);
+	ibtrs_srv_destroy_buf_pool();
+	srv_ops = NULL;
+}
+EXPORT_SYMBOL(ibtrs_srv_unregister);
+
+static int check_module_params(void)
+{
+	if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
+		ERR_NP("Invalid sess_queue_depth parameter value\n");
+		return -EINVAL;
+	}
+
+	/* check if IB immediate data size is enough to hold the mem_id and the
+	 * offset inside the memory chunk
+	 */
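+	/*
+	 * For example, assuming 32 bits of immediate data, a queue depth
+	 * of 1024 needs 10 bits for the buffer id and leaves 22 bits for
+	 * the offset, i.e. receive buffers of up to 4 MiB.
+	 */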
+	if (ilog2(sess_queue_depth - 1) + ilog2(rcv_buf_size - 1) >
+	    IB_IMM_SIZE_BITS) {
+		ERR_NP("RDMA immediate size (%db) not enough to encode "
+		       "%d buffers of size %dB. Reduce 'sess_queue_depth' "
+		       "or 'max_io_size' parameters.\n", IB_IMM_SIZE_BITS,
+		       sess_queue_depth, rcv_buf_size);
+		return -EINVAL;
+	}
+
+	if (init_pool_size < 0) {
+		ERR_NP("Invalid 'init_pool_size' parameter value."
+		       " Value must be positive.\n");
+		return -EINVAL;
+	}
+
+	if (pool_size_hi_wm < init_pool_size) {
+		ERR_NP("Invalid 'pool_size_hi_wm' parameter value. Value must"
+		       " be iqual or higher than 'init_pool_size'.\n");
+		return -EINVAL;
+	}
+
+	return 0;
+}
+
+static void csm_init(struct ibtrs_con *con)
+{
+	DEB("initializing csm to %s\n", csm_state_str(CSM_STATE_REQUESTED));
+	csm_set_state(con, CSM_STATE_REQUESTED);
+}
+
+static int send_msg_sess_open_resp(struct ibtrs_con *con)
+{
+	struct ibtrs_msg_sess_open_resp *msg;
+	int err;
+	struct ibtrs_session *sess = con->sess;
+
+	msg = sess->rdma_info_iu->buf;
+
+	fill_ibtrs_msg_sess_open_resp(msg, con);
+
+	err = ibtrs_post_send(con->ib_con.qp, con->sess->dev->ib_sess.mr,
+			      sess->rdma_info_iu, msg->hdr.tsize);
+	if (unlikely(err))
+		ERR(sess, "Sending sess open resp failed, "
+			  "posting msg to QP failed, errno: %d\n", err);
+
+	return err;
+}
+
+static void queue_heartbeat_dwork(struct ibtrs_session *sess)
+{
+	ibtrs_set_last_heartbeat(&sess->heartbeat);
+	WARN_ON(!queue_delayed_work(sess->sm_wq,
+				    &sess->send_heartbeat_dwork,
+				    HEARTBEAT_INTV_JIFFIES));
+	WARN_ON(!queue_delayed_work(sess->sm_wq,
+				    &sess->check_heartbeat_dwork,
+				    HEARTBEAT_INTV_JIFFIES));
+}
+
+static void csm_requested(struct ibtrs_con *con, enum csm_ev ev)
+{
+	struct ibtrs_session *sess = con->sess;
+	enum csm_state state = con->state;
+
+	DEB("con %p, event %s\n", con, csm_ev_str(ev));
+	switch (ev) {
+	case CSM_EV_CON_ESTABLISHED: {
+		csm_set_state(con, CSM_STATE_CONNECTED);
+		if (con->user) {
+			/* send back rdma info */
+			if (send_msg_sess_open_resp(con))
+				goto destroy;
+			queue_heartbeat_dwork(con->sess);
+		}
+		ssm_schedule_event(sess, SSM_EV_CON_CONNECTED);
+		break;
+	}
+	case CSM_EV_DEVICE_REMOVAL:
+	case CSM_EV_CON_ERROR:
+	case CSM_EV_SESS_CLOSING:
+	case CSM_EV_CON_DISCONNECTED:
+destroy:
+		csm_set_state(con, CSM_STATE_CLOSED);
+		close_con(con);
+		ssm_schedule_event(sess, SSM_EV_CON_EST_ERR);
+		break;
+	default:
+		ERR(sess, "Connection received unexpected event %s "
+		    "in %s state.\n", csm_ev_str(ev), csm_state_str(state));
+	}
+}
+
+static void csm_connected(struct ibtrs_con *con, enum csm_ev ev)
+{
+	struct ibtrs_session *sess = con->sess;
+	enum csm_state state = con->state;
+
+	DEB("con %p, event %s\n", con, csm_ev_str(ev));
+	switch (ev) {
+	case CSM_EV_CON_ERROR:
+	case CSM_EV_SESS_CLOSING: {
+		int err;
+
+		csm_set_state(con, CSM_STATE_CLOSING);
+		err = rdma_disconnect(con->cm_id);
+		if (err)
+			ERR(sess, "Connection received event %s "
+			    "in %s state, new state is %s, but failed to "
+			    "disconnect connection.\n", csm_ev_str(ev),
+			    csm_state_str(state), csm_state_str(con->state));
+		break;
+		}
+	case CSM_EV_DEVICE_REMOVAL:
+		/* Send an SSM_EV_SESS_CLOSE event to the session to speed up
+		 * the closing of the other connections. If we just wait for
+		 * the client to close all connections, this can take a while.
+		 */
+		ssm_schedule_event(sess, SSM_EV_SESS_CLOSE);
+		/* fall-through */
+	case CSM_EV_CON_DISCONNECTED: {
+		int err, cnt = 0;
+
+		csm_set_state(con, CSM_STATE_FLUSHING);
+		err = rdma_disconnect(con->cm_id);
+		if (err)
+			ERR(sess, "Connection received event %s "
+			    "in %s state, new state is %s, but failed to "
+			    "disconnect connection.\n", csm_ev_str(ev),
+			    csm_state_str(state), csm_state_str(con->state));
+
+		wait_event(sess->bufs_wait,
+			   !atomic_read(&sess->stats.rdma_stats.inflight));
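+		/*
+		 * Post a beacon WR on the now-disconnected QP: once its
+		 * completion is reaped, all previously posted WRs are
+		 * known to be flushed and the connection can be freed.
+		 */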
+		DEB("posting beacon on con %p\n", con);
+		err = post_beacon(&con->ib_con);
+		if (err) {
+			ERR(sess, "Connection received event %s "
+			    "in %s state, new state is %s but failed to post"
+			    " beacon, closing connection.\n", csm_ev_str(ev),
+			    csm_state_str(state), csm_state_str(con->state));
+			goto destroy;
+		}
+
+		err = ibtrs_request_cq_notifications(&con->ib_con);
+		if (unlikely(err < 0)) {
+			WRN(con->sess, "Requesting CQ Notification for"
+			    " ib_con failed. Connection will be destroyed\n");
+			goto destroy;
+		} else if (err > 0) {
+			err = get_process_wcs(con, &cnt);
+			if (unlikely(err))
+				goto destroy;
+			break;
+		}
+		break;
+
+destroy:
+		csm_set_state(con, CSM_STATE_CLOSED);
+		close_con(con);
+		ssm_schedule_event(sess, SSM_EV_CON_DISCONNECTED);
+
+		break;
+		}
+	default:
+		ERR(sess, "Connection received unexpected event %s "
+		    "in %s state\n", csm_ev_str(ev), csm_state_str(state));
+	}
+}
+
+static void csm_closing(struct ibtrs_con *con, enum csm_ev ev)
+{
+	struct ibtrs_session *sess = con->sess;
+	enum csm_state state = con->state;
+
+	DEB("con %p, event %s\n", con, csm_ev_str(ev));
+	switch (ev) {
+	case CSM_EV_DEVICE_REMOVAL:
+	case CSM_EV_CON_DISCONNECTED: {
+		int err, cnt = 0;
+
+		csm_set_state(con, CSM_STATE_FLUSHING);
+
+		wait_event(sess->bufs_wait,
+			   !atomic_read(&sess->stats.rdma_stats.inflight));
+
+		DEB("posting beacon on con %p\n", con);
+		if (post_beacon(&con->ib_con)) {
+			ERR(sess, "Connection received event %s "
+			    "in %s state, new state is %s but failed to post"
+			    " beacon, closing connection.\n", csm_ev_str(ev),
+			    csm_state_str(state), csm_state_str(con->state));
+			goto destroy;
+		}
+
+		err = ibtrs_request_cq_notifications(&con->ib_con);
+		if (unlikely(err < 0)) {
+			WRN(con->sess, "Requesting CQ Notification for"
+			    " ib_con failed. Connection will be destroyed\n");
+			goto destroy;
+		} else if (err > 0) {
+			err = get_process_wcs(con, &cnt);
+			if (unlikely(err))
+				goto destroy;
+			break;
+		}
+		break;
+
+destroy:
+		csm_set_state(con, CSM_STATE_CLOSED);
+		close_con(con);
+		ssm_schedule_event(sess, SSM_EV_CON_DISCONNECTED);
+		break;
+	}
+	case CSM_EV_CON_ERROR:
+		/* ignore connection errors, just wait for CM_DISCONNECTED */
+	case CSM_EV_SESS_CLOSING:
+		break;
+	default:
+		ERR(sess, "Connection received unexpected event %s "
+		    "in %s state\n", csm_ev_str(ev), csm_state_str(state));
+	}
+}
+
+static void csm_flushing(struct ibtrs_con *con, enum csm_ev ev)
+{
+	struct ibtrs_session *sess = con->sess;
+	enum csm_state state = con->state;
+
+	DEB("con %p, event %s\n", con, csm_ev_str(ev));
+
+	switch (ev) {
+	case CSM_EV_BEACON_COMPLETED:
+		csm_set_state(con, CSM_STATE_CLOSED);
+		close_con(con);
+		ssm_schedule_event(sess, SSM_EV_CON_DISCONNECTED);
+		break;
+	case CSM_EV_SESS_CLOSING:
+	case CSM_EV_DEVICE_REMOVAL:
+		/* Ignore CSM_EV_DEVICE_REMOVAL and CSM_EV_SESS_CLOSING in
+		 * this state. The beacon was already posted, so the
+		 * CSM_EV_BEACON_COMPLETED event should arrive anytime soon.
+		 */
+		break;
+	case CSM_EV_CON_ERROR:
+		break;
+	case CSM_EV_CON_DISCONNECTED:
+		/* Ignore CSM_EV_CON_DISCONNECTED. At this point we could have
+		 * already received a CSM_EV_CON_DISCONNECTED for the same
+		 * connection, but an additional RDMA_CM_EVENT_DISCONNECTED or
+		 * RDMA_CM_EVENT_TIMEWAIT_EXIT could be generated.
+		 */
+		break;
+	default:
+		ERR(sess, "Connection received unexpected event %s "
+		    "in %s state\n", csm_ev_str(ev), csm_state_str(state));
+	}
+}
+
+static void csm_closed(struct ibtrs_con *con, enum csm_ev ev)
+{
+	/* in this state, we ignore every event scheduled for this connection
+	 * and just wait for the session workqueue to be flushed and the
+	 * connection freed
+	 */
+	DEB("con %p, event %s\n", con, csm_ev_str(ev));
+}
+
+typedef void (ibtrs_srv_csm_ev_handler_fn)(struct ibtrs_con *, enum csm_ev);
+
+static ibtrs_srv_csm_ev_handler_fn *ibtrs_srv_csm_ev_handlers[] = {
+	[CSM_STATE_REQUESTED]		= csm_requested,
+	[CSM_STATE_CONNECTED]		= csm_connected,
+	[CSM_STATE_CLOSING]		= csm_closing,
+	[CSM_STATE_FLUSHING]		= csm_flushing,
+	[CSM_STATE_CLOSED]		= csm_closed,
+};
+
+static inline void ibtrs_srv_csm_ev_handle(struct ibtrs_con *con,
+					   enum csm_ev ev)
+{
+	return (*ibtrs_srv_csm_ev_handlers[con->state])(con, ev);
+}
+
+static void csm_worker(struct work_struct *work)
+{
+	struct csm_work *csm_w = container_of(work, struct csm_work, work);
+
+	ibtrs_srv_csm_ev_handle(csm_w->con, csm_w->ev);
+	kvfree(csm_w);
+}
+
+static void csm_schedule_event(struct ibtrs_con *con, enum csm_ev ev)
+{
+	struct csm_work *w;
+
+	if (!ibtrs_srv_sess_get(con->sess))
+		return;
+
+	while (true) {
+		if (con->state == CSM_STATE_CLOSED)
+			goto out;
+		w = ibtrs_malloc(sizeof(*w));
+		if (w)
+			break;
+		cond_resched();
+	}
+
+	w->con = con;
+	w->ev = ev;
+	INIT_WORK(&w->work, csm_worker);
+	queue_work(con->sess->sm_wq, &w->work);
+
+out:
+	ibtrs_srv_sess_put(con->sess);
+}
+
+static void sess_schedule_csm_event(struct ibtrs_session *sess, enum csm_ev ev)
+{
+	struct ibtrs_con *con;
+
+	list_for_each_entry(con, &sess->con_list, list)
+		csm_schedule_event(con, ev);
+}
+
+static void remove_sess_from_sysfs(struct ibtrs_session *sess)
+{
+	if (!sess->state_in_sysfs)
+		return;
+
+	kobject_del(&sess->kobj_stats);
+	kobject_del(&sess->kobj);
+	sess->state_in_sysfs = false;
+
+	ibtrs_srv_schedule_sysfs_put(sess);
+}
+
+static __always_inline int
+__ibtrs_srv_request_cq_notifications(struct ibtrs_con *con)
+{
+	return ibtrs_request_cq_notifications(&con->ib_con);
+}
+
+static int ibtrs_srv_request_cq_notifications(struct ibtrs_session *sess)
+{
+	struct ibtrs_con *con;
+	int err, cnt = 0;
+
+	list_for_each_entry(con, &sess->con_list, list)  {
+		if (con->state == CSM_STATE_CONNECTED) {
+			err = __ibtrs_srv_request_cq_notifications(con);
+			if (unlikely(err < 0)) {
+				return err;
+			} else if (err > 0) {
+				err = get_process_wcs(con, &cnt);
+				if (unlikely(err))
+					return err;
+			}
+		}
+	}
+
+	return 0;
+}
+
+static void ssm_idle(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+	enum ssm_state state = sess->state;
+
+	DEB("sess %p, event %s, est_cnt=%d\n", sess, ssm_ev_str(ev),
+	    sess->est_cnt);
+	switch (ev) {
+	case SSM_EV_CON_DISCONNECTED:
+		sess->est_cnt--;
+		/* fall through */
+	case SSM_EV_CON_EST_ERR:
+		if (!sess->active_cnt) {
+			ibtrs_srv_destroy_ib_session(sess);
+			ssm_set_state(sess, SSM_STATE_CLOSED);
+			cancel_delayed_work(&sess->check_heartbeat_dwork);
+			schedule_sess_put(sess);
+		} else {
+			ssm_set_state(sess, SSM_STATE_CLOSING);
+		}
+		break;
+	case SSM_EV_CON_CONNECTED: {
+		int err;
+
+		sess->est_cnt++;
+		if (sess->est_cnt != sess->con_cnt)
+			break;
+
+		err = ibtrs_srv_create_sess_files(sess);
+		if (err) {
+			if (err == -EEXIST)
+				ERR(sess,
+				    "Session sysfs files already exist,"
+				    " possibly a user-space process is"
+				    " holding them\n");
+			else
+				ERR(sess,
+				    "Create session sysfs files failed,"
+				    " errno: %d\n", err);
+			goto destroy;
+		}
+
+		sess->state_in_sysfs = true;
+
+		err = ibtrs_srv_sess_ev(sess, IBTRS_SRV_SESS_EV_CONNECTED);
+		if (err) {
+			ERR(sess, "Notifying user session event"
+			    " failed, errno: %d\n. Session is closed", err);
+			goto destroy;
+		}
+
+		ssm_set_state(sess, SSM_STATE_CONNECTED);
+		err = ibtrs_srv_request_cq_notifications(sess);
+		if (err) {
+			ERR(sess, "Requesting CQ completion notifications"
+			    " failed, errno: %d. Session will be closed.\n",
+			    err);
+			goto destroy;
+		}
+
+		break;
+destroy:
+		remove_sess_from_sysfs(sess);
+		ssm_set_state(sess, SSM_STATE_CLOSING);
+		sess_schedule_csm_event(sess, CSM_EV_SESS_CLOSING);
+		break;
+	}
+	case SSM_EV_SESS_CLOSE:
+		ssm_set_state(sess, SSM_STATE_CLOSING);
+		sess_schedule_csm_event(sess, CSM_EV_SESS_CLOSING);
+		break;
+	default:
+		ERR(sess, "Session received unexpected event %s "
+		    "in %s state.\n", ssm_ev_str(ev), ssm_state_str(state));
+	}
+}
+
+static void ssm_connected(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+	enum ssm_state state = sess->state;
+
+	DEB("sess %p, event %s, est_cnt=%d\n", sess, ssm_ev_str(ev),
+	    sess->est_cnt);
+	switch (ev) {
+	case SSM_EV_CON_DISCONNECTED:
+		remove_sess_from_sysfs(sess);
+		sess->est_cnt--;
+
+		ssm_set_state(sess, SSM_STATE_CLOSING);
+		ibtrs_srv_sess_ev(sess, IBTRS_SRV_SESS_EV_DISCONNECTING);
+		break;
+	case SSM_EV_SESS_CLOSE:
+	case SSM_EV_SYSFS_DISCONNECT:
+		remove_sess_from_sysfs(sess);
+		ssm_set_state(sess, SSM_STATE_CLOSING);
+		ibtrs_srv_sess_ev(sess, IBTRS_SRV_SESS_EV_DISCONNECTING);
+
+		sess_schedule_csm_event(sess, CSM_EV_SESS_CLOSING);
+		break;
+	default:
+		ERR(sess, "Session received unexpected event %s "
+		    "in %s state.\n", ssm_ev_str(ev), ssm_state_str(state));
+	}
+}
+
+static void ssm_closing(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+	enum ssm_state state = sess->state;
+
+	DEB("sess %p, event %s, est_cnt=%d\n", sess, ssm_ev_str(ev),
+	    sess->est_cnt);
+	switch (ev) {
+	case SSM_EV_CON_CONNECTED:
+		sess->est_cnt++;
+		break;
+	case SSM_EV_CON_DISCONNECTED:
+		sess->est_cnt--;
+		/* fall through */
+	case SSM_EV_CON_EST_ERR:
+		if (sess->active_cnt == 0) {
+			ibtrs_srv_destroy_ib_session(sess);
+			ssm_set_state(sess, SSM_STATE_CLOSED);
+			ibtrs_srv_sess_ev(sess, IBTRS_SRV_SESS_EV_DISCONNECTED);
+			cancel_delayed_work(&sess->check_heartbeat_dwork);
+			schedule_sess_put(sess);
+		}
+		break;
+	case SSM_EV_SESS_CLOSE:
+		sess_schedule_csm_event(sess, CSM_EV_SESS_CLOSING);
+		break;
+	case SSM_EV_SYSFS_DISCONNECT:
+		/* just ignore it, the connection should have a
+		 * CSM_EV_SESS_CLOSING event on the queue to be
+		 * processed later
+		 */
+		break;
+	default:
+		ERR(sess, "Session received unexpected event %s "
+		    "in %s state.\n", ssm_ev_str(ev), ssm_state_str(state));
+	}
+}
+
+static void ssm_closed(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+	/* in this state, we ignore every event and wait for the session
+	 * to be destroyed
+	 */
+	DEB("sess %p, event %s, est_cnt=%d\n", sess, ssm_ev_str(ev),
+	    sess->est_cnt);
+}
+
+typedef void (ssm_ev_handler_fn)(struct ibtrs_session *, enum ssm_ev);
+
+static ssm_ev_handler_fn *ibtrs_srv_ev_handlers[] = {
+	[SSM_STATE_IDLE]		= ssm_idle,
+	[SSM_STATE_CONNECTED]		= ssm_connected,
+	[SSM_STATE_CLOSING]		= ssm_closing,
+	[SSM_STATE_CLOSED]		= ssm_closed,
+};
+
+static void check_heartbeat_work(struct work_struct *work)
+{
+	struct ibtrs_session *sess;
+
+	sess = container_of(to_delayed_work(work), struct ibtrs_session,
+			    check_heartbeat_dwork);
+
+	if (ibtrs_heartbeat_timeout_is_expired(&sess->heartbeat)) {
+		ssm_schedule_event(sess, SSM_EV_SESS_CLOSE);
+		return;
+	}
+
+	ibtrs_heartbeat_warn(&sess->heartbeat);
+
+	if (WARN_ON(!queue_delayed_work(sess->sm_wq,
+					&sess->check_heartbeat_dwork,
+					HEARTBEAT_INTV_JIFFIES)))
+		WRN_RL(sess, "Schedule check heartbeat work failed, "
+		       "check_heartbeat worker already queued?\n");
+}
+
+static void send_heartbeat_work(struct work_struct *work)
+{
+	struct ibtrs_session *sess;
+	int err;
+
+	sess = container_of(to_delayed_work(work), struct ibtrs_session,
+			    send_heartbeat_dwork);
+
+	if (ibtrs_heartbeat_send_ts_diff_ms(&sess->heartbeat) >=
+	    HEARTBEAT_INTV_MS) {
+		err = send_heartbeat(sess);
+		if (unlikely(err)) {
+			WRN_RL(sess,
+			       "Sending heartbeat failed, errno: %d,"
+			       " no further heartbeat will be sent\n", err);
+			return;
+		}
+	}
+
+	if (WARN_ON(!queue_delayed_work(sess->sm_wq,
+					&sess->send_heartbeat_dwork,
+					HEARTBEAT_INTV_JIFFIES)))
+		WRN_RL(sess, "schedule send heartbeat work failed, "
+		       "send_heartbeat worker already queued?\n");
+}
+
+static inline void ssm_ev_handle(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+	return (*ibtrs_srv_ev_handlers[sess->state])(sess, ev);
+}
+
+static void ssm_worker(struct work_struct *work)
+{
+	struct ssm_work *ssm_w = container_of(work, struct ssm_work, work);
+
+	ssm_ev_handle(ssm_w->sess, ssm_w->ev);
+	kvfree(ssm_w);
+}
+
+static int ssm_schedule_event(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+	struct ssm_work *w;
+	int ret = 0;
+
+	if (!ibtrs_srv_sess_get(sess))
+		return -EPERM;
+
+	while (true) {
+		if (sess->state == SSM_STATE_CLOSED) {
+			ret = -EPERM;
+			goto out;
+		}
+		w = ibtrs_malloc(sizeof(*w));
+		if (w)
+			break;
+		cond_resched();
+	}
+
+	w->sess = sess;
+	w->ev = ev;
+	INIT_WORK(&w->work, ssm_worker);
+	queue_work(sess->sm_wq, &w->work);
+
+out:
+	ibtrs_srv_sess_put(sess);
+	return ret;
+}
+
+static int ssm_init(struct ibtrs_session *sess)
+{
+	sess->sm_wq = create_singlethread_workqueue("ibtrs_ssm_wq");
+	if (!sess->sm_wq)
+		return -ENOMEM;
+
+	INIT_DELAYED_WORK(&sess->check_heartbeat_dwork, check_heartbeat_work);
+	INIT_DELAYED_WORK(&sess->send_heartbeat_dwork, send_heartbeat_work);
+
+	ssm_set_state(sess, SSM_STATE_IDLE);
+
+	return 0;
+}
+
+static int ibtrs_srv_create_debugfs_files(void)
+{
+	int ret = 0;
+	struct dentry *file;
+
+	ibtrs_srv_debugfs_dir = debugfs_create_dir("ibtrs_server", NULL);
+	if (IS_ERR_OR_NULL(ibtrs_srv_debugfs_dir)) {
+		ret = ibtrs_srv_debugfs_dir ?
+		      PTR_ERR(ibtrs_srv_debugfs_dir) : -ENOMEM;
+		ibtrs_srv_debugfs_dir = NULL;
+		if (ret == -ENODEV)
+			WRN_NP("Debugfs not enabled in kernel\n");
+		else
+			WRN_NP("Failed to create top-level debugfs directory,"
+			       " errno: %d\n", ret);
+		goto out;
+	}
+
+	mempool_debugfs_dir = debugfs_create_dir("mempool",
+						 ibtrs_srv_debugfs_dir);
+	if (IS_ERR_OR_NULL(mempool_debugfs_dir)) {
+		ret = mempool_debugfs_dir ?
+		      PTR_ERR(mempool_debugfs_dir) : -ENOMEM;
+		WRN_NP("Failed to create mempool debugfs directory,"
+		       " errno: %d\n", ret);
+		goto out_remove;
+	}
+
+	file = debugfs_create_u32("nr_free_buf_pool", 0444,
+				  mempool_debugfs_dir, &nr_free_buf_pool);
+	if (IS_ERR_OR_NULL(file)) {
+		WRN_NP("Failed to create mempool \"nr_free_buf_pool\""
+		       " debugfs file\n");
+		ret = -EINVAL;
+		goto out_remove;
+	}
+
+	file = debugfs_create_u32("nr_total_buf_pool", 0444,
+				  mempool_debugfs_dir, &nr_total_buf_pool);
+	if (IS_ERR_OR_NULL(file)) {
+		WRN_NP("Failed to create mempool \"nr_total_buf_pool\""
+		       " debugfs file\n");
+		ret = -EINVAL;
+		goto out_remove;
+	}
+
+	file = debugfs_create_u32("nr_active_sessions", 0444,
+				  mempool_debugfs_dir, &nr_active_sessions);
+	if (IS_ERR_OR_NULL(file)) {
+		WRN_NP("Failed to create mempool \"nr_active_sessions\""
+		       " debugfs file\n");
+		ret = -EINVAL;
+		goto out_remove;
+	}
+
+	goto out;
+
+out_remove:
+	debugfs_remove_recursive(ibtrs_srv_debugfs_dir);
+	ibtrs_srv_debugfs_dir = NULL;
+	mempool_debugfs_dir = NULL;
+out:
+	return ret;
+}
+
+static void ibtrs_srv_destroy_debugfs_files(void)
+{
+	debugfs_remove_recursive(ibtrs_srv_debugfs_dir);
+}
+
+static int __init ibtrs_server_init(void)
+{
+	int err;
+
+	if (!strlen(cq_affinity_list))
+		init_cq_affinity();
+
+	scnprintf(hostname, sizeof(hostname), "%s", utsname()->nodename);
+	INFO_NP("Loading module ibtrs_server, version: %s ("
+		" retry_count: %d, "
+		" default_heartbeat_timeout_ms: %d,"
+		" cq_affinity_list: %s, max_io_size: %d,"
+		" sess_queue_depth: %d, init_pool_size: %d,"
+		" pool_size_hi_wm: %d, hostname: %s)\n",
+		__stringify(IBTRS_VER),
+		retry_count, default_heartbeat_timeout_ms,
+		cq_affinity_list, max_io_size, sess_queue_depth,
+		init_pool_size, pool_size_hi_wm, hostname);
+
+	err = check_module_params();
+	if (err) {
+		ERR_NP("Failed to load module, invalid module parameters,"
+		       " errno: %d\n", err);
+		return err;
+	}
+
+	destroy_wq = alloc_workqueue("ibtrs_server_destroy_wq", 0, 0);
+	if (!destroy_wq) {
+		ERR_NP("Failed to load module,"
+		       " alloc ibtrs_server_destroy_wq failed\n");
+		return -ENOMEM;
+	}
+
+	err = ibtrs_srv_create_sysfs_files();
+	if (err) {
+		ERR_NP("Failed to load module, can't create sysfs files,"
+		       " errno: %d\n", err);
+		goto out_destroy_wq;
+	}
+
+	err = ibtrs_srv_create_debugfs_files();
+	if (err)
+		WRN_NP("Unable to create debugfs files, errno: %d."
+		       " Continuing without debugfs\n", err);
+
+	return 0;
+
+out_destroy_wq:
+	destroy_workqueue(destroy_wq);
+	return err;
+}
+
+static void __exit ibtrs_server_exit(void)
+{
+	INFO_NP("Unloading module\n");
+	ibtrs_srv_destroy_debugfs_files();
+	ibtrs_srv_destroy_sysfs_files();
+	destroy_workqueue(destroy_wq);
+
+	INFO_NP("Module unloaded\n");
+}
+
+module_init(ibtrs_server_init);
+module_exit(ibtrs_server_exit);
-- 
2.7.4
