Browse Source

ccan/io: io_duplex.

Cleaner model for I/O, with cost of complexity if you really want bidir.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
Rusty Russell 12 years ago
parent
commit
a2dffefa5e
6 changed files with 352 additions and 55 deletions
  1. 88 0
      ccan/io/SCENARIOS
  2. 10 4
      ccan/io/backend.h
  3. 30 17
      ccan/io/io.c
  4. 27 0
      ccan/io/io.h
  5. 75 34
      ccan/io/poll.c
  6. 122 0
      ccan/io/test/run-12-bidir.c

+ 88 - 0
ccan/io/SCENARIOS

@@ -0,0 +1,88 @@
+Simple:
+	step1(conn): read(conn), then step2
+	step2(conn): write(conn), then close
+
+Pass-through:
+	step1(conn): read(conn), then step2
+	step2(conn): write(otherconn), then step1
+
+Pass-through-and-connect:
+	step1(conn): read(conn), then step2
+	step2(conn): connect(otherconn), then step3
+	step3(conn): write(otherconn), then step1
+
+Chatroom:
+	step1(conn): read(conn), then step2
+	step2(conn): for c in allcons: write(c).  goto step1
+
+Simple:
+
+void event(struct io_event *done)
+{
+	char *buf = done->priv;
+	struct io_event *e;
+
+	e = queue_read(done, done->conn, buf, 100);
+	e = queue_write(e, done->conn, buf, 100);
+	queue_close(e, done->conn);
+}
+
+Pass-through:
+struct passthru {
+	char buf[100];
+	struct conn *rconn, *wconn;
+};
+
+void event(struct io_event *done)
+{
+	struct passthru *p = done->priv;
+	struct io_event *e;
+
+	e = queue_read(done, p->rconn, p->buf, 100);
+	e = queue_write(e, p->wconn, buf, 100);
+	queue_event(e, event);
+}
+
+Chatroom:
+struct list_head clients;
+
+struct buffer {
+	char buf[100];
+	unsigned int ref;
+};
+
+struct client {
+	struct list_node list;
+	struct connection *conn;
+	struct buffer *rbuf, *wbuf;
+};
+
+void broadcast(struct io_event *done)
+{
+	struct client *i, *c = done->conn->priv;
+	struct io_event *e;
+
+	list_for_each(&clients, i, list) {
+		e = queue_write(done, i->conn, c->buf->buf, 100);
+		e->priv = c->buf;
+		c->buf->ref++;
+		queue_event(e, drop_ref);
+	}
+
+
+
+void event(struct io_event *done)
+{
+	struct client *c = done->conn->priv;
+	struct io_event *e;
+
+	assert(c->conn == done->conn);
+	c->buf = malloc(sizeof(*c->buf));
+	c->buf->ref = 0;
+	e = queue_read(done, c->conn, c->buf->buf, 100);
+	e = queue_event(e, broadcast);
+}
+
+
+	step1(conn): read(conn), then step2
+	step2(conn): for c in allcons: write(c).  goto step1

+ 10 - 4
ccan/io/backend.h

@@ -22,11 +22,15 @@ struct io_listener {
 };
 
 enum io_state {
-	NEXT, /* eg starting, woken from idle, return from io_break. */
+	/* These wait for something to input */
 	READ,
-	WRITE,
 	READPART,
+
+	/* These wait for room to output */
+	WRITE,
 	WRITEPART,
+
+	NEXT, /* eg starting, woken from idle, return from io_break. */
 	IDLE,
 	FINISHED,
 	PROCESSING /* We expect them to change this now. */
@@ -61,6 +65,8 @@ struct io_state_writepart {
 struct io_conn {
 	struct fd fd;
 
+	struct io_conn *duplex;
+
 	enum io_state state;
 	union {
 		struct io_state_read read;
@@ -74,9 +80,9 @@ extern void *io_loop_return;
 
 bool add_listener(struct io_listener *l);
 bool add_conn(struct io_conn *c);
+bool add_duplex(struct io_conn *c);
 void del_listener(struct io_listener *l);
 void backend_set_state(struct io_conn *conn, struct io_op *op);
 
-struct io_op *do_writeable(struct io_conn *conn);
-struct io_op *do_readable(struct io_conn *conn);
+struct io_op *do_ready(struct io_conn *conn);
 #endif /* CCAN_IO_BACKEND_H */

+ 30 - 17
ccan/io/io.c

@@ -57,6 +57,7 @@ struct io_conn *io_new_conn_(int fd,
 	conn->fd.finish = finish;
 	conn->fd.finish_arg = conn->fd.next_arg = arg;
 	conn->state = NEXT;
+	conn->duplex = NULL;
 	if (!add_conn(conn)) {
 		free(conn);
 		return NULL;
@@ -64,6 +65,34 @@ struct io_conn *io_new_conn_(int fd,
 	return conn;
 }
 
+struct io_conn *io_duplex_(struct io_conn *old,
+			     struct io_op *(*start)(struct io_conn *, void *),
+			     void (*finish)(struct io_conn *, void *),
+			     void *arg)
+{
+	struct io_conn *conn;
+
+	assert(!old->duplex);
+
+	conn = malloc(sizeof(*conn));
+	if (!conn)
+		return NULL;
+
+	conn->fd.listener = false;
+	conn->fd.fd = old->fd.fd;
+	conn->fd.next = start;
+	conn->fd.finish = finish;
+	conn->fd.finish_arg = conn->fd.next_arg = arg;
+	conn->state = NEXT;
+	conn->duplex = old;
+	if (!add_duplex(conn)) {
+		free(conn);
+		return NULL;
+	}
+	old->duplex = conn;
+	return conn;
+}
+
 /* Convenient token which only we can produce. */
 static inline struct io_next *to_ionext(struct io_conn *conn)
 {
@@ -149,7 +178,7 @@ static struct io_op *do_next(struct io_conn *conn)
 	return conn->fd.next(conn, conn->fd.next_arg);
 }
 
-struct io_op *do_writeable(struct io_conn *conn)
+struct io_op *do_ready(struct io_conn *conn)
 {
 	ssize_t ret;
 	bool finished;
@@ -171,22 +200,6 @@ struct io_op *do_writeable(struct io_conn *conn)
 		*conn->u.writepart.lenp = ret;
 		finished = true;
 		break;
-	default:
-		/* Shouldn't happen. */
-		abort();
-	}
-
-	if (finished)
-		return do_next(conn);
-	return to_ioop(conn->state);
-}
-
-struct io_op *do_readable(struct io_conn *conn)
-{
-	ssize_t ret;
-	bool finished;
-
-	switch (conn->state) {
 	case READ:
 		ret = read(conn->fd.fd, conn->u.read.buf, conn->u.read.len);
 		if (ret <= 0)

+ 27 - 0
ccan/io/io.h

@@ -151,6 +151,33 @@ struct io_op *io_write_partial(const void *data, size_t *len,
  */
 struct io_op *io_idle(struct io_conn *conn);
 
+/**
+ * io_duplex - split an fd into two connections.
+ * @conn: a connection.
+ * @start: the first function to call.
+ * @finish: the function to call when it's closed or fails.
+ * @arg: the argument to both @start and @finish.
+ *
+ * Sometimes you want to be able to simultaneously read and write on a
+ * single fd, but io forces a linear call sequence.  The solition is
+ * to have two connections for the same fd, and use one for read
+ * operations and one for write.
+ *
+ * You must io_close() both of them to close the fd.
+ */
+#define io_duplex(conn, start, finish, arg)				\
+	io_duplex_((conn),						\
+		   typesafe_cb_preargs(struct io_op *, void *,		\
+				       (start), (arg), struct io_conn *), \
+		   typesafe_cb_preargs(void, void *, (finish), (arg),	\
+				       struct io_conn *),		\
+		   (arg))
+
+struct io_conn *io_duplex_(struct io_conn *conn,
+			   struct io_op *(*start)(struct io_conn *, void *),
+			   void (*finish)(struct io_conn *, void *),
+			   void *arg);
+
 /**
  * io_wake - wake up and idle connection.
  * @conn: an idle connection.

+ 75 - 34
ccan/io/poll.c

@@ -76,11 +76,23 @@ bool add_conn(struct io_conn *c)
 	return true;
 }
 
+bool add_duplex(struct io_conn *c)
+{
+	c->fd.backend_info = c->duplex->fd.backend_info;
+	num_next++;
+	return true;
+}
+
 static void del_conn(struct io_conn *conn)
 {
 	if (conn->fd.finish)
 		conn->fd.finish(conn, conn->fd.finish_arg);
-	del_fd(&conn->fd);
+	if (conn->duplex) {
+		/* In case fds[] pointed to the other one. */
+		fds[conn->fd.backend_info] = &conn->duplex->fd;
+		conn->duplex->duplex = NULL;
+	} else
+		del_fd(&conn->fd);
 	if (conn->state == FINISHED)
 		num_finished--;
 	else if (conn->state == NEXT)
@@ -92,32 +104,38 @@ void del_listener(struct io_listener *l)
 	del_fd(&l->fd);
 }
 
-void backend_set_state(struct io_conn *conn, struct io_op *op)
+static int pollmask(enum io_state state)
 {
-	enum io_state state = from_ioop(op);
-	struct pollfd *pfd = &pollfds[conn->fd.backend_info];
-
 	switch (state) {
 	case READ:
 	case READPART:
-		pfd->events = POLLIN;
-		break;
+		return POLLIN;
 	case WRITE:
 	case WRITEPART:
-		pfd->events = POLLOUT;
-		break;
-	case IDLE:
-		pfd->events = 0;
-		break;
-	case NEXT:
-		num_next++;
-		break;
-	case FINISHED:
-		num_finished++;
-		break;
+		return POLLOUT;
 	default:
-		abort();
+		return 0;
 	}
+}
+
+void backend_set_state(struct io_conn *conn, struct io_op *op)
+{
+	enum io_state state = from_ioop(op);
+	struct pollfd *pfd = &pollfds[conn->fd.backend_info];
+
+	pfd->events = pollmask(state);
+	if (conn->duplex) {
+		int mask = pollmask(conn->duplex->state);
+		/* You can't *both* read/write. */
+		assert(!mask || pfd->events != mask);
+		pfd->events |= mask;
+	}
+
+	if (state == NEXT)
+		num_next++;
+	else if (state == FINISHED)
+		num_finished++;
+
 	conn->state = state;
 }
 
@@ -142,7 +160,7 @@ static void finish_and_next(bool finished_only)
 	unsigned int i;
 
 	for (i = 0; !io_loop_return && i < num_fds; i++) {
-		struct io_conn *c;
+		struct io_conn *c, *duplex;
 
 		if (!num_finished) {
 			if (finished_only || num_next == 0)
@@ -151,17 +169,26 @@ static void finish_and_next(bool finished_only)
 		if (fds[i]->listener)
 			continue;
 		c = (void *)fds[i];
-		if (c->state == FINISHED) {
-			del_conn(c);
-			free(c);
-			i--;
-		} else if (!finished_only && c->state == NEXT) {
-			backend_set_state(c, c->fd.next(c, c->fd.next_arg));
-			num_next--;
+		for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
+			if (c->state == FINISHED) {
+				del_conn(c);
+				free(c);
+				i--;
+			} else if (!finished_only && c->state == NEXT) {
+				backend_set_state(c,
+						  c->fd.next(c,
+							     c->fd.next_arg));
+				num_next--;
+			}
 		}
 	}
 }
 
+static void ready(struct io_conn *c)
+{
+	backend_set_state(c, do_ready(c));
+}
+
 /* This is the main loop. */
 void *io_loop(void)
 {
@@ -185,16 +212,30 @@ void *io_loop(void)
 
 		for (i = 0; i < num_fds && !io_loop_return; i++) {
 			struct io_conn *c = (void *)fds[i];
-			if (pollfds[i].revents & POLLOUT)
-				backend_set_state(c, do_writeable(c));
-			else if (pollfds[i].revents & POLLIN) {
-				if (fds[i]->listener)
+			int events = pollfds[i].revents;
+
+			if (fds[i]->listener) {
+				if (events & POLLIN)
 					accept_conn((void *)c);
-				else
-					backend_set_state(c, do_readable(c));
-			} else if (pollfds[i].revents & POLLHUP) {
+			} else if (events & (POLLIN|POLLOUT)) {
+				if (c->duplex) {
+					int mask = pollmask(c->duplex->state);
+					if (events & mask) {
+						ready(c->duplex);
+						events &= ~mask;
+						if (!(events&(POLLIN|POLLOUT)))
+							continue;
+					}
+				}
+				ready(c);
+			} else if (events & POLLHUP) {
 				backend_set_state(c, io_close(c, NULL));
+				if (c->duplex)
+					backend_set_state(c->duplex,
+							  io_close(c->duplex,
+								   NULL));
 			}
+
 		}
 	}
 

+ 122 - 0
ccan/io/test/run-12-bidir.c

@@ -0,0 +1,122 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+	struct io_listener *l;
+	int state;
+	char buf[4];
+	char wbuf[32];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	d->state++;
+}
+
+static struct io_op *write_out(struct io_conn *conn, struct data *d)
+{
+	d->state++;
+	return io_write(d->wbuf, sizeof(d->wbuf), io_next(conn, io_close, d));
+}
+
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 0);
+	d->state++;
+
+	io_close_listener(d->l);
+
+	memset(d->wbuf, 7, sizeof(d->wbuf));
+	ok1(io_duplex(conn, write_out, finish_ok, d));
+	return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d));
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	int fd, status;
+
+	/* This is how many tests you plan to run */
+	plan_tests(10);
+	d->state = 0;
+	fd = make_listen_fd("65012", &addrinfo);
+	ok1(fd >= 0);
+	d->l = io_new_listener(fd, start_ok, finish_ok, d);
+	ok1(d->l);
+	fflush(stdout);
+	if (!fork()) {
+		int i;
+		char buf[32];
+
+		io_close_listener(d->l);
+		free(d);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		signal(SIGPIPE, SIG_IGN);
+		for (i = 0; i < 32; i++) {
+			if (read(fd, buf+i, 1) != 1)
+				break;
+		}
+		for (i = 0; i < strlen("hellothere"); i++) {
+			if (write(fd, "hellothere" + i, 1) != 1)
+				break;
+		}
+		close(fd);
+		freeaddrinfo(addrinfo);
+		exit(0);
+	}
+	freeaddrinfo(addrinfo);
+	ok1(io_loop() == NULL);
+	ok1(d->state == 4);
+	ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+	free(d);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}