Browse Source

tdb2: use immobile free buckets, rename tests to show some ordering.

We put the free lists at the beginning of a zone; this means no record
can be larger than a zone, but means they cannot move.  Once we change
hashes to be expanding, they won't move either and the result should be
simpler.
Rusty Russell 15 years ago
parent
commit
1a24a87084

+ 175 - 140
ccan/tdb2/check.c

@@ -55,29 +55,8 @@ static bool check_header(struct tdb_context *tdb)
 			 (long long)tdb->header.v.hash_bits);
 			 (long long)tdb->header.v.hash_bits);
 		return false;
 		return false;
 	}
 	}
-	if (tdb->header.v.zone_bits < INITIAL_ZONE_BITS) {
-		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-			 "check: bad zone_bits %llu\n",
-			 (long long)tdb->header.v.zone_bits);
-		return false;
-	}
-	if (tdb->header.v.free_buckets < INITIAL_FREE_BUCKETS) {
-		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-			 "check: bad free_buckets %llu\n",
-			 (long long)tdb->header.v.free_buckets);
-		return false;
-	}
-	if ((1ULL << tdb->header.v.zone_bits) * tdb->header.v.num_zones
-	    < tdb->map_size) {
-		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-			 "check: %llu zones size %llu don't cover %llu\n",
-			 (long long)tdb->header.v.num_zones,
-			 (long long)(1ULL << tdb->header.v.zone_bits),
-			 (long long)tdb->map_size);
-		return false;
-	}
 
 
-	/* We check hash_off and free_off later. */
+	/* We check hash_off later. */
 
 
 	/* Don't check reserved: they *can* be used later. */
 	/* Don't check reserved: they *can* be used later. */
 	return true;
 	return true;
@@ -161,7 +140,7 @@ static bool check_hash_list(struct tdb_context *tdb,
 		hash = hash_record(tdb, off);
 		hash = hash_record(tdb, off);
 
 
 		/* Top bits must match header. */
 		/* Top bits must match header. */
-		if (hash >> (64 - 11) != rec_hash(&rec)) {
+		if (hash >> (64 - 5) != rec_hash(&rec)) {
 			tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 			tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 				 "tdb_check: Bad hash magic at offset %llu"
 				 "tdb_check: Bad hash magic at offset %llu"
 				 " (0x%llx vs 0x%llx)\n",
 				 " (0x%llx vs 0x%llx)\n",
@@ -187,8 +166,8 @@ static bool check_hash_list(struct tdb_context *tdb,
 		num_nonzero++;
 		num_nonzero++;
 	}
 	}
 
 
-	/* free table and hash table are two of the used blocks. */
-	if (num_found != num_used - 2) {
+	/* hash table is one of the used blocks. */
+	if (num_found != num_used - 1) {
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 			 "tdb_check: Not all entries are in hash\n");
 			 "tdb_check: Not all entries are in hash\n");
 		return false;
 		return false;
@@ -200,30 +179,32 @@ static bool check_free(struct tdb_context *tdb,
 		       tdb_off_t off,
 		       tdb_off_t off,
 		       const struct tdb_free_record *frec,
 		       const struct tdb_free_record *frec,
 		       tdb_off_t prev,
 		       tdb_off_t prev,
-		       tdb_off_t zone, unsigned int bucket)
+		       tdb_off_t zone_off, unsigned int bucket)
 {
 {
-	if (frec->magic != TDB_FREE_MAGIC) {
+	if (frec_magic(frec) != TDB_FREE_MAGIC) {
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 			 "tdb_check: offset %llu bad magic 0x%llx\n",
 			 "tdb_check: offset %llu bad magic 0x%llx\n",
-			 (long long)off, (long long)frec->magic);
+			 (long long)off, (long long)frec->magic_and_meta);
 		return false;
 		return false;
 	}
 	}
 	if (tdb->methods->oob(tdb, off
 	if (tdb->methods->oob(tdb, off
 			      + frec->data_len-sizeof(struct tdb_used_record),
 			      + frec->data_len-sizeof(struct tdb_used_record),
 			      true))
 			      true))
 		return false;
 		return false;
-	if (zone_of(tdb, off) != zone) {
+	if (off < zone_off || off >= zone_off + (1ULL<<frec_zone_bits(frec))) {
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-			 "tdb_check: offset %llu in wrong zone %llu vs %llu\n",
+			 "tdb_check: offset %llu outside zone %llu-%llu\n",
 			 (long long)off,
 			 (long long)off,
-			 (long long)zone, (long long)zone_of(tdb, off));
+			 (long long)zone_off,
+			 (long long)zone_off + (1ULL<<frec_zone_bits(frec)));
 		return false;
 		return false;
 	}
 	}
-	if (size_to_bucket(tdb, frec->data_len) != bucket) {
+	if (size_to_bucket(frec_zone_bits(frec), frec->data_len) != bucket) {
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 			 "tdb_check: offset %llu in wrong bucket %u vs %u\n",
 			 "tdb_check: offset %llu in wrong bucket %u vs %u\n",
 			 (long long)off,
 			 (long long)off,
-			 bucket, size_to_bucket(tdb, frec->data_len));
+			 bucket,
+			 size_to_bucket(frec_zone_bits(frec), frec->data_len));
 		return false;
 		return false;
 	}
 	}
 	if (prev != frec->prev) {
 	if (prev != frec->prev) {
@@ -236,103 +217,88 @@ static bool check_free(struct tdb_context *tdb,
 	return true;
 	return true;
 }
 }
 		       
 		       
-static bool check_free_list(struct tdb_context *tdb,
-			    tdb_off_t free[],
-			    size_t num_free)
+static tdb_len_t check_free_list(struct tdb_context *tdb,
+				 tdb_off_t zone_off,
+				 tdb_off_t free[],
+				 size_t num_free,
+				 size_t *num_found)
 {
 {
-	struct tdb_used_record rec;
-	tdb_len_t freelen, i, j;
+	struct free_zone_header zhdr;
 	tdb_off_t h;
 	tdb_off_t h;
-	size_t num_found;
+	unsigned int i;
 
 
-	freelen = sizeof(tdb_off_t) * tdb->header.v.num_zones
-		* (tdb->header.v.free_buckets + 1);
+	if (tdb_read_convert(tdb, zone_off, &zhdr, sizeof(zhdr)) == -1)
+		return TDB_OFF_ERR;
 
 
-	if (tdb_read_convert(tdb, tdb->header.v.free_off - sizeof(rec),
-			     &rec, sizeof(rec)) == -1)
-		return false;
-
-	if (rec_data_length(&rec) != freelen) {
-		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-			 "tdb_check: Bad free table length %llu vs %llu\n",
-			 (long long)rec_data_length(&rec),
-			 (long long)freelen);
-		return false;
-	}
-	if (rec_key_length(&rec) != 0) {
-		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-			 "tdb_check: Bad free table key length %llu\n",
-			 (long long)rec_key_length(&rec));
-		return false;
-	}
-	if (rec_hash(&rec) != 0) {
-		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-			 "tdb_check: Bad free table hash value %llu\n",
-			 (long long)rec_hash(&rec));
-		return false;
-	}
+	for (i = 0; i <= BUCKETS_FOR_ZONE(zhdr.zone_bits); i++) {
+		tdb_off_t off, prev = 0, *p;
+		struct tdb_free_record f;
 
 
-	num_found = 0;
-	h = tdb->header.v.free_off;
-	for (i = 0; i < tdb->header.v.num_zones; i++) {
-		for (j = 0; j <= tdb->header.v.free_buckets;
-		     j++, h += sizeof(tdb_off_t)) {
-			tdb_off_t off, prev = 0, *p;
-			struct tdb_free_record f;
+		h = bucket_off(zone_off, i);
+		for (off = tdb_read_off(tdb, h); off; off = f.next) {
+			if (off == TDB_OFF_ERR)
+				return false;
+			if (tdb_read_convert(tdb, off, &f, sizeof(f)))
+				return false;
+			if (!check_free(tdb, off, &f, prev, zone_off, i))
+				return false;
 
 
-			for (off = tdb_read_off(tdb, h); off; off = f.next) {
-				if (off == TDB_OFF_ERR)
-					return false;
-				if (tdb_read_convert(tdb, off, &f, sizeof(f)))
-					return false;
-				if (!check_free(tdb, off, &f, prev, i, j))
-					return false;
-
-				/* FIXME: Check hash bits */
-				p = asearch(&off, free, num_free, off_cmp);
-				if (!p) {
-					tdb->log(tdb, TDB_DEBUG_ERROR,
-						 tdb->log_priv,
-						 "tdb_check: Invalid offset"
-						 " %llu in free table\n",
-						 (long long)off);
-					return false;
-				}
-				/* Mark it invalid. */
-				*p ^= 1;
-				num_found++;
-				prev = off;
+			/* FIXME: Check hash bits */
+			p = asearch(&off, free, num_free, off_cmp);
+			if (!p) {
+				tdb->log(tdb, TDB_DEBUG_ERROR,
+					 tdb->log_priv,
+					 "tdb_check: Invalid offset"
+					 " %llu in free table\n",
+					 (long long)off);
+				return false;
 			}
 			}
+			/* Mark it invalid. */
+			*p ^= 1;
+			(*num_found)++;
+			prev = off;
 		}
 		}
 	}
 	}
-	if (num_found != num_free) {
-		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-			 "tdb_check: Not all entries are in free table\n");
-		return false;
-	}
-	return true;
+	return 1ULL << zhdr.zone_bits;
 }
 }
 
 
-/* FIXME: call check() function. */
-int tdb_check(struct tdb_context *tdb,
-	      int (*check)(TDB_DATA key, TDB_DATA data, void *private_data),
-	      void *private_data)
+static tdb_off_t check_zone(struct tdb_context *tdb, tdb_off_t zone_off,
+			    tdb_off_t **used, size_t *num_used,
+			    tdb_off_t **free, size_t *num_free,
+			    bool *hash_found, unsigned int *max_zone_bits)
 {
 {
-	tdb_off_t *free = NULL, *used = NULL, off;
+	struct free_zone_header zhdr;
+	tdb_off_t off, hdrlen;
 	tdb_len_t len;
 	tdb_len_t len;
-	size_t num_free = 0, num_used = 0;
-	bool hash_found = false, free_found = false;
 
 
-	/* This always ensures the header is uptodate. */
-	if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false) != 0)
-		return -1;
+	if (tdb_read_convert(tdb, zone_off, &zhdr, sizeof(zhdr)) == -1)
+		return TDB_OFF_ERR;
 
 
-	if (!check_header(tdb))
-		goto fail;
+	if (zhdr.zone_bits < INITIAL_ZONE_BITS) {
+		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+			 "check: bad zone_bits %llu at zone %llu\n",
+			 (long long)zhdr.zone_bits, (long long)zone_off);
+		return TDB_OFF_ERR;
+	}
 
 
-	/* First we do a linear scan, checking all records. */
-	for (off = sizeof(struct tdb_header);
-	     off < tdb->map_size;
+	/* Zone bits can only increase... */
+	if (zhdr.zone_bits > *max_zone_bits)
+		*max_zone_bits = zhdr.zone_bits;
+	else if (zhdr.zone_bits < *max_zone_bits) {
+		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+			 "check: small zone_bits %llu at zone %llu\n",
+			 (long long)zhdr.zone_bits, (long long)zone_off);
+		return TDB_OFF_ERR;
+	}
+
+	/* Zone must be within file! */
+	if (tdb->methods->oob(tdb, zone_off + (1ULL << zhdr.zone_bits), false))
+		return TDB_OFF_ERR;
+
+	hdrlen = sizeof(zhdr)
+		+ (BUCKETS_FOR_ZONE(zhdr.zone_bits) + 1) * sizeof(tdb_off_t);
+	for (off = zone_off + hdrlen;
+	     off < zone_off + (1ULL << zhdr.zone_bits);
 	     off += len) {
 	     off += len) {
 		union {
 		union {
 			struct tdb_used_record u;
 			struct tdb_used_record u;
@@ -340,14 +306,27 @@ int tdb_check(struct tdb_context *tdb,
 		} pad, *p;
 		} pad, *p;
 		p = tdb_get(tdb, off, &pad, sizeof(pad));
 		p = tdb_get(tdb, off, &pad, sizeof(pad));
 		if (!p)
 		if (!p)
-			goto fail;
-		if (p->f.magic == TDB_FREE_MAGIC) {
+			return TDB_OFF_ERR;
+		if (frec_magic(&p->f) == TDB_FREE_MAGIC) {
+			if (frec_zone_bits(&p->f) != zhdr.zone_bits) {
+				tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+					 "tdb_check: Bad free zone bits %u"
+					 " at offset %llu\n",
+					 frec_zone_bits(&p->f),
+					 (long long)off);
+				return TDB_OFF_ERR;
+			}
 			/* This record is free! */
 			/* This record is free! */
-			if (!append(&free, &num_free, off))
-				goto fail;
+			if (!append(free, num_free, off))
+				return TDB_OFF_ERR;
 			len = sizeof(p->u) + p->f.data_len;
 			len = sizeof(p->u) + p->f.data_len;
-			if (tdb->methods->oob(tdb, off + len, false))
-				goto fail;
+			if (off + len > zone_off + (1ULL << zhdr.zone_bits)) {
+				tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+					 "tdb_check: free overlength %llu"
+					 " at offset %llu\n",
+					 (long long)len, (long long)off);
+				return TDB_OFF_ERR;
+			}
 		} else {
 		} else {
 			uint64_t klen, dlen, extra;
 			uint64_t klen, dlen, extra;
 
 
@@ -358,48 +337,94 @@ int tdb_check(struct tdb_context *tdb,
 					 " at offset %llu\n",
 					 " at offset %llu\n",
 					 (long long)rec_magic(&p->u),
 					 (long long)rec_magic(&p->u),
 					 (long long)off);
 					 (long long)off);
-				goto fail;
+				return TDB_OFF_ERR;
+			}
+
+			if (rec_zone_bits(&p->u) != zhdr.zone_bits) {
+				tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+					 "tdb_check: Bad zone bits %u"
+					 " at offset %llu\n",
+					 rec_zone_bits(&p->u),
+					 (long long)off);
+				return TDB_OFF_ERR;
 			}
 			}
 			
 			
-			if (!append(&used, &num_used, off))
-				goto fail;
+			if (!append(used, num_used, off))
+				return TDB_OFF_ERR;
 
 
 			klen = rec_key_length(&p->u);
 			klen = rec_key_length(&p->u);
 			dlen = rec_data_length(&p->u);
 			dlen = rec_data_length(&p->u);
 			extra = rec_extra_padding(&p->u);
 			extra = rec_extra_padding(&p->u);
 
 
 			len = sizeof(p->u) + klen + dlen + extra;
 			len = sizeof(p->u) + klen + dlen + extra;
-			if (tdb->methods->oob(tdb, off + len, false))
-				goto fail;
+			if (off + len > zone_off + (1ULL << zhdr.zone_bits)) {
+				tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+					 "tdb_check: used overlength %llu"
+					 " at offset %llu\n",
+					 (long long)len, (long long)off);
+				return TDB_OFF_ERR;
+			}
 
 
 			if (len < sizeof(p->f)) {
 			if (len < sizeof(p->f)) {
 				tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 				tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 					 "tdb_check: too short record %llu at"
 					 "tdb_check: too short record %llu at"
 					 " %llu\n",
 					 " %llu\n",
 					 (long long)len, (long long)off);
 					 (long long)len, (long long)off);
-				goto fail;
+				return TDB_OFF_ERR;
 			}
 			}
 
 
-			if (off + sizeof(p->u) == tdb->header.v.hash_off) {
-				hash_found = true;
-			} else if (off + sizeof(p->u)
-				   == tdb->header.v.free_off) {
-				free_found = true;
-			}
+			if (off + sizeof(p->u) == tdb->header.v.hash_off)
+				*hash_found = true;
 		}
 		}
 	}
 	}
+	return 1ULL << zhdr.zone_bits;
+}
 
 
-	if (!hash_found) {
+/* FIXME: call check() function. */
+int tdb_check(struct tdb_context *tdb,
+	      int (*check)(TDB_DATA key, TDB_DATA data, void *private_data),
+	      void *private_data)
+{
+	tdb_off_t *free = NULL, *used = NULL, off;
+	tdb_len_t len;
+	size_t num_free = 0, num_used = 0, num_found = 0;
+	bool hash_found = false;
+	unsigned max_zone_bits = INITIAL_ZONE_BITS;
+	uint8_t tailer;
+
+	/* FIXME: need more locking? against expansion? */
+	/* This always ensures the header is uptodate. */
+	if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false) != 0)
+		return -1;
+
+	if (!check_header(tdb))
+		goto fail;
+
+	/* First we do a linear scan, checking all records. */
+	for (off = sizeof(struct tdb_header);
+	     off < tdb->map_size - 1;
+	     off += len) {
+		len = check_zone(tdb, off, &used, &num_used, &free, &num_free,
+				 &hash_found, &max_zone_bits);
+		if (len == TDB_OFF_ERR)
+			goto fail;
+	}
+
+	/* Check tailer. */
+	if (tdb->methods->read(tdb, tdb->map_size - 1, &tailer, 1) == -1)
+		goto fail;
+	if (tailer != max_zone_bits) {
+		tdb->ecode = TDB_ERR_CORRUPT;
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-			 "tdb_check: hash table not found at %llu\n",
-			 (long long)tdb->header.v.hash_off);
+			 "tdb_check: Bad tailer value %u vs %u\n", tailer,
+			 max_zone_bits);
 		goto fail;
 		goto fail;
 	}
 	}
 
 
-	if (!free_found) {
+	if (!hash_found) {
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-			 "tdb_check: free table not found at %llu\n",
-			 (long long)tdb->header.v.free_off);
+			 "tdb_check: hash table not found at %llu\n",
+			 (long long)tdb->header.v.hash_off);
 		goto fail;
 		goto fail;
 	}
 	}
 
 
@@ -407,8 +432,18 @@ int tdb_check(struct tdb_context *tdb,
 	if (!check_hash_list(tdb, used, num_used))
 	if (!check_hash_list(tdb, used, num_used))
 		goto fail;
 		goto fail;
 
 
-	if (!check_free_list(tdb, free, num_free))
-		goto fail;
+	for (off = sizeof(struct tdb_header);
+	     off < tdb->map_size - 1;
+	     off += len) {
+		len = check_free_list(tdb, off, free, num_free, &num_found);
+		if (len == TDB_OFF_ERR)
+			goto fail;
+	}
+	if (num_found != num_free) {
+		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+			 "tdb_check: Not all entries are in free table\n");
+		return false;
+	}
 
 
 	tdb_allrecord_unlock(tdb, F_RDLCK);
 	tdb_allrecord_unlock(tdb, F_RDLCK);
 	return 0;
 	return 0;

+ 247 - 336
ccan/tdb2/free.c

@@ -25,39 +25,6 @@
 #define MIN_DATA_LEN	\
 #define MIN_DATA_LEN	\
 	(sizeof(struct tdb_free_record) - sizeof(struct tdb_used_record))
 	(sizeof(struct tdb_free_record) - sizeof(struct tdb_used_record))
 
 
-/* We have a series of free lists, each one covering a "zone" of the file.
- *
- * For each zone we have a series of per-size buckets, and a final bucket for
- * "too big".
- *
- * It's possible to move the free_list_head, but *only* under the allrecord
- * lock. */
-static tdb_off_t free_list_off(struct tdb_context *tdb, unsigned int list)
-{
-	return tdb->header.v.free_off + list * sizeof(tdb_off_t);
-}
-
-/* We're a library: playing with srandom() is unfriendly.  srandom_r
- * probably lacks portability.  We don't need very random here. */
-static unsigned int quick_random(struct tdb_context *tdb)
-{
-	return getpid() + time(NULL) + (unsigned long)tdb;
-}
-
-/* Start by using a random zone to spread the load. */
-void tdb_zone_init(struct tdb_context *tdb)
-{
-	/*
-	 * We read num_zones without a proper lock, so we could have
-	 * gotten a partial read.  Since zone_bits is 1 byte long, we
-	 * can trust that; even if it's increased, the number of zones
-	 * cannot have decreased.  And using the map size means we
-	 * will not start with a zone which hasn't been filled yet.
-	 */
-	tdb->last_zone = quick_random(tdb)
-		% ((tdb->map_size >> tdb->header.v.zone_bits) + 1);
-}
-
 static unsigned fls64(uint64_t val)
 static unsigned fls64(uint64_t val)
 {
 {
 #if HAVE_BUILTIN_CLZL
 #if HAVE_BUILTIN_CLZL
@@ -101,7 +68,7 @@ static unsigned fls64(uint64_t val)
 }
 }
 
 
 /* In which bucket would we find a particular record size? (ignoring header) */
 /* In which bucket would we find a particular record size? (ignoring header) */
-unsigned int size_to_bucket(struct tdb_context *tdb, tdb_len_t data_len)
+unsigned int size_to_bucket(unsigned int zone_bits, tdb_len_t data_len)
 {
 {
 	unsigned int bucket;
 	unsigned int bucket;
 
 
@@ -117,39 +84,104 @@ unsigned int size_to_bucket(struct tdb_context *tdb, tdb_len_t data_len)
 		bucket = fls64(data_len - MIN_DATA_LEN) + 2;
 		bucket = fls64(data_len - MIN_DATA_LEN) + 2;
 	}
 	}
 
 
-	if (unlikely(bucket > tdb->header.v.free_buckets))
-		bucket = tdb->header.v.free_buckets;
+	if (unlikely(bucket > BUCKETS_FOR_ZONE(zone_bits)))
+		bucket = BUCKETS_FOR_ZONE(zone_bits);
 	return bucket;
 	return bucket;
 }
 }
 
 
-/* What zone does a block belong in? */ 
-tdb_off_t zone_of(struct tdb_context *tdb, tdb_off_t off)
+/* Subtract 1-byte tailer and header.  Then round up to next power of 2. */
+static unsigned max_zone_bits(struct tdb_context *tdb)
 {
 {
-	assert(tdb->header_uptodate);
+	return fls64(tdb->map_size-1-sizeof(struct tdb_header)-1) + 1;
+}
+
+/* Start by using a random zone to spread the load: returns the offset. */
+static uint64_t random_zone(struct tdb_context *tdb)
+{
+	struct free_zone_header zhdr;
+	tdb_off_t off = sizeof(struct tdb_header);
+	tdb_len_t half_bits;
+	uint64_t randbits = 0;
+	unsigned int i;
+
+	for (i = 0; i < 64; i += fls64(RAND_MAX)) 
+		randbits ^= ((uint64_t)random()) << i;
+
+	/* FIXME: Does this work?  Test! */
+	half_bits = max_zone_bits(tdb) - 1;
+	do {
+		/* Pick left or right side (not outside file) */
+		if ((randbits & 1)
+		    && !tdb->methods->oob(tdb, off + (1ULL << half_bits)
+					  + sizeof(zhdr), true)) {
+			off += 1ULL << half_bits;
+		}
+		randbits >>= 1;
+
+		if (tdb_read_convert(tdb, off, &zhdr, sizeof(zhdr)) == -1) 
+			return TDB_OFF_ERR;
+
+		if (zhdr.zone_bits == half_bits)
+			return off;
+
+		half_bits--;
+	} while (half_bits >= INITIAL_ZONE_BITS);
+
+	tdb->ecode = TDB_ERR_CORRUPT;
+	tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
+		 "random_zone: zone at %llu smaller than %u bits?",
+		 (long long)off, INITIAL_ZONE_BITS);
+	return TDB_OFF_ERR;
+}
+
+int tdb_zone_init(struct tdb_context *tdb)
+{
+	tdb->zone_off = random_zone(tdb);
+	if (tdb->zone_off == TDB_OFF_ERR)
+		return -1;
+	if (tdb_read_convert(tdb, tdb->zone_off,
+			     &tdb->zhdr, sizeof(tdb->zhdr)) == -1) 
+		return -1;
+	return 0;
+}
+
+/* Where's the header, given a zone size of 1 << zone_bits? */
+static tdb_off_t zone_off(tdb_off_t off, unsigned int zone_bits)
+{
+	off -= sizeof(struct tdb_header);
+	return (off & ~((1ULL << zone_bits) - 1)) + sizeof(struct tdb_header);
+}
 
 
-	return off >> tdb->header.v.zone_bits;
+/* Offset of a given bucket. */
+/* FIXME: bucket can be "unsigned" everywhere, or even uint8/16. */
+tdb_off_t bucket_off(tdb_off_t zone_off, tdb_off_t bucket)
+{
+	return zone_off
+		+ sizeof(struct free_zone_header)
+		+ bucket * sizeof(tdb_off_t);
 }
 }
 
 
 /* Returns free_buckets + 1, or list number to search. */
 /* Returns free_buckets + 1, or list number to search. */
 static tdb_off_t find_free_head(struct tdb_context *tdb, tdb_off_t bucket)
 static tdb_off_t find_free_head(struct tdb_context *tdb, tdb_off_t bucket)
 {
 {
-	tdb_off_t first, off;
+	tdb_off_t b;
 
 
 	/* Speculatively search for a non-zero bucket. */
 	/* Speculatively search for a non-zero bucket. */
-	first = tdb->last_zone * (tdb->header.v.free_buckets+1) + bucket;
-	off = tdb_find_nonzero_off(tdb, free_list_off(tdb, first),
-				   tdb->header.v.free_buckets + 1 - bucket);
-	return bucket + off;
+	b = tdb_find_nonzero_off(tdb, bucket_off(tdb->zone_off, bucket),
+				 BUCKETS_FOR_ZONE(tdb->zhdr.zone_bits) + 1
+				 - bucket);
+	return bucket + b;
 }
 }
 
 
+/* Remove from free bucket. */
 static int remove_from_list(struct tdb_context *tdb,
 static int remove_from_list(struct tdb_context *tdb,
-			    tdb_off_t list, struct tdb_free_record *r)
+			    tdb_off_t b_off, struct tdb_free_record *r)
 {
 {
 	tdb_off_t off;
 	tdb_off_t off;
 
 
 	/* Front of list? */
 	/* Front of list? */
 	if (r->prev == 0) {
 	if (r->prev == 0) {
-		off = free_list_off(tdb, list);
+		off = b_off;
 	} else {
 	} else {
 		off = r->prev + offsetof(struct tdb_free_record, next);
 		off = r->prev + offsetof(struct tdb_free_record, next);
 	}
 	}
@@ -168,15 +200,15 @@ static int remove_from_list(struct tdb_context *tdb,
 	return 0;
 	return 0;
 }
 }
 
 
-/* Enqueue in this free list. */
+/* Enqueue in this free bucket. */
 static int enqueue_in_free(struct tdb_context *tdb,
 static int enqueue_in_free(struct tdb_context *tdb,
-			   tdb_off_t list,
+			   tdb_off_t b_off,
 			   tdb_off_t off,
 			   tdb_off_t off,
 			   struct tdb_free_record *new)
 			   struct tdb_free_record *new)
 {
 {
 	new->prev = 0;
 	new->prev = 0;
 	/* new->next = head. */
 	/* new->next = head. */
-	new->next = tdb_read_off(tdb, free_list_off(tdb, list));
+	new->next = tdb_read_off(tdb, b_off);
 	if (new->next == TDB_OFF_ERR)
 	if (new->next == TDB_OFF_ERR)
 		return -1;
 		return -1;
 
 
@@ -188,39 +220,40 @@ static int enqueue_in_free(struct tdb_context *tdb,
 			return -1;
 			return -1;
 	}
 	}
 	/* head = new */
 	/* head = new */
-	if (tdb_write_off(tdb, free_list_off(tdb, list), off) != 0)
+	if (tdb_write_off(tdb, b_off, off) != 0)
 		return -1;
 		return -1;
-	
+
 	return tdb_write_convert(tdb, off, new, sizeof(*new));
 	return tdb_write_convert(tdb, off, new, sizeof(*new));
 }
 }
 
 
-/* List isn't locked. */
+/* List need not be locked. */
 int add_free_record(struct tdb_context *tdb,
 int add_free_record(struct tdb_context *tdb,
+		    unsigned int zone_bits,
 		    tdb_off_t off, tdb_len_t len_with_header)
 		    tdb_off_t off, tdb_len_t len_with_header)
 {
 {
 	struct tdb_free_record new;
 	struct tdb_free_record new;
-	tdb_off_t list;
+	tdb_off_t b_off;
 	int ret;
 	int ret;
 
 
 	assert(len_with_header >= sizeof(new));
 	assert(len_with_header >= sizeof(new));
+	assert(zone_bits < (1 << 6));
 
 
-	new.magic = TDB_FREE_MAGIC;
+	new.magic_and_meta = TDB_FREE_MAGIC | zone_bits;
 	new.data_len = len_with_header - sizeof(struct tdb_used_record);
 	new.data_len = len_with_header - sizeof(struct tdb_used_record);
 
 
-	tdb->last_zone = zone_of(tdb, off);
-	list = tdb->last_zone * (tdb->header.v.free_buckets+1)
-		+ size_to_bucket(tdb, new.data_len);
-
-	if (tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) != 0)
+	b_off = bucket_off(zone_off(off, zone_bits),
+			   size_to_bucket(zone_bits, new.data_len));
+	if (tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) != 0)
 		return -1;
 		return -1;
 
 
-	ret = enqueue_in_free(tdb, list, off, &new);
-	tdb_unlock_free_list(tdb, list);
+	ret = enqueue_in_free(tdb, b_off, off, &new);
+	tdb_unlock_free_bucket(tdb, b_off);
 	return ret;
 	return ret;
 }
 }
 
 
 /* If we have enough left over to be useful, split that off. */
 /* If we have enough left over to be useful, split that off. */
 static int to_used_record(struct tdb_context *tdb,
 static int to_used_record(struct tdb_context *tdb,
+			  unsigned int zone_bits,
 			  tdb_off_t off,
 			  tdb_off_t off,
 			  tdb_len_t needed,
 			  tdb_len_t needed,
 			  tdb_len_t total_len,
 			  tdb_len_t total_len,
@@ -236,56 +269,59 @@ static int to_used_record(struct tdb_context *tdb,
 	*actual = total_len - leftover;
 	*actual = total_len - leftover;
 
 
 	if (leftover) {
 	if (leftover) {
-		if (add_free_record(tdb, off + sizeof(used) + *actual,
+		if (add_free_record(tdb, zone_bits,
+				    off + sizeof(used) + *actual,
 				    total_len - needed))
 				    total_len - needed))
 			return -1;
 			return -1;
 	}
 	}
 	return 0;
 	return 0;
 }
 }
 
 
-/* Note: we unlock the current list if we coalesce or fail. */
-static int coalesce(struct tdb_context *tdb, tdb_off_t off,
-		    tdb_off_t list, tdb_len_t data_len)
+/* Note: we unlock the current bucket if we coalesce or fail. */
+static int coalesce(struct tdb_context *tdb,
+		    tdb_off_t zone_off, unsigned zone_bits,
+		    tdb_off_t off, tdb_off_t b_off, tdb_len_t data_len)
 {
 {
 	struct tdb_free_record pad, *r;
 	struct tdb_free_record pad, *r;
 	tdb_off_t end = off + sizeof(struct tdb_used_record) + data_len;
 	tdb_off_t end = off + sizeof(struct tdb_used_record) + data_len;
 
 
-	while (!tdb->methods->oob(tdb, end + sizeof(*r), 1)) {
-		tdb_off_t nlist;
+	while (end < (zone_off + (1ULL << zone_bits))) {
+		tdb_off_t nb_off;
 
 
+		/* FIXME: do tdb_get here and below really win? */
 		r = tdb_get(tdb, end, &pad, sizeof(pad));
 		r = tdb_get(tdb, end, &pad, sizeof(pad));
 		if (!r)
 		if (!r)
 			goto err;
 			goto err;
 
 
-		if (r->magic != TDB_FREE_MAGIC)
+		if (frec_magic(r) != TDB_FREE_MAGIC)
 			break;
 			break;
 
 
-		nlist = zone_of(tdb, end) * (tdb->header.v.free_buckets+1)
-			+ size_to_bucket(tdb, r->data_len);
+		nb_off = bucket_off(zone_off,
+				    size_to_bucket(zone_bits, r->data_len));
 
 
 		/* We may be violating lock order here, so best effort. */
 		/* We may be violating lock order here, so best effort. */
-		if (tdb_lock_free_list(tdb, nlist, TDB_LOCK_NOWAIT) == -1)
+		if (tdb_lock_free_bucket(tdb, nb_off, TDB_LOCK_NOWAIT) == -1)
 			break;
 			break;
 
 
 		/* Now we have lock, re-check. */
 		/* Now we have lock, re-check. */
 		r = tdb_get(tdb, end, &pad, sizeof(pad));
 		r = tdb_get(tdb, end, &pad, sizeof(pad));
 		if (!r) {
 		if (!r) {
-			tdb_unlock_free_list(tdb, nlist);
+			tdb_unlock_free_bucket(tdb, nb_off);
 			goto err;
 			goto err;
 		}
 		}
 
 
-		if (unlikely(r->magic != TDB_FREE_MAGIC)) {
-			tdb_unlock_free_list(tdb, nlist);
+		if (unlikely(frec_magic(r) != TDB_FREE_MAGIC)) {
+			tdb_unlock_free_bucket(tdb, nb_off);
 			break;
 			break;
 		}
 		}
 
 
-		if (remove_from_list(tdb, nlist, r) == -1) {
-			tdb_unlock_free_list(tdb, nlist);
+		if (remove_from_list(tdb, nb_off, r) == -1) {
+			tdb_unlock_free_bucket(tdb, nb_off);
 			goto err;
 			goto err;
 		}
 		}
 
 
 		end += sizeof(struct tdb_used_record) + r->data_len;
 		end += sizeof(struct tdb_used_record) + r->data_len;
-		tdb_unlock_free_list(tdb, nlist);
+		tdb_unlock_free_bucket(tdb, nb_off);
 	}
 	}
 
 
 	/* Didn't find any adjacent free? */
 	/* Didn't find any adjacent free? */
@@ -305,59 +341,63 @@ static int coalesce(struct tdb_context *tdb, tdb_off_t off,
 		goto err;
 		goto err;
 	}
 	}
 
 
-	if (remove_from_list(tdb, list, r) == -1)
+	if (remove_from_list(tdb, b_off, r) == -1)
 		goto err;
 		goto err;
 
 
 	/* We have to drop this to avoid deadlocks. */
 	/* We have to drop this to avoid deadlocks. */
-	tdb_unlock_free_list(tdb, list);
+	tdb_unlock_free_bucket(tdb, b_off);
 
 
-	if (add_free_record(tdb, off, end - off) == -1)
+	if (add_free_record(tdb, zone_bits, off, end - off) == -1)
 		return -1;
 		return -1;
 	return 1;
 	return 1;
 
 
 err:
 err:
-	/* To unify error paths, we *always* unlock list. */
-	tdb_unlock_free_list(tdb, list);
+	/* To unify error paths, we *always* unlock bucket on error. */
+	tdb_unlock_free_bucket(tdb, b_off);
 	return -1;
 	return -1;
 }
 }
 
 
 /* We need size bytes to put our key and data in. */
 /* We need size bytes to put our key and data in. */
 static tdb_off_t lock_and_alloc(struct tdb_context *tdb,
 static tdb_off_t lock_and_alloc(struct tdb_context *tdb,
-				tdb_off_t bucket, size_t size,
+				tdb_off_t zone_off,
+				unsigned zone_bits,
+				tdb_off_t bucket,
+				size_t size,
 				tdb_len_t *actual)
 				tdb_len_t *actual)
 {
 {
-	tdb_off_t list;
-	tdb_off_t off, best_off;
+	tdb_off_t off, b_off,best_off;
 	struct tdb_free_record pad, best = { 0 }, *r;
 	struct tdb_free_record pad, best = { 0 }, *r;
 	double multiplier;
 	double multiplier;
 
 
 again:
 again:
-	list = tdb->last_zone * (tdb->header.v.free_buckets+1) + bucket;
+	b_off = bucket_off(zone_off, bucket);
 
 
-	/* Lock this list. */
-	if (tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == -1) {
+	/* Lock this bucket. */
+	if (tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == -1) {
 		return TDB_OFF_ERR;
 		return TDB_OFF_ERR;
 	}
 	}
 
 
 	best.data_len = -1ULL;
 	best.data_len = -1ULL;
 	best_off = 0;
 	best_off = 0;
+	/* FIXME: Start with larger multiplier if we're growing. */
 	multiplier = 1.0;
 	multiplier = 1.0;
 
 
 	/* Walk the list to see if any are large enough, getting less fussy
 	/* Walk the list to see if any are large enough, getting less fussy
 	 * as we go. */
 	 * as we go. */
-	off = tdb_read_off(tdb, free_list_off(tdb, list));
+	off = tdb_read_off(tdb, b_off);
 	if (unlikely(off == TDB_OFF_ERR))
 	if (unlikely(off == TDB_OFF_ERR))
 		goto unlock_err;
 		goto unlock_err;
 
 
 	while (off) {
 	while (off) {
+		/* FIXME: Does tdb_get win anything here? */
 		r = tdb_get(tdb, off, &pad, sizeof(*r));
 		r = tdb_get(tdb, off, &pad, sizeof(*r));
 		if (!r)
 		if (!r)
 			goto unlock_err;
 			goto unlock_err;
 
 
-		if (r->magic != TDB_FREE_MAGIC) {
+		if (frec_magic(r) != TDB_FREE_MAGIC) {
 			tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 			tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
 				 "lock_and_alloc: %llu non-free 0x%llx\n",
 				 "lock_and_alloc: %llu non-free 0x%llx\n",
-				 (long long)off, (long long)r->magic);
+				 (long long)off, (long long)r->magic_and_meta);
 			goto unlock_err;
 			goto unlock_err;
 		}
 		}
 
 
@@ -372,7 +412,8 @@ again:
 		multiplier *= 1.01;
 		multiplier *= 1.01;
 
 
 		/* Since we're going slow anyway, try coalescing here. */
 		/* Since we're going slow anyway, try coalescing here. */
-		switch (coalesce(tdb, off, list, r->data_len)) {
+		switch (coalesce(tdb, zone_off, zone_bits, off, b_off,
+				 r->data_len)) {
 		case -1:
 		case -1:
 			/* This has already unlocked on error. */
 			/* This has already unlocked on error. */
 			return -1;
 			return -1;
@@ -387,86 +428,76 @@ again:
 	if (best_off) {
 	if (best_off) {
 	use_best:
 	use_best:
 		/* We're happy with this size: take it. */
 		/* We're happy with this size: take it. */
-		if (remove_from_list(tdb, list, &best) != 0)
+		if (remove_from_list(tdb, b_off, &best) != 0)
 			goto unlock_err;
 			goto unlock_err;
-		tdb_unlock_free_list(tdb, list);
+		tdb_unlock_free_bucket(tdb, b_off);
 
 
-		if (to_used_record(tdb, best_off, size, best.data_len,
-				   actual)) {
+		if (to_used_record(tdb, zone_bits, best_off, size,
+				   best.data_len, actual)) {
 			return -1;
 			return -1;
 		}
 		}
 		return best_off;
 		return best_off;
 	}
 	}
 
 
-	tdb_unlock_free_list(tdb, list);
+	tdb_unlock_free_bucket(tdb, b_off);
 	return 0;
 	return 0;
 
 
 unlock_err:
 unlock_err:
-	tdb_unlock_free_list(tdb, list);
+	tdb_unlock_free_bucket(tdb, b_off);
 	return TDB_OFF_ERR;
 	return TDB_OFF_ERR;
 }
 }
 
 
-/* We want a really big chunk.  Look through every zone's oversize bucket */
-static tdb_off_t huge_alloc(struct tdb_context *tdb, size_t size,
-			    tdb_len_t *actual)
+static bool next_zone(struct tdb_context *tdb)
 {
 {
-	tdb_off_t i, off;
+	tdb_off_t next = tdb->zone_off + (1ULL << tdb->zhdr.zone_bits);
 
 
-	for (i = 0; i < tdb->header.v.num_zones; i++) {
-		/* Try getting one from list. */
-		off = lock_and_alloc(tdb, tdb->header.v.free_buckets,
-				     size, actual);
-		if (off == TDB_OFF_ERR)
-			return TDB_OFF_ERR;
-		if (off != 0)
-			return off;
-		/* FIXME: Coalesce! */
-	}
-	return 0;
+	/* We must have a header. */
+	if (tdb->methods->oob(tdb, next + sizeof(tdb->zhdr), true))
+		return false;
+
+	tdb->zone_off = next;
+	return tdb_read_convert(tdb, next, &tdb->zhdr, sizeof(tdb->zhdr)) == 0;
 }
 }
 
 
+/* Offset returned is within current zone (which it may alter). */
 static tdb_off_t get_free(struct tdb_context *tdb, size_t size,
 static tdb_off_t get_free(struct tdb_context *tdb, size_t size,
 			  tdb_len_t *actual)
 			  tdb_len_t *actual)
 {
 {
-	tdb_off_t off, bucket;
-	unsigned int num_empty, step = 0;
-
-	bucket = size_to_bucket(tdb, size);
+	tdb_off_t start_zone = tdb->zone_off, off;
+	bool wrapped = false;
 
 
-	/* If we're after something bigger than a single zone, handle
-	 * specially. */
-	if (unlikely(sizeof(struct tdb_used_record) + size
-		     >= (1ULL << tdb->header.v.zone_bits))) {
-		return huge_alloc(tdb, size, actual);
-	}
-
-	/* Number of zones we search is proportional to the log of them. */
-	for (num_empty = 0; num_empty < fls64(tdb->header.v.num_zones);
-	     num_empty++) {
+	while (!wrapped || tdb->zone_off != start_zone) {
 		tdb_off_t b;
 		tdb_off_t b;
 
 
+		/* Shortcut for really huge allocations... */
+		if ((size >> tdb->zhdr.zone_bits) != 0)
+			continue;
+
 		/* Start at exact size bucket, and search up... */
 		/* Start at exact size bucket, and search up... */
-		for (b = bucket; b <= tdb->header.v.free_buckets; b++) {
-			b = find_free_head(tdb, b);
-
-			/* Non-empty list?  Try getting block. */
-			if (b <= tdb->header.v.free_buckets) {
-				/* Try getting one from list. */
-				off = lock_and_alloc(tdb, b, size, actual);
-				if (off == TDB_OFF_ERR)
-					return TDB_OFF_ERR;
-				if (off != 0)
-					return off;
-				/* Didn't work.  Try next bucket. */
-			}
+		b = size_to_bucket(tdb->zhdr.zone_bits, size);
+		for (b = find_free_head(tdb, b);
+		     b <= BUCKETS_FOR_ZONE(tdb->zhdr.zone_bits);
+		     b += find_free_head(tdb, b + 1)) {
+			/* Try getting one from list. */
+			off = lock_and_alloc(tdb, tdb->zone_off,
+					     tdb->zhdr.zone_bits,
+					     b, size, actual);
+			if (off == TDB_OFF_ERR)
+				return TDB_OFF_ERR;
+			if (off != 0)
+				return off;
+			/* Didn't work.  Try next bucket. */
 		}
 		}
 
 
-		/* Try another zone, at pseudo random.  Avoid duplicates by
-		   using an odd step. */
-		if (step == 0)
-			step = ((quick_random(tdb)) % 65536) * 2 + 1;
-		tdb->last_zone = (tdb->last_zone + step)
-			% tdb->header.v.num_zones;
+		/* Didn't work, try next zone, if it exists. */
+		if (!next_zone(tdb)) {
+			wrapped = true;
+			tdb->zone_off = sizeof(struct tdb_header);
+			if (tdb_read_convert(tdb, tdb->zone_off,
+					     &tdb->zhdr, sizeof(tdb->zhdr))) {
+				return TDB_OFF_ERR;
+			}
+		}
 	}
 	}
 	return 0;
 	return 0;
 }
 }
@@ -474,14 +505,16 @@ static tdb_off_t get_free(struct tdb_context *tdb, size_t size,
 int set_header(struct tdb_context *tdb,
 int set_header(struct tdb_context *tdb,
 	       struct tdb_used_record *rec,
 	       struct tdb_used_record *rec,
 	       uint64_t keylen, uint64_t datalen,
 	       uint64_t keylen, uint64_t datalen,
-	       uint64_t actuallen, uint64_t hash)
+	       uint64_t actuallen, uint64_t hash,
+	       unsigned int zone_bits)
 {
 {
 	uint64_t keybits = (fls64(keylen) + 1) / 2;
 	uint64_t keybits = (fls64(keylen) + 1) / 2;
 
 
 	/* Use top bits of hash, so it's independent of hash table size. */
 	/* Use top bits of hash, so it's independent of hash table size. */
 	rec->magic_and_meta
 	rec->magic_and_meta
-		= (actuallen - (keylen + datalen))
-		| ((hash >> 53) << 32)
+		= zone_bits
+		| ((hash >> 59) << 6)
+		| ((actuallen - (keylen + datalen)) << 11)
 		| (keybits << 43)
 		| (keybits << 43)
 		| (TDB_MAGIC << 48);
 		| (TDB_MAGIC << 48);
 	rec->key_and_data_len = (keylen | (datalen << (keybits*2)));
 	rec->key_and_data_len = (keylen | (datalen << (keybits*2)));
@@ -533,8 +566,11 @@ tdb_off_t alloc(struct tdb_context *tdb, size_t keylen, size_t datalen,
 		return off;
 		return off;
 
 
 	/* Some supergiant values can't be encoded. */
 	/* Some supergiant values can't be encoded. */
-	if (set_header(tdb, &rec, keylen, datalen, actual, hash) != 0) {
-		add_free_record(tdb, off, sizeof(rec) + actual);
+	/* FIXME: Check before, and limit actual in get_free. */
+	if (set_header(tdb, &rec, keylen, datalen, actual, hash,
+		       tdb->zhdr.zone_bits) != 0) {
+		add_free_record(tdb, tdb->zhdr.zone_bits, off,
+				sizeof(rec) + actual);
 		return TDB_OFF_ERR;
 		return TDB_OFF_ERR;
 	}
 	}
 
 
@@ -544,223 +580,98 @@ tdb_off_t alloc(struct tdb_context *tdb, size_t keylen, size_t datalen,
 	return off;
 	return off;
 }
 }
 
 
-static bool larger_buckets_might_help(struct tdb_context *tdb)
-{
-	/* If our buckets are already covering 1/8 of a zone, don't
-	 * bother (note: might become an 1/16 of a zone if we double
-	 * zone size). */
-	tdb_len_t size = (1ULL << tdb->header.v.zone_bits) / 8;
-
-	if (size >= MIN_DATA_LEN
-	    && size_to_bucket(tdb, size) < tdb->header.v.free_buckets) {
-		return false;
-	}
-
-	/* FIXME: Put stats in tdb_context or examine db itself! */
-	/* It's fairly cheap to do as we expand database. */
-	return true;
-}
-
 static bool zones_happy(struct tdb_context *tdb)
 static bool zones_happy(struct tdb_context *tdb)
 {
 {
 	/* FIXME: look at distribution of zones. */
 	/* FIXME: look at distribution of zones. */
 	return true;
 	return true;
 }
 }
 
 
-/* Returns how much extra room we get, or TDB_OFF_ERR. */
-static tdb_len_t expand_to_fill_zones(struct tdb_context *tdb)
+/* Assume we want buckets up to the comfort factor. */
+static tdb_len_t overhead(unsigned int zone_bits)
 {
 {
-	tdb_len_t add;
-
-	/* We can enlarge zones without enlarging file to match. */
-	add = (tdb->header.v.num_zones<<tdb->header.v.zone_bits)
-		- tdb->map_size;
-	if (add <= sizeof(struct tdb_free_record))
-		return 0;
-
-	/* Updates tdb->map_size. */
-	if (tdb->methods->expand_file(tdb, add) == -1)
-		return TDB_OFF_ERR;
-	if (add_free_record(tdb, tdb->map_size - add, add) == -1)
-		return TDB_OFF_ERR;
-	return add;
+	return sizeof(struct free_zone_header)
+		+ (BUCKETS_FOR_ZONE(zone_bits) + 1) * sizeof(tdb_off_t);
 }
 }
 
 
-static int update_zones(struct tdb_context *tdb,
-			uint64_t new_num_zones,
-			uint64_t new_zone_bits,
-			uint64_t new_num_buckets,
-			tdb_len_t add)
-{
-	tdb_len_t freebucket_size;
-	const tdb_off_t *oldf;
-	tdb_off_t i, off, old_num_total, old_free_off;
-	struct tdb_used_record fhdr;
-
-	/* Updates tdb->map_size. */
-	if (tdb->methods->expand_file(tdb, add) == -1)
-		return -1;
-
-	/* Use first part as new free bucket array. */
-	off = tdb->map_size - add;
-	freebucket_size = new_num_zones
-		* (new_num_buckets + 1) * sizeof(tdb_off_t);
-
-	/* Write header. */
-	if (set_header(tdb, &fhdr, 0, freebucket_size, freebucket_size, 0))
-		return -1;
-	if (tdb_write_convert(tdb, off, &fhdr, sizeof(fhdr)) == -1)
-		return -1;
-
-	/* Adjust off to point to start of buckets, add to be remainder. */
-	add -= freebucket_size + sizeof(fhdr);
-	off += sizeof(fhdr);
-
-	/* Access the old zones. */
-	old_num_total = tdb->header.v.num_zones*(tdb->header.v.free_buckets+1);
-	old_free_off = tdb->header.v.free_off;
-	oldf = tdb_access_read(tdb, old_free_off,
-			       old_num_total * sizeof(tdb_off_t), true);
-	if (!oldf)
-		return -1;
-
-	/* Switch to using our new zone. */
-	if (zero_out(tdb, off, freebucket_size) == -1)
-		goto fail_release;
-
-	tdb->header.v.free_off = off;
-	tdb->header.v.num_zones = new_num_zones;
-	tdb->header.v.zone_bits = new_zone_bits;
-	tdb->header.v.free_buckets = new_num_buckets;
-
-	/* FIXME: If zone size hasn't changed, can simply copy pointers. */
-	/* FIXME: Coalesce? */
-	for (i = 0; i < old_num_total; i++) {
-		tdb_off_t next;
-		struct tdb_free_record rec;
-		tdb_off_t list;
-
-		for (off = oldf[i]; off; off = next) {
-			if (tdb_read_convert(tdb, off, &rec, sizeof(rec)))
-				goto fail_release;
-
-			list = zone_of(tdb, off)
-				* (tdb->header.v.free_buckets+1)
-				+ size_to_bucket(tdb, rec.data_len);
-			next = rec.next;
-		
-			if (enqueue_in_free(tdb, list, off, &rec) == -1)
-				goto fail_release;
-		}
-	}
-
-	/* Free up the old free buckets. */
-	old_free_off -= sizeof(fhdr);
-	if (tdb_read_convert(tdb, old_free_off, &fhdr, sizeof(fhdr)) == -1)
-		goto fail_release;
-	if (add_free_record(tdb, old_free_off,
-			    sizeof(fhdr)
-			    + rec_data_length(&fhdr)
-			    + rec_extra_padding(&fhdr)))
-		goto fail_release;
-
-	/* Add the rest as a new free record. */
-	if (add_free_record(tdb, tdb->map_size - add, add) == -1)
-		goto fail_release;
-
-	/* Start allocating from where the new space is. */
-	tdb->last_zone = zone_of(tdb, tdb->map_size - add);
-	tdb_access_release(tdb, oldf);
-	return write_header(tdb);
-
-fail_release:
-	tdb_access_release(tdb, oldf);
-	return -1;
-}
-
-/* Expand the database. */
+/* Expand the database (by adding a zone). */
 int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen,
 int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen,
 	       bool growing)
 	       bool growing)
 {
 {
-	uint64_t new_num_buckets, new_num_zones, new_zone_bits;
-	uint64_t old_num_zones, old_size, old_zone_bits;
-	tdb_len_t add, needed;
+	uint64_t old_size;
+	tdb_off_t off;
+	uint8_t zone_bits;
+	unsigned int num_buckets;
+	tdb_len_t wanted;
+	struct free_zone_header zhdr;
+	bool enlarge_zone;
 
 
 	/* We need room for the record header too. */
 	/* We need room for the record header too. */
-	needed = sizeof(struct tdb_used_record)
-		+ adjust_size(klen, dlen, growing);
-
-	/* tdb_allrecord_lock will update header; did zones change? */
-	old_zone_bits = tdb->header.v.zone_bits;
-	old_num_zones = tdb->header.v.num_zones;
+	wanted = sizeof(struct tdb_used_record)
+		+ (adjust_size(klen, dlen, growing)<<TDB_COMFORT_FACTOR_BITS);
 
 
-	/* FIXME: this is overkill.  An expand lock? */
-	if (tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false) == -1)
+	/* Only one person can expand file at a time. */
+	if (tdb_lock_expand(tdb, F_WRLCK) != 0)
 		return -1;
 		return -1;
 
 
-	/* Someone may have expanded for us. */
-	if (old_zone_bits != tdb->header.v.zone_bits
-	    || old_num_zones != tdb->header.v.num_zones)
-		goto success;
-
-	/* They may have also expanded the underlying size (otherwise we'd
-	 * have expanded our mmap to look at those offsets already). */
+	/* Someone else may have expanded the file, so retry. */
 	old_size = tdb->map_size;
 	old_size = tdb->map_size;
 	tdb->methods->oob(tdb, tdb->map_size + 1, true);
 	tdb->methods->oob(tdb, tdb->map_size + 1, true);
 	if (tdb->map_size != old_size)
 	if (tdb->map_size != old_size)
 		goto success;
 		goto success;
 
 
-	add = expand_to_fill_zones(tdb);
-	if (add == TDB_OFF_ERR)
+	/* zone bits tailer char is protected by EXPAND lock. */
+	if (tdb->methods->read(tdb, old_size - 1, &zone_bits, 1) == -1)
 		goto fail;
 		goto fail;
 
 
-	if (add >= needed) {
-		/* Allocate from this zone. */
-		tdb->last_zone = zone_of(tdb, tdb->map_size - add);
-		goto success;
+	/* If zones aren't working well, add larger zone if possible. */
+	enlarge_zone = !zones_happy(tdb);
+
+	/* New zone can be between zone_bits or larger if we're on the right
+	 * boundary. */
+	for (;;) {
+		/* Does this fit the allocation comfortably? */
+		if ((1ULL << zone_bits) >= overhead(zone_bits) + wanted) {
+			/* Only let enlarge_zone enlarge us once. */
+			if (!enlarge_zone)
+				break;
+			enlarge_zone = false;
+		}
+		if ((old_size - 1 - sizeof(struct tdb_header))
+		    & (1 << zone_bits))
+			break;
+		zone_bits++;
 	}
 	}
 
 
-	/* Slow path.  Should we increase the number of buckets? */
-	new_num_buckets = tdb->header.v.free_buckets;
-	if (larger_buckets_might_help(tdb))
-		new_num_buckets++;
-
-	/* Now we'll need room for the new free buckets, too.  Assume
-	 * worst case (zones expand). */
-	needed += sizeof(struct tdb_used_record)
-		+ ((tdb->header.v.num_zones+1)
-		   * (new_num_buckets+1) * sizeof(tdb_off_t));
-
-	/* If we need less that one zone, and they're working well, just add
-	 * another one. */
-	if (needed < (1UL<<tdb->header.v.zone_bits) && zones_happy(tdb)) {
-		new_num_zones = tdb->header.v.num_zones+1;
-		new_zone_bits = tdb->header.v.zone_bits;
-		add = 1ULL << tdb->header.v.zone_bits;
-	} else {
-		/* Increase the zone size. */
-		new_num_zones = tdb->header.v.num_zones;
-		new_zone_bits = tdb->header.v.zone_bits+1;
-		while ((new_num_zones << new_zone_bits)
-		       < tdb->map_size + needed) {
-			new_zone_bits++;
-		}
+	zhdr.zone_bits = zone_bits;
+	num_buckets = BUCKETS_FOR_ZONE(zone_bits);
 
 
-		/* We expand by enough full zones to meet the need. */
-		add = ((tdb->map_size + needed + (1ULL << new_zone_bits)-1)
-		       & ~((1ULL << new_zone_bits)-1))
-			- tdb->map_size;
-	}
+	if (tdb->methods->expand_file(tdb, 1ULL << zone_bits) == -1)
+		goto fail;
+
+	/* Write new tailer. */
+	if (tdb->methods->write(tdb, tdb->map_size - 1, &zone_bits, 1) == -1)
+		goto fail;
+
+	/* Write new zone header (just before old tailer). */
+	off = old_size - 1;
+	if (tdb_write_convert(tdb, off, &zhdr, sizeof(zhdr)) == -1)
+		goto fail;
+
+	/* Now write empty buckets. */
+	off += sizeof(zhdr);
+	if (zero_out(tdb, off, (num_buckets+1) * sizeof(tdb_off_t)) == -1)
+		goto fail;
+	off += (num_buckets+1) * sizeof(tdb_off_t);
 
 
-	if (update_zones(tdb, new_num_zones, new_zone_bits, new_num_buckets,
-			 add) == -1)
+	/* Now add the rest as our free record. */
+	if (add_free_record(tdb, zone_bits, off, tdb->map_size-1-off) == -1)
 		goto fail;
 		goto fail;
 
 
 success:
 success:
-	tdb_allrecord_unlock(tdb, F_WRLCK);
+	tdb_unlock_expand(tdb, F_WRLCK);
 	return 0;
 	return 0;
 
 
 fail:
 fail:
-	tdb_allrecord_unlock(tdb, F_WRLCK);
+	tdb_unlock_expand(tdb, F_WRLCK);
 	return -1;
 	return -1;
 }
 }

+ 11 - 1
ccan/tdb2/io.c

@@ -70,6 +70,8 @@ void tdb_mmap(struct tdb_context *tdb)
 static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
 static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
 {
 {
 	struct stat st;
 	struct stat st;
+	int ret;
+
 	if (len <= tdb->map_size)
 	if (len <= tdb->map_size)
 		return 0;
 		return 0;
 	if (tdb->flags & TDB_INTERNAL) {
 	if (tdb->flags & TDB_INTERNAL) {
@@ -85,7 +87,14 @@ static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
 		return -1;
 		return -1;
 	}
 	}
 
 
-	if (fstat(tdb->fd, &st) == -1) {
+	if (tdb_lock_expand(tdb, F_RDLCK) != 0)
+		return -1;
+
+	ret = fstat(tdb->fd, &st);
+
+	tdb_unlock_expand(tdb, F_RDLCK);
+
+	if (ret == -1) {
 		tdb->ecode = TDB_ERR_IO;
 		tdb->ecode = TDB_ERR_IO;
 		return -1;
 		return -1;
 	}
 	}
@@ -103,6 +112,7 @@ static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
 
 
 	/* Unmap, update size, remap */
 	/* Unmap, update size, remap */
 	tdb_munmap(tdb);
 	tdb_munmap(tdb);
+
 	tdb->map_size = st.st_size;
 	tdb->map_size = st.st_size;
 	tdb_mmap(tdb);
 	tdb_mmap(tdb);
 	return 0;
 	return 0;

+ 36 - 22
ccan/tdb2/lock.c

@@ -26,6 +26,8 @@
 */
 */
 
 
 #include "private.h"
 #include "private.h"
+#include <assert.h>
+#include <ccan/build_assert/build_assert.h>
 
 
 static int fcntl_lock(struct tdb_context *tdb,
 static int fcntl_lock(struct tdb_context *tdb,
 		      int rw, off_t off, off_t len, bool waitflag)
 		      int rw, off_t off, off_t len, bool waitflag)
@@ -255,19 +257,14 @@ static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype,
 {
 {
 	struct tdb_lock_type *new_lck;
 	struct tdb_lock_type *new_lck;
 
 
-	/* Header is not valid for open lock; valgrind complains. */
-	if (offset >= TDB_HASH_LOCK_START) {
-		if (offset > TDB_HASH_LOCK_START
-		    + (1ULL << tdb->header.v.hash_bits)
-		    + (tdb->header.v.num_zones
-		       * (tdb->header.v.free_buckets+1))) {
-			tdb->ecode = TDB_ERR_LOCK;
-			tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-				 "tdb_lock: invalid offset %llu ltype=%d\n",
-				 (long long)offset, ltype);
-			return -1;
-		}
+	if (offset >= TDB_HASH_LOCK_START + (1 << 30) + tdb->map_size / 8) {
+		tdb->ecode = TDB_ERR_LOCK;
+		tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
+			 "tdb_lock: invalid offset %llu ltype=%d\n",
+			 (long long)offset, ltype);
+		return -1;
 	}
 	}
+
 	if (tdb->flags & TDB_NOLOCK)
 	if (tdb->flags & TDB_NOLOCK)
 		return 0;
 		return 0;
 
 
@@ -534,6 +531,16 @@ void tdb_unlock_open(struct tdb_context *tdb)
 	tdb_nest_unlock(tdb, TDB_OPEN_LOCK, F_WRLCK);
 	tdb_nest_unlock(tdb, TDB_OPEN_LOCK, F_WRLCK);
 }
 }
 
 
+int tdb_lock_expand(struct tdb_context *tdb, int ltype)
+{
+	return tdb_nest_lock(tdb, TDB_EXPANSION_LOCK, ltype, TDB_LOCK_WAIT);
+}
+
+void tdb_unlock_expand(struct tdb_context *tdb, int ltype)
+{
+	tdb_nest_unlock(tdb, TDB_EXPANSION_LOCK, ltype);
+}
+
 /* unlock entire db */
 /* unlock entire db */
 int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 {
 {
@@ -687,10 +694,21 @@ int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
 	}
 	}
 }
 }
 
 
-/* Free list locks come after hash locks */
-int tdb_lock_free_list(struct tdb_context *tdb, tdb_off_t flist,
-		       enum tdb_lock_flags waitflag)
+/* Hash locks use TDB_HASH_LOCK_START + the next 30 bits.
+ * Then we begin; bucket offsets are sizeof(tdb_len_t) apart, so we divide.
+ * The result is that on 32 bit systems we don't use lock values > 2^31 on
+ * files that are less than 4GB.
+ */
+static tdb_off_t free_lock_off(tdb_off_t b_off)
 {
 {
+	return TDB_HASH_LOCK_START + (1 << 30) + b_off / sizeof(tdb_off_t);
+}
+
+int tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
+			 enum tdb_lock_flags waitflag)
+{
+	assert(b_off >= sizeof(struct tdb_header));
+
 	/* You're supposed to have a hash lock first! */
 	/* You're supposed to have a hash lock first! */
 	if (!(tdb->flags & TDB_NOLOCK) && !tdb_has_locks(tdb)) {
 	if (!(tdb->flags & TDB_NOLOCK) && !tdb_has_locks(tdb)) {
 		tdb->ecode = TDB_ERR_LOCK;
 		tdb->ecode = TDB_ERR_LOCK;
@@ -709,19 +727,15 @@ int tdb_lock_free_list(struct tdb_context *tdb, tdb_off_t flist,
 		return -1;
 		return -1;
 	}
 	}
 
 
-	return tdb_nest_lock(tdb, TDB_HASH_LOCK_START
-			     + (1ULL << tdb->header.v.hash_bits)
-			     + flist, F_WRLCK, waitflag);
+	return tdb_nest_lock(tdb, free_lock_off(b_off), F_WRLCK, waitflag);
 }
 }
 
 
-void tdb_unlock_free_list(struct tdb_context *tdb, tdb_off_t flist)
+void tdb_unlock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off)
 {
 {
 	if (tdb->allrecord_lock.count)
 	if (tdb->allrecord_lock.count)
 		return;
 		return;
 
 
-	tdb_nest_unlock(tdb, TDB_HASH_LOCK_START
-			+ (1ULL << tdb->header.v.hash_bits)
-			+ flist, F_WRLCK);
+	tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
 }
 }
 
 
 /* Even if the entry isn't in this hash bucket, you'd have to lock this
 /* Even if the entry isn't in this hash bucket, you'd have to lock this

+ 64 - 27
ccan/tdb2/private.h

@@ -67,12 +67,10 @@ typedef uint64_t tdb_off_t;
 #define TDB_MAGIC_FOOD "TDB file\n"
 #define TDB_MAGIC_FOOD "TDB file\n"
 #define TDB_VERSION ((uint64_t)(0x26011967 + 7))
 #define TDB_VERSION ((uint64_t)(0x26011967 + 7))
 #define TDB_MAGIC ((uint64_t)0x1999)
 #define TDB_MAGIC ((uint64_t)0x1999)
-#define TDB_FREE_MAGIC (~(uint64_t)TDB_MAGIC)
+#define TDB_FREE_MAGIC ((~(uint64_t)TDB_MAGIC) << 6)
 #define TDB_HASH_MAGIC (0xA1ABE11A01092008ULL)
 #define TDB_HASH_MAGIC (0xA1ABE11A01092008ULL)
 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
 #define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
 #define TDB_RECOVERY_INVALID_MAGIC (0x0)
 #define TDB_RECOVERY_INVALID_MAGIC (0x0)
-#define TDB_EXTRA_HASHBITS (11) /* We steal 11 bits to stash hash info. */
-#define TDB_EXTRA_HASHBITS_NUM (3)
 
 
 #define TDB_OFF_ERR ((tdb_off_t)-1)
 #define TDB_OFF_ERR ((tdb_off_t)-1)
 
 
@@ -80,13 +78,21 @@ typedef uint64_t tdb_off_t;
 #define TDB_OPEN_LOCK 0
 #define TDB_OPEN_LOCK 0
 /* Doing a transaction. */
 /* Doing a transaction. */
 #define TDB_TRANSACTION_LOCK 1
 #define TDB_TRANSACTION_LOCK 1
+/* Expanding file. */
+#define TDB_EXPANSION_LOCK 2
 /* Hash chain locks. */
 /* Hash chain locks. */
-#define TDB_HASH_LOCK_START 2
+#define TDB_HASH_LOCK_START 3
 
 
-/* We start wih 256 hash buckets, 10 free buckets.  A 4k-sized zone. */
+/* We start wih 256 hash buckets, and a 64k-sized zone. */
 #define INITIAL_HASH_BITS 8
 #define INITIAL_HASH_BITS 8
-#define INITIAL_FREE_BUCKETS 10
-#define INITIAL_ZONE_BITS 12
+#define INITIAL_ZONE_BITS 16
+
+/* Try to create zones at least 32 times larger than allocations. */
+#define TDB_COMFORT_FACTOR_BITS 5
+
+/* We ensure buckets up to size 1 << (zone_bits - TDB_COMFORT_FACTOR_BITS). */
+/* FIXME: test this matches size_to_bucket! */
+#define BUCKETS_FOR_ZONE(zone_bits) ((zone_bits) + 2 - TDB_COMFORT_FACTOR_BITS)
 
 
 #if !HAVE_BSWAP_64
 #if !HAVE_BSWAP_64
 static inline uint64_t bswap_64(uint64_t x)
 static inline uint64_t bswap_64(uint64_t x)
@@ -106,8 +112,9 @@ struct tdb_used_record {
 	/* For on-disk compatibility, we avoid bitfields:
 	/* For on-disk compatibility, we avoid bitfields:
 	   magic: 16,        (highest)
 	   magic: 16,        (highest)
 	   key_len_bits: 5,
 	   key_len_bits: 5,
-           hash:11,
-	   extra_padding: 32 (lowest)
+	   extra_padding: 32
+	   hash_bits: 5,
+           zone_bits: 6 (lowest)
 	*/
 	*/
         uint64_t magic_and_meta;
         uint64_t magic_and_meta;
 	/* The bottom key_len_bits*2 are key length, rest is data length. */
 	/* The bottom key_len_bits*2 are key length, rest is data length. */
@@ -131,12 +138,17 @@ static inline uint64_t rec_data_length(const struct tdb_used_record *r)
 
 
 static inline uint64_t rec_extra_padding(const struct tdb_used_record *r)
 static inline uint64_t rec_extra_padding(const struct tdb_used_record *r)
 {
 {
-	return r->magic_and_meta & 0xFFFFFFFF;
+	return (r->magic_and_meta >> 11) & 0xFFFFFFFF;
+}
+
+static inline unsigned int rec_zone_bits(const struct tdb_used_record *r)
+{
+	return r->magic_and_meta & ((1 << 6) - 1);
 }
 }
 
 
-static inline uint64_t rec_hash(const struct tdb_used_record *r)
+static inline uint32_t rec_hash(const struct tdb_used_record *r)
 {
 {
-	return ((r->magic_and_meta >> 32) & ((1ULL << 11) - 1));
+	return (r->magic_and_meta >> 6) & ((1 << 5) - 1);
 }
 }
 
 
 static inline uint16_t rec_magic(const struct tdb_used_record *r)
 static inline uint16_t rec_magic(const struct tdb_used_record *r)
@@ -145,26 +157,33 @@ static inline uint16_t rec_magic(const struct tdb_used_record *r)
 }
 }
 
 
 struct tdb_free_record {
 struct tdb_free_record {
-        uint64_t magic;
+        uint64_t magic_and_meta; /* Bottom 6 bits are zone bits. */
         uint64_t data_len; /* Not counting these two fields. */
         uint64_t data_len; /* Not counting these two fields. */
 	/* This is why the minimum record size is 16 bytes.  */
 	/* This is why the minimum record size is 16 bytes.  */
 	uint64_t next, prev;
 	uint64_t next, prev;
 };
 };
 
 
+static inline unsigned int frec_zone_bits(const struct tdb_free_record *f)
+{
+	return f->magic_and_meta & ((1 << 6) - 1);
+}
+
+static inline uint64_t frec_magic(const struct tdb_free_record *f)
+{
+	return f->magic_and_meta & ~((1ULL << 6) - 1);
+}
+
 /* These parts can change while we have db open. */
 /* These parts can change while we have db open. */
 struct tdb_header_volatile {
 struct tdb_header_volatile {
 	uint64_t generation; /* Makes sure it changes on every update. */
 	uint64_t generation; /* Makes sure it changes on every update. */
 	uint64_t hash_bits; /* Entries in hash table. */
 	uint64_t hash_bits; /* Entries in hash table. */
 	uint64_t hash_off; /* Offset of hash table. */
 	uint64_t hash_off; /* Offset of hash table. */
-	uint64_t num_zones; /* How many zones in the file. */
-	uint64_t zone_bits; /* Size of zones. */
-	uint64_t free_buckets; /* How many buckets in each zone. */
-	uint64_t free_off; /* Arrays of free entries. */
 };
 };
 
 
 /* this is stored at the front of every database */
 /* this is stored at the front of every database */
 struct tdb_header {
 struct tdb_header {
 	char magic_food[32]; /* for /etc/magic */
 	char magic_food[32]; /* for /etc/magic */
+	/* FIXME: Make me 32 bit? */
 	uint64_t version; /* version of the code */
 	uint64_t version; /* version of the code */
 	uint64_t hash_test; /* result of hashing HASH_MAGIC. */
 	uint64_t hash_test; /* result of hashing HASH_MAGIC. */
 	uint64_t hash_seed; /* "random" seed written at creation time. */
 	uint64_t hash_seed; /* "random" seed written at creation time. */
@@ -174,6 +193,16 @@ struct tdb_header {
 	tdb_off_t reserved[19];
 	tdb_off_t reserved[19];
 };
 };
 
 
+/* Each zone has its set of free lists at the head.
+ *
+ * For each zone we have a series of per-size buckets, and a final bucket for
+ * "too big". */
+struct free_zone_header {
+	/* How much does this zone cover? */
+	uint64_t zone_bits;
+	/* tdb_off_t buckets[free_buckets + 1] */
+};
+
 enum tdb_lock_flags {
 enum tdb_lock_flags {
 	/* WAIT == F_SETLKW, NOWAIT == F_SETLK */
 	/* WAIT == F_SETLKW, NOWAIT == F_SETLK */
 	TDB_LOCK_NOWAIT = 0,
 	TDB_LOCK_NOWAIT = 0,
@@ -227,7 +256,9 @@ struct tdb_context {
 	struct tdb_transaction *transaction;
 	struct tdb_transaction *transaction;
 	
 	
 	/* What zone of the tdb to use, for spreading load. */
 	/* What zone of the tdb to use, for spreading load. */
-	uint64_t last_zone; 
+	uint64_t zone_off;
+	/* Cached copy of zone header. */
+	struct free_zone_header zhdr;
 
 
 	/* IO methods: changes for transactions. */
 	/* IO methods: changes for transactions. */
 	const struct tdb_methods *methods;
 	const struct tdb_methods *methods;
@@ -268,25 +299,26 @@ tdb_off_t hash_off(struct tdb_context *tdb, uint64_t list);
 
 
 
 
 /* free.c: */
 /* free.c: */
-void tdb_zone_init(struct tdb_context *tdb);
+int tdb_zone_init(struct tdb_context *tdb);
 
 
 /* If this fails, try tdb_expand. */
 /* If this fails, try tdb_expand. */
 tdb_off_t alloc(struct tdb_context *tdb, size_t keylen, size_t datalen,
 tdb_off_t alloc(struct tdb_context *tdb, size_t keylen, size_t datalen,
 		uint64_t hash, bool growing);
 		uint64_t hash, bool growing);
 
 
 /* Put this record in a free list. */
 /* Put this record in a free list. */
-int add_free_record(struct tdb_context *tdb,
+int add_free_record(struct tdb_context *tdb, unsigned int zone_bits,
 		    tdb_off_t off, tdb_len_t len_with_header);
 		    tdb_off_t off, tdb_len_t len_with_header);
 
 
 /* Set up header for a used record. */
 /* Set up header for a used record. */
 int set_header(struct tdb_context *tdb,
 int set_header(struct tdb_context *tdb,
 	       struct tdb_used_record *rec,
 	       struct tdb_used_record *rec,
 	       uint64_t keylen, uint64_t datalen,
 	       uint64_t keylen, uint64_t datalen,
-	       uint64_t actuallen, uint64_t hash);
+	       uint64_t actuallen, uint64_t hash,
+	       unsigned int zone_bits);
 
 
 /* Used by tdb_check to verify. */
 /* Used by tdb_check to verify. */
-unsigned int size_to_bucket(struct tdb_context *tdb, tdb_len_t data_len);
-tdb_off_t zone_of(struct tdb_context *tdb, tdb_off_t off);
+unsigned int size_to_bucket(unsigned int free_buckets, tdb_len_t data_len);
+tdb_off_t bucket_off(tdb_off_t zone_off, tdb_off_t bucket);
 
 
 /* io.c: */
 /* io.c: */
 /* Initialize tdb->methods. */
 /* Initialize tdb->methods. */
@@ -352,10 +384,10 @@ tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
 			int ltype, enum tdb_lock_flags waitflag);
 			int ltype, enum tdb_lock_flags waitflag);
 int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype);
 int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype);
 
 
-/* Lock/unlock a particular free list. */
-int tdb_lock_free_list(struct tdb_context *tdb, tdb_off_t flist,
-		       enum tdb_lock_flags waitflag);
-void tdb_unlock_free_list(struct tdb_context *tdb, tdb_off_t flist);
+/* Lock/unlock a particular free bucket. */
+int tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
+			 enum tdb_lock_flags waitflag);
+void tdb_unlock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off);
 
 
 /* Do we have any locks? */
 /* Do we have any locks? */
 bool tdb_has_locks(struct tdb_context *tdb);
 bool tdb_has_locks(struct tdb_context *tdb);
@@ -368,6 +400,11 @@ int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype);
 /* Serialize db open. */
 /* Serialize db open. */
 int tdb_lock_open(struct tdb_context *tdb);
 int tdb_lock_open(struct tdb_context *tdb);
 void tdb_unlock_open(struct tdb_context *tdb);
 void tdb_unlock_open(struct tdb_context *tdb);
+
+/* Serialize db expand. */
+int tdb_lock_expand(struct tdb_context *tdb, int ltype);
+void tdb_unlock_expand(struct tdb_context *tdb, int ltype);
+
 /* Expand the file. */
 /* Expand the file. */
 int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen,
 int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen,
 	       bool growing);
 	       bool growing);

+ 77 - 50
ccan/tdb2/tdb.c

@@ -124,12 +124,22 @@ static uint64_t random_number(struct tdb_context *tdb)
 	return ret;
 	return ret;
 }
 }
 
 
-struct new_database {
+struct new_db_head {
 	struct tdb_header hdr;
 	struct tdb_header hdr;
+	struct free_zone_header zhdr;
+	tdb_off_t free[BUCKETS_FOR_ZONE(INITIAL_ZONE_BITS) + 1];
 	struct tdb_used_record hrec;
 	struct tdb_used_record hrec;
 	tdb_off_t hash[1ULL << INITIAL_HASH_BITS];
 	tdb_off_t hash[1ULL << INITIAL_HASH_BITS];
-	struct tdb_used_record frec;
-	tdb_off_t free[INITIAL_FREE_BUCKETS + 1]; /* One overflow bucket */
+	struct tdb_free_record frec;
+};
+
+struct new_database {
+	struct new_db_head h;
+	/* Rest up to 1 << INITIAL_ZONE_BITS is empty. */
+	char space[(1 << INITIAL_ZONE_BITS)
+		   - (sizeof(struct new_db_head) - sizeof(struct tdb_header))];
+	uint8_t tailer;
+	/* Don't count final padding! */
 };
 };
 
 
 /* initialise a new database */
 /* initialise a new database */
@@ -137,51 +147,61 @@ static int tdb_new_database(struct tdb_context *tdb)
 {
 {
 	/* We make it up in memory, then write it out if not internal */
 	/* We make it up in memory, then write it out if not internal */
 	struct new_database newdb;
 	struct new_database newdb;
-	unsigned int magic_off = offsetof(struct tdb_header, magic_food);
+	unsigned int bucket, magic_off, dbsize;
 
 
-	/* Fill in the header */
-	newdb.hdr.version = TDB_VERSION;
-	newdb.hdr.hash_seed = random_number(tdb);
-	newdb.hdr.hash_test = TDB_HASH_MAGIC;
-	newdb.hdr.hash_test = tdb->khash(&newdb.hdr.hash_test,
-					 sizeof(newdb.hdr.hash_test),
-					 newdb.hdr.hash_seed,
-					 tdb->hash_priv);
-	memset(newdb.hdr.reserved, 0, sizeof(newdb.hdr.reserved));
-	newdb.hdr.v.generation = 0;
-
-	/* The initial zone must cover the initial database size! */
-	BUILD_ASSERT((1ULL << INITIAL_ZONE_BITS) >= sizeof(newdb));
-
-	/* Free array has 1 zone, 10 buckets.  All buckets empty. */
-	newdb.hdr.v.num_zones = 1;
-	newdb.hdr.v.zone_bits = INITIAL_ZONE_BITS;
-	newdb.hdr.v.free_buckets = INITIAL_FREE_BUCKETS;
-	newdb.hdr.v.free_off = offsetof(struct new_database, free);
-	set_header(tdb, &newdb.frec, 0,
-		   sizeof(newdb.free), sizeof(newdb.free), 0);
-	memset(newdb.free, 0, sizeof(newdb.free));
+	/* Don't want any extra padding! */
+	dbsize = offsetof(struct new_database, tailer) + sizeof(newdb.tailer);
 
 
+	/* Fill in the header */
+	newdb.h.hdr.version = TDB_VERSION;
+	newdb.h.hdr.hash_seed = random_number(tdb);
+	newdb.h.hdr.hash_test = TDB_HASH_MAGIC;
+	newdb.h.hdr.hash_test = tdb->khash(&newdb.h.hdr.hash_test,
+					   sizeof(newdb.h.hdr.hash_test),
+					   newdb.h.hdr.hash_seed,
+					   tdb->hash_priv);
+	memset(newdb.h.hdr.reserved, 0, sizeof(newdb.h.hdr.reserved));
+	newdb.h.hdr.v.generation = 0;
 	/* Initial hashes are empty. */
 	/* Initial hashes are empty. */
-	newdb.hdr.v.hash_bits = INITIAL_HASH_BITS;
-	newdb.hdr.v.hash_off = offsetof(struct new_database, hash);
-	set_header(tdb, &newdb.hrec, 0,
-		   sizeof(newdb.hash), sizeof(newdb.hash), 0);
-	memset(newdb.hash, 0, sizeof(newdb.hash));
+	newdb.h.hdr.v.hash_bits = INITIAL_HASH_BITS;
+	newdb.h.hdr.v.hash_off = offsetof(struct new_database, h.hash);
+	set_header(tdb, &newdb.h.hrec, 0,
+		   sizeof(newdb.h.hash), sizeof(newdb.h.hash), 0,
+		   INITIAL_ZONE_BITS);
+	memset(newdb.h.hash, 0, sizeof(newdb.h.hash));
+
+	/* Create the single free entry. */
+	newdb.h.frec.magic_and_meta = TDB_FREE_MAGIC | INITIAL_ZONE_BITS;
+	newdb.h.frec.data_len = (sizeof(newdb.h.frec)
+				 - sizeof(struct tdb_used_record)
+				 + sizeof(newdb.space));
+
+	/* Free is mostly empty... */
+	newdb.h.zhdr.zone_bits = INITIAL_ZONE_BITS;
+	memset(newdb.h.free, 0, sizeof(newdb.h.free));
+
+	/* ... except for this one bucket. */
+	bucket = size_to_bucket(INITIAL_ZONE_BITS, newdb.h.frec.data_len);
+	newdb.h.free[bucket] = offsetof(struct new_database, h.frec);
+	newdb.h.frec.next = newdb.h.frec.prev = 0;
+
+	/* Tailer contains maximum number of free_zone bits. */
+	newdb.tailer = INITIAL_ZONE_BITS;
 
 
 	/* Magic food */
 	/* Magic food */
-	memset(newdb.hdr.magic_food, 0, sizeof(newdb.hdr.magic_food));
-	strcpy(newdb.hdr.magic_food, TDB_MAGIC_FOOD);
+	memset(newdb.h.hdr.magic_food, 0, sizeof(newdb.h.hdr.magic_food));
+	strcpy(newdb.h.hdr.magic_food, TDB_MAGIC_FOOD);
 
 
 	/* This creates an endian-converted database, as if read from disk */
 	/* This creates an endian-converted database, as if read from disk */
+	magic_off = offsetof(struct tdb_header, magic_food);
 	tdb_convert(tdb,
 	tdb_convert(tdb,
-		    (char *)&newdb.hdr + magic_off,
-		    sizeof(newdb) - magic_off);
+		    (char *)&newdb.h.hdr + magic_off,
+		    dbsize - 1 - magic_off);
 
 
-	tdb->header = newdb.hdr;
+	tdb->header = newdb.h.hdr;
 
 
 	if (tdb->flags & TDB_INTERNAL) {
 	if (tdb->flags & TDB_INTERNAL) {
-		tdb->map_size = sizeof(newdb);
+		tdb->map_size = dbsize;
 		tdb->map_ptr = malloc(tdb->map_size);
 		tdb->map_ptr = malloc(tdb->map_size);
 		if (!tdb->map_ptr) {
 		if (!tdb->map_ptr) {
 			tdb->ecode = TDB_ERR_OOM;
 			tdb->ecode = TDB_ERR_OOM;
@@ -196,7 +216,7 @@ static int tdb_new_database(struct tdb_context *tdb)
 	if (ftruncate(tdb->fd, 0) == -1)
 	if (ftruncate(tdb->fd, 0) == -1)
 		return -1;
 		return -1;
 
 
-	if (!tdb_pwrite_all(tdb->fd, &newdb, sizeof(newdb), 0)) {
+	if (!tdb_pwrite_all(tdb->fd, &newdb, dbsize, 0)) {
 		tdb->ecode = TDB_ERR_IO;
 		tdb->ecode = TDB_ERR_IO;
 		return -1;
 		return -1;
 	}
 	}
@@ -222,7 +242,7 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
 	tdb->name = NULL;
 	tdb->name = NULL;
 	tdb->map_ptr = NULL;
 	tdb->map_ptr = NULL;
 	tdb->fd = -1;
 	tdb->fd = -1;
-	/* map_size will be set below. */
+	tdb->map_size = sizeof(struct tdb_header);
 	tdb->ecode = TDB_SUCCESS;
 	tdb->ecode = TDB_SUCCESS;
 	/* header will be read in below. */
 	/* header will be read in below. */
 	tdb->header_uptodate = false;
 	tdb->header_uptodate = false;
@@ -280,8 +300,7 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
 		}
 		}
 		TEST_IT(tdb->flags & TDB_CONVERT);
 		TEST_IT(tdb->flags & TDB_CONVERT);
 		tdb_convert(tdb, &tdb->header, sizeof(tdb->header));
 		tdb_convert(tdb, &tdb->header, sizeof(tdb->header));
-		/* Zones don't matter for internal db. */
-		tdb->last_zone = 0;
+		tdb_zone_init(tdb);
 		return tdb;
 		return tdb;
 	}
 	}
 
 
@@ -357,12 +376,16 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
 		goto fail;
 		goto fail;
 	}
 	}
 
 
-	tdb->map_size = st.st_size;
 	tdb->device = st.st_dev;
 	tdb->device = st.st_dev;
 	tdb->inode = st.st_ino;
 	tdb->inode = st.st_ino;
-	tdb_mmap(tdb);
 	tdb_unlock_open(tdb);
 	tdb_unlock_open(tdb);
-	tdb_zone_init(tdb);
+
+	/* This make sure we have current map_size and mmap. */
+	tdb->methods->oob(tdb, tdb->map_size + 1, true);
+
+	/* Now we can pick a random free zone to start from. */
+	if (tdb_zone_init(tdb) == -1)
+		goto fail;
 
 
 	tdb->next = tdbs;
 	tdb->next = tdbs;
 	tdbs = tdb;
 	tdbs = tdb;
@@ -543,7 +566,8 @@ static int update_rec_hdr(struct tdb_context *tdb,
 {
 {
 	uint64_t dataroom = rec_data_length(rec) + rec_extra_padding(rec);
 	uint64_t dataroom = rec_data_length(rec) + rec_extra_padding(rec);
 
 
-	if (set_header(tdb, rec, keylen, datalen, keylen + dataroom, h))
+	if (set_header(tdb, rec, keylen, datalen, keylen + dataroom, h,
+		       rec_zone_bits(rec)))
 		return -1;
 		return -1;
 
 
 	return tdb_write_convert(tdb, off, rec, sizeof(*rec));
 	return tdb_write_convert(tdb, off, rec, sizeof(*rec));
@@ -645,7 +669,7 @@ again:
 	r = tdb_get(tdb, oldoff - sizeof(*r), &pad, sizeof(*r));
 	r = tdb_get(tdb, oldoff - sizeof(*r), &pad, sizeof(*r));
 	if (!r)
 	if (!r)
 		goto oldheader;
 		goto oldheader;
-	add_free_record(tdb, oldoff - sizeof(*r),
+	add_free_record(tdb, rec_zone_bits(r), oldoff - sizeof(*r),
 			sizeof(*r)+rec_data_length(r)+rec_extra_padding(r));
 			sizeof(*r)+rec_data_length(r)+rec_extra_padding(r));
 
 
 	/* Now we write the modified header. */
 	/* Now we write the modified header. */
@@ -736,6 +760,7 @@ static int replace_data(struct tdb_context *tdb,
 			uint64_t h, struct tdb_data key, struct tdb_data dbuf,
 			uint64_t h, struct tdb_data key, struct tdb_data dbuf,
 			tdb_off_t bucket,
 			tdb_off_t bucket,
 			tdb_off_t old_off, tdb_len_t old_room,
 			tdb_off_t old_off, tdb_len_t old_room,
+			unsigned old_zone,
 			bool growing)
 			bool growing)
 {
 {
 	tdb_off_t new_off;
 	tdb_off_t new_off;
@@ -750,7 +775,7 @@ static int replace_data(struct tdb_context *tdb,
 
 
 	/* We didn't like the existing one: remove it. */
 	/* We didn't like the existing one: remove it. */
 	if (old_off)
 	if (old_off)
-		add_free_record(tdb, old_off,
+		add_free_record(tdb, old_zone, old_off,
 				sizeof(struct tdb_used_record)
 				sizeof(struct tdb_used_record)
 				+ key.dsize + old_room);
 				+ key.dsize + old_room);
 
 
@@ -820,7 +845,8 @@ int tdb_store(struct tdb_context *tdb,
 	}
 	}
 
 
 	/* If we didn't use the old record, this implies we're growing. */
 	/* If we didn't use the old record, this implies we're growing. */
-	ret = replace_data(tdb, h, key, dbuf, bucket, off, old_room, off != 0);
+	ret = replace_data(tdb, h, key, dbuf, bucket, off, old_room,
+			   rec_zone_bits(&rec), off != 0);
 	unlock_lists(tdb, start, num, F_WRLCK);
 	unlock_lists(tdb, start, num, F_WRLCK);
 
 
 	if (unlikely(ret == 1)) {
 	if (unlikely(ret == 1)) {
@@ -902,7 +928,8 @@ int tdb_append(struct tdb_context *tdb,
 	}
 	}
 
 
 	/* If they're using tdb_append(), it implies they're growing record. */
 	/* If they're using tdb_append(), it implies they're growing record. */
-	ret = replace_data(tdb, h, key, new_dbuf, bucket, off, old_room, true);
+	ret = replace_data(tdb, h, key, new_dbuf, bucket, off, old_room,
+			   rec_zone_bits(&rec), true);
 	unlock_lists(tdb, start, num, F_WRLCK);
 	unlock_lists(tdb, start, num, F_WRLCK);
 	free(newdata);
 	free(newdata);
 
 
@@ -1012,7 +1039,7 @@ int tdb_delete(struct tdb_context *tdb, struct tdb_data key)
 	}
 	}
 
 
 	/* Free the deleted entry. */
 	/* Free the deleted entry. */
-	if (add_free_record(tdb, off,
+	if (add_free_record(tdb, rec_zone_bits(&rec), off,
 			    sizeof(struct tdb_used_record)
 			    sizeof(struct tdb_used_record)
 			    + rec_key_length(&rec)
 			    + rec_key_length(&rec)
 			    + rec_data_length(&rec)
 			    + rec_data_length(&rec)

+ 77 - 56
ccan/tdb2/test/layout.c

@@ -10,7 +10,7 @@ struct tdb_layout *new_tdb_layout(void)
 	struct tdb_layout *layout = malloc(sizeof(*layout));
 	struct tdb_layout *layout = malloc(sizeof(*layout));
 	layout->num_elems = 0;
 	layout->num_elems = 0;
 	layout->elem = NULL;
 	layout->elem = NULL;
-	layout->ftable = layout->htable = -1;
+	layout->htable = -1;
 	return layout;
 	return layout;
 }
 }
 
 
@@ -22,6 +22,18 @@ static void add(struct tdb_layout *layout, union tdb_layout_elem elem)
 	layout->elem[layout->num_elems++] = elem;
 	layout->elem[layout->num_elems++] = elem;
 }
 }
 
 
+void tdb_layout_add_zone(struct tdb_layout *layout,
+			 unsigned int zone_bits,
+			 bool fill_prev)
+{
+	union tdb_layout_elem elem;
+	if (fill_prev)
+		tdb_layout_add_free(layout, 0);
+	elem.base.type = ZONE;
+	elem.zone.zone_bits = zone_bits;
+	add(layout, elem);
+}
+
 void tdb_layout_add_free(struct tdb_layout *layout, tdb_len_t len)
 void tdb_layout_add_free(struct tdb_layout *layout, tdb_len_t len)
 {
 {
 	union tdb_layout_elem elem;
 	union tdb_layout_elem elem;
@@ -64,23 +76,6 @@ void tdb_layout_add_hashtable(struct tdb_layout *layout,
 	add(layout, elem);
 	add(layout, elem);
 }
 }
 
 
-void tdb_layout_add_freetable(struct tdb_layout *layout,
-			      unsigned int num_zones,
-			      unsigned int zone_bits,
-			      unsigned int num_buckets,
-			      tdb_len_t extra)
-{
-	union tdb_layout_elem elem;
-	elem.base.type = FREETABLE;
-	elem.freetable.num_zones = num_zones;
-	elem.freetable.zone_bits = zone_bits;
-	elem.freetable.num_buckets = num_buckets;
-	elem.freetable.extra = extra;
-	assert(layout->ftable == -1U);
-	layout->ftable = layout->num_elems;
-	add(layout, elem);
-}
-
 static tdb_len_t free_record_len(tdb_len_t len)
 static tdb_len_t free_record_len(tdb_len_t len)
 {
 {
 	return sizeof(struct tdb_used_record) + len;
 	return sizeof(struct tdb_used_record) + len;
@@ -101,11 +96,10 @@ static tdb_len_t hashtable_len(struct tle_hashtable *htable)
 		+ (sizeof(tdb_off_t) << htable->hash_bits);
 		+ (sizeof(tdb_off_t) << htable->hash_bits);
 }
 }
 
 
-static tdb_len_t freetable_len(struct tle_freetable *ftable)
+static tdb_len_t zone_header_len(struct tle_zone *zone)
 {
 {
-	return sizeof(struct tdb_used_record)
-		+ (sizeof(tdb_off_t) * ftable->num_zones
-		   * (ftable->num_buckets + 1));
+	return sizeof(struct free_zone_header)
+		+ sizeof(tdb_off_t) * (BUCKETS_FOR_ZONE(zone->zone_bits)+1);
 }
 }
 
 
 static void set_free_record(void *mem, tdb_len_t len)
 static void set_free_record(void *mem, tdb_len_t len)
@@ -114,43 +108,47 @@ static void set_free_record(void *mem, tdb_len_t len)
 }
 }
 
 
 static void set_data_record(void *mem, struct tdb_context *tdb,
 static void set_data_record(void *mem, struct tdb_context *tdb,
+			    struct tle_zone *last_zone,
 			    struct tle_used *used)
 			    struct tle_used *used)
 {
 {
 	struct tdb_used_record *u = mem;
 	struct tdb_used_record *u = mem;
 
 
 	set_header(tdb, u, used->key.dsize, used->data.dsize,
 	set_header(tdb, u, used->key.dsize, used->data.dsize,
 		   used->key.dsize + used->data.dsize + used->extra,
 		   used->key.dsize + used->data.dsize + used->extra,
-		   tdb_hash(tdb, used->key.dptr, used->key.dsize));
+		   tdb_hash(tdb, used->key.dptr, used->key.dsize),
+		   last_zone->zone_bits);
 	memcpy(u + 1, used->key.dptr, used->key.dsize);
 	memcpy(u + 1, used->key.dptr, used->key.dsize);
 	memcpy((char *)(u + 1) + used->key.dsize,
 	memcpy((char *)(u + 1) + used->key.dsize,
 	       used->data.dptr, used->data.dsize);
 	       used->data.dptr, used->data.dsize);
 }
 }
 
 
 static void set_hashtable(void *mem, struct tdb_context *tdb,
 static void set_hashtable(void *mem, struct tdb_context *tdb,
+			  struct tle_zone *last_zone,
 			  struct tle_hashtable *htable)
 			  struct tle_hashtable *htable)
 {
 {
 	struct tdb_used_record *u = mem;
 	struct tdb_used_record *u = mem;
 	tdb_len_t len = sizeof(tdb_off_t) << htable->hash_bits;
 	tdb_len_t len = sizeof(tdb_off_t) << htable->hash_bits;
 
 
-	set_header(tdb, u, 0, len, len + htable->extra, 0);
+	set_header(tdb, u, 0, len, len + htable->extra, 0,
+		   last_zone->zone_bits);
 	memset(u + 1, 0, len);
 	memset(u + 1, 0, len);
 }
 }
 
 
-static void set_freetable(void *mem, struct tdb_context *tdb,
-			  struct tle_freetable *ftable)
+static void set_zone(void *mem, struct tdb_context *tdb,
+		     struct tle_zone *zone)
 {
 {
-	struct tdb_used_record *u = mem;
-	tdb_len_t len = sizeof(tdb_off_t) * ftable->num_zones
-		* (ftable->num_buckets + 1);
-	set_header(tdb, u, 0, len, len + ftable->extra, 0);
-	memset(u + 1, 0, len);
+	struct free_zone_header *fz = mem;
+	memset(fz, 0, zone_header_len(zone));
+	fz->zone_bits = zone->zone_bits;
 }
 }
 
 
 static void add_to_freetable(struct tdb_context *tdb,
 static void add_to_freetable(struct tdb_context *tdb,
+			     struct tle_zone *last_zone,
 			     tdb_off_t eoff,
 			     tdb_off_t eoff,
 			     tdb_off_t elen)
 			     tdb_off_t elen)
 {
 {
-	add_free_record(tdb, eoff, sizeof(struct tdb_used_record) + elen);
+	add_free_record(tdb, last_zone->zone_bits, eoff,
+			sizeof(struct tdb_used_record) + elen);
 }
 }
 
 
 static void add_to_hashtable(struct tdb_context *tdb,
 static void add_to_hashtable(struct tdb_context *tdb,
@@ -170,48 +168,62 @@ static void add_to_hashtable(struct tdb_context *tdb,
 struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
 struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
 {
 {
 	unsigned int i;
 	unsigned int i;
-	tdb_off_t len;
+	tdb_off_t off, len;
+	tdb_len_t zone_left;
 	struct tdb_header *hdr;
 	struct tdb_header *hdr;
 	char *mem;
 	char *mem;
 	struct tdb_context *tdb;
 	struct tdb_context *tdb;
+	struct tle_zone *last_zone = NULL;
 
 
-	assert(layout->ftable != -1U);
 	assert(layout->htable != -1U);
 	assert(layout->htable != -1U);
+	assert(layout->elem[0].base.type == ZONE);
 
 
-	len = sizeof(struct tdb_header);
+	zone_left = 0;
+	off = sizeof(struct tdb_header);
 
 
 	/* First pass of layout: calc lengths */
 	/* First pass of layout: calc lengths */
 	for (i = 0; i < layout->num_elems; i++) {
 	for (i = 0; i < layout->num_elems; i++) {
 		union tdb_layout_elem *e = &layout->elem[i];
 		union tdb_layout_elem *e = &layout->elem[i];
-		e->base.off = len;
+		e->base.off = off;
 		switch (e->base.type) {
 		switch (e->base.type) {
+		case ZONE:
+			assert(zone_left == 0);
+			len = zone_header_len(&e->zone);
+			zone_left = 1ULL << e->zone.zone_bits;
+			break;
 		case FREE:
 		case FREE:
-			len += free_record_len(e->free.len);
+			if (e->free.len == 0)
+				e->free.len = zone_left
+					- sizeof(struct tdb_used_record);
+			len = free_record_len(e->free.len);
 			break;
 			break;
 		case DATA:
 		case DATA:
-			len += data_record_len(&e->used);
+			len = data_record_len(&e->used);
 			break;
 			break;
 		case HASHTABLE:
 		case HASHTABLE:
-			len += hashtable_len(&e->hashtable);
-			break;
-		case FREETABLE:
-			len += freetable_len(&e->freetable);
+			len = hashtable_len(&e->hashtable);
 			break;
 			break;
 		}
 		}
+		off += len;
+		assert(zone_left >= len);
+		zone_left -= len;
 	}
 	}
 
 
-	mem = malloc(len);
+	/* Fill final zone with free record. */
+	if (zone_left != 0) {
+		tdb_layout_add_free(layout,
+				    zone_left
+				    - sizeof(struct tdb_used_record));
+		layout->elem[layout->num_elems-1].base.off = off;
+		off += zone_left;
+	}
+
+	mem = malloc(off+1);
 	/* Now populate our header, cribbing from a real TDB header. */
 	/* Now populate our header, cribbing from a real TDB header. */
 	tdb = tdb_open(NULL, TDB_INTERNAL, O_RDWR, 0, &tap_log_attr);
 	tdb = tdb_open(NULL, TDB_INTERNAL, O_RDWR, 0, &tap_log_attr);
 	hdr = (void *)mem;
 	hdr = (void *)mem;
 	*hdr = tdb->header;
 	*hdr = tdb->header;
 	hdr->v.generation++;
 	hdr->v.generation++;
-	hdr->v.num_zones = layout->elem[layout->ftable].freetable.num_zones;
-	hdr->v.zone_bits = layout->elem[layout->ftable].freetable.zone_bits;
-	hdr->v.free_buckets
-		= layout->elem[layout->ftable].freetable.num_buckets;
-	hdr->v.free_off = layout->elem[layout->ftable].base.off
-		+ sizeof(struct tdb_used_record);
 	hdr->v.hash_bits = layout->elem[layout->htable].hashtable.hash_bits;
 	hdr->v.hash_bits = layout->elem[layout->htable].hashtable.hash_bits;
 	hdr->v.hash_off = layout->elem[layout->htable].base.off
 	hdr->v.hash_off = layout->elem[layout->htable].base.off
 		+ sizeof(struct tdb_used_record);
 		+ sizeof(struct tdb_used_record);
@@ -219,23 +231,26 @@ struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
 	/* Mug the tdb we have to make it use this. */
 	/* Mug the tdb we have to make it use this. */
 	free(tdb->map_ptr);
 	free(tdb->map_ptr);
 	tdb->map_ptr = mem;
 	tdb->map_ptr = mem;
-	tdb->map_size = len;
+	tdb->map_size = off+1;
 	header_changed(tdb);
 	header_changed(tdb);
 
 
 	for (i = 0; i < layout->num_elems; i++) {
 	for (i = 0; i < layout->num_elems; i++) {
 		union tdb_layout_elem *e = &layout->elem[i];
 		union tdb_layout_elem *e = &layout->elem[i];
 		switch (e->base.type) {
 		switch (e->base.type) {
+		case ZONE:
+			set_zone(mem + e->base.off, tdb, &e->zone);
+			last_zone = &e->zone;
+			break;
 		case FREE:
 		case FREE:
 			set_free_record(mem + e->base.off, e->free.len);
 			set_free_record(mem + e->base.off, e->free.len);
 			break;
 			break;
 		case DATA:
 		case DATA:
-			set_data_record(mem + e->base.off, tdb, &e->used);
+			set_data_record(mem + e->base.off, tdb, last_zone,
+					&e->used);
 			break;
 			break;
 		case HASHTABLE:
 		case HASHTABLE:
-			set_hashtable(mem + e->base.off, tdb, &e->hashtable);
-			break;
-		case FREETABLE:
-			set_freetable(mem + e->base.off, tdb, &e->freetable);
+			set_hashtable(mem + e->base.off, tdb, last_zone,
+				      &e->hashtable);
 			break;
 			break;
 		}
 		}
 	}
 	}
@@ -244,8 +259,12 @@ struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
 	for (i = 0; i < layout->num_elems; i++) {
 	for (i = 0; i < layout->num_elems; i++) {
 		union tdb_layout_elem *e = &layout->elem[i];
 		union tdb_layout_elem *e = &layout->elem[i];
 		switch (e->base.type) {
 		switch (e->base.type) {
+		case ZONE:
+			last_zone = &e->zone;
+			break;
 		case FREE:
 		case FREE:
-			add_to_freetable(tdb, e->base.off, e->free.len);
+			add_to_freetable(tdb, last_zone,
+					 e->base.off, e->free.len);
 			break;
 			break;
 		case DATA:
 		case DATA:
 			add_to_hashtable(tdb, e->base.off, e->used.key);
 			add_to_hashtable(tdb, e->base.off, e->used.key);
@@ -255,5 +274,7 @@ struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
 		}
 		}
 	}
 	}
 
 
+	/* Write tailer. */
+	((uint8_t *)tdb->map_ptr)[tdb->map_size-1] = last_zone->zone_bits;
 	return tdb;
 	return tdb;
 }
 }

+ 11 - 16
ccan/tdb2/test/layout.h

@@ -3,6 +3,9 @@
 #include <ccan/tdb2/private.h>
 #include <ccan/tdb2/private.h>
 
 
 struct tdb_layout *new_tdb_layout(void);
 struct tdb_layout *new_tdb_layout(void);
+void tdb_layout_add_zone(struct tdb_layout *layout,
+			 unsigned int zone_bits,
+			 bool fill_prev);
 void tdb_layout_add_free(struct tdb_layout *layout, tdb_len_t len);
 void tdb_layout_add_free(struct tdb_layout *layout, tdb_len_t len);
 void tdb_layout_add_used(struct tdb_layout *layout,
 void tdb_layout_add_used(struct tdb_layout *layout,
 			 TDB_DATA key, TDB_DATA data,
 			 TDB_DATA key, TDB_DATA data,
@@ -10,15 +13,10 @@ void tdb_layout_add_used(struct tdb_layout *layout,
 void tdb_layout_add_hashtable(struct tdb_layout *layout,
 void tdb_layout_add_hashtable(struct tdb_layout *layout,
 			      unsigned int hash_bits,
 			      unsigned int hash_bits,
 			      tdb_len_t extra);
 			      tdb_len_t extra);
-void tdb_layout_add_freetable(struct tdb_layout *layout,
-			      unsigned int num_zones,
-			      unsigned int zone_bits,
-			      unsigned int num_buckets,
-			      tdb_len_t extra);
 struct tdb_context *tdb_layout_get(struct tdb_layout *layout);
 struct tdb_context *tdb_layout_get(struct tdb_layout *layout);
 
 
 enum layout_type {
 enum layout_type {
-	FREE, DATA, HASHTABLE, FREETABLE,
+	ZONE, FREE, DATA, HASHTABLE,
 };
 };
 
 
 /* Shared by all union members. */
 /* Shared by all union members. */
@@ -27,6 +25,11 @@ struct tle_base {
 	tdb_off_t off;
 	tdb_off_t off;
 };
 };
 
 
+struct tle_zone {
+	struct tle_base base;
+	unsigned int zone_bits;
+};
+
 struct tle_free {
 struct tle_free {
 	struct tle_base base;
 	struct tle_base base;
 	tdb_len_t len;
 	tdb_len_t len;
@@ -45,25 +48,17 @@ struct tle_hashtable {
 	tdb_len_t extra;
 	tdb_len_t extra;
 };
 };
 
 
-struct tle_freetable {
-	struct tle_base base;
-	unsigned int num_zones;
-	unsigned int zone_bits;
-	unsigned int num_buckets;
-	tdb_len_t extra;
-};
-
 union tdb_layout_elem {
 union tdb_layout_elem {
 	struct tle_base base;
 	struct tle_base base;
+	struct tle_zone zone;
 	struct tle_free free;
 	struct tle_free free;
 	struct tle_used used;
 	struct tle_used used;
-	struct tle_freetable freetable;
 	struct tle_hashtable hashtable;
 	struct tle_hashtable hashtable;
 };
 };
 
 
 struct tdb_layout {
 struct tdb_layout {
 	unsigned int num_elems;
 	unsigned int num_elems;
 	union tdb_layout_elem *elem;
 	union tdb_layout_elem *elem;
-	unsigned int ftable, htable;
+	unsigned int htable;
 };
 };
 #endif /* TDB2_TEST_LAYOUT_H */
 #endif /* TDB2_TEST_LAYOUT_H */

+ 11 - 6
ccan/tdb2/test/run-encode.c → ccan/tdb2/test/run-001-encode.c

@@ -11,30 +11,35 @@ int main(int argc, char *argv[])
 	struct tdb_used_record rec;
 	struct tdb_used_record rec;
 	struct tdb_context tdb = { .log = tap_log_fn, .log_priv = NULL };
 	struct tdb_context tdb = { .log = tap_log_fn, .log_priv = NULL };
 
 
-	plan_tests(64 + 32 + 48*6 + 1);
+	plan_tests(64 + 32 + 48*7 + 1);
 
 
 	/* We should be able to encode any data value. */
 	/* We should be able to encode any data value. */
 	for (i = 0; i < 64; i++)
 	for (i = 0; i < 64; i++)
-		ok1(set_header(&tdb, &rec, 0, 1ULL << i, 1ULL << i, 0) == 0);
+		ok1(set_header(&tdb, &rec, 0, 1ULL << i, 1ULL << i, 0, 0)
+		    == 0);
 
 
 	/* And any key and data with < 64 bits between them. */
 	/* And any key and data with < 64 bits between them. */
 	for (i = 0; i < 32; i++) {
 	for (i = 0; i < 32; i++) {
 		tdb_len_t dlen = 1ULL >> (63 - i), klen = 1ULL << i;
 		tdb_len_t dlen = 1ULL >> (63 - i), klen = 1ULL << i;
-		ok1(set_header(&tdb, &rec, klen, dlen, klen + dlen, 0) == 0);
+		ok1(set_header(&tdb, &rec, klen, dlen, klen + dlen, 0, 0)
+		    == 0);
 	}
 	}
 
 
 	/* We should neatly encode all values. */
 	/* We should neatly encode all values. */
 	for (i = 0; i < 48; i++) {
 	for (i = 0; i < 48; i++) {
-		uint64_t h = 1ULL << (i < 11 ? 63 - i : 63 - 10);
+		uint64_t h = 1ULL << (i < 5 ? 63 - i : 63 - 4);
 		uint64_t klen = 1ULL << (i < 16 ? i : 15);
 		uint64_t klen = 1ULL << (i < 16 ? i : 15);
 		uint64_t dlen = 1ULL << i;
 		uint64_t dlen = 1ULL << i;
 		uint64_t xlen = 1ULL << (i < 32 ? i : 31);
 		uint64_t xlen = 1ULL << (i < 32 ? i : 31);
-		ok1(set_header(&tdb, &rec, klen, dlen, klen + dlen + xlen, h)
+		uint64_t zbits = 1ULL << (i < 6 ? i : 5);
+		ok1(set_header(&tdb, &rec, klen, dlen, klen + dlen + xlen, h,
+			       zbits)
 		    == 0);
 		    == 0);
 		ok1(rec_key_length(&rec) == klen);
 		ok1(rec_key_length(&rec) == klen);
 		ok1(rec_data_length(&rec) == dlen);
 		ok1(rec_data_length(&rec) == dlen);
 		ok1(rec_extra_padding(&rec) == xlen);
 		ok1(rec_extra_padding(&rec) == xlen);
-		ok1(rec_hash(&rec) << (64 - 11) == h);
+		ok1((uint64_t)rec_hash(&rec) << (64 - 5) == h);
+		ok1(rec_zone_bits(&rec) == zbits);
 		ok1(rec_magic(&rec) == TDB_MAGIC);
 		ok1(rec_magic(&rec) == TDB_MAGIC);
 	}
 	}
 	ok1(tap_log_messages == 0);
 	ok1(tap_log_messages == 0);

+ 0 - 0
ccan/tdb2/test/run-fls.c → ccan/tdb2/test/run-001-fls.c


+ 1 - 1
ccan/tdb2/test/run-new_database.c → ccan/tdb2/test/run-01-new_database.c

@@ -16,7 +16,7 @@ int main(int argc, char *argv[])
 
 
 	plan_tests(sizeof(flags) / sizeof(flags[0]) * 2 + 1);
 	plan_tests(sizeof(flags) / sizeof(flags[0]) * 2 + 1);
 	for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
 	for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
-		tdb = tdb_open("run-new_database", flags[i],
+		tdb = tdb_open("run-new_database.tdb", flags[i],
 			       O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
 			       O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
 		ok1(tdb);
 		ok1(tdb);
 		if (tdb) {
 		if (tdb) {

+ 62 - 0
ccan/tdb2/test/run-02-expand.c

@@ -0,0 +1,62 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+
+int main(int argc, char *argv[])
+{
+	unsigned int i;
+	uint64_t val;
+	struct tdb_context *tdb;
+	int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
+			TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT,
+			TDB_NOMMAP|TDB_CONVERT };
+
+	plan_tests(sizeof(flags) / sizeof(flags[0]) * 18 + 1);
+
+	for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+		tdb = tdb_open("run-expand.tdb", flags[i],
+			       O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
+		ok1(tdb);
+		if (!tdb)
+			continue;
+
+		/* First expand. Should add a zone, doubling file size.. */
+		val = tdb->map_size - 1 - sizeof(struct tdb_header);
+		ok1(tdb_expand(tdb, 1, 1, false) == 0);
+		ok1(tdb->map_size == 2 * val + 1 + sizeof(struct tdb_header));
+		ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+		/* Second expand, add another zone of same size. */
+		ok1(tdb_expand(tdb, 1, 1, false) == 0);
+		ok1(tdb->map_size == 3 * val + 1 + sizeof(struct tdb_header));
+		ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+		/* Large expand, but can only add 4th zone of same size. */
+		ok1(tdb_expand(tdb, 0, 4*val, false) == 0);
+		ok1(tdb->map_size == 4 * val + 1 + sizeof(struct tdb_header));
+		ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+		/* Large expand now will double file. */
+		ok1(tdb_expand(tdb, 0, 4*val, false) == 0);
+		ok1(tdb->map_size == 8 * val + 1 + sizeof(struct tdb_header));
+		ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+		/* And again? */
+		ok1(tdb_expand(tdb, 0, 4*val, false) == 0);
+		ok1(tdb->map_size == 16 * val + 1 + sizeof(struct tdb_header));
+		ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+		/* Below comfort level, will add a single 8*val zone. */
+		ok1(tdb_expand(tdb, 0, ((8*val) >> TDB_COMFORT_FACTOR_BITS)
+			       - sizeof(struct tdb_used_record), false) == 0);
+		ok1(tdb->map_size == 24 * val + 1 + sizeof(struct tdb_header));
+		tdb_close(tdb);
+	}
+
+	ok1(tap_log_messages == 0);
+	return exit_status();
+}

+ 183 - 0
ccan/tdb2/test/run-03-coalesce.c

@@ -0,0 +1,183 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+#include "layout.h"
+
+static tdb_len_t free_record_length(struct tdb_context *tdb, tdb_off_t off)
+{
+	struct tdb_free_record f;
+
+	if (tdb_read_convert(tdb, off, &f, sizeof(f)) != 0)
+		return TDB_OFF_ERR;
+	if (frec_magic(&f) != TDB_FREE_MAGIC)
+		return TDB_OFF_ERR;
+	return f.data_len;
+}
+
+int main(int argc, char *argv[])
+{
+	tdb_off_t b_off, zone_off;
+	struct tdb_context *tdb;
+	struct tdb_layout *layout;
+	struct tdb_data data, key;
+	tdb_len_t len;
+	unsigned int zone_bits = 16;
+
+	/* FIXME: Test TDB_CONVERT */
+
+	plan_tests(45);
+	data.dptr = (void *)"world";
+	data.dsize = 5;
+	key.dptr = (void *)"hello";
+	key.dsize = 5;
+
+	/* No coalescing can be done due to EOF */
+	layout = new_tdb_layout();
+	tdb_layout_add_zone(layout, zone_bits, false);
+	tdb_layout_add_hashtable(layout, 12, 0);
+	tdb = tdb_layout_get(layout);
+	len = layout->elem[2].free.len;
+	zone_off = layout->elem[0].base.off;
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == len);
+
+	/* Figure out which bucket free entry is. */
+	b_off = bucket_off(zone_off, size_to_bucket(zone_bits, len));
+	/* Lock and fail to coalesce. */
+	ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
+	ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[2].base.off,
+		     b_off, len) == 0);
+	tdb_unlock_free_bucket(tdb, b_off);
+	tdb_unlock_list(tdb, 0, F_WRLCK);
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == len);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	tdb_close(tdb);
+
+	/* No coalescing can be done due to used record */
+	layout = new_tdb_layout();
+	tdb_layout_add_zone(layout, zone_bits, false);
+	tdb_layout_add_hashtable(layout, 12, 0);
+	tdb_layout_add_free(layout, 1024);
+	tdb_layout_add_used(layout, key, data, 6);
+	tdb = tdb_layout_get(layout);
+	zone_off = layout->elem[0].base.off;
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Figure out which bucket free entry is. */
+	b_off = bucket_off(zone_off, size_to_bucket(zone_bits, 1024));
+	/* Lock and fail to coalesce. */
+	ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
+	ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[2].base.off,
+		     b_off, 1024) == 0);
+	tdb_unlock_free_bucket(tdb, b_off);
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	tdb_close(tdb);
+
+	/* Coalescing can be done due to two free records, then EOF */
+	layout = new_tdb_layout();
+	tdb_layout_add_zone(layout, zone_bits, false);
+	tdb_layout_add_hashtable(layout, 12, 0);
+	tdb_layout_add_free(layout, 1024);
+	tdb = tdb_layout_get(layout);
+	zone_off = layout->elem[0].base.off;
+	len = layout->elem[3].free.len;
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
+	ok1(free_record_length(tdb, layout->elem[3].base.off) == len);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Figure out which bucket (first) free entry is. */
+	b_off = bucket_off(zone_off, size_to_bucket(zone_bits, 1024));
+	/* Lock and coalesce. */
+	ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
+	ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[2].base.off,
+		     b_off, 1024) == 1);
+	ok1(!tdb_has_locks(tdb));
+	ok1(free_record_length(tdb, layout->elem[2].base.off)
+	    == 1024 + sizeof(struct tdb_used_record) + len);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	tdb_close(tdb);
+
+	/* Coalescing can be done due to two free records, then data */
+	layout = new_tdb_layout();
+	tdb_layout_add_zone(layout, zone_bits, false);
+	tdb_layout_add_hashtable(layout, 12, 0);
+	tdb_layout_add_free(layout, 1024);
+	tdb_layout_add_free(layout, 512);
+	tdb_layout_add_used(layout, key, data, 6);
+	tdb = tdb_layout_get(layout);
+	zone_off = layout->elem[0].base.off;
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
+	ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Figure out which bucket free entry is. */
+	b_off = bucket_off(zone_off, size_to_bucket(zone_bits, 1024));
+	/* Lock and coalesce. */
+	ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
+	ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[2].base.off,
+		     b_off, 1024) == 1);
+	ok1(!tdb_has_locks(tdb));
+	ok1(free_record_length(tdb, layout->elem[2].base.off)
+	    == 1024 + sizeof(struct tdb_used_record) + 512);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	tdb_close(tdb);
+
+	/* Coalescing can be done due to three free records, then EOF */
+	layout = new_tdb_layout();
+	tdb_layout_add_zone(layout, zone_bits, false);
+	tdb_layout_add_hashtable(layout, 12, 0);
+	tdb_layout_add_free(layout, 1024);
+	tdb_layout_add_free(layout, 512);
+	tdb = tdb_layout_get(layout);
+	zone_off = layout->elem[0].base.off;
+	len = layout->elem[4].free.len;
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
+	ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
+	ok1(free_record_length(tdb, layout->elem[4].base.off) == len);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Figure out which bucket free entry is. */
+	b_off = bucket_off(zone_off, size_to_bucket(zone_bits, 1024));
+	/* Lock and coalesce. */
+	ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
+	ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[2].base.off,
+		     b_off, 1024) == 1);
+	ok1(!tdb_has_locks(tdb));
+	ok1(free_record_length(tdb, layout->elem[2].base.off)
+	    == 1024 + sizeof(struct tdb_used_record) + 512
+	    + sizeof(struct tdb_used_record) + len);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	tdb_close(tdb);
+
+	/* Coalescing across two zones isn't possible. */
+	layout = new_tdb_layout();
+	tdb_layout_add_zone(layout, zone_bits, false);
+	tdb_layout_add_hashtable(layout, 12, 0);
+	tdb_layout_add_zone(layout, zone_bits, true);
+	tdb = tdb_layout_get(layout);
+	zone_off = layout->elem[0].base.off;
+	len = layout->elem[2].free.len;
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == len);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+
+	/* Figure out which list free entry is. */
+	b_off = bucket_off(zone_off, size_to_bucket(zone_bits, len));
+	/* Lock and coalesce. */
+	ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
+	ok1(coalesce(tdb, zone_off, zone_bits, layout->elem[2].base.off,
+		     b_off, len) == 0);
+	tdb_unlock_free_bucket(tdb, b_off);
+	ok1(!tdb_has_locks(tdb));
+	ok1(free_record_length(tdb, layout->elem[2].base.off) == len);
+	ok1(tdb_check(tdb, NULL, NULL) == 0);
+	tdb_close(tdb);
+
+	ok1(tap_log_messages == 0);
+	return exit_status();
+}

+ 0 - 211
ccan/tdb2/test/run-coalesce.c

@@ -1,211 +0,0 @@
-#include <ccan/tdb2/tdb.c>
-#include <ccan/tdb2/free.c>
-#include <ccan/tdb2/lock.c>
-#include <ccan/tdb2/io.c>
-#include <ccan/tdb2/check.c>
-#include <ccan/tap/tap.h>
-#include "logging.h"
-#include "layout.h"
-
-static tdb_len_t free_record_length(struct tdb_context *tdb, tdb_off_t off)
-{
-	struct tdb_free_record f;
-
-	if (tdb_read_convert(tdb, off, &f, sizeof(f)) != 0)
-		return TDB_OFF_ERR;
-	if (f.magic != TDB_FREE_MAGIC)
-		return TDB_OFF_ERR;
-	return f.data_len;
-}
-
-int main(int argc, char *argv[])
-{
-	tdb_off_t list;
-	struct tdb_context *tdb;
-	struct tdb_layout *layout;
-	struct tdb_data data, key;
-	tdb_len_t total;
-	unsigned int i;
-
-	/* FIXME: Test TDB_CONVERT */
-
-	plan_tests(62);
-	data.dptr = (void *)"world";
-	data.dsize = 5;
-	key.dptr = (void *)"hello";
-	key.dsize = 5;
-
-	/* No coalescing can be done due to EOF */
-	layout = new_tdb_layout();
-	tdb_layout_add_hashtable(layout, 12, 0);
-	tdb_layout_add_freetable(layout, 1, 16, 12, 0);
-	tdb_layout_add_free(layout, 1024);
-	tdb = tdb_layout_get(layout);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
-
-	/* Figure out which list free entry is. */
-	list = size_to_bucket(tdb, 1024);
-	/* Lock and fail to coalesce. */
-	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
-	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
-	ok1(coalesce(tdb, layout->elem[2].base.off, list, 1024) == 0);
-	tdb_unlock_free_list(tdb, list);
-	tdb_unlock_list(tdb, 0, F_WRLCK);
-	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-	tdb_close(tdb);
-
-	/* No coalescing can be done due to used record */
-	layout = new_tdb_layout();
-	tdb_layout_add_hashtable(layout, 12, 0);
-	tdb_layout_add_freetable(layout, 1, 16, 12, 0);
-	tdb_layout_add_free(layout, 1024);
-	tdb_layout_add_used(layout, key, data, 6);
-	tdb = tdb_layout_get(layout);
-	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-	/* Figure out which list free entry is. */
-	list = size_to_bucket(tdb, 1024);
-	/* Lock and fail to coalesce. */
-	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
-	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
-	ok1(coalesce(tdb, layout->elem[2].base.off, list, 1024) == 0);
-	tdb_unlock_free_list(tdb, list);
-	tdb_unlock_list(tdb, 0, F_WRLCK);
-	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-	tdb_close(tdb);
-
-	/* Coalescing can be done due to two free records, then EOF */
-	layout = new_tdb_layout();
-	tdb_layout_add_hashtable(layout, 12, 0);
-	tdb_layout_add_freetable(layout, 1, 16, 12, 0);
-	tdb_layout_add_free(layout, 1024);
-	tdb_layout_add_free(layout, 512);
-	tdb = tdb_layout_get(layout);
-	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
-	ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-	/* Figure out which list free entry is. */
-	list = size_to_bucket(tdb, 1024);
-	/* Lock and coalesce. */
-	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
-	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
-	ok1(coalesce(tdb, layout->elem[2].base.off, list, 1024) == 1);
-	tdb_unlock_list(tdb, 0, F_WRLCK);
-	ok1(!tdb_has_locks(tdb));
-	ok1(free_record_length(tdb, layout->elem[2].base.off)
-	    == 1024 + sizeof(struct tdb_used_record) + 512);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-	tdb_close(tdb);
-
-	/* Coalescing can be done due to two free records, then data */
-	layout = new_tdb_layout();
-	tdb_layout_add_hashtable(layout, 12, 0);
-	tdb_layout_add_freetable(layout, 1, 16, 12, 0);
-	tdb_layout_add_free(layout, 1024);
-	tdb_layout_add_free(layout, 512);
-	tdb_layout_add_used(layout, key, data, 6);
-	tdb = tdb_layout_get(layout);
-	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
-	ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-	/* Figure out which list free entry is. */
-	list = size_to_bucket(tdb, 1024);
-	/* Lock and coalesce. */
-	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
-	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
-	ok1(coalesce(tdb, layout->elem[2].base.off, list, 1024) == 1);
-	tdb_unlock_list(tdb, 0, F_WRLCK);
-	ok1(!tdb_has_locks(tdb));
-	ok1(free_record_length(tdb, layout->elem[2].base.off)
-	    == 1024 + sizeof(struct tdb_used_record) + 512);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-	tdb_close(tdb);
-
-	/* Coalescing can be done due to three free records, then EOF */
-	layout = new_tdb_layout();
-	tdb_layout_add_hashtable(layout, 12, 0);
-	tdb_layout_add_freetable(layout, 1, 16, 12, 0);
-	tdb_layout_add_free(layout, 1024);
-	tdb_layout_add_free(layout, 512);
-	tdb_layout_add_free(layout, 32);
-	tdb = tdb_layout_get(layout);
-	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
-	ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
-	ok1(free_record_length(tdb, layout->elem[4].base.off) == 32);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-	/* Figure out which list free entry is. */
-	list = size_to_bucket(tdb, 1024);
-	/* Lock and coalesce. */
-	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
-	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
-	ok1(coalesce(tdb, layout->elem[2].base.off, list, 1024) == 1);
-	tdb_unlock_list(tdb, 0, F_WRLCK);
-	ok1(!tdb_has_locks(tdb));
-	ok1(free_record_length(tdb, layout->elem[2].base.off)
-	    == 1024 + sizeof(struct tdb_used_record) + 512
-	    + sizeof(struct tdb_used_record) + 32);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-	tdb_close(tdb);
-
-	/* Coalescing across two zones. */
-	layout = new_tdb_layout();
-	tdb_layout_add_hashtable(layout, 12, 0);
-	tdb_layout_add_freetable(layout, 2, 16, 12, 0);
-	tdb_layout_add_free(layout, 32768);
-	tdb_layout_add_free(layout, 30000);
-	tdb = tdb_layout_get(layout);
-	ok1(free_record_length(tdb, layout->elem[2].base.off) == 32768);
-	ok1(zone_of(tdb, layout->elem[2].base.off) == 0);
-	ok1(free_record_length(tdb, layout->elem[3].base.off) == 30000);
-	ok1(zone_of(tdb, layout->elem[3].base.off) == 1);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-	/* Figure out which list free entry is. */
-	list = size_to_bucket(tdb, 32768);
-	/* Lock and coalesce. */
-	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
-	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
-	ok1(coalesce(tdb, layout->elem[2].base.off, list, 32768) == 1);
-	tdb_unlock_list(tdb, 0, F_WRLCK);
-	ok1(!tdb_has_locks(tdb));
-	ok1(free_record_length(tdb, layout->elem[2].base.off)
-	    == 32768 + sizeof(struct tdb_used_record) + 30000);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-	tdb_close(tdb);
-
-	/* Coalescing many across many zones. */
-	layout = new_tdb_layout();
-	tdb_layout_add_hashtable(layout, 12, 0);
-	tdb_layout_add_freetable(layout, 8, 16, 12, 0);
-	total = 0;
-	for (i = 4; i < 16; i++) {
-		tdb_layout_add_free(layout, 1 << i);
-		total += sizeof(struct tdb_used_record) + (1 << i);
-	}
-	total -= sizeof(struct tdb_used_record);
-	tdb = tdb_layout_get(layout);
-	ok1(free_record_length(tdb, layout->elem[2].base.off) == 1 << 4);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-
-	/* Figure out which list free entry is. */
-	list = size_to_bucket(tdb, 1 << 4);
-	/* Lock and coalesce. */
-	ok1(tdb_lock_list(tdb, 0, F_WRLCK, TDB_LOCK_WAIT) == 0);
-	ok1(tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) == 0);
-	ok1(coalesce(tdb, layout->elem[2].base.off, list, 1 << 4) == 1);
-	tdb_unlock_list(tdb, 0, F_WRLCK);
-	ok1(!tdb_has_locks(tdb));
-	ok1(free_record_length(tdb, layout->elem[2].base.off) == total);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-	tdb_close(tdb);
-
-	ok1(tap_log_messages == 0);
-	return exit_status();
-}

+ 0 - 127
ccan/tdb2/test/run-expand.c

@@ -1,127 +0,0 @@
-#include <ccan/tdb2/tdb.c>
-#include <ccan/tdb2/free.c>
-#include <ccan/tdb2/lock.c>
-#include <ccan/tdb2/io.c>
-#include <ccan/tdb2/check.c>
-#include <ccan/tap/tap.h>
-#include "logging.h"
-
-/* Release lock to check db. */
-static void check(struct tdb_context *tdb)
-{
-	tdb_allrecord_unlock(tdb, F_WRLCK);
-	ok1(tdb_check(tdb, NULL, NULL) == 0);
-	ok1(tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false) == 0);
-}
-
-int main(int argc, char *argv[])
-{
-	unsigned int i;
-	tdb_off_t off;
-	uint64_t val, buckets;
-	struct tdb_context *tdb;
-	int flags[] = { TDB_INTERNAL, TDB_DEFAULT, TDB_NOMMAP,
-			TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT,
-			TDB_NOMMAP|TDB_CONVERT };
-
-	plan_tests(sizeof(flags) / sizeof(flags[0]) * 40 + 1);
-
-	/* First, lower level expansion tests. */
-	for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
-		tdb = tdb_open("run-expand.tdb", flags[i],
-			       O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
-		ok1(tdb);
-		if (!tdb)
-			continue;
-
-		ok1(tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false)
-		    == 0);
-
-		/* Expanding file is pretty easy. */
-		off = expand_to_fill_zones(tdb);
-		ok1(off > 0 && off != TDB_OFF_ERR);
-		check(tdb);
-
-		/* Second expand should do nothing. */
-		ok1(expand_to_fill_zones(tdb) == 0);
-		check(tdb);
-
-		/* Now, try adding a zone. */
-		val = tdb->header.v.num_zones + 1;
-		ok1(update_zones(tdb, val,
-				 tdb->header.v.zone_bits,
-				 tdb->header.v.free_buckets,
-				 1ULL << tdb->header.v.zone_bits) == 0);
-		ok1(tdb->header.v.num_zones == val);
-		check(tdb);
-
-		/* Now, try doubling zone size. */
-		val = tdb->header.v.zone_bits + 1;
-		ok1(update_zones(tdb, tdb->header.v.num_zones,
-				 val,
-				 tdb->header.v.free_buckets,
-				 1ULL << val) == 0);
-		ok1(tdb->header.v.zone_bits == val);
-		check(tdb);
-
-		/* Now, try adding a zone, and a bucket. */
-		val = tdb->header.v.num_zones + 1;
-		buckets = tdb->header.v.free_buckets + 1;
-		ok1(update_zones(tdb, val,
-				 tdb->header.v.zone_bits,
-				 buckets,
-				 1ULL << tdb->header.v.zone_bits) == 0);
-		ok1(tdb->header.v.num_zones == val);
-		ok1(tdb->header.v.free_buckets == buckets);
-		check(tdb);
-
-		/* Now, try doubling zone size, and adding a bucket. */
-		val = tdb->header.v.zone_bits + 1;
-		buckets = tdb->header.v.free_buckets + 1;
-		ok1(update_zones(tdb, tdb->header.v.num_zones,
-				 val,
-				 buckets,
-				 1ULL << val) == 0);
-		ok1(tdb->header.v.zone_bits == val);
-		ok1(tdb->header.v.free_buckets == buckets);
-		check(tdb);
-
-		/* Now, try massive zone increase. */
-		val = tdb->header.v.zone_bits + 4;
-		ok1(update_zones(tdb, tdb->header.v.num_zones,
-				 val,
-				 tdb->header.v.free_buckets,
-				 1ULL << val) == 0);
-		ok1(tdb->header.v.zone_bits == val);
-		check(tdb);
-
-		tdb_allrecord_unlock(tdb, F_WRLCK);
-		tdb_close(tdb);
-	}
-
-	/* Now using tdb_expand. */
-	for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
-		tdb = tdb_open("run-expand.tdb", flags[i],
-			       O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
-		ok1(tdb);
-		if (!tdb)
-			continue;
-
-		/* First expand (expand file to fill zone). */
-		ok1(tdb_expand(tdb, 1, 1, false) == 0);
-		ok1(tdb->header.v.num_zones == 1);
-		ok1(tdb_check(tdb, NULL, NULL) == 0);
-		/* Little expand (extra zone). */
-		ok1(tdb_expand(tdb, 1, 1, false) == 0);
-		ok1(tdb->header.v.num_zones == 2);
-		ok1(tdb_check(tdb, NULL, NULL) == 0);
-		/* Big expand (enlarge zones) */
-		ok1(tdb_expand(tdb, 1, 4096, false) == 0);
-		ok1(tdb->header.v.num_zones == 2);
-		ok1(tdb_check(tdb, NULL, NULL) == 0);
-		tdb_close(tdb);
-	}
-
-	ok1(tap_log_messages == 0);
-	return exit_status();
-}