Browse Source

io: change io_idle() to io_wait()

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
Rusty Russell 11 years ago
parent
commit
f7ab2c65d4

+ 57 - 74
ccan/io/_info

@@ -25,76 +25,74 @@
  * #include <signal.h>
  * #include <sys/types.h>
  * #include <sys/wait.h>
+ * #include <string.h>
  *
  * struct buffer {
- * 	size_t max, off, rlen;
- * 	char *buf;
+ * 	bool finished;
+ * 	size_t start, end, rlen, wlen;
+ * 	char buf[4096];
  * };
  *
- * struct stdin_buffer {
- * 	struct io_conn *reader, *writer;
- * 	size_t len;
- * 	char inbuf[4096];
- * };
- *
- * // This reads from stdin.
- * static struct io_plan wake_writer(struct io_conn *, struct stdin_buffer *);
- * // This writes the stdin buffer to the child.
- * static struct io_plan wake_reader(struct io_conn *, struct stdin_buffer *);
- *
- * static struct io_plan wake_writer(struct io_conn *c, struct stdin_buffer *b)
+ * static void finish(struct io_conn *c, struct buffer *b)
  * {
- * 	assert(c == b->reader);
- * 	io_wake(b->writer, io_write(b->inbuf, b->len, wake_reader, b));
- * 	return io_idle();
+ * 	// Mark us finished.
+ * 	b->finished = true;
+ * 	// Wake writer just in case it's asleep.
+ * 	io_wake(b);
  * }
  *
- * static void reader_exit(struct io_conn *c, struct stdin_buffer *b)
+ * static struct io_plan read_in(struct io_conn *c, struct buffer *b)
  * {
- * 	assert(c == b->reader);
- * 	io_wake(b->writer, io_close());
- * 	b->reader = NULL;
- * }
+ * 	// Add what we just read.
+ * 	b->end += b->rlen;
+ * 	assert(b->end <= sizeof(b->buf));
  *
- * static struct io_plan wake_reader(struct io_conn *c, struct stdin_buffer *b)
- * {
- * 	assert(c == b->writer);
- *	if (!b->reader)
- *		return io_close();
- *	b->len = sizeof(b->inbuf);
- * 	io_wake(b->reader, io_read_partial(b->inbuf, &b->len, wake_writer, b));
- * 	return io_idle();
- * }
+ * 	// If we just read something, wake writer.
+ * 	if (b->rlen != 0)
+ * 		io_wake(b);
  *
- * static void fail_child_write(struct io_conn *conn, struct stdin_buffer *b)
- * {
- * 	if (b->reader)
- * 		err(1, "Failed writing to child.");
+ * 	// If buffer is empty, return to start.
+ * 	if (b->start == b->end)
+ * 		b->start = b->end = 0;
+ *
+ * 	// Read in some of the rest.
+ * 	b->rlen = sizeof(b->buf) - b->end;
+ *
+ * 	// No room?  Wait for writer
+ * 	if (b->rlen == 0)
+ * 		return io_wait(b, read_in, b);
+ *
+ * 	return io_read_partial(b->buf + b->end, &b->rlen, read_in, b);
  * }
  *
- * // This reads from the child and saves it into buffer.
- * static struct io_plan read_from_child(struct io_conn *conn,
- *					 struct buffer *b)
+ * static struct io_plan write_out(struct io_conn *c, struct buffer *b)
  * {
- * 	b->off += b->rlen;
- *
- * 	if (b->off == b->max)
- * 		b->buf = realloc(b->buf, b->max *= 2);
+ * 	// Remove what we just wrote.
+ * 	b->start += b->wlen;
+ * 	assert(b->start <= sizeof(b->buf));
+ *
+ * 	// If we wrote somthing, wake writer.
+ * 	if (b->wlen != 0)
+ * 		io_wake(b);
+ *
+ * 	b->wlen = b->end - b->start;
+ * 	// Nothing to write?  Wait for reader.
+ * 	if (b->wlen == 0) {
+ * 		if (b->finished)
+ * 			return io_close();
+ * 		return io_wait(b, write_out, b);
+ * 	}
  *
- * 	b->rlen = b->max - b->off;
- * 	return io_read_partial(b->buf + b->off, &b->rlen, read_from_child, b);
+ * 	return io_write_partial(b->buf + b->start, &b->wlen, write_out, b);
  * }
  *
  * // Feed a program our stdin, gather its stdout, print that at end.
  * int main(int argc, char *argv[])
  * {
  * 	int tochild[2], fromchild[2];
- * 	struct buffer out;
- * 	struct stdin_buffer sbuf;
+ * 	struct buffer to, from;
  * 	int status;
- * 	size_t off;
- * 	ssize_t ret;
- *	struct io_conn *from_child;
+ * 	struct io_conn *reader;
  *
  * 	if (argc == 1)
  * 		errx(1, "Usage: runner <cmdline>...");
@@ -117,35 +115,20 @@
  * 	close(fromchild[1]);
  * 	signal(SIGPIPE, SIG_IGN);
  *
- *	sbuf.len = sizeof(sbuf.inbuf);
- * 	sbuf.reader = io_new_conn(STDIN_FILENO,
- *				  io_read_partial(sbuf.inbuf, &sbuf.len,
- *						  wake_writer, &sbuf));
- * 	sbuf.writer = io_new_conn(tochild[1], io_idle());
- *
- *	out.max = 128;
- *	out.off = 0;
- *	out.rlen = 128;
- *	out.buf = malloc(out.max);
- *	from_child = io_new_conn(fromchild[0],
- *				 io_read_partial(out.buf, &out.rlen,
- *						 read_from_child, &out));
- * 	if (!sbuf.reader || !sbuf.writer || !from_child)
- * 		err(1, "Allocating connections");
- *
- *	io_set_finish(sbuf.reader, reader_exit, &sbuf);
- *	io_set_finish(sbuf.writer, fail_child_write, &sbuf);
+ * 	// Read from stdin, write to child.
+ * 	memset(&to, 0, sizeof(to));
+ * 	reader = io_new_conn(STDIN_FILENO, read_in(NULL, &to));
+ * 	io_set_finish(reader, finish, &to);
+ * 	io_new_conn(tochild[1], write_out(NULL, &to));
+ *
+ * 	// Read from child, write to stdout.
+ * 	reader = io_new_conn(fromchild[0], read_in(NULL, &from));
+ * 	io_set_finish(reader, finish, &from);
+ * 	io_new_conn(STDOUT_FILENO, write_out(NULL, &from));
  *
  * 	io_loop();
  * 	wait(&status);
  *
- * 	for (off = 0; off < out.off; off += ret) {
- * 		ret = write(STDOUT_FILENO, out.buf+off, out.off-off);
- * 		if (ret < 0)
- * 			err(1, "Writing stdout");
- * 	}
- * 	free(out.buf);
- *
  * 	return WIFEXITED(status) ? WEXITSTATUS(status) : 2;
  * }
  *

+ 1 - 0
ccan/io/backend.h

@@ -91,6 +91,7 @@ bool add_conn(struct io_conn *c);
 bool add_duplex(struct io_conn *c);
 void del_listener(struct io_listener *l);
 void backend_plan_changed(struct io_conn *conn);
+void backend_wait_changed(const void *wait);
 void backend_add_timeout(struct io_conn *conn, struct timespec ts);
 void backend_del_timeout(struct io_conn *conn);
 void backend_del_conn(struct io_conn *conn);

+ 13 - 39
ccan/io/io.c

@@ -24,8 +24,6 @@ bool io_plan_nodebug;
 struct io_conn *current;
 /* User-defined function to select which connection(s) to debug. */
 bool (*io_debug_conn)(struct io_conn *conn);
-/* Set when we wake up an connection we are debugging. */
-bool io_debug_wakeup;
 
 struct io_plan io_debug(struct io_plan plan)
 {
@@ -36,12 +34,9 @@ struct io_plan io_debug(struct io_plan plan)
 		return plan;
 	}
 
-	if (!current || !doing_debug_on(current)) {
-		if (!io_debug_wakeup)
-			return plan;
-	}
+	if (!current || !doing_debug_on(current))
+		return plan;
 
-	io_debug_wakeup = false;
 	current->plan = plan;
 	backend_plan_changed(current);
 
@@ -68,7 +63,7 @@ struct io_plan io_debug(struct io_plan plan)
 
 	/* Return a do-nothing plan, so backend_plan_changed in
 	 * io_ready doesn't do anything (it's already been called). */
-	return io_idle_();
+	return io_wait_(NULL, (void *)1, NULL);
 }
 
 int io_debug_io(int ret)
@@ -107,22 +102,12 @@ int io_debug_io(int ret)
 	return 2;
 }
 
-static void debug_io_wake(struct io_conn *conn)
-{
-	/* We want linear if we wake a debugged connection, too. */
-	if (io_debug_conn && io_debug_conn(conn))
-		io_debug_wakeup = true;
-}
-
 /* Counterpart to io_plan_no_debug(), called in macros in io.h */
 static void io_plan_debug_again(void)
 {
 	io_plan_nodebug = false;
 }
 #else
-static void debug_io_wake(struct io_conn *conn)
-{
-}
 static void io_plan_debug_again(void)
 {
 }
@@ -436,37 +421,26 @@ struct io_plan io_connect_(int fd, const struct addrinfo *addr,
 	return plan;
 }
 
-struct io_plan io_idle_(void)
+struct io_plan io_wait_(const void *wait,
+			struct io_plan (*cb)(struct io_conn *, void*),
+			void *arg)
 {
 	struct io_plan plan;
 
+	assert(cb);
 	plan.pollflag = 0;
 	plan.io = NULL;
-	/* Never called (overridden by io_wake), but NULL means closing */
-	plan.next = (void *)io_idle_;
+	plan.next = cb;
+	plan.next_arg = arg;
 
-	return plan;
-}
+	plan.u1.const_vp = wait;
 
-bool io_is_idle(const struct io_conn *conn)
-{
-	return conn->plan.io == NULL;
+	return plan;
 }
 
-void io_wake_(struct io_conn *conn, struct io_plan plan)
-
+void io_wake(const void *wait)
 {
-	io_plan_debug_again();
-
-	/* It might be closing, but we haven't called its finish() yet. */
-	if (!conn->plan.next)
-		return;
-	/* It was idle, right? */
-	assert(!conn->plan.io);
-	conn->plan = plan;
-	backend_plan_changed(conn);
-
-	debug_io_wake(conn);
+	backend_wait_changed(wait);
 }
 
 void io_ready(struct io_conn *conn)

+ 27 - 30
ccan/io/io.h

@@ -314,6 +314,7 @@ struct io_plan io_write_partial_(const void *data, size_t *len,
 struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
 			  void *arg);
 
+
 /**
  * io_connect - plan to connect to a listening socket.
  * @fd: file descriptor.
@@ -360,19 +361,32 @@ struct io_plan io_connect_(int fd, const struct addrinfo *addr,
 			   void *arg);
 
 /**
- * io_idle - plan to do nothing.
+ * io_wait - plan to wait for something.
+ * @wait: the address to wait on.
+ * @cb: function to call after waiting.
+ * @arg: @cb argument
  *
- * This indicates the connection is idle: io_wake() will be called later do
- * give the connection a new plan.
+ * This indicates the connection is idle: io_wake() will be called later to
+ * restart the connection.
  *
  * Example:
  *	struct io_conn *sleeper;
- *	sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
+ *	unsigned int counter = 0;
+ *	sleeper = io_new_conn(open("/dev/null", O_RDONLY),
+ *			      io_wait(&counter, io_close_cb, NULL));
  *	if (!sleeper)
  *		exit(1);
  */
-#define io_idle() io_debug(io_idle_())
-struct io_plan io_idle_(void);
+#define io_wait(wait, cb, arg)						\
+	io_debug(io_wait_(wait,						\
+			  typesafe_cb_preargs(struct io_plan, void *,	\
+					      (cb), (arg),		\
+					      struct io_conn *),	\
+			  (arg)))
+
+struct io_plan io_wait_(const void *wait,
+			struct io_plan (*cb)(struct io_conn *, void *),
+			void *arg);
 
 /**
  * io_timeout - set timeout function if the callback doesn't complete.
@@ -440,35 +454,18 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts,
 struct io_conn *io_duplex_(struct io_conn *conn, struct io_plan plan);
 
 /**
- * io_wake - wake up an idle connection.
- * @conn: an idle connection.
- * @plan: the next I/O plan for @conn.
- *
- * This makes @conn ready to do I/O the next time around the io_loop().
+ * io_wake - wake up any connections waiting on @wait
+ * @wait: the address to trigger.
  *
  * Example:
- *	struct io_conn *sleeper;
- *	sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
- *
- *	io_wake(sleeper, io_write("junk", 4, io_close_cb, NULL));
- */
-#define io_wake(conn, plan) (io_plan_no_debug(), io_wake_((conn), (plan)))
-void io_wake_(struct io_conn *conn, struct io_plan plan);
-
-/**
- * io_is_idle - is a connection idle?
- *
- * This can be useful for complex protocols, eg. where you want a connection
- * to send something, so you queue it and wake it if it's idle.
+ *	unsigned int wait;
  *
- * Example:
- *	struct io_conn *sleeper;
- *	sleeper = io_new_conn(open("/dev/null", O_RDONLY), io_idle());
+ *	io_new_conn(open("/dev/null", O_RDONLY),
+ *		   io_wait(&wait, io_close_cb, NULL));
  *
- *	assert(io_is_idle(sleeper));
- *	io_wake(sleeper, io_write("junk", 4, io_close_cb, NULL));
+ *	io_wake(&wait);
  */
-bool io_is_idle(const struct io_conn *conn);
+void io_wake(const void *wait);
 
 /**
  * io_break - return from io_loop()

+ 28 - 0
ccan/io/poll.c

@@ -167,6 +167,34 @@ void backend_plan_changed(struct io_conn *conn)
 		some_always = true;
 }
 
+void backend_wait_changed(const void *wait)
+{
+	unsigned int i;
+
+	for (i = 0; i < num_fds; i++) {
+		struct io_conn *c, *duplex;
+
+		/* Ignore listeners */
+		if (fds[i]->listener)
+			continue;
+		c = (void *)fds[i];
+		for (duplex = c->duplex; c; c = duplex, duplex = NULL) {
+			/* Ignore closing. */
+			if (!c->plan.next)
+				continue;
+			/* Not idle? */
+			if (c->plan.io)
+				continue;
+			/* Waiting on something else? */
+			if (c->plan.u1.const_vp != wait)
+				continue;
+			/* Make it do the next thing. */
+			c->plan = io_always_(c->plan.next, c->plan.next_arg);
+			backend_plan_changed(c);
+		}
+	}
+}
+
 bool add_conn(struct io_conn *c)
 {
 	if (!add_fd(&c->fd, c->plan.pollflag & (POLLIN|POLLOUT)))

+ 8 - 4
ccan/io/test/run-06-idle.c

@@ -29,8 +29,7 @@ static struct io_plan read_done(struct io_conn *conn, struct data *d)
 
 static void finish_waker(struct io_conn *conn, struct data *d)
 {
-	ok1(io_is_idle(idler));
-	io_wake(idler, io_read(d->buf, sizeof(d->buf), read_done, d));
+	io_wake(d);
 	ok1(d->state == 1);
 	d->state++;
 }
@@ -47,13 +46,18 @@ static struct io_plan never(struct io_conn *conn, void *arg)
 	abort();
 }
 
+static struct io_plan read_buf(struct io_conn *conn, struct data *d)
+{
+	return io_read(d->buf, sizeof(d->buf), read_done, d);
+}
+
 static void init_conn(int fd, struct data *d)
 {
 	int fd2;
 
 	ok1(d->state == 0);
 	d->state++;
-	idler = io_new_conn(fd, io_idle());
+	idler = io_new_conn(fd, io_wait(d, read_buf, d));
 	io_set_finish(idler, finish_idle, d);
 
 	/* This will wake us up, as read will fail. */
@@ -103,7 +107,7 @@ int main(void)
 	int fd, status;
 
 	/* This is how many tests you plan to run */
-	plan_tests(14);
+	plan_tests(13);
 	d->state = 0;
 	fd = make_listen_fd(PORT, &addrinfo);
 	ok1(fd >= 0);

+ 6 - 1
ccan/io/test/run-08-hangup-on-idle.c

@@ -15,6 +15,11 @@ static struct io_plan timeout_wakeup(struct io_conn *conn, char *buf)
 	return io_read(buf, 16, io_close_cb, NULL);
 }
 
+static struct io_plan never(struct io_conn *conn, void *unused)
+{
+	abort();
+}
+
 int main(void)
 {
 	int fds[2];
@@ -28,7 +33,7 @@ int main(void)
 	/* Write then close. */
 	io_new_conn(fds[1], io_write("hello there world", 16,
 				     io_close_cb, NULL));
-	conn = io_new_conn(fds[0], io_idle());
+	conn = io_new_conn(fds[0], io_wait(buf, never, NULL));
 
 	/* To avoid assert(num_waiting) */
 	ok1(pipe(fds2) == 0);

+ 7 - 2
ccan/io/test/run-08-read-after-hangup.c

@@ -11,10 +11,15 @@ static char inbuf[8];
 
 static struct io_plan wake_it(struct io_conn *conn, struct io_conn *reader)
 {
-	io_wake(reader, io_read(inbuf, 8, io_close_cb, NULL));
+	io_wake(inbuf);
 	return io_close();
 }
 
+static struct io_plan read_buf(struct io_conn *conn, void *unused)
+{
+	return io_read(inbuf, 8, io_close_cb, NULL);
+}
+
 int main(void)
 {
 	int fds[2];
@@ -23,7 +28,7 @@ int main(void)
 	plan_tests(3);
 
 	ok1(pipe(fds) == 0);
-	conn = io_new_conn(fds[0], io_idle());
+	conn = io_new_conn(fds[0], io_wait(inbuf, read_buf, NULL));
 	io_new_conn(fds[1], io_write("EASYTEST", 8, wake_it, conn));
 
 	ok1(io_loop() == NULL);

+ 22 - 15
ccan/io/test/run-10-many.c

@@ -16,6 +16,12 @@ struct buffer {
 };
 
 static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf);
+static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf);
+
+static struct io_plan read_buf(struct io_conn *conn, struct buffer *buf)
+{
+	return io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf);
+}
 
 static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
 {
@@ -25,25 +31,28 @@ static struct io_plan poke_writer(struct io_conn *conn, struct buffer *buf)
 		return io_close();
 
 	/* You write. */
-	io_wake(buf->writer,
-		io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf));
+	io_wake(&buf->writer);
 
 	/* I'll wait until you wake me. */
-	return io_idle();
+	return io_wait(&buf->reader, read_buf, buf);
+}
+
+static struct io_plan write_buf(struct io_conn *conn, struct buffer *buf)
+{
+	return io_write(&buf->buf, sizeof(buf->buf), poke_reader, buf);
 }
 
 static struct io_plan poke_reader(struct io_conn *conn, struct buffer *buf)
 {
 	assert(conn == buf->writer);
 	/* You read. */
-	io_wake(buf->reader,
-		io_read(&buf->buf, sizeof(buf->buf), poke_writer, buf));
+	io_wake(&buf->reader);
 
 	if (++buf->iters == NUM_ITERS)
 		return io_close();
 
 	/* I'll wait until you tell me to write. */
-	return io_idle();
+	return io_wait(&buf->writer, write_buf, buf);
 }
 
 static struct buffer buf[NUM];
@@ -66,13 +75,12 @@ int main(void)
 		sprintf(buf[i].buf, "%i-%i", i, i);
 
 		/* Wait for writer to tell us to read. */
-		buf[i].reader = io_new_conn(last_read, io_idle());
+		buf[i].reader = io_new_conn(last_read,
+					    io_wait(&buf[i].reader, read_buf,
+						    &buf[i]));
 		if (!buf[i].reader)
 			break;
-		buf[i].writer = io_new_conn(fds[1],
-					    io_write(&buf[i].buf,
-						     sizeof(buf[i].buf),
-						     poke_reader, &buf[i]));
+		buf[i].writer = io_new_conn(fds[1], write_buf(NULL, &buf[i]));
 		if (!buf[i].writer)
 			break;
 		last_read = fds[0];
@@ -83,11 +91,10 @@ int main(void)
 	/* Last one completes the cirle. */
 	i = 0;
 	sprintf(buf[i].buf, "%i-%i", i, i);
-	buf[i].reader = io_new_conn(last_read, io_idle());
+	buf[i].reader = io_new_conn(last_read,
+				    io_wait(&buf[i].reader, read_buf, &buf[i]));
 	ok1(buf[i].reader);
-	buf[i].writer = io_new_conn(last_write,
-				    io_write(&buf[i].buf, sizeof(buf[i].buf),
-					     poke_reader, &buf[i]));
+	buf[i].writer = io_new_conn(last_write, write_buf(NULL, &buf[i]));
 	ok1(buf[i].writer);
 
 	/* They should eventually exit */

+ 1 - 1
ccan/io/test/run-13-all-idle.c

@@ -17,7 +17,7 @@ int main(void)
 		int fds[2];
 
 		ok1(pipe(fds) == 0);
-		io_new_conn(fds[0], io_idle());
+		io_new_conn(fds[0], io_wait(&status, io_close_cb, NULL));
 		io_loop();
 		exit(1);
 	}

+ 1 - 1
ccan/io/test/run-16-duplex-test.c

@@ -28,7 +28,7 @@ static void finish_ok(struct io_conn *conn, struct data *d)
 static struct io_plan write_done(struct io_conn *conn, struct data *d)
 {
 	d->state++;
-	return io_idle();
+	return io_wait(d, io_close_cb, NULL);
 }
 
 static struct io_plan read_done(struct io_conn *conn, struct data *d)