diff options
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 6 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/fmr_ops.c | 19 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 27 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 363 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 19 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 236 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 119 |
7 files changed, 503 insertions, 286 deletions
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 823a781ec89c..8b818bb3518a 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -43,7 +43,7 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt, req = rpcrdma_create_req(r_xprt); if (IS_ERR(req)) return PTR_ERR(req); - req->rl_backchannel = true; + __set_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags); rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE, DMA_TO_DEVICE, GFP_KERNEL); @@ -223,8 +223,8 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) *p++ = xdr_zero; *p = xdr_zero; - if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN, - &rqst->rq_snd_buf, rpcrdma_noch)) + if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN, + &rqst->rq_snd_buf, rpcrdma_noch)) return -EIO; return 0; } diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index fa759dd2b0f3..29fc84c7ff98 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -306,28 +306,9 @@ out_reset: } } -/* Use a slow, safe mechanism to invalidate all memory regions - * that were registered for "req". - */ -static void -fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - bool sync) -{ - struct rpcrdma_mw *mw; - - while (!list_empty(&req->rl_registered)) { - mw = rpcrdma_pop_mw(&req->rl_registered); - if (sync) - fmr_op_recover_mr(mw); - else - rpcrdma_defer_mr_recovery(mw); - } -} - const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = { .ro_map = fmr_op_map, .ro_unmap_sync = fmr_op_unmap_sync, - .ro_unmap_safe = fmr_op_unmap_safe, .ro_recover_mr = fmr_op_recover_mr, .ro_open = fmr_op_open, .ro_maxpages = fmr_op_maxpages, diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 35d7517ef0e6..773e66e10a15 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -420,7 +420,6 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE : IB_ACCESS_REMOTE_READ; - rpcrdma_set_signaled(&r_xprt->rx_ep, ®_wr->wr); rc = ib_post_send(ia->ri_id->qp, ®_wr->wr, &bad_wr); if (rc) goto out_senderr; @@ -508,12 +507,6 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws) f->fr_cqe.done = frwr_wc_localinv_wake; reinit_completion(&f->fr_linv_done); - /* Initialize CQ count, since there is always a signaled - * WR being posted here. The new cqcount depends on how - * many SQEs are about to be consumed. - */ - rpcrdma_init_cqcount(&r_xprt->rx_ep, count); - /* Transport disconnect drains the receive CQ before it * replaces the QP. The RPC reply handler won't call us * unless ri_id->qp is a valid pointer. @@ -546,7 +539,6 @@ reset_mrs: /* Find and reset the MRs in the LOCAL_INV WRs that did not * get posted. */ - rpcrdma_init_cqcount(&r_xprt->rx_ep, -count); while (bad_wr) { f = container_of(bad_wr, struct rpcrdma_frmr, fr_invwr); @@ -559,28 +551,9 @@ reset_mrs: goto unmap; } -/* Use a slow, safe mechanism to invalidate all memory regions - * that were registered for "req". - */ -static void -frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - bool sync) -{ - struct rpcrdma_mw *mw; - - while (!list_empty(&req->rl_registered)) { - mw = rpcrdma_pop_mw(&req->rl_registered); - if (sync) - frwr_op_recover_mr(mw); - else - rpcrdma_defer_mr_recovery(mw); - } -} - const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { .ro_map = frwr_op_map, .ro_unmap_sync = frwr_op_unmap_sync, - .ro_unmap_safe = frwr_op_unmap_safe, .ro_recover_mr = frwr_op_recover_mr, .ro_open = frwr_op_open, .ro_maxpages = frwr_op_maxpages, diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index f1889f4d4803..ed34dc0f144c 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1,4 +1,5 @@ /* + * Copyright (c) 2014-2017 Oracle. All rights reserved. * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. * * This software is available to you under a choice of one of two @@ -75,11 +76,11 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs) /* Maximum Read list size */ maxsegs += 2; /* segment for head and tail buffers */ - size = maxsegs * sizeof(struct rpcrdma_read_chunk); + size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32); /* Minimal Read chunk size */ size += sizeof(__be32); /* segment count */ - size += sizeof(struct rpcrdma_segment); + size += rpcrdma_segment_maxsz * sizeof(__be32); size += sizeof(__be32); /* list discriminator */ dprintk("RPC: %s: max call header size = %u\n", @@ -102,7 +103,7 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs) /* Maximum Write list size */ maxsegs += 2; /* segment for head and tail buffers */ size = sizeof(__be32); /* segment count */ - size += maxsegs * sizeof(struct rpcrdma_segment); + size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32); size += sizeof(__be32); /* list discriminator */ dprintk("RPC: %s: max reply header size = %u\n", @@ -511,27 +512,60 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, return 0; } -/* Prepare the RPC-over-RDMA header SGE. +/** + * rpcrdma_unmap_sendctx - DMA-unmap Send buffers + * @sc: sendctx containing SGEs to unmap + * + */ +void +rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc) +{ + struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia; + struct ib_sge *sge; + unsigned int count; + + dprintk("RPC: %s: unmapping %u sges for sc=%p\n", + __func__, sc->sc_unmap_count, sc); + + /* The first two SGEs contain the transport header and + * the inline buffer. These are always left mapped so + * they can be cheaply re-used. + */ + sge = &sc->sc_sges[2]; + for (count = sc->sc_unmap_count; count; ++sge, --count) + ib_dma_unmap_page(ia->ri_device, + sge->addr, sge->length, DMA_TO_DEVICE); + + if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) { + smp_mb__after_atomic(); + wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES); + } +} + +/* Prepare an SGE for the RPC-over-RDMA transport header. */ static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req, u32 len) { + struct rpcrdma_sendctx *sc = req->rl_sendctx; struct rpcrdma_regbuf *rb = req->rl_rdmabuf; - struct ib_sge *sge = &req->rl_send_sge[0]; + struct ib_sge *sge = sc->sc_sges; - if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) { - if (!__rpcrdma_dma_map_regbuf(ia, rb)) - return false; - sge->addr = rdmab_addr(rb); - sge->lkey = rdmab_lkey(rb); - } + if (!rpcrdma_dma_map_regbuf(ia, rb)) + goto out_regbuf; + sge->addr = rdmab_addr(rb); sge->length = len; + sge->lkey = rdmab_lkey(rb); ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length, DMA_TO_DEVICE); - req->rl_send_wr.num_sge++; + sc->sc_wr.num_sge++; return true; + +out_regbuf: + pr_err("rpcrdma: failed to DMA map a Send buffer\n"); + return false; } /* Prepare the Send SGEs. The head and tail iovec, and each entry @@ -541,10 +575,11 @@ static bool rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) { + struct rpcrdma_sendctx *sc = req->rl_sendctx; unsigned int sge_no, page_base, len, remaining; struct rpcrdma_regbuf *rb = req->rl_sendbuf; struct ib_device *device = ia->ri_device; - struct ib_sge *sge = req->rl_send_sge; + struct ib_sge *sge = sc->sc_sges; u32 lkey = ia->ri_pd->local_dma_lkey; struct page *page, **ppages; @@ -552,7 +587,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, * DMA-mapped. Sync the content that has changed. */ if (!rpcrdma_dma_map_regbuf(ia, rb)) - return false; + goto out_regbuf; sge_no = 1; sge[sge_no].addr = rdmab_addr(rb); sge[sge_no].length = xdr->head[0].iov_len; @@ -607,7 +642,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, sge[sge_no].length = len; sge[sge_no].lkey = lkey; - req->rl_mapped_sges++; + sc->sc_unmap_count++; ppages++; remaining -= len; page_base = 0; @@ -633,56 +668,61 @@ map_tail: goto out_mapping_err; sge[sge_no].length = len; sge[sge_no].lkey = lkey; - req->rl_mapped_sges++; + sc->sc_unmap_count++; } out: - req->rl_send_wr.num_sge = sge_no + 1; + sc->sc_wr.num_sge += sge_no; + if (sc->sc_unmap_count) + __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); return true; +out_regbuf: + pr_err("rpcrdma: failed to DMA map a Send buffer\n"); + return false; + out_mapping_overflow: + rpcrdma_unmap_sendctx(sc); pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no); return false; out_mapping_err: + rpcrdma_unmap_sendctx(sc); pr_err("rpcrdma: Send mapping error\n"); return false; } -bool -rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, - u32 hdrlen, struct xdr_buf *xdr, - enum rpcrdma_chunktype rtype) +/** + * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR + * @r_xprt: controlling transport + * @req: context of RPC Call being marshalled + * @hdrlen: size of transport header, in bytes + * @xdr: xdr_buf containing RPC Call + * @rtype: chunk type being encoded + * + * Returns 0 on success; otherwise a negative errno is returned. + */ +int +rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, u32 hdrlen, + struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) { - req->rl_send_wr.num_sge = 0; - req->rl_mapped_sges = 0; - - if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen)) - goto out_map; + req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf); + if (!req->rl_sendctx) + return -ENOBUFS; + req->rl_sendctx->sc_wr.num_sge = 0; + req->rl_sendctx->sc_unmap_count = 0; + req->rl_sendctx->sc_req = req; + __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); + + if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen)) + return -EIO; if (rtype != rpcrdma_areadch) - if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype)) - goto out_map; - - return true; - -out_map: - pr_err("rpcrdma: failed to DMA map a Send buffer\n"); - return false; -} - -void -rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req) -{ - struct ib_device *device = ia->ri_device; - struct ib_sge *sge; - int count; + if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype)) + return -EIO; - sge = &req->rl_send_sge[2]; - for (count = req->rl_mapped_sges; count--; sge++) - ib_dma_unmap_page(device, sge->addr, sge->length, - DMA_TO_DEVICE); - req->rl_mapped_sges = 0; + return 0; } /** @@ -833,12 +873,10 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) transfertypes[rtype], transfertypes[wtype], xdr_stream_pos(xdr)); - if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, - xdr_stream_pos(xdr), - &rqst->rq_snd_buf, rtype)) { - ret = -EIO; + ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr), + &rqst->rq_snd_buf, rtype); + if (ret) goto out_err; - } return 0; out_err: @@ -970,14 +1008,13 @@ rpcrdma_mark_remote_invalidation(struct list_head *mws, * straightforward to check the RPC header's direction field. */ static bool -rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, - __be32 xid, __be32 proc) +rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep) #if defined(CONFIG_SUNRPC_BACKCHANNEL) { struct xdr_stream *xdr = &rep->rr_stream; __be32 *p; - if (proc != rdma_msg) + if (rep->rr_proc != rdma_msg) return false; /* Peek at stream contents without advancing. */ @@ -992,7 +1029,7 @@ rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, return false; /* RPC header */ - if (*p++ != xid) + if (*p++ != rep->rr_xid) return false; if (*p != cpu_to_be32(RPC_CALL)) return false; @@ -1212,105 +1249,170 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, return -EREMOTEIO; } +/* Perform XID lookup, reconstruction of the RPC reply, and + * RPC completion while holding the transport lock to ensure + * the rep, rqst, and rq_task pointers remain stable. + */ +void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) +{ + struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; + struct rpc_xprt *xprt = &r_xprt->rx_xprt; + struct rpc_rqst *rqst = rep->rr_rqst; + unsigned long cwnd; + int status; + + xprt->reestablish_timeout = 0; + + switch (rep->rr_proc) { + case rdma_msg: + status = rpcrdma_decode_msg(r_xprt, rep, rqst); + break; + case rdma_nomsg: + status = rpcrdma_decode_nomsg(r_xprt, rep); + break; + case rdma_error: + status = rpcrdma_decode_error(r_xprt, rep, rqst); + break; + default: + status = -EIO; + } + if (status < 0) + goto out_badheader; + +out: + spin_lock(&xprt->recv_lock); + cwnd = xprt->cwnd; + xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT; + if (xprt->cwnd > cwnd) + xprt_release_rqst_cong(rqst->rq_task); + + xprt_complete_rqst(rqst->rq_task, status); + xprt_unpin_rqst(rqst); + spin_unlock(&xprt->recv_lock); + return; + +/* If the incoming reply terminated a pending RPC, the next + * RPC call will post a replacement receive buffer as it is + * being marshaled. + */ +out_badheader: + dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n", + rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc)); + r_xprt->rx_stats.bad_reply_count++; + status = -EIO; + goto out; +} + +void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) +{ + /* Invalidate and unmap the data payloads before waking + * the waiting application. This guarantees the memory + * regions are properly fenced from the server before the + * application accesses the data. It also ensures proper + * send flow control: waking the next RPC waits until this + * RPC has relinquished all its Send Queue entries. + */ + if (!list_empty(&req->rl_registered)) + r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, + &req->rl_registered); + + /* Ensure that any DMA mapped pages associated with + * the Send of the RPC Call have been unmapped before + * allowing the RPC to complete. This protects argument + * memory not controlled by the RPC client from being + * re-used before we're done with it. + */ + if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { + r_xprt->rx_stats.reply_waits_for_send++; + out_of_line_wait_on_bit(&req->rl_flags, + RPCRDMA_REQ_F_TX_RESOURCES, + bit_wait, + TASK_UNINTERRUPTIBLE); + } +} + +/* Reply handling runs in the poll worker thread. Anything that + * might wait is deferred to a separate workqueue. + */ +void rpcrdma_deferred_completion(struct work_struct *work) +{ + struct rpcrdma_rep *rep = + container_of(work, struct rpcrdma_rep, rr_work); + struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst); + + rpcrdma_mark_remote_invalidation(&req->rl_registered, rep); + rpcrdma_release_rqst(rep->rr_rxprt, req); + rpcrdma_complete_rqst(rep); +} + /* Process received RPC/RDMA messages. * * Errors must result in the RPC task either being awakened, or * allowed to timeout, to discover the errors at that time. */ -void -rpcrdma_reply_handler(struct work_struct *work) +void rpcrdma_reply_handler(struct rpcrdma_rep *rep) { - struct rpcrdma_rep *rep = - container_of(work, struct rpcrdma_rep, rr_work); struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; struct rpc_xprt *xprt = &r_xprt->rx_xprt; - struct xdr_stream *xdr = &rep->rr_stream; + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_req *req; struct rpc_rqst *rqst; - __be32 *p, xid, vers, proc; - unsigned long cwnd; - int status; + u32 credits; + __be32 *p; dprintk("RPC: %s: incoming rep %p\n", __func__, rep); if (rep->rr_hdrbuf.head[0].iov_len == 0) goto out_badstatus; - xdr_init_decode(xdr, &rep->rr_hdrbuf, + xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf, rep->rr_hdrbuf.head[0].iov_base); /* Fixed transport header fields */ - p = xdr_inline_decode(xdr, 4 * sizeof(*p)); + p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p)); if (unlikely(!p)) goto out_shortreply; - xid = *p++; - vers = *p++; - p++; /* credits */ - proc = *p++; + rep->rr_xid = *p++; + rep->rr_vers = *p++; + credits = be32_to_cpu(*p++); + rep->rr_proc = *p++; + + if (rep->rr_vers != rpcrdma_version) + goto out_badversion; - if (rpcrdma_is_bcall(r_xprt, rep, xid, proc)) + if (rpcrdma_is_bcall(r_xprt, rep)) return; /* Match incoming rpcrdma_rep to an rpcrdma_req to * get context for handling any incoming chunks. */ spin_lock(&xprt->recv_lock); - rqst = xprt_lookup_rqst(xprt, xid); + rqst = xprt_lookup_rqst(xprt, rep->rr_xid); if (!rqst) goto out_norqst; xprt_pin_rqst(rqst); + + if (credits == 0) + credits = 1; /* don't deadlock */ + else if (credits > buf->rb_max_requests) + credits = buf->rb_max_requests; + buf->rb_credits = credits; + spin_unlock(&xprt->recv_lock); + req = rpcr_to_rdmar(rqst); req->rl_reply = rep; + rep->rr_rqst = rqst; + clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags); dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n", - __func__, rep, req, be32_to_cpu(xid)); - - /* Invalidate and unmap the data payloads before waking the - * waiting application. This guarantees the memory regions - * are properly fenced from the server before the application - * accesses the data. It also ensures proper send flow control: - * waking the next RPC waits until this RPC has relinquished - * all its Send Queue entries. - */ - if (!list_empty(&req->rl_registered)) { - rpcrdma_mark_remote_invalidation(&req->rl_registered, rep); - r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, - &req->rl_registered); - } - - xprt->reestablish_timeout = 0; - if (vers != rpcrdma_version) - goto out_badversion; + __func__, rep, req, be32_to_cpu(rep->rr_xid)); - switch (proc) { - case rdma_msg: - status = rpcrdma_decode_msg(r_xprt, rep, rqst); - break; - case rdma_nomsg: - status = rpcrdma_decode_nomsg(r_xprt, rep); - break; - case rdma_error: - status = rpcrdma_decode_error(r_xprt, rep, rqst); - break; - default: - status = -EIO; - } - if (status < 0) - goto out_badheader; - -out: - spin_lock(&xprt->recv_lock); - cwnd = xprt->cwnd; - xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT; - if (xprt->cwnd > cwnd) - xprt_release_rqst_cong(rqst->rq_task); - - xprt_complete_rqst(rqst->rq_task, status); - xprt_unpin_rqst(rqst); - spin_unlock(&xprt->recv_lock); - dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", - __func__, xprt, rqst, status); + if (list_empty(&req->rl_registered) && + !test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) + rpcrdma_complete_rqst(rep); + else + queue_work(rpcrdma_receive_wq, &rep->rr_work); return; out_badstatus: @@ -1321,37 +1423,22 @@ out_badstatus: } return; -/* If the incoming reply terminated a pending RPC, the next - * RPC call will post a replacement receive buffer as it is - * being marshaled. - */ out_badversion: dprintk("RPC: %s: invalid version %d\n", - __func__, be32_to_cpu(vers)); - status = -EIO; - r_xprt->rx_stats.bad_reply_count++; - goto out; - -out_badheader: - dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n", - rqst->rq_task->tk_pid, __func__, be32_to_cpu(proc)); - r_xprt->rx_stats.bad_reply_count++; - status = -EIO; - goto out; + __func__, be32_to_cpu(rep->rr_vers)); + goto repost; -/* The req was still available, but by the time the recv_lock - * was acquired, the rqst and task had been released. Thus the RPC - * has already been terminated. +/* The RPC transaction has already been terminated, or the header + * is corrupt. */ out_norqst: spin_unlock(&xprt->recv_lock); dprintk("RPC: %s: no match for incoming xid 0x%08x\n", - __func__, be32_to_cpu(xid)); + __func__, be32_to_cpu(rep->rr_xid)); goto repost; out_shortreply: dprintk("RPC: %s: short/invalid reply\n", __func__); - goto repost; /* If no pending RPC transaction was matched, post a replacement * receive buffer before returning. diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index c84e2b644e13..646c24494ea7 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -1,4 +1,5 @@ /* + * Copyright (c) 2014-2017 Oracle. All rights reserved. * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. * * This software is available to you under a choice of one of two @@ -678,16 +679,14 @@ xprt_rdma_free(struct rpc_task *task) struct rpc_rqst *rqst = task->tk_rqstp; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - if (req->rl_backchannel) + if (test_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags)) return; dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); - if (!list_empty(&req->rl_registered)) - ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task)); - rpcrdma_unmap_sges(ia, req); + if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags)) + rpcrdma_release_rqst(r_xprt, req); rpcrdma_buffer_put(req); } @@ -728,7 +727,8 @@ xprt_rdma_send_request(struct rpc_task *task) /* On retransmit, remove any previously registered chunks */ if (unlikely(!list_empty(&req->rl_registered))) - r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false); + r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, + &req->rl_registered); rc = rpcrdma_marshal_req(r_xprt, rqst); if (rc < 0) @@ -742,6 +742,7 @@ xprt_rdma_send_request(struct rpc_task *task) goto drop_connection; req->rl_connect_cookie = xprt->connect_cookie; + set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags); if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) goto drop_connection; @@ -789,11 +790,13 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) r_xprt->rx_stats.failed_marshal_count, r_xprt->rx_stats.bad_reply_count, r_xprt->rx_stats.nomsg_call_count); - seq_printf(seq, "%lu %lu %lu %lu\n", + seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n", r_xprt->rx_stats.mrs_recovered, r_xprt->rx_stats.mrs_orphaned, r_xprt->rx_stats.mrs_allocated, - r_xprt->rx_stats.local_inv_needed); + r_xprt->rx_stats.local_inv_needed, + r_xprt->rx_stats.empty_sendctx_q, + r_xprt->rx_stats.reply_waits_for_send); } static int diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 11a1fbf7e59e..710b3f77db82 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1,4 +1,5 @@ /* + * Copyright (c) 2014-2017 Oracle. All rights reserved. * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. * * This software is available to you under a choice of one of two @@ -49,9 +50,10 @@ #include <linux/interrupt.h> #include <linux/slab.h> -#include <linux/prefetch.h> #include <linux/sunrpc/addr.h> #include <linux/sunrpc/svc_rdma.h> + +#include <asm-generic/barrier.h> #include <asm/bitops.h> #include <rdma/ib_cm.h> @@ -73,7 +75,7 @@ static void rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt); static void rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf); static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb); -static struct workqueue_struct *rpcrdma_receive_wq __read_mostly; +struct workqueue_struct *rpcrdma_receive_wq __read_mostly; int rpcrdma_alloc_wq(void) @@ -126,30 +128,17 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context) static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) { + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_sendctx *sc = + container_of(cqe, struct rpcrdma_sendctx, sc_cqe); + /* WARNING: Only wr_cqe and status are reliable at this point */ if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR) pr_err("rpcrdma: Send: %s (%u/0x%x)\n", ib_wc_status_msg(wc->status), wc->status, wc->vendor_err); -} - -/* Perform basic sanity checking to avoid using garbage - * to update the credit grant value. - */ -static void -rpcrdma_update_granted_credits(struct rpcrdma_rep *rep) -{ - struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf; - __be32 *p = rep->rr_rdmabuf->rg_base; - u32 credits; - credits = be32_to_cpup(p + 2); - if (credits == 0) - credits = 1; /* don't deadlock */ - else if (credits > buffer->rb_max_requests) - credits = buffer->rb_max_requests; - - atomic_set(&buffer->rb_credits, credits); + rpcrdma_sendctx_put_locked(sc); } /** @@ -181,11 +170,8 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) rdmab_addr(rep->rr_rdmabuf), wc->byte_len, DMA_FROM_DEVICE); - if (wc->byte_len >= RPCRDMA_HDRLEN_ERR) - rpcrdma_update_granted_credits(rep); - out_schedule: - queue_work(rpcrdma_receive_wq, &rep->rr_work); + rpcrdma_reply_handler(rep); return; out_fail: @@ -295,7 +281,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) case RDMA_CM_EVENT_DISCONNECTED: connstate = -ECONNABORTED; connected: - atomic_set(&xprt->rx_buf.rb_credits, 1); + xprt->rx_buf.rb_credits = 1; ep->rep_connected = connstate; rpcrdma_conn_func(ep); wake_up_all(&ep->rep_connect_wait); @@ -564,16 +550,15 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_attr.cap.max_recv_sge); /* set trigger for requesting send completion */ - ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1; - if (ep->rep_cqinit <= 2) - ep->rep_cqinit = 0; /* always signal? */ - rpcrdma_init_cqcount(ep, 0); + ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH, + cdata->max_requests >> 2); + ep->rep_send_count = ep->rep_send_batch; init_waitqueue_head(&ep->rep_connect_wait); INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); sendcq = ib_alloc_cq(ia->ri_device, NULL, ep->rep_attr.cap.max_send_wr + 1, - 0, IB_POLL_SOFTIRQ); + 1, IB_POLL_WORKQUEUE); if (IS_ERR(sendcq)) { rc = PTR_ERR(sendcq); dprintk("RPC: %s: failed to create send CQ: %i\n", @@ -583,7 +568,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, recvcq = ib_alloc_cq(ia->ri_device, NULL, ep->rep_attr.cap.max_recv_wr + 1, - 0, IB_POLL_SOFTIRQ); + 0, IB_POLL_WORKQUEUE); if (IS_ERR(recvcq)) { rc = PTR_ERR(recvcq); dprintk("RPC: %s: failed to create recv CQ: %i\n", @@ -846,6 +831,168 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ib_drain_qp(ia->ri_id->qp); } +/* Fixed-size circular FIFO queue. This implementation is wait-free and + * lock-free. + * + * Consumer is the code path that posts Sends. This path dequeues a + * sendctx for use by a Send operation. Multiple consumer threads + * are serialized by the RPC transport lock, which allows only one + * ->send_request call at a time. + * + * Producer is the code path that handles Send completions. This path + * enqueues a sendctx that has been completed. Multiple producer + * threads are serialized by the ib_poll_cq() function. + */ + +/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced + * queue activity, and ib_drain_qp has flushed all remaining Send + * requests. + */ +static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf) +{ + unsigned long i; + + for (i = 0; i <= buf->rb_sc_last; i++) + kfree(buf->rb_sc_ctxs[i]); + kfree(buf->rb_sc_ctxs); +} + +static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia) +{ + struct rpcrdma_sendctx *sc; + + sc = kzalloc(sizeof(*sc) + + ia->ri_max_send_sges * sizeof(struct ib_sge), + GFP_KERNEL); + if (!sc) + return NULL; + + sc->sc_wr.wr_cqe = &sc->sc_cqe; + sc->sc_wr.sg_list = sc->sc_sges; + sc->sc_wr.opcode = IB_WR_SEND; + sc->sc_cqe.done = rpcrdma_wc_send; + return sc; +} + +static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_sendctx *sc; + unsigned long i; + + /* Maximum number of concurrent outstanding Send WRs. Capping + * the circular queue size stops Send Queue overflow by causing + * the ->send_request call to fail temporarily before too many + * Sends are posted. + */ + i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; + dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i); + buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL); + if (!buf->rb_sc_ctxs) + return -ENOMEM; + + buf->rb_sc_last = i - 1; + for (i = 0; i <= buf->rb_sc_last; i++) { + sc = rpcrdma_sendctx_create(&r_xprt->rx_ia); + if (!sc) + goto out_destroy; + + sc->sc_xprt = r_xprt; + buf->rb_sc_ctxs[i] = sc; + } + + return 0; + +out_destroy: + rpcrdma_sendctxs_destroy(buf); + return -ENOMEM; +} + +/* The sendctx queue is not guaranteed to have a size that is a + * power of two, thus the helpers in circ_buf.h cannot be used. + * The other option is to use modulus (%), which can be expensive. + */ +static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf, + unsigned long item) +{ + return likely(item < buf->rb_sc_last) ? item + 1 : 0; +} + +/** + * rpcrdma_sendctx_get_locked - Acquire a send context + * @buf: transport buffers from which to acquire an unused context + * + * Returns pointer to a free send completion context; or NULL if + * the queue is empty. + * + * Usage: Called to acquire an SGE array before preparing a Send WR. + * + * The caller serializes calls to this function (per rpcrdma_buffer), + * and provides an effective memory barrier that flushes the new value + * of rb_sc_head. + */ +struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf) +{ + struct rpcrdma_xprt *r_xprt; + struct rpcrdma_sendctx *sc; + unsigned long next_head; + + next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head); + + if (next_head == READ_ONCE(buf->rb_sc_tail)) + goto out_emptyq; + + /* ORDER: item must be accessed _before_ head is updated */ + sc = buf->rb_sc_ctxs[next_head]; + + /* Releasing the lock in the caller acts as a memory + * barrier that flushes rb_sc_head. + */ + buf->rb_sc_head = next_head; + + return sc; + +out_emptyq: + /* The queue is "empty" if there have not been enough Send + * completions recently. This is a sign the Send Queue is + * backing up. Cause the caller to pause and try again. + */ + dprintk("RPC: %s: empty sendctx queue\n", __func__); + r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf); + r_xprt->rx_stats.empty_sendctx_q++; + return NULL; +} + +/** + * rpcrdma_sendctx_put_locked - Release a send context + * @sc: send context to release + * + * Usage: Called from Send completion to return a sendctxt + * to the queue. + * + * The caller serializes calls to this function (per rpcrdma_buffer). + */ +void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) +{ + struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf; + unsigned long next_tail; + + /* Unmap SGEs of previously completed by unsignaled + * Sends by walking up the queue until @sc is found. + */ + next_tail = buf->rb_sc_tail; + do { + next_tail = rpcrdma_sendctx_next(buf, next_tail); + + /* ORDER: item must be accessed _before_ tail is updated */ + rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]); + + } while (buf->rb_sc_ctxs[next_tail] != sc); + + /* Paired with READ_ONCE */ + smp_store_release(&buf->rb_sc_tail, next_tail); +} + static void rpcrdma_mr_recovery_worker(struct work_struct *work) { @@ -941,13 +1088,8 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) spin_lock(&buffer->rb_reqslock); list_add(&req->rl_all, &buffer->rb_allreqs); spin_unlock(&buffer->rb_reqslock); - req->rl_cqe.done = rpcrdma_wc_send; req->rl_buffer = &r_xprt->rx_buf; INIT_LIST_HEAD(&req->rl_registered); - req->rl_send_wr.next = NULL; - req->rl_send_wr.wr_cqe = &req->rl_cqe; - req->rl_send_wr.sg_list = req->rl_send_sge; - req->rl_send_wr.opcode = IB_WR_SEND; return req; } @@ -974,7 +1116,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) rep->rr_cqe.done = rpcrdma_wc_receive; rep->rr_rxprt = r_xprt; - INIT_WORK(&rep->rr_work, rpcrdma_reply_handler); + INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion); rep->rr_recv_wr.next = NULL; rep->rr_recv_wr.wr_cqe = &rep->rr_cqe; rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; @@ -995,7 +1137,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) buf->rb_max_requests = r_xprt->rx_data.max_requests; buf->rb_bc_srv_max_requests = 0; - atomic_set(&buf->rb_credits, 1); spin_lock_init(&buf->rb_mwlock); spin_lock_init(&buf->rb_lock); spin_lock_init(&buf->rb_recovery_lock); @@ -1022,7 +1163,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) rc = PTR_ERR(req); goto out; } - req->rl_backchannel = false; list_add(&req->rl_list, &buf->rb_send_bufs); } @@ -1040,6 +1180,10 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) list_add(&rep->rr_list, &buf->rb_recv_bufs); } + rc = rpcrdma_sendctxs_create(r_xprt); + if (rc) + goto out; + return 0; out: rpcrdma_buffer_destroy(buf); @@ -1116,6 +1260,8 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) cancel_delayed_work_sync(&buf->rb_recovery_worker); cancel_delayed_work_sync(&buf->rb_refresh_worker); + rpcrdma_sendctxs_destroy(buf); + while (!list_empty(&buf->rb_recv_bufs)) { struct rpcrdma_rep *rep; @@ -1231,7 +1377,6 @@ rpcrdma_buffer_put(struct rpcrdma_req *req) struct rpcrdma_buffer *buffers = req->rl_buffer; struct rpcrdma_rep *rep = req->rl_reply; - req->rl_send_wr.num_sge = 0; req->rl_reply = NULL; spin_lock(&buffers->rb_lock); @@ -1363,7 +1508,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_req *req) { - struct ib_send_wr *send_wr = &req->rl_send_wr; + struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr; struct ib_send_wr *send_wr_fail; int rc; @@ -1377,7 +1522,14 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, dprintk("RPC: %s: posting %d s/g entries\n", __func__, send_wr->num_sge); - rpcrdma_set_signaled(ep, send_wr); + if (!ep->rep_send_count || + test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { + send_wr->send_flags |= IB_SEND_SIGNALED; + ep->rep_send_count = ep->rep_send_batch; + } else { + send_wr->send_flags &= ~IB_SEND_SIGNALED; + --ep->rep_send_count; + } rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail); if (rc) goto out_postsend_err; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index e26a97d2f922..51686d9eac5f 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -1,4 +1,5 @@ /* + * Copyright (c) 2014-2017 Oracle. All rights reserved. * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. * * This software is available to you under a choice of one of two @@ -93,8 +94,8 @@ enum { */ struct rpcrdma_ep { - atomic_t rep_cqcount; - int rep_cqinit; + unsigned int rep_send_count; + unsigned int rep_send_batch; int rep_connected; struct ib_qp_init_attr rep_attr; wait_queue_head_t rep_connect_wait; @@ -104,25 +105,6 @@ struct rpcrdma_ep { struct delayed_work rep_connect_worker; }; -static inline void -rpcrdma_init_cqcount(struct rpcrdma_ep *ep, int count) -{ - atomic_set(&ep->rep_cqcount, ep->rep_cqinit - count); -} - -/* To update send queue accounting, provider must take a - * send completion every now and then. - */ -static inline void -rpcrdma_set_signaled(struct rpcrdma_ep *ep, struct ib_send_wr *send_wr) -{ - send_wr->send_flags = 0; - if (unlikely(atomic_sub_return(1, &ep->rep_cqcount) <= 0)) { - rpcrdma_init_cqcount(ep, 0); - send_wr->send_flags = IB_SEND_SIGNALED; - } -} - /* Pre-allocate extra Work Requests for handling backward receives * and sends. This is a fixed value because the Work Queues are * allocated when the forward channel is set up. @@ -164,12 +146,6 @@ rdmab_lkey(struct rpcrdma_regbuf *rb) return rb->rg_iov.lkey; } -static inline struct rpcrdma_msg * -rdmab_to_msg(struct rpcrdma_regbuf *rb) -{ - return (struct rpcrdma_msg *)rb->rg_base; -} - static inline struct ib_device * rdmab_device(struct rpcrdma_regbuf *rb) { @@ -202,22 +178,24 @@ enum { }; /* - * struct rpcrdma_rep -- this structure encapsulates state required to recv - * and complete a reply, asychronously. It needs several pieces of - * state: - * o recv buffer (posted to provider) - * o ib_sge (also donated to provider) - * o status of reply (length, success or not) - * o bookkeeping state to get run by reply handler (list, etc) + * struct rpcrdma_rep -- this structure encapsulates state required + * to receive and complete an RPC Reply, asychronously. It needs + * several pieces of state: * - * These are allocated during initialization, per-transport instance. + * o receive buffer and ib_sge (donated to provider) + * o status of receive (success or not, length, inv rkey) + * o bookkeeping state to get run by reply handler (XDR stream) * - * N of these are associated with a transport instance, and stored in - * struct rpcrdma_buffer. N is the max number of outstanding requests. + * These structures are allocated during transport initialization. + * N of these are associated with a transport instance, managed by + * struct rpcrdma_buffer. N is the max number of outstanding RPCs. */ struct rpcrdma_rep { struct ib_cqe rr_cqe; + __be32 rr_xid; + __be32 rr_vers; + __be32 rr_proc; int rr_wc_flags; u32 rr_inv_rkey; struct rpcrdma_regbuf *rr_rdmabuf; @@ -225,10 +203,34 @@ struct rpcrdma_rep { struct work_struct rr_work; struct xdr_buf rr_hdrbuf; struct xdr_stream rr_stream; + struct rpc_rqst *rr_rqst; struct list_head rr_list; struct ib_recv_wr rr_recv_wr; }; +/* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes + */ +struct rpcrdma_req; +struct rpcrdma_xprt; +struct rpcrdma_sendctx { + struct ib_send_wr sc_wr; + struct ib_cqe sc_cqe; + struct rpcrdma_xprt *sc_xprt; + struct rpcrdma_req *sc_req; + unsigned int sc_unmap_count; + struct ib_sge sc_sges[]; +}; + +/* Limit the number of SGEs that can be unmapped during one + * Send completion. This caps the amount of work a single + * completion can do before returning to the provider. + * + * Setting this to zero disables Send completion batching. + */ +enum { + RPCRDMA_MAX_SEND_BATCH = 7, +}; + /* * struct rpcrdma_mw - external memory region metadata * @@ -340,26 +342,30 @@ enum { struct rpcrdma_buffer; struct rpcrdma_req { struct list_head rl_list; - unsigned int rl_mapped_sges; unsigned int rl_connect_cookie; struct rpcrdma_buffer *rl_buffer; struct rpcrdma_rep *rl_reply; struct xdr_stream rl_stream; struct xdr_buf rl_hdrbuf; - struct ib_send_wr rl_send_wr; - struct ib_sge rl_send_sge[RPCRDMA_MAX_SEND_SGES]; + struct rpcrdma_sendctx *rl_sendctx; struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */ struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */ struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */ - struct ib_cqe rl_cqe; struct list_head rl_all; - bool rl_backchannel; + unsigned long rl_flags; struct list_head rl_registered; /* registered segments */ struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; }; +/* rl_flags */ +enum { + RPCRDMA_REQ_F_BACKCHANNEL = 0, + RPCRDMA_REQ_F_PENDING, + RPCRDMA_REQ_F_TX_RESOURCES, +}; + static inline void rpcrdma_set_xprtdata(struct rpc_rqst *rqst, struct rpcrdma_req *req) { @@ -399,12 +405,17 @@ struct rpcrdma_buffer { struct list_head rb_mws; struct list_head rb_all; + unsigned long rb_sc_head; + unsigned long rb_sc_tail; + unsigned long rb_sc_last; + struct rpcrdma_sendctx **rb_sc_ctxs; + spinlock_t rb_lock; /* protect buf lists */ int rb_send_count, rb_recv_count; struct list_head rb_send_bufs; struct list_head rb_recv_bufs; u32 rb_max_requests; - atomic_t rb_credits; /* most recent credit grant */ + u32 rb_credits; /* most recent credit grant */ u32 rb_bc_srv_max_requests; spinlock_t rb_reqslock; /* protect rb_allreqs */ @@ -453,10 +464,12 @@ struct rpcrdma_stats { unsigned long mrs_recovered; unsigned long mrs_orphaned; unsigned long mrs_allocated; + unsigned long empty_sendctx_q; /* accessed when receiving a reply */ unsigned long long total_rdma_reply; unsigned long long fixup_copy_count; + unsigned long reply_waits_for_send; unsigned long local_inv_needed; unsigned long nomsg_call_count; unsigned long bcall_count; @@ -473,8 +486,6 @@ struct rpcrdma_memreg_ops { struct rpcrdma_mw **); void (*ro_unmap_sync)(struct rpcrdma_xprt *, struct list_head *); - void (*ro_unmap_safe)(struct rpcrdma_xprt *, - struct rpcrdma_req *, bool); void (*ro_recover_mr)(struct rpcrdma_mw *); int (*ro_open)(struct rpcrdma_ia *, struct rpcrdma_ep *, @@ -532,6 +543,8 @@ void rpcrdma_ia_close(struct rpcrdma_ia *); bool frwr_is_supported(struct rpcrdma_ia *); bool fmr_is_supported(struct rpcrdma_ia *); +extern struct workqueue_struct *rpcrdma_receive_wq; + /* * Endpoint calls - xprtrdma/verbs.c */ @@ -554,6 +567,8 @@ struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *); void rpcrdma_destroy_req(struct rpcrdma_req *); int rpcrdma_buffer_create(struct rpcrdma_xprt *); void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); +struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf); +void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc); struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *); void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *); @@ -610,12 +625,18 @@ enum rpcrdma_chunktype { rpcrdma_replych }; -bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *, - u32, struct xdr_buf *, enum rpcrdma_chunktype); -void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *); +int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, u32 hdrlen, + struct xdr_buf *xdr, + enum rpcrdma_chunktype rtype); +void rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc); int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst); void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); -void rpcrdma_reply_handler(struct work_struct *work); +void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); +void rpcrdma_reply_handler(struct rpcrdma_rep *rep); +void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req); +void rpcrdma_deferred_completion(struct work_struct *work); static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) { |