Browse Source

Add antithread. Not finished, but useful as example of module whose
dependents have dependents.

Rusty Russell 17 years ago
parent
commit
36e8b652d0

+ 23 - 0
ccan/antithread/_info.c

@@ -0,0 +1,23 @@
+#include <stdio.h>
+#include <string.h>
+#include "config.h"
+
+/**
+ * antithread - Accelerated Native Technology Implementation of "threads"
+ *
+ * Threads suck.  Antithreads try not to.  FIXME.
+ */
+int main(int argc, char *argv[])
+{
+	if (argc != 2)
+		return 1;
+
+	if (strcmp(argv[1], "depends") == 0) {
+		printf("ccan/talloc\n");
+		printf("ccan/alloc\n");
+		printf("ccan/noerr\n");
+		return 0;
+	}
+
+	return 1;
+}

+ 481 - 0
ccan/antithread/antithread.c

@@ -0,0 +1,481 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdbool.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <errno.h>
+#include <err.h>
+#include "antithread.h"
+#include "noerr/noerr.h"
+#include "talloc/talloc.h"
+#include "alloc/alloc.h"
+
+/* FIXME: Valgrind support should be possible for some cases.  Tricky
+ * case is where another process allocates for you, but at worst we
+ * could reset what is valid and what isn't on every entry into the
+ * library or something. */
+
+struct at_pool
+{
+	const void *ctx;
+	void *pool;
+	unsigned long poolsize;
+	int fd;
+	int parent_rfd, parent_wfd;
+};
+
+struct athread
+{
+	pid_t pid;
+	int rfd, wfd;
+};
+
+/* FIXME: Better locking through futexes. */
+static void lock(int fd, unsigned long off)
+{
+	struct flock fl;
+
+	fl.l_type = F_WRLCK;
+	fl.l_whence = SEEK_SET;
+	fl.l_start = off;
+	fl.l_len = 1;
+
+	while (fcntl(fd, F_SETLKW, &fl) < 0) {
+		if (errno != EINTR)
+			err(1, "Failure locking antithread file");
+	}
+}
+
+static void unlock(int fd, unsigned long off)
+{
+	struct flock fl;
+	int serrno = errno;
+
+	fl.l_type = F_UNLCK;
+	fl.l_whence = SEEK_SET;
+	fl.l_start = off;
+	fl.l_len = 1;
+
+	fcntl(fd, F_SETLK, &fl);
+	errno = serrno;
+}
+
+static void *at_realloc(const void *parent, void *ptr, size_t size)
+{
+	struct at_pool *p = talloc_find_parent_bytype(parent, struct at_pool);
+	/* FIXME: realloc in ccan/alloc? */
+	void *new;
+
+	lock(p->fd, 0);
+	if (size == 0) {
+		alloc_free(p->pool, p->poolsize, ptr);
+		new = NULL;
+	} else if (ptr == NULL) {
+		/* FIXME: Alignment */
+		new = alloc_get(p->pool, p->poolsize, size, 16);
+	} else {
+		if (size <= alloc_size(p->pool, p->poolsize, ptr))
+			new = ptr;
+		else {
+			new = alloc_get(p->pool, p->poolsize, size, 16);
+			if (new) {
+				memcpy(new, ptr,
+				       alloc_size(p->pool, p->poolsize, ptr));
+				alloc_free(p->pool, p->poolsize, ptr);
+			}
+		}
+	}
+	unlock(p->fd, 0);
+	return new;
+}
+
+/* We add 16MB to size.  This compensates for address randomization. */
+#define PADDING (16 * 1024 * 1024)
+
+/* Create a new sharable pool. */
+struct at_pool *at_pool(unsigned long size)
+{
+	int fd;
+	struct at_pool *p;
+	FILE *f;
+
+	/* FIXME: How much should we actually add for overhead?. */
+	size += 32 * getpagesize();
+
+	/* Round up to whole pages. */
+	size = (size + getpagesize()-1) & ~(getpagesize()-1);
+
+	f = tmpfile();
+	if (!f)
+		return NULL;
+
+	fd = dup(fileno(f));
+	fclose_noerr(f);
+
+	if (fd < 0)
+		return NULL;
+
+	if (ftruncate(fd, size + PADDING) != 0)
+		goto fail_close;
+
+	p = talloc(NULL, struct at_pool);
+	if (!p)
+		goto fail_close;
+
+	/* First map gets a nice big area. */
+	p->pool = mmap(NULL, size+PADDING, PROT_READ|PROT_WRITE, MAP_SHARED, fd,
+		       0);
+	if (p->pool == MAP_FAILED)
+		goto fail_free;
+
+	/* Then we remap into the middle of it. */
+	munmap(p->pool, size+PADDING);
+	p->pool = mmap(p->pool + PADDING/2, size, PROT_READ|PROT_WRITE,
+		       MAP_SHARED, fd, 0);
+	if (p->pool == MAP_FAILED)
+		goto fail_free;
+
+	/* FIXME: Destructor? */
+	p->fd = fd;
+	p->poolsize = size;
+	p->parent_rfd = p->parent_wfd = -1;
+	alloc_init(p->pool, p->poolsize);
+
+	p->ctx = talloc_add_external(p, at_realloc);
+	if (!p->ctx)
+		goto fail_unmap;
+
+	return p;
+
+fail_unmap:
+	munmap(p->pool, size);
+fail_free:
+	talloc_free(p);
+fail_close:
+	close_noerr(fd);
+	return NULL;
+}
+
+/* Talloc off this to allocate from within the pool. */
+const void *at_pool_ctx(struct at_pool *atp)
+{
+	return atp->ctx;
+}
+
+static int cant_destroy_self(struct athread *at)
+{
+	/* Perhaps this means we want to detach, but it doesn't really
+	 * make sense. */
+	abort();
+	return 0;
+}
+
+static int destroy_at(struct athread *at)
+{
+	/* If it is already a zombie, this is harmless. */
+	kill(at->pid, SIGTERM);
+
+	close(at->rfd);
+	close(at->wfd);
+
+	/* FIXME: Should we do SIGKILL if process doesn't exit soon? */
+	if (waitpid(at->pid, NULL, 0) != at->pid)
+		err(1, "Waiting for athread %p (pid %u)", at, at->pid);
+
+	return 0;
+}
+
+/* Sets up thread and forks it.  NULL on error. */
+static struct athread *fork_thread(struct at_pool *pool)
+{
+	int p2c[2], c2p[2];
+	struct athread *at;
+
+	/* You can't already be a child of this pool. */
+	if (pool->parent_rfd != -1)
+		errx(1, "Can't create antithread on this pool: we're one");
+
+	/* We don't want this allocated *in* the pool. */
+	at = talloc_steal(pool, talloc(NULL, struct athread));
+
+	if (pipe(p2c) != 0)
+		goto free;
+
+	if (pipe(c2p) != 0)
+		goto close_p2c;
+
+	at->pid = fork();
+	if (at->pid == -1)
+		goto close_c2p;
+
+	if (at->pid == 0) {
+		/* Child */
+		close(c2p[0]);
+		close(p2c[1]);
+		pool->parent_rfd = p2c[0];
+		pool->parent_wfd = c2p[1];
+		talloc_set_destructor(at, cant_destroy_self);
+	} else {
+		/* Parent */
+		close(c2p[1]);
+		close(p2c[0]);
+		at->rfd = c2p[0];
+		at->wfd = p2c[1];
+		talloc_set_destructor(at, destroy_at);
+	}
+
+	return at;
+close_c2p:
+	close_noerr(c2p[0]);
+	close_noerr(c2p[1]);
+close_p2c:
+	close_noerr(p2c[0]);
+	close_noerr(p2c[1]);
+free:
+	talloc_free(at);
+	return NULL;
+}
+
+/* Creating an antithread via fork() */
+struct athread *_at_run(struct at_pool *pool,
+			void *(*fn)(struct at_pool *, void *),
+			void *obj)
+{
+	struct athread *at;
+
+	at = fork_thread(pool);
+	if (!at)
+		return NULL;
+
+	if (at->pid == 0) {
+		/* Child */
+		at_tell_parent(pool, fn(pool, obj));
+		exit(0);
+	}
+	/* Parent */
+	return at;
+}
+
+static unsigned int num_args(char *const argv[])
+{
+	unsigned int i;
+
+	for (i = 0; argv[i]; i++);
+	return i;
+}
+
+/* Fork and execvp, with added arguments for child to grab. */
+struct athread *at_spawn(struct at_pool *pool, void *arg, char *cmdline[])
+{
+	struct athread *at;
+	int err;
+
+	at = fork_thread(pool);
+	if (!at)
+		return NULL;
+
+	if (at->pid == 0) {
+		/* child */
+		char *argv[num_args(cmdline) + 2];
+		argv[0] = cmdline[0];
+		argv[1] = talloc_asprintf(NULL, "AT:%p/%lu/%i/%i/%i/%p",
+					  pool->pool, pool->poolsize,
+					  pool->fd, pool->parent_rfd,
+					  pool->parent_wfd, arg);
+		/* Copy including NULL terminator. */
+		memcpy(&argv[2], &cmdline[1], num_args(cmdline)*sizeof(char *));
+		execvp(argv[0], argv);
+
+		err = errno;
+		write(pool->parent_wfd, &err, sizeof(err));
+		exit(1);
+	}
+
+	/* Child should always write an error code (or 0). */
+	if (read(at->rfd, &err, sizeof(err)) != sizeof(err)) {
+		errno = ECHILD;
+		talloc_free(at);
+		return NULL;
+	}
+	if (err != 0) {
+		errno = err;
+		talloc_free(at);
+		return NULL;
+	}
+	return at;
+}
+
+/* The fd to poll on */
+int at_fd(struct athread *at)
+{
+	return at->rfd;
+}
+
+/* What's the antithread saying?  Blocks if fd not ready. */
+void *at_read(struct athread *at)
+{
+	void *ret;
+
+	switch (read(at->rfd, &ret, sizeof(ret))) {
+	case -1:
+		err(1, "Reading from athread %p (pid %u)", at, at->pid);
+	case 0:
+		/* Thread died. */
+		return NULL;
+	case sizeof(ret):
+		return ret;
+	default:
+		/* Should never happen. */
+		err(1, "Short read from athread %p (pid %u)", at, at->pid);
+	}
+}
+
+/* Say something to a child. */
+void at_tell(struct athread *at, const void *status)
+{
+	if (write(at->wfd, &status, sizeof(status)) != sizeof(status))
+		err(1, "Failure writing to athread %p (pid %u)", at, at->pid);
+}
+
+/* For child to grab arguments from command line (removes them) */
+struct at_pool *at_get_pool(int *argc, char *argv[], void **arg)
+{
+	struct at_pool *p = talloc(NULL, struct at_pool);
+	void *map;
+	int err;
+
+	if (!argv[1]) {
+		errno = EINVAL;
+		goto fail;
+	}
+
+	/* If they don't care, use dummy value. */
+	if (arg == NULL)
+		arg = &map;
+
+	if (sscanf(argv[1], "AT:%p/%lu/%i/%i/%i/%p", 
+		   &p->pool, &p->poolsize, &p->fd,
+		   &p->parent_rfd, &p->parent_wfd, arg) != 6) {
+		errno = EINVAL;
+		goto fail;
+	}
+
+	/* FIXME: To try to adjust for address space randomization, we
+	 * could re-exec a few times. */
+	map = mmap(p->pool, p->poolsize, PROT_READ|PROT_WRITE, MAP_SHARED,
+		   p->fd, 0);
+	if (map != p->pool) {
+		fprintf(stderr, "Mapping %lu bytes @%p gave %p\n",
+			p->poolsize, p->pool, map);
+		errno = ENOMEM;
+		goto fail;
+	}
+
+	p->ctx = talloc_add_external(p, at_realloc);
+	if (!p->ctx)
+		goto fail;
+
+	/* Tell parent we're good. */
+	err = 0;
+	if (write(p->parent_wfd, &err, sizeof(err)) != sizeof(err)) {
+		errno = EBADF;
+		goto fail;
+	}
+
+	/* Delete AT arg. */
+	memmove(&argv[1], &argv[2], --(*argc));
+	return p;
+
+fail:
+	/* FIXME: cleanup properly. */
+	talloc_free(p);
+	return NULL;
+}
+
+/* Say something to our parent (async). */
+void at_tell_parent(struct at_pool *pool, const void *status)
+{
+	if (pool->parent_wfd == -1)
+		errx(1, "This process is not an antithread of this pool");
+
+	if (write(pool->parent_wfd, &status, sizeof(status)) != sizeof(status))
+		err(1, "Failure writing to parent");
+}
+
+/* What's the parent saying?  Blocks if fd not ready. */
+void *at_read_parent(struct at_pool *pool)
+{
+	void *ret;
+
+	if (pool->parent_rfd == -1)
+		errx(1, "This process is not an antithread of this pool");
+
+	switch (read(pool->parent_rfd, &ret, sizeof(ret))) {
+	case -1:
+		err(1, "Reading from parent");
+	case 0:
+		/* Parent died. */
+		return NULL;
+	case sizeof(ret):
+		return ret;
+	default:
+		/* Should never happen. */
+		err(1, "Short read from parent");
+	}
+}
+
+/* The fd to poll on */
+int at_parent_fd(struct at_pool *pool)
+{
+	if (pool->parent_rfd == -1)
+		errx(1, "This process is not an antithread of this pool");
+
+	return pool->parent_rfd;
+}
+
+/* FIXME: Futexme. */
+void at_lock(void *obj)
+{
+	struct at_pool *p = talloc_find_parent_bytype(obj, struct at_pool);
+#if 0
+	unsigned int *l;
+
+	/* This isn't required yet, but ensures it's a talloc ptr */
+	l = talloc_lock_ptr(obj);
+#endif
+
+	lock(p->fd, (char *)obj - (char *)p->pool);
+
+#if 0
+	if (*l)
+		errx(1, "Object %p was already locked (something died?)", obj);
+	*l = 1;
+#endif
+}
+
+void at_unlock(void *obj)
+{
+	struct at_pool *p = talloc_find_parent_bytype(obj, struct at_pool);
+#if 0
+	unsigned int *l;
+
+	l = talloc_lock_ptr(obj);
+	if (!*l)
+		errx(1, "Object %p was already unlocked", obj);
+	*l = 0;
+#endif
+	unlock(p->fd, (char *)obj - (char *)p->pool);
+}
+
+void at_lock_all(struct at_pool *p)
+{
+	lock(p->fd, 0);
+}
+	
+void at_unlock_all(struct at_pool *p)
+{
+	unlock(p->fd, 0);
+}

+ 60 - 0
ccan/antithread/antithread.h

@@ -0,0 +1,60 @@
+#ifndef ANTITHREAD_H
+#define ANTITHREAD_H
+#include <ccan/typesafe_cb/typesafe_cb.h>
+
+struct at_pool;
+struct athread;
+
+/* Operations for the parent. */
+
+/* Create a new sharable pool. */
+struct at_pool *at_pool(unsigned long size);
+
+/* Talloc off this to allocate from within the pool. */
+const void *at_pool_ctx(struct at_pool *atp);
+
+/* Creating an antithread via fork().  Returned athread is child of pool. */
+#define at_run(pool, fn, arg)						\
+	_at_run(pool,							\
+		typesafe_cb_preargs(void *, (fn), (arg), struct at_pool *), \
+		(arg))
+
+/* Fork and execvp, with added arguments for child to grab.
+ * Returned athread is child of pool. */
+struct athread *at_spawn(struct at_pool *pool, void *arg, char *cmdline[]);
+
+/* The fd to poll on */
+int at_fd(struct athread *at);
+
+/* What's the antithread saying?  Blocks if fd not ready. */
+void *at_read(struct athread *at);
+
+/* Say something to a child (async). */
+void at_tell(struct athread *at, const void *status);
+
+/* Operations for the children */
+/* For child to grab arguments from command line (removes them) */
+struct at_pool *at_get_pool(int *argc, char *argv[], void **arg);
+
+/* Say something to our parent (async). */
+void at_tell_parent(struct at_pool *pool, const void *status);
+
+/* What's the parent saying?  Blocks if fd not ready. */
+void *at_read_parent(struct at_pool *pool);
+
+/* The fd to poll on */
+int at_parent_fd(struct at_pool *pool);
+
+/* Locking: any talloc pointer. */
+void at_lock(void *obj);
+void at_unlock(void *obj);
+
+void at_lock_all(struct at_pool *pool);
+void at_unlock_all(struct at_pool *pool);
+
+/* Internal function */
+struct athread *_at_run(struct at_pool *pool,
+			void *(*fn)(struct at_pool *, void *arg),
+			void *arg);
+
+#endif /* ANTITHREAD_H */

+ 12 - 0
ccan/antithread/examples/Makefile

@@ -0,0 +1,12 @@
+CFLAGS=-g -Wall -Wstrict-prototypes -Wold-style-definition -Wmissing-prototypes -Wmissing-declarations -I../../..
+
+all: find_md5 md5_worker dns_lookup
+
+find_md5: md5_server.c ../../../libccan.a
+	$(CC) $(CFLAGS) -o $@ $^ 
+
+md5_worker: md5_worker.c ../../../libccan.a
+	$(CC) $(CFLAGS) -o $@ $^ 
+
+dns_lookup: dns_lookup.c ../../../libccan.a
+	$(CC) $(CFLAGS) -o $@ $^ 

+ 139 - 0
ccan/antithread/examples/dns_lookup.c

@@ -0,0 +1,139 @@
+/* Async DNS lookup.  Shows passing complex data through pool. */
+#include "ccan/antithread/antithread.h"
+#include "ccan/string/string.h"
+#include "ccan/talloc/talloc.h"
+#include "md5_finder.h"
+#include <err.h>
+#include <sys/select.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+
+struct lookup_answer {
+	bool ok;
+	union {
+		struct hostent hent;
+		int herrno; /* If !ok */
+	};
+};
+
+/* Including NULL terminator. */
+static inline unsigned count_entries(char **entries)
+{
+	unsigned int i;
+
+	for (i = 0; entries[i]; i++);
+	return i+1;
+}
+
+/* Copy as one nice tallocated object.  Since ans is in the pool, it
+ * all gets put in the pool. */
+static void copy_answer(struct lookup_answer *ans, const struct hostent *host)
+{
+	unsigned int i;
+
+	ans->hent.h_name = talloc_strdup(ans, host->h_name);
+	ans->hent.h_aliases = talloc_array(ans, char *,
+					   count_entries(host->h_aliases));
+	for (i = 0; host->h_aliases[i]; i++)
+		ans->hent.h_aliases[i] = talloc_strdup(ans->hent.h_aliases,
+						       host->h_aliases[i]);
+	ans->hent.h_aliases[i] = NULL;
+	ans->hent.h_addrtype = host->h_addrtype;
+	ans->hent.h_length = host->h_length;
+	ans->hent.h_addr_list = talloc_array(ans, char *,
+					     count_entries(host->h_addr_list));
+	for (i = 0; host->h_addr_list[i]; i++)
+		ans->hent.h_addr_list[i] = talloc_memdup(ans->hent.h_addr_list,
+							 host->h_addr_list[i],
+							 ans->hent.h_length);
+}
+
+static void *lookup_dns(struct at_pool *atp, char *name)
+{
+	struct lookup_answer *ans;
+	struct hostent *host;
+
+	host = gethostbyname(name);
+
+	ans = talloc(at_pool_ctx(atp), struct lookup_answer);
+	if (!host) {
+		ans->ok = false;
+		ans->herrno = h_errno;
+	} else {
+		ans->ok = true;
+		copy_answer(ans, host);
+	}
+
+	return ans;
+}
+
+static void report_answer(const char *name, const struct lookup_answer *ans)
+{
+	unsigned int i;
+
+	if (!ans->ok) {
+		printf("%s: %s\n", name, hstrerror(ans->herrno));
+		return;
+	}
+
+	printf("%s: ", name);
+	for (i = 0; ans->hent.h_aliases[i]; i++)
+		printf("%c%s", i == 0 ? '[' : ' ', ans->hent.h_aliases[i]);
+	if (i)
+		printf("]");
+	printf("%#x", ans->hent.h_addrtype);
+	for (i = 0; ans->hent.h_addr_list[i]; i++) {
+		unsigned int j;
+		printf(" ");
+		for (j = 0; j < ans->hent.h_length; j++)
+			printf("%02x", ans->hent.h_addr_list[i][j]);
+	}
+	printf("\n");
+}
+
+int main(int argc, char *argv[])
+{
+	struct at_pool *atp;
+	struct athread **at;
+	unsigned int i;
+
+	if (argc < 2)
+		errx(1, "Usage: dns_lookup [--sync] name...");
+
+	/* Give it plenty of room. */
+	atp = at_pool(argc * 1024*1024);
+	if (!atp)
+		err(1, "Can't create pool");
+
+	/* Free pool on exit. */
+	talloc_steal(talloc_autofree_context(), atp);
+
+	if (streq(argv[1], "--sync")) {
+		for (i = 2; i < argc; i++) {
+			struct lookup_answer *ans = lookup_dns(atp, argv[i]);
+			report_answer(argv[i], ans);
+			talloc_free(ans);
+		}
+		return 0;
+	}			
+
+	at = talloc_array(atp, struct athread *, argc);
+
+	for (i = 1; i < argc; i++) {
+		at[i] = at_run(atp, lookup_dns, argv[i]);
+		if (!at[i])
+			err(1, "Can't spawn child");
+	}
+
+	for (i = 1; i < argc; i++) {
+		struct lookup_answer *ans = at_read(at[i]);
+		if (!ans)
+			warn("Child died on '%s'", argv[i]);
+		else {
+			report_answer(argv[i], ans);
+			talloc_free(ans);
+		}
+	}
+	return 0;
+}		

+ 24 - 0
ccan/antithread/examples/md5_finder.h

@@ -0,0 +1,24 @@
+#ifndef MD5_FINDER_H
+#define MD5_FINDER_H
+#include <stdint.h>
+#include <stdbool.h>
+
+#define MD5_HASH_WORDS		4
+
+#define u32 uint32_t
+#define u64 uint64_t
+#define u8 uint8_t
+
+struct md5_search {
+	u32 mask[MD5_HASH_WORDS];
+	u32 md5[MD5_HASH_WORDS];
+	bool success;
+	unsigned int num_tries;
+	unsigned int num_bytes;
+	u8 *pattern;
+};
+
+/* Child writes this value initially to say "ready". */
+#define INITIAL_POINTER ((void *)1)
+
+#endif /* MD5_FINDER_H */

+ 129 - 0
ccan/antithread/examples/md5_server.c

@@ -0,0 +1,129 @@
+/* Tries to find data with a given MD5 (up to N bits). */
+#include "ccan/antithread/antithread.h"
+#include "ccan/string/string.h"
+#include "ccan/talloc/talloc.h"
+#include "md5_finder.h"
+#include <err.h>
+#include <sys/select.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+static void usage(void)
+{
+	errx(1, "Usage: md5calc <hexstring> <numcpus>");
+}
+
+static void parse_hexstring(const char *string, struct md5_search *md5s)
+{
+	unsigned int i;
+
+	if (strstarts(string, "0x") || strstarts(string, "0X"))
+		string += 2;
+
+	for (i = 0; i < MD5_HASH_WORDS; i++) {
+		unsigned int n[4], j;
+		int ret;
+
+		ret = sscanf(string, "%02x%02x%02x%02x",
+			     &n[0], &n[1], &n[2], &n[3]);
+		string += 8;
+
+		if (ret == EOF)
+			break;
+		for (j = 0; j < ret; j++) {
+			md5s->mask[MD5_HASH_WORDS-i-1] |= (0xFF << (8*j));
+			md5s->md5[MD5_HASH_WORDS-i-1] |= (n[j] << (8*j));
+		}
+
+		if (ret != 4)
+			break;
+	}
+}
+
+static void init_pattern(u8 *pattern, unsigned int num_bytes, u64 total)
+{
+	unsigned int i;
+
+	for (i = 0; i < num_bytes; i++) {
+		pattern[i] = 'A' + (total % 26);
+		total /= 26;
+	}
+}
+
+#define PATTERN_BYTES 32
+
+int main(int argc, char *argv[])
+{
+	struct at_pool *atp;
+	struct md5_search md5s;
+	unsigned int i, maxfd, numathreads = argc == 3 ? atoi(argv[2]) : 0;
+	u64 total = 0;
+	fd_set fds;
+	char *cmdline[] = { "./md5_worker", NULL };
+	struct athread *at[numathreads];
+
+	if (numathreads == 0)
+		usage();
+
+	memset(&md5s, 0, sizeof(md5s));
+	parse_hexstring(argv[1], &md5s);
+
+	md5s.num_tries = 1024*1024;
+	md5s.num_bytes = PATTERN_BYTES;
+
+	/* *2 to allow for allocation inefficiency. */
+	atp = at_pool((sizeof(md5s) + PATTERN_BYTES) * (numathreads + 1) * 2);
+	if (!atp)
+		err(1, "Can't create pool");
+
+	/* Free pool on exit. */
+//	talloc_steal(talloc_autofree_context(), atp);
+
+	FD_ZERO(&fds);
+	maxfd = 0;
+	for (i = 0; i < numathreads; i++) {
+		at[i] = at_spawn(atp, NULL, cmdline);
+		if (!at[i])
+			err(1, "Can't spawn child");
+		FD_SET(at_fd(at[i]), &fds);
+		if (at_fd(at[i]) > maxfd)
+			maxfd = at_fd(at[i]);
+	}
+
+	for (;;) {
+		struct md5_search *m, *res;
+		fd_set in = fds;
+
+		/* Shouldn't fail! */
+		m = talloc(at_pool_ctx(atp), struct md5_search);
+		*m = md5s;
+		md5s.num_tries++;
+		m->pattern = talloc_array(m, u8, m->num_bytes);
+		init_pattern(m->pattern, m->num_bytes, total);
+
+		select(maxfd+1, &in, NULL, NULL, NULL);
+		for (i = 0; i < numathreads; i++)
+			if (FD_ISSET(at_fd(at[i]), &in))
+				break;
+		if (i == numathreads)
+			errx(1, "Select returned, but noone ready?");
+
+		res = at_read(at[i]);
+		if (res == NULL) {
+			warn("Thread died?");
+			FD_CLR(at_fd(at[i]), &fds);
+			continue;
+		}
+		if (res != INITIAL_POINTER) {
+			if (res->success) {
+				printf("Success! '%.*s'\n",
+				       res->num_bytes, (char *)res->pattern);
+				exit(0);
+			}
+			m->num_tries++;
+			talloc_free(res);
+		}
+		at_tell(at[i], m);
+		total += m->num_tries;
+	}
+}		

+ 274 - 0
ccan/antithread/examples/md5_worker.c

@@ -0,0 +1,274 @@
+/* Worker thread: tries to find data with given MD5. */
+#include "ccan/antithread/antithread.h"
+#include "md5_finder.h"
+#include <netinet/in.h>
+#include <err.h>
+#include <string.h>
+
+/* 
+ * Cryptographic API.
+ *
+ * MD5 Message Digest Algorithm (RFC1321).
+ *
+ * Derived from cryptoapi implementation, originally based on the
+ * public domain implementation written by Colin Plumb in 1993.
+ *
+ * Copyright (c) Cryptoapi developers.
+ * Copyright (c) 2002 James Morris <jmorris@intercode.com.au>
+ * 
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the Free
+ * Software Foundation; either version 2 of the License, or (at your option) 
+ * any later version.
+ */
+#define MD5_DIGEST_SIZE		16
+#define MD5_HMAC_BLOCK_SIZE	64
+#define MD5_BLOCK_WORDS		16
+
+#define F1(x, y, z)	(z ^ (x & (y ^ z)))
+#define F2(x, y, z)	F1(z, x, y)
+#define F3(x, y, z)	(x ^ y ^ z)
+#define F4(x, y, z)	(y ^ (x | ~z))
+
+#define MD5STEP(f, w, x, y, z, in, s) \
+	(w += f(x, y, z) + in, w = (w<<s | w>>(32-s)) + x)
+
+struct md5_ctx {
+	u32 hash[MD5_HASH_WORDS];
+	u32 block[MD5_BLOCK_WORDS];
+	u64 byte_count;
+};
+
+static void md5_transform(u32 *hash, u32 const *in)
+{
+	u32 a, b, c, d;
+
+	a = hash[0];
+	b = hash[1];
+	c = hash[2];
+	d = hash[3];
+
+	MD5STEP(F1, a, b, c, d, in[0] + 0xd76aa478, 7);
+	MD5STEP(F1, d, a, b, c, in[1] + 0xe8c7b756, 12);
+	MD5STEP(F1, c, d, a, b, in[2] + 0x242070db, 17);
+	MD5STEP(F1, b, c, d, a, in[3] + 0xc1bdceee, 22);
+	MD5STEP(F1, a, b, c, d, in[4] + 0xf57c0faf, 7);
+	MD5STEP(F1, d, a, b, c, in[5] + 0x4787c62a, 12);
+	MD5STEP(F1, c, d, a, b, in[6] + 0xa8304613, 17);
+	MD5STEP(F1, b, c, d, a, in[7] + 0xfd469501, 22);
+	MD5STEP(F1, a, b, c, d, in[8] + 0x698098d8, 7);
+	MD5STEP(F1, d, a, b, c, in[9] + 0x8b44f7af, 12);
+	MD5STEP(F1, c, d, a, b, in[10] + 0xffff5bb1, 17);
+	MD5STEP(F1, b, c, d, a, in[11] + 0x895cd7be, 22);
+	MD5STEP(F1, a, b, c, d, in[12] + 0x6b901122, 7);
+	MD5STEP(F1, d, a, b, c, in[13] + 0xfd987193, 12);
+	MD5STEP(F1, c, d, a, b, in[14] + 0xa679438e, 17);
+	MD5STEP(F1, b, c, d, a, in[15] + 0x49b40821, 22);
+
+	MD5STEP(F2, a, b, c, d, in[1] + 0xf61e2562, 5);
+	MD5STEP(F2, d, a, b, c, in[6] + 0xc040b340, 9);
+	MD5STEP(F2, c, d, a, b, in[11] + 0x265e5a51, 14);
+	MD5STEP(F2, b, c, d, a, in[0] + 0xe9b6c7aa, 20);
+	MD5STEP(F2, a, b, c, d, in[5] + 0xd62f105d, 5);
+	MD5STEP(F2, d, a, b, c, in[10] + 0x02441453, 9);
+	MD5STEP(F2, c, d, a, b, in[15] + 0xd8a1e681, 14);
+	MD5STEP(F2, b, c, d, a, in[4] + 0xe7d3fbc8, 20);
+	MD5STEP(F2, a, b, c, d, in[9] + 0x21e1cde6, 5);
+	MD5STEP(F2, d, a, b, c, in[14] + 0xc33707d6, 9);
+	MD5STEP(F2, c, d, a, b, in[3] + 0xf4d50d87, 14);
+	MD5STEP(F2, b, c, d, a, in[8] + 0x455a14ed, 20);
+	MD5STEP(F2, a, b, c, d, in[13] + 0xa9e3e905, 5);
+	MD5STEP(F2, d, a, b, c, in[2] + 0xfcefa3f8, 9);
+	MD5STEP(F2, c, d, a, b, in[7] + 0x676f02d9, 14);
+	MD5STEP(F2, b, c, d, a, in[12] + 0x8d2a4c8a, 20);
+
+	MD5STEP(F3, a, b, c, d, in[5] + 0xfffa3942, 4);
+	MD5STEP(F3, d, a, b, c, in[8] + 0x8771f681, 11);
+	MD5STEP(F3, c, d, a, b, in[11] + 0x6d9d6122, 16);
+	MD5STEP(F3, b, c, d, a, in[14] + 0xfde5380c, 23);
+	MD5STEP(F3, a, b, c, d, in[1] + 0xa4beea44, 4);
+	MD5STEP(F3, d, a, b, c, in[4] + 0x4bdecfa9, 11);
+	MD5STEP(F3, c, d, a, b, in[7] + 0xf6bb4b60, 16);
+	MD5STEP(F3, b, c, d, a, in[10] + 0xbebfbc70, 23);
+	MD5STEP(F3, a, b, c, d, in[13] + 0x289b7ec6, 4);
+	MD5STEP(F3, d, a, b, c, in[0] + 0xeaa127fa, 11);
+	MD5STEP(F3, c, d, a, b, in[3] + 0xd4ef3085, 16);
+	MD5STEP(F3, b, c, d, a, in[6] + 0x04881d05, 23);
+	MD5STEP(F3, a, b, c, d, in[9] + 0xd9d4d039, 4);
+	MD5STEP(F3, d, a, b, c, in[12] + 0xe6db99e5, 11);
+	MD5STEP(F3, c, d, a, b, in[15] + 0x1fa27cf8, 16);
+	MD5STEP(F3, b, c, d, a, in[2] + 0xc4ac5665, 23);
+
+	MD5STEP(F4, a, b, c, d, in[0] + 0xf4292244, 6);
+	MD5STEP(F4, d, a, b, c, in[7] + 0x432aff97, 10);
+	MD5STEP(F4, c, d, a, b, in[14] + 0xab9423a7, 15);
+	MD5STEP(F4, b, c, d, a, in[5] + 0xfc93a039, 21);
+	MD5STEP(F4, a, b, c, d, in[12] + 0x655b59c3, 6);
+	MD5STEP(F4, d, a, b, c, in[3] + 0x8f0ccc92, 10);
+	MD5STEP(F4, c, d, a, b, in[10] + 0xffeff47d, 15);
+	MD5STEP(F4, b, c, d, a, in[1] + 0x85845dd1, 21);
+	MD5STEP(F4, a, b, c, d, in[8] + 0x6fa87e4f, 6);
+	MD5STEP(F4, d, a, b, c, in[15] + 0xfe2ce6e0, 10);
+	MD5STEP(F4, c, d, a, b, in[6] + 0xa3014314, 15);
+	MD5STEP(F4, b, c, d, a, in[13] + 0x4e0811a1, 21);
+	MD5STEP(F4, a, b, c, d, in[4] + 0xf7537e82, 6);
+	MD5STEP(F4, d, a, b, c, in[11] + 0xbd3af235, 10);
+	MD5STEP(F4, c, d, a, b, in[2] + 0x2ad7d2bb, 15);
+	MD5STEP(F4, b, c, d, a, in[9] + 0xeb86d391, 21);
+
+	hash[0] += a;
+	hash[1] += b;
+	hash[2] += c;
+	hash[3] += d;
+}
+
+/* XXX: this stuff can be optimized */
+static inline void le32_to_cpu_array(u32 *buf, unsigned int words)
+{
+	while (words--) {
+		*buf = ntohl(*buf);
+		buf++;
+	}
+}
+
+static inline void cpu_to_le32_array(u32 *buf, unsigned int words)
+{
+	while (words--) {
+		*buf = htonl(*buf);
+		buf++;
+	}
+}
+
+static inline void md5_transform_helper(struct md5_ctx *ctx)
+{
+	le32_to_cpu_array(ctx->block, sizeof(ctx->block) / sizeof(u32));
+	md5_transform(ctx->hash, ctx->block);
+}
+
+static void md5_init(struct md5_ctx *mctx)
+{
+	mctx->hash[0] = 0x67452301;
+	mctx->hash[1] = 0xefcdab89;
+	mctx->hash[2] = 0x98badcfe;
+	mctx->hash[3] = 0x10325476;
+	mctx->byte_count = 0;
+}
+
+static void md5_update(struct md5_ctx *mctx, const u8 *data, unsigned int len)
+{
+	const u32 avail = sizeof(mctx->block) - (mctx->byte_count & 0x3f);
+
+	mctx->byte_count += len;
+
+	if (avail > len) {
+		memcpy((char *)mctx->block + (sizeof(mctx->block) - avail),
+		       data, len);
+		return;
+	}
+
+	memcpy((char *)mctx->block + (sizeof(mctx->block) - avail),
+	       data, avail);
+
+	md5_transform_helper(mctx);
+	data += avail;
+	len -= avail;
+
+	while (len >= sizeof(mctx->block)) {
+		memcpy(mctx->block, data, sizeof(mctx->block));
+		md5_transform_helper(mctx);
+		data += sizeof(mctx->block);
+		len -= sizeof(mctx->block);
+	}
+
+	memcpy(mctx->block, data, len);
+}
+
+static void md5_final(struct md5_ctx *mctx)
+{
+	const unsigned int offset = mctx->byte_count & 0x3f;
+	char *p = (char *)mctx->block + offset;
+	int padding = 56 - (offset + 1);
+
+	*p++ = 0x80;
+	if (padding < 0) {
+		memset(p, 0x00, padding + sizeof (u64));
+		md5_transform_helper(mctx);
+		p = (char *)mctx->block;
+		padding = 56;
+	}
+
+	memset(p, 0, padding);
+	mctx->block[14] = mctx->byte_count << 3;
+	mctx->block[15] = mctx->byte_count >> 29;
+	le32_to_cpu_array(mctx->block, (sizeof(mctx->block) -
+	                  sizeof(u64)) / sizeof(u32));
+	md5_transform(mctx->hash, mctx->block);
+	cpu_to_le32_array(mctx->hash, sizeof(mctx->hash) / sizeof(u32));
+}
+
+static bool bits_match(const u32 a[MD5_HASH_WORDS],
+		       const u32 b[MD5_HASH_WORDS],
+		       const u32 mask[MD5_HASH_WORDS])
+{
+	unsigned int i;
+
+	for (i = 0; i < MD5_HASH_WORDS; i++) {
+		if ((a[i] & mask[i]) != (b[i] & mask[i]))
+			return false;
+	}
+
+#if 0
+	printf("mask = %08x%08x%08x%08x\n"
+	       "a = %08x%08x%08x%08x\n"
+	       "b = %08x%08x%08x%08x\n",
+	       mask[0], mask[1], mask[2], mask[3],
+	       a[0], a[1], a[2], a[3],
+	       b[0], b[1], b[2], b[3]);
+#endif
+
+	return true;
+}
+
+static void inc_pattern(u8 *pattern, unsigned int len)
+{
+	unsigned int i;
+
+	for (i = 0; i < len; i++) {
+		pattern[i]++;
+		if (pattern[i] <= 'Z')
+			break;
+		pattern[i] = 'A';
+	}
+}
+
+int main(int argc, char *argv[])
+{
+	struct at_pool *atp = at_get_pool(&argc, argv, NULL);
+	struct md5_search *md5s;
+
+	if (!atp)
+		err(1, "Not a worker thread?");
+
+	/* Tell parent we're ready. */
+	at_tell_parent(atp, INITIAL_POINTER);
+	while ((md5s = at_read_parent(atp)) != NULL) {
+		unsigned int i;
+		md5s->success = false;
+
+		for (i = 0; i < md5s->num_tries; i++) {
+			struct md5_ctx ctx;	
+
+			md5_init(&ctx);
+			md5_update(&ctx, md5s->pattern, md5s->num_bytes);
+			md5_final(&ctx);
+
+			if (bits_match(ctx.hash, md5s->md5, md5s->mask)) {
+				md5s->success = true;
+				break;
+			}
+			inc_pattern(md5s->pattern, md5s->num_bytes);
+		}
+		at_tell_parent(atp, md5s);
+	}
+	return 0;
+}

+ 61 - 0
ccan/antithread/test/run-lock.c

@@ -0,0 +1,61 @@
+#include "antithread/antithread.c"
+#include <assert.h>
+#include <unistd.h>
+#include "tap/tap.h"
+
+#define NUM_RUNS 100
+
+static void *test(struct at_pool *atp, int *val)
+{
+	unsigned int i;
+
+	if (at_read_parent(atp) != test) {
+		diag("Woah, at_read said bad");
+		return NULL;
+	}
+
+	/* We increment val, then sleep a little. */
+	for (i = 0; i < NUM_RUNS; i++) {
+		at_lock(val);
+		(*(volatile int *)val)++;
+		usleep(i * 100);
+		at_unlock(val);
+		usleep(i * 100);
+	}
+
+	return val;
+};
+
+int main(int argc, char *argv[])
+{
+	struct at_pool *atp;
+	struct athread *at;
+	int *val, i;
+
+	plan_tests(3);
+
+	atp = at_pool(1*1024*1024);
+	assert(atp);
+	val = talloc_zero(at_pool_ctx(atp), int);
+	at = at_run(atp, test, val);
+	assert(at);
+
+	ok1(*val == 0);
+
+	at_tell(at, test);
+
+	/* We increment val, then sleep a little. */
+	for (i = 0; i < NUM_RUNS; i++) {
+		at_lock(val);
+		(*(volatile int *)val)++;
+		usleep(i * 100);
+		at_unlock(val);
+		usleep(i * 100);
+	}
+	ok1(at_read(at) == val);
+	talloc_free(at);
+
+	ok1(*val == NUM_RUNS*2);
+
+	return exit_status();
+}

+ 35 - 0
ccan/antithread/test/run-simple.c

@@ -0,0 +1,35 @@
+#include "antithread/antithread.c"
+#include <assert.h>
+#include "tap/tap.h"
+
+static void *test(struct at_pool *atp, int *pid)
+{
+	*pid = getpid();
+	return NULL;
+};
+
+int main(int argc, char *argv[])
+{
+	struct at_pool *atp;
+	struct athread *at;
+	int *pid;
+
+	plan_tests(4);
+
+	atp = at_pool(1*1024*1024);
+	assert(atp);
+	pid = talloc(at_pool_ctx(atp), int);
+	assert(pid);
+	ok1((char *)pid >= (char *)atp->pool
+	    && (char *)pid < (char *)atp->pool + atp->poolsize);
+	at = at_run(atp, test, pid);
+	assert(at);
+
+	ok1(at_read(at) == NULL);
+	talloc_free(at);
+
+	ok1(*pid != 0);
+	ok1(*pid != getpid());
+
+	return exit_status();
+}

+ 34 - 0
ccan/antithread/test/run-spawn-NULL.c

@@ -0,0 +1,34 @@
+#include "antithread/antithread.c"
+#include <assert.h>
+#include "tap/tap.h"
+
+int main(int argc, char *argv[])
+{
+	struct at_pool *atp;
+	struct athread *at;
+	int err;
+
+	atp = at_get_pool(&argc, argv, NULL);
+	if (atp) {
+		at_tell_parent(atp, (void *)1UL);
+		exit(0);
+	}
+	assert(!argv[1]);
+
+	err = errno;
+	plan_tests(3);
+	ok1(err == EINVAL);
+
+	atp = at_pool(1*1024*1024);
+	assert(atp);
+
+	/* This should work */
+	at = at_spawn(atp, NULL, argv);
+	ok1(at);
+
+	/* Should read back the magic pointer. */
+	ok1(at_read(at) == (void *)1);
+	talloc_free(at);
+
+	return exit_status();
+}

+ 48 - 0
ccan/antithread/test/run-spawn.c

@@ -0,0 +1,48 @@
+#include "antithread/antithread.c"
+#include <assert.h>
+#include "tap/tap.h"
+
+int main(int argc, char *argv[])
+{
+	struct at_pool *atp;
+	struct athread *at;
+	int err, *pid;
+	void *arg;
+	char *bad_args[] = { "/", NULL };
+
+	atp = at_get_pool(&argc, argv, &arg);
+	if (atp) {
+		*(int *)arg = getpid();
+		at_tell_parent(atp, arg);
+		exit(0);
+	}
+	assert(!argv[1]);
+
+	err = errno;
+	plan_tests(7);
+	ok1(err == EINVAL);
+
+	atp = at_pool(1*1024*1024);
+	assert(atp);
+	pid = talloc(at_pool_ctx(atp), int);
+	assert(pid);
+	ok1((char *)pid >= (char *)atp->pool
+	    && (char *)pid < (char *)atp->pool + atp->poolsize);
+
+	/* This is a failed spawn. */
+	at = at_spawn(atp, pid, bad_args);
+	ok1(at == NULL);
+
+	/* This should work */
+	at = at_spawn(atp, pid, argv);
+	ok1(at);
+
+	/* Should read back the pid pointer. */
+	ok1(at_read(at) == pid);
+	talloc_free(at);
+
+	ok1(*pid != 0);
+	ok1(*pid != getpid());
+
+	return exit_status();
+}

+ 30 - 0
ccan/antithread/test/run-tell.c

@@ -0,0 +1,30 @@
+#include "antithread/antithread.c"
+#include <assert.h>
+#include "tap/tap.h"
+
+static void *test(struct at_pool *atp, void *unused)
+{
+	char *p;
+	p = at_read_parent(atp);
+	at_tell_parent(atp, p + 1);
+	return NULL;
+};
+
+int main(int argc, char *argv[])
+{
+	struct at_pool *atp;
+	struct athread *at;
+
+	plan_tests(1);
+
+	atp = at_pool(1*1024*1024);
+	assert(atp);
+	at = at_run(atp, test, NULL);
+	assert(at);
+
+	at_tell(at, argv[0]);
+	ok1(at_read(at) == argv[0] + 1);
+	talloc_free(at);
+
+	return exit_status();
+}

+ 36 - 0
ccan/antithread/test/run-tell_parent.c

@@ -0,0 +1,36 @@
+#include "antithread/antithread.c"
+#include <assert.h>
+#include "tap/tap.h"
+
+static void *test(struct at_pool *atp, int *pid)
+{
+	*pid = getpid();
+	at_tell_parent(atp, test);
+	return NULL;
+};
+
+int main(int argc, char *argv[])
+{
+	struct at_pool *atp;
+	struct athread *at;
+	int *pid;
+
+	plan_tests(4);
+
+	atp = at_pool(1*1024*1024);
+	assert(atp);
+	pid = talloc(at_pool_ctx(atp), int);
+	assert(pid);
+	ok1((char *)pid >= (char *)atp->pool
+	    && (char *)pid < (char *)atp->pool + atp->poolsize);
+	at = at_run(atp, test, pid);
+	assert(at);
+
+	ok1(at_read(at) == test);
+	talloc_free(at);
+
+	ok1(*pid != 0);
+	ok1(*pid != getpid());
+
+	return exit_status();
+}