[V4,7/9] vhost: do not use RCU to synchronize MMU notifier with worker
diff mbox series

Message ID 20190807070617.23716-8-jasowang@redhat.com
State New
Headers show
Series
  • Fixes for metadata accelreation
Related show

Commit Message

Jason Wang Aug. 7, 2019, 7:06 a.m. UTC
We used to use RCU to synchronize MMU notifier with worker. This leads
calling synchronize_rcu() in invalidate_range_start(). But on a busy
system, there would be many factors that may slow down the
synchronize_rcu() which makes it unsuitable to be called in MMU
notifier.

So this patch switches use seqlock counter to track whether or not the
map was used. The counter was increased when vq try to start or finish
uses the map. This means, when it was even, we're sure there's no
readers and MMU notifier is synchronized. When it was odd, it means
there's a reader we need to wait it to be even again then we are
synchronized. Consider the read critical section is pretty small the
synchronization should be done very fast.

Reported-by: Michael S. Tsirkin <mst@redhat.com>
Fixes: 7f466032dc9e ("vhost: access vq metadata through kernel virtual address")
Signed-off-by: Jason Wang <jasowang@redhat.com>
---
 drivers/vhost/vhost.c | 141 ++++++++++++++++++++++++++----------------
 drivers/vhost/vhost.h |   7 ++-
 2 files changed, 90 insertions(+), 58 deletions(-)

Comments

Jason Gunthorpe Aug. 7, 2019, 12:07 p.m. UTC | #1
On Wed, Aug 07, 2019 at 03:06:15AM -0400, Jason Wang wrote:
> We used to use RCU to synchronize MMU notifier with worker. This leads
> calling synchronize_rcu() in invalidate_range_start(). But on a busy
> system, there would be many factors that may slow down the
> synchronize_rcu() which makes it unsuitable to be called in MMU
> notifier.
> 
> So this patch switches use seqlock counter to track whether or not the
> map was used. The counter was increased when vq try to start or finish
> uses the map. This means, when it was even, we're sure there's no
> readers and MMU notifier is synchronized. When it was odd, it means
> there's a reader we need to wait it to be even again then we are
> synchronized. Consider the read critical section is pretty small the
> synchronization should be done very fast.
> 
> Reported-by: Michael S. Tsirkin <mst@redhat.com>
> Fixes: 7f466032dc9e ("vhost: access vq metadata through kernel virtual address")
> Signed-off-by: Jason Wang <jasowang@redhat.com>
>  drivers/vhost/vhost.c | 141 ++++++++++++++++++++++++++----------------
>  drivers/vhost/vhost.h |   7 ++-
>  2 files changed, 90 insertions(+), 58 deletions(-)
> 
> diff --git a/drivers/vhost/vhost.c b/drivers/vhost/vhost.c
> index cfc11f9ed9c9..57bfbb60d960 100644
> +++ b/drivers/vhost/vhost.c
> @@ -324,17 +324,16 @@ static void vhost_uninit_vq_maps(struct vhost_virtqueue *vq)
>  
>  	spin_lock(&vq->mmu_lock);
>  	for (i = 0; i < VHOST_NUM_ADDRS; i++) {
> -		map[i] = rcu_dereference_protected(vq->maps[i],
> -				  lockdep_is_held(&vq->mmu_lock));
> +		map[i] = vq->maps[i];
>  		if (map[i]) {
>  			vhost_set_map_dirty(vq, map[i], i);
> -			rcu_assign_pointer(vq->maps[i], NULL);
> +			vq->maps[i] = NULL;
>  		}
>  	}
>  	spin_unlock(&vq->mmu_lock);
>  
> -	/* No need for synchronize_rcu() or kfree_rcu() since we are
> -	 * serialized with memory accessors (e.g vq mutex held).
> +	/* No need for synchronization since we are serialized with
> +	 * memory accessors (e.g vq mutex held).
>  	 */
>  
>  	for (i = 0; i < VHOST_NUM_ADDRS; i++)
> @@ -362,6 +361,40 @@ static bool vhost_map_range_overlap(struct vhost_uaddr *uaddr,
>  	return !(end < uaddr->uaddr || start > uaddr->uaddr - 1 + uaddr->size);
>  }
>  
> +static void inline vhost_vq_access_map_begin(struct vhost_virtqueue *vq)
> +{
> +	write_seqcount_begin(&vq->seq);
> +}
> +
> +static void inline vhost_vq_access_map_end(struct vhost_virtqueue *vq)
> +{
> +	write_seqcount_end(&vq->seq);
> +}

The write side of a seqlock only provides write barriers. Access to

	map = vq->maps[VHOST_ADDR_USED];

Still needs a read side barrier, and then I think this will be no
better than a normal spinlock.

It also doesn't seem like this algorithm even needs a seqlock, as this
is just a one bit flag

atomic_set_bit(using map)
smp_mb__after_atomic()
.. maps [...]
atomic_clear_bit(using map)


map = NULL;
smp_mb__before_atomic();
while (atomic_read_bit(using map))
   relax()

Again, not clear this could be faster than a spinlock when the
barriers are correct...

Jason
Jason Wang Aug. 7, 2019, 2:02 p.m. UTC | #2
On 2019/8/7 下午8:07, Jason Gunthorpe wrote:
> On Wed, Aug 07, 2019 at 03:06:15AM -0400, Jason Wang wrote:
>> We used to use RCU to synchronize MMU notifier with worker. This leads
>> calling synchronize_rcu() in invalidate_range_start(). But on a busy
>> system, there would be many factors that may slow down the
>> synchronize_rcu() which makes it unsuitable to be called in MMU
>> notifier.
>>
>> So this patch switches use seqlock counter to track whether or not the
>> map was used. The counter was increased when vq try to start or finish
>> uses the map. This means, when it was even, we're sure there's no
>> readers and MMU notifier is synchronized. When it was odd, it means
>> there's a reader we need to wait it to be even again then we are
>> synchronized. Consider the read critical section is pretty small the
>> synchronization should be done very fast.
>>
>> Reported-by: Michael S. Tsirkin <mst@redhat.com>
>> Fixes: 7f466032dc9e ("vhost: access vq metadata through kernel virtual address")
>> Signed-off-by: Jason Wang <jasowang@redhat.com>
>>   drivers/vhost/vhost.c | 141 ++++++++++++++++++++++++++----------------
>>   drivers/vhost/vhost.h |   7 ++-
>>   2 files changed, 90 insertions(+), 58 deletions(-)
>>
>> diff --git a/drivers/vhost/vhost.c b/drivers/vhost/vhost.c
>> index cfc11f9ed9c9..57bfbb60d960 100644
>> +++ b/drivers/vhost/vhost.c
>> @@ -324,17 +324,16 @@ static void vhost_uninit_vq_maps(struct vhost_virtqueue *vq)
>>   
>>   	spin_lock(&vq->mmu_lock);
>>   	for (i = 0; i < VHOST_NUM_ADDRS; i++) {
>> -		map[i] = rcu_dereference_protected(vq->maps[i],
>> -				  lockdep_is_held(&vq->mmu_lock));
>> +		map[i] = vq->maps[i];
>>   		if (map[i]) {
>>   			vhost_set_map_dirty(vq, map[i], i);
>> -			rcu_assign_pointer(vq->maps[i], NULL);
>> +			vq->maps[i] = NULL;
>>   		}
>>   	}
>>   	spin_unlock(&vq->mmu_lock);
>>   
>> -	/* No need for synchronize_rcu() or kfree_rcu() since we are
>> -	 * serialized with memory accessors (e.g vq mutex held).
>> +	/* No need for synchronization since we are serialized with
>> +	 * memory accessors (e.g vq mutex held).
>>   	 */
>>   
>>   	for (i = 0; i < VHOST_NUM_ADDRS; i++)
>> @@ -362,6 +361,40 @@ static bool vhost_map_range_overlap(struct vhost_uaddr *uaddr,
>>   	return !(end < uaddr->uaddr || start > uaddr->uaddr - 1 + uaddr->size);
>>   }
>>   
>> +static void inline vhost_vq_access_map_begin(struct vhost_virtqueue *vq)
>> +{
>> +	write_seqcount_begin(&vq->seq);
>> +}
>> +
>> +static void inline vhost_vq_access_map_end(struct vhost_virtqueue *vq)
>> +{
>> +	write_seqcount_end(&vq->seq);
>> +}
> The write side of a seqlock only provides write barriers. Access to
>
> 	map = vq->maps[VHOST_ADDR_USED];
>
> Still needs a read side barrier, and then I think this will be no
> better than a normal spinlock.
>
> It also doesn't seem like this algorithm even needs a seqlock, as this
> is just a one bit flag


Right, so then I tend to use spinlock first for correctness.


>
> atomic_set_bit(using map)
> smp_mb__after_atomic()
> .. maps [...]
> atomic_clear_bit(using map)
>
>
> map = NULL;
> smp_mb__before_atomic();
> while (atomic_read_bit(using map))
>     relax()
>
> Again, not clear this could be faster than a spinlock when the
> barriers are correct...


Yes, for next release we may want to use the idea from Michael like to 
mitigate the impact of mb.

https://lwn.net/Articles/775871/

Thanks


>
> Jason
Jason Wang Aug. 8, 2019, 12:54 p.m. UTC | #3
On 2019/8/7 下午10:02, Jason Wang wrote:
>
> On 2019/8/7 下午8:07, Jason Gunthorpe wrote:
>> On Wed, Aug 07, 2019 at 03:06:15AM -0400, Jason Wang wrote:
>>> We used to use RCU to synchronize MMU notifier with worker. This leads
>>> calling synchronize_rcu() in invalidate_range_start(). But on a busy
>>> system, there would be many factors that may slow down the
>>> synchronize_rcu() which makes it unsuitable to be called in MMU
>>> notifier.
>>>
>>> So this patch switches use seqlock counter to track whether or not the
>>> map was used. The counter was increased when vq try to start or finish
>>> uses the map. This means, when it was even, we're sure there's no
>>> readers and MMU notifier is synchronized. When it was odd, it means
>>> there's a reader we need to wait it to be even again then we are
>>> synchronized. Consider the read critical section is pretty small the
>>> synchronization should be done very fast.
>>>
>>> Reported-by: Michael S. Tsirkin <mst@redhat.com>
>>> Fixes: 7f466032dc9e ("vhost: access vq metadata through kernel
>>> virtual address")
>>> Signed-off-by: Jason Wang <jasowang@redhat.com>
>>>   drivers/vhost/vhost.c | 141
>>> ++++++++++++++++++++++++++----------------
>>>   drivers/vhost/vhost.h |   7 ++-
>>>   2 files changed, 90 insertions(+), 58 deletions(-)
>>>
>>> diff --git a/drivers/vhost/vhost.c b/drivers/vhost/vhost.c
>>> index cfc11f9ed9c9..57bfbb60d960 100644
>>> +++ b/drivers/vhost/vhost.c
>>> @@ -324,17 +324,16 @@ static void vhost_uninit_vq_maps(struct
>>> vhost_virtqueue *vq)
>>>         spin_lock(&vq->mmu_lock);
>>>       for (i = 0; i < VHOST_NUM_ADDRS; i++) {
>>> -        map[i] = rcu_dereference_protected(vq->maps[i],
>>> -                  lockdep_is_held(&vq->mmu_lock));
>>> +        map[i] = vq->maps[i];
>>>           if (map[i]) {
>>>               vhost_set_map_dirty(vq, map[i], i);
>>> -            rcu_assign_pointer(vq->maps[i], NULL);
>>> +            vq->maps[i] = NULL;
>>>           }
>>>       }
>>>       spin_unlock(&vq->mmu_lock);
>>>   -    /* No need for synchronize_rcu() or kfree_rcu() since we are
>>> -     * serialized with memory accessors (e.g vq mutex held).
>>> +    /* No need for synchronization since we are serialized with
>>> +     * memory accessors (e.g vq mutex held).
>>>        */
>>>         for (i = 0; i < VHOST_NUM_ADDRS; i++)
>>> @@ -362,6 +361,40 @@ static bool vhost_map_range_overlap(struct
>>> vhost_uaddr *uaddr,
>>>       return !(end < uaddr->uaddr || start > uaddr->uaddr - 1 +
>>> uaddr->size);
>>>   }
>>>   +static void inline vhost_vq_access_map_begin(struct
>>> vhost_virtqueue *vq)
>>> +{
>>> +    write_seqcount_begin(&vq->seq);
>>> +}
>>> +
>>> +static void inline vhost_vq_access_map_end(struct vhost_virtqueue *vq)
>>> +{
>>> +    write_seqcount_end(&vq->seq);
>>> +}
>> The write side of a seqlock only provides write barriers. Access to
>>
>>     map = vq->maps[VHOST_ADDR_USED];
>>
>> Still needs a read side barrier, and then I think this will be no
>> better than a normal spinlock.
>>
>> It also doesn't seem like this algorithm even needs a seqlock, as this
>> is just a one bit flag
>
>
> Right, so then I tend to use spinlock first for correctness.
>
>
>>
>> atomic_set_bit(using map)
>> smp_mb__after_atomic()
>> .. maps [...]
>> atomic_clear_bit(using map)
>>
>>
>> map = NULL;
>> smp_mb__before_atomic();
>> while (atomic_read_bit(using map))
>>     relax()
>>
>> Again, not clear this could be faster than a spinlock when the
>> barriers are correct...
>

I've done some benchmark[1] on x86, and yes it looks even slower. It
looks to me the atomic stuffs is not necessary, so in order to compare
it better with spinlock. I tweak it a little bit through
smp_load_acquire()/store_releaes() + mb() like:

static struct vhost_map *vhost_vq_access_map_begin(struct
vhost_virtqueue
*vq,                                                                          

                                                   unsigned int
type)                                                                                   

{                                                                                                                                                       

       
++vq->counter;                                                                                                                                  

        /* Ensure map was read after incresing the counter.
Paired                                                                                      

         * with smp_mb() in
vhost_vq_sync_access().                                                                                                     

        
*/                                                                                                                                             

       
smp_mb();                                                                                                                                       

        return
vq->maps[type];                                                                                                                          

}                                                                                                                                                       

                                                                                                                                                        

static void inline vhost_vq_access_map_end(struct vhost_virtqueue
*vq)                                                                                  

{                                                                                                                                                       

        /* Ensure all memory access through map was done
before                                                                                         

         * reducing the counter. Paired with smp_load_acquire()
in                                                                                      

         * vhost_vq_sync_access()
*/                                                                                                                    

        smp_store_release(&vq->counter,
--vq->counter);                                                                                                 

}                                                                                                                                                       

                                                                                                                                                        

static void inline vhost_vq_sync_access(struct vhost_virtqueue
*vq)                                                                                     

{                                                                                                                                                       

        /* Ensure new map value is visible before checking counter.
*/                                                                                  

       
smp_mb();                                                                                                                                       

        /* Ensure map was freed after reading counter value,
paired                                                                                     

         * with smp_store_release() in
vhost_vq_access_map_end().                                                                                       

        
*/                                                                                                                                             

        while (smp_load_acquire(&vq->counter))
{                                                                                                        

                if
(need_resched())                                                                                                                     

                       
schedule();                                                                                                                     

       
}                                                                                                                                               

}         


And the result is something like:

         base | direct + atomic bitops | direct + spinlock() | direct +
counter + smp_mb() | direct + RCU     |
SMAP on  | 5.0Mpps | 5.0Mpps     (+0%)      | 5.7Mpps      (+14%)      |
5.9Mpps  (+18%)            | 6.2Mpps  (+24%)  |
SMAP off | 7.0Mpps | 7.0Mpps     (+0%)      | 7.0Mpps   (+0%)     |
7.5Mpps  (+7%)            | 8.2Mpps  (+17%)  |


base: normal copy_to_user()/copy_from_user() path.
direct + atomic bitops: using direct mapping but synchronize through
atomic bitops like you suggested above
direct + spinlock(): using direct mapping but synchronize through spinlocks
direct + counter + smp_mb(): using direct mapping but synchronize
through counter + smp_mb()
direct + RCU: using direct mapping and synchronize through RCU (buggy
and need to be addressed by this series)


So smp_mb() + counter is fastest way. And spinlock can still show some
improvement (+14%) in the case of SMAP, but no the case when SMAP is off.

I don't have any objection to convert  to spinlock() but just want to
know if any case that the above smp_mb() + counter looks good to you?

Thanks


>
> Yes, for next release we may want to use the idea from Michael like to
> mitigate the impact of mb.
>
> https://lwn.net/Articles/775871/
>
> Thanks
>
>
>>
>> Jason
Jason Wang Aug. 8, 2019, 1:01 p.m. UTC | #4
----- Original Message -----
> 
> On 2019/8/7 下午10:02, Jason Wang wrote:
> >
> > On 2019/8/7 下午8:07, Jason Gunthorpe wrote:
> >> On Wed, Aug 07, 2019 at 03:06:15AM -0400, Jason Wang wrote:
> >>> We used to use RCU to synchronize MMU notifier with worker. This leads
> >>> calling synchronize_rcu() in invalidate_range_start(). But on a busy
> >>> system, there would be many factors that may slow down the
> >>> synchronize_rcu() which makes it unsuitable to be called in MMU
> >>> notifier.
> >>>
> >>> So this patch switches use seqlock counter to track whether or not the
> >>> map was used. The counter was increased when vq try to start or finish
> >>> uses the map. This means, when it was even, we're sure there's no
> >>> readers and MMU notifier is synchronized. When it was odd, it means
> >>> there's a reader we need to wait it to be even again then we are
> >>> synchronized. Consider the read critical section is pretty small the
> >>> synchronization should be done very fast.
> >>>
> >>> Reported-by: Michael S. Tsirkin <mst@redhat.com>
> >>> Fixes: 7f466032dc9e ("vhost: access vq metadata through kernel
> >>> virtual address")
> >>> Signed-off-by: Jason Wang <jasowang@redhat.com>
> >>>   drivers/vhost/vhost.c | 141
> >>> ++++++++++++++++++++++++++----------------
> >>>   drivers/vhost/vhost.h |   7 ++-
> >>>   2 files changed, 90 insertions(+), 58 deletions(-)
> >>>
> >>> diff --git a/drivers/vhost/vhost.c b/drivers/vhost/vhost.c
> >>> index cfc11f9ed9c9..57bfbb60d960 100644
> >>> +++ b/drivers/vhost/vhost.c
> >>> @@ -324,17 +324,16 @@ static void vhost_uninit_vq_maps(struct
> >>> vhost_virtqueue *vq)
> >>>         spin_lock(&vq->mmu_lock);
> >>>       for (i = 0; i < VHOST_NUM_ADDRS; i++) {
> >>> -        map[i] = rcu_dereference_protected(vq->maps[i],
> >>> -                  lockdep_is_held(&vq->mmu_lock));
> >>> +        map[i] = vq->maps[i];
> >>>           if (map[i]) {
> >>>               vhost_set_map_dirty(vq, map[i], i);
> >>> -            rcu_assign_pointer(vq->maps[i], NULL);
> >>> +            vq->maps[i] = NULL;
> >>>           }
> >>>       }
> >>>       spin_unlock(&vq->mmu_lock);
> >>>   -    /* No need for synchronize_rcu() or kfree_rcu() since we are
> >>> -     * serialized with memory accessors (e.g vq mutex held).
> >>> +    /* No need for synchronization since we are serialized with
> >>> +     * memory accessors (e.g vq mutex held).
> >>>        */
> >>>         for (i = 0; i < VHOST_NUM_ADDRS; i++)
> >>> @@ -362,6 +361,40 @@ static bool vhost_map_range_overlap(struct
> >>> vhost_uaddr *uaddr,
> >>>       return !(end < uaddr->uaddr || start > uaddr->uaddr - 1 +
> >>> uaddr->size);
> >>>   }
> >>>   +static void inline vhost_vq_access_map_begin(struct
> >>> vhost_virtqueue *vq)
> >>> +{
> >>> +    write_seqcount_begin(&vq->seq);
> >>> +}
> >>> +
> >>> +static void inline vhost_vq_access_map_end(struct vhost_virtqueue *vq)
> >>> +{
> >>> +    write_seqcount_end(&vq->seq);
> >>> +}
> >> The write side of a seqlock only provides write barriers. Access to
> >>
> >>     map = vq->maps[VHOST_ADDR_USED];
> >>
> >> Still needs a read side barrier, and then I think this will be no
> >> better than a normal spinlock.
> >>
> >> It also doesn't seem like this algorithm even needs a seqlock, as this
> >> is just a one bit flag
> >
> >
> > Right, so then I tend to use spinlock first for correctness.
> >
> >
> >>
> >> atomic_set_bit(using map)
> >> smp_mb__after_atomic()
> >> .. maps [...]
> >> atomic_clear_bit(using map)
> >>
> >>
> >> map = NULL;
> >> smp_mb__before_atomic();
> >> while (atomic_read_bit(using map))
> >>     relax()
> >>
> >> Again, not clear this could be faster than a spinlock when the
> >> barriers are correct...
> >
> 
> I've done some benchmark[1] on x86, and yes it looks even slower. It
> looks to me the atomic stuffs is not necessary, so in order to compare
> it better with spinlock. I tweak it a little bit through
> smp_load_acquire()/store_releaes() + mb() like:
> 

Sorry the format is messed up:

The code should be something like:

static struct vhost_map *vhost_vq_access_map_begin(struct vhost_virtqueue *vq,
                                                   unsigned int type)
{
        ++vq->counter;
        /* Ensure map was read after incresing the counter. Paired
         * with smp_mb() in vhost_vq_sync_access().
         */
        smp_mb();
        return vq->maps[type];
}

static void inline vhost_vq_access_map_end(struct vhost_virtqueue *vq)
{
 	/* Ensure all memory access through map was done before
         * reducing the counter. Paired with smp_load_acquire() in
         * vhost_vq_sync_access() */
        smp_store_release(&vq->counter, --vq->counter);
}

static void inline vhost_vq_sync_access(struct vhost_virtqueue *vq)
{
        /* Ensure new map value is visible before checking counter. */
        smp_mb();
        /* Ensure map was freed after reading counter value, paired
         * with smp_store_release() in vhost_vq_access_map_end().
         */
        while (smp_load_acquire(&vq->counter)) {
                if (need_resched())
                        schedule();
        }
}

And the benchmark result is:

         | base    | direct + atomic bitops | direct + spinlock() | direct + counter + smp_mb() | direct + RCU     |
SMAP on  | 5.0Mpps | 5.0Mpps     (+0%)      | 5.7Mpps  	(+14%)	  | 5.9Mpps  (+18%)	        | 6.2Mpps  (+24%)  |
SMAP off | 7.0Mpps | 7.0Mpps     (+0%)      | 7.0Mpps   (+0%)     | 7.5Mpps  (+7%)	        | 8.2Mpps  (+17%)  |


> 
> 
> base: normal copy_to_user()/copy_from_user() path.
> direct + atomic bitops: using direct mapping but synchronize through
> atomic bitops like you suggested above
> direct + spinlock(): using direct mapping but synchronize through spinlocks
> direct + counter + smp_mb(): using direct mapping but synchronize
> through counter + smp_mb()
> direct + RCU: using direct mapping and synchronize through RCU (buggy
> and need to be addressed by this series)
> 
> 
> So smp_mb() + counter is fastest way. And spinlock can still show some
> improvement (+14%) in the case of SMAP, but no the case when SMAP is off.
> 
> I don't have any objection to convert  to spinlock() but just want to
> know if any case that the above smp_mb() + counter looks good to you?
> 
> Thanks
> 
> 
> >
> > Yes, for next release we may want to use the idea from Michael like to
> > mitigate the impact of mb.
> >
> > https://lwn.net/Articles/775871/
> >
> > Thanks
> >
> >
> >>
> >> Jason
> 
> _______________________________________________
> Virtualization mailing list
> Virtualization@lists.linux-foundation.org
> https://lists.linuxfoundation.org/mailman/listinfo/virtualization
Jason Gunthorpe Aug. 8, 2019, 1:05 p.m. UTC | #5
On Thu, Aug 08, 2019 at 08:54:54PM +0800, Jason Wang wrote:

> I don't have any objection to convert  to spinlock() but just want to
> know if any case that the above smp_mb() + counter looks good to you?

This email is horribly mangled, but I don't think mixing smb_mb() and
smp_load_acquire() would be considerd a best-practice, and using
smp_store_release() instead would be the wrong barrier.

spinlock does seem to be the only existing locking primitive that does
what is needed here.

Jason
Michael S. Tsirkin Aug. 10, 2019, 7:12 p.m. UTC | #6
On Thu, Aug 08, 2019 at 08:54:54PM +0800, Jason Wang wrote:
> I don't have any objection to convert  to spinlock() but just want to
> know if any case that the above smp_mb() + counter looks good to you?

So how about we try this:
- revert the original patch for this release
- new safe patch with a spinlock for the next release
- whatever improvements we can come up with on top

Thoughts?

Because I think this needs much more scrutiny than we can
give an incremental patch.

Patch
diff mbox series

diff --git a/drivers/vhost/vhost.c b/drivers/vhost/vhost.c
index cfc11f9ed9c9..57bfbb60d960 100644
--- a/drivers/vhost/vhost.c
+++ b/drivers/vhost/vhost.c
@@ -324,17 +324,16 @@  static void vhost_uninit_vq_maps(struct vhost_virtqueue *vq)
 
 	spin_lock(&vq->mmu_lock);
 	for (i = 0; i < VHOST_NUM_ADDRS; i++) {
-		map[i] = rcu_dereference_protected(vq->maps[i],
-				  lockdep_is_held(&vq->mmu_lock));
+		map[i] = vq->maps[i];
 		if (map[i]) {
 			vhost_set_map_dirty(vq, map[i], i);
-			rcu_assign_pointer(vq->maps[i], NULL);
+			vq->maps[i] = NULL;
 		}
 	}
 	spin_unlock(&vq->mmu_lock);
 
-	/* No need for synchronize_rcu() or kfree_rcu() since we are
-	 * serialized with memory accessors (e.g vq mutex held).
+	/* No need for synchronization since we are serialized with
+	 * memory accessors (e.g vq mutex held).
 	 */
 
 	for (i = 0; i < VHOST_NUM_ADDRS; i++)
@@ -362,6 +361,40 @@  static bool vhost_map_range_overlap(struct vhost_uaddr *uaddr,
 	return !(end < uaddr->uaddr || start > uaddr->uaddr - 1 + uaddr->size);
 }
 
+static void inline vhost_vq_access_map_begin(struct vhost_virtqueue *vq)
+{
+	write_seqcount_begin(&vq->seq);
+}
+
+static void inline vhost_vq_access_map_end(struct vhost_virtqueue *vq)
+{
+	write_seqcount_end(&vq->seq);
+}
+
+static void inline vhost_vq_sync_access(struct vhost_virtqueue *vq)
+{
+	unsigned int seq;
+
+	/* Make sure any changes to map was done before checking seq
+	 * counter. Paired with smp_wmb() in write_seqcount_begin().
+	 */
+	smp_mb();
+	seq = raw_read_seqcount(&vq->seq);
+	/* Odd means the map was currently accessed by vhost worker */
+	if (seq & 0x1) {
+		/* When seq changes, we are sure no reader can see
+		 * previous map */
+		while (raw_read_seqcount(&vq->seq) == seq) {
+			if (need_resched())
+				schedule();
+		}
+	}
+	/* Make sure seq counter was checked before map is
+	 * freed. Paired with smp_wmb() in write_seqcount_end().
+	 */
+	smp_mb();
+}
+
 static void vhost_invalidate_vq_start(struct vhost_virtqueue *vq,
 				      int index,
 				      unsigned long start,
@@ -376,16 +409,15 @@  static void vhost_invalidate_vq_start(struct vhost_virtqueue *vq,
 	spin_lock(&vq->mmu_lock);
 	++vq->invalidate_count;
 
-	map = rcu_dereference_protected(vq->maps[index],
-					lockdep_is_held(&vq->mmu_lock));
+	map = vq->maps[index];
 	if (map) {
 		vhost_set_map_dirty(vq, map, index);
-		rcu_assign_pointer(vq->maps[index], NULL);
+		vq->maps[index] = NULL;
 	}
 	spin_unlock(&vq->mmu_lock);
 
 	if (map) {
-		synchronize_rcu();
+		vhost_vq_sync_access(vq);
 		vhost_map_unprefetch(map);
 	}
 }
@@ -457,7 +489,7 @@  static void vhost_init_maps(struct vhost_dev *dev)
 	for (i = 0; i < dev->nvqs; ++i) {
 		vq = dev->vqs[i];
 		for (j = 0; j < VHOST_NUM_ADDRS; j++)
-			RCU_INIT_POINTER(vq->maps[j], NULL);
+			vq->maps[j] = NULL;
 	}
 }
 #endif
@@ -655,6 +687,7 @@  void vhost_dev_init(struct vhost_dev *dev,
 		vq->indirect = NULL;
 		vq->heads = NULL;
 		vq->dev = dev;
+		seqcount_init(&vq->seq);
 		mutex_init(&vq->mutex);
 		spin_lock_init(&vq->mmu_lock);
 		vhost_vq_reset(dev, vq);
@@ -921,7 +954,7 @@  static int vhost_map_prefetch(struct vhost_virtqueue *vq,
 	map->npages = npages;
 	map->pages = pages;
 
-	rcu_assign_pointer(vq->maps[index], map);
+	vq->maps[index] = map;
 	/* No need for a synchronize_rcu(). This function should be
 	 * called by dev->worker so we are serialized with all
 	 * readers.
@@ -1216,18 +1249,18 @@  static inline int vhost_put_avail_event(struct vhost_virtqueue *vq)
 	struct vring_used *used;
 
 	if (!vq->iotlb) {
-		rcu_read_lock();
+		vhost_vq_access_map_begin(vq);
 
-		map = rcu_dereference(vq->maps[VHOST_ADDR_USED]);
+		map = vq->maps[VHOST_ADDR_USED];
 		if (likely(map)) {
 			used = map->addr;
 			*((__virtio16 *)&used->ring[vq->num]) =
 				cpu_to_vhost16(vq, vq->avail_idx);
-			rcu_read_unlock();
+			vhost_vq_access_map_end(vq);
 			return 0;
 		}
 
-		rcu_read_unlock();
+		vhost_vq_access_map_end(vq);
 	}
 #endif
 
@@ -1245,18 +1278,18 @@  static inline int vhost_put_used(struct vhost_virtqueue *vq,
 	size_t size;
 
 	if (!vq->iotlb) {
-		rcu_read_lock();
+		vhost_vq_access_map_begin(vq);
 
-		map = rcu_dereference(vq->maps[VHOST_ADDR_USED]);
+		map = vq->maps[VHOST_ADDR_USED];
 		if (likely(map)) {
 			used = map->addr;
 			size = count * sizeof(*head);
 			memcpy(used->ring + idx, head, size);
-			rcu_read_unlock();
+			vhost_vq_access_map_end(vq);
 			return 0;
 		}
 
-		rcu_read_unlock();
+		vhost_vq_access_map_end(vq);
 	}
 #endif
 
@@ -1272,17 +1305,17 @@  static inline int vhost_put_used_flags(struct vhost_virtqueue *vq)
 	struct vring_used *used;
 
 	if (!vq->iotlb) {
-		rcu_read_lock();
+		vhost_vq_access_map_begin(vq);
 
-		map = rcu_dereference(vq->maps[VHOST_ADDR_USED]);
+		map = vq->maps[VHOST_ADDR_USED];
 		if (likely(map)) {
 			used = map->addr;
 			used->flags = cpu_to_vhost16(vq, vq->used_flags);
-			rcu_read_unlock();
+			vhost_vq_access_map_end(vq);
 			return 0;
 		}
 
-		rcu_read_unlock();
+		vhost_vq_access_map_end(vq);
 	}
 #endif
 
@@ -1298,17 +1331,17 @@  static inline int vhost_put_used_idx(struct vhost_virtqueue *vq)
 	struct vring_used *used;
 
 	if (!vq->iotlb) {
-		rcu_read_lock();
+		vhost_vq_access_map_begin(vq);
 
-		map = rcu_dereference(vq->maps[VHOST_ADDR_USED]);
+		map = vq->maps[VHOST_ADDR_USED];
 		if (likely(map)) {
 			used = map->addr;
 			used->idx = cpu_to_vhost16(vq, vq->last_used_idx);
-			rcu_read_unlock();
+			vhost_vq_access_map_end(vq);
 			return 0;
 		}
 
-		rcu_read_unlock();
+		vhost_vq_access_map_end(vq);
 	}
 #endif
 
@@ -1362,17 +1395,17 @@  static inline int vhost_get_avail_idx(struct vhost_virtqueue *vq,
 	struct vring_avail *avail;
 
 	if (!vq->iotlb) {
-		rcu_read_lock();
+		vhost_vq_access_map_begin(vq);
 
-		map = rcu_dereference(vq->maps[VHOST_ADDR_AVAIL]);
+		map = vq->maps[VHOST_ADDR_AVAIL];
 		if (likely(map)) {
 			avail = map->addr;
 			*idx = avail->idx;
-			rcu_read_unlock();
+			vhost_vq_access_map_end(vq);
 			return 0;
 		}
 
-		rcu_read_unlock();
+		vhost_vq_access_map_end(vq);
 	}
 #endif
 
@@ -1387,17 +1420,17 @@  static inline int vhost_get_avail_head(struct vhost_virtqueue *vq,
 	struct vring_avail *avail;
 
 	if (!vq->iotlb) {
-		rcu_read_lock();
+		vhost_vq_access_map_begin(vq);
 
-		map = rcu_dereference(vq->maps[VHOST_ADDR_AVAIL]);
+		map = vq->maps[VHOST_ADDR_AVAIL];
 		if (likely(map)) {
 			avail = map->addr;
 			*head = avail->ring[idx & (vq->num - 1)];
-			rcu_read_unlock();
+			vhost_vq_access_map_end(vq);
 			return 0;
 		}
 
-		rcu_read_unlock();
+		vhost_vq_access_map_end(vq);
 	}
 #endif
 
@@ -1413,17 +1446,17 @@  static inline int vhost_get_avail_flags(struct vhost_virtqueue *vq,
 	struct vring_avail *avail;
 
 	if (!vq->iotlb) {
-		rcu_read_lock();
+		vhost_vq_access_map_begin(vq);
 
-		map = rcu_dereference(vq->maps[VHOST_ADDR_AVAIL]);
+		map = vq->maps[VHOST_ADDR_AVAIL];
 		if (likely(map)) {
 			avail = map->addr;
 			*flags = avail->flags;
-			rcu_read_unlock();
+			vhost_vq_access_map_end(vq);
 			return 0;
 		}
 
-		rcu_read_unlock();
+		vhost_vq_access_map_end(vq);
 	}
 #endif
 
@@ -1438,15 +1471,15 @@  static inline int vhost_get_used_event(struct vhost_virtqueue *vq,
 	struct vring_avail *avail;
 
 	if (!vq->iotlb) {
-		rcu_read_lock();
-		map = rcu_dereference(vq->maps[VHOST_ADDR_AVAIL]);
+		vhost_vq_access_map_begin(vq);
+		map = vq->maps[VHOST_ADDR_AVAIL];
 		if (likely(map)) {
 			avail = map->addr;
 			*event = (__virtio16)avail->ring[vq->num];
-			rcu_read_unlock();
+			vhost_vq_access_map_end(vq);
 			return 0;
 		}
-		rcu_read_unlock();
+		vhost_vq_access_map_end(vq);
 	}
 #endif
 
@@ -1461,17 +1494,17 @@  static inline int vhost_get_used_idx(struct vhost_virtqueue *vq,
 	struct vring_used *used;
 
 	if (!vq->iotlb) {
-		rcu_read_lock();
+		vhost_vq_access_map_begin(vq);
 
-		map = rcu_dereference(vq->maps[VHOST_ADDR_USED]);
+		map = vq->maps[VHOST_ADDR_USED];
 		if (likely(map)) {
 			used = map->addr;
 			*idx = used->idx;
-			rcu_read_unlock();
+			vhost_vq_access_map_end(vq);
 			return 0;
 		}
 
-		rcu_read_unlock();
+		vhost_vq_access_map_end(vq);
 	}
 #endif
 
@@ -1486,17 +1519,17 @@  static inline int vhost_get_desc(struct vhost_virtqueue *vq,
 	struct vring_desc *d;
 
 	if (!vq->iotlb) {
-		rcu_read_lock();
+		vhost_vq_access_map_begin(vq);
 
-		map = rcu_dereference(vq->maps[VHOST_ADDR_DESC]);
+		map = vq->maps[VHOST_ADDR_DESC];
 		if (likely(map)) {
 			d = map->addr;
 			*desc = *(d + idx);
-			rcu_read_unlock();
+			vhost_vq_access_map_end(vq);
 			return 0;
 		}
 
-		rcu_read_unlock();
+		vhost_vq_access_map_end(vq);
 	}
 #endif
 
@@ -1843,13 +1876,11 @@  static bool iotlb_access_ok(struct vhost_virtqueue *vq,
 #if VHOST_ARCH_CAN_ACCEL_UACCESS
 static void vhost_vq_map_prefetch(struct vhost_virtqueue *vq)
 {
-	struct vhost_map __rcu *map;
+	struct vhost_map *map;
 	int i;
 
 	for (i = 0; i < VHOST_NUM_ADDRS; i++) {
-		rcu_read_lock();
-		map = rcu_dereference(vq->maps[i]);
-		rcu_read_unlock();
+		map = vq->maps[i];
 		if (unlikely(!map))
 			vhost_map_prefetch(vq, i);
 	}
diff --git a/drivers/vhost/vhost.h b/drivers/vhost/vhost.h
index a9a2a93857d2..12399e7c7a61 100644
--- a/drivers/vhost/vhost.h
+++ b/drivers/vhost/vhost.h
@@ -115,16 +115,17 @@  struct vhost_virtqueue {
 #if VHOST_ARCH_CAN_ACCEL_UACCESS
 	/* Read by memory accessors, modified by meta data
 	 * prefetching, MMU notifier and vring ioctl().
-	 * Synchonrized through mmu_lock (writers) and RCU (writers
-	 * and readers).
+	 * Synchonrized through mmu_lock (writers) and seqlock
+         * counters,  see vhost_vq_access_map_{begin|end}().
 	 */
-	struct vhost_map __rcu *maps[VHOST_NUM_ADDRS];
+	struct vhost_map *maps[VHOST_NUM_ADDRS];
 	/* Read by MMU notifier, modified by vring ioctl(),
 	 * synchronized through MMU notifier
 	 * registering/unregistering.
 	 */
 	struct vhost_uaddr uaddrs[VHOST_NUM_ADDRS];
 #endif
+	seqcount_t seq;
 	const struct vhost_umem_node *meta_iotlb[VHOST_NUM_ADDRS];
 
 	struct file *kick;