diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/sunrpc/Kconfig | 1 | ||||
-rw-r--r-- | net/sunrpc/svc.c | 134 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/Makefile | 2 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma.c | 8 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 71 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_marshal.c | 89 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 79 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_rw.c | 512 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_sendto.c | 978 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_transport.c | 110 |
10 files changed, 1197 insertions, 787 deletions
diff --git a/net/sunrpc/Kconfig b/net/sunrpc/Kconfig index 04ce2c0b660e..ac09ca803296 100644 --- a/net/sunrpc/Kconfig +++ b/net/sunrpc/Kconfig @@ -52,6 +52,7 @@ config SUNRPC_XPRT_RDMA tristate "RPC-over-RDMA transport" depends on SUNRPC && INFINIBAND && INFINIBAND_ADDR_TRANS default SUNRPC && INFINIBAND + select SG_POOL help This option allows the NFS client and server to use RDMA transports (InfiniBand, iWARP, or RoCE). diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index a08aeb56b8e4..bc0f5a0ecbdc 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -702,59 +702,32 @@ found_pool: return task; } -/* - * Create or destroy enough new threads to make the number - * of threads the given number. If `pool' is non-NULL, applies - * only to threads in that pool, otherwise round-robins between - * all pools. Caller must ensure that mutual exclusion between this and - * server startup or shutdown. - * - * Destroying threads relies on the service threads filling in - * rqstp->rq_task, which only the nfs ones do. Assumes the serv - * has been created using svc_create_pooled(). - * - * Based on code that used to be in nfsd_svc() but tweaked - * to be pool-aware. - */ -int -svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) +/* create new threads */ +static int +svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) { struct svc_rqst *rqstp; struct task_struct *task; struct svc_pool *chosen_pool; - int error = 0; unsigned int state = serv->sv_nrthreads-1; int node; - if (pool == NULL) { - /* The -1 assumes caller has done a svc_get() */ - nrservs -= (serv->sv_nrthreads-1); - } else { - spin_lock_bh(&pool->sp_lock); - nrservs -= pool->sp_nrthreads; - spin_unlock_bh(&pool->sp_lock); - } - - /* create new threads */ - while (nrservs > 0) { + do { nrservs--; chosen_pool = choose_pool(serv, pool, &state); node = svc_pool_map_get_node(chosen_pool->sp_id); rqstp = svc_prepare_thread(serv, chosen_pool, node); - if (IS_ERR(rqstp)) { - error = PTR_ERR(rqstp); - break; - } + if (IS_ERR(rqstp)) + return PTR_ERR(rqstp); __module_get(serv->sv_ops->svo_module); task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp, node, "%s", serv->sv_name); if (IS_ERR(task)) { - error = PTR_ERR(task); module_put(serv->sv_ops->svo_module); svc_exit_thread(rqstp); - break; + return PTR_ERR(task); } rqstp->rq_task = task; @@ -763,18 +736,103 @@ svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) svc_sock_update_bufs(serv); wake_up_process(task); - } + } while (nrservs > 0); + + return 0; +} + + +/* destroy old threads */ +static int +svc_signal_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) +{ + struct task_struct *task; + unsigned int state = serv->sv_nrthreads-1; + /* destroy old threads */ - while (nrservs < 0 && - (task = choose_victim(serv, pool, &state)) != NULL) { + do { + task = choose_victim(serv, pool, &state); + if (task == NULL) + break; send_sig(SIGINT, task, 1); nrservs++; + } while (nrservs < 0); + + return 0; +} + +/* + * Create or destroy enough new threads to make the number + * of threads the given number. If `pool' is non-NULL, applies + * only to threads in that pool, otherwise round-robins between + * all pools. Caller must ensure that mutual exclusion between this and + * server startup or shutdown. + * + * Destroying threads relies on the service threads filling in + * rqstp->rq_task, which only the nfs ones do. Assumes the serv + * has been created using svc_create_pooled(). + * + * Based on code that used to be in nfsd_svc() but tweaked + * to be pool-aware. + */ +int +svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) +{ + if (pool == NULL) { + /* The -1 assumes caller has done a svc_get() */ + nrservs -= (serv->sv_nrthreads-1); + } else { + spin_lock_bh(&pool->sp_lock); + nrservs -= pool->sp_nrthreads; + spin_unlock_bh(&pool->sp_lock); } - return error; + if (nrservs > 0) + return svc_start_kthreads(serv, pool, nrservs); + if (nrservs < 0) + return svc_signal_kthreads(serv, pool, nrservs); + return 0; } EXPORT_SYMBOL_GPL(svc_set_num_threads); +/* destroy old threads */ +static int +svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) +{ + struct task_struct *task; + unsigned int state = serv->sv_nrthreads-1; + + /* destroy old threads */ + do { + task = choose_victim(serv, pool, &state); + if (task == NULL) + break; + kthread_stop(task); + nrservs++; + } while (nrservs < 0); + return 0; +} + +int +svc_set_num_threads_sync(struct svc_serv *serv, struct svc_pool *pool, int nrservs) +{ + if (pool == NULL) { + /* The -1 assumes caller has done a svc_get() */ + nrservs -= (serv->sv_nrthreads-1); + } else { + spin_lock_bh(&pool->sp_lock); + nrservs -= pool->sp_nrthreads; + spin_unlock_bh(&pool->sp_lock); + } + + if (nrservs > 0) + return svc_start_kthreads(serv, pool, nrservs); + if (nrservs < 0) + return svc_stop_kthreads(serv, pool, nrservs); + return 0; +} +EXPORT_SYMBOL_GPL(svc_set_num_threads_sync); + /* * Called from a server thread as it's exiting. Caller must hold the "service * mutex" for the service. diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile index ef19fa42c50f..c1ae8142ab73 100644 --- a/net/sunrpc/xprtrdma/Makefile +++ b/net/sunrpc/xprtrdma/Makefile @@ -4,5 +4,5 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \ fmr_ops.o frwr_ops.o \ svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \ svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ - module.o + svc_rdma_rw.o module.o rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c index c846ca9f1eba..a4a8f6989ee7 100644 --- a/net/sunrpc/xprtrdma/svc_rdma.c +++ b/net/sunrpc/xprtrdma/svc_rdma.c @@ -58,9 +58,9 @@ unsigned int svcrdma_max_requests = RPCRDMA_MAX_REQUESTS; unsigned int svcrdma_max_bc_requests = RPCRDMA_MAX_BC_REQUESTS; static unsigned int min_max_requests = 4; static unsigned int max_max_requests = 16384; -unsigned int svcrdma_max_req_size = RPCRDMA_MAX_REQ_SIZE; -static unsigned int min_max_inline = 4096; -static unsigned int max_max_inline = 65536; +unsigned int svcrdma_max_req_size = RPCRDMA_DEF_INLINE_THRESH; +static unsigned int min_max_inline = RPCRDMA_DEF_INLINE_THRESH; +static unsigned int max_max_inline = RPCRDMA_MAX_INLINE_THRESH; atomic_t rdma_stat_recv; atomic_t rdma_stat_read; @@ -247,8 +247,6 @@ int svc_rdma_init(void) dprintk("SVCRDMA Module Init, register RPC RDMA transport\n"); dprintk("\tsvcrdma_ord : %d\n", svcrdma_ord); dprintk("\tmax_requests : %u\n", svcrdma_max_requests); - dprintk("\tsq_depth : %u\n", - svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT); dprintk("\tmax_bc_requests : %u\n", svcrdma_max_bc_requests); dprintk("\tmax_inline : %d\n", svcrdma_max_req_size); diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index ff1df40f0d26..c676ed0efb5a 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -12,7 +12,17 @@ #undef SVCRDMA_BACKCHANNEL_DEBUG -int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp, +/** + * svc_rdma_handle_bc_reply - Process incoming backchannel reply + * @xprt: controlling backchannel transport + * @rdma_resp: pointer to incoming transport header + * @rcvbuf: XDR buffer into which to decode the reply + * + * Returns: + * %0 if @rcvbuf is filled in, xprt_complete_rqst called, + * %-EAGAIN if server should call ->recvfrom again. + */ +int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp, struct xdr_buf *rcvbuf) { struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); @@ -27,13 +37,13 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp, p = (__be32 *)src->iov_base; len = src->iov_len; - xid = rmsgp->rm_xid; + xid = *rdma_resp; #ifdef SVCRDMA_BACKCHANNEL_DEBUG pr_info("%s: xid=%08x, length=%zu\n", __func__, be32_to_cpu(xid), len); pr_info("%s: RPC/RDMA: %*ph\n", - __func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp); + __func__, (int)RPCRDMA_HDRLEN_MIN, rdma_resp); pr_info("%s: RPC: %*ph\n", __func__, (int)len, p); #endif @@ -53,7 +63,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp, goto out_unlock; memcpy(dst->iov_base, p, len); - credits = be32_to_cpu(rmsgp->rm_credit); + credits = be32_to_cpup(rdma_resp + 2); if (credits == 0) credits = 1; /* don't deadlock */ else if (credits > r_xprt->rx_buf.rb_bc_max_requests) @@ -90,9 +100,9 @@ out_notfound: * Caller holds the connection's mutex and has already marshaled * the RPC/RDMA request. * - * This is similar to svc_rdma_reply, but takes an rpc_rqst - * instead, does not support chunks, and avoids blocking memory - * allocation. + * This is similar to svc_rdma_send_reply_msg, but takes a struct + * rpc_rqst instead, does not support chunks, and avoids blocking + * memory allocation. * * XXX: There is still an opportunity to block in svc_rdma_send() * if there are no SQ entries to post the Send. This may occur if @@ -101,59 +111,36 @@ out_notfound: static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst) { - struct xdr_buf *sndbuf = &rqst->rq_snd_buf; struct svc_rdma_op_ctxt *ctxt; - struct svc_rdma_req_map *vec; - struct ib_send_wr send_wr; int ret; - vec = svc_rdma_get_req_map(rdma); - ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false); - if (ret) + ctxt = svc_rdma_get_context(rdma); + + /* rpcrdma_bc_send_request builds the transport header and + * the backchannel RPC message in the same buffer. Thus only + * one SGE is needed to send both. + */ + ret = svc_rdma_map_reply_hdr(rdma, ctxt, rqst->rq_buffer, + rqst->rq_snd_buf.len); + if (ret < 0) goto out_err; ret = svc_rdma_repost_recv(rdma, GFP_NOIO); if (ret) goto out_err; - ctxt = svc_rdma_get_context(rdma); - ctxt->pages[0] = virt_to_page(rqst->rq_buffer); - ctxt->count = 1; - - ctxt->direction = DMA_TO_DEVICE; - ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey; - ctxt->sge[0].length = sndbuf->len; - ctxt->sge[0].addr = - ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0, - sndbuf->len, DMA_TO_DEVICE); - if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) { - ret = -EIO; - goto out_unmap; - } - svc_rdma_count_mappings(rdma, ctxt); - - memset(&send_wr, 0, sizeof(send_wr)); - ctxt->cqe.done = svc_rdma_wc_send; - send_wr.wr_cqe = &ctxt->cqe; - send_wr.sg_list = ctxt->sge; - send_wr.num_sge = 1; - send_wr.opcode = IB_WR_SEND; - send_wr.send_flags = IB_SEND_SIGNALED; - - ret = svc_rdma_send(rdma, &send_wr); - if (ret) { - ret = -EIO; + ret = svc_rdma_post_send_wr(rdma, ctxt, 1, 0); + if (ret) goto out_unmap; - } out_err: - svc_rdma_put_req_map(rdma, vec); dprintk("svcrdma: %s returns %d\n", __func__, ret); return ret; out_unmap: svc_rdma_unmap_dma(ctxt); svc_rdma_put_context(ctxt, 1); + ret = -EIO; goto out_err; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c index 1c4aabf0f657..bdcf7d85a3dc 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c +++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c @@ -166,92 +166,3 @@ out_inval: dprintk("svcrdma: failed to parse transport header\n"); return -EINVAL; } - -int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt, - struct rpcrdma_msg *rmsgp, - enum rpcrdma_errcode err, __be32 *va) -{ - __be32 *startp = va; - - *va++ = rmsgp->rm_xid; - *va++ = rmsgp->rm_vers; - *va++ = xprt->sc_fc_credits; - *va++ = rdma_error; - *va++ = cpu_to_be32(err); - if (err == ERR_VERS) { - *va++ = rpcrdma_version; - *va++ = rpcrdma_version; - } - - return (int)((unsigned long)va - (unsigned long)startp); -} - -/** - * svc_rdma_xdr_get_reply_hdr_length - Get length of Reply transport header - * @rdma_resp: buffer containing Reply transport header - * - * Returns length of transport header, in bytes. - */ -unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp) -{ - unsigned int nsegs; - __be32 *p; - - p = rdma_resp; - - /* RPC-over-RDMA V1 replies never have a Read list. */ - p += rpcrdma_fixed_maxsz + 1; - - /* Skip Write list. */ - while (*p++ != xdr_zero) { - nsegs = be32_to_cpup(p++); - p += nsegs * rpcrdma_segment_maxsz; - } - - /* Skip Reply chunk. */ - if (*p++ != xdr_zero) { - nsegs = be32_to_cpup(p++); - p += nsegs * rpcrdma_segment_maxsz; - } - - return (unsigned long)p - (unsigned long)rdma_resp; -} - -void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks) -{ - struct rpcrdma_write_array *ary; - - /* no read-list */ - rmsgp->rm_body.rm_chunks[0] = xdr_zero; - - /* write-array discrim */ - ary = (struct rpcrdma_write_array *) - &rmsgp->rm_body.rm_chunks[1]; - ary->wc_discrim = xdr_one; - ary->wc_nchunks = cpu_to_be32(chunks); - - /* write-list terminator */ - ary->wc_array[chunks].wc_target.rs_handle = xdr_zero; - - /* reply-array discriminator */ - ary->wc_array[chunks].wc_target.rs_length = xdr_zero; -} - -void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary, - int chunks) -{ - ary->wc_discrim = xdr_one; - ary->wc_nchunks = cpu_to_be32(chunks); -} - -void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary, - int chunk_no, - __be32 rs_handle, - __be64 rs_offset, - u32 write_len) -{ - struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target; - seg->rs_handle = rs_handle; - seg->rs_offset = rs_offset; - seg->rs_length = cpu_to_be32(write_len); -} diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index f7b2daf72a86..27a99bf5b1a6 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -558,33 +558,85 @@ static void rdma_read_complete(struct svc_rqst *rqstp, rqstp->rq_arg.buflen = head->arg.buflen; } +static void svc_rdma_send_error(struct svcxprt_rdma *xprt, + __be32 *rdma_argp, int status) +{ + struct svc_rdma_op_ctxt *ctxt; + __be32 *p, *err_msgp; + unsigned int length; + struct page *page; + int ret; + + ret = svc_rdma_repost_recv(xprt, GFP_KERNEL); + if (ret) + return; + + page = alloc_page(GFP_KERNEL); + if (!page) + return; + err_msgp = page_address(page); + + p = err_msgp; + *p++ = *rdma_argp; + *p++ = *(rdma_argp + 1); + *p++ = xprt->sc_fc_credits; + *p++ = rdma_error; + if (status == -EPROTONOSUPPORT) { + *p++ = err_vers; + *p++ = rpcrdma_version; + *p++ = rpcrdma_version; + } else { + *p++ = err_chunk; + } + length = (unsigned long)p - (unsigned long)err_msgp; + + /* Map transport header; no RPC message payload */ + ctxt = svc_rdma_get_context(xprt); + ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length); + if (ret) { + dprintk("svcrdma: Error %d mapping send for protocol error\n", + ret); + return; + } + + ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0); + if (ret) { + dprintk("svcrdma: Error %d posting send for protocol error\n", + ret); + svc_rdma_unmap_dma(ctxt); + svc_rdma_put_context(ctxt, 1); + } +} + /* By convention, backchannel calls arrive via rdma_msg type * messages, and never populate the chunk lists. This makes * the RPC/RDMA header small and fixed in size, so it is * straightforward to check the RPC header's direction field. */ -static bool -svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, struct rpcrdma_msg *rmsgp) +static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, + __be32 *rdma_resp) { - __be32 *p = (__be32 *)rmsgp; + __be32 *p; if (!xprt->xpt_bc_xprt) return false; - if (rmsgp->rm_type != rdma_msg) + p = rdma_resp + 3; + if (*p++ != rdma_msg) return false; - if (rmsgp->rm_body.rm_chunks[0] != xdr_zero) + + if (*p++ != xdr_zero) return false; - if (rmsgp->rm_body.rm_chunks[1] != xdr_zero) + if (*p++ != xdr_zero) return false; - if (rmsgp->rm_body.rm_chunks[2] != xdr_zero) + if (*p++ != xdr_zero) return false; - /* sanity */ - if (p[7] != rmsgp->rm_xid) + /* XID sanity */ + if (*p++ != *rdma_resp) return false; /* call direction */ - if (p[8] == cpu_to_be32(RPC_CALL)) + if (*p == cpu_to_be32(RPC_CALL)) return false; return true; @@ -650,8 +702,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) goto out_drop; rqstp->rq_xprt_hlen = ret; - if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) { - ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp, + if (svc_rdma_is_backchannel_reply(xprt, &rmsgp->rm_xid)) { + ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, + &rmsgp->rm_xid, &rqstp->rq_arg); svc_rdma_put_context(ctxt, 0); if (ret) @@ -686,7 +739,7 @@ complete: return ret; out_err: - svc_rdma_send_error(rdma_xprt, rmsgp, ret); + svc_rdma_send_error(rdma_xprt, &rmsgp->rm_xid, ret); svc_rdma_put_context(ctxt, 0); return 0; diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c new file mode 100644 index 000000000000..0cf620277693 --- /dev/null +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c @@ -0,0 +1,512 @@ +/* + * Copyright (c) 2016 Oracle. All rights reserved. + * + * Use the core R/W API to move RPC-over-RDMA Read and Write chunks. + */ + +#include <linux/sunrpc/rpc_rdma.h> +#include <linux/sunrpc/svc_rdma.h> +#include <linux/sunrpc/debug.h> + +#include <rdma/rw.h> + +#define RPCDBG_FACILITY RPCDBG_SVCXPRT + +/* Each R/W context contains state for one chain of RDMA Read or + * Write Work Requests. + * + * Each WR chain handles a single contiguous server-side buffer, + * because scatterlist entries after the first have to start on + * page alignment. xdr_buf iovecs cannot guarantee alignment. + * + * Each WR chain handles only one R_key. Each RPC-over-RDMA segment + * from a client may contain a unique R_key, so each WR chain moves + * up to one segment at a time. + * + * The scatterlist makes this data structure over 4KB in size. To + * make it less likely to fail, and to handle the allocation for + * smaller I/O requests without disabling bottom-halves, these + * contexts are created on demand, but cached and reused until the + * controlling svcxprt_rdma is destroyed. + */ +struct svc_rdma_rw_ctxt { + struct list_head rw_list; + struct rdma_rw_ctx rw_ctx; + int rw_nents; + struct sg_table rw_sg_table; + struct scatterlist rw_first_sgl[0]; +}; + +static inline struct svc_rdma_rw_ctxt * +svc_rdma_next_ctxt(struct list_head *list) +{ + return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt, + rw_list); +} + +static struct svc_rdma_rw_ctxt * +svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges) +{ + struct svc_rdma_rw_ctxt *ctxt; + + spin_lock(&rdma->sc_rw_ctxt_lock); + + ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts); + if (ctxt) { + list_del(&ctxt->rw_list); + spin_unlock(&rdma->sc_rw_ctxt_lock); + } else { + spin_unlock(&rdma->sc_rw_ctxt_lock); + ctxt = kmalloc(sizeof(*ctxt) + + SG_CHUNK_SIZE * sizeof(struct scatterlist), + GFP_KERNEL); + if (!ctxt) + goto out; + INIT_LIST_HEAD(&ctxt->rw_list); + } + + ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl; + if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges, + ctxt->rw_sg_table.sgl)) { + kfree(ctxt); + ctxt = NULL; + } +out: + return ctxt; +} + +static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma, + struct svc_rdma_rw_ctxt *ctxt) +{ + sg_free_table_chained(&ctxt->rw_sg_table, true); + + spin_lock(&rdma->sc_rw_ctxt_lock); + list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts); + spin_unlock(&rdma->sc_rw_ctxt_lock); +} + +/** + * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts + * @rdma: transport about to be destroyed + * + */ +void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_rw_ctxt *ctxt; + + while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) { + list_del(&ctxt->rw_list); + kfree(ctxt); + } +} + +/* A chunk context tracks all I/O for moving one Read or Write + * chunk. This is a a set of rdma_rw's that handle data movement + * for all segments of one chunk. + * + * These are small, acquired with a single allocator call, and + * no more than one is needed per chunk. They are allocated on + * demand, and not cached. + */ +struct svc_rdma_chunk_ctxt { + struct ib_cqe cc_cqe; + struct svcxprt_rdma *cc_rdma; + struct list_head cc_rwctxts; + int cc_sqecount; + enum dma_data_direction cc_dir; +}; + +static void svc_rdma_cc_init(struct svcxprt_rdma *rdma, + struct svc_rdma_chunk_ctxt *cc, + enum dma_data_direction dir) +{ + cc->cc_rdma = rdma; + svc_xprt_get(&rdma->sc_xprt); + + INIT_LIST_HEAD(&cc->cc_rwctxts); + cc->cc_sqecount = 0; + cc->cc_dir = dir; +} + +static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc) +{ + struct svcxprt_rdma *rdma = cc->cc_rdma; + struct svc_rdma_rw_ctxt *ctxt; + + while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) { + list_del(&ctxt->rw_list); + + rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp, + rdma->sc_port_num, ctxt->rw_sg_table.sgl, + ctxt->rw_nents, cc->cc_dir); + svc_rdma_put_rw_ctxt(rdma, ctxt); + } + svc_xprt_put(&rdma->sc_xprt); +} + +/* State for sending a Write or Reply chunk. + * - Tracks progress of writing one chunk over all its segments + * - Stores arguments for the SGL constructor functions + */ +struct svc_rdma_write_info { + /* write state of this chunk */ + unsigned int wi_seg_off; + unsigned int wi_seg_no; + unsigned int wi_nsegs; + __be32 *wi_segs; + + /* SGL constructor arguments */ + struct xdr_buf *wi_xdr; + unsigned char *wi_base; + unsigned int wi_next_off; + + struct svc_rdma_chunk_ctxt wi_cc; +}; + +static struct svc_rdma_write_info * +svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk) +{ + struct svc_rdma_write_info *info; + + info = kmalloc(sizeof(*info), GFP_KERNEL); + if (!info) + return info; + + info->wi_seg_off = 0; + info->wi_seg_no = 0; + info->wi_nsegs = be32_to_cpup(++chunk); + info->wi_segs = ++chunk; + svc_rdma_cc_init(rdma, &info->wi_cc, DMA_TO_DEVICE); + return info; +} + +static void svc_rdma_write_info_free(struct svc_rdma_write_info *info) +{ + svc_rdma_cc_release(&info->wi_cc); + kfree(info); +} + +/** + * svc_rdma_write_done - Write chunk completion + * @cq: controlling Completion Queue + * @wc: Work Completion + * + * Pages under I/O are freed by a subsequent Send completion. + */ +static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct svc_rdma_chunk_ctxt *cc = + container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe); + struct svcxprt_rdma *rdma = cc->cc_rdma; + struct svc_rdma_write_info *info = + container_of(cc, struct svc_rdma_write_info, wi_cc); + + atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); + wake_up(&rdma->sc_send_wait); + + if (unlikely(wc->status != IB_WC_SUCCESS)) { + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("svcrdma: write ctx: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); + } + + svc_rdma_write_info_free(info); +} + +/* This function sleeps when the transport's Send Queue is congested. + * + * Assumptions: + * - If ib_post_send() succeeds, only one completion is expected, + * even if one or more WRs are flushed. This is true when posting + * an rdma_rw_ctx or when posting a single signaled WR. + */ +static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc) +{ + struct svcxprt_rdma *rdma = cc->cc_rdma; + struct svc_xprt *xprt = &rdma->sc_xprt; + struct ib_send_wr *first_wr, *bad_wr; + struct list_head *tmp; + struct ib_cqe *cqe; + int ret; + + first_wr = NULL; + cqe = &cc->cc_cqe; + list_for_each(tmp, &cc->cc_rwctxts) { + struct svc_rdma_rw_ctxt *ctxt; + + ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list); + first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp, + rdma->sc_port_num, cqe, first_wr); + cqe = NULL; + } + + do { + if (atomic_sub_return(cc->cc_sqecount, + &rdma->sc_sq_avail) > 0) { + ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr); + if (ret) + break; + return 0; + } + + atomic_inc(&rdma_stat_sq_starve); + atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); + wait_event(rdma->sc_send_wait, + atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount); + } while (1); + + pr_err("svcrdma: ib_post_send failed (%d)\n", ret); + set_bit(XPT_CLOSE, &xprt->xpt_flags); + + /* If even one was posted, there will be a completion. */ + if (bad_wr != first_wr) + return 0; + + atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); + wake_up(&rdma->sc_send_wait); + return -ENOTCONN; +} + +/* Build and DMA-map an SGL that covers one kvec in an xdr_buf + */ +static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info, + unsigned int len, + struct svc_rdma_rw_ctxt *ctxt) +{ + struct scatterlist *sg = ctxt->rw_sg_table.sgl; + + sg_set_buf(&sg[0], info->wi_base, len); + info->wi_base += len; + + ctxt->rw_nents = 1; +} + +/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist. + */ +static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info, + unsigned int remaining, + struct svc_rdma_rw_ctxt *ctxt) +{ + unsigned int sge_no, sge_bytes, page_off, page_no; + struct xdr_buf *xdr = info->wi_xdr; + struct scatterlist *sg; + struct page **page; + + page_off = (info->wi_next_off + xdr->page_base) & ~PAGE_MASK; + page_no = (info->wi_next_off + xdr->page_base) >> PAGE_SHIFT; + page = xdr->pages + page_no; + info->wi_next_off += remaining; + sg = ctxt->rw_sg_table.sgl; + sge_no = 0; + do { + sge_bytes = min_t(unsigned int, remaining, + PAGE_SIZE - page_off); + sg_set_page(sg, *page, sge_bytes, page_off); + + remaining -= sge_bytes; + sg = sg_next(sg); + page_off = 0; + sge_no++; + page++; + } while (remaining); + + ctxt->rw_nents = sge_no; +} + +/* Construct RDMA Write WRs to send a portion of an xdr_buf containing + * an RPC Reply. + */ +static int +svc_rdma_build_writes(struct svc_rdma_write_info *info, + void (*constructor)(struct svc_rdma_write_info *info, + unsigned int len, + struct svc_rdma_rw_ctxt *ctxt), + unsigned int remaining) +{ + struct svc_rdma_chunk_ctxt *cc = &info->wi_cc; + struct svcxprt_rdma *rdma = cc->cc_rdma; + struct svc_rdma_rw_ctxt *ctxt; + __be32 *seg; + int ret; + + cc->cc_cqe.done = svc_rdma_write_done; + seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz; + do { + unsigned int write_len; + u32 seg_length, seg_handle; + u64 seg_offset; + + if (info->wi_seg_no >= info->wi_nsegs) + goto out_overflow; + + seg_handle = be32_to_cpup(seg); + seg_length = be32_to_cpup(seg + 1); + xdr_decode_hyper(seg + 2, &seg_offset); + seg_offset += info->wi_seg_off; + + write_len = min(remaining, seg_length - info->wi_seg_off); + ctxt = svc_rdma_get_rw_ctxt(rdma, + (write_len >> PAGE_SHIFT) + 2); + if (!ctxt) + goto out_noctx; + + constructor(info, write_len, ctxt); + ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, + rdma->sc_port_num, ctxt->rw_sg_table.sgl, + ctxt->rw_nents, 0, seg_offset, + seg_handle, DMA_TO_DEVICE); + if (ret < 0) + goto out_initerr; + + list_add(&ctxt->rw_list, &cc->cc_rwctxts); + cc->cc_sqecount += ret; + if (write_len == seg_length - info->wi_seg_off) { + seg += 4; + info->wi_seg_no++; + info->wi_seg_off = 0; + } else { + info->wi_seg_off += write_len; + } + remaining -= write_len; + } while (remaining); + + return 0; + +out_overflow: + dprintk("svcrdma: inadequate space in Write chunk (%u)\n", + info->wi_nsegs); + return -E2BIG; + +out_noctx: + dprintk("svcrdma: no R/W ctxs available\n"); + return -ENOMEM; + +out_initerr: + svc_rdma_put_rw_ctxt(rdma, ctxt); + pr_err("svcrdma: failed to map pagelist (%d)\n", ret); + return -EIO; +} + +/* Send one of an xdr_buf's kvecs by itself. To send a Reply + * chunk, the whole RPC Reply is written back to the client. + * This function writes either the head or tail of the xdr_buf + * containing the Reply. + */ +static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info, + struct kvec *vec) +{ + info->wi_base = vec->iov_base; + return svc_rdma_build_writes(info, svc_rdma_vec_to_sg, + vec->iov_len); +} + +/* Send an xdr_buf's page list by itself. A Write chunk is + * just the page list. a Reply chunk is the head, page list, + * and tail. This function is shared between the two types + * of chunk. + */ +static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info, + struct xdr_buf *xdr) +{ + info->wi_xdr = xdr; + info->wi_next_off = 0; + return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg, + xdr->page_len); +} + +/** + * svc_rdma_send_write_chunk - Write all segments in a Write chunk + * @rdma: controlling RDMA transport + * @wr_ch: Write chunk provided by client + * @xdr: xdr_buf containing the data payload + * + * Returns a non-negative number of bytes the chunk consumed, or + * %-E2BIG if the payload was larger than the Write chunk, + * %-ENOMEM if rdma_rw context pool was exhausted, + * %-ENOTCONN if posting failed (connection is lost), + * %-EIO if rdma_rw initialization failed (DMA mapping, etc). + */ +int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch, + struct xdr_buf *xdr) +{ + struct svc_rdma_write_info *info; + int ret; + + if (!xdr->page_len) + return 0; + + info = svc_rdma_write_info_alloc(rdma, wr_ch); + if (!info) + return -ENOMEM; + + ret = svc_rdma_send_xdr_pagelist(info, xdr); + if (ret < 0) + goto out_err; + + ret = svc_rdma_post_chunk_ctxt(&info->wi_cc); + if (ret < 0) + goto out_err; + return xdr->page_len; + +out_err: + svc_rdma_write_info_free(info); + return ret; +} + +/** + * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk + * @rdma: controlling RDMA transport + * @rp_ch: Reply chunk provided by client + * @writelist: true if client provided a Write list + * @xdr: xdr_buf containing an RPC Reply + * + * Returns a non-negative number of bytes the chunk consumed, or + * %-E2BIG if the payload was larger than the Reply chunk, + * %-ENOMEM if rdma_rw context pool was exhausted, + * %-ENOTCONN if posting failed (connection is lost), + * %-EIO if rdma_rw initialization failed (DMA mapping, etc). + */ +int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch, + bool writelist, struct xdr_buf *xdr) +{ + struct svc_rdma_write_info *info; + int consumed, ret; + + info = svc_rdma_write_info_alloc(rdma, rp_ch); + if (!info) + return -ENOMEM; + + ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]); + if (ret < 0) + goto out_err; + consumed = xdr->head[0].iov_len; + + /* Send the page list in the Reply chunk only if the + * client did not provide Write chunks. + */ + if (!writelist && xdr->page_len) { + ret = svc_rdma_send_xdr_pagelist(info, xdr); + if (ret < 0) + goto out_err; + consumed += xdr->page_len; + } + + if (xdr->tail[0].iov_len) { + ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]); + if (ret < 0) + goto out_err; + consumed += xdr->tail[0].iov_len; + } + + ret = svc_rdma_post_chunk_ctxt(&info->wi_cc); + if (ret < 0) + goto out_err; + return consumed; + +out_err: + svc_rdma_write_info_free(info); + return ret; +} diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 515221b16d09..1736337f3a55 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -1,4 +1,5 @@ /* + * Copyright (c) 2016 Oracle. All rights reserved. * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. * @@ -40,6 +41,63 @@ * Author: Tom Tucker <tom@opengridcomputing.com> */ +/* Operation + * + * The main entry point is svc_rdma_sendto. This is called by the + * RPC server when an RPC Reply is ready to be transmitted to a client. + * + * The passed-in svc_rqst contains a struct xdr_buf which holds an + * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA + * transport header, post all Write WRs needed for this Reply, then post + * a Send WR conveying the transport header and the RPC message itself to + * the client. + * + * svc_rdma_sendto must fully transmit the Reply before returning, as + * the svc_rqst will be recycled as soon as sendto returns. Remaining + * resources referred to by the svc_rqst are also recycled at that time. + * Therefore any resources that must remain longer must be detached + * from the svc_rqst and released later. + * + * Page Management + * + * The I/O that performs Reply transmission is asynchronous, and may + * complete well after sendto returns. Thus pages under I/O must be + * removed from the svc_rqst before sendto returns. + * + * The logic here depends on Send Queue and completion ordering. Since + * the Send WR is always posted last, it will always complete last. Thus + * when it completes, it is guaranteed that all previous Write WRs have + * also completed. + * + * Write WRs are constructed and posted. Each Write segment gets its own + * svc_rdma_rw_ctxt, allowing the Write completion handler to find and + * DMA-unmap the pages under I/O for that Write segment. The Write + * completion handler does not release any pages. + * + * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt. + * The ownership of all of the Reply's pages are transferred into that + * ctxt, the Send WR is posted, and sendto returns. + * + * The svc_rdma_op_ctxt is presented when the Send WR completes. The + * Send completion handler finally releases the Reply's pages. + * + * This mechanism also assumes that completions on the transport's Send + * Completion Queue do not run in parallel. Otherwise a Write completion + * and Send completion running at the same time could release pages that + * are still DMA-mapped. + * + * Error Handling + * + * - If the Send WR is posted successfully, it will either complete + * successfully, or get flushed. Either way, the Send completion + * handler releases the Reply's pages. + * - If the Send WR cannot be not posted, the forward path releases + * the Reply's pages. + * + * This handles the case, without the use of page reference counting, + * where two different Write segments send portions of the same page. + */ + #include <linux/sunrpc/debug.h> #include <linux/sunrpc/rpc_rdma.h> #include <linux/spinlock.h> @@ -55,113 +113,141 @@ static u32 xdr_padsize(u32 len) return (len & 3) ? (4 - (len & 3)) : 0; } -int svc_rdma_map_xdr(struct svcxprt_rdma *xprt, - struct xdr_buf *xdr, - struct svc_rdma_req_map *vec, - bool write_chunk_present) +/* Returns length of transport header, in bytes. + */ +static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp) { - int sge_no; - u32 sge_bytes; - u32 page_bytes; - u32 page_off; - int page_no; - - if (xdr->len != - (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) { - pr_err("svcrdma: %s: XDR buffer length error\n", __func__); - return -EIO; - } + unsigned int nsegs; + __be32 *p; - /* Skip the first sge, this is for the RPCRDMA header */ - sge_no = 1; + p = rdma_resp; + + /* RPC-over-RDMA V1 replies never have a Read list. */ + p += rpcrdma_fixed_maxsz + 1; - /* Head SGE */ - vec->sge[sge_no].iov_base = xdr->head[0].iov_base; - vec->sge[sge_no].iov_len = xdr->head[0].iov_len; - sge_no++; - - /* pages SGE */ - page_no = 0; - page_bytes = xdr->page_len; - page_off = xdr->page_base; - while (page_bytes) { - vec->sge[sge_no].iov_base = - page_address(xdr->pages[page_no]) + page_off; - sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off)); - page_bytes -= sge_bytes; - vec->sge[sge_no].iov_len = sge_bytes; - - sge_no++; - page_no++; - page_off = 0; /* reset for next time through loop */ + /* Skip Write list. */ + while (*p++ != xdr_zero) { + nsegs = be32_to_cpup(p++); + p += nsegs * rpcrdma_segment_maxsz; } - /* Tail SGE */ - if (xdr->tail[0].iov_len) { - unsigned char *base = xdr->tail[0].iov_base; - size_t len = xdr->tail[0].iov_len; - u32 xdr_pad = xdr_padsize(xdr->page_len); + /* Skip Reply chunk. */ + if (*p++ != xdr_zero) { + nsegs = be32_to_cpup(p++); + p += nsegs * rpcrdma_segment_maxsz; + } - if (write_chunk_present && xdr_pad) { - base += xdr_pad; - len -= xdr_pad; - } + return (unsigned long)p - (unsigned long)rdma_resp; +} - if (len) { - vec->sge[sge_no].iov_base = base; - vec->sge[sge_no].iov_len = len; - sge_no++; +/* One Write chunk is copied from Call transport header to Reply + * transport header. Each segment's length field is updated to + * reflect number of bytes consumed in the segment. + * + * Returns number of segments in this chunk. + */ +static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src, + unsigned int remaining) +{ + unsigned int i, nsegs; + u32 seg_len; + + /* Write list discriminator */ + *dst++ = *src++; + + /* number of segments in this chunk */ + nsegs = be32_to_cpup(src); + *dst++ = *src++; + + for (i = nsegs; i; i--) { + /* segment's RDMA handle */ + *dst++ = *src++; + + /* bytes returned in this segment */ + seg_len = be32_to_cpu(*src); + if (remaining >= seg_len) { + /* entire segment was consumed */ + *dst = *src; + remaining -= seg_len; + } else { + /* segment only partly filled */ + *dst = cpu_to_be32(remaining); + remaining = 0; } - } + dst++; src++; - dprintk("svcrdma: %s: sge_no %d page_no %d " - "page_base %u page_len %u head_len %zu tail_len %zu\n", - __func__, sge_no, page_no, xdr->page_base, xdr->page_len, - xdr->head[0].iov_len, xdr->tail[0].iov_len); + /* segment's RDMA offset */ + *dst++ = *src++; + *dst++ = *src++; + } - vec->count = sge_no; - return 0; + return nsegs; } -static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt, - struct xdr_buf *xdr, - u32 xdr_off, size_t len, int dir) +/* The client provided a Write list in the Call message. Fill in + * the segments in the first Write chunk in the Reply's transport + * header with the number of bytes consumed in each segment. + * Remaining chunks are returned unused. + * + * Assumptions: + * - Client has provided only one Write chunk + */ +static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch, + unsigned int consumed) { - struct page *page; - dma_addr_t dma_addr; - if (xdr_off < xdr->head[0].iov_len) { - /* This offset is in the head */ - xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK; - page = virt_to_page(xdr->head[0].iov_base); - } else { - xdr_off -= xdr->head[0].iov_len; - if (xdr_off < xdr->page_len) { - /* This offset is in the page list */ - xdr_off += xdr->page_base; - page = xdr->pages[xdr_off >> PAGE_SHIFT]; - xdr_off &= ~PAGE_MASK; - } else { - /* This offset is in the tail */ - xdr_off -= xdr->page_len; - xdr_off += (unsigned long) - xdr->tail[0].iov_base & ~PAGE_MASK; - page = virt_to_page(xdr->tail[0].iov_base); - } + unsigned int nsegs; + __be32 *p, *q; + + /* RPC-over-RDMA V1 replies never have a Read list. */ + p = rdma_resp + rpcrdma_fixed_maxsz + 1; + + q = wr_ch; + while (*q != xdr_zero) { + nsegs = xdr_encode_write_chunk(p, q, consumed); + q += 2 + nsegs * rpcrdma_segment_maxsz; + p += 2 + nsegs * rpcrdma_segment_maxsz; + consumed = 0; } - dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off, - min_t(size_t, PAGE_SIZE, len), dir); - return dma_addr; + + /* Terminate Write list */ + *p++ = xdr_zero; + + /* Reply chunk discriminator; may be replaced later */ + *p = xdr_zero; +} + +/* The client provided a Reply chunk in the Call message. Fill in + * the segments in the Reply chunk in the Reply message with the + * number of bytes consumed in each segment. + * + * Assumptions: + * - Reply can always fit in the provided Reply chunk + */ +static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch, + unsigned int consumed) +{ + __be32 *p; + + /* Find the Reply chunk in the Reply's xprt header. + * RPC-over-RDMA V1 replies never have a Read list. + */ + p = rdma_resp + rpcrdma_fixed_maxsz + 1; + + /* Skip past Write list */ + while (*p++ != xdr_zero) + p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz; + + xdr_encode_write_chunk(p, rp_ch, consumed); } /* Parse the RPC Call's transport header. */ -static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp, - struct rpcrdma_write_array **write, - struct rpcrdma_write_array **reply) +static void svc_rdma_get_write_arrays(__be32 *rdma_argp, + __be32 **write, __be32 **reply) { __be32 *p; - p = (__be32 *)&rmsgp->rm_body.rm_chunks[0]; + p = rdma_argp + rpcrdma_fixed_maxsz; /* Read list */ while (*p++ != xdr_zero) @@ -169,7 +255,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp, /* Write list */ if (*p != xdr_zero) { - *write = (struct rpcrdma_write_array *)p; + *write = p; while (*p++ != xdr_zero) p += 1 + be32_to_cpu(*p) * 4; } else { @@ -179,7 +265,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp, /* Reply chunk */ if (*p != xdr_zero) - *reply = (struct rpcrdma_write_array *)p; + *reply = p; else *reply = NULL; } @@ -189,360 +275,321 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp, * Invalidate, and responder chooses one rkey to invalidate. * * Find a candidate rkey to invalidate when sending a reply. Picks the - * first rkey it finds in the chunks lists. + * first R_key it finds in the chunk lists. * * Returns zero if RPC's chunk lists are empty. */ -static u32 svc_rdma_get_inv_rkey(struct rpcrdma_msg *rdma_argp, - struct rpcrdma_write_array *wr_ary, - struct rpcrdma_write_array *rp_ary) +static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp, + __be32 *wr_lst, __be32 *rp_ch) { - struct rpcrdma_read_chunk *rd_ary; - struct rpcrdma_segment *arg_ch; + __be32 *p; - rd_ary = (struct rpcrdma_read_chunk *)&rdma_argp->rm_body.rm_chunks[0]; - if (rd_ary->rc_discrim != xdr_zero) - return be32_to_cpu(rd_ary->rc_target.rs_handle); + p = rdma_argp + rpcrdma_fixed_maxsz; + if (*p != xdr_zero) + p += 2; + else if (wr_lst && be32_to_cpup(wr_lst + 1)) + p = wr_lst + 2; + else if (rp_ch && be32_to_cpup(rp_ch + 1)) + p = rp_ch + 2; + else + return 0; + return be32_to_cpup(p); +} - if (wr_ary && be32_to_cpu(wr_ary->wc_nchunks)) { - arg_ch = &wr_ary->wc_array[0].wc_target; - return be32_to_cpu(arg_ch->rs_handle); - } +/* ib_dma_map_page() is used here because svc_rdma_dma_unmap() + * is used during completion to DMA-unmap this memory, and + * it uses ib_dma_unmap_page() exclusively. + */ +static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, + struct svc_rdma_op_ctxt *ctxt, + unsigned int sge_no, + unsigned char *base, + unsigned int len) +{ + unsigned long offset = (unsigned long)base & ~PAGE_MASK; + struct ib_device *dev = rdma->sc_cm_id->device; + dma_addr_t dma_addr; - if (rp_ary && be32_to_cpu(rp_ary->wc_nchunks)) { - arg_ch = &rp_ary->wc_array[0].wc_target; - return be32_to_cpu(arg_ch->rs_handle); - } + dma_addr = ib_dma_map_page(dev, virt_to_page(base), + offset, len, DMA_TO_DEVICE); + if (ib_dma_mapping_error(dev, dma_addr)) + return -EIO; + ctxt->sge[sge_no].addr = dma_addr; + ctxt->sge[sge_no].length = len; + ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; + svc_rdma_count_mappings(rdma, ctxt); return 0; } -/* Assumptions: - * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE - */ -static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp, - u32 rmr, u64 to, - u32 xdr_off, int write_len, - struct svc_rdma_req_map *vec) +static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, + struct svc_rdma_op_ctxt *ctxt, + unsigned int sge_no, + struct page *page, + unsigned int offset, + unsigned int len) { - struct ib_rdma_wr write_wr; - struct ib_sge *sge; - int xdr_sge_no; - int sge_no; - int sge_bytes; - int sge_off; - int bc; - struct svc_rdma_op_ctxt *ctxt; + struct ib_device *dev = rdma->sc_cm_id->device; + dma_addr_t dma_addr; - if (vec->count > RPCSVC_MAXPAGES) { - pr_err("svcrdma: Too many pages (%lu)\n", vec->count); + dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE); + if (ib_dma_mapping_error(dev, dma_addr)) return -EIO; - } - dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, " - "write_len=%d, vec->sge=%p, vec->count=%lu\n", - rmr, (unsigned long long)to, xdr_off, - write_len, vec->sge, vec->count); + ctxt->sge[sge_no].addr = dma_addr; + ctxt->sge[sge_no].length = len; + ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; + svc_rdma_count_mappings(rdma, ctxt); + return 0; +} - ctxt = svc_rdma_get_context(xprt); +/** + * svc_rdma_map_reply_hdr - DMA map the transport header buffer + * @rdma: controlling transport + * @ctxt: op_ctxt for the Send WR + * @rdma_resp: buffer containing transport header + * @len: length of transport header + * + * Returns: + * %0 if the header is DMA mapped, + * %-EIO if DMA mapping failed. + */ +int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma, + struct svc_rdma_op_ctxt *ctxt, + __be32 *rdma_resp, + unsigned int len) +{ ctxt->direction = DMA_TO_DEVICE; - sge = ctxt->sge; - - /* Find the SGE associated with xdr_off */ - for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count; - xdr_sge_no++) { - if (vec->sge[xdr_sge_no].iov_len > bc) - break; - bc -= vec->sge[xdr_sge_no].iov_len; - } - - sge_off = bc; - bc = write_len; - sge_no = 0; - - /* Copy the remaining SGE */ - while (bc != 0) { - sge_bytes = min_t(size_t, - bc, vec->sge[xdr_sge_no].iov_len-sge_off); - sge[sge_no].length = sge_bytes; - sge[sge_no].addr = - dma_map_xdr(xprt, &rqstp->rq_res, xdr_off, - sge_bytes, DMA_TO_DEVICE); - xdr_off += sge_bytes; - if (ib_dma_mapping_error(xprt->sc_cm_id->device, - sge[sge_no].addr)) - goto err; - svc_rdma_count_mappings(xprt, ctxt); - sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey; - ctxt->count++; - sge_off = 0; - sge_no++; - xdr_sge_no++; - if (xdr_sge_no > vec->count) { - pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no); - goto err; - } - bc -= sge_bytes; - if (sge_no == xprt->sc_max_sge) - break; - } - - /* Prepare WRITE WR */ - memset(&write_wr, 0, sizeof write_wr); - ctxt->cqe.done = svc_rdma_wc_write; - write_wr.wr.wr_cqe = &ctxt->cqe; - write_wr.wr.sg_list = &sge[0]; - write_wr.wr.num_sge = sge_no; - write_wr.wr.opcode = IB_WR_RDMA_WRITE; - write_wr.wr.send_flags = IB_SEND_SIGNALED; - write_wr.rkey = rmr; - write_wr.remote_addr = to; - - /* Post It */ - atomic_inc(&rdma_stat_write); - if (svc_rdma_send(xprt, &write_wr.wr)) - goto err; - return write_len - bc; - err: - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 0); - return -EIO; + ctxt->pages[0] = virt_to_page(rdma_resp); + ctxt->count = 1; + return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len); } -noinline -static int send_write_chunks(struct svcxprt_rdma *xprt, - struct rpcrdma_write_array *wr_ary, - struct rpcrdma_msg *rdma_resp, - struct svc_rqst *rqstp, - struct svc_rdma_req_map *vec) +/* Load the xdr_buf into the ctxt's sge array, and DMA map each + * element as it is added. + * + * Returns the number of sge elements loaded on success, or + * a negative errno on failure. + */ +static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, + struct svc_rdma_op_ctxt *ctxt, + struct xdr_buf *xdr, __be32 *wr_lst) { - u32 xfer_len = rqstp->rq_res.page_len; - int write_len; - u32 xdr_off; - int chunk_off; - int chunk_no; - int nchunks; - struct rpcrdma_write_array *res_ary; + unsigned int len, sge_no, remaining, page_off; + struct page **ppages; + unsigned char *base; + u32 xdr_pad; int ret; - res_ary = (struct rpcrdma_write_array *) - &rdma_resp->rm_body.rm_chunks[1]; - - /* Write chunks start at the pagelist */ - nchunks = be32_to_cpu(wr_ary->wc_nchunks); - for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0; - xfer_len && chunk_no < nchunks; - chunk_no++) { - struct rpcrdma_segment *arg_ch; - u64 rs_offset; - - arg_ch = &wr_ary->wc_array[chunk_no].wc_target; - write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length)); - - /* Prepare the response chunk given the length actually - * written */ - xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset); - svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no, - arg_ch->rs_handle, - arg_ch->rs_offset, - write_len); - chunk_off = 0; - while (write_len) { - ret = send_write(xprt, rqstp, - be32_to_cpu(arg_ch->rs_handle), - rs_offset + chunk_off, - xdr_off, - write_len, - vec); - if (ret <= 0) - goto out_err; - chunk_off += ret; - xdr_off += ret; - xfer_len -= ret; - write_len -= ret; + sge_no = 1; + + ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, + xdr->head[0].iov_base, + xdr->head[0].iov_len); + if (ret < 0) + return ret; + + /* If a Write chunk is present, the xdr_buf's page list + * is not included inline. However the Upper Layer may + * have added XDR padding in the tail buffer, and that + * should not be included inline. + */ + if (wr_lst) { + base = xdr->tail[0].iov_base; + len = xdr->tail[0].iov_len; + xdr_pad = xdr_padsize(xdr->page_len); + + if (len && xdr_pad) { + base += xdr_pad; + len -= xdr_pad; } + + goto tail; + } + + ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); + page_off = xdr->page_base & ~PAGE_MASK; + remaining = xdr->page_len; + while (remaining) { + len = min_t(u32, PAGE_SIZE - page_off, remaining); + + ret = svc_rdma_dma_map_page(rdma, ctxt, sge_no++, + *ppages++, page_off, len); + if (ret < 0) + return ret; + + remaining -= len; + page_off = 0; } - /* Update the req with the number of chunks actually used */ - svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no); - return rqstp->rq_res.page_len; + base = xdr->tail[0].iov_base; + len = xdr->tail[0].iov_len; +tail: + if (len) { + ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, base, len); + if (ret < 0) + return ret; + } -out_err: - pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret); - return -EIO; + return sge_no - 1; } -noinline -static int send_reply_chunks(struct svcxprt_rdma *xprt, - struct rpcrdma_write_array *rp_ary, - struct rpcrdma_msg *rdma_resp, - struct svc_rqst *rqstp, - struct svc_rdma_req_map *vec) +/* The svc_rqst and all resources it owns are released as soon as + * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt + * so they are released by the Send completion handler. + */ +static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, + struct svc_rdma_op_ctxt *ctxt) { - u32 xfer_len = rqstp->rq_res.len; - int write_len; - u32 xdr_off; - int chunk_no; - int chunk_off; - int nchunks; - struct rpcrdma_segment *ch; - struct rpcrdma_write_array *res_ary; - int ret; + int i, pages = rqstp->rq_next_page - rqstp->rq_respages; - /* XXX: need to fix when reply lists occur with read-list and or - * write-list */ - res_ary = (struct rpcrdma_write_array *) - &rdma_resp->rm_body.rm_chunks[2]; - - /* xdr offset starts at RPC message */ - nchunks = be32_to_cpu(rp_ary->wc_nchunks); - for (xdr_off = 0, chunk_no = 0; - xfer_len && chunk_no < nchunks; - chunk_no++) { - u64 rs_offset; - ch = &rp_ary->wc_array[chunk_no].wc_target; - write_len = min(xfer_len, be32_to_cpu(ch->rs_length)); - - /* Prepare the reply chunk given the length actually - * written */ - xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset); - svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no, - ch->rs_handle, ch->rs_offset, - write_len); - chunk_off = 0; - while (write_len) { - ret = send_write(xprt, rqstp, - be32_to_cpu(ch->rs_handle), - rs_offset + chunk_off, - xdr_off, - write_len, - vec); - if (ret <= 0) - goto out_err; - chunk_off += ret; - xdr_off += ret; - xfer_len -= ret; - write_len -= ret; - } + ctxt->count += pages; + for (i = 0; i < pages; i++) { + ctxt->pages[i + 1] = rqstp->rq_respages[i]; + rqstp->rq_respages[i] = NULL; } - /* Update the req with the number of chunks actually used */ - svc_rdma_xdr_encode_reply_array(res_ary, chunk_no); + rqstp->rq_next_page = rqstp->rq_respages + 1; +} - return rqstp->rq_res.len; +/** + * svc_rdma_post_send_wr - Set up and post one Send Work Request + * @rdma: controlling transport + * @ctxt: op_ctxt for transmitting the Send WR + * @num_sge: number of SGEs to send + * @inv_rkey: R_key argument to Send With Invalidate, or zero + * + * Returns: + * %0 if the Send* was posted successfully, + * %-ENOTCONN if the connection was lost or dropped, + * %-EINVAL if there was a problem with the Send we built, + * %-ENOMEM if ib_post_send failed. + */ +int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma, + struct svc_rdma_op_ctxt *ctxt, int num_sge, + u32 inv_rkey) +{ + struct ib_send_wr *send_wr = &ctxt->send_wr; -out_err: - pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret); - return -EIO; + dprintk("svcrdma: posting Send WR with %u sge(s)\n", num_sge); + + send_wr->next = NULL; + ctxt->cqe.done = svc_rdma_wc_send; + send_wr->wr_cqe = &ctxt->cqe; + send_wr->sg_list = ctxt->sge; + send_wr->num_sge = num_sge; + send_wr->send_flags = IB_SEND_SIGNALED; + if (inv_rkey) { + send_wr->opcode = IB_WR_SEND_WITH_INV; + send_wr->ex.invalidate_rkey = inv_rkey; + } else { + send_wr->opcode = IB_WR_SEND; + } + + return svc_rdma_send(rdma, send_wr); } -/* This function prepares the portion of the RPCRDMA message to be - * sent in the RDMA_SEND. This function is called after data sent via - * RDMA has already been transmitted. There are three cases: - * - The RPCRDMA header, RPC header, and payload are all sent in a - * single RDMA_SEND. This is the "inline" case. - * - The RPCRDMA header and some portion of the RPC header and data - * are sent via this RDMA_SEND and another portion of the data is - * sent via RDMA. - * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC - * header and data are all transmitted via RDMA. - * In all three cases, this function prepares the RPCRDMA header in - * sge[0], the 'type' parameter indicates the type to place in the - * RPCRDMA header, and the 'byte_count' field indicates how much of - * the XDR to include in this RDMA_SEND. NB: The offset of the payload - * to send is zero in the XDR. +/* Prepare the portion of the RPC Reply that will be transmitted + * via RDMA Send. The RPC-over-RDMA transport header is prepared + * in sge[0], and the RPC xdr_buf is prepared in following sges. + * + * Depending on whether a Write list or Reply chunk is present, + * the server may send all, a portion of, or none of the xdr_buf. + * In the latter case, only the transport header (sge[0]) is + * transmitted. + * + * RDMA Send is the last step of transmitting an RPC reply. Pages + * involved in the earlier RDMA Writes are here transferred out + * of the rqstp and into the ctxt's page array. These pages are + * DMA unmapped by each Write completion, but the subsequent Send + * completion finally releases these pages. + * + * Assumptions: + * - The Reply's transport header will never be larger than a page. */ -static int send_reply(struct svcxprt_rdma *rdma, - struct svc_rqst *rqstp, - struct page *page, - struct rpcrdma_msg *rdma_resp, - struct svc_rdma_req_map *vec, - int byte_count, - u32 inv_rkey) +static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, + __be32 *rdma_argp, __be32 *rdma_resp, + struct svc_rqst *rqstp, + __be32 *wr_lst, __be32 *rp_ch) { struct svc_rdma_op_ctxt *ctxt; - struct ib_send_wr send_wr; - u32 xdr_off; - int sge_no; - int sge_bytes; - int page_no; - int pages; - int ret = -EIO; - - /* Prepare the context */ + u32 inv_rkey; + int ret; + + dprintk("svcrdma: sending %s reply: head=%zu, pagelen=%u, tail=%zu\n", + (rp_ch ? "RDMA_NOMSG" : "RDMA_MSG"), + rqstp->rq_res.head[0].iov_len, + rqstp->rq_res.page_len, + rqstp->rq_res.tail[0].iov_len); + ctxt = svc_rdma_get_context(rdma); - ctxt->direction = DMA_TO_DEVICE; - ctxt->pages[0] = page; - ctxt->count = 1; - /* Prepare the SGE for the RPCRDMA Header */ - ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey; - ctxt->sge[0].length = - svc_rdma_xdr_get_reply_hdr_len((__be32 *)rdma_resp); - ctxt->sge[0].addr = - ib_dma_map_page(rdma->sc_cm_id->device, page, 0, - ctxt->sge[0].length, DMA_TO_DEVICE); - if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) + ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, + svc_rdma_reply_hdr_len(rdma_resp)); + if (ret < 0) goto err; - svc_rdma_count_mappings(rdma, ctxt); - - ctxt->direction = DMA_TO_DEVICE; - /* Map the payload indicated by 'byte_count' */ - xdr_off = 0; - for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) { - sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count); - byte_count -= sge_bytes; - ctxt->sge[sge_no].addr = - dma_map_xdr(rdma, &rqstp->rq_res, xdr_off, - sge_bytes, DMA_TO_DEVICE); - xdr_off += sge_bytes; - if (ib_dma_mapping_error(rdma->sc_cm_id->device, - ctxt->sge[sge_no].addr)) + if (!rp_ch) { + ret = svc_rdma_map_reply_msg(rdma, ctxt, + &rqstp->rq_res, wr_lst); + if (ret < 0) goto err; - svc_rdma_count_mappings(rdma, ctxt); - ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; - ctxt->sge[sge_no].length = sge_bytes; } - if (byte_count != 0) { - pr_err("svcrdma: Could not map %d bytes\n", byte_count); + + svc_rdma_save_io_pages(rqstp, ctxt); + + inv_rkey = 0; + if (rdma->sc_snd_w_inv) + inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch); + ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, inv_rkey); + if (ret) goto err; - } - /* Save all respages in the ctxt and remove them from the - * respages array. They are our pages until the I/O - * completes. + return 0; + +err: + pr_err("svcrdma: failed to post Send WR (%d)\n", ret); + svc_rdma_unmap_dma(ctxt); + svc_rdma_put_context(ctxt, 1); + return ret; +} + +/* Given the client-provided Write and Reply chunks, the server was not + * able to form a complete reply. Return an RDMA_ERROR message so the + * client can retire this RPC transaction. As above, the Send completion + * routine releases payload pages that were part of a previous RDMA Write. + * + * Remote Invalidation is skipped for simplicity. + */ +static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, + __be32 *rdma_resp, struct svc_rqst *rqstp) +{ + struct svc_rdma_op_ctxt *ctxt; + __be32 *p; + int ret; + + ctxt = svc_rdma_get_context(rdma); + + /* Replace the original transport header with an + * RDMA_ERROR response. XID etc are preserved. */ - pages = rqstp->rq_next_page - rqstp->rq_respages; - for (page_no = 0; page_no < pages; page_no++) { - ctxt->pages[page_no+1] = rqstp->rq_respages[page_no]; - ctxt->count++; - rqstp->rq_respages[page_no] = NULL; - } - rqstp->rq_next_page = rqstp->rq_respages + 1; + p = rdma_resp + 3; + *p++ = rdma_error; + *p = err_chunk; - if (sge_no > rdma->sc_max_sge) { - pr_err("svcrdma: Too many sges (%d)\n", sge_no); + ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, 20); + if (ret < 0) goto err; - } - memset(&send_wr, 0, sizeof send_wr); - ctxt->cqe.done = svc_rdma_wc_send; - send_wr.wr_cqe = &ctxt->cqe; - send_wr.sg_list = ctxt->sge; - send_wr.num_sge = sge_no; - if (inv_rkey) { - send_wr.opcode = IB_WR_SEND_WITH_INV; - send_wr.ex.invalidate_rkey = inv_rkey; - } else - send_wr.opcode = IB_WR_SEND; - send_wr.send_flags = IB_SEND_SIGNALED; - ret = svc_rdma_send(rdma, &send_wr); + svc_rdma_save_io_pages(rqstp, ctxt); + + ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, 0); if (ret) goto err; return 0; - err: +err: + pr_err("svcrdma: failed to post Send WR (%d)\n", ret); svc_rdma_unmap_dma(ctxt); svc_rdma_put_context(ctxt, 1); return ret; @@ -552,39 +599,36 @@ void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp) { } +/** + * svc_rdma_sendto - Transmit an RPC reply + * @rqstp: processed RPC request, reply XDR already in ::rq_res + * + * Any resources still associated with @rqstp are released upon return. + * If no reply message was possible, the connection is closed. + * + * Returns: + * %0 if an RPC reply has been successfully posted, + * %-ENOMEM if a resource shortage occurred (connection is lost), + * %-ENOTCONN if posting failed (connection is lost). + */ int svc_rdma_sendto(struct svc_rqst *rqstp) { struct svc_xprt *xprt = rqstp->rq_xprt; struct svcxprt_rdma *rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); - struct rpcrdma_msg *rdma_argp; - struct rpcrdma_msg *rdma_resp; - struct rpcrdma_write_array *wr_ary, *rp_ary; - int ret; - int inline_bytes; + __be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch; + struct xdr_buf *xdr = &rqstp->rq_res; struct page *res_page; - struct svc_rdma_req_map *vec; - u32 inv_rkey; - __be32 *p; - - dprintk("svcrdma: sending response for rqstp=%p\n", rqstp); + int ret; - /* Get the RDMA request header. The receive logic always - * places this at the start of page 0. + /* Find the call's chunk lists to decide how to send the reply. + * Receive places the Call's xprt header at the start of page 0. */ rdma_argp = page_address(rqstp->rq_pages[0]); - svc_rdma_get_write_arrays(rdma_argp, &wr_ary, &rp_ary); - - inv_rkey = 0; - if (rdma->sc_snd_w_inv) - inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_ary, rp_ary); + svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch); - /* Build an req vec for the XDR */ - vec = svc_rdma_get_req_map(rdma); - ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL); - if (ret) - goto err0; - inline_bytes = rqstp->rq_res.len; + dprintk("svcrdma: preparing response for XID 0x%08x\n", + be32_to_cpup(rdma_argp)); /* Create the RDMA response header. xprt->xpt_mutex, * acquired in svc_send(), serializes RPC replies. The @@ -598,115 +642,57 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) goto err0; rdma_resp = page_address(res_page); - p = &rdma_resp->rm_xid; - *p++ = rdma_argp->rm_xid; - *p++ = rdma_argp->rm_vers; + p = rdma_resp; + *p++ = *rdma_argp; + *p++ = *(rdma_argp + 1); *p++ = rdma->sc_fc_credits; - *p++ = rp_ary ? rdma_nomsg : rdma_msg; + *p++ = rp_ch ? rdma_nomsg : rdma_msg; /* Start with empty chunks */ *p++ = xdr_zero; *p++ = xdr_zero; *p = xdr_zero; - /* Send any write-chunk data and build resp write-list */ - if (wr_ary) { - ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec); + if (wr_lst) { + /* XXX: Presume the client sent only one Write chunk */ + ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr); if (ret < 0) - goto err1; - inline_bytes -= ret + xdr_padsize(ret); + goto err2; + svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret); } - - /* Send any reply-list data and update resp reply-list */ - if (rp_ary) { - ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec); + if (rp_ch) { + ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr); if (ret < 0) - goto err1; - inline_bytes -= ret; + goto err2; + svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret); } - /* Post a fresh Receive buffer _before_ sending the reply */ ret = svc_rdma_post_recv(rdma, GFP_KERNEL); if (ret) goto err1; - - ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec, - inline_bytes, inv_rkey); + ret = svc_rdma_send_reply_msg(rdma, rdma_argp, rdma_resp, rqstp, + wr_lst, rp_ch); if (ret < 0) goto err0; + return 0; - svc_rdma_put_req_map(rdma, vec); - dprintk("svcrdma: send_reply returns %d\n", ret); - return ret; + err2: + if (ret != -E2BIG) + goto err1; + + ret = svc_rdma_post_recv(rdma, GFP_KERNEL); + if (ret) + goto err1; + ret = svc_rdma_send_error_msg(rdma, rdma_resp, rqstp); + if (ret < 0) + goto err0; + return 0; err1: put_page(res_page); err0: - svc_rdma_put_req_map(rdma, vec); pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n", ret); - set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + set_bit(XPT_CLOSE, &xprt->xpt_flags); return -ENOTCONN; } - -void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, - int status) -{ - struct ib_send_wr err_wr; - struct page *p; - struct svc_rdma_op_ctxt *ctxt; - enum rpcrdma_errcode err; - __be32 *va; - int length; - int ret; - - ret = svc_rdma_repost_recv(xprt, GFP_KERNEL); - if (ret) - return; - - p = alloc_page(GFP_KERNEL); - if (!p) - return; - va = page_address(p); - - /* XDR encode an error reply */ - err = ERR_CHUNK; - if (status == -EPROTONOSUPPORT) - err = ERR_VERS; - length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va); - - ctxt = svc_rdma_get_context(xprt); - ctxt->direction = DMA_TO_DEVICE; - ctxt->count = 1; - ctxt->pages[0] = p; - - /* Prepare SGE for local address */ - ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey; - ctxt->sge[0].length = length; - ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device, - p, 0, length, DMA_TO_DEVICE); - if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) { - dprintk("svcrdma: Error mapping buffer for protocol error\n"); - svc_rdma_put_context(ctxt, 1); - return; - } - svc_rdma_count_mappings(xprt, ctxt); - - /* Prepare SEND WR */ - memset(&err_wr, 0, sizeof(err_wr)); - ctxt->cqe.done = svc_rdma_wc_send; - err_wr.wr_cqe = &ctxt->cqe; - err_wr.sg_list = ctxt->sge; - err_wr.num_sge = 1; - err_wr.opcode = IB_WR_SEND; - err_wr.send_flags = IB_SEND_SIGNALED; - - /* Post It */ - ret = svc_rdma_send(xprt, &err_wr); - if (ret) { - dprintk("svcrdma: Error %d posting send for protocol error\n", - ret); - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - } -} diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index fc8f14c7bfec..a9d9cb1ba4c6 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -272,85 +272,6 @@ static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt) } } -static struct svc_rdma_req_map *alloc_req_map(gfp_t flags) -{ - struct svc_rdma_req_map *map; - - map = kmalloc(sizeof(*map), flags); - if (map) - INIT_LIST_HEAD(&map->free); - return map; -} - -static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt) -{ - unsigned int i; - - /* One for each receive buffer on this connection. */ - i = xprt->sc_max_requests; - - while (i--) { - struct svc_rdma_req_map *map; - - map = alloc_req_map(GFP_KERNEL); - if (!map) { - dprintk("svcrdma: No memory for request map\n"); - return false; - } - list_add(&map->free, &xprt->sc_maps); - } - return true; -} - -struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt) -{ - struct svc_rdma_req_map *map = NULL; - - spin_lock(&xprt->sc_map_lock); - if (list_empty(&xprt->sc_maps)) - goto out_empty; - - map = list_first_entry(&xprt->sc_maps, - struct svc_rdma_req_map, free); - list_del_init(&map->free); - spin_unlock(&xprt->sc_map_lock); - -out: - map->count = 0; - return map; - -out_empty: - spin_unlock(&xprt->sc_map_lock); - - /* Pre-allocation amount was incorrect */ - map = alloc_req_map(GFP_NOIO); - if (map) - goto out; - - WARN_ONCE(1, "svcrdma: empty request map list?\n"); - return NULL; -} - -void svc_rdma_put_req_map(struct svcxprt_rdma *xprt, - struct svc_rdma_req_map *map) -{ - spin_lock(&xprt->sc_map_lock); - list_add(&map->free, &xprt->sc_maps); - spin_unlock(&xprt->sc_map_lock); -} - -static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt) -{ - while (!list_empty(&xprt->sc_maps)) { - struct svc_rdma_req_map *map; - - map = list_first_entry(&xprt->sc_maps, - struct svc_rdma_req_map, free); - list_del(&map->free); - kfree(map); - } -} - /* QP event handler */ static void qp_event_handler(struct ib_event *event, void *context) { @@ -474,24 +395,6 @@ void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) } /** - * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC - * @cq: completion queue - * @wc: completed WR - * - */ -void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc) -{ - struct ib_cqe *cqe = wc->wr_cqe; - struct svc_rdma_op_ctxt *ctxt; - - svc_rdma_send_wc_common_put(cq, wc, "write"); - - ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe); - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 0); -} - -/** * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC * @cq: completion queue * @wc: completed WR @@ -561,14 +464,14 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); INIT_LIST_HEAD(&cma_xprt->sc_frmr_q); INIT_LIST_HEAD(&cma_xprt->sc_ctxts); - INIT_LIST_HEAD(&cma_xprt->sc_maps); + INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts); init_waitqueue_head(&cma_xprt->sc_send_wait); spin_lock_init(&cma_xprt->sc_lock); spin_lock_init(&cma_xprt->sc_rq_dto_lock); spin_lock_init(&cma_xprt->sc_frmr_q_lock); spin_lock_init(&cma_xprt->sc_ctxt_lock); - spin_lock_init(&cma_xprt->sc_map_lock); + spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); /* * Note that this implies that the underlying transport support @@ -999,6 +902,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) newxprt, newxprt->sc_cm_id); dev = newxprt->sc_cm_id->device; + newxprt->sc_port_num = newxprt->sc_cm_id->port_num; /* Qualify the transport resource defaults with the * capabilities of this particular device */ @@ -1014,13 +918,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) svcrdma_max_bc_requests); newxprt->sc_rq_depth = newxprt->sc_max_requests + newxprt->sc_max_bc_requests; - newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth; + newxprt->sc_sq_depth = newxprt->sc_rq_depth; atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth); if (!svc_rdma_prealloc_ctxts(newxprt)) goto errout; - if (!svc_rdma_prealloc_maps(newxprt)) - goto errout; /* * Limit ORD based on client limit, local device limit, and @@ -1050,6 +952,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) memset(&qp_attr, 0, sizeof qp_attr); qp_attr.event_handler = qp_event_handler; qp_attr.qp_context = &newxprt->sc_xprt; + qp_attr.port_num = newxprt->sc_cm_id->port_num; + qp_attr.cap.max_rdma_ctxs = newxprt->sc_max_requests; qp_attr.cap.max_send_wr = newxprt->sc_sq_depth; qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth; qp_attr.cap.max_send_sge = newxprt->sc_max_sge; @@ -1248,8 +1152,8 @@ static void __svc_rdma_free(struct work_struct *work) } rdma_dealloc_frmr_q(rdma); + svc_rdma_destroy_rw_ctxts(rdma); svc_rdma_destroy_ctxts(rdma); - svc_rdma_destroy_maps(rdma); /* Destroy the QP if present (not a listener) */ if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) |