Browse Source

Run a single share submission thread asynchronously submitting all shares in parallel

Luke Dashjr 13 years ago
parent
commit
a11f4234a0
3 changed files with 90 additions and 30 deletions
  1. 45 30
      miner.c
  2. 44 0
      util.c
  3. 1 0
      util.h

+ 45 - 30
miner.c

@@ -238,6 +238,7 @@ pthread_mutex_t control_lock;
 static pthread_mutex_t submitting_lock;
 static int submitting;
 static struct list_head submit_waiting;
+int submit_waiting_notifier[2];
 
 int hw_errors;
 int total_accepted, total_rejected, total_diff1;
@@ -3673,11 +3674,6 @@ static struct submit_work_state *begin_submission(struct workio_cmd *wc)
 
 	if (stale_work(work, true)) {
 		work->stale = true;
-		if (unlikely(!list_empty(&submit_waiting))) {
-			applog(LOG_WARNING, "Pool %d stale share detected while queued submissions are waiting, discarding", pool->pool_no);
-			submit_discard_share(work);
-			goto out;
-		}
 		if (opt_submit_stale)
 			applog(LOG_NOTICE, "Pool %d stale share detected, submitting as user requested", pool->pool_no);
 		else if (pool->submit_old)
@@ -3755,11 +3751,7 @@ static bool retry_submission(struct submit_work_state *sws)
 			return false;
 		}
 		else if (work->stale) {
-			if (unlikely(!list_empty(&submit_waiting))) {
-				applog(LOG_WARNING, "Pool %d stale share failed to submit while queued submissions are waiting, discarding", pool->pool_no);
-				submit_discard_share(work);
-				return false;
-			} else if (unlikely(opt_retries < 0 && sws->staleexpire <= time(NULL))) {
+			if (unlikely(opt_retries < 0 && sws->staleexpire <= time(NULL))) {
 				applog(LOG_NOTICE, "Pool %d stale share failed to submit for 5 minutes, discarding", pool->pool_no);
 				submit_discard_share(work);
 				return false;
@@ -3803,7 +3795,6 @@ static void *submit_work_thread(void *userdata)
 	curl_multi_setopt(curlm, CURLMOPT_TIMERFUNCTION, my_curl_timer_set);
 	curl_multi_setopt(curlm, CURLMOPT_TIMERDATA, &curlm_timeout_ms);
 
-next_submit:
 	if ( (sws = begin_submission(wc)) ) {
 		if (sws->ce)
 			curl_multi_add_handle(curlm, sws->ce->curl);
@@ -3814,16 +3805,43 @@ next_submit:
 
 	fd_set rfds, wfds, efds;
 	int maxfd;
-	struct timeval timeout;
+	struct timeval timeout, *timeoutp;
 	int n;
 	CURLMsg *cm;
-	while (wip) {
+	FD_ZERO(&rfds);
+	while (1) {
+		mutex_lock(&submitting_lock);
+		if (FD_ISSET(submit_waiting_notifier[0], &rfds)) {
+			char buf[0x10];
+			(void)read(submit_waiting_notifier[0], buf, sizeof(buf));
+		}
+		while (!list_empty(&submit_waiting)) {
+			wc = list_entry(submit_waiting.next, struct workio_cmd, list);
+			list_del(&wc->list);
+			if ( (sws = begin_submission(wc)) ) {
+				if (sws->ce)
+					curl_multi_add_handle(curlm, sws->ce->curl);
+				else {
+					sws->next = write_sws;
+					write_sws = sws;
+				}
+				++wip;
+			}
+		}
+		if (!wip)
+			break;
+		mutex_unlock(&submitting_lock);
+		
 		FD_ZERO(&rfds);
 		FD_ZERO(&wfds);
 		FD_ZERO(&efds);
 		curl_multi_fdset(curlm, &rfds, &wfds, &efds, &maxfd);
-		timeout.tv_sec = curlm_timeout_ms / 1000;
-		timeout.tv_usec = (curlm_timeout_ms % 1000) * 1000;
+		if (curlm_timeout_ms >= 0) {
+			timeout.tv_sec = curlm_timeout_ms / 1000;
+			timeout.tv_usec = (curlm_timeout_ms % 1000) * 1000;
+			timeoutp = &timeout;
+		} else
+			timeoutp = NULL;
 		
 		for (sws = write_sws; sws; sws = sws->next) {
 			int fd = sws->work->pool->sock;
@@ -3832,7 +3850,14 @@ next_submit:
 				maxfd = fd;
 		}
 		
-		select(maxfd+1, &rfds, &wfds, &efds, &timeout);
+		FD_SET(submit_waiting_notifier[0], &rfds);
+		if (submit_waiting_notifier[0] > maxfd)
+			maxfd = submit_waiting_notifier[0];
+		
+		if (select(maxfd+1, &rfds, &wfds, &efds, timeoutp) < 0) {
+			FD_ZERO(&rfds);
+			continue;
+		}
 		
 		for (swsp = &write_sws; (sws = *swsp); ) {
 			int fd = sws->work->pool->sock;
@@ -3878,15 +3903,6 @@ next_submit:
 		}
 	}
 	assert(!write_sws);
-
-	mutex_lock(&submitting_lock);
-	if (!list_empty(&submit_waiting)) {
-		applog(LOG_DEBUG, "submit_work continuing with queued submission");
-		wc = list_entry(submit_waiting.next, struct workio_cmd, list);
-		list_del(&wc->list);
-		mutex_unlock(&submitting_lock);
-		goto next_submit;
-	}
 	--submitting;
 	mutex_unlock(&submitting_lock);
 
@@ -5284,12 +5300,9 @@ static void *workio_thread(void *userdata)
 		case WC_SUBMIT_WORK:
 		{
 			mutex_lock(&submitting_lock);
-			if (submitting >= opt_submit_threads) {
-				if (list_empty(&submit_waiting))
-					applog(LOG_WARNING, "workio_thread queuing submissions (see --submit-threads)");
-				else
-					applog(LOG_DEBUG, "workio_thread queuing submission");
+			if (submitting) {
 				list_add_tail(&wc->list, &submit_waiting);
+				(void)write(submit_waiting_notifier[1], "\0", 1);
 				mutex_unlock(&submitting_lock);
 				break;
 			}
@@ -7721,6 +7734,8 @@ int main(int argc, char *argv[])
 	if (unlikely(pthread_cond_init(&restart_cond, NULL)))
 		quit(1, "Failed to pthread_cond_init restart_cond");
 
+	notifier_init(submit_waiting_notifier);
+
 	sprintf(packagename, "%s %s", PACKAGE, VERSION);
 
 #ifdef WANT_CPUMINE

+ 44 - 0
util.c

@@ -1530,3 +1530,47 @@ void RenameThread(const char* name)
 	(void)name;
 #endif
 }
+
+void notifier_init(int pipefd[2])
+{
+#ifdef WIN32
+	SOCKET listener, connecter, acceptor;
+	listener = socket(AF_INET, SOCK_STREAM, 0);
+	if (listener == INVALID_SOCKET)
+		quit(1, "Failed to create listener socket in create_notifier");
+	connecter = socket(AF_INET, SOCK_STREAM, 0);
+	if (connecter == INVALID_SOCKET)
+		quit(1, "Failed to create connect socket in create_notifier");
+	struct sockaddr_in inaddr = {
+		.sin_family = AF_INET,
+		.sin_addr = {
+			.s_addr = htonl(INADDR_LOOPBACK),
+		},
+		.sin_port = 0,
+	};
+	{
+		char reuse = 1;
+		setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
+	}
+	if (bind(listener, (struct sockaddr*)&inaddr, sizeof(inaddr) == SOCKET_ERROR))
+		quit(1, "Failed to bind listener socket in create_notifier");
+	socklen_t inaddr_sz = sizeof(inaddr);
+	if (getsockname(listener, (struct sockaddr*)&inaddr, &inaddr_sz) == SOCKET_ERROR)
+		quit(1, "Failed to getsockname in create_notifier");
+	if (listen(listener, 1) == SOCKET_ERROR)
+		quit(1, "Failed to listen in create_notifier");
+	inaddr.sin_family = AF_INET;
+	inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+	if (connect(connecter, (struct sockaddr*)&inaddr, inaddr_sz) == SOCKET_ERROR)
+		quit(1, "Failed to connect in create_notifier");
+	acceptor = accept(listener, NULL, NULL);
+	if (acceptor == INVALID_SOCKET)
+		quit(1, "Failed to accept in create_notifier");
+	closesocket(listener);
+	pipefd[0] = connecter;
+	pipefd[1] = acceptor;
+#else
+	if (pipe(pipefd))
+		quit(1, "Failed to create pipe in create_notifier");
+#endif
+}

+ 1 - 0
util.h

@@ -62,5 +62,6 @@ void suspend_stratum(struct pool *pool);
 void dev_error(struct cgpu_info *dev, enum dev_reason reason);
 void *realloc_strcat(char *ptr, char *s);
 void RenameThread(const char* name);
+extern void notifier_init(int pipefd[2]);
 
 #endif /* __UTIL_H__ */