Browse Source

tdb2: fix tdb_chainlock

We can't enlarge the lock without risking deadlock, so tdb_chainlock() can't
simply grab a one-byte lock; it needs to grab the lock we'd need to protect
the hash.

In theory, tdb_chainlock_read() could use a one-byte lock though.
Rusty Russell 15 years ago
parent
commit
c5e3f07a30
2 changed files with 60 additions and 33 deletions
  1. 51 5
      ccan/tdb2/hash.c
  2. 9 28
      ccan/tdb2/lock.c

+ 51 - 5
ccan/tdb2/hash.c

@@ -129,6 +129,13 @@ bool is_subhash(tdb_off_t val)
 	return val >> (64-TDB_OFF_UPPER_STEAL) == (1<<TDB_OFF_UPPER_STEAL) - 1;
 	return val >> (64-TDB_OFF_UPPER_STEAL) == (1<<TDB_OFF_UPPER_STEAL) - 1;
 }
 }
 
 
+/* FIXME: Guess the depth, don't over-lock! */
+/* Map a top-level hash group to the byte range of hash locks that covers
+ * every slot the group's entries could occupy at any depth.  Returns the
+ * start of the range and stores its length in *size.  Since we don't know
+ * the tree depth here, we take the group's entire share of the 64-bit
+ * hash space (hence the FIXME above about over-locking). */
+static tdb_off_t hlock_range(tdb_off_t group, tdb_off_t *size)
+{
+	*size = 1ULL << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
+	return group << (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
+}
+
 /* This is the core routine which searches the hashtable for an entry.
 /* This is the core routine which searches the hashtable for an entry.
  * On error, no locks are held and TDB_OFF_ERR is returned.
  * On error, no locks are held and TDB_OFF_ERR is returned.
  * Otherwise, hinfo is filled in (and the optional tinfo).
  * Otherwise, hinfo is filled in (and the optional tinfo).
@@ -149,11 +156,7 @@ tdb_off_t find_and_lock(struct tdb_context *tdb,
 	group = use_bits(h, TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
 	group = use_bits(h, TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS);
 	h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
 	h->home_bucket = use_bits(h, TDB_HASH_GROUP_BITS);
 
 
-	/* FIXME: Guess the depth, don't over-lock! */
-	h->hlock_start = (tdb_off_t)group
-		<< (64 - (TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS));
-	h->hlock_range = 1ULL << (64 - (TDB_TOPLEVEL_HASH_BITS
-					- TDB_HASH_GROUP_BITS));
+	h->hlock_start = hlock_range(group, &h->hlock_range);
 	if (tdb_lock_hashes(tdb, h->hlock_start, h->hlock_range, ltype,
 	if (tdb_lock_hashes(tdb, h->hlock_start, h->hlock_range, ltype,
 			    TDB_LOCK_WAIT))
 			    TDB_LOCK_WAIT))
 		return TDB_OFF_ERR;
 		return TDB_OFF_ERR;
@@ -610,3 +613,46 @@ int first_in_hash(struct tdb_context *tdb, int ltype,
 
 
 	return next_in_hash(tdb, ltype, tinfo, kbuf, dlen);
 	return next_in_hash(tdb, ltype, tinfo, kbuf, dlen);
 }
 }
+
+/* Even if the entry isn't in this hash bucket, you'd have to lock this
+ * bucket to find it. */
+/* Take the hash lock covering the chain for @key.  We lock the whole
+ * range hlock_range() reports rather than a single byte: per the commit
+ * message, a one-byte lock could not later be enlarged without risking
+ * deadlock, so we must grab up front the range needed to protect the hash.
+ * Returns tdb_lock_hashes()'s result (0 on success).
+ * NOTE(review): the trace record is emitted even if locking failed —
+ * confirm that is intentional. */
+static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
+		     int ltype, enum tdb_lock_flags waitflag,
+		     const char *func)
+{
+	int ret;
+	uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
+	tdb_off_t lockstart, locksize;
+	unsigned int group, gbits;
+
+	/* The top gbits of the hash select the top-level group. */
+	gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
+	group = bits(h, 64 - gbits, gbits);
+
+	lockstart = hlock_range(group, &locksize);
+
+	ret = tdb_lock_hashes(tdb, lockstart, locksize, ltype, waitflag);
+	tdb_trace_1rec(tdb, func, *key);
+	return ret;
+}
+
+/* lock/unlock one hash chain. This is meant to be used to reduce
+   contention - it cannot guarantee how many records will be locked */
+/* Public API: blocking write-lock on the chain covering @key.
+ * Returns 0 on success, -1 on failure (via chainlock()). */
+int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
+{
+	return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock");
+}
+
+/* Public API: release the lock taken by tdb_chainlock().  Recomputes the
+ * same group and lock range from the key's hash that chainlock() used, so
+ * the unlock exactly matches the earlier lock.  The trace record is
+ * written before unlocking. */
+int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
+{
+	uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
+	tdb_off_t lockstart, locksize;
+	unsigned int group, gbits;
+
+	/* Top gbits of the hash select the top-level group (as in chainlock). */
+	gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
+	group = bits(h, 64 - gbits, gbits);
+
+	lockstart = hlock_range(group, &locksize);
+
+	tdb_trace_1rec(tdb, "tdb_chainunlock", key);
+	return tdb_unlock_hashes(tdb, lockstart, locksize, F_WRLCK);
+}

+ 9 - 28
ccan/tdb2/lock.c

@@ -278,6 +278,15 @@ static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype,
 		return 0;
 		return 0;
 	}
 	}
 
 
+	if (tdb->num_lockrecs
+	    && offset >= TDB_HASH_LOCK_START
+	    && offset < TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE) {
+		tdb->ecode = TDB_ERR_LOCK;
+		tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
+			 "tdb_nest_lock: already have a hash lock?\n");
+		return -1;
+	}
+
 	new_lck = (struct tdb_lock_type *)realloc(
 	new_lck = (struct tdb_lock_type *)realloc(
 		tdb->lockrecs,
 		tdb->lockrecs,
 		sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
 		sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
@@ -756,34 +765,6 @@ void tdb_unlock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off)
 	tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
 	tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
 }
 }
 
 
-/* Even if the entry isn't in this hash bucket, you'd have to lock this
- * bucket to find it. */
-static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
-		     int ltype, enum tdb_lock_flags waitflag,
-		     const char *func)
-{
-	int ret;
-	uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
-
-	ret = tdb_lock_hashes(tdb, h, 1, ltype, waitflag);
-	tdb_trace_1rec(tdb, func, *key);
-	return ret;
-}
-
-/* lock/unlock one hash chain. This is meant to be used to reduce
-   contention - it cannot guarantee how many records will be locked */
-int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
-{
-	return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock");
-}
-
-int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
-{
-	uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
-	tdb_trace_1rec(tdb, "tdb_chainunlock", key);
-	return tdb_unlock_hashes(tdb, h, 1, F_WRLCK);
-}
-
 #if 0
 #if 0
 /* lock/unlock one hash chain, non-blocking. This is meant to be used
 /* lock/unlock one hash chain, non-blocking. This is meant to be used
    to reduce contention - it cannot guarantee how many records will be
    to reduce contention - it cannot guarantee how many records will be