diff options
author | David S. Miller <davem@davemloft.net> | 2016-02-06 03:42:09 -0500 |
---|---|---|
committer | David S. Miller <davem@davemloft.net> | 2016-02-06 03:42:09 -0500 |
commit | 9a23ac475cd98a1155ce16a8fab4d28e171c0d9d (patch) | |
tree | 1956a277b4830fef41fbdbc5ae57db0cbfdf04e6 | |
parent | c6140a299bdf2425d092383fafaaf2a4a3745989 (diff) | |
parent | 06c8581f85e99bbf69723f76ad2a40fa8a8c8cdd (diff) | |
download | lwn-9a23ac475cd98a1155ce16a8fab4d28e171c0d9d.tar.gz lwn-9a23ac475cd98a1155ce16a8fab4d28e171c0d9d.zip |
Merge branch 'tipc-topology-updates'
Parthasarathy Bhuvaragan says:
====================
tipc: cleanups, fixes & improvements for topology server
This series contains topology server cleanups, fixes and improvements.
Cleanups in #1-#4:
We remove duplicate data structures and aligin the rest of the code accordingly.
Fixes in #5-#8:
The bugs occur either during configuration or while running on SMP targets,
which are race conditions that pop up under different situations.
Improvements in #9-#10:
Updates to decrease timer usage and improve readability.
v2: Updated commit message in patch 6 based on feedback from
Sergei Shtylyov sergei.shtylyov@cogentembedded.com
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r-- | net/tipc/name_table.c | 14 | ||||
-rw-r--r-- | net/tipc/server.c | 4 | ||||
-rw-r--r-- | net/tipc/subscr.c | 130 | ||||
-rw-r--r-- | net/tipc/subscr.h | 11 |
4 files changed, 96 insertions, 63 deletions
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 91fce70291a8..777b979b8463 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -418,6 +418,9 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq, struct tipc_subscription *s) { struct sub_seq *sseq = nseq->sseqs; + struct tipc_name_seq ns; + + tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns); list_add(&s->nameseq_list, &nseq->subscriptions); @@ -425,7 +428,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq, return; while (sseq != &nseq->sseqs[nseq->first_free]) { - if (tipc_subscrp_check_overlap(s, sseq->lower, sseq->upper)) { + if (tipc_subscrp_check_overlap(&ns, sseq->lower, sseq->upper)) { struct publication *crs; struct name_info *info = sseq->info; int must_report = 1; @@ -722,9 +725,10 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref, void tipc_nametbl_subscribe(struct tipc_subscription *s) { struct tipc_net *tn = net_generic(s->net, tipc_net_id); - u32 type = s->seq.type; + u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); int index = hash(type); struct name_seq *seq; + struct tipc_name_seq ns; spin_lock_bh(&tn->nametbl_lock); seq = nametbl_find_seq(s->net, type); @@ -735,8 +739,9 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s) tipc_nameseq_subscribe(seq, s); spin_unlock_bh(&seq->lock); } else { + tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns); pr_warn("Failed to create subscription for {%u,%u,%u}\n", - s->seq.type, s->seq.lower, s->seq.upper); + ns.type, ns.lower, ns.upper); } spin_unlock_bh(&tn->nametbl_lock); } @@ -748,9 +753,10 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s) { struct tipc_net *tn = net_generic(s->net, tipc_net_id); struct name_seq *seq; + u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(s->net, s->seq.type); + seq = nametbl_find_seq(s->net, type); if (seq != NULL) { spin_lock_bh(&seq->lock); list_del_init(&s->nameseq_list); diff --git a/net/tipc/server.c b/net/tipc/server.c index 922e04a43396..2446bfbaa309 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -571,13 +571,13 @@ static void tipc_work_stop(struct tipc_server *s) static int tipc_work_start(struct tipc_server *s) { - s->rcv_wq = alloc_workqueue("tipc_rcv", WQ_UNBOUND, 1); + s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0); if (!s->rcv_wq) { pr_err("can't start tipc receive workqueue\n"); return -ENOMEM; } - s->send_wq = alloc_workqueue("tipc_send", WQ_UNBOUND, 1); + s->send_wq = alloc_ordered_workqueue("tipc_send", 0); if (!s->send_wq) { pr_err("can't start tipc send workqueue\n"); destroy_workqueue(s->rcv_wq); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 69ee2eeef968..22963cafd5ed 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -92,25 +92,42 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub, * * Returns 1 if there is overlap, otherwise 0. */ -int tipc_subscrp_check_overlap(struct tipc_subscription *sub, u32 found_lower, +int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, u32 found_upper) { - if (found_lower < sub->seq.lower) - found_lower = sub->seq.lower; - if (found_upper > sub->seq.upper) - found_upper = sub->seq.upper; + if (found_lower < seq->lower) + found_lower = seq->lower; + if (found_upper > seq->upper) + found_upper = seq->upper; if (found_lower > found_upper) return 0; return 1; } +u32 tipc_subscrp_convert_seq_type(u32 type, int swap) +{ + return htohl(type, swap); +} + +void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap, + struct tipc_name_seq *out) +{ + out->type = htohl(in->type, swap); + out->lower = htohl(in->lower, swap); + out->upper = htohl(in->upper, swap); +} + void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, u32 found_upper, u32 event, u32 port_ref, u32 node, int must) { - if (!tipc_subscrp_check_overlap(sub, found_lower, found_upper)) + struct tipc_name_seq seq; + + tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq); + if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper)) return; - if (!must && !(sub->filter & TIPC_SUB_PORTS)) + if (!must && + !(htohl(sub->evt.s.filter, sub->swap) & TIPC_SUB_PORTS)) return; tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref, @@ -171,12 +188,14 @@ static struct tipc_subscriber *tipc_subscrb_create(int conid) static void tipc_subscrb_delete(struct tipc_subscriber *subscriber) { struct tipc_subscription *sub, *temp; + u32 timeout; spin_lock_bh(&subscriber->lock); /* Destroy any existing subscriptions for subscriber */ list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, subscrp_list) { - if (del_timer(&sub->timer)) { + timeout = htohl(sub->evt.s.timeout, sub->swap); + if ((timeout == TIPC_WAIT_FOREVER) || del_timer(&sub->timer)) { tipc_subscrp_delete(sub); tipc_subscrb_put(subscriber); } @@ -200,13 +219,16 @@ static void tipc_subscrp_cancel(struct tipc_subscr *s, struct tipc_subscriber *subscriber) { struct tipc_subscription *sub, *temp; + u32 timeout; spin_lock_bh(&subscriber->lock); /* Find first matching subscription, exit if not found */ list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, subscrp_list) { if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) { - if (del_timer(&sub->timer)) { + timeout = htohl(sub->evt.s.timeout, sub->swap); + if ((timeout == TIPC_WAIT_FOREVER) || + del_timer(&sub->timer)) { tipc_subscrp_delete(sub); tipc_subscrb_put(subscriber); } @@ -216,66 +238,67 @@ static void tipc_subscrp_cancel(struct tipc_subscr *s, spin_unlock_bh(&subscriber->lock); } -static int tipc_subscrp_create(struct net *net, struct tipc_subscr *s, - struct tipc_subscriber *subscriber, - struct tipc_subscription **sub_p) +static struct tipc_subscription *tipc_subscrp_create(struct net *net, + struct tipc_subscr *s, + int swap) { struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_subscription *sub; - int swap; - - /* Determine subscriber's endianness */ - swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE)); - - /* Detect & process a subscription cancellation request */ - if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { - s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); - tipc_subscrp_cancel(s, subscriber); - return 0; - } + u32 filter = htohl(s->filter, swap); /* Refuse subscription if global limit exceeded */ if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { pr_warn("Subscription rejected, limit reached (%u)\n", TIPC_MAX_SUBSCRIPTIONS); - return -EINVAL; + return NULL; } /* Allocate subscription object */ sub = kmalloc(sizeof(*sub), GFP_ATOMIC); if (!sub) { pr_warn("Subscription rejected, no memory\n"); - return -ENOMEM; + return NULL; } /* Initialize subscription object */ sub->net = net; - sub->seq.type = htohl(s->seq.type, swap); - sub->seq.lower = htohl(s->seq.lower, swap); - sub->seq.upper = htohl(s->seq.upper, swap); - sub->timeout = msecs_to_jiffies(htohl(s->timeout, swap)); - sub->filter = htohl(s->filter, swap); - if ((!(sub->filter & TIPC_SUB_PORTS) == - !(sub->filter & TIPC_SUB_SERVICE)) || - (sub->seq.lower > sub->seq.upper)) { + if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) || + (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) { pr_warn("Subscription rejected, illegal request\n"); kfree(sub); - return -EINVAL; + return NULL; } - spin_lock_bh(&subscriber->lock); - list_add(&sub->subscrp_list, &subscriber->subscrp_list); - spin_unlock_bh(&subscriber->lock); - sub->subscriber = subscriber; + sub->swap = swap; memcpy(&sub->evt.s, s, sizeof(*s)); atomic_inc(&tn->subscription_count); + return sub; +} + +static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, + struct tipc_subscriber *subscriber, int swap) +{ + struct tipc_net *tn = net_generic(net, tipc_net_id); + struct tipc_subscription *sub = NULL; + u32 timeout; + + sub = tipc_subscrp_create(net, s, swap); + if (!sub) + return tipc_conn_terminate(tn->topsrv, subscriber->conid); + + spin_lock_bh(&subscriber->lock); + list_add(&sub->subscrp_list, &subscriber->subscrp_list); + tipc_subscrb_get(subscriber); + sub->subscriber = subscriber; + tipc_nametbl_subscribe(sub); + spin_unlock_bh(&subscriber->lock); + + timeout = htohl(sub->evt.s.timeout, swap); + if (timeout == TIPC_WAIT_FOREVER) + return; + setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub); - if (sub->timeout != TIPC_WAIT_FOREVER) - sub->timeout += jiffies; - if (!mod_timer(&sub->timer, sub->timeout)) - tipc_subscrb_get(subscriber); - *sub_p = sub; - return 0; + mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); } /* Handle one termination request for the subscriber */ @@ -289,14 +312,21 @@ static void tipc_subscrb_rcv_cb(struct net *net, int conid, struct sockaddr_tipc *addr, void *usr_data, void *buf, size_t len) { - struct tipc_subscriber *subscrb = usr_data; - struct tipc_subscription *sub = NULL; - struct tipc_net *tn = net_generic(net, tipc_net_id); + struct tipc_subscriber *subscriber = usr_data; + struct tipc_subscr *s = (struct tipc_subscr *)buf; + int swap; + + /* Determine subscriber's endianness */ + swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | + TIPC_SUB_CANCEL)); - if (tipc_subscrp_create(net, (struct tipc_subscr *)buf, subscrb, &sub)) - return tipc_conn_terminate(tn->topsrv, subscrb->conid); + /* Detect & process a subscription cancellation request */ + if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { + s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); + return tipc_subscrp_cancel(s, subscriber); + } - tipc_nametbl_subscribe(sub); + tipc_subscrp_subscribe(net, s, subscriber, swap); } /* Handle one request to establish a new subscriber */ diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 92ee18cc5fe6..be60103082c9 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -50,21 +50,15 @@ struct tipc_subscriber; * @subscriber: pointer to its subscriber * @seq: name sequence associated with subscription * @net: point to network namespace - * @timeout: duration of subscription (in ms) - * @filter: event filtering to be done for subscription * @timer: timer governing subscription duration (optional) * @nameseq_list: adjacent subscriptions in name sequence's subscription list * @subscrp_list: adjacent subscriptions in subscriber's subscription list - * @server_ref: object reference of server port associated with subscription * @swap: indicates if subscriber uses opposite endianness in its messages * @evt: template for events generated by subscription */ struct tipc_subscription { struct tipc_subscriber *subscriber; - struct tipc_name_seq seq; struct net *net; - unsigned long timeout; - u32 filter; struct timer_list timer; struct list_head nameseq_list; struct list_head subscrp_list; @@ -72,11 +66,14 @@ struct tipc_subscription { struct tipc_event evt; }; -int tipc_subscrp_check_overlap(struct tipc_subscription *sub, u32 found_lower, +int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, u32 found_upper); void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, u32 found_upper, u32 event, u32 port_ref, u32 node, int must); +void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap, + struct tipc_name_seq *out); +u32 tipc_subscrp_convert_seq_type(u32 type, int swap); int tipc_topsrv_start(struct net *net); void tipc_topsrv_stop(struct net *net); |