Browse Source

tdb2: set owner for locks

The last step before sharing struct tdb_file is to add a field to
identify which tdb context is holding each lock, so we can fail when
they conflict.
Rusty Russell 15 years ago
parent
commit
2960e90f2d
3 changed files with 54 additions and 5 deletions
  1. 49 5
      ccan/tdb2/lock.c
  2. 1 0
      ccan/tdb2/open.c
  3. 4 0
      ccan/tdb2/private.h

+ 49 - 5
ccan/tdb2/lock.c

@@ -29,6 +29,17 @@
 #include <assert.h>
 #include <ccan/build_assert/build_assert.h>
 
+/* If we were threaded, we could wait for unlock, but we're not, so fail. */
+static bool owner_conflict(struct tdb_context *tdb, struct tdb_lock *lock)
+{
+	if (lock->owner != tdb) {
+		tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
+			   "Lock already owned by another opener");
+		return true;
+	}
+	return false;
+}
+
 static int fcntl_lock(struct tdb_context *tdb,
 		      int rw, off_t off, off_t len, bool waitflag)
 {
@@ -236,12 +247,15 @@ enum TDB_ERROR tdb_allrecord_upgrade(struct tdb_context *tdb)
 			  "tdb_allrecord_upgrade failed");
 }
 
-static struct tdb_lock *find_nestlock(struct tdb_context *tdb, tdb_off_t offset)
+static struct tdb_lock *find_nestlock(struct tdb_context *tdb, tdb_off_t offset,
+				      const struct tdb_context *owner)
 {
 	unsigned int i;
 
 	for (i=0; i<tdb->file->num_lockrecs; i++) {
 		if (tdb->file->lockrecs[i].off == offset) {
+			if (owner && tdb->file->lockrecs[i].owner != owner)
+				return NULL;
 			return &tdb->file->lockrecs[i];
 		}
 	}
@@ -290,8 +304,11 @@ static enum TDB_ERROR tdb_nest_lock(struct tdb_context *tdb,
 
 	add_stat(tdb, locks, 1);
 
-	new_lck = find_nestlock(tdb, offset);
+	new_lck = find_nestlock(tdb, offset, NULL);
 	if (new_lck) {
+		if (owner_conflict(tdb, new_lck))
+			return -1;
+
 		if (new_lck->ltype == F_RDLCK && ltype == F_WRLCK) {
 			return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
 					  "tdb_nest_lock:"
@@ -348,6 +365,7 @@ static enum TDB_ERROR tdb_nest_lock(struct tdb_context *tdb,
 		}
 	}
 
+	tdb->file->lockrecs[tdb->file->num_lockrecs].owner = tdb;
 	tdb->file->lockrecs[tdb->file->num_lockrecs].off = offset;
 	tdb->file->lockrecs[tdb->file->num_lockrecs].count = 1;
 	tdb->file->lockrecs[tdb->file->num_lockrecs].ltype = ltype;
@@ -365,7 +383,7 @@ static enum TDB_ERROR tdb_nest_unlock(struct tdb_context *tdb,
 	if (tdb->flags & TDB_NOLOCK)
 		return TDB_SUCCESS;
 
-	lck = find_nestlock(tdb, off);
+	lck = find_nestlock(tdb, off, tdb);
 	if ((lck == NULL) || (lck->count == 0)) {
 		return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
 				  "tdb_nest_unlock: no lock for %zu",
@@ -508,6 +526,7 @@ again:
 		return ecode;
 	}
 
+	tdb->file->allrecord_lock.owner = tdb;
 	tdb->file->allrecord_lock.count = 1;
 	/* If it's upgradable, it's actually exclusive so we can treat
 	 * it as a write lock. */
@@ -545,7 +564,7 @@ void tdb_unlock_open(struct tdb_context *tdb)
 bool tdb_has_open_lock(struct tdb_context *tdb)
 {
 	return !(tdb->flags & TDB_NOLOCK)
-		&& find_nestlock(tdb, TDB_OPEN_LOCK) != NULL;
+		&& find_nestlock(tdb, TDB_OPEN_LOCK, tdb) != NULL;
 }
 
 enum TDB_ERROR tdb_lock_expand(struct tdb_context *tdb, int ltype)
@@ -569,6 +588,12 @@ void tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 		return;
 	}
 
+	if (tdb->file->allrecord_lock.owner != tdb) {
+		tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
+			   "tdb_allrecord_unlock: not locked by us!");
+		return;
+	}
+
 	/* Upgradable locks are marked as write locks. */
 	if (tdb->file->allrecord_lock.ltype != ltype
 	    && (!tdb->file->allrecord_lock.off || ltype != F_RDLCK)) {
@@ -592,7 +617,7 @@ void tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 
 bool tdb_has_expansion_lock(struct tdb_context *tdb)
 {
-	return find_nestlock(tdb, TDB_EXPANSION_LOCK) != NULL;
+	return find_nestlock(tdb, TDB_EXPANSION_LOCK, tdb) != NULL;
 }
 
 bool tdb_has_hash_locks(struct tdb_context *tdb)
@@ -729,3 +754,22 @@ void tdb_unlock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off)
 
 	tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
 }
+
+void tdb_unlock_all(struct tdb_context *tdb)
+{
+	unsigned int i;
+
+	while (tdb->file->allrecord_lock.count
+	       && tdb->file->allrecord_lock.owner == tdb) {
+		tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
+	}
+
+	for (i=0; i<tdb->file->num_lockrecs; i++) {
+		if (tdb->file->lockrecs[i].owner == tdb) {
+			tdb_nest_unlock(tdb,
+					tdb->file->lockrecs[i].off,
+					tdb->file->lockrecs[i].ltype);
+			i--;
+		}
+	}
+}

+ 1 - 0
ccan/tdb2/open.c

@@ -448,6 +448,7 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
 #endif
 	free((char *)tdb->name);
 	if (tdb->file) {
+		tdb_unlock_all(tdb);
 		if (tdb->file->map_ptr) {
 			if (tdb->flags & TDB_INTERNAL) {
 				free(tdb->file->map_ptr);

+ 4 - 0
ccan/tdb2/private.h

@@ -301,6 +301,7 @@ enum tdb_lock_flags {
 };
 
 struct tdb_lock {
+	struct tdb_context *owner;
 	uint32_t off;
 	uint32_t count;
 	uint32_t ltype;
@@ -536,6 +537,9 @@ enum TDB_ERROR tdb_unlock_hashes(struct tdb_context *tdb,
 				 tdb_off_t hash_lock,
 				 tdb_len_t hash_range, int ltype);
 
+/* For closing the file. */
+void tdb_unlock_all(struct tdb_context *tdb);
+
 /* Lock/unlock a particular free bucket. */
 enum TDB_ERROR tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
 				    enum tdb_lock_flags waitflag);