diff options
author | David Howells <dhowells@redhat.com> | 2019-11-15 13:30:32 +0000 |
---|---|---|
committer | David Howells <dhowells@redhat.com> | 2019-10-31 15:12:34 +0000 |
commit | 8cefc107ca54c8b06438b7dc9cc08bc0a11d5b98 (patch) | |
tree | 3e01800bf202ddff7841a717ddc47d1e5771768a /fs | |
parent | f94df9890e98f2090c6a8d70c795134863b70201 (diff) | |
download | lwn-8cefc107ca54c8b06438b7dc9cc08bc0a11d5b98.tar.gz lwn-8cefc107ca54c8b06438b7dc9cc08bc0a11d5b98.zip |
pipe: Use head and tail pointers for the ring, not cursor and length
Convert pipes to use head and tail pointers for the buffer ring rather than
pointer and length as the latter requires two atomic ops to update (or a
combined op) whereas the former only requires one.
(1) The head pointer is the point at which production occurs and points to
the slot in which the next buffer will be placed. This is equivalent
to pipe->curbuf + pipe->nrbufs.
The head pointer belongs to the write-side.
(2) The tail pointer is the point at which consumption occurs. It points
to the next slot to be consumed. This is equivalent to pipe->curbuf.
The tail pointer belongs to the read-side.
(3) head and tail are allowed to run to UINT_MAX and wrap naturally. They
are only masked off when the array is being accessed, e.g.:
pipe->bufs[head & mask]
This means that it is not necessary to have a dead slot in the ring as
head == tail isn't ambiguous.
(4) The ring is empty if "head == tail".
A helper, pipe_empty(), is provided for this.
(5) The occupancy of the ring is "head - tail".
A helper, pipe_occupancy(), is provided for this.
(6) The number of free slots in the ring is "pipe->ring_size - occupancy".
A helper, pipe_space_for_user() is provided to indicate how many slots
userspace may use.
(7) The ring is full if "head - tail >= pipe->ring_size".
A helper, pipe_full(), is provided for this.
Signed-off-by: David Howells <dhowells@redhat.com>
Diffstat (limited to 'fs')
-rw-r--r-- | fs/fuse/dev.c | 31 | ||||
-rw-r--r-- | fs/pipe.c | 170 | ||||
-rw-r--r-- | fs/splice.c | 190 |
3 files changed, 232 insertions, 159 deletions
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c index dadd617d826c..c56011f95a87 100644 --- a/fs/fuse/dev.c +++ b/fs/fuse/dev.c @@ -703,7 +703,7 @@ static int fuse_copy_fill(struct fuse_copy_state *cs) cs->pipebufs++; cs->nr_segs--; } else { - if (cs->nr_segs == cs->pipe->buffers) + if (cs->nr_segs >= cs->pipe->ring_size) return -EIO; page = alloc_page(GFP_HIGHUSER); @@ -879,7 +879,7 @@ static int fuse_ref_page(struct fuse_copy_state *cs, struct page *page, struct pipe_buffer *buf; int err; - if (cs->nr_segs == cs->pipe->buffers) + if (cs->nr_segs >= cs->pipe->ring_size) return -EIO; err = unlock_request(cs->req); @@ -1341,7 +1341,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos, if (!fud) return -EPERM; - bufs = kvmalloc_array(pipe->buffers, sizeof(struct pipe_buffer), + bufs = kvmalloc_array(pipe->ring_size, sizeof(struct pipe_buffer), GFP_KERNEL); if (!bufs) return -ENOMEM; @@ -1353,7 +1353,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos, if (ret < 0) goto out; - if (pipe->nrbufs + cs.nr_segs > pipe->buffers) { + if (pipe_occupancy(pipe->head, pipe->tail) + cs.nr_segs > pipe->ring_size) { ret = -EIO; goto out; } @@ -1935,6 +1935,7 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe, struct file *out, loff_t *ppos, size_t len, unsigned int flags) { + unsigned int head, tail, mask, count; unsigned nbuf; unsigned idx; struct pipe_buffer *bufs; @@ -1949,8 +1950,12 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe, pipe_lock(pipe); - bufs = kvmalloc_array(pipe->nrbufs, sizeof(struct pipe_buffer), - GFP_KERNEL); + head = pipe->head; + tail = pipe->tail; + mask = pipe->ring_size - 1; + count = head - tail; + + bufs = kvmalloc_array(count, sizeof(struct pipe_buffer), GFP_KERNEL); if (!bufs) { pipe_unlock(pipe); return -ENOMEM; @@ -1958,8 +1963,8 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe, nbuf = 0; rem = 0; - for (idx = 0; idx < pipe->nrbufs && rem < len; idx++) - rem += pipe->bufs[(pipe->curbuf + idx) & (pipe->buffers - 1)].len; + for (idx = tail; idx < head && rem < len; idx++) + rem += pipe->bufs[idx & mask].len; ret = -EINVAL; if (rem < len) @@ -1970,16 +1975,16 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe, struct pipe_buffer *ibuf; struct pipe_buffer *obuf; - BUG_ON(nbuf >= pipe->buffers); - BUG_ON(!pipe->nrbufs); - ibuf = &pipe->bufs[pipe->curbuf]; + BUG_ON(nbuf >= pipe->ring_size); + BUG_ON(tail == head); + ibuf = &pipe->bufs[tail & mask]; obuf = &bufs[nbuf]; if (rem >= ibuf->len) { *obuf = *ibuf; ibuf->ops = NULL; - pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1); - pipe->nrbufs--; + tail++; + pipe->tail = tail; } else { if (!pipe_buf_get(pipe, ibuf)) goto out_free; diff --git a/fs/pipe.c b/fs/pipe.c index 8a2ab2f974bd..e9b361cb093e 100644 --- a/fs/pipe.c +++ b/fs/pipe.c @@ -43,10 +43,12 @@ unsigned long pipe_user_pages_hard; unsigned long pipe_user_pages_soft = PIPE_DEF_BUFFERS * INR_OPEN_CUR; /* - * We use a start+len construction, which provides full use of the - * allocated memory. - * -- Florian Coosmann (FGC) - * + * We use head and tail indices that aren't masked off, except at the point of + * dereference, but rather they're allowed to wrap naturally. This means there + * isn't a dead spot in the buffer, but the ring has to be a power of two and + * <= 2^31. + * -- David Howells 2019-09-23. + * * Reads with count = 0 should always return 0. * -- Julian Bradfield 1999-06-07. * @@ -285,10 +287,12 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to) ret = 0; __pipe_lock(pipe); for (;;) { - int bufs = pipe->nrbufs; - if (bufs) { - int curbuf = pipe->curbuf; - struct pipe_buffer *buf = pipe->bufs + curbuf; + unsigned int head = pipe->head; + unsigned int tail = pipe->tail; + unsigned int mask = pipe->ring_size - 1; + + if (!pipe_empty(head, tail)) { + struct pipe_buffer *buf = &pipe->bufs[tail & mask]; size_t chars = buf->len; size_t written; int error; @@ -321,17 +325,17 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to) if (!buf->len) { pipe_buf_release(pipe, buf); - curbuf = (curbuf + 1) & (pipe->buffers - 1); - pipe->curbuf = curbuf; - pipe->nrbufs = --bufs; + tail++; + pipe->tail = tail; do_wakeup = 1; } total_len -= chars; if (!total_len) break; /* common path: read succeeded */ + if (!pipe_empty(head, tail)) /* More to do? */ + continue; } - if (bufs) /* More to do? */ - continue; + if (!pipe->writers) break; if (!pipe->waiting_writers) { @@ -380,6 +384,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from) { struct file *filp = iocb->ki_filp; struct pipe_inode_info *pipe = filp->private_data; + unsigned int head, tail, max_usage, mask; ssize_t ret = 0; int do_wakeup = 0; size_t total_len = iov_iter_count(from); @@ -397,12 +402,15 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from) goto out; } + tail = pipe->tail; + head = pipe->head; + max_usage = pipe->ring_size; + mask = pipe->ring_size - 1; + /* We try to merge small writes */ chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */ - if (pipe->nrbufs && chars != 0) { - int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) & - (pipe->buffers - 1); - struct pipe_buffer *buf = pipe->bufs + lastbuf; + if (!pipe_empty(head, tail) && chars != 0) { + struct pipe_buffer *buf = &pipe->bufs[(head - 1) & mask]; int offset = buf->offset + buf->len; if (pipe_buf_can_merge(buf) && offset + chars <= PAGE_SIZE) { @@ -423,18 +431,16 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from) } for (;;) { - int bufs; - if (!pipe->readers) { send_sig(SIGPIPE, current, 0); if (!ret) ret = -EPIPE; break; } - bufs = pipe->nrbufs; - if (bufs < pipe->buffers) { - int newbuf = (pipe->curbuf + bufs) & (pipe->buffers-1); - struct pipe_buffer *buf = pipe->bufs + newbuf; + + tail = pipe->tail; + if (!pipe_full(head, tail, max_usage)) { + struct pipe_buffer *buf = &pipe->bufs[head & mask]; struct page *page = pipe->tmp_page; int copied; @@ -470,14 +476,19 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from) buf->ops = &packet_pipe_buf_ops; buf->flags = PIPE_BUF_FLAG_PACKET; } - pipe->nrbufs = ++bufs; + + head++; + pipe->head = head; pipe->tmp_page = NULL; if (!iov_iter_count(from)) break; } - if (bufs < pipe->buffers) + + if (!pipe_full(head, tail, max_usage)) continue; + + /* Wait for buffer space to become available. */ if (filp->f_flags & O_NONBLOCK) { if (!ret) ret = -EAGAIN; @@ -515,17 +526,19 @@ out: static long pipe_ioctl(struct file *filp, unsigned int cmd, unsigned long arg) { struct pipe_inode_info *pipe = filp->private_data; - int count, buf, nrbufs; + int count, head, tail, mask; switch (cmd) { case FIONREAD: __pipe_lock(pipe); count = 0; - buf = pipe->curbuf; - nrbufs = pipe->nrbufs; - while (--nrbufs >= 0) { - count += pipe->bufs[buf].len; - buf = (buf+1) & (pipe->buffers - 1); + head = pipe->head; + tail = pipe->tail; + mask = pipe->ring_size - 1; + + while (tail != head) { + count += pipe->bufs[tail & mask].len; + tail++; } __pipe_unlock(pipe); @@ -541,21 +554,25 @@ pipe_poll(struct file *filp, poll_table *wait) { __poll_t mask; struct pipe_inode_info *pipe = filp->private_data; - int nrbufs; + unsigned int head = READ_ONCE(pipe->head); + unsigned int tail = READ_ONCE(pipe->tail); poll_wait(filp, &pipe->wait, wait); + BUG_ON(pipe_occupancy(head, tail) > pipe->ring_size); + /* Reading only -- no need for acquiring the semaphore. */ - nrbufs = pipe->nrbufs; mask = 0; if (filp->f_mode & FMODE_READ) { - mask = (nrbufs > 0) ? EPOLLIN | EPOLLRDNORM : 0; + if (!pipe_empty(head, tail)) + mask |= EPOLLIN | EPOLLRDNORM; if (!pipe->writers && filp->f_version != pipe->w_counter) mask |= EPOLLHUP; } if (filp->f_mode & FMODE_WRITE) { - mask |= (nrbufs < pipe->buffers) ? EPOLLOUT | EPOLLWRNORM : 0; + if (!pipe_full(head, tail, pipe->ring_size)) + mask |= EPOLLOUT | EPOLLWRNORM; /* * Most Unices do not set EPOLLERR for FIFOs but on Linux they * behave exactly like pipes for poll(). @@ -679,7 +696,7 @@ struct pipe_inode_info *alloc_pipe_info(void) if (pipe->bufs) { init_waitqueue_head(&pipe->wait); pipe->r_counter = pipe->w_counter = 1; - pipe->buffers = pipe_bufs; + pipe->ring_size = pipe_bufs; pipe->user = user; mutex_init(&pipe->mutex); return pipe; @@ -697,9 +714,9 @@ void free_pipe_info(struct pipe_inode_info *pipe) { int i; - (void) account_pipe_buffers(pipe->user, pipe->buffers, 0); + (void) account_pipe_buffers(pipe->user, pipe->ring_size, 0); free_uid(pipe->user); - for (i = 0; i < pipe->buffers; i++) { + for (i = 0; i < pipe->ring_size; i++) { struct pipe_buffer *buf = pipe->bufs + i; if (buf->ops) pipe_buf_release(pipe, buf); @@ -880,7 +897,7 @@ SYSCALL_DEFINE1(pipe, int __user *, fildes) static int wait_for_partner(struct pipe_inode_info *pipe, unsigned int *cnt) { - int cur = *cnt; + int cur = *cnt; while (cur == *cnt) { pipe_wait(pipe); @@ -955,7 +972,7 @@ static int fifo_open(struct inode *inode, struct file *filp) } } break; - + case FMODE_WRITE: /* * O_WRONLY @@ -975,7 +992,7 @@ static int fifo_open(struct inode *inode, struct file *filp) goto err_wr; } break; - + case FMODE_READ | FMODE_WRITE: /* * O_RDWR @@ -1054,14 +1071,14 @@ unsigned int round_pipe_size(unsigned long size) static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg) { struct pipe_buffer *bufs; - unsigned int size, nr_pages; + unsigned int size, nr_slots, head, tail, mask, n; unsigned long user_bufs; long ret = 0; size = round_pipe_size(arg); - nr_pages = size >> PAGE_SHIFT; + nr_slots = size >> PAGE_SHIFT; - if (!nr_pages) + if (!nr_slots) return -EINVAL; /* @@ -1071,13 +1088,13 @@ static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg) * Decreasing the pipe capacity is always permitted, even * if the user is currently over a limit. */ - if (nr_pages > pipe->buffers && + if (nr_slots > pipe->ring_size && size > pipe_max_size && !capable(CAP_SYS_RESOURCE)) return -EPERM; - user_bufs = account_pipe_buffers(pipe->user, pipe->buffers, nr_pages); + user_bufs = account_pipe_buffers(pipe->user, pipe->ring_size, nr_slots); - if (nr_pages > pipe->buffers && + if (nr_slots > pipe->ring_size && (too_many_pipe_buffers_hard(user_bufs) || too_many_pipe_buffers_soft(user_bufs)) && is_unprivileged_user()) { @@ -1086,17 +1103,21 @@ static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg) } /* - * We can shrink the pipe, if arg >= pipe->nrbufs. Since we don't - * expect a lot of shrink+grow operations, just free and allocate - * again like we would do for growing. If the pipe currently + * We can shrink the pipe, if arg is greater than the ring occupancy. + * Since we don't expect a lot of shrink+grow operations, just free and + * allocate again like we would do for growing. If the pipe currently * contains more buffers than arg, then return busy. */ - if (nr_pages < pipe->nrbufs) { + mask = pipe->ring_size - 1; + head = pipe->head; + tail = pipe->tail; + n = pipe_occupancy(pipe->head, pipe->tail); + if (nr_slots < n) { ret = -EBUSY; goto out_revert_acct; } - bufs = kcalloc(nr_pages, sizeof(*bufs), + bufs = kcalloc(nr_slots, sizeof(*bufs), GFP_KERNEL_ACCOUNT | __GFP_NOWARN); if (unlikely(!bufs)) { ret = -ENOMEM; @@ -1105,33 +1126,36 @@ static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg) /* * The pipe array wraps around, so just start the new one at zero - * and adjust the indexes. + * and adjust the indices. */ - if (pipe->nrbufs) { - unsigned int tail; - unsigned int head; - - tail = pipe->curbuf + pipe->nrbufs; - if (tail < pipe->buffers) - tail = 0; - else - tail &= (pipe->buffers - 1); - - head = pipe->nrbufs - tail; - if (head) - memcpy(bufs, pipe->bufs + pipe->curbuf, head * sizeof(struct pipe_buffer)); - if (tail) - memcpy(bufs + head, pipe->bufs, tail * sizeof(struct pipe_buffer)); + if (n > 0) { + unsigned int h = head & mask; + unsigned int t = tail & mask; + if (h > t) { + memcpy(bufs, pipe->bufs + t, + n * sizeof(struct pipe_buffer)); + } else { + unsigned int tsize = pipe->ring_size - t; + if (h > 0) + memcpy(bufs + tsize, pipe->bufs, + h * sizeof(struct pipe_buffer)); + memcpy(bufs, pipe->bufs + t, + tsize * sizeof(struct pipe_buffer)); + } } - pipe->curbuf = 0; + head = n; + tail = 0; + kfree(pipe->bufs); pipe->bufs = bufs; - pipe->buffers = nr_pages; - return nr_pages * PAGE_SIZE; + pipe->ring_size = nr_slots; + pipe->tail = tail; + pipe->head = head; + return pipe->ring_size * PAGE_SIZE; out_revert_acct: - (void) account_pipe_buffers(pipe->user, nr_pages, pipe->buffers); + (void) account_pipe_buffers(pipe->user, nr_slots, pipe->ring_size); return ret; } @@ -1161,7 +1185,7 @@ long pipe_fcntl(struct file *file, unsigned int cmd, unsigned long arg) ret = pipe_set_size(pipe, arg); break; case F_GETPIPE_SZ: - ret = pipe->buffers * PAGE_SIZE; + ret = pipe->ring_size * PAGE_SIZE; break; default: ret = -EINVAL; diff --git a/fs/splice.c b/fs/splice.c index 98412721f056..22b0a47a35c0 100644 --- a/fs/splice.c +++ b/fs/splice.c @@ -185,6 +185,9 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe, struct splice_pipe_desc *spd) { unsigned int spd_pages = spd->nr_pages; + unsigned int tail = pipe->tail; + unsigned int head = pipe->head; + unsigned int mask = pipe->ring_size - 1; int ret = 0, page_nr = 0; if (!spd_pages) @@ -196,9 +199,8 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe, goto out; } - while (pipe->nrbufs < pipe->buffers) { - int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1); - struct pipe_buffer *buf = pipe->bufs + newbuf; + while (!pipe_full(head, tail, pipe->ring_size)) { + struct pipe_buffer *buf = &pipe->bufs[head & mask]; buf->page = spd->pages[page_nr]; buf->offset = spd->partial[page_nr].offset; @@ -207,7 +209,8 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe, buf->ops = spd->ops; buf->flags = 0; - pipe->nrbufs++; + head++; + pipe->head = head; page_nr++; ret += buf->len; @@ -228,17 +231,19 @@ EXPORT_SYMBOL_GPL(splice_to_pipe); ssize_t add_to_pipe(struct pipe_inode_info *pipe, struct pipe_buffer *buf) { + unsigned int head = pipe->head; + unsigned int tail = pipe->tail; + unsigned int mask = pipe->ring_size - 1; int ret; if (unlikely(!pipe->readers)) { send_sig(SIGPIPE, current, 0); ret = -EPIPE; - } else if (pipe->nrbufs == pipe->buffers) { + } else if (pipe_full(head, tail, pipe->ring_size)) { ret = -EAGAIN; } else { - int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1); - pipe->bufs[newbuf] = *buf; - pipe->nrbufs++; + pipe->bufs[head & mask] = *buf; + pipe->head = head + 1; return buf->len; } pipe_buf_release(pipe, buf); @@ -252,14 +257,14 @@ EXPORT_SYMBOL(add_to_pipe); */ int splice_grow_spd(const struct pipe_inode_info *pipe, struct splice_pipe_desc *spd) { - unsigned int buffers = READ_ONCE(pipe->buffers); + unsigned int max_usage = READ_ONCE(pipe->ring_size); - spd->nr_pages_max = buffers; - if (buffers <= PIPE_DEF_BUFFERS) + spd->nr_pages_max = max_usage; + if (max_usage <= PIPE_DEF_BUFFERS) return 0; - spd->pages = kmalloc_array(buffers, sizeof(struct page *), GFP_KERNEL); - spd->partial = kmalloc_array(buffers, sizeof(struct partial_page), + spd->pages = kmalloc_array(max_usage, sizeof(struct page *), GFP_KERNEL); + spd->partial = kmalloc_array(max_usage, sizeof(struct partial_page), GFP_KERNEL); if (spd->pages && spd->partial) @@ -298,10 +303,11 @@ ssize_t generic_file_splice_read(struct file *in, loff_t *ppos, { struct iov_iter to; struct kiocb kiocb; - int idx, ret; + unsigned int i_head; + int ret; iov_iter_pipe(&to, READ, pipe, len); - idx = to.idx; + i_head = to.head; init_sync_kiocb(&kiocb, in); kiocb.ki_pos = *ppos; ret = call_read_iter(in, &kiocb, &to); @@ -309,7 +315,7 @@ ssize_t generic_file_splice_read(struct file *in, loff_t *ppos, *ppos = kiocb.ki_pos; file_accessed(in); } else if (ret < 0) { - to.idx = idx; + to.head = i_head; to.iov_offset = 0; iov_iter_advance(&to, 0); /* to free what was emitted */ /* @@ -370,11 +376,12 @@ static ssize_t default_file_splice_read(struct file *in, loff_t *ppos, struct iov_iter to; struct page **pages; unsigned int nr_pages; + unsigned int mask; size_t offset, base, copied = 0; ssize_t res; int i; - if (pipe->nrbufs == pipe->buffers) + if (pipe_full(pipe->head, pipe->tail, pipe->ring_size)) return -EAGAIN; /* @@ -400,8 +407,9 @@ static ssize_t default_file_splice_read(struct file *in, loff_t *ppos, } } - pipe->bufs[to.idx].offset = offset; - pipe->bufs[to.idx].len -= offset; + mask = pipe->ring_size - 1; + pipe->bufs[to.head & mask].offset = offset; + pipe->bufs[to.head & mask].len -= offset; for (i = 0; i < nr_pages; i++) { size_t this_len = min_t(size_t, len, PAGE_SIZE - offset); @@ -443,7 +451,8 @@ static int pipe_to_sendpage(struct pipe_inode_info *pipe, more = (sd->flags & SPLICE_F_MORE) ? MSG_MORE : 0; - if (sd->len < sd->total_len && pipe->nrbufs > 1) + if (sd->len < sd->total_len && + pipe_occupancy(pipe->head, pipe->tail) > 1) more |= MSG_SENDPAGE_NOTLAST; return file->f_op->sendpage(file, buf->page, buf->offset, @@ -481,10 +490,13 @@ static void wakeup_pipe_writers(struct pipe_inode_info *pipe) static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_desc *sd, splice_actor *actor) { + unsigned int head = pipe->head; + unsigned int tail = pipe->tail; + unsigned int mask = pipe->ring_size - 1; int ret; - while (pipe->nrbufs) { - struct pipe_buffer *buf = pipe->bufs + pipe->curbuf; + while (!pipe_empty(tail, head)) { + struct pipe_buffer *buf = &pipe->bufs[tail & mask]; sd->len = buf->len; if (sd->len > sd->total_len) @@ -511,8 +523,8 @@ static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_des if (!buf->len) { pipe_buf_release(pipe, buf); - pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1); - pipe->nrbufs--; + tail++; + pipe->tail = tail; if (pipe->files) sd->need_wakeup = true; } @@ -543,7 +555,7 @@ static int splice_from_pipe_next(struct pipe_inode_info *pipe, struct splice_des if (signal_pending(current)) return -ERESTARTSYS; - while (!pipe->nrbufs) { + while (pipe_empty(pipe->head, pipe->tail)) { if (!pipe->writers) return 0; @@ -686,7 +698,7 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out, .pos = *ppos, .u.file = out, }; - int nbufs = pipe->buffers; + int nbufs = pipe->ring_size; struct bio_vec *array = kcalloc(nbufs, sizeof(struct bio_vec), GFP_KERNEL); ssize_t ret; @@ -699,16 +711,19 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out, splice_from_pipe_begin(&sd); while (sd.total_len) { struct iov_iter from; + unsigned int head = pipe->head; + unsigned int tail = pipe->tail; + unsigned int mask = pipe->ring_size - 1; size_t left; - int n, idx; + int n; ret = splice_from_pipe_next(pipe, &sd); if (ret <= 0) break; - if (unlikely(nbufs < pipe->buffers)) { + if (unlikely(nbufs < pipe->ring_size)) { kfree(array); - nbufs = pipe->buffers; + nbufs = pipe->ring_size; array = kcalloc(nbufs, sizeof(struct bio_vec), GFP_KERNEL); if (!array) { @@ -719,16 +734,13 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out, /* build the vector */ left = sd.total_len; - for (n = 0, idx = pipe->curbuf; left && n < pipe->nrbufs; n++, idx++) { - struct pipe_buffer *buf = pipe->bufs + idx; + for (n = 0; !pipe_empty(head, tail) && left && n < nbufs; tail++, n++) { + struct pipe_buffer *buf = &pipe->bufs[tail & mask]; size_t this_len = buf->len; if (this_len > left) this_len = left; - if (idx == pipe->buffers - 1) - idx = -1; - ret = pipe_buf_confirm(pipe, buf); if (unlikely(ret)) { if (ret == -ENODATA) @@ -752,14 +764,15 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out, *ppos = sd.pos; /* dismiss the fully eaten buffers, adjust the partial one */ + tail = pipe->tail; while (ret) { - struct pipe_buffer *buf = pipe->bufs + pipe->curbuf; + struct pipe_buffer *buf = &pipe->bufs[tail & mask]; if (ret >= buf->len) { ret -= buf->len; buf->len = 0; pipe_buf_release(pipe, buf); - pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1); - pipe->nrbufs--; + tail++; + pipe->tail = tail; if (pipe->files) sd.need_wakeup = true; } else { @@ -942,15 +955,17 @@ ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd, sd->flags &= ~SPLICE_F_NONBLOCK; more = sd->flags & SPLICE_F_MORE; - WARN_ON_ONCE(pipe->nrbufs != 0); + WARN_ON_ONCE(!pipe_empty(pipe->head, pipe->tail)); while (len) { + unsigned int p_space; size_t read_len; loff_t pos = sd->pos, prev_pos = pos; /* Don't try to read more the pipe has space for. */ - read_len = min_t(size_t, len, - (pipe->buffers - pipe->nrbufs) << PAGE_SHIFT); + p_space = pipe->ring_size - + pipe_occupancy(pipe->head, pipe->tail); + read_len = min_t(size_t, len, p_space << PAGE_SHIFT); ret = do_splice_to(in, &pos, pipe, read_len, flags); if (unlikely(ret <= 0)) goto out_release; @@ -989,7 +1004,7 @@ ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd, } done: - pipe->nrbufs = pipe->curbuf = 0; + pipe->tail = pipe->head = 0; file_accessed(in); return bytes; @@ -998,8 +1013,8 @@ out_release: * If we did an incomplete transfer we must release * the pipe buffers in question: */ - for (i = 0; i < pipe->buffers; i++) { - struct pipe_buffer *buf = pipe->bufs + i; + for (i = 0; i < pipe->ring_size; i++) { + struct pipe_buffer *buf = &pipe->bufs[i]; if (buf->ops) pipe_buf_release(pipe, buf); @@ -1075,7 +1090,7 @@ static int wait_for_space(struct pipe_inode_info *pipe, unsigned flags) send_sig(SIGPIPE, current, 0); return -EPIPE; } - if (pipe->nrbufs != pipe->buffers) + if (!pipe_full(pipe->head, pipe->tail, pipe->ring_size)) return 0; if (flags & SPLICE_F_NONBLOCK) return -EAGAIN; @@ -1442,16 +1457,16 @@ static int ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags) int ret; /* - * Check ->nrbufs without the inode lock first. This function + * Check the pipe occupancy without the inode lock first. This function * is speculative anyways, so missing one is ok. */ - if (pipe->nrbufs) + if (!pipe_empty(pipe->head, pipe->tail)) return 0; ret = 0; pipe_lock(pipe); - while (!pipe->nrbufs) { + while (pipe_empty(pipe->head, pipe->tail)) { if (signal_pending(current)) { ret = -ERESTARTSYS; break; @@ -1480,16 +1495,16 @@ static int opipe_prep(struct pipe_inode_info *pipe, unsigned int flags) int ret; /* - * Check ->nrbufs without the inode lock first. This function + * Check pipe occupancy without the inode lock first. This function * is speculative anyways, so missing one is ok. */ - if (pipe->nrbufs < pipe->buffers) + if (pipe_full(pipe->head, pipe->tail, pipe->ring_size)) return 0; ret = 0; pipe_lock(pipe); - while (pipe->nrbufs >= pipe->buffers) { + while (pipe_full(pipe->head, pipe->tail, pipe->ring_size)) { if (!pipe->readers) { send_sig(SIGPIPE, current, 0); ret = -EPIPE; @@ -1520,7 +1535,10 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe, size_t len, unsigned int flags) { struct pipe_buffer *ibuf, *obuf; - int ret = 0, nbuf; + unsigned int i_head, o_head; + unsigned int i_tail, o_tail; + unsigned int i_mask, o_mask; + int ret = 0; bool input_wakeup = false; @@ -1540,7 +1558,14 @@ retry: */ pipe_double_lock(ipipe, opipe); + i_tail = ipipe->tail; + i_mask = ipipe->ring_size - 1; + o_head = opipe->head; + o_mask = opipe->ring_size - 1; + do { + size_t o_len; + if (!opipe->readers) { send_sig(SIGPIPE, current, 0); if (!ret) @@ -1548,14 +1573,18 @@ retry: break; } - if (!ipipe->nrbufs && !ipipe->writers) + i_head = ipipe->head; + o_tail = opipe->tail; + + if (pipe_empty(i_head, i_tail) && !ipipe->writers) break; /* * Cannot make any progress, because either the input * pipe is empty or the output pipe is full. */ - if (!ipipe->nrbufs || opipe->nrbufs >= opipe->buffers) { + if (pipe_empty(i_head, i_tail) || + pipe_full(o_head, o_tail, opipe->ring_size)) { /* Already processed some buffers, break */ if (ret) break; @@ -1575,9 +1604,8 @@ retry: goto retry; } - ibuf = ipipe->bufs + ipipe->curbuf; - nbuf = (opipe->curbuf + opipe->nrbufs) & (opipe->buffers - 1); - obuf = opipe->bufs + nbuf; + ibuf = &ipipe->bufs[i_tail & i_mask]; + obuf = &opipe->bufs[o_head & o_mask]; if (len >= ibuf->len) { /* @@ -1585,10 +1613,12 @@ retry: */ *obuf = *ibuf; ibuf->ops = NULL; - opipe->nrbufs++; - ipipe->curbuf = (ipipe->curbuf + 1) & (ipipe->buffers - 1); - ipipe->nrbufs--; + i_tail++; + ipipe->tail = i_tail; input_wakeup = true; + o_len = obuf->len; + o_head++; + opipe->head = o_head; } else { /* * Get a reference to this pipe buffer, @@ -1610,12 +1640,14 @@ retry: pipe_buf_mark_unmergeable(obuf); obuf->len = len; - opipe->nrbufs++; - ibuf->offset += obuf->len; - ibuf->len -= obuf->len; + ibuf->offset += len; + ibuf->len -= len; + o_len = len; + o_head++; + opipe->head = o_head; } - ret += obuf->len; - len -= obuf->len; + ret += o_len; + len -= o_len; } while (len); pipe_unlock(ipipe); @@ -1641,7 +1673,10 @@ static int link_pipe(struct pipe_inode_info *ipipe, size_t len, unsigned int flags) { struct pipe_buffer *ibuf, *obuf; - int ret = 0, i = 0, nbuf; + unsigned int i_head, o_head; + unsigned int i_tail, o_tail; + unsigned int i_mask, o_mask; + int ret = 0; /* * Potential ABBA deadlock, work around it by ordering lock @@ -1650,6 +1685,11 @@ static int link_pipe(struct pipe_inode_info *ipipe, */ pipe_double_lock(ipipe, opipe); + i_tail = ipipe->tail; + i_mask = ipipe->ring_size - 1; + o_head = opipe->head; + o_mask = opipe->ring_size - 1; + do { if (!opipe->readers) { send_sig(SIGPIPE, current, 0); @@ -1658,15 +1698,19 @@ static int link_pipe(struct pipe_inode_info *ipipe, break; } + i_head = ipipe->head; + o_tail = opipe->tail; + /* - * If we have iterated all input buffers or ran out of + * If we have iterated all input buffers or run out of * output room, break. */ - if (i >= ipipe->nrbufs || opipe->nrbufs >= opipe->buffers) + if (pipe_empty(i_head, i_tail) || + pipe_full(o_head, o_tail, opipe->ring_size)) break; - ibuf = ipipe->bufs + ((ipipe->curbuf + i) & (ipipe->buffers-1)); - nbuf = (opipe->curbuf + opipe->nrbufs) & (opipe->buffers - 1); + ibuf = &ipipe->bufs[i_tail & i_mask]; + obuf = &opipe->bufs[o_head & o_mask]; /* * Get a reference to this pipe buffer, @@ -1678,7 +1722,6 @@ static int link_pipe(struct pipe_inode_info *ipipe, break; } - obuf = opipe->bufs + nbuf; *obuf = *ibuf; /* @@ -1691,11 +1734,12 @@ static int link_pipe(struct pipe_inode_info *ipipe, if (obuf->len > len) obuf->len = len; - - opipe->nrbufs++; ret += obuf->len; len -= obuf->len; - i++; + + o_head++; + opipe->head = o_head; + i_tail++; } while (len); /* |