summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
diff options
context:
space:
mode:
authorChuck Lever <chuck.lever@oracle.com>2018-05-07 15:27:21 -0400
committerJ. Bruce Fields <bfields@redhat.com>2018-05-11 15:48:57 -0400
commitecf85b2384ea5f7cb0577bf6143bc46d9ecfe4d3 (patch)
tree6f0ca0a83d39f3c6b82bd0df41896cfb315645f9 /net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
parentbd2abef33394dc16d63580c38c01420db991f0f2 (diff)
downloadlwn-ecf85b2384ea5f7cb0577bf6143bc46d9ecfe4d3.tar.gz
lwn-ecf85b2384ea5f7cb0577bf6143bc46d9ecfe4d3.zip
svcrdma: Introduce svc_rdma_recv_ctxt
svc_rdma_op_ctxt's are pre-allocated and maintained on a per-xprt free list. This eliminates the overhead of calling kmalloc / kfree, both of which grab a globally shared lock that disables interrupts. To reduce contention further, separate the use of these objects in the Receive and Send paths in svcrdma. Subsequent patches will take advantage of this separation by allocating real resources which are then cached in these objects. The allocations are freed when the transport is torn down. I've renamed the structure so that static type checking can be used to ensure that uses of op_ctxt and recv_ctxt are not confused. As an additional clean up, structure fields are renamed to conform with kernel coding conventions. As a final clean up, helpers related to recv_ctxt are moved closer to the functions that use them. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Signed-off-by: J. Bruce Fields <bfields@redhat.com>
Diffstat (limited to 'net/sunrpc/xprtrdma/svc_rdma_recvfrom.c')
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c318
1 files changed, 275 insertions, 43 deletions
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 330d542fd96e..b7d9c55ee896 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
- * Copyright (c) 2016, 2017 Oracle. All rights reserved.
+ * Copyright (c) 2016-2018 Oracle. All rights reserved.
* Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
* Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
*
@@ -61,7 +61,7 @@
* svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's
* data payload from the client. svc_rdma_recvfrom sets up the
* RDMA Reads using pages in svc_rqst::rq_pages, which are
- * transferred to an svc_rdma_op_ctxt for the duration of the
+ * transferred to an svc_rdma_recv_ctxt for the duration of the
* I/O. svc_rdma_recvfrom then returns zero, since the RPC message
* is still not yet ready.
*
@@ -70,18 +70,18 @@
* svc_rdma_recvfrom again. This second call may use a different
* svc_rqst than the first one, thus any information that needs
* to be preserved across these two calls is kept in an
- * svc_rdma_op_ctxt.
+ * svc_rdma_recv_ctxt.
*
* The second call to svc_rdma_recvfrom performs final assembly
* of the RPC Call message, using the RDMA Read sink pages kept in
- * the svc_rdma_op_ctxt. The xdr_buf is copied from the
- * svc_rdma_op_ctxt to the second svc_rqst. The second call returns
+ * the svc_rdma_recv_ctxt. The xdr_buf is copied from the
+ * svc_rdma_recv_ctxt to the second svc_rqst. The second call returns
* the length of the completed RPC Call message.
*
* Page Management
*
* Pages under I/O must be transferred from the first svc_rqst to an
- * svc_rdma_op_ctxt before the first svc_rdma_recvfrom call returns.
+ * svc_rdma_recv_ctxt before the first svc_rdma_recvfrom call returns.
*
* The first svc_rqst supplies pages for RDMA Reads. These are moved
* from rqstp::rq_pages into ctxt::pages. The consumed elements of
@@ -89,7 +89,7 @@
* svc_rdma_recvfrom call returns.
*
* During the second svc_rdma_recvfrom call, RDMA Read sink pages
- * are transferred from the svc_rdma_op_ctxt to the second svc_rqst
+ * are transferred from the svc_rdma_recv_ctxt to the second svc_rqst
* (see rdma_read_complete() below).
*/
@@ -108,13 +108,247 @@
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc);
+
+static inline struct svc_rdma_recv_ctxt *
+svc_rdma_next_recv_ctxt(struct list_head *list)
+{
+ return list_first_entry_or_null(list, struct svc_rdma_recv_ctxt,
+ rc_list);
+}
+
+/**
+ * svc_rdma_recv_ctxts_destroy - Release all recv_ctxt's for an xprt
+ * @rdma: svcxprt_rdma being torn down
+ *
+ */
+void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
+{
+ struct svc_rdma_recv_ctxt *ctxt;
+
+ while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts))) {
+ list_del(&ctxt->rc_list);
+ kfree(ctxt);
+ }
+}
+
+static struct svc_rdma_recv_ctxt *
+svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
+{
+ struct svc_rdma_recv_ctxt *ctxt;
+
+ spin_lock(&rdma->sc_recv_lock);
+ ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts);
+ if (!ctxt)
+ goto out_empty;
+ list_del(&ctxt->rc_list);
+ spin_unlock(&rdma->sc_recv_lock);
+
+out:
+ ctxt->rc_recv_wr.num_sge = 0;
+ ctxt->rc_page_count = 0;
+ return ctxt;
+
+out_empty:
+ spin_unlock(&rdma->sc_recv_lock);
+
+ ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+ if (!ctxt)
+ return NULL;
+ goto out;
+}
+
+static void svc_rdma_recv_ctxt_unmap(struct svcxprt_rdma *rdma,
+ struct svc_rdma_recv_ctxt *ctxt)
+{
+ struct ib_device *device = rdma->sc_cm_id->device;
+ int i;
+
+ for (i = 0; i < ctxt->rc_recv_wr.num_sge; i++)
+ ib_dma_unmap_page(device,
+ ctxt->rc_sges[i].addr,
+ ctxt->rc_sges[i].length,
+ DMA_FROM_DEVICE);
+}
+
+/**
+ * svc_rdma_recv_ctxt_put - Return recv_ctxt to free list
+ * @rdma: controlling svcxprt_rdma
+ * @ctxt: object to return to the free list
+ * @free_pages: Non-zero if rc_pages should be freed
+ *
+ */
+void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
+ struct svc_rdma_recv_ctxt *ctxt,
+ int free_pages)
+{
+ unsigned int i;
+
+ if (free_pages)
+ for (i = 0; i < ctxt->rc_page_count; i++)
+ put_page(ctxt->rc_pages[i]);
+ spin_lock(&rdma->sc_recv_lock);
+ list_add(&ctxt->rc_list, &rdma->sc_recv_ctxts);
+ spin_unlock(&rdma->sc_recv_lock);
+}
+
+static int svc_rdma_post_recv(struct svcxprt_rdma *rdma)
+{
+ struct ib_device *device = rdma->sc_cm_id->device;
+ struct svc_rdma_recv_ctxt *ctxt;
+ struct ib_recv_wr *bad_recv_wr;
+ int sge_no, buflen, ret;
+ struct page *page;
+ dma_addr_t pa;
+
+ ctxt = svc_rdma_recv_ctxt_get(rdma);
+ if (!ctxt)
+ return -ENOMEM;
+
+ buflen = 0;
+ ctxt->rc_cqe.done = svc_rdma_wc_receive;
+ for (sge_no = 0; buflen < rdma->sc_max_req_size; sge_no++) {
+ if (sge_no >= rdma->sc_max_sge) {
+ pr_err("svcrdma: Too many sges (%d)\n", sge_no);
+ goto err_put_ctxt;
+ }
+
+ page = alloc_page(GFP_KERNEL);
+ if (!page)
+ goto err_put_ctxt;
+ ctxt->rc_pages[sge_no] = page;
+ ctxt->rc_page_count++;
+
+ pa = ib_dma_map_page(device, ctxt->rc_pages[sge_no],
+ 0, PAGE_SIZE, DMA_FROM_DEVICE);
+ if (ib_dma_mapping_error(device, pa))
+ goto err_put_ctxt;
+ ctxt->rc_sges[sge_no].addr = pa;
+ ctxt->rc_sges[sge_no].length = PAGE_SIZE;
+ ctxt->rc_sges[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
+ ctxt->rc_recv_wr.num_sge++;
+
+ buflen += PAGE_SIZE;
+ }
+ ctxt->rc_recv_wr.next = NULL;
+ ctxt->rc_recv_wr.sg_list = &ctxt->rc_sges[0];
+ ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe;
+
+ svc_xprt_get(&rdma->sc_xprt);
+ ret = ib_post_recv(rdma->sc_qp, &ctxt->rc_recv_wr, &bad_recv_wr);
+ trace_svcrdma_post_recv(&ctxt->rc_recv_wr, ret);
+ if (ret)
+ goto err_post;
+ return 0;
+
+err_put_ctxt:
+ svc_rdma_recv_ctxt_unmap(rdma, ctxt);
+ svc_rdma_recv_ctxt_put(rdma, ctxt, 1);
+ return -ENOMEM;
+err_post:
+ svc_rdma_recv_ctxt_unmap(rdma, ctxt);
+ svc_rdma_recv_ctxt_put(rdma, ctxt, 1);
+ svc_xprt_put(&rdma->sc_xprt);
+ return ret;
+}
+
+/**
+ * svc_rdma_post_recvs - Post initial set of Recv WRs
+ * @rdma: fresh svcxprt_rdma
+ *
+ * Returns true if successful, otherwise false.
+ */
+bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
+{
+ unsigned int i;
+ int ret;
+
+ for (i = 0; i < rdma->sc_max_requests; i++) {
+ ret = svc_rdma_post_recv(rdma);
+ if (ret) {
+ pr_err("svcrdma: failure posting recv buffers: %d\n",
+ ret);
+ return false;
+ }
+ }
+ return true;
+}
+
+/**
+ * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
+ * @cq: Completion Queue context
+ * @wc: Work Completion object
+ *
+ * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
+ * the Receive completion handler could be running.
+ */
+static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct svcxprt_rdma *rdma = cq->cq_context;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_recv_ctxt *ctxt;
+
+ trace_svcrdma_wc_receive(wc);
+
+ /* WARNING: Only wc->wr_cqe and wc->status are reliable */
+ ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);
+ svc_rdma_recv_ctxt_unmap(rdma, ctxt);
+
+ if (wc->status != IB_WC_SUCCESS)
+ goto flushed;
+
+ if (svc_rdma_post_recv(rdma))
+ goto post_err;
+
+ /* All wc fields are now known to be valid */
+ ctxt->rc_byte_len = wc->byte_len;
+ spin_lock(&rdma->sc_rq_dto_lock);
+ list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q);
+ spin_unlock(&rdma->sc_rq_dto_lock);
+ set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+ if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
+ svc_xprt_enqueue(&rdma->sc_xprt);
+ goto out;
+
+flushed:
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("svcrdma: Recv: %s (%u/0x%x)\n",
+ ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
+post_err:
+ svc_rdma_recv_ctxt_put(rdma, ctxt, 1);
+ set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+ svc_xprt_enqueue(&rdma->sc_xprt);
+out:
+ svc_xprt_put(&rdma->sc_xprt);
+}
+
+/**
+ * svc_rdma_flush_recv_queues - Drain pending Receive work
+ * @rdma: svcxprt_rdma being shut down
+ *
+ */
+void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma)
+{
+ struct svc_rdma_recv_ctxt *ctxt;
+
+ while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_read_complete_q))) {
+ list_del(&ctxt->rc_list);
+ svc_rdma_recv_ctxt_put(rdma, ctxt, 1);
+ }
+ while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) {
+ list_del(&ctxt->rc_list);
+ svc_rdma_recv_ctxt_put(rdma, ctxt, 1);
+ }
+}
+
/*
* Replace the pages in the rq_argpages array with the pages from the SGE in
* the RDMA_RECV completion. The SGL should contain full pages up until the
* last one.
*/
static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *ctxt)
+ struct svc_rdma_recv_ctxt *ctxt)
{
struct page *page;
int sge_no;
@@ -123,30 +357,30 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
/* The reply path assumes the Call's transport header resides
* in rqstp->rq_pages[0].
*/
- page = ctxt->pages[0];
+ page = ctxt->rc_pages[0];
put_page(rqstp->rq_pages[0]);
rqstp->rq_pages[0] = page;
/* Set up the XDR head */
rqstp->rq_arg.head[0].iov_base = page_address(page);
rqstp->rq_arg.head[0].iov_len =
- min_t(size_t, ctxt->byte_len, ctxt->sge[0].length);
- rqstp->rq_arg.len = ctxt->byte_len;
- rqstp->rq_arg.buflen = ctxt->byte_len;
+ min_t(size_t, ctxt->rc_byte_len, ctxt->rc_sges[0].length);
+ rqstp->rq_arg.len = ctxt->rc_byte_len;
+ rqstp->rq_arg.buflen = ctxt->rc_byte_len;
/* Compute bytes past head in the SGL */
- len = ctxt->byte_len - rqstp->rq_arg.head[0].iov_len;
+ len = ctxt->rc_byte_len - rqstp->rq_arg.head[0].iov_len;
/* If data remains, store it in the pagelist */
rqstp->rq_arg.page_len = len;
rqstp->rq_arg.page_base = 0;
sge_no = 1;
- while (len && sge_no < ctxt->count) {
- page = ctxt->pages[sge_no];
+ while (len && sge_no < ctxt->rc_recv_wr.num_sge) {
+ page = ctxt->rc_pages[sge_no];
put_page(rqstp->rq_pages[sge_no]);
rqstp->rq_pages[sge_no] = page;
- len -= min_t(u32, len, ctxt->sge[sge_no].length);
+ len -= min_t(u32, len, ctxt->rc_sges[sge_no].length);
sge_no++;
}
rqstp->rq_respages = &rqstp->rq_pages[sge_no];
@@ -154,11 +388,11 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
/* If not all pages were used from the SGL, free the remaining ones */
len = sge_no;
- while (sge_no < ctxt->count) {
- page = ctxt->pages[sge_no++];
+ while (sge_no < ctxt->rc_recv_wr.num_sge) {
+ page = ctxt->rc_pages[sge_no++];
put_page(page);
}
- ctxt->count = len;
+ ctxt->rc_page_count = len;
/* Set up tail */
rqstp->rq_arg.tail[0].iov_base = NULL;
@@ -364,29 +598,29 @@ out_inval:
}
static void rdma_read_complete(struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *head)
+ struct svc_rdma_recv_ctxt *head)
{
int page_no;
/* Copy RPC pages */
- for (page_no = 0; page_no < head->count; page_no++) {
+ for (page_no = 0; page_no < head->rc_page_count; page_no++) {
put_page(rqstp->rq_pages[page_no]);
- rqstp->rq_pages[page_no] = head->pages[page_no];
+ rqstp->rq_pages[page_no] = head->rc_pages[page_no];
}
/* Point rq_arg.pages past header */
- rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
- rqstp->rq_arg.page_len = head->arg.page_len;
+ rqstp->rq_arg.pages = &rqstp->rq_pages[head->rc_hdr_count];
+ rqstp->rq_arg.page_len = head->rc_arg.page_len;
/* rq_respages starts after the last arg page */
rqstp->rq_respages = &rqstp->rq_pages[page_no];
rqstp->rq_next_page = rqstp->rq_respages + 1;
/* Rebuild rq_arg head and tail. */
- rqstp->rq_arg.head[0] = head->arg.head[0];
- rqstp->rq_arg.tail[0] = head->arg.tail[0];
- rqstp->rq_arg.len = head->arg.len;
- rqstp->rq_arg.buflen = head->arg.buflen;
+ rqstp->rq_arg.head[0] = head->rc_arg.head[0];
+ rqstp->rq_arg.tail[0] = head->rc_arg.tail[0];
+ rqstp->rq_arg.len = head->rc_arg.len;
+ rqstp->rq_arg.buflen = head->rc_arg.buflen;
}
static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
@@ -506,28 +740,26 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
struct svc_xprt *xprt = rqstp->rq_xprt;
struct svcxprt_rdma *rdma_xprt =
container_of(xprt, struct svcxprt_rdma, sc_xprt);
- struct svc_rdma_op_ctxt *ctxt;
+ struct svc_rdma_recv_ctxt *ctxt;
__be32 *p;
int ret;
spin_lock(&rdma_xprt->sc_rq_dto_lock);
- if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
- ctxt = list_first_entry(&rdma_xprt->sc_read_complete_q,
- struct svc_rdma_op_ctxt, list);
- list_del(&ctxt->list);
+ ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q);
+ if (ctxt) {
+ list_del(&ctxt->rc_list);
spin_unlock(&rdma_xprt->sc_rq_dto_lock);
rdma_read_complete(rqstp, ctxt);
goto complete;
- } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
- ctxt = list_first_entry(&rdma_xprt->sc_rq_dto_q,
- struct svc_rdma_op_ctxt, list);
- list_del(&ctxt->list);
- } else {
+ }
+ ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q);
+ if (!ctxt) {
/* No new incoming requests, terminate the loop */
clear_bit(XPT_DATA, &xprt->xpt_flags);
spin_unlock(&rdma_xprt->sc_rq_dto_lock);
return 0;
}
+ list_del(&ctxt->rc_list);
spin_unlock(&rdma_xprt->sc_rq_dto_lock);
atomic_inc(&rdma_stat_recv);
@@ -545,7 +777,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
if (svc_rdma_is_backchannel_reply(xprt, p)) {
ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, p,
&rqstp->rq_arg);
- svc_rdma_put_context(ctxt, 0);
+ svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 0);
return ret;
}
@@ -554,7 +786,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
goto out_readchunk;
complete:
- svc_rdma_put_context(ctxt, 0);
+ svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 0);
rqstp->rq_prot = IPPROTO_MAX;
svc_xprt_copy_addrs(rqstp, xprt);
return rqstp->rq_arg.len;
@@ -567,16 +799,16 @@ out_readchunk:
out_err:
svc_rdma_send_error(rdma_xprt, p, ret);
- svc_rdma_put_context(ctxt, 0);
+ svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 0);
return 0;
out_postfail:
if (ret == -EINVAL)
svc_rdma_send_error(rdma_xprt, p, ret);
- svc_rdma_put_context(ctxt, 1);
+ svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 1);
return ret;
out_drop:
- svc_rdma_put_context(ctxt, 1);
+ svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 1);
return 0;
}