io.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. /* Licensed under LGPLv2.1+ - see LICENSE file for details */
  2. #include "io.h"
  3. #include "backend.h"
  4. #include <sys/types.h>
  5. #include <sys/socket.h>
  6. #include <netdb.h>
  7. #include <string.h>
  8. #include <errno.h>
  9. #include <stdlib.h>
  10. #include <assert.h>
  11. #include <unistd.h>
  12. #include <fcntl.h>
/* Value handed back by the event loop; set via io_break_(). */
void *io_loop_return;

/* Allocation hooks used throughout this module; default to the C
 * allocator, replaceable with io_set_alloc(). */
struct io_alloc io_alloc = {
	malloc, realloc, free
};

#ifdef DEBUG
/* Set to skip the next plan. */
bool io_plan_nodebug;
/* The current connection to apply plan to. */
struct io_conn *current;
/* User-defined function to select which connection(s) to debug. */
bool (*io_debug_conn)(struct io_conn *conn);
/* DEBUG hook applied to a freshly-returned plan.  If the current
 * connection is selected for debugging, install the plan now and run the
 * event loop synchronously so its effects happen immediately (linearly)
 * rather than on the next poll iteration; otherwise pass the plan
 * through untouched. */
struct io_plan io_debug(struct io_plan plan)
{
	struct io_conn *ready = NULL;

	/* One-shot bypass flag, set by io_plan_no_debug() in io.h. */
	if (io_plan_nodebug) {
		io_plan_nodebug = false;
		return plan;
	}

	if (!current || !doing_debug_on(current))
		return plan;

	current->plan = plan;
	backend_plan_changed(current);

	/* Call back into the loop immediately. */
	io_loop_return = do_io_loop(&ready);

	if (ready) {
		set_current(ready);
		if (!ready->plan.next) {
			/* Closing plan (next == NULL, see io_close_()):
			 * call finish function immediately. */
			if (ready->finish) {
				/* io_close_() stashed errno in u1.s. */
				errno = ready->plan.u1.s;
				ready->finish(ready, ready->finish_arg);
				ready->finish = NULL;
			}
			backend_del_conn(ready);
		} else {
			/* Calls back in itself, via io_debug_io();
			 * 2 is io_debug_io()'s sanity-check sentinel. */
			if (ready->plan.io(ready->fd.fd, &ready->plan) != 2)
				abort();
		}
		set_current(NULL);
	}

	/* Return a do-nothing plan, so backend_plan_changed in
	 * io_ready doesn't do anything (it's already been called). */
	return io_wait_(NULL, (void *)1, NULL);
}
/* DEBUG hook wrapped around the result of a plan's io callback.  For a
 * debugged connection, act on the verdict (-1 close / 0 continue /
 * 1 next plan) synchronously instead of returning to the poll loop,
 * then return the sentinel value 2 (checked in io_debug() above). */
int io_debug_io(int ret)
{
	/* Cache it for debugging; current changes. */
	struct io_conn *conn = current;
	int saved_errno = errno;

	if (!doing_debug_on(conn))
		return ret;

	/* These will all go linearly through the io_debug() path above. */
	switch (ret) {
	case -1:
		/* This will call io_debug above. */
		errno = saved_errno;
		io_close();
		break;
	case 0: /* Keep going with plan. */
		io_debug(conn->plan);
		break;
	case 1: /* Done: get next plan. */
		if (timeout_active(conn))
			backend_del_timeout(conn);
		/* In case they call io_duplex, clear our poll flags so
		 * both sides don't seem to be both doing read or write
		 * (See assert(!mask || pfd->events != mask) in poll.c) */
		conn->plan.pollflag = 0;
		conn->plan.next(conn, conn->plan.next_arg);
		break;
	default:
		abort();
	}

	/* Normally-invalid value, used for sanity check. */
	return 2;
}
/* Counterpart to io_plan_no_debug(), called in macros in io.h:
 * re-arm plan debugging after a one-shot bypass. */
static void io_plan_debug_again(void)
{
	io_plan_nodebug = false;
}
#else
/* Non-DEBUG build: no debug state to reset, so this is a no-op. */
static void io_plan_debug_again(void)
{
}
#endif
  100. struct io_listener *io_new_listener_(int fd,
  101. void (*init)(int fd, void *arg),
  102. void *arg)
  103. {
  104. struct io_listener *l = io_alloc.alloc(sizeof(*l));
  105. if (!l)
  106. return NULL;
  107. l->fd.listener = true;
  108. l->fd.fd = fd;
  109. l->init = init;
  110. l->arg = arg;
  111. if (!add_listener(l)) {
  112. io_alloc.free(l);
  113. return NULL;
  114. }
  115. return l;
  116. }
/* Tear down a listener: close its fd, deregister it from the backend,
 * and free it.  l must not be used afterwards. */
void io_close_listener(struct io_listener *l)
{
	close(l->fd.fd);
	del_listener(l);
	io_alloc.free(l);
}
  123. struct io_conn *io_new_conn_(int fd, struct io_plan plan)
  124. {
  125. struct io_conn *conn = io_alloc.alloc(sizeof(*conn));
  126. io_plan_debug_again();
  127. if (!conn)
  128. return NULL;
  129. conn->fd.listener = false;
  130. conn->fd.fd = fd;
  131. conn->plan = plan;
  132. conn->finish = NULL;
  133. conn->finish_arg = NULL;
  134. conn->duplex = NULL;
  135. conn->timeout = NULL;
  136. if (!add_conn(conn)) {
  137. io_alloc.free(conn);
  138. return NULL;
  139. }
  140. return conn;
  141. }
  142. void io_set_finish_(struct io_conn *conn,
  143. void (*finish)(struct io_conn *, void *),
  144. void *arg)
  145. {
  146. conn->finish = finish;
  147. conn->finish_arg = arg;
  148. }
  149. struct io_conn *io_duplex_(struct io_conn *old, struct io_plan plan)
  150. {
  151. struct io_conn *conn;
  152. io_plan_debug_again();
  153. assert(!old->duplex);
  154. conn = io_alloc.alloc(sizeof(*conn));
  155. if (!conn)
  156. return NULL;
  157. conn->fd.listener = false;
  158. conn->fd.fd = old->fd.fd;
  159. conn->plan = plan;
  160. conn->duplex = old;
  161. conn->finish = NULL;
  162. conn->finish_arg = NULL;
  163. conn->timeout = NULL;
  164. if (!add_duplex(conn)) {
  165. io_alloc.free(conn);
  166. return NULL;
  167. }
  168. old->duplex = conn;
  169. return conn;
  170. }
  171. bool io_timeout_(struct io_conn *conn, struct timerel t,
  172. struct io_plan (*cb)(struct io_conn *, void *), void *arg)
  173. {
  174. assert(cb);
  175. if (!conn->timeout) {
  176. conn->timeout = io_alloc.alloc(sizeof(*conn->timeout));
  177. if (!conn->timeout)
  178. return false;
  179. } else
  180. assert(!timeout_active(conn));
  181. conn->timeout->next = cb;
  182. conn->timeout->next_arg = arg;
  183. backend_add_timeout(conn, t);
  184. return true;
  185. }
  186. /* Always done: call the next thing. */
  187. static int do_always(int fd, struct io_plan *plan)
  188. {
  189. return 1;
  190. }
  191. struct io_plan io_always_(struct io_plan (*cb)(struct io_conn *, void *),
  192. void *arg)
  193. {
  194. struct io_plan plan;
  195. assert(cb);
  196. plan.io = do_always;
  197. plan.next = cb;
  198. plan.next_arg = arg;
  199. plan.pollflag = POLLALWAYS;
  200. return plan;
  201. }
  202. /* Returns true if we're finished. */
  203. static int do_write(int fd, struct io_plan *plan)
  204. {
  205. ssize_t ret = write(fd, plan->u1.cp, plan->u2.s);
  206. if (ret < 0)
  207. return io_debug_io(-1);
  208. plan->u1.cp += ret;
  209. plan->u2.s -= ret;
  210. return io_debug_io(plan->u2.s == 0);
  211. }
  212. /* Queue some data to be written. */
  213. struct io_plan io_write_(const void *data, size_t len,
  214. struct io_plan (*cb)(struct io_conn *, void *),
  215. void *arg)
  216. {
  217. struct io_plan plan;
  218. assert(cb);
  219. if (len == 0)
  220. return io_always_(cb, arg);
  221. plan.u1.const_vp = data;
  222. plan.u2.s = len;
  223. plan.io = do_write;
  224. plan.next = cb;
  225. plan.next_arg = arg;
  226. plan.pollflag = POLLOUT;
  227. return plan;
  228. }
  229. static int do_read(int fd, struct io_plan *plan)
  230. {
  231. ssize_t ret = read(fd, plan->u1.cp, plan->u2.s);
  232. if (ret <= 0)
  233. return io_debug_io(-1);
  234. plan->u1.cp += ret;
  235. plan->u2.s -= ret;
  236. return io_debug_io(plan->u2.s == 0);
  237. }
  238. /* Queue a request to read into a buffer. */
  239. struct io_plan io_read_(void *data, size_t len,
  240. struct io_plan (*cb)(struct io_conn *, void *),
  241. void *arg)
  242. {
  243. struct io_plan plan;
  244. assert(cb);
  245. if (len == 0)
  246. return io_always_(cb, arg);
  247. plan.u1.cp = data;
  248. plan.u2.s = len;
  249. plan.io = do_read;
  250. plan.next = cb;
  251. plan.next_arg = arg;
  252. plan.pollflag = POLLIN;
  253. return plan;
  254. }
  255. static int do_read_partial(int fd, struct io_plan *plan)
  256. {
  257. ssize_t ret = read(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
  258. if (ret <= 0)
  259. return io_debug_io(-1);
  260. *(size_t *)plan->u2.vp = ret;
  261. return io_debug_io(1);
  262. }
  263. /* Queue a partial request to read into a buffer. */
  264. struct io_plan io_read_partial_(void *data, size_t *len,
  265. struct io_plan (*cb)(struct io_conn *, void *),
  266. void *arg)
  267. {
  268. struct io_plan plan;
  269. assert(cb);
  270. if (*len == 0)
  271. return io_always_(cb, arg);
  272. plan.u1.cp = data;
  273. plan.u2.vp = len;
  274. plan.io = do_read_partial;
  275. plan.next = cb;
  276. plan.next_arg = arg;
  277. plan.pollflag = POLLIN;
  278. return plan;
  279. }
  280. static int do_write_partial(int fd, struct io_plan *plan)
  281. {
  282. ssize_t ret = write(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
  283. if (ret < 0)
  284. return io_debug_io(-1);
  285. *(size_t *)plan->u2.vp = ret;
  286. return io_debug_io(1);
  287. }
  288. /* Queue a partial write request. */
  289. struct io_plan io_write_partial_(const void *data, size_t *len,
  290. struct io_plan (*cb)(struct io_conn*, void *),
  291. void *arg)
  292. {
  293. struct io_plan plan;
  294. assert(cb);
  295. if (*len == 0)
  296. return io_always_(cb, arg);
  297. plan.u1.const_vp = data;
  298. plan.u2.vp = len;
  299. plan.io = do_write_partial;
  300. plan.next = cb;
  301. plan.next_arg = arg;
  302. plan.pollflag = POLLOUT;
  303. return plan;
  304. }
/* Plan callback for a connect() that succeeded synchronously:
 * nothing left to wait for, report done. */
static int already_connected(int fd, struct io_plan *plan)
{
	return io_debug_io(1);
}
/* Poll callback for a pending non-blocking connect(): query SO_ERROR
 * to learn whether the handshake finished, and with what result.
 * Returns 1 on success, 0 if still connecting, -1 on failure. */
static int do_connect(int fd, struct io_plan *plan)
{
	int err, ret;
	socklen_t len = sizeof(err);

	/* Has async connect finished? */
	ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
	if (ret < 0)
		return -1;

	if (err == 0) {
		/* Connected: restore the pre-connect file flags that
		 * io_connect_() saved in u1.s (blocking, if it was
		 * initially). */
		fcntl(fd, F_SETFL, plan->u1.s);
		return 1;
	} else if (err == EINPROGRESS)
		return 0;	/* still in progress */

	/* Connect failed: surface the socket's error via errno. */
	errno = err;
	return -1;
}
  326. struct io_plan io_connect_(int fd, const struct addrinfo *addr,
  327. struct io_plan (*cb)(struct io_conn*, void *),
  328. void *arg)
  329. {
  330. struct io_plan plan;
  331. assert(cb);
  332. plan.next = cb;
  333. plan.next_arg = arg;
  334. /* Save old flags, set nonblock if not already. */
  335. plan.u1.s = fcntl(fd, F_GETFL);
  336. fcntl(fd, F_SETFL, plan.u1.s | O_NONBLOCK);
  337. /* Immediate connect can happen. */
  338. if (connect(fd, addr->ai_addr, addr->ai_addrlen) == 0) {
  339. /* Dummy will be called immediately. */
  340. plan.pollflag = POLLOUT;
  341. plan.io = already_connected;
  342. } else {
  343. if (errno != EINPROGRESS)
  344. return io_close_();
  345. plan.pollflag = POLLIN;
  346. plan.io = do_connect;
  347. }
  348. return plan;
  349. }
  350. struct io_plan io_wait_(const void *wait,
  351. struct io_plan (*cb)(struct io_conn *, void*),
  352. void *arg)
  353. {
  354. struct io_plan plan;
  355. assert(cb);
  356. plan.pollflag = 0;
  357. plan.io = NULL;
  358. plan.next = cb;
  359. plan.next_arg = arg;
  360. plan.u1.const_vp = wait;
  361. return plan;
  362. }
/* Wake connection(s) blocked in io_wait_() on this address; the
 * matching is done by the backend. */
void io_wake(const void *wait)
{
	backend_wait_changed(wait);
}
/* Backend entry point: conn's fd (or always-plan) is ready, so run the
 * plan's io callback and act on its verdict (-1 close / 0 keep waiting /
 * 1 advance to the next plan). */
void io_ready(struct io_conn *conn)
{
	/* Beware io_close_other!  It NULLs plan.next to mark a closing
	 * connection, whose io callback must not run. */
	if (!conn->plan.next)
		return;

	set_current(conn);
	switch (conn->plan.io(conn->fd.fd, &conn->plan)) {
	case -1: /* Failure means a new plan: close up. */
		conn->plan = io_close();
		backend_plan_changed(conn);
		break;
	case 0: /* Keep going with plan. */
		break;
	case 1: /* Done: get next plan. */
		if (timeout_active(conn))
			backend_del_timeout(conn);
		/* In case they call io_duplex, clear our poll flags so
		 * both sides don't seem to be both doing read or write
		 * (See assert(!mask || pfd->events != mask) in poll.c) */
		conn->plan.pollflag = 0;
		conn->plan = conn->plan.next(conn, conn->plan.next_arg);
		backend_plan_changed(conn);
	}
	set_current(NULL);
}
/* Close the connection, we're done. */
struct io_plan io_close_(void)
{
	/* Note: only the fields below are set; the rest of the plan is
	 * deliberately left uninitialized since a closing plan is never
	 * polled or executed. */
	struct io_plan plan;

	plan.pollflag = 0;
	/* This means we're closing. */
	plan.next = NULL;
	/* Stash errno so the finish callback can see the failing error
	 * (restored from u1.s before finish is invoked). */
	plan.u1.s = errno;
	return plan;
}
/* Convenience plan callback that simply closes the connection; usable
 * anywhere a (*cb)(struct io_conn *, void *) is expected. */
struct io_plan io_close_cb(struct io_conn *conn, void *arg)
{
	return io_close();
}
  406. void io_close_other(struct io_conn *conn)
  407. {
  408. /* Don't close if already closing! */
  409. if (conn->plan.next) {
  410. conn->plan = io_close_();
  411. backend_plan_changed(conn);
  412. }
  413. }
/* Exit the loop, returning this (non-NULL) arg. */
struct io_plan io_break_(void *ret, struct io_plan plan)
{
	io_plan_debug_again();
	/* NULL is rejected — presumably indistinguishable from "no break
	 * requested" in the loop; confirm against do_io_loop(). */
	assert(ret);
	io_loop_return = ret;
	/* The supplied plan becomes this connection's next plan. */
	return plan;
}
/* Callback that must never actually run; aborts loudly if it does.
 * Used by io_never() below. */
static struct io_plan io_never_called(struct io_conn *conn, void *arg)
{
	abort();
}
/* A plan whose callback aborts the program if it is ever executed —
 * a loud assertion for code paths that should be unreachable. */
struct io_plan io_never(void)
{
	return io_always_(io_never_called, NULL);
}
/* Accessor: the file descriptor underlying this connection. */
int io_conn_fd(const struct io_conn *conn)
{
	return conn->fd.fd;
}
  434. void io_set_alloc(void *(*allocfn)(size_t size),
  435. void *(*reallocfn)(void *ptr, size_t size),
  436. void (*freefn)(void *ptr))
  437. {
  438. io_alloc.alloc = allocfn;
  439. io_alloc.realloc = reallocfn;
  440. io_alloc.free = freefn;
  441. }