Browse Source

Merge branch 'rpc_longreply' into bfgminer

Conflicts:
	README.RPC
Luke Dashjr 12 years ago
parent
commit
920caba94a
4 changed files with 110 additions and 156 deletions
  1. 4 0
      README.RPC
  2. 14 3
      api-example.c
  3. 10 5
      api-example.py
  4. 82 148
      api.c

+ 4 - 0
README.RPC

@@ -432,6 +432,10 @@ API V1.25.3
 Modified API commands:
 Modified API commands:
  'setconfig' - add 'http-port' number
  'setconfig' - add 'http-port' number
 
 
+Removed output limitation:
+ All replies can now be longer than the previous limitation of 64k, and will
+  only be truncated on a 50ms timeout sending.
+
 ----------
 ----------
 
 
 API V1.25.2
 API V1.25.2

+ 14 - 3
api-example.c

@@ -9,6 +9,7 @@
 
 
 #include "config.h"
 #include "config.h"
 
 
+#include <assert.h>
 #include <stdio.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <stdlib.h>
 #include <string.h>
 #include <string.h>
@@ -186,13 +187,15 @@ void display(char *buf)
 
 
 int callapi(char *command, char *host, short int port)
 int callapi(char *command, char *host, short int port)
 {
 {
-	char buf[RECVSIZE+1];
+	size_t bufsz = RECVSIZE;
+	char *buf = malloc(bufsz+1);
 	struct hostent *ip;
 	struct hostent *ip;
 	struct sockaddr_in serv;
 	struct sockaddr_in serv;
 	SOCKETTYPE sock;
 	SOCKETTYPE sock;
 	int ret = 0;
 	int ret = 0;
 	int n, p;
 	int n, p;
 
 
+	assert(buf);
 	SOCKETINIT;
 	SOCKETINIT;
 
 
 	ip = gethostbyname(host);
 	ip = gethostbyname(host);
@@ -221,8 +224,16 @@ int callapi(char *command, char *host, short int port)
 	else {
 	else {
 		p = 0;
 		p = 0;
 		buf[0] = '\0';
 		buf[0] = '\0';
-		while (p < RECVSIZE) {
-			n = recv(sock, &buf[p], RECVSIZE - p , 0);
+		while (true)
+		{
+			if (bufsz < RECVSIZE + p)
+			{
+				bufsz *= 2;
+				buf = realloc(buf, bufsz);
+				assert(buf);
+			}
+			
+			n = recv(sock, &buf[p], RECVSIZE, 0);
 
 
 			if (SOCKETFAIL(n)) {
 			if (SOCKETFAIL(n)) {
 				printf("Recv failed: %s\n", SOCKERRMSG);
 				printf("Recv failed: %s\n", SOCKERRMSG);

+ 10 - 5
api-example.py

@@ -39,11 +39,16 @@ except socket.error, e:
     logging.error(e)
     logging.error(e)
 
 
 
 
-data = None
-try:
-    data = s.recv(1024)
-except socket.error, e:
-    logging.error(e)
+data = ''
+while True:
+    try:
+        newdata = s.recv(1024)
+        if newdata:
+            data += newdata
+        else:
+            break
+    except socket.error, e:
+        break
 
 
 try:
 try:
     s.close()
     s.close()

+ 82 - 148
api.c

@@ -559,104 +559,89 @@ extern struct device_drv cpu_drv;
 #endif
 #endif
 
 
 struct io_data {
 struct io_data {
-	size_t siz;
-	char *ptr;
-	char *cur;
-	bool sock;
-	bool full;
+	bytes_t data;
+	SOCKETTYPE sock;
+	
+	// Whether to add various things
 	bool close;
 	bool close;
 };
 };
-
-struct io_list {
-	struct io_data *io_data;
-	struct io_list *prev;
-	struct io_list *next;
-};
-
-static struct io_list *io_head = NULL;
-
-#define io_new(init) _io_new(init, false)
-#define sock_io_new() _io_new(SOCKBUFSIZ, true)
+static struct io_data *rpc_io_data;
 
 
 static void io_reinit(struct io_data *io_data)
 static void io_reinit(struct io_data *io_data)
 {
 {
-	io_data->cur = io_data->ptr;
-	*(io_data->ptr) = '\0';
-	io_data->full = false;
+	bytes_reset(&io_data->data);
 	io_data->close = false;
 	io_data->close = false;
 }
 }
 
 
-static struct io_data *_io_new(size_t initial, bool socket_buf)
+static
+struct io_data *sock_io_new()
 {
 {
-	struct io_data *io_data;
-	struct io_list *io_list;
-
-	io_data = malloc(sizeof(*io_data));
-	io_data->ptr = malloc(initial);
-	io_data->siz = initial;
-	io_data->sock = socket_buf;
+	struct io_data *io_data = malloc(sizeof(struct io_data));
+	bytes_init(&io_data->data);
+	io_data->sock = INVSOCK;
 	io_reinit(io_data);
 	io_reinit(io_data);
-
-	io_list = malloc(sizeof(*io_list));
-
-	io_list->io_data = io_data;
-
-	if (io_head) {
-		io_list->next = io_head;
-		io_list->prev = io_head->prev;
-		io_list->next->prev = io_list;
-		io_list->prev->next = io_list;
-	} else {
-		io_list->prev = io_list;
-		io_list->next = io_list;
-		io_head = io_list;
-	}
-
 	return io_data;
 	return io_data;
 }
 }
 
 
-static bool io_add(struct io_data *io_data, char *buf)
+static
+size_t io_flush(struct io_data *io_data, bool complete)
 {
 {
-	size_t len, dif, tot;
-
-	if (io_data->full)
-		return false;
-
-	len = strlen(buf);
-	dif = io_data->cur - io_data->ptr;
-	tot = len + 1 + dif;
-
-	if (tot > io_data->siz) {
-		size_t new = io_data->siz * 2;
-
-		if (new < tot)
-			new = tot * 2;
-
-		if (io_data->sock) {
-			if (new > SOCKBUFSIZ) {
-				if (tot > SOCKBUFSIZ) {
-					io_data->full = true;
-					return false;
-				}
-
-				new = SOCKBUFSIZ;
-			}
+	size_t sent = 0, tosend = bytes_len(&io_data->data);
+	ssize_t n;
+	struct timeval timeout = {0, complete ? 50000: 0}, tv;
+	fd_set wd;
+	int count = 0;
+	
+	while (tosend)
+	{
+		FD_ZERO(&wd);
+		FD_SET(io_data->sock, &wd);
+		tv = timeout;
+		if (select(io_data->sock + 1, NULL, &wd, NULL, &tv) < 1)
+			break;
+		
+		n = send(io_data->sock, &bytes_buf(&io_data->data)[sent], tosend, 0);
+		if (SOCKETFAIL(n))
+		{
+			if (!sock_blocks())
+				applog(LOG_WARNING, "API: send (%d) failed: %s", tosend, SOCKERRMSG);
+			break;
 		}
 		}
-
-		io_data->ptr = realloc(io_data->ptr, new);
-		io_data->cur = io_data->ptr + dif;
-		io_data->siz = new;
+		if (count <= 1)
+		{
+			if (n == tosend)
+				applog(LOG_DEBUG, "API: sent all of %d first go", tosend);
+			else
+				applog(LOG_DEBUG, "API: sent %d of %d first go", n, tosend);
+		}
+		else
+		{
+			if (n == tosend)
+				applog(LOG_DEBUG, "API: sent all of remaining %d (count=%d)", tosend, count);
+			else
+				applog(LOG_DEBUG, "API: sent %d of remaining %d (count=%d)", n, tosend, count);
+		}
+		sent += n;
+		tosend -= n;
 	}
 	}
+	
+	bytes_shift(&io_data->data, sent);
+	
+	return sent;
+}
 
 
-	memcpy(io_data->cur, buf, len + 1);
-	io_data->cur += len;
-
+static bool io_add(struct io_data *io_data, char *buf)
+{
+	size_t len = strlen(buf);
+	if (bytes_len(&io_data->data) + len > SOCKBUFSIZ)
+		io_flush(io_data, false);
+	bytes_append(&io_data->data, buf, len);
 	return true;
 	return true;
 }
 }
 
 
 static bool io_put(struct io_data *io_data, char *buf)
 static bool io_put(struct io_data *io_data, char *buf)
 {
 {
-	io_reinit(io_data);
+	bytes_reset(&io_data->data);
 	return io_add(io_data, buf);
 	return io_add(io_data, buf);
 }
 }
 
 
@@ -667,21 +652,9 @@ static void io_close(struct io_data *io_data)
 
 
 static void io_free()
 static void io_free()
 {
 {
-	struct io_list *io_list, *io_next;
-
-	if (io_head) {
-		io_list = io_head;
-		do {
-			io_next = io_list->next;
-
-			free(io_list->io_data);
-			free(io_list);
-
-			io_list = io_next;
-		} while (io_list != io_head);
-
-		io_head = NULL;
-	}
+	bytes_free(&rpc_io_data->data);
+	free(rpc_io_data);
+	rpc_io_data = NULL;
 }
 }
 
 
 // This is only called when expected to be needed (rarely)
 // This is only called when expected to be needed (rarely)
@@ -3329,64 +3302,23 @@ static void checkcommand(struct io_data *io_data, __maybe_unused SOCKETTYPE c, c
 
 
 static void send_result(struct io_data *io_data, SOCKETTYPE c, bool isjson)
 static void send_result(struct io_data *io_data, SOCKETTYPE c, bool isjson)
 {
 {
-	char buf[SOCKBUFSIZ + sizeof(JSON_CLOSE) + sizeof(JSON_END)];
-	int count, res, tosend, len, n;
-
-	strcpy(buf, io_data->ptr);
-
 	if (io_data->close)
 	if (io_data->close)
-		strcat(buf, JSON_CLOSE);
-
-	if (isjson) {
-		if (io_data->full)
-			strcat(buf, JSON_END_TRUNCATED);
-		else
-			strcat(buf, JSON_END);
-	}
-
-	len = strlen(buf);
-	tosend = len+1;
-
-	applog(LOG_DEBUG, "API: send reply: (%d) '%.10s%s'", tosend, buf, len > 10 ? "..." : BLANK);
-
-	count = 0;
-	while (count++ < 5 && tosend > 0) {
-		// allow 50ms per attempt
-		struct timeval timeout = {0, 50000};
-		fd_set wd;
-
-		FD_ZERO(&wd);
-		FD_SET(c, &wd);
-		if ((res = select(c + 1, NULL, &wd, NULL, &timeout)) < 1) {
-			applog(LOG_WARNING, "API: send select failed (%d)", res);
-			return;
-		}
-
-		n = send(c, buf, tosend, 0);
-
-		if (SOCKETFAIL(n)) {
-			if (sock_blocks())
-				continue;
-
-			applog(LOG_WARNING, "API: send (%d) failed: %s", tosend, SOCKERRMSG);
-
-			return;
-		} else {
-			if (count <= 1) {
-				if (n == tosend)
-					applog(LOG_DEBUG, "API: sent all of %d first go", tosend);
-				else
-					applog(LOG_DEBUG, "API: sent %d of %d first go", n, tosend);
-			} else {
-				if (n == tosend)
-					applog(LOG_DEBUG, "API: sent all of remaining %d (count=%d)", tosend, count);
-				else
-					applog(LOG_DEBUG, "API: sent %d of remaining %d (count=%d)", n, tosend, count);
-			}
-
-			tosend -= n;
-		}
-	}
+		io_add(io_data, JSON_CLOSE);
+	
+	if (isjson)
+		io_add(io_data, JSON_END);
+	
+	bytes_nullterminate(&io_data->data);
+	applog(LOG_DEBUG, "API: send reply: (%ld) '%.10s%s'",
+	       (long)bytes_len(&io_data->data),
+	       bytes_buf(&io_data->data),
+	       bytes_len(&io_data->data) > 10 ? "..." : BLANK);
+	
+	io_flush(io_data, true);
+	
+	if (bytes_len(&io_data->data))
+		applog(LOG_WARNING, "RPC: Timed out with %ld bytes left to send",
+		       (long)bytes_len(&io_data->data));
 }
 }
 
 
 static void tidyup(__maybe_unused void *arg)
 static void tidyup(__maybe_unused void *arg)
@@ -3721,6 +3653,7 @@ void api(int api_thr_id)
 		return;
 		return;
 	}
 	}
 
 
+	rpc_io_data =
 	io_data = sock_io_new();
 	io_data = sock_io_new();
 
 
 	mutex_init(&quit_restart_lock);
 	mutex_init(&quit_restart_lock);
@@ -3861,6 +3794,7 @@ void api(int api_thr_id)
 				// the time of the request in now
 				// the time of the request in now
 				when = time(NULL);
 				when = time(NULL);
 				io_reinit(io_data);
 				io_reinit(io_data);
+				io_data->sock = c;
 
 
 				did = false;
 				did = false;