summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorChuck Lever <chuck.lever@oracle.com>2020-03-13 10:42:10 -0400
committerChuck Lever <chuck.lever@oracle.com>2020-11-30 13:00:22 -0500
commitf6ad77590a5d432589a5d8a211c4e8e50cd8bb63 (patch)
treed0c7c6f982854252d413c06c1776a6f66bddc9da /net
parent76e5492b161f555c0fb69cad9eb39a7d8467f5fe (diff)
downloadlwn-f6ad77590a5d432589a5d8a211c4e8e50cd8bb63.tar.gz
lwn-f6ad77590a5d432589a5d8a211c4e8e50cd8bb63.zip
svcrdma: Post RDMA Writes while XDR encoding replies
The only RPC/RDMA ordering requirement between RDMA Writes and RDMA Sends is that the responder must post the Writes on the Send queue before posting the Send that conveys the RPC Reply for that Write payload. The Linux NFS server implementation now has a transport method that can post result Payload Writes earlier than svc_rdma_sendto: ->xpo_result_payload() This gets RDMA Writes going earlier so they are more likely to be complete at the remote end before the Send completes. Some care must be taken with pulled-up Replies. We don't want to push the Write chunk and then send the same payload data via Send. Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_rw.c52
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c60
2 files changed, 76 insertions, 36 deletions
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index a75f8a133bef..f2ed1bf50251 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -539,12 +539,48 @@ static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
}
/**
+ * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
+ * @xdr: xdr_buf to write
+ * @info: pointer to write arguments
+ *
+ * Returns:
+ * On succes, returns zero
+ * %-E2BIG if the client-provided Write chunk is too small
+ * %-ENOMEM if a resource has been exhausted
+ * %-EIO if an rdma-rw error occurred
+ */
+static int svc_rdma_xb_write(const struct xdr_buf *xdr,
+ struct svc_rdma_write_info *info)
+{
+ int ret;
+
+ if (xdr->head[0].iov_len) {
+ ret = svc_rdma_iov_write(info, &xdr->head[0]);
+ if (ret < 0)
+ return ret;
+ }
+
+ if (xdr->page_len) {
+ ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
+ xdr->page_len);
+ if (ret < 0)
+ return ret;
+ }
+
+ if (xdr->tail[0].iov_len) {
+ ret = svc_rdma_iov_write(info, &xdr->tail[0]);
+ if (ret < 0)
+ return ret;
+ }
+
+ return xdr->len;
+}
+
+/**
* svc_rdma_send_write_chunk - Write all segments in a Write chunk
* @rdma: controlling RDMA transport
* @wr_ch: Write chunk provided by client
* @xdr: xdr_buf containing the data payload
- * @offset: payload's byte offset in @xdr
- * @length: size of payload, in bytes
*
* Returns a non-negative number of bytes the chunk consumed, or
* %-E2BIG if the payload was larger than the Write chunk,
@@ -554,21 +590,17 @@ static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
* %-EIO if rdma_rw initialization failed (DMA mapping, etc).
*/
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
- struct xdr_buf *xdr,
- unsigned int offset, unsigned long length)
+ const struct xdr_buf *xdr)
{
struct svc_rdma_write_info *info;
int ret;
- if (!length)
- return 0;
-
info = svc_rdma_write_info_alloc(rdma, wr_ch);
if (!info)
return -ENOMEM;
- ret = svc_rdma_pages_write(info, xdr, offset, length);
- if (ret < 0)
+ ret = svc_rdma_xb_write(xdr, info);
+ if (ret != xdr->len)
goto out_err;
ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
@@ -576,7 +608,7 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
goto out_err;
trace_svcrdma_send_write_chunk(xdr->page_len);
- return length;
+ return xdr->len;
out_err:
svc_rdma_write_info_free(info);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index d6436c13d5c4..e8b0d030e1e6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -468,11 +468,14 @@ svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
{
ssize_t len, ret;
- ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
- rctxt->rc_read_payload_length);
- if (ret < 0)
- return ret;
- len = ret;
+ len = 0;
+ if (rctxt->rc_write_list) {
+ ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
+ rctxt->rc_read_payload_length);
+ if (ret < 0)
+ return ret;
+ len = ret;
+ }
/* Terminate the Write list */
ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
@@ -556,11 +559,13 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt,
struct xdr_buf *xdr)
{
+ bool write_chunk_present = rctxt && rctxt->rc_write_list;
int elements;
/* For small messages, copying bytes is cheaper than DMA mapping.
*/
- if (sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
+ if (!write_chunk_present &&
+ sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
return true;
/* Check whether the xdr_buf has more elements than can
@@ -893,9 +898,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
container_of(xprt, struct svcxprt_rdma, sc_xprt);
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
__be32 *rdma_argp = rctxt->rc_recv_buf;
- __be32 *wr_lst = rctxt->rc_write_list;
__be32 *rp_ch = rctxt->rc_reply_chunk;
- struct xdr_buf *xdr = &rqstp->rq_res;
struct svc_rdma_send_ctxt *sctxt;
__be32 *p;
int ret;
@@ -920,19 +923,8 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (svc_rdma_encode_read_list(sctxt) < 0)
goto err0;
- if (wr_lst) {
- /* XXX: Presume the client sent only one Write chunk */
- ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr,
- rctxt->rc_read_payload_offset,
- rctxt->rc_read_payload_length);
- if (ret < 0)
- goto err2;
- if (svc_rdma_encode_write_list(rctxt, sctxt) < 0)
- goto err0;
- } else {
- if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
- goto err0;
- }
+ if (svc_rdma_encode_write_list(rctxt, sctxt) < 0)
+ goto err0;
if (rp_ch) {
ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
if (ret < 0)
@@ -974,16 +966,25 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
* @offset: payload's byte offset in @xdr
* @length: size of payload, in bytes
*
- * Returns zero on success.
- *
- * For the moment, just record the xdr_buf location of the result
- * payload. svc_rdma_sendto will use that location later when
- * we actually send the payload.
+ * Return values:
+ * %0 if successful or nothing needed to be done
+ * %-EMSGSIZE on XDR buffer overflow
+ * %-E2BIG if the payload was larger than the Write chunk
+ * %-EINVAL if client provided too many segments
+ * %-ENOMEM if rdma_rw context pool was exhausted
+ * %-ENOTCONN if posting failed (connection is lost)
+ * %-EIO if rdma_rw initialization failed (DMA mapping, etc)
*/
int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
unsigned int length)
{
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
+ struct svcxprt_rdma *rdma;
+ struct xdr_buf subbuf;
+ int ret;
+
+ if (!rctxt->rc_write_list || !length)
+ return 0;
/* XXX: Just one READ payload slot for now, since our
* transport implementation currently supports only one
@@ -992,5 +993,12 @@ int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
rctxt->rc_read_payload_offset = offset;
rctxt->rc_read_payload_length = length;
+ if (xdr_buf_subsegment(&rqstp->rq_res, &subbuf, offset, length))
+ return -EMSGSIZE;
+
+ rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
+ ret = svc_rdma_send_write_chunk(rdma, rctxt->rc_write_list, &subbuf);
+ if (ret < 0)
+ return ret;
return 0;
}