diff options
Diffstat (limited to 'fs/dlm')
-rw-r--r-- | fs/dlm/lowcomms.c | 30 | ||||
-rw-r--r-- | fs/dlm/midcomms.c | 76 |
2 files changed, 31 insertions, 75 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 345a316ae54c..68092f953830 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -860,30 +860,8 @@ struct dlm_processed_nodes { struct list_head list; }; -static void add_processed_node(int nodeid, struct list_head *processed_nodes) -{ - struct dlm_processed_nodes *n; - - list_for_each_entry(n, processed_nodes, list) { - /* we already remembered this node */ - if (n->nodeid == nodeid) - return; - } - - /* if it's fails in worst case we simple don't send an ack back. - * We try it next time. - */ - n = kmalloc(sizeof(*n), GFP_NOFS); - if (!n) - return; - - n->nodeid = nodeid; - list_add(&n->list, processed_nodes); -} - static void process_dlm_messages(struct work_struct *work) { - struct dlm_processed_nodes *n, *n_tmp; struct processqueue_entry *pentry; LIST_HEAD(processed_nodes); @@ -902,7 +880,6 @@ static void process_dlm_messages(struct work_struct *work) for (;;) { dlm_process_incoming_buffer(pentry->nodeid, pentry->buf, pentry->buflen); - add_processed_node(pentry->nodeid, &processed_nodes); free_processqueue_entry(pentry); spin_lock(&processqueue_lock); @@ -917,13 +894,6 @@ static void process_dlm_messages(struct work_struct *work) list_del(&pentry->list); spin_unlock(&processqueue_lock); } - - /* send ack back after we processed couple of messages */ - list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) { - list_del(&n->list); - dlm_midcomms_receive_done(n->nodeid); - kfree(n); - } } /* Data received from remote end */ diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index 70286737eb0a..e1a0df67b566 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -148,6 +148,8 @@ /* 5 seconds wait to sync ending of dlm */ #define DLM_SHUTDOWN_TIMEOUT msecs_to_jiffies(5000) #define DLM_VERSION_NOT_SET 0 +#define DLM_SEND_ACK_BACK_MSG_THRESHOLD 32 +#define DLM_RECV_ACK_BACK_MSG_THRESHOLD (DLM_SEND_ACK_BACK_MSG_THRESHOLD * 8) struct midcomms_node { int nodeid; @@ -165,7 +167,7 @@ struct midcomms_node { #define DLM_NODE_FLAG_CLOSE 1 #define DLM_NODE_FLAG_STOP_TX 2 #define DLM_NODE_FLAG_STOP_RX 3 -#define DLM_NODE_ULP_DELIVERED 4 + atomic_t ulp_delivered; unsigned long flags; wait_queue_head_t shutdown_wait; @@ -319,6 +321,7 @@ static void midcomms_node_reset(struct midcomms_node *node) atomic_set(&node->seq_next, DLM_SEQ_INIT); atomic_set(&node->seq_send, DLM_SEQ_INIT); + atomic_set(&node->ulp_delivered, 0); node->version = DLM_VERSION_NOT_SET; node->flags = 0; @@ -393,6 +396,28 @@ static int dlm_send_ack(int nodeid, uint32_t seq) return 0; } +static void dlm_send_ack_threshold(struct midcomms_node *node, + uint32_t threshold) +{ + uint32_t oval, nval; + bool send_ack; + + /* let only send one user trigger threshold to send ack back */ + do { + oval = atomic_read(&node->ulp_delivered); + send_ack = (oval > threshold); + /* abort if threshold is not reached */ + if (!send_ack) + break; + + nval = 0; + /* try to reset ulp_delivered counter */ + } while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval); + + if (send_ack) + dlm_send_ack(node->nodeid, atomic_read(&node->seq_next)); +} + static int dlm_send_fin(struct midcomms_node *node, void (*ack_rcv)(struct midcomms_node *node)) { @@ -560,7 +585,9 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags)); dlm_receive_buffer_3_2_trace(seq, p); dlm_receive_buffer(p, node->nodeid); - set_bit(DLM_NODE_ULP_DELIVERED, &node->flags); + atomic_inc(&node->ulp_delivered); + /* unlikely case to send ack back when we don't transmit */ + dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD); break; } } else { @@ -969,49 +996,6 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len) return ret; } -void dlm_midcomms_receive_done(int nodeid) -{ - struct midcomms_node *node; - int idx; - - idx = srcu_read_lock(&nodes_srcu); - node = nodeid2node(nodeid, 0); - if (!node) { - srcu_read_unlock(&nodes_srcu, idx); - return; - } - - /* old protocol, we do nothing */ - switch (node->version) { - case DLM_VERSION_3_2: - break; - default: - srcu_read_unlock(&nodes_srcu, idx); - return; - } - - /* do nothing if we didn't delivered stateful to ulp */ - if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED, - &node->flags)) { - srcu_read_unlock(&nodes_srcu, idx); - return; - } - - spin_lock(&node->state_lock); - /* we only ack if state is ESTABLISHED */ - switch (node->state) { - case DLM_ESTABLISHED: - spin_unlock(&node->state_lock); - dlm_send_ack(node->nodeid, atomic_read(&node->seq_next)); - break; - default: - spin_unlock(&node->state_lock); - /* do nothing FIN has it's own ack send */ - break; - } - srcu_read_unlock(&nodes_srcu, idx); -} - void dlm_midcomms_unack_msg_resend(int nodeid) { struct midcomms_node *node; @@ -1142,6 +1126,8 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, goto err; } + /* send ack back if necessary */ + dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD); break; default: dlm_free_mhandle(mh); |