Browse Source

tdb2: new tests, and new fixes.

Rusty Russell 15 years ago
parent
commit
cb399d268c

+ 9 - 1
ccan/tdb2/free.c

@@ -279,7 +279,7 @@ static int coalesce(struct tdb_context *tdb, tdb_off_t off,
 			break;
 		}
 
-		if (remove_from_list(tdb, list, r) == -1) {
+		if (remove_from_list(tdb, nlist, r) == -1) {
 			tdb_unlock_free_list(tdb, nlist);
 			goto err;
 		}
@@ -297,6 +297,14 @@ static int coalesce(struct tdb_context *tdb, tdb_off_t off,
 	if (!r)
 		goto err;
 
+	if (r->data_len != data_len) {
+		tdb->ecode = TDB_ERR_CORRUPT;
+		tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
+			 "coalesce: expected data len %llu not %llu\n",
+			 (long long)data_len, (long long)r->data_len);
+		goto err;
+	}
+
 	if (remove_from_list(tdb, list, r) == -1)
 		goto err;
 

+ 5 - 3
ccan/tdb2/tdb.c

@@ -268,6 +268,8 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
 		}
 		TEST_IT(tdb->flags & TDB_CONVERT);
 		tdb_convert(tdb, &tdb->header, sizeof(tdb->header));
+		/* Zones don't matter for internal db. */
+		tdb->last_zone = 0;
 		return tdb;
 	}
 
@@ -518,9 +520,9 @@ static int update_rec_hdr(struct tdb_context *tdb,
 			  struct tdb_used_record *rec,
 			  uint64_t h)
 {
-	uint64_t room = rec_data_length(rec) + rec_extra_padding(rec);
+	uint64_t dataroom = rec_data_length(rec) + rec_extra_padding(rec);
 
-	if (set_header(tdb, rec, keylen, datalen, room - datalen, h))
+	if (set_header(tdb, rec, keylen, datalen, keylen + dataroom, h))
 		return -1;
 
 	return tdb_write_convert(tdb, off, rec, sizeof(*rec));
@@ -688,11 +690,11 @@ int tdb_store(struct tdb_context *tdb,
 				+ rec_extra_padding(&rec));
 	}
 
-write:
 	/* FIXME: Encode extra hash bits! */
 	if (tdb_write_off(tdb, hash_off(tdb, old_bucket), new_off) == -1)
 		goto fail;
 
+write:
 	off = new_off + sizeof(struct tdb_used_record);
 	if (tdb->methods->write(tdb, off, key.dptr, key.dsize) == -1)
 		goto fail;

+ 265 - 0
ccan/tdb2/test/layout.c

@@ -0,0 +1,265 @@
+/* TDB tools to create various canned database layouts. */
+#include "layout.h"
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+/*
+ * Allocate an empty layout description.
+ *
+ * The element array starts out empty and both table indices are set to
+ * -1U, the "not added yet" value that tdb_layout_add_hashtable(),
+ * tdb_layout_add_freetable() and tdb_layout_get() assert against.
+ * Aborts if the allocation fails instead of dereferencing NULL.
+ */
+struct tdb_layout *new_tdb_layout(void)
+{
+	struct tdb_layout *layout = malloc(sizeof(*layout));
+
+	if (!layout)
+		abort();
+	layout->num_elems = 0;
+	layout->elem = NULL;
+	layout->ftable = layout->htable = -1U;
+	return layout;
+}
+
+/*
+ * Append a copy of elem to the layout's element array, growing the array
+ * by one slot.  The realloc() result is checked before it replaces
+ * layout->elem; allocation failure aborts.
+ */
+static void add(struct tdb_layout *layout, union tdb_layout_elem elem)
+{
+	union tdb_layout_elem *grown;
+
+	grown = realloc(layout->elem,
+			sizeof(layout->elem[0]) * (layout->num_elems + 1));
+	if (!grown)
+		abort();
+	layout->elem = grown;
+	layout->elem[layout->num_elems++] = elem;
+}
+
+/* Append a FREE element of length len to the layout. */
+void tdb_layout_add_free(struct tdb_layout *layout, tdb_len_t len)
+{
+	union tdb_layout_elem e;
+
+	e.base.type = FREE;
+	e.free.len = len;
+
+	add(layout, e);
+}
+
+/*
+ * Return a heap-allocated copy of key (same dsize, freshly allocated dptr).
+ *
+ * For a zero-length key memcpy() is skipped and malloc() is allowed to
+ * return NULL; any other allocation failure aborts.
+ */
+static struct tdb_data dup_key(struct tdb_data key)
+{
+	struct tdb_data ret;
+
+	ret.dsize = key.dsize;
+	ret.dptr = malloc(ret.dsize);
+	if (ret.dsize != 0) {
+		if (!ret.dptr)
+			abort();
+		memcpy(ret.dptr, key.dptr, ret.dsize);
+	}
+	return ret;
+}
+
+/*
+ * Append a DATA element.  Both key and data are duplicated with dup_key(),
+ * so the element holds its own copies; extra is stored as given.
+ */
+void tdb_layout_add_used(struct tdb_layout *layout,
+			 TDB_DATA key, TDB_DATA data,
+			 tdb_len_t extra)
+{
+	union tdb_layout_elem e;
+
+	e.base.type = DATA;
+	e.used.extra = extra;
+	e.used.key = dup_key(key);
+	e.used.data = dup_key(data);
+
+	add(layout, e);
+}
+
+/*
+ * Append the HASHTABLE element and remember its index in layout->htable.
+ * Asserts that no hash table has been added to this layout before.
+ */
+void tdb_layout_add_hashtable(struct tdb_layout *layout,
+			      unsigned int hash_bits,
+			      tdb_len_t extra)
+{
+	union tdb_layout_elem e;
+
+	/* Only one hash table per layout. */
+	assert(layout->htable == -1U);
+
+	e.base.type = HASHTABLE;
+	e.hashtable.hash_bits = hash_bits;
+	e.hashtable.extra = extra;
+
+	/* The new element lands at the current end of the array. */
+	layout->htable = layout->num_elems;
+	add(layout, e);
+}
+
+/*
+ * Append the FREETABLE element and remember its index in layout->ftable.
+ * Asserts that no free table has been added to this layout before.
+ */
+void tdb_layout_add_freetable(struct tdb_layout *layout,
+			      unsigned int num_zones,
+			      unsigned int zone_bits,
+			      unsigned int num_buckets,
+			      tdb_len_t extra)
+{
+	union tdb_layout_elem e;
+
+	/* Only one free table per layout. */
+	assert(layout->ftable == -1U);
+
+	e.base.type = FREETABLE;
+	e.freetable.num_zones = num_zones;
+	e.freetable.zone_bits = zone_bits;
+	e.freetable.num_buckets = num_buckets;
+	e.freetable.extra = extra;
+
+	/* The new element lands at the current end of the array. */
+	layout->ftable = layout->num_elems;
+	add(layout, e);
+}
+
+/* Bytes a free element occupies: one used-record header plus len. */
+static tdb_len_t free_record_len(tdb_len_t len)
+{
+	tdb_len_t total = sizeof(struct tdb_used_record);
+
+	total += len;
+	return total;
+}
+
+/*
+ * Bytes a data element occupies: header, key, data and extra padding.
+ * Asserts the result is at least sizeof(struct tdb_free_record).
+ */
+static tdb_len_t data_record_len(struct tle_used *used)
+{
+	const tdb_len_t total = sizeof(struct tdb_used_record)
+		+ used->key.dsize + used->data.dsize + used->extra;
+
+	assert(total >= sizeof(struct tdb_free_record));
+	return total;
+}
+
+/*
+ * Bytes the hash table element occupies: header, 2^hash_bits offsets and
+ * the requested extra padding.  The padding must be counted here because
+ * set_hashtable() records "table bytes + extra" as the record's length.
+ */
+static tdb_len_t hashtable_len(struct tle_hashtable *htable)
+{
+	return sizeof(struct tdb_used_record)
+		+ (sizeof(tdb_off_t) << htable->hash_bits)
+		+ htable->extra;
+}
+
+/*
+ * Bytes the free table element occupies: header, num_zones groups of
+ * (num_buckets + 1) offsets, and the requested extra padding.  The padding
+ * must be counted here because set_freetable() records "table bytes +
+ * extra" as the record's length.
+ */
+static tdb_len_t freetable_len(struct tle_freetable *ftable)
+{
+	return sizeof(struct tdb_used_record)
+		+ (sizeof(tdb_off_t) * ftable->num_zones
+		   * (ftable->num_buckets + 1))
+		+ ftable->extra;
+}
+
+/*
+ * Intentionally a no-op: free records are written later, when
+ * add_to_freetable() is called for each FREE element.
+ */
+static void set_free_record(void *mem, tdb_len_t len)
+{
+	(void)mem;
+	(void)len;
+}
+
+/*
+ * Write a used record at mem: the header (lengths plus the key's hash),
+ * immediately followed by the key bytes and then the data bytes.
+ */
+static void set_data_record(void *mem, struct tdb_context *tdb,
+			    struct tle_used *used)
+{
+	struct tdb_used_record *rec = mem;
+	char *payload = (char *)(rec + 1);
+
+	set_header(tdb, rec, used->key.dsize, used->data.dsize,
+		   used->key.dsize + used->data.dsize + used->extra,
+		   tdb_hash(tdb, used->key.dptr, used->key.dsize));
+
+	/* Key first, data straight after it. */
+	memcpy(payload, used->key.dptr, used->key.dsize);
+	memcpy(payload + used->key.dsize, used->data.dptr, used->data.dsize);
+}
+
+/*
+ * Write the hash table record at mem: a header with no key, followed by
+ * 2^hash_bits zeroed offsets.
+ */
+static void set_hashtable(void *mem, struct tdb_context *tdb,
+			  struct tle_hashtable *htable)
+{
+	struct tdb_used_record *rec = mem;
+	const tdb_len_t table_bytes = sizeof(tdb_off_t) << htable->hash_bits;
+
+	set_header(tdb, rec, 0, table_bytes, table_bytes + htable->extra, 0);
+	memset(rec + 1, 0, table_bytes);
+}
+
+/*
+ * Write the free table record at mem: a header with no key, followed by
+ * num_zones * (num_buckets + 1) zeroed offsets.
+ */
+static void set_freetable(void *mem, struct tdb_context *tdb,
+			  struct tle_freetable *ftable)
+{
+	struct tdb_used_record *rec = mem;
+	const tdb_len_t table_bytes = sizeof(tdb_off_t) * ftable->num_zones
+		* (ftable->num_buckets + 1);
+
+	set_header(tdb, rec, 0, table_bytes, table_bytes + ftable->extra, 0);
+	memset(rec + 1, 0, table_bytes);
+}
+
+/*
+ * Hand the element at eoff to add_free_record(), passing its length as
+ * elen plus one used-record header.
+ */
+static void add_to_freetable(struct tdb_context *tdb,
+			     tdb_off_t eoff,
+			     tdb_off_t elen)
+{
+	const tdb_off_t hdr_bytes = sizeof(struct tdb_used_record);
+
+	add_free_record(tdb, eoff, hdr_bytes + elen);
+}
+
+/*
+ * Offset of the hash table slot for list: the low hash_bits bits of list
+ * select an entry in the table starting at header.v.hash_off.
+ */
+static tdb_off_t hash_off(struct tdb_context *tdb, uint64_t list)
+{
+	const uint64_t mask = (1ULL << tdb->header.v.hash_bits) - 1;
+
+	return tdb->header.v.hash_off + (list & mask) * sizeof(tdb_off_t);
+}
+
+/*
+ * Store eoff in the hash table: start at the slot selected by the key's
+ * hash and step to the next slot until one reads back as zero.
+ */
+static void add_to_hashtable(struct tdb_context *tdb,
+			     tdb_off_t eoff,
+			     struct tdb_data key)
+{
+	uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
+	tdb_off_t slot;
+
+	for (;;) {
+		slot = hash_off(tdb, h);
+		if (tdb_read_off(tdb, slot) == 0)
+			break;
+		h++;
+	}
+
+	tdb_write_off(tdb, slot, eoff);
+}
+
+/*
+ * Build an in-memory tdb whose contents follow the given layout.
+ *
+ * Pass 1 assigns each element its offset (elements are packed straight
+ * after the tdb header) and computes the total size.  A TDB_INTERNAL tdb is
+ * then opened, its header copied and patched to point at the layout's free
+ * table and hash table, and its map replaced by the new buffer.  Pass 2
+ * writes each element's record; pass 3 enters FREE elements into the free
+ * table and DATA elements into the hash table.
+ *
+ * The layout must contain both a free table and a hash table (asserted).
+ * The buffer is zero-initialised so bytes that no record writer touches
+ * are deterministic.  Aborts if allocation or tdb_open() fails.
+ *
+ * FIXME: Support TDB_CONVERT
+ */
+struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
+{
+	unsigned int i;
+	tdb_off_t len;
+	struct tdb_header *hdr;
+	char *mem;
+	struct tdb_context *tdb;
+
+	assert(layout->ftable != -1U);
+	assert(layout->htable != -1U);
+
+	len = sizeof(struct tdb_header);
+
+	/* First pass of layout: calc lengths */
+	for (i = 0; i < layout->num_elems; i++) {
+		union tdb_layout_elem *e = &layout->elem[i];
+		e->base.off = len;
+		switch (e->base.type) {
+		case FREE:
+			len += free_record_len(e->free.len);
+			break;
+		case DATA:
+			len += data_record_len(&e->used);
+			break;
+		case HASHTABLE:
+			len += hashtable_len(&e->hashtable);
+			break;
+		case FREETABLE:
+			len += freetable_len(&e->freetable);
+			break;
+		}
+	}
+
+	mem = calloc(1, len);
+	if (!mem)
+		abort();
+	/* Now populate our header, cribbing from a real TDB header. */
+	tdb = tdb_open(NULL, TDB_INTERNAL, O_RDWR, 0, NULL);
+	if (!tdb)
+		abort();
+	hdr = (void *)mem;
+	*hdr = tdb->header;
+	hdr->v.generation++;
+	hdr->v.num_zones = layout->elem[layout->ftable].freetable.num_zones;
+	hdr->v.zone_bits = layout->elem[layout->ftable].freetable.zone_bits;
+	hdr->v.free_buckets
+		= layout->elem[layout->ftable].freetable.num_buckets;
+	/* Table offsets point past the record header, at the table itself. */
+	hdr->v.free_off = layout->elem[layout->ftable].base.off
+		+ sizeof(struct tdb_used_record);
+	hdr->v.hash_bits = layout->elem[layout->htable].hashtable.hash_bits;
+	hdr->v.hash_off = layout->elem[layout->htable].base.off
+		+ sizeof(struct tdb_used_record);
+
+	/* Mug the tdb we have to make it use this. */
+	free(tdb->map_ptr);
+	tdb->map_ptr = mem;
+	tdb->map_size = len;
+	header_changed(tdb);
+
+	/* Second pass: write each element's record into the buffer. */
+	for (i = 0; i < layout->num_elems; i++) {
+		union tdb_layout_elem *e = &layout->elem[i];
+		switch (e->base.type) {
+		case FREE:
+			set_free_record(mem + e->base.off, e->free.len);
+			break;
+		case DATA:
+			set_data_record(mem + e->base.off, tdb, &e->used);
+			break;
+		case HASHTABLE:
+			set_hashtable(mem + e->base.off, tdb, &e->hashtable);
+			break;
+		case FREETABLE:
+			set_freetable(mem + e->base.off, tdb, &e->freetable);
+			break;
+		}
+	}
+
+	/* Now fill the free and hash tables. */
+	for (i = 0; i < layout->num_elems; i++) {
+		union tdb_layout_elem *e = &layout->elem[i];
+		switch (e->base.type) {
+		case FREE:
+			add_to_freetable(tdb, e->base.off, e->free.len);
+			break;
+		case DATA:
+			add_to_hashtable(tdb, e->base.off, e->used.key);
+			break;
+		default:
+			break;
+		}
+	}
+
+	return tdb;
+}

+ 69 - 0
ccan/tdb2/test/layout.h

@@ -0,0 +1,69 @@
+#ifndef TDB2_TEST_LAYOUT_H
+#define TDB2_TEST_LAYOUT_H
+#include <ccan/tdb2/private.h>
+
+struct tdb_layout *new_tdb_layout(void); /* Create an empty layout. */
+void tdb_layout_add_free(struct tdb_layout *layout, tdb_len_t len); /* Append a free element of length len. */
+void tdb_layout_add_used(struct tdb_layout *layout,
+			 TDB_DATA key, TDB_DATA data,
+			 tdb_len_t extra); /* Append a key/data element with extra padding bytes. */
+void tdb_layout_add_hashtable(struct tdb_layout *layout,
+			      unsigned int hash_bits,
+			      tdb_len_t extra); /* Append the hash table element. */
+void tdb_layout_add_freetable(struct tdb_layout *layout,
+			      unsigned int num_zones,
+			      unsigned int zone_bits,
+			      unsigned int num_buckets,
+			      tdb_len_t extra); /* Append the free table element. */
+struct tdb_context *tdb_layout_get(struct tdb_layout *layout); /* Build a tdb context from the elements added so far. */
+
+enum layout_type { /* Discriminator stored in tle_base.type. */
+	FREE, DATA, HASHTABLE, FREETABLE,
+};
+
+/* Shared by all union members: every tle_* struct starts with this, so it can be read via the union's base member. */
+struct tle_base {
+	enum layout_type type; /* Which tle_* struct this element really is. */
+	tdb_off_t off; /* Offset assigned to this element. */
+};
+
+struct tle_free { /* A free element. */
+	struct tle_base base;
+	tdb_len_t len;
+};
+
+struct tle_used { /* A key/data element. */
+	struct tle_base base;
+	TDB_DATA key;
+	TDB_DATA data;
+	tdb_len_t extra; /* Extra length requested on top of key and data. */
+};
+
+struct tle_hashtable { /* The hash table element. */
+	struct tle_base base;
+	unsigned hash_bits;
+	tdb_len_t extra;
+};
+
+struct tle_freetable { /* The free table element. */
+	struct tle_base base;
+	unsigned int num_zones;
+	unsigned int zone_bits;
+	unsigned int num_buckets;
+	tdb_len_t extra;
+};
+
+union tdb_layout_elem { /* One layout element; base.type says which member is valid. */
+	struct tle_base base;
+	struct tle_free free;
+	struct tle_used used;
+	struct tle_freetable freetable;
+	struct tle_hashtable hashtable;
+};
+
+struct tdb_layout { /* An ordered list of elements. */
+	unsigned int num_elems; /* Number of entries in elem. */
+	union tdb_layout_elem *elem;
+	unsigned int ftable, htable; /* Indices into elem of the free table and hash table elements. */
+};
+#endif /* TDB2_TEST_LAYOUT_H */

+ 218 - 0
ccan/tdb2/test/run-coalesce.c

@@ -0,0 +1,218 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+#include "layout.h"
+
+/*
+ * Read the free record at off and return its data_len.  Returns
+ * TDB_OFF_ERR if the read fails or the record's magic is not
+ * TDB_FREE_MAGIC.
+ */
+static tdb_len_t free_record_length(struct tdb_context *tdb, tdb_off_t off)
+{
+	struct tdb_free_record frec;
+
+	if (tdb_read_convert(tdb, off, &frec, sizeof(frec)) != 0
+	    || frec.magic != TDB_FREE_MAGIC)
+		return TDB_OFF_ERR;
+
+	return frec.data_len;
+}
+
+int main(int argc, char *argv[])
+{
+	tdb_off_t list;
+	struct tdb_context *tdb;
+	struct tdb_layout *layout;
+	struct tdb_data data, key;
+	tdb_len_t total;
+	unsigned int i;
+
+	/* FIXME: Test TDB_CONVERT */
+
+	plan_tests(62); /* 7 + 7 + 9 + 9 + 10 + 11 + 8 checks in the cases below, plus the final log check */
+	data.dptr = (void *)"world";
+	data.dsize = 5;
+	key.dptr = (void *)"hello";
+	key.dsize = 5;
+
+	/* Case 1: a lone free record followed by EOF; coalesce() must return 0 and leave its length alone. */
+	layout = new_tdb_layout();
+	tdb_layout_add_hashtable(layout, 12, 0);
+	tdb_layout_add_freetable(layout, 1, 16, 12, 0);
+	tdb_layout_add_free(layout, 1024);
+	tdb = tdb_layout_get(layout);
+	tdb->log = tap_log_fn;
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024); /* elem[2] is the first free record (0 = hash table, 1 = free table) */
+
+	/* Work out which free list the 1024-byte record is on. */
+	list = size_to_bucket(tdb, 1024);
+	/* Take the hash-list and free-list locks, then expect coalesce() to do nothing. */
+	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
+	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
+	ok1(coalesce(tdb, layout->elem[2].base.off, list, 1024) == 0);
+	tdb_unlock_free_list(tdb, list);
+	tdb_unlock_list(tdb, 0, F_WRLCK);
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	tdb_close(tdb);
+
+	/* Case 2: a free record followed by a used record; coalesce() must again return 0. */
+	layout = new_tdb_layout();
+	tdb_layout_add_hashtable(layout, 12, 0);
+	tdb_layout_add_freetable(layout, 1, 16, 12, 0);
+	tdb_layout_add_free(layout, 1024);
+	tdb_layout_add_used(layout, key, data, 6);
+	tdb = tdb_layout_get(layout);
+	tdb->log = tap_log_fn;
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Work out which free list the 1024-byte record is on. */
+	list = size_to_bucket(tdb, 1024);
+	/* Take both locks, then expect coalesce() to do nothing. */
+	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
+	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
+	ok1(coalesce(tdb, layout->elem[2].base.off, list, 1024) == 0);
+	tdb_unlock_free_list(tdb, list);
+	tdb_unlock_list(tdb, 0, F_WRLCK);
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	tdb_close(tdb);
+
+	/* Case 3: two adjacent free records, then EOF; coalesce() must return 1 and merge them. */
+	layout = new_tdb_layout();
+	tdb_layout_add_hashtable(layout, 12, 0);
+	tdb_layout_add_freetable(layout, 1, 16, 12, 0);
+	tdb_layout_add_free(layout, 1024);
+	tdb_layout_add_free(layout, 512);
+	tdb = tdb_layout_get(layout);
+	tdb->log = tap_log_fn;
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
+	ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Work out which free list the 1024-byte record is on. */
+	list = size_to_bucket(tdb, 1024);
+	/* Take both locks and coalesce; only the hash list is unlocked here afterwards. */
+	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
+	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
+	ok1(coalesce(tdb, layout->elem[2].base.off, list, 1024) == 1);
+	tdb_unlock_list(tdb, 0, F_WRLCK);
+	ok1(!tdb_has_locks(tdb)); /* the free-list lock is expected to be gone already after a successful coalesce() */
+	ok1(free_record_length(tdb, layout->elem[2].base.off)
+	    == 1024 + sizeof(struct tdb_used_record) + 512); /* the merged record also absorbs the second record's header */
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	tdb_close(tdb);
+
+	/* Case 4: two adjacent free records, then a used record; the two free records must still merge. */
+	layout = new_tdb_layout();
+	tdb_layout_add_hashtable(layout, 12, 0);
+	tdb_layout_add_freetable(layout, 1, 16, 12, 0);
+	tdb_layout_add_free(layout, 1024);
+	tdb_layout_add_free(layout, 512);
+	tdb_layout_add_used(layout, key, data, 6);
+	tdb = tdb_layout_get(layout);
+	tdb->log = tap_log_fn;
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
+	ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Work out which free list the 1024-byte record is on. */
+	list = size_to_bucket(tdb, 1024);
+	/* Take both locks and coalesce; only the hash list is unlocked here afterwards. */
+	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
+	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
+	ok1(coalesce(tdb, layout->elem[2].base.off, list, 1024) == 1);
+	tdb_unlock_list(tdb, 0, F_WRLCK);
+	ok1(!tdb_has_locks(tdb));
+	ok1(free_record_length(tdb, layout->elem[2].base.off)
+	    == 1024 + sizeof(struct tdb_used_record) + 512);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	tdb_close(tdb);
+
+	/* Case 5: three adjacent free records, then EOF; all three must merge into the first. */
+	layout = new_tdb_layout();
+	tdb_layout_add_hashtable(layout, 12, 0);
+	tdb_layout_add_freetable(layout, 1, 16, 12, 0);
+	tdb_layout_add_free(layout, 1024);
+	tdb_layout_add_free(layout, 512);
+	tdb_layout_add_free(layout, 32);
+	tdb = tdb_layout_get(layout);
+	tdb->log = tap_log_fn;
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
+	ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
+	ok1(free_record_length(tdb, layout->elem[4].base.off) == 32);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Work out which free list the 1024-byte record is on. */
+	list = size_to_bucket(tdb, 1024);
+	/* Take both locks and coalesce; only the hash list is unlocked here afterwards. */
+	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
+	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
+	ok1(coalesce(tdb, layout->elem[2].base.off, list, 1024) == 1);
+	tdb_unlock_list(tdb, 0, F_WRLCK);
+	ok1(!tdb_has_locks(tdb));
+	ok1(free_record_length(tdb, layout->elem[2].base.off)
+	    == 1024 + sizeof(struct tdb_used_record) + 512
+	    + sizeof(struct tdb_used_record) + 32);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	tdb_close(tdb);
+
+	/* Case 6: two free records that zone_of() places in zones 0 and 1; they must still merge. */
+	layout = new_tdb_layout();
+	tdb_layout_add_hashtable(layout, 12, 0);
+	tdb_layout_add_freetable(layout, 2, 16, 12, 0);
+	tdb_layout_add_free(layout, 32768);
+	tdb_layout_add_free(layout, 30000);
+	tdb = tdb_layout_get(layout);
+	tdb->log = tap_log_fn;
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 32768);
+	ok1(zone_of(tdb, layout->elem[2].base.off) == 0);
+	ok1(free_record_length(tdb, layout->elem[3].base.off) == 30000);
+	ok1(zone_of(tdb, layout->elem[3].base.off) == 1);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Work out which free list the 32768-byte record is on. */
+	list = size_to_bucket(tdb, 32768);
+	/* Take both locks and coalesce; only the hash list is unlocked here afterwards. */
+	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
+	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
+	ok1(coalesce(tdb, layout->elem[2].base.off, list, 32768) == 1);
+	tdb_unlock_list(tdb, 0, F_WRLCK);
+	ok1(!tdb_has_locks(tdb));
+	ok1(free_record_length(tdb, layout->elem[2].base.off)
+	    == 32768 + sizeof(struct tdb_used_record) + 30000);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	tdb_close(tdb);
+
+	/* Case 7: twelve free records of 2^4 .. 2^15 bytes with an 8-zone free table; all must merge into the first. */
+	layout = new_tdb_layout();
+	tdb_layout_add_hashtable(layout, 12, 0);
+	tdb_layout_add_freetable(layout, 8, 16, 12, 0);
+	total = 0;
+	for (i = 4; i < 16; i++) {
+		tdb_layout_add_free(layout, 1 << i);
+		total += sizeof(struct tdb_used_record) + (1 << i);
+	}
+	total -= sizeof(struct tdb_used_record); /* the first record's own header is not part of its data length */
+	tdb = tdb_layout_get(layout);
+	tdb->log = tap_log_fn;
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1 << 4);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Work out which free list the 16-byte record is on. */
+	list = size_to_bucket(tdb, 1 << 4);
+	/* Take both locks and coalesce; only the hash list is unlocked here afterwards. */
+	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
+	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
+	ok1(coalesce(tdb, layout->elem[2].base.off, list, 1 << 4) == 1);
+	tdb_unlock_list(tdb, 0, F_WRLCK);
+	ok1(!tdb_has_locks(tdb));
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == total);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	tdb_close(tdb);
+
+	ok1(tap_log_messages == 0); /* nothing may have been logged through tap_log_fn in any case */
+	return exit_status();
+}

+ 48 - 0
ccan/tdb2/test/run-record-expand.c

@@ -0,0 +1,48 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+
+#define MAX_SIZE 10000
+#define SIZE_STEP 131
+
+/*
+ * For each flag combination: create a database, insert a zero-length value
+ * under one key, then repeatedly overwrite it (TDB_MODIFY) with values of
+ * 0, SIZE_STEP, 2*SIZE_STEP, ... bytes (below MAX_SIZE), running tdb_check()
+ * after every store.  Finally checks that nothing was logged.
+ */
+int main(int argc, char *argv[])
+{
+	unsigned int i;
+	struct tdb_context *tdb;
+	int flags[] = { TDB_INTERNAL, TDB_DEFAULT,
+			TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT };
+	struct tdb_data key = { (unsigned char *)"key", 3 };
+	struct tdb_data data;
+
+	data.dptr = malloc(MAX_SIZE);
+	if (!data.dptr)
+		abort();
+	memset(data.dptr, 0x24, MAX_SIZE);
+
+	/* Per flag set: open + insert + check, then store + check per size. */
+	plan_tests(sizeof(flags) / sizeof(flags[0])
+		   * (3 + (1 + (MAX_SIZE/SIZE_STEP)) * 2) + 1);
+	for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+		tdb = tdb_open("/tmp/run-new_database.tdb", flags[i],
+			       O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
+		ok1(tdb);
+		if (!tdb)
+			continue;
+		/* Only touch the context once we know the open succeeded. */
+		tdb->log = tap_log_fn;
+
+		data.dsize = 0;
+		ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
+		ok1(tdb_check(tdb, NULL, NULL) == 0);
+		for (data.dsize = 0;
+		     data.dsize < MAX_SIZE;
+		     data.dsize += SIZE_STEP) {
+			memset(data.dptr, data.dsize, data.dsize);
+			ok1(tdb_store(tdb, key, data, TDB_MODIFY) == 0);
+			ok1(tdb_check(tdb, NULL, NULL) == 0);
+		}
+		tdb_close(tdb);
+	}
+	free(data.dptr);
+	ok1(tap_log_messages == 0);
+	return exit_status();
+}