Browse Source

Create a stratum thread per pool that has stratum that monitors the socket and serves received data.

Con Kolivas 13 years ago
parent
commit
56255a0c86
4 changed files with 61 additions and 6 deletions
  1. 56 4
      cgminer.c
  2. 1 0
      miner.h
  3. 2 2
      util.c
  4. 2 0
      util.h

+ 56 - 4
cgminer.c

@@ -3957,6 +3957,50 @@ out_unlock:
 	}
 }
 
+/* One stratum thread per pool that has stratum waits on the socket checking
+ * for new messages and for the integrity of the socket connection. We reset
+ * the connection based on the integrity of the receive side only as the send
+ * side will eventually expire data it fails to send. */
+static void *stratum_thread(void *userdata)
+{
+	struct pool *pool = (struct pool *)userdata;
+	SOCKETTYPE sock = pool->sock;
+	fd_set rd;
+
+	pthread_detach(pthread_self());
+
+	FD_ZERO(&rd);
+	FD_SET(sock, &rd);
+
+	while (42) {
+		char *s;
+
+		if (select(sock + 1, &rd, NULL, NULL, NULL) < 0) {
+			pool->stratum_active = pool->stratum_auth = false;
+			applog(LOG_WARNING, "Stratum connection to pool %d interrupted", pool->pool_no);
+			pool->getfail_occasions++;
+			if (!initiate_stratum(pool) || !auth_stratum(pool)) {
+				pool_died(pool);
+				break;
+			}
+		}
+		s = recv_line(sock);
+		if (unlikely(!s))
+			continue;
+		if (!parse_stratum(pool, s)) /* Create message queues here */
+			applog(LOG_INFO, "Unknown stratum msg: %s", s);
+		free(s);
+	}
+
+	return NULL;
+}
+
+static void init_stratum_thread(struct pool *pool)
+{
+	if (unlikely(pthread_create(&pool->stratum_thread, NULL, stratum_thread, (void *)pool)))
+		quit(1, "Failed to create stratum thread");
+}
+
 static void *longpoll_thread(void *userdata);
 
 static bool pool_active(struct pool *pool, bool pinging)
@@ -3970,9 +4014,15 @@ static bool pool_active(struct pool *pool, bool pinging)
 	if (pool->has_stratum) {
 		if ((!pool->stratum_active || pinging) && !initiate_stratum(pool))
 			return false;
-		if (!pool->stratum_auth && !auth_stratum(pool))
-			return false;
-		return false;
+		if (!pool->stratum_auth) {
+			if (!auth_stratum(pool))
+				return false;
+			/* We create the stratum thread for each pool just
+			 * after successful authorisation */
+			init_stratum_thread(pool);
+			return true;
+		}
+		return true;
 	}
 
 	curl = curl_easy_init();
@@ -5473,7 +5523,9 @@ int main(int argc, char *argv[])
 	sigemptyset(&handler.sa_mask);
 	sigaction(SIGTERM, &handler, &termhandler);
 	sigaction(SIGINT, &handler, &inthandler);
-
+#ifndef WIN32
+	signal(SIGPIPE, SIG_IGN);
+#endif
 	opt_kernel_path = alloca(PATH_MAX);
 	strcpy(opt_kernel_path, CGMINER_PREFIX);
 	cgminer_path = alloca(PATH_MAX);

+ 1 - 0
miner.h

@@ -842,6 +842,7 @@ struct pool {
 	bool stratum_active;
 	bool stratum_auth;
 	struct stratum_work swork;
+	pthread_t stratum_thread;
 };
 
 #define GETWORK_MODE_TESTPOOL 'T'

+ 2 - 2
util.c

@@ -897,7 +897,7 @@ static bool sock_full(SOCKETTYPE sock, bool wait)
 
 /* Peeks at a socket to find the first end of line and then reads just that
  * from the socket and returns that as a malloced char */
-static char *recv_line(SOCKETTYPE sock)
+char *recv_line(SOCKETTYPE sock)
 {
 	char *sret = NULL, *s;
 	ssize_t len;
@@ -1048,7 +1048,7 @@ static bool parse_diff(struct pool *pool, json_t *val)
 	return true;
 }
 
-static bool parse_stratum(struct pool *pool, char *s)
+bool parse_stratum(struct pool *pool, char *s)
 {
 	json_t *val = NULL, *method, *err_val, *params;
 	json_error_t err;

+ 2 - 0
util.h

@@ -109,6 +109,8 @@
 	#endif
 #endif
 struct pool;
+char *recv_line(SOCKETTYPE sock);
+bool parse_stratum(struct pool *pool, char *s);
 bool extract_sockaddr(struct pool *pool, char *url);
 bool auth_stratum(struct pool *pool);
 bool initiate_stratum(struct pool *pool);