summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2026-04-20 10:44:02 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2026-04-20 10:44:02 -0700
commit36d179fd6bea35698d53444b7bd3025fa3788266 (patch)
tree399ec5f312f24b6d6eadc60125960e1d53216086 /net
parentc1f49dea2b8f335813d3b348fd39117fb8efb428 (diff)
parentd644a698de12e996778657f65a4608299368e138 (diff)
downloadlwn-36d179fd6bea35698d53444b7bd3025fa3788266.tar.gz
lwn-36d179fd6bea35698d53444b7bd3025fa3788266.zip
Merge tag 'nfsd-7.1' of git://git.kernel.org/pub/scm/linux/kernel/git/cel/linux
Pull nfsd updates from Chuck Lever: - filehandle signing to defend against filehandle-guessing attacks (Benjamin Coddington) The server now appends a SipHash-2-4 MAC to each filehandle when the new "sign_fh" export option is enabled. NFSD then verifies filehandles received from clients against the expected MAC; mismatches return NFS error STALE - convert the entire NLMv4 server-side XDR layer from hand-written C to xdrgen-generated code, spanning roughly thirty patches (Chuck Lever) XDR functions are generally boilerplate code and are easy to get wrong. The goals of this conversion are improved memory safety, lower maintenance burden, and groundwork for eventual Rust code generation for these functions. - improve pNFS block/SCSI layout robustness with two related changes (Dai Ngo) SCSI persistent reservation fencing is now tracked per client and per device via an xarray, to avoid both redundant preempt operations on devices already fenced and a potential NFSD deadlock when all nfsd threads are waiting for a layout return. - scalability and infrastructure improvements Sincere thanks to all contributors, reviewers, testers, and bug reporters who participated in the v7.1 NFSD development cycle. * tag 'nfsd-7.1' of git://git.kernel.org/pub/scm/linux/kernel/git/cel/linux: (83 commits) NFSD: Docs: clean up pnfs server timeout docs nfsd: fix comment typo in nfsxdr nfsd: fix comment typo in nfs3xdr NFSD: convert callback RPC program to per-net namespace NFSD: use per-operation statidx for callback procedures svcrdma: Use contiguous pages for RDMA Read sink buffers SUNRPC: Add svc_rqst_page_release() helper SUNRPC: xdr.h: fix all kernel-doc warnings svcrdma: Factor out WR chain linking into helper svcrdma: Add Write chunk WRs to the RPC's Send WR chain svcrdma: Clean up use of rdma->sc_pd->device svcrdma: Clean up use of rdma->sc_pd->device in Receive paths svcrdma: Add fair queuing for Send Queue access SUNRPC: Optimize rq_respages allocation in svc_alloc_arg SUNRPC: Track consumed rq_pages entries svcrdma: preserve rq_next_page in svc_rdma_save_io_pages SUNRPC: Handle NULL entries in svc_rqst_release_pages SUNRPC: Allocate a separate Reply page array SUNRPC: Tighten bounds checking in svc_rqst_replace_page NFSD: Sign filehandles ...
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/auth_gss/gss_krb5_test.c93
-rw-r--r--net/sunrpc/cache.c249
-rw-r--r--net/sunrpc/svc.c68
-rw-r--r--net/sunrpc/svc_xprt.c47
-rw-r--r--net/sunrpc/svcsock.c9
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c28
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_rw.c374
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c196
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c33
9 files changed, 726 insertions, 371 deletions
diff --git a/net/sunrpc/auth_gss/gss_krb5_test.c b/net/sunrpc/auth_gss/gss_krb5_test.c
index a5bff02cd7ba..dde1ee934d0d 100644
--- a/net/sunrpc/auth_gss/gss_krb5_test.c
+++ b/net/sunrpc/auth_gss/gss_krb5_test.c
@@ -63,10 +63,11 @@ static void kdf_case(struct kunit *test)
KUNIT_ASSERT_EQ(test, err, 0);
/* Assert */
- KUNIT_EXPECT_EQ_MSG(test,
- memcmp(param->expected_result->data,
- derivedkey.data, derivedkey.len), 0,
- "key mismatch");
+ KUNIT_EXPECT_MEMEQ_MSG(test,
+ param->expected_result->data,
+ derivedkey.data,
+ derivedkey.len,
+ "key mismatch");
}
static void checksum_case(struct kunit *test)
@@ -111,10 +112,11 @@ static void checksum_case(struct kunit *test)
KUNIT_ASSERT_EQ(test, err, 0);
/* Assert */
- KUNIT_EXPECT_EQ_MSG(test,
- memcmp(param->expected_result->data,
- checksum.data, checksum.len), 0,
- "checksum mismatch");
+ KUNIT_EXPECT_MEMEQ_MSG(test,
+ param->expected_result->data,
+ checksum.data,
+ checksum.len,
+ "checksum mismatch");
crypto_free_ahash(tfm);
}
@@ -314,10 +316,11 @@ static void rfc3961_nfold_case(struct kunit *test)
param->expected_result->len * 8, result);
/* Assert */
- KUNIT_EXPECT_EQ_MSG(test,
- memcmp(param->expected_result->data,
- result, param->expected_result->len), 0,
- "result mismatch");
+ KUNIT_EXPECT_MEMEQ_MSG(test,
+ param->expected_result->data,
+ result,
+ param->expected_result->len,
+ "result mismatch");
}
static struct kunit_case rfc3961_test_cases[] = {
@@ -569,14 +572,16 @@ static void rfc3962_encrypt_case(struct kunit *test)
KUNIT_EXPECT_EQ_MSG(test,
param->expected_result->len, buf.len,
"ciphertext length mismatch");
- KUNIT_EXPECT_EQ_MSG(test,
- memcmp(param->expected_result->data,
- text, param->expected_result->len), 0,
- "ciphertext mismatch");
- KUNIT_EXPECT_EQ_MSG(test,
- memcmp(param->next_iv->data, iv,
- param->next_iv->len), 0,
- "IV mismatch");
+ KUNIT_EXPECT_MEMEQ_MSG(test,
+ param->expected_result->data,
+ text,
+ param->expected_result->len,
+ "ciphertext mismatch");
+ KUNIT_EXPECT_MEMEQ_MSG(test,
+ param->next_iv->data,
+ iv,
+ param->next_iv->len,
+ "IV mismatch");
crypto_free_sync_skcipher(cts_tfm);
crypto_free_sync_skcipher(cbc_tfm);
@@ -1194,15 +1199,17 @@ static void rfc6803_encrypt_case(struct kunit *test)
KUNIT_EXPECT_EQ_MSG(test, param->expected_result->len,
buf.len + checksum.len,
"ciphertext length mismatch");
- KUNIT_EXPECT_EQ_MSG(test,
- memcmp(param->expected_result->data,
- buf.head[0].iov_base, buf.len), 0,
- "encrypted result mismatch");
- KUNIT_EXPECT_EQ_MSG(test,
- memcmp(param->expected_result->data +
- (param->expected_result->len - checksum.len),
- checksum.data, checksum.len), 0,
- "HMAC mismatch");
+ KUNIT_EXPECT_MEMEQ_MSG(test,
+ param->expected_result->data,
+ buf.head[0].iov_base,
+ buf.len,
+ "encrypted result mismatch");
+ KUNIT_EXPECT_MEMEQ_MSG(test,
+ param->expected_result->data +
+ (param->expected_result->len - checksum.len),
+ checksum.data,
+ checksum.len,
+ "HMAC mismatch");
crypto_free_ahash(ahash_tfm);
crypto_free_sync_skcipher(cts_tfm);
@@ -1687,15 +1694,16 @@ static void rfc8009_encrypt_case(struct kunit *test)
KUNIT_EXPECT_EQ_MSG(test,
param->expected_result->len, buf.len,
"ciphertext length mismatch");
- KUNIT_EXPECT_EQ_MSG(test,
- memcmp(param->expected_result->data,
- buf.head[0].iov_base,
- param->expected_result->len), 0,
- "ciphertext mismatch");
- KUNIT_EXPECT_EQ_MSG(test, memcmp(param->expected_hmac->data,
- checksum.data,
- checksum.len), 0,
- "HMAC mismatch");
+ KUNIT_EXPECT_MEMEQ_MSG(test,
+ param->expected_result->data,
+ buf.head[0].iov_base,
+ param->expected_result->len,
+ "ciphertext mismatch");
+ KUNIT_EXPECT_MEMEQ_MSG(test,
+ param->expected_hmac->data,
+ checksum.data,
+ checksum.len,
+ "HMAC mismatch");
crypto_free_ahash(ahash_tfm);
crypto_free_sync_skcipher(cts_tfm);
@@ -1826,10 +1834,11 @@ static void encrypt_selftest_case(struct kunit *test)
KUNIT_EXPECT_EQ_MSG(test,
param->plaintext->len, buf.len,
"length mismatch");
- KUNIT_EXPECT_EQ_MSG(test,
- memcmp(param->plaintext->data,
- buf.head[0].iov_base, buf.len), 0,
- "plaintext mismatch");
+ KUNIT_EXPECT_MEMEQ_MSG(test,
+ param->plaintext->data,
+ buf.head[0].iov_base,
+ buf.len,
+ "plaintext mismatch");
crypto_free_sync_skcipher(cts_tfm);
crypto_free_sync_skcipher(cbc_tfm);
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index ef8b7e8b1e9c..7081c1214e6c 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -134,11 +134,11 @@ static struct cache_head *sunrpc_cache_add_entry(struct cache_detail *detail,
return tmp;
}
+ cache_get(new);
hlist_add_head_rcu(&new->cache_list, head);
detail->entries++;
if (detail->nextcheck > new->expiry_time)
detail->nextcheck = new->expiry_time + 1;
- cache_get(new);
spin_unlock(&detail->hash_lock);
if (freeme)
@@ -233,9 +233,9 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
spin_lock(&detail->hash_lock);
cache_entry_update(detail, tmp, new);
- hlist_add_head(&tmp->cache_list, &detail->hash_table[hash]);
- detail->entries++;
cache_get(tmp);
+ hlist_add_head_rcu(&tmp->cache_list, &detail->hash_table[hash]);
+ detail->entries++;
cache_fresh_locked(tmp, new->expiry_time, detail);
cache_fresh_locked(old, 0, detail);
spin_unlock(&detail->hash_lock);
@@ -399,7 +399,11 @@ static struct delayed_work cache_cleaner;
void sunrpc_init_cache_detail(struct cache_detail *cd)
{
spin_lock_init(&cd->hash_lock);
- INIT_LIST_HEAD(&cd->queue);
+ INIT_LIST_HEAD(&cd->requests);
+ INIT_LIST_HEAD(&cd->readers);
+ spin_lock_init(&cd->queue_lock);
+ init_waitqueue_head(&cd->queue_wait);
+ cd->next_seqno = 0;
spin_lock(&cache_list_lock);
cd->nextcheck = 0;
cd->entries = 0;
@@ -794,31 +798,20 @@ void cache_clean_deferred(void *owner)
* On read, you get a full request, or block.
* On write, an update request is processed.
* Poll works if anything to read, and always allows write.
- *
- * Implemented by linked list of requests. Each open file has
- * a ->private that also exists in this list. New requests are added
- * to the end and may wakeup and preceding readers.
- * New readers are added to the head. If, on read, an item is found with
- * CACHE_UPCALLING clear, we free it from the list.
- *
*/
-static DEFINE_SPINLOCK(queue_lock);
-
-struct cache_queue {
- struct list_head list;
- int reader; /* if 0, then request */
-};
struct cache_request {
- struct cache_queue q;
+ struct list_head list;
struct cache_head *item;
- char * buf;
+ char *buf;
int len;
int readers;
+ u64 seqno;
};
struct cache_reader {
- struct cache_queue q;
+ struct list_head list;
int offset; /* if non-0, we have a refcnt on next request */
+ u64 next_seqno;
};
static int cache_request(struct cache_detail *detail,
@@ -833,6 +826,17 @@ static int cache_request(struct cache_detail *detail,
return PAGE_SIZE - len;
}
+static struct cache_request *
+cache_next_request(struct cache_detail *cd, u64 seqno)
+{
+ struct cache_request *rq;
+
+ list_for_each_entry(rq, &cd->requests, list)
+ if (rq->seqno >= seqno)
+ return rq;
+ return NULL;
+}
+
static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
loff_t *ppos, struct cache_detail *cd)
{
@@ -847,25 +851,18 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
inode_lock(inode); /* protect against multiple concurrent
* readers on this file */
again:
- spin_lock(&queue_lock);
+ spin_lock(&cd->queue_lock);
/* need to find next request */
- while (rp->q.list.next != &cd->queue &&
- list_entry(rp->q.list.next, struct cache_queue, list)
- ->reader) {
- struct list_head *next = rp->q.list.next;
- list_move(&rp->q.list, next);
- }
- if (rp->q.list.next == &cd->queue) {
- spin_unlock(&queue_lock);
+ rq = cache_next_request(cd, rp->next_seqno);
+ if (!rq) {
+ spin_unlock(&cd->queue_lock);
inode_unlock(inode);
WARN_ON_ONCE(rp->offset);
return 0;
}
- rq = container_of(rp->q.list.next, struct cache_request, q.list);
- WARN_ON_ONCE(rq->q.reader);
if (rp->offset == 0)
rq->readers++;
- spin_unlock(&queue_lock);
+ spin_unlock(&cd->queue_lock);
if (rq->len == 0) {
err = cache_request(cd, rq);
@@ -876,9 +873,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
err = -EAGAIN;
- spin_lock(&queue_lock);
- list_move(&rp->q.list, &rq->q.list);
- spin_unlock(&queue_lock);
+ rp->next_seqno = rq->seqno + 1;
} else {
if (rp->offset + count > rq->len)
count = rq->len - rp->offset;
@@ -888,26 +883,24 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
rp->offset += count;
if (rp->offset >= rq->len) {
rp->offset = 0;
- spin_lock(&queue_lock);
- list_move(&rp->q.list, &rq->q.list);
- spin_unlock(&queue_lock);
+ rp->next_seqno = rq->seqno + 1;
}
err = 0;
}
out:
if (rp->offset == 0) {
/* need to release rq */
- spin_lock(&queue_lock);
+ spin_lock(&cd->queue_lock);
rq->readers--;
if (rq->readers == 0 &&
!test_bit(CACHE_PENDING, &rq->item->flags)) {
- list_del(&rq->q.list);
- spin_unlock(&queue_lock);
+ list_del(&rq->list);
+ spin_unlock(&cd->queue_lock);
cache_put(rq->item, cd);
kfree(rq->buf);
kfree(rq);
} else
- spin_unlock(&queue_lock);
+ spin_unlock(&cd->queue_lock);
}
if (err == -EAGAIN)
goto again;
@@ -971,16 +964,13 @@ out:
return ret;
}
-static DECLARE_WAIT_QUEUE_HEAD(queue_wait);
-
static __poll_t cache_poll(struct file *filp, poll_table *wait,
struct cache_detail *cd)
{
__poll_t mask;
struct cache_reader *rp = filp->private_data;
- struct cache_queue *cq;
- poll_wait(filp, &queue_wait, wait);
+ poll_wait(filp, &cd->queue_wait, wait);
/* alway allow write */
mask = EPOLLOUT | EPOLLWRNORM;
@@ -988,15 +978,11 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
if (!rp)
return mask;
- spin_lock(&queue_lock);
+ spin_lock(&cd->queue_lock);
- for (cq= &rp->q; &cq->list != &cd->queue;
- cq = list_entry(cq->list.next, struct cache_queue, list))
- if (!cq->reader) {
- mask |= EPOLLIN | EPOLLRDNORM;
- break;
- }
- spin_unlock(&queue_lock);
+ if (cache_next_request(cd, rp->next_seqno))
+ mask |= EPOLLIN | EPOLLRDNORM;
+ spin_unlock(&cd->queue_lock);
return mask;
}
@@ -1006,25 +992,20 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
{
int len = 0;
struct cache_reader *rp = filp->private_data;
- struct cache_queue *cq;
+ struct cache_request *rq;
if (cmd != FIONREAD || !rp)
return -EINVAL;
- spin_lock(&queue_lock);
+ spin_lock(&cd->queue_lock);
/* only find the length remaining in current request,
* or the length of the next request
*/
- for (cq= &rp->q; &cq->list != &cd->queue;
- cq = list_entry(cq->list.next, struct cache_queue, list))
- if (!cq->reader) {
- struct cache_request *cr =
- container_of(cq, struct cache_request, q);
- len = cr->len - rp->offset;
- break;
- }
- spin_unlock(&queue_lock);
+ rq = cache_next_request(cd, rp->next_seqno);
+ if (rq)
+ len = rq->len - rp->offset;
+ spin_unlock(&cd->queue_lock);
return put_user(len, (int __user *)arg);
}
@@ -1044,11 +1025,11 @@ static int cache_open(struct inode *inode, struct file *filp,
return -ENOMEM;
}
rp->offset = 0;
- rp->q.reader = 1;
+ rp->next_seqno = 0;
- spin_lock(&queue_lock);
- list_add(&rp->q.list, &cd->queue);
- spin_unlock(&queue_lock);
+ spin_lock(&cd->queue_lock);
+ list_add(&rp->list, &cd->readers);
+ spin_unlock(&cd->queue_lock);
}
if (filp->f_mode & FMODE_WRITE)
atomic_inc(&cd->writers);
@@ -1064,29 +1045,24 @@ static int cache_release(struct inode *inode, struct file *filp,
if (rp) {
struct cache_request *rq = NULL;
- spin_lock(&queue_lock);
+ spin_lock(&cd->queue_lock);
if (rp->offset) {
- struct cache_queue *cq;
- for (cq = &rp->q; &cq->list != &cd->queue;
- cq = list_entry(cq->list.next,
- struct cache_queue, list))
- if (!cq->reader) {
- struct cache_request *cr =
- container_of(cq,
- struct cache_request, q);
- cr->readers--;
- if (cr->readers == 0 &&
- !test_bit(CACHE_PENDING,
- &cr->item->flags)) {
- list_del(&cr->q.list);
- rq = cr;
- }
- break;
+ struct cache_request *cr;
+
+ cr = cache_next_request(cd, rp->next_seqno);
+ if (cr) {
+ cr->readers--;
+ if (cr->readers == 0 &&
+ !test_bit(CACHE_PENDING,
+ &cr->item->flags)) {
+ list_del(&cr->list);
+ rq = cr;
}
+ }
rp->offset = 0;
}
- list_del(&rp->q.list);
- spin_unlock(&queue_lock);
+ list_del(&rp->list);
+ spin_unlock(&cd->queue_lock);
if (rq) {
cache_put(rq->item, cd);
@@ -1109,27 +1085,24 @@ static int cache_release(struct inode *inode, struct file *filp,
static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
{
- struct cache_queue *cq, *tmp;
- struct cache_request *cr;
+ struct cache_request *cr, *tmp;
LIST_HEAD(dequeued);
- spin_lock(&queue_lock);
- list_for_each_entry_safe(cq, tmp, &detail->queue, list)
- if (!cq->reader) {
- cr = container_of(cq, struct cache_request, q);
- if (cr->item != ch)
- continue;
- if (test_bit(CACHE_PENDING, &ch->flags))
- /* Lost a race and it is pending again */
- break;
- if (cr->readers != 0)
- continue;
- list_move(&cr->q.list, &dequeued);
- }
- spin_unlock(&queue_lock);
+ spin_lock(&detail->queue_lock);
+ list_for_each_entry_safe(cr, tmp, &detail->requests, list) {
+ if (cr->item != ch)
+ continue;
+ if (test_bit(CACHE_PENDING, &ch->flags))
+ /* Lost a race and it is pending again */
+ break;
+ if (cr->readers != 0)
+ continue;
+ list_move(&cr->list, &dequeued);
+ }
+ spin_unlock(&detail->queue_lock);
while (!list_empty(&dequeued)) {
- cr = list_entry(dequeued.next, struct cache_request, q.list);
- list_del(&cr->q.list);
+ cr = list_entry(dequeued.next, struct cache_request, list);
+ list_del(&cr->list);
cache_put(cr->item, detail);
kfree(cr->buf);
kfree(cr);
@@ -1247,20 +1220,20 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
return -EAGAIN;
}
- crq->q.reader = 0;
crq->buf = buf;
crq->len = 0;
crq->readers = 0;
- spin_lock(&queue_lock);
+ spin_lock(&detail->queue_lock);
if (test_bit(CACHE_PENDING, &h->flags)) {
crq->item = cache_get(h);
- list_add_tail(&crq->q.list, &detail->queue);
+ crq->seqno = detail->next_seqno++;
+ list_add_tail(&crq->list, &detail->requests);
trace_cache_entry_upcall(detail, h);
} else
/* Lost a race, no longer PENDING, so don't enqueue */
ret = -EAGAIN;
- spin_unlock(&queue_lock);
- wake_up(&queue_wait);
+ spin_unlock(&detail->queue_lock);
+ wake_up(&detail->queue_wait);
if (ret == -EAGAIN) {
kfree(buf);
kfree(crq);
@@ -1378,18 +1351,14 @@ static void *__cache_seq_start(struct seq_file *m, loff_t *pos)
hlist_for_each_entry_rcu(ch, &cd->hash_table[hash], cache_list)
if (!entry--)
return ch;
- n &= ~((1LL<<32) - 1);
- do {
- hash++;
- n += 1LL<<32;
- } while(hash < cd->hash_size &&
- hlist_empty(&cd->hash_table[hash]));
- if (hash >= cd->hash_size)
- return NULL;
- *pos = n+1;
- return hlist_entry_safe(rcu_dereference_raw(
+ ch = NULL;
+ while (!ch && ++hash < cd->hash_size)
+ ch = hlist_entry_safe(rcu_dereference(
hlist_first_rcu(&cd->hash_table[hash])),
struct cache_head, cache_list);
+
+ *pos = ((long long)hash << 32) + 1;
+ return ch;
}
static void *cache_seq_next(struct seq_file *m, void *p, loff_t *pos)
@@ -1398,29 +1367,29 @@ static void *cache_seq_next(struct seq_file *m, void *p, loff_t *pos)
int hash = (*pos >> 32);
struct cache_detail *cd = m->private;
- if (p == SEQ_START_TOKEN)
+ if (p == SEQ_START_TOKEN) {
hash = 0;
- else if (ch->cache_list.next == NULL) {
- hash++;
- *pos += 1LL<<32;
- } else {
- ++*pos;
- return hlist_entry_safe(rcu_dereference_raw(
- hlist_next_rcu(&ch->cache_list)),
- struct cache_head, cache_list);
+ ch = NULL;
}
- *pos &= ~((1LL<<32) - 1);
- while (hash < cd->hash_size &&
- hlist_empty(&cd->hash_table[hash])) {
+ while (hash < cd->hash_size) {
+ if (ch)
+ ch = hlist_entry_safe(
+ rcu_dereference(
+ hlist_next_rcu(&ch->cache_list)),
+ struct cache_head, cache_list);
+ else
+ ch = hlist_entry_safe(
+ rcu_dereference(
+ hlist_first_rcu(&cd->hash_table[hash])),
+ struct cache_head, cache_list);
+ if (ch) {
+ ++*pos;
+ return ch;
+ }
hash++;
- *pos += 1LL<<32;
+ *pos = (long long)hash << 32;
}
- if (hash >= cd->hash_size)
- return NULL;
- ++*pos;
- return hlist_entry_safe(rcu_dereference_raw(
- hlist_first_rcu(&cd->hash_table[hash])),
- struct cache_head, cache_list);
+ return NULL;
}
void *cache_seq_start_rcu(struct seq_file *m, loff_t *pos)
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index d8ccb8e4b5c2..576fa42e7abf 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -638,13 +638,25 @@ svc_init_buffer(struct svc_rqst *rqstp, const struct svc_serv *serv, int node)
{
rqstp->rq_maxpages = svc_serv_maxpages(serv);
- /* rq_pages' last entry is NULL for historical reasons. */
+ /* +1 for a NULL sentinel readable by nfsd_splice_actor() */
rqstp->rq_pages = kcalloc_node(rqstp->rq_maxpages + 1,
sizeof(struct page *),
GFP_KERNEL, node);
if (!rqstp->rq_pages)
return false;
+ /* +1 for a NULL sentinel at rq_page_end (see svc_rqst_replace_page) */
+ rqstp->rq_respages = kcalloc_node(rqstp->rq_maxpages + 1,
+ sizeof(struct page *),
+ GFP_KERNEL, node);
+ if (!rqstp->rq_respages) {
+ kfree(rqstp->rq_pages);
+ rqstp->rq_pages = NULL;
+ return false;
+ }
+
+ rqstp->rq_pages_nfree = rqstp->rq_maxpages;
+ rqstp->rq_next_page = rqstp->rq_respages + rqstp->rq_maxpages;
return true;
}
@@ -656,10 +668,19 @@ svc_release_buffer(struct svc_rqst *rqstp)
{
unsigned long i;
- for (i = 0; i < rqstp->rq_maxpages; i++)
- if (rqstp->rq_pages[i])
- put_page(rqstp->rq_pages[i]);
- kfree(rqstp->rq_pages);
+ if (rqstp->rq_pages) {
+ for (i = 0; i < rqstp->rq_maxpages; i++)
+ if (rqstp->rq_pages[i])
+ put_page(rqstp->rq_pages[i]);
+ kfree(rqstp->rq_pages);
+ }
+
+ if (rqstp->rq_respages) {
+ for (i = 0; i < rqstp->rq_maxpages; i++)
+ if (rqstp->rq_respages[i])
+ put_page(rqstp->rq_respages[i]);
+ kfree(rqstp->rq_respages);
+ }
}
static void
@@ -934,11 +955,11 @@ svc_set_num_threads(struct svc_serv *serv, unsigned int min_threads,
EXPORT_SYMBOL_GPL(svc_set_num_threads);
/**
- * svc_rqst_replace_page - Replace one page in rq_pages[]
+ * svc_rqst_replace_page - Replace one page in rq_respages[]
* @rqstp: svc_rqst with pages to replace
* @page: replacement page
*
- * When replacing a page in rq_pages, batch the release of the
+ * When replacing a page in rq_respages, batch the release of the
* replaced pages to avoid hammering the page allocator.
*
* Return values:
@@ -947,19 +968,16 @@ EXPORT_SYMBOL_GPL(svc_set_num_threads);
*/
bool svc_rqst_replace_page(struct svc_rqst *rqstp, struct page *page)
{
- struct page **begin = rqstp->rq_pages;
- struct page **end = &rqstp->rq_pages[rqstp->rq_maxpages];
+ struct page **begin = rqstp->rq_respages;
+ struct page **end = rqstp->rq_page_end;
if (unlikely(rqstp->rq_next_page < begin || rqstp->rq_next_page > end)) {
trace_svc_replace_page_err(rqstp);
return false;
}
- if (*rqstp->rq_next_page) {
- if (!folio_batch_add(&rqstp->rq_fbatch,
- page_folio(*rqstp->rq_next_page)))
- __folio_batch_release(&rqstp->rq_fbatch);
- }
+ if (*rqstp->rq_next_page)
+ svc_rqst_page_release(rqstp, *rqstp->rq_next_page);
get_page(page);
*(rqstp->rq_next_page++) = page;
@@ -971,18 +989,24 @@ EXPORT_SYMBOL_GPL(svc_rqst_replace_page);
* svc_rqst_release_pages - Release Reply buffer pages
* @rqstp: RPC transaction context
*
- * Release response pages that might still be in flight after
- * svc_send, and any spliced filesystem-owned pages.
+ * Release response pages in the range [rq_respages, rq_next_page).
+ * NULL entries in this range are skipped, allowing transports to
+ * transfer pages to a send context before this function runs.
*/
void svc_rqst_release_pages(struct svc_rqst *rqstp)
{
- int i, count = rqstp->rq_next_page - rqstp->rq_respages;
-
- if (count) {
- release_pages(rqstp->rq_respages, count);
- for (i = 0; i < count; i++)
- rqstp->rq_respages[i] = NULL;
+ struct page **pp;
+
+ for (pp = rqstp->rq_respages; pp < rqstp->rq_next_page; pp++) {
+ if (*pp) {
+ if (!folio_batch_add(&rqstp->rq_fbatch,
+ page_folio(*pp)))
+ __folio_batch_release(&rqstp->rq_fbatch);
+ *pp = NULL;
+ }
}
+ if (rqstp->rq_fbatch.nr)
+ __folio_batch_release(&rqstp->rq_fbatch);
}
/**
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 56a663b8939f..b16e710926c1 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -650,14 +650,13 @@ static void svc_check_conn_limits(struct svc_serv *serv)
}
}
-static bool svc_alloc_arg(struct svc_rqst *rqstp)
+static bool svc_fill_pages(struct svc_rqst *rqstp, struct page **pages,
+ unsigned long npages)
{
- struct xdr_buf *arg = &rqstp->rq_arg;
- unsigned long pages, filled, ret;
+ unsigned long filled, ret;
- pages = rqstp->rq_maxpages;
- for (filled = 0; filled < pages; filled = ret) {
- ret = alloc_pages_bulk(GFP_KERNEL, pages, rqstp->rq_pages);
+ for (filled = 0; filled < npages; filled = ret) {
+ ret = alloc_pages_bulk(GFP_KERNEL, npages, pages);
if (ret > filled)
/* Made progress, don't sleep yet */
continue;
@@ -667,11 +666,40 @@ static bool svc_alloc_arg(struct svc_rqst *rqstp)
set_current_state(TASK_RUNNING);
return false;
}
- trace_svc_alloc_arg_err(pages, ret);
+ trace_svc_alloc_arg_err(npages, ret);
memalloc_retry_wait(GFP_KERNEL);
}
- rqstp->rq_page_end = &rqstp->rq_pages[pages];
- rqstp->rq_pages[pages] = NULL; /* this might be seen in nfsd_splice_actor() */
+ return true;
+}
+
+static bool svc_alloc_arg(struct svc_rqst *rqstp)
+{
+ struct xdr_buf *arg = &rqstp->rq_arg;
+ unsigned long pages, nfree;
+
+ pages = rqstp->rq_maxpages;
+
+ nfree = rqstp->rq_pages_nfree;
+ if (nfree) {
+ if (!svc_fill_pages(rqstp, rqstp->rq_pages, nfree))
+ return false;
+ rqstp->rq_pages_nfree = 0;
+ }
+
+ if (WARN_ON_ONCE(rqstp->rq_next_page < rqstp->rq_respages))
+ return false;
+ nfree = rqstp->rq_next_page - rqstp->rq_respages;
+ if (nfree) {
+ if (!svc_fill_pages(rqstp, rqstp->rq_respages, nfree))
+ return false;
+ }
+
+ rqstp->rq_next_page = rqstp->rq_respages;
+ rqstp->rq_page_end = &rqstp->rq_respages[pages];
+ /* svc_rqst_replace_page() dereferences *rq_next_page even
+ * at rq_page_end; NULL prevents releasing a garbage page.
+ */
+ rqstp->rq_page_end[0] = NULL;
/* Make arg->head point to first page and arg->pages point to rest */
arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
@@ -1277,7 +1305,6 @@ static noinline int svc_deferred_recv(struct svc_rqst *rqstp)
rqstp->rq_addrlen = dr->addrlen;
/* Save off transport header len in case we get deferred again */
rqstp->rq_daddr = dr->daddr;
- rqstp->rq_respages = rqstp->rq_pages;
rqstp->rq_xprt_ctxt = dr->xprt_ctxt;
dr->xprt_ctxt = NULL;
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index f28c6076f7e8..7be3de1a1aed 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -351,8 +351,6 @@ static ssize_t svc_tcp_read_msg(struct svc_rqst *rqstp, size_t buflen,
for (i = 0, t = 0; t < buflen; i++, t += PAGE_SIZE)
bvec_set_page(&bvec[i], rqstp->rq_pages[i], PAGE_SIZE, 0);
- rqstp->rq_respages = &rqstp->rq_pages[i];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
iov_iter_bvec(&msg.msg_iter, ITER_DEST, bvec, i, buflen);
if (seek) {
@@ -677,13 +675,9 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp)
if (len <= rqstp->rq_arg.head[0].iov_len) {
rqstp->rq_arg.head[0].iov_len = len;
rqstp->rq_arg.page_len = 0;
- rqstp->rq_respages = rqstp->rq_pages+1;
} else {
rqstp->rq_arg.page_len = len - rqstp->rq_arg.head[0].iov_len;
- rqstp->rq_respages = rqstp->rq_pages + 1 +
- DIV_ROUND_UP(rqstp->rq_arg.page_len, PAGE_SIZE);
}
- rqstp->rq_next_page = rqstp->rq_respages+1;
if (serv->sv_stats)
serv->sv_stats->netudpcnt++;
@@ -994,7 +988,7 @@ static size_t svc_tcp_restore_pages(struct svc_sock *svsk,
npages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT;
for (i = 0; i < npages; i++) {
if (rqstp->rq_pages[i] != NULL)
- put_page(rqstp->rq_pages[i]);
+ svc_rqst_page_release(rqstp, rqstp->rq_pages[i]);
BUG_ON(svsk->sk_pages[i] == NULL);
rqstp->rq_pages[i] = svsk->sk_pages[i];
svsk->sk_pages[i] = NULL;
@@ -1015,6 +1009,7 @@ static void svc_tcp_save_pages(struct svc_sock *svsk, struct svc_rqst *rqstp)
svsk->sk_pages[i] = rqstp->rq_pages[i];
rqstp->rq_pages[i] = NULL;
}
+ rqstp->rq_pages_nfree = npages;
}
static void svc_tcp_clear_pages(struct svc_sock *svsk)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index e7e4a39ca6c6..f8a0638eb095 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -118,7 +118,8 @@ svc_rdma_next_recv_ctxt(struct list_head *list)
static struct svc_rdma_recv_ctxt *
svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma)
{
- int node = ibdev_to_node(rdma->sc_cm_id->device);
+ struct ib_device *device = rdma->sc_cm_id->device;
+ int node = ibdev_to_node(device);
struct svc_rdma_recv_ctxt *ctxt;
unsigned long pages;
dma_addr_t addr;
@@ -133,9 +134,9 @@ svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma)
buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node);
if (!buffer)
goto fail1;
- addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
- rdma->sc_max_req_size, DMA_FROM_DEVICE);
- if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
+ addr = ib_dma_map_single(device, buffer, rdma->sc_max_req_size,
+ DMA_FROM_DEVICE);
+ if (ib_dma_mapping_error(device, addr))
goto fail2;
svc_rdma_recv_cid_init(rdma, &ctxt->rc_cid);
@@ -167,7 +168,7 @@ fail0:
static void svc_rdma_recv_ctxt_destroy(struct svcxprt_rdma *rdma,
struct svc_rdma_recv_ctxt *ctxt)
{
- ib_dma_unmap_single(rdma->sc_pd->device, ctxt->rc_recv_sge.addr,
+ ib_dma_unmap_single(rdma->sc_cm_id->device, ctxt->rc_recv_sge.addr,
ctxt->rc_recv_sge.length, DMA_FROM_DEVICE);
kfree(ctxt->rc_recv_buf);
kfree(ctxt);
@@ -861,18 +862,12 @@ static noinline void svc_rdma_read_complete(struct svc_rqst *rqstp,
unsigned int i;
/* Transfer the Read chunk pages into @rqstp.rq_pages, replacing
- * the rq_pages that were already allocated for this rqstp.
+ * the receive buffer pages already allocated for this rqstp.
*/
- release_pages(rqstp->rq_respages, ctxt->rc_page_count);
+ release_pages(rqstp->rq_pages, ctxt->rc_page_count);
for (i = 0; i < ctxt->rc_page_count; i++)
rqstp->rq_pages[i] = ctxt->rc_pages[i];
- /* Update @rqstp's result send buffer to start after the
- * last page in the RDMA Read payload.
- */
- rqstp->rq_respages = &rqstp->rq_pages[ctxt->rc_page_count];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
-
/* Prevent svc_rdma_recv_ctxt_put() from releasing the
* pages in ctxt::rc_pages a second time.
*/
@@ -931,10 +926,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
struct svc_rdma_recv_ctxt *ctxt;
int ret;
- /* Prevent svc_xprt_release() from releasing pages in rq_pages
- * when returning 0 or an error.
+ /* Precaution: a zero page count on error return causes
+ * svc_rqst_release_pages() to release nothing.
*/
- rqstp->rq_respages = rqstp->rq_pages;
rqstp->rq_next_page = rqstp->rq_respages;
rqstp->rq_xprt_ctxt = NULL;
@@ -962,7 +956,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
return 0;
percpu_counter_inc(&svcrdma_stat_recv);
- ib_dma_sync_single_for_cpu(rdma_xprt->sc_pd->device,
+ ib_dma_sync_single_for_cpu(rdma_xprt->sc_cm_id->device,
ctxt->rc_recv_sge.addr, ctxt->rc_byte_len,
DMA_FROM_DEVICE);
svc_rdma_build_arg_xdr(rqstp, ctxt);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 4ec2f9ae06aa..402e2ceca4ff 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -252,6 +252,28 @@ static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
}
/**
+ * svc_rdma_write_chunk_release - Release Write chunk I/O resources
+ * @rdma: controlling transport
+ * @ctxt: Send context that is being released
+ *
+ * Write chunk resources remain live until Send completion because
+ * Write WRs are chained to the Send WR. This function releases all
+ * write_info structures accumulated on @ctxt->sc_write_info_list.
+ */
+void svc_rdma_write_chunk_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt)
+{
+ struct svc_rdma_write_info *info;
+
+ while (!list_empty(&ctxt->sc_write_info_list)) {
+ info = list_first_entry(&ctxt->sc_write_info_list,
+ struct svc_rdma_write_info, wi_list);
+ list_del(&info->wi_list);
+ svc_rdma_write_info_free(info);
+ }
+}
+
+/**
* svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
* @rdma: controlling transport
* @ctxt: Send context that is being released
@@ -307,13 +329,11 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_chunk_ctxt *cc =
container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
- struct svc_rdma_write_info *info =
- container_of(cc, struct svc_rdma_write_info, wi_cc);
switch (wc->status) {
case IB_WC_SUCCESS:
trace_svcrdma_wc_write(&cc->cc_cid);
- break;
+ return;
case IB_WC_WR_FLUSH_ERR:
trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
break;
@@ -321,12 +341,11 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
}
- svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
-
- if (unlikely(wc->status != IB_WC_SUCCESS))
- svc_xprt_deferred_close(&rdma->sc_xprt);
-
- svc_rdma_write_info_free(info);
+ /* The RDMA Write has flushed, so the client won't get
+ * some of the outgoing RPC message. Signal the loss
+ * to the client by closing the connection.
+ */
+ svc_xprt_deferred_close(&rdma->sc_xprt);
}
/**
@@ -405,34 +424,17 @@ static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
cqe = NULL;
}
- do {
- if (atomic_sub_return(cc->cc_sqecount,
- &rdma->sc_sq_avail) > 0) {
- cc->cc_posttime = ktime_get();
- ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
- if (ret)
- break;
- return 0;
- }
-
- percpu_counter_inc(&svcrdma_stat_sq_starve);
- trace_svcrdma_sq_full(rdma, &cc->cc_cid);
- atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
- wait_event(rdma->sc_send_wait,
- atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
- trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
- } while (1);
-
- trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
- svc_xprt_deferred_close(&rdma->sc_xprt);
-
- /* If even one was posted, there will be a completion. */
- if (bad_wr != first_wr)
- return 0;
+ ret = svc_rdma_sq_wait(rdma, &cc->cc_cid, cc->cc_sqecount);
+ if (ret < 0)
+ return ret;
- atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
- wake_up(&rdma->sc_send_wait);
- return -ENOTCONN;
+ cc->cc_posttime = ktime_get();
+ ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
+ if (ret)
+ return svc_rdma_post_send_err(rdma, &cc->cc_cid, bad_wr,
+ first_wr, cc->cc_sqecount,
+ ret);
+ return 0;
}
/* Build a bvec that covers one kvec in an xdr_buf.
@@ -617,9 +619,37 @@ static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
return xdr->len;
}
-static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
- const struct svc_rdma_chunk *chunk,
- const struct xdr_buf *xdr)
+/* Link chunk WRs onto @sctxt's WR chain. Completion is requested
+ * for the tail WR, which is posted first.
+ */
+static void svc_rdma_cc_link_wrs(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *sctxt,
+ struct svc_rdma_chunk_ctxt *cc)
+{
+ struct ib_send_wr *first_wr;
+ struct list_head *pos;
+ struct ib_cqe *cqe;
+
+ first_wr = sctxt->sc_wr_chain;
+ cqe = &cc->cc_cqe;
+ list_for_each(pos, &cc->cc_rwctxts) {
+ struct svc_rdma_rw_ctxt *rwc;
+
+ rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
+ first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
+ rdma->sc_port_num, cqe, first_wr);
+ cqe = NULL;
+ }
+ sctxt->sc_wr_chain = first_wr;
+ sctxt->sc_sqecount += cc->cc_sqecount;
+}
+
+/* Link Write WRs for @chunk onto @sctxt's WR chain.
+ */
+static int svc_rdma_prepare_write_chunk(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_chunk *chunk,
+ const struct xdr_buf *xdr)
{
struct svc_rdma_write_info *info;
struct svc_rdma_chunk_ctxt *cc;
@@ -639,10 +669,14 @@ static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
if (ret != payload.len)
goto out_err;
- trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
- ret = svc_rdma_post_chunk_ctxt(rdma, cc);
- if (ret < 0)
+ ret = -EINVAL;
+ if (unlikely(sctxt->sc_sqecount + cc->cc_sqecount > rdma->sc_sq_depth))
goto out_err;
+
+ svc_rdma_cc_link_wrs(rdma, sctxt, cc);
+ list_add(&info->wi_list, &sctxt->sc_write_info_list);
+
+ trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
return 0;
out_err:
@@ -651,17 +685,19 @@ out_err:
}
/**
- * svc_rdma_send_write_list - Send all chunks on the Write list
+ * svc_rdma_prepare_write_list - Construct WR chain for sending Write list
* @rdma: controlling RDMA transport
* @rctxt: Write list provisioned by the client
+ * @sctxt: Send WR resources
* @xdr: xdr_buf containing an RPC Reply message
*
- * Returns zero on success, or a negative errno if one or more
- * Write chunks could not be sent.
+ * Returns zero on success, or a negative errno if WR chain
+ * construction fails for one or more Write chunks.
*/
-int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
- const struct svc_rdma_recv_ctxt *rctxt,
- const struct xdr_buf *xdr)
+int svc_rdma_prepare_write_list(struct svcxprt_rdma *rdma,
+ const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct xdr_buf *xdr)
{
struct svc_rdma_chunk *chunk;
int ret;
@@ -669,7 +705,7 @@ int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
if (!chunk->ch_payload_length)
break;
- ret = svc_rdma_send_write_chunk(rdma, chunk, xdr);
+ ret = svc_rdma_prepare_write_chunk(rdma, sctxt, chunk, xdr);
if (ret < 0)
return ret;
}
@@ -699,9 +735,6 @@ int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
{
struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
- struct ib_send_wr *first_wr;
- struct list_head *pos;
- struct ib_cqe *cqe;
int ret;
info->wi_rdma = rdma;
@@ -715,23 +748,222 @@ int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
if (ret < 0)
return ret;
- first_wr = sctxt->sc_wr_chain;
- cqe = &cc->cc_cqe;
- list_for_each(pos, &cc->cc_rwctxts) {
- struct svc_rdma_rw_ctxt *rwc;
-
- rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
- first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
- rdma->sc_port_num, cqe, first_wr);
- cqe = NULL;
- }
- sctxt->sc_wr_chain = first_wr;
- sctxt->sc_sqecount += cc->cc_sqecount;
+ svc_rdma_cc_link_wrs(rdma, sctxt, cc);
trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
return xdr->len;
}
+/*
+ * Cap contiguous RDMA Read sink allocations at order-4.
+ * Higher orders risk allocation failure under
+ * __GFP_NORETRY, which would negate the benefit of the
+ * contiguous fast path.
+ */
+#define SVC_RDMA_CONTIG_MAX_ORDER 4
+
+/**
+ * svc_rdma_alloc_read_pages - Allocate physically contiguous pages
+ * @nr_pages: number of pages needed
+ * @order: on success, set to the allocation order
+ *
+ * Attempts a higher-order allocation, falling back to smaller orders.
+ * The returned pages are split immediately so each sub-page has its
+ * own refcount and can be freed independently.
+ *
+ * Returns a pointer to the first page on success, or NULL if even
+ * order-1 allocation fails.
+ */
+static struct page *
+svc_rdma_alloc_read_pages(unsigned int nr_pages, unsigned int *order)
+{
+ unsigned int o;
+ struct page *page;
+
+ o = min(get_order(nr_pages << PAGE_SHIFT),
+ SVC_RDMA_CONTIG_MAX_ORDER);
+
+ while (o >= 1) {
+ page = alloc_pages(GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN,
+ o);
+ if (page) {
+ split_page(page, o);
+ *order = o;
+ return page;
+ }
+ o--;
+ }
+ return NULL;
+}
+
+/*
+ * svc_rdma_fill_contig_bvec - Replace rq_pages with a contiguous allocation
+ * @rqstp: RPC transaction context
+ * @head: context for ongoing I/O
+ * @bv: bvec entry to fill
+ * @pages_left: number of data pages remaining in the segment
+ * @len_left: bytes remaining in the segment
+ *
+ * On success, fills @bv with a bvec spanning the contiguous range and
+ * advances rc_curpage/rc_page_count. Returns the byte length covered,
+ * or zero if the allocation failed or would overrun rq_maxpages.
+ */
+static unsigned int
+svc_rdma_fill_contig_bvec(struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head,
+ struct bio_vec *bv, unsigned int pages_left,
+ unsigned int len_left)
+{
+ unsigned int order, npages, chunk_pages, chunk_len, i;
+ struct page *page;
+
+ page = svc_rdma_alloc_read_pages(pages_left, &order);
+ if (!page)
+ return 0;
+ npages = 1 << order;
+
+ if (head->rc_curpage + npages > rqstp->rq_maxpages) {
+ for (i = 0; i < npages; i++)
+ __free_page(page + i);
+ return 0;
+ }
+
+ /*
+ * Replace rq_pages[] entries with pages from the contiguous
+ * allocation. If npages exceeds chunk_pages, the extra pages
+ * stay in rq_pages[] for later reuse or normal rqst teardown.
+ */
+ for (i = 0; i < npages; i++) {
+ svc_rqst_page_release(rqstp,
+ rqstp->rq_pages[head->rc_curpage + i]);
+ rqstp->rq_pages[head->rc_curpage + i] = page + i;
+ }
+
+ chunk_pages = min(npages, pages_left);
+ chunk_len = min_t(unsigned int, chunk_pages << PAGE_SHIFT, len_left);
+ bvec_set_page(bv, page, chunk_len, 0);
+ head->rc_page_count += chunk_pages;
+ head->rc_curpage += chunk_pages;
+ return chunk_len;
+}
+
+/*
+ * svc_rdma_fill_page_bvec - Add a single rq_page to the bvec array
+ * @head: context for ongoing I/O
+ * @ctxt: R/W context whose bvec array is being filled
+ * @cur: page to add
+ * @bvec_idx: pointer to current bvec index, not advanced on merge
+ * @len_left: bytes remaining in the segment
+ *
+ * If @cur is physically contiguous with the preceding bvec, it is
+ * merged by extending that bvec's length. Otherwise a new bvec
+ * entry is created. Returns the byte length covered.
+ */
+static unsigned int
+svc_rdma_fill_page_bvec(struct svc_rdma_recv_ctxt *head,
+ struct svc_rdma_rw_ctxt *ctxt, struct page *cur,
+ unsigned int *bvec_idx, unsigned int len_left)
+{
+ unsigned int chunk_len = min_t(unsigned int, PAGE_SIZE, len_left);
+
+ head->rc_page_count++;
+ head->rc_curpage++;
+
+ if (*bvec_idx > 0) {
+ struct bio_vec *prev = &ctxt->rw_bvec[*bvec_idx - 1];
+
+ if (page_to_phys(prev->bv_page) + prev->bv_offset +
+ prev->bv_len == page_to_phys(cur)) {
+ prev->bv_len += chunk_len;
+ return chunk_len;
+ }
+ }
+
+ bvec_set_page(&ctxt->rw_bvec[*bvec_idx], cur, chunk_len, 0);
+ (*bvec_idx)++;
+ return chunk_len;
+}
+
+/**
+ * svc_rdma_build_read_segment_contig - Build RDMA Read WR with contiguous pages
+ * @rqstp: RPC transaction context
+ * @head: context for ongoing I/O
+ * @segment: co-ordinates of remote memory to be read
+ *
+ * Greedily allocates higher-order pages to cover the segment,
+ * building one bvec per contiguous chunk. Each allocation is
+ * split so sub-pages have independent refcounts. When a
+ * higher-order allocation fails, remaining pages are covered
+ * individually, merging adjacent pages into the preceding bvec
+ * when they are physically contiguous. The split sub-pages
+ * replace entries in rq_pages[] so downstream cleanup is
+ * unchanged.
+ *
+ * Returns:
+ * %0: the Read WR was constructed successfully
+ * %-ENOMEM: allocation failed
+ * %-EIO: a DMA mapping error occurred
+ */
+static int svc_rdma_build_read_segment_contig(struct svc_rqst *rqstp,
+ struct svc_rdma_recv_ctxt *head,
+ const struct svc_rdma_segment *segment)
+{
+ struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
+ struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
+ unsigned int nr_data_pages, bvec_idx;
+ struct svc_rdma_rw_ctxt *ctxt;
+ unsigned int len_left;
+ int ret;
+
+ nr_data_pages = PAGE_ALIGN(segment->rs_length) >> PAGE_SHIFT;
+ if (head->rc_curpage + nr_data_pages > rqstp->rq_maxpages)
+ return -ENOMEM;
+
+ ctxt = svc_rdma_get_rw_ctxt(rdma, nr_data_pages);
+ if (!ctxt)
+ return -ENOMEM;
+
+ bvec_idx = 0;
+ len_left = segment->rs_length;
+ while (len_left) {
+ unsigned int pages_left = PAGE_ALIGN(len_left) >> PAGE_SHIFT;
+ unsigned int chunk_len = 0;
+
+ if (pages_left >= 2)
+ chunk_len = svc_rdma_fill_contig_bvec(rqstp, head,
+ &ctxt->rw_bvec[bvec_idx],
+ pages_left, len_left);
+ if (chunk_len) {
+ bvec_idx++;
+ } else {
+ struct page *cur =
+ rqstp->rq_pages[head->rc_curpage];
+ chunk_len = svc_rdma_fill_page_bvec(head, ctxt, cur,
+ &bvec_idx,
+ len_left);
+ }
+
+ len_left -= chunk_len;
+ }
+
+ ctxt->rw_nents = bvec_idx;
+
+ head->rc_pageoff = offset_in_page(segment->rs_length);
+ if (head->rc_pageoff)
+ head->rc_curpage--;
+
+ ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
+ segment->rs_handle, segment->rs_length,
+ DMA_FROM_DEVICE);
+ if (ret < 0)
+ return -EIO;
+ percpu_counter_inc(&svcrdma_stat_read);
+
+ list_add(&ctxt->rw_list, &cc->cc_rwctxts);
+ cc->cc_sqecount += ret;
+ return 0;
+}
+
/**
* svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
* @rqstp: RPC transaction context
@@ -758,6 +990,14 @@ static int svc_rdma_build_read_segment(struct svc_rqst *rqstp,
if (check_add_overflow(head->rc_pageoff, len, &total))
return -EINVAL;
nr_bvec = PAGE_ALIGN(total) >> PAGE_SHIFT;
+
+ if (head->rc_pageoff == 0 && nr_bvec >= 2) {
+ ret = svc_rdma_build_read_segment_contig(rqstp, head,
+ segment);
+ if (ret != -ENOMEM)
+ return ret;
+ }
+
ctxt = svc_rdma_get_rw_ctxt(rdma, nr_bvec);
if (!ctxt)
return -ENOMEM;
@@ -1103,10 +1343,16 @@ static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
{
unsigned int i;
+ /*
+ * Move only pages containing RPC data into rc_pages[]. Pages
+ * from a contiguous allocation that were not used for the
+ * payload remain in rq_pages[] for subsequent reuse.
+ */
for (i = 0; i < head->rc_page_count; i++) {
head->rc_pages[i] = rqstp->rq_pages[i];
rqstp->rq_pages[i] = NULL;
}
+ rqstp->rq_pages_nfree = head->rc_page_count;
}
/**
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 914cd263c2f1..8b3f0c8c14b2 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -116,7 +116,8 @@ static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);
static struct svc_rdma_send_ctxt *
svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
{
- int node = ibdev_to_node(rdma->sc_cm_id->device);
+ struct ib_device *device = rdma->sc_cm_id->device;
+ int node = ibdev_to_node(device);
struct svc_rdma_send_ctxt *ctxt;
unsigned long pages;
dma_addr_t addr;
@@ -136,9 +137,9 @@ svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node);
if (!buffer)
goto fail2;
- addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
- rdma->sc_max_req_size, DMA_TO_DEVICE);
- if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
+ addr = ib_dma_map_single(device, buffer, rdma->sc_max_req_size,
+ DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(device, addr))
goto fail3;
svc_rdma_send_cid_init(rdma, &ctxt->sc_cid);
@@ -149,6 +150,7 @@ svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
ctxt->sc_cqe.done = svc_rdma_wc_send;
+ INIT_LIST_HEAD(&ctxt->sc_write_info_list);
ctxt->sc_xprt_buf = buffer;
xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
rdma->sc_max_req_size);
@@ -175,15 +177,14 @@ fail0:
*/
void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
{
+ struct ib_device *device = rdma->sc_cm_id->device;
struct svc_rdma_send_ctxt *ctxt;
struct llist_node *node;
while ((node = llist_del_first(&rdma->sc_send_ctxts)) != NULL) {
ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node);
- ib_dma_unmap_single(rdma->sc_pd->device,
- ctxt->sc_sges[0].addr,
- rdma->sc_max_req_size,
- DMA_TO_DEVICE);
+ ib_dma_unmap_single(device, ctxt->sc_sges[0].addr,
+ rdma->sc_max_req_size, DMA_TO_DEVICE);
kfree(ctxt->sc_xprt_buf);
kfree(ctxt->sc_pages);
kfree(ctxt);
@@ -237,6 +238,7 @@ static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma,
struct ib_device *device = rdma->sc_cm_id->device;
unsigned int i;
+ svc_rdma_write_chunk_release(rdma, ctxt);
svc_rdma_reply_chunk_release(rdma, ctxt);
if (ctxt->sc_page_count)
@@ -295,6 +297,117 @@ void svc_rdma_wake_send_waiters(struct svcxprt_rdma *rdma, int avail)
}
/**
+ * svc_rdma_sq_wait - Wait for SQ slots using fair queuing
+ * @rdma: controlling transport
+ * @cid: completion ID for tracing
+ * @sqecount: number of SQ entries needed
+ *
+ * A ticket-based system ensures fair ordering when multiple threads
+ * wait for Send Queue capacity. Each waiter takes a ticket and is
+ * served in order, preventing starvation.
+ *
+ * Protocol invariant: every ticket holder must increment
+ * sc_sq_ticket_tail exactly once, whether the reservation
+ * succeeds or the connection closes. Failing to advance the
+ * tail stalls all subsequent waiters.
+ *
+ * The ticket counters are signed 32-bit atomics. After
+ * wrapping through INT_MAX, the equality check
+ * (tail == ticket) remains correct because both counters
+ * advance monotonically and the comparison uses exact
+ * equality rather than relational operators.
+ *
+ * Return values:
+ * %0: SQ slots were reserved successfully
+ * %-ENOTCONN: The connection was lost
+ */
+int svc_rdma_sq_wait(struct svcxprt_rdma *rdma,
+ const struct rpc_rdma_cid *cid, int sqecount)
+{
+ int ticket;
+
+ /* Fast path: try to reserve SQ slots without waiting.
+ *
+ * A failed reservation temporarily understates sc_sq_avail
+ * until the compensating atomic_add restores it. A Send
+ * completion arriving in that window sees a lower count
+ * than reality, but the value self-corrects once the add
+ * completes. No ordering guarantee is needed here because
+ * the slow path serializes all contended waiters.
+ */
+ if (likely(atomic_sub_return(sqecount, &rdma->sc_sq_avail) >= 0))
+ return 0;
+ atomic_add(sqecount, &rdma->sc_sq_avail);
+
+ /* Slow path: take a ticket and wait in line */
+ ticket = atomic_fetch_inc(&rdma->sc_sq_ticket_head);
+
+ percpu_counter_inc(&svcrdma_stat_sq_starve);
+ trace_svcrdma_sq_full(rdma, cid);
+
+ /* Wait until all earlier tickets have been served */
+ wait_event(rdma->sc_sq_ticket_wait,
+ test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) ||
+ atomic_read(&rdma->sc_sq_ticket_tail) == ticket);
+ if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
+ goto out_close;
+
+ /* It's our turn. Wait for enough SQ slots to be available. */
+ while (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) {
+ atomic_add(sqecount, &rdma->sc_sq_avail);
+
+ wait_event(rdma->sc_send_wait,
+ test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) ||
+ atomic_read(&rdma->sc_sq_avail) >= sqecount);
+ if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
+ goto out_close;
+ }
+
+ /* Slots reserved successfully. Let the next waiter proceed. */
+ atomic_inc(&rdma->sc_sq_ticket_tail);
+ wake_up(&rdma->sc_sq_ticket_wait);
+ trace_svcrdma_sq_retry(rdma, cid);
+ return 0;
+
+out_close:
+ atomic_inc(&rdma->sc_sq_ticket_tail);
+ wake_up(&rdma->sc_sq_ticket_wait);
+ return -ENOTCONN;
+}
+
+/**
+ * svc_rdma_post_send_err - Handle ib_post_send failure
+ * @rdma: controlling transport
+ * @cid: completion ID for tracing
+ * @bad_wr: first WR that was not posted
+ * @first_wr: first WR in the chain
+ * @sqecount: number of SQ entries that were reserved
+ * @ret: error code from ib_post_send
+ *
+ * Return values:
+ * %0: At least one WR was posted; a completion handles cleanup
+ * %-ENOTCONN: No WRs were posted; SQ slots are released
+ */
+int svc_rdma_post_send_err(struct svcxprt_rdma *rdma,
+ const struct rpc_rdma_cid *cid,
+ const struct ib_send_wr *bad_wr,
+ const struct ib_send_wr *first_wr,
+ int sqecount, int ret)
+{
+ trace_svcrdma_sq_post_err(rdma, cid, ret);
+ svc_xprt_deferred_close(&rdma->sc_xprt);
+
+ /* If even one WR was posted, a Send completion will
+ * return the reserved SQ slots.
+ */
+ if (bad_wr != first_wr)
+ return 0;
+
+ svc_rdma_wake_send_waiters(rdma, sqecount);
+ return -ENOTCONN;
+}
+
+/**
* svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
* @cq: Completion Queue context
* @wc: Work Completion object
@@ -336,11 +449,6 @@ flushed:
* that these values remain available after the ib_post_send() call.
* In some error flow cases, svc_rdma_wc_send() releases @ctxt.
*
- * Note there is potential for starvation when the Send Queue is
- * full because there is no order to when waiting threads are
- * awoken. The transport is typically provisioned with a deep
- * enough Send Queue that SQ exhaustion should be a rare event.
- *
* Return values:
* %0: @ctxt's WR chain was posted successfully
* %-ENOTCONN: The connection was lost
@@ -357,47 +465,21 @@ int svc_rdma_post_send(struct svcxprt_rdma *rdma,
might_sleep();
/* Sync the transport header buffer */
- ib_dma_sync_single_for_device(rdma->sc_pd->device,
+ ib_dma_sync_single_for_device(rdma->sc_cm_id->device,
send_wr->sg_list[0].addr,
send_wr->sg_list[0].length,
DMA_TO_DEVICE);
- /* If the SQ is full, wait until an SQ entry is available */
- while (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) {
- if (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) {
- svc_rdma_wake_send_waiters(rdma, sqecount);
-
- /* When the transport is torn down, assume
- * ib_drain_sq() will trigger enough Send
- * completions to wake us. The XPT_CLOSE test
- * above should then cause the while loop to
- * exit.
- */
- percpu_counter_inc(&svcrdma_stat_sq_starve);
- trace_svcrdma_sq_full(rdma, &cid);
- wait_event(rdma->sc_send_wait,
- atomic_read(&rdma->sc_sq_avail) > 0);
- trace_svcrdma_sq_retry(rdma, &cid);
- continue;
- }
-
- trace_svcrdma_post_send(ctxt);
- ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
- if (ret) {
- trace_svcrdma_sq_post_err(rdma, &cid, ret);
- svc_xprt_deferred_close(&rdma->sc_xprt);
-
- /* If even one WR was posted, there will be a
- * Send completion that bumps sc_sq_avail.
- */
- if (bad_wr == first_wr) {
- svc_rdma_wake_send_waiters(rdma, sqecount);
- break;
- }
- }
- return 0;
- }
- return -ENOTCONN;
+ ret = svc_rdma_sq_wait(rdma, &cid, sqecount);
+ if (ret < 0)
+ return ret;
+
+ trace_svcrdma_post_send(ctxt);
+ ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
+ if (ret)
+ return svc_rdma_post_send_err(rdma, &cid, bad_wr,
+ first_wr, sqecount, ret);
+ return 0;
}
/**
@@ -858,7 +940,8 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
/* The svc_rqst and all resources it owns are released as soon as
* svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
- * so they are released by the Send completion handler.
+ * so they are released only after Send completion, and not by
+ * svc_rqst_release_pages().
*/
static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
struct svc_rdma_send_ctxt *ctxt)
@@ -870,9 +953,6 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
ctxt->sc_pages[i] = rqstp->rq_respages[i];
rqstp->rq_respages[i] = NULL;
}
-
- /* Prevent svc_xprt_release from releasing pages in rq_pages */
- rqstp->rq_next_page = rqstp->rq_respages;
}
/* Prepare the portion of the RPC Reply that will be transmitted
@@ -976,6 +1056,12 @@ void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
sctxt->sc_send_wr.num_sge = 1;
sctxt->sc_send_wr.opcode = IB_WR_SEND;
sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
+
+ /* Ensure only the error message is posted, not any previously
+ * prepared Write chunk WRs.
+ */
+ sctxt->sc_wr_chain = &sctxt->sc_send_wr;
+ sctxt->sc_sqecount = 1;
if (svc_rdma_post_send(rdma, sctxt))
goto put_ctxt;
return;
@@ -1023,7 +1109,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (!p)
goto put_ctxt;
- ret = svc_rdma_send_write_list(rdma, rctxt, &rqstp->rq_res);
+ ret = svc_rdma_prepare_write_list(rdma, rctxt, sctxt, &rqstp->rq_res);
if (ret < 0)
goto put_ctxt;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 9b623849723e..f18bc60d9f4f 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -179,6 +179,7 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
init_llist_head(&cma_xprt->sc_recv_ctxts);
init_llist_head(&cma_xprt->sc_rw_ctxts);
init_waitqueue_head(&cma_xprt->sc_send_wait);
+ init_waitqueue_head(&cma_xprt->sc_sq_ticket_wait);
spin_lock_init(&cma_xprt->sc_lock);
spin_lock_init(&cma_xprt->sc_rq_dto_lock);
@@ -414,7 +415,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
struct ib_qp_init_attr qp_attr;
struct ib_device *dev;
int ret = 0;
- RPC_IFDEBUG(struct sockaddr *sap);
listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
clear_bit(XPT_CONN, &xprt->xpt_flags);
@@ -478,6 +478,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr)
newxprt->sc_sq_depth = dev->attrs.max_qp_wr;
atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
+ atomic_set(&newxprt->sc_sq_ticket_head, 0);
+ atomic_set(&newxprt->sc_sq_ticket_tail, 0);
newxprt->sc_pd = ib_alloc_pd(dev, 0);
if (IS_ERR(newxprt->sc_pd)) {
@@ -560,18 +562,20 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
goto errout;
}
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
- dprintk("svcrdma: new connection accepted on device %s:\n", dev->name);
- sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
- dprintk(" local address : %pIS:%u\n", sap, rpc_get_port(sap));
- sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
- dprintk(" remote address : %pIS:%u\n", sap, rpc_get_port(sap));
- dprintk(" max_sge : %d\n", newxprt->sc_max_send_sges);
- dprintk(" sq_depth : %d\n", newxprt->sc_sq_depth);
- dprintk(" rdma_rw_ctxs : %d\n", ctxts);
- dprintk(" max_requests : %d\n", newxprt->sc_max_requests);
- dprintk(" ord : %d\n", conn_param.initiator_depth);
-#endif
+ if (IS_ENABLED(CONFIG_SUNRPC_DEBUG)) {
+ struct sockaddr *sap;
+
+ dprintk("svcrdma: new connection accepted on device %s:\n", dev->name);
+ sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
+ dprintk(" local address : %pIS:%u\n", sap, rpc_get_port(sap));
+ sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
+ dprintk(" remote address : %pIS:%u\n", sap, rpc_get_port(sap));
+ dprintk(" max_sge : %d\n", newxprt->sc_max_send_sges);
+ dprintk(" sq_depth : %d\n", newxprt->sc_sq_depth);
+ dprintk(" rdma_rw_ctxs : %d\n", ctxts);
+ dprintk(" max_requests : %d\n", newxprt->sc_max_requests);
+ dprintk(" ord : %d\n", conn_param.initiator_depth);
+ }
return &newxprt->sc_xprt;
@@ -648,7 +652,8 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt)
* If there are already waiters on the SQ,
* return false.
*/
- if (waitqueue_active(&rdma->sc_send_wait))
+ if (waitqueue_active(&rdma->sc_send_wait) ||
+ waitqueue_active(&rdma->sc_sq_ticket_wait))
return 0;
/* Otherwise return true. */