linux-kernel.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [PATCH 01/14] aoe: for performance support larger packet payloads
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  2012-08-25 14:39 ` [PATCH 03/14] aoe: become I/O request queue handler for increased user control Ed Cashin
                   ` (12 subsequent siblings)
  13 siblings, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

This patch adds the ability to work with large packets composed of a
number of segments, using the scatter gather feature of the block
layer (biovecs) and the network layer (skb frag array).  The
motivation is the performance gained by using a packet data payload
greater than a page size and by using the network card's scatter
gather feature.

Users of the out-of-tree aoe driver already had these changes, but
since early 2011, they have complained of increased memory utilization
and higher CPU utilization during heavy writes.[1] The commit below
appears related, as it disables scatter gather on non-IP protocols
inside the harmonize_features function, even when the NIC supports sg.

  commit f01a5236bd4b140198fbcc550f085e8361fd73fa
  Author: Jesse Gross <jesse@nicira.com>
  Date:   Sun Jan 9 06:23:31 2011 +0000

      net offloading: Generalize netif_get_vlan_features().

With that regression in place, transmits always linearize sg AoE
packets, but in-kernel users did not have this patch.  Before 2.6.38,
though, these changes worked, allowing sg to increase
performance.

1. http://www.spinics.net/lists/linux-mm/msg15184.html

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoe.h    |    2 +
 drivers/block/aoe/aoeblk.c |    3 +
 drivers/block/aoe/aoecmd.c |  138 ++++++++++++++++++++++++++++++-------------
 drivers/block/aoe/aoedev.c |    1 +
 drivers/block/aoe/aoenet.c |   13 +++-
 5 files changed, 111 insertions(+), 46 deletions(-)

diff --git a/drivers/block/aoe/aoe.h b/drivers/block/aoe/aoe.h
index db195ab..8ca8c8a 100644
--- a/drivers/block/aoe/aoe.h
+++ b/drivers/block/aoe/aoe.h
@@ -119,6 +119,8 @@ struct frame {
 	ulong bcnt;
 	sector_t lba;
 	struct sk_buff *skb;
+	struct bio_vec *bv;
+	ulong bv_off;
 };
 
 struct aoeif {
diff --git a/drivers/block/aoe/aoeblk.c b/drivers/block/aoe/aoeblk.c
index 321de7b..3a8f093 100644
--- a/drivers/block/aoe/aoeblk.c
+++ b/drivers/block/aoe/aoeblk.c
@@ -254,6 +254,7 @@ aoeblk_gdalloc(void *vp)
 {
 	struct aoedev *d = vp;
 	struct gendisk *gd;
+	enum { KB = 1024, MB = KB * KB, READ_AHEAD = MB, };
 	ulong flags;
 
 	gd = alloc_disk(AOE_PARTITIONS);
@@ -279,6 +280,8 @@ aoeblk_gdalloc(void *vp)
 	if (bdi_init(&d->blkq->backing_dev_info))
 		goto err_blkq;
 	spin_lock_irqsave(&d->lock, flags);
+	blk_queue_max_hw_sectors(d->blkq, BLK_DEF_MAX_SECTORS);
+	d->blkq->backing_dev_info.ra_pages = READ_AHEAD / PAGE_CACHE_SIZE;
 	gd->major = AOE_MAJOR;
 	gd->first_minor = d->sysminor * AOE_PARTITIONS;
 	gd->fops = &aoe_bdops;
diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index de0435e..f10ab49 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -164,7 +164,8 @@ freeframe(struct aoedev *d)
 						rf = f;
 					continue;
 				}
-gotone:				skb_shinfo(skb)->nr_frags = skb->data_len = 0;
+gotone:				skb->truesize -= skb->data_len;
+				skb_shinfo(skb)->nr_frags = skb->data_len = 0;
 				skb_trim(skb, 0);
 				d->tgt = t;
 				ifrotate(*t);
@@ -200,6 +201,24 @@ gotone:				skb_shinfo(skb)->nr_frags = skb->data_len = 0;
 	return NULL;
 }
 
+static void
+skb_fillup(struct sk_buff *skb, struct bio_vec *bv, ulong off, ulong cnt)
+{
+	int frag = 0;
+	ulong fcnt;
+loop:
+	fcnt = bv->bv_len - (off - bv->bv_offset);
+	if (fcnt > cnt)
+		fcnt = cnt;
+	skb_fill_page_desc(skb, frag++, bv->bv_page, off, fcnt);
+	cnt -= fcnt;
+	if (cnt <= 0)
+		return;
+	bv++;
+	off = bv->bv_offset;
+	goto loop;
+}
+
 static int
 aoecmd_ata_rw(struct aoedev *d)
 {
@@ -210,7 +229,7 @@ aoecmd_ata_rw(struct aoedev *d)
 	struct bio_vec *bv;
 	struct aoetgt *t;
 	struct sk_buff *skb;
-	ulong bcnt;
+	ulong bcnt, fbcnt;
 	char writebit, extbit;
 
 	writebit = 0x10;
@@ -225,8 +244,28 @@ aoecmd_ata_rw(struct aoedev *d)
 	bcnt = t->ifp->maxbcnt;
 	if (bcnt == 0)
 		bcnt = DEFAULTBCNT;
-	if (bcnt > buf->bv_resid)
-		bcnt = buf->bv_resid;
+	if (bcnt > buf->resid)
+		bcnt = buf->resid;
+	fbcnt = bcnt;
+	f->bv = buf->bv;
+	f->bv_off = f->bv->bv_offset + (f->bv->bv_len - buf->bv_resid);
+	do {
+		if (fbcnt < buf->bv_resid) {
+			buf->bv_resid -= fbcnt;
+			buf->resid -= fbcnt;
+			break;
+		}
+		fbcnt -= buf->bv_resid;
+		buf->resid -= buf->bv_resid;
+		if (buf->resid == 0) {
+			d->inprocess = NULL;
+			break;
+		}
+		buf->bv++;
+		buf->bv_resid = buf->bv->bv_len;
+		WARN_ON(buf->bv_resid == 0);
+	} while (fbcnt);
+
 	/* initialize the headers & frame */
 	skb = f->skb;
 	h = (struct aoe_hdr *) skb_mac_header(skb);
@@ -237,7 +276,6 @@ aoecmd_ata_rw(struct aoedev *d)
 	t->nout++;
 	f->waited = 0;
 	f->buf = buf;
-	f->bufaddr = page_address(bv->bv_page) + buf->bv_off;
 	f->bcnt = bcnt;
 	f->lba = buf->sector;
 
@@ -252,10 +290,11 @@ aoecmd_ata_rw(struct aoedev *d)
 		ah->lba3 |= 0xe0;	/* LBA bit + obsolete 0xa0 */
 	}
 	if (bio_data_dir(buf->bio) == WRITE) {
-		skb_fill_page_desc(skb, 0, bv->bv_page, buf->bv_off, bcnt);
+		skb_fillup(skb, f->bv, f->bv_off, bcnt);
 		ah->aflags |= AOEAFL_WRITE;
 		skb->len += bcnt;
 		skb->data_len = bcnt;
+		skb->truesize += bcnt;
 		t->wpkts++;
 	} else {
 		t->rpkts++;
@@ -266,18 +305,7 @@ aoecmd_ata_rw(struct aoedev *d)
 
 	/* mark all tracking fields and load out */
 	buf->nframesout += 1;
-	buf->bv_off += bcnt;
-	buf->bv_resid -= bcnt;
-	buf->resid -= bcnt;
 	buf->sector += bcnt >> 9;
-	if (buf->resid == 0) {
-		d->inprocess = NULL;
-	} else if (buf->bv_resid == 0) {
-		buf->bv = ++bv;
-		buf->bv_resid = bv->bv_len;
-		WARN_ON(buf->bv_resid == 0);
-		buf->bv_off = bv->bv_offset;
-	}
 
 	skb->dev = t->ifp->nd;
 	skb = skb_clone(skb, GFP_ATOMIC);
@@ -364,14 +392,12 @@ resend(struct aoedev *d, struct aoetgt *t, struct frame *f)
 		put_lba(ah, f->lba);
 
 		n = f->bcnt;
-		if (n > DEFAULTBCNT)
-			n = DEFAULTBCNT;
 		ah->scnt = n >> 9;
 		if (ah->aflags & AOEAFL_WRITE) {
-			skb_fill_page_desc(skb, 0, virt_to_page(f->bufaddr),
-				offset_in_page(f->bufaddr), n);
+			skb_fillup(skb, f->bv, f->bv_off, n);
 			skb->len = sizeof *h + sizeof *ah + n;
 			skb->data_len = n;
+			skb->truesize += n;
 		}
 	}
 	skb->dev = t->ifp->nd;
@@ -530,20 +556,6 @@ rexmit_timer(ulong vp)
 				ejectif(t, ifp);
 				ifp = NULL;
 			}
-
-			if (ata_scnt(skb_mac_header(f->skb)) > DEFAULTBCNT / 512
-			&& ifp && ++ifp->lostjumbo > (t->nframes << 1)
-			&& ifp->maxbcnt != DEFAULTBCNT) {
-				printk(KERN_INFO
-					"aoe: e%ld.%d: "
-					"too many lost jumbo on "
-					"%s:%pm - "
-					"falling back to %d frames.\n",
-					d->aoemajor, d->aoeminor,
-					ifp->nd->name, t->addr,
-					DEFAULTBCNT);
-				ifp->maxbcnt = 0;
-			}
 			resend(d, t, f);
 		}
 
@@ -736,6 +748,45 @@ diskstats(struct gendisk *disk, struct bio *bio, ulong duration, sector_t sector
 	part_stat_unlock();
 }
 
+static void
+bvcpy(struct bio_vec *bv, ulong off, struct sk_buff *skb, ulong cnt)
+{
+	ulong fcnt;
+	char *p;
+	int soff = 0;
+loop:
+	fcnt = bv->bv_len - (off - bv->bv_offset);
+	if (fcnt > cnt)
+		fcnt = cnt;
+	p = page_address(bv->bv_page) + off;
+	skb_copy_bits(skb, soff, p, fcnt);
+	soff += fcnt;
+	cnt -= fcnt;
+	if (cnt <= 0)
+		return;
+	bv++;
+	off = bv->bv_offset;
+	goto loop;
+}
+
+static void
+fadvance(struct frame *f, ulong cnt)
+{
+	ulong fcnt;
+
+	f->lba += cnt >> 9;
+loop:
+	fcnt = f->bv->bv_len - (f->bv_off - f->bv->bv_offset);
+	if (fcnt > cnt) {
+		f->bv_off += cnt;
+		return;
+	}
+	cnt -= fcnt;
+	f->bv++;
+	f->bv_off = f->bv->bv_offset;
+	goto loop;
+}
+
 void
 aoecmd_ata_rsp(struct sk_buff *skb)
 {
@@ -753,6 +804,7 @@ aoecmd_ata_rsp(struct sk_buff *skb)
 	u16 aoemajor;
 
 	hin = (struct aoe_hdr *) skb_mac_header(skb);
+	skb_pull(skb, sizeof(*hin));
 	aoemajor = get_unaligned_be16(&hin->major);
 	d = aoedev_by_aoeaddr(aoemajor, hin->minor);
 	if (d == NULL) {
@@ -790,7 +842,8 @@ aoecmd_ata_rsp(struct sk_buff *skb)
 
 	calc_rttavg(d, tsince(f->tag));
 
-	ahin = (struct aoe_atahdr *) (hin+1);
+	ahin = (struct aoe_atahdr *) skb->data;
+	skb_pull(skb, sizeof(*ahin));
 	hout = (struct aoe_hdr *) skb_mac_header(f->skb);
 	ahout = (struct aoe_atahdr *) (hout+1);
 	buf = f->buf;
@@ -809,7 +862,7 @@ aoecmd_ata_rsp(struct sk_buff *skb)
 		switch (ahout->cmdstat) {
 		case ATA_CMD_PIO_READ:
 		case ATA_CMD_PIO_READ_EXT:
-			if (skb->len - sizeof *hin - sizeof *ahin < n) {
+			if (skb->len < n) {
 				printk(KERN_ERR
 					"aoe: %s.  skb->len=%d need=%ld\n",
 					"runt data size in read", skb->len, n);
@@ -817,7 +870,7 @@ aoecmd_ata_rsp(struct sk_buff *skb)
 				spin_unlock_irqrestore(&d->lock, flags);
 				return;
 			}
-			memcpy(f->bufaddr, ahin+1, n);
+			bvcpy(f->bv, f->bv_off, skb, n);
 		case ATA_CMD_PIO_WRITE:
 		case ATA_CMD_PIO_WRITE_EXT:
 			ifp = getif(t, skb->dev);
@@ -827,21 +880,22 @@ aoecmd_ata_rsp(struct sk_buff *skb)
 					ifp->lostjumbo = 0;
 			}
 			if (f->bcnt -= n) {
-				f->lba += n >> 9;
-				f->bufaddr += n;
+				fadvance(f, n);
 				resend(d, t, f);
 				goto xmit;
 			}
 			break;
 		case ATA_CMD_ID_ATA:
-			if (skb->len - sizeof *hin - sizeof *ahin < 512) {
+			if (skb->len < 512) {
 				printk(KERN_INFO
 					"aoe: runt data size in ataid.  skb->len=%d\n",
 					skb->len);
 				spin_unlock_irqrestore(&d->lock, flags);
 				return;
 			}
-			ataid_complete(d, t, (char *) (ahin+1));
+			if (skb_linearize(skb))
+				break;
+			ataid_complete(d, t, skb->data);
 			break;
 		default:
 			printk(KERN_INFO
diff --git a/drivers/block/aoe/aoedev.c b/drivers/block/aoe/aoedev.c
index 6b5110a..b2d1fd3 100644
--- a/drivers/block/aoe/aoedev.c
+++ b/drivers/block/aoe/aoedev.c
@@ -182,6 +182,7 @@ skbfree(struct sk_buff *skb)
 			"cannot free skb -- memory leaked.");
 		return;
 	}
+	skb->truesize -= skb->data_len;
 	skb_shinfo(skb)->nr_frags = skb->data_len = 0;
 	skb_trim(skb, 0);
 	dev_kfree_skb(skb);
diff --git a/drivers/block/aoe/aoenet.c b/drivers/block/aoe/aoenet.c
index 4d3bc0d..0787807 100644
--- a/drivers/block/aoe/aoenet.c
+++ b/drivers/block/aoe/aoenet.c
@@ -102,7 +102,9 @@ static int
 aoenet_rcv(struct sk_buff *skb, struct net_device *ifp, struct packet_type *pt, struct net_device *orig_dev)
 {
 	struct aoe_hdr *h;
+	struct aoe_atahdr *ah;
 	u32 n;
+	int sn;
 
 	if (dev_net(ifp) != &init_net)
 		goto exit;
@@ -110,13 +112,16 @@ aoenet_rcv(struct sk_buff *skb, struct net_device *ifp, struct packet_type *pt,
 	skb = skb_share_check(skb, GFP_ATOMIC);
 	if (skb == NULL)
 		return 0;
-	if (skb_linearize(skb))
-		goto exit;
 	if (!is_aoe_netif(ifp))
 		goto exit;
 	skb_push(skb, ETH_HLEN);	/* (1) */
-
-	h = (struct aoe_hdr *) skb_mac_header(skb);
+	sn = sizeof(*h) + sizeof(*ah);
+	if (skb->len >= sn) {
+		sn -= skb_headlen(skb);
+		if (sn > 0 && !__pskb_pull_tail(skb, sn))
+			goto exit;
+	}
+	h = (struct aoe_hdr *) skb->data;
 	n = get_unaligned_be32(&h->tag);
 	if ((h->verfl & AOEFL_RSP) == 0 || (n & 1<<31))
 		goto exit;
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 02/14] aoe: kernel thread handles I/O completions for simple locking
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
  2012-08-25 14:39 ` [PATCH 01/14] aoe: for performance support larger packet payloads Ed Cashin
  2012-08-25 14:39 ` [PATCH 03/14] aoe: become I/O request queue handler for increased user control Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  2012-08-31 20:06   ` Andrew Morton
  2012-08-25 14:39 ` [PATCH 04/14] aoe: use a kernel thread for transmissions Ed Cashin
                   ` (10 subsequent siblings)
  13 siblings, 1 reply; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

This patch makes the frames the aoe driver uses to track the
relationship between bios and packets more flexible and detached, so
that they can be passed to an "aoe_ktio" thread for completion of I/O.

The frames are handled much like skbs, with a capped amount of
preallocation so that real-world use cases are likely to run smoothly
and degrade gracefully even under memory pressure.

Decoupling I/O completion from the receive path and serializing it in
a process makes it easier to think about the correctness of the
locking in the driver, especially in the case of a remote MAC address
becoming unusable.

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoe.h     |   33 ++-
 drivers/block/aoe/aoechr.c  |    3 +-
 drivers/block/aoe/aoecmd.c  |  729 ++++++++++++++++++++++++++++---------------
 drivers/block/aoe/aoedev.c  |   84 +++--
 drivers/block/aoe/aoemain.c |    8 +-
 drivers/block/aoe/aoenet.c  |    6 +-
 6 files changed, 559 insertions(+), 304 deletions(-)

diff --git a/drivers/block/aoe/aoe.h b/drivers/block/aoe/aoe.h
index 8ca8c8a..0cd6c0f 100644
--- a/drivers/block/aoe/aoe.h
+++ b/drivers/block/aoe/aoe.h
@@ -91,6 +91,7 @@ enum {
 	NTARGETS = 8,
 	NAOEIFS = 8,
 	NSKBPOOLMAX = 128,
+	NFACTIVE = 17,
 
 	TIMERTICK = HZ / 10,
 	MINTIMER = HZ >> 2,
@@ -112,13 +113,16 @@ struct buf {
 };
 
 struct frame {
-	int tag;
+	struct list_head head;
+	u32 tag;
 	ulong waited;
 	struct buf *buf;
+	struct aoetgt *t;		/* parent target I belong to */
 	char *bufaddr;
 	ulong bcnt;
 	sector_t lba;
-	struct sk_buff *skb;
+	struct sk_buff *skb;		/* command skb freed on module exit */
+	struct sk_buff *r_skb;		/* response skb for async processing */
 	struct bio_vec *bv;
 	ulong bv_off;
 };
@@ -133,16 +137,18 @@ struct aoeif {
 struct aoetgt {
 	unsigned char addr[6];
 	ushort nframes;
-	struct frame *frames;
+	struct aoedev *d;			/* parent device I belong to */
+	struct list_head factive[NFACTIVE];	/* hash of active frames */
+	struct list_head ffree;			/* list of free frames */
 	struct aoeif ifs[NAOEIFS];
 	struct aoeif *ifp;	/* current aoeif in use */
 	ushort nout;
 	ushort maxout;
 	u16 lasttag;		/* last tag sent */
 	u16 useme;
+	ulong falloc;
 	ulong lastwadj;		/* last window adjustment */
 	int wpkts, rpkts;
-	int dataref;
 };
 
 struct aoedev {
@@ -169,9 +175,20 @@ struct aoedev {
 	struct buf *inprocess;	/* the one we're currently working on */
 	struct aoetgt *targets[NTARGETS];
 	struct aoetgt **tgt;	/* target in use when working */
-	struct aoetgt **htgt;	/* target needing rexmit assistance */
+	struct aoetgt *htgt;	/* target needing rexmit assistance */
+	ulong ntargets;
+	ulong kicked;
 };
 
+/* kthread tracking */
+struct ktstate {
+	struct completion rendez;
+	struct task_struct *task;
+	wait_queue_head_t *waitq;
+	int (*fn) (void);
+	char *name;
+	spinlock_t *lock;
+};
 
 int aoeblk_init(void);
 void aoeblk_exit(void);
@@ -184,11 +201,14 @@ void aoechr_error(char *);
 
 void aoecmd_work(struct aoedev *d);
 void aoecmd_cfg(ushort aoemajor, unsigned char aoeminor);
-void aoecmd_ata_rsp(struct sk_buff *);
+struct sk_buff *aoecmd_ata_rsp(struct sk_buff *);
 void aoecmd_cfg_rsp(struct sk_buff *);
 void aoecmd_sleepwork(struct work_struct *);
 void aoecmd_cleanslate(struct aoedev *);
+void aoecmd_exit(void);
+int aoecmd_init(void);
 struct sk_buff *aoecmd_ata_id(struct aoedev *);
+void aoe_freetframe(struct frame *);
 
 int aoedev_init(void);
 void aoedev_exit(void);
@@ -196,6 +216,7 @@ struct aoedev *aoedev_by_aoeaddr(int maj, int min);
 struct aoedev *aoedev_by_sysminor_m(ulong sysminor);
 void aoedev_downdev(struct aoedev *d);
 int aoedev_flush(const char __user *str, size_t size);
+void aoe_failbuf(struct aoedev *d, struct buf *buf);
 
 int aoenet_init(void);
 void aoenet_exit(void);
diff --git a/drivers/block/aoe/aoechr.c b/drivers/block/aoe/aoechr.c
index e86d206..f145388 100644
--- a/drivers/block/aoe/aoechr.c
+++ b/drivers/block/aoe/aoechr.c
@@ -86,10 +86,9 @@ revalidate(const char __user *str, size_t size)
 	if (copy_from_user(buf, str, size))
 		return -EFAULT;
 
-	/* should be e%d.%d format */
 	n = sscanf(buf, "e%d.%d", &major, &minor);
 	if (n != 2) {
-		printk(KERN_ERR "aoe: invalid device specification\n");
+		pr_err("aoe: invalid device specification %s\n", buf);
 		return -EINVAL;
 	}
 	d = aoedev_by_aoeaddr(major, minor);
diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index f10ab49..da66a6a 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -12,10 +12,17 @@
 #include <linux/netdevice.h>
 #include <linux/genhd.h>
 #include <linux/moduleparam.h>
+#include <linux/workqueue.h>
+#include <linux/kthread.h>
 #include <net/net_namespace.h>
 #include <asm/unaligned.h>
+#include <linux/uio.h>
 #include "aoe.h"
 
+#define MAXIOC (8192)	/* default meant to avoid most soft lockups */
+
+static void ktcomplete(struct frame *, struct sk_buff *);
+
 static int aoe_deadsecs = 60 * 3;
 module_param(aoe_deadsecs, int, 0644);
 MODULE_PARM_DESC(aoe_deadsecs, "After aoe_deadsecs seconds, give up and fail dev.");
@@ -25,6 +32,15 @@ module_param(aoe_maxout, int, 0644);
 MODULE_PARM_DESC(aoe_maxout,
 	"Only aoe_maxout outstanding packets for every MAC on eX.Y.");
 
+static wait_queue_head_t ktiowq;
+static struct ktstate kts;
+
+/* io completion queue */
+static struct {
+	struct list_head head;
+	spinlock_t lock;
+} iocq;
+
 static struct sk_buff *
 new_skb(ulong len)
 {
@@ -40,15 +56,21 @@ new_skb(ulong len)
 }
 
 static struct frame *
-getframe(struct aoetgt *t, int tag)
+getframe(struct aoetgt *t, u32 tag)
 {
-	struct frame *f, *e;
+	struct frame *f;
+	struct list_head *head, *pos, *nx;
+	u32 n;
 
-	f = t->frames;
-	e = f + t->nframes;
-	for (; f<e; f++)
-		if (f->tag == tag)
+	n = tag % NFACTIVE;
+	head = &t->factive[n];
+	list_for_each_safe(pos, nx, head) {
+		f = list_entry(pos, struct frame, head);
+		if (f->tag == tag) {
+			list_del(pos);
 			return f;
+		}
+	}
 	return NULL;
 }
 
@@ -66,7 +88,7 @@ newtag(struct aoetgt *t)
 	return n |= (++t->lasttag & 0x7fff) << 16;
 }
 
-static int
+static u32
 aoehdr_atainit(struct aoedev *d, struct aoetgt *t, struct aoe_hdr *h)
 {
 	u32 host_tag = newtag(t);
@@ -128,75 +150,96 @@ skb_pool_get(struct aoedev *d)
 	return NULL;
 }
 
-/* freeframe is where we do our load balancing so it's a little hairy. */
+void
+aoe_freetframe(struct frame *f)
+{
+	struct aoetgt *t;
+
+	t = f->t;
+	f->buf = NULL;
+	f->bv = NULL;
+	f->r_skb = NULL;
+	list_add(&f->head, &t->ffree);
+}
+
 static struct frame *
-freeframe(struct aoedev *d)
+newtframe(struct aoedev *d, struct aoetgt *t)
 {
-	struct frame *f, *e, *rf;
-	struct aoetgt **t;
+	struct frame *f;
 	struct sk_buff *skb;
+	struct list_head *pos;
+
+	if (list_empty(&t->ffree)) {
+		if (t->falloc >= NSKBPOOLMAX*2)
+			return NULL;
+		f = kcalloc(1, sizeof(*f), GFP_ATOMIC);
+		if (f == NULL)
+			return NULL;
+		t->falloc++;
+		f->t = t;
+	} else {
+		pos = t->ffree.next;
+		list_del(pos);
+		f = list_entry(pos, struct frame, head);
+	}
+
+	skb = f->skb;
+	if (skb == NULL) {
+		f->skb = skb = new_skb(ETH_ZLEN);
+		if (!skb) {
+bail:			aoe_freetframe(f);
+			return NULL;
+		}
+	}
+
+	if (atomic_read(&skb_shinfo(skb)->dataref) != 1) {
+		skb = skb_pool_get(d);
+		if (skb == NULL)
+			goto bail;
+		skb_pool_put(d, f->skb);
+		f->skb = skb;
+	}
+
+	skb->truesize -= skb->data_len;
+	skb_shinfo(skb)->nr_frags = skb->data_len = 0;
+	skb_trim(skb, 0);
+	return f;
+}
+
+static struct frame *
+newframe(struct aoedev *d)
+{
+	struct frame *f;
+	struct aoetgt *t, **tt;
+	int totout = 0;
 
 	if (d->targets[0] == NULL) {	/* shouldn't happen, but I'm paranoid */
 		printk(KERN_ERR "aoe: NULL TARGETS!\n");
 		return NULL;
 	}
-	t = d->tgt;
-	t++;
-	if (t >= &d->targets[NTARGETS] || !*t)
-		t = d->targets;
+	tt = d->tgt;	/* last used target */
 	for (;;) {
-		if ((*t)->nout < (*t)->maxout
+		tt++;
+		if (tt >= &d->targets[NTARGETS] || !*tt)
+			tt = d->targets;
+		t = *tt;
+		totout += t->nout;
+		if (t->nout < t->maxout
 		&& t != d->htgt
-		&& (*t)->ifp->nd) {
-			rf = NULL;
-			f = (*t)->frames;
-			e = f + (*t)->nframes;
-			for (; f < e; f++) {
-				if (f->tag != FREETAG)
-					continue;
-				skb = f->skb;
-				if (!skb
-				&& !(f->skb = skb = new_skb(ETH_ZLEN)))
-					continue;
-				if (atomic_read(&skb_shinfo(skb)->dataref)
-					!= 1) {
-					if (!rf)
-						rf = f;
-					continue;
-				}
-gotone:				skb->truesize -= skb->data_len;
-				skb_shinfo(skb)->nr_frags = skb->data_len = 0;
-				skb_trim(skb, 0);
-				d->tgt = t;
-				ifrotate(*t);
+		&& t->ifp->nd) {
+			f = newtframe(d, t);
+			if (f) {
+				d->tgt = tt;
+				ifrotate(t);
 				return f;
 			}
-			/* Work can be done, but the network layer is
-			   holding our precious packets.  Try to grab
-			   one from the pool. */
-			f = rf;
-			if (f == NULL) {	/* more paranoia */
-				printk(KERN_ERR
-					"aoe: freeframe: %s.\n",
-					"unexpected null rf");
-				d->flags |= DEVFL_KICKME;
-				return NULL;
-			}
-			skb = skb_pool_get(d);
-			if (skb) {
-				skb_pool_put(d, f->skb);
-				f->skb = skb;
-				goto gotone;
-			}
-			(*t)->dataref++;
-			if ((*t)->nout == 0)
-				d->flags |= DEVFL_KICKME;
 		}
-		if (t == d->tgt)	/* we've looped and found nada */
+		if (tt == d->tgt)	/* we've looped and found nada */
 			break;
-		t++;
-		if (t >= &d->targets[NTARGETS] || !*t)
-			t = d->targets;
+	}
+	if (totout == 0) {
+		d->kicked++;
+		d->flags |= DEVFL_KICKME;
 	}
 	return NULL;
 }
@@ -219,6 +262,16 @@ loop:
 	goto loop;
 }
 
+static void
+fhash(struct frame *f)
+{
+	struct aoetgt *t = f->t;
+	u32 n;
+
+	n = f->tag % NFACTIVE;
+	list_add_tail(&f->head, &t->factive[n]);
+}
+
 static int
 aoecmd_ata_rw(struct aoedev *d)
 {
@@ -235,7 +288,7 @@ aoecmd_ata_rw(struct aoedev *d)
 	writebit = 0x10;
 	extbit = 0x4;
 
-	f = freeframe(d);
+	f = newframe(d);
 	if (f == NULL)
 		return 0;
 	t = *d->tgt;
@@ -273,6 +326,7 @@ aoecmd_ata_rw(struct aoedev *d)
 	skb_put(skb, sizeof *h + sizeof *ah);
 	memset(h, 0, skb->len);
 	f->tag = aoehdr_atainit(d, t, h);
+	fhash(f);
 	t->nout++;
 	f->waited = 0;
 	f->buf = buf;
@@ -357,14 +411,16 @@ cont:
 }
 
 static void
-resend(struct aoedev *d, struct aoetgt *t, struct frame *f)
+resend(struct aoedev *d, struct frame *f)
 {
 	struct sk_buff *skb;
 	struct aoe_hdr *h;
 	struct aoe_atahdr *ah;
+	struct aoetgt *t;
 	char buf[128];
 	u32 n;
 
+	t = f->t;
 	ifrotate(t);
 	n = newtag(t);
 	skb = f->skb;
@@ -378,28 +434,11 @@ resend(struct aoedev *d, struct aoetgt *t, struct frame *f)
 	aoechr_error(buf);
 
 	f->tag = n;
+	fhash(f);
 	h->tag = cpu_to_be32(n);
 	memcpy(h->dst, t->addr, sizeof h->dst);
 	memcpy(h->src, t->ifp->nd->dev_addr, sizeof h->src);
 
-	switch (ah->cmdstat) {
-	default:
-		break;
-	case ATA_CMD_PIO_READ:
-	case ATA_CMD_PIO_READ_EXT:
-	case ATA_CMD_PIO_WRITE:
-	case ATA_CMD_PIO_WRITE_EXT:
-		put_lba(ah, f->lba);
-
-		n = f->bcnt;
-		ah->scnt = n >> 9;
-		if (ah->aflags & AOEAFL_WRITE) {
-			skb_fillup(skb, f->bv, f->bv_off, n);
-			skb->len = sizeof *h + sizeof *ah + n;
-			skb->data_len = n;
-			skb->truesize += n;
-		}
-	}
 	skb->dev = t->ifp->nd;
 	skb = skb_clone(skb, GFP_ATOMIC);
 	if (skb == NULL)
@@ -408,7 +447,7 @@ resend(struct aoedev *d, struct aoetgt *t, struct frame *f)
 }
 
 static int
-tsince(int tag)
+tsince(u32 tag)
 {
 	int n;
 
@@ -462,26 +501,38 @@ ejectif(struct aoetgt *t, struct aoeif *ifp)
 static int
 sthtith(struct aoedev *d)
 {
-	struct frame *f, *e, *nf;
+	struct frame *f, *nf;
+	struct list_head *nx, *pos, *head;
 	struct sk_buff *skb;
-	struct aoetgt *ht = *d->htgt;
-
-	f = ht->frames;
-	e = f + ht->nframes;
-	for (; f < e; f++) {
-		if (f->tag == FREETAG)
-			continue;
-		nf = freeframe(d);
-		if (!nf)
-			return 0;
-		skb = nf->skb;
-		*nf = *f;
-		f->skb = skb;
-		f->tag = FREETAG;
-		nf->waited = 0;
-		ht->nout--;
-		(*d->tgt)->nout++;
-		resend(d, *d->tgt, nf);
+	struct aoetgt *ht = d->htgt;
+	int i;
+
+	for (i = 0; i < NFACTIVE; i++) {
+		head = &ht->factive[i];
+		list_for_each_safe(pos, nx, head) {
+			f = list_entry(pos, struct frame, head);
+			nf = newframe(d);
+			if (!nf)
+				return 0;
+
+			/* remove frame from active list */
+			list_del(pos);
+
+			/* reassign all pertinent bits to new outbound frame */
+			skb = nf->skb;
+			nf->skb = f->skb;
+			nf->buf = f->buf;
+			nf->bcnt = f->bcnt;
+			nf->lba = f->lba;
+			nf->bv = f->bv;
+			nf->bv_off = f->bv_off;
+			nf->waited = 0;
+			f->skb = skb;
+			aoe_freetframe(f);
+			ht->nout--;
+			nf->t->nout++;
+			resend(d, nf);
+		}
 	}
 	/* he's clean, he's useless.  take away his interfaces */
 	memset(ht->ifs, 0, sizeof ht->ifs);
@@ -506,9 +557,12 @@ rexmit_timer(ulong vp)
 	struct aoedev *d;
 	struct aoetgt *t, **tt, **te;
 	struct aoeif *ifp;
-	struct frame *f, *e;
+	struct frame *f;
+	struct list_head *head, *pos, *nx;
+	LIST_HEAD(flist);
 	register long timeout;
 	ulong flags, n;
+	int i;
 
 	d = (struct aoedev *) vp;
 
@@ -522,41 +576,21 @@ rexmit_timer(ulong vp)
 		spin_unlock_irqrestore(&d->lock, flags);
 		return;
 	}
+
+	/* collect all frames to rexmit into flist */
 	tt = d->targets;
 	te = tt + NTARGETS;
 	for (; tt < te && *tt; tt++) {
 		t = *tt;
-		f = t->frames;
-		e = f + t->nframes;
-		for (; f < e; f++) {
-			if (f->tag == FREETAG
-			|| tsince(f->tag) < timeout)
-				continue;
-			n = f->waited += timeout;
-			n /= HZ;
-			if (n > aoe_deadsecs) {
-				/* waited too long.  device failure. */
-				aoedev_downdev(d);
-				break;
-			}
-
-			if (n > HELPWAIT /* see if another target can help */
-			&& (tt != d->targets || d->targets[1]))
-				d->htgt = tt;
-
-			if (t->nout == t->maxout) {
-				if (t->maxout > 1)
-					t->maxout--;
-				t->lastwadj = jiffies;
-			}
-
-			ifp = getif(t, f->skb->dev);
-			if (ifp && ++ifp->lost > (t->nframes << 1)
-			&& (ifp != t->ifs || t->ifs[1].nd)) {
-				ejectif(t, ifp);
-				ifp = NULL;
+		for (i = 0; i < NFACTIVE; i++) {
+			head = &t->factive[i];
+			list_for_each_safe(pos, nx, head) {
+				f = list_entry(pos, struct frame, head);
+				if (tsince(f->tag) < timeout)
+					continue;
+				/* move to flist for later processing */
+				list_move_tail(pos, &flist);
 			}
-			resend(d, t, f);
 		}
 
 		/* window check */
@@ -568,6 +602,44 @@ rexmit_timer(ulong vp)
 		}
 	}
 
+	/* process expired frames */
+	while (!list_empty(&flist)) {
+		pos = flist.next;
+		f = list_entry(pos, struct frame, head);
+		n = f->waited += timeout;
+		n /= HZ;
+		if (n > aoe_deadsecs) {
+			/* Waited too long.  Device failure.
+			 * Hang all frames on first hash bucket for downdev
+			 * to clean up.
+			 */
+			list_splice(&flist, &f->t->factive[0]);
+			aoedev_downdev(d);
+			break;
+		}
+		list_del(pos);
+
+		t = f->t;
+		if (n > HELPWAIT) {
+			/* see if another target can help */
+			if (d->ntargets > 1)
+				d->htgt = t;
+		}
+		if (t->nout == t->maxout) {
+			if (t->maxout > 1)
+				t->maxout--;
+			t->lastwadj = jiffies;
+		}
+
+		ifp = getif(t, f->skb->dev);
+		if (ifp && ++ifp->lost > (t->nframes << 1)
+		&& (ifp != t->ifs || t->ifs[1].nd)) {
+			ejectif(t, ifp);
+			ifp = NULL;
+		}
+		resend(d, f);
+	}
+
 	if (!skb_queue_empty(&d->sendq)) {
 		n = d->rttavg <<= 1;
 		if (n > MAXTIMER)
@@ -749,7 +821,7 @@ diskstats(struct gendisk *disk, struct bio *bio, ulong duration, sector_t sector
 }
 
 static void
-bvcpy(struct bio_vec *bv, ulong off, struct sk_buff *skb, ulong cnt)
+bvcpy(struct bio_vec *bv, ulong off, struct sk_buff *skb, long cnt)
 {
 	ulong fcnt;
 	char *p;
@@ -770,60 +842,225 @@ loop:
 }
 
 static void
-fadvance(struct frame *f, ulong cnt)
+ktiocomplete(struct frame *f)
 {
-	ulong fcnt;
+	struct aoe_hdr *hin, *hout;
+	struct aoe_atahdr *ahin, *ahout;
+	struct buf *buf;
+	struct sk_buff *skb;
+	struct aoetgt *t;
+	struct aoeif *ifp;
+	struct aoedev *d;
+	long n;
 
-	f->lba += cnt >> 9;
-loop:
-	fcnt = f->bv->bv_len - (f->bv_off - f->bv->bv_offset);
-	if (fcnt > cnt) {
-		f->bv_off += cnt;
+	if (f == NULL)
 		return;
+
+	t = f->t;
+	d = t->d;
+
+	hout = (struct aoe_hdr *) skb_mac_header(f->skb);
+	ahout = (struct aoe_atahdr *) (hout+1);
+	buf = f->buf;
+	skb = f->r_skb;
+	if (skb == NULL)
+		goto noskb;	/* just fail the buf. */
+
+	hin = (struct aoe_hdr *) skb->data;
+	skb_pull(skb, sizeof(*hin));
+	ahin = (struct aoe_atahdr *) skb->data;
+	skb_pull(skb, sizeof(*ahin));
+	if (ahin->cmdstat & 0xa9) {	/* these bits cleared on success */
+		pr_err("aoe: ata error cmd=%2.2Xh stat=%2.2Xh from e%ld.%d\n",
+			ahout->cmdstat, ahin->cmdstat,
+			d->aoemajor, d->aoeminor);
+noskb:	if (buf)
+			buf->flags |= BUFFL_FAIL;
+		goto badrsp;
 	}
-	cnt -= fcnt;
-	f->bv++;
-	f->bv_off = f->bv->bv_offset;
-	goto loop;
+
+	n = ahout->scnt << 9;
+	switch (ahout->cmdstat) {
+	case ATA_CMD_PIO_READ:
+	case ATA_CMD_PIO_READ_EXT:
+		if (skb->len < n) {
+			pr_err("aoe: runt data size in read.  skb->len=%d need=%ld\n",
+				skb->len, n);
+			buf->flags |= BUFFL_FAIL;
+			break;
+		}
+		bvcpy(f->bv, f->bv_off, skb, n);
+	case ATA_CMD_PIO_WRITE:
+	case ATA_CMD_PIO_WRITE_EXT:
+		spin_lock_irq(&d->lock);
+		ifp = getif(t, skb->dev);
+		if (ifp) {
+			ifp->lost = 0;
+			if (n > DEFAULTBCNT)
+				ifp->lostjumbo = 0;
+		}
+		if (d->htgt == t) /* I'll help myself, thank you. */
+			d->htgt = NULL;
+		spin_unlock_irq(&d->lock);
+		break;
+	case ATA_CMD_ID_ATA:
+		if (skb->len < 512) {
+			pr_info("aoe: runt data size in ataid.  skb->len=%d\n",
+				skb->len);
+			break;
+		}
+		if (skb_linearize(skb))
+			break;
+		spin_lock_irq(&d->lock);
+		ataid_complete(d, t, skb->data);
+		spin_unlock_irq(&d->lock);
+		break;
+	default:
+		pr_info("aoe: unrecognized ata command %2.2Xh for %d.%d\n",
+			ahout->cmdstat,
+			be16_to_cpu(get_unaligned(&hin->major)),
+			hin->minor);
+	}
+badrsp:
+	spin_lock_irq(&d->lock);
+
+	aoe_freetframe(f);
+
+	if (buf && --buf->nframesout == 0 && buf->resid == 0) {
+		struct bio *bio = buf->bio;
+
+		diskstats(d->gd, bio, jiffies - buf->stime, buf->sector);
+		n = (buf->flags & BUFFL_FAIL) ? -EIO : 0;
+		mempool_free(buf, d->bufpool);
+		spin_unlock_irq(&d->lock);
+		if (n != -EIO)
+			bio_flush_dcache_pages(buf->bio);
+		bio_endio(bio, n);
+	} else
+		spin_unlock_irq(&d->lock);
+	dev_kfree_skb(skb);
 }
 
-void
+/* Enters with iocq.lock held.
+ * Returns true iff responses needing processing remain.
+ */
+static int
+ktio(void)
+{
+	struct frame *f;
+	struct list_head *pos;
+	int i;
+
+	for (i = 0; ; ++i) {
+		if (i == MAXIOC)
+			return 1;
+		if (list_empty(&iocq.head))
+			return 0;
+		pos = iocq.head.next;
+		list_del(pos);
+		spin_unlock_irq(&iocq.lock);
+		f = list_entry(pos, struct frame, head);
+		ktiocomplete(f);
+		spin_lock_irq(&iocq.lock);
+	}
+}
+
+static int
+kthread(void *vp)
+{
+	struct ktstate *k;
+	DECLARE_WAITQUEUE(wait, current);
+	int more;
+
+	k = vp;
+	current->flags |= PF_NOFREEZE;
+	set_user_nice(current, -10);
+	complete(&k->rendez);	/* tell spawner we're running */
+	do {
+		spin_lock_irq(k->lock);
+		more = k->fn();
+		if (!more) {
+			add_wait_queue(k->waitq, &wait);
+			__set_current_state(TASK_INTERRUPTIBLE);
+		}
+		spin_unlock_irq(k->lock);
+		if (!more) {
+			schedule();
+			remove_wait_queue(k->waitq, &wait);
+		} else
+			cond_resched();
+	} while (!kthread_should_stop());
+	complete(&k->rendez);	/* tell spawner we're stopping */
+	return 0;
+}
+
+static void
+aoe_ktstop(struct ktstate *k)
+{
+	kthread_stop(k->task);
+	wait_for_completion(&k->rendez);
+}
+
+static int
+aoe_ktstart(struct ktstate *k)
+{
+	struct task_struct *task;
+
+	init_completion(&k->rendez);
+	task = kthread_run(kthread, k, k->name);
+	if (task == NULL || IS_ERR(task))
+		return -ENOMEM;
+	k->task = task;
+	wait_for_completion(&k->rendez); /* allow kthread to start */
+	init_completion(&k->rendez);	/* for waiting for exit later */
+	return 0;
+}
+
+/* pass it off to kthreads for processing */
+static void
+ktcomplete(struct frame *f, struct sk_buff *skb)
+{
+	ulong flags;
+
+	f->r_skb = skb;
+	spin_lock_irqsave(&iocq.lock, flags);
+	list_add_tail(&f->head, &iocq.head);
+	spin_unlock_irqrestore(&iocq.lock, flags);
+	wake_up(&ktiowq);
+}
+
+struct sk_buff *
 aoecmd_ata_rsp(struct sk_buff *skb)
 {
-	struct sk_buff_head queue;
 	struct aoedev *d;
-	struct aoe_hdr *hin, *hout;
-	struct aoe_atahdr *ahin, *ahout;
+	struct aoe_hdr *h;
 	struct frame *f;
-	struct buf *buf;
 	struct aoetgt *t;
-	struct aoeif *ifp;
-	register long n;
+	u32 n;
 	ulong flags;
 	char ebuf[128];
 	u16 aoemajor;
 
-	hin = (struct aoe_hdr *) skb_mac_header(skb);
-	skb_pull(skb, sizeof(*hin));
-	aoemajor = get_unaligned_be16(&hin->major);
-	d = aoedev_by_aoeaddr(aoemajor, hin->minor);
+	h = (struct aoe_hdr *) skb->data;
+	aoemajor = be16_to_cpu(get_unaligned(&h->major));
+	d = aoedev_by_aoeaddr(aoemajor, h->minor);
 	if (d == NULL) {
 		snprintf(ebuf, sizeof ebuf, "aoecmd_ata_rsp: ata response "
 			"for unknown device %d.%d\n",
-			 aoemajor, hin->minor);
+			aoemajor, h->minor);
 		aoechr_error(ebuf);
-		return;
+		return skb;
 	}
 
 	spin_lock_irqsave(&d->lock, flags);
 
-	n = get_unaligned_be32(&hin->tag);
-	t = gettgt(d, hin->src);
+	n = be32_to_cpu(get_unaligned(&h->tag));
+	t = gettgt(d, h->src);
 	if (t == NULL) {
 		printk(KERN_INFO "aoe: can't find target e%ld.%d:%pm\n",
-			d->aoemajor, d->aoeminor, hin->src);
+		       d->aoemajor, d->aoeminor, h->src);
 		spin_unlock_irqrestore(&d->lock, flags);
-		return;
+		return skb;
 	}
 	f = getframe(t, n);
 	if (f == NULL) {
@@ -832,102 +1069,26 @@ aoecmd_ata_rsp(struct sk_buff *skb)
 		snprintf(ebuf, sizeof ebuf,
 			"%15s e%d.%d    tag=%08x@%08lx\n",
 			"unexpected rsp",
-			get_unaligned_be16(&hin->major),
-			hin->minor,
-			get_unaligned_be32(&hin->tag),
+			get_unaligned_be16(&h->major),
+			h->minor,
+			get_unaligned_be32(&h->tag),
 			jiffies);
 		aoechr_error(ebuf);
-		return;
+		return skb;
 	}
-
 	calc_rttavg(d, tsince(f->tag));
-
-	ahin = (struct aoe_atahdr *) skb->data;
-	skb_pull(skb, sizeof(*ahin));
-	hout = (struct aoe_hdr *) skb_mac_header(f->skb);
-	ahout = (struct aoe_atahdr *) (hout+1);
-	buf = f->buf;
-
-	if (ahin->cmdstat & 0xa9) {	/* these bits cleared on success */
-		printk(KERN_ERR
-			"aoe: ata error cmd=%2.2Xh stat=%2.2Xh from e%ld.%d\n",
-			ahout->cmdstat, ahin->cmdstat,
-			d->aoemajor, d->aoeminor);
-		if (buf)
-			buf->flags |= BUFFL_FAIL;
-	} else {
-		if (d->htgt && t == *d->htgt) /* I'll help myself, thank you. */
-			d->htgt = NULL;
-		n = ahout->scnt << 9;
-		switch (ahout->cmdstat) {
-		case ATA_CMD_PIO_READ:
-		case ATA_CMD_PIO_READ_EXT:
-			if (skb->len < n) {
-				printk(KERN_ERR
-					"aoe: %s.  skb->len=%d need=%ld\n",
-					"runt data size in read", skb->len, n);
-				/* fail frame f?  just returning will rexmit. */
-				spin_unlock_irqrestore(&d->lock, flags);
-				return;
-			}
-			bvcpy(f->bv, f->bv_off, skb, n);
-		case ATA_CMD_PIO_WRITE:
-		case ATA_CMD_PIO_WRITE_EXT:
-			ifp = getif(t, skb->dev);
-			if (ifp) {
-				ifp->lost = 0;
-				if (n > DEFAULTBCNT)
-					ifp->lostjumbo = 0;
-			}
-			if (f->bcnt -= n) {
-				fadvance(f, n);
-				resend(d, t, f);
-				goto xmit;
-			}
-			break;
-		case ATA_CMD_ID_ATA:
-			if (skb->len < 512) {
-				printk(KERN_INFO
-					"aoe: runt data size in ataid.  skb->len=%d\n",
-					skb->len);
-				spin_unlock_irqrestore(&d->lock, flags);
-				return;
-			}
-			if (skb_linearize(skb))
-				break;
-			ataid_complete(d, t, skb->data);
-			break;
-		default:
-			printk(KERN_INFO
-				"aoe: unrecognized ata command %2.2Xh for %d.%d\n",
-				ahout->cmdstat,
-				get_unaligned_be16(&hin->major),
-				hin->minor);
-		}
-	}
-
-	if (buf && --buf->nframesout == 0 && buf->resid == 0) {
-		diskstats(d->gd, buf->bio, jiffies - buf->stime, buf->sector);
-		if (buf->flags & BUFFL_FAIL)
-			bio_endio(buf->bio, -EIO);
-		else {
-			bio_flush_dcache_pages(buf->bio);
-			bio_endio(buf->bio, 0);
-		}
-		mempool_free(buf, d->bufpool);
-	}
-
-	f->buf = NULL;
-	f->tag = FREETAG;
 	t->nout--;
-
 	aoecmd_work(d);
-xmit:
-	__skb_queue_head_init(&queue);
-	skb_queue_splice_init(&d->sendq, &queue);
 
 	spin_unlock_irqrestore(&d->lock, flags);
-	aoenet_xmit(&queue);
+
+	ktcomplete(f, skb);
+
+	/*
+	 * Note here that we do not perform an aoedev_put, as we are
+	 * leaving this reference for the ktio to release.
+	 */
+	return NULL;
 }
 
 void
@@ -949,7 +1110,7 @@ aoecmd_ata_id(struct aoedev *d)
 	struct sk_buff *skb;
 	struct aoetgt *t;
 
-	f = freeframe(d);
+	f = newframe(d);
 	if (f == NULL)
 		return NULL;
 
@@ -962,6 +1123,7 @@ aoecmd_ata_id(struct aoedev *d)
 	skb_put(skb, sizeof *h + sizeof *ah);
 	memset(h, 0, skb->len);
 	f->tag = aoehdr_atainit(d, t, h);
+	fhash(f);
 	t->nout++;
 	f->waited = 0;
 
@@ -982,7 +1144,7 @@ static struct aoetgt *
 addtgt(struct aoedev *d, char *addr, ulong nframes)
 {
 	struct aoetgt *t, **tt, **te;
-	struct frame *f, *e;
+	int i;
 
 	tt = d->targets;
 	te = tt + NTARGETS;
@@ -995,22 +1157,21 @@ addtgt(struct aoedev *d, char *addr, ulong nframes)
 		return NULL;
 	}
 	t = kcalloc(1, sizeof *t, GFP_ATOMIC);
-	f = kcalloc(nframes, sizeof *f, GFP_ATOMIC);
-	if (!t || !f) {
-		kfree(f);
+	if (!t) {
 		kfree(t);
 		printk(KERN_INFO "aoe: cannot allocate memory to add target\n");
 		return NULL;
 	}
 
+	d->ntargets++;
 	t->nframes = nframes;
-	t->frames = f;
-	e = f + nframes;
-	for (; f < e; f++)
-		f->tag = FREETAG;
+	t->d = d;
 	memcpy(t->addr, addr, sizeof t->addr);
 	t->ifp = t->ifs;
 	t->maxout = t->nframes;
+	INIT_LIST_HEAD(&t->ffree);
+	for (i = 0; i < NFACTIVE; ++i)
+		INIT_LIST_HEAD(&t->factive[i]);
 	return *tt = t;
 }
 
@@ -1135,3 +1296,53 @@ aoecmd_cleanslate(struct aoedev *d)
 		}
 	}
 }
+
+static void
+flush_iocq(void)
+{
+	struct frame *f;
+	struct aoedev *d;
+	LIST_HEAD(flist);
+	struct list_head *pos;
+	struct sk_buff *skb;
+	ulong flags;
+
+	spin_lock_irqsave(&iocq.lock, flags);
+	list_splice_init(&iocq.head, &flist);
+	spin_unlock_irqrestore(&iocq.lock, flags);
+	while (!list_empty(&flist)) {
+		pos = flist.next;
+		list_del(pos);
+		f = list_entry(pos, struct frame, head);
+		d = f->t->d;
+		skb = f->r_skb;
+		spin_lock_irqsave(&d->lock, flags);
+		if (f->buf) {
+			f->buf->nframesout--;
+			aoe_failbuf(d, f->buf);
+		}
+		aoe_freetframe(f);
+		spin_unlock_irqrestore(&d->lock, flags);
+		dev_kfree_skb(skb);
+	}
+}
+
+int __init
+aoecmd_init(void)
+{
+	INIT_LIST_HEAD(&iocq.head);
+	spin_lock_init(&iocq.lock);
+	init_waitqueue_head(&ktiowq);
+	kts.name = "aoe_ktio";
+	kts.fn = ktio;
+	kts.waitq = &ktiowq;
+	kts.lock = &iocq.lock;
+	return aoe_ktstart(&kts);
+}
+
+void
+aoecmd_exit(void)
+{
+	aoe_ktstop(&kts);
+	flush_iocq();
+}
diff --git a/drivers/block/aoe/aoedev.c b/drivers/block/aoe/aoedev.c
index b2d1fd3..40bae1a 100644
--- a/drivers/block/aoe/aoedev.c
+++ b/drivers/block/aoe/aoedev.c
@@ -48,47 +48,60 @@ dummy_timer(ulong vp)
 }
 
 void
-aoedev_downdev(struct aoedev *d)
+aoe_failbuf(struct aoedev *d, struct buf *buf)
 {
-	struct aoetgt **t, **te;
-	struct frame *f, *e;
-	struct buf *buf;
 	struct bio *bio;
 
-	t = d->targets;
-	te = t + NTARGETS;
-	for (; t < te && *t; t++) {
-		f = (*t)->frames;
-		e = f + (*t)->nframes;
-		for (; f < e; f->tag = FREETAG, f->buf = NULL, f++) {
-			if (f->tag == FREETAG || f->buf == NULL)
-				continue;
-			buf = f->buf;
-			bio = buf->bio;
-			if (--buf->nframesout == 0
-			&& buf != d->inprocess) {
-				mempool_free(buf, d->bufpool);
-				bio_endio(bio, -EIO);
-			}
-		}
-		(*t)->maxout = (*t)->nframes;
-		(*t)->nout = 0;
-	}
-	buf = d->inprocess;
-	if (buf) {
+	if (buf == NULL)
+		return;
+	buf->flags |= BUFFL_FAIL;
+	if (buf->nframesout == 0) {
+		if (buf == d->inprocess) /* ensure we only process this once */
+			d->inprocess = NULL;
 		bio = buf->bio;
 		mempool_free(buf, d->bufpool);
 		bio_endio(bio, -EIO);
 	}
+}
+
+void
+aoedev_downdev(struct aoedev *d)
+{
+	struct aoetgt *t, **tt, **te;
+	struct frame *f;
+	struct list_head *head, *pos, *nx;
+	int i;
+
+	/* clean out active buffers on all targets */
+	tt = d->targets;
+	te = tt + NTARGETS;
+	for (; tt < te && (t = *tt); tt++) {
+		for (i = 0; i < NFACTIVE; i++) {
+			head = &t->factive[i];
+			list_for_each_safe(pos, nx, head) {
+				list_del(pos);
+				f = list_entry(pos, struct frame, head);
+				if (f->buf) {
+					f->buf->nframesout--;
+					aoe_failbuf(d, f->buf);
+				}
+				aoe_freetframe(f);
+			}
+		}
+		t->maxout = t->nframes;
+		t->nout = 0;
+	}
+
+	/* clean out the in-process buffer (if any) */
+	aoe_failbuf(d, d->inprocess);
 	d->inprocess = NULL;
 	d->htgt = NULL;
 
+	/* clean out all pending I/O */
 	while (!list_empty(&d->bufq)) {
-		buf = container_of(d->bufq.next, struct buf, bufs);
+		struct buf *buf = container_of(d->bufq.next, struct buf, bufs);
 		list_del(d->bufq.next);
-		bio = buf->bio;
-		mempool_free(buf, d->bufpool);
-		bio_endio(bio, -EIO);
+		aoe_failbuf(d, buf);
 	}
 
 	if (d->gd)
@@ -242,13 +255,16 @@ aoedev_by_sysminor_m(ulong sysminor)
 static void
 freetgt(struct aoedev *d, struct aoetgt *t)
 {
-	struct frame *f, *e;
+	struct frame *f;
+	struct list_head *pos, *nx, *head;
 
-	f = t->frames;
-	e = f + t->nframes;
-	for (; f < e; f++)
+	head = &t->ffree;
+	list_for_each_safe(pos, nx, head) {
+		list_del(pos);
+		f = list_entry(pos, struct frame, head);
 		skbfree(f->skb);
-	kfree(t->frames);
+		kfree(f);
+	}
 	kfree(t);
 }
 
diff --git a/drivers/block/aoe/aoemain.c b/drivers/block/aoe/aoemain.c
index 7f83ad9..6fc4b05 100644
--- a/drivers/block/aoe/aoemain.c
+++ b/drivers/block/aoe/aoemain.c
@@ -61,6 +61,7 @@ aoe_exit(void)
 
 	aoenet_exit();
 	unregister_blkdev(AOE_MAJOR, DEVICE_NAME);
+	aoecmd_exit();
 	aoechr_exit();
 	aoedev_exit();
 	aoeblk_exit();		/* free cache after de-allocating bufs */
@@ -83,17 +84,20 @@ aoe_init(void)
 	ret = aoenet_init();
 	if (ret)
 		goto net_fail;
+	ret = aoecmd_init();
+	if (ret)
+		goto cmd_fail;
 	ret = register_blkdev(AOE_MAJOR, DEVICE_NAME);
 	if (ret < 0) {
 		printk(KERN_ERR "aoe: can't register major\n");
 		goto blkreg_fail;
 	}
-
 	printk(KERN_INFO "aoe: AoE v%s initialised.\n", VERSION);
 	discover_timer(TINIT);
 	return 0;
-
  blkreg_fail:
+	aoecmd_exit();
+ cmd_fail:
 	aoenet_exit();
  net_fail:
 	aoeblk_exit();
diff --git a/drivers/block/aoe/aoenet.c b/drivers/block/aoe/aoenet.c
index 0787807..000eff2 100644
--- a/drivers/block/aoe/aoenet.c
+++ b/drivers/block/aoe/aoenet.c
@@ -142,7 +142,8 @@ aoenet_rcv(struct sk_buff *skb, struct net_device *ifp, struct packet_type *pt,
 
 	switch (h->cmd) {
 	case AOECMD_ATA:
-		aoecmd_ata_rsp(skb);
+		/* ata_rsp may keep skb for later processing or give it back */
+		skb = aoecmd_ata_rsp(skb);
 		break;
 	case AOECMD_CFG:
 		aoecmd_cfg_rsp(skb);
@@ -152,6 +153,9 @@ aoenet_rcv(struct sk_buff *skb, struct net_device *ifp, struct packet_type *pt,
 			break;	/* don't complain about vendor commands */
 		printk(KERN_INFO "aoe: unknown cmd %d\n", h->cmd);
 	}
+
+	if (!skb)
+		return 0;
 exit:
 	dev_kfree_skb(skb);
 	return 0;
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 03/14] aoe: become I/O request queue handler for increased user control
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
  2012-08-25 14:39 ` [PATCH 01/14] aoe: for performance support larger packet payloads Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  2012-08-25 14:39 ` [PATCH 02/14] aoe: kernel thread handles I/O completions for simple locking Ed Cashin
                   ` (11 subsequent siblings)
  13 siblings, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

To allow users to choose an elevator algorithm for their particular
workloads, change from a make_request-style driver to an
I/O-request-queue-handler-style driver.

We have to do a couple of things that might be surprising.  We
manipulate the page _count directly on the assumption that we still
have no guarantee that users of the block layer are prohibited from
submitting bios containing pages with zero reference counts.[1] If
such a prohibition now exists, I can get rid of the _count
manipulation.

Just as before this patch, we keep track of the sk_buffs that the
network layer has not yet finished with and cap the resources we use
with a "pool" of skbs.[2]

Now that the block layer maintains the disk stats, the aoe driver's
diskstats function can go away.

1. https://lkml.org/lkml/2007/3/1/374
2. https://lkml.org/lkml/2007/7/6/241

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoe.h    |   26 ++--
 drivers/block/aoe/aoeblk.c |   88 ++++----------
 drivers/block/aoe/aoechr.c |    1 +
 drivers/block/aoe/aoecmd.c |  282 +++++++++++++++++++++++++++++++------------
 drivers/block/aoe/aoedev.c |   93 ++++++++++-----
 5 files changed, 308 insertions(+), 182 deletions(-)

diff --git a/drivers/block/aoe/aoe.h b/drivers/block/aoe/aoe.h
index 0cd6c0f..8c4f6d9 100644
--- a/drivers/block/aoe/aoe.h
+++ b/drivers/block/aoe/aoe.h
@@ -90,7 +90,7 @@ enum {
 	MIN_BUFS = 16,
 	NTARGETS = 8,
 	NAOEIFS = 8,
-	NSKBPOOLMAX = 128,
+	NSKBPOOLMAX = 256,
 	NFACTIVE = 17,
 
 	TIMERTICK = HZ / 10,
@@ -100,30 +100,26 @@ enum {
 };
 
 struct buf {
-	struct list_head bufs;
-	ulong stime;	/* for disk stats */
-	ulong flags;
 	ulong nframesout;
 	ulong resid;
 	ulong bv_resid;
-	ulong bv_off;
 	sector_t sector;
 	struct bio *bio;
 	struct bio_vec *bv;
+	struct request *rq;
 };
 
 struct frame {
 	struct list_head head;
 	u32 tag;
 	ulong waited;
-	struct buf *buf;
 	struct aoetgt *t;		/* parent target I belong to */
-	char *bufaddr;
-	ulong bcnt;
 	sector_t lba;
 	struct sk_buff *skb;		/* command skb freed on module exit */
 	struct sk_buff *r_skb;		/* response skb for async processing */
+	struct buf *buf;
 	struct bio_vec *bv;
+	ulong bcnt;
 	ulong bv_off;
 };
 
@@ -161,6 +157,7 @@ struct aoedev {
 	u16 rttavg;		/* round trip average of requests/responses */
 	u16 mintimer;
 	u16 fw_ver;		/* version of blade's firmware */
+	ulong ref;
 	struct work_struct work;/* disk create work struct */
 	struct gendisk *gd;
 	struct request_queue *blkq;
@@ -168,11 +165,13 @@ struct aoedev {
 	sector_t ssize;
 	struct timer_list timer;
 	spinlock_t lock;
-	struct sk_buff_head sendq;
 	struct sk_buff_head skbpool;
 	mempool_t *bufpool;	/* for deadlock-free Buf allocation */
-	struct list_head bufq;	/* queue of bios to work on */
-	struct buf *inprocess;	/* the one we're currently working on */
+	struct {		/* pointers to work in progress */
+		struct buf *buf;
+		struct bio *nxbio;
+		struct request *rq;
+	} ip;
 	struct aoetgt *targets[NTARGETS];
 	struct aoetgt **tgt;	/* target in use when working */
 	struct aoetgt *htgt;	/* target needing rexmit assistance */
@@ -209,6 +208,8 @@ void aoecmd_exit(void);
 int aoecmd_init(void);
 struct sk_buff *aoecmd_ata_id(struct aoedev *);
 void aoe_freetframe(struct frame *);
+void aoe_flush_iocq(void);
+void aoe_end_request(struct aoedev *, struct request *, int);
 
 int aoedev_init(void);
 void aoedev_exit(void);
@@ -216,7 +217,8 @@ struct aoedev *aoedev_by_aoeaddr(int maj, int min);
 struct aoedev *aoedev_by_sysminor_m(ulong sysminor);
 void aoedev_downdev(struct aoedev *d);
 int aoedev_flush(const char __user *str, size_t size);
-void aoe_failbuf(struct aoedev *d, struct buf *buf);
+void aoe_failbuf(struct aoedev *, struct buf *);
+void aoedev_put(struct aoedev *);
 
 int aoenet_init(void);
 void aoenet_exit(void);
diff --git a/drivers/block/aoe/aoeblk.c b/drivers/block/aoe/aoeblk.c
index 3a8f093..7ec4b8f 100644
--- a/drivers/block/aoe/aoeblk.c
+++ b/drivers/block/aoe/aoeblk.c
@@ -161,68 +161,22 @@ aoeblk_release(struct gendisk *disk, fmode_t mode)
 }
 
 static void
-aoeblk_make_request(struct request_queue *q, struct bio *bio)
+aoeblk_request(struct request_queue *q)
 {
-	struct sk_buff_head queue;
 	struct aoedev *d;
-	struct buf *buf;
-	ulong flags;
-
-	blk_queue_bounce(q, &bio);
-
-	if (bio == NULL) {
-		printk(KERN_ERR "aoe: bio is NULL\n");
-		BUG();
-		return;
-	}
-	d = bio->bi_bdev->bd_disk->private_data;
-	if (d == NULL) {
-		printk(KERN_ERR "aoe: bd_disk->private_data is NULL\n");
-		BUG();
-		bio_endio(bio, -ENXIO);
-		return;
-	} else if (bio->bi_io_vec == NULL) {
-		printk(KERN_ERR "aoe: bi_io_vec is NULL\n");
-		BUG();
-		bio_endio(bio, -ENXIO);
-		return;
-	}
-	buf = mempool_alloc(d->bufpool, GFP_NOIO);
-	if (buf == NULL) {
-		printk(KERN_INFO "aoe: buf allocation failure\n");
-		bio_endio(bio, -ENOMEM);
-		return;
-	}
-	memset(buf, 0, sizeof(*buf));
-	INIT_LIST_HEAD(&buf->bufs);
-	buf->stime = jiffies;
-	buf->bio = bio;
-	buf->resid = bio->bi_size;
-	buf->sector = bio->bi_sector;
-	buf->bv = &bio->bi_io_vec[bio->bi_idx];
-	buf->bv_resid = buf->bv->bv_len;
-	WARN_ON(buf->bv_resid == 0);
-	buf->bv_off = buf->bv->bv_offset;
-
-	spin_lock_irqsave(&d->lock, flags);
+	struct request *rq;
 
+	d = q->queuedata;
 	if ((d->flags & DEVFL_UP) == 0) {
 		pr_info_ratelimited("aoe: device %ld.%d is not up\n",
 			d->aoemajor, d->aoeminor);
-		spin_unlock_irqrestore(&d->lock, flags);
-		mempool_free(buf, d->bufpool);
-		bio_endio(bio, -ENXIO);
+		while ((rq = blk_peek_request(q))) {
+			blk_start_request(rq);
+			aoe_end_request(d, rq, 1);
+		}
 		return;
 	}
-
-	list_add_tail(&buf->bufs, &d->bufq);
-
 	aoecmd_work(d);
-	__skb_queue_head_init(&queue);
-	skb_queue_splice_init(&d->sendq, &queue);
-
-	spin_unlock_irqrestore(&d->lock, flags);
-	aoenet_xmit(&queue);
 }
 
 static int
@@ -254,34 +208,46 @@ aoeblk_gdalloc(void *vp)
 {
 	struct aoedev *d = vp;
 	struct gendisk *gd;
-	enum { KB = 1024, MB = KB * KB, READ_AHEAD = MB, };
+	mempool_t *mp;
+	struct request_queue *q;
+	enum { KB = 1024, MB = KB * KB, READ_AHEAD = 2 * MB, };
 	ulong flags;
 
 	gd = alloc_disk(AOE_PARTITIONS);
 	if (gd == NULL) {
-		printk(KERN_ERR
-			"aoe: cannot allocate disk structure for %ld.%d\n",
+		pr_err("aoe: cannot allocate disk structure for %ld.%d\n",
 			d->aoemajor, d->aoeminor);
 		goto err;
 	}
 
-	d->bufpool = mempool_create_slab_pool(MIN_BUFS, buf_pool_cache);
-	if (d->bufpool == NULL) {
+	mp = mempool_create(MIN_BUFS, mempool_alloc_slab, mempool_free_slab,
+		buf_pool_cache);
+	if (mp == NULL) {
 		printk(KERN_ERR "aoe: cannot allocate bufpool for %ld.%d\n",
 			d->aoemajor, d->aoeminor);
 		goto err_disk;
 	}
+	q = blk_init_queue(aoeblk_request, &d->lock);
+	if (q == NULL) {
+		pr_err("aoe: cannot allocate block queue for %ld.%d\n",
+			d->aoemajor, d->aoeminor);
+		mempool_destroy(mp);
+		goto err_disk;
+	}
 
 	d->blkq = blk_alloc_queue(GFP_KERNEL);
 	if (!d->blkq)
 		goto err_mempool;
-	blk_queue_make_request(d->blkq, aoeblk_make_request);
 	d->blkq->backing_dev_info.name = "aoe";
 	if (bdi_init(&d->blkq->backing_dev_info))
 		goto err_blkq;
 	spin_lock_irqsave(&d->lock, flags);
 	blk_queue_max_hw_sectors(d->blkq, BLK_DEF_MAX_SECTORS);
-	d->blkq->backing_dev_info.ra_pages = READ_AHEAD / PAGE_CACHE_SIZE;
+	q->backing_dev_info.ra_pages = READ_AHEAD / PAGE_CACHE_SIZE;
+	d->bufpool = mp;
+	d->blkq = gd->queue = q;
+	q->queuedata = d;
+	d->gd = gd;
 	gd->major = AOE_MAJOR;
 	gd->first_minor = d->sysminor * AOE_PARTITIONS;
 	gd->fops = &aoe_bdops;
@@ -290,8 +256,6 @@ aoeblk_gdalloc(void *vp)
 	snprintf(gd->disk_name, sizeof gd->disk_name, "etherd/e%ld.%d",
 		d->aoemajor, d->aoeminor);
 
-	gd->queue = d->blkq;
-	d->gd = gd;
 	d->flags &= ~DEVFL_GDALLOC;
 	d->flags |= DEVFL_UP;
 
diff --git a/drivers/block/aoe/aoechr.c b/drivers/block/aoe/aoechr.c
index f145388..3557f0d 100644
--- a/drivers/block/aoe/aoechr.c
+++ b/drivers/block/aoe/aoechr.c
@@ -106,6 +106,7 @@ loop:
 		spin_lock_irqsave(&d->lock, flags);
 		goto loop;
 	}
+	aoedev_put(d);
 	if (skb) {
 		struct sk_buff_head queue;
 		__skb_queue_head_init(&queue);
diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index da66a6a..dd7e397 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -23,6 +23,8 @@
 
 static void ktcomplete(struct frame *, struct sk_buff *);
 
+static struct buf *nextbuf(struct aoedev *);
+
 static int aoe_deadsecs = 60 * 3;
 module_param(aoe_deadsecs, int, 0644);
 MODULE_PARM_DESC(aoe_deadsecs, "After aoe_deadsecs seconds, give up and fail dev.");
@@ -282,17 +284,20 @@ aoecmd_ata_rw(struct aoedev *d)
 	struct bio_vec *bv;
 	struct aoetgt *t;
 	struct sk_buff *skb;
+	struct sk_buff_head queue;
 	ulong bcnt, fbcnt;
 	char writebit, extbit;
 
 	writebit = 0x10;
 	extbit = 0x4;
 
+	buf = nextbuf(d);
+	if (buf == NULL)
+		return 0;
 	f = newframe(d);
 	if (f == NULL)
 		return 0;
 	t = *d->tgt;
-	buf = d->inprocess;
 	bv = buf->bv;
 	bcnt = t->ifp->maxbcnt;
 	if (bcnt == 0)
@@ -311,7 +316,7 @@ aoecmd_ata_rw(struct aoedev *d)
 		fbcnt -= buf->bv_resid;
 		buf->resid -= buf->bv_resid;
 		if (buf->resid == 0) {
-			d->inprocess = NULL;
+			d->ip.buf = NULL;
 			break;
 		}
 		buf->bv++;
@@ -363,8 +368,11 @@ aoecmd_ata_rw(struct aoedev *d)
 
 	skb->dev = t->ifp->nd;
 	skb = skb_clone(skb, GFP_ATOMIC);
-	if (skb)
-		__skb_queue_tail(&d->sendq, skb);
+	if (skb) {
+		__skb_queue_head_init(&queue);
+		__skb_queue_tail(&queue, skb);
+		aoenet_xmit(&queue);
+	}
 	return 1;
 }
 
@@ -414,6 +422,7 @@ static void
 resend(struct aoedev *d, struct frame *f)
 {
 	struct sk_buff *skb;
+	struct sk_buff_head queue;
 	struct aoe_hdr *h;
 	struct aoe_atahdr *ah;
 	struct aoetgt *t;
@@ -443,7 +452,9 @@ resend(struct aoedev *d, struct frame *f)
 	skb = skb_clone(skb, GFP_ATOMIC);
 	if (skb == NULL)
 		return;
-	__skb_queue_tail(&d->sendq, skb);
+	__skb_queue_head_init(&queue);
+	__skb_queue_tail(&queue, skb);
+	aoenet_xmit(&queue);
 }
 
 static int
@@ -553,7 +564,6 @@ ata_scnt(unsigned char *packet) {
 static void
 rexmit_timer(ulong vp)
 {
-	struct sk_buff_head queue;
 	struct aoedev *d;
 	struct aoetgt *t, **tt, **te;
 	struct aoeif *ifp;
@@ -602,6 +612,12 @@ rexmit_timer(ulong vp)
 		}
 	}
 
+	if (!list_empty(&flist)) {	/* retransmissions necessary */
+		n = d->rttavg <<= 1;
+		if (n > MAXTIMER)
+			d->rttavg = MAXTIMER;
+	}
+
 	/* process expired frames */
 	while (!list_empty(&flist)) {
 		pos = flist.next;
@@ -640,45 +656,131 @@ rexmit_timer(ulong vp)
 		resend(d, f);
 	}
 
-	if (!skb_queue_empty(&d->sendq)) {
-		n = d->rttavg <<= 1;
-		if (n > MAXTIMER)
-			d->rttavg = MAXTIMER;
-	}
-
-	if (d->flags & DEVFL_KICKME || d->htgt) {
+	if ((d->flags & DEVFL_KICKME || d->htgt) && d->blkq) {
 		d->flags &= ~DEVFL_KICKME;
-		aoecmd_work(d);
+		d->blkq->request_fn(d->blkq);
 	}
 
-	__skb_queue_head_init(&queue);
-	skb_queue_splice_init(&d->sendq, &queue);
-
 	d->timer.expires = jiffies + TIMERTICK;
 	add_timer(&d->timer);
 
 	spin_unlock_irqrestore(&d->lock, flags);
+}
 
-	aoenet_xmit(&queue);
+static unsigned long
+rqbiocnt(struct request *r)
+{
+	struct bio *bio;
+	unsigned long n = 0;
+
+	__rq_for_each_bio(bio, r)
+		n++;
+	return n;
+}
+
+/* This can be removed if we are certain that no users of the block
+ * layer will ever use zero-count pages in bios.  Otherwise we have to
+ * protect against the put_page sometimes done by the network layer.
+ *
+ * See http://oss.sgi.com/archives/xfs/2007-01/msg00594.html for
+ * discussion.
+ *
+ * We cannot use get_page in the workaround, because it insists on a
+ * positive page count as a precondition.  So we use _count directly.
+ */
+static void
+bio_pageinc(struct bio *bio)
+{
+	struct bio_vec *bv;
+	struct page *page;
+	int i;
+
+	bio_for_each_segment(bv, bio, i) {
+		page = bv->bv_page;
+		/* Non-zero page count for non-head members of
+		 * compound pages is no longer allowed by the kernel,
+		 * but this has never been seen here.
+		 */
+		if (unlikely(PageCompound(page)))
+			if (compound_trans_head(page) != page) {
+				pr_crit("page tail used for block I/O\n");
+				BUG();
+			}
+		atomic_inc(&page->_count);
+	}
+}
+
+static void
+bio_pagedec(struct bio *bio)
+{
+	struct bio_vec *bv;
+	int i;
+
+	bio_for_each_segment(bv, bio, i)
+		atomic_dec(&bv->bv_page->_count);
+}
+
+static void
+bufinit(struct buf *buf, struct request *rq, struct bio *bio)
+{
+	struct bio_vec *bv;
+
+	memset(buf, 0, sizeof(*buf));
+	buf->rq = rq;
+	buf->bio = bio;
+	buf->resid = bio->bi_size;
+	buf->sector = bio->bi_sector;
+	bio_pageinc(bio);
+	buf->bv = bv = &bio->bi_io_vec[bio->bi_idx];
+	buf->bv_resid = bv->bv_len;
+	WARN_ON(buf->bv_resid == 0);
+}
+
+static struct buf *
+nextbuf(struct aoedev *d)
+{
+	struct request *rq;
+	struct request_queue *q;
+	struct buf *buf;
+	struct bio *bio;
+
+	q = d->blkq;
+	if (q == NULL)
+		return NULL;	/* initializing */
+	if (d->ip.buf)
+		return d->ip.buf;
+	rq = d->ip.rq;
+	if (rq == NULL) {
+		rq = blk_peek_request(q);
+		if (rq == NULL)
+			return NULL;
+		blk_start_request(rq);
+		d->ip.rq = rq;
+		d->ip.nxbio = rq->bio;
+		rq->special = (void *) rqbiocnt(rq);
+	}
+	buf = mempool_alloc(d->bufpool, GFP_ATOMIC);
+	if (buf == NULL) {
+		pr_err("aoe: nextbuf: unable to mempool_alloc!\n");
+		return NULL;
+	}
+	bio = d->ip.nxbio;
+	bufinit(buf, rq, bio);
+	bio = bio->bi_next;
+	d->ip.nxbio = bio;
+	if (bio == NULL)
+		d->ip.rq = NULL;
+	return d->ip.buf = buf;
 }
 
 /* enters with d->lock held */
 void
 aoecmd_work(struct aoedev *d)
 {
-	struct buf *buf;
-loop:
 	if (d->htgt && !sthtith(d))
 		return;
-	if (d->inprocess == NULL) {
-		if (list_empty(&d->bufq))
-			return;
-		buf = container_of(d->bufq.next, struct buf, bufs);
-		list_del(d->bufq.next);
-		d->inprocess = buf;
-	}
-	if (aoecmd_ata_rw(d))
-		goto loop;
+	while (aoecmd_ata_rw(d))
+		;
 }
 
 /* this function performs work that has been deferred until sleeping is OK
@@ -801,25 +903,6 @@ gettgt(struct aoedev *d, char *addr)
 	return NULL;
 }
 
-static inline void
-diskstats(struct gendisk *disk, struct bio *bio, ulong duration, sector_t sector)
-{
-	unsigned long n_sect = bio->bi_size >> 9;
-	const int rw = bio_data_dir(bio);
-	struct hd_struct *part;
-	int cpu;
-
-	cpu = part_stat_lock();
-	part = disk_map_sector_rcu(disk, sector);
-
-	part_stat_inc(cpu, part, ios[rw]);
-	part_stat_add(cpu, part, ticks[rw], duration);
-	part_stat_add(cpu, part, sectors[rw], n_sect);
-	part_stat_add(cpu, part, io_ticks, duration);
-
-	part_stat_unlock();
-}
-
 static void
 bvcpy(struct bio_vec *bv, ulong off, struct sk_buff *skb, long cnt)
 {
@@ -841,6 +924,43 @@ loop:
 	goto loop;
 }
 
+void
+aoe_end_request(struct aoedev *d, struct request *rq, int fastfail)
+{
+	struct bio *bio;
+	int bok;
+	struct request_queue *q;
+
+	q = d->blkq;
+	if (rq == d->ip.rq)
+		d->ip.rq = NULL;
+	do {
+		bio = rq->bio;
+		bok = !fastfail && test_bit(BIO_UPTODATE, &bio->bi_flags);
+	} while (__blk_end_request(rq, bok ? 0 : -EIO, bio->bi_size));
+
+	/* cf. http://lkml.org/lkml/2006/10/31/28 */
+	if (!fastfail)
+		q->request_fn(q);
+}
+
+static void
+aoe_end_buf(struct aoedev *d, struct buf *buf)
+{
+	struct request *rq;
+	unsigned long n;
+
+	if (buf == d->ip.buf)
+		d->ip.buf = NULL;
+	rq = buf->rq;
+	bio_pagedec(buf->bio);
+	mempool_free(buf, d->bufpool);
+	n = (unsigned long) rq->special;
+	rq->special = (void *) --n;
+	if (n == 0)
+		aoe_end_request(d, rq, 0);
+}
+
 static void
 ktiocomplete(struct frame *f)
 {
@@ -875,7 +995,7 @@ ktiocomplete(struct frame *f)
 			ahout->cmdstat, ahin->cmdstat,
 			d->aoemajor, d->aoeminor);
 noskb:	if (buf)
-			buf->flags |= BUFFL_FAIL;
+			clear_bit(BIO_UPTODATE, &buf->bio->bi_flags);
 		goto badrsp;
 	}
 
@@ -886,7 +1006,7 @@ noskb:	if (buf)
 		if (skb->len < n) {
 			pr_err("aoe: runt data size in read.  skb->len=%d need=%ld\n",
 				skb->len, n);
-			buf->flags |= BUFFL_FAIL;
+			clear_bit(BIO_UPTODATE, &buf->bio->bi_flags);
 			break;
 		}
 		bvcpy(f->bv, f->bv_off, skb, n);
@@ -926,18 +1046,13 @@ badrsp:
 
 	aoe_freetframe(f);
 
-	if (buf && --buf->nframesout == 0 && buf->resid == 0) {
-		struct bio *bio = buf->bio;
+	if (buf && --buf->nframesout == 0 && buf->resid == 0)
+		aoe_end_buf(d, buf);
 
-		diskstats(d->gd, bio, jiffies - buf->stime, buf->sector);
-		n = (buf->flags & BUFFL_FAIL) ? -EIO : 0;
-		mempool_free(buf, d->bufpool);
-		spin_unlock_irq(&d->lock);
-		if (n != -EIO)
-			bio_flush_dcache_pages(buf->bio);
-		bio_endio(bio, n);
-	} else
-		spin_unlock_irq(&d->lock);
+	aoecmd_work(d);
+
+	spin_unlock_irq(&d->lock);
+	aoedev_put(d);
 	dev_kfree_skb(skb);
 }
 
@@ -1060,12 +1175,14 @@ aoecmd_ata_rsp(struct sk_buff *skb)
 		printk(KERN_INFO "aoe: can't find target e%ld.%d:%pm\n",
 		       d->aoemajor, d->aoeminor, h->src);
 		spin_unlock_irqrestore(&d->lock, flags);
+		aoedev_put(d);
 		return skb;
 	}
 	f = getframe(t, n);
 	if (f == NULL) {
 		calc_rttavg(d, -tsince(n));
 		spin_unlock_irqrestore(&d->lock, flags);
+		aoedev_put(d);
 		snprintf(ebuf, sizeof ebuf,
 			"%15s e%d.%d    tag=%08x@%08lx\n",
 			"unexpected rsp",
@@ -1185,8 +1302,10 @@ aoecmd_cfg_rsp(struct sk_buff *skb)
 	struct aoeif *ifp;
 	ulong flags, sysminor, aoemajor;
 	struct sk_buff *sl;
+	struct sk_buff_head queue;
 	u16 n;
 
+	sl = NULL;
 	h = (struct aoe_hdr *) skb_mac_header(skb);
 	ch = (struct aoe_cfghdr *) (h+1);
 
@@ -1223,10 +1342,8 @@ aoecmd_cfg_rsp(struct sk_buff *skb)
 	t = gettgt(d, h->src);
 	if (!t) {
 		t = addtgt(d, h->src, n);
-		if (!t) {
-			spin_unlock_irqrestore(&d->lock, flags);
-			return;
-		}
+		if (!t)
+			goto bail;
 	}
 	ifp = getif(t, skb->dev);
 	if (!ifp) {
@@ -1235,8 +1352,7 @@ aoecmd_cfg_rsp(struct sk_buff *skb)
 			printk(KERN_INFO
 				"aoe: device addif failure; "
 				"too many interfaces?\n");
-			spin_unlock_irqrestore(&d->lock, flags);
-			return;
+			goto bail;
 		}
 	}
 	if (ifp->maxbcnt) {
@@ -1257,18 +1373,14 @@ aoecmd_cfg_rsp(struct sk_buff *skb)
 	}
 
 	/* don't change users' perspective */
-	if (d->nopen) {
-		spin_unlock_irqrestore(&d->lock, flags);
-		return;
+	if (d->nopen == 0) {
+		d->fw_ver = be16_to_cpu(ch->fwver);
+		sl = aoecmd_ata_id(d);
 	}
-	d->fw_ver = be16_to_cpu(ch->fwver);
-
-	sl = aoecmd_ata_id(d);
-
+bail:
 	spin_unlock_irqrestore(&d->lock, flags);
-
+	aoedev_put(d);
 	if (sl) {
-		struct sk_buff_head queue;
 		__skb_queue_head_init(&queue);
 		__skb_queue_tail(&queue, sl);
 		aoenet_xmit(&queue);
@@ -1297,8 +1409,19 @@ aoecmd_cleanslate(struct aoedev *d)
 	}
 }
 
-static void
-flush_iocq(void)
+void
+aoe_failbuf(struct aoedev *d, struct buf *buf)
+{
+	if (buf == NULL)
+		return;
+	buf->resid = 0;
+	clear_bit(BIO_UPTODATE, &buf->bio->bi_flags);
+	if (buf->nframesout == 0)
+		aoe_end_buf(d, buf);
+}
+
+void
+aoe_flush_iocq(void)
 {
 	struct frame *f;
 	struct aoedev *d;
@@ -1324,6 +1447,7 @@ flush_iocq(void)
 		aoe_freetframe(f);
 		spin_unlock_irqrestore(&d->lock, flags);
 		dev_kfree_skb(skb);
+		aoedev_put(d);
 	}
 }
 
@@ -1344,5 +1468,5 @@ void
 aoecmd_exit(void)
 {
 	aoe_ktstop(&kts);
-	flush_iocq();
+	aoe_flush_iocq();
 }
diff --git a/drivers/block/aoe/aoedev.c b/drivers/block/aoe/aoedev.c
index 40bae1a..635dc98 100644
--- a/drivers/block/aoe/aoedev.c
+++ b/drivers/block/aoe/aoedev.c
@@ -19,6 +19,17 @@ static void skbpoolfree(struct aoedev *d);
 static struct aoedev *devlist;
 static DEFINE_SPINLOCK(devlist_lock);
 
+/*
+ * Users who grab a pointer to the device with aoedev_by_aoeaddr or
+ * aoedev_by_sysminor_m automatically get a reference count and must
+ * be responsible for performing an aoedev_put.  With the addition of
+ * async kthread processing I'm no longer confident that we can
+ * guarantee consistency in the face of device flushes.
+ *
+ * For the time being, we only bother to add extra references for
+ * frames sitting on the iocq.  When the kthreads finish processing
+ * these frames, they will aoedev_put the device.
+ */
 struct aoedev *
 aoedev_by_aoeaddr(int maj, int min)
 {
@@ -28,13 +39,25 @@ aoedev_by_aoeaddr(int maj, int min)
 	spin_lock_irqsave(&devlist_lock, flags);
 
 	for (d=devlist; d; d=d->next)
-		if (d->aoemajor == maj && d->aoeminor == min)
+		if (d->aoemajor == maj && d->aoeminor == min) {
+			d->ref++;
 			break;
+		}
 
 	spin_unlock_irqrestore(&devlist_lock, flags);
 	return d;
 }
 
+void
+aoedev_put(struct aoedev *d)
+{
+	ulong flags;
+
+	spin_lock_irqsave(&devlist_lock, flags);
+	d->ref--;
+	spin_unlock_irqrestore(&devlist_lock, flags);
+}
+
 static void
 dummy_timer(ulong vp)
 {
@@ -47,21 +70,26 @@ dummy_timer(ulong vp)
 	add_timer(&d->timer);
 }
 
-void
-aoe_failbuf(struct aoedev *d, struct buf *buf)
+static void
+aoe_failip(struct aoedev *d)
 {
+	struct request *rq;
 	struct bio *bio;
+	unsigned long n;
+
+	aoe_failbuf(d, d->ip.buf);
 
-	if (buf == NULL)
+	rq = d->ip.rq;
+	if (rq == NULL)
 		return;
-	buf->flags |= BUFFL_FAIL;
-	if (buf->nframesout == 0) {
-		if (buf == d->inprocess) /* ensure we only process this once */
-			d->inprocess = NULL;
-		bio = buf->bio;
-		mempool_free(buf, d->bufpool);
-		bio_endio(bio, -EIO);
+	while ((bio = d->ip.nxbio)) {
+		clear_bit(BIO_UPTODATE, &bio->bi_flags);
+		d->ip.nxbio = bio->bi_next;
+		n = (unsigned long) rq->special;
+		rq->special = (void *) --n;
 	}
+	if ((unsigned long) rq->special == 0)
+		aoe_end_request(d, rq, 0);
 }
 
 void
@@ -70,8 +98,11 @@ aoedev_downdev(struct aoedev *d)
 	struct aoetgt *t, **tt, **te;
 	struct frame *f;
 	struct list_head *head, *pos, *nx;
+	struct request *rq;
 	int i;
 
+	d->flags &= ~DEVFL_UP;
+
 	/* clean out active buffers on all targets */
 	tt = d->targets;
 	te = tt + NTARGETS;
@@ -92,22 +123,20 @@ aoedev_downdev(struct aoedev *d)
 		t->nout = 0;
 	}
 
-	/* clean out the in-process buffer (if any) */
-	aoe_failbuf(d, d->inprocess);
-	d->inprocess = NULL;
+	/* clean out the in-process request (if any) */
+	aoe_failip(d);
 	d->htgt = NULL;
 
-	/* clean out all pending I/O */
-	while (!list_empty(&d->bufq)) {
-		struct buf *buf = container_of(d->bufq.next, struct buf, bufs);
-		list_del(d->bufq.next);
-		aoe_failbuf(d, buf);
+	/* fast fail all pending I/O */
+	if (d->blkq) {
+		while ((rq = blk_peek_request(d->blkq))) {
+			blk_start_request(rq);
+			aoe_end_request(d, rq, 1);
+		}
 	}
 
 	if (d->gd)
 		set_capacity(d->gd, 0);
-
-	d->flags &= ~DEVFL_UP;
 }
 
 static void
@@ -120,6 +149,7 @@ aoedev_freedev(struct aoedev *d)
 		aoedisk_rm_sysfs(d);
 		del_gendisk(d->gd);
 		put_disk(d->gd);
+		blk_cleanup_queue(d->blkq);
 	}
 	t = d->targets;
 	e = t + NTARGETS;
@@ -128,7 +158,6 @@ aoedev_freedev(struct aoedev *d)
 	if (d->bufpool)
 		mempool_destroy(d->bufpool);
 	skbpoolfree(d);
-	blk_cleanup_queue(d->blkq);
 	kfree(d);
 }
 
@@ -155,7 +184,8 @@ aoedev_flush(const char __user *str, size_t cnt)
 		spin_lock(&d->lock);
 		if ((!all && (d->flags & DEVFL_UP))
 		|| (d->flags & (DEVFL_GDALLOC|DEVFL_NEWSIZE))
-		|| d->nopen) {
+		|| d->nopen
+		|| d->ref) {
 			spin_unlock(&d->lock);
 			dd = &d->next;
 			continue;
@@ -176,12 +206,15 @@ aoedev_flush(const char __user *str, size_t cnt)
 	return 0;
 }
 
-/* I'm not really sure that this is a realistic problem, but if the
-network driver goes gonzo let's just leak memory after complaining. */
+/* This has been confirmed to occur once with Tms=3*1000 due to the
+ * driver changing link and not processing its transmit ring.  The
+ * problem is hard enough to solve by returning an error that I'm
+ * still punting on "solving" this.
+ */
 static void
 skbfree(struct sk_buff *skb)
 {
-	enum { Sms = 100, Tms = 3*1000};
+	enum { Sms = 250, Tms = 30 * 1000};
 	int i = Tms / Sms;
 
 	if (skb == NULL)
@@ -222,8 +255,10 @@ aoedev_by_sysminor_m(ulong sysminor)
 	spin_lock_irqsave(&devlist_lock, flags);
 
 	for (d=devlist; d; d=d->next)
-		if (d->sysminor == sysminor)
+		if (d->sysminor == sysminor) {
+			d->ref++;
 			break;
+		}
 	if (d)
 		goto out;
 	d = kcalloc(1, sizeof *d, GFP_ATOMIC);
@@ -231,7 +266,6 @@ aoedev_by_sysminor_m(ulong sysminor)
 		goto out;
 	INIT_WORK(&d->work, aoecmd_sleepwork);
 	spin_lock_init(&d->lock);
-	skb_queue_head_init(&d->sendq);
 	skb_queue_head_init(&d->skbpool);
 	init_timer(&d->timer);
 	d->timer.data = (ulong) d;
@@ -240,7 +274,7 @@ aoedev_by_sysminor_m(ulong sysminor)
 	add_timer(&d->timer);
 	d->bufpool = NULL;	/* defer to aoeblk_gdalloc */
 	d->tgt = d->targets;
-	INIT_LIST_HEAD(&d->bufq);
+	d->ref = 1;
 	d->sysminor = sysminor;
 	d->aoemajor = AOEMAJOR(sysminor);
 	d->aoeminor = AOEMINOR(sysminor);
@@ -274,6 +308,7 @@ aoedev_exit(void)
 	struct aoedev *d;
 	ulong flags;
 
+	aoe_flush_iocq();
 	while ((d = devlist)) {
 		devlist = d->next;
 
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 04/14] aoe: use a kernel thread for transmissions
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
                   ` (2 preceding siblings ...)
  2012-08-25 14:39 ` [PATCH 02/14] aoe: kernel thread handles I/O completions for simple locking Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  2012-08-25 14:39 ` [PATCH 05/14] aoe: use packets that work with the smallest-MTU local interface Ed Cashin
                   ` (9 subsequent siblings)
  13 siblings, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

The dev_queue_xmit function needs to be called with interrupts enabled,
so the simplest way to get the locking right while still fulfilling
that requirement is to use a kernel thread that calls dev_queue_xmit
serially over the queued transmissions.

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoe.h    |    2 ++
 drivers/block/aoe/aoecmd.c |    4 ++--
 drivers/block/aoe/aoenet.c |   37 ++++++++++++++++++++++++++++++++++++-
 3 files changed, 40 insertions(+), 3 deletions(-)

diff --git a/drivers/block/aoe/aoe.h b/drivers/block/aoe/aoe.h
index 8c4f6d9..d0087de1 100644
--- a/drivers/block/aoe/aoe.h
+++ b/drivers/block/aoe/aoe.h
@@ -210,6 +210,8 @@ struct sk_buff *aoecmd_ata_id(struct aoedev *);
 void aoe_freetframe(struct frame *);
 void aoe_flush_iocq(void);
 void aoe_end_request(struct aoedev *, struct request *, int);
+int aoe_ktstart(struct ktstate *k);
+void aoe_ktstop(struct ktstate *k);
 
 int aoedev_init(void);
 void aoedev_exit(void);
diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index dd7e397..5cc4d3a 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -1109,14 +1109,14 @@ kthread(void *vp)
 	return 0;
 }
 
-static void
+void
 aoe_ktstop(struct ktstate *k)
 {
 	kthread_stop(k->task);
 	wait_for_completion(&k->rendez);
 }
 
-static int
+int
 aoe_ktstart(struct ktstate *k)
 {
 	struct task_struct *task;
diff --git a/drivers/block/aoe/aoenet.c b/drivers/block/aoe/aoenet.c
index 000eff2..5f43710 100644
--- a/drivers/block/aoe/aoenet.c
+++ b/drivers/block/aoe/aoenet.c
@@ -33,6 +33,9 @@ static char aoe_iflist[IFLISTSZ];
 module_param_string(aoe_iflist, aoe_iflist, IFLISTSZ, 0600);
 MODULE_PARM_DESC(aoe_iflist, "aoe_iflist=\"dev1 [dev2 ...]\"");
 
+static wait_queue_head_t txwq;
+static struct ktstate kts;
+
 #ifndef MODULE
 static int __init aoe_iflist_setup(char *str)
 {
@@ -44,6 +47,23 @@ static int __init aoe_iflist_setup(char *str)
 __setup("aoe_iflist=", aoe_iflist_setup);
 #endif
 
+static spinlock_t txlock;
+static struct sk_buff_head skbtxq;
+
+/* enters with txlock held */
+static int
+tx(void)
+{
+	struct sk_buff *skb;
+
+	while ((skb = skb_dequeue(&skbtxq))) {
+		spin_unlock_irq(&txlock);
+		dev_queue_xmit(skb);
+		spin_lock_irq(&txlock);
+	}
+	return 0;
+}
+
 int
 is_aoe_netif(struct net_device *ifp)
 {
@@ -88,10 +108,14 @@ void
 aoenet_xmit(struct sk_buff_head *queue)
 {
 	struct sk_buff *skb, *tmp;
+	ulong flags;
 
 	skb_queue_walk_safe(queue, skb, tmp) {
 		__skb_unlink(skb, queue);
-		dev_queue_xmit(skb);
+		spin_lock_irqsave(&txlock, flags);
+		skb_queue_tail(&skbtxq, skb);
+		spin_unlock_irqrestore(&txlock, flags);
+		wake_up(&txwq);
 	}
 }
 
@@ -169,6 +193,15 @@ static struct packet_type aoe_pt __read_mostly = {
 int __init
 aoenet_init(void)
 {
+	skb_queue_head_init(&skbtxq);
+	init_waitqueue_head(&txwq);
+	spin_lock_init(&txlock);
+	kts.lock = &txlock;
+	kts.fn = tx;
+	kts.waitq = &txwq;
+	kts.name = "aoe_tx";
+	if (aoe_ktstart(&kts))
+		return -EAGAIN;
 	dev_add_pack(&aoe_pt);
 	return 0;
 }
@@ -176,6 +209,8 @@ aoenet_init(void)
 void
 aoenet_exit(void)
 {
+	aoe_ktstop(&kts);
+	skb_queue_purge(&skbtxq);
 	dev_remove_pack(&aoe_pt);
 }
 
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 05/14] aoe: use packets that work with the smallest-MTU local interface
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
                   ` (3 preceding siblings ...)
  2012-08-25 14:39 ` [PATCH 04/14] aoe: use a kernel thread for transmissions Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  2012-08-25 14:39 ` [PATCH 07/14] aoe: do revalidation steps in order Ed Cashin
                   ` (8 subsequent siblings)
  13 siblings, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

Users with several network interfaces dedicated to AoE generally do
not intentionally configure them to support different-sized AoE data
payloads.

For a given AoE target, there will be a set of local network
interfaces that can reach it.  Using only the payload that will fit in
the smallest-sized MTU of all those local interfaces greatly
simplifies the driver, especially in failure scenarios.

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoe.h    |    7 +-
 drivers/block/aoe/aoecmd.c |  151 ++++++++++++++++++++++++--------------------
 2 files changed, 87 insertions(+), 71 deletions(-)

diff --git a/drivers/block/aoe/aoe.h b/drivers/block/aoe/aoe.h
index d0087de1..ffded64 100644
--- a/drivers/block/aoe/aoe.h
+++ b/drivers/block/aoe/aoe.h
@@ -125,9 +125,8 @@ struct frame {
 
 struct aoeif {
 	struct net_device *nd;
-	unsigned char lost;
-	unsigned char lostjumbo;
-	ushort maxbcnt;
+	ulong lost;
+	int bcnt;
 };
 
 struct aoetgt {
@@ -144,6 +143,7 @@ struct aoetgt {
 	u16 useme;
 	ulong falloc;
 	ulong lastwadj;		/* last window adjustment */
+	int minbcnt;
 	int wpkts, rpkts;
 };
 
@@ -172,6 +172,7 @@ struct aoedev {
 		struct bio *nxbio;
 		struct request *rq;
 	} ip;
+	ulong maxbcnt;
 	struct aoetgt *targets[NTARGETS];
 	struct aoetgt **tgt;	/* target in use when working */
 	struct aoetgt *htgt;	/* target needing rexmit assistance */
diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index 5cc4d3a..cf196ed 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -118,16 +118,18 @@ put_lba(struct aoe_atahdr *ah, sector_t lba)
 	ah->lba5 = lba >>= 8;
 }
 
-static void
+static struct aoeif *
 ifrotate(struct aoetgt *t)
 {
-	t->ifp++;
-	if (t->ifp >= &t->ifs[NAOEIFS] || t->ifp->nd == NULL)
-		t->ifp = t->ifs;
-	if (t->ifp->nd == NULL) {
-		printk(KERN_INFO "aoe: no interface to rotate to\n");
-		BUG();
-	}
+	struct aoeif *ifp;
+
+	ifp = t->ifp;
+	ifp++;
+	if (ifp >= &t->ifs[NAOEIFS] || ifp->nd == NULL)
+		ifp = t->ifs;
+	if (ifp->nd == NULL)
+		return NULL;
+	return t->ifp = ifp;
 }
 
 static void
@@ -231,8 +233,8 @@ newframe(struct aoedev *d)
 		&& t->ifp->nd) {
 			f = newtframe(d, t);
 			if (f) {
-				d->tgt = tt;
 				ifrotate(t);
+				d->tgt = tt;
 				return f;
 			}
 		}
@@ -299,7 +301,7 @@ aoecmd_ata_rw(struct aoedev *d)
 		return 0;
 	t = *d->tgt;
 	bv = buf->bv;
-	bcnt = t->ifp->maxbcnt;
+	bcnt = d->maxbcnt;
 	if (bcnt == 0)
 		bcnt = DEFAULTBCNT;
 	if (bcnt > buf->resid)
@@ -430,9 +432,14 @@ resend(struct aoedev *d, struct frame *f)
 	u32 n;
 
 	t = f->t;
-	ifrotate(t);
 	n = newtag(t);
 	skb = f->skb;
+	if (ifrotate(t) == NULL) {
+		/* probably can't happen, but set it up to fail anyway */
+		pr_info("aoe: resend: no interfaces to rotate to.\n");
+		ktcomplete(f, NULL);
+		return;
+	}
 	h = (struct aoe_hdr *) skb_mac_header(skb);
 	ah = (struct aoe_atahdr *) (h+1);
 
@@ -482,21 +489,6 @@ getif(struct aoetgt *t, struct net_device *nd)
 	return NULL;
 }
 
-static struct aoeif *
-addif(struct aoetgt *t, struct net_device *nd)
-{
-	struct aoeif *p;
-
-	p = getif(t, NULL);
-	if (!p)
-		return NULL;
-	p->nd = nd;
-	p->maxbcnt = DEFAULTBCNT;
-	p->lost = 0;
-	p->lostjumbo = 0;
-	return p;
-}
-
 static void
 ejectif(struct aoetgt *t, struct aoeif *ifp)
 {
@@ -545,7 +537,11 @@ sthtith(struct aoedev *d)
 			resend(d, nf);
 		}
 	}
-	/* he's clean, he's useless.  take away his interfaces */
+	/* We've cleaned up the outstanding so take away his
+	 * interfaces so he won't be used.  We should remove him from
+	 * the target array here, but cleaning up a target is
+	 * involved.  PUNT!
+	 */
 	memset(ht->ifs, 0, sizeof ht->ifs);
 	d->htgt = NULL;
 	return 1;
@@ -1014,11 +1010,8 @@ noskb:	if (buf)
 	case ATA_CMD_PIO_WRITE_EXT:
 		spin_lock_irq(&d->lock);
 		ifp = getif(t, skb->dev);
-		if (ifp) {
+		if (ifp)
 			ifp->lost = 0;
-			if (n > DEFAULTBCNT)
-				ifp->lostjumbo = 0;
-		}
 		if (d->htgt == t) /* I'll help myself, thank you. */
 			d->htgt = NULL;
 		spin_unlock_irq(&d->lock);
@@ -1292,6 +1285,56 @@ addtgt(struct aoedev *d, char *addr, ulong nframes)
 	return *tt = t;
 }
 
+static void
+setdbcnt(struct aoedev *d)
+{
+	struct aoetgt **t, **e;
+	int bcnt = 0;
+
+	t = d->targets;
+	e = t + NTARGETS;
+	for (; t < e && *t; t++)
+		if (bcnt == 0 || bcnt > (*t)->minbcnt)
+			bcnt = (*t)->minbcnt;
+	if (bcnt != d->maxbcnt) {
+		d->maxbcnt = bcnt;
+		pr_info("aoe: e%ld.%d: setting %d byte data frames\n",
+			d->aoemajor, d->aoeminor, bcnt);
+	}
+}
+
+static void
+setifbcnt(struct aoetgt *t, struct net_device *nd, int bcnt)
+{
+	struct aoedev *d;
+	struct aoeif *p, *e;
+	int minbcnt;
+
+	d = t->d;
+	minbcnt = bcnt;
+	p = t->ifs;
+	e = p + NAOEIFS;
+	for (; p < e; p++) {
+		if (p->nd == NULL)
+			break;		/* end of the valid interfaces */
+		if (p->nd == nd) {
+			p->bcnt = bcnt;	/* we're updating */
+			nd = NULL;
+		} else if (minbcnt > p->bcnt)
+			minbcnt = p->bcnt; /* find the min interface */
+	}
+	if (nd) {
+		if (p == e) {
+			pr_err("aoe: device setifbcnt failure; too many interfaces.\n");
+			return;
+		}
+		p->nd = nd;
+		p->bcnt = bcnt;
+	}
+	t->minbcnt = minbcnt;
+	setdbcnt(d);
+}
+
 void
 aoecmd_cfg_rsp(struct sk_buff *skb)
 {
@@ -1299,7 +1342,6 @@ aoecmd_cfg_rsp(struct sk_buff *skb)
 	struct aoe_hdr *h;
 	struct aoe_cfghdr *ch;
 	struct aoetgt *t;
-	struct aoeif *ifp;
 	ulong flags, sysminor, aoemajor;
 	struct sk_buff *sl;
 	struct sk_buff_head queue;
@@ -1345,32 +1387,13 @@ aoecmd_cfg_rsp(struct sk_buff *skb)
 		if (!t)
 			goto bail;
 	}
-	ifp = getif(t, skb->dev);
-	if (!ifp) {
-		ifp = addif(t, skb->dev);
-		if (!ifp) {
-			printk(KERN_INFO
-				"aoe: device addif failure; "
-				"too many interfaces?\n");
-			goto bail;
-		}
-	}
-	if (ifp->maxbcnt) {
-		n = ifp->nd->mtu;
-		n -= sizeof (struct aoe_hdr) + sizeof (struct aoe_atahdr);
-		n /= 512;
-		if (n > ch->scnt)
-			n = ch->scnt;
-		n = n ? n * 512 : DEFAULTBCNT;
-		if (n != ifp->maxbcnt) {
-			printk(KERN_INFO
-				"aoe: e%ld.%d: setting %d%s%s:%pm\n",
-				d->aoemajor, d->aoeminor, n,
-				" byte data frames on ", ifp->nd->name,
-				t->addr);
-			ifp->maxbcnt = n;
-		}
-	}
+	n = skb->dev->mtu;
+	n -= sizeof(struct aoe_hdr) + sizeof(struct aoe_atahdr);
+	n /= 512;
+	if (n > ch->scnt)
+		n = ch->scnt;
+	n = n ? n * 512 : DEFAULTBCNT;
+	setifbcnt(t, skb->dev, n);
 
 	/* don't change users' perspective */
 	if (d->nopen == 0) {
@@ -1391,22 +1414,14 @@ void
 aoecmd_cleanslate(struct aoedev *d)
 {
 	struct aoetgt **t, **te;
-	struct aoeif *p, *e;
 
 	d->mintimer = MINTIMER;
+	d->maxbcnt = 0;
 
 	t = d->targets;
 	te = t + NTARGETS;
-	for (; t < te && *t; t++) {
+	for (; t < te && *t; t++)
 		(*t)->maxout = (*t)->nframes;
-		p = (*t)->ifs;
-		e = p + NAOEIFS;
-		for (; p < e; p++) {
-			p->lostjumbo = 0;
-			p->lost = 0;
-			p->maxbcnt = DEFAULTBCNT;
-		}
-	}
 }
 
 void
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 06/14] aoe: failover remote interface based on aoe_deadsecs parameter
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
                   ` (5 preceding siblings ...)
  2012-08-25 14:39 ` [PATCH 07/14] aoe: do revalidation steps in order Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  2012-08-25 14:39 ` [PATCH 08/14] aoe: disallow unsupported AoE minor addresses Ed Cashin
                   ` (6 subsequent siblings)
  13 siblings, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

The aoe_deadsecs module parameter allows the user to specify a hard
limit on the number of seconds an AoE command can be retransmitted
before the AoE block device is considered to have failed.

Using aoe_deadsecs to determine when we try a different remote
interface helps to ensure that the hard limit is not reached before
we've tried to recover by sending to a different remote port.

As a data storage target, the AoE target is unambiguously identified
by its {major, minor} AoE address tuple, and an AoE target can have
multiple MAC addresses.  However, note that "target" in the driver
code and comments means a {major, minor, MAC address} tuple, as in
"somewhere to send packets".

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoe.h    |    1 -
 drivers/block/aoe/aoecmd.c |    8 +++-----
 2 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/drivers/block/aoe/aoe.h b/drivers/block/aoe/aoe.h
index ffded64..d17b727 100644
--- a/drivers/block/aoe/aoe.h
+++ b/drivers/block/aoe/aoe.h
@@ -96,7 +96,6 @@ enum {
 	TIMERTICK = HZ / 10,
 	MINTIMER = HZ >> 2,
 	MAXTIMER = HZ << 1,
-	HELPWAIT = 20,
 };
 
 struct buf {
diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index cf196ed..7f9cc44 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -632,11 +632,9 @@ rexmit_timer(ulong vp)
 		list_del(pos);
 
 		t = f->t;
-		if (n > HELPWAIT) {
-			/* see if another target can help */
-			if (d->ntargets > 1)
-				d->htgt = t;
-		}
+		if (n > aoe_deadsecs/2)
+			d->htgt = t; /* see if another target can help */
+
 		if (t->nout == t->maxout) {
 			if (t->maxout > 1)
 				t->maxout--;
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 07/14] aoe: do revalidation steps in order
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
                   ` (4 preceding siblings ...)
  2012-08-25 14:39 ` [PATCH 05/14] aoe: use packets that work with the smallest-MTU local interface Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  2012-08-25 14:39 ` [PATCH 06/14] aoe: failover remote interface based on aoe_deadsecs parameter Ed Cashin
                   ` (7 subsequent siblings)
  13 siblings, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

The discovery process begins with an optional AoE config query command
and an AoE config query response.  Normally when an aoe device is
already open, the config query response does not trigger an ATA
identify device command to be sent out, since the response contains
storage capacity information that, if changed, could surprise the user
of the device.

The userland "aoe-revalidate" tool uses a character device to trigger
an AoE config query for a particular AoE storage target and an ATA
device identify command, even when the device is open.

This change causes the config query to go out first, reflecting the
normal discovery sequence.  The responses could come back in any
order, so this change is fairly cosmetic.

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoechr.c |    4 ++--
 1 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/drivers/block/aoe/aoechr.c b/drivers/block/aoe/aoechr.c
index 3557f0d..acdd0ad 100644
--- a/drivers/block/aoe/aoechr.c
+++ b/drivers/block/aoe/aoechr.c
@@ -96,13 +96,14 @@ revalidate(const char __user *str, size_t size)
 		return -EINVAL;
 	spin_lock_irqsave(&d->lock, flags);
 	aoecmd_cleanslate(d);
+	aoecmd_cfg(major, minor);
 loop:
 	skb = aoecmd_ata_id(d);
 	spin_unlock_irqrestore(&d->lock, flags);
 	/* try again if we are able to sleep a bit,
 	 * otherwise give up this revalidation
 	 */
-	if (!skb && !msleep_interruptible(200)) {
+	if (!skb && !msleep_interruptible(250)) {
 		spin_lock_irqsave(&d->lock, flags);
 		goto loop;
 	}
@@ -113,7 +114,6 @@ loop:
 		__skb_queue_tail(&queue, skb);
 		aoenet_xmit(&queue);
 	}
-	aoecmd_cfg(major, minor);
 	return 0;
 }
 
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 08/14] aoe: disallow unsupported AoE minor addresses
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
                   ` (6 preceding siblings ...)
  2012-08-25 14:39 ` [PATCH 06/14] aoe: failover remote interface based on aoe_deadsecs parameter Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  2012-08-25 14:39 ` [PATCH 09/14] aoe: associate frames with the AoE storage target Ed Cashin
                   ` (5 subsequent siblings)
  13 siblings, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

A guard is inserted to prevent AoE minor addresses (slot addresses)
higher than 15 from being used, as they are not yet supported by the
driver.

There is a change coming that will allow the aoe driver to overcome
this limit by using system device minor numbers dynamically, but until
then, this guard prevents unexpected targets from being used by the
driver when AoE targets with high minor numbers are on the AoE
network.

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoecmd.c |    7 +++++++
 1 files changed, 7 insertions(+), 0 deletions(-)

diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index 7f9cc44..61cc1cf 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -1359,6 +1359,13 @@ aoecmd_cfg_rsp(struct sk_buff *skb)
 			"Check shelf dip switches.\n");
 		return;
 	}
+	if (h->minor >= NPERSHELF) {
+		pr_err("aoe: e%ld.%d %s, %d\n",
+			aoemajor, h->minor,
+			"slot number larger than the maximum",
+			NPERSHELF-1);
+		return;
+	}
 
 	sysminor = SYSMINOR(aoemajor, h->minor);
 	if (sysminor * AOE_PARTITIONS + AOE_PARTITIONS > MINORMASK) {
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 09/14] aoe: associate frames with the AoE storage target
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
                   ` (7 preceding siblings ...)
  2012-08-25 14:39 ` [PATCH 08/14] aoe: disallow unsupported AoE minor addresses Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  2012-08-25 14:39 ` [PATCH 10/14] aoe: increase net_device reference count while using it Ed Cashin
                   ` (4 subsequent siblings)
  13 siblings, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

In the driver code, "target" and aoetgt refer to a particular remote
interface on the AoE storage target.  The latter is identified by its
AoE major and minor addresses.  Commands that are being sent to an AoE
storage target {major, minor} can be sent or retransmitted to any of
the remote MAC addresses associated with the AoE storage target.

That is, frames are naturally associated not with an aoetgt (AoE
major, AoE minor, remote MAC address) but with an aoedev (AoE major,
AoE minor).  Making the code reflect that reality simplifies the
driver, especially when the path to a remote MAC address becomes
unusable.

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoe.h    |    8 +++---
 drivers/block/aoe/aoecmd.c |   65 +++++++++++++++++++-------------------------
 drivers/block/aoe/aoedev.c |   30 +++++++++++---------
 3 files changed, 49 insertions(+), 54 deletions(-)

diff --git a/drivers/block/aoe/aoe.h b/drivers/block/aoe/aoe.h
index d17b727..dab7258 100644
--- a/drivers/block/aoe/aoe.h
+++ b/drivers/block/aoe/aoe.h
@@ -91,7 +91,7 @@ enum {
 	NTARGETS = 8,
 	NAOEIFS = 8,
 	NSKBPOOLMAX = 256,
-	NFACTIVE = 17,
+	NFACTIVE = 61,
 
 	TIMERTICK = HZ / 10,
 	MINTIMER = HZ >> 2,
@@ -132,14 +132,11 @@ struct aoetgt {
 	unsigned char addr[6];
 	ushort nframes;
 	struct aoedev *d;			/* parent device I belong to */
-	struct list_head factive[NFACTIVE];	/* hash of active frames */
 	struct list_head ffree;			/* list of free frames */
 	struct aoeif ifs[NAOEIFS];
 	struct aoeif *ifp;	/* current aoeif in use */
 	ushort nout;
 	ushort maxout;
-	u16 lasttag;		/* last tag sent */
-	u16 useme;
 	ulong falloc;
 	ulong lastwadj;		/* last window adjustment */
 	int minbcnt;
@@ -156,6 +153,8 @@ struct aoedev {
 	u16 rttavg;		/* round trip average of requests/responses */
 	u16 mintimer;
 	u16 fw_ver;		/* version of blade's firmware */
+	u16 lasttag;		/* last tag sent */
+	u16 useme;
 	ulong ref;
 	struct work_struct work;/* disk create work struct */
 	struct gendisk *gd;
@@ -172,6 +171,7 @@ struct aoedev {
 		struct request *rq;
 	} ip;
 	ulong maxbcnt;
+	struct list_head factive[NFACTIVE];	/* hash of active frames */
 	struct aoetgt *targets[NTARGETS];
 	struct aoetgt **tgt;	/* target in use when working */
 	struct aoetgt *htgt;	/* target needing rexmit assistance */
diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index 61cc1cf..e39d815 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -58,14 +58,14 @@ new_skb(ulong len)
 }
 
 static struct frame *
-getframe(struct aoetgt *t, u32 tag)
+getframe(struct aoedev *d, u32 tag)
 {
 	struct frame *f;
 	struct list_head *head, *pos, *nx;
 	u32 n;
 
 	n = tag % NFACTIVE;
-	head = &t->factive[n];
+	head = &d->factive[n];
 	list_for_each_safe(pos, nx, head) {
 		f = list_entry(pos, struct frame, head);
 		if (f->tag == tag) {
@@ -82,18 +82,18 @@ getframe(struct aoetgt *t, u32 tag)
  * This driver reserves tag -1 to mean "unused frame."
  */
 static int
-newtag(struct aoetgt *t)
+newtag(struct aoedev *d)
 {
 	register ulong n;
 
 	n = jiffies & 0xffff;
-	return n |= (++t->lasttag & 0x7fff) << 16;
+	return n |= (++d->lasttag & 0x7fff) << 16;
 }
 
 static u32
 aoehdr_atainit(struct aoedev *d, struct aoetgt *t, struct aoe_hdr *h)
 {
-	u32 host_tag = newtag(t);
+	u32 host_tag = newtag(d);
 
 	memcpy(h->src, t->ifp->nd->dev_addr, sizeof h->src);
 	memcpy(h->dst, t->addr, sizeof h->dst);
@@ -269,11 +269,11 @@ loop:
 static void
 fhash(struct frame *f)
 {
-	struct aoetgt *t = f->t;
+	struct aoedev *d = f->t->d;
 	u32 n;
 
 	n = f->tag % NFACTIVE;
-	list_add_tail(&f->head, &t->factive[n]);
+	list_add_tail(&f->head, &d->factive[n]);
 }
 
 static int
@@ -432,7 +432,7 @@ resend(struct aoedev *d, struct frame *f)
 	u32 n;
 
 	t = f->t;
-	n = newtag(t);
+	n = newtag(d);
 	skb = f->skb;
 	if (ifrotate(t) == NULL) {
 		/* probably can't happen, but set it up to fail anyway */
@@ -511,9 +511,12 @@ sthtith(struct aoedev *d)
 	int i;
 
 	for (i = 0; i < NFACTIVE; i++) {
-		head = &ht->factive[i];
+		head = &d->factive[i];
 		list_for_each_safe(pos, nx, head) {
 			f = list_entry(pos, struct frame, head);
+			if (f->t != ht)
+				continue;
+
 			nf = newframe(d);
 			if (!nf)
 				return 0;
@@ -584,22 +587,20 @@ rexmit_timer(ulong vp)
 	}
 
 	/* collect all frames to rexmit into flist */
-	tt = d->targets;
-	te = tt + NTARGETS;
-	for (; tt < te && *tt; tt++) {
-		t = *tt;
-		for (i = 0; i < NFACTIVE; i++) {
-			head = &t->factive[i];
-			list_for_each_safe(pos, nx, head) {
-				f = list_entry(pos, struct frame, head);
-				if (tsince(f->tag) < timeout)
-					continue;
-				/* move to flist for later processing */
-				list_move_tail(pos, &flist);
-			}
+	for (i = 0; i < NFACTIVE; i++) {
+		head = &d->factive[i];
+		list_for_each_safe(pos, nx, head) {
+			f = list_entry(pos, struct frame, head);
+			if (tsince(f->tag) < timeout)
+				break;	/* end of expired frames */
+			/* move to flist for later processing */
+			list_move_tail(pos, &flist);
 		}
-
-		/* window check */
+	}
+	/* window check */
+	tt = d->targets;
+	te = tt + d->ntargets;
+	for (; tt < te && (t = *tt); tt++) {
 		if (t->nout == t->maxout
 		&& t->maxout < t->nframes
 		&& (jiffies - t->lastwadj)/HZ > 10) {
@@ -625,7 +626,7 @@ rexmit_timer(ulong vp)
 			 * Hang all frames on first hash bucket for downdev
 			 * to clean up.
 			 */
-			list_splice(&flist, &f->t->factive[0]);
+			list_splice(&flist, &d->factive[0]);
 			aoedev_downdev(d);
 			break;
 		}
@@ -1161,15 +1162,7 @@ aoecmd_ata_rsp(struct sk_buff *skb)
 	spin_lock_irqsave(&d->lock, flags);
 
 	n = be32_to_cpu(get_unaligned(&h->tag));
-	t = gettgt(d, h->src);
-	if (t == NULL) {
-		printk(KERN_INFO "aoe: can't find target e%ld.%d:%pm\n",
-		       d->aoemajor, d->aoeminor, h->src);
-		spin_unlock_irqrestore(&d->lock, flags);
-		aoedev_put(d);
-		return skb;
-	}
-	f = getframe(t, n);
+	f = getframe(d, n);
 	if (f == NULL) {
 		calc_rttavg(d, -tsince(n));
 		spin_unlock_irqrestore(&d->lock, flags);
@@ -1184,6 +1177,7 @@ aoecmd_ata_rsp(struct sk_buff *skb)
 		aoechr_error(ebuf);
 		return skb;
 	}
+	t = f->t;
 	calc_rttavg(d, tsince(f->tag));
 	t->nout--;
 	aoecmd_work(d);
@@ -1252,7 +1246,6 @@ static struct aoetgt *
 addtgt(struct aoedev *d, char *addr, ulong nframes)
 {
 	struct aoetgt *t, **tt, **te;
-	int i;
 
 	tt = d->targets;
 	te = tt + NTARGETS;
@@ -1278,8 +1271,6 @@ addtgt(struct aoedev *d, char *addr, ulong nframes)
 	t->ifp = t->ifs;
 	t->maxout = t->nframes;
 	INIT_LIST_HEAD(&t->ffree);
-	for (i = 0; i < NFACTIVE; ++i)
-		INIT_LIST_HEAD(&t->factive[i]);
 	return *tt = t;
 }
 
diff --git a/drivers/block/aoe/aoedev.c b/drivers/block/aoe/aoedev.c
index 635dc98..3968fe6 100644
--- a/drivers/block/aoe/aoedev.c
+++ b/drivers/block/aoe/aoedev.c
@@ -103,22 +103,23 @@ aoedev_downdev(struct aoedev *d)
 
 	d->flags &= ~DEVFL_UP;
 
-	/* clean out active buffers on all targets */
+	/* clean out active buffers */
+	for (i = 0; i < NFACTIVE; i++) {
+		head = &d->factive[i];
+		list_for_each_safe(pos, nx, head) {
+			f = list_entry(pos, struct frame, head);
+			list_del(pos);
+			if (f->buf) {
+				f->buf->nframesout--;
+				aoe_failbuf(d, f->buf);
+			}
+			aoe_freetframe(f);
+		}
+	}
+	/* reset window dressings */
 	tt = d->targets;
 	te = tt + NTARGETS;
 	for (; tt < te && (t = *tt); tt++) {
-		for (i = 0; i < NFACTIVE; i++) {
-			head = &t->factive[i];
-			list_for_each_safe(pos, nx, head) {
-				list_del(pos);
-				f = list_entry(pos, struct frame, head);
-				if (f->buf) {
-					f->buf->nframesout--;
-					aoe_failbuf(d, f->buf);
-				}
-				aoe_freetframe(f);
-			}
-		}
 		t->maxout = t->nframes;
 		t->nout = 0;
 	}
@@ -250,6 +251,7 @@ struct aoedev *
 aoedev_by_sysminor_m(ulong sysminor)
 {
 	struct aoedev *d;
+	int i;
 	ulong flags;
 
 	spin_lock_irqsave(&devlist_lock, flags);
@@ -275,6 +277,8 @@ aoedev_by_sysminor_m(ulong sysminor)
 	d->bufpool = NULL;	/* defer to aoeblk_gdalloc */
 	d->tgt = d->targets;
 	d->ref = 1;
+	for (i = 0; i < NFACTIVE; i++)
+		INIT_LIST_HEAD(&d->factive[i]);
 	d->sysminor = sysminor;
 	d->aoemajor = AOEMAJOR(sysminor);
 	d->aoeminor = AOEMINOR(sysminor);
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 10/14] aoe: increase net_device reference count while using it
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
                   ` (8 preceding siblings ...)
  2012-08-25 14:39 ` [PATCH 09/14] aoe: associate frames with the AoE storage target Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  2012-08-25 14:39 ` [PATCH 11/14] aoe: remove unused code and add cosmetic improvements Ed Cashin
                   ` (3 subsequent siblings)
  13 siblings, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

This change eliminates the danger that the user could rmmod the driver
for a network interface that is being used for AoE by the aoe driver.

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoecmd.c |    4 ++++
 drivers/block/aoe/aoedev.c |    7 +++++++
 2 files changed, 11 insertions(+), 0 deletions(-)

diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index e39d815..d2de679 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -493,12 +493,15 @@ static void
 ejectif(struct aoetgt *t, struct aoeif *ifp)
 {
 	struct aoeif *e;
+	struct net_device *nd;
 	ulong n;
 
+	nd = ifp->nd;
 	e = t->ifs + NAOEIFS - 1;
 	n = (e - ifp) * sizeof *ifp;
 	memmove(ifp, ifp+1, n);
 	e->nd = NULL;
+	dev_put(nd);
 }
 
 static int
@@ -1317,6 +1320,7 @@ setifbcnt(struct aoetgt *t, struct net_device *nd, int bcnt)
 			pr_err("aoe: device setifbcnt failure; too many interfaces.\n");
 			return;
 		}
+		dev_hold(nd);
 		p->nd = nd;
 		p->bcnt = bcnt;
 	}
diff --git a/drivers/block/aoe/aoedev.c b/drivers/block/aoe/aoedev.c
index 3968fe6..6be7b38 100644
--- a/drivers/block/aoe/aoedev.c
+++ b/drivers/block/aoe/aoedev.c
@@ -295,6 +295,13 @@ freetgt(struct aoedev *d, struct aoetgt *t)
 {
 	struct frame *f;
 	struct list_head *pos, *nx, *head;
+	struct aoeif *ifp;
+
+	for (ifp = t->ifs; ifp < &t->ifs[NAOEIFS]; ++ifp) {
+		if (!ifp->nd)
+			break;
+		dev_put(ifp->nd);
+	}
 
 	head = &t->ffree;
 	list_for_each_safe(pos, nx, head) {
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 11/14] aoe: remove unused code and add cosmetic improvements
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
                   ` (9 preceding siblings ...)
  2012-08-25 14:39 ` [PATCH 10/14] aoe: increase net_device reference count while using it Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  2012-08-25 14:39 ` [PATCH 13/14] aoe: update copyright year in touched files Ed Cashin
                   ` (2 subsequent siblings)
  13 siblings, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

This change removes some unused code and attempts to increase code
consistency.

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoe.h    |   10 +++-------
 drivers/block/aoe/aoechr.c |    1 +
 drivers/block/aoe/aoecmd.c |   13 ++++---------
 drivers/block/aoe/aoenet.c |    3 ++-
 4 files changed, 10 insertions(+), 17 deletions(-)

diff --git a/drivers/block/aoe/aoe.h b/drivers/block/aoe/aoe.h
index dab7258..eb41fc5 100644
--- a/drivers/block/aoe/aoe.h
+++ b/drivers/block/aoe/aoe.h
@@ -75,18 +75,14 @@ enum {
 	DEVFL_UP = 1,	/* device is installed in system and ready for AoE->ATA commands */
 	DEVFL_TKILL = (1<<1),	/* flag for timer to know when to kill self */
 	DEVFL_EXT = (1<<2),	/* device accepts lba48 commands */
-	DEVFL_CLOSEWAIT = (1<<3), /* device is waiting for all closes to revalidate */
-	DEVFL_GDALLOC = (1<<4),	/* need to alloc gendisk */
-	DEVFL_KICKME = (1<<5),	/* slow polling network card catch */
-	DEVFL_NEWSIZE = (1<<6),	/* need to update dev size in block layer */
-
-	BUFFL_FAIL = 1,
+	DEVFL_GDALLOC = (1<<3),	/* need to alloc gendisk */
+	DEVFL_KICKME = (1<<4),	/* slow polling network card catch */
+	DEVFL_NEWSIZE = (1<<5),	/* need to update dev size in block layer */
 };
 
 enum {
 	DEFAULTBCNT = 2 * 512,	/* 2 sectors */
 	NPERSHELF = 16,		/* number of slots per shelf address */
-	FREETAG = -1,
 	MIN_BUFS = 16,
 	NTARGETS = 8,
 	NAOEIFS = 8,
diff --git a/drivers/block/aoe/aoechr.c b/drivers/block/aoe/aoechr.c
index acdd0ad..723e604 100644
--- a/drivers/block/aoe/aoechr.c
+++ b/drivers/block/aoe/aoechr.c
@@ -174,6 +174,7 @@ aoechr_write(struct file *filp, const char __user *buf, size_t cnt, loff_t *offp
 		break;
 	case MINOR_FLUSH:
 		ret = aoedev_flush(buf, cnt);
+		break;
 	}
 	if (ret == 0)
 		ret = cnt;
diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index d2de679..dd5d898 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -283,7 +283,6 @@ aoecmd_ata_rw(struct aoedev *d)
 	struct aoe_hdr *h;
 	struct aoe_atahdr *ah;
 	struct buf *buf;
-	struct bio_vec *bv;
 	struct aoetgt *t;
 	struct sk_buff *skb;
 	struct sk_buff_head queue;
@@ -300,7 +299,6 @@ aoecmd_ata_rw(struct aoedev *d)
 	if (f == NULL)
 		return 0;
 	t = *d->tgt;
-	bv = buf->bv;
 	bcnt = d->maxbcnt;
 	if (bcnt == 0)
 		bcnt = DEFAULTBCNT;
@@ -787,28 +785,25 @@ void
 aoecmd_sleepwork(struct work_struct *work)
 {
 	struct aoedev *d = container_of(work, struct aoedev, work);
+	struct block_device *bd;
+	u64 ssize;
 
 	if (d->flags & DEVFL_GDALLOC)
 		aoeblk_gdalloc(d);
 
 	if (d->flags & DEVFL_NEWSIZE) {
-		struct block_device *bd;
-		unsigned long flags;
-		u64 ssize;
-
 		ssize = get_capacity(d->gd);
 		bd = bdget_disk(d->gd, 0);
-
 		if (bd) {
 			mutex_lock(&bd->bd_inode->i_mutex);
 			i_size_write(bd->bd_inode, (loff_t)ssize<<9);
 			mutex_unlock(&bd->bd_inode->i_mutex);
 			bdput(bd);
 		}
-		spin_lock_irqsave(&d->lock, flags);
+		spin_lock_irq(&d->lock);
 		d->flags |= DEVFL_UP;
 		d->flags &= ~DEVFL_NEWSIZE;
-		spin_unlock_irqrestore(&d->lock, flags);
+		spin_unlock_irq(&d->lock);
 	}
 }
 
diff --git a/drivers/block/aoe/aoenet.c b/drivers/block/aoe/aoenet.c
index 5f43710..3c923e5 100644
--- a/drivers/block/aoe/aoenet.c
+++ b/drivers/block/aoe/aoenet.c
@@ -175,7 +175,8 @@ aoenet_rcv(struct sk_buff *skb, struct net_device *ifp, struct packet_type *pt,
 	default:
 		if (h->cmd >= AOECMD_VEND_MIN)
 			break;	/* don't complain about vendor commands */
-		printk(KERN_INFO "aoe: unknown cmd %d\n", h->cmd);
+		pr_info("aoe: unknown AoE command type 0x%02x\n", h->cmd);
+		break;
 	}
 
 	if (!skb)
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 12/14] aoe: update internal version number to 49
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
                   ` (11 preceding siblings ...)
  2012-08-25 14:39 ` [PATCH 13/14] aoe: update copyright year in touched files Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  2012-08-25 14:39 ` [PATCH 14/14] aoe: update documentation with new URL and VM settings reference Ed Cashin
  13 siblings, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

The internal version number of the aoe driver appears in a console
message when the driver loads and is usually obtained by the user with
the userland aoe-version tool, part of the aoetools.[1]

Although this patchset includes bugfixes backported from
higher-numbered versions published on the coraid.com website, it is a
form of version 49.

1. http://aoetools.sourceforge.net/

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoe.h |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/drivers/block/aoe/aoe.h b/drivers/block/aoe/aoe.h
index eb41fc5..32aede9 100644
--- a/drivers/block/aoe/aoe.h
+++ b/drivers/block/aoe/aoe.h
@@ -1,5 +1,5 @@
 /* Copyright (c) 2007 Coraid, Inc.  See COPYING for GPL terms. */
-#define VERSION "47"
+#define VERSION "49"
 #define AOE_MAJOR 152
 #define DEVICE_NAME "aoe"
 
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 13/14] aoe: update copyright year in touched files
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
                   ` (10 preceding siblings ...)
  2012-08-25 14:39 ` [PATCH 11/14] aoe: remove unused code and add cosmetic improvements Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  2012-08-25 14:39 ` [PATCH 12/14] aoe: update internal version number to 49 Ed Cashin
  2012-08-25 14:39 ` [PATCH 14/14] aoe: update documentation with new URL and VM settings reference Ed Cashin
  13 siblings, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoe.h     |    2 +-
 drivers/block/aoe/aoeblk.c  |    2 +-
 drivers/block/aoe/aoechr.c  |    2 +-
 drivers/block/aoe/aoecmd.c  |    2 +-
 drivers/block/aoe/aoedev.c  |    2 +-
 drivers/block/aoe/aoemain.c |    2 +-
 drivers/block/aoe/aoenet.c  |    2 +-
 7 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/drivers/block/aoe/aoe.h b/drivers/block/aoe/aoe.h
index 32aede9..27d0a21 100644
--- a/drivers/block/aoe/aoe.h
+++ b/drivers/block/aoe/aoe.h
@@ -1,4 +1,4 @@
-/* Copyright (c) 2007 Coraid, Inc.  See COPYING for GPL terms. */
+/* Copyright (c) 2012 Coraid, Inc.  See COPYING for GPL terms. */
 #define VERSION "49"
 #define AOE_MAJOR 152
 #define DEVICE_NAME "aoe"
diff --git a/drivers/block/aoe/aoeblk.c b/drivers/block/aoe/aoeblk.c
index 7ec4b8f..83160ab 100644
--- a/drivers/block/aoe/aoeblk.c
+++ b/drivers/block/aoe/aoeblk.c
@@ -1,4 +1,4 @@
-/* Copyright (c) 2007 Coraid, Inc.  See COPYING for GPL terms. */
+/* Copyright (c) 2012 Coraid, Inc.  See COPYING for GPL terms. */
 /*
  * aoeblk.c
  * block device routines
diff --git a/drivers/block/aoe/aoechr.c b/drivers/block/aoe/aoechr.c
index 723e604..deb30c1 100644
--- a/drivers/block/aoe/aoechr.c
+++ b/drivers/block/aoe/aoechr.c
@@ -1,4 +1,4 @@
-/* Copyright (c) 2007 Coraid, Inc.  See COPYING for GPL terms. */
+/* Copyright (c) 2012 Coraid, Inc.  See COPYING for GPL terms. */
 /*
  * aoechr.c
  * AoE character device driver
diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index dd5d898..5461faa 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -1,4 +1,4 @@
-/* Copyright (c) 2007 Coraid, Inc.  See COPYING for GPL terms. */
+/* Copyright (c) 2012 Coraid, Inc.  See COPYING for GPL terms. */
 /*
  * aoecmd.c
  * Filesystem request handling methods
diff --git a/drivers/block/aoe/aoedev.c b/drivers/block/aoe/aoedev.c
index 6be7b38..ccaecff 100644
--- a/drivers/block/aoe/aoedev.c
+++ b/drivers/block/aoe/aoedev.c
@@ -1,4 +1,4 @@
-/* Copyright (c) 2007 Coraid, Inc.  See COPYING for GPL terms. */
+/* Copyright (c) 2012 Coraid, Inc.  See COPYING for GPL terms. */
 /*
  * aoedev.c
  * AoE device utility functions; maintains device list.
diff --git a/drivers/block/aoe/aoemain.c b/drivers/block/aoe/aoemain.c
index 6fc4b05..04793c2 100644
--- a/drivers/block/aoe/aoemain.c
+++ b/drivers/block/aoe/aoemain.c
@@ -1,4 +1,4 @@
-/* Copyright (c) 2007 Coraid, Inc.  See COPYING for GPL terms. */
+/* Copyright (c) 2012 Coraid, Inc.  See COPYING for GPL terms. */
 /*
  * aoemain.c
  * Module initialization routines, discover timer
diff --git a/drivers/block/aoe/aoenet.c b/drivers/block/aoe/aoenet.c
index 3c923e5..162c647 100644
--- a/drivers/block/aoe/aoenet.c
+++ b/drivers/block/aoe/aoenet.c
@@ -1,4 +1,4 @@
-/* Copyright (c) 2007 Coraid, Inc.  See COPYING for GPL terms. */
+/* Copyright (c) 2012 Coraid, Inc.  See COPYING for GPL terms. */
 /*
  * aoenet.c
  * Ethernet portion of AoE driver
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 14/14] aoe: update documentation with new URL and VM settings reference
  2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
                   ` (12 preceding siblings ...)
  2012-08-25 14:39 ` [PATCH 12/14] aoe: update internal version number to 49 Ed Cashin
@ 2012-08-25 14:39 ` Ed Cashin
  13 siblings, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25 14:39 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

The old area has a new URL.  Also, now that the driver can perform
better, it is worth mentioning the VM settings that help aoe to sink
dirty pages out early, avoiding unnecessary memory pressure when much
I/O is going on.

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 Documentation/aoe/aoe.txt |    9 ++++++---
 MAINTAINERS               |    2 +-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/Documentation/aoe/aoe.txt b/Documentation/aoe/aoe.txt
index 5f5aa16..b3e4756 100644
--- a/Documentation/aoe/aoe.txt
+++ b/Documentation/aoe/aoe.txt
@@ -1,8 +1,11 @@
-The EtherDrive (R) HOWTO for users of 2.6 kernels is found at ...
+The EtherDrive (R) HOWTO for 2.6 and 3.x kernels is found at ...
 
-  http://www.coraid.com/SUPPORT/EtherDrive-HBA  
+  http://support.coraid.com/support/linux/EtherDrive-2.6-HOWTO.html
 
-  It has many tips and hints!
+It has many tips and hints!  Please see, especially, recommended
+tunings for virtual memory:
+
+  http://support.coraid.com/support/linux/EtherDrive-2.6-HOWTO-5.html#ss5.19
 
 The aoetools are userland programs that are designed to work with this
 driver.  The aoetools are on sourceforge.
diff --git a/MAINTAINERS b/MAINTAINERS
index fdc0119..917b0d8 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1250,7 +1250,7 @@ F:	include/linux/i2c/at24.h
 
 ATA OVER ETHERNET (AOE) DRIVER
 M:	"Ed L. Cashin" <ecashin@coraid.com>
-W:	http://www.coraid.com/support/linux
+W:	http://support.coraid.com/support/linux
 S:	Supported
 F:	Documentation/aoe/
 F:	drivers/block/aoe/
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

* [PATCH 00/14] aoe driver v49 performance and usability improvements
@ 2012-08-28 12:53 Ed Cashin
  2012-08-25 14:39 ` [PATCH 01/14] aoe: for performance support larger packet payloads Ed Cashin
                   ` (13 more replies)
  0 siblings, 14 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-28 12:53 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

In this resubmission of the patchset, several suggestions from Andrew
Morton have been incorporated to clean up the changes.  I will not be
able to be very responsive during the first part of September, so the
sooner you can provide feedback, the better.

These patches go a long way to updating the in-kernel aoe driver with
the changes that have been in the coraid.com-distributed version,
bringing it from (aoe internal) version 47 to version 49.  They apply
to commit 23dcfa61bac244e1 of the mainline git tree.

These updates have been too long in coming, but there is reason to be
hopeful that once these changes are applied, there is now less of a
chance that such a large update gap will occur again for a couple of
reasons.

For one, the changes between 47 and 49 were originally produced in a
form that required a large amount of work before they could be
presented to the list in a form that complies with
Documentation/Submit*.  I have done my best to disentangle the changes
and to clean them up.  Subsequent changes are more amenable to
submission to the LKML and should be easier to prepare soon.  Future
changes from Coraid are expected to be similarly amenable to LKML
submission because of changes we have made for the better in Coraid's
development practices in the past few years.

Second, the users of the aoe driver increasingly demand that the
in-kernel driver have the same high performance and advanced features
as the one distributed at coraid.com.

The plan is to follow up with more changes once this patchset is
merged, until the coraid.com-distributed driver and the in-kernel
drivers are as close as possible.  Some bugfixes have been
backported to this patchset, though, from post-version-49 development.
The important thing, though, is to get things started with a minimal
and progressive first patchset.

Ed L. Cashin (14):
  aoe: for performance support larger packet payloads
  aoe: kernel thread handles I/O completions for simple locking
  aoe: become I/O request queue handler for increased user control
  aoe: use a kernel thread for transmissions
  aoe: use packets that work with the smallest-MTU local interface
  aoe: failover remote interface based on aoe_deadsecs parameter
  aoe: do revalidation steps in order
  aoe: disallow unsupported AoE minor addresses
  aoe: associate frames with the AoE storage target
  aoe: increase net_device reference count while using it
  aoe: remove unused code and add cosmetic improvements
  aoe: update internal version number to 49
  aoe: update copyright year in touched files
  aoe: update documentation with new URL and VM settings reference

 Documentation/aoe/aoe.txt   |    9 +-
 MAINTAINERS                 |    2 +-
 drivers/block/aoe/aoe.h     |   85 ++--
 drivers/block/aoe/aoeblk.c  |   89 +---
 drivers/block/aoe/aoechr.c  |   11 +-
 drivers/block/aoe/aoecmd.c  | 1219 ++++++++++++++++++++++++++++---------------
 drivers/block/aoe/aoedev.c  |  161 ++++--
 drivers/block/aoe/aoemain.c |   10 +-
 drivers/block/aoe/aoenet.c  |   61 ++-
 9 files changed, 1076 insertions(+), 571 deletions(-)

-- 
1.7.2.5


^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [PATCH 02/14] aoe: kernel thread handles I/O completions for simple locking
  2012-08-25 14:39 ` [PATCH 02/14] aoe: kernel thread handles I/O completions for simple locking Ed Cashin
@ 2012-08-31 20:06   ` Andrew Morton
  0 siblings, 0 replies; 20+ messages in thread
From: Andrew Morton @ 2012-08-31 20:06 UTC (permalink / raw)
  To: Ed Cashin; +Cc: linux-kernel

On Sat, 25 Aug 2012 10:39:46 -0400
Ed Cashin <ecashin@coraid.com> wrote:

> +static int
> +kthread(void *vp)
> +{
> +	struct ktstate *k;
> +	DECLARE_WAITQUEUE(wait, current);
> +	int more;
> +
> +	k = vp;
> +	current->flags |= PF_NOFREEZE;
> +	set_user_nice(current, -10);
> +	complete(&k->rendez);	/* tell spawner we're running */
> +	do {
> +		spin_lock_irq(k->lock);
> +		more = k->fn();
> +		if (!more) {
> +			add_wait_queue(k->waitq, &wait);
> +			__set_current_state(TASK_INTERRUPTIBLE);
> +		}
> +		spin_unlock_irq(k->lock);
> +		if (!more) {
> +			schedule();
> +			remove_wait_queue(k->waitq, &wait);
> +		} else
> +			cond_resched();
> +	} while (!kthread_should_stop());
> +	complete(&k->rendez);	/* tell spawner we're stopping */
> +	return 0;
> +}
> +
> +static void
> +aoe_ktstop(struct ktstate *k)
> +{
> +	kthread_stop(k->task);
> +	wait_for_completion(&k->rendez);
> +}
> +
> +static int
> +aoe_ktstart(struct ktstate *k)
> +{
> +	struct task_struct *task;
> +
> +	init_completion(&k->rendez);
> +	task = kthread_run(kthread, k, k->name);
> +	if (task == NULL || IS_ERR(task))
> +		return -ENOMEM;
> +	k->task = task;
> +	wait_for_completion(&k->rendez); /* allow kthread to start */
> +	init_completion(&k->rendez);	/* for waiting for exit later */
> +	return 0;
> +}

It's a pretty unlikely thing, but I wonder if there's a way in which
aoe_ktstart() can run that second init_completion() _after_ kthread()
has run its second complete().  ie: someone runs aoe_ktstop()
super-early.  If that can happen, the later wait_for_completion() hangs
up.




^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [PATCH 02/14] aoe: kernel thread handles I/O completions for simple locking
  2012-08-24 21:22   ` Andrew Morton
  2012-08-25  0:35     ` Ed Cashin
@ 2012-08-26  1:31     ` ecashin
  1 sibling, 0 replies; 20+ messages in thread
From: ecashin @ 2012-08-26  1:31 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, Ed Cashin

On Fri, 24 Aug 2012 14:22:57 -0700, Andrew Morton wrote:

> On Fri, 17 Aug 2012 21:24:08 -0400
> Ed Cashin <ecashin@coraid.com> wrote:
...
> > +#ifdef PF_NOFREEZE
> 
> PF_NOFREEZE can never be undefined.
> 
> > +	current->flags |= PF_NOFREEZE;
> > +#endif
> > +	set_user_nice(current, -10);
> > +	sigfillset(&blocked);
> > +	sigprocmask(SIG_BLOCK, &blocked, NULL);
> > +	flush_signals(current);
> 
> This is a kernel thread - it shouldn't need to fiddle with signals.
> 
> > +	complete(&k->rendez);
> 
> That's odd.  Why do a complete() before we even start?  A code comment
> is needed if this is indeed correct.

There is a whole class of races that goes away when the code starting
a thread waits for the thread to be running before proceeding, and that's
what this rendezvous is for.  I've added a comment that will appear next
time I send the patchset.

(More comments below.)

> > +	do {
> > +		__set_current_state(TASK_UNINTERRUPTIBLE);
> 
> I think this statement is simply unneeded.
> 
> > +		spin_lock_irq(k->lock);
> > +		more = k->fn();
> > +		if (!more) {
> > +			add_wait_queue(k->waitq, &wait);
> > +			__set_current_state(TASK_INTERRUPTIBLE);
> > +		}
> > +		spin_unlock_irq(k->lock);
> > +		if (!more) {
> > +			schedule();
> > +			remove_wait_queue(k->waitq, &wait);
> > +		} else
> > +			cond_resched();
> 
> Here we can do a cond_resched() when in state TASK_INTERRUPTIBLE.  Such
> a schedule() will never return unless some other thread flips this task
> into state TASK_RUNNING.  But if another thread does that, we should
> have been on that waitqueue!
> 
> It seems all confused and racy.

When we do a cond_resched, it's only when "more" is non-zero, in which
case, we did not set the state to TASK_INTERRUPTIBLE.  I do like
your suggestions, though.

Please check out the (post-patchset) changes below that I plan to
incorporate into the patchset for resubmission, and let me know if
you see a race now that your suggestions have been incorporated.

It seems to work just as well in the testing I did with the changes
below incorporated.

diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index d91b8d0..97d05fa 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -1075,20 +1075,13 @@ kthread(void *vp)
 {
 	struct ktstate *k;
 	DECLARE_WAITQUEUE(wait, current);
-	sigset_t blocked;
 	int more;
 
 	k = vp;
-#ifdef PF_NOFREEZE
 	current->flags |= PF_NOFREEZE;
-#endif
 	set_user_nice(current, -10);
-	sigfillset(&blocked);
-	sigprocmask(SIG_BLOCK, &blocked, NULL);
-	flush_signals(current);
-	complete(&k->rendez);
+	complete(&k->rendez);	/* tell spawner we're running */
 	do {
-		__set_current_state(TASK_UNINTERRUPTIBLE);
 		spin_lock_irq(k->lock);
 		more = k->fn();
 		if (!more) {
@@ -1102,8 +1095,7 @@ kthread(void *vp)
 		} else
 			cond_resched();
 	} while (!kthread_should_stop());
-	__set_current_state(TASK_RUNNING);
-	complete(&k->rendez);
+	complete(&k->rendez);	/* tell spawner we're stopping */
 	return 0;
 }
 
@@ -1122,10 +1114,10 @@ aoe_ktstart(struct ktstate *k)
 	init_completion(&k->rendez);
 	task = kthread_run(kthread, k, k->name);
 	if (task == NULL || IS_ERR(task))
-		return -EFAULT;
+		return -ENOMEM;
 	k->task = task;
-	wait_for_completion(&k->rendez);
-	init_completion(&k->rendez);	/* for exit */
+	wait_for_completion(&k->rendez); /* allow kthread to start */
+	init_completion(&k->rendez);	/* for waiting for exit later */
 	return 0;
 }
 

^ permalink raw reply related	[flat|nested] 20+ messages in thread

* Re: [PATCH 02/14] aoe: kernel thread handles I/O completions for simple locking
  2012-08-24 21:22   ` Andrew Morton
@ 2012-08-25  0:35     ` Ed Cashin
  2012-08-26  1:31     ` ecashin
  1 sibling, 0 replies; 20+ messages in thread
From: Ed Cashin @ 2012-08-25  0:35 UTC (permalink / raw)
  To: linux-kernel; +Cc: Andrew Morton, ecashin

Andrew Morton <akpm@linux-foundation.org> writes:

> On Fri, 17 Aug 2012 21:24:08 -0400
> Ed Cashin <ecashin@coraid.com> wrote:
...
>> +	sigfillset(&blocked);
>> +	sigprocmask(SIG_BLOCK, &blocked, NULL);
>> +	flush_signals(current);
>
> This is a kernel thread - it shouldn't need to fiddle with signals.
...

Thanks for the feedback.  I'll try out your suggestions and return with
changes and explanations.

-- 
  Ed


^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [PATCH 02/14] aoe: kernel thread handles I/O completions for simple locking
  2012-08-18  1:24 ` [PATCH 02/14] aoe: kernel thread handles I/O completions for simple locking Ed Cashin
@ 2012-08-24 21:22   ` Andrew Morton
  2012-08-25  0:35     ` Ed Cashin
  2012-08-26  1:31     ` ecashin
  0 siblings, 2 replies; 20+ messages in thread
From: Andrew Morton @ 2012-08-24 21:22 UTC (permalink / raw)
  To: Ed Cashin; +Cc: linux-kernel

On Fri, 17 Aug 2012 21:24:08 -0400
Ed Cashin <ecashin@coraid.com> wrote:

> This patch makes the frames the aoe driver uses to track the
> relationship between bios and packets more flexible and detached, so
> that they can be passed to an "aoe_ktio" thread for completion of I/O.
> 
> The frames are handled much like skbs, with a capped amount of
> preallocation so that real-world use cases are likely to run smoothly
> and degenerate gracefully even under memory pressure.
> 
> Decoupling I/O completion from the receive path and serializing it in
> a process makes it easier to think about the correctness of the
> locking in the driver, especially in the case of a remote MAC address
> becoming unusable.
> 
> ...
>
> +static int
> +kthread(void *vp)
> +{
> +	struct ktstate *k;
> +	DECLARE_WAITQUEUE(wait, current);
> +	sigset_t blocked;
> +	int more;
> +
> +	k = vp;
> +#ifdef PF_NOFREEZE

PF_NOFREEZE can never be undefined.

> +	current->flags |= PF_NOFREEZE;
> +#endif
> +	set_user_nice(current, -10);
> +	sigfillset(&blocked);
> +	sigprocmask(SIG_BLOCK, &blocked, NULL);
> +	flush_signals(current);

This is a kernel thread - it shouldn't need to fiddle with signals.

> +	complete(&k->rendez);

That's odd.  Why do a complete() before we even start?  A code comment
is needed if this is indeed correct.

> +	do {
> +		__set_current_state(TASK_UNINTERRUPTIBLE);

I think this statement is simply unneeded.

> +		spin_lock_irq(k->lock);
> +		more = k->fn();
> +		if (!more) {
> +			add_wait_queue(k->waitq, &wait);
> +			__set_current_state(TASK_INTERRUPTIBLE);
> +		}
> +		spin_unlock_irq(k->lock);
> +		if (!more) {
> +			schedule();
> +			remove_wait_queue(k->waitq, &wait);
> +		} else
> +			cond_resched();

Here we can do a cond_resched() when in state TASK_INTERRUPTIBLE.  Such
a schedule() will never return unless some other thread flips this task
into state TASK_RUNNING.  But if another thread does that, we should
have been on that waitqueue!

It seems all confused and racy.

> +	} while (!kthread_should_stop());
> +	__set_current_state(TASK_RUNNING);

I don't think there's any path by which we can get here in any state
other than TASK_RUNNING.

> +	complete(&k->rendez);
> +	return 0;
> +}

This function might be a bit neater if it were to use
prepare_to_wait()/finish_wait().

> +static void
> +aoe_ktstop(struct ktstate *k)
> +{
> +	kthread_stop(k->task);
> +	wait_for_completion(&k->rendez);
> +}
> +
> +static int
> +aoe_ktstart(struct ktstate *k)
> +{
> +	struct task_struct *task;
> +
> +	init_completion(&k->rendez);
> +	task = kthread_run(kthread, k, k->name);
> +	if (task == NULL || IS_ERR(task))
> +		return -EFAULT;

EFAULT makes no sense?

> +	k->task = task;
> +	wait_for_completion(&k->rendez);
> +	init_completion(&k->rendez);	/* for exit */
> +	return 0;
> +}
>
> ...
>

^ permalink raw reply	[flat|nested] 20+ messages in thread

* [PATCH 02/14] aoe: kernel thread handles I/O completions for simple locking
  2012-08-23 17:43 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
@ 2012-08-18  1:24 ` Ed Cashin
  2012-08-24 21:22   ` Andrew Morton
  0 siblings, 1 reply; 20+ messages in thread
From: Ed Cashin @ 2012-08-18  1:24 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel, ecashin

This patch makes the frames the aoe driver uses to track the
relationship between bios and packets more flexible and detached, so
that they can be passed to an "aoe_ktio" thread for completion of I/O.

The frames are handled much like skbs, with a capped amount of
preallocation so that real-world use cases are likely to run smoothly
and degenerate gracefully even under memory pressure.

Decoupling I/O completion from the receive path and serializing it in
a process makes it easier to think about the correctness of the
locking in the driver, especially in the case of a remote MAC address
becoming unusable.

Signed-off-by: Ed Cashin <ecashin@coraid.com>
---
 drivers/block/aoe/aoe.h     |   33 ++-
 drivers/block/aoe/aoechr.c  |    3 +-
 drivers/block/aoe/aoecmd.c  |  737 ++++++++++++++++++++++++++++---------------
 drivers/block/aoe/aoedev.c  |   84 +++--
 drivers/block/aoe/aoemain.c |    8 +-
 drivers/block/aoe/aoenet.c  |    6 +-
 6 files changed, 567 insertions(+), 304 deletions(-)

diff --git a/drivers/block/aoe/aoe.h b/drivers/block/aoe/aoe.h
index 8ca8c8a..0cd6c0f 100644
--- a/drivers/block/aoe/aoe.h
+++ b/drivers/block/aoe/aoe.h
@@ -91,6 +91,7 @@ enum {
 	NTARGETS = 8,
 	NAOEIFS = 8,
 	NSKBPOOLMAX = 128,
+	NFACTIVE = 17,
 
 	TIMERTICK = HZ / 10,
 	MINTIMER = HZ >> 2,
@@ -112,13 +113,16 @@ struct buf {
 };
 
 struct frame {
-	int tag;
+	struct list_head head;
+	u32 tag;
 	ulong waited;
 	struct buf *buf;
+	struct aoetgt *t;		/* parent target I belong to */
 	char *bufaddr;
 	ulong bcnt;
 	sector_t lba;
-	struct sk_buff *skb;
+	struct sk_buff *skb;		/* command skb freed on module exit */
+	struct sk_buff *r_skb;		/* response skb for async processing */
 	struct bio_vec *bv;
 	ulong bv_off;
 };
@@ -133,16 +137,18 @@ struct aoeif {
 struct aoetgt {
 	unsigned char addr[6];
 	ushort nframes;
-	struct frame *frames;
+	struct aoedev *d;			/* parent device I belong to */
+	struct list_head factive[NFACTIVE];	/* hash of active frames */
+	struct list_head ffree;			/* list of free frames */
 	struct aoeif ifs[NAOEIFS];
 	struct aoeif *ifp;	/* current aoeif in use */
 	ushort nout;
 	ushort maxout;
 	u16 lasttag;		/* last tag sent */
 	u16 useme;
+	ulong falloc;
 	ulong lastwadj;		/* last window adjustment */
 	int wpkts, rpkts;
-	int dataref;
 };
 
 struct aoedev {
@@ -169,9 +175,20 @@ struct aoedev {
 	struct buf *inprocess;	/* the one we're currently working on */
 	struct aoetgt *targets[NTARGETS];
 	struct aoetgt **tgt;	/* target in use when working */
-	struct aoetgt **htgt;	/* target needing rexmit assistance */
+	struct aoetgt *htgt;	/* target needing rexmit assistance */
+	ulong ntargets;
+	ulong kicked;
 };
 
+/* kthread tracking */
+struct ktstate {
+	struct completion rendez;
+	struct task_struct *task;
+	wait_queue_head_t *waitq;
+	int (*fn) (void);
+	char *name;
+	spinlock_t *lock;
+};
 
 int aoeblk_init(void);
 void aoeblk_exit(void);
@@ -184,11 +201,14 @@ void aoechr_error(char *);
 
 void aoecmd_work(struct aoedev *d);
 void aoecmd_cfg(ushort aoemajor, unsigned char aoeminor);
-void aoecmd_ata_rsp(struct sk_buff *);
+struct sk_buff *aoecmd_ata_rsp(struct sk_buff *);
 void aoecmd_cfg_rsp(struct sk_buff *);
 void aoecmd_sleepwork(struct work_struct *);
 void aoecmd_cleanslate(struct aoedev *);
+void aoecmd_exit(void);
+int aoecmd_init(void);
 struct sk_buff *aoecmd_ata_id(struct aoedev *);
+void aoe_freetframe(struct frame *);
 
 int aoedev_init(void);
 void aoedev_exit(void);
@@ -196,6 +216,7 @@ struct aoedev *aoedev_by_aoeaddr(int maj, int min);
 struct aoedev *aoedev_by_sysminor_m(ulong sysminor);
 void aoedev_downdev(struct aoedev *d);
 int aoedev_flush(const char __user *str, size_t size);
+void aoe_failbuf(struct aoedev *d, struct buf *buf);
 
 int aoenet_init(void);
 void aoenet_exit(void);
diff --git a/drivers/block/aoe/aoechr.c b/drivers/block/aoe/aoechr.c
index e86d206..f145388 100644
--- a/drivers/block/aoe/aoechr.c
+++ b/drivers/block/aoe/aoechr.c
@@ -86,10 +86,9 @@ revalidate(const char __user *str, size_t size)
 	if (copy_from_user(buf, str, size))
 		return -EFAULT;
 
-	/* should be e%d.%d format */
 	n = sscanf(buf, "e%d.%d", &major, &minor);
 	if (n != 2) {
-		printk(KERN_ERR "aoe: invalid device specification\n");
+		pr_err("aoe: invalid device specification %s\n", buf);
 		return -EINVAL;
 	}
 	d = aoedev_by_aoeaddr(major, minor);
diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index f10ab49..d0fc53f 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -12,10 +12,17 @@
 #include <linux/netdevice.h>
 #include <linux/genhd.h>
 #include <linux/moduleparam.h>
+#include <linux/workqueue.h>
+#include <linux/kthread.h>
 #include <net/net_namespace.h>
 #include <asm/unaligned.h>
+#include <linux/uio.h>
 #include "aoe.h"
 
+#define MAXIOC (8192)	/* default meant to avoid most soft lockups */
+
+static void ktcomplete(struct frame *, struct sk_buff *);
+
 static int aoe_deadsecs = 60 * 3;
 module_param(aoe_deadsecs, int, 0644);
 MODULE_PARM_DESC(aoe_deadsecs, "After aoe_deadsecs seconds, give up and fail dev.");
@@ -25,6 +32,15 @@ module_param(aoe_maxout, int, 0644);
 MODULE_PARM_DESC(aoe_maxout,
 	"Only aoe_maxout outstanding packets for every MAC on eX.Y.");
 
+static wait_queue_head_t ktiowq;
+static struct ktstate kts;
+
+/* io completion queue */
+static struct {
+	struct list_head head;
+	spinlock_t lock;
+} iocq;
+
 static struct sk_buff *
 new_skb(ulong len)
 {
@@ -40,15 +56,21 @@ new_skb(ulong len)
 }
 
 static struct frame *
-getframe(struct aoetgt *t, int tag)
+getframe(struct aoetgt *t, u32 tag)
 {
-	struct frame *f, *e;
+	struct frame *f;
+	struct list_head *head, *pos, *nx;
+	u32 n;
 
-	f = t->frames;
-	e = f + t->nframes;
-	for (; f<e; f++)
-		if (f->tag == tag)
+	n = tag % NFACTIVE;
+	head = &t->factive[n];
+	list_for_each_safe(pos, nx, head) {
+		f = list_entry(pos, struct frame, head);
+		if (f->tag == tag) {
+			list_del(pos);
 			return f;
+		}
+	}
 	return NULL;
 }
 
@@ -66,7 +88,7 @@ newtag(struct aoetgt *t)
 	return n |= (++t->lasttag & 0x7fff) << 16;
 }
 
-static int
+static u32
 aoehdr_atainit(struct aoedev *d, struct aoetgt *t, struct aoe_hdr *h)
 {
 	u32 host_tag = newtag(t);
@@ -128,75 +150,96 @@ skb_pool_get(struct aoedev *d)
 	return NULL;
 }
 
-/* freeframe is where we do our load balancing so it's a little hairy. */
+void
+aoe_freetframe(struct frame *f)
+{
+	struct aoetgt *t;
+
+	t = f->t;
+	f->buf = NULL;
+	f->bv = NULL;
+	f->r_skb = NULL;
+	list_add(&f->head, &t->ffree);
+}
+
 static struct frame *
-freeframe(struct aoedev *d)
+newtframe(struct aoedev *d, struct aoetgt *t)
 {
-	struct frame *f, *e, *rf;
-	struct aoetgt **t;
+	struct frame *f;
 	struct sk_buff *skb;
+	struct list_head *pos;
+
+	if (list_empty(&t->ffree)) {
+		if (t->falloc >= NSKBPOOLMAX*2)
+			return NULL;
+		f = kcalloc(1, sizeof(*f), GFP_ATOMIC);
+		if (f == NULL)
+			return NULL;
+		t->falloc++;
+		f->t = t;
+	} else {
+		pos = t->ffree.next;
+		list_del(pos);
+		f = list_entry(pos, struct frame, head);
+	}
+
+	skb = f->skb;
+	if (skb == NULL) {
+		f->skb = skb = new_skb(ETH_ZLEN);
+		if (!skb) {
+bail:			aoe_freetframe(f);
+			return NULL;
+		}
+	}
+
+	if (atomic_read(&skb_shinfo(skb)->dataref) != 1) {
+		skb = skb_pool_get(d);
+		if (skb == NULL)
+			goto bail;
+		skb_pool_put(d, f->skb);
+		f->skb = skb;
+	}
+
+	skb->truesize -= skb->data_len;
+	skb_shinfo(skb)->nr_frags = skb->data_len = 0;
+	skb_trim(skb, 0);
+	return f;
+}
+
+static struct frame *
+newframe(struct aoedev *d)
+{
+	struct frame *f;
+	struct aoetgt *t, **tt;
+	int totout = 0;
 
 	if (d->targets[0] == NULL) {	/* shouldn't happen, but I'm paranoid */
 		printk(KERN_ERR "aoe: NULL TARGETS!\n");
 		return NULL;
 	}
-	t = d->tgt;
-	t++;
-	if (t >= &d->targets[NTARGETS] || !*t)
-		t = d->targets;
+	tt = d->tgt;	/* last used target */
 	for (;;) {
-		if ((*t)->nout < (*t)->maxout
+		tt++;
+		if (tt >= &d->targets[NTARGETS] || !*tt)
+			tt = d->targets;
+		t = *tt;
+		totout += t->nout;
+		if (t->nout < t->maxout
 		&& t != d->htgt
-		&& (*t)->ifp->nd) {
-			rf = NULL;
-			f = (*t)->frames;
-			e = f + (*t)->nframes;
-			for (; f < e; f++) {
-				if (f->tag != FREETAG)
-					continue;
-				skb = f->skb;
-				if (!skb
-				&& !(f->skb = skb = new_skb(ETH_ZLEN)))
-					continue;
-				if (atomic_read(&skb_shinfo(skb)->dataref)
-					!= 1) {
-					if (!rf)
-						rf = f;
-					continue;
-				}
-gotone:				skb->truesize -= skb->data_len;
-				skb_shinfo(skb)->nr_frags = skb->data_len = 0;
-				skb_trim(skb, 0);
-				d->tgt = t;
-				ifrotate(*t);
+		&& t->ifp->nd) {
+			f = newtframe(d, t);
+			if (f) {
+				d->tgt = tt;
+				ifrotate(t);
 				return f;
 			}
-			/* Work can be done, but the network layer is
-			   holding our precious packets.  Try to grab
-			   one from the pool. */
-			f = rf;
-			if (f == NULL) {	/* more paranoia */
-				printk(KERN_ERR
-					"aoe: freeframe: %s.\n",
-					"unexpected null rf");
-				d->flags |= DEVFL_KICKME;
-				return NULL;
-			}
-			skb = skb_pool_get(d);
-			if (skb) {
-				skb_pool_put(d, f->skb);
-				f->skb = skb;
-				goto gotone;
-			}
-			(*t)->dataref++;
-			if ((*t)->nout == 0)
-				d->flags |= DEVFL_KICKME;
 		}
-		if (t == d->tgt)	/* we've looped and found nada */
+		if (tt == d->tgt)	/* we've looped and found nada */
 			break;
-		t++;
-		if (t >= &d->targets[NTARGETS] || !*t)
-			t = d->targets;
+	}
+	if (totout == 0) {
+		d->kicked++;
+		d->flags |= DEVFL_KICKME;
 	}
 	return NULL;
 }
@@ -219,6 +262,16 @@ loop:
 	goto loop;
 }
 
+static void
+fhash(struct frame *f)
+{
+	struct aoetgt *t = f->t;
+	u32 n;
+
+	n = f->tag % NFACTIVE;
+	list_add_tail(&f->head, &t->factive[n]);
+}
+
 static int
 aoecmd_ata_rw(struct aoedev *d)
 {
@@ -235,7 +288,7 @@ aoecmd_ata_rw(struct aoedev *d)
 	writebit = 0x10;
 	extbit = 0x4;
 
-	f = freeframe(d);
+	f = newframe(d);
 	if (f == NULL)
 		return 0;
 	t = *d->tgt;
@@ -273,6 +326,7 @@ aoecmd_ata_rw(struct aoedev *d)
 	skb_put(skb, sizeof *h + sizeof *ah);
 	memset(h, 0, skb->len);
 	f->tag = aoehdr_atainit(d, t, h);
+	fhash(f);
 	t->nout++;
 	f->waited = 0;
 	f->buf = buf;
@@ -357,14 +411,16 @@ cont:
 }
 
 static void
-resend(struct aoedev *d, struct aoetgt *t, struct frame *f)
+resend(struct aoedev *d, struct frame *f)
 {
 	struct sk_buff *skb;
 	struct aoe_hdr *h;
 	struct aoe_atahdr *ah;
+	struct aoetgt *t;
 	char buf[128];
 	u32 n;
 
+	t = f->t;
 	ifrotate(t);
 	n = newtag(t);
 	skb = f->skb;
@@ -378,28 +434,11 @@ resend(struct aoedev *d, struct aoetgt *t, struct frame *f)
 	aoechr_error(buf);
 
 	f->tag = n;
+	fhash(f);
 	h->tag = cpu_to_be32(n);
 	memcpy(h->dst, t->addr, sizeof h->dst);
 	memcpy(h->src, t->ifp->nd->dev_addr, sizeof h->src);
 
-	switch (ah->cmdstat) {
-	default:
-		break;
-	case ATA_CMD_PIO_READ:
-	case ATA_CMD_PIO_READ_EXT:
-	case ATA_CMD_PIO_WRITE:
-	case ATA_CMD_PIO_WRITE_EXT:
-		put_lba(ah, f->lba);
-
-		n = f->bcnt;
-		ah->scnt = n >> 9;
-		if (ah->aflags & AOEAFL_WRITE) {
-			skb_fillup(skb, f->bv, f->bv_off, n);
-			skb->len = sizeof *h + sizeof *ah + n;
-			skb->data_len = n;
-			skb->truesize += n;
-		}
-	}
 	skb->dev = t->ifp->nd;
 	skb = skb_clone(skb, GFP_ATOMIC);
 	if (skb == NULL)
@@ -408,7 +447,7 @@ resend(struct aoedev *d, struct aoetgt *t, struct frame *f)
 }
 
 static int
-tsince(int tag)
+tsince(u32 tag)
 {
 	int n;
 
@@ -462,26 +501,38 @@ ejectif(struct aoetgt *t, struct aoeif *ifp)
 static int
 sthtith(struct aoedev *d)
 {
-	struct frame *f, *e, *nf;
+	struct frame *f, *nf;
+	struct list_head *nx, *pos, *head;
 	struct sk_buff *skb;
-	struct aoetgt *ht = *d->htgt;
-
-	f = ht->frames;
-	e = f + ht->nframes;
-	for (; f < e; f++) {
-		if (f->tag == FREETAG)
-			continue;
-		nf = freeframe(d);
-		if (!nf)
-			return 0;
-		skb = nf->skb;
-		*nf = *f;
-		f->skb = skb;
-		f->tag = FREETAG;
-		nf->waited = 0;
-		ht->nout--;
-		(*d->tgt)->nout++;
-		resend(d, *d->tgt, nf);
+	struct aoetgt *ht = d->htgt;
+	int i;
+
+	for (i = 0; i < NFACTIVE; i++) {
+		head = &ht->factive[i];
+		list_for_each_safe(pos, nx, head) {
+			f = list_entry(pos, struct frame, head);
+			nf = newframe(d);
+			if (!nf)
+				return 0;
+
+			/* remove frame from active list */
+			list_del(pos);
+
+			/* reassign all pertinent bits to new outbound frame */
+			skb = nf->skb;
+			nf->skb = f->skb;
+			nf->buf = f->buf;
+			nf->bcnt = f->bcnt;
+			nf->lba = f->lba;
+			nf->bv = f->bv;
+			nf->bv_off = f->bv_off;
+			nf->waited = 0;
+			f->skb = skb;
+			aoe_freetframe(f);
+			ht->nout--;
+			nf->t->nout++;
+			resend(d, nf);
+		}
 	}
 	/* he's clean, he's useless.  take away his interfaces */
 	memset(ht->ifs, 0, sizeof ht->ifs);
@@ -506,9 +557,12 @@ rexmit_timer(ulong vp)
 	struct aoedev *d;
 	struct aoetgt *t, **tt, **te;
 	struct aoeif *ifp;
-	struct frame *f, *e;
+	struct frame *f;
+	struct list_head *head, *pos, *nx;
+	LIST_HEAD(flist);
 	register long timeout;
 	ulong flags, n;
+	int i;
 
 	d = (struct aoedev *) vp;
 
@@ -522,41 +576,21 @@ rexmit_timer(ulong vp)
 		spin_unlock_irqrestore(&d->lock, flags);
 		return;
 	}
+
+	/* collect all frames to rexmit into flist */
 	tt = d->targets;
 	te = tt + NTARGETS;
 	for (; tt < te && *tt; tt++) {
 		t = *tt;
-		f = t->frames;
-		e = f + t->nframes;
-		for (; f < e; f++) {
-			if (f->tag == FREETAG
-			|| tsince(f->tag) < timeout)
-				continue;
-			n = f->waited += timeout;
-			n /= HZ;
-			if (n > aoe_deadsecs) {
-				/* waited too long.  device failure. */
-				aoedev_downdev(d);
-				break;
-			}
-
-			if (n > HELPWAIT /* see if another target can help */
-			&& (tt != d->targets || d->targets[1]))
-				d->htgt = tt;
-
-			if (t->nout == t->maxout) {
-				if (t->maxout > 1)
-					t->maxout--;
-				t->lastwadj = jiffies;
-			}
-
-			ifp = getif(t, f->skb->dev);
-			if (ifp && ++ifp->lost > (t->nframes << 1)
-			&& (ifp != t->ifs || t->ifs[1].nd)) {
-				ejectif(t, ifp);
-				ifp = NULL;
+		for (i = 0; i < NFACTIVE; i++) {
+			head = &t->factive[i];
+			list_for_each_safe(pos, nx, head) {
+				f = list_entry(pos, struct frame, head);
+				if (tsince(f->tag) < timeout)
+					continue;
+				/* move to flist for later processing */
+				list_move_tail(pos, &flist);
 			}
-			resend(d, t, f);
 		}
 
 		/* window check */
@@ -568,6 +602,44 @@ rexmit_timer(ulong vp)
 		}
 	}
 
+	/* process expired frames */
+	while (!list_empty(&flist)) {
+		pos = flist.next;
+		f = list_entry(pos, struct frame, head);
+		n = f->waited += timeout;
+		n /= HZ;
+		if (n > aoe_deadsecs) {
+			/* Waited too long.  Device failure.
+			 * Hang all frames on first hash bucket for downdev
+			 * to clean up.
+			 */
+			list_splice(&flist, &f->t->factive[0]);
+			aoedev_downdev(d);
+			break;
+		}
+		list_del(pos);
+
+		t = f->t;
+		if (n > HELPWAIT) {
+			/* see if another target can help */
+			if (d->ntargets > 1)
+				d->htgt = t;
+		}
+		if (t->nout == t->maxout) {
+			if (t->maxout > 1)
+				t->maxout--;
+			t->lastwadj = jiffies;
+		}
+
+		ifp = getif(t, f->skb->dev);
+		if (ifp && ++ifp->lost > (t->nframes << 1)
+		&& (ifp != t->ifs || t->ifs[1].nd)) {
+			ejectif(t, ifp);
+			ifp = NULL;
+		}
+		resend(d, f);
+	}
+
 	if (!skb_queue_empty(&d->sendq)) {
 		n = d->rttavg <<= 1;
 		if (n > MAXTIMER)
@@ -749,7 +821,7 @@ diskstats(struct gendisk *disk, struct bio *bio, ulong duration, sector_t sector
 }
 
 static void
-bvcpy(struct bio_vec *bv, ulong off, struct sk_buff *skb, ulong cnt)
+bvcpy(struct bio_vec *bv, ulong off, struct sk_buff *skb, long cnt)
 {
 	ulong fcnt;
 	char *p;
@@ -770,60 +842,233 @@ loop:
 }
 
 static void
-fadvance(struct frame *f, ulong cnt)
+ktiocomplete(struct frame *f)
 {
-	ulong fcnt;
+	struct aoe_hdr *hin, *hout;
+	struct aoe_atahdr *ahin, *ahout;
+	struct buf *buf;
+	struct sk_buff *skb;
+	struct aoetgt *t;
+	struct aoeif *ifp;
+	struct aoedev *d;
+	long n;
 
-	f->lba += cnt >> 9;
-loop:
-	fcnt = f->bv->bv_len - (f->bv_off - f->bv->bv_offset);
-	if (fcnt > cnt) {
-		f->bv_off += cnt;
+	if (f == NULL)
 		return;
+
+	t = f->t;
+	d = t->d;
+
+	hout = (struct aoe_hdr *) skb_mac_header(f->skb);
+	ahout = (struct aoe_atahdr *) (hout+1);
+	buf = f->buf;
+	skb = f->r_skb;
+	if (skb == NULL)
+		goto noskb;	/* just fail the buf. */
+
+	hin = (struct aoe_hdr *) skb->data;
+	skb_pull(skb, sizeof(*hin));
+	ahin = (struct aoe_atahdr *) skb->data;
+	skb_pull(skb, sizeof(*ahin));
+	if (ahin->cmdstat & 0xa9) {	/* these bits cleared on success */
+		pr_err("aoe: ata error cmd=%2.2Xh stat=%2.2Xh from e%ld.%d\n",
+			ahout->cmdstat, ahin->cmdstat,
+			d->aoemajor, d->aoeminor);
+noskb:	if (buf)
+			buf->flags |= BUFFL_FAIL;
+		goto badrsp;
 	}
-	cnt -= fcnt;
-	f->bv++;
-	f->bv_off = f->bv->bv_offset;
-	goto loop;
+
+	n = ahout->scnt << 9;
+	switch (ahout->cmdstat) {
+	case ATA_CMD_PIO_READ:
+	case ATA_CMD_PIO_READ_EXT:
+		if (skb->len < n) {
+			pr_err("aoe: runt data size in read.  skb->len=%d need=%ld\n",
+				skb->len, n);
+			buf->flags |= BUFFL_FAIL;
+			break;
+		}
+		bvcpy(f->bv, f->bv_off, skb, n);
+	case ATA_CMD_PIO_WRITE:
+	case ATA_CMD_PIO_WRITE_EXT:
+		spin_lock_irq(&d->lock);
+		ifp = getif(t, skb->dev);
+		if (ifp) {
+			ifp->lost = 0;
+			if (n > DEFAULTBCNT)
+				ifp->lostjumbo = 0;
+		}
+		if (d->htgt == t) /* I'll help myself, thank you. */
+			d->htgt = NULL;
+		spin_unlock_irq(&d->lock);
+		break;
+	case ATA_CMD_ID_ATA:
+		if (skb->len < 512) {
+			pr_info("aoe: runt data size in ataid.  skb->len=%d\n",
+				skb->len);
+			break;
+		}
+		if (skb_linearize(skb))
+			break;
+		spin_lock_irq(&d->lock);
+		ataid_complete(d, t, skb->data);
+		spin_unlock_irq(&d->lock);
+		break;
+	default:
+		pr_info("aoe: unrecognized ata command %2.2Xh for %d.%d\n",
+			ahout->cmdstat,
+			be16_to_cpu(get_unaligned(&hin->major)),
+			hin->minor);
+	}
+badrsp:
+	spin_lock_irq(&d->lock);
+
+	aoe_freetframe(f);
+
+	if (buf && --buf->nframesout == 0 && buf->resid == 0) {
+		struct bio *bio = buf->bio;
+
+		diskstats(d->gd, bio, jiffies - buf->stime, buf->sector);
+		n = (buf->flags & BUFFL_FAIL) ? -EIO : 0;
+		mempool_free(buf, d->bufpool);
+		spin_unlock_irq(&d->lock);
+		if (n != -EIO)
+			bio_flush_dcache_pages(bio);
+		bio_endio(bio, n);
+	} else
+		spin_unlock_irq(&d->lock);
+	dev_kfree_skb(skb);
 }
 
-void
+/* Enters with iocq.lock held.
+ * Returns true iff responses needing processing remain.
+ */
+static int
+ktio(void)
+{
+	struct frame *f;
+	struct list_head *pos;
+	int i;
+
+	for (i = 0; ; ++i) {
+		if (i == MAXIOC)
+			return 1;
+		if (list_empty(&iocq.head))
+			return 0;
+		pos = iocq.head.next;
+		list_del(pos);
+		spin_unlock_irq(&iocq.lock);
+		f = list_entry(pos, struct frame, head);
+		ktiocomplete(f);
+		spin_lock_irq(&iocq.lock);
+	}
+}
+
+static int
+kthread(void *vp)
+{
+	struct ktstate *k;
+	DECLARE_WAITQUEUE(wait, current);
+	sigset_t blocked;
+	int more;
+
+	k = vp;
+#ifdef PF_NOFREEZE
+	current->flags |= PF_NOFREEZE;
+#endif
+	set_user_nice(current, -10);
+	sigfillset(&blocked);
+	sigprocmask(SIG_BLOCK, &blocked, NULL);
+	flush_signals(current);
+	complete(&k->rendez);
+	do {
+		__set_current_state(TASK_UNINTERRUPTIBLE);
+		spin_lock_irq(k->lock);
+		more = k->fn();
+		if (!more) {
+			add_wait_queue(k->waitq, &wait);
+			__set_current_state(TASK_INTERRUPTIBLE);
+		}
+		spin_unlock_irq(k->lock);
+		if (!more) {
+			schedule();
+			remove_wait_queue(k->waitq, &wait);
+		} else
+			cond_resched();
+	} while (!kthread_should_stop());
+	__set_current_state(TASK_RUNNING);
+	complete(&k->rendez);
+	return 0;
+}
+
+static void
+aoe_ktstop(struct ktstate *k)
+{
+	kthread_stop(k->task);
+	wait_for_completion(&k->rendez);
+}
+
+static int
+aoe_ktstart(struct ktstate *k)
+{
+	struct task_struct *task;
+
+	init_completion(&k->rendez);
+	task = kthread_run(kthread, k, k->name);
+	if (task == NULL || IS_ERR(task))
+		return -EFAULT;
+	k->task = task;
+	wait_for_completion(&k->rendez);
+	init_completion(&k->rendez);	/* for exit */
+	return 0;
+}
+
+/* pass it off to kthreads for processing */
+static void
+ktcomplete(struct frame *f, struct sk_buff *skb)
+{
+	ulong flags;
+
+	f->r_skb = skb;
+	spin_lock_irqsave(&iocq.lock, flags);
+	list_add_tail(&f->head, &iocq.head);
+	spin_unlock_irqrestore(&iocq.lock, flags);
+	wake_up(&ktiowq);
+}
+
+struct sk_buff *
 aoecmd_ata_rsp(struct sk_buff *skb)
 {
-	struct sk_buff_head queue;
 	struct aoedev *d;
-	struct aoe_hdr *hin, *hout;
-	struct aoe_atahdr *ahin, *ahout;
+	struct aoe_hdr *h;
 	struct frame *f;
-	struct buf *buf;
 	struct aoetgt *t;
-	struct aoeif *ifp;
-	register long n;
+	u32 n;
 	ulong flags;
 	char ebuf[128];
 	u16 aoemajor;
 
-	hin = (struct aoe_hdr *) skb_mac_header(skb);
-	skb_pull(skb, sizeof(*hin));
-	aoemajor = get_unaligned_be16(&hin->major);
-	d = aoedev_by_aoeaddr(aoemajor, hin->minor);
+	h = (struct aoe_hdr *) skb->data;
+	aoemajor = be16_to_cpu(get_unaligned(&h->major));
+	d = aoedev_by_aoeaddr(aoemajor, h->minor);
 	if (d == NULL) {
 		snprintf(ebuf, sizeof ebuf, "aoecmd_ata_rsp: ata response "
 			"for unknown device %d.%d\n",
-			 aoemajor, hin->minor);
+			aoemajor, h->minor);
 		aoechr_error(ebuf);
-		return;
+		return skb;
 	}
 
 	spin_lock_irqsave(&d->lock, flags);
 
-	n = get_unaligned_be32(&hin->tag);
-	t = gettgt(d, hin->src);
+	n = be32_to_cpu(get_unaligned(&h->tag));
+	t = gettgt(d, h->src);
 	if (t == NULL) {
 		printk(KERN_INFO "aoe: can't find target e%ld.%d:%pm\n",
-			d->aoemajor, d->aoeminor, hin->src);
+		       d->aoemajor, d->aoeminor, h->src);
 		spin_unlock_irqrestore(&d->lock, flags);
-		return;
+		return skb;
 	}
 	f = getframe(t, n);
 	if (f == NULL) {
@@ -832,102 +1077,26 @@ aoecmd_ata_rsp(struct sk_buff *skb)
 		snprintf(ebuf, sizeof ebuf,
 			"%15s e%d.%d    tag=%08x@%08lx\n",
 			"unexpected rsp",
-			get_unaligned_be16(&hin->major),
-			hin->minor,
-			get_unaligned_be32(&hin->tag),
+			get_unaligned_be16(&h->major),
+			h->minor,
+			get_unaligned_be32(&h->tag),
 			jiffies);
 		aoechr_error(ebuf);
-		return;
+		return skb;
 	}
-
 	calc_rttavg(d, tsince(f->tag));
-
-	ahin = (struct aoe_atahdr *) skb->data;
-	skb_pull(skb, sizeof(*ahin));
-	hout = (struct aoe_hdr *) skb_mac_header(f->skb);
-	ahout = (struct aoe_atahdr *) (hout+1);
-	buf = f->buf;
-
-	if (ahin->cmdstat & 0xa9) {	/* these bits cleared on success */
-		printk(KERN_ERR
-			"aoe: ata error cmd=%2.2Xh stat=%2.2Xh from e%ld.%d\n",
-			ahout->cmdstat, ahin->cmdstat,
-			d->aoemajor, d->aoeminor);
-		if (buf)
-			buf->flags |= BUFFL_FAIL;
-	} else {
-		if (d->htgt && t == *d->htgt) /* I'll help myself, thank you. */
-			d->htgt = NULL;
-		n = ahout->scnt << 9;
-		switch (ahout->cmdstat) {
-		case ATA_CMD_PIO_READ:
-		case ATA_CMD_PIO_READ_EXT:
-			if (skb->len < n) {
-				printk(KERN_ERR
-					"aoe: %s.  skb->len=%d need=%ld\n",
-					"runt data size in read", skb->len, n);
-				/* fail frame f?  just returning will rexmit. */
-				spin_unlock_irqrestore(&d->lock, flags);
-				return;
-			}
-			bvcpy(f->bv, f->bv_off, skb, n);
-		case ATA_CMD_PIO_WRITE:
-		case ATA_CMD_PIO_WRITE_EXT:
-			ifp = getif(t, skb->dev);
-			if (ifp) {
-				ifp->lost = 0;
-				if (n > DEFAULTBCNT)
-					ifp->lostjumbo = 0;
-			}
-			if (f->bcnt -= n) {
-				fadvance(f, n);
-				resend(d, t, f);
-				goto xmit;
-			}
-			break;
-		case ATA_CMD_ID_ATA:
-			if (skb->len < 512) {
-				printk(KERN_INFO
-					"aoe: runt data size in ataid.  skb->len=%d\n",
-					skb->len);
-				spin_unlock_irqrestore(&d->lock, flags);
-				return;
-			}
-			if (skb_linearize(skb))
-				break;
-			ataid_complete(d, t, skb->data);
-			break;
-		default:
-			printk(KERN_INFO
-				"aoe: unrecognized ata command %2.2Xh for %d.%d\n",
-				ahout->cmdstat,
-				get_unaligned_be16(&hin->major),
-				hin->minor);
-		}
-	}
-
-	if (buf && --buf->nframesout == 0 && buf->resid == 0) {
-		diskstats(d->gd, buf->bio, jiffies - buf->stime, buf->sector);
-		if (buf->flags & BUFFL_FAIL)
-			bio_endio(buf->bio, -EIO);
-		else {
-			bio_flush_dcache_pages(buf->bio);
-			bio_endio(buf->bio, 0);
-		}
-		mempool_free(buf, d->bufpool);
-	}
-
-	f->buf = NULL;
-	f->tag = FREETAG;
 	t->nout--;
-
 	aoecmd_work(d);
-xmit:
-	__skb_queue_head_init(&queue);
-	skb_queue_splice_init(&d->sendq, &queue);
 
 	spin_unlock_irqrestore(&d->lock, flags);
-	aoenet_xmit(&queue);
+
+	ktcomplete(f, skb);
+
+	/*
+	 * Note here that we do not perform an aoedev_put, as we are
+	 * leaving this reference for the ktio to release.
+	 */
+	return NULL;
 }
 
 void
@@ -949,7 +1118,7 @@ aoecmd_ata_id(struct aoedev *d)
 	struct sk_buff *skb;
 	struct aoetgt *t;
 
-	f = freeframe(d);
+	f = newframe(d);
 	if (f == NULL)
 		return NULL;
 
@@ -962,6 +1131,7 @@ aoecmd_ata_id(struct aoedev *d)
 	skb_put(skb, sizeof *h + sizeof *ah);
 	memset(h, 0, skb->len);
 	f->tag = aoehdr_atainit(d, t, h);
+	fhash(f);
 	t->nout++;
 	f->waited = 0;
 
@@ -982,7 +1152,7 @@ static struct aoetgt *
 addtgt(struct aoedev *d, char *addr, ulong nframes)
 {
 	struct aoetgt *t, **tt, **te;
-	struct frame *f, *e;
+	int i;
 
 	tt = d->targets;
 	te = tt + NTARGETS;
@@ -995,22 +1165,21 @@ addtgt(struct aoedev *d, char *addr, ulong nframes)
 		return NULL;
 	}
 	t = kcalloc(1, sizeof *t, GFP_ATOMIC);
-	f = kcalloc(nframes, sizeof *f, GFP_ATOMIC);
-	if (!t || !f) {
-		kfree(f);
+	if (!t) {
 		kfree(t);
 		printk(KERN_INFO "aoe: cannot allocate memory to add target\n");
 		return NULL;
 	}
 
+	d->ntargets++;
 	t->nframes = nframes;
-	t->frames = f;
-	e = f + nframes;
-	for (; f < e; f++)
-		f->tag = FREETAG;
+	t->d = d;
 	memcpy(t->addr, addr, sizeof t->addr);
 	t->ifp = t->ifs;
 	t->maxout = t->nframes;
+	INIT_LIST_HEAD(&t->ffree);
+	for (i = 0; i < NFACTIVE; ++i)
+		INIT_LIST_HEAD(&t->factive[i]);
 	return *tt = t;
 }
 
@@ -1135,3 +1304,53 @@ aoecmd_cleanslate(struct aoedev *d)
 		}
 	}
 }
+
+static void
+flush_iocq(void)
+{
+	struct frame *f;
+	struct aoedev *d;
+	LIST_HEAD(flist);
+	struct list_head *pos;
+	struct sk_buff *skb;
+	ulong flags;
+
+	spin_lock_irqsave(&iocq.lock, flags);
+	list_splice_init(&iocq.head, &flist);
+	spin_unlock_irqrestore(&iocq.lock, flags);
+	while (!list_empty(&flist)) {
+		pos = flist.next;
+		list_del(pos);
+		f = list_entry(pos, struct frame, head);
+		d = f->t->d;
+		skb = f->r_skb;
+		spin_lock_irqsave(&d->lock, flags);
+		if (f->buf) {
+			f->buf->nframesout--;
+			aoe_failbuf(d, f->buf);
+		}
+		aoe_freetframe(f);
+		spin_unlock_irqrestore(&d->lock, flags);
+		dev_kfree_skb(skb);
+	}
+}
+
+int __init
+aoecmd_init(void)
+{
+	INIT_LIST_HEAD(&iocq.head);
+	spin_lock_init(&iocq.lock);
+	init_waitqueue_head(&ktiowq);
+	kts.name = "aoe_ktio";
+	kts.fn = ktio;
+	kts.waitq = &ktiowq;
+	kts.lock = &iocq.lock;
+	return aoe_ktstart(&kts);
+}
+
+void
+aoecmd_exit(void)
+{
+	aoe_ktstop(&kts);
+	flush_iocq();
+}
diff --git a/drivers/block/aoe/aoedev.c b/drivers/block/aoe/aoedev.c
index b2d1fd3..40bae1a 100644
--- a/drivers/block/aoe/aoedev.c
+++ b/drivers/block/aoe/aoedev.c
@@ -48,47 +48,60 @@ dummy_timer(ulong vp)
 }
 
 void
-aoedev_downdev(struct aoedev *d)
+aoe_failbuf(struct aoedev *d, struct buf *buf)
 {
-	struct aoetgt **t, **te;
-	struct frame *f, *e;
-	struct buf *buf;
 	struct bio *bio;
 
-	t = d->targets;
-	te = t + NTARGETS;
-	for (; t < te && *t; t++) {
-		f = (*t)->frames;
-		e = f + (*t)->nframes;
-		for (; f < e; f->tag = FREETAG, f->buf = NULL, f++) {
-			if (f->tag == FREETAG || f->buf == NULL)
-				continue;
-			buf = f->buf;
-			bio = buf->bio;
-			if (--buf->nframesout == 0
-			&& buf != d->inprocess) {
-				mempool_free(buf, d->bufpool);
-				bio_endio(bio, -EIO);
-			}
-		}
-		(*t)->maxout = (*t)->nframes;
-		(*t)->nout = 0;
-	}
-	buf = d->inprocess;
-	if (buf) {
+	if (buf == NULL)
+		return;
+	buf->flags |= BUFFL_FAIL;
+	if (buf->nframesout == 0) {
+		if (buf == d->inprocess) /* ensure we only process this once */
+			d->inprocess = NULL;
 		bio = buf->bio;
 		mempool_free(buf, d->bufpool);
 		bio_endio(bio, -EIO);
 	}
+}
+
+void
+aoedev_downdev(struct aoedev *d)
+{
+	struct aoetgt *t, **tt, **te;
+	struct frame *f;
+	struct list_head *head, *pos, *nx;
+	int i;
+
+	/* clean out active buffers on all targets */
+	tt = d->targets;
+	te = tt + NTARGETS;
+	for (; tt < te && (t = *tt); tt++) {
+		for (i = 0; i < NFACTIVE; i++) {
+			head = &t->factive[i];
+			list_for_each_safe(pos, nx, head) {
+				list_del(pos);
+				f = list_entry(pos, struct frame, head);
+				if (f->buf) {
+					f->buf->nframesout--;
+					aoe_failbuf(d, f->buf);
+				}
+				aoe_freetframe(f);
+			}
+		}
+		t->maxout = t->nframes;
+		t->nout = 0;
+	}
+
+	/* clean out the in-process buffer (if any) */
+	aoe_failbuf(d, d->inprocess);
 	d->inprocess = NULL;
 	d->htgt = NULL;
 
+	/* clean out all pending I/O */
 	while (!list_empty(&d->bufq)) {
-		buf = container_of(d->bufq.next, struct buf, bufs);
+		struct buf *buf = container_of(d->bufq.next, struct buf, bufs);
 		list_del(d->bufq.next);
-		bio = buf->bio;
-		mempool_free(buf, d->bufpool);
-		bio_endio(bio, -EIO);
+		aoe_failbuf(d, buf);
 	}
 
 	if (d->gd)
@@ -242,13 +255,16 @@ aoedev_by_sysminor_m(ulong sysminor)
 static void
 freetgt(struct aoedev *d, struct aoetgt *t)
 {
-	struct frame *f, *e;
+	struct frame *f;
+	struct list_head *pos, *nx, *head;
 
-	f = t->frames;
-	e = f + t->nframes;
-	for (; f < e; f++)
+	head = &t->ffree;
+	list_for_each_safe(pos, nx, head) {
+		list_del(pos);
+		f = list_entry(pos, struct frame, head);
 		skbfree(f->skb);
-	kfree(t->frames);
+		kfree(f);
+	}
 	kfree(t);
 }
 
diff --git a/drivers/block/aoe/aoemain.c b/drivers/block/aoe/aoemain.c
index 7f83ad9..6fc4b05 100644
--- a/drivers/block/aoe/aoemain.c
+++ b/drivers/block/aoe/aoemain.c
@@ -61,6 +61,7 @@ aoe_exit(void)
 
 	aoenet_exit();
 	unregister_blkdev(AOE_MAJOR, DEVICE_NAME);
+	aoecmd_exit();
 	aoechr_exit();
 	aoedev_exit();
 	aoeblk_exit();		/* free cache after de-allocating bufs */
@@ -83,17 +84,20 @@ aoe_init(void)
 	ret = aoenet_init();
 	if (ret)
 		goto net_fail;
+	ret = aoecmd_init();
+	if (ret)
+		goto cmd_fail;
 	ret = register_blkdev(AOE_MAJOR, DEVICE_NAME);
 	if (ret < 0) {
 		printk(KERN_ERR "aoe: can't register major\n");
 		goto blkreg_fail;
 	}
-
 	printk(KERN_INFO "aoe: AoE v%s initialised.\n", VERSION);
 	discover_timer(TINIT);
 	return 0;
-
  blkreg_fail:
+	aoecmd_exit();
+ cmd_fail:
 	aoenet_exit();
  net_fail:
 	aoeblk_exit();
diff --git a/drivers/block/aoe/aoenet.c b/drivers/block/aoe/aoenet.c
index 0787807..000eff2 100644
--- a/drivers/block/aoe/aoenet.c
+++ b/drivers/block/aoe/aoenet.c
@@ -142,7 +142,8 @@ aoenet_rcv(struct sk_buff *skb, struct net_device *ifp, struct packet_type *pt,
 
 	switch (h->cmd) {
 	case AOECMD_ATA:
-		aoecmd_ata_rsp(skb);
+		/* ata_rsp may keep skb for later processing or give it back */
+		skb = aoecmd_ata_rsp(skb);
 		break;
 	case AOECMD_CFG:
 		aoecmd_cfg_rsp(skb);
@@ -152,6 +153,9 @@ aoenet_rcv(struct sk_buff *skb, struct net_device *ifp, struct packet_type *pt,
 			break;	/* don't complain about vendor commands */
 		printk(KERN_INFO "aoe: unknown cmd %d\n", h->cmd);
 	}
+
+	if (!skb)
+		return 0;
 exit:
 	dev_kfree_skb(skb);
 	return 0;
-- 
1.7.2.5


^ permalink raw reply related	[flat|nested] 20+ messages in thread

end of thread, other threads:[~2012-08-31 20:06 UTC | newest]

Thread overview: 20+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2012-08-28 12:53 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
2012-08-25 14:39 ` [PATCH 01/14] aoe: for performance support larger packet payloads Ed Cashin
2012-08-25 14:39 ` [PATCH 03/14] aoe: become I/O request queue handler for increased user control Ed Cashin
2012-08-25 14:39 ` [PATCH 02/14] aoe: kernel thread handles I/O completions for simple locking Ed Cashin
2012-08-31 20:06   ` Andrew Morton
2012-08-25 14:39 ` [PATCH 04/14] aoe: use a kernel thread for transmissions Ed Cashin
2012-08-25 14:39 ` [PATCH 05/14] aoe: use packets that work with the smallest-MTU local interface Ed Cashin
2012-08-25 14:39 ` [PATCH 07/14] aoe: do revalidation steps in order Ed Cashin
2012-08-25 14:39 ` [PATCH 06/14] aoe: failover remote interface based on aoe_deadsecs parameter Ed Cashin
2012-08-25 14:39 ` [PATCH 08/14] aoe: disallow unsupported AoE minor addresses Ed Cashin
2012-08-25 14:39 ` [PATCH 09/14] aoe: associate frames with the AoE storage target Ed Cashin
2012-08-25 14:39 ` [PATCH 10/14] aoe: increase net_device reference count while using it Ed Cashin
2012-08-25 14:39 ` [PATCH 11/14] aoe: remove unused code and add cosmetic improvements Ed Cashin
2012-08-25 14:39 ` [PATCH 13/14] aoe: update copyright year in touched files Ed Cashin
2012-08-25 14:39 ` [PATCH 12/14] aoe: update internal version number to 49 Ed Cashin
2012-08-25 14:39 ` [PATCH 14/14] aoe: update documentation with new URL and VM settings reference Ed Cashin
  -- strict thread matches above, loose matches on Subject: below --
2012-08-23 17:43 [PATCH 00/14] aoe driver v49 performance and usability improvements Ed Cashin
2012-08-18  1:24 ` [PATCH 02/14] aoe: kernel thread handles I/O completions for simple locking Ed Cashin
2012-08-24 21:22   ` Andrew Morton
2012-08-25  0:35     ` Ed Cashin
2012-08-26  1:31     ` ecashin

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).