|
@@ -3814,8 +3814,11 @@ struct submit_work_state {
|
|
|
|
|
|
|
|
static int my_curl_timer_set(__maybe_unused CURLM *curlm, long timeout_ms, void *userp)
|
|
static int my_curl_timer_set(__maybe_unused CURLM *curlm, long timeout_ms, void *userp)
|
|
|
{
|
|
{
|
|
|
- long *timeout = userp;
|
|
|
|
|
- *timeout = timeout_ms;
|
|
|
|
|
|
|
+ struct timeval *tvp_timer = userp;
|
|
|
|
|
+ if (timeout_ms == -1)
|
|
|
|
|
+ tvp_timer->tv_sec = -1;
|
|
|
|
|
+ else
|
|
|
|
|
+ timer_set_delay_from_now(tvp_timer, timeout_ms * 1000);
|
|
|
return 0;
|
|
return 0;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -3957,7 +3960,7 @@ static void *submit_work_thread(__maybe_unused void *userdata)
|
|
|
{
|
|
{
|
|
|
int wip = 0;
|
|
int wip = 0;
|
|
|
CURLM *curlm;
|
|
CURLM *curlm;
|
|
|
- long curlm_timeout_ms = -1;
|
|
|
|
|
|
|
+ struct timeval curlm_timer;
|
|
|
struct submit_work_state *sws, **swsp;
|
|
struct submit_work_state *sws, **swsp;
|
|
|
struct submit_work_state *write_sws = NULL;
|
|
struct submit_work_state *write_sws = NULL;
|
|
|
unsigned tsreduce = 0;
|
|
unsigned tsreduce = 0;
|
|
@@ -3969,12 +3972,13 @@ static void *submit_work_thread(__maybe_unused void *userdata)
|
|
|
applog(LOG_DEBUG, "Creating extra submit work thread");
|
|
applog(LOG_DEBUG, "Creating extra submit work thread");
|
|
|
|
|
|
|
|
curlm = curl_multi_init();
|
|
curlm = curl_multi_init();
|
|
|
|
|
+ curlm_timer.tv_sec = -1;
|
|
|
|
|
+ curl_multi_setopt(curlm, CURLMOPT_TIMERDATA, &curlm_timer);
|
|
|
curl_multi_setopt(curlm, CURLMOPT_TIMERFUNCTION, my_curl_timer_set);
|
|
curl_multi_setopt(curlm, CURLMOPT_TIMERFUNCTION, my_curl_timer_set);
|
|
|
- curl_multi_setopt(curlm, CURLMOPT_TIMERDATA, &curlm_timeout_ms);
|
|
|
|
|
|
|
|
|
|
fd_set rfds, wfds, efds;
|
|
fd_set rfds, wfds, efds;
|
|
|
int maxfd;
|
|
int maxfd;
|
|
|
- struct timeval timeout, *timeoutp;
|
|
|
|
|
|
|
+ struct timeval tv_timeout, tv_now;
|
|
|
int n;
|
|
int n;
|
|
|
CURLMsg *cm;
|
|
CURLMsg *cm;
|
|
|
FD_ZERO(&rfds);
|
|
FD_ZERO(&rfds);
|
|
@@ -3985,6 +3989,8 @@ static void *submit_work_thread(__maybe_unused void *userdata)
|
|
|
if (FD_ISSET(submit_waiting_notifier[0], &rfds)) {
|
|
if (FD_ISSET(submit_waiting_notifier[0], &rfds)) {
|
|
|
notifier_read(submit_waiting_notifier);
|
|
notifier_read(submit_waiting_notifier);
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ // Receive any new submissions
|
|
|
while (!list_empty(&submit_waiting)) {
|
|
while (!list_empty(&submit_waiting)) {
|
|
|
struct work *work = list_entry(submit_waiting.next, struct work, list);
|
|
struct work *work = list_entry(submit_waiting.next, struct work, list);
|
|
|
list_del(&work->list);
|
|
list_del(&work->list);
|
|
@@ -4002,6 +4008,7 @@ static void *submit_work_thread(__maybe_unused void *userdata)
|
|
|
free_work(work);
|
|
free_work(work);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
if (unlikely(shutting_down && !wip))
|
|
if (unlikely(shutting_down && !wip))
|
|
|
break;
|
|
break;
|
|
|
mutex_unlock(&submitting_lock);
|
|
mutex_unlock(&submitting_lock);
|
|
@@ -4009,14 +4016,13 @@ static void *submit_work_thread(__maybe_unused void *userdata)
|
|
|
FD_ZERO(&rfds);
|
|
FD_ZERO(&rfds);
|
|
|
FD_ZERO(&wfds);
|
|
FD_ZERO(&wfds);
|
|
|
FD_ZERO(&efds);
|
|
FD_ZERO(&efds);
|
|
|
|
|
+ tv_timeout.tv_sec = -1;
|
|
|
|
|
+
|
|
|
|
|
+ // Setup cURL with select
|
|
|
curl_multi_fdset(curlm, &rfds, &wfds, &efds, &maxfd);
|
|
curl_multi_fdset(curlm, &rfds, &wfds, &efds, &maxfd);
|
|
|
- if (curlm_timeout_ms >= 0) {
|
|
|
|
|
- timeout.tv_sec = curlm_timeout_ms / 1000;
|
|
|
|
|
- timeout.tv_usec = (curlm_timeout_ms % 1000) * 1000;
|
|
|
|
|
- timeoutp = &timeout;
|
|
|
|
|
- } else
|
|
|
|
|
- timeoutp = NULL;
|
|
|
|
|
|
|
+ reduce_timeout_to(&tv_timeout, &curlm_timer);
|
|
|
|
|
|
|
|
|
|
+ // Setup waiting stratum submissions with select
|
|
|
for (swsp = &write_sws; (sws = *swsp); ) {
|
|
for (swsp = &write_sws; (sws = *swsp); ) {
|
|
|
int fd = sws->work->pool->sock;
|
|
int fd = sws->work->pool->sock;
|
|
|
if (fd == INVSOCK) {
|
|
if (fd == INVSOCK) {
|
|
@@ -4029,8 +4035,7 @@ static void *submit_work_thread(__maybe_unused void *userdata)
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
FD_SET(fd, &wfds);
|
|
FD_SET(fd, &wfds);
|
|
|
- if (fd > maxfd)
|
|
|
|
|
- maxfd = fd;
|
|
|
|
|
|
|
+ set_maxfd(&maxfd, fd);
|
|
|
swsp = &sws->next;
|
|
swsp = &sws->next;
|
|
|
}
|
|
}
|
|
|
if (tsreduce) {
|
|
if (tsreduce) {
|
|
@@ -4040,15 +4045,18 @@ static void *submit_work_thread(__maybe_unused void *userdata)
|
|
|
tsreduce = 0;
|
|
tsreduce = 0;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // Setup "submit waiting" notifier with select
|
|
|
FD_SET(submit_waiting_notifier[0], &rfds);
|
|
FD_SET(submit_waiting_notifier[0], &rfds);
|
|
|
- if (submit_waiting_notifier[0] > maxfd)
|
|
|
|
|
- maxfd = submit_waiting_notifier[0];
|
|
|
|
|
|
|
+ set_maxfd(&maxfd, submit_waiting_notifier[0]);
|
|
|
|
|
|
|
|
- if (select(maxfd+1, &rfds, &wfds, &efds, timeoutp) < 0) {
|
|
|
|
|
|
|
+ // Wait for something interesting to happen :)
|
|
|
|
|
+ gettimeofday(&tv_now, NULL);
|
|
|
|
|
+ if (select(maxfd+1, &rfds, &wfds, &efds, select_timeout(&tv_timeout, &tv_now)) < 0) {
|
|
|
FD_ZERO(&rfds);
|
|
FD_ZERO(&rfds);
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // Handle any stratum ready-to-write results
|
|
|
for (swsp = &write_sws; (sws = *swsp); ) {
|
|
for (swsp = &write_sws; (sws = *swsp); ) {
|
|
|
int fd = sws->work->pool->sock;
|
|
int fd = sws->work->pool->sock;
|
|
|
if (fd == -1 || !FD_ISSET(fd, &wfds)) {
|
|
if (fd == -1 || !FD_ISSET(fd, &wfds)) {
|
|
@@ -4079,6 +4087,7 @@ static void *submit_work_thread(__maybe_unused void *userdata)
|
|
|
--wip;
|
|
--wip;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // Handle any cURL activities
|
|
|
curl_multi_perform(curlm, &n);
|
|
curl_multi_perform(curlm, &n);
|
|
|
while( (cm = curl_multi_info_read(curlm, &n)) ) {
|
|
while( (cm = curl_multi_info_read(curlm, &n)) ) {
|
|
|
if (cm->msg == CURLMSG_DONE)
|
|
if (cm->msg == CURLMSG_DONE)
|