async.c 5.1 KB


  1. #include "ccanlint.h"
  2. #include "../tools.h"
  3. #include <sys/types.h>
  4. #include <sys/time.h>
  5. #include <sys/resource.h>
  6. #include <sys/wait.h>
  7. #include <sys/stat.h>
  8. #include <fcntl.h>
  9. #include <stdlib.h>
  10. #include <unistd.h>
  11. #include <err.h>
  12. #include <assert.h>
  13. #include <ccan/lbalance/lbalance.h>
  14. #include <ccan/tlist/tlist.h>
  15. #include <ccan/grab_file/grab_file.h>
  16. #include <ccan/time/time.h>
  17. #include <ccan/talloc/talloc.h>
  18. static struct lbalance *lb;
  19. TLIST_TYPE(command, struct command);
  20. static struct tlist_command pending = TLIST_INIT(pending);
  21. static struct tlist_command running = TLIST_INIT(running);
  22. static unsigned int num_running = 0;
  23. static struct tlist_command done = TLIST_INIT(done);
  24. struct command {
  25. struct list_node list;
  26. char *command;
  27. pid_t pid;
  28. int output_fd;
  29. unsigned int time_ms;
  30. struct lbalance_task *task;
  31. int status;
  32. char *output;
  33. bool done;
  34. const void *ctx;
  35. };
  /*
   * killme - SIGALRM handler run in the forked child when its timer expires.
   * @sig: the signal number (unused).
   *
   * First tries to SIGKILL the whole process group whose id equals our pid,
   * which takes down the shell and anything it spawned.  That only works if
   * this process is a group leader; otherwise kill() fails (ESRCH) and the
   * timeout would silently do nothing, so fall back to killing ourselves.
   *
   * Only async-signal-safe functions (getpid, kill) are used here.
   */
  static void killme(int sig)
  {
      (void)sig;

      if (kill(-getpid(), SIGKILL) != 0)
          kill(getpid(), SIGKILL);
  }
  40. static void run_more(void)
  41. {
  42. struct command *c;
  43. while (num_running < lbalance_target(lb)) {
  44. int p[2];
  45. c = tlist_top(&pending, list);
  46. if (!c)
  47. break;
  48. fflush(stdout);
  49. if (pipe(p) != 0)
  50. err(1, "Pipe failed");
  51. c->pid = fork();
  52. if (c->pid == -1)
  53. err(1, "Fork failed");
  54. if (c->pid == 0) {
  55. struct itimerval itim;
  56. if (dup2(p[1], STDOUT_FILENO) != STDOUT_FILENO
  57. || dup2(p[1], STDERR_FILENO) != STDERR_FILENO
  58. || close(p[0]) != 0
  59. || close(STDIN_FILENO) != 0
  60. || open("/dev/null", O_RDONLY) != STDIN_FILENO)
  61. exit(128);
  62. signal(SIGALRM, killme);
  63. itim.it_interval.tv_sec = itim.it_interval.tv_usec = 0;
  64. itim.it_value = time_from_msec(c->time_ms);
  65. setitimer(ITIMER_REAL, &itim, NULL);
  66. c->status = system(c->command);
  67. if (WIFEXITED(c->status))
  68. exit(WEXITSTATUS(c->status));
  69. /* Here's a hint... */
  70. exit(128 + WTERMSIG(c->status));
  71. }
  72. if (tools_verbose)
  73. printf("Running async: %s => %i\n", c->command, c->pid);
  74. close(p[1]);
  75. c->output_fd = p[0];
  76. c->task = lbalance_task_new(lb);
  77. tlist_del_from(&pending, c, list);
  78. tlist_add_tail(&running, c, list);
  79. num_running++;
  80. }
  81. }
  82. static int destroy_command(struct command *command)
  83. {
  84. if (!command->done && command->pid) {
  85. kill(-command->pid, SIGKILL);
  86. close(command->output_fd);
  87. num_running--;
  88. }
  89. tlist_del(command, list);
  90. return 0;
  91. }
  92. void run_command_async(const void *ctx, unsigned int time_ms,
  93. const char *fmt, ...)
  94. {
  95. struct command *command;
  96. va_list ap;
  97. assert(ctx);
  98. if (!lb)
  99. lb = lbalance_new();
  100. command = talloc(ctx, struct command);
  101. command->ctx = ctx;
  102. command->time_ms = time_ms;
  103. command->pid = 0;
  104. command->output = talloc_strdup(command, "");
  105. va_start(ap, fmt);
  106. command->command = talloc_vasprintf(command, fmt, ap);
  107. va_end(ap);
  108. tlist_add_tail(&pending, command, list);
  109. command->done = false;
  110. talloc_set_destructor(command, destroy_command);
  111. run_more();
  112. }
  113. static void reap_output(void)
  114. {
  115. fd_set in;
  116. struct command *c, *next;
  117. int max_fd = 0;
  118. FD_ZERO(&in);
  119. tlist_for_each(&running, c, list) {
  120. FD_SET(c->output_fd, &in);
  121. if (c->output_fd > max_fd)
  122. max_fd = c->output_fd;
  123. }
  124. if (select(max_fd+1, &in, NULL, NULL, NULL) < 0)
  125. err(1, "select failed");
  126. tlist_for_each_safe(&running, c, next, list) {
  127. if (FD_ISSET(c->output_fd, &in)) {
  128. int old_len, len;
  129. /* This length includes nul terminator! */
  130. old_len = talloc_array_length(c->output);
  131. c->output = talloc_realloc(c, c->output, char,
  132. old_len + 1024);
  133. len = read(c->output_fd, c->output + old_len - 1, 1024);
  134. if (len < 0)
  135. err(1, "Reading from async command");
  136. c->output = talloc_realloc(c, c->output, char,
  137. old_len + len);
  138. c->output[old_len + len - 1] = '\0';
  139. if (len == 0) {
  140. struct rusage ru;
  141. wait4(c->pid, &c->status, 0, &ru);
  142. if (tools_verbose)
  143. printf("Finished async %i: %s %u\n",
  144. c->pid,
  145. WIFEXITED(c->status)
  146. ? "exit status"
  147. : "killed by signal",
  148. WIFEXITED(c->status)
  149. ? WEXITSTATUS(c->status)
  150. : WTERMSIG(c->status));
  151. lbalance_task_free(c->task, &ru);
  152. c->task = NULL;
  153. c->done = true;
  154. close(c->output_fd);
  155. tlist_del_from(&running, c, list);
  156. tlist_add_tail(&done, c, list);
  157. num_running--;
  158. }
  159. }
  160. }
  161. }
  162. void *collect_command(bool *ok, char **output)
  163. {
  164. struct command *c;
  165. const void *ctx;
  166. while ((c = tlist_top(&done, list)) == NULL) {
  167. if (tlist_empty(&pending) && tlist_empty(&running))
  168. return NULL;
  169. reap_output();
  170. run_more();
  171. }
  172. *ok = (WIFEXITED(c->status) && WEXITSTATUS(c->status) == 0);
  173. ctx = c->ctx;
  174. *output = talloc_steal(ctx, c->output);
  175. talloc_free(c);
  176. return (void *)ctx;
  177. }
  178. /* Compile and link single C file, with object files, async. */
  179. void compile_and_link_async(const void *ctx, unsigned int time_ms,
  180. const char *cfile, const char *ccandir,
  181. const char *objs, const char *compiler,
  182. const char *cflags,
  183. const char *libs, const char *outfile)
  184. {
  185. if (compile_verbose)
  186. printf("Compiling and linking (async) %s\n", outfile);
  187. run_command_async(ctx, time_ms,
  188. "%s %s -I%s -o %s %s %s %s",
  189. compiler, cflags,
  190. ccandir, outfile, cfile, objs, libs);
  191. }