summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2012-05-15 16:07:49 -0500
committerDavid Teigland <teigland@redhat.com>2012-07-16 14:17:52 -0500
commit1d7c484eeb167fc374294e38ae402de4097c8611 (patch)
tree82ed8a279b8f399205a15951c50c22aea67d7323
parentc04fecb4d9f7753e0cbff7edd03ec68f8721cdce (diff)
downloadlwn-1d7c484eeb167fc374294e38ae402de4097c8611.tar.gz
lwn-1d7c484eeb167fc374294e38ae402de4097c8611.zip
dlm: use idr instead of list for recovered rsbs
When a large number of resources are being recovered, a linear search of the recover_list takes a long time. Use an idr in place of a list. Signed-off-by: David Teigland <teigland@redhat.com>
-rw-r--r--fs/dlm/dlm_internal.h3
-rw-r--r--fs/dlm/lockspace.c3
-rw-r--r--fs/dlm/rcom.c2
-rw-r--r--fs/dlm/recover.c116
4 files changed, 101 insertions, 23 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 3093207a7684..a5f82d5b3946 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -288,6 +288,7 @@ struct dlm_rsb {
int res_nodeid;
int res_master_nodeid;
int res_dir_nodeid;
+ int res_id; /* for ls_recover_idr */
uint32_t res_lvbseq;
uint32_t res_hash;
uint32_t res_bucket; /* rsbtbl */
@@ -587,6 +588,8 @@ struct dlm_ls {
struct list_head ls_recover_list;
spinlock_t ls_recover_list_lock;
int ls_recover_list_count;
+ struct idr ls_recover_idr;
+ spinlock_t ls_recover_idr_lock;
wait_queue_head_t ls_wait_general;
struct mutex ls_clear_proc_locks;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 065bb75ed609..d4d3b3165c6c 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -565,6 +565,8 @@ static int new_lockspace(const char *name, const char *cluster,
INIT_LIST_HEAD(&ls->ls_recover_list);
spin_lock_init(&ls->ls_recover_list_lock);
+ idr_init(&ls->ls_recover_idr);
+ spin_lock_init(&ls->ls_recover_idr_lock);
ls->ls_recover_list_count = 0;
ls->ls_local_handle = ls;
init_waitqueue_head(&ls->ls_wait_general);
@@ -636,6 +638,7 @@ static int new_lockspace(const char *name, const char *cluster,
spin_lock(&lslist_lock);
list_del(&ls->ls_list);
spin_unlock(&lslist_lock);
+ idr_destroy(&ls->ls_recover_idr);
kfree(ls->ls_recover_buf);
out_lkbfree:
idr_destroy(&ls->ls_lkbidr);
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index c8c298d81463..87f1a56eab32 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -325,7 +325,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
if (error)
goto out;
memcpy(rc->rc_buf, r->res_name, r->res_length);
- rc->rc_id = (unsigned long) r;
+ rc->rc_id = (unsigned long) r->res_id;
send_rcom(ls, mh, rc);
out:
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 3c025fe49ad3..ff6f27629a0c 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -277,22 +277,6 @@ static void recover_list_del(struct dlm_rsb *r)
dlm_put_rsb(r);
}
-static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id)
-{
- struct dlm_rsb *r = NULL;
-
- spin_lock(&ls->ls_recover_list_lock);
-
- list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) {
- if (id == (unsigned long) r)
- goto out;
- }
- r = NULL;
- out:
- spin_unlock(&ls->ls_recover_list_lock);
- return r;
-}
-
static void recover_list_clear(struct dlm_ls *ls)
{
struct dlm_rsb *r, *s;
@@ -313,6 +297,94 @@ static void recover_list_clear(struct dlm_ls *ls)
spin_unlock(&ls->ls_recover_list_lock);
}
+static int recover_idr_empty(struct dlm_ls *ls)
+{
+ int empty = 1;
+
+ spin_lock(&ls->ls_recover_idr_lock);
+ if (ls->ls_recover_list_count)
+ empty = 0;
+ spin_unlock(&ls->ls_recover_idr_lock);
+
+ return empty;
+}
+
+static int recover_idr_add(struct dlm_rsb *r)
+{
+ struct dlm_ls *ls = r->res_ls;
+ int rv, id;
+
+ rv = idr_pre_get(&ls->ls_recover_idr, GFP_NOFS);
+ if (!rv)
+ return -ENOMEM;
+
+ spin_lock(&ls->ls_recover_idr_lock);
+ if (r->res_id) {
+ spin_unlock(&ls->ls_recover_idr_lock);
+ return -1;
+ }
+ rv = idr_get_new_above(&ls->ls_recover_idr, r, 1, &id);
+ if (rv) {
+ spin_unlock(&ls->ls_recover_idr_lock);
+ return rv;
+ }
+ r->res_id = id;
+ ls->ls_recover_list_count++;
+ dlm_hold_rsb(r);
+ spin_unlock(&ls->ls_recover_idr_lock);
+ return 0;
+}
+
+static void recover_idr_del(struct dlm_rsb *r)
+{
+ struct dlm_ls *ls = r->res_ls;
+
+ spin_lock(&ls->ls_recover_idr_lock);
+ idr_remove(&ls->ls_recover_idr, r->res_id);
+ r->res_id = 0;
+ ls->ls_recover_list_count--;
+ spin_unlock(&ls->ls_recover_idr_lock);
+
+ dlm_put_rsb(r);
+}
+
+static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
+{
+ struct dlm_rsb *r;
+
+ spin_lock(&ls->ls_recover_idr_lock);
+ r = idr_find(&ls->ls_recover_idr, (int)id);
+ spin_unlock(&ls->ls_recover_idr_lock);
+ return r;
+}
+
+static int recover_idr_clear_rsb(int id, void *p, void *data)
+{
+ struct dlm_ls *ls = data;
+ struct dlm_rsb *r = p;
+
+ r->res_id = 0;
+ r->res_recover_locks_count = 0;
+ ls->ls_recover_list_count--;
+
+ dlm_put_rsb(r);
+ return 0;
+}
+
+static void recover_idr_clear(struct dlm_ls *ls)
+{
+ spin_lock(&ls->ls_recover_idr_lock);
+ idr_for_each(&ls->ls_recover_idr, recover_idr_clear_rsb, ls);
+ idr_remove_all(&ls->ls_recover_idr);
+
+ if (ls->ls_recover_list_count != 0) {
+ log_error(ls, "warning: recover_list_count %d",
+ ls->ls_recover_list_count);
+ ls->ls_recover_list_count = 0;
+ }
+ spin_unlock(&ls->ls_recover_idr_lock);
+}
+
/* Master recovery: find new master node for rsb's that were
mastered on nodes that have been removed.
@@ -408,7 +480,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count)
set_new_master(r);
error = 0;
} else {
- recover_list_add(r);
+ recover_idr_add(r);
error = dlm_send_rcom_lookup(r, dir_nodeid);
}
@@ -493,10 +565,10 @@ int dlm_recover_masters(struct dlm_ls *ls)
log_debug(ls, "dlm_recover_masters %u of %u", count, total);
- error = dlm_wait_function(ls, &recover_list_empty);
+ error = dlm_wait_function(ls, &recover_idr_empty);
out:
if (error)
- recover_list_clear(ls);
+ recover_idr_clear(ls);
return error;
}
@@ -505,7 +577,7 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
struct dlm_rsb *r;
int ret_nodeid, new_master;
- r = recover_list_find(ls, rc->rc_id);
+ r = recover_idr_find(ls, rc->rc_id);
if (!r) {
log_error(ls, "dlm_recover_master_reply no id %llx",
(unsigned long long)rc->rc_id);
@@ -524,9 +596,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
r->res_nodeid = new_master;
set_new_master(r);
unlock_rsb(r);
- recover_list_del(r);
+ recover_idr_del(r);
- if (recover_list_empty(ls))
+ if (recover_idr_empty(ls))
wake_up(&ls->ls_wait_general);
out:
return 0;