Browse Source

Handle share submissions asynchronously, one at a time (still threaded)

Luke Dashjr 13 years ago
parent
commit
281a16bd7c
1 changed files with 164 additions and 52 deletions
  1. 164 52
      miner.c

+ 164 - 52
miner.c

@@ -3027,6 +3027,7 @@ static void workio_cmd_free(struct workio_cmd *wc)
 
 
 	switch (wc->cmd) {
 	switch (wc->cmd) {
 	case WC_SUBMIT_WORK:
 	case WC_SUBMIT_WORK:
+		if (wc->work)
 		free_work(wc->work);
 		free_work(wc->work);
 		break;
 		break;
 	default: /* do nothing */
 	default: /* do nothing */
@@ -3637,27 +3638,36 @@ static void submit_discard_share(struct work *work)
 	work->pool->diff_stale += work->work_difficulty;
 	work->pool->diff_stale += work->work_difficulty;
 }
 }
 
 
-static void *submit_work_thread(void *userdata)
-{
-	struct workio_cmd *wc = (struct workio_cmd *)userdata;
+struct submit_work_state {
 	struct work *work;
 	struct work *work;
-	struct pool *pool;
 	bool resubmit;
 	bool resubmit;
 	struct curl_ent *ce;
 	struct curl_ent *ce;
 	int failures;
 	int failures;
 	time_t staleexpire;
 	time_t staleexpire;
+	char *s;
+	struct timeval tv_submit;
+	struct submit_work_state *next;
+};
 
 
-	pthread_detach(pthread_self());
-
-	RenameThread("submit_work");
+static int my_curl_timer_set(__maybe_unused CURLM *curlm, long timeout_ms, void *userp)
+{
+	long *timeout = userp;
+	*timeout = timeout_ms;
+	return 0;
+}
 
 
-	applog(LOG_DEBUG, "Creating extra submit work thread");
+static struct submit_work_state *begin_submission(struct workio_cmd *wc)
+{
+	struct work *work;
+	struct pool *pool;
+	struct submit_work_state *sws = NULL;
 
 
-next_submit:
 	work = wc->work;
 	work = wc->work;
 	pool = work->pool;
 	pool = work->pool;
-	resubmit = false;
-	failures = 0;
+	sws = malloc(sizeof(*sws));
+	*sws = (struct submit_work_state){
+		.work = work,
+	};
 
 
 	check_solve(work);
 	check_solve(work);
 
 
@@ -3677,14 +3687,14 @@ next_submit:
 			submit_discard_share(work);
 			submit_discard_share(work);
 			goto out;
 			goto out;
 		}
 		}
-		staleexpire = time(NULL) + 300;
+		sws->staleexpire = time(NULL) + 300;
 	}
 	}
 
 
 	if (work->stratum) {
 	if (work->stratum) {
 		struct stratum_share *sshare = calloc(sizeof(struct stratum_share), 1);
 		struct stratum_share *sshare = calloc(sizeof(struct stratum_share), 1);
 		uint32_t nonce;
 		uint32_t nonce;
 		char *noncehex;
 		char *noncehex;
-		char s[1024];
+		char *s;
 
 
 		sshare->work = copy_work(work);
 		sshare->work = copy_work(work);
 		mutex_lock(&sshare_lock);
 		mutex_lock(&sshare_lock);
@@ -3695,43 +3705,37 @@ next_submit:
 
 
 		nonce = *((uint32_t *)(work->data + 76));
 		nonce = *((uint32_t *)(work->data + 76));
 		noncehex = bin2hex((const unsigned char *)&nonce, 4);
 		noncehex = bin2hex((const unsigned char *)&nonce, 4);
-		memset(s, 0, 1024);
+		s = malloc(1024);
 		sprintf(s, "{\"params\": [\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"id\": %d, \"method\": \"mining.submit\"}",
 		sprintf(s, "{\"params\": [\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"id\": %d, \"method\": \"mining.submit\"}",
 			pool->rpc_user, work->job_id, work->nonce2, work->ntime, noncehex, sshare->id);
 			pool->rpc_user, work->job_id, work->nonce2, work->ntime, noncehex, sshare->id);
 		free(noncehex);
 		free(noncehex);
 
 
-		applog(LOG_DEBUG, "DBG: sending %s submit RPC call: %s", pool->stratum_url, s);
-
-		if (likely(stratum_send(pool, s, strlen(s)))) {
-			if (pool_tclear(pool, &pool->submit_fail))
-					applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no);
-			applog(LOG_DEBUG, "Successfully submitted, adding to stratum_shares db");
-		} else if (!pool_tset(pool, &pool->submit_fail)) {
-			applog(LOG_WARNING, "Pool %d stratum share submission failure", pool->pool_no);
-			total_ro++;
-			pool->remotefail_occasions++;
-		}
+		sws->s = s;
+	} else {
+		/* submit solution to bitcoin via JSON-RPC */
+		sws->ce = pop_curl_entry(pool);
+		sws->s = submit_upstream_work_request(work);
 
 
-		goto out;
+		gettimeofday(&sws->tv_submit, NULL);
+		json_rpc_call_async(sws->ce->curl, pool->rpc_url, pool->rpc_userpass, sws->s, false, pool, true, sws);
 	}
 	}
 
 
-	ce = pop_curl_entry(pool);
-	/* submit solution to bitcoin via JSON-RPC */
-	while (1) {
-		{
-			char *s = submit_upstream_work_request(work);
-			struct timeval tv_submit;
-			json_t *val;
-
-			gettimeofday(&tv_submit, NULL);
-			val = json_rpc_call(ce->curl, pool->rpc_url, pool->rpc_userpass, s, false, false, NULL, pool, true);
-			free(s);
-			if (submit_upstream_work_completed(work, resubmit, &tv_submit, val))
-				break;
-		}
+	wc->work = NULL;
+	workio_cmd_free(wc);
+	return sws;
 
 
-		
-		resubmit = true;
+out:
+	free(sws);
+	workio_cmd_free(wc);
+	return NULL;
+}
+
+static bool retry_submission(struct submit_work_state *sws)
+{
+	struct work *work = sws->work;
+	struct pool *pool = work->pool;
+
+		sws->resubmit = true;
 		if ((!work->stale) && stale_work(work, true)) {
 		if ((!work->stale) && stale_work(work, true)) {
 			work->stale = true;
 			work->stale = true;
 			if (opt_submit_stale)
 			if (opt_submit_stale)
@@ -3741,33 +3745,139 @@ next_submit:
 			else {
 			else {
 				applog(LOG_NOTICE, "Pool %d share became stale during submission failure, discarding", pool->pool_no);
 				applog(LOG_NOTICE, "Pool %d share became stale during submission failure, discarding", pool->pool_no);
 				submit_discard_share(work);
 				submit_discard_share(work);
-				break;
+				return false;
 			}
 			}
-			staleexpire = time(NULL) + 300;
+			sws->staleexpire = time(NULL) + 300;
 		}
 		}
-		if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
+		if (unlikely((opt_retries >= 0) && (++sws->failures > opt_retries))) {
 			applog(LOG_ERR, "Pool %d failed %d submission retries, discarding", pool->pool_no, opt_retries);
 			applog(LOG_ERR, "Pool %d failed %d submission retries, discarding", pool->pool_no, opt_retries);
 			submit_discard_share(work);
 			submit_discard_share(work);
-			break;
+			return false;
 		}
 		}
 		else if (work->stale) {
 		else if (work->stale) {
 			if (unlikely(!list_empty(&submit_waiting))) {
 			if (unlikely(!list_empty(&submit_waiting))) {
 				applog(LOG_WARNING, "Pool %d stale share failed to submit while queued submissions are waiting, discarding", pool->pool_no);
 				applog(LOG_WARNING, "Pool %d stale share failed to submit while queued submissions are waiting, discarding", pool->pool_no);
 				submit_discard_share(work);
 				submit_discard_share(work);
-				break;
-			} else if (unlikely(opt_retries < 0 && staleexpire <= time(NULL))) {
+				return false;
+			} else if (unlikely(opt_retries < 0 && sws->staleexpire <= time(NULL))) {
 				applog(LOG_NOTICE, "Pool %d stale share failed to submit for 5 minutes, discarding", pool->pool_no);
 				applog(LOG_NOTICE, "Pool %d stale share failed to submit for 5 minutes, discarding", pool->pool_no);
 				submit_discard_share(work);
 				submit_discard_share(work);
-				break;
+				return false;
 			}
 			}
 		}
 		}
 
 
 		/* pause, then restart work-request loop */
 		/* pause, then restart work-request loop */
 		applog(LOG_INFO, "json_rpc_call failed on submit_work, retrying");
 		applog(LOG_INFO, "json_rpc_call failed on submit_work, retrying");
+
+		gettimeofday(&sws->tv_submit, NULL);
+		json_rpc_call_async(sws->ce->curl, pool->rpc_url, pool->rpc_userpass, sws->s, false, pool, true, sws);
+	
+	return true;
+}
+
+static void free_sws(struct submit_work_state *sws)
+{
+	if (sws->ce)
+		push_curl_entry(sws->ce, sws->work->pool);
+	free(sws->s);
+	free_work(sws->work);
+	free(sws);
+}
+
+static void *submit_work_thread(void *userdata)
+{
+	struct workio_cmd *wc = (struct workio_cmd *)userdata;
+	int wip = 0;
+	CURLM *curlm;
+	long curlm_timeout_ms = -1;
+	struct submit_work_state *sws, **swsp;
+	struct submit_work_state *write_sws = NULL;
+
+	pthread_detach(pthread_self());
+
+	RenameThread("submit_work");
+
+	applog(LOG_DEBUG, "Creating extra submit work thread");
+
+	curlm = curl_multi_init();
+	curl_multi_setopt(curlm, CURLMOPT_TIMERFUNCTION, my_curl_timer_set);
+	curl_multi_setopt(curlm, CURLMOPT_TIMERDATA, &curlm_timeout_ms);
+
+next_submit:
+	if ( (sws = begin_submission(wc)) ) {
+		if (sws->ce)
+			curl_multi_add_handle(curlm, sws->ce->curl);
+		else
+			write_sws = sws;
+		++wip;
+	}
+
+	fd_set rfds, wfds, efds;
+	int maxfd;
+	struct timeval timeout;
+	int n;
+	CURLMsg *cm;
+	while (wip) {
+		FD_ZERO(&rfds);
+		FD_ZERO(&wfds);
+		FD_ZERO(&efds);
+		curl_multi_fdset(curlm, &rfds, &wfds, &efds, &maxfd);
+		timeout.tv_sec = curlm_timeout_ms / 1000;
+		timeout.tv_usec = (curlm_timeout_ms % 1000) * 1000;
+		
+		for (sws = write_sws; sws; sws = sws->next) {
+			int fd = sws->work->pool->sock;
+			FD_SET(fd, &wfds);
+			if (fd > maxfd)
+				maxfd = fd;
+		}
+		
+		select(maxfd+1, &rfds, &wfds, &efds, &timeout);
+		
+		for (swsp = &write_sws; (sws = *swsp); ) {
+			int fd = sws->work->pool->sock;
+			if (!FD_ISSET(fd, &wfds)) {
+				swsp = &sws->next;
+				continue;
+			}
+			
+			struct pool *pool = sws->work->pool;
+			char *s = sws->s;
+
+			applog(LOG_DEBUG, "DBG: sending %s submit RPC call: %s", pool->stratum_url, s);
+
+			if (likely(stratum_send(pool, s, strlen(s)))) {
+				if (pool_tclear(pool, &pool->submit_fail))
+						applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no);
+				applog(LOG_DEBUG, "Successfully submitted, adding to stratum_shares db");
+			} else if (!pool_tset(pool, &pool->submit_fail)) {
+				applog(LOG_WARNING, "Pool %d stratum share submission failure", pool->pool_no);
+				total_ro++;
+				pool->remotefail_occasions++;
+			}
+			
+			// Clear the fd from wfds, to avoid potentially blocking on other submissions to the same socket
+			FD_CLR(fd, &wfds);
+			// Delete sws for this submission, since we're done with it
+			*swsp = sws->next;
+			free_sws(sws);
+			--wip;
+		}
+		
+		curl_multi_perform(curlm, &n);
+		while( (cm = curl_multi_info_read(curlm, &n)) ) {
+			if (cm->msg == CURLMSG_DONE)
+			{
+				json_t *val = json_rpc_call_completed(cm->easy_handle, cm->data.result, false, NULL, &sws);
+				if (submit_upstream_work_completed(sws->work, sws->resubmit, &sws->tv_submit, val) || !retry_submission(sws)) {
+					--wip;
+					curl_multi_remove_handle(curlm, cm->easy_handle);
+					free_sws(sws);
+				}
+			}
+		}
 	}
 	}
-	push_curl_entry(ce, pool);
-out:
-	workio_cmd_free(wc);
+	assert(!write_sws);
 
 
 	mutex_lock(&submitting_lock);
 	mutex_lock(&submitting_lock);
 	if (!list_empty(&submit_waiting)) {
 	if (!list_empty(&submit_waiting)) {
@@ -3780,6 +3890,8 @@ out:
 	--submitting;
 	--submitting;
 	mutex_unlock(&submitting_lock);
 	mutex_unlock(&submitting_lock);
 
 
+	curl_multi_cleanup(curlm);
+
 	return NULL;
 	return NULL;
 }
 }