summaryrefslogtreecommitdiff
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c382
1 files changed, 240 insertions, 142 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index fee1a4164fc1..7c97181a04fe 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -320,6 +320,11 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
* Basic operations on rsb's and lkb's
*/
+static inline unsigned long rsb_toss_jiffies(void)
+{
+ return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
+}
+
/* This is only called to add a reference when the code already holds
a valid reference to the rsb, so there's no need for locking. */
@@ -416,6 +421,229 @@ static int pre_rsb_struct(struct dlm_ls *ls)
return 0;
}
+/* connected with timer_delete_sync() in dlm_ls_stop() to stop
+ * new timers when recovery is triggered and don't run them
+ * again until a dlm_timer_resume() tries it again.
+ */
+static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies)
+{
+ if (!dlm_locking_stopped(ls))
+ mod_timer(&ls->ls_timer, jiffies);
+}
+
+/* This function tries to resume the timer callback if a rsb
+ * is on the toss list and no timer is pending. It might that
+ * the first entry is on currently executed as timer callback
+ * but we don't care if a timer queued up again and does
+ * nothing. Should be a rare case.
+ */
+void dlm_timer_resume(struct dlm_ls *ls)
+{
+ struct dlm_rsb *r;
+
+ spin_lock_bh(&ls->ls_toss_q_lock);
+ r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
+ res_toss_q_list);
+ if (r && !timer_pending(&ls->ls_timer))
+ __rsb_mod_timer(ls, r->res_toss_time);
+ spin_unlock_bh(&ls->ls_toss_q_lock);
+}
+
+/* ls_rsbtbl_lock must be held and being sure the rsb is in toss state */
+static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r)
+{
+ struct dlm_rsb *first;
+
+ spin_lock_bh(&ls->ls_toss_q_lock);
+ r->res_toss_time = 0;
+
+ /* if the rsb is not queued do nothing */
+ if (list_empty(&r->res_toss_q_list))
+ goto out;
+
+ /* get the first element before delete */
+ first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb,
+ res_toss_q_list);
+ list_del_init(&r->res_toss_q_list);
+ /* check if the first element was the rsb we deleted */
+ if (first == r) {
+ /* try to get the new first element, if the list
+ * is empty now try to delete the timer, if we are
+ * too late we don't care.
+ *
+ * if the list isn't empty and a new first element got
+ * in place, set the new timer expire time.
+ */
+ first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
+ res_toss_q_list);
+ if (!first)
+ timer_delete(&ls->ls_timer);
+ else
+ __rsb_mod_timer(ls, first->res_toss_time);
+ }
+
+out:
+ spin_unlock_bh(&ls->ls_toss_q_lock);
+}
+
+/* Caller must held ls_rsbtbl_lock and need to be called every time
+ * when either the rsb enters toss state or the toss state changes
+ * the dir/master nodeid.
+ */
+static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r)
+{
+ int our_nodeid = dlm_our_nodeid();
+ struct dlm_rsb *first;
+
+ /* If we're the directory record for this rsb, and
+ * we're not the master of it, then we need to wait
+ * for the master node to send us a dir remove for
+ * before removing the dir record.
+ */
+ if (!dlm_no_directory(ls) &&
+ (r->res_master_nodeid != our_nodeid) &&
+ (dlm_dir_nodeid(r) == our_nodeid)) {
+ rsb_delete_toss_timer(ls, r);
+ return;
+ }
+
+ spin_lock_bh(&ls->ls_toss_q_lock);
+ /* set the new rsb absolute expire time in the rsb */
+ r->res_toss_time = rsb_toss_jiffies();
+ if (list_empty(&ls->ls_toss_q)) {
+ /* if the queue is empty add the element and it's
+ * our new expire time
+ */
+ list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);
+ __rsb_mod_timer(ls, r->res_toss_time);
+ } else {
+ /* check if the rsb was already queued, if so delete
+ * it from the toss queue
+ */
+ if (!list_empty(&r->res_toss_q_list))
+ list_del(&r->res_toss_q_list);
+
+ /* try to get the maybe new first element and then add
+ * to this rsb with the oldest expire time to the end
+ * of the queue. If the list was empty before this
+ * rsb expire time is our next expiration if it wasn't
+ * the now new first elemet is our new expiration time
+ */
+ first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
+ res_toss_q_list);
+ list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);
+ if (!first)
+ __rsb_mod_timer(ls, r->res_toss_time);
+ else
+ __rsb_mod_timer(ls, first->res_toss_time);
+ }
+ spin_unlock_bh(&ls->ls_toss_q_lock);
+}
+
+/* if we hit contention we do in 250 ms a retry to trylock.
+ * if there is any other mod_timer in between we don't care
+ * about that it expires earlier again this is only for the
+ * unlikely case nothing happened in this time.
+ */
+#define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250))
+
+void dlm_rsb_toss_timer(struct timer_list *timer)
+{
+ struct dlm_ls *ls = from_timer(ls, timer, ls_timer);
+ int our_nodeid = dlm_our_nodeid();
+ struct dlm_rsb *r;
+ int rv;
+
+ while (1) {
+ /* interrupting point to leave iteration when
+ * recovery waits for timer_delete_sync(), recovery
+ * will take care to delete everything in toss queue.
+ */
+ if (dlm_locking_stopped(ls))
+ break;
+
+ rv = spin_trylock(&ls->ls_toss_q_lock);
+ if (!rv) {
+ /* rearm again try timer */
+ __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY);
+ break;
+ }
+
+ r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
+ res_toss_q_list);
+ if (!r) {
+ /* nothing to do anymore next rsb queue will
+ * set next mod_timer() expire.
+ */
+ spin_unlock(&ls->ls_toss_q_lock);
+ break;
+ }
+
+ /* test if the first rsb isn't expired yet, if
+ * so we stop freeing rsb from toss queue as
+ * the order in queue is ascending to the
+ * absolute res_toss_time jiffies
+ */
+ if (time_before(jiffies, r->res_toss_time)) {
+ /* rearm with the next rsb to expire in the future */
+ __rsb_mod_timer(ls, r->res_toss_time);
+ spin_unlock(&ls->ls_toss_q_lock);
+ break;
+ }
+
+ /* in find_rsb_dir/nodir there is a reverse order of this
+ * lock, however this is only a trylock if we hit some
+ * possible contention we try it again.
+ *
+ * This lock synchronized while holding ls_toss_q_lock
+ * synchronize everything that rsb_delete_toss_timer()
+ * or rsb_mod_timer() can't run after this timer callback
+ * deletes the rsb from the ls_toss_q. Whereas the other
+ * holders have always a priority to run as this is only
+ * a caching handling and the other holders might to put
+ * this rsb out of the toss state.
+ */
+ rv = spin_trylock(&ls->ls_rsbtbl_lock);
+ if (!rv) {
+ spin_unlock(&ls->ls_toss_q_lock);
+ /* rearm again try timer */
+ __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY);
+ break;
+ }
+
+ list_del(&r->res_rsbs_list);
+ rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
+ dlm_rhash_rsb_params);
+
+ /* not necessary to held the ls_rsbtbl_lock when
+ * calling send_remove()
+ */
+ spin_unlock(&ls->ls_rsbtbl_lock);
+
+ /* remove the rsb out of the toss queue its gone
+ * drom DLM now
+ */
+ list_del_init(&r->res_toss_q_list);
+ spin_unlock(&ls->ls_toss_q_lock);
+
+ /* no rsb in this state should ever run a timer */
+ WARN_ON(!dlm_no_directory(ls) &&
+ (r->res_master_nodeid != our_nodeid) &&
+ (dlm_dir_nodeid(r) == our_nodeid));
+
+ /* We're the master of this rsb but we're not
+ * the directory record, so we need to tell the
+ * dir node to remove the dir record
+ */
+ if (!dlm_no_directory(ls) &&
+ (r->res_master_nodeid == our_nodeid) &&
+ (dlm_dir_nodeid(r) != our_nodeid))
+ send_remove(r);
+
+ free_toss_rsb(r);
+ }
+}
+
/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
unlock any spinlocks, go back and call pre_rsb_struct again.
Otherwise, take an rsb off the list and return it. */
@@ -451,6 +679,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
INIT_LIST_HEAD(&r->res_convertqueue);
INIT_LIST_HEAD(&r->res_waitqueue);
INIT_LIST_HEAD(&r->res_root_list);
+ INIT_LIST_HEAD(&r->res_toss_q_list);
INIT_LIST_HEAD(&r->res_recover_list);
INIT_LIST_HEAD(&r->res_masters_list);
@@ -638,6 +867,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
* valid for keep state rsbs
*/
kref_init(&r->res_ref);
+ rsb_delete_toss_timer(ls, r);
+ spin_unlock_bh(&ls->ls_rsbtbl_lock);
+
goto out_unlock;
@@ -777,6 +1009,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
* valid for keep state rsbs
*/
kref_init(&r->res_ref);
+ rsb_delete_toss_timer(ls, r);
+ spin_unlock_bh(&ls->ls_rsbtbl_lock);
+
goto out_unlock;
@@ -1050,7 +1285,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
r_nodeid, result);
- r->res_toss_time = jiffies;
+ rsb_mod_timer(ls, r);
/* the rsb was inactive (on toss list) */
spin_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -1070,7 +1305,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
r->res_master_nodeid = from_nodeid;
r->res_nodeid = from_nodeid;
kref_init(&r->res_ref);
- r->res_toss_time = jiffies;
rsb_set_flag(r, RSB_TOSS);
error = rsb_insert(r, &ls->ls_rsbtbl);
@@ -1082,6 +1316,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
}
list_add(&r->res_rsbs_list, &ls->ls_toss);
+ rsb_mod_timer(ls, r);
if (result)
*result = DLM_LU_ADD;
@@ -1126,8 +1361,8 @@ static void toss_rsb(struct kref *kref)
DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
rsb_set_flag(r, RSB_TOSS);
list_move(&r->res_rsbs_list, &ls->ls_toss);
- r->res_toss_time = jiffies;
- set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
+ rsb_mod_timer(ls, r);
+
if (r->res_lvbptr) {
dlm_free_lvb(r->res_lvbptr);
r->res_lvbptr = NULL;
@@ -1150,15 +1385,12 @@ void free_toss_rsb(struct dlm_rsb *r)
{
WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS));
- /* check if all work is done after the rsb is on toss list
- * and it can be freed.
- */
-
DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_toss_q_list), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
@@ -1572,140 +1804,6 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
return error;
}
-static void shrink_bucket(struct dlm_ls *ls)
-{
- struct dlm_rsb *r, *safe;
- char *name;
- int our_nodeid = dlm_our_nodeid();
- int remote_count = 0;
- int need_shrink = 0;
- int i, len, rv;
-
- memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
-
- spin_lock_bh(&ls->ls_rsbtbl_lock);
-
- if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags)) {
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
- return;
- }
-
- list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
- /* If we're the directory record for this rsb, and
- we're not the master of it, then we need to wait
- for the master node to send us a dir remove for
- before removing the dir record. */
-
- if (!dlm_no_directory(ls) &&
- (r->res_master_nodeid != our_nodeid) &&
- (dlm_dir_nodeid(r) == our_nodeid)) {
- continue;
- }
-
- need_shrink = 1;
-
- if (!time_after_eq(jiffies, r->res_toss_time +
- dlm_config.ci_toss_secs * HZ)) {
- continue;
- }
-
- if (!dlm_no_directory(ls) &&
- (r->res_master_nodeid == our_nodeid) &&
- (dlm_dir_nodeid(r) != our_nodeid)) {
-
- /* We're the master of this rsb but we're not
- the directory record, so we need to tell the
- dir node to remove the dir record. */
-
- ls->ls_remove_lens[remote_count] = r->res_length;
- memcpy(ls->ls_remove_names[remote_count], r->res_name,
- DLM_RESNAME_MAXLEN);
- remote_count++;
-
- if (remote_count >= DLM_REMOVE_NAMES_MAX)
- break;
- continue;
- }
-
- list_del(&r->res_rsbs_list);
- rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
- dlm_rhash_rsb_params);
- free_toss_rsb(r);
- }
-
- if (need_shrink)
- set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
- else
- clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
-
- /*
- * While searching for rsb's to free, we found some that require
- * remote removal. We leave them in place and find them again here
- * so there is a very small gap between removing them from the toss
- * list and sending the removal. Keeping this gap small is
- * important to keep us (the master node) from being out of sync
- * with the remote dir node for very long.
- */
-
- for (i = 0; i < remote_count; i++) {
- name = ls->ls_remove_names[i];
- len = ls->ls_remove_lens[i];
-
- spin_lock_bh(&ls->ls_rsbtbl_lock);
- rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
- if (rv) {
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
- log_error(ls, "remove_name not found %s", name);
- continue;
- }
-
- if (!rsb_flag(r, RSB_TOSS)) {
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
- log_debug(ls, "remove_name not toss %s", name);
- continue;
- }
-
- if (r->res_master_nodeid != our_nodeid) {
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
- log_debug(ls, "remove_name master %d dir %d our %d %s",
- r->res_master_nodeid, r->res_dir_nodeid,
- our_nodeid, name);
- continue;
- }
-
- if (r->res_dir_nodeid == our_nodeid) {
- /* should never happen */
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
- log_error(ls, "remove_name dir %d master %d our %d %s",
- r->res_dir_nodeid, r->res_master_nodeid,
- our_nodeid, name);
- continue;
- }
-
- if (!time_after_eq(jiffies, r->res_toss_time +
- dlm_config.ci_toss_secs * HZ)) {
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
- log_debug(ls, "remove_name toss_time %lu now %lu %s",
- r->res_toss_time, jiffies, name);
- continue;
- }
-
- list_del(&r->res_rsbs_list);
- rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
- dlm_rhash_rsb_params);
- send_remove(r);
- spin_unlock_bh(&ls->ls_rsbtbl_lock);
-
- free_toss_rsb(r);
- }
-}
-
-void dlm_scan_rsbs(struct dlm_ls *ls)
-{
- shrink_bucket(ls);
-}
-
/* lkb is master or local copy */
static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)