Browse Source

Add long polling support

Jeff Garzik 15 years ago
parent
commit
7a87bee999
3 changed files with 196 additions and 21 deletions
  1. 104 18
      cpu-miner.c
  2. 12 2
      miner.h
  3. 80 1
      util.c

+ 104 - 18
cpu-miner.c

@@ -31,12 +31,6 @@
 #define DEF_RPC_URL		"http://127.0.0.1:8332/"
 #define DEF_RPC_URL		"http://127.0.0.1:8332/"
 #define DEF_RPC_USERPASS	"rpcuser:rpcpass"
 #define DEF_RPC_USERPASS	"rpcuser:rpcpass"
 
 
-struct thr_info {
-	int		id;
-	pthread_t	pth;
-	struct thread_q	*q;
-};
-
 enum workio_commands {
 enum workio_commands {
 	WC_GET_WORK,
 	WC_GET_WORK,
 	WC_SUBMIT_WORK,
 	WC_SUBMIT_WORK,
@@ -78,18 +72,21 @@ static const char *algo_names[] = {
 
 
 bool opt_debug = false;
 bool opt_debug = false;
 bool opt_protocol = false;
 bool opt_protocol = false;
+bool want_longpoll = true;
+bool have_longpoll = false;
 static bool opt_quiet = false;
 static bool opt_quiet = false;
 static int opt_retries = 10;
 static int opt_retries = 10;
 static int opt_fail_pause = 30;
 static int opt_fail_pause = 30;
-static int opt_scantime = 5;
+int opt_scantime = 5;
 static json_t *opt_config;
 static json_t *opt_config;
 static const bool opt_time = true;
 static const bool opt_time = true;
 static enum sha256_algos opt_algo = ALGO_C;
 static enum sha256_algos opt_algo = ALGO_C;
 static int opt_n_threads = 1;
 static int opt_n_threads = 1;
 static char *rpc_url;
 static char *rpc_url;
 static char *userpass;
 static char *userpass;
-static struct thr_info *thr_info;
+struct thr_info *thr_info;
 static int work_thr_id;
 static int work_thr_id;
+int longpoll_thr_id;
 struct work_restart *work_restart = NULL;
 struct work_restart *work_restart = NULL;
 
 
 
 
@@ -130,6 +127,9 @@ static struct option_help options_help[] = {
 	{ "debug",
 	{ "debug",
 	  "(-D) Enable debug output (default: off)" },
 	  "(-D) Enable debug output (default: off)" },
 
 
+	{ "no-longpoll",
+	  "Disable X-Long-Polling support (default: enabled)" },
+
 	{ "protocol-dump",
 	{ "protocol-dump",
 	  "(-P) Verbose dump of protocol-level activities (default: off)" },
 	  "(-P) Verbose dump of protocol-level activities (default: off)" },
 
 
@@ -170,6 +170,7 @@ static struct option options[] = {
 	{ "scantime", 1, NULL, 's' },
 	{ "scantime", 1, NULL, 's' },
 	{ "url", 1, NULL, 1001 },
 	{ "url", 1, NULL, 1001 },
 	{ "userpass", 1, NULL, 1002 },
 	{ "userpass", 1, NULL, 1002 },
+	{ "no-longpoll", 0, NULL, 1003 },
 	{ }
 	{ }
 };
 };
 
 
@@ -262,7 +263,7 @@ static bool submit_upstream_work(CURL *curl, const struct work *work)
 		fprintf(stderr, "DBG: sending RPC call:\n%s", s);
 		fprintf(stderr, "DBG: sending RPC call:\n%s", s);
 
 
 	/* issue JSON-RPC request */
 	/* issue JSON-RPC request */
-	val = json_rpc_call(curl, rpc_url, userpass, s);
+	val = json_rpc_call(curl, rpc_url, userpass, s, false, false);
 	if (!val) {
 	if (!val) {
 		fprintf(stderr, "submit_upstream_work json_rpc_call failed\n");
 		fprintf(stderr, "submit_upstream_work json_rpc_call failed\n");
 		goto out;
 		goto out;
@@ -285,14 +286,16 @@ out:
 	return rc;
 	return rc;
 }
 }
 
 
+static const char *rpc_req =
+	"{\"method\": \"getwork\", \"params\": [], \"id\":0}\r\n";
+
 static bool get_upstream_work(CURL *curl, struct work *work)
 static bool get_upstream_work(CURL *curl, struct work *work)
 {
 {
-	static const char *rpc_req =
-		"{\"method\": \"getwork\", \"params\": [], \"id\":0}\r\n";
 	json_t *val;
 	json_t *val;
 	bool rc;
 	bool rc;
 
 
-	val = json_rpc_call(curl, rpc_url, userpass, rpc_req);
+	val = json_rpc_call(curl, rpc_url, userpass, rpc_req,
+			    want_longpoll, false);
 	if (!val)
 	if (!val)
 		return false;
 		return false;
 
 
@@ -593,7 +596,7 @@ out:
 	return NULL;
 	return NULL;
 }
 }
 
 
-void restart_threads(void)
+static void restart_threads(void)
 {
 {
 	int i;
 	int i;
 
 
@@ -601,6 +604,68 @@ void restart_threads(void)
 		work_restart[i].restart = 1;
 		work_restart[i].restart = 1;
 }
 }
 
 
+static void *longpoll_thread(void *userdata)
+{
+	struct thr_info *mythr = userdata;
+	CURL *curl = NULL;
+	char *copy_start, *hdr_path, *lp_url = NULL;
+	bool need_slash = false;
+	int failures = 0;
+
+	hdr_path = tq_pop(mythr->q, NULL);
+	if (!hdr_path)
+		goto out;
+	copy_start = (*hdr_path == '/') ? (hdr_path + 1) : hdr_path;
+	if (rpc_url[strlen(rpc_url) - 1] != '/')
+		need_slash = true;
+
+	lp_url = malloc(strlen(rpc_url) + strlen(copy_start) + 2);
+	if (!lp_url)
+		goto out;
+
+	sprintf(lp_url, "%s%s%s", rpc_url, need_slash ? "/" : "", copy_start);
+
+	fprintf(stderr, "Long-polling activated for %s\n", lp_url);
+
+	curl = curl_easy_init();
+	if (!curl) {
+		fprintf(stderr, "CURL initialization failed\n");
+		goto out;
+	}
+
+	while (1) {
+		json_t *val;
+
+		val = json_rpc_call(curl, lp_url, userpass, rpc_req,
+				    false, true);
+		if (val) {
+			failures = 0;
+			json_decref(val);
+			fprintf(stderr, "LONGPOLL detected new block\n");
+			restart_threads();
+		} else {
+			if (failures++ < 10) {
+				sleep(30);
+				fprintf(stderr,
+					"longpoll failed, sleeping for 30s\n");
+			} else {
+				fprintf(stderr,
+					"longpoll failed, ending thread\n");
+				goto out;
+			}
+		}
+	}
+
+out:
+	free(hdr_path);
+	free(lp_url);
+	tq_freeze(mythr->q);
+	if (curl)
+		curl_easy_cleanup(curl);
+
+	return NULL;
+}
+
 static void show_usage(void)
 static void show_usage(void)
 {
 {
 	int i;
 	int i;
@@ -696,6 +761,9 @@ static void parse_arg (int key, char *arg)
 		free(userpass);
 		free(userpass);
 		userpass = strdup(arg);
 		userpass = strdup(arg);
 		break;
 		break;
+	case 1003:
+		want_longpoll = false;
+		break;
 	default:
 	default:
 		show_usage();
 		show_usage();
 	}
 	}
@@ -763,17 +831,18 @@ int main (int argc, char *argv[])
 	if (setpriority(PRIO_PROCESS, 0, 19))
 	if (setpriority(PRIO_PROCESS, 0, 19))
 		perror("setpriority");
 		perror("setpriority");
 
 
-	thr_info = calloc(opt_n_threads + 1, sizeof(*thr));
-	if (!thr_info)
-		return 1;
-
 	work_restart = calloc(opt_n_threads, sizeof(*work_restart));
 	work_restart = calloc(opt_n_threads, sizeof(*work_restart));
 	if (!work_restart)
 	if (!work_restart)
 		return 1;
 		return 1;
 
 
+	thr_info = calloc(opt_n_threads + 2, sizeof(*thr));
+	if (!thr_info)
+		return 1;
+
+	/* init workio thread info */
 	work_thr_id = opt_n_threads;
 	work_thr_id = opt_n_threads;
 	thr = &thr_info[work_thr_id];
 	thr = &thr_info[work_thr_id];
-	thr->id = opt_n_threads;
+	thr->id = work_thr_id;
 	thr->q = tq_new();
 	thr->q = tq_new();
 	if (!thr->q)
 	if (!thr->q)
 		return 1;
 		return 1;
@@ -784,6 +853,23 @@ int main (int argc, char *argv[])
 		return 1;
 		return 1;
 	}
 	}
 
 
+	/* init longpoll thread info */
+	if (want_longpoll) {
+		longpoll_thr_id = opt_n_threads + 1;
+		thr = &thr_info[longpoll_thr_id];
+		thr->id = longpoll_thr_id;
+		thr->q = tq_new();
+		if (!thr->q)
+			return 1;
+
+		/* start longpoll thread */
+		if (pthread_create(&thr->pth, NULL, longpoll_thread, thr)) {
+			fprintf(stderr, "longpoll thread create failed\n");
+			return 1;
+		}
+	} else
+		longpoll_thr_id = -1;
+
 	/* start mining threads */
 	/* start mining threads */
 	for (i = 0; i < opt_n_threads; i++) {
 	for (i = 0; i < opt_n_threads; i++) {
 		thr = &thr_info[i];
 		thr = &thr_info[i];

+ 12 - 2
miner.h

@@ -42,6 +42,12 @@
 #define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
 #define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
 #endif
 #endif
 
 
+struct thr_info {
+	int		id;
+	pthread_t	pth;
+	struct thread_q	*q;
+};
+
 static inline uint32_t swab32(uint32_t v)
 static inline uint32_t swab32(uint32_t v)
 {
 {
 #ifdef WANT_BUILTIN_BSWAP
 #ifdef WANT_BUILTIN_BSWAP
@@ -70,7 +76,7 @@ extern bool opt_debug;
 extern bool opt_protocol;
 extern bool opt_protocol;
 extern const uint32_t sha256_init_state[];
 extern const uint32_t sha256_init_state[];
 extern json_t *json_rpc_call(CURL *curl, const char *url, const char *userpass,
 extern json_t *json_rpc_call(CURL *curl, const char *url, const char *userpass,
-			     const char *rpc_req);
+			     const char *rpc_req, bool, bool);
 extern char *bin2hex(const unsigned char *p, size_t len);
 extern char *bin2hex(const unsigned char *p, size_t len);
 extern bool hex2bin(unsigned char *p, const char *hexstr, size_t len);
 extern bool hex2bin(unsigned char *p, const char *hexstr, size_t len);
 
 
@@ -110,6 +116,9 @@ timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y);
 
 
 extern bool fulltest(const unsigned char *hash, const unsigned char *target);
 extern bool fulltest(const unsigned char *hash, const unsigned char *target);
 
 
+extern int opt_scantime;
+extern bool want_longpoll;
+extern bool have_longpoll;
 struct thread_q;
 struct thread_q;
 
 
 struct work_restart {
 struct work_restart {
@@ -117,8 +126,9 @@ struct work_restart {
 	char			padding[128 - sizeof(unsigned long)];
 	char			padding[128 - sizeof(unsigned long)];
 };
 };
 
 
+extern struct thr_info *thr_info;
+extern int longpoll_thr_id;
 extern struct work_restart *work_restart;
 extern struct work_restart *work_restart;
-extern void restart_threads(void);
 
 
 extern struct thread_q *tq_new(void);
 extern struct thread_q *tq_new(void);
 extern void tq_free(struct thread_q *tq);
 extern void tq_free(struct thread_q *tq);

+ 80 - 1
util.c

@@ -13,6 +13,7 @@
 
 
 #include <stdio.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <stdlib.h>
+#include <ctype.h>
 #include <string.h>
 #include <string.h>
 #include <jansson.h>
 #include <jansson.h>
 #include <curl/curl.h>
 #include <curl/curl.h>
@@ -29,6 +30,10 @@ struct upload_buffer {
 	size_t		len;
 	size_t		len;
 };
 };
 
 
+struct header_info {
+	char		*lp_path;
+};
+
 struct tq_ent {
 struct tq_ent {
 	void			*data;
 	void			*data;
 	struct list_head	q_node;
 	struct list_head	q_node;
@@ -95,8 +100,62 @@ static size_t upload_data_cb(void *ptr, size_t size, size_t nmemb,
 	return len;
 	return len;
 }
 }
 
 
+static size_t resp_hdr_cb(void *ptr, size_t size, size_t nmemb, void *user_data)
+{
+	struct header_info *hi = user_data;
+	size_t remlen, slen, ptrlen = size * nmemb;
+	char *rem, *val = NULL, *key = NULL;
+	void *tmp;
+
+	if (opt_protocol)
+		printf("In resp_hdr_cb\n");
+
+	val = calloc(1, ptrlen);
+	key = calloc(1, ptrlen);
+	if (!key || !val)
+		goto out;
+
+	tmp = memchr(ptr, ':', ptrlen);
+	if (!tmp || (tmp == ptr))	/* skip empty keys / blanks */
+		goto out;
+	slen = tmp - ptr;
+	if ((slen + 1) == ptrlen)	/* skip key w/ no value */
+		goto out;
+	memcpy(key, ptr, slen);		/* store & nul term key */
+	key[slen] = 0;
+
+	rem = ptr + slen + 1;		/* trim value's leading whitespace */
+	remlen = ptrlen - slen - 1;
+	while ((remlen > 0) && (isspace(*rem))) {
+		remlen--;
+		rem++;
+	}
+
+	memcpy(val, rem, remlen);	/* store value, trim trailing ws */
+	val[remlen] = 0;
+	while ((*val) && (isspace(val[strlen(val) - 1]))) {
+		val[strlen(val) - 1] = 0;
+	}
+	if (!*val)			/* skip blank value */
+		goto out;
+
+	if (opt_protocol)
+		printf("HTTP hdr(%s): %s\n", key, val);
+
+	if (!strcasecmp("X-Long-Polling", key)) {
+		hi->lp_path = val;	/* steal memory reference */
+		val = NULL;
+	}
+
+out:
+	free(key);
+	free(val);
+	return ptrlen;
+}
+
 json_t *json_rpc_call(CURL *curl, const char *url,
 json_t *json_rpc_call(CURL *curl, const char *url,
-		      const char *userpass, const char *rpc_req)
+		      const char *userpass, const char *rpc_req,
+		      bool longpoll_scan, bool longpoll)
 {
 {
 	json_t *val, *err_val, *res_val;
 	json_t *val, *err_val, *res_val;
 	int rc;
 	int rc;
@@ -106,9 +165,15 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 	struct curl_slist *headers = NULL;
 	struct curl_slist *headers = NULL;
 	char len_hdr[64];
 	char len_hdr[64];
 	char curl_err_str[CURL_ERROR_SIZE];
 	char curl_err_str[CURL_ERROR_SIZE];
+	long timeout = longpoll ? (60 * 60) : (60 * 10);
+	struct header_info hi = { };
+	bool lp_scanning = false;
 
 
 	/* it is assumed that 'curl' is freshly [re]initialized at this pt */
 	/* it is assumed that 'curl' is freshly [re]initialized at this pt */
 
 
+	if (longpoll_scan)
+		lp_scanning = want_longpoll && !have_longpoll;
+
 	if (opt_protocol)
 	if (opt_protocol)
 		curl_easy_setopt(curl, CURLOPT_VERBOSE, 1);
 		curl_easy_setopt(curl, CURLOPT_VERBOSE, 1);
 	curl_easy_setopt(curl, CURLOPT_URL, url);
 	curl_easy_setopt(curl, CURLOPT_URL, url);
@@ -121,6 +186,11 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 	curl_easy_setopt(curl, CURLOPT_READDATA, &upload_data);
 	curl_easy_setopt(curl, CURLOPT_READDATA, &upload_data);
 	curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_err_str);
 	curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_err_str);
 	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
 	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
+	curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout);
+	if (lp_scanning) {
+		curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, resp_hdr_cb);
+		curl_easy_setopt(curl, CURLOPT_HEADERDATA, &hi);
+	}
 	if (userpass) {
 	if (userpass) {
 		curl_easy_setopt(curl, CURLOPT_USERPWD, userpass);
 		curl_easy_setopt(curl, CURLOPT_USERPWD, userpass);
 		curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
 		curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
@@ -148,6 +218,15 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 		goto err_out;
 		goto err_out;
 	}
 	}
 
 
+	/* If X-Long-Polling was found, activate long polling */
+	if (hi.lp_path) {
+		have_longpoll = true;
+		opt_scantime = 60;
+		tq_push(thr_info[longpoll_thr_id].q, hi.lp_path);
+	} else
+		free(hi.lp_path);
+	hi.lp_path = NULL;
+
 	val = json_loads(all_data.buf, &err);
 	val = json_loads(all_data.buf, &err);
 	if (!val) {
 	if (!val) {
 		fprintf(stderr, "JSON decode failed(%d): %s\n", err.line, err.text);
 		fprintf(stderr, "JSON decode failed(%d): %s\n", err.line, err.text);