From fd6c8c353c28a2c3de39847957661014b6142470 Mon Sep 17 00:00:00 2001 From: Paymon MARANDI Date: Thu, 9 Dec 2021 14:30:19 -0500 Subject: [PATCH] try with tcp --- main.c | 106 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 68 insertions(+), 38 deletions(-) diff --git a/main.c b/main.c index cc14a89..bc2f21f 100644 --- a/main.c +++ b/main.c @@ -16,6 +16,7 @@ #include #include #include +#include #include @@ -156,9 +157,16 @@ static void q_accept(struct io_uring *ring, conn_info *conn_i) sqe, conn_i->skf.mama, (struct sockaddr *)&conn_i->skf.child.peer.addr, &conn_i->skf.child.peer.addrlen, 0, conn_i->gbid); + /* + io_uring_prep_accept( + sqe, conn_i->skf.mama, + (struct sockaddr *)&conn_i->skf.child.peer.addr, + &conn_i->skf.child.peer.addrlen, 0); + */ conn_i->state = ACCEPT; io_uring_sqe_set_data(sqe, conn_i); + // sqe->flags |= IOSQE_IO_DRAIN; // sqe->flags |= IOSQE_IO_LINK | IOSQE_IO_HARDLINK; /* if (conn_i->reacts->recv.q) */ /* conn_i->reacts->recv.q(ring, conn_i); */ @@ -377,8 +385,12 @@ static int conn_factory(conn_info **conns, struct io_uring *ring, if ((conns[i]->provides_buf = tmpl.provides_buf)) conns[i]->reacts->buf.q(ring, conns[i]); } - if (!tmpl.provides_buf) + if (!tmpl.provides_buf) { + for (int i = 0; i < max_conn; i++) + conns[i]->state = READY; + return ret; + } unsigned head; unsigned count = 0; @@ -543,11 +555,11 @@ void *ctrlsock(void *name) exit(EX_OSERR); struct io_uring ring = { 0 }; - /* struct io_uring_params p = { */ - /* .flags = IORING_SETUP_SQPOLL, */ - /* .sq_thread_idle = 2000, */ - /* }; */ - struct io_uring_params p = { 0 }; + // struct io_uring_params p = { 0 }; + struct io_uring_params p = { + .flags = IORING_SETUP_SQPOLL, + .sq_thread_idle = 2000, + }; ret = io_uring_queue_init_params(MAX_CONNECTIONS * 2, &ring, &p); perrork(ret, "io_uring_queue_init_params::unix"); @@ -565,12 +577,13 @@ void *ctrlsock(void *name) conn_info tmpl = { .buf_len = 1024, .buf_num_seg = 1, - .reacts = &reacts, .provides_buf = true, + .reacts = &reacts, }; conns[0] = &tmpl; ret = conn_factory(conns, &ring, children_pool, unix_addr, MAX_CONNECTIONS); + perrork(ret, "conn_factory(unix)"); if (ret) { fprintf(stderr, "%s::%s %s\n", "conn_factory", __func__, strerror(ret)); @@ -584,8 +597,7 @@ void *ctrlsock(void *name) exit(EX_OSERR); } - for (int i=0; ireacts->accept.q(&ring, conns[i]); + conns[0]->reacts->accept.q(&ring, conns[0]); fprintf(stderr, "accepting connections to @%s\n", (char *)name); @@ -624,33 +636,29 @@ static void *wsub(void *data) children_pool[0] = socket(args->addrstore.ss_family, SOCK_STREAM, 0); /* shorthand */ __s32 mama = children_pool[0]; - if (mama == -1) { - perror("sock"); + perror("sock"); + if (mama == -1) exit(EX_OSERR); - } int val = 1; int ret = setsockopt(mama, SOL_SOCKET, SO_REUSEPORT | SO_REUSEADDR, &val, sizeof(val)); - if (ret == -1) { - perror("setsockopt(wsub)"); + perror("setsockopt(wsub)"); + if (ret == -1) exit(EX_OSERR); - } int tcp_f = TCP_NODELAY | TCP_DEFER_ACCEPT; ret = setsockopt(mama, IPPROTO_TCP, tcp_f, &val, sizeof(val)); - if (ret == -1) { - perror("setsockopt(tcp_f)"); + perror("setsockopt(tcp_f)"); + if (ret == -1) exit(EX_OSERR); - } if (args->ipv6) { ret = setsockopt(mama, IPPROTO_IP, IPV6_RECVPKTINFO, &val, sizeof(val)); - if (ret == -1) { - perror("setsockopt(ipv6)"); + perror("setsockopt(ipv6)"); + if (ret == -1) exit(EX_OSERR); - } // "fe80::5b3e:1bc6:ac47:c5c4", // wsub_addr->sin6.sin6_scope_id = if_nametoindex("enp0s31f6"); // wsub_addr.sin6_addr = inet_pton(in6addr_loopback); @@ -667,27 +675,30 @@ static void *wsub(void *data) } // bind and listen - if (bind(mama, (struct sockaddr *)args->addr, sizeof(*args->addr)) < - 0) { - perror("bind(wsub)"); + ret = bind(mama, (struct sockaddr *)args->addr, sizeof(*args->addr)); + perror("bind(wsub)"); + if (ret) exit(EX_OSERR); - } - if (listen(mama, BACKLOG) < 0) { - perror("listen(wsub)"); + ret = listen(mama, BACKLOG); + perror("listen(wsub)"); + if (ret) exit(EX_OSERR); - } fprintf(stderr, "wsub listening for connections on port: %d\n", args->port); struct io_uring_params p; struct io_uring ring; - memset(&p, 0, sizeof(p)); + // struct io_uring_params p = { 0 }; + struct io_uring_params param = { + .flags = IORING_SETUP_SQPOLL, + .sq_thread_idle = 2000, + }; - if (io_uring_queue_init_params(2048, &ring, &p) < 0) { - perror("io_uring_queue_init_params(wsub)"); + ret = io_uring_queue_init_params(MAX_CONNECTIONS * 2, &ring, ¶m); + perrork(ret, "io_uring_queue_init_params::unix"); + if (ret < 0) exit(EX_OSERR); - } reactions reacts = { .accept = { .dq = dq_accept, .q = q_accept }, @@ -702,16 +713,23 @@ static void *wsub(void *data) .reacts = &reacts, }; conns[0] = &tmpl; - // io_uring_prep_provide_buffers(sqe_wsub, bufs, MAX_MESSAGE_LEN, - // BUFFERS_COUNT, group_id, 0); - ret = conn_factory(conns, &ring, children_pool, args->addr, MAX_CONNECTIONS); + perrork(ret, "conn_factory(wsub)"); if (ret) { fprintf(stderr, "ret = %d\n", ret); - perror("conn_factory(wsub)"); exit(EX_OSERR); } + ret = io_uring_register_files(&ring, children_pool, + MAX_CONNECTIONS + 1); + if (ret) { + fprintf(stderr, "%s::%s %s\n", "io_uring_register_files", + __func__, strerror(-ret)); + exit(EX_OSERR); + } + + conns[0]->reacts->accept.q(&ring, conns[0]); + event_loop(&ring, conns); close(mama); @@ -732,6 +750,18 @@ static void *wsub(void *data) int main(int argc, char **argv) { rval_t ret = EX_OK; + struct rlimit rlim = {0}; + getrlimit(RLIMIT_NPROC, &rlim); + if (!(rlim.rlim_cur == RLIM_INFINITY) || !(rlim.rlim_max == RLIM_INFINITY)) { + fprintf(stderr, "rlim.rlim_cur=%lu rlim.rlim_max=%lu\n", + rlim.rlim_cur, rlim.rlim_max); + rlim.rlim_cur = RLIM_INFINITY; + rlim.rlim_max = RLIM_INFINITY; + ret = setrlimit(RLIMIT_NPROC, &rlim); + perror("setrlimit"); + if (ret) + exit(EX_SOFTWARE); + } struct wsub_args wsub_args = { .port = 8002, @@ -767,19 +797,19 @@ int main(int argc, char **argv) if (!exec_name) return EX_OSERR; + /* ret = pthread_create(&ctrl_thread, NULL, ctrlsock, (void *)(exec_name)); if (ret) { perror("pthread_create(ctrlsock)"); return EX_OSERR; } - /* + */ pthread_t wsub_thread; ret = pthread_create(&wsub_thread, NULL, wsub, (void *)(&wsub_args)); if (ret) { perror("pthread_create(wsub)"); return EX_OSERR; } - */ pthread_exit(0); return ret; -- 2.34.1