Browse Source

tdb2: clarify locking heirarchy; hashes, freelist, then expand lock.

Rusty Russell 15 years ago
parent
commit
a0ac303da0
2 changed files with 64 additions and 24 deletions
  1. 3 1
      ccan/tdb2/io.c
  2. 61 23
      ccan/tdb2/lock.c

+ 3 - 1
ccan/tdb2/io.c

@@ -73,7 +73,9 @@ static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
 	int ret;
 
 	/* We can't hold pointers during this: we could unmap! */
-	assert(!tdb->direct_access || tdb_has_expansion_lock(tdb));
+	assert(!tdb->direct_access
+	       || (tdb->flags & TDB_NOLOCK)
+	       || tdb_has_expansion_lock(tdb));
 
 	if (len <= tdb->map_size)
 		return 0;

+ 61 - 23
ccan/tdb2/lock.c

@@ -260,7 +260,7 @@ static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype,
 	if (offset >= TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + tdb->map_size / 8) {
 		tdb->ecode = TDB_ERR_LOCK;
 		tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-			 "tdb_lock: invalid offset %llu ltype=%d\n",
+			 "tdb_nest_lock: invalid offset %llu ltype=%d\n",
 			 (long long)offset, ltype);
 		return -1;
 	}
@@ -284,7 +284,7 @@ static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype,
 	if (new_lck == NULL) {
 		tdb->ecode = TDB_ERR_OOM;
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-			 "tdb_lock: unable to allocate %llu lock structure",
+			 "tdb_nest_lock: unable to allocate %llu lock struct",
 			 (long long)(tdb->num_lockrecs + 1));
 		errno = ENOMEM;
 		return -1;
@@ -351,7 +351,7 @@ static int tdb_nest_unlock(struct tdb_context *tdb, tdb_off_t off, int ltype)
 	if ((lck == NULL) || (lck->count == 0)) {
 		tdb->ecode = TDB_ERR_LOCK;
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-			 "tdb_unlock: no lock for %llu\n", (long long)off);
+			 "tdb_nest_unlock: no lock for %llu\n", (long long)off);
 		return -1;
 	}
 
@@ -433,9 +433,7 @@ static int tdb_lock_gradual(struct tdb_context *tdb,
 }
 
 /* lock/unlock entire database.  It can only be upgradable if you have some
- * other way of guaranteeing exclusivity (ie. transaction write lock).
- * Note that we don't lock the free chains: currently noone can get those locks
- * without a hash chain lock first. */
+ * other way of guaranteeing exclusivity (ie. transaction write lock). */
 int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
 		       enum tdb_lock_flags flags, bool upgradable)
 {
@@ -479,16 +477,30 @@ int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
 	}
 
 again:
+	/* Lock hashes, gradually. */
 	if (tdb_lock_gradual(tdb, ltype, flags, TDB_HASH_LOCK_START,
 			     TDB_HASH_LOCK_RANGE)) {
 		if (!(flags & TDB_LOCK_PROBE)) {
 			tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-				 "tdb_lockall hashes failed (%s)\n",
+				 "tdb_allrecord_lock hashes failed (%s)\n",
 				 strerror(errno));
 		}
 		return -1;
 	}
 
+	/* Lock free lists: there to end of file. */
+	if (tdb_brlock(tdb, ltype, TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE,
+		       0, flags)) {
+		if (!(flags & TDB_LOCK_PROBE)) {
+			tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+				 "tdb_allrecord_lock freelist failed (%s)\n",
+				 strerror(errno));
+		}
+		tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, 
+			     TDB_HASH_LOCK_RANGE);
+		return -1;
+	}
+
 	tdb->allrecord_lock.count = 1;
 	/* If it's upgradable, it's actually exclusive so we can treat
 	 * it as a write lock. */
@@ -564,14 +576,12 @@ int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 	tdb->allrecord_lock.count = 0;
 	tdb->allrecord_lock.ltype = 0;
 
-	return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START,
-			    TDB_HASH_LOCK_RANGE);
+	return tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, 0);
 }
 
 bool tdb_has_expansion_lock(struct tdb_context *tdb)
 {
-	return find_nestlock(tdb, TDB_EXPANSION_LOCK) != NULL
-		|| (tdb->flags & TDB_NOLOCK);
+	return find_nestlock(tdb, TDB_EXPANSION_LOCK) != NULL;
 }
 
 bool tdb_has_locks(struct tdb_context *tdb)
@@ -625,6 +635,18 @@ int tdb_unlockall_read(struct tdb_context *tdb)
 }
 #endif
 
+static bool tdb_has_free_lock(struct tdb_context *tdb)
+{
+	unsigned int i;
+
+	for (i=0; i<tdb->num_lockrecs; i++) {
+		if (tdb->lockrecs[i].off
+		    > TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE)
+			return true;
+	}
+	return false;
+}
+
 int tdb_lock_hashes(struct tdb_context *tdb,
 		    tdb_off_t hash_lock,
 		    tdb_len_t hash_range,
@@ -643,12 +665,26 @@ int tdb_lock_hashes(struct tdb_context *tdb,
 	if (tdb->allrecord_lock.count) {
 		tdb->ecode = TDB_ERR_LOCK;
 		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-			 "tdb_lock_list: have %s allrecordlock\n",
+			 "tdb_lock_hashes: have %s allrecordlock\n",
 			 tdb->allrecord_lock.ltype == F_RDLCK
 			 ? "read" : "write");
 		return -1;
 	}
 
+	if (tdb_has_free_lock(tdb)) {
+		tdb->ecode = TDB_ERR_LOCK;
+		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+			 "tdb_lock_hashes: have free lock already\n");
+		return -1;
+	}
+
+	if (tdb_has_expansion_lock(tdb)) {
+		tdb->ecode = TDB_ERR_LOCK;
+		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+			 "tdb_lock_hashes: have expansion lock already\n");
+		return -1;
+	}
+
 	return tdb_nest_lock(tdb, lock, ltype, waitflag);
 }
 
@@ -665,7 +701,7 @@ int tdb_unlock_hashes(struct tdb_context *tdb,
 		    && ltype == F_WRLCK) {
 			tdb->ecode = TDB_ERR_LOCK;
 			tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-				 "tdb_unlock_list RO allrecord!\n");
+				 "tdb_unlock_hashes RO allrecord!\n");
 			return -1;
 		}
 		return 0;
@@ -681,7 +717,8 @@ int tdb_unlock_hashes(struct tdb_context *tdb,
  */
 static tdb_off_t free_lock_off(tdb_off_t b_off)
 {
-	return TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE + b_off / sizeof(tdb_off_t);
+	return TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE
+		+ b_off / sizeof(tdb_off_t);
 }
 
 int tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
@@ -689,24 +726,25 @@ int tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
 {
 	assert(b_off >= sizeof(struct tdb_header));
 
-	/* You're supposed to have a hash lock first! */
-	if (!(tdb->flags & TDB_NOLOCK) && !tdb_has_locks(tdb)) {
-		tdb->ecode = TDB_ERR_LOCK;
-		tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-			 "tdb_lock_free_list without lock!\n");
-		return -1;
-	}
-
 	/* a allrecord lock allows us to avoid per chain locks */
 	if (tdb->allrecord_lock.count) {
 		if (tdb->allrecord_lock.ltype == F_WRLCK)
 			return 0;
 		tdb->ecode = TDB_ERR_LOCK;
 		tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-			 "tdb_lock_free_list with RO allrecordlock!\n");
+			 "tdb_lock_free_bucket with RO allrecordlock!\n");
 		return -1;
 	}
 
+#if 0 /* FIXME */
+	if (tdb_has_expansion_lock(tdb)) {
+		tdb->ecode = TDB_ERR_LOCK;
+		tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
+			 "tdb_lock_free_bucket: have expansion lock already\n");
+		return -1;
+	}
+#endif
+
 	return tdb_nest_lock(tdb, free_lock_off(b_off), F_WRLCK, waitflag);
 }