@@ -160,6 +160,7 @@ static int api_thr_id;
 static int total_threads;
 
 static pthread_mutex_t hash_lock;
+static pthread_mutex_t qd_lock;
 static pthread_mutex_t *stgd_lock;
 #ifdef HAVE_CURSES
 static pthread_mutex_t curses_lock;
@@ -2190,9 +2191,12 @@ static bool stale_work(struct work *work, bool share)
 	/* Factor in the average getwork delay of this pool, rounding it up to
 	 * the nearest second */
 	getwork_delay = pool->cgminer_pool_stats.getwork_wait_rolling * 5 + 1;
-	work_expiry -= getwork_delay;
-	if (unlikely(work_expiry < 5))
-		work_expiry = 5;
+	if (!share) {
+		work_expiry -= getwork_delay;
+		if (unlikely(work_expiry < 5))
+			work_expiry = 5;
+	} else
+		work_expiry += getwork_delay;
 
 	gettimeofday(&now, NULL);
 	if ((now.tv_sec - work->tv_staged.tv_sec) >= work_expiry)
@@ -2379,6 +2383,11 @@ void switch_pools(struct pool *selected)
 	if (pool != last_pool)
 		applog(LOG_WARNING, "Switching to %s", pool->rpc_url);
 
+	/* Reset the queued amount to allow more to be queued for the new pool */
+	mutex_lock(&qd_lock);
+	total_queued = 0;
+	mutex_unlock(&qd_lock);
+
 	mutex_lock(&lp_lock);
 	pthread_cond_broadcast(&lp_cond);
 	mutex_unlock(&lp_lock);
@@ -2396,34 +2405,58 @@ static void discard_work(struct work *work)
 	free_work(work);
 }
 
-/* Done lockless since this is not a critical value */
-static inline void inc_queued(void)
+/* This is overkill, but at least we'll know accurately how much work is
+ * queued to prevent ever being left without work */
+static void inc_queued(void)
 {
+	mutex_lock(&qd_lock);
 	total_queued++;
+	mutex_unlock(&qd_lock);
 }
 
-static inline void dec_queued(void)
+static void dec_queued(struct work *work)
 {
-	if (likely(total_queued > 0))
+	if (work->clone)
+		return;
+
+	mutex_lock(&qd_lock);
+	if (total_queued > 0)
 		total_queued--;
+	mutex_unlock(&qd_lock);
 }
 
 static int requests_queued(void)
 {
-	return requests_staged() - staged_extras;
+	int ret;
+
+	mutex_lock(&qd_lock);
+	ret = total_queued;
+	mutex_unlock(&qd_lock);
+
+	return ret;
+}
+
+static void subtract_queued(int work_units)
+{
+	mutex_lock(&qd_lock);
+	total_queued -= work_units;
+	if (total_queued < 0)
+		total_queued = 0;
+	mutex_unlock(&qd_lock);
 }
 
 static int discard_stale(void)
 {
 	struct work *work, *tmp;
-	int i, stale = 0;
+	int stale = 0, nonclone = 0;
 
 	mutex_lock(stgd_lock);
 	HASH_ITER(hh, staged_work, work, tmp) {
 		if (stale_work(work, false)) {
 			HASH_DEL(staged_work, work);
-			if (work->clone || work->longpoll)
+			if (work->clone)
 				--staged_extras;
+			else
+				nonclone++;
 			discard_work(work);
 			stale++;
 		}
@@ -2433,8 +2466,7 @@ static int discard_stale(void)
 	applog(LOG_DEBUG, "Discarded %d stales that didn't match current hash", stale);
 
 	/* Dec queued outside the loop to not have recursive locks */
-	for (i = 0; i < stale; i++)
-		dec_queued();
+	subtract_queued(nonclone);
 
 	return stale;
 }
@@ -2584,7 +2616,7 @@ static bool hash_push(struct work *work)
 	if (likely(!getq->frozen)) {
 		HASH_ADD_INT(staged_work, id, work);
 		HASH_SORT(staged_work, tv_sort);
-		if (work->clone || work->longpoll)
+		if (work->clone)
 			++staged_extras;
 	} else
 		rc = false;
@@ -3529,43 +3561,70 @@ static void pool_resus(struct pool *pool)
 		switch_pools(NULL);
 }
 
+static time_t requested_tv_sec;
+
 static bool queue_request(struct thr_info *thr, bool needed)
 {
-	int rs = requests_staged(), rq = requests_queued();
+	int toq, rq = requests_queued(), rs = requests_staged();
 	struct workio_cmd *wc;
+	struct timeval now;
+	time_t scan_post;
+
+	/* Grab more work every 2/3 of the scan time to avoid all work expiring
+	 * at the same time */
+	scan_post = opt_scantime * 2 / 3;
+	if (scan_post < 5)
+		scan_post = 5;
 
-	if (rq >= mining_threads || (rq >= opt_queue && rs >= mining_threads))
+	gettimeofday(&now, NULL);
+
+	/* Test to make sure we have enough work for pools without rolltime
+	 * and enough original work for pools with rolltime */
+	if ((rq >= mining_threads || rs >= mining_threads) &&
+	    rq > staged_extras + opt_queue &&
+	    now.tv_sec - requested_tv_sec < scan_post)
 		return true;
 
-	/* fill out work request message */
-	wc = calloc(1, sizeof(*wc));
-	if (unlikely(!wc)) {
-		applog(LOG_ERR, "Failed to calloc wc in queue_request");
-		return false;
-	}
+	requested_tv_sec = now.tv_sec;
 
-	wc->cmd = WC_GET_WORK;
-	if (thr)
-		wc->thr = thr;
+	if (rq > rs)
+		toq = rq - mining_threads;
 	else
-		wc->thr = NULL;
+		toq = rs - mining_threads;
 
-	/* If we're queueing work faster than we can stage it, consider the
-	 * system lagging and allow work to be gathered from another pool if
-	 * possible */
-	if (rq && needed && !requests_staged() && !opt_fail_only)
-		wc->lagging = true;
+	do {
+		inc_queued();
 
-	applog(LOG_DEBUG, "Queueing getwork request to work thread");
+		/* fill out work request message */
+		wc = calloc(1, sizeof(*wc));
+		if (unlikely(!wc)) {
+			applog(LOG_ERR, "Failed to calloc wc in queue_request");
+			return false;
+		}
 
-	/* send work request to workio thread */
-	if (unlikely(!tq_push(thr_info[work_thr_id].q, wc))) {
-		applog(LOG_ERR, "Failed to tq_push in queue_request");
-		workio_cmd_free(wc);
-		return false;
-	}
+		wc->cmd = WC_GET_WORK;
+		if (thr)
+			wc->thr = thr;
+		else
+			wc->thr = NULL;
+
+		/* If we're queueing work faster than we can stage it, consider the
+		 * system lagging and allow work to be gathered from another pool if
+		 * possible */
+		if (rq && needed && !rs && !opt_fail_only)
+			wc->lagging = true;
+
+		applog(LOG_DEBUG, "Queueing getwork request to work thread");
+
+		/* send work request to workio thread */
+		if (unlikely(!tq_push(thr_info[work_thr_id].q, wc))) {
+			applog(LOG_ERR, "Failed to tq_push in queue_request");
+			workio_cmd_free(wc);
+			return false;
+		}
+
+	} while (--toq > 0);
 
-	inc_queued();
 	return true;
 }
@@ -3581,7 +3640,7 @@ static struct work *hash_pop(const struct timespec *abstime)
 	if (HASH_COUNT(staged_work)) {
 		work = staged_work;
 		HASH_DEL(staged_work, work);
-		if (work->clone || work->longpoll)
+		if (work->clone)
 			--staged_extras;
 	}
 	mutex_unlock(stgd_lock);
@@ -3656,7 +3715,6 @@ static struct work *clone_work(struct work *work)
 		return work;
 
 	cloned = false;
-
 	work_clone = make_clone(work);
 	while (mrs-- > 0 && can_roll(work) && should_roll(work)) {
 		applog(LOG_DEBUG, "Pushing rolled converted work to stage thread");
@@ -3716,7 +3774,7 @@ retry:
 		goto out;
 	}
 
-	if (!pool->lagging && requested && !newreq && !requests_staged()) {
+	if (!pool->lagging && requested && !newreq && !requests_staged() && requests_queued() >= mining_threads) {
 		struct cgpu_info *cgpu = thr->cgpu;
 		bool stalled = true;
 		int i;
@@ -3753,7 +3811,7 @@ retry:
 	}
 
 	if (stale_work(work_heap, false)) {
-		dec_queued();
+		dec_queued(work_heap);
 		discard_work(work_heap);
 		goto retry;
 	}
@@ -3768,9 +3826,8 @@ retry:
 
 	work_heap = clone_work(work_heap);
 	memcpy(work, work_heap, sizeof(struct work));
+	dec_queued(work_heap);
 	free_work(work_heap);
-	if (!work->clone)
-		dec_queued();
 
 	ret = true;
 out:
@@ -4135,8 +4192,10 @@ static void convert_to_work(json_t *val, int rolltime, struct pool *pool)
 
 	if (unlikely(!stage_work(work)))
 		free_work(work);
-	else
+	else {
+		inc_queued();
 		applog(LOG_DEBUG, "Converted longpoll data to work");
+	}
 }
 
 /* If we want longpoll, enable it for the chosen default pool, or, if
@@ -4374,7 +4433,7 @@ static void age_work(void)
 {
 	int discarded = 0;
 
-	while (requests_staged() > mining_threads * 4 / 3) {
+	while (requests_staged() > mining_threads * 4 / 3 + opt_queue) {
 		struct work *work = hash_pop(NULL);
 
 		if (unlikely(!work))
@@ -4958,6 +5017,7 @@ int main(int argc, char *argv[])
 #endif
 
 	mutex_init(&hash_lock);
+	mutex_init(&qd_lock);
 #ifdef HAVE_CURSES
 	mutex_init(&curses_lock);
 #endif
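
Note on the core change: total_queued moves from a lockless counter to one serialised by the new qd_lock, with inc_queued()/dec_queued() gaining lock/unlock pairs and subtract_queued()/requests_queued() added as locked bulk-decrement and read helpers. Below is a minimal standalone sketch of that pattern, not the project's code: it calls pthread directly instead of cgminer's mutex_lock()/mutex_unlock() wrappers, uses PTHREAD_MUTEX_INITIALIZER rather than the mutex_init(&qd_lock) call the patch adds to main(), and its main() exists only to exercise the helpers.

/* Sketch of the qd_lock-protected counter pattern introduced by this patch.
 * Assumptions: raw pthread calls stand in for cgminer's mutex wrappers, and
 * the demo main() is illustrative only.  Build with: cc demo.c -pthread */
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t qd_lock = PTHREAD_MUTEX_INITIALIZER;
static int total_queued;

/* Increment under the lock so concurrent miner/longpoll threads cannot race */
static void inc_queued(void)
{
	pthread_mutex_lock(&qd_lock);
	total_queued++;
	pthread_mutex_unlock(&qd_lock);
}

/* Bulk decrement used after discarding stale work; clamps at zero so the
 * count can never go negative, mirroring subtract_queued() in the patch */
static void subtract_queued(int work_units)
{
	pthread_mutex_lock(&qd_lock);
	total_queued -= work_units;
	if (total_queued < 0)
		total_queued = 0;
	pthread_mutex_unlock(&qd_lock);
}

/* Locked read, as requests_queued() now returns the counter itself rather
 * than deriving it from the staged-work hash */
static int requests_queued(void)
{
	int ret;

	pthread_mutex_lock(&qd_lock);
	ret = total_queued;
	pthread_mutex_unlock(&qd_lock);

	return ret;
}

int main(void)
{
	inc_queued();
	inc_queued();
	subtract_queued(3);	/* over-subtraction clamps to zero */
	printf("queued: %d\n", requests_queued());
	return 0;
}

The clamp in subtract_queued() matches the patch's behaviour when discard_stale() drops more non-clone works than are currently counted as queued.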