From 5266698661401afc5e4a1a521cf9ba10724d10dd Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 22 Oct 2015 08:51:41 -0400 Subject: tipc: let broadcast packet reception use new link receive function The code path for receiving broadcast packets is currently distinct from the unicast path. This leads to unnecessary code and data duplication, something that can be avoided with some effort. We now introduce separate per-peer tipc_link instances for handling broadcast packet reception. Each receive link keeps a pointer to the common, single, broadcast link instance, and can hence handle release and retransmission of send buffers as if they belonged to the own instance. Furthermore, we let each unicast link instance keep a reference to both the pertaining broadcast receive link, and to the common send link. This makes it possible for the unicast links to easily access data for broadcast link synchronization, as well as for carrying acknowledges for received broadcast packets. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/node.c | 158 +++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 106 insertions(+), 52 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 28bcd7be23c6..cd924552244b 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -72,7 +72,6 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete); static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq); -static void node_established_contact(struct tipc_node *n_ptr); static void tipc_node_delete(struct tipc_node *node); static void tipc_node_timeout(unsigned long data); static void tipc_node_fsm_evt(struct tipc_node *n, int evt); @@ -165,8 +164,10 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities) INIT_LIST_HEAD(&n_ptr->list); INIT_LIST_HEAD(&n_ptr->publ_list); INIT_LIST_HEAD(&n_ptr->conn_sks); - skb_queue_head_init(&n_ptr->bclink.namedq); - __skb_queue_head_init(&n_ptr->bclink.deferdq); + skb_queue_head_init(&n_ptr->bc_entry.namedq); + skb_queue_head_init(&n_ptr->bc_entry.inputq1); + __skb_queue_head_init(&n_ptr->bc_entry.arrvq); + skb_queue_head_init(&n_ptr->bc_entry.inputq2); hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); list_for_each_entry_rcu(temp_node, &tn->node_list, list) { if (n_ptr->addr < temp_node->addr) @@ -177,6 +178,18 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities) n_ptr->signature = INVALID_NODE_SIG; n_ptr->active_links[0] = INVALID_BEARER_ID; n_ptr->active_links[1] = INVALID_BEARER_ID; + if (!tipc_link_bc_create(n_ptr, tipc_own_addr(net), n_ptr->addr, + U16_MAX, tipc_bc_sndlink(net)->window, + n_ptr->capabilities, + &n_ptr->bc_entry.inputq1, + &n_ptr->bc_entry.namedq, + tipc_bc_sndlink(net), + &n_ptr->bc_entry.link)) { + pr_warn("Broadcast rcv link creation failed, no memory\n"); + kfree(n_ptr); + n_ptr = NULL; + goto exit; + } tipc_node_get(n_ptr); setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr); n_ptr->keepalive_intv = U32_MAX; @@ -203,6 +216,7 @@ static void tipc_node_delete(struct tipc_node *node) { list_del_rcu(&node->list); hlist_del_rcu(&node->hash); + kfree(node->bc_entry.link); kfree_rcu(node, rcu); } @@ -340,8 +354,9 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id, if (!ol) { *slot0 = bearer_id; *slot1 = bearer_id; - tipc_link_build_bcast_sync_msg(nl, xmitq); - node_established_contact(n); + tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT); + n->action_flags |= TIPC_NOTIFY_NODE_UP; + tipc_bcast_add_peer(n->net, n->addr, nl, xmitq); return; } @@ -585,9 +600,10 @@ void tipc_node_check_dest(struct net *net, u32 onode, b->net_plane, b->mtu, b->priority, b->window, mod(tipc_net(net)->random), tipc_own_addr(net), onode, - n->capabilities, - &le->maddr, &le->inputq, - &n->bclink.namedq, &l)) { + n->capabilities, &le->maddr, + tipc_bc_sndlink(n->net), n->bc_entry.link, + &le->inputq, + &n->bc_entry.namedq, &l)) { *respond = false; goto exit; } @@ -830,58 +846,36 @@ bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr) return true; } -static void node_established_contact(struct tipc_node *n_ptr) -{ - tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT); - n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP; - n_ptr->bclink.oos_state = 0; - n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net); - tipc_bclink_add_node(n_ptr->net, n_ptr->addr); -} - -static void node_lost_contact(struct tipc_node *n_ptr, +static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq) { char addr_string[16]; struct tipc_sock_conn *conn, *safe; struct tipc_link *l; - struct list_head *conns = &n_ptr->conn_sks; + struct list_head *conns = &n->conn_sks; struct sk_buff *skb; - struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); uint i; pr_debug("Lost contact with %s\n", - tipc_addr_string_fill(addr_string, n_ptr->addr)); - - /* Flush broadcast link info associated with lost node */ - if (n_ptr->bclink.recv_permitted) { - __skb_queue_purge(&n_ptr->bclink.deferdq); + tipc_addr_string_fill(addr_string, n->addr)); - if (n_ptr->bclink.reasm_buf) { - kfree_skb(n_ptr->bclink.reasm_buf); - n_ptr->bclink.reasm_buf = NULL; - } - - tipc_bclink_remove_node(n_ptr->net, n_ptr->addr); - tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ); - - n_ptr->bclink.recv_permitted = false; - } + /* Clean up broadcast state */ + tipc_bcast_remove_peer(n->net, n->addr, n->bc_entry.link); /* Abort any ongoing link failover */ for (i = 0; i < MAX_BEARERS; i++) { - l = n_ptr->links[i].link; + l = n->links[i].link; if (l) tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT); } /* Notify publications from this node */ - n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN; + n->action_flags |= TIPC_NOTIFY_NODE_DOWN; /* Notify sockets connected to node */ list_for_each_entry_safe(conn, safe, conns, list) { skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, - SHORT_H_SIZE, 0, tn->own_addr, + SHORT_H_SIZE, 0, tipc_own_addr(n->net), conn->peer_node, conn->port, conn->peer_port, TIPC_ERR_NO_NODE); if (likely(skb)) @@ -1085,6 +1079,67 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, return 0; } +/** + * tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node + * @net: the applicable net namespace + * @skb: TIPC packet + * @bearer_id: id of bearer message arrived on + * + * Invoked with no locks held. + */ +void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id) +{ + int rc; + struct sk_buff_head xmitq; + struct tipc_bclink_entry *be; + struct tipc_link_entry *le; + struct tipc_msg *hdr = buf_msg(skb); + int usr = msg_user(hdr); + u32 dnode = msg_destnode(hdr); + struct tipc_node *n; + + __skb_queue_head_init(&xmitq); + + /* If NACK for other node, let rcv link for that node peek into it */ + if ((usr == BCAST_PROTOCOL) && (dnode != tipc_own_addr(net))) + n = tipc_node_find(net, dnode); + else + n = tipc_node_find(net, msg_prevnode(hdr)); + if (!n) { + kfree_skb(skb); + return; + } + be = &n->bc_entry; + le = &n->links[bearer_id]; + + rc = tipc_bcast_rcv(net, be->link, skb); + + /* Broadcast link reset may happen at reassembly failure */ + if (rc & TIPC_LINK_DOWN_EVT) + tipc_node_reset_links(n); + + /* Broadcast ACKs are sent on a unicast link */ + if (rc & TIPC_LINK_SND_BC_ACK) { + tipc_node_lock(n); + tipc_link_build_ack_msg(le->link, &xmitq); + tipc_node_unlock(n); + } + + if (!skb_queue_empty(&xmitq)) + tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); + + /* Deliver. 'arrvq' is under inputq2's lock protection */ + if (!skb_queue_empty(&be->inputq1)) { + spin_lock_bh(&be->inputq2.lock); + spin_lock_bh(&be->inputq1.lock); + skb_queue_splice_tail_init(&be->inputq1, &be->arrvq); + spin_unlock_bh(&be->inputq1.lock); + spin_unlock_bh(&be->inputq2.lock); + tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2); + } + tipc_node_put(n); +} + /** * tipc_node_check_state - check and if necessary update node state * @skb: TIPC packet @@ -1227,6 +1282,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) int usr = msg_user(hdr); int bearer_id = b->identity; struct tipc_link_entry *le; + u16 bc_ack = msg_bcast_ack(hdr); int rc = 0; __skb_queue_head_init(&xmitq); @@ -1235,13 +1291,12 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) if (unlikely(!tipc_msg_validate(skb))) goto discard; - /* Handle arrival of a non-unicast link packet */ + /* Handle arrival of discovery or broadcast packet */ if (unlikely(msg_non_seq(hdr))) { - if (usr == LINK_CONFIG) - tipc_disc_rcv(net, skb, b); + if (unlikely(usr == LINK_CONFIG)) + return tipc_disc_rcv(net, skb, b); else - tipc_bclink_rcv(net, skb); - return; + return tipc_node_bc_rcv(net, skb, bearer_id); } /* Locate neighboring node that sent packet */ @@ -1250,19 +1305,18 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) goto discard; le = &n->links[bearer_id]; + /* Ensure broadcast reception is in synch with peer's send state */ + if (unlikely(usr == LINK_PROTOCOL)) + tipc_bcast_sync_rcv(net, n->bc_entry.link, hdr); + else if (unlikely(n->bc_entry.link->acked != bc_ack)) + tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack); + tipc_node_lock(n); /* Is reception permitted at the moment ? */ if (!tipc_node_filter_pkt(n, hdr)) goto unlock; - if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) - tipc_bclink_sync_state(n, hdr); - - /* Release acked broadcast packets */ - if (unlikely(n->bclink.acked != msg_bcast_ack(hdr))) - tipc_bclink_acknowledge(n, msg_bcast_ack(hdr)); - /* Check and if necessary update node state */ if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { rc = tipc_link_rcv(le->link, skb, &xmitq); @@ -1277,8 +1331,8 @@ unlock: if (unlikely(rc & TIPC_LINK_DOWN_EVT)) tipc_node_link_down(n, bearer_id, false); - if (unlikely(!skb_queue_empty(&n->bclink.namedq))) - tipc_named_rcv(net, &n->bclink.namedq); + if (unlikely(!skb_queue_empty(&n->bc_entry.namedq))) + tipc_named_rcv(net, &n->bc_entry.namedq); if (!skb_queue_empty(&le->inputq)) tipc_sk_rcv(net, &le->inputq); -- cgit v1.2.3