Browse Source

Merge commit '636f4b1' into stratum

Luke Dashjr 13 years ago
parent
commit
ae228a7618
4 changed files with 491 additions and 49 deletions
  1. 160 5
      miner.c
  2. 6 5
      miner.h
  3. 322 39
      util.c
  4. 3 0
      util.h

+ 160 - 5
miner.c

@@ -4781,6 +4781,64 @@ out_unlock:
 	}
 }
 
+/* One stratum thread per pool that has stratum waits on the socket checking
+ * for new messages and for the integrity of the socket connection. We reset
+ * the connection based on the integrity of the receive side only as the send
+ * side will eventually expire data it fails to send. */
+static void *stratum_thread(void *userdata)
+{
+	struct pool *pool = (struct pool *)userdata;
+
+	pthread_detach(pthread_self());
+
+	while (42) {
+		fd_set rd;
+		char *s;
+
+		FD_ZERO(&rd);
+		FD_SET(pool->sock, &rd);
+
+		if (select(pool->sock + 1, &rd, NULL, NULL, NULL) < 0) {
+			pool->stratum_active = pool->stratum_auth = false;
+			applog(LOG_WARNING, "Stratum connection to pool %d interrupted", pool->pool_no);
+			pool->getfail_occasions++;
+			total_go++;
+			while (!initiate_stratum(pool) || !auth_stratum(pool)) {
+				if (!pool->idle)
+					pool_died(pool);
+				if (pool->removed)
+					goto out;
+				sleep(5);
+			}
+		}
+		s = recv_line(pool->sock);
+		if (unlikely(!s))
+			continue;
+		if (!parse_stratum(pool, s)) /* Create message queues here */
+			applog(LOG_INFO, "Unknown stratum msg: %s", s);
+		free(s);
+		if (unlikely(pool->swork.clean)) {
+			pool->swork.clean = false;
+			applog(LOG_NOTICE, "Stratum requested work restart for block change");
+			restart_threads();
+		}
+
+		if (unlikely(pool->removed)) {
+			CLOSESOCKET(pool->sock);
+			goto out;
+		}
+	}
+
+out:
+	return NULL;
+}
+
+static void init_stratum_thread(struct pool *pool)
+{
+	if (unlikely(pthread_create(&pool->stratum_thread, NULL, stratum_thread, (void *)pool)))
+		quit(1, "Failed to create stratum thread");
+}
+
 static void *longpoll_thread(void *userdata);
 
 // NOTE: This assumes reference URI is a root
@@ -4819,11 +4877,17 @@ static bool pool_active(struct pool *pool, bool pinging)
 	enum pool_protocol proto;
 
 	if (pool->has_stratum) {
-		if (pool->stratum_active && !pinging)
-			return true;
-		if (initiate_stratum(pool))
+		if ((!pool->stratum_active || pinging) && !initiate_stratum(pool))
+			return false;
+		if (!pool->stratum_auth) {
+			if (!auth_stratum(pool))
+				return false;
+			/* We create the stratum thread for each pool just
+			 * after successful authorisation */
+			init_stratum_thread(pool);
 			return true;
-		return false;
+		}
+		return true;
 	}
 
 	curl = curl_easy_init();
@@ -5071,6 +5135,90 @@ static struct work *clone_work(struct work *work)
 	return work;
 }
 
+static void gen_hash(unsigned char *data, unsigned char *hash, int len)
+{
+	unsigned char hash1[32];
+
+	sha2(data, len, hash1, false);
+	sha2(hash1, 32, hash, false);
+}
+
+static void gen_stratum_work(struct pool *pool, struct work *work)
+{
+	unsigned char merkle_root[32], merkle_sha[64], *merkle_hash;
+	char header[256], hash1[128], *coinbase, *nonce2, *buf;
+	uint32_t *data32, *swap32;
+	uint64_t diff, diff64;
+	int len, i;
+
+	mutex_lock(&pool->pool_lock);
+
+	/* Generate coinbase */
+	len = strlen(pool->swork.coinbase1) +
+	      strlen(pool->nonce1) +
+	      pool->n2size +
+	      strlen(pool->swork.coinbase2);
+	coinbase = alloca(len + 1);
+	sprintf(coinbase, "%s", pool->swork.coinbase1);
+	strcat(coinbase, pool->nonce1);
+	nonce2 = bin2hex((const unsigned char *)&pool->nonce2, pool->n2size);
+	pool->nonce2++;
+	strcat(coinbase, nonce2);
+	free(nonce2);
+	strcat(coinbase, pool->swork.coinbase2);
+
+	/* Generate merkle root */
+	gen_hash((unsigned char *)coinbase, merkle_root, len);
+	memcpy(merkle_sha, merkle_root, 32);
+	for (i = 0; i < pool->swork.merkles; i++) {
+		memcpy(merkle_sha + 32, pool->swork.merkle[i], 32);
+		gen_hash(merkle_sha, merkle_root, 64);
+		memcpy(merkle_sha, merkle_root, 32);
+	}
+	data32 = (uint32_t *)merkle_sha;
+	swap32 = (uint32_t *)merkle_root;
+	for (i = 0; i < 32 / 4; i++)
+		swap32[i] = swab32(data32[i]);
+	merkle_hash = (unsigned char *)bin2hex((const unsigned char *)merkle_root, 32);
+
+	sprintf(header, "%s", pool->swork.bbversion);
+	strcat(header, pool->swork.prev_hash);
+	strcat(header, (char *)merkle_hash);
+	strcat(header, pool->swork.ntime);
+	strcat(header, pool->swork.nbit);
+	strcat(header, "00000000"); /* nonce */
+	strcat(header, "000000800000000000000000000000000000000000000000000000000000000000000000000000000000000080020000");
+
+	diff = pool->swork.diff;
+
+	mutex_unlock(&pool->pool_lock);
+
+	applog(LOG_DEBUG, "Generated stratum coinbase %s", coinbase);
+	applog(LOG_DEBUG, "Generated stratum merkle %s", merkle_hash);
+	applog(LOG_DEBUG, "Generated stratum header %s", header);
+
+	free(merkle_hash);
+
+	/* Convert hex data to binary data for work */
+	if (!hex2bin(work->data, header, 128))
+		quit(1, "Failed to convert header to data in gen_stratum_work");
+	calc_midstate(work);
+	sprintf(hash1, "00000000000000000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000010000");
+	if (!hex2bin(work->hash1, hash1, 64))
+		quit(1,  "Failed to convert hash1 in gen_stratum_work");
+
+	/* Generate target as hex where 0x00000000FFFFFFFF is diff 1 */
+	diff64 = 0x00000000FFFFFFFFULL * diff;
+	diff64 = ~htobe64(diff64);
+	sprintf((char *)work->target, "ffffffffffffffffffffffffffffffffffffffffffffffff");
+	buf = bin2hex((const unsigned char *)&diff64, 8);
+	if (!buf)
+		quit(1, "Failed to convert diff64 to buf in gen_stratum_work");
+	strcat((char *)work->target, buf);
+	free(buf);
+	applog(LOG_DEBUG, "Generated target %s", work->target);
+}
+
 static void get_work(struct work *work, struct thr_info *thr, const int thr_id)
 {
 	struct timespec abstime = {0, 0};
@@ -5090,6 +5238,11 @@ static void get_work(struct work *work, struct thr_info *thr, const int thr_id)
 retry:
 	pool = current_pool();
 
+	if (pool->has_stratum) {
+		gen_stratum_work(pool, work);
+		goto out;
+	}
+
 	if (reuse_work(work))
 		goto out;
 
@@ -6506,7 +6659,9 @@ int main(int argc, char *argv[])
 	sigemptyset(&handler.sa_mask);
 	sigaction(SIGTERM, &handler, &termhandler);
 	sigaction(SIGINT, &handler, &inthandler);
-
+#ifndef WIN32
+	signal(SIGPIPE, SIG_IGN);
+#endif
 	opt_kernel_path = alloca(PATH_MAX);
 	strcpy(opt_kernel_path, CGMINER_PREFIX);
 	cgminer_path = alloca(PATH_MAX);

+ 6 - 5
miner.h

@@ -831,20 +831,18 @@ enum pool_protocol {
 struct stratum_work {
 	/* id we sent to receive this work */
 	int id;
-	/* Reference to json structure all the following were extracted from */
-	json_t *json_val;
 
 	char *job_id;
 	char *prev_hash;
 	char *coinbase1;
 	char *coinbase2;
-	char *merkle1;
-	char *merkle2;
+	char **merkle;
 	char *bbversion;
 	char *nbit;
 	char *ntime;
 	bool clean;
 
+	int merkles;
 	int diff;
 };
 
@@ -922,10 +920,13 @@ struct pool {
 	struct sockaddr_in *server, client;
 	char *subscription;
 	char *nonce1;
-	int nonce2;
+	uint32_t nonce2;
+	int n2size;
 	bool has_stratum;
 	bool stratum_active;
+	bool stratum_auth;
 	struct stratum_work swork;
+	pthread_t stratum_thread;
 };
 
 #define GETWORK_MODE_TESTPOOL 'T'

+ 322 - 39
util.c

@@ -824,10 +824,17 @@ bool extract_sockaddr(struct pool *pool, char *url)
 	return true;
 }
 
+/* Send a single command across a socket, appending \n to it */
 static bool sock_send(int sock, char *s, ssize_t len)
 {
 	ssize_t sent = 0;
 
+	if (opt_protocol)
+		applog(LOG_DEBUG, "SEND: %s", s);
+
+	strcat(s, "\n");
+	len++;
+
 	while (len > 0 ) {
 		sent = send(sock, s + sent, len, 0);
 		if (SOCKETFAIL(sent))
@@ -841,18 +848,307 @@ static bool sock_send(int sock, char *s, ssize_t len)
 
 #define RECVSIZE 8192
 
+static void clear_sock(SOCKETTYPE sock)
+{
+	char *s = alloca(RECVSIZE);
+
+	recv(sock, s, RECVSIZE, MSG_DONTWAIT);
+}
+
+/* Check to see if Santa's been good to you */
+static bool sock_full(SOCKETTYPE sock, bool wait)
+{
+	struct timeval timeout;
+	fd_set rd;
+
+	FD_ZERO(&rd);
+	FD_SET(sock, &rd);
+	timeout.tv_usec = 0;
+	if (wait)
+		timeout.tv_sec = 60;
+	else
+		timeout.tv_sec = 0;
+	if (select(sock + 1, &rd, NULL, NULL, &timeout) > 0)
+		return true;
+	return false;
+}
+
+/* Peeks at a socket to find the first end of line and then reads just that
+ * from the socket and returns that as a malloced char */
+char *recv_line(SOCKETTYPE sock)
+{
+	char *sret = NULL, *s;
+	ssize_t len;
+
+	s = alloca(RECVSIZE);
+	if (SOCKETFAIL(recv(sock, s, RECVSIZE, MSG_PEEK))) {
+		applog(LOG_DEBUG, "Failed to recv sock in recv_line");
+		goto out;
+	}
+	sret = strtok(s, "\n");
+	if (!sret) {
+		applog(LOG_DEBUG, "Failed to parse a \\n terminated string in recv_line");
+		goto out;
+	}
+	len = strlen(sret) + 1;
+	/* We know how much data is in the buffer so this read should not fail */
+	if (SOCKETFAIL(recv(sock, s, len, 0)))
+		goto out;
+	if (s)
+		sret = strdup(strtok(s, "\n"));
+out:
+	if (!sret)
+		clear_sock(sock);
+	else if (opt_protocol)
+		applog(LOG_DEBUG, "RECVD: %s", sret);
+	return sret;
+}
+
+/* Extracts a string value from a json array with error checking. To be used
+ * when the value of the string returned is only examined and not to be stored.
+ * See json_array_string below */
+static char *__json_array_string(json_t *val, unsigned int entry)
+{
+	json_t *arr_entry;
+
+	if (json_is_null(val))
+		return NULL;
+	if (!json_is_array(val))
+		return NULL;
+	if (entry > json_array_size(val))
+		return NULL;
+	arr_entry = json_array_get(val, entry);
+	if (!json_is_string(arr_entry))
+		return NULL;
+
+	return (char *)json_string_value(arr_entry);
+}
+
+/* Creates a freshly malloced dup of __json_array_string */
+static char *json_array_string(json_t *val, unsigned int entry)
+{
+	char *buf = __json_array_string(val, entry);
+
+	if (buf)
+		return strdup(buf);
+	return NULL;
+}
+
+static bool parse_notify(struct pool *pool, json_t *val)
+{
+	char *job_id, *prev_hash, *coinbase1, *coinbase2, *bbversion, *nbit, *ntime;
+	int merkles, i;
+	json_t *arr;
+	bool clean;
+
+	arr = json_array_get(val, 4);
+	if (!arr || !json_is_array(arr))
+		return false;
+
+	merkles = json_array_size(arr);
+
+	job_id = json_array_string(val, 0);
+	prev_hash = json_array_string(val, 1);
+	coinbase1 = json_array_string(val, 2);
+	coinbase2 = json_array_string(val, 3);
+	bbversion = json_array_string(val, 5);
+	nbit = json_array_string(val, 6);
+	ntime = json_array_string(val, 7);
+	clean = json_is_true(json_array_get(val, 8));
+
+	if (!job_id || !prev_hash || !coinbase1 || !coinbase2 || !bbversion || !nbit || !ntime) {
+		/* Annoying but we must not leak memory */
+		if (job_id)
+			free(job_id);
+		if (prev_hash)
+			free(prev_hash);
+		if (coinbase1)
+			free(coinbase1);
+		if (coinbase2)
+			free(coinbase2);
+		if (bbversion)
+			free(bbversion);
+		if (nbit)
+			free(nbit);
+		if (ntime)
+			free(ntime);
+		return false;
+	}
+
+	mutex_lock(&pool->pool_lock);
+	pool->swork.job_id = job_id;
+	pool->swork.prev_hash = prev_hash;
+	pool->swork.coinbase1 = coinbase1;
+	pool->swork.coinbase2 = coinbase2;
+	pool->swork.bbversion = bbversion;
+	pool->swork.nbit = nbit;
+	pool->swork.ntime = ntime;
+	pool->swork.clean = clean;
+	for (i = 0; i < pool->swork.merkles; i++)
+		free(pool->swork.merkle[i]);
+	if (merkles) {
+		pool->swork.merkle = realloc(pool->swork.merkle, sizeof(char *) * merkles + 1);
+		for (i = 0; i < merkles; i++)
+			pool->swork.merkle[i] = json_array_string(arr, i);
+	}
+	pool->swork.merkles = merkles;
+	if (clean)
+		pool->nonce2 = 0;
+	mutex_unlock(&pool->pool_lock);
+
+	if (opt_protocol) {
+		applog(LOG_DEBUG, "job_id: %s", job_id);
+		applog(LOG_DEBUG, "prev_hash: %s", prev_hash);
+		applog(LOG_DEBUG, "coinbase1: %s", coinbase1);
+		applog(LOG_DEBUG, "coinbase2: %s", coinbase2);
+		for (i = 0; i < merkles; i++)
+			applog(LOG_DEBUG, "merkle%d: %s", i, pool->swork.merkle[i]);
+		applog(LOG_DEBUG, "bbversion: %s", bbversion);
+		applog(LOG_DEBUG, "nbit: %s", nbit);
+		applog(LOG_DEBUG, "ntime: %s", ntime);
+		applog(LOG_DEBUG, "clean: %s", clean ? "yes" : "no");
+	}
+
+	return true;
+}
+
+static bool parse_diff(struct pool *pool, json_t *val)
+{
+	int diff;
+
+	diff = json_integer_value(json_array_get(val, 0));
+	if (diff < 1)
+		return false;
+
+	mutex_lock(&pool->pool_lock);
+	pool->swork.diff = diff;
+	mutex_unlock(&pool->pool_lock);
+
+	applog(LOG_DEBUG, "Pool %d difficulty set to %d", pool->pool_no, diff);
+
+	return true;
+}
+
+bool parse_stratum(struct pool *pool, char *s)
+{
+	json_t *val = NULL, *method, *err_val, *params;
+	json_error_t err;
+	bool ret = false;
+	char *buf;
+
+	val = JSON_LOADS(s, &err);
+	if (!val) {
+		applog(LOG_INFO, "JSON decode failed(%d): %s", err.line, err.text);
+		goto out;
+	}
+
+	method = json_object_get(val, "method");
+	err_val = json_object_get(val, "error");
+	params = json_object_get(val, "params");
+
+	if (!method || json_is_null(method) ||
+	    (err_val && !json_is_null(err_val))) {
+		char *ss;
+
+		if (err_val)
+			ss = json_dumps(err_val, JSON_INDENT(3));
+		else
+			ss = strdup("(unknown reason)");
+
+		applog(LOG_INFO, "JSON-RPC decode failed: %s", ss);
+
+		free(ss);
+
+		goto out;
+	}
+
+	buf = (char *)json_string_value(method);
+	if (!buf)
+		goto out;
+
+	if (!strncasecmp(buf, "mining.notify", 13) && parse_notify(pool, params)) {
+		ret = true;
+		goto out;
+	}
+
+	if (!strncasecmp(buf, "mining.set_difficulty", 21) && parse_diff(pool, params)) {
+		ret = true;
+		goto out;
+	}
+
+out:
+	if (val)
+		json_decref(val);
+
+	return ret;
+}
+
+bool auth_stratum(struct pool *pool)
+{
+	json_t *val = NULL, *res_val, *err_val;
+	char *s, *sret = NULL;
+	json_error_t err;
+	bool ret = false;
+
+	s = alloca(RECVSIZE);
+	sprintf(s, "{\"id\": %d, \"method\": \"mining.authorize\", \"params\": [\"%s\", \"%s\"]}",
+		pool->swork.id++, pool->rpc_user, pool->rpc_pass);
+
+	/* Parse all data prior sending auth request */
+	while (sock_full(pool->sock, false)) {
+		sret = recv_line(pool->sock);
+		if (!parse_stratum(pool, sret)) {
+			clear_sock(pool->sock);
+			applog(LOG_WARNING, "Failed to parse stratum buffer");
+			free(sret);
+			return ret;
+		}
+		free(sret);
+	}
+
+	if (!sock_send(pool->sock, s, strlen(s)))
+		goto out;
+
+	sret = recv_line(pool->sock);
+	if (!sret)
+		goto out;
+	val = JSON_LOADS(sret, &err);
+	free(sret);
+	res_val = json_object_get(val, "result");
+	err_val = json_object_get(val, "error");
+
+	if (!res_val || json_is_false(res_val) || (err_val && !json_is_null(err_val)))  {
+		char *ss;
+
+		if (err_val)
+			ss = json_dumps(err_val, JSON_INDENT(3));
+		else
+			ss = strdup("(unknown reason)");
+		applog(LOG_WARNING, "JSON stratum auth failed: %s", ss);
+		free(ss);
+
+		goto out;
+	}
+	ret = true;
+	applog(LOG_INFO, "Stratum authorisation success for pool %d", pool->pool_no);
+out:
+	if (val)
+		json_decref(val);
+
+	pool->stratum_auth = ret;
+
+	return ret;
+}
+
 bool initiate_stratum(struct pool *pool)
 {
 	json_t *val, *res_val, *err_val, *notify_val;
 	char *s, *buf, *sret = NULL;
-	struct timeval timeout;
 	json_error_t err;
 	bool ret = false;
-	ssize_t len;
-	fd_set rd;
 
 	s = alloca(RECVSIZE);
-	sprintf(s, "{\"id\": 0, \"method\": \"mining.subscribe\", \"params\": []}\n");
+	sprintf(s, "{\"id\": %d, \"method\": \"mining.subscribe\", \"params\": []}", pool->swork.id++);
 
 	pool->sock = socket(AF_INET, SOCK_STREAM, 0);
 	if (pool->sock == INVSOCK)
@@ -867,33 +1163,19 @@ bool initiate_stratum(struct pool *pool)
 		goto out;
 	}
 
-	/* Use select to timeout instead of waiting forever for a response */
-	FD_ZERO(&rd);
-	FD_SET(pool->sock, &rd);
-	timeout.tv_sec = 60;
-	if (select(pool->sock + 1, &rd, NULL, NULL, &timeout) < 1) {
+	if (!sock_full(pool->sock, true)) {
 		applog(LOG_DEBUG, "Timed out waiting for response in initiate_stratum");
 		goto out;
 	}
 
-	if (SOCKETFAIL(recv(pool->sock, s, RECVSIZE, MSG_PEEK | MSG_DONTWAIT))) {
-		applog(LOG_DEBUG, "Failed to recv sock in initiate_stratum");
-		goto out;
-	}
-
-	sret = strtok(s, "\n");
-	if (!sret) {
-		applog(LOG_DEBUG, "Failed to parse a \\n terminated string in initiate_stratum");
+	sret = recv_line(pool->sock);
+	if (!sret)
 		goto out;
-	}
 
-	/* We know how much data is in the buffer so this read should not fail */
-	len = strlen(sret);
-	read(pool->sock, s, len);
-
-	val = JSON_LOADS(s, &err);
+	val = JSON_LOADS(sret, &err);
+	free(sret);
 	if (!val) {
-		applog(LOG_DEBUG, "JSON decode failed(%d): %s", err.line, err.text);
+		applog(LOG_INFO, "JSON decode failed(%d): %s", err.line, err.text);
 		goto out;
 	}
 
@@ -909,7 +1191,7 @@ bool initiate_stratum(struct pool *pool)
 		else
 			ss = strdup("(unknown reason)");
 
-		applog(LOG_INFO, "JSON-RPC call failed: %s", ss);
+		applog(LOG_INFO, "JSON-RPC decode failed: %s", ss);
 
 		free(ss);
 
@@ -922,41 +1204,42 @@ bool initiate_stratum(struct pool *pool)
 		goto out;
 	}
 
-	buf = (char *)json_string_value(json_array_get(notify_val, 0));
+	buf = __json_array_string(notify_val, 0);
 	if (!buf || strcasecmp(buf, "mining.notify")) {
 		applog(LOG_WARNING, "Failed to get mining notify in initiate_stratum");
 		goto out;
 	}
-	pool->subscription = (char *)json_string_value(json_array_get(notify_val, 1));
+	pool->subscription = json_array_string(notify_val, 1);
 	if (!pool->subscription) {
 		applog(LOG_WARNING, "Failed to get a subscription in initiate_stratum");
 		goto out;
 	}
 
-	pool->nonce1 = (char *)json_string_value(json_array_get(res_val, 1));
+	pool->nonce1 = json_array_string(res_val, 1);
 	if (!pool->nonce1) {
 		applog(LOG_WARNING, "Failed to get nonce1 in initiate_stratum");
 		goto out;
 	}
-	pool->nonce2 = json_integer_value(json_array_get(res_val, 2));
-	if (!pool->nonce2) {
-		applog(LOG_WARNING, "Failed to get nonce2 in initiate_stratum");
+	pool->n2size = json_integer_value(json_array_get(res_val, 2));
+	if (!pool->n2size) {
+		applog(LOG_WARNING, "Failed to get n2size in initiate_stratum");
 		goto out;
 	}
 
 	ret = true;
 out:
-	if (!ret) {
-		CLOSESOCKET(pool->sock);
-		if (val)
-			json_decref(val);
-	} else {
+	if (val)
+		json_decref(val);
+
+	if (ret) {
 		pool->stratum_active = true;
+		pool->swork.diff = 1;
 		if (opt_protocol) {
-			applog(LOG_DEBUG, "Pool %d confirmed mining.notify with subscription %s extranonce1 %s extranonce2 %d",
-			       pool->pool_no, pool->subscription, pool->nonce1, pool->nonce2);
+			applog(LOG_DEBUG, "Pool %d confirmed mining.notify with subscription %s extranonce1 %s extran2size %d",
+			       pool->pool_no, pool->subscription, pool->nonce1, pool->n2size);
 		}
-	}
+	} else
+		CLOSESOCKET(pool->sock);
 
 	return ret;
 }

+ 3 - 0
util.h

@@ -109,7 +109,10 @@
 	#endif
 #endif
 struct pool;
+char *recv_line(SOCKETTYPE sock);
+bool parse_stratum(struct pool *pool, char *s);
 bool extract_sockaddr(struct pool *pool, char *url);
+bool auth_stratum(struct pool *pool);
 bool initiate_stratum(struct pool *pool);
 
 #endif /* __UTIL_H__ */