Browse Source

tdb2: handle chains of free tables

This adds chains of free tables: we choose one at random at the start and
we iterate through them when they are exhausted.

Currently there is no code to actually add a new free table, but we test
that we can handle it if we add one in future.
Rusty Russell 15 years ago
parent
commit
ef9dec6018

+ 24 - 12
ccan/tdb2/check.c

@@ -258,17 +258,17 @@ fail:
 
 
 static bool check_hash(struct tdb_context *tdb,
 static bool check_hash(struct tdb_context *tdb,
 		       tdb_off_t used[],
 		       tdb_off_t used[],
-		       size_t num_used)
+		       size_t num_used, size_t num_flists)
 {
 {
-	size_t num_found = 0;
+	/* Free lists also show up as used. */
+	size_t num_found = num_flists;
 
 
 	if (!check_hash_tree(tdb, offsetof(struct tdb_header, hashtable),
 	if (!check_hash_tree(tdb, offsetof(struct tdb_header, hashtable),
 			     TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS,
 			     TDB_TOPLEVEL_HASH_BITS-TDB_HASH_GROUP_BITS,
 			     0, 0, used, num_used, &num_found))
 			     0, 0, used, num_used, &num_found))
 		return false;
 		return false;
 
 
-	/* 1 is for the free list. */
-	if (num_found != num_used - 1) {
+	if (num_found != num_used) {
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 			 "tdb_check: Not all entries are in hash\n");
 			 "tdb_check: Not all entries are in hash\n");
 		return false;
 		return false;
@@ -279,7 +279,7 @@ static bool check_hash(struct tdb_context *tdb,
 static bool check_free(struct tdb_context *tdb,
 static bool check_free(struct tdb_context *tdb,
 		       tdb_off_t off,
 		       tdb_off_t off,
 		       const struct tdb_free_record *frec,
 		       const struct tdb_free_record *frec,
-		       tdb_off_t prev, unsigned int bucket)
+		       tdb_off_t prev, tdb_off_t flist_off, unsigned int bucket)
 {
 {
 	if (frec_magic(frec) != TDB_FREE_MAGIC) {
 	if (frec_magic(frec) != TDB_FREE_MAGIC) {
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
@@ -287,6 +287,13 @@ static bool check_free(struct tdb_context *tdb,
 			 (long long)off, (long long)frec->magic_and_meta);
 			 (long long)off, (long long)frec->magic_and_meta);
 		return false;
 		return false;
 	}
 	}
+	if (frec_flist(frec) != flist_off) {
+		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+			 "tdb_check: offset %llu bad freelist 0x%llx\n",
+			 (long long)off, (long long)frec_flist(frec));
+		return false;
+	}
+
 	if (tdb->methods->oob(tdb, off
 	if (tdb->methods->oob(tdb, off
 			      + frec->data_len+sizeof(struct tdb_used_record),
 			      + frec->data_len+sizeof(struct tdb_used_record),
 			      false))
 			      false))
@@ -341,7 +348,7 @@ static bool check_free_list(struct tdb_context *tdb,
 				return false;
 				return false;
 			if (tdb_read_convert(tdb, off, &f, sizeof(f)))
 			if (tdb_read_convert(tdb, off, &f, sizeof(f)))
 				return false;
 				return false;
-			if (!check_free(tdb, off, &f, prev, i))
+			if (!check_free(tdb, off, &f, prev, flist_off, i))
 				return false;
 				return false;
 
 
 			/* FIXME: Check hash bits */
 			/* FIXME: Check hash bits */
@@ -438,8 +445,8 @@ int tdb_check(struct tdb_context *tdb,
 	      int (*check)(TDB_DATA key, TDB_DATA data, void *private_data),
 	      int (*check)(TDB_DATA key, TDB_DATA data, void *private_data),
 	      void *private_data)
 	      void *private_data)
 {
 {
-	tdb_off_t *free = NULL, *used = NULL;
-	size_t num_free = 0, num_used = 0, num_found = 0;
+	tdb_off_t *free = NULL, *used = NULL, flist;
+	size_t num_free = 0, num_used = 0, num_found = 0, num_flists = 0;
 
 
 	if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false) != 0)
 	if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false) != 0)
 		return -1;
 		return -1;
@@ -456,11 +463,16 @@ int tdb_check(struct tdb_context *tdb,
 	if (!check_linear(tdb, &used, &num_used, &free, &num_free))
 	if (!check_linear(tdb, &used, &num_used, &free, &num_free))
 		goto fail;
 		goto fail;
 
 
-	/* FIXME: Check key uniqueness? */
-	if (!check_hash(tdb, used, num_used))
-		goto fail;
+	for (flist = first_flist(tdb); flist; flist = next_flist(tdb, flist)) {
+		if (flist == TDB_OFF_ERR)
+			goto fail;
+		if (!check_free_list(tdb, flist, free, num_free, &num_found))
+			goto fail;
+		num_flists++;
+	}
 
 
-	if (!check_free_list(tdb, tdb->flist_off, free, num_free, &num_found))
+	/* FIXME: Check key uniqueness? */
+	if (!check_hash(tdb, used, num_used, num_flists))
 		goto fail;
 		goto fail;
 
 
 	if (num_found != num_free) {
 	if (num_found != num_free) {

+ 66 - 25
ccan/tdb2/free.c

@@ -49,12 +49,36 @@ unsigned int size_to_bucket(tdb_len_t data_len)
 	return bucket;
 	return bucket;
 }
 }
 
 
+tdb_off_t first_flist(struct tdb_context *tdb)
+{
+	return tdb_read_off(tdb, offsetof(struct tdb_header, free_list));
+}
+
+tdb_off_t next_flist(struct tdb_context *tdb, tdb_off_t flist)
+{
+	return tdb_read_off(tdb, flist + offsetof(struct tdb_freelist, next));
+}
+
 int tdb_flist_init(struct tdb_context *tdb)
 int tdb_flist_init(struct tdb_context *tdb)
 {
 {
-	tdb->flist_off = tdb_read_off(tdb,
-				      offsetof(struct tdb_header, free_list));
-	if (tdb->flist_off == TDB_OFF_ERR)
-		return -1;
+	/* Use reservoir sampling algorithm to select a free list at random. */
+	unsigned int rnd, max = 0;
+	tdb_off_t off;
+
+	tdb->flist_off = off = first_flist(tdb);
+
+	while (off) {
+		if (off == TDB_OFF_ERR)
+			return -1;
+
+		rnd = random();
+		if (rnd >= max) {
+			tdb->flist_off = off;
+			max = rnd;
+		}
+
+		off = next_flist(tdb, off);
+	}
 	return 0;
 	return 0;
 }
 }
 
 
@@ -66,10 +90,12 @@ tdb_off_t bucket_off(tdb_off_t flist_off, unsigned bucket)
 }
 }
 
 
 /* Returns free_buckets + 1, or list number to search. */
 /* Returns free_buckets + 1, or list number to search. */
-static tdb_off_t find_free_head(struct tdb_context *tdb, tdb_off_t bucket)
+static tdb_off_t find_free_head(struct tdb_context *tdb,
+				tdb_off_t flist_off,
+				tdb_off_t bucket)
 {
 {
 	/* Speculatively search for a non-zero bucket. */
 	/* Speculatively search for a non-zero bucket. */
-	return tdb_find_nonzero_off(tdb, bucket_off(tdb->flist_off, 0),
+	return tdb_find_nonzero_off(tdb, bucket_off(flist_off, 0),
 				    bucket, TDB_FREE_BUCKETS);
 				    bucket, TDB_FREE_BUCKETS);
 }
 }
 
 
@@ -168,7 +194,8 @@ int add_free_record(struct tdb_context *tdb,
 
 
 	assert(len_with_header >= sizeof(new));
 	assert(len_with_header >= sizeof(new));
 
 
-	new.magic_and_meta = TDB_FREE_MAGIC;
+	new.magic_and_meta = TDB_FREE_MAGIC << (64 - TDB_OFF_UPPER_STEAL)
+		| tdb->flist_off;
 	new.data_len = len_with_header - sizeof(struct tdb_used_record);
 	new.data_len = len_with_header - sizeof(struct tdb_used_record);
 
 
 	b_off = bucket_off(tdb->flist_off, size_to_bucket(new.data_len));
 	b_off = bucket_off(tdb->flist_off, size_to_bucket(new.data_len));
@@ -227,8 +254,7 @@ static int coalesce(struct tdb_context *tdb,
 		if (frec_magic(r) != TDB_FREE_MAGIC)
 		if (frec_magic(r) != TDB_FREE_MAGIC)
 			break;
 			break;
 
 
-		/* FIXME: Use flist from record */
-		nb_off = bucket_off(tdb->flist_off,size_to_bucket(r->data_len));
+		nb_off = bucket_off(frec_flist(r), size_to_bucket(r->data_len));
 
 
 		/* We may be violating lock order here, so best effort. */
 		/* We may be violating lock order here, so best effort. */
 		if (tdb_lock_free_bucket(tdb, nb_off, TDB_LOCK_NOWAIT) == -1)
 		if (tdb_lock_free_bucket(tdb, nb_off, TDB_LOCK_NOWAIT) == -1)
@@ -246,7 +272,7 @@ static int coalesce(struct tdb_context *tdb,
 			break;
 			break;
 		}
 		}
 
 
-		if (unlikely(bucket_off(tdb->flist_off,
+		if (unlikely(bucket_off(frec_flist(r),
 					size_to_bucket(r->data_len))
 					size_to_bucket(r->data_len))
 			     != nb_off)) {
 			     != nb_off)) {
 			tdb_unlock_free_bucket(tdb, nb_off);
 			tdb_unlock_free_bucket(tdb, nb_off);
@@ -288,7 +314,7 @@ static int coalesce(struct tdb_context *tdb,
 
 
 	/* We have to drop this to avoid deadlocks, so make sure record
 	/* We have to drop this to avoid deadlocks, so make sure record
 	 * doesn't get coalesced by someone else! */
 	 * doesn't get coalesced by someone else! */
-	r->magic_and_meta = TDB_COALESCING_MAGIC;
+	r->magic_and_meta = TDB_COALESCING_MAGIC << (64 - TDB_OFF_UPPER_STEAL);
 	r->data_len = end - off - sizeof(struct tdb_used_record);
 	r->data_len = end - off - sizeof(struct tdb_used_record);
 	if (tdb_access_commit(tdb, r) != 0)
 	if (tdb_access_commit(tdb, r) != 0)
 		goto err;
 		goto err;
@@ -425,8 +451,9 @@ static tdb_off_t get_free(struct tdb_context *tdb,
 			  size_t keylen, size_t datalen, bool want_extra,
 			  size_t keylen, size_t datalen, bool want_extra,
 			  unsigned hashlow)
 			  unsigned hashlow)
 {
 {
-	tdb_off_t off;
+	tdb_off_t off, flist;
 	unsigned start_b, b;
 	unsigned start_b, b;
+	bool wrapped = false;
 
 
 	/* If they are growing, add 50% to get to higher bucket. */
 	/* If they are growing, add 50% to get to higher bucket. */
 	if (want_extra)
 	if (want_extra)
@@ -435,20 +462,34 @@ static tdb_off_t get_free(struct tdb_context *tdb,
 	else
 	else
 		start_b = size_to_bucket(adjust_size(keylen, datalen));
 		start_b = size_to_bucket(adjust_size(keylen, datalen));
 
 
-	/* Start at exact size bucket, and search up... */
-	for (b = find_free_head(tdb, start_b);
-	     b < TDB_FREE_BUCKETS;
-	     b = find_free_head(tdb, b + 1)) {
-		/* Try getting one from list. */
-		off = lock_and_alloc(tdb, tdb->flist_off,
-				     b, keylen, datalen, want_extra,
-				     hashlow);
-		if (off == TDB_OFF_ERR)
-			return TDB_OFF_ERR;
-		if (off != 0)
-			return off;
-		/* Didn't work.  Try next bucket. */
+	flist = tdb->flist_off;
+	while (!wrapped || flist != tdb->flist_off) {
+		/* Start at exact size bucket, and search up... */
+		for (b = find_free_head(tdb, flist, start_b);
+		     b < TDB_FREE_BUCKETS;
+		     b = find_free_head(tdb, flist, b + 1)) {
+			/* Try getting one from list. */
+			off = lock_and_alloc(tdb, flist,
+					     b, keylen, datalen, want_extra,
+					     hashlow);
+			if (off == TDB_OFF_ERR)
+				return TDB_OFF_ERR;
+			if (off != 0) {
+				/* Worked?  Stay using this list. */
+				tdb->flist_off = flist;
+				return off;
+			}
+			/* Didn't work.  Try next bucket. */
+		}
+
+		/* Hmm, try next list. */
+		flist = next_flist(tdb, flist);
+		if (flist == 0) {
+			wrapped = true;
+			flist = first_flist(tdb);
+		}
 	}
 	}
+
 	return 0;
 	return 0;
 }
 }
 
 

+ 14 - 4
ccan/tdb2/private.h

@@ -64,8 +64,8 @@ typedef uint64_t tdb_off_t;
 #define TDB_MAGIC_FOOD "TDB file\n"
 #define TDB_MAGIC_FOOD "TDB file\n"
 #define TDB_VERSION ((uint64_t)(0x26011967 + 7))
 #define TDB_VERSION ((uint64_t)(0x26011967 + 7))
 #define TDB_MAGIC ((uint64_t)0x1999)
 #define TDB_MAGIC ((uint64_t)0x1999)
-#define TDB_FREE_MAGIC ((~(uint64_t)TDB_MAGIC) << 6)
-#define TDB_COALESCING_MAGIC (0xBAD1DEA2FEEDULL << 6)
+#define TDB_FREE_MAGIC ((uint64_t)0xFE)
+#define TDB_COALESCING_MAGIC ((uint64_t)0xFD)
 #define TDB_HASH_MAGIC (0xA1ABE11A01092008ULL)
 #define TDB_HASH_MAGIC (0xA1ABE11A01092008ULL)
 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
 #define TDB_RECOVERY_INVALID_MAGIC (0x0)
 #define TDB_RECOVERY_INVALID_MAGIC (0x0)
@@ -173,7 +173,7 @@ static inline uint16_t rec_magic(const struct tdb_used_record *r)
 }
 }
 
 
 struct tdb_free_record {
 struct tdb_free_record {
-        uint64_t magic_and_meta;
+        uint64_t magic_and_meta; /* TDB_OFF_UPPER_STEAL bits of magic */
         uint64_t data_len; /* Not counting these two fields. */
         uint64_t data_len; /* Not counting these two fields. */
 	/* This is why the minimum record size is 16 bytes.  */
 	/* This is why the minimum record size is 16 bytes.  */
 	uint64_t next, prev;
 	uint64_t next, prev;
@@ -181,7 +181,12 @@ struct tdb_free_record {
 
 
 static inline uint64_t frec_magic(const struct tdb_free_record *f)
 static inline uint64_t frec_magic(const struct tdb_free_record *f)
 {
 {
-	return f->magic_and_meta;
+	return f->magic_and_meta >> (64 - TDB_OFF_UPPER_STEAL);
+}
+
+static inline uint64_t frec_flist(const struct tdb_free_record *f)
+{
+	return f->magic_and_meta & ((1ULL << (64 - TDB_OFF_UPPER_STEAL)) - 1);
 }
 }
 
 
 /* this is stored at the front of every database */
 /* this is stored at the front of every database */
@@ -201,6 +206,7 @@ struct tdb_header {
 
 
 struct tdb_freelist {
 struct tdb_freelist {
 	struct tdb_used_record hdr;
 	struct tdb_used_record hdr;
+	tdb_off_t next;
 	tdb_off_t buckets[TDB_FREE_BUCKETS];
 	tdb_off_t buckets[TDB_FREE_BUCKETS];
 };
 };
 
 
@@ -350,6 +356,10 @@ bool is_subhash(tdb_off_t val);
 /* free.c: */
 /* free.c: */
 int tdb_flist_init(struct tdb_context *tdb);
 int tdb_flist_init(struct tdb_context *tdb);
 
 
+/* check.c needs these to iterate through free lists. */
+tdb_off_t first_flist(struct tdb_context *tdb);
+tdb_off_t next_flist(struct tdb_context *tdb, tdb_off_t flist);
+
 /* If this fails, try tdb_expand. */
 /* If this fails, try tdb_expand. */
 tdb_off_t alloc(struct tdb_context *tdb, size_t keylen, size_t datalen,
 tdb_off_t alloc(struct tdb_context *tdb, size_t keylen, size_t datalen,
 		uint64_t hash, bool growing);
 		uint64_t hash, bool growing);

+ 2 - 1
ccan/tdb2/tdb.c

@@ -111,7 +111,8 @@ static int tdb_new_database(struct tdb_context *tdb,
 	newdb.hdr.free_list = offsetof(struct new_database, flist);
 	newdb.hdr.free_list = offsetof(struct new_database, flist);
 	memset(&newdb.flist, 0, sizeof(newdb.flist));
 	memset(&newdb.flist, 0, sizeof(newdb.flist));
 	set_header(NULL, &newdb.flist.hdr, 0,
 	set_header(NULL, &newdb.flist.hdr, 0,
-		   sizeof(newdb.flist.buckets), sizeof(newdb.flist.buckets), 1);
+		   sizeof(newdb.flist) - sizeof(newdb.flist.hdr),
+		   sizeof(newdb.flist) - sizeof(newdb.flist.hdr), 1);
 
 
 	/* Magic food */
 	/* Magic food */
 	memset(newdb.hdr.magic_food, 0, sizeof(newdb.hdr.magic_food));
 	memset(newdb.hdr.magic_food, 0, sizeof(newdb.hdr.magic_food));

+ 40 - 11
ccan/tdb2/test/layout.c

@@ -30,11 +30,13 @@ void tdb_layout_add_freelist(struct tdb_layout *layout)
 	add(layout, elem);
 	add(layout, elem);
 }
 }
 
 
-void tdb_layout_add_free(struct tdb_layout *layout, tdb_len_t len)
+void tdb_layout_add_free(struct tdb_layout *layout, tdb_len_t len,
+			 unsigned flist)
 {
 {
 	union tdb_layout_elem elem;
 	union tdb_layout_elem elem;
 	elem.base.type = FREE;
 	elem.base.type = FREE;
 	elem.free.len = len;
 	elem.free.len = len;
+	elem.free.flist_num = flist;
 	add(layout, elem);
 	add(layout, elem);
 }
 }
 
 
@@ -114,19 +116,29 @@ static void set_hashtable(void *mem, struct tdb_context *tdb,
 }
 }
 
 
 static void set_freelist(void *mem, struct tdb_context *tdb,
 static void set_freelist(void *mem, struct tdb_context *tdb,
-			 struct tle_freelist *freelist)
+			 struct tle_freelist *freelist, struct tdb_header *hdr,
+			 tdb_off_t last_flist)
 {
 {
 	struct tdb_freelist *flist = mem;
 	struct tdb_freelist *flist = mem;
 	memset(flist, 0, sizeof(*flist));
 	memset(flist, 0, sizeof(*flist));
 	set_header(tdb, &flist->hdr, 0,
 	set_header(tdb, &flist->hdr, 0,
 		   sizeof(*flist) - sizeof(flist->hdr),
 		   sizeof(*flist) - sizeof(flist->hdr),
 		   sizeof(*flist) - sizeof(flist->hdr), 1);
 		   sizeof(*flist) - sizeof(flist->hdr), 1);
+
+	if (last_flist) {
+		flist = (struct tdb_freelist *)((char *)hdr + last_flist);
+		flist->next = freelist->base.off;
+	} else {
+		hdr->free_list = freelist->base.off;
+	}
 }
 }
 
 
 static void add_to_freetable(struct tdb_context *tdb,
 static void add_to_freetable(struct tdb_context *tdb,
 			     tdb_off_t eoff,
 			     tdb_off_t eoff,
-			     tdb_off_t elen)
+			     tdb_off_t elen,
+			     struct tle_freelist *freelist)
 {
 {
+	tdb->flist_off = freelist->base.off;
 	add_free_record(tdb, eoff, sizeof(struct tdb_used_record) + elen);
 	add_free_record(tdb, eoff, sizeof(struct tdb_used_record) + elen);
 }
 }
 
 
@@ -190,11 +202,25 @@ static void add_to_hashtable(struct tdb_context *tdb,
 	abort();
 	abort();
 }
 }
 
 
+static struct tle_freelist *find_flist(struct tdb_layout *layout, unsigned num)
+{
+	unsigned i;
+
+	for (i = 0; i < layout->num_elems; i++) {
+		if (layout->elem[i].base.type != FREELIST)
+			continue;
+		if (num == 0)
+			return &layout->elem[i].flist;
+		num--;
+	}
+	abort();
+}
+
 /* FIXME: Support TDB_CONVERT */
 /* FIXME: Support TDB_CONVERT */
 struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
 struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
 {
 {
 	unsigned int i;
 	unsigned int i;
-	tdb_off_t off, len, flist_off = 0;
+	tdb_off_t off, len, last_flist;
 	char *mem;
 	char *mem;
 	struct tdb_context *tdb;
 	struct tdb_context *tdb;
 
 
@@ -206,8 +232,6 @@ struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
 		e->base.off = off;
 		e->base.off = off;
 		switch (e->base.type) {
 		switch (e->base.type) {
 		case FREELIST:
 		case FREELIST:
-			assert(flist_off == 0);
-			flist_off = off;
 			len = freelist_len(&e->flist);
 			len = freelist_len(&e->flist);
 			break;
 			break;
 		case FREE:
 		case FREE:
@@ -224,8 +248,6 @@ struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
 		}
 		}
 		off += len;
 		off += len;
 	}
 	}
-	/* Must have a free list! */
-	assert(flist_off);
 
 
 	mem = malloc(off);
 	mem = malloc(off);
 	/* Now populate our header, cribbing from a real TDB header. */
 	/* Now populate our header, cribbing from a real TDB header. */
@@ -236,13 +258,15 @@ struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
 	free(tdb->map_ptr);
 	free(tdb->map_ptr);
 	tdb->map_ptr = mem;
 	tdb->map_ptr = mem;
 	tdb->map_size = off;
 	tdb->map_size = off;
-	tdb->flist_off = flist_off;
 
 
+	last_flist = 0;
 	for (i = 0; i < layout->num_elems; i++) {
 	for (i = 0; i < layout->num_elems; i++) {
 		union tdb_layout_elem *e = &layout->elem[i];
 		union tdb_layout_elem *e = &layout->elem[i];
 		switch (e->base.type) {
 		switch (e->base.type) {
 		case FREELIST:
 		case FREELIST:
-			set_freelist(mem + e->base.off, tdb, &e->flist);
+			set_freelist(mem + e->base.off, tdb, &e->flist,
+				     (struct tdb_header *)mem, last_flist);
+			last_flist = e->base.off;
 			break;
 			break;
 		case FREE:
 		case FREE:
 			set_free_record(mem + e->base.off, e->free.len);
 			set_free_record(mem + e->base.off, e->free.len);
@@ -255,13 +279,16 @@ struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
 			break;
 			break;
 		}
 		}
 	}
 	}
+	/* Must have a free list! */
+	assert(last_flist);
 
 
 	/* Now fill the free and hash tables. */
 	/* Now fill the free and hash tables. */
 	for (i = 0; i < layout->num_elems; i++) {
 	for (i = 0; i < layout->num_elems; i++) {
 		union tdb_layout_elem *e = &layout->elem[i];
 		union tdb_layout_elem *e = &layout->elem[i];
 		switch (e->base.type) {
 		switch (e->base.type) {
 		case FREE:
 		case FREE:
-			add_to_freetable(tdb, e->base.off, e->free.len);
+			add_to_freetable(tdb, e->base.off, e->free.len,
+					 find_flist(layout, e->free.flist_num));
 			break;
 			break;
 		case DATA:
 		case DATA:
 			add_to_hashtable(tdb, e->base.off, e->used.key);
 			add_to_hashtable(tdb, e->base.off, e->used.key);
@@ -271,6 +298,8 @@ struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
 		}
 		}
 	}
 	}
 
 
+	tdb->flist_off = find_flist(layout, 0)->base.off;
+
 	/* Get physical if they asked for it. */
 	/* Get physical if they asked for it. */
 	if (layout->filename) {
 	if (layout->filename) {
 		int fd = open(layout->filename, O_WRONLY|O_TRUNC|O_CREAT,
 		int fd = open(layout->filename, O_WRONLY|O_TRUNC|O_CREAT,

+ 3 - 1
ccan/tdb2/test/layout.h

@@ -4,7 +4,8 @@
 
 
 struct tdb_layout *new_tdb_layout(const char *filename);
 struct tdb_layout *new_tdb_layout(const char *filename);
 void tdb_layout_add_freelist(struct tdb_layout *layout);
 void tdb_layout_add_freelist(struct tdb_layout *layout);
-void tdb_layout_add_free(struct tdb_layout *layout, tdb_len_t len);
+void tdb_layout_add_free(struct tdb_layout *layout, tdb_len_t len,
+			 unsigned flist);
 void tdb_layout_add_used(struct tdb_layout *layout,
 void tdb_layout_add_used(struct tdb_layout *layout,
 			 TDB_DATA key, TDB_DATA data,
 			 TDB_DATA key, TDB_DATA data,
 			 tdb_len_t extra);
 			 tdb_len_t extra);
@@ -33,6 +34,7 @@ struct tle_freelist {
 struct tle_free {
 struct tle_free {
 	struct tle_base base;
 	struct tle_base base;
 	tdb_len_t len;
 	tdb_len_t len;
+	unsigned flist_num;
 };
 };
 
 
 struct tle_used {
 struct tle_used {

+ 9 - 9
ccan/tdb2/test/run-03-coalesce.c

@@ -39,7 +39,7 @@ int main(int argc, char *argv[])
 	layout = new_tdb_layout(NULL);
 	layout = new_tdb_layout(NULL);
 	tdb_layout_add_freelist(layout);
 	tdb_layout_add_freelist(layout);
 	len = 1024;
 	len = 1024;
-	tdb_layout_add_free(layout, len);
+	tdb_layout_add_free(layout, len, 0);
 	tdb = tdb_layout_get(layout);
 	tdb = tdb_layout_get(layout);
 	ok1(tdb_check(tdb, NULL, NULL) == 0);
 	ok1(tdb_check(tdb, NULL, NULL) == 0);
 	ok1(free_record_length(tdb, layout->elem[1].base.off) == len);
 	ok1(free_record_length(tdb, layout->elem[1].base.off) == len);
@@ -57,7 +57,7 @@ int main(int argc, char *argv[])
 	/* No coalescing can be done due to used record */
 	/* No coalescing can be done due to used record */
 	layout = new_tdb_layout(NULL);
 	layout = new_tdb_layout(NULL);
 	tdb_layout_add_freelist(layout);
 	tdb_layout_add_freelist(layout);
-	tdb_layout_add_free(layout, 1024);
+	tdb_layout_add_free(layout, 1024, 0);
 	tdb_layout_add_used(layout, key, data, 6);
 	tdb_layout_add_used(layout, key, data, 6);
 	tdb = tdb_layout_get(layout);
 	tdb = tdb_layout_get(layout);
 	ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
 	ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
@@ -76,8 +76,8 @@ int main(int argc, char *argv[])
 	/* Coalescing can be done due to two free records, then EOF */
 	/* Coalescing can be done due to two free records, then EOF */
 	layout = new_tdb_layout(NULL);
 	layout = new_tdb_layout(NULL);
 	tdb_layout_add_freelist(layout);
 	tdb_layout_add_freelist(layout);
-	tdb_layout_add_free(layout, 1024);
-	tdb_layout_add_free(layout, 2048);
+	tdb_layout_add_free(layout, 1024, 0);
+	tdb_layout_add_free(layout, 2048, 0);
 	tdb = tdb_layout_get(layout);
 	tdb = tdb_layout_get(layout);
 	ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
 	ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
 	ok1(free_record_length(tdb, layout->elem[2].base.off) == 2048);
 	ok1(free_record_length(tdb, layout->elem[2].base.off) == 2048);
@@ -97,8 +97,8 @@ int main(int argc, char *argv[])
 	/* Coalescing can be done due to two free records, then data */
 	/* Coalescing can be done due to two free records, then data */
 	layout = new_tdb_layout(NULL);
 	layout = new_tdb_layout(NULL);
 	tdb_layout_add_freelist(layout);
 	tdb_layout_add_freelist(layout);
-	tdb_layout_add_free(layout, 1024);
-	tdb_layout_add_free(layout, 512);
+	tdb_layout_add_free(layout, 1024, 0);
+	tdb_layout_add_free(layout, 512, 0);
 	tdb_layout_add_used(layout, key, data, 6);
 	tdb_layout_add_used(layout, key, data, 6);
 	tdb = tdb_layout_get(layout);
 	tdb = tdb_layout_get(layout);
 	ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
 	ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
@@ -119,9 +119,9 @@ int main(int argc, char *argv[])
 	/* Coalescing can be done due to three free records, then EOF */
 	/* Coalescing can be done due to three free records, then EOF */
 	layout = new_tdb_layout(NULL);
 	layout = new_tdb_layout(NULL);
 	tdb_layout_add_freelist(layout);
 	tdb_layout_add_freelist(layout);
-	tdb_layout_add_free(layout, 1024);
-	tdb_layout_add_free(layout, 512);
-	tdb_layout_add_free(layout, 256);
+	tdb_layout_add_free(layout, 1024, 0);
+	tdb_layout_add_free(layout, 512, 0);
+	tdb_layout_add_free(layout, 256, 0);
 	tdb = tdb_layout_get(layout);
 	tdb = tdb_layout_get(layout);
 	ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
 	ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
 	ok1(free_record_length(tdb, layout->elem[2].base.off) == 512);
 	ok1(free_record_length(tdb, layout->elem[2].base.off) == 512);

+ 66 - 0
ccan/tdb2/test/run-50-multiple-freelists.c

@@ -0,0 +1,66 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+#include "layout.h"
+
+int main(int argc, char *argv[])
+{
+	tdb_off_t off;
+	struct tdb_context *tdb;
+	struct tdb_layout *layout;
+	TDB_DATA key, data;
+
+	plan_tests(11);
+	key.dptr = (unsigned char *)"Hello";
+	data.dptr = (unsigned char *)"world";
+	data.dsize = 5;
+	key.dsize = 5;
+
+	/* Create a TDB with three free lists. */
+	layout = new_tdb_layout(NULL);
+	tdb_layout_add_freelist(layout);
+	tdb_layout_add_freelist(layout);
+	tdb_layout_add_freelist(layout);
+	tdb_layout_add_free(layout, 80, 0);
+	/* Used record prevent coalescing. */
+	tdb_layout_add_used(layout, key, data, 6);
+	tdb_layout_add_free(layout, 160, 1);
+	key.dsize--;
+	tdb_layout_add_used(layout, key, data, 7);
+	tdb_layout_add_free(layout, 320, 2);
+	key.dsize--;
+	tdb_layout_add_used(layout, key, data, 8);
+	tdb_layout_add_free(layout, 40, 0);
+	tdb = tdb_layout_get(layout);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	off = get_free(tdb, 0, 80 - sizeof(struct tdb_used_record), 0, 0);
+	ok1(off == layout->elem[3].base.off);
+	ok1(tdb->flist_off == layout->elem[0].base.off);
+
+	off = get_free(tdb, 0, 160 - sizeof(struct tdb_used_record), 0, 0);
+	ok1(off == layout->elem[5].base.off);
+	ok1(tdb->flist_off == layout->elem[1].base.off);
+
+	off = get_free(tdb, 0, 320 - sizeof(struct tdb_used_record), 0, 0);
+	ok1(off == layout->elem[7].base.off);
+	ok1(tdb->flist_off == layout->elem[2].base.off);
+
+	off = get_free(tdb, 0, 40 - sizeof(struct tdb_used_record), 0, 0);
+	ok1(off == layout->elem[9].base.off);
+	ok1(tdb->flist_off == layout->elem[0].base.off);
+
+	/* Now we fail. */
+	off = get_free(tdb, 0, 0, 1, 0);
+	ok1(off == 0);
+
+	tdb_close(tdb);
+
+	ok1(tap_log_messages == 0);
+	return exit_status();
+}