Browse Source

Merge branch 'io'

Rusty Russell 12 years ago
parent
commit
641b511049
45 changed files with 4281 additions and 0 deletions
  1. 1 0
      Makefile-ccan
  2. 1 0
      ccan/io/LICENSE
  3. 88 0
      ccan/io/SCENARIOS
  4. 167 0
      ccan/io/_info
  5. 89 0
      ccan/io/backend.h
  6. 29 0
      ccan/io/benchmarks/Makefile
  7. 176 0
      ccan/io/benchmarks/run-different-speed.c
  8. 181 0
      ccan/io/benchmarks/run-length-prefix.c
  9. 112 0
      ccan/io/benchmarks/run-loop.c
  10. 469 0
      ccan/io/io.c
  11. 493 0
      ccan/io/io.h
  12. 150 0
      ccan/io/io_plan.h
  13. 388 0
      ccan/io/poll.c
  14. 8 0
      ccan/io/test/run-01-start-finish-DEBUG.c
  15. 95 0
      ccan/io/test/run-01-start-finish.c
  16. 8 0
      ccan/io/test/run-02-read-DEBUG.c
  17. 115 0
      ccan/io/test/run-02-read.c
  18. 8 0
      ccan/io/test/run-03-readpartial-DEBUG.c
  19. 144 0
      ccan/io/test/run-03-readpartial.c
  20. 8 0
      ccan/io/test/run-04-writepartial-DEBUG.c
  21. 127 0
      ccan/io/test/run-04-writepartial.c
  22. 8 0
      ccan/io/test/run-05-write-DEBUG.c
  23. 128 0
      ccan/io/test/run-05-write.c
  24. 8 0
      ccan/io/test/run-06-idle-DEBUG.c
  25. 146 0
      ccan/io/test/run-06-idle.c
  26. 8 0
      ccan/io/test/run-07-break-DEBUG.c
  27. 125 0
      ccan/io/test/run-07-break.c
  28. 7 0
      ccan/io/test/run-08-hangup-on-idle-DEBUG.c
  29. 45 0
      ccan/io/test/run-08-hangup-on-idle.c
  30. 7 0
      ccan/io/test/run-08-read-after-hangup-DEBUG.c
  31. 34 0
      ccan/io/test/run-08-read-after-hangup.c
  32. 8 0
      ccan/io/test/run-09-connect-DEBUG.c
  33. 103 0
      ccan/io/test/run-09-connect.c
  34. 12 0
      ccan/io/test/run-10-many-DEBUG.c
  35. 105 0
      ccan/io/test/run-10-many.c
  36. 8 0
      ccan/io/test/run-12-bidir-DEBUG.c
  37. 132 0
      ccan/io/test/run-12-bidir.c
  38. 8 0
      ccan/io/test/run-13-all-idle-DEBUG.c
  39. 31 0
      ccan/io/test/run-13-all-idle.c
  40. 8 0
      ccan/io/test/run-15-timeout-DEBUG.c
  41. 174 0
      ccan/io/test/run-15-timeout.c
  42. 8 0
      ccan/io/test/run-17-homemade-io-DEBUG.c
  43. 183 0
      ccan/io/test/run-17-homemade-io.c
  44. 8 0
      ccan/io/test/run-18-errno-DEBUG.c
  45. 120 0
      ccan/io/test/run-18-errno.c

+ 1 - 0
Makefile-ccan

@@ -55,6 +55,7 @@ MODS_WITH_SRC := antithread \
 	htable \
 	idtree \
 	ilog \
+	io \
 	isaac \
 	iscsi \
 	jmap \

+ 1 - 0
ccan/io/LICENSE

@@ -0,0 +1 @@
+../../licenses/LGPL-2.1

+ 88 - 0
ccan/io/SCENARIOS

@@ -0,0 +1,88 @@
+Simple:
+	step1(conn): read(conn), then step2
+	step2(conn): write(conn), then close
+
+Pass-through:
+	step1(conn): read(conn), then step2
+	step2(conn): write(otherconn), then step1
+
+Pass-through-and-connect:
+	step1(conn): read(conn), then step2
+	step2(conn): connect(otherconn), then step3
+	step3(conn): write(otherconn), then step1
+
+Chatroom:
+	step1(conn): read(conn), then step2
+	step2(conn): for c in allcons: write(c).  goto step1
+
+Simple:
+
+void event(struct io_event *done)
+{
+	char *buf = done->priv;
+	struct io_event *e;
+
+	e = queue_read(done, done->conn, buf, 100);
+	e = queue_write(e, done->conn, buf, 100);
+	queue_close(e, done->conn);
+}
+
+Pass-through:
+struct passthru {
+	char buf[100];
+	struct conn *rconn, *wconn;
+};
+
+void event(struct io_event *done)
+{
+	struct passthru *p = done->priv;
+	struct io_event *e;
+
+	e = queue_read(done, p->rconn, p->buf, 100);
+	e = queue_write(e, p->wconn, buf, 100);
+	queue_event(e, event);
+}
+
+Chatroom:
+struct list_head clients;
+
+struct buffer {
+	char buf[100];
+	unsigned int ref;
+};
+
+struct client {
+	struct list_node list;
+	struct connection *conn;
+	struct buffer *rbuf, *wbuf;
+};
+
+void broadcast(struct io_event *done)
+{
+	struct client *i, *c = done->conn->priv;
+	struct io_event *e;
+
+	list_for_each(&clients, i, list) {
+		e = queue_write(done, i->conn, c->buf->buf, 100);
+		e->priv = c->buf;
+		c->buf->ref++;
+		queue_event(e, drop_ref);
+	}
+
+
+
+void event(struct io_event *done)
+{
+	struct client *c = done->conn->priv;
+	struct io_event *e;
+
+	assert(c->conn == done->conn);
+	c->buf = malloc(sizeof(*c->buf));
+	c->buf->ref = 0;
+	e = queue_read(done, c->conn, c->buf->buf, 100);
+	e = queue_event(e, broadcast);
+}
+
+
+	step1(conn): read(conn), then step2
+	step2(conn): for c in allcons: write(c).  goto step1

+ 167 - 0
ccan/io/_info

@@ -0,0 +1,167 @@
+#include <stdio.h>
+#include <string.h>
+#include "config.h"
+
+/**
+ * io - simple library for asynchronous io handling.
+ *
+ * io provides a mechanism to write I/O servers with multiple
+ * connections.  Each callback indicates what I/O they plan next
+ * (eg. read, write).  It is also possible to write custom I/O
+ * plans.
+ *
+ * When compiled with DEBUG, control flow is changed so that rather
+ * than returning to the main io_loop(), plans are executed sequentially
+ * providing a backtrace showing what has occurred on that connection.
+ * Which connection(s) do this depends on the user-specified io_debug
+ * function.
+ *
+ * Example:
+ * // Given tr A-Z a-z outputs tr a-z a-z
+ * #include <ccan/io/io.h>
+ * #include <ccan/err/err.h>
+ * #include <assert.h>
+ * #include <stdlib.h>
+ * #include <signal.h>
+ * #include <sys/types.h>
+ * #include <sys/wait.h>
+ *
+ * struct buffer {
+ * 	size_t max, off, rlen;
+ * 	char *buf;
+ * };
+ *
+ * struct stdin_buffer {
+ * 	struct io_conn *reader, *writer;
+ * 	size_t len;
+ * 	char inbuf[4096];
+ * };
+ *
+ * // This reads from stdin.
+ * static struct io_plan wake_writer(struct io_conn *, struct stdin_buffer *);
+ * // This writes the stdin buffer to the child.
+ * static struct io_plan wake_reader(struct io_conn *, struct stdin_buffer *);
+ *
+ * static struct io_plan wake_writer(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * 	assert(c == b->reader);
+ * 	io_wake(b->writer, io_write(b->inbuf, b->len, wake_reader, b));
+ * 	return io_idle();
+ * }
+ *
+ * static void reader_exit(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * 	assert(c == b->reader);
+ * 	io_wake(b->writer, io_close());
+ * 	b->reader = NULL;
+ * }
+ *
+ * static struct io_plan wake_reader(struct io_conn *c, struct stdin_buffer *b)
+ * {
+ * 	assert(c == b->writer);
+ *	if (!b->reader)
+ *		return io_close();
+ *	b->len = sizeof(b->inbuf);
+ * 	io_wake(b->reader, io_read_partial(b->inbuf, &b->len, wake_writer, b));
+ * 	return io_idle();
+ * }
+ *
+ * static void fail_child_write(struct io_conn *conn, struct stdin_buffer *b)
+ * {
+ * 	if (b->reader)
+ * 		err(1, "Failed writing to child.");
+ * }
+ *
+ * // This reads from the child and saves it into buffer.
+ * static struct io_plan read_from_child(struct io_conn *conn,
+ *					 struct buffer *b)
+ * {
+ * 	b->off += b->rlen;
+ *
+ * 	if (b->off == b->max)
+ * 		b->buf = realloc(b->buf, b->max *= 2);
+ *
+ * 	b->rlen = b->max - b->off;
+ * 	return io_read_partial(b->buf + b->off, &b->rlen, read_from_child, b);
+ * }
+ *
+ * // Feed a program our stdin, gather its stdout, print that at end.
+ * int main(int argc, char *argv[])
+ * {
+ * 	int tochild[2], fromchild[2];
+ * 	struct buffer out;
+ * 	struct stdin_buffer sbuf;
+ * 	int status;
+ * 	size_t off;
+ * 	ssize_t ret;
+ *	struct io_conn *from_child;
+ *
+ * 	if (argc == 1)
+ * 		errx(1, "Usage: runner <cmdline>...");
+ *
+ * 	if (pipe(tochild) != 0 || pipe(fromchild) != 0)
+ * 		err(1, "Creating pipes");
+ *
+ * 	if (!fork()) {
+ * 		// Child runs command.
+ * 		close(tochild[1]);
+ * 		close(fromchild[0]);
+ *
+ * 		dup2(tochild[0], STDIN_FILENO);
+ * 		dup2(fromchild[1], STDOUT_FILENO);
+ * 		execvp(argv[1], argv + 1);
+ * 		exit(127);
+ * 	}
+ *
+ * 	close(tochild[0]);
+ * 	close(fromchild[1]);
+ * 	signal(SIGPIPE, SIG_IGN);
+ *
+ *	sbuf.len = sizeof(sbuf.inbuf);
+ * 	sbuf.reader = io_new_conn(STDIN_FILENO,
+ *				  io_read_partial(sbuf.inbuf, &sbuf.len,
+ *						  wake_writer, &sbuf));
+ * 	sbuf.writer = io_new_conn(tochild[1], io_idle());
+ *
+ *	out.max = 128;
+ *	out.off = 0;
+ *	out.rlen = 128;
+ *	out.buf = malloc(out.max);
+ *	from_child = io_new_conn(fromchild[0],
+ *				 io_read_partial(out.buf, &out.rlen,
+ *						 read_from_child, &out));
+ * 	if (!sbuf.reader || !sbuf.writer || !from_child)
+ * 		err(1, "Allocating connections");
+ *
+ *	io_set_finish(sbuf.reader, reader_exit, &sbuf);
+ *	io_set_finish(sbuf.writer, fail_child_write, &sbuf);
+ *
+ * 	io_loop();
+ * 	wait(&status);
+ *
+ * 	for (off = 0; off < out.off; off += ret) {
+ * 		ret = write(STDOUT_FILENO, out.buf+off, out.off-off);
+ * 		if (ret < 0)
+ * 			err(1, "Writing stdout");
+ * 	}
+ * 	free(out.buf);
+ *
+ * 	return WIFEXITED(status) ? WEXITSTATUS(status) : 2;
+ * }
+ *
+ * License: LGPL (v2.1 or any later version)
+ * Author: Rusty Russell <rusty@rustcorp.com.au>
+ */
+int main(int argc, char *argv[])
+{
+	if (argc != 2)
+		return 1;
+
+	if (strcmp(argv[1], "depends") == 0) {
+		printf("ccan/time\n");
+		printf("ccan/timer\n");
+		return 0;
+	}
+
+	return 1;
+}

+ 89 - 0
ccan/io/backend.h

@@ -0,0 +1,89 @@
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#ifndef CCAN_IO_BACKEND_H
+#define CCAN_IO_BACKEND_H
+#include <stdbool.h>
+#include <ccan/timer/timer.h>
+
+struct fd {
+	int fd;
+	bool listener;
+	size_t backend_info;
+};
+
+/* Listeners create connections. */
+struct io_listener {
+	struct fd fd;
+
+	/* These are for connections we create. */
+	void (*init)(int fd, void *arg);
+	void *arg;
+};
+
+struct io_timeout {
+	struct timer timer;
+	struct io_conn *conn;
+
+	struct io_plan (*next)(struct io_conn *, void *arg);
+	void *next_arg;
+};
+
+/* One connection per client. */
+struct io_conn {
+	struct fd fd;
+
+	void (*finish)(struct io_conn *, void *arg);
+	void *finish_arg;
+
+	struct io_conn *duplex;
+	struct io_timeout *timeout;
+
+	struct io_plan plan;
+};
+
+static inline bool timeout_active(const struct io_conn *conn)
+{
+	return conn->timeout && conn->timeout->conn;
+}
+
+extern void *io_loop_return;
+
+#ifdef DEBUG
+extern struct io_conn *current;
+static inline void set_current(struct io_conn *conn)
+{
+	current = conn;
+}
+static inline bool doing_debug_on(struct io_conn *conn)
+{
+	return io_debug_conn && io_debug_conn(conn);
+}
+static inline bool doing_debug(void)
+{
+	return io_debug_conn;
+}
+#else
+static inline void set_current(struct io_conn *conn)
+{
+}
+static inline bool doing_debug_on(struct io_conn *conn)
+{
+	return false;
+}
+static inline bool doing_debug(void)
+{
+	return false;
+}
+#endif
+
+bool add_listener(struct io_listener *l);
+bool add_conn(struct io_conn *c);
+bool add_duplex(struct io_conn *c);
+void del_listener(struct io_listener *l);
+void backend_plan_changed(struct io_conn *conn);
+void backend_add_timeout(struct io_conn *conn, struct timespec ts);
+void backend_del_timeout(struct io_conn *conn);
+void backend_del_conn(struct io_conn *conn);
+
+void io_ready(struct io_conn *conn);
+void *do_io_loop(struct io_conn **ready);
+#endif /* CCAN_IO_BACKEND_H */

+ 29 - 0
ccan/io/benchmarks/Makefile

@@ -0,0 +1,29 @@
+ALL:=run-loop run-different-speed run-length-prefix
+CCANDIR:=../../..
+CFLAGS:=-Wall -I$(CCANDIR) -O3 -flto
+LDFLAGS:=-O3 -flto
+LDLIBS:=-lrt
+
+OBJS:=time.o poll.o io.o err.o timer.o list.o
+
+default: $(ALL)
+
+run-loop: run-loop.o $(OBJS)
+run-different-speed: run-different-speed.o $(OBJS)
+run-length-prefix: run-length-prefix.o $(OBJS)
+
+time.o: $(CCANDIR)/ccan/time/time.c
+	$(CC) $(CFLAGS) -c -o $@ $<
+timer.o: $(CCANDIR)/ccan/timer/timer.c
+	$(CC) $(CFLAGS) -c -o $@ $<
+list.o: $(CCANDIR)/ccan/list/list.c
+	$(CC) $(CFLAGS) -c -o $@ $<
+poll.o: $(CCANDIR)/ccan/io/poll.c
+	$(CC) $(CFLAGS) -c -o $@ $<
+io.o: $(CCANDIR)/ccan/io/io.c
+	$(CC) $(CFLAGS) -c -o $@ $<
+err.o: $(CCANDIR)/ccan/err/err.c
+	$(CC) $(CFLAGS) -c -o $@ $<
+
+clean:
+	$(RM) -f *.o $(ALL)

+ 176 - 0
ccan/io/benchmarks/run-different-speed.c

@@ -0,0 +1,176 @@
+/* Simulate a server with connections of different speeds.  We count
+ * how many connections complete in 10 seconds. */
+#include <ccan/io/io.h>
+#include <ccan/time/time.h>
+#include <ccan/err/err.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <stdio.h>
+#include <signal.h>
+
+#define REQUEST_SIZE 1024
+#define REPLY_SIZE 10240
+#define NUM_CONNS 500 /* per child */
+#define NUM_CHILDREN 2
+
+static unsigned int completed;
+
+struct client {
+	char request_buffer[REQUEST_SIZE];
+	char reply_buffer[REPLY_SIZE];
+};
+
+static struct io_plan write_reply(struct io_conn *conn, struct client *client);
+static struct io_plan read_request(struct io_conn *conn, struct client *client)
+{
+	return io_read(client->request_buffer, REQUEST_SIZE,
+		       write_reply, client);
+}
+
+/* once we're done, loop again. */
+static struct io_plan write_complete(struct io_conn *conn, struct client *client)
+{
+	completed++;
+	return read_request(conn, client);
+}
+
+static struct io_plan write_reply(struct io_conn *conn, struct client *client)
+{
+	return io_write(client->reply_buffer, REPLY_SIZE,
+			write_complete, client);
+}
+
+/* This runs in the child. */
+static void create_clients(struct sockaddr_un *addr, int waitfd)
+{
+	struct client data;
+	int i, sock[NUM_CONNS], speed[NUM_CONNS], done[NUM_CONNS], count = 0;
+
+	for (i = 0; i < NUM_CONNS; i++) {
+		/* Set speed. */
+		speed[i] = (1 << (random() % 10));
+		sock[i] = socket(AF_UNIX, SOCK_STREAM, 0);
+		if (sock[i] < 0)
+			err(1, "creating socket");
+		if (connect(sock[i], (void *)addr, sizeof(*addr)) != 0)
+			err(1, "connecting socket");
+		/* Make nonblocking. */
+		fcntl(sock[i], F_SETFD, fcntl(sock[i], F_GETFD)|O_NONBLOCK);
+		done[i] = 0;
+	}
+
+	read(waitfd, &i, 1);
+
+	for (;;) {
+		for (i = 0; i < NUM_CONNS; i++) {
+			int ret, bytes = speed[i];
+			if (done[i] < REQUEST_SIZE) {
+				if (REQUEST_SIZE - done[i] < bytes)
+					bytes = REQUEST_SIZE - done[i];
+				ret = write(sock[i], data.request_buffer,
+					    bytes);
+				if (ret > 0)
+					done[i] += ret;
+				else if (ret < 0 && errno != EAGAIN)
+					goto fail;
+			} else {
+				if (REQUEST_SIZE + REPLY_SIZE - done[i] < bytes)
+					bytes = REQUEST_SIZE + REPLY_SIZE
+						- done[i];
+				ret = read(sock[i], data.reply_buffer,
+					    bytes);
+				if (ret > 0) {
+					done[i] += ret;
+					if (done[i] == REQUEST_SIZE + REPLY_SIZE) {
+						count++;
+						done[i] = 0;
+					}
+				} else if (ret < 0 && errno != EAGAIN)
+					goto fail;
+			}
+		}
+	}
+fail:
+	printf("Child did %u\n", count);
+	exit(0);
+}
+
+static int timeout[2];
+static void sigalarm(int sig)
+{
+	write(timeout[1], "1", 1);
+}
+
+static struct io_plan do_timeout(struct io_conn *conn, char *buf)
+{
+	return io_break(buf, io_idle());
+}
+
+int main(int argc, char *argv[])
+{
+	struct client client;
+	unsigned int i, j;
+	struct sockaddr_un addr;
+	struct timespec start, end;
+	int fd, wake[2];
+	char buf;
+
+	addr.sun_family = AF_UNIX;
+	sprintf(addr.sun_path, "/tmp/run-different-speed.sock.%u", getpid());
+
+	if (pipe(wake) != 0 || pipe(timeout) != 0)
+		err(1, "Creating pipes");
+
+	fd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (fd < 0)
+		err(1, "Creating socket");
+
+	if (bind(fd, (void *)&addr, sizeof(addr)) != 0)
+		err(1, "Binding to %s", addr.sun_path);
+
+	if (listen(fd, NUM_CONNS) != 0)
+		err(1, "Listening on %s", addr.sun_path);
+
+	for (i = 0; i < NUM_CHILDREN; i++) {
+		switch (fork()) {
+		case -1:
+			err(1, "forking");
+		case 0:
+			close(wake[1]);
+			create_clients(&addr, wake[0]);
+			break;
+		}
+		for (j = 0; j < NUM_CONNS; j++) {
+			int ret = accept(fd, NULL, 0);
+			if (ret < 0)
+				err(1, "Accepting fd");
+			/* For efficiency, we share client structure */
+			io_new_conn(ret,
+				    io_read(client.request_buffer, REQUEST_SIZE,
+					    write_reply, &client));
+		}
+	}
+
+	io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf));
+
+	close(wake[0]);
+	for (i = 0; i < NUM_CHILDREN; i++)
+		write(wake[1], "1", 1);
+
+	signal(SIGALRM, sigalarm);
+	alarm(10);
+	start = time_now();
+	io_loop();
+	end = time_now();
+	close(fd);
+
+	printf("%u connections complete (%u ns per conn)\n",
+	       completed,
+	       (int)time_to_nsec(time_divide(time_sub(end, start), completed)));
+	return 0;
+}

+ 181 - 0
ccan/io/benchmarks/run-length-prefix.c

@@ -0,0 +1,181 @@
+/* Simulate a server with connections of different speeds.  We count
+ * how many connections complete in 10 seconds. */
+#include <ccan/io/io.h>
+#include <ccan/time/time.h>
+#include <ccan/err/err.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <stdio.h>
+#include <signal.h>
+#include <assert.h>
+
+#define REQUEST_MAX 131072
+#define NUM_CONNS 500 /* per child */
+#define NUM_CHILDREN 2
+
+static unsigned int completed;
+
+struct client {
+	unsigned int len;
+	char *request_buffer;
+};
+
+static struct io_plan write_reply(struct io_conn *conn, struct client *client);
+static struct io_plan read_body(struct io_conn *conn, struct client *client)
+{
+	assert(client->len <= REQUEST_MAX);
+	return io_read(client->request_buffer, client->len,
+		       write_reply, client);
+}
+
+static struct io_plan io_read_header(struct client *client)
+{
+	return io_read(&client->len, sizeof(client->len), read_body, client);
+}
+
+/* once we're done, loop again. */
+static struct io_plan write_complete(struct io_conn *conn, struct client *client)
+{
+	completed++;
+	return io_read_header(client);
+}
+
+static struct io_plan write_reply(struct io_conn *conn, struct client *client)
+{
+	return io_write(&client->len, sizeof(client->len),
+			write_complete, client);
+}
+
+/* This runs in the child. */
+static void create_clients(struct sockaddr_un *addr, int waitfd)
+{
+	struct client data;
+	int i, sock[NUM_CONNS], len[NUM_CONNS], done[NUM_CONNS],
+		result[NUM_CONNS], count = 0;
+
+	for (i = 0; i < NUM_CONNS; i++) {
+		len[i] = (random() % REQUEST_MAX) + 1;
+		sock[i] = socket(AF_UNIX, SOCK_STREAM, 0);
+		if (sock[i] < 0)
+			err(1, "creating socket");
+		if (connect(sock[i], (void *)addr, sizeof(*addr)) != 0)
+			err(1, "connecting socket");
+		/* Make nonblocking. */
+		fcntl(sock[i], F_SETFD, fcntl(sock[i], F_GETFD)|O_NONBLOCK);
+		done[i] = 0;
+	}
+
+	read(waitfd, &i, 1);
+
+	for (;;) {
+		for (i = 0; i < NUM_CONNS; i++) {
+			int ret, totlen = len[i] + sizeof(len[i]);
+			if (done[i] < sizeof(len[i]) + len[i]) {
+				data.len = len[i];
+				ret = write(sock[i], (void *)&data + done[i],
+					    totlen - done[i]);
+				if (ret > 0)
+					done[i] += ret;
+				else if (ret < 0 && errno != EAGAIN)
+					goto fail;
+			} else {
+				int off = done[i] - totlen;
+				ret = read(sock[i], (void *)&result[i] + off,
+					   sizeof(result[i]) - off);
+				if (ret > 0) {
+					done[i] += ret;
+					if (done[i] == totlen
+					    + sizeof(result[i])) {
+						assert(result[i] == len[i]);
+						count++;
+						done[i] = 0;
+					}
+				} else if (ret < 0 && errno != EAGAIN)
+					goto fail;
+			}
+		}
+	}
+fail:
+	printf("Child did %u\n", count);
+	exit(0);
+}
+
+static int timeout[2];
+static void sigalarm(int sig)
+{
+	write(timeout[1], "1", 1);
+}
+
+static struct io_plan do_timeout(struct io_conn *conn, char *buf)
+{
+	return io_break(buf, io_idle());
+}
+
+int main(int argc, char *argv[])
+{
+	unsigned int i, j;
+	struct sockaddr_un addr;
+	struct timespec start, end;
+	char buffer[REQUEST_MAX];
+	int fd, wake[2];
+	char buf;
+
+	addr.sun_family = AF_UNIX;
+	sprintf(addr.sun_path, "/tmp/run-different-speed.sock.%u", getpid());
+
+	if (pipe(wake) != 0 || pipe(timeout) != 0)
+		err(1, "Creating pipes");
+
+	fd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (fd < 0)
+		err(1, "Creating socket");
+
+	if (bind(fd, (void *)&addr, sizeof(addr)) != 0)
+		err(1, "Binding to %s", addr.sun_path);
+
+	if (listen(fd, NUM_CONNS) != 0)
+		err(1, "Listening on %s", addr.sun_path);
+
+	for (i = 0; i < NUM_CHILDREN; i++) {
+		switch (fork()) {
+		case -1:
+			err(1, "forking");
+		case 0:
+			close(wake[1]);
+			create_clients(&addr, wake[0]);
+			break;
+		}
+		for (j = 0; j < NUM_CONNS; j++) {
+			struct client *client = malloc(sizeof(*client));
+			int ret = accept(fd, NULL, 0);
+			if (ret < 0)
+				err(1, "Accepting fd");
+			/* For efficiency, we share buffer */
+			client->request_buffer = buffer;
+			io_new_conn(ret, io_read_header(client));
+		}
+	}
+
+	io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf));
+
+	close(wake[0]);
+	for (i = 0; i < NUM_CHILDREN; i++)
+		write(wake[1], "1", 1);
+
+	signal(SIGALRM, sigalarm);
+	alarm(10);
+	start = time_now();
+	io_loop();
+	end = time_now();
+	close(fd);
+
+	printf("%u connections complete (%u ns per conn)\n",
+	       completed,
+	       (int)time_to_nsec(time_divide(time_sub(end, start), completed)));
+	return 0;
+}

+ 112 - 0
ccan/io/benchmarks/run-loop.c

@@ -0,0 +1,112 @@
+#include <ccan/io/io.h>
+#include <ccan/time/time.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+#include <err.h>
+#include <signal.h>
+
+#define NUM 500
+#define NUM_ITERS 10000
+
+struct buffer {
+	int iters;
+	struct io_conn *reader, *writer;
+	char buf[32];
+};
+
+static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
+
+static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
+{
+	assert(conn == buf->reader);
+
+	if (buf->iters == NUM_ITERS)
+		return io_close();
+
+	/* You write. */
+	io_wake(buf->writer,
+		io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
+
+	/* I'll wait until you wake me. */
+	return io_idle();
+}
+
+static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
+{
+	assert(conn == buf->writer);
+	/* You read. */
+	io_wake(buf->reader,
+		io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
+
+	if (++buf->iters == NUM_ITERS)
+		return io_close();
+
+	/* I'll wait until you tell me to write. */
+	return io_idle();
+}
+
+int main(void)
+{
+	unsigned int i;
+	int fds[2], last_read, last_write;
+	struct timespec start, end;
+	struct buffer buf[NUM];
+
+	if (pipe(fds) != 0)
+		err(1, "pipe");
+	last_read = fds[0];
+	last_write = fds[1];
+
+	for (i = 1; i < NUM; i++) {
+		buf[i].iters = 0;
+		if (pipe(fds) < 0)
+			err(1, "pipe");
+		memset(buf[i].buf, i, sizeof(buf[i].buf));
+		sprintf(buf[i].buf, "%i-%i", i, i);
+
+		buf[i].reader = io_new_conn(last_read, io_idle());
+		if (!buf[i].reader)
+			err(1, "Creating reader %i", i);
+		buf[i].writer = io_new_conn(fds[1],
+					    io_write(&buf[i].buf,
+						     sizeof(buf[i].buf),
+						     poke_reader, &buf[i]));
+		if (!buf[i].writer)
+			err(1, "Creating writer %i", i);
+		last_read = fds[0];
+	}
+
+	/* Last one completes the cirle. */
+	i = 0;
+	buf[i].iters = 0;
+	sprintf(buf[i].buf, "%i-%i", i, i);
+	buf[i].reader = io_new_conn(last_read, io_idle());
+	if (!buf[i].reader)
+		err(1, "Creating reader %i", i);
+	buf[i].writer = io_new_conn(last_write, io_write(&buf[i].buf,
+							 sizeof(buf[i].buf),
+							 poke_reader, &buf[i]));
+	if (!buf[i].writer)
+		err(1, "Creating writer %i", i);
+
+	/* They should eventually exit */
+	start = time_now();
+	if (io_loop() != NULL)
+		errx(1, "io_loop?");
+	end = time_now();
+
+	for (i = 0; i < NUM; i++) {
+		char b[sizeof(buf[0].buf)];
+		memset(b, i, sizeof(b));
+		sprintf(b, "%i-%i", i, i);
+		if (memcmp(b, buf[(i + NUM_ITERS) % NUM].buf, sizeof(b)) != 0)
+			errx(1, "Buffer for %i was '%s' not '%s'",
+			     i, buf[(i + NUM_ITERS) % NUM].buf, b);
+	}
+
+	printf("run-many: %u %u iterations: %llu usec\n",
+	       NUM, NUM_ITERS, (long long)time_to_usec(time_sub(end, start)));
+	return 0;
+}

+ 469 - 0
ccan/io/io.c

@@ -0,0 +1,469 @@
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#include "io.h"
+#include "backend.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <string.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <poll.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+void *io_loop_return;
+
+#ifdef DEBUG
+/* Set to skip the next plan. */
+bool io_plan_nodebug;
+/* The current connection to apply plan to. */
+struct io_conn *current;
+/* User-defined function to select which connection(s) to debug. */
+bool (*io_debug_conn)(struct io_conn *conn);
+/* Set when we wake up an connection we are debugging. */
+bool io_debug_wakeup;
+
+struct io_plan io_debug(struct io_plan plan)
+{
+	struct io_conn *ready = NULL;
+
+	if (io_plan_nodebug) {
+		io_plan_nodebug = false;
+		return plan;
+	}
+
+	if (!current || !doing_debug_on(current)) {
+		if (!io_debug_wakeup)
+			return plan;
+	}
+
+	io_debug_wakeup = false;
+	current->plan = plan;
+	backend_plan_changed(current);
+
+	/* Call back into the loop immediately. */
+	io_loop_return = do_io_loop(&ready);
+
+	if (ready) {
+		set_current(ready);
+		if (!ready->plan.next) {
+			/* Call finish function immediately. */
+			if (ready->finish) {
+				errno = ready->plan.u.close.saved_errno;
+				ready->finish(ready, ready->finish_arg);
+				ready->finish = NULL;
+			}
+			backend_del_conn(ready);
+		} else {
+			/* Calls back in itself, via io_debug_io(). */
+			if (ready->plan.io(ready->fd.fd, &ready->plan) != 2)
+				abort();
+		}
+		set_current(NULL);
+	}
+
+	/* Return a do-nothing plan, so backend_plan_changed in
+	 * io_ready doesn't do anything (it's already been called). */
+	return io_idle_();
+}
+
+int io_debug_io(int ret)
+{
+	/* Cache it for debugging; current changes. */
+	struct io_conn *conn = current;
+	int saved_errno = errno;
+
+	if (!doing_debug_on(conn))
+		return ret;
+
+	/* These will all go linearly through the io_debug() path above. */
+	switch (ret) {
+	case -1:
+		/* This will call io_debug above. */
+		errno = saved_errno;
+		io_close();
+		break;
+	case 0: /* Keep going with plan. */
+		io_debug(conn->plan);
+		break;
+	case 1: /* Done: get next plan. */
+		if (timeout_active(conn))
+			backend_del_timeout(conn);
+		conn->plan.next(conn, conn->plan.next_arg);
+		break;
+	default:
+		abort();
+	}
+
+	/* Normally-invalid value, used for sanity check. */
+	return 2;
+}
+
+static void debug_io_wake(struct io_conn *conn)
+{
+	/* We want linear if we wake a debugged connection, too. */
+	if (io_debug_conn && io_debug_conn(conn))
+		io_debug_wakeup = true;
+}
+
+/* Counterpart to io_plan_no_debug(), called in macros in io.h */
+static void io_plan_debug_again(void)
+{
+	io_plan_nodebug = false;
+}
+#else
+static void debug_io_wake(struct io_conn *conn)
+{
+}
+static void io_plan_debug_again(void)
+{
+}
+#endif
+
+struct io_listener *io_new_listener_(int fd,
+				     void (*init)(int fd, void *arg),
+				     void *arg)
+{
+	struct io_listener *l = malloc(sizeof(*l));
+
+	if (!l)
+		return NULL;
+
+	l->fd.listener = true;
+	l->fd.fd = fd;
+	l->init = init;
+	l->arg = arg;
+	if (!add_listener(l)) {
+		free(l);
+		return NULL;
+	}
+	return l;
+}
+
+void io_close_listener(struct io_listener *l)
+{
+	close(l->fd.fd);
+	del_listener(l);
+	free(l);
+}
+
+struct io_conn *io_new_conn_(int fd, struct io_plan plan)
+{
+	struct io_conn *conn = malloc(sizeof(*conn));
+
+	io_plan_debug_again();
+
+	if (!conn)
+		return NULL;
+
+	conn->fd.listener = false;
+	conn->fd.fd = fd;
+	conn->plan = plan;
+	conn->finish = NULL;
+	conn->finish_arg = NULL;
+	conn->duplex = NULL;
+	conn->timeout = NULL;
+	if (!add_conn(conn)) {
+		free(conn);
+		return NULL;
+	}
+	return conn;
+}
+
+void io_set_finish_(struct io_conn *conn,
+		    void (*finish)(struct io_conn *, void *),
+		    void *arg)
+{
+	conn->finish = finish;
+	conn->finish_arg = arg;
+}
+
+struct io_conn *io_duplex_(struct io_conn *old, struct io_plan plan)
+{
+	struct io_conn *conn;
+
+	io_plan_debug_again();
+
+	assert(!old->duplex);
+
+	conn = malloc(sizeof(*conn));
+	if (!conn)
+		return NULL;
+
+	conn->fd.listener = false;
+	conn->fd.fd = old->fd.fd;
+	conn->plan = plan;
+	conn->duplex = old;
+	conn->finish = NULL;
+	conn->finish_arg = NULL;
+	conn->timeout = NULL;
+	if (!add_duplex(conn)) {
+		free(conn);
+		return NULL;
+	}
+	old->duplex = conn;
+	return conn;
+}
+
+bool io_timeout_(struct io_conn *conn, struct timespec ts,
+		 struct io_plan (*cb)(struct io_conn *, void *), void *arg)
+{
+	assert(cb);
+
+	if (!conn->timeout) {
+		conn->timeout = malloc(sizeof(*conn->timeout));
+		if (!conn->timeout)
+			return false;
+	} else
+		assert(!timeout_active(conn));
+
+	conn->timeout->next = cb;
+	conn->timeout->next_arg = arg;
+	backend_add_timeout(conn, ts);
+	return true;
+}
+
+/* Returns true if we're finished. */
+static int do_write(int fd, struct io_plan *plan)
+{
+	ssize_t ret = write(fd, plan->u.write.buf, plan->u.write.len);
+	if (ret < 0)
+		return io_debug_io(-1);
+
+	plan->u.write.buf += ret;
+	plan->u.write.len -= ret;
+	return io_debug_io(plan->u.write.len == 0);
+}
+
+/* Queue some data to be written. */
+struct io_plan io_write_(const void *data, size_t len,
+			 struct io_plan (*cb)(struct io_conn *, void *),
+			 void *arg)
+{
+	struct io_plan plan;
+
+	assert(cb);
+	plan.u.write.buf = data;
+	plan.u.write.len = len;
+	plan.io = do_write;
+	plan.next = cb;
+	plan.next_arg = arg;
+	plan.pollflag = POLLOUT;
+
+	return plan;
+}
+
+static int do_read(int fd, struct io_plan *plan)
+{
+	ssize_t ret = read(fd, plan->u.read.buf, plan->u.read.len);
+	if (ret <= 0)
+		return io_debug_io(-1);
+
+	plan->u.read.buf += ret;
+	plan->u.read.len -= ret;
+	return io_debug_io(plan->u.read.len == 0);
+}
+
+/* Queue a request to read into a buffer. */
+struct io_plan io_read_(void *data, size_t len,
+			struct io_plan (*cb)(struct io_conn *, void *),
+			void *arg)
+{
+	struct io_plan plan;
+
+	assert(cb);
+	plan.u.read.buf = data;
+	plan.u.read.len = len;
+	plan.io = do_read;
+	plan.next = cb;
+	plan.next_arg = arg;
+	plan.pollflag = POLLIN;
+
+	return plan;
+}
+
+static int do_read_partial(int fd, struct io_plan *plan)
+{
+	ssize_t ret = read(fd, plan->u.readpart.buf, *plan->u.readpart.lenp);
+	if (ret <= 0)
+		return io_debug_io(-1);
+
+	*plan->u.readpart.lenp = ret;
+	return io_debug_io(1);
+}
+
+/* Queue a partial request to read into a buffer. */
+struct io_plan io_read_partial_(void *data, size_t *len,
+				struct io_plan (*cb)(struct io_conn *, void *),
+				void *arg)
+{
+	struct io_plan plan;
+
+	assert(cb);
+	plan.u.readpart.buf = data;
+	plan.u.readpart.lenp = len;
+	plan.io = do_read_partial;
+	plan.next = cb;
+	plan.next_arg = arg;
+	plan.pollflag = POLLIN;
+
+	return plan;
+}
+
+static int do_write_partial(int fd, struct io_plan *plan)
+{
+	ssize_t ret = write(fd, plan->u.writepart.buf, *plan->u.writepart.lenp);
+	if (ret < 0)
+		return io_debug_io(-1);
+
+	*plan->u.writepart.lenp = ret;
+	return io_debug_io(1);
+}
+
+/* Queue a partial write request. */
+struct io_plan io_write_partial_(const void *data, size_t *len,
+				 struct io_plan (*cb)(struct io_conn*, void *),
+				 void *arg)
+{
+	struct io_plan plan;
+
+	assert(cb);
+	plan.u.writepart.buf = data;
+	plan.u.writepart.lenp = len;
+	plan.io = do_write_partial;
+	plan.next = cb;
+	plan.next_arg = arg;
+	plan.pollflag = POLLOUT;
+
+	return plan;
+}
+
+static int already_connected(int fd, struct io_plan *plan)
+{
+	return io_debug_io(1);
+}
+
+static int do_connect(int fd, struct io_plan *plan)
+{
+	int err, ret;
+	socklen_t len = sizeof(err);
+
+	/* Has async connect finished? */
+	ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
+	if (ret < 0)
+		return -1;
+
+	if (err == 0) {
+		/* Restore blocking if it was initially. */
+		fcntl(fd, F_SETFD, plan->u.len_len.len1);
+		return 1;
+	}
+	return 0;
+}
+
+struct io_plan io_connect_(int fd, const struct addrinfo *addr,
+			   struct io_plan (*cb)(struct io_conn*, void *),
+			   void *arg)
+{
+	struct io_plan plan;
+
+	assert(cb);
+
+	plan.next = cb;
+	plan.next_arg = arg;
+
+	/* Save old flags, set nonblock if not already. */
+	plan.u.len_len.len1 = fcntl(fd, F_GETFD);
+	fcntl(fd, F_SETFD, plan.u.len_len.len1 | O_NONBLOCK);
+
+	/* Immediate connect can happen. */
+	if (connect(fd, addr->ai_addr, addr->ai_addrlen) == 0) {
+		/* Dummy will be called immediately. */
+		plan.pollflag = POLLOUT;
+		plan.io = already_connected;
+	} else {
+		if (errno != EINPROGRESS)
+			return io_close_();
+
+		plan.pollflag = POLLIN;
+		plan.io = do_connect;
+	}
+	return plan;
+}
+
+struct io_plan io_idle_(void)
+{
+	struct io_plan plan;
+
+	plan.pollflag = 0;
+	plan.io = NULL;
+	/* Never called (overridden by io_wake), but NULL means closing */
+	plan.next = (void *)io_idle_;
+
+	return plan;
+}
+
+void io_wake_(struct io_conn *conn, struct io_plan plan)
+
+{
+	io_plan_debug_again();
+
+	/* It might be closing, but we haven't called its finish() yet. */
+	if (!conn->plan.next)
+		return;
+	/* It was idle, right? */
+	assert(!conn->plan.io);
+	conn->plan = plan;
+	backend_plan_changed(conn);
+
+	debug_io_wake(conn);
+}
+
+void io_ready(struct io_conn *conn)
+{
+	set_current(conn);
+	switch (conn->plan.io(conn->fd.fd, &conn->plan)) {
+	case -1: /* Failure means a new plan: close up. */
+		conn->plan = io_close();
+		backend_plan_changed(conn);
+		break;
+	case 0: /* Keep going with plan. */
+		break;
+	case 1: /* Done: get next plan. */
+		if (timeout_active(conn))
+			backend_del_timeout(conn);
+		conn->plan = conn->plan.next(conn, conn->plan.next_arg);
+		backend_plan_changed(conn);
+	}
+	set_current(NULL);
+}
+
+/* Close the connection, we're done. */
+struct io_plan io_close_(void)
+{
+	struct io_plan plan;
+
+	plan.pollflag = 0;
+	/* This means we're closing. */
+	plan.next = NULL;
+	plan.u.close.saved_errno = errno;
+
+	return plan;
+}
+
+struct io_plan io_close_cb(struct io_conn *conn, void *arg)
+{
+	return io_close();
+}
+
+/* Exit the loop, returning this (non-NULL) arg. */
+struct io_plan io_break_(void *ret, struct io_plan plan)
+{
+	io_plan_debug_again();
+
+	assert(ret);
+	io_loop_return = ret;
+
+	return plan;
+}

+ 493 - 0
ccan/io/io.h

@@ -0,0 +1,493 @@
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#ifndef CCAN_IO_H
+#define CCAN_IO_H
+#include <ccan/typesafe_cb/typesafe_cb.h>
+#include <ccan/time/time.h>
+#include <stdbool.h>
+#include <unistd.h>
+#include "io_plan.h"
+
+/**
+ * io_new_conn - create a new connection.
+ * @fd: the file descriptor.
+ * @plan: the first I/O to perform.
+ *
+ * This creates a connection which owns @fd.  @plan will be called on the
+ * next io_loop().
+ *
+ * Returns NULL on error (and sets errno).
+ *
+ * Example:
+ *	int fd[2];
+ *	struct io_conn *conn;
+ *
+ *	pipe(fd);
+ *	// Plan is to close the fd immediately.
+ *	conn = io_new_conn(fd[0], io_close());
+ *	if (!conn)
+ *		exit(1);
+ */
+#define io_new_conn(fd, plan)				\
+	(io_plan_no_debug(), io_new_conn_((fd), (plan)))
+struct io_conn *io_new_conn_(int fd, struct io_plan plan);
+
+/**
+ * io_set_finish - set finish function on a connection.
+ * @conn: the connection.
+ * @finish: the function to call when it's closed or fails.
+ * @arg: the argument to @finish.
+ *
+ * @finish will be called when an I/O operation fails, or you call
+ * io_close() on the connection.  errno will be set to the value
+ * after the failed I/O, or at the call to io_close().
+ *
+ * Example:
+ * static void finish(struct io_conn *conn, void *unused)
+ * {
+ *	// errno is not 0 after success, so this is a bit useless.
+ *	printf("Conn %p closed with errno %i\n", conn, errno);
+ * }
+ * ...
+ *	io_set_finish(conn, finish, NULL);
+ */
+#define io_set_finish(conn, finish, arg)				\
+	io_set_finish_((conn),						\
+		       typesafe_cb_preargs(void, void *,		\
+					   (finish), (arg),		\
+					   struct io_conn *),		\
+		       (arg))
+void io_set_finish_(struct io_conn *conn,
+		    void (*finish)(struct io_conn *, void *),
+		    void *arg);
+
+/**
+ * io_new_listener - create a new accepting listener.
+ * @fd: the file descriptor.
+ * @init: the function to call for a new connection
+ * @arg: the argument to @init.
+ *
+ * When @fd becomes readable, we accept() and pass that fd to init().
+ *
+ * Returns NULL on error (and sets errno).
+ *
+ * Example:
+ * #include <sys/types.h>
+ * #include <sys/socket.h>
+ * #include <netdb.h>
+ *
+ * static void start_conn(int fd, char *msg)
+ * {
+ *	printf("%s fd %i\n", msg, fd);
+ *	close(fd);
+ * }
+ *
+ * // Set up a listening socket, return it.
+ * static struct io_listener *do_listen(const char *port)
+ * {
+ *	struct addrinfo *addrinfo, hints;
+ *	int fd, on = 1;
+ *
+ *	memset(&hints, 0, sizeof(hints));
+ *	hints.ai_family = AF_UNSPEC;
+ *	hints.ai_socktype = SOCK_STREAM;
+ *	hints.ai_flags = AI_PASSIVE;
+ *	hints.ai_protocol = 0;
+ *
+ *	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+ *		return NULL;
+ *
+ *	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+ *		    addrinfo->ai_protocol);
+ *	if (fd < 0)
+ *		return NULL;
+ *
+ *	freeaddrinfo(addrinfo);
+ *	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ *	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+ *		close(fd);
+ *		return NULL;
+ *	}
+ *	if (listen(fd, 1) != 0) {
+ *		close(fd);
+ *		return NULL;
+ *	}
+ *	return io_new_listener(fd, start_conn, (char *)"Got one!");
+ * }
+ */
+#define io_new_listener(fd, init, arg)					\
+	io_new_listener_((fd),						\
+			 typesafe_cb_preargs(void, void *,		\
+					     (init), (arg),		\
+					     int fd),			\
+			 (arg))
+struct io_listener *io_new_listener_(int fd,
+				     void (*init)(int fd, void *arg),
+				     void *arg);
+
+/**
+ * io_close_listener - delete a listener.
+ * @listener: the listener returned from io_new_listener.
+ *
+ * This closes the fd and frees @listener.
+ *
+ * Example:
+ * ...
+ *	struct io_listener *l = do_listen("8111");
+ *	if (l) {
+ *		io_loop();
+ *		io_close_listener(l);
+ *	}
+ */
+void io_close_listener(struct io_listener *listener);
+
+/**
+ * io_write - plan to write data.
+ * @data: the data buffer.
+ * @len: the length to write.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This creates a plan write out a data buffer.  Once it's all
+ * written, the @cb function will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ *
+ * Example:
+ * static void start_conn_with_write(int fd, const char *msg)
+ * {
+ *	// Write message, then close.
+ *	io_new_conn(fd, io_write(msg, strlen(msg), io_close_cb, NULL));
+ * }
+ */
+#define io_write(data, len, cb, arg)					\
+	io_debug(io_write_((data), (len),				\
+			   typesafe_cb_preargs(struct io_plan, void *,	\
+					       (cb), (arg), struct io_conn *), \
+			   (arg)))
+struct io_plan io_write_(const void *data, size_t len,
+			 struct io_plan (*cb)(struct io_conn *, void *),
+			 void *arg);
+
+/**
+ * io_read - plan to read data.
+ * @data: the data buffer.
+ * @len: the length to read.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This creates a plan to read data into a buffer.  Once it's all
+ * read, the @cb function will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ *
+ * Example:
+ * static void start_conn_with_read(int fd, char msg[12])
+ * {
+ *	// Read message, then close.
+ *	io_new_conn(fd, io_read(msg, 12, io_close_cb, NULL));
+ * }
+ */
+#define io_read(data, len, cb, arg)					\
+	io_debug(io_read_((data), (len),				\
+			  typesafe_cb_preargs(struct io_plan, void *,	\
+					      (cb), (arg), struct io_conn *), \
+			  (arg)))
+struct io_plan io_read_(void *data, size_t len,
+			struct io_plan (*cb)(struct io_conn *, void *),
+			void *arg);
+
+
+/**
+ * io_read_partial - plan to read some data.
+ * @data: the data buffer.
+ * @len: the maximum length to read, set to the length actually read.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This creates a plan to read data into a buffer.  Once any data is
+ * read, @len is updated and the @cb function will be called: on an
+ * error, the finish function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ *
+ * Example:
+ * struct buf {
+ *	size_t len;
+ *	char buf[12];
+ * };
+ *
+ * static struct io_plan dump_and_close(struct io_conn *conn, struct buf *b)
+ * {
+ *	printf("Partial read: '%*s'\n", (int)b->len, b->buf);
+ *	free(b);
+ *	return io_close();
+ * }
+ *
+ * static void start_conn_with_part_read(int fd, void *unused)
+ * {
+ *	struct buf *b = malloc(sizeof(*b));
+ *
+ *	// Read message, then dump and close.
+ *	b->len = sizeof(b->buf);
+ *	io_new_conn(fd, io_read_partial(b->buf, &b->len, dump_and_close, b));
+ * }
+ */
+#define io_read_partial(data, len, cb, arg)				\
+	io_debug(io_read_partial_((data), (len),			\
+				  typesafe_cb_preargs(struct io_plan, void *, \
+						      (cb), (arg),	\
+						      struct io_conn *), \
+				  (arg)))
+struct io_plan io_read_partial_(void *data, size_t *len,
+				struct io_plan (*cb)(struct io_conn *, void *),
+				void *arg);
+
+/**
+ * io_write_partial - plan to write some data.
+ * @data: the data buffer.
+ * @len: the maximum length to write, set to the length actually written.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This creates a plan to write data from a buffer.   Once any data is
+ * written, @len is updated and the @cb function will be called: on an
+ * error, the finish function is called instead.
+ *
+ * Note that the I/O may actually be done immediately.
+ *
+ * Example:
+ * struct buf {
+ *	size_t len;
+ *	char buf[12];
+ * };
+ *
+ * static struct io_plan show_remainder(struct io_conn *conn, struct buf *b)
+ * {
+ *	printf("Only wrote: '%*s'\n", (int)b->len, b->buf);
+ *	free(b);
+ *	return io_close();
+ * }
+ *
+ * static void start_conn_with_part_read(int fd, void *unused)
+ * {
+ *	struct buf *b = malloc(sizeof(*b));
+ *
+ *	// Write message, then dump and close.
+ *	b->len = sizeof(b->buf);
+ *	strcpy(b->buf, "Hello world");
+ *	io_new_conn(fd, io_write_partial(b->buf, &b->len, show_remainder, b));
+ * }
+ */
+#define io_write_partial(data, len, cb, arg)				\
+	io_debug(io_write_partial_((data), (len),			\
+				   typesafe_cb_preargs(struct io_plan, void *, \
+						       (cb), (arg),	\
+						       struct io_conn *), \
+				   (arg)))
+struct io_plan io_write_partial_(const void *data, size_t *len,
+				 struct io_plan (*cb)(struct io_conn *, void*),
+				 void *arg);
+
+/**
+ * io_connect - plan to connect to a listening socket.
+ * @fd: file descriptor.
+ * @addr: where to connect.
+ * @cb: function to call once it's done.
+ * @arg: @cb argument
+ *
+ * This initiates a connection, and creates a plan for
+ * (asynchronously).  completing it.  Once complete, @len is updated
+ * and the @cb function will be called: on an error, the finish
+ * function is called instead.
+ *
+ * Note that the connect may actually be done immediately.
+ *
+ * Example:
+ * #include <sys/types.h>
+ * #include <sys/socket.h>
+ * #include <netdb.h>
+ *
+ * // Write, then close socket.
+ * static struct io_plan start_write(struct io_conn *conn, void *unused)
+ * {
+ *	return io_write("hello", 5, io_close_cb, NULL);
+ * }
+ *
+ * ...
+ *
+ *	int fd;
+ *	struct addrinfo *addrinfo;
+ *
+ *	fd = socket(AF_INET, SOCK_STREAM, 0);
+ *	getaddrinfo("localhost", "8111", NULL, &addrinfo);
+ *	io_new_conn(fd, io_connect(fd, addrinfo, start_write, NULL));
+ */
+struct addrinfo;
+#define io_connect(fd, addr, cb, arg)					\
+	io_debug(io_connect_((fd), (addr),				\
+			     typesafe_cb_preargs(struct io_plan, void *, \
+						 (cb), (arg),		\
+						 struct io_conn *),	\
+			     (arg)))
+struct io_plan io_connect_(int fd, const struct addrinfo *addr,
+			   struct io_plan (*cb)(struct io_conn *, void*),
+			   void *arg);
+
+/**
+ * io_idle - plan to do nothing.
+ *
+ * This indicates the connection is idle: io_wake() will be called later do
+ * give the connection a new plan.
+ *
+ * Example:
+ *	struct io_conn *sleeper;
+ *	sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
+ *	if (!sleeper)
+ *		exit(1);
+ */
+#define io_idle() io_debug(io_idle_())
+struct io_plan io_idle_(void);
+
+/**
+ * io_timeout - set timeout function if the callback doesn't complete.
+ * @conn: the current connection.
+ * @ts: how long until the timeout should be called.
+ * @cb: callback to call.
+ * @arg: argument to @cb.
+ *
+ * If the usual next callback is not called for this connection before @ts,
+ * this function will be called.  If next callback is called, the timeout
+ * is automatically removed.
+ *
+ * Returns false on allocation failure.  A connection can only have one
+ * timeout.
+ *
+ * Example:
+ *	static struct io_plan close_on_timeout(struct io_conn *conn, char *msg)
+ *	{
+ *		printf("%s\n", msg);
+ *		return io_close();
+ *	}
+ *
+ *	...
+ *	io_timeout(sleeper, time_from_msec(100),
+ *		   close_on_timeout, (char *)"Bye!");
+ */
+#define io_timeout(conn, ts, fn, arg)					\
+	io_timeout_((conn), (ts),					\
+		    typesafe_cb_preargs(struct io_plan, void *,		\
+					(fn), (arg),			\
+					struct io_conn *),		\
+		    (arg))
+bool io_timeout_(struct io_conn *conn, struct timespec ts,
+		 struct io_plan (*fn)(struct io_conn *, void *), void *arg);
+
+/**
+ * io_duplex - split an fd into two connections.
+ * @conn: a connection.
+ * @plan: the first I/O function to call.
+ *
+ * Sometimes you want to be able to simultaneously read and write on a
+ * single fd, but io forces a linear call sequence.  The solution is
+ * to have two connections for the same fd, and use one for read
+ * operations and one for write.
+ *
+ * You must io_close() both of them to close the fd.
+ *
+ * Example:
+ *	static void setup_read_write(int fd,
+ *				     char greet_in[5], const char greet_out[5])
+ *	{
+ *		struct io_conn *writer, *reader;
+ *
+ *		// Read their greeting and send ours at the same time.
+ *		writer = io_new_conn(fd,
+ *				     io_write(greet_out, 5, io_close_cb, NULL));
+ *		reader = io_duplex(writer,
+ *				     io_read(greet_in, 5, io_close_cb, NULL));
+ *		if (!reader || !writer)
+ *			exit(1);
+ *	}
+ */
+#define io_duplex(conn, plan)				\
+	(io_plan_no_debug(), io_duplex_((conn), (plan)))
+struct io_conn *io_duplex_(struct io_conn *conn, struct io_plan plan);
+
+/**
+ * io_wake - wake up an idle connection.
+ * @conn: an idle connection.
+ * @plan: the next I/O plan for @conn.
+ *
+ * This makes @conn ready to do I/O the next time around the io_loop().
+ *
+ * Example:
+ *	struct io_conn *sleeper;
+ *	sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
+ *
+ *	io_wake(sleeper, io_write("junk", 4, io_close_cb, NULL));
+ */
+#define io_wake(conn, plan) (io_plan_no_debug(), io_wake_((conn), (plan)))
+void io_wake_(struct io_conn *conn, struct io_plan plan);
+
+/**
+ * io_break - return from io_loop()
+ * @ret: non-NULL value to return from io_loop().
+ * @plan: I/O to perform on return (if any)
+ *
+ * This breaks out of the io_loop.  As soon as the current @next
+ * function returns, any io_closed()'d connections will have their
+ * finish callbacks called, then io_loop() with return with @ret.
+ *
+ * If io_loop() is called again, then @plan will be carried out.
+ *
+ * Example:
+ *	static struct io_plan fail_on_timeout(struct io_conn *conn, char *msg)
+ *	{
+ *		return io_break(msg, io_close());
+ *	}
+ */
+#define io_break(ret, plan) (io_plan_no_debug(), io_break_((ret), (plan)))
+struct io_plan io_break_(void *ret, struct io_plan plan);
+
+/* FIXME: io_recvfrom/io_sendto */
+
+/**
+ * io_close - plan to close a connection.
+ *
+ * On return to io_loop, the connection will be closed.
+ *
+ * Example:
+ * static struct io_plan close_on_timeout(struct io_conn *conn, const char *msg)
+ * {
+ *	printf("closing: %s\n", msg);
+ *	return io_close();
+ * }
+ */
+#define io_close() io_debug(io_close_())
+struct io_plan io_close_(void);
+
+/**
+ * io_close_cb - helper callback to close a connection.
+ * @conn: the connection.
+ *
+ * This schedules a connection to be closed; designed to be used as
+ * a callback function.
+ *
+ * Example:
+ *	#define close_on_timeout io_close_cb
+ */
+struct io_plan io_close_cb(struct io_conn *, void *unused);
+
+/**
+ * io_loop - process fds until all closed on io_break.
+ *
+ * This is the core loop; it exits with the io_break() arg, or NULL if
+ * all connections and listeners are closed.
+ *
+ * Example:
+ *	io_loop();
+ */
+void *io_loop(void);
+#endif /* CCAN_IO_H */

+ 150 - 0
ccan/io/io_plan.h

@@ -0,0 +1,150 @@
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#ifndef CCAN_IO_PLAN_H
+#define CCAN_IO_PLAN_H
+struct io_conn;
+
+/**
+ * struct io_plan - a plan of what I/O to do.
+ * @pollflag: POLLIN or POLLOUT.
+ * @io: function to call when fd is available for @pollflag.
+ * @next: function to call after @io returns true.
+ * @next_arg: argument to @next.
+ * @u: scratch area for I/O.
+ *
+ * When the fd is POLLIN or POLLOUT (according to @pollflag), @io is
+ * called.  If it returns -1, io_close() becomed the new plan (and errno
+ * is saved).  If it returns 1, @next is called, otherwise @io is
+ * called again when @pollflag is available.
+ *
+ * You can use this to write your own io_plan functions.
+ */
+struct io_plan {
+	int pollflag;
+	/* Only NULL if idle. */
+	int (*io)(int fd, struct io_plan *plan);
+	/* Only NULL if closing. */
+	struct io_plan (*next)(struct io_conn *, void *arg);
+	void *next_arg;
+
+	union {
+		struct {
+			char *buf;
+			size_t len;
+		} read;
+		struct {
+			const char *buf;
+			size_t len;
+		} write;
+		struct {
+			char *buf;
+			size_t *lenp;
+		} readpart;
+		struct {
+			const char *buf;
+			size_t *lenp;
+		} writepart;
+		struct {
+			int saved_errno;
+		} close;
+		struct {
+			void *p;
+			size_t len;
+		} ptr_len;
+		struct {
+			void *p1;
+			void *p2;
+		} ptr_ptr;
+		struct {
+			size_t len1;
+			size_t len2;
+		} len_len;
+	} u;
+};
+
+#ifdef DEBUG
+/**
+ * io_debug_conn - routine to select connection(s) to debug.
+ *
+ * If this is set, the routine should return true if the connection is a
+ * debugging candidate.  If so, the callchain for I/O operations on this
+ * connection will be linear, for easier use of a debugger.
+ *
+ * You will also see calls to any callbacks which wake the connection
+ * which is being debugged.
+ *
+ * Example:
+ *	static bool debug_all(struct io_conn *conn)
+ *	{
+ *		return true();
+ *	}
+ *	...
+ *	io_debug_conn = debug_all;
+ */
+extern bool (*io_debug_conn)(struct io_conn *conn);
+
+/**
+ * io_debug - if we're debugging the current connection, call immediately.
+ *
+ * This determines if we are debugging the current connection: if so,
+ * it immediately applies the plan and calls back into io_loop() to
+ * create a linear call chain.
+ *
+ * Example:
+ *	#define io_idle() io_debug(io_idle_())
+ *	struct io_plan io_idle_(void);
+ */
+struct io_plan io_debug(struct io_plan plan);
+
+/**
+ * io_debug_io - return from function which actually does I/O.
+ *
+ * This determines if we are debugging the current connection: if so,
+ * it immediately sets the next function and calls into io_loop() to
+ * create a linear call chain.
+ *
+ * Example:
+ *
+ * static int do_write(int fd, struct io_plan *plan)
+ * {
+ *	ssize_t ret = write(fd, plan->u.write.buf, plan->u.write.len);
+ *	if (ret < 0)
+ *		return io_debug_io(-1);
+ *
+ *	plan->u.write.buf += ret;
+ *	plan->u.write.len -= ret;
+ *	return io_debug_io(plan->u.write.len == 0);
+ * }
+ */
+int io_debug_io(int ret);
+
+/**
+ * io_plan_no_debug - mark the next plan not to be called immediately.
+ *
+ * Most routines which take a plan are about to apply it to the current
+ * connection.  We (ab)use this pattern for debugging: as soon as such a
+ * plan is created it is called, to create a linear call chain.
+ *
+ * Some routines, like io_break(), io_duplex() and io_wake() take an
+ * io_plan, but they must not be applied immediately to the current
+ * connection, so we call this first.
+ *
+ * Example:
+ * #define io_break(ret, plan) (io_plan_no_debug(), io_break_((ret), (plan)))
+ * struct io_plan io_break_(void *ret, struct io_plan plan);
+ */
+#define io_plan_no_debug() ((io_plan_nodebug = true))
+
+extern bool io_plan_nodebug;
+#else
+static inline struct io_plan io_debug(struct io_plan plan)
+{
+	return plan;
+}
+static inline int io_debug_io(int ret)
+{
+	return ret;
+}
+#define io_plan_no_debug() (void)0
+#endif
+
+#endif /* CCAN_IO_PLAN_H */

+ 388 - 0
ccan/io/poll.c

@@ -0,0 +1,388 @@
+/* Licensed under LGPLv2.1+ - see LICENSE file for details */
+#include "io.h"
+#include "backend.h"
+#include <assert.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <limits.h>
+#include <errno.h>
+
+static size_t num_fds = 0, max_fds = 0, num_closing = 0, num_waiting = 0;
+static struct pollfd *pollfds = NULL;
+static struct fd **fds = NULL;
+static struct timers timeouts;
+#ifdef DEBUG
+static unsigned int io_loop_level;
+static struct io_conn *free_later;
+static void io_loop_enter(void)
+{
+	io_loop_level++;
+}
+static void io_loop_exit(void)
+{
+	io_loop_level--;
+	if (io_loop_level == 0) {
+		/* Delayed free. */
+		while (free_later) {
+			struct io_conn *c = free_later;
+			free_later = c->finish_arg;
+			free(c);
+		}
+	}
+}
+static void free_conn(struct io_conn *conn)
+{
+	/* Only free on final exit: chain via finish. */
+	if (io_loop_level > 1) {
+		struct io_conn *c;
+		for (c = free_later; c; c = c->finish_arg)
+			assert(c != conn);
+		conn->finish_arg = free_later;
+		free_later = conn;
+	} else
+		free(conn);
+}
+#else
+static void io_loop_enter(void)
+{
+}
+static void io_loop_exit(void)
+{
+}
+static void free_conn(struct io_conn *conn)
+{
+	free(conn);
+}
+#endif
+
+static bool add_fd(struct fd *fd, short events)
+{
+	if (num_fds + 1 > max_fds) {
+		struct pollfd *newpollfds;
+		struct fd **newfds;
+		size_t num = max_fds ? max_fds * 2 : 8;
+
+		newpollfds = realloc(pollfds, sizeof(*newpollfds) * num);
+		if (!newpollfds)
+			return false;
+		pollfds = newpollfds;
+		newfds = realloc(fds, sizeof(*newfds) * num);
+		if (!newfds)
+			return false;
+		fds = newfds;
+		max_fds = num;
+	}
+
+	pollfds[num_fds].events = events;
+	/* In case it's idle. */
+	if (!events)
+		pollfds[num_fds].fd = -fd->fd;
+	else
+		pollfds[num_fds].fd = fd->fd;
+	pollfds[num_fds].revents = 0; /* In case we're iterating now */
+	fds[num_fds] = fd;
+	fd->backend_info = num_fds;
+	num_fds++;
+	if (events)
+		num_waiting++;
+
+	return true;
+}
+
+static void del_fd(struct fd *fd)
+{
+	size_t n = fd->backend_info;
+
+	assert(n != -1);
+	assert(n < num_fds);
+	if (pollfds[n].events)
+		num_waiting--;
+	if (n != num_fds - 1) {
+		/* Move last one over us. */
+		pollfds[n] = pollfds[num_fds-1];
+		fds[n] = fds[num_fds-1];
+		assert(fds[n]->backend_info == num_fds-1);
+		fds[n]->backend_info = n;
+	} else if (num_fds == 1) {
+		/* Free everything when no more fds. */
+		free(pollfds);
+		free(fds);
+		pollfds = NULL;
+		fds = NULL;
+		max_fds = 0;
+	}
+	num_fds--;
+	fd->backend_info = -1;
+	close(fd->fd);
+}
+
+bool add_listener(struct io_listener *l)
+{
+	if (!add_fd(&l->fd, POLLIN))
+		return false;
+	return true;
+}
+
+void backend_plan_changed(struct io_conn *conn)
+{
+	struct pollfd *pfd;
+
+	/* This can happen with debugging and delayed free... */
+	if (conn->fd.backend_info == -1)
+		return;
+
+	pfd = &pollfds[conn->fd.backend_info];
+
+	if (pfd->events)
+		num_waiting--;
+
+	pfd->events = conn->plan.pollflag;
+	if (conn->duplex) {
+		int mask = conn->duplex->plan.pollflag;
+		/* You can't *both* read/write. */
+		assert(!mask || pfd->events != mask);
+		pfd->events |= mask;
+	}
+	if (pfd->events) {
+		num_waiting++;
+		pfd->fd = conn->fd.fd;
+	} else
+		pfd->fd = -conn->fd.fd;
+
+	if (!conn->plan.next)
+		num_closing++;
+}
+
+bool add_conn(struct io_conn *c)
+{
+	if (!add_fd(&c->fd, c->plan.pollflag))
+		return false;
+	/* Immediate close is allowed. */
+	if (!c->plan.next)
+		num_closing++;
+	return true;
+}
+
+bool add_duplex(struct io_conn *c)
+{
+	c->fd.backend_info = c->duplex->fd.backend_info;
+	backend_plan_changed(c);
+	return true;
+}
+
+void backend_del_conn(struct io_conn *conn)
+{
+	if (conn->finish) {
+		errno = conn->plan.u.close.saved_errno;
+		conn->finish(conn, conn->finish_arg);
+	}
+	if (timeout_active(conn))
+		backend_del_timeout(conn);
+	free(conn->timeout);
+	if (conn->duplex) {
+		/* In case fds[] pointed to the other one. */
+		fds[conn->fd.backend_info] = &conn->duplex->fd;
+		conn->duplex->duplex = NULL;
+		conn->fd.backend_info = -1;
+	} else
+		del_fd(&conn->fd);
+	num_closing--;
+	free_conn(conn);
+}
+
+void del_listener(struct io_listener *l)
+{
+	del_fd(&l->fd);
+}
+
+static void set_plan(struct io_conn *conn, struct io_plan plan)
+{
+	conn->plan = plan;
+	backend_plan_changed(conn);
+}
+
+static void accept_conn(struct io_listener *l)
+{
+	int fd = accept(l->fd.fd, NULL, NULL);
+
+	/* FIXME: What to do here? */
+	if (fd < 0)
+		return;
+	l->init(fd, l->arg);
+}
+
+/* It's OK to miss some, as long as we make progress. */
+static bool finish_conns(struct io_conn **ready)
+{
+	unsigned int i;
+
+	for (i = 0; !io_loop_return && i < num_fds; i++) {
+		struct io_conn *c, *duplex;
+
+		if (!num_closing)
+			break;
+
+		if (fds[i]->listener)
+			continue;
+		c = (void *)fds[i];
+		for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
+			if (!c->plan.next) {
+				if (doing_debug_on(c) && ready) {
+					*ready = c;
+					return true;
+				}
+				backend_del_conn(c);
+				i--;
+			}
+		}
+	}
+	return false;
+}
+
+void backend_add_timeout(struct io_conn *conn, struct timespec duration)
+{
+	if (!timeouts.base)
+		timers_init(&timeouts, time_now());
+	timer_add(&timeouts, &conn->timeout->timer,
+		  time_add(time_now(), duration));
+	conn->timeout->conn = conn;
+}
+
+void backend_del_timeout(struct io_conn *conn)
+{
+	assert(conn->timeout->conn == conn);
+	timer_del(&timeouts, &conn->timeout->timer);
+	conn->timeout->conn = NULL;
+}
+
+/* This is the main loop. */
+void *do_io_loop(struct io_conn **ready)
+{
+	void *ret;
+
+	io_loop_enter();
+
+	while (!io_loop_return) {
+		int i, r, timeout = INT_MAX;
+		struct timespec now;
+		bool some_timeouts = false;
+
+		if (timeouts.base) {
+			struct timespec first;
+			struct list_head expired;
+			struct io_timeout *t;
+
+			now = time_now();
+
+			/* Call functions for expired timers. */
+			timers_expire(&timeouts, now, &expired);
+			while ((t = list_pop(&expired, struct io_timeout, timer.list))) {
+				struct io_conn *conn = t->conn;
+				/* Clear, in case timer re-adds */
+				t->conn = NULL;
+				set_current(conn);
+				set_plan(conn, t->next(conn, t->next_arg));
+				some_timeouts = true;
+			}
+
+			/* Now figure out how long to wait for the next one. */
+			if (timer_earliest(&timeouts, &first)) {
+				uint64_t f = time_to_msec(time_sub(first, now));
+				if (f < INT_MAX)
+					timeout = f;
+			}
+		}
+
+		if (num_closing) {
+			/* If this finishes a debugging con, return now. */
+			if (finish_conns(ready))
+				return NULL;
+			/* Could have started/finished more. */
+			continue;
+		}
+
+		/* debug can recurse on io_loop; anything can change. */
+		if (doing_debug() && some_timeouts)
+			continue;
+
+		if (num_fds == 0)
+			break;
+
+		/* You can't tell them all to go to sleep! */
+		assert(num_waiting);
+
+		r = poll(pollfds, num_fds, timeout);
+		if (r < 0)
+			break;
+
+		for (i = 0; i < num_fds && !io_loop_return; i++) {
+			struct io_conn *c = (void *)fds[i];
+			int events = pollfds[i].revents;
+
+			if (r == 0)
+				break;
+
+			if (fds[i]->listener) {
+				if (events & POLLIN) {
+					accept_conn((void *)c);
+					r--;
+				}
+			} else if (events & (POLLIN|POLLOUT)) {
+				r--;
+				if (c->duplex) {
+					int mask = c->duplex->plan.pollflag;
+					if (events & mask) {
+						if (doing_debug_on(c->duplex)
+							&& ready) {
+							*ready = c->duplex;
+							return NULL;
+						}
+						io_ready(c->duplex);
+						events &= ~mask;
+						/* debug can recurse;
+						 * anything can change. */
+						if (doing_debug())
+							break;
+						if (!(events&(POLLIN|POLLOUT)))
+							continue;
+					}
+				}
+				if (doing_debug_on(c) && ready) {
+					*ready = c;
+					return NULL;
+				}
+				io_ready(c);
+				/* debug can recurse; anything can change. */
+				if (doing_debug())
+					break;
+			} else if (events & (POLLHUP|POLLNVAL|POLLERR)) {
+				r--;
+				set_current(c);
+				errno = EBADF;
+				set_plan(c, io_close());
+				if (c->duplex) {
+					set_current(c->duplex);
+					set_plan(c->duplex, io_close());
+				}
+			}
+		}
+	}
+
+	while (num_closing && !io_loop_return) {
+		if (finish_conns(ready))
+			return NULL;
+	}
+
+	ret = io_loop_return;
+	io_loop_return = NULL;
+
+	io_loop_exit();
+	return ret;
+}
+
+void *io_loop(void)
+{
+	return do_io_loop(NULL);
+}

+ 8 - 0
ccan/io/test/run-01-start-finish-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64001"
+#define main real_main
+int real_main(void);
+#include "run-01-start-finish.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 95 - 0
ccan/io/test/run-01-start-finish.c

@@ -0,0 +1,95 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65001"
+#endif
+
+static void finish_ok(struct io_conn *conn, int *state)
+{
+	ok1(*state == 1);
+	(*state)++;
+	io_break(state + 1, io_idle());
+}
+
+static void init_conn(int fd, int *state)
+{
+	ok1(*state == 0);
+	(*state)++;
+	io_set_finish(io_new_conn(fd, io_close()), finish_ok, state);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+int main(void)
+{
+	int state = 0;
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd;
+
+	/* This is how many tests you plan to run */
+	plan_tests(9);
+	fd = make_listen_fd(PORT, &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, init_conn, &state);
+	ok1(l);
+	fflush(stdout);
+	if (!fork()) {
+		io_close_listener(l);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		close(fd);
+		freeaddrinfo(addrinfo);
+		exit(0);
+	}
+	freeaddrinfo(addrinfo);
+	ok1(io_loop() == &state + 1);
+	ok1(state == 2);
+	io_close_listener(l);
+	ok1(wait(&state));
+	ok1(WIFEXITED(state));
+	ok1(WEXITSTATUS(state) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 8 - 0
ccan/io/test/run-02-read-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64002"
+#define main real_main
+int real_main(void);
+#include "run-02-read.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 115 - 0
ccan/io/test/run-02-read.c

@@ -0,0 +1,115 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65002"
+#endif
+
+struct data {
+	int state;
+	char buf[4];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+	io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+	ok1(d->state == 0);
+	d->state++;
+
+	io_set_finish(io_new_conn(fd,
+				  io_read(d->buf, sizeof(d->buf), io_close_cb, d)),
+		      finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* This is how many tests you plan to run */
+	plan_tests(10);
+	d->state = 0;
+	fd = make_listen_fd(PORT, &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, init_conn, d);
+	ok1(l);
+	fflush(stdout);
+	if (!fork()) {
+		int i;
+
+		io_close_listener(l);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		signal(SIGPIPE, SIG_IGN);
+		for (i = 0; i < strlen("hellothere"); i++) {
+			if (write(fd, "hellothere" + i, 1) != 1)
+				break;
+		}
+		close(fd);
+		freeaddrinfo(addrinfo);
+		free(d);
+		exit(0);
+	}
+	freeaddrinfo(addrinfo);
+	ok1(io_loop() == d);
+	ok1(d->state == 2);
+	ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+	free(d);
+	io_close_listener(l);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 8 - 0
ccan/io/test/run-03-readpartial-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64003"
+#define main real_main
+int real_main(void);
+#include "run-03-readpartial.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 144 - 0
ccan/io/test/run-03-readpartial.c

@@ -0,0 +1,144 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65003"
+#endif
+
+struct data {
+	int state;
+	size_t bytes;
+	char buf[4];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+	io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+	ok1(d->state == 0);
+	d->state++;
+	d->bytes = sizeof(d->buf);
+
+	io_set_finish(io_new_conn(fd,
+				  io_read_partial(d->buf, &d->bytes, io_close_cb, d)),
+		      finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+static void write_to_socket(const char *str, const struct addrinfo *addrinfo)
+{
+	int fd, i;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		exit(1);
+	if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+		exit(2);
+	signal(SIGPIPE, SIG_IGN);
+	for (i = 0; i < strlen(str); i++) {
+		if (write(fd, str + i, 1) != 1)
+			break;
+	}
+	close(fd);
+}
+
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* This is how many tests you plan to run */
+	plan_tests(22);
+	d->state = 0;
+	fd = make_listen_fd(PORT, &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, init_conn, d);
+	ok1(l);
+	fflush(stdout);
+	if (!fork()) {
+		io_close_listener(l);
+		write_to_socket("hellothere", addrinfo);
+		freeaddrinfo(addrinfo);
+		free(d);
+		exit(0);
+	}
+	ok1(io_loop() == d);
+	ok1(d->state == 2);
+	ok1(d->bytes > 0);
+	ok1(d->bytes <= sizeof(d->buf));
+	ok1(memcmp(d->buf, "hellothere", d->bytes) == 0);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	fflush(stdout);
+	if (!fork()) {
+		io_close_listener(l);
+		write_to_socket("hi", addrinfo);
+		freeaddrinfo(addrinfo);
+		free(d);
+		exit(0);
+	}
+	d->state = 0;
+	ok1(io_loop() == d);
+	ok1(d->state == 2);
+	ok1(d->bytes > 0);
+	ok1(d->bytes <= strlen("hi"));
+	ok1(memcmp(d->buf, "hi", d->bytes) == 0);
+
+	freeaddrinfo(addrinfo);
+	free(d);
+	io_close_listener(l);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 8 - 0
ccan/io/test/run-04-writepartial-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64004"
+#define main real_main
+int real_main(void);
+#include "run-04-writepartial.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 127 - 0
ccan/io/test/run-04-writepartial.c

@@ -0,0 +1,127 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65004"
+#endif
+
+struct data {
+	int state;
+	size_t bytes;
+	char *buf;
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+	io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+	ok1(d->state == 0);
+	d->state++;
+	io_set_finish(io_new_conn(fd,
+				  io_write_partial(d->buf, &d->bytes, io_close_cb, d)),
+				  finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+static void read_from_socket(const char *str, const struct addrinfo *addrinfo)
+{
+	int fd;
+	char buf[100];
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		exit(1);
+	if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+		exit(2);
+	if (read(fd, buf, strlen(str)) != strlen(str))
+		exit(3);
+	if (memcmp(buf, str, strlen(str)) != 0)
+		exit(4);
+	close(fd);
+}
+
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* This is how many tests you plan to run */
+	plan_tests(11);
+	d->state = 0;
+	d->bytes = 1024*1024;
+	d->buf = malloc(d->bytes);
+	memset(d->buf, 'a', d->bytes);
+	fd = make_listen_fd(PORT, &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, init_conn, d);
+	ok1(l);
+	fflush(stdout);
+	if (!fork()) {
+		io_close_listener(l);
+		read_from_socket("aaaaaa", addrinfo);
+		freeaddrinfo(addrinfo);
+		free(d->buf);
+		free(d);
+		exit(0);
+	}
+	ok1(io_loop() == d);
+	ok1(d->state == 2);
+	ok1(d->bytes > 0);
+	ok1(d->bytes <= 1024*1024);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	freeaddrinfo(addrinfo);
+	free(d->buf);
+	free(d);
+	io_close_listener(l);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 8 - 0
ccan/io/test/run-05-write-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64005"
+#define main real_main
+int real_main(void);
+#include "run-05-write.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 128 - 0
ccan/io/test/run-05-write.c

@@ -0,0 +1,128 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65005"
+#endif
+
+struct data {
+	int state;
+	size_t bytes;
+	char *buf;
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+	io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+	ok1(d->state == 0);
+	d->state++;
+	io_set_finish(io_new_conn(fd, io_write(d->buf, d->bytes,
+					       io_close_cb, d)),
+		      finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+static void read_from_socket(size_t bytes, const struct addrinfo *addrinfo)
+{
+	int fd, done, r;
+	char buf[100];
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		exit(1);
+	if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+		exit(2);
+
+	for (done = 0; done < bytes; done += r) {
+		r = read(fd, buf, sizeof(buf));
+		if (r < 0)
+			exit(3);
+		done += r;
+	}
+	close(fd);
+}
+
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* This is how many tests you plan to run */
+	plan_tests(9);
+	d->state = 0;
+	d->bytes = 1024*1024;
+	d->buf = malloc(d->bytes);
+	memset(d->buf, 'a', d->bytes);
+	fd = make_listen_fd(PORT, &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, init_conn, d);
+	ok1(l);
+	fflush(stdout);
+	if (!fork()) {
+		io_close_listener(l);
+		read_from_socket(d->bytes, addrinfo);
+		freeaddrinfo(addrinfo);
+		free(d->buf);
+		free(d);
+		exit(0);
+	}
+	ok1(io_loop() == d);
+	ok1(d->state == 2);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	freeaddrinfo(addrinfo);
+	free(d->buf);
+	free(d);
+	io_close_listener(l);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 8 - 0
ccan/io/test/run-06-idle-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64006"
+#define main real_main
+int real_main(void);
+#include "run-06-idle.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 146 - 0
ccan/io/test/run-06-idle.c

@@ -0,0 +1,146 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#ifndef PORT
+#define PORT "65006"
+#endif
+
+static struct io_conn *idler;
+
+struct data {
+	int state;
+	char buf[4];
+};
+
+static struct io_plan read_done(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 2 || d->state == 3);
+	d->state++;
+	return io_close();
+}
+
+static void finish_waker(struct io_conn *conn, struct data *d)
+{
+	io_wake(idler, io_read(d->buf, sizeof(d->buf), read_done, d));
+	ok1(d->state == 1);
+	d->state++;
+}
+
+static void finish_idle(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 3);
+	d->state++;
+	io_break(d, io_idle());
+}
+
+static struct io_plan never(struct io_conn *conn, void *arg)
+{
+	abort();
+}
+
+static void init_conn(int fd, struct data *d)
+{
+	int fd2;
+
+	ok1(d->state == 0);
+	d->state++;
+	idler = io_new_conn(fd, io_idle());
+	io_set_finish(idler, finish_idle, d);
+
+	/* This will wake us up, as read will fail. */
+	fd2 = open("/dev/null", O_RDONLY);
+	ok1(fd2 >= 0);
+	io_set_finish(io_new_conn(fd2, io_read(idler, 1, never, NULL)),
+		      finish_waker, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* This is how many tests you plan to run */
+	plan_tests(13);
+	d->state = 0;
+	fd = make_listen_fd(PORT, &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, init_conn, d);
+	ok1(l);
+	fflush(stdout);
+	if (!fork()) {
+		int i;
+
+		io_close_listener(l);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		signal(SIGPIPE, SIG_IGN);
+		for (i = 0; i < strlen("hellothere"); i++) {
+			if (write(fd, "hellothere" + i, 1) != 1)
+				break;
+		}
+		close(fd);
+		freeaddrinfo(addrinfo);
+		free(d);
+		exit(0);
+	}
+	freeaddrinfo(addrinfo);
+
+	ok1(io_loop() == d);
+	ok1(d->state == 4);
+	ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+	free(d);
+	io_close_listener(l);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 8 - 0
ccan/io/test/run-07-break-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64007"
+#define main real_main
+int real_main(void);
+#include "run-07-break.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 125 - 0
ccan/io/test/run-07-break.c

@@ -0,0 +1,125 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65007"
+#endif
+
+struct data {
+	int state;
+	char buf[4];
+};
+
+static struct io_plan read_done(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+	return io_close();
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 2);
+	d->state++;
+}
+
+static void init_conn(int fd, struct data *d)
+{
+	ok1(d->state == 0);
+	d->state++;
+
+	io_set_finish(io_new_conn(fd,
+				  io_break(d,
+					   io_read(d->buf, sizeof(d->buf), read_done, d))),
+		      finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* This is how many tests you plan to run */
+	plan_tests(13);
+	d->state = 0;
+	fd = make_listen_fd(PORT, &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, init_conn, d);
+	ok1(l);
+	fflush(stdout);
+	if (!fork()) {
+		int i;
+
+		io_close_listener(l);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		signal(SIGPIPE, SIG_IGN);
+		for (i = 0; i < strlen("hellothere"); i++) {
+			if (write(fd, "hellothere" + i, 1) != 1)
+				break;
+		}
+		close(fd);
+		freeaddrinfo(addrinfo);
+		free(d);
+		exit(0);
+	}
+	freeaddrinfo(addrinfo);
+	ok1(io_loop() == d);
+	ok1(d->state == 1);
+	io_close_listener(l);
+
+	ok1(io_loop() == NULL);
+	ok1(d->state == 3);
+	ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+	free(d);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 7 - 0
ccan/io/test/run-08-hangup-on-idle-DEBUG.c

@@ -0,0 +1,7 @@
+#define DEBUG
+#define main real_main
+int real_main(void);
+#include "run-08-hangup-on-idle.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 45 - 0
ccan/io/test/run-08-hangup-on-idle.c

@@ -0,0 +1,45 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+static int fds2[2];
+
+static struct io_plan timeout_wakeup(struct io_conn *conn, char *buf)
+{
+	/* This kills the dummy connection. */
+	close(fds2[1]);
+	return io_read(buf, 16, io_close_cb, NULL);
+}
+
+int main(void)
+{
+	int fds[2];
+	struct io_conn *conn;
+	char buf[16];
+
+	plan_tests(4);
+
+	ok1(pipe(fds) == 0);
+
+	/* Write then close. */
+	io_new_conn(fds[1], io_write("hello there world", 16,
+				     io_close_cb, NULL));
+	conn = io_new_conn(fds[0], io_idle());
+
+	/* To avoid assert(num_waiting) */
+	ok1(pipe(fds2) == 0);
+	io_new_conn(fds2[0], io_read(buf, 16, io_close_cb, NULL));
+
+	/* After half a second, it will read. */
+	io_timeout(conn, time_from_msec(500), timeout_wakeup, buf);
+
+	ok1(io_loop() == NULL);
+	ok1(memcmp(buf, "hello there world", 16) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 7 - 0
ccan/io/test/run-08-read-after-hangup-DEBUG.c

@@ -0,0 +1,7 @@
+#define DEBUG
+#define main real_main
+int real_main(void);
+#include "run-08-read-after-hangup.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 34 - 0
ccan/io/test/run-08-read-after-hangup.c

@@ -0,0 +1,34 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <signal.h>
+
+static char inbuf[8];
+
+static struct io_plan wake_it(struct io_conn *conn, struct io_conn *reader)
+{
+	io_wake(reader, io_read(inbuf, 8, io_close_cb, NULL));
+	return io_close();
+}
+
+int main(void)
+{
+	int fds[2];
+	struct io_conn *conn;
+
+	plan_tests(3);
+
+	ok1(pipe(fds) == 0);
+	conn = io_new_conn(fds[0], io_idle());
+	io_new_conn(fds[1], io_write("EASYTEST", 8, wake_it, conn));
+
+	ok1(io_loop() == NULL);
+	ok1(memcmp(inbuf, "EASYTEST", sizeof(inbuf)) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 8 - 0
ccan/io/test/run-09-connect-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64009"
+#define main real_main
+int real_main(void);
+#include "run-09-connect.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 103 - 0
ccan/io/test/run-09-connect.c

@@ -0,0 +1,103 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65009"
+#endif
+
+static struct io_listener *l;
+
+struct data {
+	int state;
+	char buf[10];
+};
+
+static struct io_plan closer(struct io_conn *conn, struct data *d)
+{
+	d->state++;
+	return io_close();
+}
+
+static struct io_plan connected(struct io_conn *conn, struct data *d2)
+{
+	ok1(d2->state == 0);
+	d2->state++;
+	return io_read(d2->buf, sizeof(d2->buf), closer, d2);
+}
+
+static void init_conn(int fd, struct data *d)
+{
+	ok1(d->state == 0);
+	d->state++;
+	io_new_conn(fd, io_write(d->buf, sizeof(d->buf), closer, d));
+	io_close_listener(l);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d)), *d2 = malloc(sizeof(*d2));
+	struct addrinfo *addrinfo;
+	int fd;
+
+	/* This is how many tests you plan to run */
+	plan_tests(8);
+	d->state = 0;
+	memset(d->buf, 'a', sizeof(d->buf));
+	fd = make_listen_fd(PORT, &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, init_conn, d);
+	ok1(l);
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	d2->state = 0;
+	ok1(io_new_conn(fd, io_connect(fd, addrinfo, connected, d2)));
+
+	ok1(io_loop() == NULL);
+	ok1(d->state == 2);
+	ok1(d2->state == 2);
+
+	freeaddrinfo(addrinfo);
+	free(d);
+	free(d2);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 12 - 0
ccan/io/test/run-10-many-DEBUG.c

@@ -0,0 +1,12 @@
+#define DEBUG
+#define PORT "64010"
+#define main real_main
+int real_main(void);
+#include "run-10-many.c"
+#undef main
+/* We stack overflow if we debug all of them! */
+static bool debug_one(struct io_conn *conn)
+{
+	return conn == buf[1].reader;
+}
+int main(void) { io_debug_conn = debug_one; return real_main(); }

+ 105 - 0
ccan/io/test/run-10-many.c

@@ -0,0 +1,105 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#define NUM 100
+#define NUM_ITERS 1000
+
+struct buffer {
+	int iters;
+	struct io_conn *reader, *writer;
+	char buf[32];
+};
+
+static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
+
+static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
+{
+	assert(conn == buf->reader);
+
+	if (buf->iters == NUM_ITERS)
+		return io_close();
+
+	/* You write. */
+	io_wake(buf->writer,
+		io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
+
+	/* I'll wait until you wake me. */
+	return io_idle();
+}
+
+static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
+{
+	assert(conn == buf->writer);
+	/* You read. */
+	io_wake(buf->reader,
+		io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
+
+	if (++buf->iters == NUM_ITERS)
+		return io_close();
+
+	/* I'll wait until you tell me to write. */
+	return io_idle();
+}
+
+static struct buffer buf[NUM];
+
+int main(void)
+{
+	unsigned int i;
+	int fds[2], last_read, last_write;
+
+	plan_tests(5 + NUM);
+
+	ok1(pipe(fds) == 0);
+	last_read = fds[0];
+	last_write = fds[1];
+
+	for (i = 1; i < NUM; i++) {
+		if (pipe(fds) < 0)
+			break;
+		memset(buf[i].buf, i, sizeof(buf[i].buf));
+		sprintf(buf[i].buf, "%i-%i", i, i);
+
+		/* Wait for writer to tell us to read. */
+		buf[i].reader = io_new_conn(last_read, io_idle());
+		if (!buf[i].reader)
+			break;
+		buf[i].writer = io_new_conn(fds[1],
+					    io_write(&buf[i].buf,
+						     sizeof(buf[i].buf),
+						     poke_reader, &buf[i]));
+		if (!buf[i].writer)
+			break;
+		last_read = fds[0];
+	}
+	if (!ok1(i == NUM))
+		exit(exit_status());
+
+	/* Last one completes the cirle. */
+	i = 0;
+	sprintf(buf[i].buf, "%i-%i", i, i);
+	buf[i].reader = io_new_conn(last_read, io_idle());
+	ok1(buf[i].reader);
+	buf[i].writer = io_new_conn(last_write,
+				    io_write(&buf[i].buf, sizeof(buf[i].buf),
+					     poke_reader, &buf[i]));
+	ok1(buf[i].writer);
+
+	/* They should eventually exit */
+	ok1(io_loop() == NULL);
+
+	for (i = 0; i < NUM; i++) {
+		char b[sizeof(buf[0].buf)];
+		memset(b, i, sizeof(b));
+		sprintf(b, "%i-%i", i, i);
+		ok1(memcmp(b, buf[(i + NUM_ITERS) % NUM].buf, sizeof(b)) == 0);
+	}
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 8 - 0
ccan/io/test/run-12-bidir-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64012"
+#define main real_main
+int real_main(void);
+#include "run-12-bidir.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 132 - 0
ccan/io/test/run-12-bidir.c

@@ -0,0 +1,132 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65012"
+#endif
+
+struct data {
+	struct io_listener *l;
+	int state;
+	char buf[4];
+	char wbuf[32];
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	d->state++;
+}
+
+static struct io_plan write_done(struct io_conn *conn, struct data *d)
+{
+	d->state++;
+	return io_close();
+}
+
+static void init_conn(int fd, struct data *d)
+{
+	struct io_conn *conn;
+
+	ok1(d->state == 0);
+	d->state++;
+
+	io_close_listener(d->l);
+
+	memset(d->wbuf, 7, sizeof(d->wbuf));
+
+	conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close_cb, d));
+	io_set_finish(conn, finish_ok, d);
+	conn = io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d));
+	ok1(conn);
+	io_set_finish(conn, finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	int fd, status;
+
+	/* This is how many tests you plan to run */
+	plan_tests(10);
+	d->state = 0;
+	fd = make_listen_fd(PORT, &addrinfo);
+	ok1(fd >= 0);
+	d->l = io_new_listener(fd, init_conn, d);
+	ok1(d->l);
+	fflush(stdout);
+	if (!fork()) {
+		int i;
+		char buf[32];
+
+		io_close_listener(d->l);
+		free(d);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		signal(SIGPIPE, SIG_IGN);
+		for (i = 0; i < 32; i++) {
+			if (read(fd, buf+i, 1) != 1)
+				break;
+		}
+		for (i = 0; i < strlen("hellothere"); i++) {
+			if (write(fd, "hellothere" + i, 1) != 1)
+				break;
+		}
+		close(fd);
+		freeaddrinfo(addrinfo);
+		exit(0);
+	}
+	freeaddrinfo(addrinfo);
+	ok1(io_loop() == NULL);
+	ok1(d->state == 4);
+	ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
+	free(d);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 8 - 0
ccan/io/test/run-13-all-idle-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64013"
+#define main real_main
+int real_main(void);
+#include "run-13-all-idle.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 31 - 0
ccan/io/test/run-13-all-idle.c

@@ -0,0 +1,31 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <signal.h>
+
+int main(void)
+{
+	int status;
+
+	plan_tests(3);
+
+	if (fork() == 0) {
+		int fds[2];
+
+		ok1(pipe(fds) == 0);
+		io_new_conn(fds[0], io_idle());
+		io_loop();
+		exit(1);
+	}
+
+	ok1(wait(&status) != -1);
+	ok1(WIFSIGNALED(status));
+	ok1(WTERMSIG(status) == SIGABRT);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 8 - 0
ccan/io/test/run-15-timeout-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64015"
+#define main real_main
+int real_main(void);
+#include "run-15-timeout.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 174 - 0
ccan/io/test/run-15-timeout.c

@@ -0,0 +1,174 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#ifndef PORT
+#define PORT "65015"
+#endif
+
+struct data {
+	int state;
+	int timeout_usec;
+	bool timed_out;
+	char buf[4];
+};
+
+
+static struct io_plan no_timeout(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+	return io_close();
+}
+
+static struct io_plan timeout(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+	d->timed_out = true;
+	return io_close();
+}
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 2);
+	d->state++;
+	io_break(d, io_idle());
+}
+
+static void init_conn(int fd, struct data *d)
+{
+	struct io_conn *conn;
+
+	ok1(d->state == 0);
+	d->state++;
+
+	conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), no_timeout, d));
+	io_set_finish(conn, finish_ok, d);
+	io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* This is how many tests you plan to run */
+	plan_tests(20);
+	d->state = 0;
+	d->timed_out = false;
+	d->timeout_usec = 100000;
+	fd = make_listen_fd(PORT, &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, init_conn, d);
+	ok1(l);
+	fflush(stdout);
+
+	if (!fork()) {
+		int i;
+
+		io_close_listener(l);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		signal(SIGPIPE, SIG_IGN);
+		usleep(500000);
+		for (i = 0; i < strlen("hellothere"); i++) {
+			if (write(fd, "hellothere" + i, 1) != 1)
+				break;
+		}
+		close(fd);
+		freeaddrinfo(addrinfo);
+		free(d);
+		exit(i);
+	}
+	ok1(io_loop() == d);
+	ok1(d->state == 3);
+	ok1(d->timed_out == true);
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) < sizeof(d->buf));
+
+	/* This one shouldn't time out. */
+	d->state = 0;
+	d->timed_out = false;
+	d->timeout_usec = 500000;
+	fflush(stdout);
+
+	if (!fork()) {
+		int i;
+
+		io_close_listener(l);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		signal(SIGPIPE, SIG_IGN);
+		usleep(100000);
+		for (i = 0; i < strlen("hellothere"); i++) {
+			if (write(fd, "hellothere" + i, 1) != 1)
+				break;
+		}
+		close(fd);
+		freeaddrinfo(addrinfo);
+		free(d);
+		exit(i);
+	}
+	ok1(io_loop() == d);
+	ok1(d->state == 3);
+	ok1(d->timed_out == false);
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) >= sizeof(d->buf));
+
+	io_close_listener(l);
+	freeaddrinfo(addrinfo);
+	free(d);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 8 - 0
ccan/io/test/run-17-homemade-io-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64017"
+#define main real_main
+int real_main(void);
+#include "run-17-homemade-io.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 183 - 0
ccan/io/test/run-17-homemade-io.c

@@ -0,0 +1,183 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65017"
+#endif
+
+struct packet {
+	int state;
+	size_t len;
+	void *contents;
+};
+
+static void finish_ok(struct io_conn *conn, struct packet *pkt)
+{
+	ok1(pkt->state == 3);
+	pkt->state++;
+	io_break(pkt, io_idle());
+}
+
+static int do_read_packet(int fd, struct io_plan *plan)
+{
+	struct packet *pkt = plan->u.ptr_len.p;
+	char *dest;
+	ssize_t ret;
+	size_t off, totlen;
+
+	/* Reading len? */
+	if (plan->u.ptr_len.len < sizeof(size_t)) {
+		ok1(pkt->state == 1);
+		pkt->state++;
+		dest = (char *)&pkt->len;
+		off = plan->u.ptr_len.len;
+		totlen = sizeof(pkt->len);
+	} else {
+		ok1(pkt->state == 2);
+		pkt->state++;
+		if (pkt->len == 0)
+			return io_debug_io(1);
+		if (!pkt->contents && !(pkt->contents = malloc(pkt->len)))
+			goto fail;
+		else {
+			dest = pkt->contents;
+			off = plan->u.ptr_len.len - sizeof(pkt->len);
+			totlen = pkt->len;
+		}
+	}
+
+	ret = read(fd, dest + off, totlen - off);
+	if (ret <= 0)
+		goto fail;
+
+	plan->u.ptr_len.len += ret;
+
+	/* Finished? */
+	return io_debug_io(plan->u.ptr_len.len >= sizeof(pkt->len)
+			   && plan->u.ptr_len.len == pkt->len + sizeof(pkt->len));
+
+fail:
+	free(pkt->contents);
+	return io_debug_io(-1);
+}
+
+static struct io_plan io_read_packet(struct packet *pkt,
+				     struct io_plan (*cb)(struct io_conn *, void *),
+				     void *arg)
+{
+	struct io_plan plan;
+
+	assert(cb);
+	pkt->contents = NULL;
+	plan.u.ptr_len.p = pkt;
+	plan.u.ptr_len.len = 0;
+	plan.io = do_read_packet;
+	plan.next = cb;
+	plan.next_arg = arg;
+	plan.pollflag = POLLIN;
+
+	return plan;
+}
+
+static void init_conn(int fd, struct packet *pkt)
+{
+	ok1(pkt->state == 0);
+	pkt->state++;
+
+	io_set_finish(io_new_conn(fd, io_read_packet(pkt, io_close_cb, pkt)),
+		      finish_ok, pkt);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+int main(void)
+{
+	struct packet *pkt = malloc(sizeof(*pkt));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* This is how many tests you plan to run */
+	plan_tests(13);
+	pkt->state = 0;
+	fd = make_listen_fd(PORT, &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, init_conn, pkt);
+	ok1(l);
+	fflush(stdout);
+	if (!fork()) {
+		struct {
+			size_t len;
+			char data[8];
+		} data;
+
+		io_close_listener(l);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		signal(SIGPIPE, SIG_IGN);
+
+		data.len = sizeof(data.data);
+		memcpy(data.data, "hithere!", sizeof(data.data));
+		if (write(fd, &data, sizeof(data)) != sizeof(data))
+			exit(3);
+
+		close(fd);
+		freeaddrinfo(addrinfo);
+		free(pkt);
+		exit(0);
+	}
+	freeaddrinfo(addrinfo);
+	ok1(io_loop() == pkt);
+	ok1(pkt->state == 4);
+	ok1(pkt->len == 8);
+	ok1(memcmp(pkt->contents, "hithere!", 8) == 0);
+	free(pkt->contents);
+	free(pkt);
+	io_close_listener(l);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}

+ 8 - 0
ccan/io/test/run-18-errno-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64018"
+#define main real_main
+int real_main(void);
+#include "run-18-errno.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 120 - 0
ccan/io/test/run-18-errno.c

@@ -0,0 +1,120 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65018"
+#endif
+
+static void finish_100(struct io_conn *conn, int *state)
+{
+	ok1(errno == 100);
+	ok1(*state == 1);
+	(*state)++;
+}
+
+static void finish_EBADF(struct io_conn *conn, int *state)
+{
+	ok1(errno == EBADF);
+	ok1(*state == 3);
+	(*state)++;
+	io_break(state + 1, io_close());
+}
+
+static void init_conn(int fd, int *state)
+{
+	if (*state == 0) {
+		(*state)++;
+		errno = 100;
+		io_set_finish(io_new_conn(fd, io_close()), finish_100, state);
+	} else {
+		ok1(*state == 2);
+		(*state)++;
+		close(fd);
+		errno = 0;
+		io_set_finish(io_new_conn(fd, io_read(state, 0,
+						      io_close_cb, NULL)),
+			      finish_EBADF, state);
+	}
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+int main(void)
+{
+	int state = 0;
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd;
+
+	/* This is how many tests you plan to run */
+	plan_tests(12);
+	fd = make_listen_fd(PORT, &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, init_conn, &state);
+	ok1(l);
+	fflush(stdout);
+	if (!fork()) {
+		io_close_listener(l);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(1);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(2);
+		close(fd);
+		fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+			    addrinfo->ai_protocol);
+		if (fd < 0)
+			exit(3);
+		if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+			exit(4);
+		close(fd);
+		freeaddrinfo(addrinfo);
+		exit(0);
+	}
+	freeaddrinfo(addrinfo);
+	ok1(io_loop() == &state + 1);
+	ok1(state == 4);
+	io_close_listener(l);
+	ok1(wait(&state));
+	ok1(WIFEXITED(state));
+	ok1(WEXITSTATUS(state) == 0);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}