diff options
Diffstat (limited to 'fs/ceph')
-rw-r--r-- | fs/ceph/addr.c | 1259 | ||||
-rw-r--r-- | fs/ceph/dir.c | 45 | ||||
-rw-r--r-- | fs/ceph/inode.c | 31 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 2 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 3 | ||||
-rw-r--r-- | fs/ceph/super.c | 11 | ||||
-rw-r--r-- | fs/ceph/super.h | 2 |
7 files changed, 851 insertions, 502 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index f5224a566b69..29be367905a1 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -82,6 +82,7 @@ static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio) { struct inode *inode = mapping->host; struct ceph_client *cl = ceph_inode_to_client(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); struct ceph_inode_info *ci; struct ceph_snap_context *snapc; @@ -92,6 +93,8 @@ static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio) return false; } + atomic64_inc(&mdsc->dirty_folios); + ci = ceph_inode(inode); /* dirty the head */ @@ -568,7 +571,36 @@ struct ceph_writeback_ctl u64 truncate_size; u32 truncate_seq; bool size_stable; + bool head_snapc; + struct ceph_snap_context *snapc; + struct ceph_snap_context *last_snapc; + + bool done; + bool should_loop; + bool range_whole; + pgoff_t start_index; + pgoff_t index; + pgoff_t end; + xa_mark_t tag; + + pgoff_t strip_unit_end; + unsigned int wsize; + unsigned int nr_folios; + unsigned int max_pages; + unsigned int locked_pages; + + int op_idx; + int num_ops; + u64 offset; + u64 len; + + struct folio_batch fbatch; + unsigned int processed_in_fbatch; + + bool from_pool; + struct page **pages; + struct page **data_pages; }; /* @@ -666,22 +698,23 @@ static u64 get_writepages_data_length(struct inode *inode, } /* - * Write a single page, but leave the page locked. + * Write a folio, but leave it locked. * * If we get a write error, mark the mapping for error, but still adjust the - * dirty page accounting (i.e., page is no longer dirty). + * dirty page accounting (i.e., folio is no longer dirty). */ -static int writepage_nounlock(struct page *page, struct writeback_control *wbc) +static int write_folio_nounlock(struct folio *folio, + struct writeback_control *wbc) { - struct folio *folio = page_folio(page); - struct inode *inode = page->mapping->host; + struct page *page = &folio->page; + struct inode *inode = folio->mapping->host; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); struct ceph_client *cl = fsc->client; struct ceph_snap_context *snapc, *oldest; - loff_t page_off = page_offset(page); + loff_t page_off = folio_pos(folio); int err; - loff_t len = thp_size(page); + loff_t len = folio_size(folio); loff_t wlen; struct ceph_writeback_ctl ceph_wbc; struct ceph_osd_client *osdc = &fsc->client->osdc; @@ -689,27 +722,27 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) bool caching = ceph_is_cache_enabled(inode); struct page *bounce_page = NULL; - doutc(cl, "%llx.%llx page %p idx %lu\n", ceph_vinop(inode), page, - page->index); + doutc(cl, "%llx.%llx folio %p idx %lu\n", ceph_vinop(inode), folio, + folio->index); if (ceph_inode_is_shutdown(inode)) return -EIO; /* verify this is a writeable snap context */ - snapc = page_snap_context(page); + snapc = page_snap_context(&folio->page); if (!snapc) { - doutc(cl, "%llx.%llx page %p not dirty?\n", ceph_vinop(inode), - page); + doutc(cl, "%llx.%llx folio %p not dirty?\n", ceph_vinop(inode), + folio); return 0; } oldest = get_oldest_context(inode, &ceph_wbc, snapc); if (snapc->seq > oldest->seq) { - doutc(cl, "%llx.%llx page %p snapc %p not writeable - noop\n", - ceph_vinop(inode), page, snapc); + doutc(cl, "%llx.%llx folio %p snapc %p not writeable - noop\n", + ceph_vinop(inode), folio, snapc); /* we should only noop if called by kswapd */ WARN_ON(!(current->flags & PF_MEMALLOC)); ceph_put_snap_context(oldest); - redirty_page_for_writepage(wbc, page); + folio_redirty_for_writepage(wbc, folio); return 0; } ceph_put_snap_context(oldest); @@ -726,8 +759,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) len = ceph_wbc.i_size - page_off; wlen = IS_ENCRYPTED(inode) ? round_up(len, CEPH_FSCRYPT_BLOCK_SIZE) : len; - doutc(cl, "%llx.%llx page %p index %lu on %llu~%llu snapc %p seq %lld\n", - ceph_vinop(inode), page, page->index, page_off, wlen, snapc, + doutc(cl, "%llx.%llx folio %p index %lu on %llu~%llu snapc %p seq %lld\n", + ceph_vinop(inode), folio, folio->index, page_off, wlen, snapc, snapc->seq); if (atomic_long_inc_return(&fsc->writeback_count) > @@ -740,32 +773,32 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ceph_wbc.truncate_seq, ceph_wbc.truncate_size, true); if (IS_ERR(req)) { - redirty_page_for_writepage(wbc, page); + folio_redirty_for_writepage(wbc, folio); return PTR_ERR(req); } if (wlen < len) len = wlen; - set_page_writeback(page); + folio_start_writeback(folio); if (caching) - ceph_set_page_fscache(page); + ceph_set_page_fscache(&folio->page); ceph_fscache_write_to_cache(inode, page_off, len, caching); if (IS_ENCRYPTED(inode)) { - bounce_page = fscrypt_encrypt_pagecache_blocks(page, + bounce_page = fscrypt_encrypt_pagecache_blocks(folio, CEPH_FSCRYPT_BLOCK_SIZE, 0, GFP_NOFS); if (IS_ERR(bounce_page)) { - redirty_page_for_writepage(wbc, page); - end_page_writeback(page); + folio_redirty_for_writepage(wbc, folio); + folio_end_writeback(folio); ceph_osdc_put_request(req); return PTR_ERR(bounce_page); } } /* it may be a short write due to an object boundary */ - WARN_ON_ONCE(len > thp_size(page)); + WARN_ON_ONCE(len > folio_size(folio)); osd_req_op_extent_osd_data_pages(req, 0, bounce_page ? &bounce_page : &page, wlen, 0, false, false); @@ -791,25 +824,25 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) if (err == -ERESTARTSYS) { /* killed by SIGKILL */ doutc(cl, "%llx.%llx interrupted page %p\n", - ceph_vinop(inode), page); - redirty_page_for_writepage(wbc, page); - end_page_writeback(page); + ceph_vinop(inode), folio); + folio_redirty_for_writepage(wbc, folio); + folio_end_writeback(folio); return err; } if (err == -EBLOCKLISTED) fsc->blocklisted = true; - doutc(cl, "%llx.%llx setting page/mapping error %d %p\n", - ceph_vinop(inode), err, page); + doutc(cl, "%llx.%llx setting mapping error %d %p\n", + ceph_vinop(inode), err, folio); mapping_set_error(&inode->i_data, err); wbc->pages_skipped++; } else { doutc(cl, "%llx.%llx cleaned page %p\n", - ceph_vinop(inode), page); + ceph_vinop(inode), folio); err = 0; /* vfs expects us to return 0 */ } - oldest = detach_page_private(page); + oldest = folio_detach_private(folio); WARN_ON_ONCE(oldest != snapc); - end_page_writeback(page); + folio_end_writeback(folio); ceph_put_wrbuffer_cap_refs(ci, 1, snapc); ceph_put_snap_context(snapc); /* page's reference */ @@ -820,32 +853,6 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) return err; } -static int ceph_writepage(struct page *page, struct writeback_control *wbc) -{ - int err; - struct inode *inode = page->mapping->host; - BUG_ON(!inode); - ihold(inode); - - if (wbc->sync_mode == WB_SYNC_NONE && - ceph_inode_to_fs_client(inode)->write_congested) { - redirty_page_for_writepage(wbc, page); - return AOP_WRITEPAGE_ACTIVATE; - } - - folio_wait_private_2(page_folio(page)); /* [DEPRECATED] */ - - err = writepage_nounlock(page, wbc); - if (err == -ERESTARTSYS) { - /* direct memory reclaimer was killed by SIGKILL. return 0 - * to prevent caller from setting mapping/page error */ - err = 0; - } - unlock_page(page); - iput(inode); - return err; -} - /* * async writeback completion handler. * @@ -865,6 +872,7 @@ static void writepages_finish(struct ceph_osd_request *req) struct ceph_snap_context *snapc = req->r_snapc; struct address_space *mapping = inode->i_mapping; struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); unsigned int len = 0; bool remove_page; @@ -920,6 +928,12 @@ static void writepages_finish(struct ceph_osd_request *req) ceph_put_snap_context(detach_page_private(page)); end_page_writeback(page); + + if (atomic64_dec_return(&mdsc->dirty_folios) <= 0) { + wake_up_all(&mdsc->flush_end_wq); + WARN_ON(atomic64_read(&mdsc->dirty_folios) < 0); + } + doutc(cl, "unlocking %p\n", page); if (remove_page) @@ -949,36 +963,13 @@ static void writepages_finish(struct ceph_osd_request *req) ceph_dec_osd_stopping_blocker(fsc->mdsc); } -/* - * initiate async writeback - */ -static int ceph_writepages_start(struct address_space *mapping, - struct writeback_control *wbc) +static inline +bool is_forced_umount(struct address_space *mapping) { struct inode *inode = mapping->host; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); struct ceph_client *cl = fsc->client; - struct ceph_vino vino = ceph_vino(inode); - pgoff_t index, start_index, end = -1; - struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc; - struct folio_batch fbatch; - int rc = 0; - unsigned int wsize = i_blocksize(inode); - struct ceph_osd_request *req = NULL; - struct ceph_writeback_ctl ceph_wbc; - bool should_loop, range_whole = false; - bool done = false; - bool caching = ceph_is_cache_enabled(inode); - xa_mark_t tag; - - if (wbc->sync_mode == WB_SYNC_NONE && - fsc->write_congested) - return 0; - - doutc(cl, "%llx.%llx (mode=%s)\n", ceph_vinop(inode), - wbc->sync_mode == WB_SYNC_NONE ? "NONE" : - (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD")); if (ceph_inode_is_shutdown(inode)) { if (ci->i_wrbuffer_ref > 0) { @@ -987,388 +978,733 @@ static int ceph_writepages_start(struct address_space *mapping, ceph_vinop(inode), ceph_ino(inode)); } mapping_set_error(mapping, -EIO); - return -EIO; /* we're in a forced umount, don't write! */ + return true; } + + return false; +} + +static inline +unsigned int ceph_define_write_size(struct address_space *mapping) +{ + struct inode *inode = mapping->host; + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + unsigned int wsize = i_blocksize(inode); + if (fsc->mount_options->wsize < wsize) wsize = fsc->mount_options->wsize; - folio_batch_init(&fbatch); + return wsize; +} + +static inline +void ceph_folio_batch_init(struct ceph_writeback_ctl *ceph_wbc) +{ + folio_batch_init(&ceph_wbc->fbatch); + ceph_wbc->processed_in_fbatch = 0; +} + +static inline +void ceph_folio_batch_reinit(struct ceph_writeback_ctl *ceph_wbc) +{ + folio_batch_release(&ceph_wbc->fbatch); + ceph_folio_batch_init(ceph_wbc); +} + +static inline +void ceph_init_writeback_ctl(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc) +{ + ceph_wbc->snapc = NULL; + ceph_wbc->last_snapc = NULL; + + ceph_wbc->strip_unit_end = 0; + ceph_wbc->wsize = ceph_define_write_size(mapping); - start_index = wbc->range_cyclic ? mapping->writeback_index : 0; - index = start_index; + ceph_wbc->nr_folios = 0; + ceph_wbc->max_pages = 0; + ceph_wbc->locked_pages = 0; + + ceph_wbc->done = false; + ceph_wbc->should_loop = false; + ceph_wbc->range_whole = false; + + ceph_wbc->start_index = wbc->range_cyclic ? mapping->writeback_index : 0; + ceph_wbc->index = ceph_wbc->start_index; + ceph_wbc->end = -1; if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) { - tag = PAGECACHE_TAG_TOWRITE; + ceph_wbc->tag = PAGECACHE_TAG_TOWRITE; } else { - tag = PAGECACHE_TAG_DIRTY; + ceph_wbc->tag = PAGECACHE_TAG_DIRTY; } -retry: + + ceph_wbc->op_idx = -1; + ceph_wbc->num_ops = 0; + ceph_wbc->offset = 0; + ceph_wbc->len = 0; + ceph_wbc->from_pool = false; + + ceph_folio_batch_init(ceph_wbc); + + ceph_wbc->pages = NULL; + ceph_wbc->data_pages = NULL; +} + +static inline +int ceph_define_writeback_range(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc) +{ + struct inode *inode = mapping->host; + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_client *cl = fsc->client; + /* find oldest snap context with dirty data */ - snapc = get_oldest_context(inode, &ceph_wbc, NULL); - if (!snapc) { + ceph_wbc->snapc = get_oldest_context(inode, ceph_wbc, NULL); + if (!ceph_wbc->snapc) { /* hmm, why does writepages get called when there is no dirty data? */ doutc(cl, " no snap context with dirty data?\n"); - goto out; + return -ENODATA; } - doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n", snapc, - snapc->seq, snapc->num_snaps); - should_loop = false; - if (ceph_wbc.head_snapc && snapc != last_snapc) { + doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n", + ceph_wbc->snapc, ceph_wbc->snapc->seq, + ceph_wbc->snapc->num_snaps); + + ceph_wbc->should_loop = false; + + if (ceph_wbc->head_snapc && ceph_wbc->snapc != ceph_wbc->last_snapc) { /* where to start/end? */ if (wbc->range_cyclic) { - index = start_index; - end = -1; - if (index > 0) - should_loop = true; - doutc(cl, " cyclic, start at %lu\n", index); + ceph_wbc->index = ceph_wbc->start_index; + ceph_wbc->end = -1; + if (ceph_wbc->index > 0) + ceph_wbc->should_loop = true; + doutc(cl, " cyclic, start at %lu\n", ceph_wbc->index); } else { - index = wbc->range_start >> PAGE_SHIFT; - end = wbc->range_end >> PAGE_SHIFT; + ceph_wbc->index = wbc->range_start >> PAGE_SHIFT; + ceph_wbc->end = wbc->range_end >> PAGE_SHIFT; if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX) - range_whole = true; - doutc(cl, " not cyclic, %lu to %lu\n", index, end); + ceph_wbc->range_whole = true; + doutc(cl, " not cyclic, %lu to %lu\n", + ceph_wbc->index, ceph_wbc->end); } - } else if (!ceph_wbc.head_snapc) { + } else if (!ceph_wbc->head_snapc) { /* Do not respect wbc->range_{start,end}. Dirty pages * in that range can be associated with newer snapc. * They are not writeable until we write all dirty pages * associated with 'snapc' get written */ - if (index > 0) - should_loop = true; + if (ceph_wbc->index > 0) + ceph_wbc->should_loop = true; doutc(cl, " non-head snapc, range whole\n"); } - if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) - tag_pages_for_writeback(mapping, index, end); + ceph_put_snap_context(ceph_wbc->last_snapc); + ceph_wbc->last_snapc = ceph_wbc->snapc; - ceph_put_snap_context(last_snapc); - last_snapc = snapc; + return 0; +} - while (!done && index <= end) { - int num_ops = 0, op_idx; - unsigned i, nr_folios, max_pages, locked_pages = 0; - struct page **pages = NULL, **data_pages; - struct page *page; - pgoff_t strip_unit_end = 0; - u64 offset = 0, len = 0; - bool from_pool = false; +static inline +bool has_writeback_done(struct ceph_writeback_ctl *ceph_wbc) +{ + return ceph_wbc->done && ceph_wbc->index > ceph_wbc->end; +} - max_pages = wsize >> PAGE_SHIFT; +static inline +bool can_next_page_be_processed(struct ceph_writeback_ctl *ceph_wbc, + unsigned index) +{ + return index < ceph_wbc->nr_folios && + ceph_wbc->locked_pages < ceph_wbc->max_pages; +} -get_more_pages: - nr_folios = filemap_get_folios_tag(mapping, &index, - end, tag, &fbatch); - doutc(cl, "pagevec_lookup_range_tag got %d\n", nr_folios); - if (!nr_folios && !locked_pages) - break; - for (i = 0; i < nr_folios && locked_pages < max_pages; i++) { - struct folio *folio = fbatch.folios[i]; +static +int ceph_check_page_before_write(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc, + struct folio *folio) +{ + struct inode *inode = mapping->host; + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_client *cl = fsc->client; + struct ceph_snap_context *pgsnapc; - page = &folio->page; - doutc(cl, "? %p idx %lu\n", page, page->index); - if (locked_pages == 0) - lock_page(page); /* first page */ - else if (!trylock_page(page)) - break; + /* only dirty folios, or our accounting breaks */ + if (unlikely(!folio_test_dirty(folio) || folio->mapping != mapping)) { + doutc(cl, "!dirty or !mapping %p\n", folio); + return -ENODATA; + } - /* only dirty pages, or our accounting breaks */ - if (unlikely(!PageDirty(page)) || - unlikely(page->mapping != mapping)) { - doutc(cl, "!dirty or !mapping %p\n", page); - unlock_page(page); - continue; - } - /* only if matching snap context */ - pgsnapc = page_snap_context(page); - if (pgsnapc != snapc) { - doutc(cl, "page snapc %p %lld != oldest %p %lld\n", - pgsnapc, pgsnapc->seq, snapc, snapc->seq); - if (!should_loop && - !ceph_wbc.head_snapc && - wbc->sync_mode != WB_SYNC_NONE) - should_loop = true; - unlock_page(page); - continue; + /* only if matching snap context */ + pgsnapc = page_snap_context(&folio->page); + if (pgsnapc != ceph_wbc->snapc) { + doutc(cl, "folio snapc %p %lld != oldest %p %lld\n", + pgsnapc, pgsnapc->seq, + ceph_wbc->snapc, ceph_wbc->snapc->seq); + + if (!ceph_wbc->should_loop && !ceph_wbc->head_snapc && + wbc->sync_mode != WB_SYNC_NONE) + ceph_wbc->should_loop = true; + + return -ENODATA; + } + + if (folio_pos(folio) >= ceph_wbc->i_size) { + doutc(cl, "folio at %lu beyond eof %llu\n", + folio->index, ceph_wbc->i_size); + + if ((ceph_wbc->size_stable || + folio_pos(folio) >= i_size_read(inode)) && + folio_clear_dirty_for_io(folio)) + folio_invalidate(folio, 0, folio_size(folio)); + + return -ENODATA; + } + + if (ceph_wbc->strip_unit_end && + (folio->index > ceph_wbc->strip_unit_end)) { + doutc(cl, "end of strip unit %p\n", folio); + return -E2BIG; + } + + return 0; +} + +static inline +void __ceph_allocate_page_array(struct ceph_writeback_ctl *ceph_wbc, + unsigned int max_pages) +{ + ceph_wbc->pages = kmalloc_array(max_pages, + sizeof(*ceph_wbc->pages), + GFP_NOFS); + if (!ceph_wbc->pages) { + ceph_wbc->from_pool = true; + ceph_wbc->pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS); + BUG_ON(!ceph_wbc->pages); + } +} + +static inline +void ceph_allocate_page_array(struct address_space *mapping, + struct ceph_writeback_ctl *ceph_wbc, + struct folio *folio) +{ + struct inode *inode = mapping->host; + struct ceph_inode_info *ci = ceph_inode(inode); + u64 objnum; + u64 objoff; + u32 xlen; + + /* prepare async write request */ + ceph_wbc->offset = (u64)folio_pos(folio); + ceph_calc_file_object_mapping(&ci->i_layout, + ceph_wbc->offset, ceph_wbc->wsize, + &objnum, &objoff, &xlen); + + ceph_wbc->num_ops = 1; + ceph_wbc->strip_unit_end = folio->index + ((xlen - 1) >> PAGE_SHIFT); + + BUG_ON(ceph_wbc->pages); + ceph_wbc->max_pages = calc_pages_for(0, (u64)xlen); + __ceph_allocate_page_array(ceph_wbc, ceph_wbc->max_pages); + + ceph_wbc->len = 0; +} + +static inline +bool is_folio_index_contiguous(const struct ceph_writeback_ctl *ceph_wbc, + const struct folio *folio) +{ + return folio->index == (ceph_wbc->offset + ceph_wbc->len) >> PAGE_SHIFT; +} + +static inline +bool is_num_ops_too_big(struct ceph_writeback_ctl *ceph_wbc) +{ + return ceph_wbc->num_ops >= + (ceph_wbc->from_pool ? CEPH_OSD_SLAB_OPS : CEPH_OSD_MAX_OPS); +} + +static inline +bool is_write_congestion_happened(struct ceph_fs_client *fsc) +{ + return atomic_long_inc_return(&fsc->writeback_count) > + CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb); +} + +static inline int move_dirty_folio_in_page_array(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc, struct folio *folio) +{ + struct inode *inode = mapping->host; + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_client *cl = fsc->client; + struct page **pages = ceph_wbc->pages; + unsigned int index = ceph_wbc->locked_pages; + gfp_t gfp_flags = ceph_wbc->locked_pages ? GFP_NOWAIT : GFP_NOFS; + + if (IS_ENCRYPTED(inode)) { + pages[index] = fscrypt_encrypt_pagecache_blocks(folio, + PAGE_SIZE, + 0, + gfp_flags); + if (IS_ERR(pages[index])) { + if (PTR_ERR(pages[index]) == -EINVAL) { + pr_err_client(cl, "inode->i_blkbits=%hhu\n", + inode->i_blkbits); } - if (page_offset(page) >= ceph_wbc.i_size) { - doutc(cl, "folio at %lu beyond eof %llu\n", - folio->index, ceph_wbc.i_size); - if ((ceph_wbc.size_stable || - folio_pos(folio) >= i_size_read(inode)) && - folio_clear_dirty_for_io(folio)) - folio_invalidate(folio, 0, - folio_size(folio)); + + /* better not fail on first page! */ + BUG_ON(ceph_wbc->locked_pages == 0); + + pages[index] = NULL; + return PTR_ERR(pages[index]); + } + } else { + pages[index] = &folio->page; + } + + ceph_wbc->locked_pages++; + + return 0; +} + +static +int ceph_process_folio_batch(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc) +{ + struct inode *inode = mapping->host; + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_client *cl = fsc->client; + struct folio *folio = NULL; + unsigned i; + int rc = 0; + + for (i = 0; can_next_page_be_processed(ceph_wbc, i); i++) { + folio = ceph_wbc->fbatch.folios[i]; + + if (!folio) + continue; + + doutc(cl, "? %p idx %lu, folio_test_writeback %#x, " + "folio_test_dirty %#x, folio_test_locked %#x\n", + folio, folio->index, folio_test_writeback(folio), + folio_test_dirty(folio), + folio_test_locked(folio)); + + if (folio_test_writeback(folio) || + folio_test_private_2(folio) /* [DEPRECATED] */) { + doutc(cl, "waiting on writeback %p\n", folio); + folio_wait_writeback(folio); + folio_wait_private_2(folio); /* [DEPRECATED] */ + continue; + } + + if (ceph_wbc->locked_pages == 0) + folio_lock(folio); + else if (!folio_trylock(folio)) + break; + + rc = ceph_check_page_before_write(mapping, wbc, + ceph_wbc, folio); + if (rc == -ENODATA) { + rc = 0; + folio_unlock(folio); + ceph_wbc->fbatch.folios[i] = NULL; + continue; + } else if (rc == -E2BIG) { + rc = 0; + folio_unlock(folio); + ceph_wbc->fbatch.folios[i] = NULL; + break; + } + + if (!folio_clear_dirty_for_io(folio)) { + doutc(cl, "%p !folio_clear_dirty_for_io\n", folio); + folio_unlock(folio); + ceph_wbc->fbatch.folios[i] = NULL; + continue; + } + + /* + * We have something to write. If this is + * the first locked page this time through, + * calculate max possible write size and + * allocate a page array + */ + if (ceph_wbc->locked_pages == 0) { + ceph_allocate_page_array(mapping, ceph_wbc, folio); + } else if (!is_folio_index_contiguous(ceph_wbc, folio)) { + if (is_num_ops_too_big(ceph_wbc)) { + folio_redirty_for_writepage(wbc, folio); folio_unlock(folio); - continue; - } - if (strip_unit_end && (page->index > strip_unit_end)) { - doutc(cl, "end of strip unit %p\n", page); - unlock_page(page); break; } - if (folio_test_writeback(folio) || - folio_test_private_2(folio) /* [DEPRECATED] */) { - if (wbc->sync_mode == WB_SYNC_NONE) { - doutc(cl, "%p under writeback\n", folio); - folio_unlock(folio); - continue; - } - doutc(cl, "waiting on writeback %p\n", folio); - folio_wait_writeback(folio); - folio_wait_private_2(folio); /* [DEPRECATED] */ - } - if (!clear_page_dirty_for_io(page)) { - doutc(cl, "%p !clear_page_dirty_for_io\n", page); - unlock_page(page); - continue; - } + ceph_wbc->num_ops++; + ceph_wbc->offset = (u64)folio_pos(folio); + ceph_wbc->len = 0; + } - /* - * We have something to write. If this is - * the first locked page this time through, - * calculate max possinle write size and - * allocate a page array - */ - if (locked_pages == 0) { - u64 objnum; - u64 objoff; - u32 xlen; - - /* prepare async write request */ - offset = (u64)page_offset(page); - ceph_calc_file_object_mapping(&ci->i_layout, - offset, wsize, - &objnum, &objoff, - &xlen); - len = xlen; - - num_ops = 1; - strip_unit_end = page->index + - ((len - 1) >> PAGE_SHIFT); - - BUG_ON(pages); - max_pages = calc_pages_for(0, (u64)len); - pages = kmalloc_array(max_pages, - sizeof(*pages), - GFP_NOFS); - if (!pages) { - from_pool = true; - pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS); - BUG_ON(!pages); - } - - len = 0; - } else if (page->index != - (offset + len) >> PAGE_SHIFT) { - if (num_ops >= (from_pool ? CEPH_OSD_SLAB_OPS : - CEPH_OSD_MAX_OPS)) { - redirty_page_for_writepage(wbc, page); - unlock_page(page); - break; - } - - num_ops++; - offset = (u64)page_offset(page); - len = 0; - } + /* note position of first page in fbatch */ + doutc(cl, "%llx.%llx will write folio %p idx %lu\n", + ceph_vinop(inode), folio, folio->index); - /* note position of first page in fbatch */ - doutc(cl, "%llx.%llx will write page %p idx %lu\n", - ceph_vinop(inode), page, page->index); - - if (atomic_long_inc_return(&fsc->writeback_count) > - CONGESTION_ON_THRESH( - fsc->mount_options->congestion_kb)) - fsc->write_congested = true; - - if (IS_ENCRYPTED(inode)) { - pages[locked_pages] = - fscrypt_encrypt_pagecache_blocks(page, - PAGE_SIZE, 0, - locked_pages ? GFP_NOWAIT : GFP_NOFS); - if (IS_ERR(pages[locked_pages])) { - if (PTR_ERR(pages[locked_pages]) == -EINVAL) - pr_err_client(cl, - "inode->i_blkbits=%hhu\n", - inode->i_blkbits); - /* better not fail on first page! */ - BUG_ON(locked_pages == 0); - pages[locked_pages] = NULL; - redirty_page_for_writepage(wbc, page); - unlock_page(page); - break; - } - ++locked_pages; - } else { - pages[locked_pages++] = page; - } + fsc->write_congested = is_write_congestion_happened(fsc); - fbatch.folios[i] = NULL; - len += thp_size(page); + rc = move_dirty_folio_in_page_array(mapping, wbc, ceph_wbc, + folio); + if (rc) { + folio_redirty_for_writepage(wbc, folio); + folio_unlock(folio); + break; } - /* did we get anything? */ - if (!locked_pages) - goto release_folios; - if (i) { - unsigned j, n = 0; - /* shift unused page to beginning of fbatch */ - for (j = 0; j < nr_folios; j++) { - if (!fbatch.folios[j]) - continue; - if (n < j) - fbatch.folios[n] = fbatch.folios[j]; - n++; - } - fbatch.nr = n; + ceph_wbc->fbatch.folios[i] = NULL; + ceph_wbc->len += folio_size(folio); + } - if (nr_folios && i == nr_folios && - locked_pages < max_pages) { - doutc(cl, "reached end fbatch, trying for more\n"); - folio_batch_release(&fbatch); - goto get_more_pages; - } + ceph_wbc->processed_in_fbatch = i; + + return rc; +} + +static inline +void ceph_shift_unused_folios_left(struct folio_batch *fbatch) +{ + unsigned j, n = 0; + + /* shift unused page to beginning of fbatch */ + for (j = 0; j < folio_batch_count(fbatch); j++) { + if (!fbatch->folios[j]) + continue; + + if (n < j) { + fbatch->folios[n] = fbatch->folios[j]; } + n++; + } + + fbatch->nr = n; +} + +static +int ceph_submit_write(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc) +{ + struct inode *inode = mapping->host; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_client *cl = fsc->client; + struct ceph_vino vino = ceph_vino(inode); + struct ceph_osd_request *req = NULL; + struct page *page = NULL; + bool caching = ceph_is_cache_enabled(inode); + u64 offset; + u64 len; + unsigned i; + new_request: - offset = ceph_fscrypt_page_offset(pages[0]); - len = wsize; + offset = ceph_fscrypt_page_offset(ceph_wbc->pages[0]); + len = ceph_wbc->wsize; + req = ceph_osdc_new_request(&fsc->client->osdc, + &ci->i_layout, vino, + offset, &len, 0, ceph_wbc->num_ops, + CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, + ceph_wbc->snapc, ceph_wbc->truncate_seq, + ceph_wbc->truncate_size, false); + if (IS_ERR(req)) { req = ceph_osdc_new_request(&fsc->client->osdc, - &ci->i_layout, vino, - offset, &len, 0, num_ops, - CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, - snapc, ceph_wbc.truncate_seq, - ceph_wbc.truncate_size, false); - if (IS_ERR(req)) { - req = ceph_osdc_new_request(&fsc->client->osdc, - &ci->i_layout, vino, - offset, &len, 0, - min(num_ops, - CEPH_OSD_SLAB_OPS), - CEPH_OSD_OP_WRITE, - CEPH_OSD_FLAG_WRITE, - snapc, ceph_wbc.truncate_seq, - ceph_wbc.truncate_size, true); - BUG_ON(IS_ERR(req)); + &ci->i_layout, vino, + offset, &len, 0, + min(ceph_wbc->num_ops, + CEPH_OSD_SLAB_OPS), + CEPH_OSD_OP_WRITE, + CEPH_OSD_FLAG_WRITE, + ceph_wbc->snapc, + ceph_wbc->truncate_seq, + ceph_wbc->truncate_size, + true); + BUG_ON(IS_ERR(req)); + } + + page = ceph_wbc->pages[ceph_wbc->locked_pages - 1]; + BUG_ON(len < ceph_fscrypt_page_offset(page) + thp_size(page) - offset); + + if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { + for (i = 0; i < folio_batch_count(&ceph_wbc->fbatch); i++) { + struct folio *folio = ceph_wbc->fbatch.folios[i]; + + if (!folio) + continue; + + page = &folio->page; + redirty_page_for_writepage(wbc, page); + unlock_page(page); } - BUG_ON(len < ceph_fscrypt_page_offset(pages[locked_pages - 1]) + - thp_size(pages[locked_pages - 1]) - offset); - if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { - rc = -EIO; - goto release_folios; + for (i = 0; i < ceph_wbc->locked_pages; i++) { + page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]); + + if (!page) + continue; + + redirty_page_for_writepage(wbc, page); + unlock_page(page); } - req->r_callback = writepages_finish; - req->r_inode = inode; - - /* Format the osd request message and submit the write */ - len = 0; - data_pages = pages; - op_idx = 0; - for (i = 0; i < locked_pages; i++) { - struct page *page = ceph_fscrypt_pagecache_page(pages[i]); - - u64 cur_offset = page_offset(page); - /* - * Discontinuity in page range? Ceph can handle that by just passing - * multiple extents in the write op. - */ - if (offset + len != cur_offset) { - /* If it's full, stop here */ - if (op_idx + 1 == req->r_num_ops) - break; - - /* Kick off an fscache write with what we have so far. */ - ceph_fscache_write_to_cache(inode, offset, len, caching); - - /* Start a new extent */ - osd_req_op_extent_dup_last(req, op_idx, - cur_offset - offset); - doutc(cl, "got pages at %llu~%llu\n", offset, - len); - osd_req_op_extent_osd_data_pages(req, op_idx, - data_pages, len, 0, - from_pool, false); - osd_req_op_extent_update(req, op_idx, len); - - len = 0; - offset = cur_offset; - data_pages = pages + i; - op_idx++; - } - set_page_writeback(page); - if (caching) - ceph_set_page_fscache(page); - len += thp_size(page); + ceph_osdc_put_request(req); + return -EIO; + } + + req->r_callback = writepages_finish; + req->r_inode = inode; + + /* Format the osd request message and submit the write */ + len = 0; + ceph_wbc->data_pages = ceph_wbc->pages; + ceph_wbc->op_idx = 0; + for (i = 0; i < ceph_wbc->locked_pages; i++) { + u64 cur_offset; + + page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]); + cur_offset = page_offset(page); + + /* + * Discontinuity in page range? Ceph can handle that by just passing + * multiple extents in the write op. + */ + if (offset + len != cur_offset) { + /* If it's full, stop here */ + if (ceph_wbc->op_idx + 1 == req->r_num_ops) + break; + + /* Kick off an fscache write with what we have so far. */ + ceph_fscache_write_to_cache(inode, offset, len, caching); + + /* Start a new extent */ + osd_req_op_extent_dup_last(req, ceph_wbc->op_idx, + cur_offset - offset); + + doutc(cl, "got pages at %llu~%llu\n", offset, len); + + osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx, + ceph_wbc->data_pages, + len, 0, + ceph_wbc->from_pool, + false); + osd_req_op_extent_update(req, ceph_wbc->op_idx, len); + + len = 0; + offset = cur_offset; + ceph_wbc->data_pages = ceph_wbc->pages + i; + ceph_wbc->op_idx++; } - ceph_fscache_write_to_cache(inode, offset, len, caching); - - if (ceph_wbc.size_stable) { - len = min(len, ceph_wbc.i_size - offset); - } else if (i == locked_pages) { - /* writepages_finish() clears writeback pages - * according to the data length, so make sure - * data length covers all locked pages */ - u64 min_len = len + 1 - thp_size(page); - len = get_writepages_data_length(inode, pages[i - 1], - offset); - len = max(len, min_len); + + set_page_writeback(page); + + if (caching) + ceph_set_page_fscache(page); + + len += thp_size(page); + } + + ceph_fscache_write_to_cache(inode, offset, len, caching); + + if (ceph_wbc->size_stable) { + len = min(len, ceph_wbc->i_size - offset); + } else if (i == ceph_wbc->locked_pages) { + /* writepages_finish() clears writeback pages + * according to the data length, so make sure + * data length covers all locked pages */ + u64 min_len = len + 1 - thp_size(page); + len = get_writepages_data_length(inode, + ceph_wbc->pages[i - 1], + offset); + len = max(len, min_len); + } + + if (IS_ENCRYPTED(inode)) + len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE); + + doutc(cl, "got pages at %llu~%llu\n", offset, len); + + if (IS_ENCRYPTED(inode) && + ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK)) { + pr_warn_client(cl, + "bad encrypted write offset=%lld len=%llu\n", + offset, len); + } + + osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx, + ceph_wbc->data_pages, len, + 0, ceph_wbc->from_pool, false); + osd_req_op_extent_update(req, ceph_wbc->op_idx, len); + + BUG_ON(ceph_wbc->op_idx + 1 != req->r_num_ops); + + ceph_wbc->from_pool = false; + if (i < ceph_wbc->locked_pages) { + BUG_ON(ceph_wbc->num_ops <= req->r_num_ops); + ceph_wbc->num_ops -= req->r_num_ops; + ceph_wbc->locked_pages -= i; + + /* allocate new pages array for next request */ + ceph_wbc->data_pages = ceph_wbc->pages; + __ceph_allocate_page_array(ceph_wbc, ceph_wbc->locked_pages); + memcpy(ceph_wbc->pages, ceph_wbc->data_pages + i, + ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages)); + memset(ceph_wbc->data_pages + i, 0, + ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages)); + } else { + BUG_ON(ceph_wbc->num_ops != req->r_num_ops); + /* request message now owns the pages array */ + ceph_wbc->pages = NULL; + } + + req->r_mtime = inode_get_mtime(inode); + ceph_osdc_start_request(&fsc->client->osdc, req); + req = NULL; + + wbc->nr_to_write -= i; + if (ceph_wbc->pages) + goto new_request; + + return 0; +} + +static +void ceph_wait_until_current_writes_complete(struct address_space *mapping, + struct writeback_control *wbc, + struct ceph_writeback_ctl *ceph_wbc) +{ + struct page *page; + unsigned i, nr; + + if (wbc->sync_mode != WB_SYNC_NONE && + ceph_wbc->start_index == 0 && /* all dirty pages were checked */ + !ceph_wbc->head_snapc) { + ceph_wbc->index = 0; + + while ((ceph_wbc->index <= ceph_wbc->end) && + (nr = filemap_get_folios_tag(mapping, + &ceph_wbc->index, + (pgoff_t)-1, + PAGECACHE_TAG_WRITEBACK, + &ceph_wbc->fbatch))) { + for (i = 0; i < nr; i++) { + page = &ceph_wbc->fbatch.folios[i]->page; + if (page_snap_context(page) != ceph_wbc->snapc) + continue; + wait_on_page_writeback(page); + } + + folio_batch_release(&ceph_wbc->fbatch); + cond_resched(); } - if (IS_ENCRYPTED(inode)) - len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE); + } +} + +/* + * initiate async writeback + */ +static int ceph_writepages_start(struct address_space *mapping, + struct writeback_control *wbc) +{ + struct inode *inode = mapping->host; + struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); + struct ceph_client *cl = fsc->client; + struct ceph_writeback_ctl ceph_wbc; + int rc = 0; - doutc(cl, "got pages at %llu~%llu\n", offset, len); + if (wbc->sync_mode == WB_SYNC_NONE && fsc->write_congested) + return 0; - if (IS_ENCRYPTED(inode) && - ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK)) - pr_warn_client(cl, - "bad encrypted write offset=%lld len=%llu\n", - offset, len); - - osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len, - 0, from_pool, false); - osd_req_op_extent_update(req, op_idx, len); - - BUG_ON(op_idx + 1 != req->r_num_ops); - - from_pool = false; - if (i < locked_pages) { - BUG_ON(num_ops <= req->r_num_ops); - num_ops -= req->r_num_ops; - locked_pages -= i; - - /* allocate new pages array for next request */ - data_pages = pages; - pages = kmalloc_array(locked_pages, sizeof(*pages), - GFP_NOFS); - if (!pages) { - from_pool = true; - pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS); - BUG_ON(!pages); + doutc(cl, "%llx.%llx (mode=%s)\n", ceph_vinop(inode), + wbc->sync_mode == WB_SYNC_NONE ? "NONE" : + (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD")); + + if (is_forced_umount(mapping)) { + /* we're in a forced umount, don't write! */ + return -EIO; + } + + ceph_init_writeback_ctl(mapping, wbc, &ceph_wbc); + + if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { + rc = -EIO; + goto out; + } + +retry: + rc = ceph_define_writeback_range(mapping, wbc, &ceph_wbc); + if (rc == -ENODATA) { + /* hmm, why does writepages get called when there + is no dirty data? */ + rc = 0; + goto dec_osd_stopping_blocker; + } + + if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) + tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end); + + while (!has_writeback_done(&ceph_wbc)) { + ceph_wbc.locked_pages = 0; + ceph_wbc.max_pages = ceph_wbc.wsize >> PAGE_SHIFT; + +get_more_pages: + ceph_folio_batch_reinit(&ceph_wbc); + + ceph_wbc.nr_folios = filemap_get_folios_tag(mapping, + &ceph_wbc.index, + ceph_wbc.end, + ceph_wbc.tag, + &ceph_wbc.fbatch); + doutc(cl, "pagevec_lookup_range_tag for tag %#x got %d\n", + ceph_wbc.tag, ceph_wbc.nr_folios); + + if (!ceph_wbc.nr_folios && !ceph_wbc.locked_pages) + break; + +process_folio_batch: + rc = ceph_process_folio_batch(mapping, wbc, &ceph_wbc); + if (rc) + goto release_folios; + + /* did we get anything? */ + if (!ceph_wbc.locked_pages) + goto release_folios; + + if (ceph_wbc.processed_in_fbatch) { + ceph_shift_unused_folios_left(&ceph_wbc.fbatch); + + if (folio_batch_count(&ceph_wbc.fbatch) == 0 && + ceph_wbc.locked_pages < ceph_wbc.max_pages) { + doutc(cl, "reached end fbatch, trying for more\n"); + goto get_more_pages; } - memcpy(pages, data_pages + i, - locked_pages * sizeof(*pages)); - memset(data_pages + i, 0, - locked_pages * sizeof(*pages)); - } else { - BUG_ON(num_ops != req->r_num_ops); - index = pages[i - 1]->index + 1; - /* request message now owns the pages array */ - pages = NULL; } - req->r_mtime = inode_get_mtime(inode); - ceph_osdc_start_request(&fsc->client->osdc, req); - req = NULL; + rc = ceph_submit_write(mapping, wbc, &ceph_wbc); + if (rc) + goto release_folios; + + ceph_wbc.locked_pages = 0; + ceph_wbc.strip_unit_end = 0; - wbc->nr_to_write -= i; - if (pages) - goto new_request; + if (folio_batch_count(&ceph_wbc.fbatch) > 0) { + ceph_wbc.nr_folios = + folio_batch_count(&ceph_wbc.fbatch); + goto process_folio_batch; + } /* * We stop writing back only if we are not doing @@ -1377,61 +1713,44 @@ new_request: * we tagged for writeback prior to entering this loop. */ if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE) - done = true; + ceph_wbc.done = true; release_folios: doutc(cl, "folio_batch release on %d folios (%p)\n", - (int)fbatch.nr, fbatch.nr ? fbatch.folios[0] : NULL); - folio_batch_release(&fbatch); + (int)ceph_wbc.fbatch.nr, + ceph_wbc.fbatch.nr ? ceph_wbc.fbatch.folios[0] : NULL); + folio_batch_release(&ceph_wbc.fbatch); } - if (should_loop && !done) { + if (ceph_wbc.should_loop && !ceph_wbc.done) { /* more to do; loop back to beginning of file */ doutc(cl, "looping back to beginning of file\n"); - end = start_index - 1; /* OK even when start_index == 0 */ + /* OK even when start_index == 0 */ + ceph_wbc.end = ceph_wbc.start_index - 1; /* to write dirty pages associated with next snapc, * we need to wait until current writes complete */ - if (wbc->sync_mode != WB_SYNC_NONE && - start_index == 0 && /* all dirty pages were checked */ - !ceph_wbc.head_snapc) { - struct page *page; - unsigned i, nr; - index = 0; - while ((index <= end) && - (nr = filemap_get_folios_tag(mapping, &index, - (pgoff_t)-1, - PAGECACHE_TAG_WRITEBACK, - &fbatch))) { - for (i = 0; i < nr; i++) { - page = &fbatch.folios[i]->page; - if (page_snap_context(page) != snapc) - continue; - wait_on_page_writeback(page); - } - folio_batch_release(&fbatch); - cond_resched(); - } - } + ceph_wait_until_current_writes_complete(mapping, wbc, &ceph_wbc); - start_index = 0; - index = 0; + ceph_wbc.start_index = 0; + ceph_wbc.index = 0; goto retry; } - if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0)) - mapping->writeback_index = index; + if (wbc->range_cyclic || (ceph_wbc.range_whole && wbc->nr_to_write > 0)) + mapping->writeback_index = ceph_wbc.index; + +dec_osd_stopping_blocker: + ceph_dec_osd_stopping_blocker(fsc->mdsc); out: - ceph_osdc_put_request(req); - ceph_put_snap_context(last_snapc); + ceph_put_snap_context(ceph_wbc.last_snapc); doutc(cl, "%llx.%llx dend - startone, rc = %d\n", ceph_vinop(inode), rc); + return rc; } - - /* * See if a given @snapc is either writeable, or already written. */ @@ -1447,56 +1766,56 @@ static int context_is_writeable_or_written(struct inode *inode, /** * ceph_find_incompatible - find an incompatible context and return it - * @page: page being dirtied + * @folio: folio being dirtied * - * We are only allowed to write into/dirty a page if the page is + * We are only allowed to write into/dirty a folio if the folio is * clean, or already dirty within the same snap context. Returns a * conflicting context if there is one, NULL if there isn't, or a * negative error code on other errors. * - * Must be called with page lock held. + * Must be called with folio lock held. */ static struct ceph_snap_context * -ceph_find_incompatible(struct page *page) +ceph_find_incompatible(struct folio *folio) { - struct inode *inode = page->mapping->host; + struct inode *inode = folio->mapping->host; struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); if (ceph_inode_is_shutdown(inode)) { - doutc(cl, " %llx.%llx page %p is shutdown\n", - ceph_vinop(inode), page); + doutc(cl, " %llx.%llx folio %p is shutdown\n", + ceph_vinop(inode), folio); return ERR_PTR(-ESTALE); } for (;;) { struct ceph_snap_context *snapc, *oldest; - wait_on_page_writeback(page); + folio_wait_writeback(folio); - snapc = page_snap_context(page); + snapc = page_snap_context(&folio->page); if (!snapc || snapc == ci->i_head_snapc) break; /* - * this page is already dirty in another (older) snap + * this folio is already dirty in another (older) snap * context! is it writeable now? */ oldest = get_oldest_context(inode, NULL, NULL); if (snapc->seq > oldest->seq) { /* not writeable -- return it for the caller to deal with */ ceph_put_snap_context(oldest); - doutc(cl, " %llx.%llx page %p snapc %p not current or oldest\n", - ceph_vinop(inode), page, snapc); + doutc(cl, " %llx.%llx folio %p snapc %p not current or oldest\n", + ceph_vinop(inode), folio, snapc); return ceph_get_snap_context(snapc); } ceph_put_snap_context(oldest); - /* yay, writeable, do it now (without dropping page lock) */ - doutc(cl, " %llx.%llx page %p snapc %p not current, but oldest\n", - ceph_vinop(inode), page, snapc); - if (clear_page_dirty_for_io(page)) { - int r = writepage_nounlock(page, NULL); + /* yay, writeable, do it now (without dropping folio lock) */ + doutc(cl, " %llx.%llx folio %p snapc %p not current, but oldest\n", + ceph_vinop(inode), folio, snapc); + if (folio_clear_dirty_for_io(folio)) { + int r = write_folio_nounlock(folio, NULL); if (r < 0) return ERR_PTR(r); } @@ -1511,7 +1830,7 @@ static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_snap_context *snapc; - snapc = ceph_find_incompatible(folio_page(*foliop, 0)); + snapc = ceph_find_incompatible(*foliop); if (snapc) { int r; @@ -1594,7 +1913,6 @@ out: const struct address_space_operations ceph_aops = { .read_folio = netfs_read_folio, .readahead = netfs_readahead, - .writepage = ceph_writepage, .writepages = ceph_writepages_start, .write_begin = ceph_write_begin, .write_end = ceph_write_end, @@ -1602,6 +1920,7 @@ const struct address_space_operations ceph_aops = { .invalidate_folio = ceph_invalidate_folio, .release_folio = netfs_release_folio, .direct_IO = noop_direct_IO, + .migrate_folio = filemap_migrate_folio, }; static void ceph_block_sigs(sigset_t *oldset) @@ -1718,8 +2037,8 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_file_info *fi = vma->vm_file->private_data; struct ceph_cap_flush *prealloc_cf; - struct page *page = vmf->page; - loff_t off = page_offset(page); + struct folio *folio = page_folio(vmf->page); + loff_t off = folio_pos(folio); loff_t size = i_size_read(inode); size_t len; int want, got, err; @@ -1736,10 +2055,10 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) sb_start_pagefault(inode->i_sb); ceph_block_sigs(&oldset); - if (off + thp_size(page) <= size) - len = thp_size(page); + if (off + folio_size(folio) <= size) + len = folio_size(folio); else - len = offset_in_thp(page, size); + len = offset_in_folio(folio, size); doutc(cl, "%llx.%llx %llu~%zd getting caps i_size %llu\n", ceph_vinop(inode), off, len, size); @@ -1756,30 +2075,30 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) doutc(cl, "%llx.%llx %llu~%zd got cap refs on %s\n", ceph_vinop(inode), off, len, ceph_cap_string(got)); - /* Update time before taking page lock */ + /* Update time before taking folio lock */ file_update_time(vma->vm_file); inode_inc_iversion_raw(inode); do { struct ceph_snap_context *snapc; - lock_page(page); + folio_lock(folio); - if (page_mkwrite_check_truncate(page, inode) < 0) { - unlock_page(page); + if (folio_mkwrite_check_truncate(folio, inode) < 0) { + folio_unlock(folio); ret = VM_FAULT_NOPAGE; break; } - snapc = ceph_find_incompatible(page); + snapc = ceph_find_incompatible(folio); if (!snapc) { - /* success. we'll keep the page locked. */ - set_page_dirty(page); + /* success. we'll keep the folio locked. */ + folio_mark_dirty(folio); ret = VM_FAULT_LOCKED; break; } - unlock_page(page); + folio_unlock(folio); if (IS_ERR(snapc)) { ret = VM_FAULT_SIGBUS; diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 62e99e65250d..a321aa6d0ed2 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -141,17 +141,18 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx, if (ptr_pos >= i_size_read(dir)) return NULL; - if (!cache_ctl->page || ptr_pgoff != cache_ctl->page->index) { + if (!cache_ctl->folio || ptr_pgoff != cache_ctl->folio->index) { ceph_readdir_cache_release(cache_ctl); - cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff); - if (!cache_ctl->page) { - doutc(cl, " page %lu not found\n", ptr_pgoff); + cache_ctl->folio = filemap_lock_folio(&dir->i_data, ptr_pgoff); + if (IS_ERR(cache_ctl->folio)) { + cache_ctl->folio = NULL; + doutc(cl, " folio %lu not found\n", ptr_pgoff); return ERR_PTR(-EAGAIN); } /* reading/filling the cache are serialized by - i_rwsem, no need to use page lock */ - unlock_page(cache_ctl->page); - cache_ctl->dentries = kmap(cache_ctl->page); + i_rwsem, no need to use folio lock */ + folio_unlock(cache_ctl->folio); + cache_ctl->dentries = kmap_local_folio(cache_ctl->folio, 0); } cache_ctl->index = idx & idx_mask; @@ -1092,19 +1093,20 @@ out: return err; } -static int ceph_mkdir(struct mnt_idmap *idmap, struct inode *dir, - struct dentry *dentry, umode_t mode) +static struct dentry *ceph_mkdir(struct mnt_idmap *idmap, struct inode *dir, + struct dentry *dentry, umode_t mode) { struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb); struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_request *req; struct ceph_acl_sec_ctx as_ctx = {}; + struct dentry *ret; int err; int op; err = ceph_wait_on_conflict_unlink(dentry); if (err) - return err; + return ERR_PTR(err); if (ceph_snap(dir) == CEPH_SNAPDIR) { /* mkdir .snap/foo is a MKSNAP */ @@ -1116,32 +1118,32 @@ static int ceph_mkdir(struct mnt_idmap *idmap, struct inode *dir, ceph_vinop(dir), dentry, dentry, mode); op = CEPH_MDS_OP_MKDIR; } else { - err = -EROFS; + ret = ERR_PTR(-EROFS); goto out; } if (op == CEPH_MDS_OP_MKDIR && ceph_quota_is_max_files_exceeded(dir)) { - err = -EDQUOT; + ret = ERR_PTR(-EDQUOT); goto out; } if ((op == CEPH_MDS_OP_MKSNAP) && IS_ENCRYPTED(dir) && !fscrypt_has_encryption_key(dir)) { - err = -ENOKEY; + ret = ERR_PTR(-ENOKEY); goto out; } req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); if (IS_ERR(req)) { - err = PTR_ERR(req); + ret = ERR_CAST(req); goto out; } mode |= S_IFDIR; req->r_new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx); if (IS_ERR(req->r_new_inode)) { - err = PTR_ERR(req->r_new_inode); + ret = ERR_CAST(req->r_new_inode); req->r_new_inode = NULL; goto out_req; } @@ -1165,15 +1167,22 @@ static int ceph_mkdir(struct mnt_idmap *idmap, struct inode *dir, !req->r_reply_info.head->is_target && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); + ret = ERR_PTR(err); out_req: + if (!IS_ERR(ret) && req->r_dentry != dentry) + /* Some other dentry was spliced in */ + ret = dget(req->r_dentry); ceph_mdsc_put_request(req); out: - if (!err) + if (!IS_ERR(ret)) { + if (ret) + dentry = ret; ceph_init_inode_acls(d_inode(dentry), &as_ctx); - else + } else { d_drop(dentry); + } ceph_release_acl_sec_ctx(&as_ctx); - return err; + return ret; } static int ceph_link(struct dentry *old_dentry, struct inode *dir, diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 7dd6c2275085..6ac2bd555e86 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1845,10 +1845,9 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req, void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl) { - if (ctl->page) { - kunmap(ctl->page); - put_page(ctl->page); - ctl->page = NULL; + if (ctl->folio) { + folio_release_kmap(ctl->folio, ctl->dentries); + ctl->folio = NULL; } } @@ -1862,20 +1861,26 @@ static int fill_readdir_cache(struct inode *dir, struct dentry *dn, unsigned idx = ctl->index % nsize; pgoff_t pgoff = ctl->index / nsize; - if (!ctl->page || pgoff != ctl->page->index) { + if (!ctl->folio || pgoff != ctl->folio->index) { ceph_readdir_cache_release(ctl); + fgf_t fgf = FGP_LOCK; + if (idx == 0) - ctl->page = grab_cache_page(&dir->i_data, pgoff); - else - ctl->page = find_lock_page(&dir->i_data, pgoff); - if (!ctl->page) { + fgf |= FGP_ACCESSED | FGP_CREAT; + + ctl->folio = __filemap_get_folio(&dir->i_data, pgoff, + fgf, mapping_gfp_mask(&dir->i_data)); + if (IS_ERR(ctl->folio)) { + int err = PTR_ERR(ctl->folio); + + ctl->folio = NULL; ctl->index = -1; - return idx == 0 ? -ENOMEM : 0; + return idx == 0 ? err : 0; } /* reading/filling the cache are serialized by - * i_rwsem, no need to use page lock */ - unlock_page(ctl->page); - ctl->dentries = kmap(ctl->page); + * i_rwsem, no need to use folio lock */ + folio_unlock(ctl->folio); + ctl->dentries = kmap_local_folio(ctl->folio, 0); if (idx == 0) memset(ctl->dentries, 0, PAGE_SIZE); } diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 54b3421501e9..230e0c3f341f 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -5489,6 +5489,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) spin_lock_init(&mdsc->stopping_lock); atomic_set(&mdsc->stopping_blockers, 0); init_completion(&mdsc->stopping_waiter); + atomic64_set(&mdsc->dirty_folios, 0); + init_waitqueue_head(&mdsc->flush_end_wq); init_waitqueue_head(&mdsc->session_close_wq); INIT_LIST_HEAD(&mdsc->waiting_for_map); mdsc->quotarealms_inodes = RB_ROOT; diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 7c9fee9e80d4..3e2a6fa7c19a 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -458,6 +458,9 @@ struct ceph_mds_client { atomic_t stopping_blockers; struct completion stopping_waiter; + atomic64_t dirty_folios; + wait_queue_head_t flush_end_wq; + atomic64_t quotarealms_count; /* # realms with quota */ /* * We keep a list of inodes we don't see in the mountpoint but that we diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 4344e1f11806..f3951253e393 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -1563,6 +1563,17 @@ static void ceph_kill_sb(struct super_block *s) */ sync_filesystem(s); + if (atomic64_read(&mdsc->dirty_folios) > 0) { + wait_queue_head_t *wq = &mdsc->flush_end_wq; + long timeleft = wait_event_killable_timeout(*wq, + atomic64_read(&mdsc->dirty_folios) <= 0, + fsc->client->options->mount_timeout); + if (!timeleft) /* timed out */ + pr_warn_client(cl, "umount timed out, %ld\n", timeleft); + else if (timeleft < 0) /* killed */ + pr_warn_client(cl, "umount was killed, %ld\n", timeleft); + } + spin_lock(&mdsc->stopping_lock); mdsc->stopping = CEPH_MDSC_STOPPING_FLUSHING; wait = !!atomic_read(&mdsc->stopping_blockers); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 7fa1e7be50e4..bb0db0cc8003 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -903,7 +903,7 @@ ceph_find_rw_context(struct ceph_file_info *cf) } struct ceph_readdir_cache_control { - struct page *page; + struct folio *folio; struct dentry **dentries; int index; }; |