On 2019/8/7 10:02 PM, Jason Wang wrote:
>
> On 2019/8/7 8:07 PM, Jason Gunthorpe wrote:
>> On Wed, Aug 07, 2019 at 03:06:15AM -0400, Jason Wang wrote:
>>> We used to use RCU to synchronize the MMU notifier with the worker.
>>> This leads to calling synchronize_rcu() in invalidate_range_start().
>>> But on a busy system, many factors may slow down synchronize_rcu(),
>>> which makes it unsuitable to be called from an MMU notifier.
>>>
>>> So this patch switches to a seqlock counter to track whether or not
>>> the map is in use. The counter is increased when the vq starts or
>>> finishes using the map. This means that when it is even, we're sure
>>> there are no readers and the MMU notifier is synchronized. When it is
>>> odd, there is a reader and we need to wait for it to become even
>>> again before we are synchronized. Considering that the read critical
>>> section is pretty small, the synchronization should be done very fast.
>>>
>>> Reported-by: Michael S. Tsirkin
>>> Fixes: 7f466032dc9e ("vhost: access vq metadata through kernel
>>> virtual address")
>>> Signed-off-by: Jason Wang
>>>  drivers/vhost/vhost.c | 141 ++++++++++++++++++++++++++----------------
>>>  drivers/vhost/vhost.h |   7 ++-
>>>  2 files changed, 90 insertions(+), 58 deletions(-)
>>>
>>> diff --git a/drivers/vhost/vhost.c b/drivers/vhost/vhost.c
>>> index cfc11f9ed9c9..57bfbb60d960 100644
>>> +++ b/drivers/vhost/vhost.c
>>> @@ -324,17 +324,16 @@ static void vhost_uninit_vq_maps(struct vhost_virtqueue *vq)
>>>
>>>  	spin_lock(&vq->mmu_lock);
>>>  	for (i = 0; i < VHOST_NUM_ADDRS; i++) {
>>> -		map[i] = rcu_dereference_protected(vq->maps[i],
>>> -				lockdep_is_held(&vq->mmu_lock));
>>> +		map[i] = vq->maps[i];
>>>  		if (map[i]) {
>>>  			vhost_set_map_dirty(vq, map[i], i);
>>> -			rcu_assign_pointer(vq->maps[i], NULL);
>>> +			vq->maps[i] = NULL;
>>>  		}
>>>  	}
>>>  	spin_unlock(&vq->mmu_lock);
>>>
>>> -	/* No need for synchronize_rcu() or kfree_rcu() since we are
>>> -	 * serialized with memory accessors (e.g vq mutex held).
>>> +	/* No need for synchronization since we are serialized with
>>> +	 * memory accessors (e.g vq mutex held).
>>>  	 */
>>>
>>>  	for (i = 0; i < VHOST_NUM_ADDRS; i++)
>>> @@ -362,6 +361,40 @@ static bool vhost_map_range_overlap(struct vhost_uaddr *uaddr,
>>>  	return !(end < uaddr->uaddr || start > uaddr->uaddr - 1 + uaddr->size);
>>>  }
>>>
>>> +static void inline vhost_vq_access_map_begin(struct vhost_virtqueue *vq)
>>> +{
>>> +	write_seqcount_begin(&vq->seq);
>>> +}
>>> +
>>> +static void inline vhost_vq_access_map_end(struct vhost_virtqueue *vq)
>>> +{
>>> +	write_seqcount_end(&vq->seq);
>>> +}
>>
>> The write side of a seqlock only provides write barriers. Access to
>>
>>     map = vq->maps[VHOST_ADDR_USED];
>>
>> Still needs a read side barrier, and then I think this will be no
>> better than a normal spinlock.
>>
>> It also doesn't seem like this algorithm even needs a seqlock, as this
>> is just a one bit flag
>
>
> Right, so then I tend to use a spinlock first, for correctness.
>
>
>>
>> atomic_set_bit(using map)
>> smp_mb__after_atomic()
>> .. maps [...]
>> atomic_clear_bit(using map)
>>
>>
>> map = NULL;
>> smp_mb__before_atomic();
>> while (atomic_read_bit(using map))
>>     relax()
>>
>> Again, not clear this could be faster than a spinlock when the
>> barriers are correct...
>

I've done some benchmarks[1] on x86, and yes, it looks even slower.
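(For reference, the one-bit-flag scheme quoted above maps onto the
kernel's atomic bitops roughly as follows. This is only an illustrative
sketch, not necessarily the exact code that was benchmarked: it assumes
a new "unsigned long in_use" field in struct vhost_virtqueue, and the
helper names are made up.)

/* Hypothetical sketch of the one-bit-flag idea using kernel bitops.
 * The "in_use" field and these helper names are invented for
 * illustration only.
 */
static void vhost_vq_access_begin(struct vhost_virtqueue *vq)
{
	set_bit(0, &vq->in_use);
	/* Order the flag set before any access to vq->maps[], paired
	 * with the smp_mb() in vhost_vq_wait_access().
	 */
	smp_mb__after_atomic();
}

static void vhost_vq_access_end(struct vhost_virtqueue *vq)
{
	/* Order all accesses through the map before clearing the flag. */
	smp_mb__before_atomic();
	clear_bit(0, &vq->in_use);
}

/* Called from the MMU notifier after vq->maps[] has been set to NULL. */
static void vhost_vq_wait_access(struct vhost_virtqueue *vq)
{
	/* Make the NULL store visible before polling the flag. */
	smp_mb();
	while (test_bit(0, &vq->in_use))
		cpu_relax();
	/* Order the freeing of the pages after the reader is seen done. */
	smp_mb();
}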
It looks to me that the atomics are not necessary, so in order to compare
it more fairly with the spinlock, I tweaked it a little bit to use
smp_load_acquire()/smp_store_release() + smp_mb(), like:

static struct vhost_map *vhost_vq_access_map_begin(struct vhost_virtqueue *vq,
						   unsigned int type)
{
	++vq->counter;
	/* Ensure the map is read after increasing the counter. Paired
	 * with smp_mb() in vhost_vq_sync_access().
	 */
	smp_mb();
	return vq->maps[type];
}

static void inline vhost_vq_access_map_end(struct vhost_virtqueue *vq)
{
	/* Ensure all memory accesses through the map are done before
	 * reducing the counter. Paired with smp_load_acquire() in
	 * vhost_vq_sync_access().
	 */
	smp_store_release(&vq->counter, vq->counter - 1);
}

static void inline vhost_vq_sync_access(struct vhost_virtqueue *vq)
{
	/* Ensure the new map value is visible before checking the
	 * counter.
	 */
	smp_mb();
	/* Ensure the map is freed only after reading the counter value,
	 * paired with smp_store_release() in vhost_vq_access_map_end().
	 */
	while (smp_load_acquire(&vq->counter)) {
		if (need_resched())
			schedule();
	}
}
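For context, this is roughly how the helpers above would be used on the
two sides (a simplified sketch that reuses the names from the diff; the
wrapper function names are made up, the exact call sites are
illustrative, and dirty tracking/freeing of the map is abbreviated):

/* Worker / datapath side (vq mutex held): bracket every access to the
 * direct mapping with the counter.
 */
static void vhost_use_used_map(struct vhost_virtqueue *vq)
{
	struct vhost_map *map;

	map = vhost_vq_access_map_begin(vq, VHOST_ADDR_USED);
	if (likely(map)) {
		/* ... read or write the used ring through the mapping ... */
	}
	vhost_vq_access_map_end(vq);
}

/* MMU notifier side (called from the invalidate_range_start() path):
 * drop the map under mmu_lock, then wait until no reader is left
 * before the pages can be released.
 */
static void vhost_drop_used_map(struct vhost_virtqueue *vq)
{
	struct vhost_map *map;

	spin_lock(&vq->mmu_lock);
	map = vq->maps[VHOST_ADDR_USED];
	if (map) {
		vhost_set_map_dirty(vq, map, VHOST_ADDR_USED);
		vq->maps[VHOST_ADDR_USED] = NULL;
	}
	spin_unlock(&vq->mmu_lock);

	/* Wait for any reader that grabbed the map before it was
	 * cleared; only after this is it safe to free the map.
	 */
	vhost_vq_sync_access(vq);
}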
And the result is something like:

         | base    | direct + atomic bitops | direct + spinlock() | direct + counter + smp_mb() | direct + RCU   |
SMAP on  | 5.0Mpps | 5.0Mpps (+0%)          | 5.7Mpps (+14%)      | 5.9Mpps (+18%)              | 6.2Mpps (+24%) |
SMAP off | 7.0Mpps | 7.0Mpps (+0%)          | 7.0Mpps (+0%)       | 7.5Mpps (+7%)               | 8.2Mpps (+17%) |

base: the normal copy_to_user()/copy_from_user() path.
direct + atomic bitops: direct mapping, synchronized through atomic bitops as you suggested above.
direct + spinlock(): direct mapping, synchronized through spinlocks.
direct + counter + smp_mb(): direct mapping, synchronized through a counter + smp_mb().
direct + RCU: direct mapping, synchronized through RCU (buggy, and what this series needs to address).

So smp_mb() + counter is the fastest way. The spinlock still shows some
improvement (+14%) with SMAP on, but none when SMAP is off.

I don't have any objection to converting to spinlock(), but I'd like to
know whether the smp_mb() + counter approach above looks good to you.

Thanks

>
> Yes, for the next release we may want to use the idea from Michael to
> mitigate the impact of the mb.
>
> https://lwn.net/Articles/775871/
>
> Thanks
>
>
>>
>> Jason