* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
       [not found] <20090728155055.2266.41649.sendpatchset@localhost.localdomain>
@ 2009-08-02  8:51 ` Krishna Kumar2
  2009-08-03  7:15   ` Jarek Poplawski
  2009-08-04  3:29   ` David Miller
  0 siblings, 2 replies; 16+ messages in thread
From: Krishna Kumar2 @ 2009-08-02  8:51 UTC (permalink / raw)
  To: davem, herbert, jarkao2, kaber, netdev

Krishna Kumar2/India/IBM@IBMIN wrote on 07/28/2009 09:20:55 PM:

> Subject [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
>
> From: Krishna Kumar <krkumar2@in.ibm.com>
>
> dev_queue_xmit enqueues a skb and calls qdisc_run, which
> dequeues the skb and xmits it. In most cases (after
> instrumenting the code), the skb that is enqueued is the
> same one that is dequeued (unless the queue gets stopped,
> or multiple CPUs write to the same queue and end up in a
> race with qdisc_run). For default qdiscs, we can remove
> this path and simply xmit the skb, since this is a
> work-conserving queue.

Any comments on this patch?

Thanks,

- KK

> The patch uses a new flag - TCQ_F_CAN_BYPASS - to identify
> the default fast queue. I plan to use this flag for the
> previous patch also (renaming it if required). The controversial
> part of the patch is incrementing qlen when a skb is
> requeued; this is to avoid checks like the second line below:
>
> +    } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
> >> THIS LINE:   !q->gso_skb &&
> +               !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
>
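The condition above can be modeled in a small user-space sketch (plain C, not kernel code; `struct qdisc_sim`, `can_bypass()` and `requeue()` are hypothetical stand-ins for the real structures and bit operations). It shows why counting a requeued skb in qlen lets the bypass check collapse to `!qdisc_qlen(q)` with no separate `!q->gso_skb` test:

```c
#include <assert.h>
#include <stddef.h>

#define TCQ_F_CAN_BYPASS 8

/* Hypothetical user-space model of the relevant qdisc state. */
struct qdisc_sim {
    unsigned int flags;
    unsigned int qlen;     /* queued skbs, including a requeued one */
    void        *gso_skb;  /* requeued skb, if any */
    int          running;  /* models __QDISC_STATE_RUNNING */
};

/* The proposed fast-path test: qdisc may bypass, queue is empty, and
 * nobody else is running it.  Because the patched dev_requeue_skb()
 * bumps qlen, a pending gso_skb makes qlen != 0 all by itself, so no
 * separate gso_skb check is needed here. */
static int can_bypass(struct qdisc_sim *q)
{
    if (!(q->flags & TCQ_F_CAN_BYPASS) || q->qlen != 0)
        return 0;
    if (q->running)        /* stand-in for test_and_set_bit() */
        return 0;
    q->running = 1;
    return 1;
}

/* Models dev_requeue_skb(): the skb is still part of the queue. */
static void requeue(struct qdisc_sim *q, void *skb)
{
    q->gso_skb = skb;
    q->qlen++;
}
```

With an empty, idle qdisc `can_bypass()` succeeds; after a `requeue()` the nonzero qlen alone refuses the bypass, which is the point of the "controversial" qlen increment.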
> Results of 4 hours of testing with multiple netperf sessions
> (1, 2, 4, 8, and 12 sessions on a 4-CPU System-X, and 1, 2, 4,
> 8, 16, and 32 sessions on a 16-CPU P6). Aggregate Mb/s across
> the iterations:
>
> -----------------------------------------------------------------
>      |         System-X          |               P6
> -----------------------------------------------------------------
> Size |  ORG BW          NEW BW   |      ORG BW          NEW BW
> -----|---------------------------|-------------------------------
> 16K  |  154264          156234   |      155350          157569
> 64K  |  154364          154825   |      155790          158845
> 128K |  154644          154803   |      153418          155572
> 256K |  153882          152007   |      154784          154596
> -----------------------------------------------------------------
>
> Netperf reported that service demand was reduced by 15% on the
> P6, but there was no noticeable difference on the System-X box.
>
> Please review.
>
> Thanks,
>
> - KK



* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
  2009-08-02  8:51 ` [RFC] [PATCH] Avoid enqueuing skb for default qdiscs Krishna Kumar2
@ 2009-08-03  7:15   ` Jarek Poplawski
  2009-08-03  8:09     ` Krishna Kumar2
  2009-08-04  3:29   ` David Miller
  1 sibling, 1 reply; 16+ messages in thread
From: Jarek Poplawski @ 2009-08-03  7:15 UTC (permalink / raw)
  To: Krishna Kumar2; +Cc: davem, herbert, kaber, netdev

On Sun, Aug 02, 2009 at 02:21:30PM +0530, Krishna Kumar2 wrote:
> Krishna Kumar2/India/IBM@IBMIN wrote on 07/28/2009 09:20:55 PM:
> 
> > Subject [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
> >
> > From: Krishna Kumar <krkumar2@in.ibm.com>
> >
> > dev_queue_xmit enqueues a skb and calls qdisc_run, which
> > dequeues the skb and xmits it. In most cases (after
> > instrumenting the code), the skb that is enqueued is the
> > same one that is dequeued (unless the queue gets stopped,
> > or multiple CPUs write to the same queue and end up in a
> > race with qdisc_run). For default qdiscs, we can remove
> > this path and simply xmit the skb, since this is a
> > work-conserving queue.
> 
> Any comments on this patch?

Maybe I missed something, but I didn't get this patch, and I can't see
it in patchwork either.

Jarek P.

> Thanks,
> 
> - KK
> 
> > The patch uses a new flag - TCQ_F_CAN_BYPASS - to identify
> > the default fast queue. I plan to use this flag for the
> > previous patch also (renaming it if required). The controversial
> > part of the patch is incrementing qlen when a skb is
> > requeued; this is to avoid checks like the second line below:
> >
> > +    } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
> > >> THIS LINE:   !q->gso_skb &&
> > +               !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
> >
> > Results of 4 hours of testing with multiple netperf sessions
> > (1, 2, 4, 8, and 12 sessions on a 4-CPU System-X, and 1, 2, 4,
> > 8, 16, and 32 sessions on a 16-CPU P6). Aggregate Mb/s across
> > the iterations:
> >
> > -----------------------------------------------------------------
> >      |         System-X          |               P6
> > -----------------------------------------------------------------
> > Size |  ORG BW          NEW BW   |      ORG BW          NEW BW
> > -----|---------------------------|-------------------------------
> > 16K  |  154264          156234   |      155350          157569
> > 64K  |  154364          154825   |      155790          158845
> > 128K |  154644          154803   |      153418          155572
> > 256K |  153882          152007   |      154784          154596
> > -----------------------------------------------------------------
> >
> > Netperf reported that service demand was reduced by 15% on the
> > P6, but there was no noticeable difference on the System-X box.
> >
> > Please review.
> >
> > Thanks,
> >
> > - KK
> 


* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
  2009-08-03  7:15   ` Jarek Poplawski
@ 2009-08-03  8:09     ` Krishna Kumar2
  0 siblings, 0 replies; 16+ messages in thread
From: Krishna Kumar2 @ 2009-08-03  8:09 UTC (permalink / raw)
  To: Jarek Poplawski; +Cc: davem, herbert, kaber, netdev

Jarek Poplawski <jarkao2@gmail.com> wrote on 08/03/2009 12:45:39 PM:
> > Any comments on this patch?
>
> Maybe I missed something, but I didn't get this patch, and can't see
> it e.g. in the patchwork.
>
> Jarek P.

That is strange - I cannot find it either :) But after I sent the
patch, I got a copy of the mail, which says:
                                                                           
    From:    Krishna Kumar2/India/IBM@IBMIN
    Date:    07/28/2009 09:20 PM
    To:      davem@davemloft.net
    Cc:      Krishna Kumar2/India/IBM@IBMIN, herbert@gondor.apana.org.au,
             kaber@trash.net, jarkao2@gmail.com, netdev@vger.kernel.org
    Subject: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs


I will try to send it out once again. Thanks for letting me know.

thanks,

- KK



* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
  2009-08-02  8:51 ` [RFC] [PATCH] Avoid enqueuing skb for default qdiscs Krishna Kumar2
  2009-08-03  7:15   ` Jarek Poplawski
@ 2009-08-04  3:29   ` David Miller
  2009-08-04  3:49     ` Herbert Xu
  1 sibling, 1 reply; 16+ messages in thread
From: David Miller @ 2009-08-04  3:29 UTC (permalink / raw)
  To: krkumar2; +Cc: herbert, jarkao2, kaber, netdev

From: Krishna Kumar2 <krkumar2@in.ibm.com>
Date: Sun, 2 Aug 2009 14:21:30 +0530

> Krishna Kumar2/India/IBM@IBMIN wrote on 07/28/2009 09:20:55 PM:
> 
>> Subject [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
>>
>> From: Krishna Kumar <krkumar2@in.ibm.com>
>>
>> dev_queue_xmit enqueues a skb and calls qdisc_run, which
>> dequeues the skb and xmits it. In most cases (after
>> instrumenting the code), the skb that is enqueued is the
>> same one that is dequeued (unless the queue gets stopped,
>> or multiple CPUs write to the same queue and end up in a
>> race with qdisc_run). For default qdiscs, we can remove
>> this path and simply xmit the skb, since this is a
>> work-conserving queue.
> 
> Any comments on this patch?

Although PFIFO is not work-conserving, isn't it important to retain
ordering?  What if higher priority packets are in the queue when we
enqueue?  This new bypass will send the wrong packet, won't it?

I'm beginning to think, if we want to make the default case go as fast
as possible, we should just bypass everything altogether.  The entire
qdisc layer, all of it.

Special-casing something that is essentially unused is, in a way,
a waste of time.  If this bypass could be applied to some of the
complicated qdiscs, then it'd be worthwhile, but just for the
default, which effectively makes it do nothing, I don't see the
value in it.


* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
  2009-08-04  3:29   ` David Miller
@ 2009-08-04  3:49     ` Herbert Xu
  2009-08-04  3:55       ` David Miller
  2009-08-04  6:45       ` Krishna Kumar2
  0 siblings, 2 replies; 16+ messages in thread
From: Herbert Xu @ 2009-08-04  3:49 UTC (permalink / raw)
  To: David Miller; +Cc: krkumar2, jarkao2, kaber, netdev

On Mon, Aug 03, 2009 at 08:29:35PM -0700, David Miller wrote:
>
> Although PFIFO is not work-conserving, isn't it important to retain
> ordering?  What if higher priority packets are in the queue when we
> enqueue?  This new bypass will send the wrong packet, won't it?

The bypass only kicks in if the queue length is zero.
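
To make the ordering argument concrete, here is a rough user-space sketch (plain C; `struct pfifo_sim`, `xmit()` and the band layout are hypothetical, loosely modeled on pfifo_fast's three priority bands). A packet is sent directly only when every band is empty, so there is never a queued higher-priority packet for it to overtake:

```c
#include <assert.h>
#include <string.h>

#define BANDS    3
#define BAND_LEN 16

/* Toy model of a pfifo_fast-like qdisc: three FIFO bands,
 * band 0 being the highest priority. */
struct pfifo_sim {
    int q[BANDS][BAND_LEN];
    int len[BANDS];
    int qlen;              /* total across all bands */
};

static void enqueue(struct pfifo_sim *p, int band, int pkt)
{
    p->q[band][p->len[band]++] = pkt;
    p->qlen++;
}

/* Dequeue always drains lower-numbered (higher-priority) bands first. */
static int dequeue(struct pfifo_sim *p)
{
    for (int b = 0; b < BANDS; b++) {
        if (p->len[b] > 0) {
            int pkt = p->q[b][0];
            memmove(&p->q[b][0], &p->q[b][1], --p->len[b] * sizeof(int));
            p->qlen--;
            return pkt;
        }
    }
    return -1;
}

/* The bypass: send pkt directly only when qlen == 0.  Otherwise the
 * packet goes through enqueue + dequeue, where priority order applies. */
static int xmit(struct pfifo_sim *p, int band, int pkt)
{
    if (p->qlen == 0)
        return pkt;        /* nothing queued: cannot overtake anything */
    enqueue(p, band, pkt);
    return dequeue(p);     /* highest-priority queued packet wins */
}
```

If a high-priority packet is already queued, `xmit()` refuses the shortcut and the queued packet goes out first; with an empty queue the direct send is trivially order-preserving.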

> I'm beginning to think, if we want to make the default case go as fast
> as possible, we should just bypass everything altogether.  The entire
> qdisc layer, all of it.

Can you be more specific? AFAICS he's already bypassing the qdisc
layer when it can be done safely.

> Special-casing something that is essentially unused is, in a way,
> a waste of time.  If this bypass could be applied to some of the
> complicated qdiscs, then it'd be worthwhile, but just for the
> default, which effectively makes it do nothing, I don't see the
> value in it.

I agree with this sentiment.  Essentially what this bypass does
is to eliminate the enqueue + qdisc_restart + dequeue in the
cases where it is safe.  So its value is entirely dependent on
the cost of the code that is eliminated, which may not be that
large for the default qdisc.

Krishna, those netperf tests that you performed - were they done
with a single TX queue or multiple TX queues? If the latter, did
you tune the system so that each netperf was bound to a single
core that had its own dedicated TX queue?

Cheers,
-- 
Visit Openswan at http://www.openswan.org/
Email: Herbert Xu ~{PmV>HI~} <herbert@gondor.apana.org.au>
Home Page: http://gondor.apana.org.au/~herbert/
PGP Key: http://gondor.apana.org.au/~herbert/pubkey.txt


* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
  2009-08-04  3:49     ` Herbert Xu
@ 2009-08-04  3:55       ` David Miller
  2009-08-04  6:45       ` Krishna Kumar2
  1 sibling, 0 replies; 16+ messages in thread
From: David Miller @ 2009-08-04  3:55 UTC (permalink / raw)
  To: herbert; +Cc: krkumar2, jarkao2, kaber, netdev

From: Herbert Xu <herbert@gondor.apana.org.au>
Date: Tue, 4 Aug 2009 11:49:10 +0800

> On Mon, Aug 03, 2009 at 08:29:35PM -0700, David Miller wrote:
>>
>> Although PFIFO is not work-conserving, isn't it important to retain
>> ordering?  What if higher priority packets are in the queue when we
>> enqueue?  This new bypass will send the wrong packet, won't it?
> 
> The bypass only kicks in if the queue length is zero.

Thanks for explaining, makes sense now.

>> I'm beginning to think, if we want to make the default case go as fast
>> as possible, we should just bypass everything altogether.  The entire
>> qdisc layer, all of it.
> 
> Can you be more specific? AFAICS he's already bypassing the qdisc
> layer when it can be done safely.

Ignore this, it's based upon my missing the part where this
bypass only triggers when the qdisc is empty.


* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
  2009-08-04  3:49     ` Herbert Xu
  2009-08-04  3:55       ` David Miller
@ 2009-08-04  6:45       ` Krishna Kumar2
  1 sibling, 0 replies; 16+ messages in thread
From: Krishna Kumar2 @ 2009-08-04  6:45 UTC (permalink / raw)
  To: Herbert Xu; +Cc: David Miller, jarkao2, kaber, netdev

Hi Herbert,

Herbert Xu <herbert@gondor.apana.org.au> wrote on 08/04/2009 09:19:10 AM:
> On Mon, Aug 03, 2009 at 08:29:35PM -0700, David Miller wrote:
> >
> > Although PFIFO is not work-conserving, isn't it important to retain
> > ordering?  What if higher priority packets are in the queue when we
> > enqueue?  This new bypass will send the wrong packet, won't it?
>
> The bypass only kicks in if the queue length is zero.
>
> > I'm beginning to think, if we want to make the default case go as fast
> > as possible, we should just bypass everything altogether.  The entire
> > qdisc layer, all of it.
>
> Can you be more specific? AFAICS he's already bypassing the qdisc
> layer when it can be done safely.
>
> > Special-casing something that is essentially unused is, in a way,
> > a waste of time.  If this bypass could be applied to some of the
> > complicated qdiscs, then it'd be worthwhile, but just for the
> > default, which effectively makes it do nothing, I don't see the
> > value in it.
>
> I agree with this sentiment.  Essentially what this bypass does
> is to eliminate the enqueue + qdisc_restart + dequeue in the
> cases where it is safe.  So its value is entirely dependent on
> the cost of the code that is eliminated, which may not be that
> large for the default qdisc.
>
> Krishna, those netperf tests that you performed - were they done
> with a single TX queue or multiple TX queues? If the latter, did
> you tune the system so that each netperf was bound to a single
> core that had its own dedicated TX queue?

I ran on a Chelsio multi-TX-queue card, but with default netperf
and without any tuning (irqbalance, IRQ binding, CPU binding of
netperf/netserver, sysctl, etc.). Do you want me to try with
specific tuning as well?

thanks,

- KK



* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
  2009-08-04 16:21 ` Krishna Kumar2
@ 2009-08-04 20:23   ` Jarek Poplawski
  0 siblings, 0 replies; 16+ messages in thread
From: Jarek Poplawski @ 2009-08-04 20:23 UTC (permalink / raw)
  To: Krishna Kumar2; +Cc: davem, herbert, kaber, netdev

On Tue, Aug 04, 2009 at 09:51:10PM +0530, Krishna Kumar2 wrote:
> I will send out a final version, after any further comments, with a
> fix for a typo:
> 
> -static inline int __qdisc_update_bstats(struct Qdisc *sch, unsigned int len)
> +static inline void __qdisc_update_bstats(struct Qdisc *sch, unsigned int len)
> 
> Thanks,
> 
> - KK

Yes, except for this typo, the patch looks OK to me.

Thanks,
Jarek P.

> 
> Krishna Kumar2/India/IBM@IBMIN wrote on 08/04/2009 09:21:35 PM:
> > Hi Jarek,
> >
> > (Sorry that you are getting this mail twice since the mail didn't go
> > out to others, but this time I have fixed my mailer)
> >
> > > Actually I meant:
> > > -      qdisc->q.qlen--;
> > > +      qdisc->q.qlen = 0;
> > > because you can do it after another qdisc->q.qlen = 0.
> >
> > You are right - I saw some of the sched resets setting it to zero
> > (while others don't touch it). I will reset to zero.
> >
> > > > > I wonder if we shouldn't update bstats yet. Btw, try checkpatch.
> > > >
> > > > Rate estimator isn't used for default qdisc, so is anything
> > > > required?
> > >
> > > I meant "Sent" stats from "tc -s qdisc sh" could be misleading.
> >
> > You are right - for a single netperf run, the stats magically show 0
> > packets/bytes sent!
> >
> > > > Also, checkpatch suggested to change the original code:
> > > >         "if (unlikely((skb = dequeue_skb(q)) == NULL))"
> > > > since it appears as "new code", hope that change is fine.
> > >
> > > I mainly meant a whitespace warning.
> >
> > Yes, I had fixed that also.
> >
> > A compile- and single-netperf-run-tested patch is inlined below - the
> > only two changes from the last patch are setting qlen=0 and updating
> > bstats. Now the stats show:
> >
> > qdisc pfifo_fast 0: root bands 3 priomap  1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1
> >  Sent 10170288228 bytes 157522 pkt (dropped 0, overlimits 0 requeues 0)
> >  rate 0bit 0pps backlog 0b 0p requeues 0
> >
> > On the original kernel, I got:
> > qdisc pfifo_fast 0: root bands 3 priomap  1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1
> >  Sent 10157266798 bytes 177539 pkt (dropped 0, overlimits 0 requeues 0)
> >  rate 0bit 0pps backlog 0b 0p requeues 0
> >
> > Thanks,
> >
> > - KK
> >
> > Signed-off-by: Krishna Kumar <krkumar2@in.ibm.com>
> > ---
> >  include/net/pkt_sched.h   |    3 +
> >  include/net/sch_generic.h |   15 +++++
> >  net/core/dev.c            |   51 ++++++++++++++-----
> >  net/sched/sch_generic.c   |   93 ++++++++++++++++++++++--------------
> >  4 files changed, 111 insertions(+), 51 deletions(-)
> >
> > diff -ruNp org2/include/net/pkt_sched.h new5/include/net/pkt_sched.h
> > --- org2/include/net/pkt_sched.h   2009-08-04 11:41:21.000000000 +0530
> > +++ new5/include/net/pkt_sched.h   2009-08-04 18:47:16.000000000 +0530
> > @@ -87,6 +87,9 @@ extern struct qdisc_rate_table *qdisc_ge
> >  extern void qdisc_put_rtab(struct qdisc_rate_table *tab);
> >  extern void qdisc_put_stab(struct qdisc_size_table *tab);
> >  extern void qdisc_warn_nonwc(char *txt, struct Qdisc *qdisc);
> > +extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
> > +            struct net_device *dev, struct netdev_queue *txq,
> > +            spinlock_t *root_lock);
> >
> >  extern void __qdisc_run(struct Qdisc *q);
> >
> > diff -ruNp org2/include/net/sch_generic.h new5/include/net/sch_generic.h
> > --- org2/include/net/sch_generic.h   2009-03-24 08:54:16.000000000 +0530
> > +++ new5/include/net/sch_generic.h   2009-08-04 19:58:26.000000000 +0530
> > @@ -45,6 +45,7 @@ struct Qdisc
> >  #define TCQ_F_BUILTIN      1
> >  #define TCQ_F_THROTTLED      2
> >  #define TCQ_F_INGRESS      4
> > +#define TCQ_F_CAN_BYPASS   8
> >  #define TCQ_F_WARN_NONWC   (1 << 16)
> >     int         padded;
> >     struct Qdisc_ops   *ops;
> > @@ -182,6 +183,11 @@ struct qdisc_skb_cb {
> >     char         data[];
> >  };
> >
> > +static inline int qdisc_qlen(struct Qdisc *q)
> > +{
> > +   return q->q.qlen;
> > +}
> > +
> >  static inline struct qdisc_skb_cb *qdisc_skb_cb(struct sk_buff *skb)
> >  {
> >     return (struct qdisc_skb_cb *)skb->cb;
> > @@ -387,13 +393,18 @@ static inline int qdisc_enqueue_root(str
> >     return qdisc_enqueue(skb, sch) & NET_XMIT_MASK;
> >  }
> >
> > +static inline int __qdisc_update_bstats(struct Qdisc *sch, unsigned int len)
> > +{
> > +   sch->bstats.bytes += len;
> > +   sch->bstats.packets++;
> > +}
> > +
> >  static inline int __qdisc_enqueue_tail(struct sk_buff *skb, struct Qdisc *sch,
> >                     struct sk_buff_head *list)
> >  {
> >     __skb_queue_tail(list, skb);
> >     sch->qstats.backlog += qdisc_pkt_len(skb);
> > -   sch->bstats.bytes += qdisc_pkt_len(skb);
> > -   sch->bstats.packets++;
> > +   __qdisc_update_bstats(sch, qdisc_pkt_len(skb));
> >
> >     return NET_XMIT_SUCCESS;
> >  }
> > diff -ruNp org2/net/core/dev.c new5/net/core/dev.c
> > --- org2/net/core/dev.c   2009-07-27 09:08:24.000000000 +0530
> > +++ new5/net/core/dev.c   2009-08-04 20:26:57.000000000 +0530
> > @@ -1786,6 +1786,43 @@ static struct netdev_queue *dev_pick_tx(
> >     return netdev_get_tx_queue(dev, queue_index);
> >  }
> >
> > +static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
> > +             struct net_device *dev,
> > +             struct netdev_queue *txq)
> > +{
> > +   spinlock_t *root_lock = qdisc_lock(q);
> > +   int rc;
> > +
> > +   spin_lock(root_lock);
> > +   if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
> > +      kfree_skb(skb);
> > +      rc = NET_XMIT_DROP;
> > +   } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
> > +         !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
> > +      unsigned int len = skb->len;
> > +
> > +      /*
> > +       * This is a work-conserving queue; there are no old skbs
> > +       * waiting to be sent out; and the qdisc is not running -
> > +       * xmit the skb directly.
> > +       */
> > +      if (sch_direct_xmit(skb, q, dev, txq, root_lock))
> > +         __qdisc_run(q);
> > +      else
> > +         clear_bit(__QDISC_STATE_RUNNING, &q->state);
> > +
> > +      __qdisc_update_bstats(q, len);
> > +
> > +      rc = NET_XMIT_SUCCESS;
> > +   } else {
> > +      rc = qdisc_enqueue_root(skb, q);
> > +      qdisc_run(q);
> > +   }
> > +   spin_unlock(root_lock);
> > +
> > +   return rc;
> > +}
> > +
> >  /**
> >   *   dev_queue_xmit - transmit a buffer
> >   *   @skb: buffer to transmit
> > @@ -1859,19 +1896,7 @@ gso:
> >     skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
> >  #endif
> >     if (q->enqueue) {
> > -      spinlock_t *root_lock = qdisc_lock(q);
> > -
> > -      spin_lock(root_lock);
> > -
> > -      if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
> > -         kfree_skb(skb);
> > -         rc = NET_XMIT_DROP;
> > -      } else {
> > -         rc = qdisc_enqueue_root(skb, q);
> > -         qdisc_run(q);
> > -      }
> > -      spin_unlock(root_lock);
> > -
> > +      rc = __dev_xmit_skb(skb, q, dev, txq);
> >        goto out;
> >     }
> >
> > diff -ruNp org2/net/sched/sch_generic.c new5/net/sched/sch_generic.c
> > --- org2/net/sched/sch_generic.c   2009-05-25 07:48:07.000000000 +0530
> > +++ new5/net/sched/sch_generic.c   2009-08-04 19:38:27.000000000 +0530
> > @@ -37,15 +37,11 @@
> > - * - updates to tree and tree walking are only done under the rtnl mutex.
> >   */
> >
> > -static inline int qdisc_qlen(struct Qdisc *q)
> > -{
> > -   return q->q.qlen;
> > -}
> > -
> >  static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
> >  {
> >     q->gso_skb = skb;
> >     q->qstats.requeues++;
> > +   q->q.qlen++;   /* it's still part of the queue */
> >     __netif_schedule(q);
> >
> >     return 0;
> > @@ -61,9 +57,11 @@ static inline struct sk_buff *dequeue_sk
> >
> >        /* check the reason of requeuing without tx lock first */
> >        txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
> > -      if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq))
> > +      if (!netif_tx_queue_stopped(txq) &&
> > +          !netif_tx_queue_frozen(txq)) {
> >           q->gso_skb = NULL;
> > -      else
> > +         q->q.qlen--;
> > +      } else
> >           skb = NULL;
> >     } else {
> >        skb = q->dequeue(q);
> > @@ -103,44 +101,23 @@ static inline int handle_dev_cpu_collisi
> >  }
> >
> >  /*
> > - * NOTE: Called under qdisc_lock(q) with locally disabled BH.
> > - *
> > - * __QDISC_STATE_RUNNING guarantees only one CPU can process
> > - * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
> > - * this queue.
> > - *
> > - *  netif_tx_lock serializes accesses to device driver.
> > - *
> > - *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
> > - *  if one is grabbed, another must be free.
> > - *
> > - * Note, that this procedure can be called by a watchdog timer
> > + * Transmit one skb, and handle the return status as required.
> > + * Holding the __QDISC_STATE_RUNNING bit guarantees that only one
> > + * CPU can execute this function.
> >   *
> >   * Returns to the caller:
> >   *            0  - queue is empty or throttled.
> >   *            >0 - queue is not empty.
> > - *
> >   */
> > -static inline int qdisc_restart(struct Qdisc *q)
> > +int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
> > +          struct net_device *dev, struct netdev_queue *txq,
> > +          spinlock_t *root_lock)
> >  {
> > -   struct netdev_queue *txq;
> >     int ret = NETDEV_TX_BUSY;
> > -   struct net_device *dev;
> > -   spinlock_t *root_lock;
> > -   struct sk_buff *skb;
> > -
> > -   /* Dequeue packet */
> > -   if (unlikely((skb = dequeue_skb(q)) == NULL))
> > -      return 0;
> > -
> > -   root_lock = qdisc_lock(q);
> >
> >     /* And release qdisc */
> >     spin_unlock(root_lock);
> >
> > -   dev = qdisc_dev(q);
> > -   txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
> > -
> >     HARD_TX_LOCK(dev, txq, smp_processor_id());
> >     if (!netif_tx_queue_stopped(txq) &&
> >         !netif_tx_queue_frozen(txq))
> > @@ -177,6 +154,44 @@ static inline int qdisc_restart(struct Q
> >     return ret;
> >  }
> >
> > +/*
> > + * NOTE: Called under qdisc_lock(q) with locally disabled BH.
> > + *
> > + * __QDISC_STATE_RUNNING guarantees only one CPU can process
> > + * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
> > + * this queue.
> > + *
> > + *  netif_tx_lock serializes accesses to device driver.
> > + *
> > + *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
> > + *  if one is grabbed, another must be free.
> > + *
> > + * Note, that this procedure can be called by a watchdog timer
> > + *
> > + * Returns to the caller:
> > + *            0  - queue is empty or throttled.
> > + *            >0 - queue is not empty.
> > + *
> > + */
> > +static inline int qdisc_restart(struct Qdisc *q)
> > +{
> > +   struct netdev_queue *txq;
> > +   struct net_device *dev;
> > +   spinlock_t *root_lock;
> > +   struct sk_buff *skb;
> > +
> > +   /* Dequeue packet */
> > +   skb = dequeue_skb(q);
> > +   if (unlikely(!skb))
> > +      return 0;
> > +
> > +   root_lock = qdisc_lock(q);
> > +   dev = qdisc_dev(q);
> > +   txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
> > +
> > +   return sch_direct_xmit(skb, q, dev, txq, root_lock);
> > +}
> > +
> >  void __qdisc_run(struct Qdisc *q)
> >  {
> >     unsigned long start_time = jiffies;
> > @@ -547,8 +562,11 @@ void qdisc_reset(struct Qdisc *qdisc)
> >     if (ops->reset)
> >        ops->reset(qdisc);
> >
> > -   kfree_skb(qdisc->gso_skb);
> > -   qdisc->gso_skb = NULL;
> > +   if (qdisc->gso_skb) {
> > +      kfree_skb(qdisc->gso_skb);
> > +      qdisc->gso_skb = NULL;
> > +      qdisc->q.qlen = 0;
> > +   }
> >  }
> >  EXPORT_SYMBOL(qdisc_reset);
> >
> > @@ -605,6 +623,9 @@ static void attach_one_default_qdisc(str
> >           printk(KERN_INFO "%s: activation failed\n", dev->name);
> >           return;
> >        }
> > +
> > +      /* Can by-pass the queue discipline for default qdisc */
> > +      qdisc->flags |= TCQ_F_CAN_BYPASS;
> >     } else {
> >        qdisc =  &noqueue_qdisc;
> >     }
> 


* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
  2009-08-04 15:51 Krishna Kumar
@ 2009-08-04 16:21 ` Krishna Kumar2
  2009-08-04 20:23   ` Jarek Poplawski
  0 siblings, 1 reply; 16+ messages in thread
From: Krishna Kumar2 @ 2009-08-04 16:21 UTC (permalink / raw)
  To: davem, herbert, Jarek Poplawski, kaber, netdev

I will send out a final version, after any further comments, with a fix
for a typo:

-static inline int __qdisc_update_bstats(struct Qdisc *sch, unsigned int len)
+static inline void __qdisc_update_bstats(struct Qdisc *sch, unsigned int len)

Thanks,

- KK

Krishna Kumar2/India/IBM@IBMIN wrote on 08/04/2009 09:21:35 PM:
> Hi Jarek,
>
> (Sorry that you are getting this mail twice since the mail didn't go
> out to others, but this time I have fixed my mailer)
>
> > Actually I meant:
> > -      qdisc->q.qlen--;
> > +      qdisc->q.qlen = 0;
> > because you can do it after another qdisc->q.qlen = 0.
>
> You are right - I saw some of the sched resets setting it to zero
> (while others don't touch it). I will reset to zero.
>
> > > > I wonder if we shouldn't update bstats yet. Btw, try checkpatch.
> > >
> > > Rate estimator isn't used for default qdisc, so is anything
> > > required?
> >
> > I meant "Sent" stats from "tc -s qdisc sh" could be misleading.
>
> You are right - for a single netperf run, the stats magically show 0
> packets/bytes sent!
>
> > > Also, checkpatch suggested to change the original code:
> > >         "if (unlikely((skb = dequeue_skb(q)) == NULL))"
> > > since it appears as "new code", hope that change is fine.
> >
> > I mainly meant a whitespace warning.
>
> Yes, I had fixed that also.
>
> A compile- and single-netperf-run-tested patch is inlined below - the
> only two changes from the last patch are setting qlen=0 and updating
> bstats. Now the stats show:
>
> qdisc pfifo_fast 0: root bands 3 priomap  1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1
>  Sent 10170288228 bytes 157522 pkt (dropped 0, overlimits 0 requeues 0)
>  rate 0bit 0pps backlog 0b 0p requeues 0
>
> On the original kernel, I got:
> qdisc pfifo_fast 0: root bands 3 priomap  1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1
>  Sent 10157266798 bytes 177539 pkt (dropped 0, overlimits 0 requeues 0)
>  rate 0bit 0pps backlog 0b 0p requeues 0
>
> Thanks,
>
> - KK
>
> Signed-off-by: Krishna Kumar <krkumar2@in.ibm.com>
> ---
>  include/net/pkt_sched.h   |    3 +
>  include/net/sch_generic.h |   15 +++++
>  net/core/dev.c            |   51 ++++++++++++++-----
>  net/sched/sch_generic.c   |   93 ++++++++++++++++++++++--------------
>  4 files changed, 111 insertions(+), 51 deletions(-)
>
> diff -ruNp org2/include/net/pkt_sched.h new5/include/net/pkt_sched.h
> --- org2/include/net/pkt_sched.h   2009-08-04 11:41:21.000000000 +0530
> +++ new5/include/net/pkt_sched.h   2009-08-04 18:47:16.000000000 +0530
> @@ -87,6 +87,9 @@ extern struct qdisc_rate_table *qdisc_ge
>  extern void qdisc_put_rtab(struct qdisc_rate_table *tab);
>  extern void qdisc_put_stab(struct qdisc_size_table *tab);
>  extern void qdisc_warn_nonwc(char *txt, struct Qdisc *qdisc);
> +extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
> +            struct net_device *dev, struct netdev_queue *txq,
> +            spinlock_t *root_lock);
>
>  extern void __qdisc_run(struct Qdisc *q);
>
> diff -ruNp org2/include/net/sch_generic.h new5/include/net/sch_generic.h
> --- org2/include/net/sch_generic.h   2009-03-24 08:54:16.000000000 +0530
> +++ new5/include/net/sch_generic.h   2009-08-04 19:58:26.000000000 +0530
> @@ -45,6 +45,7 @@ struct Qdisc
>  #define TCQ_F_BUILTIN      1
>  #define TCQ_F_THROTTLED      2
>  #define TCQ_F_INGRESS      4
> +#define TCQ_F_CAN_BYPASS   8
>  #define TCQ_F_WARN_NONWC   (1 << 16)
>     int         padded;
>     struct Qdisc_ops   *ops;
> @@ -182,6 +183,11 @@ struct qdisc_skb_cb {
>     char         data[];
>  };
>
> +static inline int qdisc_qlen(struct Qdisc *q)
> +{
> +   return q->q.qlen;
> +}
> +
>  static inline struct qdisc_skb_cb *qdisc_skb_cb(struct sk_buff *skb)
>  {
>     return (struct qdisc_skb_cb *)skb->cb;
> @@ -387,13 +393,18 @@ static inline int qdisc_enqueue_root(str
>     return qdisc_enqueue(skb, sch) & NET_XMIT_MASK;
>  }
>
> +static inline int __qdisc_update_bstats(struct Qdisc *sch, unsigned int len)
> +{
> +   sch->bstats.bytes += len;
> +   sch->bstats.packets++;
> +}
> +
>  static inline int __qdisc_enqueue_tail(struct sk_buff *skb, struct Qdisc *sch,
>                     struct sk_buff_head *list)
>  {
>     __skb_queue_tail(list, skb);
>     sch->qstats.backlog += qdisc_pkt_len(skb);
> -   sch->bstats.bytes += qdisc_pkt_len(skb);
> -   sch->bstats.packets++;
> +   __qdisc_update_bstats(sch, qdisc_pkt_len(skb));
>
>     return NET_XMIT_SUCCESS;
>  }
> diff -ruNp org2/net/core/dev.c new5/net/core/dev.c
> --- org2/net/core/dev.c   2009-07-27 09:08:24.000000000 +0530
> +++ new5/net/core/dev.c   2009-08-04 20:26:57.000000000 +0530
> @@ -1786,6 +1786,43 @@ static struct netdev_queue *dev_pick_tx(
>     return netdev_get_tx_queue(dev, queue_index);
>  }
>
> +static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
> +             struct net_device *dev,
> +             struct netdev_queue *txq)
> +{
> +   spinlock_t *root_lock = qdisc_lock(q);
> +   int rc;
> +
> +   spin_lock(root_lock);
> +   if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
> +      kfree_skb(skb);
> +      rc = NET_XMIT_DROP;
> +   } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
> +         !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
> +      unsigned int len = skb->len;
> +
> +      /*
> +       * This is a work-conserving queue; there are no old skbs
> +       * waiting to be sent out; and the qdisc is not running -
> +       * xmit the skb directly.
> +       */
> +      if (sch_direct_xmit(skb, q, dev, txq, root_lock))
> +         __qdisc_run(q);
> +      else
> +         clear_bit(__QDISC_STATE_RUNNING, &q->state);
> +
> +      __qdisc_update_bstats(q, len);
> +
> +      rc = NET_XMIT_SUCCESS;
> +   } else {
> +      rc = qdisc_enqueue_root(skb, q);
> +      qdisc_run(q);
> +   }
> +   spin_unlock(root_lock);
> +
> +   return rc;
> +}
> +
>  /**
>   *   dev_queue_xmit - transmit a buffer
>   *   @skb: buffer to transmit
> @@ -1859,19 +1896,7 @@ gso:
>     skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
>  #endif
>     if (q->enqueue) {
> -      spinlock_t *root_lock = qdisc_lock(q);
> -
> -      spin_lock(root_lock);
> -
> -      if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
> -         kfree_skb(skb);
> -         rc = NET_XMIT_DROP;
> -      } else {
> -         rc = qdisc_enqueue_root(skb, q);
> -         qdisc_run(q);
> -      }
> -      spin_unlock(root_lock);
> -
> +      rc = __dev_xmit_skb(skb, q, dev, txq);
>        goto out;
>     }
>
> diff -ruNp org2/net/sched/sch_generic.c new5/net/sched/sch_generic.c
> --- org2/net/sched/sch_generic.c   2009-05-25 07:48:07.000000000 +0530
> +++ new5/net/sched/sch_generic.c   2009-08-04 19:38:27.000000000 +0530
> @@ -37,15 +37,11 @@
>   * - updates to tree and tree walking are only done under the rtnl mutex.
>   */
>
> -static inline int qdisc_qlen(struct Qdisc *q)
> -{
> -   return q->q.qlen;
> -}
> -
>  static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
>  {
>     q->gso_skb = skb;
>     q->qstats.requeues++;
> +   q->q.qlen++;   /* it's still part of the queue */
>     __netif_schedule(q);
>
>     return 0;
> @@ -61,9 +57,11 @@ static inline struct sk_buff *dequeue_sk
>
>        /* check the reason of requeuing without tx lock first */
>        txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
> -      if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq))
> +      if (!netif_tx_queue_stopped(txq) &&
> +          !netif_tx_queue_frozen(txq)) {
>           q->gso_skb = NULL;
> -      else
> +         q->q.qlen--;
> +      } else
>           skb = NULL;
>     } else {
>        skb = q->dequeue(q);
> @@ -103,44 +101,23 @@ static inline int handle_dev_cpu_collisi
>  }
>
>  /*
> - * NOTE: Called under qdisc_lock(q) with locally disabled BH.
> - *
> - * __QDISC_STATE_RUNNING guarantees only one CPU can process
> - * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
> - * this queue.
> - *
> - *  netif_tx_lock serializes accesses to device driver.
> - *
> - *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
> - *  if one is grabbed, another must be free.
> - *
> - * Note, that this procedure can be called by a watchdog timer
> + * Transmit one skb, and handle the return status as required. Holding the
> + * __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this
> + * function.
>   *
>   * Returns to the caller:
>   *            0  - queue is empty or throttled.
>   *            >0 - queue is not empty.
> - *
>   */
> -static inline int qdisc_restart(struct Qdisc *q)
> +int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
> +          struct net_device *dev, struct netdev_queue *txq,
> +          spinlock_t *root_lock)
>  {
> -   struct netdev_queue *txq;
>     int ret = NETDEV_TX_BUSY;
> -   struct net_device *dev;
> -   spinlock_t *root_lock;
> -   struct sk_buff *skb;
> -
> -   /* Dequeue packet */
> -   if (unlikely((skb = dequeue_skb(q)) == NULL))
> -      return 0;
> -
> -   root_lock = qdisc_lock(q);
>
>     /* And release qdisc */
>     spin_unlock(root_lock);
>
> -   dev = qdisc_dev(q);
> -   txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
> -
>     HARD_TX_LOCK(dev, txq, smp_processor_id());
>     if (!netif_tx_queue_stopped(txq) &&
>         !netif_tx_queue_frozen(txq))
> @@ -177,6 +154,44 @@ static inline int qdisc_restart(struct Q
>     return ret;
>  }
>
> +/*
> + * NOTE: Called under qdisc_lock(q) with locally disabled BH.
> + *
> + * __QDISC_STATE_RUNNING guarantees only one CPU can process
> + * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
> + * this queue.
> + *
> + *  netif_tx_lock serializes accesses to device driver.
> + *
> + *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
> + *  if one is grabbed, another must be free.
> + *
> + * Note, that this procedure can be called by a watchdog timer
> + *
> + * Returns to the caller:
> + *            0  - queue is empty or throttled.
> + *            >0 - queue is not empty.
> + *
> + */
> +static inline int qdisc_restart(struct Qdisc *q)
> +{
> +   struct netdev_queue *txq;
> +   struct net_device *dev;
> +   spinlock_t *root_lock;
> +   struct sk_buff *skb;
> +
> +   /* Dequeue packet */
> +   skb = dequeue_skb(q);
> +   if (unlikely(!skb))
> +      return 0;
> +
> +   root_lock = qdisc_lock(q);
> +   dev = qdisc_dev(q);
> +   txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
> +
> +   return sch_direct_xmit(skb, q, dev, txq, root_lock);
> +}
> +
>  void __qdisc_run(struct Qdisc *q)
>  {
>     unsigned long start_time = jiffies;
> @@ -547,8 +562,11 @@ void qdisc_reset(struct Qdisc *qdisc)
>     if (ops->reset)
>        ops->reset(qdisc);
>
> -   kfree_skb(qdisc->gso_skb);
> -   qdisc->gso_skb = NULL;
> +   if (qdisc->gso_skb) {
> +      kfree_skb(qdisc->gso_skb);
> +      qdisc->gso_skb = NULL;
> +      qdisc->q.qlen = 0;
> +   }
>  }
>  EXPORT_SYMBOL(qdisc_reset);
>
> @@ -605,6 +623,9 @@ static void attach_one_default_qdisc(str
>           printk(KERN_INFO "%s: activation failed\n", dev->name);
>           return;
>        }
> +
> +      /* Can by-pass the queue discipline for default qdisc */
> +      qdisc->flags |= TCQ_F_CAN_BYPASS;
>     } else {
>        qdisc =  &noqueue_qdisc;
>     }


^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
@ 2009-08-04 15:51 Krishna Kumar
  2009-08-04 16:21 ` Krishna Kumar2
  0 siblings, 1 reply; 16+ messages in thread
From: Krishna Kumar @ 2009-08-04 15:51 UTC (permalink / raw)
  To: Jarek Poplawski; +Cc: kaber, netdev, davem, Krishna Kumar, herbert

> Jarek Poplawski <jarkao2@gmail.com> wrote on 08/04/2009 12:52:05 PM:

Hi Jarek,

(Sorry that you are getting this mail twice since the mail didn't go out to
others, but this time I have fixed my mailer)

> Actually I meant:
> -      qdisc->q.qlen--;
> +      qdisc->q.qlen = 0;
> because you can do it after another qdisc->q.qlen = 0.

You are right - I saw some of the sched resets setting it to zero (while others
don't touch it). I will reset it to zero.

> > > I wonder if we shouldn't update bstats yet. Btw, try checkpatch.
> >
> > Rate estimator isn't used for default qdisc, so is anything
> > required?
>
> I meant "Sent" stats from "tc -s qdisc sh" could be misleading.

You are right - for a single netperf run, the stats magically show 0
packets/bytes sent!

> > Also, checkpatch suggested to change the original code:
> >         "if (unlikely((skb = dequeue_skb(q)) == NULL))"
> > since it appears as "new code", hope that change is fine.
>
> I mainly meant a whitespace warning.

Yes, I had fixed that also.

A compile and single-netperf-run tested patch is inlined below - the only
two changes from the last patch are setting qlen=0 and updating bstats. Now
the stats show:

qdisc pfifo_fast 0: root bands 3 priomap  1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1
 Sent 10170288228 bytes 157522 pkt (dropped 0, overlimits 0 requeues 0) 
 rate 0bit 0pps backlog 0b 0p requeues 0 

On the original kernel, I got:
qdisc pfifo_fast 0: root bands 3 priomap  1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1
 Sent 10157266798 bytes 177539 pkt (dropped 0, overlimits 0 requeues 0) 
 rate 0bit 0pps backlog 0b 0p requeues 0 

Thanks,

- KK

Signed-off-by: Krishna Kumar <krkumar2@in.ibm.com>
---
 include/net/pkt_sched.h   |    3 +
 include/net/sch_generic.h |   15 +++++
 net/core/dev.c            |   51 ++++++++++++++-----
 net/sched/sch_generic.c   |   93 ++++++++++++++++++++++--------------
 4 files changed, 111 insertions(+), 51 deletions(-)

diff -ruNp org2/include/net/pkt_sched.h new5/include/net/pkt_sched.h
--- org2/include/net/pkt_sched.h	2009-08-04 11:41:21.000000000 +0530
+++ new5/include/net/pkt_sched.h	2009-08-04 18:47:16.000000000 +0530
@@ -87,6 +87,9 @@ extern struct qdisc_rate_table *qdisc_ge
 extern void qdisc_put_rtab(struct qdisc_rate_table *tab);
 extern void qdisc_put_stab(struct qdisc_size_table *tab);
 extern void qdisc_warn_nonwc(char *txt, struct Qdisc *qdisc);
+extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
+			   struct net_device *dev, struct netdev_queue *txq,
+			   spinlock_t *root_lock);
 
 extern void __qdisc_run(struct Qdisc *q);
 
diff -ruNp org2/include/net/sch_generic.h new5/include/net/sch_generic.h
--- org2/include/net/sch_generic.h	2009-03-24 08:54:16.000000000 +0530
+++ new5/include/net/sch_generic.h	2009-08-04 19:58:26.000000000 +0530
@@ -45,6 +45,7 @@ struct Qdisc
 #define TCQ_F_BUILTIN		1
 #define TCQ_F_THROTTLED		2
 #define TCQ_F_INGRESS		4
+#define TCQ_F_CAN_BYPASS	8
 #define TCQ_F_WARN_NONWC	(1 << 16)
 	int			padded;
 	struct Qdisc_ops	*ops;
@@ -182,6 +183,11 @@ struct qdisc_skb_cb {
 	char			data[];
 };
 
+static inline int qdisc_qlen(struct Qdisc *q)
+{
+	return q->q.qlen;
+}
+
 static inline struct qdisc_skb_cb *qdisc_skb_cb(struct sk_buff *skb)
 {
 	return (struct qdisc_skb_cb *)skb->cb;
@@ -387,13 +393,18 @@ static inline int qdisc_enqueue_root(str
 	return qdisc_enqueue(skb, sch) & NET_XMIT_MASK;
 }
 
+static inline void __qdisc_update_bstats(struct Qdisc *sch, unsigned int len)
+{
+	sch->bstats.bytes += len;
+	sch->bstats.packets++;
+}
+
 static inline int __qdisc_enqueue_tail(struct sk_buff *skb, struct Qdisc *sch,
 				       struct sk_buff_head *list)
 {
 	__skb_queue_tail(list, skb);
 	sch->qstats.backlog += qdisc_pkt_len(skb);
-	sch->bstats.bytes += qdisc_pkt_len(skb);
-	sch->bstats.packets++;
+	__qdisc_update_bstats(sch, qdisc_pkt_len(skb));
 
 	return NET_XMIT_SUCCESS;
 }
diff -ruNp org2/net/core/dev.c new5/net/core/dev.c
--- org2/net/core/dev.c	2009-07-27 09:08:24.000000000 +0530
+++ new5/net/core/dev.c	2009-08-04 20:26:57.000000000 +0530
@@ -1786,6 +1786,43 @@ static struct netdev_queue *dev_pick_tx(
 	return netdev_get_tx_queue(dev, queue_index);
 }
 
+static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
+				 struct net_device *dev,
+				 struct netdev_queue *txq)
+{
+	spinlock_t *root_lock = qdisc_lock(q);
+	int rc;
+
+	spin_lock(root_lock);
+	if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
+		kfree_skb(skb);
+		rc = NET_XMIT_DROP;
+	} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
+		   !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
+		unsigned int len = skb->len;
+
+		/*
+		 * This is a work-conserving queue; there are no old skbs
+		 * waiting to be sent out; and the qdisc is not running -
+		 * xmit the skb directly.
+		 */
+		if (sch_direct_xmit(skb, q, dev, txq, root_lock))
+			__qdisc_run(q);
+		else
+			clear_bit(__QDISC_STATE_RUNNING, &q->state);
+
+		__qdisc_update_bstats(q, len);
+
+		rc = NET_XMIT_SUCCESS;
+	} else {
+		rc = qdisc_enqueue_root(skb, q);
+		qdisc_run(q);
+	}
+	spin_unlock(root_lock);
+
+	return rc;
+}
+
 /**
  *	dev_queue_xmit - transmit a buffer
  *	@skb: buffer to transmit
@@ -1859,19 +1896,7 @@ gso:
 	skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
 #endif
 	if (q->enqueue) {
-		spinlock_t *root_lock = qdisc_lock(q);
-
-		spin_lock(root_lock);
-
-		if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
-			kfree_skb(skb);
-			rc = NET_XMIT_DROP;
-		} else {
-			rc = qdisc_enqueue_root(skb, q);
-			qdisc_run(q);
-		}
-		spin_unlock(root_lock);
-
+		rc = __dev_xmit_skb(skb, q, dev, txq);
 		goto out;
 	}
 
diff -ruNp org2/net/sched/sch_generic.c new5/net/sched/sch_generic.c
--- org2/net/sched/sch_generic.c	2009-05-25 07:48:07.000000000 +0530
+++ new5/net/sched/sch_generic.c	2009-08-04 19:38:27.000000000 +0530
@@ -37,15 +37,11 @@
  * - updates to tree and tree walking are only done under the rtnl mutex.
  */
 
-static inline int qdisc_qlen(struct Qdisc *q)
-{
-	return q->q.qlen;
-}
-
 static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
 {
 	q->gso_skb = skb;
 	q->qstats.requeues++;
+	q->q.qlen++;	/* it's still part of the queue */
 	__netif_schedule(q);
 
 	return 0;
@@ -61,9 +57,11 @@ static inline struct sk_buff *dequeue_sk
 
 		/* check the reason of requeuing without tx lock first */
 		txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
-		if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq))
+		if (!netif_tx_queue_stopped(txq) &&
+		    !netif_tx_queue_frozen(txq)) {
 			q->gso_skb = NULL;
-		else
+			q->q.qlen--;
+		} else
 			skb = NULL;
 	} else {
 		skb = q->dequeue(q);
@@ -103,44 +101,23 @@ static inline int handle_dev_cpu_collisi
 }
 
 /*
- * NOTE: Called under qdisc_lock(q) with locally disabled BH.
- *
- * __QDISC_STATE_RUNNING guarantees only one CPU can process
- * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
- * this queue.
- *
- *  netif_tx_lock serializes accesses to device driver.
- *
- *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
- *  if one is grabbed, another must be free.
- *
- * Note, that this procedure can be called by a watchdog timer
+ * Transmit one skb, and handle the return status as required. Holding the
+ * __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this
+ * function.
  *
  * Returns to the caller:
  *				0  - queue is empty or throttled.
  *				>0 - queue is not empty.
- *
  */
-static inline int qdisc_restart(struct Qdisc *q)
+int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
+		    struct net_device *dev, struct netdev_queue *txq,
+		    spinlock_t *root_lock)
 {
-	struct netdev_queue *txq;
 	int ret = NETDEV_TX_BUSY;
-	struct net_device *dev;
-	spinlock_t *root_lock;
-	struct sk_buff *skb;
-
-	/* Dequeue packet */
-	if (unlikely((skb = dequeue_skb(q)) == NULL))
-		return 0;
-
-	root_lock = qdisc_lock(q);
 
 	/* And release qdisc */
 	spin_unlock(root_lock);
 
-	dev = qdisc_dev(q);
-	txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
-
 	HARD_TX_LOCK(dev, txq, smp_processor_id());
 	if (!netif_tx_queue_stopped(txq) &&
 	    !netif_tx_queue_frozen(txq))
@@ -177,6 +154,44 @@ static inline int qdisc_restart(struct Q
 	return ret;
 }
 
+/*
+ * NOTE: Called under qdisc_lock(q) with locally disabled BH.
+ *
+ * __QDISC_STATE_RUNNING guarantees only one CPU can process
+ * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
+ * this queue.
+ *
+ *  netif_tx_lock serializes accesses to device driver.
+ *
+ *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
+ *  if one is grabbed, another must be free.
+ *
+ * Note, that this procedure can be called by a watchdog timer
+ *
+ * Returns to the caller:
+ *				0  - queue is empty or throttled.
+ *				>0 - queue is not empty.
+ *
+ */
+static inline int qdisc_restart(struct Qdisc *q)
+{
+	struct netdev_queue *txq;
+	struct net_device *dev;
+	spinlock_t *root_lock;
+	struct sk_buff *skb;
+
+	/* Dequeue packet */
+	skb = dequeue_skb(q);
+	if (unlikely(!skb))
+		return 0;
+
+	root_lock = qdisc_lock(q);
+	dev = qdisc_dev(q);
+	txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
+
+	return sch_direct_xmit(skb, q, dev, txq, root_lock);
+}
+
 void __qdisc_run(struct Qdisc *q)
 {
 	unsigned long start_time = jiffies;
@@ -547,8 +562,11 @@ void qdisc_reset(struct Qdisc *qdisc)
 	if (ops->reset)
 		ops->reset(qdisc);
 
-	kfree_skb(qdisc->gso_skb);
-	qdisc->gso_skb = NULL;
+	if (qdisc->gso_skb) {
+		kfree_skb(qdisc->gso_skb);
+		qdisc->gso_skb = NULL;
+		qdisc->q.qlen = 0;
+	}
 }
 EXPORT_SYMBOL(qdisc_reset);
 
@@ -605,6 +623,9 @@ static void attach_one_default_qdisc(str
 			printk(KERN_INFO "%s: activation failed\n", dev->name);
 			return;
 		}
+
+		/* Can by-pass the queue discipline for default qdisc */
+		qdisc->flags |= TCQ_F_CAN_BYPASS;
 	} else {
 		qdisc =  &noqueue_qdisc;
 	}


* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
  2009-08-04  6:55     ` Krishna Kumar2
@ 2009-08-04  7:22       ` Jarek Poplawski
  0 siblings, 0 replies; 16+ messages in thread
From: Jarek Poplawski @ 2009-08-04  7:22 UTC (permalink / raw)
  To: Krishna Kumar2; +Cc: davem, herbert, kaber, netdev

On Tue, Aug 04, 2009 at 12:25:07PM +0530, Krishna Kumar2 wrote:
> Jarek Poplawski <jarkao2@gmail.com> wrote on 08/04/2009 01:08:47 AM:
> 
> Hi Jarek,
> 
> Since this quoted part looks very similar, I'm using it here, so
> forget my note if something has changed.
> 
> Thanks for your feedback. Yes, the inline patch is the same as the
> attached one.
> 
> > >> @@ -306,6 +312,9 @@ extern struct Qdisc *qdisc_create_dflt(s
> > >>                     struct Qdisc_ops *ops, u32 parentid);
> > >>  extern void qdisc_calculate_pkt_len(struct sk_buff *skb,
> > >>                 struct qdisc_size_table *stab);
> > >> +extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
> > >> +            struct net_device *dev, struct netdev_queue *txq,
> > >> +            spinlock_t *root_lock);
> >
> > Probably this would look better in pkt_sched.h around __qdisc_run()?
> 
> I had it originally in pkt_sched.h, for some reason I had moved it to
> sch_generic.h, I will move it back.
> 
> > >> +static inline int __dev_hw_xmit(struct sk_buff *skb, struct Qdisc *q,
> >
> > I guess, we don't touch "hw" in this function yet?
> 
> Right - this was due to lack of a good name. Maybe I can call it
> __dev_xmit_skb?

Looks better to me.

> 
> > >> -   kfree_skb(qdisc->gso_skb);
> > >> -   qdisc->gso_skb = NULL;
> > >> +   if (qdisc->gso_skb) {
> > >> +      kfree_skb(qdisc->gso_skb);
> > >> +      qdisc->gso_skb = NULL;
> > >> +      qdisc->q.qlen--;
> >
> > You don't need this here: qdiscs should zero it in ops->reset().
> 
> I will keep it around, as you said in your other mail.

Actually I meant:
-      qdisc->q.qlen--;
+      qdisc->q.qlen = 0;
because you can do it after another qdisc->q.qlen = 0.

> 
> > >> +      /* Can by-pass the queue discipline for default qdisc */
> > >> +      qdisc->flags = TCQ_F_CAN_BYPASS;
> >
> > "|=" looks nicer to me.
> 
> Agreed - and required if qdisc_create_dflt (and the functions it calls)
> is changed to set the flags.
> 
> > >>     } else {
> > >>        qdisc =  &noqueue_qdisc;
> > >>     }
> >
> > I wonder if we shouldn't update bstats yet. Btw, try checkpatch.
> 
> Rate estimator isn't used for default qdisc, so is anything
> required?

I meant "Sent" stats from "tc -s qdisc sh" could be misleading.

> Also, checkpatch suggested to change the original code:
>         "if (unlikely((skb = dequeue_skb(q)) == NULL))"
> since it appears as "new code", hope that change is fine.

I mainly meant a whitespace warning.

> 
> The patch after incorporating the review comments is both attached
> and inlined, since my mailer is still not working (I sent this mail
> about an hour back but this one didn't seem to have made it while
> one of my later mails did).

I got 2 very similar messages this time, several minutes ago ;-)

Thanks,
Jarek P.

> 
> Thanks,
> 
> - KK
> 
>  (See attached file: patch4)
> 
> ----------------------------------------------------------------------
>  include/net/pkt_sched.h   |    3 +
>  include/net/sch_generic.h |    6 ++
>  net/core/dev.c            |   46 ++++++++++++-----
>  net/sched/sch_generic.c   |   93 ++++++++++++++++++++++--------------
>  4 files changed, 99 insertions(+), 49 deletions(-)
> 
> diff -ruNp org/include/net/pkt_sched.h new/include/net/pkt_sched.h
> --- org/include/net/pkt_sched.h 2009-08-04 11:41:21.000000000 +0530
> +++ new/include/net/pkt_sched.h 2009-08-04 11:42:18.000000000 +0530
> @@ -87,6 +87,9 @@ extern struct qdisc_rate_table *qdisc_ge
>  extern void qdisc_put_rtab(struct qdisc_rate_table *tab);
>  extern void qdisc_put_stab(struct qdisc_size_table *tab);
>  extern void qdisc_warn_nonwc(char *txt, struct Qdisc *qdisc);
> +extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
> +                          struct net_device *dev, struct netdev_queue
> *txq,
> +                          spinlock_t *root_lock);
> 
>  extern void __qdisc_run(struct Qdisc *q);
> 
> diff -ruNp org/include/net/sch_generic.h new/include/net/sch_generic.h
> --- org/include/net/sch_generic.h       2009-03-24 08:54:16.000000000 +0530
> +++ new/include/net/sch_generic.h       2009-08-04 11:02:39.000000000 +0530
> @@ -45,6 +45,7 @@ struct Qdisc
>  #define TCQ_F_BUILTIN          1
>  #define TCQ_F_THROTTLED                2
>  #define TCQ_F_INGRESS          4
> +#define TCQ_F_CAN_BYPASS       8
>  #define TCQ_F_WARN_NONWC       (1 << 16)
>         int                     padded;
>         struct Qdisc_ops        *ops;
> @@ -182,6 +183,11 @@ struct qdisc_skb_cb {
>         char                    data[];
>  };
> 
> +static inline int qdisc_qlen(struct Qdisc *q)
> +{
> +       return q->q.qlen;
> +}
> +
>  static inline struct qdisc_skb_cb *qdisc_skb_cb(struct sk_buff *skb)
>  {
>         return (struct qdisc_skb_cb *)skb->cb;
> diff -ruNp org/net/core/dev.c new/net/core/dev.c
> --- org/net/core/dev.c  2009-07-27 09:08:24.000000000 +0530
> +++ new/net/core/dev.c  2009-08-04 11:44:57.000000000 +0530
> @@ -1786,6 +1786,38 @@ static struct netdev_queue *dev_pick_tx(
>         return netdev_get_tx_queue(dev, queue_index);
>  }
> 
> +static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
> +                                struct net_device *dev,
> +                                struct netdev_queue *txq)
> +{
> +       spinlock_t *root_lock = qdisc_lock(q);
> +       int rc;
> +
> +       spin_lock(root_lock);
> +       if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
> +               kfree_skb(skb);
> +               rc = NET_XMIT_DROP;
> +       } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
> +                  !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
> +               /*
> +                * This is a work-conserving queue; there are no old skbs
> +                * waiting to be sent out; and the qdisc is not running -
> +                * xmit the skb directly.
> +                */
> +               if (sch_direct_xmit(skb, q, dev, txq, root_lock))
> +                       __qdisc_run(q);
> +               else
> +                       clear_bit(__QDISC_STATE_RUNNING, &q->state);
> +               rc = NET_XMIT_SUCCESS;
> +       } else {
> +               rc = qdisc_enqueue_root(skb, q);
> +               qdisc_run(q);
> +       }
> +       spin_unlock(root_lock);
> +
> +       return rc;
> +}
> +
>  /**
>   *     dev_queue_xmit - transmit a buffer
>   *     @skb: buffer to transmit
> @@ -1859,19 +1891,7 @@ gso:
>         skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
>  #endif
>         if (q->enqueue) {
> -               spinlock_t *root_lock = qdisc_lock(q);
> -
> -               spin_lock(root_lock);
> -
> -               if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
> -                       kfree_skb(skb);
> -                       rc = NET_XMIT_DROP;
> -               } else {
> -                       rc = qdisc_enqueue_root(skb, q);
> -                       qdisc_run(q);
> -               }
> -               spin_unlock(root_lock);
> -
> +               rc = __dev_xmit_skb(skb, q, dev, txq);
>                 goto out;
>         }
> 
> diff -ruNp org/net/sched/sch_generic.c new/net/sched/sch_generic.c
> --- org/net/sched/sch_generic.c 2009-05-25 07:48:07.000000000 +0530
> +++ new/net/sched/sch_generic.c 2009-08-04 11:00:38.000000000 +0530
> @@ -37,15 +37,11 @@
>   * - updates to tree and tree walking are only done under the rtnl mutex.
>   */
> 
> -static inline int qdisc_qlen(struct Qdisc *q)
> -{
> -       return q->q.qlen;
> -}
> -
>  static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
>  {
>         q->gso_skb = skb;
>         q->qstats.requeues++;
> +       q->q.qlen++;    /* it's still part of the queue */
>         __netif_schedule(q);
> 
>         return 0;
> @@ -61,9 +57,11 @@ static inline struct sk_buff *dequeue_sk
> 
>                 /* check the reason of requeuing without tx lock first */
>                 txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
> -               if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq))
> +               if (!netif_tx_queue_stopped(txq) &&
> +                   !netif_tx_queue_frozen(txq)) {
>                         q->gso_skb = NULL;
> -               else
> +                       q->q.qlen--;
> +               } else
>                         skb = NULL;
>         } else {
>                 skb = q->dequeue(q);
> @@ -103,44 +101,23 @@ static inline int handle_dev_cpu_collisi
>  }
> 
>  /*
> - * NOTE: Called under qdisc_lock(q) with locally disabled BH.
> - *
> - * __QDISC_STATE_RUNNING guarantees only one CPU can process
> - * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
> - * this queue.
> - *
> - *  netif_tx_lock serializes accesses to device driver.
> - *
> - *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
> - *  if one is grabbed, another must be free.
> - *
> - * Note, that this procedure can be called by a watchdog timer
> + * Transmit one skb, and handle the return status as required. Holding the
> + * __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this
> + * function.
>   *
>   * Returns to the caller:
>   *                             0  - queue is empty or throttled.
>   *                             >0 - queue is not empty.
> - *
>   */
> -static inline int qdisc_restart(struct Qdisc *q)
> +int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
> +                   struct net_device *dev, struct netdev_queue *txq,
> +                   spinlock_t *root_lock)
>  {
> -       struct netdev_queue *txq;
>         int ret = NETDEV_TX_BUSY;
> -       struct net_device *dev;
> -       spinlock_t *root_lock;
> -       struct sk_buff *skb;
> -
> -       /* Dequeue packet */
> -       if (unlikely((skb = dequeue_skb(q)) == NULL))
> -               return 0;
> -
> -       root_lock = qdisc_lock(q);
> 
>         /* And release qdisc */
>         spin_unlock(root_lock);
> 
> -       dev = qdisc_dev(q);
> -       txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
> -
>         HARD_TX_LOCK(dev, txq, smp_processor_id());
>         if (!netif_tx_queue_stopped(txq) &&
>             !netif_tx_queue_frozen(txq))
> @@ -177,6 +154,44 @@ static inline int qdisc_restart(struct Q
>         return ret;
>  }
> 
> +/*
> + * NOTE: Called under qdisc_lock(q) with locally disabled BH.
> + *
> + * __QDISC_STATE_RUNNING guarantees only one CPU can process
> + * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
> + * this queue.
> + *
> + *  netif_tx_lock serializes accesses to device driver.
> + *
> + *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
> + *  if one is grabbed, another must be free.
> + *
> + * Note, that this procedure can be called by a watchdog timer
> + *
> + * Returns to the caller:
> + *                             0  - queue is empty or throttled.
> + *                             >0 - queue is not empty.
> + *
> + */
> +static inline int qdisc_restart(struct Qdisc *q)
> +{
> +       struct netdev_queue *txq;
> +       struct net_device *dev;
> +       spinlock_t *root_lock;
> +       struct sk_buff *skb;
> +
> +       /* Dequeue packet */
> +       skb = dequeue_skb(q);
> +       if (unlikely(!skb))
> +               return 0;
> +
> +       root_lock = qdisc_lock(q);
> +       dev = qdisc_dev(q);
> +       txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
> +
> +       return sch_direct_xmit(skb, q, dev, txq, root_lock);
> +}
> +
>  void __qdisc_run(struct Qdisc *q)
>  {
>         unsigned long start_time = jiffies;
> @@ -547,8 +562,11 @@ void qdisc_reset(struct Qdisc *qdisc)
>         if (ops->reset)
>                 ops->reset(qdisc);
> 
> -       kfree_skb(qdisc->gso_skb);
> -       qdisc->gso_skb = NULL;
> +       if (qdisc->gso_skb) {
> +               kfree_skb(qdisc->gso_skb);
> +               qdisc->gso_skb = NULL;
> +               qdisc->q.qlen--;
> +       }
>  }
>  EXPORT_SYMBOL(qdisc_reset);
> 
> @@ -605,6 +623,9 @@ static void attach_one_default_qdisc(str
>                         printk(KERN_INFO "%s: activation failed\n", dev->name);
>                         return;
>                 }
> +
> +               /* Can by-pass the queue discipline for default qdisc */
> +               qdisc->flags |= TCQ_F_CAN_BYPASS;
>         } else {
>                 qdisc =  &noqueue_qdisc;
>         }




* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
  2009-08-03 19:38   ` Jarek Poplawski
  2009-08-03 20:22     ` Jarek Poplawski
@ 2009-08-04  6:55     ` Krishna Kumar2
  2009-08-04  7:22       ` Jarek Poplawski
  1 sibling, 1 reply; 16+ messages in thread
From: Krishna Kumar2 @ 2009-08-04  6:55 UTC (permalink / raw)
  To: Jarek Poplawski; +Cc: davem, herbert, kaber, netdev

[-- Attachment #1: Type: text/plain, Size: 11318 bytes --]

Jarek Poplawski <jarkao2@gmail.com> wrote on 08/04/2009 01:08:47 AM:

Hi Jarek,

> Since this quoted part looks very similar, I'm using it here, so
> forget my note if something has changed.

Thanks for your feedback. Yes, the inline patch is the same as the
attached one.

> >> @@ -306,6 +312,9 @@ extern struct Qdisc *qdisc_create_dflt(s
> >>                     struct Qdisc_ops *ops, u32 parentid);
> >>  extern void qdisc_calculate_pkt_len(struct sk_buff *skb,
> >>                 struct qdisc_size_table *stab);
> >> +extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
> >> +            struct net_device *dev, struct netdev_queue *txq,
> >> +            spinlock_t *root_lock);
>
> Probably this would look better in pkt_sched.h around __qdisc_run()?

I had it originally in pkt_sched.h, for some reason I had moved it to
sch_generic.h, I will move it back.

> >> +static inline int __dev_hw_xmit(struct sk_buff *skb, struct Qdisc *q,
>
> I guess, we don't touch "hw" in this function yet?

Right - this was due to lack of a good name. Maybe I can call it
__dev_xmit_skb?

> >> -   kfree_skb(qdisc->gso_skb);
> >> -   qdisc->gso_skb = NULL;
> >> +   if (qdisc->gso_skb) {
> >> +      kfree_skb(qdisc->gso_skb);
> >> +      qdisc->gso_skb = NULL;
> >> +      qdisc->q.qlen--;
>
> You don't need this here: qdiscs should zero it in ops->reset().

I will keep it around, as you said in your other mail.
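For what it's worth, the accounting rule being discussed here - a requeued
gso_skb remains counted in q->q.qlen - is exactly what lets the bypass test
collapse to a single qdisc_qlen() check instead of also testing q->gso_skb.
A minimal standalone sketch of that invariant (plain C with toy types, not
the kernel structures):

```c
#include <assert.h>
#include <stddef.h>

/* Toy model of the relevant Qdisc fields (hypothetical, not kernel code). */
struct toy_qdisc {
	void *gso_skb;	/* one-slot requeue buffer */
	int   qlen;	/* queued skbs, including a requeued gso_skb */
};

/* Requeue keeps the skb counted, mirroring dev_requeue_skb() in the patch. */
static void toy_requeue(struct toy_qdisc *q, void *skb)
{
	q->gso_skb = skb;
	q->qlen++;		/* it's still part of the queue */
}

/* Dequeuing the requeued skb drops the count again. */
static void *toy_dequeue_requeued(struct toy_qdisc *q)
{
	void *skb = q->gso_skb;

	q->gso_skb = NULL;
	q->qlen--;
	return skb;
}

/* The bypass test no longer needs an explicit !q->gso_skb clause. */
static int toy_can_bypass(const struct toy_qdisc *q)
{
	return q->qlen == 0;
}
```

The point is that toy_can_bypass() stays correct across requeue/dequeue
without ever looking at gso_skb directly.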

> >> +      /* Can by-pass the queue discipline for default qdisc */
> >> +      qdisc->flags = TCQ_F_CAN_BYPASS;
>
> "|=" looks nicer to me.

Agreed - and required if qdisc_create_dflt (and the functions it calls)
is changed to set the flags.

> >>     } else {
> >>        qdisc =  &noqueue_qdisc;
> >>     }
>
> I wonder if we shouldn't update bstats yet. Btw, try checkpatch.

The rate estimator isn't used for the default qdisc, so is anything
required? Also, checkpatch suggested changing the original code:
        "if (unlikely((skb = dequeue_skb(q)) == NULL))"
since it appears as "new code"; I hope that change is fine.
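For reference, checkpatch's objection is to the assignment buried inside the
if condition; the split form used in the new qdisc_restart() is behaviorally
identical. A trivial standalone illustration (the dequeue stub is
hypothetical, standing in for dequeue_skb()):

```c
#include <assert.h>
#include <stddef.h>

/* Stub standing in for dequeue_skb(); just returns the value passed in. */
static void *stub_dequeue(void *next)
{
	return next;
}

/*
 * Old form flagged by checkpatch (assignment inside the condition):
 *	if (unlikely((skb = dequeue_skb(q)) == NULL))
 *		return 0;
 * New form in the patch: assign first, then test. Same behavior.
 */
static int restart_new_style(void *next)
{
	void *skb;

	skb = stub_dequeue(next);
	if (!skb)
		return 0;	/* queue empty */
	return 1;		/* got a packet to transmit */
}
```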

The patch incorporating the review comments is both attached and
inlined, since my mailer is still not working (I sent this mail about
an hour ago, but it didn't seem to make it through while one of my
later mails did).

Thanks,

- KK

 (See attached file: patch4)

----------------------------------------------------------------------
 include/net/pkt_sched.h   |    3 +
 include/net/sch_generic.h |    6 ++
 net/core/dev.c            |   46 ++++++++++++-----
 net/sched/sch_generic.c   |   93 ++++++++++++++++++++++--------------
 4 files changed, 99 insertions(+), 49 deletions(-)

diff -ruNp org/include/net/pkt_sched.h new/include/net/pkt_sched.h
--- org/include/net/pkt_sched.h 2009-08-04 11:41:21.000000000 +0530
+++ new/include/net/pkt_sched.h 2009-08-04 11:42:18.000000000 +0530
@@ -87,6 +87,9 @@ extern struct qdisc_rate_table *qdisc_ge
 extern void qdisc_put_rtab(struct qdisc_rate_table *tab);
 extern void qdisc_put_stab(struct qdisc_size_table *tab);
 extern void qdisc_warn_nonwc(char *txt, struct Qdisc *qdisc);
+extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
+                          struct net_device *dev, struct netdev_queue *txq,
+                          spinlock_t *root_lock);

 extern void __qdisc_run(struct Qdisc *q);

diff -ruNp org/include/net/sch_generic.h new/include/net/sch_generic.h
--- org/include/net/sch_generic.h       2009-03-24 08:54:16.000000000 +0530
+++ new/include/net/sch_generic.h       2009-08-04 11:02:39.000000000 +0530
@@ -45,6 +45,7 @@ struct Qdisc
 #define TCQ_F_BUILTIN          1
 #define TCQ_F_THROTTLED                2
 #define TCQ_F_INGRESS          4
+#define TCQ_F_CAN_BYPASS       8
 #define TCQ_F_WARN_NONWC       (1 << 16)
        int                     padded;
        struct Qdisc_ops        *ops;
@@ -182,6 +183,11 @@ struct qdisc_skb_cb {
        char                    data[];
 };

+static inline int qdisc_qlen(struct Qdisc *q)
+{
+       return q->q.qlen;
+}
+
 static inline struct qdisc_skb_cb *qdisc_skb_cb(struct sk_buff *skb)
 {
        return (struct qdisc_skb_cb *)skb->cb;
diff -ruNp org/net/core/dev.c new/net/core/dev.c
--- org/net/core/dev.c  2009-07-27 09:08:24.000000000 +0530
+++ new/net/core/dev.c  2009-08-04 11:44:57.000000000 +0530
@@ -1786,6 +1786,38 @@ static struct netdev_queue *dev_pick_tx(
        return netdev_get_tx_queue(dev, queue_index);
 }

+static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
+                                struct net_device *dev,
+                                struct netdev_queue *txq)
+{
+       spinlock_t *root_lock = qdisc_lock(q);
+       int rc;
+
+       spin_lock(root_lock);
+       if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
+               kfree_skb(skb);
+               rc = NET_XMIT_DROP;
+       } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
+                  !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
+               /*
+                * This is a work-conserving queue; there are no old skbs
+                * waiting to be sent out; and the qdisc is not running -
+                * xmit the skb directly.
+                */
+               if (sch_direct_xmit(skb, q, dev, txq, root_lock))
+                       __qdisc_run(q);
+               else
+                       clear_bit(__QDISC_STATE_RUNNING, &q->state);
+               rc = NET_XMIT_SUCCESS;
+       } else {
+               rc = qdisc_enqueue_root(skb, q);
+               qdisc_run(q);
+       }
+       spin_unlock(root_lock);
+
+       return rc;
+}
+
 /**
  *     dev_queue_xmit - transmit a buffer
  *     @skb: buffer to transmit
@@ -1859,19 +1891,7 @@ gso:
        skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
 #endif
        if (q->enqueue) {
-               spinlock_t *root_lock = qdisc_lock(q);
-
-               spin_lock(root_lock);
-
-               if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
-                       kfree_skb(skb);
-                       rc = NET_XMIT_DROP;
-               } else {
-                       rc = qdisc_enqueue_root(skb, q);
-                       qdisc_run(q);
-               }
-               spin_unlock(root_lock);
-
+               rc = __dev_xmit_skb(skb, q, dev, txq);
                goto out;
        }

diff -ruNp org/net/sched/sch_generic.c new/net/sched/sch_generic.c
--- org/net/sched/sch_generic.c 2009-05-25 07:48:07.000000000 +0530
+++ new/net/sched/sch_generic.c 2009-08-04 11:00:38.000000000 +0530
@@ -37,15 +37,11 @@
  * - updates to tree and tree walking are only done under the rtnl mutex.
  */

-static inline int qdisc_qlen(struct Qdisc *q)
-{
-       return q->q.qlen;
-}
-
 static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
 {
        q->gso_skb = skb;
        q->qstats.requeues++;
+       q->q.qlen++;    /* it's still part of the queue */
        __netif_schedule(q);

        return 0;
@@ -61,9 +57,11 @@ static inline struct sk_buff *dequeue_sk

                /* check the reason of requeuing without tx lock first */
                txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
-               if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq))
+               if (!netif_tx_queue_stopped(txq) &&
+                   !netif_tx_queue_frozen(txq)) {
                        q->gso_skb = NULL;
-               else
+                       q->q.qlen--;
+               } else
                        skb = NULL;
        } else {
                skb = q->dequeue(q);
@@ -103,44 +101,23 @@ static inline int handle_dev_cpu_collisi
 }

 /*
- * NOTE: Called under qdisc_lock(q) with locally disabled BH.
- *
- * __QDISC_STATE_RUNNING guarantees only one CPU can process
- * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
- * this queue.
- *
- *  netif_tx_lock serializes accesses to device driver.
- *
- *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
- *  if one is grabbed, another must be free.
- *
- * Note, that this procedure can be called by a watchdog timer
+ * Transmit one skb, and handle the return status as required. Holding the
+ * __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this
+ * function.
  *
  * Returns to the caller:
  *                             0  - queue is empty or throttled.
  *                             >0 - queue is not empty.
- *
  */
-static inline int qdisc_restart(struct Qdisc *q)
+int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
+                   struct net_device *dev, struct netdev_queue *txq,
+                   spinlock_t *root_lock)
 {
-       struct netdev_queue *txq;
        int ret = NETDEV_TX_BUSY;
-       struct net_device *dev;
-       spinlock_t *root_lock;
-       struct sk_buff *skb;
-
-       /* Dequeue packet */
-       if (unlikely((skb = dequeue_skb(q)) == NULL))
-               return 0;
-
-       root_lock = qdisc_lock(q);

        /* And release qdisc */
        spin_unlock(root_lock);

-       dev = qdisc_dev(q);
-       txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
-
        HARD_TX_LOCK(dev, txq, smp_processor_id());
        if (!netif_tx_queue_stopped(txq) &&
            !netif_tx_queue_frozen(txq))
@@ -177,6 +154,44 @@ static inline int qdisc_restart(struct Q
        return ret;
 }

+/*
+ * NOTE: Called under qdisc_lock(q) with locally disabled BH.
+ *
+ * __QDISC_STATE_RUNNING guarantees only one CPU can process
+ * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
+ * this queue.
+ *
+ *  netif_tx_lock serializes accesses to device driver.
+ *
+ *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
+ *  if one is grabbed, another must be free.
+ *
+ * Note, that this procedure can be called by a watchdog timer
+ *
+ * Returns to the caller:
+ *                             0  - queue is empty or throttled.
+ *                             >0 - queue is not empty.
+ *
+ */
+static inline int qdisc_restart(struct Qdisc *q)
+{
+       struct netdev_queue *txq;
+       struct net_device *dev;
+       spinlock_t *root_lock;
+       struct sk_buff *skb;
+
+       /* Dequeue packet */
+       skb = dequeue_skb(q);
+       if (unlikely(!skb))
+               return 0;
+
+       root_lock = qdisc_lock(q);
+       dev = qdisc_dev(q);
+       txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
+
+       return sch_direct_xmit(skb, q, dev, txq, root_lock);
+}
+
 void __qdisc_run(struct Qdisc *q)
 {
        unsigned long start_time = jiffies;
@@ -547,8 +562,11 @@ void qdisc_reset(struct Qdisc *qdisc)
        if (ops->reset)
                ops->reset(qdisc);

-       kfree_skb(qdisc->gso_skb);
-       qdisc->gso_skb = NULL;
+       if (qdisc->gso_skb) {
+               kfree_skb(qdisc->gso_skb);
+               qdisc->gso_skb = NULL;
+               qdisc->q.qlen--;
+       }
 }
 EXPORT_SYMBOL(qdisc_reset);

@@ -605,6 +623,9 @@ static void attach_one_default_qdisc(str
                        printk(KERN_INFO "%s: activation failed\n", dev->name);
                        return;
                }
+
+               /* Can by-pass the queue discipline for default qdisc */
+               qdisc->flags |= TCQ_F_CAN_BYPASS;
        } else {
                qdisc =  &noqueue_qdisc;
        }

[-- Attachment #2: patch4 --]
[-- Type: application/octet-stream, Size: 9455 bytes --]

From: Krishna Kumar <krkumar2@in.ibm.com>

dev_queue_xmit enqueue's a skb and calls qdisc_run which
dequeue's the skb and xmits it. In most cases (after
instrumenting the code), the skb that is enqueue'd is the
same one that is dequeue'd (unless the queue gets stopped
or multiple cpu's write to the same queue and ends in a
race with qdisc_run). For default qdiscs, we can remove
this path and simply xmit the skb since this is a work
conserving queue.

The patch uses a new flag - TCQ_F_CAN_BYPASS to identify
the default fast queue. I plan to use this flag for the
previous patch also (rename if required).  The controversial
part of the patch is incrementing qlen when a skb is
requeued, this is to avoid checks like the second line below:

+    } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
>> THIS LINE:   !q->gso_skb &&
+               !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {

Results of a 4 hour testing for multiple netperf sessions
(1, 2, 4, 8, 12 sessions on a 4 cpu system-X and 1, 2, 4,
8, 16, 32 sessions on a 16 cpu P6). Aggregate Mb/s across
the iterations:

-----------------------------------------------------------------
     |         System-X          |               P6
-----------------------------------------------------------------
Size |  ORG BW          NEW BW   |      ORG BW          NEW BW
-----|---------------------------|-------------------------------
16K  |  154264          156234   |      155350          157569
64K  |  154364          154825   |      155790          158845
128K |  154644          154803   |      153418          155572
256K |  153882          152007   |      154784          154596
-----------------------------------------------------------------

Netperf reported Service demand reduced by 15% on the P6 but
no noticeable difference on the system-X box.

Please review.

Thanks,

- KK

Signed-off-by: Krishna Kumar <krkumar2@in.ibm.com>
---
 include/net/pkt_sched.h   |    3 +
 include/net/sch_generic.h |    6 ++
 net/core/dev.c            |   46 ++++++++++++-----
 net/sched/sch_generic.c   |   93 ++++++++++++++++++++++--------------
 4 files changed, 99 insertions(+), 49 deletions(-)

diff -ruNp org/include/net/pkt_sched.h new/include/net/pkt_sched.h
--- org/include/net/pkt_sched.h	2009-08-04 11:41:21.000000000 +0530
+++ new/include/net/pkt_sched.h	2009-08-04 11:42:18.000000000 +0530
@@ -87,6 +87,9 @@ extern struct qdisc_rate_table *qdisc_ge
 extern void qdisc_put_rtab(struct qdisc_rate_table *tab);
 extern void qdisc_put_stab(struct qdisc_size_table *tab);
 extern void qdisc_warn_nonwc(char *txt, struct Qdisc *qdisc);
+extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
+			   struct net_device *dev, struct netdev_queue *txq,
+			   spinlock_t *root_lock);
 
 extern void __qdisc_run(struct Qdisc *q);
 
diff -ruNp org/include/net/sch_generic.h new/include/net/sch_generic.h
--- org/include/net/sch_generic.h	2009-03-24 08:54:16.000000000 +0530
+++ new/include/net/sch_generic.h	2009-08-04 11:02:39.000000000 +0530
@@ -45,6 +45,7 @@ struct Qdisc
 #define TCQ_F_BUILTIN		1
 #define TCQ_F_THROTTLED		2
 #define TCQ_F_INGRESS		4
+#define TCQ_F_CAN_BYPASS	8
 #define TCQ_F_WARN_NONWC	(1 << 16)
 	int			padded;
 	struct Qdisc_ops	*ops;
@@ -182,6 +183,11 @@ struct qdisc_skb_cb {
 	char			data[];
 };
 
+static inline int qdisc_qlen(struct Qdisc *q)
+{
+	return q->q.qlen;
+}
+
 static inline struct qdisc_skb_cb *qdisc_skb_cb(struct sk_buff *skb)
 {
 	return (struct qdisc_skb_cb *)skb->cb;
diff -ruNp org/net/core/dev.c new/net/core/dev.c
--- org/net/core/dev.c	2009-07-27 09:08:24.000000000 +0530
+++ new/net/core/dev.c	2009-08-04 11:44:57.000000000 +0530
@@ -1786,6 +1786,38 @@ static struct netdev_queue *dev_pick_tx(
 	return netdev_get_tx_queue(dev, queue_index);
 }
 
+static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
+				 struct net_device *dev,
+				 struct netdev_queue *txq)
+{
+	spinlock_t *root_lock = qdisc_lock(q);
+	int rc;
+
+	spin_lock(root_lock);
+	if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
+		kfree_skb(skb);
+		rc = NET_XMIT_DROP;
+	} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
+		   !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
+		/*
+		 * This is a work-conserving queue; there are no old skbs
+		 * waiting to be sent out; and the qdisc is not running -
+		 * xmit the skb directly.
+		 */
+		if (sch_direct_xmit(skb, q, dev, txq, root_lock))
+			__qdisc_run(q);
+		else
+			clear_bit(__QDISC_STATE_RUNNING, &q->state);
+		rc = NET_XMIT_SUCCESS;
+	} else {
+		rc = qdisc_enqueue_root(skb, q);
+		qdisc_run(q);
+	}
+	spin_unlock(root_lock);
+
+	return rc;
+}
+
 /**
  *	dev_queue_xmit - transmit a buffer
  *	@skb: buffer to transmit
@@ -1859,19 +1891,7 @@ gso:
 	skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
 #endif
 	if (q->enqueue) {
-		spinlock_t *root_lock = qdisc_lock(q);
-
-		spin_lock(root_lock);
-
-		if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
-			kfree_skb(skb);
-			rc = NET_XMIT_DROP;
-		} else {
-			rc = qdisc_enqueue_root(skb, q);
-			qdisc_run(q);
-		}
-		spin_unlock(root_lock);
-
+		rc = __dev_xmit_skb(skb, q, dev, txq);
 		goto out;
 	}
 
diff -ruNp org/net/sched/sch_generic.c new/net/sched/sch_generic.c
--- org/net/sched/sch_generic.c	2009-05-25 07:48:07.000000000 +0530
+++ new/net/sched/sch_generic.c	2009-08-04 11:00:38.000000000 +0530
@@ -37,15 +37,11 @@
  * - updates to tree and tree walking are only done under the rtnl mutex.
  */
 
-static inline int qdisc_qlen(struct Qdisc *q)
-{
-	return q->q.qlen;
-}
-
 static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
 {
 	q->gso_skb = skb;
 	q->qstats.requeues++;
+	q->q.qlen++;	/* it's still part of the queue */
 	__netif_schedule(q);
 
 	return 0;
@@ -61,9 +57,11 @@ static inline struct sk_buff *dequeue_sk
 
 		/* check the reason of requeuing without tx lock first */
 		txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
-		if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq))
+		if (!netif_tx_queue_stopped(txq) &&
+		    !netif_tx_queue_frozen(txq)) {
 			q->gso_skb = NULL;
-		else
+			q->q.qlen--;
+		} else
 			skb = NULL;
 	} else {
 		skb = q->dequeue(q);
@@ -103,44 +101,23 @@ static inline int handle_dev_cpu_collisi
 }
 
 /*
- * NOTE: Called under qdisc_lock(q) with locally disabled BH.
- *
- * __QDISC_STATE_RUNNING guarantees only one CPU can process
- * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
- * this queue.
- *
- *  netif_tx_lock serializes accesses to device driver.
- *
- *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
- *  if one is grabbed, another must be free.
- *
- * Note, that this procedure can be called by a watchdog timer
+ * Transmit one skb, and handle the return status as required. Holding the
+ * __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this
+ * function.
  *
  * Returns to the caller:
  *				0  - queue is empty or throttled.
  *				>0 - queue is not empty.
- *
  */
-static inline int qdisc_restart(struct Qdisc *q)
+int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
+		    struct net_device *dev, struct netdev_queue *txq,
+		    spinlock_t *root_lock)
 {
-	struct netdev_queue *txq;
 	int ret = NETDEV_TX_BUSY;
-	struct net_device *dev;
-	spinlock_t *root_lock;
-	struct sk_buff *skb;
-
-	/* Dequeue packet */
-	if (unlikely((skb = dequeue_skb(q)) == NULL))
-		return 0;
-
-	root_lock = qdisc_lock(q);
 
 	/* And release qdisc */
 	spin_unlock(root_lock);
 
-	dev = qdisc_dev(q);
-	txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
-
 	HARD_TX_LOCK(dev, txq, smp_processor_id());
 	if (!netif_tx_queue_stopped(txq) &&
 	    !netif_tx_queue_frozen(txq))
@@ -177,6 +154,44 @@ static inline int qdisc_restart(struct Q
 	return ret;
 }
 
+/*
+ * NOTE: Called under qdisc_lock(q) with locally disabled BH.
+ *
+ * __QDISC_STATE_RUNNING guarantees only one CPU can process
+ * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
+ * this queue.
+ *
+ *  netif_tx_lock serializes accesses to device driver.
+ *
+ *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
+ *  if one is grabbed, another must be free.
+ *
+ * Note, that this procedure can be called by a watchdog timer
+ *
+ * Returns to the caller:
+ *				0  - queue is empty or throttled.
+ *				>0 - queue is not empty.
+ *
+ */
+static inline int qdisc_restart(struct Qdisc *q)
+{
+	struct netdev_queue *txq;
+	struct net_device *dev;
+	spinlock_t *root_lock;
+	struct sk_buff *skb;
+
+	/* Dequeue packet */
+	skb = dequeue_skb(q);
+	if (unlikely(!skb))
+		return 0;
+
+	root_lock = qdisc_lock(q);
+	dev = qdisc_dev(q);
+	txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
+
+	return sch_direct_xmit(skb, q, dev, txq, root_lock);
+}
+
 void __qdisc_run(struct Qdisc *q)
 {
 	unsigned long start_time = jiffies;
@@ -547,8 +562,11 @@ void qdisc_reset(struct Qdisc *qdisc)
 	if (ops->reset)
 		ops->reset(qdisc);
 
-	kfree_skb(qdisc->gso_skb);
-	qdisc->gso_skb = NULL;
+	if (qdisc->gso_skb) {
+		kfree_skb(qdisc->gso_skb);
+		qdisc->gso_skb = NULL;
+		qdisc->q.qlen--;
+	}
 }
 EXPORT_SYMBOL(qdisc_reset);
 
@@ -605,6 +623,9 @@ static void attach_one_default_qdisc(str
 			printk(KERN_INFO "%s: activation failed\n", dev->name);
 			return;
 		}
+
+		/* Can by-pass the queue discipline for default qdisc */
+		qdisc->flags |= TCQ_F_CAN_BYPASS;
 	} else {
 		qdisc =  &noqueue_qdisc;
 	}


* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
  2009-08-03 19:38   ` Jarek Poplawski
@ 2009-08-03 20:22     ` Jarek Poplawski
  2009-08-04  6:55     ` Krishna Kumar2
  1 sibling, 0 replies; 16+ messages in thread
From: Jarek Poplawski @ 2009-08-03 20:22 UTC (permalink / raw)
  To: Krishna Kumar2; +Cc: davem, herbert, kaber, netdev

On Mon, Aug 03, 2009 at 09:38:47PM +0200, Jarek Poplawski wrote:
> Krishna Kumar2 wrote, On 08/03/2009 11:17 AM:
> > Krishna Kumar2/India/IBM@IBMIN wrote on 08/03/2009 01:40:16 PM:
> > 
> >> Krishna Kumar2/India/IBM@IBMIN
> >> 08/03/2009 01:40 PM
...
> >> [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
> >>
> >> From: Krishna Kumar <krkumar2@in.ibm.com>
...
> >> @@ -547,8 +561,11 @@ void qdisc_reset(struct Qdisc *qdisc)
> >>     if (ops->reset)
> >>        ops->reset(qdisc);
> >>
> >> -   kfree_skb(qdisc->gso_skb);
> >> -   qdisc->gso_skb = NULL;
> >> +   if (qdisc->gso_skb) {
> >> +      kfree_skb(qdisc->gso_skb);
> >> +      qdisc->gso_skb = NULL;
> >> +      qdisc->q.qlen--;
> 
> You don't need this here: qdiscs should zero it in ops->reset().

Actually, there are some exceptions, so until that is fixed you should
probably zero it here too.

Jarek P.

> 
> >> +   }
> >>  }
> >>  EXPORT_SYMBOL(qdisc_reset);


* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
  2009-08-03  9:17 ` Krishna Kumar2
  2009-08-03  9:58   ` Jarek Poplawski
@ 2009-08-03 19:38   ` Jarek Poplawski
  2009-08-03 20:22     ` Jarek Poplawski
  2009-08-04  6:55     ` Krishna Kumar2
  1 sibling, 2 replies; 16+ messages in thread
From: Jarek Poplawski @ 2009-08-03 19:38 UTC (permalink / raw)
  To: Krishna Kumar2; +Cc: davem, herbert, kaber, netdev

Krishna Kumar2 wrote, On 08/03/2009 11:17 AM:

> It is not going through for some reason, even though I sent the patch
> about 1.5 hours ago. Is it OK if I attach the patch this one time while
> I figure out how to get my mailer working again?
> 
> Following attachment contains the description and the patch.
> 
> thanks,
> 
> - KK
> 
> 
> (See attached file: patch.dont_queue)

Since this quoted part looks very similar, I'm using it here, so
forget my note if something has changed.

> 
> 
> Krishna Kumar2/India/IBM@IBMIN wrote on 08/03/2009 01:40:16 PM:
> 
>> Krishna Kumar2/India/IBM@IBMIN
>> 08/03/2009 01:40 PM
>>
>> To
>>
>> davem@davemloft.net
>>
>> cc
>>
>> Krishna Kumar2/India/IBM@IBMIN, herbert@gondor.apana.org.au,
>> kaber@trash.net,
>> jarkao2@gmail.com, netdev@vger.kernel.org
>>
>> Subject
>>
>> [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
>>
>> From: Krishna Kumar <krkumar2@in.ibm.com>
>>
>> dev_queue_xmit enqueue's a skb and calls qdisc_run which
>> dequeue's the skb and xmits it. In most cases (after
>> instrumenting the code), the skb that is enqueue'd is the
>> same one that is dequeue'd (unless the queue gets stopped
>> or multiple cpu's write to the same queue and ends in a
>> race with qdisc_run). For default qdiscs, we can remove
>> this path and simply xmit the skb since this is a work
>> conserving queue.
>>
>> The patch uses a new flag - TCQ_F_CAN_BYPASS to identify
>> the default fast queue. I plan to use this flag for the
>> previous patch also (rename if required).  The controversial
>> part of the patch is incrementing qlen when a skb is
>> requeued, this is to avoid checks like the second line below:
>>
>> +    } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
>>>> THIS LINE:   !q->gso_skb &&
>> +               !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
>>
>> Results of a 4 hour testing for multiple netperf sessions
>> (1, 2, 4, 8, 12 sessions on a 4 cpu system-X and 1, 2, 4,
>> 8, 16, 32 sessions on a 16 cpu P6). Aggregate Mb/s across
>> the iterations:
>>
>> -----------------------------------------------------------------
>>      |         System-X          |               P6
>> -----------------------------------------------------------------
>> Size |  ORG BW          NEW BW   |      ORG BW          NEW BW
>> -----|---------------------------|-------------------------------
>> 16K  |  154264          156234   |      155350          157569
>> 64K  |  154364          154825   |      155790          158845
>> 128K |  154644          154803   |      153418          155572
>> 256K |  153882          152007   |      154784          154596
>> -----------------------------------------------------------------
>>
>> Netperf reported Service demand reduced by 15% on the P6 but
>> no noticeable difference on the system-X box.
>>
>> Please review.
>>
>> Thanks,
>>
>> - KK
>>
>> Signed-off-by: Krishna Kumar <krkumar2@in.ibm.com>
>> ---
>>
>>  include/net/sch_generic.h |    9 +++
>>  net/core/dev.c            |   46 ++++++++++++-----
>>  net/sched/sch_generic.c   |   94 +++++++++++++++++++++---------------
>>  3 files changed, 99 insertions(+), 50 deletions(-)
>>
>> diff -ruNp org/include/net/sch_generic.h new/include/net/sch_generic.h
>> --- org/include/net/sch_generic.h   2009-03-24 08:54:16.000000000 +0530
>> +++ new/include/net/sch_generic.h   2009-07-28 19:37:10.000000000 +0530
>> @@ -45,6 +45,7 @@ struct Qdisc
>>  #define TCQ_F_BUILTIN      1
>>  #define TCQ_F_THROTTLED      2
>>  #define TCQ_F_INGRESS      4
>> +#define TCQ_F_CAN_BYPASS   8
>>  #define TCQ_F_WARN_NONWC   (1 << 16)
>>     int         padded;
>>     struct Qdisc_ops   *ops;
>> @@ -182,6 +183,11 @@ struct qdisc_skb_cb {
>>     char         data[];
>>  };
>>
>> +static inline int qdisc_qlen(struct Qdisc *q)
>> +{
>> +   return q->q.qlen;
>> +}
>> +
>>  static inline struct qdisc_skb_cb *qdisc_skb_cb(struct sk_buff *skb)
>>  {
>>     return (struct qdisc_skb_cb *)skb->cb;
>> @@ -306,6 +312,9 @@ extern struct Qdisc *qdisc_create_dflt(s
>>                     struct Qdisc_ops *ops, u32 parentid);
>>  extern void qdisc_calculate_pkt_len(struct sk_buff *skb,
>>                 struct qdisc_size_table *stab);
>> +extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
>> +            struct net_device *dev, struct netdev_queue *txq,
>> +            spinlock_t *root_lock);

Probably this would look better in pkt_sched.h around __qdisc_run()?

>>  extern void tcf_destroy(struct tcf_proto *tp);
>>  extern void tcf_destroy_chain(struct tcf_proto **fl);
>>
>> diff -ruNp org2/net/core/dev.c new2/net/core/dev.c
>> --- org2/net/core/dev.c   2009-07-27 09:08:24.000000000 +0530
>> +++ new2/net/core/dev.c   2009-07-28 19:37:10.000000000 +0530
>> @@ -1786,6 +1786,38 @@ static struct netdev_queue *dev_pick_tx(
>>     return netdev_get_tx_queue(dev, queue_index);
>>  }
>>
>> +static inline int __dev_hw_xmit(struct sk_buff *skb, struct Qdisc *q,

I guess we don't touch "hw" in this function yet?

>> +            struct net_device *dev,
>> +            struct netdev_queue *txq)
>> +{
>> +   spinlock_t *root_lock = qdisc_lock(q);
>> +   int rc;
>> +
>> +   spin_lock(root_lock);
>> +   if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
>> +      kfree_skb(skb);
>> +      rc = NET_XMIT_DROP;
>> +   } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
>> +         !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
>> +      /*
>> +       * This is the default work-conserving queue; there are no
>> +       * old skbs waiting to be sent out; and the qdisc is not
>> +       * running - xmit the skb directly.
>> +       */
>> +      if (sch_direct_xmit(skb, q, dev, txq, root_lock))
>> +         __qdisc_run(q);
>> +      else
>> +         clear_bit(__QDISC_STATE_RUNNING, &q->state);
>> +      rc = 0;

-      rc = 0;
+      rc = NET_XMIT_SUCCESS;

>> +   } else {
>> +      rc = qdisc_enqueue_root(skb, q);
>> +      qdisc_run(q);
>> +   }
>> +   spin_unlock(root_lock);
>> +
>> +   return rc;
>> +}
>> +
>>  /**
>>   *   dev_queue_xmit - transmit a buffer
>>   *   @skb: buffer to transmit
>> @@ -1859,19 +1891,7 @@ gso:
>>     skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
>>  #endif
>>     if (q->enqueue) {
>> -      spinlock_t *root_lock = qdisc_lock(q);
>> -
>> -      spin_lock(root_lock);
>> -
>> -      if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
>> -         kfree_skb(skb);
>> -         rc = NET_XMIT_DROP;
>> -      } else {
>> -         rc = qdisc_enqueue_root(skb, q);
>> -         qdisc_run(q);
>> -      }
>> -      spin_unlock(root_lock);
>> -
>> +      rc = __dev_hw_xmit(skb, q, dev, txq);
>>        goto out;
>>     }
>>
>> diff -ruNp org2/net/sched/sch_generic.c new2/net/sched/sch_generic.c
>> --- org2/net/sched/sch_generic.c   2009-05-25 07:48:07.000000000 +0530
>> +++ new2/net/sched/sch_generic.c   2009-07-28 19:40:39.000000000 +0530
>> @@ -37,15 +37,11 @@
>>   * - updates to tree and tree walking are only done under the rtnl mutex.
>>   */
>>
>> -static inline int qdisc_qlen(struct Qdisc *q)
>> -{
>> -   return q->q.qlen;
>> -}
>> -
>>  static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
>>  {
>>     q->gso_skb = skb;
>>     q->qstats.requeues++;
>> +   q->q.qlen++;   /* it's still part of the queue */
>>     __netif_schedule(q);
>>
>>     return 0;
>> @@ -61,9 +57,11 @@ static inline struct sk_buff *dequeue_sk
>>
>>        /* check the reason of requeuing without tx lock first */
>>        txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
>> -      if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq))
>> +      if (!netif_tx_queue_stopped(txq) &&
>> +          !netif_tx_queue_frozen(txq)) {
>>           q->gso_skb = NULL;
>> -      else
>> +         q->q.qlen--;
>> +      } else
>>           skb = NULL;
>>     } else {
>>        skb = q->dequeue(q);
>> @@ -102,45 +100,24 @@ static inline int handle_dev_cpu_collisi
>>     return ret;
>>  }
>>
>> -/*
>> - * NOTE: Called under qdisc_lock(q) with locally disabled BH.
>> - *
>> - * __QDISC_STATE_RUNNING guarantees only one CPU can process
>> - * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
>> - * this queue.
>> - *
>> - *  netif_tx_lock serializes accesses to device driver.
>> - *
>> - *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
>> - *  if one is grabbed, another must be free.
>> - *
>> - * Note, that this procedure can be called by a watchdog timer
>> +/*
>> + * Transmit one skb, and handle the return status as required. Holding the
>> + * __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this
>> + * function.
>>   *
>>   * Returns to the caller:
>>   *            0  - queue is empty or throttled.
>>   *            >0 - queue is not empty.
>> - *
>>   */
>> -static inline int qdisc_restart(struct Qdisc *q)
>> +int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
>> +          struct net_device *dev, struct netdev_queue *txq,
>> +          spinlock_t *root_lock)
>>  {
>> -   struct netdev_queue *txq;
>>     int ret = NETDEV_TX_BUSY;
>> -   struct net_device *dev;
>> -   spinlock_t *root_lock;
>> -   struct sk_buff *skb;
>> -
>> -   /* Dequeue packet */
>> -   if (unlikely((skb = dequeue_skb(q)) == NULL))
>> -      return 0;
>> -
>> -   root_lock = qdisc_lock(q);
>>
>>     /* And release qdisc */
>>     spin_unlock(root_lock);
>>
>> -   dev = qdisc_dev(q);
>> -   txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
>> -
>>     HARD_TX_LOCK(dev, txq, smp_processor_id());
>>     if (!netif_tx_queue_stopped(txq) &&
>>         !netif_tx_queue_frozen(txq))
>> @@ -177,6 +154,43 @@ static inline int qdisc_restart(struct Q
>>     return ret;
>>  }
>>
>> +/*
>> + * NOTE: Called under qdisc_lock(q) with locally disabled BH.
>> + *
>> + * __QDISC_STATE_RUNNING guarantees only one CPU can process
>> + * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
>> + * this queue.
>> + *
>> + *  netif_tx_lock serializes accesses to device driver.
>> + *
>> + *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
>> + *  if one is grabbed, another must be free.
>> + *
>> + * Note, that this procedure can be called by a watchdog timer
>> + *
>> + * Returns to the caller:
>> + *            0  - queue is empty or throttled.
>> + *            >0 - queue is not empty.
>> + *
>> + */
>> +static inline int qdisc_restart(struct Qdisc *q)
>> +{
>> +   struct netdev_queue *txq;
>> +   struct net_device *dev;
>> +   spinlock_t *root_lock;
>> +   struct sk_buff *skb;
>> +
>> +   /* Dequeue packet */
>> +   if (unlikely((skb = dequeue_skb(q)) == NULL))
>> +      return 0;
>> +
>> +   root_lock = qdisc_lock(q);
>> +   dev = qdisc_dev(q);
>> +   txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
>> +
>> +   return sch_direct_xmit(skb, q, dev, txq, root_lock);
>> +}
>> +
>>  void __qdisc_run(struct Qdisc *q)
>>  {
>>     unsigned long start_time = jiffies;
>> @@ -547,8 +561,11 @@ void qdisc_reset(struct Qdisc *qdisc)
>>     if (ops->reset)
>>        ops->reset(qdisc);
>>
>> -   kfree_skb(qdisc->gso_skb);
>> -   qdisc->gso_skb = NULL;
>> +   if (qdisc->gso_skb) {
>> +      kfree_skb(qdisc->gso_skb);
>> +      qdisc->gso_skb = NULL;
>> +      qdisc->q.qlen--;

You don't need this here: qdiscs should zero it in ops->reset().

>> +   }
>>  }
>>  EXPORT_SYMBOL(qdisc_reset);
>>
>> @@ -605,6 +622,9 @@ static void attach_one_default_qdisc(str
>>           printk(KERN_INFO "%s: activation failed\n", dev->name);
>>           return;
>>        }
>> +
>> +      /* Can by-pass the queue discipline for default qdisc */
>> +      qdisc->flags = TCQ_F_CAN_BYPASS;

"|=" looks nicer to me.

>>     } else {
>>        qdisc =  &noqueue_qdisc;
>>     }

I wonder if we shouldn't update bstats yet. Btw, try checkpatch.

Thanks,
Jarek P.


* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
  2009-08-03  9:17 ` Krishna Kumar2
@ 2009-08-03  9:58   ` Jarek Poplawski
  2009-08-03 19:38   ` Jarek Poplawski
  1 sibling, 0 replies; 16+ messages in thread
From: Jarek Poplawski @ 2009-08-03  9:58 UTC (permalink / raw)
  To: Krishna Kumar2; +Cc: davem, herbert, kaber, netdev

On Mon, Aug 03, 2009 at 02:47:24PM +0530, Krishna Kumar2 wrote:
> It is not going through for some reason, though I sent the patch about
> 1.5 hours ago. Is it OK if I attach the patch this one time while I figure
> out how to get my mailer working again?
> 
> The following attachment contains the description and the patch.
> 
> thanks,
> 
> - KK
> 
> 
> (See attached file: patch.dont_queue)
> 

I can see this file :-)

Thanks,
Jarek P.


* Re: [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
       [not found] <20090803081016.32746.23400.sendpatchset@localhost.localdomain>
@ 2009-08-03  9:17 ` Krishna Kumar2
  2009-08-03  9:58   ` Jarek Poplawski
  2009-08-03 19:38   ` Jarek Poplawski
  0 siblings, 2 replies; 16+ messages in thread
From: Krishna Kumar2 @ 2009-08-03  9:17 UTC (permalink / raw)
  To: davem; +Cc: herbert, jarkao2, kaber, netdev

[-- Attachment #1: Type: text/plain, Size: 10995 bytes --]

It is not going through for some reason, though I sent the patch about
1.5 hours ago. Is it OK if I attach the patch this one time while I figure
out how to get my mailer working again?

The following attachment contains the description and the patch.

thanks,

- KK


(See attached file: patch.dont_queue)


Krishna Kumar2/India/IBM@IBMIN wrote on 08/03/2009 01:40:16 PM:

> Krishna Kumar2/India/IBM@IBMIN
> 08/03/2009 01:40 PM
>
> To
>
> davem@davemloft.net
>
> cc
>
> Krishna Kumar2/India/IBM@IBMIN, herbert@gondor.apana.org.au, kaber@trash.net,
> jarkao2@gmail.com, netdev@vger.kernel.org
>
> Subject
>
> [RFC] [PATCH] Avoid enqueuing skb for default qdiscs
>
> From: Krishna Kumar <krkumar2@in.ibm.com>
>
> dev_queue_xmit enqueues a skb and calls qdisc_run, which
> dequeues the skb and xmits it. In most cases (verified by
> instrumenting the code), the skb that is enqueued is the
> same one that is dequeued (unless the queue gets stopped,
> or multiple cpus write to the same queue and race with
> qdisc_run). For default qdiscs, we can remove this path
> and simply xmit the skb directly, since this is a
> work-conserving queue.
>
> The patch uses a new flag - TCQ_F_CAN_BYPASS - to identify
> the default fast queue. I plan to use this flag for the
> previous patch as well (renaming it if required). The
> controversial part of the patch is incrementing qlen when a
> skb is requeued; this avoids checks like the second line below:
>
> +    } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
> >> THIS LINE:   !q->gso_skb &&
> +               !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
>
> Results of 4 hours of testing with multiple netperf sessions
> (1, 2, 4, 8, 12 sessions on a 4-cpu system-X and 1, 2, 4,
> 8, 16, 32 sessions on a 16-cpu P6). Aggregate Mb/s across
> the iterations:
>
> -----------------------------------------------------------------
>      |         System-X          |               P6
> -----------------------------------------------------------------
> Size |  ORG BW          NEW BW   |      ORG BW          NEW BW
> -----|---------------------------|-------------------------------
> 16K  |  154264          156234   |      155350          157569
> 64K  |  154364          154825   |      155790          158845
> 128K |  154644          154803   |      153418          155572
> 256K |  153882          152007   |      154784          154596
> -----------------------------------------------------------------
>
> Netperf reported that service demand was reduced by 15% on the
> P6, but there was no noticeable difference on the system-X box.
>
> Please review.
>
> Thanks,
>
> - KK
>
> Signed-off-by: Krishna Kumar <krkumar2@in.ibm.com>
> ---
>
>  include/net/sch_generic.h |    9 +++
>  net/core/dev.c            |   46 ++++++++++++-----
>  net/sched/sch_generic.c   |   94 +++++++++++++++++++++---------------
>  3 files changed, 99 insertions(+), 50 deletions(-)
>
> diff -ruNp org/include/net/sch_generic.h new/include/net/sch_generic.h
> --- org/include/net/sch_generic.h   2009-03-24 08:54:16.000000000 +0530
> +++ new/include/net/sch_generic.h   2009-07-28 19:37:10.000000000 +0530
> @@ -45,6 +45,7 @@ struct Qdisc
>  #define TCQ_F_BUILTIN      1
>  #define TCQ_F_THROTTLED      2
>  #define TCQ_F_INGRESS      4
> +#define TCQ_F_CAN_BYPASS   8
>  #define TCQ_F_WARN_NONWC   (1 << 16)
>     int         padded;
>     struct Qdisc_ops   *ops;
> @@ -182,6 +183,11 @@ struct qdisc_skb_cb {
>     char         data[];
>  };
>
> +static inline int qdisc_qlen(struct Qdisc *q)
> +{
> +   return q->q.qlen;
> +}
> +
>  static inline struct qdisc_skb_cb *qdisc_skb_cb(struct sk_buff *skb)
>  {
>     return (struct qdisc_skb_cb *)skb->cb;
> @@ -306,6 +312,9 @@ extern struct Qdisc *qdisc_create_dflt(s
>                     struct Qdisc_ops *ops, u32 parentid);
>  extern void qdisc_calculate_pkt_len(struct sk_buff *skb,
>                 struct qdisc_size_table *stab);
> +extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
> +            struct net_device *dev, struct netdev_queue *txq,
> +            spinlock_t *root_lock);
>  extern void tcf_destroy(struct tcf_proto *tp);
>  extern void tcf_destroy_chain(struct tcf_proto **fl);
>
> diff -ruNp org2/net/core/dev.c new2/net/core/dev.c
> --- org2/net/core/dev.c   2009-07-27 09:08:24.000000000 +0530
> +++ new2/net/core/dev.c   2009-07-28 19:37:10.000000000 +0530
> @@ -1786,6 +1786,38 @@ static struct netdev_queue *dev_pick_tx(
>     return netdev_get_tx_queue(dev, queue_index);
>  }
>
> +static inline int __dev_hw_xmit(struct sk_buff *skb, struct Qdisc *q,
> +            struct net_device *dev,
> +            struct netdev_queue *txq)
> +{
> +   spinlock_t *root_lock = qdisc_lock(q);
> +   int rc;
> +
> +   spin_lock(root_lock);
> +   if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
> +      kfree_skb(skb);
> +      rc = NET_XMIT_DROP;
> +   } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
> +         !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
> +      /*
> +       * This is the default work-conserving queue; there are no
> +       * old skbs waiting to be sent out; and the qdisc is not
> +       * running - xmit the skb directly.
> +       */
> +      if (sch_direct_xmit(skb, q, dev, txq, root_lock))
> +         __qdisc_run(q);
> +      else
> +         clear_bit(__QDISC_STATE_RUNNING, &q->state);
> +      rc = 0;
> +   } else {
> +      rc = qdisc_enqueue_root(skb, q);
> +      qdisc_run(q);
> +   }
> +   spin_unlock(root_lock);
> +
> +   return rc;
> +}
> +
>  /**
>   *   dev_queue_xmit - transmit a buffer
>   *   @skb: buffer to transmit
> @@ -1859,19 +1891,7 @@ gso:
>     skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
>  #endif
>     if (q->enqueue) {
> -      spinlock_t *root_lock = qdisc_lock(q);
> -
> -      spin_lock(root_lock);
> -
> -      if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
> -         kfree_skb(skb);
> -         rc = NET_XMIT_DROP;
> -      } else {
> -         rc = qdisc_enqueue_root(skb, q);
> -         qdisc_run(q);
> -      }
> -      spin_unlock(root_lock);
> -
> +      rc = __dev_hw_xmit(skb, q, dev, txq);
>        goto out;
>     }
>
> diff -ruNp org2/net/sched/sch_generic.c new2/net/sched/sch_generic.c
> --- org2/net/sched/sch_generic.c   2009-05-25 07:48:07.000000000 +0530
> +++ new2/net/sched/sch_generic.c   2009-07-28 19:40:39.000000000 +0530
> @@ -37,15 +37,11 @@
>   * - updates to tree and tree walking are only done under the rtnl mutex.
>   */
>
> -static inline int qdisc_qlen(struct Qdisc *q)
> -{
> -   return q->q.qlen;
> -}
> -
>  static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
>  {
>     q->gso_skb = skb;
>     q->qstats.requeues++;
> +   q->q.qlen++;   /* it's still part of the queue */
>     __netif_schedule(q);
>
>     return 0;
> @@ -61,9 +57,11 @@ static inline struct sk_buff *dequeue_sk
>
>        /* check the reason of requeuing without tx lock first */
>        txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
> -      if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq))
> +      if (!netif_tx_queue_stopped(txq) &&
> +          !netif_tx_queue_frozen(txq)) {
>           q->gso_skb = NULL;
> -      else
> +         q->q.qlen--;
> +      } else
>           skb = NULL;
>     } else {
>        skb = q->dequeue(q);
> @@ -102,45 +100,24 @@ static inline int handle_dev_cpu_collisi
>     return ret;
>  }
>
> -/*
> - * NOTE: Called under qdisc_lock(q) with locally disabled BH.
> - *
> - * __QDISC_STATE_RUNNING guarantees only one CPU can process
> - * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
> - * this queue.
> - *
> - *  netif_tx_lock serializes accesses to device driver.
> - *
> - *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
> - *  if one is grabbed, another must be free.
> - *
> - * Note, that this procedure can be called by a watchdog timer
> +/*
> + * Transmit one skb, and handle the return status as required. Holding the
> + * __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this
> + * function.
>   *
>   * Returns to the caller:
>   *            0  - queue is empty or throttled.
>   *            >0 - queue is not empty.
> - *
>   */
> -static inline int qdisc_restart(struct Qdisc *q)
> +int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
> +          struct net_device *dev, struct netdev_queue *txq,
> +          spinlock_t *root_lock)
>  {
> -   struct netdev_queue *txq;
>     int ret = NETDEV_TX_BUSY;
> -   struct net_device *dev;
> -   spinlock_t *root_lock;
> -   struct sk_buff *skb;
> -
> -   /* Dequeue packet */
> -   if (unlikely((skb = dequeue_skb(q)) == NULL))
> -      return 0;
> -
> -   root_lock = qdisc_lock(q);
>
>     /* And release qdisc */
>     spin_unlock(root_lock);
>
> -   dev = qdisc_dev(q);
> -   txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
> -
>     HARD_TX_LOCK(dev, txq, smp_processor_id());
>     if (!netif_tx_queue_stopped(txq) &&
>         !netif_tx_queue_frozen(txq))
> @@ -177,6 +154,43 @@ static inline int qdisc_restart(struct Q
>     return ret;
>  }
>
> +/*
> + * NOTE: Called under qdisc_lock(q) with locally disabled BH.
> + *
> + * __QDISC_STATE_RUNNING guarantees only one CPU can process
> + * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
> + * this queue.
> + *
> + *  netif_tx_lock serializes accesses to device driver.
> + *
> + *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
> + *  if one is grabbed, another must be free.
> + *
> + * Note, that this procedure can be called by a watchdog timer
> + *
> + * Returns to the caller:
> + *            0  - queue is empty or throttled.
> + *            >0 - queue is not empty.
> + *
> + */
> +static inline int qdisc_restart(struct Qdisc *q)
> +{
> +   struct netdev_queue *txq;
> +   struct net_device *dev;
> +   spinlock_t *root_lock;
> +   struct sk_buff *skb;
> +
> +   /* Dequeue packet */
> +   if (unlikely((skb = dequeue_skb(q)) == NULL))
> +      return 0;
> +
> +   root_lock = qdisc_lock(q);
> +   dev = qdisc_dev(q);
> +   txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
> +
> +   return sch_direct_xmit(skb, q, dev, txq, root_lock);
> +}
> +
>  void __qdisc_run(struct Qdisc *q)
>  {
>     unsigned long start_time = jiffies;
> @@ -547,8 +561,11 @@ void qdisc_reset(struct Qdisc *qdisc)
>     if (ops->reset)
>        ops->reset(qdisc);
>
> -   kfree_skb(qdisc->gso_skb);
> -   qdisc->gso_skb = NULL;
> +   if (qdisc->gso_skb) {
> +      kfree_skb(qdisc->gso_skb);
> +      qdisc->gso_skb = NULL;
> +      qdisc->q.qlen--;
> +   }
>  }
>  EXPORT_SYMBOL(qdisc_reset);
>
> @@ -605,6 +622,9 @@ static void attach_one_default_qdisc(str
>           printk(KERN_INFO "%s: activation failed\n", dev->name);
>           return;
>        }
> +
> +      /* Can by-pass the queue discipline for default qdisc */
> +      qdisc->flags = TCQ_F_CAN_BYPASS;
>     } else {
>        qdisc =  &noqueue_qdisc;
>     }

[-- Attachment #2: patch.dont_queue --]
[-- Type: application/octet-stream, Size: 9264 bytes --]

From: Krishna Kumar <krkumar2@in.ibm.com>

dev_queue_xmit enqueues a skb and calls qdisc_run, which
dequeues the skb and xmits it. In most cases (verified by
instrumenting the code), the skb that is enqueued is the
same one that is dequeued (unless the queue gets stopped,
or multiple cpus write to the same queue and race with
qdisc_run). For default qdiscs, we can remove this path
and simply xmit the skb directly, since this is a
work-conserving queue.

The patch uses a new flag - TCQ_F_CAN_BYPASS - to identify
the default fast queue. I plan to use this flag for the
previous patch as well (renaming it if required). The
controversial part of the patch is incrementing qlen when a
skb is requeued; this avoids checks like the second line below:

+    } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
>> THIS LINE:   !q->gso_skb &&
+               !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {

Results of 4 hours of testing with multiple netperf sessions
(1, 2, 4, 8, 12 sessions on a 4-cpu system-X and 1, 2, 4,
8, 16, 32 sessions on a 16-cpu P6). Aggregate Mb/s across
the iterations:

-----------------------------------------------------------------
     |         System-X          |               P6
-----------------------------------------------------------------
Size |  ORG BW          NEW BW   |      ORG BW          NEW BW
-----|---------------------------|-------------------------------
16K  |  154264          156234   |      155350          157569
64K  |  154364          154825   |      155790          158845
128K |  154644          154803   |      153418          155572
256K |  153882          152007   |      154784          154596
-----------------------------------------------------------------

Netperf reported that service demand was reduced by 15% on the
P6, but there was no noticeable difference on the system-X box.

Please review.

Thanks,

- KK

Signed-off-by: Krishna Kumar <krkumar2@in.ibm.com>
---

 include/net/sch_generic.h |    9 +++
 net/core/dev.c            |   46 ++++++++++++-----
 net/sched/sch_generic.c   |   94 +++++++++++++++++++++---------------
 3 files changed, 99 insertions(+), 50 deletions(-)

diff -ruNp org/include/net/sch_generic.h new/include/net/sch_generic.h
--- org/include/net/sch_generic.h	2009-03-24 08:54:16.000000000 +0530
+++ new/include/net/sch_generic.h	2009-07-28 19:37:10.000000000 +0530
@@ -45,6 +45,7 @@ struct Qdisc
 #define TCQ_F_BUILTIN		1
 #define TCQ_F_THROTTLED		2
 #define TCQ_F_INGRESS		4
+#define TCQ_F_CAN_BYPASS	8
 #define TCQ_F_WARN_NONWC	(1 << 16)
 	int			padded;
 	struct Qdisc_ops	*ops;
@@ -182,6 +183,11 @@ struct qdisc_skb_cb {
 	char			data[];
 };
 
+static inline int qdisc_qlen(struct Qdisc *q)
+{
+	return q->q.qlen;
+}
+
 static inline struct qdisc_skb_cb *qdisc_skb_cb(struct sk_buff *skb)
 {
 	return (struct qdisc_skb_cb *)skb->cb;
@@ -306,6 +312,9 @@ extern struct Qdisc *qdisc_create_dflt(s
 				       struct Qdisc_ops *ops, u32 parentid);
 extern void qdisc_calculate_pkt_len(struct sk_buff *skb,
 				   struct qdisc_size_table *stab);
+extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
+			   struct net_device *dev, struct netdev_queue *txq,
+			   spinlock_t *root_lock);
 extern void tcf_destroy(struct tcf_proto *tp);
 extern void tcf_destroy_chain(struct tcf_proto **fl);
 
diff -ruNp org2/net/core/dev.c new2/net/core/dev.c
--- org2/net/core/dev.c	2009-07-27 09:08:24.000000000 +0530
+++ new2/net/core/dev.c	2009-07-28 19:37:10.000000000 +0530
@@ -1786,6 +1786,38 @@ static struct netdev_queue *dev_pick_tx(
 	return netdev_get_tx_queue(dev, queue_index);
 }
 
+static inline int __dev_hw_xmit(struct sk_buff *skb, struct Qdisc *q,
+				struct net_device *dev,
+				struct netdev_queue *txq)
+{
+	spinlock_t *root_lock = qdisc_lock(q);
+	int rc;
+
+	spin_lock(root_lock);
+	if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
+		kfree_skb(skb);
+		rc = NET_XMIT_DROP;
+	} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
+		   !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
+		/*
+		 * This is the default work-conserving queue; there are no
+		 * old skbs waiting to be sent out; and the qdisc is not
+		 * running - xmit the skb directly.
+		 */
+		if (sch_direct_xmit(skb, q, dev, txq, root_lock))
+			__qdisc_run(q);
+		else
+			clear_bit(__QDISC_STATE_RUNNING, &q->state);
+		rc = 0;
+	} else {
+		rc = qdisc_enqueue_root(skb, q);
+		qdisc_run(q);
+	}
+	spin_unlock(root_lock);
+
+	return rc;
+}
+
 /**
  *	dev_queue_xmit - transmit a buffer
  *	@skb: buffer to transmit
@@ -1859,19 +1891,7 @@ gso:
 	skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
 #endif
 	if (q->enqueue) {
-		spinlock_t *root_lock = qdisc_lock(q);
-
-		spin_lock(root_lock);
-
-		if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
-			kfree_skb(skb);
-			rc = NET_XMIT_DROP;
-		} else {
-			rc = qdisc_enqueue_root(skb, q);
-			qdisc_run(q);
-		}
-		spin_unlock(root_lock);
-
+		rc = __dev_hw_xmit(skb, q, dev, txq);
 		goto out;
 	}
 
diff -ruNp org2/net/sched/sch_generic.c new2/net/sched/sch_generic.c
--- org2/net/sched/sch_generic.c	2009-05-25 07:48:07.000000000 +0530
+++ new2/net/sched/sch_generic.c	2009-07-28 19:40:39.000000000 +0530
@@ -37,15 +37,11 @@
  * - updates to tree and tree walking are only done under the rtnl mutex.
  */
 
-static inline int qdisc_qlen(struct Qdisc *q)
-{
-	return q->q.qlen;
-}
-
 static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
 {
 	q->gso_skb = skb;
 	q->qstats.requeues++;
+	q->q.qlen++;	/* it's still part of the queue */
 	__netif_schedule(q);
 
 	return 0;
@@ -61,9 +57,11 @@ static inline struct sk_buff *dequeue_sk
 
 		/* check the reason of requeuing without tx lock first */
 		txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
-		if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq))
+		if (!netif_tx_queue_stopped(txq) &&
+		    !netif_tx_queue_frozen(txq)) {
 			q->gso_skb = NULL;
-		else
+			q->q.qlen--;
+		} else
 			skb = NULL;
 	} else {
 		skb = q->dequeue(q);
@@ -102,45 +100,24 @@ static inline int handle_dev_cpu_collisi
 	return ret;
 }
 
-/*
- * NOTE: Called under qdisc_lock(q) with locally disabled BH.
- *
- * __QDISC_STATE_RUNNING guarantees only one CPU can process
- * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
- * this queue.
- *
- *  netif_tx_lock serializes accesses to device driver.
- *
- *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
- *  if one is grabbed, another must be free.
- *
- * Note, that this procedure can be called by a watchdog timer
+/* 
+ * Transmit one skb, and handle the return status as required. Holding the
+ * __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this
+ * function.
  *
  * Returns to the caller:
  *				0  - queue is empty or throttled.
  *				>0 - queue is not empty.
- *
  */
-static inline int qdisc_restart(struct Qdisc *q)
+int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
+		    struct net_device *dev, struct netdev_queue *txq,
+		    spinlock_t *root_lock)
 {
-	struct netdev_queue *txq;
 	int ret = NETDEV_TX_BUSY;
-	struct net_device *dev;
-	spinlock_t *root_lock;
-	struct sk_buff *skb;
-
-	/* Dequeue packet */
-	if (unlikely((skb = dequeue_skb(q)) == NULL))
-		return 0;
-
-	root_lock = qdisc_lock(q);
 
 	/* And release qdisc */
 	spin_unlock(root_lock);
 
-	dev = qdisc_dev(q);
-	txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
-
 	HARD_TX_LOCK(dev, txq, smp_processor_id());
 	if (!netif_tx_queue_stopped(txq) &&
 	    !netif_tx_queue_frozen(txq))
@@ -177,6 +154,43 @@ static inline int qdisc_restart(struct Q
 	return ret;
 }
 
+/*
+ * NOTE: Called under qdisc_lock(q) with locally disabled BH.
+ *
+ * __QDISC_STATE_RUNNING guarantees only one CPU can process
+ * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
+ * this queue.
+ *
+ *  netif_tx_lock serializes accesses to device driver.
+ *
+ *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
+ *  if one is grabbed, another must be free.
+ *
+ * Note, that this procedure can be called by a watchdog timer
+ *
+ * Returns to the caller:
+ *				0  - queue is empty or throttled.
+ *				>0 - queue is not empty.
+ *
+ */
+static inline int qdisc_restart(struct Qdisc *q)
+{
+	struct netdev_queue *txq;
+	struct net_device *dev;
+	spinlock_t *root_lock;
+	struct sk_buff *skb;
+
+	/* Dequeue packet */
+	if (unlikely((skb = dequeue_skb(q)) == NULL))
+		return 0;
+
+	root_lock = qdisc_lock(q);
+	dev = qdisc_dev(q);
+	txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
+
+	return sch_direct_xmit(skb, q, dev, txq, root_lock);
+}
+
 void __qdisc_run(struct Qdisc *q)
 {
 	unsigned long start_time = jiffies;
@@ -547,8 +561,11 @@ void qdisc_reset(struct Qdisc *qdisc)
 	if (ops->reset)
 		ops->reset(qdisc);
 
-	kfree_skb(qdisc->gso_skb);
-	qdisc->gso_skb = NULL;
+	if (qdisc->gso_skb) {
+		kfree_skb(qdisc->gso_skb);
+		qdisc->gso_skb = NULL;
+		qdisc->q.qlen--;
+	}
 }
 EXPORT_SYMBOL(qdisc_reset);
 
@@ -605,6 +622,9 @@ static void attach_one_default_qdisc(str
 			printk(KERN_INFO "%s: activation failed\n", dev->name);
 			return;
 		}
+
+		/* Can by-pass the queue discipline for default qdisc */
+		qdisc->flags = TCQ_F_CAN_BYPASS;
 	} else {
 		qdisc =  &noqueue_qdisc;
 	}


end of thread, other threads:[~2009-08-04 20:23 UTC | newest]

Thread overview: 16+ messages (download: mbox.gz / follow: Atom feed)
     [not found] <20090728155055.2266.41649.sendpatchset@localhost.localdomain>
2009-08-02  8:51 ` [RFC] [PATCH] Avoid enqueuing skb for default qdiscs Krishna Kumar2
2009-08-03  7:15   ` Jarek Poplawski
2009-08-03  8:09     ` Krishna Kumar2
2009-08-04  3:29   ` David Miller
2009-08-04  3:49     ` Herbert Xu
2009-08-04  3:55       ` David Miller
2009-08-04  6:45       ` Krishna Kumar2
     [not found] <20090803081016.32746.23400.sendpatchset@localhost.localdomain>
2009-08-03  9:17 ` Krishna Kumar2
2009-08-03  9:58   ` Jarek Poplawski
2009-08-03 19:38   ` Jarek Poplawski
2009-08-03 20:22     ` Jarek Poplawski
2009-08-04  6:55     ` Krishna Kumar2
2009-08-04  7:22       ` Jarek Poplawski
2009-08-04 15:51 Krishna Kumar
2009-08-04 16:21 ` Krishna Kumar2
2009-08-04 20:23   ` Jarek Poplawski
