Browse Source

tdb2: tdb_lockall() and tdb_lockall_read() support.

We also change tdb_chainunlock not to return an error (as per the
other locking functions: we log a message, but don't return an error).
Rusty Russell 15 years ago
parent
commit
f513c5701b
6 changed files with 142 additions and 8 deletions
  1. 2 3
      ccan/tdb2/hash.c
  2. 21 1
      ccan/tdb2/lock.c
  3. 2 2
      ccan/tdb2/open.c
  4. 1 1
      ccan/tdb2/private.h
  5. 36 1
      ccan/tdb2/tdb2.h
  6. 80 0
      ccan/tdb2/test/run-lockall.c

+ 2 - 3
ccan/tdb2/hash.c

@@ -861,7 +861,7 @@ enum TDB_ERROR tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
 					   "tdb_chainlock");
 					   "tdb_chainlock");
 }
 }
 
 
-enum TDB_ERROR tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
+void tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
 {
 {
 	uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
 	uint64_t h = tdb_hash(tdb, key.dptr, key.dsize);
 	tdb_off_t lockstart, locksize;
 	tdb_off_t lockstart, locksize;
@@ -873,6 +873,5 @@ enum TDB_ERROR tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
 	lockstart = hlock_range(group, &locksize);
 	lockstart = hlock_range(group, &locksize);
 
 
 	tdb_trace_1rec(tdb, "tdb_chainunlock", key);
 	tdb_trace_1rec(tdb, "tdb_chainunlock", key);
-	return tdb->last_error = tdb_unlock_hashes(tdb, lockstart, locksize,
-						   F_WRLCK);
+	tdb_unlock_hashes(tdb, lockstart, locksize, F_WRLCK);
 }
 }

+ 21 - 1
ccan/tdb2/lock.c

@@ -762,7 +762,27 @@ void tdb_unlock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off)
 	tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
 	tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
 }
 }
 
 
-void tdb_unlock_all(struct tdb_context *tdb)
+enum TDB_ERROR tdb_lockall(struct tdb_context *tdb)
+{
+	return tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false);
+}
+
+void tdb_unlockall(struct tdb_context *tdb)
+{
+	tdb_allrecord_unlock(tdb, F_WRLCK);
+}
+
+enum TDB_ERROR tdb_lockall_read(struct tdb_context *tdb)
+{
+	return tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false);
+}
+
+void tdb_unlockall_read(struct tdb_context *tdb)
+{
+	tdb_allrecord_unlock(tdb, F_RDLCK);
+}
+
+void tdb_lock_cleanup(struct tdb_context *tdb)
 {
 {
 	unsigned int i;
 	unsigned int i;
 
 

+ 2 - 2
ccan/tdb2/open.c

@@ -456,7 +456,7 @@ fail_errno:
 #endif
 #endif
 	free(cast_const(char *, tdb->name));
 	free(cast_const(char *, tdb->name));
 	if (tdb->file) {
 	if (tdb->file) {
-		tdb_unlock_all(tdb);
+		tdb_lock_cleanup(tdb);
 		if (--tdb->file->refcnt == 0) {
 		if (--tdb->file->refcnt == 0) {
 			assert(tdb->file->num_lockrecs == 0);
 			assert(tdb->file->num_lockrecs == 0);
 			if (tdb->file->map_ptr) {
 			if (tdb->file->map_ptr) {
@@ -499,7 +499,7 @@ int tdb_close(struct tdb_context *tdb)
 	if (tdb->file) {
 	if (tdb->file) {
 		struct tdb_file **i;
 		struct tdb_file **i;
 
 
-		tdb_unlock_all(tdb);
+		tdb_lock_cleanup(tdb);
 		if (--tdb->file->refcnt == 0) {
 		if (--tdb->file->refcnt == 0) {
 			ret = close(tdb->file->fd);
 			ret = close(tdb->file->fd);
 
 

+ 1 - 1
ccan/tdb2/private.h

@@ -549,7 +549,7 @@ enum TDB_ERROR tdb_unlock_hashes(struct tdb_context *tdb,
 				 tdb_len_t hash_range, int ltype);
 				 tdb_len_t hash_range, int ltype);
 
 
 /* For closing the file. */
 /* For closing the file. */
-void tdb_unlock_all(struct tdb_context *tdb);
+void tdb_lock_cleanup(struct tdb_context *tdb);
 
 
 /* Lock/unlock a particular free bucket. */
 /* Lock/unlock a particular free bucket. */
 enum TDB_ERROR tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
 enum TDB_ERROR tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,

+ 36 - 1
ccan/tdb2/tdb2.h

@@ -411,8 +411,43 @@ enum TDB_ERROR tdb_chainlock(struct tdb_context *tdb, TDB_DATA key);
  * tdb_chainunlock - unlock a record in the TDB
  * tdb_chainunlock - unlock a record in the TDB
  * @tdb: the tdb context returned from tdb_open()
  * @tdb: the tdb context returned from tdb_open()
  * @key: the key to unlock.
  * @key: the key to unlock.
+ *
+ * The key must have previously been locked by tdb_chainlock().
+ */
+void tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key);
+
+/**
+ * tdb_lockall - lock the entire TDB
+ * @tdb: the tdb context returned from tdb_open()
+ *
+ * You cannot hold a tdb_chainlock while calling this.  It nests, so you
+ * must call tdb_unlockall as many times as you call tdb_lockall.
+ */
+enum TDB_ERROR tdb_lockall(struct tdb_context *tdb);
+
+/**
+ * tdb_unlockall - unlock the entire TDB
+ * @tdb: the tdb context returned from tdb_open()
+ */
+void tdb_unlockall(struct tdb_context *tdb);
+
+/**
+ * tdb_lockall_read - lock the entire TDB for reading
+ * @tdb: the tdb context returned from tdb_open()
+ *
+ * This prevents others writing to the database, eg. tdb_delete, tdb_store,
+ * tdb_append, but not tdb_fetch.
+ *
+ * You cannot hold a tdb_chainlock while calling this.  It nests, so you
+ * must call tdb_unlockall_read as many times as you call tdb_lockall_read.
+ */
+enum TDB_ERROR tdb_lockall_read(struct tdb_context *tdb);
+
+/**
+ * tdb_unlockall_read - unlock the entire TDB for reading
+ * @tdb: the tdb context returned from tdb_open()
  */
  */
-enum TDB_ERROR tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key);
+void tdb_unlockall_read(struct tdb_context *tdb);
 
 
 /**
 /**
  * tdb_wipe_all - wipe the database clean
  * tdb_wipe_all - wipe the database clean

+ 80 - 0
ccan/tdb2/test/run-lockall.c

@@ -0,0 +1,80 @@
+#include "config.h"
+#include <unistd.h>
+#include "lock-tracking.h"
+
+#define fcntl fcntl_with_lockcheck
+
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/open.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tdb2/transaction.c>
+#include <ccan/tap/tap.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <stdarg.h>
+#include <err.h>
+#include "external-agent.h"
+#include "logging.h"
+
+#define TEST_DBNAME "run-lockall.tdb"
+
+#undef fcntl
+
+int main(int argc, char *argv[])
+{
+	struct agent *agent;
+	const int flags[] = { TDB_DEFAULT,
+			      TDB_NOMMAP,
+			      TDB_CONVERT,
+			      TDB_CONVERT | TDB_NOMMAP };
+	int i;
+
+	plan_tests(13 * sizeof(flags)/sizeof(flags[0]) + 1);
+	agent = prepare_external_agent();
+	if (!agent)
+		err(1, "preparing agent");
+
+	for (i = 0; i < sizeof(flags)/sizeof(flags[0]); i++) {
+		enum agent_return ret;
+		struct tdb_context *tdb;
+
+		tdb = tdb_open(TEST_DBNAME, flags[i],
+			       O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
+		ok1(tdb);
+
+		ret = external_agent_operation(agent, OPEN, TEST_DBNAME);
+		ok1(ret == SUCCESS);
+
+		ok1(tdb_lockall(tdb) == TDB_SUCCESS);
+		ok1(external_agent_operation(agent, STORE, "key")
+		    == WOULD_HAVE_BLOCKED);
+		ok1(external_agent_operation(agent, FETCH, "key")
+		    == WOULD_HAVE_BLOCKED);
+		/* Test nesting. */
+		ok1(tdb_lockall(tdb) == TDB_SUCCESS);
+		tdb_unlockall(tdb);
+		tdb_unlockall(tdb);
+
+		ok1(external_agent_operation(agent, STORE, "key") == SUCCESS);
+
+		ok1(tdb_lockall_read(tdb) == TDB_SUCCESS);
+		ok1(external_agent_operation(agent, STORE, "key")
+		    == WOULD_HAVE_BLOCKED);
+		ok1(external_agent_operation(agent, FETCH, "key") == SUCCESS);
+		ok1(tdb_lockall_read(tdb) == TDB_SUCCESS);
+		tdb_unlockall_read(tdb);
+		tdb_unlockall_read(tdb);
+
+		ok1(external_agent_operation(agent, STORE, "key") == SUCCESS);
+		ok1(external_agent_operation(agent, CLOSE, NULL) == SUCCESS);
+		tdb_close(tdb);
+	}
+
+	free_external_agent(agent);
+	ok1(tap_log_messages == 0);
+	return exit_status();
+}