Browse Source

RPC: Rewrite io_data to cleanly handle unlimited sized responses

Luke Dashjr 12 years ago
parent
commit
caf0700ce8
2 changed files with 90 additions and 148 deletions
  1. 8 0
      README.RPC
  2. 82 148
      api.c

+ 8 - 0
README.RPC

@@ -427,6 +427,14 @@ api-example.py - a Python script to access the API
 Feature Changelog for external applications using the API:
 
 
+API V1.25.3
+
+Removed output limitation:
+ All replies can now be longer than the previous limitation of 64k, and will
+  only be truncated on a 50ms timeout sending.
+
+----------
+
 API V1.25.2
 
 Modified API commands:

+ 82 - 148
api.c

@@ -556,104 +556,89 @@ extern struct device_drv cpu_drv;
 #endif
 
 struct io_data {
-	size_t siz;
-	char *ptr;
-	char *cur;
-	bool sock;
-	bool full;
+	bytes_t data;
+	SOCKETTYPE sock;
+	
+	// Whether to add various things
 	bool close;
 };
-
-struct io_list {
-	struct io_data *io_data;
-	struct io_list *prev;
-	struct io_list *next;
-};
-
-static struct io_list *io_head = NULL;
-
-#define io_new(init) _io_new(init, false)
-#define sock_io_new() _io_new(SOCKBUFSIZ, true)
+static struct io_data *rpc_io_data;
 
 static void io_reinit(struct io_data *io_data)
 {
-	io_data->cur = io_data->ptr;
-	*(io_data->ptr) = '\0';
-	io_data->full = false;
+	bytes_reset(&io_data->data);
 	io_data->close = false;
 }
 
-static struct io_data *_io_new(size_t initial, bool socket_buf)
+static
+struct io_data *sock_io_new()
 {
-	struct io_data *io_data;
-	struct io_list *io_list;
-
-	io_data = malloc(sizeof(*io_data));
-	io_data->ptr = malloc(initial);
-	io_data->siz = initial;
-	io_data->sock = socket_buf;
+	struct io_data *io_data = malloc(sizeof(struct io_data));
+	bytes_init(&io_data->data);
+	io_data->sock = INVSOCK;
 	io_reinit(io_data);
-
-	io_list = malloc(sizeof(*io_list));
-
-	io_list->io_data = io_data;
-
-	if (io_head) {
-		io_list->next = io_head;
-		io_list->prev = io_head->prev;
-		io_list->next->prev = io_list;
-		io_list->prev->next = io_list;
-	} else {
-		io_list->prev = io_list;
-		io_list->next = io_list;
-		io_head = io_list;
-	}
-
 	return io_data;
 }
 
-static bool io_add(struct io_data *io_data, char *buf)
+static
+size_t io_flush(struct io_data *io_data, bool complete)
 {
-	size_t len, dif, tot;
-
-	if (io_data->full)
-		return false;
-
-	len = strlen(buf);
-	dif = io_data->cur - io_data->ptr;
-	tot = len + 1 + dif;
-
-	if (tot > io_data->siz) {
-		size_t new = io_data->siz * 2;
-
-		if (new < tot)
-			new = tot * 2;
-
-		if (io_data->sock) {
-			if (new > SOCKBUFSIZ) {
-				if (tot > SOCKBUFSIZ) {
-					io_data->full = true;
-					return false;
-				}
-
-				new = SOCKBUFSIZ;
-			}
+	size_t sent = 0, tosend = bytes_len(&io_data->data);
+	ssize_t n;
+	struct timeval timeout = {0, complete ? 50000: 0}, tv;
+	fd_set wd;
+	int count = 0;
+	
+	while (tosend)
+	{
+		FD_ZERO(&wd);
+		FD_SET(io_data->sock, &wd);
+		tv = timeout;
+		if (select(io_data->sock + 1, NULL, &wd, NULL, &tv) < 1)
+			break;
+		
+		n = send(io_data->sock, &bytes_buf(&io_data->data)[sent], tosend, 0);
+		if (SOCKETFAIL(n))
+		{
+			if (!sock_blocks())
+				applog(LOG_WARNING, "API: send (%d) failed: %s", tosend, SOCKERRMSG);
+			break;
 		}
-
-		io_data->ptr = realloc(io_data->ptr, new);
-		io_data->cur = io_data->ptr + dif;
-		io_data->siz = new;
+		if (count <= 1)
+		{
+			if (n == tosend)
+				applog(LOG_DEBUG, "API: sent all of %d first go", tosend);
+			else
+				applog(LOG_DEBUG, "API: sent %d of %d first go", n, tosend);
+		}
+		else
+		{
+			if (n == tosend)
+				applog(LOG_DEBUG, "API: sent all of remaining %d (count=%d)", tosend, count);
+			else
+				applog(LOG_DEBUG, "API: sent %d of remaining %d (count=%d)", n, tosend, count);
+		}
+		sent += n;
+		tosend -= n;
 	}
+	
+	bytes_shift(&io_data->data, sent);
+	
+	return sent;
+}
 
-	memcpy(io_data->cur, buf, len + 1);
-	io_data->cur += len;
-
+static bool io_add(struct io_data *io_data, char *buf)
+{
+	size_t len = strlen(buf);
+	if (bytes_len(&io_data->data) + len > SOCKBUFSIZ)
+		io_flush(io_data, false);
+	bytes_append(&io_data->data, buf, len);
 	return true;
 }
 
 static bool io_put(struct io_data *io_data, char *buf)
 {
-	io_reinit(io_data);
+	bytes_reset(&io_data->data);
 	return io_add(io_data, buf);
 }
 
@@ -664,21 +649,9 @@ static void io_close(struct io_data *io_data)
 
 static void io_free()
 {
-	struct io_list *io_list, *io_next;
-
-	if (io_head) {
-		io_list = io_head;
-		do {
-			io_next = io_list->next;
-
-			free(io_list->io_data);
-			free(io_list);
-
-			io_list = io_next;
-		} while (io_list != io_head);
-
-		io_head = NULL;
-	}
+	bytes_free(&rpc_io_data->data);
+	free(rpc_io_data);
+	rpc_io_data = NULL;
 }
 
 // This is only called when expected to be needed (rarely)
@@ -3317,64 +3290,23 @@ static void checkcommand(struct io_data *io_data, __maybe_unused SOCKETTYPE c, c
 
 static void send_result(struct io_data *io_data, SOCKETTYPE c, bool isjson)
 {
-	char buf[SOCKBUFSIZ + sizeof(JSON_CLOSE) + sizeof(JSON_END)];
-	int count, res, tosend, len, n;
-
-	strcpy(buf, io_data->ptr);
-
 	if (io_data->close)
-		strcat(buf, JSON_CLOSE);
-
-	if (isjson) {
-		if (io_data->full)
-			strcat(buf, JSON_END_TRUNCATED);
-		else
-			strcat(buf, JSON_END);
-	}
-
-	len = strlen(buf);
-	tosend = len+1;
-
-	applog(LOG_DEBUG, "API: send reply: (%d) '%.10s%s'", tosend, buf, len > 10 ? "..." : BLANK);
-
-	count = 0;
-	while (count++ < 5 && tosend > 0) {
-		// allow 50ms per attempt
-		struct timeval timeout = {0, 50000};
-		fd_set wd;
-
-		FD_ZERO(&wd);
-		FD_SET(c, &wd);
-		if ((res = select(c + 1, NULL, &wd, NULL, &timeout)) < 1) {
-			applog(LOG_WARNING, "API: send select failed (%d)", res);
-			return;
-		}
-
-		n = send(c, buf, tosend, 0);
-
-		if (SOCKETFAIL(n)) {
-			if (sock_blocks())
-				continue;
-
-			applog(LOG_WARNING, "API: send (%d) failed: %s", tosend, SOCKERRMSG);
-
-			return;
-		} else {
-			if (count <= 1) {
-				if (n == tosend)
-					applog(LOG_DEBUG, "API: sent all of %d first go", tosend);
-				else
-					applog(LOG_DEBUG, "API: sent %d of %d first go", n, tosend);
-			} else {
-				if (n == tosend)
-					applog(LOG_DEBUG, "API: sent all of remaining %d (count=%d)", tosend, count);
-				else
-					applog(LOG_DEBUG, "API: sent %d of remaining %d (count=%d)", n, tosend, count);
-			}
-
-			tosend -= n;
-		}
-	}
+		io_add(io_data, JSON_CLOSE);
+	
+	if (isjson)
+		io_add(io_data, JSON_END);
+	
+	bytes_nullterminate(&io_data->data);
+	applog(LOG_DEBUG, "API: send reply: (%ld) '%.10s%s'",
+	       (long)bytes_len(&io_data->data),
+	       bytes_buf(&io_data->data),
+	       bytes_len(&io_data->data) > 10 ? "..." : BLANK);
+	
+	io_flush(io_data, true);
+	
+	if (bytes_len(&io_data->data))
+		applog(LOG_WARNING, "RPC: Timed out with %ld bytes left to send",
+		       (long)bytes_len(&io_data->data));
 }
 
 static void tidyup(__maybe_unused void *arg)
@@ -3709,6 +3641,7 @@ void api(int api_thr_id)
 		return;
 	}
 
+	rpc_io_data =
 	io_data = sock_io_new();
 
 	mutex_init(&quit_restart_lock);
@@ -3849,6 +3782,7 @@ void api(int api_thr_id)
 				// the time of the request in now
 				when = time(NULL);
 				io_reinit(io_data);
+				io_data->sock = c;
 
 				did = false;