lbalance.c 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. #include <ccan/lbalance/lbalance.h>
  2. #include <ccan/tlist/tlist.h>
  3. #include <sys/time.h>
  4. #include <sys/resource.h>
  5. #include <unistd.h>
  6. #include <errno.h>
  7. #include <assert.h>
  8. #include <stdlib.h>
  9. /* Define tlist_lbalance_task */
  10. TLIST_TYPE(lbalance_task, struct lbalance_task);
  11. struct stats {
  12. /* How many stats of for this value do we have? */
  13. unsigned int num_stats;
  14. /* What was our total work rate? */
  15. float work_rate;
  16. };
  17. struct lbalance {
  18. struct tlist_lbalance_task tasks;
  19. unsigned int num_tasks;
  20. /* We figured out how many we want to run. */
  21. unsigned int target;
  22. /* We need to recalc once a report comes in via lbalance_task_free. */
  23. bool target_uptodate;
  24. /* Integral of how many tasks were running so far */
  25. struct timeval prev_tasks_time;
  26. float tasks_sum;
  27. /* For differential rusage. */
  28. struct rusage prev_usage;
  29. /* How many stats we have collected (we invalidate old ones). */
  30. unsigned int total_stats;
  31. /* Array of stats, indexed by number of tasks we were running. */
  32. unsigned int max_stats;
  33. struct stats *stats;
  34. };
  35. struct lbalance_task {
  36. struct lbalance *lb;
  37. struct list_node list;
  38. /* The time this task started */
  39. struct timeval start;
  40. float tasks_sum_start;
  41. };
  42. struct lbalance *lbalance_new(void)
  43. {
  44. struct lbalance *lb = malloc(sizeof *lb);
  45. if (!lb)
  46. return NULL;
  47. tlist_init(&lb->tasks);
  48. lb->num_tasks = 0;
  49. gettimeofday(&lb->prev_tasks_time, NULL);
  50. lb->tasks_sum = 0.0;
  51. getrusage(RUSAGE_CHILDREN, &lb->prev_usage);
  52. lb->max_stats = 1;
  53. lb->stats = malloc(sizeof(lb->stats[0]) * lb->max_stats);
  54. if (!lb->stats) {
  55. free(lb);
  56. return NULL;
  57. }
  58. lb->stats[0].num_stats = 0;
  59. lb->stats[0].work_rate = 0.0;
  60. lb->total_stats = 0;
  61. /* Start with # CPUS as a guess. */
  62. lb->target = -1L;
  63. #ifdef _SC_NPROCESSORS_ONLN
  64. lb->target = sysconf(_SC_NPROCESSORS_ONLN);
  65. #elif defined(_SC_NPROCESSORS_CONF)
  66. if (lb->target == (unsigned int)-1L)
  67. lb->target = sysconf(_SC_NPROCESSORS_CONF);
  68. #endif
  69. /* Otherwise, two is a good number. */
  70. if (lb->target == (unsigned int)-1L || lb->target < 2)
  71. lb->target = 2;
  72. lb->target_uptodate = true;
  73. return lb;
  74. }
  75. /* Return time differences in usec */
  76. static float timeval_sub(struct timeval recent, struct timeval old)
  77. {
  78. float diff;
  79. if (old.tv_usec > recent.tv_usec) {
  80. diff = 1000000 + recent.tv_usec - old.tv_usec;
  81. recent.tv_sec--;
  82. } else
  83. diff = recent.tv_usec - old.tv_usec;
  84. diff += (float)(recent.tv_sec - old.tv_sec) * 1000000;
  85. return diff;
  86. }
  87. /* There were num_tasks running between prev_tasks_time and now. */
  88. static void update_tasks_sum(struct lbalance *lb,
  89. const struct timeval *now)
  90. {
  91. lb->tasks_sum += timeval_sub(*now, lb->prev_tasks_time)
  92. * lb->num_tasks;
  93. lb->prev_tasks_time = *now;
  94. }
  95. struct lbalance_task *lbalance_task_new(struct lbalance *lb)
  96. {
  97. struct lbalance_task *task = malloc(sizeof *task);
  98. if (!task)
  99. return NULL;
  100. if (lb->num_tasks + 1 == lb->max_stats) {
  101. struct stats *s = realloc(lb->stats,
  102. sizeof(*s) * (lb->max_stats + 1));
  103. if (!s) {
  104. free(task);
  105. return NULL;
  106. }
  107. lb->stats = s;
  108. lb->stats[lb->max_stats].num_stats = 0;
  109. lb->stats[lb->max_stats].work_rate = 0.0;
  110. lb->max_stats++;
  111. }
  112. task->lb = lb;
  113. gettimeofday(&task->start, NULL);
  114. /* Record that we ran num_tasks up until now. */
  115. update_tasks_sum(lb, &task->start);
  116. task->tasks_sum_start = lb->tasks_sum;
  117. tlist_add_tail(&lb->tasks, task, list);
  118. lb->num_tasks++;
  119. return task;
  120. }
  121. /* We slowly erase old stats, once we have enough. */
  122. static void degrade_stats(struct lbalance *lb)
  123. {
  124. unsigned int i;
  125. if (lb->total_stats < lb->max_stats * 16)
  126. return;
  127. #if 0
  128. fprintf(stderr, ".");
  129. #endif
  130. for (i = 0; i < lb->max_stats; i++) {
  131. struct stats *s = &lb->stats[i];
  132. unsigned int stats_lost = (s->num_stats + 1) / 2;
  133. s->work_rate *= (float)(s->num_stats - stats_lost)
  134. / s->num_stats;
  135. s->num_stats -= stats_lost;
  136. lb->total_stats -= stats_lost;
  137. if (s->num_stats == 0)
  138. s->work_rate = 0.0;
  139. }
  140. }
  141. static void add_to_stats(struct lbalance *lb,
  142. unsigned int num_tasks,
  143. float work_rate)
  144. {
  145. #if 0
  146. fprintf(stderr, "With %.2f running, work rate was %.5f\n",
  147. num_tasks, work_rate);
  148. #endif
  149. assert(num_tasks >= 1);
  150. assert(num_tasks < lb->max_stats);
  151. lb->stats[num_tasks].num_stats++;
  152. lb->stats[num_tasks].work_rate += work_rate;
  153. lb->total_stats++;
  154. lb->target_uptodate = false;
  155. }
  156. void lbalance_task_free(struct lbalance_task *task,
  157. const struct rusage *usage)
  158. {
  159. float work_done, duration;
  160. unsigned int num_tasks;
  161. struct timeval now;
  162. struct rusage ru;
  163. gettimeofday(&now, NULL);
  164. duration = timeval_sub(now, task->start);
  165. getrusage(RUSAGE_CHILDREN, &ru);
  166. if (usage) {
  167. work_done = usage->ru_utime.tv_usec + usage->ru_stime.tv_usec
  168. + (usage->ru_utime.tv_sec + usage->ru_stime.tv_sec)
  169. * 1000000;
  170. } else {
  171. /* Take difference in rusage as rusage of that task. */
  172. work_done = timeval_sub(ru.ru_utime,
  173. task->lb->prev_usage.ru_utime)
  174. + timeval_sub(ru.ru_stime,
  175. task->lb->prev_usage.ru_utime);
  176. }
  177. /* Update previous usage. */
  178. task->lb->prev_usage = ru;
  179. /* Record that we ran num_tasks up until now. */
  180. update_tasks_sum(task->lb, &now);
  181. /* So, on average, how many tasks were running during this time? */
  182. num_tasks = (task->lb->tasks_sum - task->tasks_sum_start)
  183. / duration + 0.5;
  184. /* Record the work rate for that many tasks. */
  185. add_to_stats(task->lb, num_tasks, work_done / duration);
  186. /* We throw away old stats. */
  187. degrade_stats(task->lb);
  188. /* We need to recalculate the target. */
  189. task->lb->target_uptodate = false;
  190. /* Remove this task. */
  191. tlist_del_from(&task->lb->tasks, task, list);
  192. task->lb->num_tasks--;
  193. free(task);
  194. }
  195. /* We look for the point where the work rate starts to drop. Say you have
  196. * 4 cpus, we'd expect the work rate for 5 processes to drop 20%.
  197. *
  198. * If we're within 1/4 of that ideal ratio, we assume it's still
  199. * optimal. Any drop of more than 1/2 is interpreted as the point we
  200. * are overloaded. */
  201. static unsigned int best_target(const struct lbalance *lb)
  202. {
  203. unsigned int i, best = 0, found_drop = 0;
  204. float best_f_max = -1.0, cliff = -1.0;
  205. #if 0
  206. for (i = 1; i < lb->max_stats; i++) {
  207. printf("%u: %f (%u)\n", i,
  208. lb->stats[i].work_rate / lb->stats[i].num_stats,
  209. lb->stats[i].num_stats);
  210. }
  211. #endif
  212. for (i = 1; i < lb->max_stats; i++) {
  213. float f;
  214. if (!lb->stats[i].num_stats)
  215. f = 0;
  216. else
  217. f = lb->stats[i].work_rate / lb->stats[i].num_stats;
  218. if (f > best_f_max) {
  219. #if 0
  220. printf("Best is %i\n", i);
  221. #endif
  222. best_f_max = f - (f / (i + 1)) / 4;
  223. cliff = f - (f / (i + 1)) / 2;
  224. best = i;
  225. found_drop = 0;
  226. } else if (!found_drop && f < cliff) {
  227. #if 0
  228. printf("Found drop at %i\n", i);
  229. #endif
  230. found_drop = i;
  231. }
  232. }
  233. if (found_drop) {
  234. return found_drop - 1;
  235. }
  236. return i - 1;
  237. }
  238. static unsigned int calculate_target(struct lbalance *lb)
  239. {
  240. unsigned int target;
  241. target = best_target(lb);
  242. /* Jitter if the adjacent ones are unknown. */
  243. if (target >= lb->max_stats || lb->stats[target].num_stats == 0)
  244. return target;
  245. if (target + 1 == lb->max_stats || lb->stats[target+1].num_stats == 0)
  246. return target + 1;
  247. if (target > 1 && lb->stats[target-1].num_stats == 0)
  248. return target - 1;
  249. return target;
  250. }
  251. unsigned lbalance_target(struct lbalance *lb)
  252. {
  253. if (!lb->target_uptodate) {
  254. lb->target = calculate_target(lb);
  255. lb->target_uptodate = true;
  256. }
  257. return lb->target;
  258. }
  259. void lbalance_free(struct lbalance *lb)
  260. {
  261. struct lbalance_task *task;
  262. while ((task = tlist_top(&lb->tasks, struct lbalance_task, list))) {
  263. assert(task->lb == lb);
  264. tlist_del_from(&lb->tasks, task, list);
  265. lb->num_tasks--;
  266. free(task);
  267. }
  268. assert(lb->num_tasks == 0);
  269. free(lb->stats);
  270. free(lb);
  271. }