On Mon, Aug 16, 2021 at 09:42:38AM -0700, Elena Ufimtseva wrote: > @@ -62,5 +65,10 @@ typedef struct VFIOProxy { > > VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp); > void vfio_user_disconnect(VFIOProxy *proxy); > +void vfio_user_set_reqhandler(VFIODevice *vbasdev, "vbasedev" for consistency? > + int (*handler)(void *opaque, char *buf, > + VFIOUserFDs *fds), > + void *reqarg); The handler callback is undocumented. What context does it run in, what do the arguments mean, and what should the function return? Please document it so it's easy for others to modify this code in the future without reverse-engineering the assumptions behind it. > +void vfio_user_recv(void *opaque) > +{ > + VFIODevice *vbasedev = opaque; > + VFIOProxy *proxy = vbasedev->proxy; > + VFIOUserReply *reply = NULL; > + g_autofree int *fdp = NULL; > + VFIOUserFDs reqfds = { 0, 0, fdp }; > + VFIOUserHdr msg; > + struct iovec iov = { > + .iov_base = &msg, > + .iov_len = sizeof(msg), > + }; > + bool isreply; > + int i, ret; > + size_t msgleft, numfds = 0; > + char *data = NULL; > + g_autofree char *buf = NULL; > + Error *local_err = NULL; > + > + qemu_mutex_lock(&proxy->lock); > + if (proxy->state == VFIO_PROXY_CLOSING) { > + qemu_mutex_unlock(&proxy->lock); > + return; > + } > + > + ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds, > + &local_err); This is a blocking call. My understanding is that the IOThread is shared by all vfio-user devices, so other devices will have to wait if one of them is acting up (e.g. the device emulation process sent less than sizeof(msg) bytes). While we're blocked in this function the proxy device cannot be hot-removed since proxy->lock is held. It would more robust to use of the event loop to avoid blocking. There could be a per-connection receiver coroutine that calls qio_channel_readv_full_all_eof() (it yields the coroutine if reading would block). > + /* > + * Replies signal a waiter, requests get processed by vfio code > + * that may assume the iothread lock is held. > + */ > + if (isreply) { > + reply->complete = 1; > + if (!reply->nowait) { > + qemu_cond_signal(&reply->cv); > + } else { > + if (msg.flags & VFIO_USER_ERROR) { > + error_printf("vfio_user_rcv error reply on async request "); > + error_printf("command %x error %s\n", msg.command, > + strerror(msg.error_reply)); > + } > + /* just free it if no one is waiting */ > + reply->nowait = 0; > + if (proxy->last_nowait == reply) { > + proxy->last_nowait = NULL; > + } > + g_free(reply->msg); > + QTAILQ_INSERT_HEAD(&proxy->free, reply, next); > + } > + qemu_mutex_unlock(&proxy->lock); > + } else { > + qemu_mutex_unlock(&proxy->lock); > + qemu_mutex_lock_iothread(); The fact that proxy->request() runs with the BQL suggests that VFIO communication should take place in the main event loop thread instead of a separate IOThread. > + /* > + * make sure proxy wasn't closed while we waited > + * checking state without holding the proxy lock is safe > + * since it's only set to CLOSING when BQL is held > + */ > + if (proxy->state != VFIO_PROXY_CLOSING) { > + ret = proxy->request(proxy->reqarg, buf, &reqfds); The request() callback in an earlier patch is a noop for the client implementation. Who frees passed fds? > + if (ret < 0 && !(msg.flags & VFIO_USER_NO_REPLY)) { > + vfio_user_send_reply(proxy, buf, ret); > + } > + } > + qemu_mutex_unlock_iothread(); > + } > + return; > + > +fatal: > + vfio_user_shutdown(proxy); > + proxy->state = VFIO_PROXY_RECV_ERROR; > + > +err: > + for (i = 0; i < numfds; i++) { > + close(fdp[i]); > + } > + if (reply != NULL) { > + /* force an error to keep sending thread from hanging */ > + reply->msg->flags |= VFIO_USER_ERROR; > + reply->msg->error_reply = EINVAL; > + reply->complete = 1; > + qemu_cond_signal(&reply->cv); What about fd passing? The actual fds have been closed already in fdp[] but reply has a copy too. What about the nowait case? If no one is waiting on reply->cv so this reply will be leaked? Stefan