Browse Source

Split up json_rpc_call so it can be used asynchronously in libcurl-multi

Luke Dashjr 13 years ago
parent
commit
ff695b39fb
2 changed files with 94 additions and 52 deletions
  1. 87 52
      util.c
  2. 7 0
      util.h

+ 87 - 52
util.c

@@ -289,32 +289,37 @@ static void set_nettime(void)
 	wr_unlock(&netacc_lock);
 }
 
-json_t *json_rpc_call(CURL *curl, const char *url,
+struct json_rpc_call_state {
+	struct data_buffer all_data;
+	struct header_info hi;
+	void *priv;
+	char curl_err_str[CURL_ERROR_SIZE];
+	struct curl_slist *headers;
+	struct upload_buffer upload_data;
+	struct pool *pool;
+};
+
+void json_rpc_call_async(CURL *curl, const char *url,
 		      const char *userpass, const char *rpc_req,
-		      bool probe, bool longpoll, int *rolltime,
-		      struct pool *pool, bool share)
+		      bool longpoll,
+		      struct pool *pool, bool share,
+		      void *priv)
 {
+	struct json_rpc_call_state *state = malloc(sizeof(struct json_rpc_call_state));
+	*state = (struct json_rpc_call_state){
+		.priv = priv,
+		.pool = pool,
+	};
 	long timeout = longpoll ? (60 * 60) : 60;
-	struct data_buffer all_data = {.len = 0};
-	struct header_info hi = {NULL, 0, NULL, NULL, false, false, false};
 	char len_hdr[64], user_agent_hdr[128];
-	char curl_err_str[CURL_ERROR_SIZE];
 	struct curl_slist *headers = NULL;
-	struct upload_buffer upload_data;
-	json_t *val, *err_val, *res_val;
-	bool probing = false;
-	json_error_t err;
-	int rc;
-
-	memset(&err, 0, sizeof(err));
 
 	if (longpoll)
-		all_data.idlemarker = &pool->lp_socket;
+		state->all_data.idlemarker = &pool->lp_socket;
 
 	/* it is assumed that 'curl' is freshly [re]initialized at this pt */
 
-	if (probe)
-		probing = !pool->probed;
+	curl_easy_setopt(curl, CURLOPT_PRIVATE, state);
 	curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout);
 
 #if 0 /* Disable curl debugging since it spews to stderr */
@@ -331,13 +336,13 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 	if (!opt_delaynet || share)
 		curl_easy_setopt(curl, CURLOPT_TCP_NODELAY, 1);
 	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, all_data_cb);
-	curl_easy_setopt(curl, CURLOPT_WRITEDATA, &all_data);
+	curl_easy_setopt(curl, CURLOPT_WRITEDATA, &state->all_data);
 	curl_easy_setopt(curl, CURLOPT_READFUNCTION, upload_data_cb);
-	curl_easy_setopt(curl, CURLOPT_READDATA, &upload_data);
-	curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_err_str);
+	curl_easy_setopt(curl, CURLOPT_READDATA, &state->upload_data);
+	curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, &state->curl_err_str[0]);
 	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
 	curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, resp_hdr_cb);
-	curl_easy_setopt(curl, CURLOPT_HEADERDATA, &hi);
+	curl_easy_setopt(curl, CURLOPT_HEADERDATA, &state->hi);
 	curl_easy_setopt(curl, CURLOPT_USE_SSL, CURLUSESSL_TRY);
 	if (pool->rpc_proxy) {
 		curl_easy_setopt(curl, CURLOPT_PROXY, pool->rpc_proxy);
@@ -356,10 +361,10 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 	if (opt_protocol)
 		applog(LOG_DEBUG, "JSON protocol request:\n%s", rpc_req);
 
-	upload_data.buf = rpc_req;
-	upload_data.len = strlen(rpc_req);
+	state->upload_data.buf = rpc_req;
+	state->upload_data.len = strlen(rpc_req);
 	sprintf(len_hdr, "Content-Length: %lu",
-		(unsigned long) upload_data.len);
+		(unsigned long) state->upload_data.len);
 	sprintf(user_agent_hdr, "User-Agent: %s", PACKAGE_STRING);
 
 	headers = curl_slist_append(headers,
@@ -383,6 +388,7 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 	headers = curl_slist_append(headers, "Expect:"); /* disable Expect hdr*/
 
 	curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
+	state->headers = headers;
 
 	if (opt_delaynet) {
 		/* Don't delay share submission, but still track the nettime */
@@ -406,14 +412,31 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 		}
 		set_nettime();
 	}
+}
+
+json_t *json_rpc_call_completed(CURL *curl, int rc, bool probe, int *rolltime, void *out_priv)
+{
+	struct json_rpc_call_state *state;
+	if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &state) != CURLE_OK) {
+		applog(LOG_ERR, "Failed to get private curl data");
+		if (out_priv)
+			*(void**)out_priv = NULL;
+		goto err_out;
+	}
+	if (out_priv)
+		*(void**)out_priv = state->priv;
+
+	json_t *val, *err_val, *res_val;
+	json_error_t err;
+	struct pool *pool = state->pool;
+	bool probing = probe && !pool->probed;
 
-	rc = curl_easy_perform(curl);
 	if (rc) {
-		applog(LOG_INFO, "HTTP request failed: %s", curl_err_str);
+		applog(LOG_INFO, "HTTP request failed: %s", state->curl_err_str);
 		goto err_out;
 	}
 
-	if (!all_data.buf) {
+	if (!state->all_data.buf) {
 		applog(LOG_DEBUG, "Empty data received in json_rpc_call.");
 		goto err_out;
 	}
@@ -421,40 +444,40 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 	if (probing) {
 		pool->probed = true;
 		/* If X-Long-Polling was found, activate long polling */
-		if (hi.lp_path) {
+		if (state->hi.lp_path) {
 			if (pool->hdr_path != NULL)
 				free(pool->hdr_path);
-			pool->hdr_path = hi.lp_path;
+			pool->hdr_path = state->hi.lp_path;
 		} else
 			pool->hdr_path = NULL;
-		if (hi.stratum_url) {
-			pool->stratum_url = hi.stratum_url;
-			hi.stratum_url = NULL;
+		if (state->hi.stratum_url) {
+			pool->stratum_url = state->hi.stratum_url;
+			state->hi.stratum_url = NULL;
 		}
 	} else {
-		if (hi.lp_path) {
-			free(hi.lp_path);
-			hi.lp_path = NULL;
+		if (state->hi.lp_path) {
+			free(state->hi.lp_path);
+			state->hi.lp_path = NULL;
 		}
-		if (hi.stratum_url) {
-			free(hi.stratum_url);
-			hi.stratum_url = NULL;
+		if (state->hi.stratum_url) {
+			free(state->hi.stratum_url);
+			state->hi.stratum_url = NULL;
 		}
 	}
 
 	if (rolltime)
-	*rolltime = hi.rolltime;
-	pool->cgminer_pool_stats.rolltime = hi.rolltime;
-	pool->cgminer_pool_stats.hadrolltime = hi.hadrolltime;
-	pool->cgminer_pool_stats.canroll = hi.canroll;
-	pool->cgminer_pool_stats.hadexpire = hi.hadexpire;
+		*rolltime = state->hi.rolltime;
+	pool->cgminer_pool_stats.rolltime = state->hi.rolltime;
+	pool->cgminer_pool_stats.hadrolltime = state->hi.hadrolltime;
+	pool->cgminer_pool_stats.canroll = state->hi.canroll;
+	pool->cgminer_pool_stats.hadexpire = state->hi.hadexpire;
 
-	val = JSON_LOADS(all_data.buf, &err);
+	val = JSON_LOADS(state->all_data.buf, &err);
 	if (!val) {
 		applog(LOG_INFO, "JSON decode failed(%d): %s", err.line, err.text);
 
 		if (opt_protocol)
-			applog(LOG_DEBUG, "JSON protocol response:\n%s", all_data.buf);
+			applog(LOG_DEBUG, "JSON protocol response:\n%s", state->all_data.buf);
 
 		goto err_out;
 	}
@@ -488,27 +511,39 @@ json_t *json_rpc_call(CURL *curl, const char *url,
 		goto err_out;
 	}
 
-	if (hi.reason) {
-		json_object_set_new(val, "reject-reason", json_string(hi.reason));
-		free(hi.reason);
-		hi.reason = NULL;
+	if (state->hi.reason) {
+		json_object_set_new(val, "reject-reason", json_string(state->hi.reason));
+		free(state->hi.reason);
+		state->hi.reason = NULL;
 	}
 	successful_connect = true;
-	databuf_free(&all_data);
-	curl_slist_free_all(headers);
+	databuf_free(&state->all_data);
+	curl_slist_free_all(state->headers);
 	curl_easy_reset(curl);
+	free(state);
 	return val;
 
 err_out:
-	databuf_free(&all_data);
-	curl_slist_free_all(headers);
+	databuf_free(&state->all_data);
+	curl_slist_free_all(state->headers);
 	curl_easy_reset(curl);
 	if (!successful_connect)
 		applog(LOG_DEBUG, "Failed to connect in json_rpc_call");
 	curl_easy_setopt(curl, CURLOPT_FRESH_CONNECT, 1);
+	free(state);
 	return NULL;
 }
 
+json_t *json_rpc_call(CURL *curl, const char *url,
+		      const char *userpass, const char *rpc_req,
+		      bool probe, bool longpoll, int *rolltime,
+		      struct pool *pool, bool share)
+{
+	json_rpc_call_async(curl, url, userpass, rpc_req, longpoll, pool, share, NULL);
+	int rc = curl_easy_perform(curl);
+	return json_rpc_call_completed(curl, rc, probe, rolltime, NULL);
+}
+
 bool our_curl_supports_proxy_uris()
 {
 	curl_version_info_data *data = curl_version_info(CURLVERSION_NOW);

+ 7 - 0
util.h

@@ -1,6 +1,9 @@
 #ifndef __UTIL_H__
 #define __UTIL_H__
 
+#include <curl/curl.h>
+#include <jansson.h>
+
 #if defined(unix) || defined(__APPLE__)
 	#include <errno.h>
 	#include <sys/socket.h>
@@ -45,6 +48,10 @@
 struct pool;
 enum dev_reason;
 struct cgpu_info;
+
+extern void json_rpc_call_async(CURL *, const char *url, const char *userpass, const char *rpc_req, bool longpoll, struct pool *pool, bool share, void *priv);
+extern json_t *json_rpc_call_completed(CURL *, int rc, bool probe, int *rolltime, void *out_priv);
+
 bool stratum_send(struct pool *pool, char *s, ssize_t len);
 char *recv_line(struct pool *pool);
 bool parse_method(struct pool *pool, char *s);