Browse Source

Merge branch 'submit_async' into bfgminer

Luke Dashjr 13 years ago
parent
commit
cbe34dd072
6 changed files with 425 additions and 143 deletions
  1. 6 5
      README
  2. 4 0
      driver-modminer.c
  3. 270 86
      miner.c
  4. 1 0
      miner.h
  5. 136 52
      util.c
  6. 8 0
      util.h

+ 6 - 5
README

@@ -163,7 +163,7 @@ Options for both config file and command line:
 --shares <arg>      Quit after mining N shares (default: unlimited)
 --skip-security-checks <arg> Skip security checks sometimes to save bandwidth; only check 1/<arg>th of the time (default: never skip)
 --socks-proxy <arg> Set socks4 proxy (host:port) for all pools without a proxy specified
---submit-threads    Maximum number of share submission threads (default: 64)
+--submit-threads    Minimum number of concurrent share submissions (default: 64)
 --syslog            Use system log for output messages (default: standard error)
 --temp-cutoff <arg> Temperature where a device will be automatically disabled, one value or comma separated list (default: 95)
 --temp-hysteresis <arg> Set how much the temperature can fluctuate outside limits when automanaging speeds (default: 3)
@@ -406,7 +406,7 @@ dedicated to this program,
 	http://forum.bitcoin.org/index.php?topic=28402.0
 
 The output line shows the following:
-5s:1713.6 avg:1707.8 u:1710.2 Mh/s | A:729  R:8  HW:0  E:242%  U:22.53/m
+5s:1713.6 avg:1707.8 u:1710.2 Mh/s | A:729 R:8 S:0 HW:0 E:242% U:22.53/m
 
 Each column is as follows:
 5s:  A 5 second exponentially decaying average hash rate
@@ -414,6 +414,7 @@ avg: An all time average hash rate
 u:   An all time average hash rate based on actual accepted shares
 A:   The number of Accepted shares
 R:   The number of Rejected shares
+S:   Stale shares discarded (not submitted so don't count as rejects)
 HW:  The number of HardWare errors
 E:   The Efficiency defined as number of shares returned / work item
 U:   The Utility defined as the number of shares / minute
@@ -432,16 +433,16 @@ The number of hardware erorrs
 The utility defines as the number of shares / minute
 
 The BFGMiner status line shows:
- TQ: 1  ST: 1  SS: 0  DW: 0  NB: 1  GW: 301  LW: 8  GF: 1  RF: 1
+ TQ: 1  ST: 1  DW: 0  GW: 301  LW: 8  GF: 1  NB: 1  AS: 0  RF: 1
 
 TQ is Total Queued work items.
 ST is STaged work items (ready to use).
-SS is Stale Shares discarded (detected and not submitted so don't count as rejects)
 DW is Discarded Work items (work from block no longer valid to work on)
-NB is New Blocks detected on the network
 GW is GetWork requested (work items from pools)
 LW is Locally generated Work items
 GF is Getwork Fail Occasions (server slow to provide work)
+NB is New Blocks detected on the network
+AS is Active Submissions (shares in the process of submitting)
 RF is Remote Fail occasions (server slow to accept work)
 
 NOTE: Running intensities above 9 with current hardware is likely to only

+ 4 - 0
driver-modminer.c

@@ -9,6 +9,10 @@
 
 #include "config.h"
 
+#ifdef WIN32
+#define FD_SETSIZE 4096
+#endif
+
 #include <stdarg.h>
 #include <stdio.h>
 #include <unistd.h>

+ 270 - 86
miner.c

@@ -11,6 +11,10 @@
 
 #include "config.h"
 
+#ifdef WIN32
+#define FD_SETSIZE 4096
+#endif
+
 #ifdef HAVE_CURSES
 #include <curses.h>
 #endif
@@ -236,8 +240,9 @@ static struct timeval miner_started;
 pthread_mutex_t control_lock;
 
 static pthread_mutex_t submitting_lock;
-static int submitting;
+static int submitting, total_submitting;
 static struct list_head submit_waiting;
+int submit_waiting_notifier[2];
 
 int hw_errors;
 int total_accepted, total_rejected, total_diff1;
@@ -1326,7 +1331,7 @@ static struct opt_table opt_config_table[] = {
 	                opt_hidden),
 	OPT_WITHOUT_ARG("--submit-threads",
 	                opt_set_intval, &opt_submit_threads,
-	                "Maximum number of share submission threads (default: 64)"),
+	                "Minimum number of concurrent share submissions (default: 64)"),
 #ifdef HAVE_SYSLOG_H
 	OPT_WITHOUT_ARG("--syslog",
 			opt_set_bool, &use_syslog,
@@ -2143,10 +2148,14 @@ static void curses_print_status(void)
 	mvwhline(statuswin, 1, 0, '-', 80);
 	mvwprintw(statuswin, 2, 0, " %s", statusline);
 	wclrtoeol(statuswin);
-	mvwprintw(statuswin, 3, 0, " TQ: %d  ST: %d  SS: %d  DW: %d  NB: %d  GW: %d  LW: %d  GF: %d  RF: %d",
-		global_queued(), total_staged(), total_stale, total_discarded, new_blocks,
+	mvwprintw(statuswin, 3, 0, " TQ: %d  ST: %d  DW: %d  GW: %d  LW: %d  GF: %d  NB: %d  AS: %d  RF: %d",
+		global_queued(), total_staged(), total_discarded,
 		total_getworks,
-		local_work, total_go, total_ro);
+		local_work,
+		total_go,
+		new_blocks,
+		total_submitting,
+		total_ro);
 	wclrtoeol(statuswin);
 	if ((pool_strategy == POOL_LOADBALANCE  || pool_strategy == POOL_BALANCE) && total_pools > 1) {
 		mvwprintw(statuswin, 4, 0, " Connected to multiple pools with%s LP",
@@ -2615,20 +2624,11 @@ static uint64_t scrypt_diff(const struct work *work)
 	return ret;
 }
 
-static bool submit_upstream_work(struct work *work, CURL *curl, bool resubmit)
+static char *submit_upstream_work_request(struct work *work)
 {
 	char *hexstr = NULL;
-	json_t *val, *res, *err;
 	char *s, *sd;
-	bool rc = false;
-	int thr_id = work->thr_id;
-	struct cgpu_info *cgpu = thr_info[thr_id].cgpu;
 	struct pool *pool = work->pool;
-	int rolltime;
-	uint32_t *hash32;
-	struct timeval tv_submit, tv_submit_reply;
-	char hashshow[64 + 4] = "";
-	char worktime[200] = "";
 
 	if (work->tmpl) {
 		unsigned char data[80];
@@ -2647,6 +2647,7 @@ static bool submit_upstream_work(struct work *work, CURL *curl, bool resubmit)
 		s = realloc_strcat(s, hexstr);
 		s = realloc_strcat(s, "\" ], \"id\":1}");
 
+		free(hexstr);
 		sd = s;
 
 	}
@@ -2657,12 +2658,21 @@ static bool submit_upstream_work(struct work *work, CURL *curl, bool resubmit)
 	else
 		s = realloc_strcat(s, "\n");
 
-	gettimeofday(&tv_submit, NULL);
-	/* issue JSON-RPC request */
-	val = json_rpc_call(curl, pool->rpc_url, pool->rpc_userpass, s, false, false, &rolltime, pool, true);
+	return s;
+}
+
+static bool submit_upstream_work_completed(struct work *work, bool resubmit, struct timeval *ptv_submit, json_t *val) {
+	json_t *res, *err;
+	bool rc = false;
+	int thr_id = work->thr_id;
+	struct cgpu_info *cgpu = thr_info[thr_id].cgpu;
+	struct pool *pool = work->pool;
+	uint32_t *hash32;
+	struct timeval tv_submit_reply;
+	char hashshow[64 + 4] = "";
+	char worktime[200] = "";
 
 	gettimeofday(&tv_submit_reply, NULL);
-	free(s);
 
 	if (unlikely(!val)) {
 		applog(LOG_INFO, "submit_upstream_work json_rpc_call failed");
@@ -2671,7 +2681,6 @@ static bool submit_upstream_work(struct work *work, CURL *curl, bool resubmit)
 			pool->remotefail_occasions++;
 			applog(LOG_WARNING, "Pool %d communication failure, caching submissions", pool->pool_no);
 		}
-		sleep(5);
 		goto out;
 	} else if (pool_tclear(pool, &pool->submit_fail))
 		applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no);
@@ -2712,9 +2721,9 @@ static bool submit_upstream_work(struct work *work, CURL *curl, bool resubmit)
 							(struct timeval *)&(work->tv_getwork_reply));
 			double work_time = tdiff((struct timeval *)&(work->tv_work_found),
 							(struct timeval *)&(work->tv_work_start));
-			double work_to_submit = tdiff(&tv_submit,
+			double work_to_submit = tdiff(ptv_submit,
 							(struct timeval *)&(work->tv_work_found));
-			double submit_time = tdiff(&tv_submit_reply, &tv_submit);
+			double submit_time = tdiff(&tv_submit_reply, ptv_submit);
 			int diffplaces = 3;
 
 			tm = localtime(&(work->tv_getwork.tv_sec));
@@ -2763,7 +2772,6 @@ static bool submit_upstream_work(struct work *work, CURL *curl, bool resubmit)
 
 	rc = true;
 out:
-	free(hexstr);
 	return rc;
 }
 
@@ -3028,6 +3036,7 @@ static void workio_cmd_free(struct workio_cmd *wc)
 
 	switch (wc->cmd) {
 	case WC_SUBMIT_WORK:
+		if (wc->work)
 		free_work(wc->work);
 		break;
 	default: /* do nothing */
@@ -3183,7 +3192,7 @@ static void recruit_curl(struct pool *pool)
  * and there are already 5 curls in circulation. Limit total number to the
  * number of mining threads per pool as well to prevent blasting a pool during
  * network delays/outages. */
-static struct curl_ent *pop_curl_entry(struct pool *pool)
+static struct curl_ent *pop_curl_entry2(struct pool *pool, bool blocking)
 {
 	int curl_limit = opt_delaynet ? 5 : (mining_threads + opt_queue) * 2;
 	struct curl_ent *ce;
@@ -3193,7 +3202,11 @@ retry:
 	if (!pool->curls)
 		recruit_curl(pool);
 	else if (list_empty(&pool->curlring)) {
-		if (pool->curls >= curl_limit) {
+		if (pool->curls >= curl_limit && (blocking || pool->curls >= opt_submit_threads)) {
+			if (!blocking) {
+				mutex_unlock(&pool->pool_lock);
+				return NULL;
+			}
 			pthread_cond_wait(&pool->cr_cond, &pool->pool_lock);
 			goto retry;
 		} else
@@ -3206,6 +3219,11 @@ retry:
 	return ce;
 }
 
+static struct curl_ent *pop_curl_entry(struct pool *pool)
+{
+	return pop_curl_entry2(pool, true);
+}
+
 static void push_curl_entry(struct curl_ent *ce, struct pool *pool)
 {
 	mutex_lock(&pool->pool_lock);
@@ -3638,37 +3656,49 @@ static void submit_discard_share(struct work *work)
 	work->pool->diff_stale += work->work_difficulty;
 }
 
-static void *submit_work_thread(void *userdata)
-{
-	struct workio_cmd *wc = (struct workio_cmd *)userdata;
+struct submit_work_state {
 	struct work *work;
-	struct pool *pool;
 	bool resubmit;
 	struct curl_ent *ce;
 	int failures;
 	time_t staleexpire;
+	char *s;
+	struct timeval tv_submit;
+	struct submit_work_state *next;
+};
 
-	pthread_detach(pthread_self());
+static int my_curl_timer_set(__maybe_unused CURLM *curlm, long timeout_ms, void *userp)
+{
+	long *timeout = userp;
+	*timeout = timeout_ms;
+	return 0;
+}
 
-	RenameThread("submit_work");
+static void sws_has_ce(struct submit_work_state *sws)
+{
+	struct pool *pool = sws->work->pool;
+	sws->s = submit_upstream_work_request(sws->work);
+	gettimeofday(&sws->tv_submit, NULL);
+	json_rpc_call_async(sws->ce->curl, pool->rpc_url, pool->rpc_userpass, sws->s, false, pool, true, sws);
+}
 
-	applog(LOG_DEBUG, "Creating extra submit work thread");
+static struct submit_work_state *begin_submission(struct workio_cmd *wc)
+{
+	struct work *work;
+	struct pool *pool;
+	struct submit_work_state *sws = NULL;
 
-next_submit:
 	work = wc->work;
 	pool = work->pool;
-	resubmit = false;
-	failures = 0;
+	sws = malloc(sizeof(*sws));
+	*sws = (struct submit_work_state){
+		.work = work,
+	};
 
 	check_solve(work);
 
 	if (stale_work(work, true)) {
 		work->stale = true;
-		if (unlikely(!list_empty(&submit_waiting))) {
-			applog(LOG_WARNING, "Pool %d stale share detected while queued submissions are waiting, discarding", pool->pool_no);
-			submit_discard_share(work);
-			goto out;
-		}
 		if (opt_submit_stale)
 			applog(LOG_NOTICE, "Pool %d stale share detected, submitting as user requested", pool->pool_no);
 		else if (pool->submit_old)
@@ -3678,14 +3708,14 @@ next_submit:
 			submit_discard_share(work);
 			goto out;
 		}
-		staleexpire = time(NULL) + 300;
+		sws->staleexpire = time(NULL) + 300;
 	}
 
 	if (work->stratum) {
 		struct stratum_share *sshare = calloc(sizeof(struct stratum_share), 1);
 		uint32_t nonce;
 		char *noncehex;
-		char s[1024];
+		char *s;
 
 		sshare->work = copy_work(work);
 		mutex_lock(&sshare_lock);
@@ -3696,30 +3726,43 @@ next_submit:
 
 		nonce = *((uint32_t *)(work->data + 76));
 		noncehex = bin2hex((const unsigned char *)&nonce, 4);
-		memset(s, 0, 1024);
+		s = malloc(1024);
 		sprintf(s, "{\"params\": [\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"id\": %d, \"method\": \"mining.submit\"}",
 			pool->rpc_user, work->job_id, work->nonce2, work->ntime, noncehex, sshare->id);
 		free(noncehex);
 
-		applog(LOG_DEBUG, "DBG: sending %s submit RPC call: %s", pool->stratum_url, s);
-
-		if (likely(stratum_send(pool, s, strlen(s)))) {
-			if (pool_tclear(pool, &pool->submit_fail))
-					applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no);
-			applog(LOG_DEBUG, "Successfully submitted, adding to stratum_shares db");
-		} else if (!pool_tset(pool, &pool->submit_fail)) {
-			applog(LOG_WARNING, "Pool %d stratum share submission failure", pool->pool_no);
-			total_ro++;
-			pool->remotefail_occasions++;
+		sws->s = s;
+	} else {
+		/* submit solution to bitcoin via JSON-RPC */
+		sws->ce = pop_curl_entry2(pool, false);
+		if (sws->ce) {
+			sws_has_ce(sws);
+		} else {
+			sws->next = pool->sws_waiting_on_curl;
+			pool->sws_waiting_on_curl = sws;
+			if (sws->next)
+				applog(LOG_DEBUG, "submit_thread queuing submission");
+			else
+				applog(LOG_WARNING, "submit_thread queuing submissions (see --submit-threads)");
 		}
-
-		goto out;
 	}
 
-	ce = pop_curl_entry(pool);
-	/* submit solution to bitcoin via JSON-RPC */
-	while (!submit_upstream_work(work, ce->curl, resubmit)) {
-		resubmit = true;
+	wc->work = NULL;
+	workio_cmd_free(wc);
+	return sws;
+
+out:
+	free(sws);
+	workio_cmd_free(wc);
+	return NULL;
+}
+
+static bool retry_submission(struct submit_work_state *sws)
+{
+	struct work *work = sws->work;
+	struct pool *pool = work->pool;
+
+		sws->resubmit = true;
 		if ((!work->stale) && stale_work(work, true)) {
 			work->stale = true;
 			if (opt_submit_stale)
@@ -3729,45 +3772,183 @@ next_submit:
 			else {
 				applog(LOG_NOTICE, "Pool %d share became stale during submission failure, discarding", pool->pool_no);
 				submit_discard_share(work);
-				break;
+				return false;
 			}
-			staleexpire = time(NULL) + 300;
+			sws->staleexpire = time(NULL) + 300;
 		}
-		if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
+		if (unlikely((opt_retries >= 0) && (++sws->failures > opt_retries))) {
 			applog(LOG_ERR, "Pool %d failed %d submission retries, discarding", pool->pool_no, opt_retries);
 			submit_discard_share(work);
-			break;
+			return false;
 		}
 		else if (work->stale) {
-			if (unlikely(!list_empty(&submit_waiting))) {
-				applog(LOG_WARNING, "Pool %d stale share failed to submit while queued submissions are waiting, discarding", pool->pool_no);
-				submit_discard_share(work);
-				break;
-			} else if (unlikely(opt_retries < 0 && staleexpire <= time(NULL))) {
+			if (unlikely(opt_retries < 0 && sws->staleexpire <= time(NULL))) {
 				applog(LOG_NOTICE, "Pool %d stale share failed to submit for 5 minutes, discarding", pool->pool_no);
 				submit_discard_share(work);
-				break;
+				return false;
 			}
 		}
 
 		/* pause, then restart work-request loop */
 		applog(LOG_INFO, "json_rpc_call failed on submit_work, retrying");
-	}
-	push_curl_entry(ce, pool);
-out:
-	workio_cmd_free(wc);
 
-	mutex_lock(&submitting_lock);
-	if (!list_empty(&submit_waiting)) {
-		applog(LOG_DEBUG, "submit_work continuing with queued submission");
-		wc = list_entry(submit_waiting.next, struct workio_cmd, list);
-		list_del(&wc->list);
+		gettimeofday(&sws->tv_submit, NULL);
+		json_rpc_call_async(sws->ce->curl, pool->rpc_url, pool->rpc_userpass, sws->s, false, pool, true, sws);
+	
+	return true;
+}
+
+static void free_sws(struct submit_work_state *sws)
+{
+	free(sws->s);
+	free_work(sws->work);
+	free(sws);
+}
+
+static void *submit_work_thread(void *userdata)
+{
+	struct workio_cmd *wc = (struct workio_cmd *)userdata;
+	int wip = 0;
+	CURLM *curlm;
+	long curlm_timeout_ms = -1;
+	struct submit_work_state *sws, **swsp;
+	struct submit_work_state *write_sws = NULL;
+
+	pthread_detach(pthread_self());
+
+	RenameThread("submit_work");
+
+	applog(LOG_DEBUG, "Creating extra submit work thread");
+
+	curlm = curl_multi_init();
+	curl_multi_setopt(curlm, CURLMOPT_TIMERFUNCTION, my_curl_timer_set);
+	curl_multi_setopt(curlm, CURLMOPT_TIMERDATA, &curlm_timeout_ms);
+
+	if ( (sws = begin_submission(wc)) ) {
+		if (sws->ce)
+			curl_multi_add_handle(curlm, sws->ce->curl);
+		else
+		if (sws->s)
+			write_sws = sws;
+		++wip;
+		++total_submitting;
+	}
+
+	fd_set rfds, wfds, efds;
+	int maxfd;
+	struct timeval timeout, *timeoutp;
+	int n;
+	CURLMsg *cm;
+	FD_ZERO(&rfds);
+	while (1) {
+		mutex_lock(&submitting_lock);
+		if (FD_ISSET(submit_waiting_notifier[0], &rfds)) {
+			char buf[0x10];
+			(void)read(submit_waiting_notifier[0], buf, sizeof(buf));
+		}
+		while (!list_empty(&submit_waiting)) {
+			wc = list_entry(submit_waiting.next, struct workio_cmd, list);
+			list_del(&wc->list);
+			if ( (sws = begin_submission(wc)) ) {
+				if (sws->ce)
+					curl_multi_add_handle(curlm, sws->ce->curl);
+				else if (sws->s) {
+					sws->next = write_sws;
+					write_sws = sws;
+				}
+				++wip;
+				++total_submitting;
+			}
+		}
+		if (!wip)
+			break;
 		mutex_unlock(&submitting_lock);
-		goto next_submit;
+		
+		FD_ZERO(&rfds);
+		FD_ZERO(&wfds);
+		FD_ZERO(&efds);
+		curl_multi_fdset(curlm, &rfds, &wfds, &efds, &maxfd);
+		if (curlm_timeout_ms >= 0) {
+			timeout.tv_sec = curlm_timeout_ms / 1000;
+			timeout.tv_usec = (curlm_timeout_ms % 1000) * 1000;
+			timeoutp = &timeout;
+		} else
+			timeoutp = NULL;
+		
+		for (sws = write_sws; sws; sws = sws->next) {
+			int fd = sws->work->pool->sock;
+			FD_SET(fd, &wfds);
+			if (fd > maxfd)
+				maxfd = fd;
+		}
+		
+		FD_SET(submit_waiting_notifier[0], &rfds);
+		if (submit_waiting_notifier[0] > maxfd)
+			maxfd = submit_waiting_notifier[0];
+		
+		if (select(maxfd+1, &rfds, &wfds, &efds, timeoutp) < 0) {
+			FD_ZERO(&rfds);
+			continue;
+		}
+		
+		for (swsp = &write_sws; (sws = *swsp); ) {
+			int fd = sws->work->pool->sock;
+			if (!FD_ISSET(fd, &wfds)) {
+				swsp = &sws->next;
+				continue;
+			}
+			
+			struct pool *pool = sws->work->pool;
+			char *s = sws->s;
+
+			applog(LOG_DEBUG, "DBG: sending %s submit RPC call: %s", pool->stratum_url, s);
+
+			if (likely(stratum_send(pool, s, strlen(s)))) {
+				if (pool_tclear(pool, &pool->submit_fail))
+						applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no);
+				applog(LOG_DEBUG, "Successfully submitted, adding to stratum_shares db");
+			} else if (!pool_tset(pool, &pool->submit_fail)) {
+				applog(LOG_WARNING, "Pool %d stratum share submission failure", pool->pool_no);
+				total_ro++;
+				pool->remotefail_occasions++;
+			}
+			
+			// Clear the fd from wfds, to avoid potentially blocking on other submissions to the same socket
+			FD_CLR(fd, &wfds);
+			// Delete sws for this submission, since we're done with it
+			*swsp = sws->next;
+			free_sws(sws);
+			--wip;
+		}
+		
+		curl_multi_perform(curlm, &n);
+		while( (cm = curl_multi_info_read(curlm, &n)) ) {
+			if (cm->msg == CURLMSG_DONE)
+			{
+				json_t *val = json_rpc_call_completed(cm->easy_handle, cm->data.result, false, NULL, &sws);
+				if (submit_upstream_work_completed(sws->work, sws->resubmit, &sws->tv_submit, val) || !retry_submission(sws)) {
+					--wip;
+					--total_submitting;
+					struct pool *pool = sws->work->pool;
+					if (pool->sws_waiting_on_curl) {
+						pool->sws_waiting_on_curl->ce = sws->ce;
+						sws_has_ce(pool->sws_waiting_on_curl);
+						pool->sws_waiting_on_curl = pool->sws_waiting_on_curl->next;
+					} else {
+						curl_multi_remove_handle(curlm, cm->easy_handle);
+						push_curl_entry(sws->ce, sws->work->pool);
+					}
+					free_sws(sws);
+				}
+			}
+		}
 	}
+	assert(!write_sws);
 	--submitting;
 	mutex_unlock(&submitting_lock);
 
+	curl_multi_cleanup(curlm);
+
 	return NULL;
 }
 
@@ -5160,12 +5341,9 @@ static void *workio_thread(void *userdata)
 		case WC_SUBMIT_WORK:
 		{
 			mutex_lock(&submitting_lock);
-			if (submitting >= opt_submit_threads) {
-				if (list_empty(&submit_waiting))
-					applog(LOG_WARNING, "workio_thread queuing submissions (see --submit-threads)");
-				else
-					applog(LOG_DEBUG, "workio_thread queuing submission");
+			if (submitting) {
 				list_add_tail(&wc->list, &submit_waiting);
+				(void)write(submit_waiting_notifier[1], "\0", 1);
 				mutex_unlock(&submitting_lock);
 				break;
 			}
@@ -5310,12 +5488,13 @@ static void hashmeter(int thr_id, struct timeval *diff,
 		utility_to_hashrate(total_diff_accepted / (total_secs ?: 1) * 60),
 		H2B_SPACED);
 
-	sprintf(statusline, "%s%ds:%s avg:%s u:%s | A:%d  R:%d  HW:%d  E:%.0f%%  U:%.1f/m",
+	sprintf(statusline, "%s%ds:%s avg:%s u:%s | A:%d R:%d S:%d HW:%d E:%.0f%% U:%.1f/m",
 		want_per_device_stats ? "ALL " : "",
 		opt_log_interval,
 		cHr, aHr,
 		uHr,
-		total_accepted, total_rejected, hw_errors, efficiency, utility);
+		total_accepted, total_rejected, total_stale,
+		hw_errors, efficiency, utility);
 
 
 	local_mhashes_done = 0;
@@ -5429,6 +5608,8 @@ fishy:
 			applog(LOG_NOTICE, "Rejected untracked stratum share from pool %d", pool->pool_no);
 		goto out;
 	}
+	else
+		--total_submitting;
 	stratum_share_result(val, res_val, err_val, sshare);
 	free_work(sshare->work);
 	free(sshare);
@@ -5462,6 +5643,7 @@ static void clear_stratum_shares(struct pool *pool)
 
 	mutex_lock(&sshare_lock);
 	HASH_ITER(hh, stratum_shares, sshare, tmpshare) {
+		--total_submitting;
 		if (sshare->work->pool == pool) {
 			HASH_DEL(stratum_shares, sshare);
 			free_work(sshare->work);
@@ -7597,6 +7779,8 @@ int main(int argc, char *argv[])
 	if (unlikely(pthread_cond_init(&restart_cond, NULL)))
 		quit(1, "Failed to pthread_cond_init restart_cond");
 
+	notifier_init(submit_waiting_notifier);
+
 	sprintf(packagename, "%s %s", PACKAGE, VERSION);
 
 #ifdef WANT_CPUMINE

+ 1 - 0
miner.h

@@ -980,6 +980,7 @@ struct pool {
 	int curls;
 	pthread_cond_t cr_cond;
 	struct list_head curlring;
+	struct submit_work_state *sws_waiting_on_curl;
 
 	time_t last_share_time;
 	double last_share_diff;

+ 136 - 52
util.c

@@ -13,6 +13,10 @@
 #define _GNU_SOURCE
 #include "config.h"
 
+#ifdef WIN32
+#define FD_SETSIZE 4096
+#endif
+
 #include <stdio.h>
 #include <stdlib.h>
 #include <ctype.h>
@@ -289,32 +293,37 @@ static void set_nettime(void)
 	wr_unlock(&netacc_lock);
 }
 
-json_t *json_rpc_call(CURL *curl, const char *url,
+struct json_rpc_call_state {
+	struct data_buffer all_data;
+	struct header_info hi;
+	void *priv;
+	char curl_err_str[CURL_ERROR_SIZE];
+	struct curl_slist *headers;
+	struct upload_buffer upload_data;
+	struct pool *pool;
+};
+
+void json_rpc_call_async(CURL *curl, const char *url,
 		      const char *userpass, const char *rpc_req,
-		      bool probe, bool longpoll, int *rolltime,
-		      struct pool *pool, bool share)
+		      bool longpoll,
+		      struct pool *pool, bool share,
+		      void *priv)
 {
+	struct json_rpc_call_state *state = malloc(sizeof(struct json_rpc_call_state));
+	*state = (struct json_rpc_call_state){
+		.priv = priv,
+		.pool = pool,
+	};
 	long timeout = longpoll ? (60 * 60) : 60;
-	struct data_buffer all_data = {.len = 0};
-	struct header_info hi = {NULL, 0, NULL, NULL, false, false, false};
 	char len_hdr[64], user_agent_hdr[128];
-	char curl_err_str[CURL_ERROR_SIZE];
 	struct curl_slist *headers = NULL;
-	struct upload_buffer upload_data;
-	json_t *val, *err_val, *res_val;
-	bool probing = false;
-	json_error_t err;
-	int rc;
-
-	memset(&err, 0, sizeof(err));
 
 	if (longpoll)
-		all_data.idlemarker = &pool->lp_socket;
+		state->all_data.idlemarker = &pool->lp_socket;
 
 	/* it is assumed that 'curl' is freshly [re]initialized at this pt */
 
-	if (probe)
-		probing = !pool->probed;
+	curl_easy_setopt(curl, CURLOPT_PRIVATE, state);
 	curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout);
 
 #if 0 /* Disable curl debugging since it spews to stderr */
@@ -331,13 +340,13 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 	if (!opt_delaynet || share)
 		curl_easy_setopt(curl, CURLOPT_TCP_NODELAY, 1);
 	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, all_data_cb);
-	curl_easy_setopt(curl, CURLOPT_WRITEDATA, &all_data);
+	curl_easy_setopt(curl, CURLOPT_WRITEDATA, &state->all_data);
 	curl_easy_setopt(curl, CURLOPT_READFUNCTION, upload_data_cb);
-	curl_easy_setopt(curl, CURLOPT_READDATA, &upload_data);
-	curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_err_str);
+	curl_easy_setopt(curl, CURLOPT_READDATA, &state->upload_data);
+	curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, &state->curl_err_str[0]);
 	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
 	curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, resp_hdr_cb);
-	curl_easy_setopt(curl, CURLOPT_HEADERDATA, &hi);
+	curl_easy_setopt(curl, CURLOPT_HEADERDATA, &state->hi);
 	curl_easy_setopt(curl, CURLOPT_USE_SSL, CURLUSESSL_TRY);
 	if (pool->rpc_proxy) {
 		curl_easy_setopt(curl, CURLOPT_PROXY, pool->rpc_proxy);
@@ -356,10 +365,10 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 	if (opt_protocol)
 		applog(LOG_DEBUG, "JSON protocol request:\n%s", rpc_req);
 
-	upload_data.buf = rpc_req;
-	upload_data.len = strlen(rpc_req);
+	state->upload_data.buf = rpc_req;
+	state->upload_data.len = strlen(rpc_req);
 	sprintf(len_hdr, "Content-Length: %lu",
-		(unsigned long) upload_data.len);
+		(unsigned long) state->upload_data.len);
 	sprintf(user_agent_hdr, "User-Agent: %s", PACKAGE_STRING);
 
 	headers = curl_slist_append(headers,
@@ -383,6 +392,7 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 	headers = curl_slist_append(headers, "Expect:"); /* disable Expect hdr*/
 
 	curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
+	state->headers = headers;
 
 	if (opt_delaynet) {
 		/* Don't delay share submission, but still track the nettime */
@@ -406,14 +416,31 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 		}
 		set_nettime();
 	}
+}
+
+json_t *json_rpc_call_completed(CURL *curl, int rc, bool probe, int *rolltime, void *out_priv)
+{
+	struct json_rpc_call_state *state;
+	if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &state) != CURLE_OK) {
+		applog(LOG_ERR, "Failed to get private curl data");
+		if (out_priv)
+			*(void**)out_priv = NULL;
+		goto err_out;
+	}
+	if (out_priv)
+		*(void**)out_priv = state->priv;
+
+	json_t *val, *err_val, *res_val;
+	json_error_t err;
+	struct pool *pool = state->pool;
+	bool probing = probe && !pool->probed;
 
-	rc = curl_easy_perform(curl);
 	if (rc) {
-		applog(LOG_INFO, "HTTP request failed: %s", curl_err_str);
+		applog(LOG_INFO, "HTTP request failed: %s", state->curl_err_str);
 		goto err_out;
 	}
 
-	if (!all_data.buf) {
+	if (!state->all_data.buf) {
 		applog(LOG_DEBUG, "Empty data received in json_rpc_call.");
 		goto err_out;
 	}
@@ -421,39 +448,40 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 	if (probing) {
 		pool->probed = true;
 		/* If X-Long-Polling was found, activate long polling */
-		if (hi.lp_path) {
+		if (state->hi.lp_path) {
 			if (pool->hdr_path != NULL)
 				free(pool->hdr_path);
-			pool->hdr_path = hi.lp_path;
+			pool->hdr_path = state->hi.lp_path;
 		} else
 			pool->hdr_path = NULL;
-		if (hi.stratum_url) {
-			pool->stratum_url = hi.stratum_url;
-			hi.stratum_url = NULL;
+		if (state->hi.stratum_url) {
+			pool->stratum_url = state->hi.stratum_url;
+			state->hi.stratum_url = NULL;
 		}
 	} else {
-		if (hi.lp_path) {
-			free(hi.lp_path);
-			hi.lp_path = NULL;
+		if (state->hi.lp_path) {
+			free(state->hi.lp_path);
+			state->hi.lp_path = NULL;
 		}
-		if (hi.stratum_url) {
-			free(hi.stratum_url);
-			hi.stratum_url = NULL;
+		if (state->hi.stratum_url) {
+			free(state->hi.stratum_url);
+			state->hi.stratum_url = NULL;
 		}
 	}
 
-	*rolltime = hi.rolltime;
-	pool->cgminer_pool_stats.rolltime = hi.rolltime;
-	pool->cgminer_pool_stats.hadrolltime = hi.hadrolltime;
-	pool->cgminer_pool_stats.canroll = hi.canroll;
-	pool->cgminer_pool_stats.hadexpire = hi.hadexpire;
+	if (rolltime)
+		*rolltime = state->hi.rolltime;
+	pool->cgminer_pool_stats.rolltime = state->hi.rolltime;
+	pool->cgminer_pool_stats.hadrolltime = state->hi.hadrolltime;
+	pool->cgminer_pool_stats.canroll = state->hi.canroll;
+	pool->cgminer_pool_stats.hadexpire = state->hi.hadexpire;
 
-	val = JSON_LOADS(all_data.buf, &err);
+	val = JSON_LOADS(state->all_data.buf, &err);
 	if (!val) {
 		applog(LOG_INFO, "JSON decode failed(%d): %s", err.line, err.text);
 
 		if (opt_protocol)
-			applog(LOG_DEBUG, "JSON protocol response:\n%s", all_data.buf);
+			applog(LOG_DEBUG, "JSON protocol response:\n%s", state->all_data.buf);
 
 		goto err_out;
 	}
@@ -487,27 +515,39 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 		goto err_out;
 	}
 
-	if (hi.reason) {
-		json_object_set_new(val, "reject-reason", json_string(hi.reason));
-		free(hi.reason);
-		hi.reason = NULL;
+	if (state->hi.reason) {
+		json_object_set_new(val, "reject-reason", json_string(state->hi.reason));
+		free(state->hi.reason);
+		state->hi.reason = NULL;
 	}
 	successful_connect = true;
-	databuf_free(&all_data);
-	curl_slist_free_all(headers);
+	databuf_free(&state->all_data);
+	curl_slist_free_all(state->headers);
 	curl_easy_reset(curl);
+	free(state);
 	return val;
 
 err_out:
-	databuf_free(&all_data);
-	curl_slist_free_all(headers);
+	databuf_free(&state->all_data);
+	curl_slist_free_all(state->headers);
 	curl_easy_reset(curl);
 	if (!successful_connect)
 		applog(LOG_DEBUG, "Failed to connect in json_rpc_call");
 	curl_easy_setopt(curl, CURLOPT_FRESH_CONNECT, 1);
+	free(state);
 	return NULL;
 }
 
+json_t *json_rpc_call(CURL *curl, const char *url,
+		      const char *userpass, const char *rpc_req,
+		      bool probe, bool longpoll, int *rolltime,
+		      struct pool *pool, bool share)
+{
+	json_rpc_call_async(curl, url, userpass, rpc_req, longpoll, pool, share, NULL);
+	int rc = curl_easy_perform(curl);
+	return json_rpc_call_completed(curl, rc, probe, rolltime, NULL);
+}
+
 bool our_curl_supports_proxy_uris()
 {
 	curl_version_info_data *data = curl_version_info(CURLVERSION_NOW);
@@ -1494,3 +1534,47 @@ void RenameThread(const char* name)
 	(void)name;
 #endif
 }
+
+void notifier_init(int pipefd[2])
+{
+#ifdef WIN32
+	SOCKET listener, connecter, acceptor;
+	listener = socket(AF_INET, SOCK_STREAM, 0);
+	if (listener == INVALID_SOCKET)
+		quit(1, "Failed to create listener socket in create_notifier");
+	connecter = socket(AF_INET, SOCK_STREAM, 0);
+	if (connecter == INVALID_SOCKET)
+		quit(1, "Failed to create connect socket in create_notifier");
+	struct sockaddr_in inaddr = {
+		.sin_family = AF_INET,
+		.sin_addr = {
+			.s_addr = htonl(INADDR_LOOPBACK),
+		},
+		.sin_port = 0,
+	};
+	{
+		char reuse = 1;
+		setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
+	}
+	if (bind(listener, (struct sockaddr*)&inaddr, sizeof(inaddr) == SOCKET_ERROR))
+		quit(1, "Failed to bind listener socket in create_notifier");
+	socklen_t inaddr_sz = sizeof(inaddr);
+	if (getsockname(listener, (struct sockaddr*)&inaddr, &inaddr_sz) == SOCKET_ERROR)
+		quit(1, "Failed to getsockname in create_notifier");
+	if (listen(listener, 1) == SOCKET_ERROR)
+		quit(1, "Failed to listen in create_notifier");
+	inaddr.sin_family = AF_INET;
+	inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+	if (connect(connecter, (struct sockaddr*)&inaddr, inaddr_sz) == SOCKET_ERROR)
+		quit(1, "Failed to connect in create_notifier");
+	acceptor = accept(listener, NULL, NULL);
+	if (acceptor == INVALID_SOCKET)
+		quit(1, "Failed to accept in create_notifier");
+	closesocket(listener);
+	pipefd[0] = connecter;
+	pipefd[1] = acceptor;
+#else
+	if (pipe(pipefd))
+		quit(1, "Failed to create pipe in create_notifier");
+#endif
+}

+ 8 - 0
util.h

@@ -1,6 +1,9 @@
 #ifndef __UTIL_H__
 #define __UTIL_H__
 
+#include <curl/curl.h>
+#include <jansson.h>
+
 #if defined(unix) || defined(__APPLE__)
 	#include <errno.h>
 	#include <sys/socket.h>
@@ -45,6 +48,10 @@
 struct pool;
 enum dev_reason;
 struct cgpu_info;
+
+extern void json_rpc_call_async(CURL *, const char *url, const char *userpass, const char *rpc_req, bool longpoll, struct pool *pool, bool share, void *priv);
+extern json_t *json_rpc_call_completed(CURL *, int rc, bool probe, int *rolltime, void *out_priv);
+
 bool stratum_send(struct pool *pool, char *s, ssize_t len);
 char *recv_line(struct pool *pool);
 bool parse_method(struct pool *pool, char *s);
@@ -55,5 +62,6 @@ void suspend_stratum(struct pool *pool);
 void dev_error(struct cgpu_info *dev, enum dev_reason reason);
 void *realloc_strcat(char *ptr, char *s);
 void RenameThread(const char* name);
+extern void notifier_init(int pipefd[2]);
 
 #endif /* __UTIL_H__ */