Browse Source

ccan/io: new module.

Designed for async I/O.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
Rusty Russell 12 years ago
parent
commit
0a2fd289c7

+ 1 - 0
Makefile-ccan

@@ -55,6 +55,7 @@ MODS_WITH_SRC := antithread \
 	htable \
 	htable \
 	idtree \
 	idtree \
 	ilog \
 	ilog \
+	io \
 	isaac \
 	isaac \
 	iscsi \
 	iscsi \
 	jmap \
 	jmap \

+ 1 - 0
ccan/io/LICENSE

@@ -0,0 +1 @@
+../../licenses/BSD-MIT

+ 174 - 0
ccan/io/_info

@@ -0,0 +1,174 @@
+#include <stdio.h>
+#include <string.h>
+#include "config.h"
+
+/**
+ * io - simple library for stateful io handling.
+ *
+ * io provides a simple mechanism to write I/O servers with multiple
+ * connections.  Handling of connections is multiplexed, and functions
+ * indicate what they want written or read, and what follow-on
+ * function to call on success (or failure).
+ *
+ * Example:
+ * // Given tr A-Z a-z outputs tr a-z a-z
+ * #include <ccan/io/io.h>
+ * #include <ccan/err/err.h>
+ * #include <assert.h>
+ * #include <stdlib.h>
+ * #include <signal.h>
+ * #include <sys/types.h>
+ * #include <sys/wait.h>
+ *
+ * struct buffer {
+ * 	size_t max, off, rlen;
+ * 	char *buf;
+ * };
+ *
+ * struct stdin_buffer {
+ * 	struct io_conn *reader, *writer;
+ * 	size_t len;
+ * 	char inbuf[4096];
+ * };
+ *
+ * // This reads from stdin.
+ * static struct io_op *wake_writer(struct io_conn *, struct stdin_buffer *);
+ * // This writes the stdin buffer to the child.
+ * static struct io_op *write_to_child(struct io_conn *c,
+ * 				       struct stdin_buffer *b);
+ * static struct io_op *read_stdin(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * 	assert(c == b->reader);
+ * 	b->len = sizeof(b->inbuf);
+ * 	return io_read_partial(b->inbuf, &b->len,
+ *			       io_next(c, wake_writer, b));
+ * }
+ *
+ * static struct io_op *wake_writer(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * 	assert(c == b->reader);
+ * 	io_wake(b->writer, write_to_child, b);
+ * 	return io_idle(c);
+ * }
+ *
+ * static void reader_exit(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * 	assert(c == b->reader);
+ * 	io_wake(b->writer, write_to_child, b);
+ * 	b->reader = NULL;
+ * }
+ *
+ * static struct io_op *wake_reader(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * 	assert(c == b->writer);
+ * 	io_wake(b->reader, read_stdin, b);
+ * 	return io_idle(c);
+ * }
+ *
+ * static struct io_op *write_to_child(struct io_conn *conn,
+ * 				      struct stdin_buffer *b)
+ * {
+ * 	assert(conn == b->writer);
+ * 	if (!b->reader)
+ * 		return io_close(conn, NULL);
+ * 	return io_write(b->inbuf, b->len, io_next(conn, wake_reader, b));
+ * }
+ *
+ * static struct io_op *start_writer(struct io_conn *conn,
+ * 				     struct stdin_buffer *b)
+ * {
+ * 	assert(conn == b->writer);
+ * 	return io_idle(conn);
+ * }
+ *
+ * static void fail_child_write(struct io_conn *conn, struct stdin_buffer *b)
+ * {
+ * 	if (b->reader)
+ * 		err(1, "Failed writing to child.");
+ * }
+ *
+ * // This reads from the child and saves it into buffer.
+ * static struct io_op *read_from_child(struct io_conn *conn,
+ * 				        struct buffer *b)
+ * {
+ * 	b->off += b->rlen;
+ *
+ * 	if (b->off == b->max) {
+ * 		if (b->max == 0)
+ * 			b->max = 128;
+ * 		else if (b->max >= 1024*1024)
+ * 			b->max += 1024*1024;
+ * 		else
+ * 			b->max *= 2;
+ * 		b->buf = realloc(b->buf, b->max);
+ * 	}
+ *
+ * 	b->rlen = b->max - b->off;
+ * 	return io_read_partial(b->buf + b->off, &b->rlen,
+ *			       io_next(conn, read_from_child, b));
+ * }
+ *
+ * // Feed a program our stdin, gather its stdout, print that at end.
+ * int main(int argc, char *argv[])
+ * {
+ * 	int tochild[2], fromchild[2];
+ * 	struct buffer out = { 0, 0, 0, NULL };
+ * 	struct stdin_buffer sbuf;
+ * 	int status;
+ * 	size_t off;
+ * 	ssize_t ret;
+ *
+ * 	if (argc == 1)
+ * 		errx(1, "Usage: runner <cmdline>...");
+ *
+ * 	if (pipe(tochild) != 0 || pipe(fromchild) != 0)
+ * 		err(1, "Creating pipes");
+ *
+ * 	if (!fork()) {
+ * 		// Child runs command.
+ * 		close(tochild[1]);
+ * 		close(fromchild[0]);
+ *
+ * 		dup2(tochild[0], STDIN_FILENO);
+ * 		dup2(fromchild[1], STDOUT_FILENO);
+ * 		execvp(argv[1], argv + 1);
+ * 		exit(127);
+ * 	}
+ *
+ * 	close(tochild[0]);
+ * 	close(fromchild[1]);
+ * 	signal(SIGPIPE, SIG_IGN);
+ *
+ * 	sbuf.reader = io_new_conn(STDIN_FILENO, read_stdin, reader_exit, &sbuf);
+ * 	sbuf.writer = io_new_conn(tochild[1], start_writer, fail_child_write,
+ * 				  &sbuf);
+ * 	if (!sbuf.reader || !sbuf.writer
+ * 	    || !io_new_conn(fromchild[0], read_from_child, NULL, &out))
+ * 		err(1, "Allocating connections");
+ *
+ * 	io_loop();
+ * 	wait(&status);
+ *
+ * 	for (off = 0; off < out.off; off += ret) {
+ * 		ret = write(STDOUT_FILENO, out.buf+off, out.off-off);
+ * 		if (ret < 0)
+ * 			err(1, "Writing stdout");
+ * 	}
+ * 	free(out.buf);
+ *
+ * 	return WIFEXITED(status) ? WEXITSTATUS(status) : 2;
+ * }
+ *
+ * License: BSD-MIT
+ */
+/*
+ * Standard ccan _info entry point.
+ *
+ * Called with a single keyword argument.  For "depends" it prints one
+ * ccan module per line that this module needs and returns 0; any other
+ * invocation returns 1.
+ */
+int main(int argc, char *argv[])
+{
+	/* Expect exactly one argument: the query keyword. */
+	if (argc != 2)
+		return 1;
+
+	if (strcmp(argv[1], "depends") == 0) {
+		/* io.h includes <ccan/typesafe_cb/typesafe_cb.h>. */
+		printf("ccan/typesafe_cb\n");
+		return 0;
+	}
+
+	return 1;
+}

+ 82 - 0
ccan/io/backend.h

@@ -0,0 +1,82 @@
+/* Licensed under BSD-MIT - see LICENSE file for details */
+#ifndef CCAN_IO_BACKEND_H
+#define CCAN_IO_BACKEND_H
+#include <stdbool.h>
+
+/* State shared by listeners and connections (embedded in both). */
+struct fd {
+	/* The underlying file descriptor. */
+	int fd;
+	/* True for an io_listener, false for an io_conn. */
+	bool listener;
+	/* Private to the backend; poll.c keeps its array index here. */
+	size_t backend_info;
+
+	/* Callback to run next, and the argument handed to it. */
+	struct io_op *(*next)(struct io_conn *, void *arg);
+	void *next_arg;
+
+	/* Callback run when a connection is deleted, and its argument. */
+	void (*finish)(struct io_conn *, void *arg);
+	void *finish_arg;
+};
+
+
+/* Listeners create connections. */
+struct io_listener {
+	struct fd fd;
+};
+
+/*
+ * What a connection is currently waiting for.  The io_* queueing
+ * functions return one of these values disguised as a struct io_op *.
+ */
+enum io_state {
+	NEXT, /* eg starting, woken from idle, return from io_break. */
+	READ, /* Read until the whole buffer is filled. */
+	WRITE, /* Write until the whole buffer is sent. */
+	READPART, /* A single successful read completes the request. */
+	WRITEPART, /* A single successful write completes the request. */
+	IDLE, /* Polling for nothing until io_wake(). */
+	FINISHED, /* Closed: waiting to be deleted by the backend. */
+	PROCESSING /* We expect them to change this now. */
+};
+
+/* Decode the io_state that was carried inside an opaque io_op pointer. */
+static inline enum io_state from_ioop(struct io_op *op)
+{
+	long encoded = (long)op;
+
+	return (enum io_state)encoded;
+}
+
+struct io_state_read {
+	char *buf;
+	size_t len;
+};
+
+struct io_state_write {
+	const char *buf;
+	size_t len;
+};
+
+struct io_state_readpart {
+	char *buf;
+	size_t *lenp;
+};
+
+struct io_state_writepart {
+	const char *buf;
+	size_t *lenp;
+};
+
+/* One connection per client. */
+struct io_conn {
+	/* Kept first: the backend casts its struct fd * back to io_conn *. */
+	struct fd fd;
+
+	/* What this connection is currently waiting to do. */
+	enum io_state state;
+	/* Parameters of the queued request; the member in use follows state. */
+	union {
+		struct io_state_read read;
+		struct io_state_write write;
+		struct io_state_readpart readpart;
+		struct io_state_writepart writepart;
+	} u;
+};
+
+extern void *io_loop_return;
+
+bool add_listener(struct io_listener *l);
+bool add_conn(struct io_conn *c);
+void del_listener(struct io_listener *l);
+void backend_set_state(struct io_conn *conn, struct io_op *op);
+
+struct io_op *do_writeable(struct io_conn *conn);
+struct io_op *do_readable(struct io_conn *conn);
+#endif /* CCAN_IO_BACKEND_H */

+ 229 - 0
ccan/io/io.c

@@ -0,0 +1,229 @@
+/* Licensed under BSD-MIT - see LICENSE file for details */
+#include "io.h"
+#include "backend.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <string.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <assert.h>
+
+void *io_loop_return;
+
+/*
+ * Allocate a listener for @fd and register it with the backend.
+ *
+ * @start and @finish (both given @arg) are remembered so they can be
+ * handed to every connection accepted on this listener.  Returns NULL
+ * if allocation or backend registration fails.
+ */
+struct io_listener *io_new_listener_(int fd,
+				     struct io_op *(*start)(struct io_conn *,
+							    void *arg),
+				     void (*finish)(struct io_conn *, void *),
+				     void *arg)
+{
+	struct io_listener *listener;
+
+	listener = malloc(sizeof(*listener));
+	if (listener == NULL)
+		return NULL;
+
+	listener->fd.fd = fd;
+	listener->fd.listener = true;
+	listener->fd.next = start;
+	listener->fd.next_arg = arg;
+	listener->fd.finish = finish;
+	listener->fd.finish_arg = arg;
+
+	if (add_listener(listener))
+		return listener;
+
+	/* The backend refused it: undo the allocation. */
+	free(listener);
+	return NULL;
+}
+
+/*
+ * Tear down a listener: unregister it from the backend (which closes
+ * its fd) and free it.
+ *
+ * del_listener() passes the fd to the backend's del_fd(), and that
+ * already close()s the descriptor.  Closing it here as well would close
+ * it twice, and the second close could hit an unrelated descriptor that
+ * has since reused the same number.
+ */
+void io_close_listener(struct io_listener *l)
+{
+	del_listener(l);
+	free(l);
+}
+
+/*
+ * Allocate a connection for @fd and register it with the backend.
+ *
+ * The connection begins in state NEXT with @start as its pending
+ * callback; @finish is stored for when it is deleted.  Both receive
+ * @arg.  Returns NULL if allocation or backend registration fails.
+ */
+struct io_conn *io_new_conn_(int fd,
+			     struct io_op *(*start)(struct io_conn *, void *),
+			     void (*finish)(struct io_conn *, void *),
+			     void *arg)
+{
+	struct io_conn *c;
+
+	c = malloc(sizeof(*c));
+	if (c == NULL)
+		return NULL;
+
+	c->fd.fd = fd;
+	c->fd.listener = false;
+	c->fd.next = start;
+	c->fd.next_arg = arg;
+	c->fd.finish = finish;
+	c->fd.finish_arg = arg;
+	c->state = NEXT;
+
+	if (add_conn(c))
+		return c;
+
+	/* The backend refused it: undo the allocation. */
+	free(c);
+	return NULL;
+}
+
+/*
+ * An io_next is simply the connection pointer under another type: a
+ * convenient token which only we can produce.
+ */
+static inline struct io_next *to_ionext(struct io_conn *conn)
+{
+	void *token = conn;
+
+	return token;
+}
+
+/* Encode an io_state as an opaque io_op pointer (see from_ioop()). */
+static inline struct io_op *to_ioop(enum io_state state)
+{
+	long encoded = state;
+
+	return (struct io_op *)encoded;
+}
+
+/* Recover the connection hidden inside an io_next token. */
+static inline struct io_conn *from_ionext(struct io_next *next)
+{
+	void *token = next;
+
+	return token;
+}
+
+/*
+ * Record @next/@arg as the callback to run once @conn's queued I/O is
+ * done, and return the token the io_read()/io_write() family expects.
+ */
+struct io_next *io_next_(struct io_conn *conn,
+			 struct io_op *(*next)(struct io_conn *, void *),
+			 void *arg)
+{
+	struct fd *f = &conn->fd;
+
+	f->next = next;
+	f->next_arg = arg;
+	return to_ionext(conn);
+}
+
+/*
+ * Queue @len bytes at @data to be written in full on the connection
+ * identified by @next.
+ */
+struct io_op *io_write(const void *data, size_t len, struct io_next *next)
+{
+	struct io_state_write *req = &from_ionext(next)->u.write;
+
+	req->buf = data;
+	req->len = len;
+	return to_ioop(WRITE);
+}
+
+/*
+ * Queue a request to fill all @len bytes of @data from the connection
+ * identified by @next.
+ */
+struct io_op *io_read(void *data, size_t len, struct io_next *next)
+{
+	struct io_state_read *req = &from_ionext(next)->u.read;
+
+	req->buf = data;
+	req->len = len;
+	return to_ioop(READ);
+}
+
+/*
+ * Queue a read of up to *@len bytes into @data; *@len is overwritten
+ * with the number of bytes actually read.
+ */
+struct io_op *io_read_partial(void *data, size_t *len, struct io_next *next)
+{
+	struct io_state_readpart *req = &from_ionext(next)->u.readpart;
+
+	req->buf = data;
+	req->lenp = len;
+	return to_ioop(READPART);
+}
+
+/*
+ * Queue a write of up to *@len bytes from @data; *@len is overwritten
+ * with the number of bytes actually written.
+ */
+struct io_op *io_write_partial(const void *data, size_t *len, struct io_next *next)
+{
+	struct io_state_writepart *req = &from_ionext(next)->u.writepart;
+
+	req->buf = data;
+	req->lenp = len;
+	return to_ioop(WRITEPART);
+}
+
+/* Ask for the connection to be parked in the IDLE state; @conn is unused. */
+struct io_op *io_idle(struct io_conn *conn)
+{
+	struct io_op *idle = to_ioop(IDLE);
+
+	return idle;
+}
+
+/*
+ * Wake an idle connection so that @next(@conn, @arg) becomes its
+ * pending callback.  A connection that has already finished (but whose
+ * finish() has not run yet) is left alone; anything else must be IDLE.
+ */
+void io_wake_(struct io_conn *conn,
+	      struct io_op *(*next)(struct io_conn *, void *), void *arg)
+
+{
+	if (conn->state != FINISHED) {
+		assert(conn->state == IDLE);
+		conn->fd.next = next;
+		conn->fd.next_arg = arg;
+		backend_set_state(conn, to_ioop(NEXT));
+	}
+}
+
+/* Invoke the connection's pending callback and pass its result on. */
+static struct io_op *do_next(struct io_conn *conn)
+{
+	struct fd *f = &conn->fd;
+
+	return f->next(conn, f->next_arg);
+}
+
+/*
+ * Called by the backend when the connection's fd is writable.
+ *
+ * Performs one write() for the queued WRITE or WRITEPART request; a
+ * failed write closes the connection.  WRITE completes only once the
+ * whole buffer has been sent, WRITEPART after any single write (the
+ * byte count is stored through the caller's length pointer).  On
+ * completion the next callback runs and its result is returned;
+ * otherwise the current state is returned unchanged.
+ */
+struct io_op *do_writeable(struct io_conn *conn)
+{
+	ssize_t written;
+
+	if (conn->state == WRITE) {
+		written = write(conn->fd.fd, conn->u.write.buf,
+				conn->u.write.len);
+		if (written < 0)
+			return io_close(conn, NULL);
+		conn->u.write.buf += written;
+		conn->u.write.len -= written;
+		/* More left to send: stay in the same state. */
+		if (conn->u.write.len != 0)
+			return to_ioop(conn->state);
+	} else if (conn->state == WRITEPART) {
+		written = write(conn->fd.fd, conn->u.writepart.buf,
+				*conn->u.writepart.lenp);
+		if (written < 0)
+			return io_close(conn, NULL);
+		*conn->u.writepart.lenp = written;
+	} else {
+		/* Shouldn't happen. */
+		abort();
+	}
+
+	return do_next(conn);
+}
+
+/*
+ * Called by the backend when the connection's fd is readable.
+ *
+ * Performs one read() for the queued READ or READPART request; an
+ * error or end-of-file (read() returning <= 0) closes the connection.
+ * READ completes only once the whole buffer is filled, READPART after
+ * any single read (the byte count is stored through the caller's
+ * length pointer).  On completion the next callback runs and its
+ * result is returned; otherwise the current state is returned
+ * unchanged.
+ */
+struct io_op *do_readable(struct io_conn *conn)
+{
+	ssize_t got;
+
+	if (conn->state == READ) {
+		got = read(conn->fd.fd, conn->u.read.buf, conn->u.read.len);
+		if (got <= 0)
+			return io_close(conn, NULL);
+		conn->u.read.buf += got;
+		conn->u.read.len -= got;
+		/* Buffer not full yet: stay in the same state. */
+		if (conn->u.read.len != 0)
+			return to_ioop(conn->state);
+	} else if (conn->state == READPART) {
+		got = read(conn->fd.fd, conn->u.readpart.buf,
+			   *conn->u.readpart.lenp);
+		if (got <= 0)
+			return io_close(conn, NULL);
+		*conn->u.readpart.lenp = got;
+	} else {
+		/* Shouldn't happen. */
+		abort();
+	}
+
+	return do_next(conn);
+}
+
+/* Useful next functions. */
+/*
+ * Close the connection, we're done.  Both arguments are unused, which
+ * lets this double as a next callback.
+ */
+struct io_op *io_close(struct io_conn *conn, void *arg)
+{
+	struct io_op *finished = to_ioop(FINISHED);
+
+	return finished;
+}
+
+/*
+ * Exit the loop, returning this (non-NULL) arg.  @next is not examined
+ * here; the connection is left in state NEXT.
+ */
+struct io_op *io_break(void *arg, struct io_next *next)
+{
+	struct io_op *resume = to_ioop(NEXT);
+
+	io_loop_return = arg;
+	return resume;
+}

+ 229 - 0
ccan/io/io.h

@@ -0,0 +1,229 @@
+/* Licensed under BSD-MIT - see LICENSE file for details */
+#ifndef CCAN_IO_H
+#define CCAN_IO_H
+#include <ccan/typesafe_cb/typesafe_cb.h>
+#include <stdbool.h>
+#include <unistd.h>
+
+/**
+ * struct io_op - pointer to return from io functions.
+ *
+ * This undefined structure is just to help the compiler check that you
+ * really do return the result of an io-queueing method.
+ */
+struct io_op;
+
+/**
+ * struct io_next - pointer to what we're going to do next.
+ *
+ * Bundles up callbacks, generated by io_next().
+ */
+struct io_next;
+
+/**
+ * io_new_conn - create a new connection.
+ * @fd: the file descriptor.
+ * @start: the first function to call.
+ * @finish: the function to call when it's closed or fails.
+ * @arg: the argument to both @start and @finish.
+ *
+ * This creates a connection which owns @fd.  @start will be called on the
+ * next return to io_loop(), and @finish will be called when an I/O operation
+ * fails, or you call io_close() on the connection.
+ *
+ * The @start function must call one of the io queueing functions
+ * (eg. io_read, io_write) and return the next function to call once
+ * that is done using io_next().  The alternative is to call io_close().
+ *
+ * Returns NULL on error (and sets errno).
+ */
+#define io_new_conn(fd, start, finish, arg)				\
+	io_new_conn_((fd),						\
+		     typesafe_cb_preargs(struct io_op *, void *,	\
+					 (start), (arg), struct io_conn *), \
+		     typesafe_cb_preargs(void, void *, (finish), (arg),	\
+					 struct io_conn *),		\
+		     (arg))
+struct io_conn *io_new_conn_(int fd,
+			     struct io_op *(*start)(struct io_conn *, void *),
+			     void (*finish)(struct io_conn *, void *),
+			     void *arg);
+
+/**
+ * io_new_listener - create a new accepting listener.
+ * @fd: the file descriptor.
+ * @start: the first function to call on new connections.
+ * @finish: the function to call when the connection is closed or fails.
+ * @arg: the argument to both @start and @finish.
+ *
+ * When @fd becomes readable, we accept() and turn that fd into a new
+ * connection.
+ *
+ * Returns NULL on error (and sets errno).
+ */
+#define io_new_listener(fd, start, finish, arg)				\
+	io_new_listener_((fd),						\
+			 typesafe_cb_preargs(struct io_op *, void *,	\
+					     (start), (arg),		\
+					     struct io_conn *),		\
+			 typesafe_cb_preargs(void, void *, (finish),	\
+					     (arg), struct io_conn *),	\
+			 (arg))
+struct io_listener *io_new_listener_(int fd,
+				     struct io_op *(*start)(struct io_conn *,
+							    void *arg),
+				     void (*finish)(struct io_conn *,
+						    void *arg),
+				     void *arg);
+
+/**
+ * io_close_listener - delete a listener.
+ * @listener: the listener returned from io_new_listener.
+ *
+ * This closes the fd and frees @listener.
+ */
+void io_close_listener(struct io_listener *listener);
+
+/**
+ * io_write - queue data to be written.
+ * @data: the data buffer.
+ * @len: the length to write.
+ * @next: what to call next.
+ *
+ * This will queue the data buffer for writing.  Once it's all written, the
+ * function registered with io_next() will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ */
+struct io_op *io_write(const void *data, size_t len, struct io_next *next);
+
+/**
+ * io_read - queue buffer to be read.
+ * @data: the data buffer.
+ * @len: the length to read.
+ * @next: what to call next.
+ *
+ * This will queue the data buffer for reading.  Once it's all read, the
+ * function registered with io_next() will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ */
+struct io_op *io_read(void *data, size_t len, struct io_next *next);
+
+/**
+ * io_read_partial - queue buffer to be read (partial OK).
+ * @data: the data buffer.
+ * @len: the maximum length to read, set to the length actually read.
+ * @next: what to call next.
+ *
+ * This will queue the data buffer for reading.  Once any data is
+ * read, @len is updated and the function registered with io_next()
+ * will be called: on an error, the finish function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ */
+struct io_op *io_read_partial(void *data, size_t *len, struct io_next *next);
+
+/**
+ * io_write_partial - queue data to be written (partial OK).
+ * @data: the data buffer.
+ * @len: the maximum length to write, set to the length actually written.
+ * @next: what to call next.
+ *
+ * This will queue the data buffer for writing.  Once any data is
+ * written, @len is updated and the function registered with io_next()
+ * will be called: on an error, the finish function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ */
+struct io_op *io_write_partial(const void *data, size_t *len,
+			       struct io_next *next);
+
+/**
+ * io_idle - explicitly note that this connection will do nothing.
+ * @conn: the current connection.
+ *
+ * This indicates the connection is idle: some other function will
+ * later call io_read/io_write etc. (or io_close) on it, in which case
+ * it will do that.
+ */
+struct io_op *io_idle(struct io_conn *conn);
+
+/**
+ * io_wake - wake up an idle connection.
+ * @conn: an idle connection.
+ * @next: the next function to call once queued IO is complete.
+ * @arg: the argument to @next.
+ *
+ * This makes @conn run its @next function the next time around the
+ * io_loop().
+ */
+#define io_wake(conn, next, arg)					\
+	io_wake_((conn),						\
+		 typesafe_cb_preargs(struct io_op *, void *,		\
+				     (next), (arg), struct io_conn *),	\
+		 (arg))
+void io_wake_(struct io_conn *conn,
+	      struct io_op *(*next)(struct io_conn *, void *), void *arg);
+
+/**
+ * io_break - return from io_loop()
+ * @arg: non-NULL value to return from io_loop().
+ * @next: what to call next (can be NULL if we expect no return).
+ *
+ * This breaks out of the io_loop.  As soon as the current @next
+ * function returns, any io_close()'d connections will have their
+ * finish callbacks called, then io_loop() will return with @arg.
+ *
+ * If io_loop() is called again, then @next will be called.
+ */
+struct io_op *io_break(void *arg, struct io_next *next);
+
+/**
+ * io_next - indicate what callback to call next.
+ * @conn: this connection.
+ * @next: the next function to call once queued IO is complete.
+ * @arg: the argument to @next.
+ *
+ * Every @next (or @start) function should "return io_next(...);" once
+ * they have indicated what io to perform (eg. io_write, io_idle).
+ * The exception is io_close(), which can be used instead of io_next().
+ *
+ * Note that as an optimization, the next function may be called
+ * immediately, which is why this should be the last statement in your
+ * function.
+ */
+#define io_next(conn, next, arg)					\
+	io_next_((conn),						\
+		 typesafe_cb_preargs(struct io_op *, void *,		\
+				     (next), (arg), struct io_conn *),	\
+		 (arg))
+struct io_next *io_next_(struct io_conn *conn,
+			 struct io_op *(*next)(struct io_conn *, void *arg),
+			 void *arg);
+
+/* FIXME: io_recvfrom/io_sendto */
+
+/**
+ * io_close - terminate a connection.
+ * @conn: any connection.
+ *
+ * This schedules a connection to be closed.  It can be done on any
+ * connection, whether it has I/O queued or not (though that I/O may
+ * be performed first).
+ *
+ * It's common to 'return io_close(...)' from a @next function, but
+ * io_close can also be used as an argument to io_next().
+ */
+struct io_op *io_close(struct io_conn *, void *unused);
+
+/**
+ * io_loop - process fds until all are closed or io_break() is called.
+ *
+ * This is the core loop; it exits with the io_break() arg, or NULL if
+ * all connections and listeners are closed.
+ */
+void *io_loop(void);
+#endif /* CCAN_IO_H */

+ 207 - 0
ccan/io/poll.c

@@ -0,0 +1,207 @@
+/* Licensed under BSD-MIT - see LICENSE file for details */
+#include "io.h"
+#include "backend.h"
+#include <assert.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+static size_t num_fds = 0, max_fds = 0, num_next = 0, num_finished = 0;
+static struct pollfd *pollfds = NULL;
+static struct fd **fds = NULL;
+
+/*
+ * Append @fd to the parallel pollfds/fds arrays, polling for @events.
+ *
+ * The arrays start at 8 entries and double whenever they are full.
+ * The slot index is remembered in fd->backend_info.  Returns false if
+ * either array could not be grown.
+ */
+static bool add_fd(struct fd *fd, short events)
+{
+	struct pollfd *slot;
+
+	if (num_fds + 1 > max_fds) {
+		size_t new_max = 8;
+		void *grown;
+
+		if (max_fds)
+			new_max = max_fds * 2;
+
+		grown = realloc(pollfds, sizeof(*pollfds) * new_max);
+		if (grown == NULL)
+			return false;
+		pollfds = grown;
+
+		grown = realloc(fds, sizeof(*fds) * new_max);
+		if (grown == NULL)
+			return false;
+		fds = grown;
+
+		max_fds = new_max;
+	}
+
+	slot = &pollfds[num_fds];
+	slot->fd = fd->fd;
+	slot->events = events;
+	slot->revents = 0; /* In case we're iterating now */
+
+	fds[num_fds] = fd;
+	fd->backend_info = num_fds++;
+	return true;
+}
+
+/*
+ * Remove @fd from the poll arrays and close its descriptor.
+ *
+ * The vacated slot is filled by moving the last entry into it; once
+ * the final entry is removed both arrays are freed.
+ */
+static void del_fd(struct fd *fd)
+{
+	size_t idx = fd->backend_info;
+	size_t last;
+
+	assert(idx != (size_t)-1);
+	assert(idx < num_fds);
+
+	last = num_fds - 1;
+	if (idx != last) {
+		/* Move last one over us. */
+		pollfds[idx] = pollfds[last];
+		fds[idx] = fds[last];
+		assert(fds[idx]->backend_info == last);
+		fds[idx]->backend_info = idx;
+	} else if (last == 0) {
+		/* Free everything when no more fds. */
+		free(pollfds);
+		free(fds);
+		pollfds = NULL;
+		fds = NULL;
+		max_fds = 0;
+	}
+	num_fds = last;
+
+	/* Mark as no longer registered with the backend. */
+	fd->backend_info = -1;
+	close(fd->fd);
+}
+
+/* Register a listener: it is polled for readability (pending accepts). */
+bool add_listener(struct io_listener *l)
+{
+	struct fd *f = &l->fd;
+
+	return add_fd(f, POLLIN);
+}
+
+/*
+ * Register a new connection, initially polling for nothing, and count
+ * it among those with a pending next callback.
+ */
+bool add_conn(struct io_conn *c)
+{
+	bool added = add_fd(&c->fd, 0);
+
+	if (added)
+		num_next++;
+	return added;
+}
+
+/*
+ * Run the connection's finish callback (if any), unregister and close
+ * its fd, then fix up the FINISHED/NEXT counters.  The caller frees
+ * @conn.
+ */
+static void del_conn(struct io_conn *conn)
+{
+	void (*finish)(struct io_conn *, void *) = conn->fd.finish;
+
+	if (finish != NULL)
+		finish(conn, conn->fd.finish_arg);
+	del_fd(&conn->fd);
+
+	switch (conn->state) {
+	case FINISHED:
+		num_finished--;
+		break;
+	case NEXT:
+		num_next--;
+		break;
+	default:
+		break;
+	}
+}
+
+/* Unregister a listener from the backend; this also closes its fd. */
+void del_listener(struct io_listener *l)
+{
+	struct fd *f = &l->fd;
+
+	del_fd(f);
+}
+
+/*
+ * Move @conn into the state encoded in @op.
+ *
+ * Read and write states select the matching poll events, IDLE polls for
+ * nothing, and NEXT/FINISHED leave the events alone but bump the
+ * counters io_loop() checks.  Any other state aborts.
+ */
+void backend_set_state(struct io_conn *conn, struct io_op *op)
+{
+	enum io_state new_state = from_ioop(op);
+	short *events = &pollfds[conn->fd.backend_info].events;
+
+	if (new_state == READ || new_state == READPART)
+		*events = POLLIN;
+	else if (new_state == WRITE || new_state == WRITEPART)
+		*events = POLLOUT;
+	else if (new_state == IDLE)
+		*events = 0;
+	else if (new_state == NEXT)
+		num_next++;
+	else if (new_state == FINISHED)
+		num_finished++;
+	else
+		abort();
+
+	conn->state = new_state;
+}
+
+/*
+ * Accept one pending connection on @l and wrap it in a new io_conn
+ * that inherits the listener's callbacks and argument.  If accept()
+ * fails nothing happens; if the io_conn cannot be created the new fd
+ * is closed again.
+ */
+static void accept_conn(struct io_listener *l)
+{
+	int newfd = accept(l->fd.fd, NULL, NULL);
+
+	/* FIXME: What to do here? */
+	if (newfd < 0)
+		return;
+
+	if (!io_new_conn(newfd, l->fd.next, l->fd.finish, l->fd.next_arg))
+		close(newfd);
+}
+
+/*
+ * Delete FINISHED connections and, unless @finished_only, run the
+ * pending callback of every connection in state NEXT.
+ *
+ * It's OK to miss some, as long as we make progress.
+ *
+ * While running callbacks we stop as soon as io_break() has set
+ * io_loop_return.  When only reaping (@finished_only) we keep going
+ * regardless: io_loop() calls us from a "while (num_finished)" loop
+ * after io_break(), and would spin forever if a set io_loop_return
+ * prevented us from deleting anything.
+ */
+static void finish_and_next(bool finished_only)
+{
+	unsigned int i;
+
+	for (i = 0; i < num_fds; i++) {
+		struct io_conn *c;
+
+		if (!finished_only && io_loop_return)
+			break;
+
+		/* Nothing left that we are allowed to handle: stop early. */
+		if (!num_finished) {
+			if (finished_only || num_next == 0)
+				break;
+		}
+		if (fds[i]->listener)
+			continue;
+		c = (void *)fds[i];
+		if (c->state == FINISHED) {
+			del_conn(c);
+			free(c);
+			/* del_fd() may have moved the last entry here: revisit. */
+			i--;
+		} else if (!finished_only && c->state == NEXT) {
+			backend_set_state(c, c->fd.next(c, c->fd.next_arg));
+			num_next--;
+		}
+	}
+}
+
+/*
+ * This is the main loop.
+ *
+ * Runs pending next/finish callbacks, then poll()s and dispatches
+ * events, until io_break() sets io_loop_return, no fds remain, or
+ * poll() fails.  Connections already marked FINISHED are reaped before
+ * returning.
+ *
+ * Returns the io_break() argument (and resets it), or NULL otherwise.
+ */
+void *io_loop(void)
+{
+	void *ret;
+
+	while (!io_loop_return) {
+		size_t i;
+		int r;
+
+		if (num_finished || num_next) {
+			finish_and_next(false);
+			/* Could have started/finished more. */
+			continue;
+		}
+
+		if (num_fds == 0)
+			break;
+
+		r = poll(pollfds, num_fds, -1);
+		if (r < 0)
+			break;
+
+		for (i = 0; i < num_fds && !io_loop_return; i++) {
+			short revents = pollfds[i].revents;
+			struct io_conn *c;
+
+			/*
+			 * A listener is not an io_conn: it has no state
+			 * member and is never reaped as FINISHED, so it
+			 * must not reach backend_set_state().  Only
+			 * readability (a pending accept) is acted on.
+			 */
+			if (fds[i]->listener) {
+				if (revents & POLLIN)
+					accept_conn((void *)fds[i]);
+				continue;
+			}
+
+			c = (void *)fds[i];
+			if (revents & POLLOUT)
+				backend_set_state(c, do_writeable(c));
+			else if (revents & POLLIN)
+				backend_set_state(c, do_readable(c));
+			else if (revents & POLLHUP)
+				backend_set_state(c, io_close(c, NULL));
+		}
+	}
+
+	/* Run finish callbacks for anything closed before we return. */
+	while (num_finished)
+		finish_and_next(true);
+
+	ret = io_loop_return;
+	io_loop_return = NULL;
+	return ret;
+}

+ 91 - 0
ccan/io/test/run-01-start-finish.c

@@ -0,0 +1,91 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+static struct io_op *start_ok(struct io_conn *conn, int *state)
+{
+	ok1(*state == 0);
+	(*state)++;
+	return io_close(conn, NULL);
+}
+
+static void finish_ok(struct io_conn *conn, int *state)
+{
+	ok1(*state == 1);
+	(*state)++;
+	io_break(state + 1, NULL);
+}
+
+/*
+ * Create a stream socket listening on @port on any local address.
+ *
+ * On success returns the listening fd and stores the getaddrinfo()
+ * result in *@info; the caller must freeaddrinfo() it.  On failure
+ * returns -1, leaves *@info untouched and releases whatever was
+ * acquired along the way.
+ */
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		goto fail;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0
+	    || listen(fd, 1) != 0) {
+		close(fd);
+		goto fail;
+	}
+	*info = addrinfo;
+	return fd;
+
+fail:
+	/* Don't leak the getaddrinfo() result when we give up. */
+	freeaddrinfo(addrinfo);
+	return -1;
+}
+
+int main(void)
+{
+	int state = 0;
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd;
+
+	/* This is how many tests you plan to run */
+	plan_tests(9);
+	fd = make_listen_fd("65001", &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, start_ok, finish_ok, &state);
+	ok1(l);
+	fflush(stdout);
+	if (!fork()) {
+		io_close_listener(l);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		close(fd);
+		freeaddrinfo(addrinfo);
+		exit(0);
+	}
+	freeaddrinfo(addrinfo);
+	ok1(io_loop() == &state + 1);
+	ok1(state == 2);
+	io_close_listener(l);
+	ok1(wait(&state));
+	ok1(WIFEXITED(state));
+	ok1(WEXITSTATUS(state) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 108 - 0
ccan/io/test/run-02-read.c

@@ -0,0 +1,108 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+	int state;
+	char buf[4];
+};
+
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 0);
+	d->state++;
+	return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d));
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+	io_break(d, NULL);
+}
+
+/*
+ * Create a stream socket listening on @port on any local address.
+ *
+ * On success returns the listening fd and stores the getaddrinfo()
+ * result in *@info; the caller must freeaddrinfo() it.  On failure
+ * returns -1, leaves *@info untouched and releases whatever was
+ * acquired along the way.
+ */
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		goto fail;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0
+	    || listen(fd, 1) != 0) {
+		close(fd);
+		goto fail;
+	}
+	*info = addrinfo;
+	return fd;
+
+fail:
+	/* Don't leak the getaddrinfo() result when we give up. */
+	freeaddrinfo(addrinfo);
+	return -1;
+}
+
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* This is how many tests you plan to run */
+	plan_tests(10);
+	d->state = 0;
+	fd = make_listen_fd("65002", &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, start_ok, finish_ok, d);
+	ok1(l);
+	fflush(stdout);
+	if (!fork()) {
+		int i;
+
+		io_close_listener(l);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		signal(SIGPIPE, SIG_IGN);
+		for (i = 0; i < strlen("hellothere"); i++) {
+			if (write(fd, "hellothere" + i, 1) != 1)
+				break;
+		}
+		close(fd);
+		freeaddrinfo(addrinfo);
+		free(d);
+		exit(0);
+	}
+	freeaddrinfo(addrinfo);
+	ok1(io_loop() == d);
+	ok1(d->state == 2);
+	ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+	free(d);
+	io_close_listener(l);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 137 - 0
ccan/io/test/run-03-readpartial.c

@@ -0,0 +1,137 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+struct data {
+	int state;
+	size_t bytes;
+	char buf[4];
+};
+
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 0);
+	d->state++;
+	d->bytes = sizeof(d->buf);
+	return io_read_partial(d->buf, &d->bytes, io_next(conn, io_close, d));
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+	io_break(d, NULL);
+}
+
+/*
+ * Create a stream socket listening on @port on any local address.
+ *
+ * On success returns the listening fd and stores the getaddrinfo()
+ * result in *@info; the caller must freeaddrinfo() it.  On failure
+ * returns -1, leaves *@info untouched and releases whatever was
+ * acquired along the way.
+ */
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		goto fail;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0
+	    || listen(fd, 1) != 0) {
+		close(fd);
+		goto fail;
+	}
+	*info = addrinfo;
+	return fd;
+
+fail:
+	/* Don't leak the getaddrinfo() result when we give up. */
+	freeaddrinfo(addrinfo);
+	return -1;
+}
+
+static void write_to_socket(const char *str, const struct addrinfo *addrinfo)
+{
+	int fd, i;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		exit(1);
+	if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+		exit(2);
+	signal(SIGPIPE, SIG_IGN);
+	for (i = 0; i < strlen(str); i++) {
+		if (write(fd, str + i, 1) != 1)
+			break;
+	}
+	close(fd);
+}
+
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* This is how many tests you plan to run */
+	plan_tests(22);
+	d->state = 0;
+	fd = make_listen_fd("65003", &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, start_ok, finish_ok, d);
+	ok1(l);
+	fflush(stdout);
+	if (!fork()) {
+		io_close_listener(l);
+		write_to_socket("hellothere", addrinfo);
+		freeaddrinfo(addrinfo);
+		free(d);
+		exit(0);
+	}
+	ok1(io_loop() == d);
+	ok1(d->state == 2);
+	ok1(d->bytes > 0);
+	ok1(d->bytes <= sizeof(d->buf));
+	ok1(memcmp(d->buf, "hellothere", d->bytes) == 0);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	fflush(stdout);
+	if (!fork()) {
+		io_close_listener(l);
+		write_to_socket("hi", addrinfo);
+		freeaddrinfo(addrinfo);
+		free(d);
+		exit(0);
+	}
+	d->state = 0;
+	ok1(io_loop() == d);
+	ok1(d->state == 2);
+	ok1(d->bytes > 0);
+	ok1(d->bytes <= strlen("hi"));
+	ok1(memcmp(d->buf, "hi", d->bytes) == 0);
+
+	freeaddrinfo(addrinfo);
+	free(d);
+	io_close_listener(l);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 121 - 0
ccan/io/test/run-04-writepartial.c

@@ -0,0 +1,121 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+/* State shared between main() and the connection callbacks. */
+struct data {
+	/* Progress counter: 0 initially, 1 after start_ok(), 2 after finish_ok(). */
+	int state;
+	/*
+	 * Length of buf, passed by address to io_write_partial(); main()
+	 * later checks it is in (0, 1MiB] -- presumably it is overwritten
+	 * with the count actually written (confirm against io.h).
+	 */
+	size_t bytes;
+	/* Heap buffer allocated and filled with 'a' by main(). */
+	char *buf;
+};
+
+/*
+ * Start callback registered with io_new_listener(): checks that this is
+ * the first step (state 0), advances the state, and queues a partial
+ * write of d->buf whose follow-on step is io_close.
+ */
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 0);
+	d->state++;
+	return io_write_partial(d->buf, &d->bytes, io_next(conn, io_close, d));
+}
+
+/*
+ * Finish callback registered with io_new_listener(): checks that
+ * start_ok() ran (state 1), advances the state, and calls io_break() with
+ * d, the value main() expects io_loop() to return.  conn is unused.
+ */
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+	io_break(d, NULL);
+}
+
+/*
+ * Create a stream socket bound to the given port (passive/wildcard
+ * address, first getaddrinfo() result only) and put it in listening
+ * state.
+ *
+ * On success returns the listening fd and stores the getaddrinfo()
+ * result in *info; the caller must release it with freeaddrinfo().
+ * On failure returns -1, leaves *info untouched and releases everything
+ * acquired here (the address list used to leak on these paths).
+ */
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		goto fail;
+
+	/* Best effort: lets the test rebind the port quickly on reruns. */
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+		goto fail_close;
+	if (listen(fd, 1) != 0)
+		goto fail_close;
+
+	*info = addrinfo;
+	return fd;
+
+fail_close:
+	close(fd);
+fail:
+	freeaddrinfo(addrinfo);
+	return -1;
+}
+
+/*
+ * Child-side helper: connect to the address in addrinfo, read exactly
+ * strlen(str) bytes and check that they equal str.
+ *
+ * Exit statuses: 1 socket() failed, 2 connect() failed, 3 the expected
+ * number of bytes could not be read (error, early EOF, or str longer
+ * than the local buffer), 4 the bytes differ from str.
+ */
+static void read_from_socket(const char *str, const struct addrinfo *addrinfo)
+{
+	int fd;
+	char buf[100];
+	size_t len = strlen(str), done = 0;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		exit(1);
+	if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+		exit(2);
+	/* Never read past the end of buf. */
+	if (len > sizeof(buf))
+		exit(3);
+	/* A stream read may return fewer bytes than asked for: keep going. */
+	while (done < len) {
+		ssize_t r = read(fd, buf + done, len - done);
+
+		if (r <= 0)
+			exit(3);
+		done += r;
+	}
+	if (memcmp(buf, str, len) != 0)
+		exit(4);
+	close(fd);
+}
+
+/*
+ * Test driver: listen on port 65004 and offer a 1MiB buffer of 'a' to
+ * io_write_partial() on the accepted connection, while a forked child
+ * connects and reads the first six bytes.  Checks that io_loop() returns
+ * d, that both callbacks ran, that d->bytes ends up in (0, 1MiB], and
+ * that the child exited with status 0.
+ */
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* Nothing below can work without the shared state. */
+	if (!d)
+		exit(1);
+
+	/* This is how many tests you plan to run */
+	plan_tests(11);
+	d->state = 0;
+	d->bytes = 1024*1024;
+	d->buf = malloc(d->bytes);
+	if (!d->buf)
+		exit(1);
+	memset(d->buf, 'a', d->bytes);
+	fd = make_listen_fd("65004", &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, start_ok, finish_ok, d);
+	ok1(l);
+	/* Flush before fork() so buffered TAP output is not duplicated. */
+	fflush(stdout);
+	if (!fork()) {
+		io_close_listener(l);
+		read_from_socket("aaaaaa", addrinfo);
+		freeaddrinfo(addrinfo);
+		free(d->buf);
+		free(d);
+		exit(0);
+	}
+	ok1(io_loop() == d);
+	ok1(d->state == 2);
+	ok1(d->bytes > 0);
+	ok1(d->bytes <= 1024*1024);
+
+	/* wait() returns -1 on failure, which is non-zero: compare explicitly. */
+	ok1(wait(&status) != -1);
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	freeaddrinfo(addrinfo);
+	free(d->buf);
+	free(d);
+	io_close_listener(l);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 122 - 0
ccan/io/test/run-05-write.c

@@ -0,0 +1,122 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+/* State shared between main() and the connection callbacks. */
+struct data {
+	/* Progress counter: 0 initially, 1 after start_ok(), 2 after finish_ok(). */
+	int state;
+	/* Number of bytes in buf; passed by value to io_write(). */
+	size_t bytes;
+	/* Heap buffer allocated and filled with 'a' by main(). */
+	char *buf;
+};
+
+/*
+ * Start callback registered with io_new_listener(): checks that this is
+ * the first step (state 0), advances the state, and queues a write of
+ * d->bytes bytes from d->buf whose follow-on step is io_close.
+ */
+static struct io_op *start_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 0);
+	d->state++;
+	return io_write(d->buf, d->bytes, io_next(conn, io_close, d));
+}
+
+/*
+ * Finish callback registered with io_new_listener(): checks that
+ * start_ok() ran (state 1), advances the state, and calls io_break() with
+ * d, the value main() expects io_loop() to return.  conn is unused.
+ */
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+	io_break(d, NULL);
+}
+
+/*
+ * Create a stream socket bound to the given port (passive/wildcard
+ * address, first getaddrinfo() result only) and put it in listening
+ * state.
+ *
+ * On success returns the listening fd and stores the getaddrinfo()
+ * result in *info; the caller must release it with freeaddrinfo().
+ * On failure returns -1, leaves *info untouched and releases everything
+ * acquired here (the address list used to leak on these paths).
+ */
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		goto fail;
+
+	/* Best effort: lets the test rebind the port quickly on reruns. */
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+		goto fail_close;
+	if (listen(fd, 1) != 0)
+		goto fail_close;
+
+	*info = addrinfo;
+	return fd;
+
+fail_close:
+	close(fd);
+fail:
+	freeaddrinfo(addrinfo);
+	return -1;
+}
+
+/*
+ * Child-side helper: connect to the address in addrinfo and read (and
+ * discard) data until at least `bytes` bytes have been received.
+ *
+ * Exit statuses: 1 socket() failed, 2 connect() failed, 3 read() failed,
+ * 4 the peer closed the connection before `bytes` bytes arrived.
+ */
+static void read_from_socket(size_t bytes, const struct addrinfo *addrinfo)
+{
+	int fd;
+	size_t done = 0;
+	ssize_t r;
+	char buf[100];
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		exit(1);
+	if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+		exit(2);
+
+	/*
+	 * Count each chunk exactly once.  The previous loop added r both in
+	 * its body and in the for-increment, so it stopped after roughly
+	 * half the data, and it spun forever on EOF (r == 0).
+	 */
+	while (done < bytes) {
+		r = read(fd, buf, sizeof(buf));
+		if (r < 0)
+			exit(3);
+		if (r == 0)
+			exit(4);
+		done += r;
+	}
+	close(fd);
+}
+
+/*
+ * Test driver: listen on port 65005 and io_write() a 1MiB buffer of 'a'
+ * to the accepted connection, while a forked child connects and reads
+ * that many bytes.  Checks that io_loop() returns d, that both callbacks
+ * ran, and that the child exited with status 0.
+ */
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* Nothing below can work without the shared state. */
+	if (!d)
+		exit(1);
+
+	/* This is how many tests you plan to run */
+	plan_tests(9);
+	d->state = 0;
+	d->bytes = 1024*1024;
+	d->buf = malloc(d->bytes);
+	if (!d->buf)
+		exit(1);
+	memset(d->buf, 'a', d->bytes);
+	fd = make_listen_fd("65005", &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, start_ok, finish_ok, d);
+	ok1(l);
+	/* Flush before fork() so buffered TAP output is not duplicated. */
+	fflush(stdout);
+	if (!fork()) {
+		io_close_listener(l);
+		read_from_socket(d->bytes, addrinfo);
+		freeaddrinfo(addrinfo);
+		free(d->buf);
+		free(d);
+		exit(0);
+	}
+	ok1(io_loop() == d);
+	ok1(d->state == 2);
+
+	/* wait() returns -1 on failure, which is non-zero: compare explicitly. */
+	ok1(wait(&status) != -1);
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	freeaddrinfo(addrinfo);
+	free(d->buf);
+	free(d);
+	io_close_listener(l);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 145 - 0
ccan/io/test/run-06-idle.c

@@ -0,0 +1,145 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+static struct io_conn *idler;
+
+/* State shared between main() and all connection callbacks. */
+struct data {
+	/* Progress counter, advanced by every callback; main() expects 5 at the end. */
+	int state;
+	/* Filled by do_read(); main() compares it with the start of "hellothere". */
+	char buf[4];
+};
+
+/*
+ * Step handed to the idle connection by start_waker() via io_wake().
+ * Accepts state 2 or 3 -- presumably because its ordering relative to
+ * finish_waker() is not fixed -- then queues a read of sizeof(d->buf)
+ * bytes whose follow-on step is io_close.
+ */
+static struct io_op *do_read(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 2 || d->state == 3);
+	d->state++;
+	return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d));
+}
+
+/*
+ * Start callback of the helper connection created in start_idle():
+ * expects state 1, advances it, wakes the idle connection so that it
+ * continues with do_read(), then closes the helper connection.
+ */
+static struct io_op *start_waker(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+
+	io_wake(idler, do_read, d);
+	return io_close(conn, NULL);
+}
+
+/*
+ * Finish callback of the helper connection.  Accepts state 2 or 3, the
+ * same pair do_read() accepts, and advances it.  conn is unused.
+ */
+static void finish_waker(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 2 || d->state == 3);
+	d->state++;
+}
+
+/*
+ * Start callback registered with io_new_listener(): expects state 0,
+ * advances it, remembers this connection in the file-scope `idler`,
+ * creates a second connection on /dev/null whose start callback
+ * (start_waker) will wake this one, and then goes idle.
+ */
+static struct io_op *start_idle(struct io_conn *conn, struct data *d)
+{
+	int fd;
+
+	ok1(d->state == 0);
+	d->state++;
+	idler = conn;
+
+	/* This will wake us up. */
+	fd = open("/dev/null", O_RDONLY);
+	ok1(fd >= 0);
+	ok1(io_new_conn(fd, start_waker, finish_waker, d));
+
+	return io_idle(conn);
+}
+
+/*
+ * Finish callback registered with io_new_listener(): expects state 4
+ * (start_idle, start_waker, finish_waker and do_read have all run),
+ * advances it to 5 and calls io_break() with d, the value main() expects
+ * io_loop() to return.  conn is unused.
+ */
+static void finish_idle(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 4);
+	d->state++;
+	io_break(d, NULL);
+}
+
+/*
+ * Create a stream socket bound to the given port (passive/wildcard
+ * address, first getaddrinfo() result only) and put it in listening
+ * state.
+ *
+ * On success returns the listening fd and stores the getaddrinfo()
+ * result in *info; the caller must release it with freeaddrinfo().
+ * On failure returns -1, leaves *info untouched and releases everything
+ * acquired here (the address list used to leak on these paths).
+ */
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		goto fail;
+
+	/* Best effort: lets the test rebind the port quickly on reruns. */
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+		goto fail_close;
+	if (listen(fd, 1) != 0)
+		goto fail_close;
+
+	*info = addrinfo;
+	return fd;
+
+fail_close:
+	close(fd);
+fail:
+	freeaddrinfo(addrinfo);
+	return -1;
+}
+
+/*
+ * Test driver: listen on port 65006 with start_idle/finish_idle as the
+ * connection callbacks, fork a child that connects and writes
+ * "hellothere" one byte at a time, then run io_loop().  Checks that the
+ * loop returns d, that the callback chain reached state 5, that the
+ * first sizeof(d->buf) bytes received match, and that the child exited
+ * with status 0.
+ */
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* Nothing below can work without the shared state. */
+	if (!d)
+		exit(1);
+
+	/* This is how many tests you plan to run */
+	plan_tests(15);
+	d->state = 0;
+	fd = make_listen_fd("65006", &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, start_idle, finish_idle, d);
+	ok1(l);
+	/* Flush before fork() so buffered TAP output is not duplicated. */
+	fflush(stdout);
+	if (!fork()) {
+		int i;
+
+		io_close_listener(l);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		/* A write to a closed peer should fail, not kill the child. */
+		signal(SIGPIPE, SIG_IGN);
+		for (i = 0; i < strlen("hellothere"); i++) {
+			if (write(fd, "hellothere" + i, 1) != 1)
+				break;
+		}
+		close(fd);
+		freeaddrinfo(addrinfo);
+		free(d);
+		exit(0);
+	}
+	freeaddrinfo(addrinfo);
+
+	ok1(io_loop() == d);
+	ok1(d->state == 5);
+	ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+	free(d);
+	io_close_listener(l);
+
+	/* wait() returns -1 on failure, which is non-zero: compare explicitly. */
+	ok1(wait(&status) != -1);
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 117 - 0
ccan/io/test/run-07-break.c

@@ -0,0 +1,117 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+/* State shared between main() and the connection callbacks. */
+struct data {
+	/* Progress counter: 1 after start_break(), 2 after do_read(), 3 after finish_ok(). */
+	int state;
+	/* Filled by do_read(); main() compares it with the start of "hellothere". */
+	char buf[4];
+};
+
+/*
+ * Follow-on step named by start_break(): expects state 1, advances it,
+ * and queues a read of sizeof(d->buf) bytes whose follow-on step is
+ * io_close.
+ */
+static struct io_op *do_read(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+	return io_read(d->buf, sizeof(d->buf), io_next(conn, io_close, d));
+}
+
+/*
+ * Start callback registered with io_new_listener(): expects state 0,
+ * advances it, and calls io_break() with d while naming do_read() as the
+ * step to continue with.  main() relies on the first io_loop() returning
+ * d at state 1 and a second io_loop() carrying on to do_read().
+ */
+static struct io_op *start_break(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 0);
+	d->state++;
+	return io_break(d, io_next(conn, do_read, d));
+}
+
+/*
+ * Finish callback registered with io_new_listener(): expects state 2
+ * (do_read() has run) and advances it to 3.  conn is unused.
+ */
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 2);
+	d->state++;
+}
+
+/*
+ * Create a stream socket bound to the given port (passive/wildcard
+ * address, first getaddrinfo() result only) and put it in listening
+ * state.
+ *
+ * On success returns the listening fd and stores the getaddrinfo()
+ * result in *info; the caller must release it with freeaddrinfo().
+ * On failure returns -1, leaves *info untouched and releases everything
+ * acquired here (the address list used to leak on these paths).
+ */
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		goto fail;
+
+	/* Best effort: lets the test rebind the port quickly on reruns. */
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+		goto fail_close;
+	if (listen(fd, 1) != 0)
+		goto fail_close;
+
+	*info = addrinfo;
+	return fd;
+
+fail_close:
+	close(fd);
+fail:
+	freeaddrinfo(addrinfo);
+	return -1;
+}
+
+/*
+ * Test driver: listen on port 65007 with start_break/finish_ok as the
+ * connection callbacks and fork a child that connects and writes
+ * "hellothere" one byte at a time.  The first io_loop() must return d
+ * with state 1; after closing the listener a second io_loop() must
+ * return NULL with state 3 and the first sizeof(d->buf) bytes received.
+ * Finally the child's exit status is checked.
+ */
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* Nothing below can work without the shared state. */
+	if (!d)
+		exit(1);
+
+	/* This is how many tests you plan to run */
+	plan_tests(13);
+	d->state = 0;
+	fd = make_listen_fd("65007", &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, start_break, finish_ok, d);
+	ok1(l);
+	/* Flush before fork() so buffered TAP output is not duplicated. */
+	fflush(stdout);
+	if (!fork()) {
+		int i;
+
+		io_close_listener(l);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		/* A write to a closed peer should fail, not kill the child. */
+		signal(SIGPIPE, SIG_IGN);
+		for (i = 0; i < strlen("hellothere"); i++) {
+			if (write(fd, "hellothere" + i, 1) != 1)
+				break;
+		}
+		close(fd);
+		freeaddrinfo(addrinfo);
+		free(d);
+		exit(0);
+	}
+	freeaddrinfo(addrinfo);
+	ok1(io_loop() == d);
+	ok1(d->state == 1);
+	io_close_listener(l);
+
+	ok1(io_loop() == NULL);
+	ok1(d->state == 3);
+	ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+	free(d);
+
+	/* wait() returns -1 on failure, which is non-zero: compare explicitly. */
+	ok1(wait(&status) != -1);
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 122 - 0
ccan/io/test/run-10-many.c

@@ -0,0 +1,122 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#define NUM 100
+#define NUM_ITERS 1000
+
+/* One buffer with the pair of connections that fill and drain it. */
+struct buffer {
+	/* Incremented in poke_reader(); both sides close once it equals NUM_ITERS. */
+	int iters;
+	/* reader fills buf via do_read(); writer sends buf via do_write(). */
+	struct io_conn *reader, *writer;
+	/* Payload; always read and written as a whole (sizeof(buf) bytes). */
+	char buf[32];
+};
+
+static struct io_op *poke_writer(struct io_conn *conn, struct buffer *buf);
+static struct io_op *poke_reader(struct io_conn *conn, struct buffer *buf);
+
+/*
+ * Reader step: queue a read of the whole buffer, continuing with
+ * poke_writer() afterwards.  Must only run on buf's reader connection.
+ */
+static struct io_op *do_read(struct io_conn *conn, struct buffer *buf)
+{
+	assert(conn == buf->reader);
+
+	/* &buf->buf is the array's address, i.e. the same bytes as buf->buf. */
+	return io_read(&buf->buf, sizeof(buf->buf),
+		       io_next(conn, poke_writer, buf));
+}
+
+/*
+ * Writer step: queue a write of the whole buffer, continuing with
+ * poke_reader() afterwards.  Must only run on buf's writer connection.
+ */
+static struct io_op *do_write(struct io_conn *conn, struct buffer *buf)
+{
+	assert(conn == buf->writer);
+
+	/* &buf->buf is the array's address, i.e. the same bytes as buf->buf. */
+	return io_write(&buf->buf, sizeof(buf->buf),
+			io_next(conn, poke_reader, buf));
+}
+
+/*
+ * Runs on the reader connection as the follow-on of do_read().  While
+ * iterations remain it wakes the writer to send the buffer on and idles
+ * until woken again; once iters has reached NUM_ITERS it closes the
+ * reader instead.
+ */
+static struct io_op *poke_writer(struct io_conn *conn, struct buffer *buf)
+{
+	assert(conn == buf->reader);
+
+	if (buf->iters != NUM_ITERS) {
+		/* Hand the buffer over to the writing side... */
+		io_wake(buf->writer, do_write, buf);
+
+		/* ...and sleep until it wakes us for the next read. */
+		return io_idle(conn);
+	}
+
+	return io_close(conn, NULL);
+}
+
+/*
+ * Runs on the writer connection as the follow-on of do_write().  Always
+ * wakes the reader to start a read, counts the iteration, and then
+ * either closes the writer (NUM_ITERS reached) or idles until the reader
+ * asks for another write.
+ */
+static struct io_op *poke_reader(struct io_conn *conn, struct buffer *buf)
+{
+	assert(conn == buf->writer);
+
+	/* Reader's turn. */
+	io_wake(buf->reader, do_read, buf);
+
+	buf->iters++;
+	if (buf->iters != NUM_ITERS) {
+		/* Sleep until poke_writer() wakes us to write again. */
+		return io_idle(conn);
+	}
+
+	return io_close(conn, NULL);
+}
+
+/*
+ * Start callback of every reader connection: does no I/O yet, just
+ * idles until poke_reader() wakes it with do_read().
+ */
+static struct io_op *reader(struct io_conn *conn, struct buffer *buf)
+{
+	assert(conn == buf->reader);
+
+	/* Wait for writer to tell us to read. */
+	return io_idle(conn);
+}
+
+static struct buffer buf[NUM];
+
+/*
+ * Test driver: create NUM pipes and NUM buffers.  Each buffer gets a
+ * reader connection on the read end of one pipe and a writer connection
+ * on the write end of the next, with buf[0] joining the last pipe to the
+ * first.  Every buffer starts out holding the text "<i>-<i>" (padded
+ * with byte value i), and the callbacks pass it on NUM_ITERS times.
+ * Afterwards the contents that started in buf[i] are expected in
+ * buf[(i + NUM_ITERS) % NUM].
+ */
+int main(void)
+{
+	unsigned int i;
+	int fds[2], last_read, last_write;
+
+	plan_tests(5 + NUM);
+
+	ok1(pipe(fds) == 0);
+	last_read = fds[0];
+	last_write = fds[1];
+
+	for (i = 1; i < NUM; i++) {
+		if (pipe(fds) < 0)
+			break;
+		memset(buf[i].buf, i, sizeof(buf[i].buf));
+		/* i is unsigned, so the conversion must be %u rather than %i. */
+		sprintf(buf[i].buf, "%u-%u", i, i);
+
+		buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+		if (!buf[i].reader)
+			break;
+		buf[i].writer = io_new_conn(fds[1], do_write, NULL, &buf[i]);
+		if (!buf[i].writer)
+			break;
+		last_read = fds[0];
+	}
+	/* Any early break above leaves i < NUM; give up with the TAP status. */
+	if (!ok1(i == NUM))
+		exit(exit_status());
+
+	/* Last one completes the circle. */
+	i = 0;
+	sprintf(buf[i].buf, "%u-%u", i, i);
+	buf[i].reader = io_new_conn(last_read, reader, NULL, &buf[i]);
+	ok1(buf[i].reader);
+	buf[i].writer = io_new_conn(last_write, do_write, NULL, &buf[i]);
+	ok1(buf[i].writer);
+
+	/* They should eventually exit */
+	ok1(io_loop() == NULL);
+
+	for (i = 0; i < NUM; i++) {
+		char b[sizeof(buf[0].buf)];
+		/* Rebuild the initial contents of buffer i for comparison. */
+		memset(b, i, sizeof(b));
+		sprintf(b, "%u-%u", i, i);
+		ok1(memcmp(b, buf[(i + NUM_ITERS) % NUM].buf, sizeof(b)) == 0);
+	}
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}