Browse Source

io: io_always, and zero-length operations support.

A zero-length read should complete immediately, even if the fd isn't readable.
Wire this up, and expose it for callers to use.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
Rusty Russell 12 years ago
parent
commit
12ab811533
7 changed files with 240 additions and 5 deletions
  1. 4 0
      ccan/io/backend.h
  2. 37 1
      ccan/io/io.c
  3. 23 0
      ccan/io/io.h
  4. 34 3
      ccan/io/poll.c
  5. 1 1
      ccan/io/test/run-18-errno.c
  6. 8 0
      ccan/io/test/run-19-always-DEBUG.c
  7. 133 0
      ccan/io/test/run-19-always.c

+ 4 - 0
ccan/io/backend.h

@@ -3,6 +3,10 @@
 #define CCAN_IO_BACKEND_H
 #include <stdbool.h>
 #include <ccan/timer/timer.h>
+#include <poll.h>
+
+/* A setting for actions to always run (eg. zero-length reads). */
+#define POLLALWAYS (((POLLIN|POLLOUT) + 1) & ~((POLLIN|POLLOUT)))
 
 struct io_alloc {
 	void *(*alloc)(size_t size);

+ 37 - 1
ccan/io/io.c

@@ -8,7 +8,6 @@
 #include <errno.h>
 #include <stdlib.h>
 #include <assert.h>
-#include <poll.h>
 #include <unistd.h>
 #include <fcntl.h>
 
@@ -232,6 +231,26 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts,
 	return true;
 }
 
+/* Always done: call the next thing. */
+static int do_always(int fd, struct io_plan *plan)
+{
+	return 1;
+}
+
+struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
+			  void *arg)
+{
+	struct io_plan plan;
+
+	assert(cb);
+	plan.io = do_always;
+	plan.next = cb;
+	plan.next_arg = arg;
+	plan.pollflag = POLLALWAYS;
+
+	return plan;
+}
+
 /* Returns true if we're finished. */
 static int do_write(int fd, struct io_plan *plan)
 {
@@ -252,6 +271,10 @@ struct io_plan io_write_(const void *data, size_t len,
 	struct io_plan plan;
 
 	assert(cb);
+
+	if (len == 0)
+		return io_always_(cb, arg);
+
 	plan.u1.const_vp = data;
 	plan.u2.s = len;
 	plan.io = do_write;
@@ -281,11 +304,16 @@ struct io_plan io_read_(void *data, size_t len,
 	struct io_plan plan;
 
 	assert(cb);
+
+	if (len == 0)
+		return io_always_(cb, arg);
+
 	plan.u1.cp = data;
 	plan.u2.s = len;
 	plan.io = do_read;
 	plan.next = cb;
 	plan.next_arg = arg;
+
 	plan.pollflag = POLLIN;
 
 	return plan;
@@ -309,6 +337,10 @@ struct io_plan io_read_partial_(void *data, size_t *len,
 	struct io_plan plan;
 
 	assert(cb);
+
+	if (*len == 0)
+		return io_always_(cb, arg);
+
 	plan.u1.cp = data;
 	plan.u2.vp = len;
 	plan.io = do_read_partial;
@@ -337,6 +369,10 @@ struct io_plan io_write_partial_(const void *data, size_t *len,
 	struct io_plan plan;
 
 	assert(cb);
+
+	if (*len == 0)
+		return io_always_(cb, arg);
+
 	plan.u1.const_vp = data;
 	plan.u2.vp = len;
 	plan.io = do_write_partial;

+ 23 - 0
ccan/io/io.h

@@ -290,6 +290,29 @@ struct io_plan io_write_partial_(const void *data, size_t *len,
 				 struct io_plan (*cb)(struct io_conn *, void*),
 				 void *arg);
 
+/**
+ * io_always - plan to immediately call next callback.
+ * @cb: function to call.
+ * @arg: @cb argument
+ *
+ * Sometimes it's neater to plan a callback rather than call it directly;
+ * for example, if you only need to read data for one path and not another.
+ *
+ * Example:
+ * static void start_conn_with_nothing(int fd)
+ * {
+ *	// Silly example: close on next time around loop.
+ *	io_new_conn(fd, io_always(io_close_cb, NULL));
+ * }
+ */
+#define io_always(cb, arg)						\
+	io_debug(io_always_(typesafe_cb_preargs(struct io_plan, void *,	\
+						(cb), (arg),		\
+						struct io_conn *),	\
+			    (arg)))
+struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
+			  void *arg);
+
 /**
  * io_connect - plan to connect to a listening socket.
  * @fd: file descriptor.

+ 34 - 3
ccan/io/poll.c

@@ -10,6 +10,7 @@
 #include <errno.h>
 
 static size_t num_fds = 0, max_fds = 0, num_closing = 0, num_waiting = 0;
+static bool some_always = false;
 static struct pollfd *pollfds = NULL;
 static struct fd **fds = NULL;
 static struct timers timeouts;
@@ -146,9 +147,9 @@ void backend_plan_changed(struct io_conn *conn)
 	if (pfd->events)
 		num_waiting--;
 
-	pfd->events = conn->plan.pollflag;
+	pfd->events = conn->plan.pollflag & (POLLIN|POLLOUT);
 	if (conn->duplex) {
-		int mask = conn->duplex->plan.pollflag;
+		int mask = conn->duplex->plan.pollflag & (POLLIN|POLLOUT);
 		/* You can't *both* read/write. */
 		assert(!mask || pfd->events != mask);
 		pfd->events |= mask;
@@ -161,15 +162,20 @@ void backend_plan_changed(struct io_conn *conn)
 
 	if (!conn->plan.next)
 		num_closing++;
+
+	if (conn->plan.pollflag == POLLALWAYS)
+		some_always = true;
 }
 
 bool add_conn(struct io_conn *c)
 {
-	if (!add_fd(&c->fd, c->plan.pollflag))
+	if (!add_fd(&c->fd, c->plan.pollflag & (POLLIN|POLLOUT)))
 		return false;
 	/* Immediate close is allowed. */
 	if (!c->plan.next)
 		num_closing++;
+	if (c->plan.pollflag == POLLALWAYS)
+		some_always = true;
 	return true;
 }
 
@@ -267,6 +273,26 @@ void backend_del_timeout(struct io_conn *conn)
 	conn->timeout->conn = NULL;
 }
 
+static void handle_always(void)
+{
+	int i;
+
+	some_always = false;
+
+	for (i = 0; i < num_fds && !io_loop_return; i++) {
+		struct io_conn *c = (void *)fds[i];
+
+		if (fds[i]->listener)
+			continue;
+
+		if (c->plan.pollflag == POLLALWAYS)
+			io_ready(c);
+
+		if (c->duplex && c->duplex->plan.pollflag == POLLALWAYS)
+			io_ready(c->duplex);
+	}
+}
+
 /* This is the main loop. */
 void *do_io_loop(struct io_conn **ready)
 {
@@ -317,6 +343,11 @@ void *do_io_loop(struct io_conn **ready)
 		if (doing_debug() && some_timeouts)
 			continue;
 
+		if (some_always) {
+			handle_always();
+			continue;
+		}
+
 		if (num_fds == 0)
 			break;
 

+ 1 - 1
ccan/io/test/run-18-errno.c

@@ -36,7 +36,7 @@ static void init_conn(int fd, int *state)
 		(*state)++;
 		close(fd);
 		errno = 0;
-		io_set_finish(io_new_conn(fd, io_read(state, 0,
+		io_set_finish(io_new_conn(fd, io_read(state, 1,
 						      io_close_cb, NULL)),
 			      finish_EBADF, state);
 	}

+ 8 - 0
ccan/io/test/run-19-always-DEBUG.c

@@ -0,0 +1,8 @@
+#define DEBUG
+#define PORT "64019"
+#define main real_main
+int real_main(void);
+#include "run-19-always.c"
+#undef main
+static bool always_debug(struct io_conn *conn) { return true; }
+int main(void) { io_debug_conn = always_debug; return real_main(); }

+ 133 - 0
ccan/io/test/run-19-always.c

@@ -0,0 +1,133 @@
+#include <ccan/io/io.h>
+/* Include the C files directly. */
+#include <ccan/io/poll.c>
+#include <ccan/io/io.c>
+#include <ccan/tap/tap.h>
+#include <sys/wait.h>
+#include <stdio.h>
+
+#ifndef PORT
+#define PORT "65019"
+#endif
+
+struct data {
+	int state;
+	size_t bytes;
+	char *buf;
+};
+
+static void finish_ok(struct io_conn *conn, struct data *d)
+{
+	ok1(d->state == 1);
+	d->state++;
+	io_break(d, io_idle());
+}
+
+static struct io_plan write_buf(struct io_conn *conn, struct data *d)
+{
+	return io_write(d->buf, d->bytes, io_close_cb, d);
+}
+
+static void init_conn(int fd, struct data *d)
+{
+	ok1(d->state == 0);
+	d->state++;
+	/* Empty read should run immediately... */
+	io_set_finish(io_new_conn(fd, io_read(NULL, 0, write_buf, d)),
+		      finish_ok, d);
+}
+
+static int make_listen_fd(const char *port, struct addrinfo **info)
+{
+	int fd, on = 1;
+	struct addrinfo *addrinfo, hints;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	hints.ai_protocol = 0;
+
+	if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
+		return -1;
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		return -1;
+
+	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+	if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
+		close(fd);
+		return -1;
+	}
+	if (listen(fd, 1) != 0) {
+		close(fd);
+		return -1;
+	}
+	*info = addrinfo;
+	return fd;
+}
+
+static void read_from_socket(size_t bytes, const struct addrinfo *addrinfo)
+{
+	int fd, done, r;
+	char buf[100];
+
+	fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
+		    addrinfo->ai_protocol);
+	if (fd < 0)
+		exit(1);
+	if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
+		exit(2);
+
+	for (done = 0; done < bytes; done += r) {
+		r = read(fd, buf, sizeof(buf));
+		if (r < 0)
+			exit(3);
+		done += r;
+	}
+	close(fd);
+}
+
+int main(void)
+{
+	struct data *d = malloc(sizeof(*d));
+	struct addrinfo *addrinfo;
+	struct io_listener *l;
+	int fd, status;
+
+	/* This is how many tests you plan to run */
+	plan_tests(9);
+	d->state = 0;
+	d->bytes = 1024*1024;
+	d->buf = malloc(d->bytes);
+	memset(d->buf, 'a', d->bytes);
+	fd = make_listen_fd(PORT, &addrinfo);
+	ok1(fd >= 0);
+	l = io_new_listener(fd, init_conn, d);
+	ok1(l);
+	fflush(stdout);
+	if (!fork()) {
+		io_close_listener(l);
+		read_from_socket(d->bytes, addrinfo);
+		freeaddrinfo(addrinfo);
+		free(d->buf);
+		free(d);
+		exit(0);
+	}
+	ok1(io_loop() == d);
+	ok1(d->state == 2);
+
+	ok1(wait(&status));
+	ok1(WIFEXITED(status));
+	ok1(WEXITSTATUS(status) == 0);
+
+	freeaddrinfo(addrinfo);
+	free(d->buf);
+	free(d);
+	io_close_listener(l);
+
+	/* This exits depending on whether all tests passed */
+	return exit_status();
+}