io.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. /* Licensed under LGPLv2.1+ - see LICENSE file for details */
  2. #include "io.h"
  3. #include "backend.h"
  4. #include <sys/types.h>
  5. #include <sys/socket.h>
  6. #include <netdb.h>
  7. #include <string.h>
  8. #include <errno.h>
  9. #include <stdlib.h>
  10. #include <assert.h>
  11. #include <poll.h>
  12. #include <unistd.h>
  13. #include <fcntl.h>
/* Set by io_break_(); presumably returned by the main loop to signal exit
 * (do_io_loop also stores its result here on the debug path) — the loop
 * itself lives in another file, so confirm against backend.c. */
void *io_loop_return;

/* Allocation hooks; replaceable via io_set_alloc() below. */
struct io_alloc io_alloc = {
	malloc, realloc, free
};
#ifdef DEBUG
/* Set to skip the next plan. */
bool io_plan_nodebug;
/* The current connection to apply plan to. */
struct io_conn *current;
/* User-defined function to select which connection(s) to debug. */
bool (*io_debug_conn)(struct io_conn *conn);
/* Set when we wake up an connection we are debugging. */
bool io_debug_wakeup;

/* Debug interception point: every plan returned by user callbacks passes
 * through here (via macros in io.h).  For a debugged connection the plan is
 * applied immediately and the loop is re-entered synchronously, so execution
 * proceeds linearly instead of through poll(). */
struct io_plan io_debug(struct io_plan plan)
{
	struct io_conn *ready = NULL;

	/* One-shot suppression set by io_plan_no_debug(). */
	if (io_plan_nodebug) {
		io_plan_nodebug = false;
		return plan;
	}

	/* Not a debugged connection and not a debug wakeup: hands-off. */
	if (!current || !doing_debug_on(current)) {
		if (!io_debug_wakeup)
			return plan;
	}

	io_debug_wakeup = false;
	current->plan = plan;
	backend_plan_changed(current);

	/* If it closed, close duplex. */
	if (!current->plan.next && current->duplex) {
		current->duplex->plan = io_close_();
		backend_plan_changed(current->duplex);
	}

	/* Call back into the loop immediately. */
	io_loop_return = do_io_loop(&ready);

	if (ready) {
		set_current(ready);
		if (!ready->plan.next) {
			/* Call finish function immediately. */
			if (ready->finish) {
				/* plan.u1.s holds the errno saved by io_close_(). */
				errno = ready->plan.u1.s;
				ready->finish(ready, ready->finish_arg);
				ready->finish = NULL;
			}
			backend_del_conn(ready);
		} else {
			/* Calls back in itself, via io_debug_io(). */
			if (ready->plan.io(ready->fd.fd, &ready->plan) != 2)
				abort();
		}
		set_current(NULL);
	}

	/* Return a do-nothing plan, so backend_plan_changed in
	 * io_ready doesn't do anything (it's already been called). */
	return io_idle_();
}

/* Wraps the result of an io callback (do_read, do_write, ...).  For a
 * debugged connection it handles the result inline and returns the
 * sentinel 2 so callers (io_debug above) can sanity-check the path. */
int io_debug_io(int ret)
{
	/* Cache it for debugging; current changes. */
	struct io_conn *conn = current;
	int saved_errno = errno;

	if (!doing_debug_on(conn))
		return ret;

	/* These will all go linearly through the io_debug() path above. */
	switch (ret) {
	case -1:
		/* This will call io_debug above. */
		errno = saved_errno;
		io_close();
		break;
	case 0: /* Keep going with plan. */
		io_debug(conn->plan);
		break;
	case 1: /* Done: get next plan. */
		if (timeout_active(conn))
			backend_del_timeout(conn);
		conn->plan.next(conn, conn->plan.next_arg);
		break;
	default:
		abort();
	}
	/* Normally-invalid value, used for sanity check. */
	return 2;
}

static void debug_io_wake(struct io_conn *conn)
{
	/* We want linear if we wake a debugged connection, too. */
	if (io_debug_conn && io_debug_conn(conn))
		io_debug_wakeup = true;
}

/* Counterpart to io_plan_no_debug(), called in macros in io.h */
static void io_plan_debug_again(void)
{
	io_plan_nodebug = false;
}
#else
/* Non-DEBUG builds: both hooks compile to nothing. */
static void debug_io_wake(struct io_conn *conn)
{
}
static void io_plan_debug_again(void)
{
}
#endif
  116. struct io_listener *io_new_listener_(int fd,
  117. void (*init)(int fd, void *arg),
  118. void *arg)
  119. {
  120. struct io_listener *l = io_alloc.alloc(sizeof(*l));
  121. if (!l)
  122. return NULL;
  123. l->fd.listener = true;
  124. l->fd.fd = fd;
  125. l->init = init;
  126. l->arg = arg;
  127. if (!add_listener(l)) {
  128. io_alloc.free(l);
  129. return NULL;
  130. }
  131. return l;
  132. }
  133. void io_close_listener(struct io_listener *l)
  134. {
  135. close(l->fd.fd);
  136. del_listener(l);
  137. io_alloc.free(l);
  138. }
  139. struct io_conn *io_new_conn_(int fd, struct io_plan plan)
  140. {
  141. struct io_conn *conn = io_alloc.alloc(sizeof(*conn));
  142. io_plan_debug_again();
  143. if (!conn)
  144. return NULL;
  145. conn->fd.listener = false;
  146. conn->fd.fd = fd;
  147. conn->plan = plan;
  148. conn->finish = NULL;
  149. conn->finish_arg = NULL;
  150. conn->duplex = NULL;
  151. conn->timeout = NULL;
  152. if (!add_conn(conn)) {
  153. io_alloc.free(conn);
  154. return NULL;
  155. }
  156. return conn;
  157. }
  158. void io_set_finish_(struct io_conn *conn,
  159. void (*finish)(struct io_conn *, void *),
  160. void *arg)
  161. {
  162. conn->finish = finish;
  163. conn->finish_arg = arg;
  164. }
  165. struct io_conn *io_duplex_(struct io_conn *old, struct io_plan plan)
  166. {
  167. struct io_conn *conn;
  168. io_plan_debug_again();
  169. assert(!old->duplex);
  170. conn = io_alloc.alloc(sizeof(*conn));
  171. if (!conn)
  172. return NULL;
  173. conn->fd.listener = false;
  174. conn->fd.fd = old->fd.fd;
  175. conn->plan = plan;
  176. conn->duplex = old;
  177. conn->finish = NULL;
  178. conn->finish_arg = NULL;
  179. conn->timeout = NULL;
  180. if (!add_duplex(conn)) {
  181. io_alloc.free(conn);
  182. return NULL;
  183. }
  184. old->duplex = conn;
  185. return conn;
  186. }
  187. bool io_timeout_(struct io_conn *conn, struct timespec ts,
  188. struct io_plan (*cb)(struct io_conn *, void *), void *arg)
  189. {
  190. assert(cb);
  191. if (!conn->timeout) {
  192. conn->timeout = io_alloc.alloc(sizeof(*conn->timeout));
  193. if (!conn->timeout)
  194. return false;
  195. } else
  196. assert(!timeout_active(conn));
  197. conn->timeout->next = cb;
  198. conn->timeout->next_arg = arg;
  199. backend_add_timeout(conn, ts);
  200. return true;
  201. }
  202. /* Returns true if we're finished. */
  203. static int do_write(int fd, struct io_plan *plan)
  204. {
  205. ssize_t ret = write(fd, plan->u1.cp, plan->u2.s);
  206. if (ret < 0)
  207. return io_debug_io(-1);
  208. plan->u1.cp += ret;
  209. plan->u2.s -= ret;
  210. return io_debug_io(plan->u2.s == 0);
  211. }
  212. /* Queue some data to be written. */
  213. struct io_plan io_write_(const void *data, size_t len,
  214. struct io_plan (*cb)(struct io_conn *, void *),
  215. void *arg)
  216. {
  217. struct io_plan plan;
  218. assert(cb);
  219. plan.u1.const_vp = data;
  220. plan.u2.s = len;
  221. plan.io = do_write;
  222. plan.next = cb;
  223. plan.next_arg = arg;
  224. plan.pollflag = POLLOUT;
  225. return plan;
  226. }
  227. static int do_read(int fd, struct io_plan *plan)
  228. {
  229. ssize_t ret = read(fd, plan->u1.cp, plan->u2.s);
  230. if (ret <= 0)
  231. return io_debug_io(-1);
  232. plan->u1.cp += ret;
  233. plan->u2.s -= ret;
  234. return io_debug_io(plan->u2.s == 0);
  235. }
  236. /* Queue a request to read into a buffer. */
  237. struct io_plan io_read_(void *data, size_t len,
  238. struct io_plan (*cb)(struct io_conn *, void *),
  239. void *arg)
  240. {
  241. struct io_plan plan;
  242. assert(cb);
  243. plan.u1.cp = data;
  244. plan.u2.s = len;
  245. plan.io = do_read;
  246. plan.next = cb;
  247. plan.next_arg = arg;
  248. plan.pollflag = POLLIN;
  249. return plan;
  250. }
  251. static int do_read_partial(int fd, struct io_plan *plan)
  252. {
  253. ssize_t ret = read(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
  254. if (ret <= 0)
  255. return io_debug_io(-1);
  256. *(size_t *)plan->u2.vp = ret;
  257. return io_debug_io(1);
  258. }
  259. /* Queue a partial request to read into a buffer. */
  260. struct io_plan io_read_partial_(void *data, size_t *len,
  261. struct io_plan (*cb)(struct io_conn *, void *),
  262. void *arg)
  263. {
  264. struct io_plan plan;
  265. assert(cb);
  266. plan.u1.cp = data;
  267. plan.u2.vp = len;
  268. plan.io = do_read_partial;
  269. plan.next = cb;
  270. plan.next_arg = arg;
  271. plan.pollflag = POLLIN;
  272. return plan;
  273. }
  274. static int do_write_partial(int fd, struct io_plan *plan)
  275. {
  276. ssize_t ret = write(fd, plan->u1.cp, *(size_t *)plan->u2.vp);
  277. if (ret < 0)
  278. return io_debug_io(-1);
  279. *(size_t *)plan->u2.vp = ret;
  280. return io_debug_io(1);
  281. }
  282. /* Queue a partial write request. */
  283. struct io_plan io_write_partial_(const void *data, size_t *len,
  284. struct io_plan (*cb)(struct io_conn*, void *),
  285. void *arg)
  286. {
  287. struct io_plan plan;
  288. assert(cb);
  289. plan.u1.const_vp = data;
  290. plan.u2.vp = len;
  291. plan.io = do_write_partial;
  292. plan.next = cb;
  293. plan.next_arg = arg;
  294. plan.pollflag = POLLOUT;
  295. return plan;
  296. }
  297. static int already_connected(int fd, struct io_plan *plan)
  298. {
  299. return io_debug_io(1);
  300. }
  301. static int do_connect(int fd, struct io_plan *plan)
  302. {
  303. int err, ret;
  304. socklen_t len = sizeof(err);
  305. /* Has async connect finished? */
  306. ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
  307. if (ret < 0)
  308. return -1;
  309. if (err == 0) {
  310. /* Restore blocking if it was initially. */
  311. fcntl(fd, F_SETFD, plan->u1.s);
  312. return 1;
  313. }
  314. return 0;
  315. }
  316. struct io_plan io_connect_(int fd, const struct addrinfo *addr,
  317. struct io_plan (*cb)(struct io_conn*, void *),
  318. void *arg)
  319. {
  320. struct io_plan plan;
  321. assert(cb);
  322. plan.next = cb;
  323. plan.next_arg = arg;
  324. /* Save old flags, set nonblock if not already. */
  325. plan.u1.s = fcntl(fd, F_GETFD);
  326. fcntl(fd, F_SETFD, plan.u1.s | O_NONBLOCK);
  327. /* Immediate connect can happen. */
  328. if (connect(fd, addr->ai_addr, addr->ai_addrlen) == 0) {
  329. /* Dummy will be called immediately. */
  330. plan.pollflag = POLLOUT;
  331. plan.io = already_connected;
  332. } else {
  333. if (errno != EINPROGRESS)
  334. return io_close_();
  335. plan.pollflag = POLLIN;
  336. plan.io = do_connect;
  337. }
  338. return plan;
  339. }
  340. struct io_plan io_idle_(void)
  341. {
  342. struct io_plan plan;
  343. plan.pollflag = 0;
  344. plan.io = NULL;
  345. /* Never called (overridden by io_wake), but NULL means closing */
  346. plan.next = (void *)io_idle_;
  347. return plan;
  348. }
  349. void io_wake_(struct io_conn *conn, struct io_plan plan)
  350. {
  351. io_plan_debug_again();
  352. /* It might be closing, but we haven't called its finish() yet. */
  353. if (!conn->plan.next)
  354. return;
  355. /* It was idle, right? */
  356. assert(!conn->plan.io);
  357. conn->plan = plan;
  358. backend_plan_changed(conn);
  359. debug_io_wake(conn);
  360. }
/* Called by the backend when conn's fd is ready: run one step of the plan's
 * io callback and act on its result. */
void io_ready(struct io_conn *conn)
{
	set_current(conn);
	switch (conn->plan.io(conn->fd.fd, &conn->plan)) {
	case -1: /* Failure means a new plan: close up. */
		conn->plan = io_close();
		backend_plan_changed(conn);
		break;
	case 0: /* Keep going with plan. */
		break;
	case 1: /* Done: get next plan. */
		if (timeout_active(conn))
			backend_del_timeout(conn);
		conn->plan = conn->plan.next(conn, conn->plan.next_arg);
		backend_plan_changed(conn);
	}
	/* No default: the DEBUG sanity value 2 from io_debug_io() must fall
	 * through silently — the debug path has already handled the event. */
	set_current(NULL);

	/* If it closed, close duplex. */
	if (!conn->plan.next && conn->duplex) {
		set_current(conn->duplex);
		conn->duplex->plan = io_close();
		backend_plan_changed(conn->duplex);
		set_current(NULL);
	}
}
  386. /* Close the connection, we're done. */
  387. struct io_plan io_close_(void)
  388. {
  389. struct io_plan plan;
  390. plan.pollflag = 0;
  391. /* This means we're closing. */
  392. plan.next = NULL;
  393. plan.u1.s = errno;
  394. return plan;
  395. }
  396. struct io_plan io_close_cb(struct io_conn *conn, void *arg)
  397. {
  398. return io_close();
  399. }
  400. /* Exit the loop, returning this (non-NULL) arg. */
  401. struct io_plan io_break_(void *ret, struct io_plan plan)
  402. {
  403. io_plan_debug_again();
  404. assert(ret);
  405. io_loop_return = ret;
  406. return plan;
  407. }
  408. int io_conn_fd(const struct io_conn *conn)
  409. {
  410. return conn->fd.fd;
  411. }
  412. void io_set_alloc(void *(*allocfn)(size_t size),
  413. void *(*reallocfn)(void *ptr, size_t size),
  414. void (*freefn)(void *ptr))
  415. {
  416. io_alloc.alloc = allocfn;
  417. io_alloc.realloc = reallocfn;
  418. io_alloc.free = freefn;
  419. }