diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2014-10-15 06:46:01 +0200 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2014-10-15 06:46:01 +0200 |
commit | 6b0490816671b2f4126a99998c9bf3c8c0472de2 (patch) | |
tree | 016543455c2bdbe47b422fed6a3b4ffb991c97d6 /fs/ceph | |
parent | ce9d7f7b45930ed16c512aabcfe651d44f1c8619 (diff) | |
parent | 0bc62284ee3f2a228c64902ed818b6ba8e04159b (diff) | |
download | lwn-6b0490816671b2f4126a99998c9bf3c8c0472de2.tar.gz lwn-6b0490816671b2f4126a99998c9bf3c8c0472de2.zip |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
"There is the long-awaited discard support for RBD (Guangliang Zhao,
Josh Durgin), a pile of RBD bug fixes that didn't belong in late -rc's
(Ilya Dryomov, Li RongQing), a pile of fs/ceph bug fixes and
performance and debugging improvements (Yan, Zheng, John Spray), and a
smattering of cleanups (Chao Yu, Fabian Frederick, Joe Perches)"
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (40 commits)
ceph: fix divide-by-zero in __validate_layout()
rbd: rbd workqueues need a resque worker
libceph: ceph-msgr workqueue needs a resque worker
ceph: fix bool assignments
libceph: separate multiple ops with commas in debugfs output
libceph: sync osd op definitions in rados.h
libceph: remove redundant declaration
ceph: additional debugfs output
ceph: export ceph_session_state_name function
ceph: include the initial ACL in create/mkdir/mknod MDS requests
ceph: use pagelist to present MDS request data
libceph: reference counting pagelist
ceph: fix llistxattr on symlink
ceph: send client metadata to MDS
ceph: remove redundant code for max file size verification
ceph: remove redundant io_iter_advance()
ceph: move ceph_find_inode() outside the s_mutex
ceph: request xattrs if xattr_version is zero
rbd: set the remaining discard properties to enable support
rbd: use helpers to handle discard for layered images correctly
...
Diffstat (limited to 'fs/ceph')
-rw-r--r-- | fs/ceph/acl.c | 125 | ||||
-rw-r--r-- | fs/ceph/addr.c | 9 | ||||
-rw-r--r-- | fs/ceph/caps.c | 37 | ||||
-rw-r--r-- | fs/ceph/debugfs.c | 46 | ||||
-rw-r--r-- | fs/ceph/dir.c | 41 | ||||
-rw-r--r-- | fs/ceph/file.c | 33 | ||||
-rw-r--r-- | fs/ceph/inode.c | 16 | ||||
-rw-r--r-- | fs/ceph/ioctl.c | 6 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 136 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 6 | ||||
-rw-r--r-- | fs/ceph/super.h | 27 | ||||
-rw-r--r-- | fs/ceph/xattr.c | 81 |
12 files changed, 386 insertions, 177 deletions
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c index cebf2ebefb55..5bd853ba44ff 100644 --- a/fs/ceph/acl.c +++ b/fs/ceph/acl.c @@ -169,36 +169,109 @@ out: return ret; } -int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir) +int ceph_pre_init_acls(struct inode *dir, umode_t *mode, + struct ceph_acls_info *info) { - struct posix_acl *default_acl, *acl; - umode_t new_mode = inode->i_mode; - int error; - - error = posix_acl_create(dir, &new_mode, &default_acl, &acl); - if (error) - return error; - - if (!default_acl && !acl) { - cache_no_acl(inode); - if (new_mode != inode->i_mode) { - struct iattr newattrs = { - .ia_mode = new_mode, - .ia_valid = ATTR_MODE, - }; - error = ceph_setattr(dentry, &newattrs); + struct posix_acl *acl, *default_acl; + size_t val_size1 = 0, val_size2 = 0; + struct ceph_pagelist *pagelist = NULL; + void *tmp_buf = NULL; + int err; + + err = posix_acl_create(dir, mode, &default_acl, &acl); + if (err) + return err; + + if (acl) { + int ret = posix_acl_equiv_mode(acl, mode); + if (ret < 0) + goto out_err; + if (ret == 0) { + posix_acl_release(acl); + acl = NULL; } - return error; } - if (default_acl) { - error = ceph_set_acl(inode, default_acl, ACL_TYPE_DEFAULT); - posix_acl_release(default_acl); - } + if (!default_acl && !acl) + return 0; + + if (acl) + val_size1 = posix_acl_xattr_size(acl->a_count); + if (default_acl) + val_size2 = posix_acl_xattr_size(default_acl->a_count); + + err = -ENOMEM; + tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS); + if (!tmp_buf) + goto out_err; + pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS); + if (!pagelist) + goto out_err; + ceph_pagelist_init(pagelist); + + err = ceph_pagelist_reserve(pagelist, PAGE_SIZE); + if (err) + goto out_err; + + ceph_pagelist_encode_32(pagelist, acl && default_acl ? 2 : 1); + if (acl) { - if (!error) - error = ceph_set_acl(inode, acl, ACL_TYPE_ACCESS); - posix_acl_release(acl); + size_t len = strlen(POSIX_ACL_XATTR_ACCESS); + err = ceph_pagelist_reserve(pagelist, len + val_size1 + 8); + if (err) + goto out_err; + ceph_pagelist_encode_string(pagelist, POSIX_ACL_XATTR_ACCESS, + len); + err = posix_acl_to_xattr(&init_user_ns, acl, + tmp_buf, val_size1); + if (err < 0) + goto out_err; + ceph_pagelist_encode_32(pagelist, val_size1); + ceph_pagelist_append(pagelist, tmp_buf, val_size1); } - return error; + if (default_acl) { + size_t len = strlen(POSIX_ACL_XATTR_DEFAULT); + err = ceph_pagelist_reserve(pagelist, len + val_size2 + 8); + if (err) + goto out_err; + err = ceph_pagelist_encode_string(pagelist, + POSIX_ACL_XATTR_DEFAULT, len); + err = posix_acl_to_xattr(&init_user_ns, default_acl, + tmp_buf, val_size2); + if (err < 0) + goto out_err; + ceph_pagelist_encode_32(pagelist, val_size2); + ceph_pagelist_append(pagelist, tmp_buf, val_size2); + } + + kfree(tmp_buf); + + info->acl = acl; + info->default_acl = default_acl; + info->pagelist = pagelist; + return 0; + +out_err: + posix_acl_release(acl); + posix_acl_release(default_acl); + kfree(tmp_buf); + if (pagelist) + ceph_pagelist_release(pagelist); + return err; +} + +void ceph_init_inode_acls(struct inode* inode, struct ceph_acls_info *info) +{ + if (!inode) + return; + ceph_set_cached_acl(inode, ACL_TYPE_ACCESS, info->acl); + ceph_set_cached_acl(inode, ACL_TYPE_DEFAULT, info->default_acl); +} + +void ceph_release_acls_info(struct ceph_acls_info *info) +{ + posix_acl_release(info->acl); + posix_acl_release(info->default_acl); + if (info->pagelist) + ceph_pagelist_release(info->pagelist); } diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 90b3954d48ed..18c06bbaf136 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1076,12 +1076,6 @@ retry_locked: /* past end of file? */ i_size = inode->i_size; /* caller holds i_mutex */ - if (i_size + len > inode->i_sb->s_maxbytes) { - /* file is too big */ - r = -EINVAL; - goto fail; - } - if (page_off >= i_size || (pos_in_page == 0 && (pos+len) >= i_size && end_in_page - pos_in_page != PAGE_CACHE_SIZE)) { @@ -1099,9 +1093,6 @@ retry_locked: if (r < 0) goto fail_nosnap; goto retry_locked; - -fail: - up_read(&mdsc->snap_rwsem); fail_nosnap: unlock_page(page); return r; diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 6d1cd45dca89..659f2ea9e6f7 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2397,12 +2397,12 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, u64 max_size = le64_to_cpu(grant->max_size); struct timespec mtime, atime, ctime; int check_caps = 0; - bool wake = 0; - bool writeback = 0; - bool queue_trunc = 0; - bool queue_invalidate = 0; - bool queue_revalidate = 0; - bool deleted_inode = 0; + bool wake = false; + bool writeback = false; + bool queue_trunc = false; + bool queue_invalidate = false; + bool queue_revalidate = false; + bool deleted_inode = false; dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", inode, cap, mds, seq, ceph_cap_string(newcaps)); @@ -2437,7 +2437,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, /* there were locked pages.. invalidate later in a separate thread. */ if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { - queue_invalidate = 1; + queue_invalidate = true; ci->i_rdcache_revoking = ci->i_rdcache_gen; } } @@ -2466,7 +2466,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, set_nlink(inode, le32_to_cpu(grant->nlink)); if (inode->i_nlink == 0 && (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL))) - deleted_inode = 1; + deleted_inode = true; } if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) { @@ -2487,7 +2487,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, /* Do we need to revalidate our fscache cookie. Don't bother on the * first cache cap as we already validate at cookie creation time. */ if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1) - queue_revalidate = 1; + queue_revalidate = true; if (newcaps & CEPH_CAP_ANY_RD) { /* ctime/mtime/atime? */ @@ -2516,7 +2516,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, ci->i_wanted_max_size = 0; /* reset */ ci->i_requested_max_size = 0; } - wake = 1; + wake = true; } } @@ -2546,7 +2546,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, ceph_cap_string(newcaps), ceph_cap_string(revoking)); if (revoking & used & CEPH_CAP_FILE_BUFFER) - writeback = 1; /* initiate writeback; will delay ack */ + writeback = true; /* initiate writeback; will delay ack */ else if (revoking == CEPH_CAP_FILE_CACHE && (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && queue_invalidate) @@ -2572,7 +2572,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, cap->implemented |= newcaps; /* add bits only, to * avoid stepping on a * pending revocation */ - wake = 1; + wake = true; } BUG_ON(cap->issued & ~cap->implemented); @@ -2586,7 +2586,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, kick_flushing_inode_caps(mdsc, session, inode); up_read(&mdsc->snap_rwsem); if (newcaps & ~issued) - wake = 1; + wake = true; } if (queue_trunc) { @@ -3045,6 +3045,12 @@ void ceph_handle_caps(struct ceph_mds_session *session, } } + /* lookup ino */ + inode = ceph_find_inode(sb, vino); + ci = ceph_inode(inode); + dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino, + vino.snap, inode); + mutex_lock(&session->s_mutex); session->s_seq++; dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, @@ -3053,11 +3059,6 @@ void ceph_handle_caps(struct ceph_mds_session *session, if (op == CEPH_CAP_OP_IMPORT) ceph_add_cap_releases(mdsc, session); - /* lookup ino */ - inode = ceph_find_inode(sb, vino); - ci = ceph_inode(inode); - dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino, - vino.snap, inode); if (!inode) { dout(" i don't have ino %llx\n", vino.ino); diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 5a743ac141ab..5d5a4c8c8496 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -158,10 +158,47 @@ static int dentry_lru_show(struct seq_file *s, void *ptr) return 0; } +static int mds_sessions_show(struct seq_file *s, void *ptr) +{ + struct ceph_fs_client *fsc = s->private; + struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_auth_client *ac = fsc->client->monc.auth; + struct ceph_options *opt = fsc->client->options; + int mds = -1; + + mutex_lock(&mdsc->mutex); + + /* The 'num' portion of an 'entity name' */ + seq_printf(s, "global_id %llu\n", ac->global_id); + + /* The -o name mount argument */ + seq_printf(s, "name \"%s\"\n", opt->name ? opt->name : ""); + + /* The list of MDS session rank+state */ + for (mds = 0; mds < mdsc->max_sessions; mds++) { + struct ceph_mds_session *session = + __ceph_lookup_mds_session(mdsc, mds); + if (!session) { + continue; + } + mutex_unlock(&mdsc->mutex); + seq_printf(s, "mds.%d %s\n", + session->s_mds, + ceph_session_state_name(session->s_state)); + + ceph_put_mds_session(session); + mutex_lock(&mdsc->mutex); + } + mutex_unlock(&mdsc->mutex); + + return 0; +} + CEPH_DEFINE_SHOW_FUNC(mdsmap_show) CEPH_DEFINE_SHOW_FUNC(mdsc_show) CEPH_DEFINE_SHOW_FUNC(caps_show) CEPH_DEFINE_SHOW_FUNC(dentry_lru_show) +CEPH_DEFINE_SHOW_FUNC(mds_sessions_show) /* @@ -193,6 +230,7 @@ void ceph_fs_debugfs_cleanup(struct ceph_fs_client *fsc) debugfs_remove(fsc->debugfs_bdi); debugfs_remove(fsc->debugfs_congestion_kb); debugfs_remove(fsc->debugfs_mdsmap); + debugfs_remove(fsc->debugfs_mds_sessions); debugfs_remove(fsc->debugfs_caps); debugfs_remove(fsc->debugfs_mdsc); debugfs_remove(fsc->debugfs_dentry_lru); @@ -231,6 +269,14 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) if (!fsc->debugfs_mdsmap) goto out; + fsc->debugfs_mds_sessions = debugfs_create_file("mds_sessions", + 0600, + fsc->client->debugfs_dir, + fsc, + &mds_sessions_show_fops); + if (!fsc->debugfs_mds_sessions) + goto out; + fsc->debugfs_mdsc = debugfs_create_file("mdsc", 0600, fsc->client->debugfs_dir, diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index b6c59eaa4f64..e6d63f8f98c0 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -682,17 +682,22 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; + struct ceph_acls_info acls = {}; int err; if (ceph_snap(dir) != CEPH_NOSNAP) return -EROFS; + err = ceph_pre_init_acls(dir, &mode, &acls); + if (err < 0) + return err; + dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n", dir, dentry, mode, rdev); req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS); if (IS_ERR(req)) { - d_drop(dentry); - return PTR_ERR(req); + err = PTR_ERR(req); + goto out; } req->r_dentry = dget(dentry); req->r_num_caps = 2; @@ -701,15 +706,20 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, req->r_args.mknod.rdev = cpu_to_le32(rdev); req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; + if (acls.pagelist) { + req->r_pagelist = acls.pagelist; + acls.pagelist = NULL; + } err = ceph_mdsc_do_request(mdsc, dir, req); if (!err && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); ceph_mdsc_put_request(req); - +out: if (!err) - ceph_init_acl(dentry, dentry->d_inode, dir); + ceph_init_inode_acls(dentry->d_inode, &acls); else d_drop(dentry); + ceph_release_acls_info(&acls); return err; } @@ -733,8 +743,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest); req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS); if (IS_ERR(req)) { - d_drop(dentry); - return PTR_ERR(req); + err = PTR_ERR(req); + goto out; } req->r_dentry = dget(dentry); req->r_num_caps = 2; @@ -746,9 +756,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, if (!err && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); ceph_mdsc_put_request(req); - if (!err) - ceph_init_acl(dentry, dentry->d_inode, dir); - else +out: + if (err) d_drop(dentry); return err; } @@ -758,6 +767,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; + struct ceph_acls_info acls = {}; int err = -EROFS; int op; @@ -772,6 +782,12 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) } else { goto out; } + + mode |= S_IFDIR; + err = ceph_pre_init_acls(dir, &mode, &acls); + if (err < 0) + goto out; + req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); if (IS_ERR(req)) { err = PTR_ERR(req); @@ -784,15 +800,20 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) req->r_args.mkdir.mode = cpu_to_le32(mode); req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; + if (acls.pagelist) { + req->r_pagelist = acls.pagelist; + acls.pagelist = NULL; + } err = ceph_mdsc_do_request(mdsc, dir, req); if (!err && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); ceph_mdsc_put_request(req); out: if (!err) - ceph_init_acl(dentry, dentry->d_inode, dir); + ceph_init_inode_acls(dentry->d_inode, &acls); else d_drop(dentry); + ceph_release_acls_info(&acls); return err; } diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 2eb02f80a0ab..d7e0da8366e6 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -235,6 +235,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; struct dentry *dn; + struct ceph_acls_info acls = {}; int err; dout("atomic_open %p dentry %p '%.*s' %s flags %d mode 0%o\n", @@ -248,22 +249,34 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, if (err < 0) return err; + if (flags & O_CREAT) { + err = ceph_pre_init_acls(dir, &mode, &acls); + if (err < 0) + return err; + } + /* do the open */ req = prepare_open_request(dir->i_sb, flags, mode); - if (IS_ERR(req)) - return PTR_ERR(req); + if (IS_ERR(req)) { + err = PTR_ERR(req); + goto out_acl; + } req->r_dentry = dget(dentry); req->r_num_caps = 2; if (flags & O_CREAT) { req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; + if (acls.pagelist) { + req->r_pagelist = acls.pagelist; + acls.pagelist = NULL; + } } req->r_locked_dir = dir; /* caller holds dir->i_mutex */ err = ceph_mdsc_do_request(mdsc, (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, req); if (err) - goto out_err; + goto out_req; err = ceph_handle_snapdir(req, dentry, err); if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) @@ -278,7 +291,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, dn = NULL; } if (err) - goto out_err; + goto out_req; if (dn || dentry->d_inode == NULL || S_ISLNK(dentry->d_inode->i_mode)) { /* make vfs retry on splice, ENOENT, or symlink */ dout("atomic_open finish_no_open on dn %p\n", dn); @@ -286,15 +299,17 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, } else { dout("atomic_open finish_open on dn %p\n", dn); if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) { - ceph_init_acl(dentry, dentry->d_inode, dir); + ceph_init_inode_acls(dentry->d_inode, &acls); *opened |= FILE_CREATED; } err = finish_open(file, dentry, ceph_open, opened); } -out_err: +out_req: if (!req->r_err && req->r_target_inode) ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode); ceph_mdsc_put_request(req); +out_acl: + ceph_release_acls_info(&acls); dout("atomic_open result=%d\n", err); return err; } @@ -826,8 +841,7 @@ again: ceph_put_cap_refs(ci, got); if (checkeof && ret >= 0) { - int statret = ceph_do_getattr(inode, - CEPH_STAT_CAP_SIZE); + int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false); /* hit EOF or hole? */ if (statret == 0 && iocb->ki_pos < inode->i_size && @@ -836,7 +850,6 @@ again: ", reading more\n", iocb->ki_pos, inode->i_size); - iov_iter_advance(to, ret); read += ret; len -= ret; checkeof = 0; @@ -995,7 +1008,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) mutex_lock(&inode->i_mutex); if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) { - ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); + ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false); if (ret < 0) { offset = ret; goto out; diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 04c89c266cec..7b6139004401 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -766,7 +766,7 @@ static int fill_inode(struct inode *inode, /* xattrs */ /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */ - if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && + if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) && le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) { if (ci->i_xattrs.blob) ceph_buffer_put(ci->i_xattrs.blob); @@ -1813,10 +1813,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) if (ia_valid & ATTR_SIZE) { dout("setattr %p size %lld -> %lld\n", inode, inode->i_size, attr->ia_size); - if (attr->ia_size > inode->i_sb->s_maxbytes) { - err = -EINVAL; - goto out; - } if ((issued & CEPH_CAP_FILE_EXCL) && attr->ia_size > inode->i_size) { inode->i_size = attr->ia_size; @@ -1896,8 +1892,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) if (mask & CEPH_SETATTR_SIZE) __ceph_do_pending_vmtruncate(inode); return err; -out: - spin_unlock(&ci->i_ceph_lock); out_put: ceph_mdsc_put_request(req); return err; @@ -1907,7 +1901,7 @@ out_put: * Verify that we have a lease on the given mask. If not, * do a getattr against an mds. */ -int ceph_do_getattr(struct inode *inode, int mask) +int ceph_do_getattr(struct inode *inode, int mask, bool force) { struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; @@ -1920,7 +1914,7 @@ int ceph_do_getattr(struct inode *inode, int mask) } dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode); - if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1)) + if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1)) return 0; req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS); @@ -1948,7 +1942,7 @@ int ceph_permission(struct inode *inode, int mask) if (mask & MAY_NOT_BLOCK) return -ECHILD; - err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED); + err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED, false); if (!err) err = generic_permission(inode, mask); @@ -1966,7 +1960,7 @@ int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, struct ceph_inode_info *ci = ceph_inode(inode); int err; - err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL); + err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL, false); if (!err) { generic_fillattr(inode, stat); stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino); diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index a822a6e58290..f851d8d70158 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -19,7 +19,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg) struct ceph_ioctl_layout l; int err; - err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT); + err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT, false); if (!err) { l.stripe_unit = ceph_file_layout_su(ci->i_layout); l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); @@ -41,7 +41,7 @@ static long __validate_layout(struct ceph_mds_client *mdsc, /* validate striping parameters */ if ((l->object_size & ~PAGE_MASK) || (l->stripe_unit & ~PAGE_MASK) || - (l->stripe_unit != 0 && + ((unsigned)l->stripe_unit != 0 && ((unsigned)l->object_size % (unsigned)l->stripe_unit))) return -EINVAL; @@ -74,7 +74,7 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) return -EFAULT; /* validate changed params against current layout */ - err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT); + err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT, false); if (err) return err; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index bad07c09f91e..a92d3f5c6c12 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -7,6 +7,7 @@ #include <linux/sched.h> #include <linux/debugfs.h> #include <linux/seq_file.h> +#include <linux/utsname.h> #include "super.h" #include "mds_client.h" @@ -334,7 +335,7 @@ static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) /* * sessions */ -static const char *session_state_name(int s) +const char *ceph_session_state_name(int s) { switch (s) { case CEPH_MDS_SESSION_NEW: return "new"; @@ -542,6 +543,8 @@ void ceph_mdsc_release_request(struct kref *kref) } kfree(req->r_path1); kfree(req->r_path2); + if (req->r_pagelist) + ceph_pagelist_release(req->r_pagelist); put_request_session(req); ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); kfree(req); @@ -812,6 +815,74 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq) h = msg->front.iov_base; h->op = cpu_to_le32(op); h->seq = cpu_to_le64(seq); + + return msg; +} + +/* + * session message, specialization for CEPH_SESSION_REQUEST_OPEN + * to include additional client metadata fields. + */ +static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) +{ + struct ceph_msg *msg; + struct ceph_mds_session_head *h; + int i = -1; + int metadata_bytes = 0; + int metadata_key_count = 0; + struct ceph_options *opt = mdsc->fsc->client->options; + void *p; + + const char* metadata[3][2] = { + {"hostname", utsname()->nodename}, + {"entity_id", opt->name ? opt->name : ""}, + {NULL, NULL} + }; + + /* Calculate serialized length of metadata */ + metadata_bytes = 4; /* map length */ + for (i = 0; metadata[i][0] != NULL; ++i) { + metadata_bytes += 8 + strlen(metadata[i][0]) + + strlen(metadata[i][1]); + metadata_key_count++; + } + + /* Allocate the message */ + msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes, + GFP_NOFS, false); + if (!msg) { + pr_err("create_session_msg ENOMEM creating msg\n"); + return NULL; + } + h = msg->front.iov_base; + h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); + h->seq = cpu_to_le64(seq); + + /* + * Serialize client metadata into waiting buffer space, using + * the format that userspace expects for map<string, string> + */ + msg->hdr.version = 2; /* ClientSession messages with metadata are v2 */ + + /* The write pointer, following the session_head structure */ + p = msg->front.iov_base + sizeof(*h); + + /* Number of entries in the map */ + ceph_encode_32(&p, metadata_key_count); + + /* Two length-prefixed strings for each entry in the map */ + for (i = 0; metadata[i][0] != NULL; ++i) { + size_t const key_len = strlen(metadata[i][0]); + size_t const val_len = strlen(metadata[i][1]); + + ceph_encode_32(&p, key_len); + memcpy(p, metadata[i][0], key_len); + p += key_len; + ceph_encode_32(&p, val_len); + memcpy(p, metadata[i][1], val_len); + p += val_len; + } + return msg; } @@ -835,7 +906,7 @@ static int __open_session(struct ceph_mds_client *mdsc, session->s_renew_requested = jiffies; /* send connect message */ - msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq); + msg = create_session_open_msg(mdsc, session->s_seq); if (!msg) return -ENOMEM; ceph_con_send(&session->s_con, msg); @@ -1164,7 +1235,7 @@ static int send_flushmsg_ack(struct ceph_mds_client *mdsc, struct ceph_msg *msg; dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", - session->s_mds, session_state_name(session->s_state), seq); + session->s_mds, ceph_session_state_name(session->s_state), seq); msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); if (!msg) return -ENOMEM; @@ -1216,7 +1287,7 @@ static int request_close_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg; dout("request_close_session mds%d state %s seq %lld\n", - session->s_mds, session_state_name(session->s_state), + session->s_mds, ceph_session_state_name(session->s_state), session->s_seq); msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); if (!msg) @@ -1847,13 +1918,15 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - if (req->r_data_len) { - /* outbound data set only by ceph_sync_setxattr() */ - BUG_ON(!req->r_pages); - ceph_msg_data_add_pages(msg, req->r_pages, req->r_data_len, 0); + if (req->r_pagelist) { + struct ceph_pagelist *pagelist = req->r_pagelist; + atomic_inc(&pagelist->refcnt); + ceph_msg_data_add_pagelist(msg, pagelist); + msg->hdr.data_len = cpu_to_le32(pagelist->length); + } else { + msg->hdr.data_len = 0; } - msg->hdr.data_len = cpu_to_le32(req->r_data_len); msg->hdr.data_off = cpu_to_le16(0); out_free2: @@ -2007,7 +2080,7 @@ static int __do_request(struct ceph_mds_client *mdsc, req->r_session = get_session(session); dout("do_request mds%d session %p state %s\n", mds, session, - session_state_name(session->s_state)); + ceph_session_state_name(session->s_state)); if (session->s_state != CEPH_MDS_SESSION_OPEN && session->s_state != CEPH_MDS_SESSION_HUNG) { if (session->s_state == CEPH_MDS_SESSION_NEW || @@ -2078,6 +2151,7 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds) if (req->r_session && req->r_session->s_mds == mds) { dout(" kicking tid %llu\n", req->r_tid); + list_del_init(&req->r_wait); __do_request(mdsc, req); } } @@ -2444,7 +2518,7 @@ static void handle_session(struct ceph_mds_session *session, dout("handle_session mds%d %s %p state %s seq %llu\n", mds, ceph_session_op_name(op), session, - session_state_name(session->s_state), seq); + ceph_session_state_name(session->s_state), seq); if (session->s_state == CEPH_MDS_SESSION_HUNG) { session->s_state = CEPH_MDS_SESSION_OPEN; @@ -2471,9 +2545,8 @@ static void handle_session(struct ceph_mds_session *session, if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) pr_info("mds%d reconnect denied\n", session->s_mds); remove_session_caps(session); - wake = 1; /* for good measure */ + wake = 2; /* for good measure */ wake_up_all(&mdsc->session_close_wq); - kick_requests(mdsc, mds); break; case CEPH_SESSION_STALE: @@ -2503,6 +2576,8 @@ static void handle_session(struct ceph_mds_session *session, if (wake) { mutex_lock(&mdsc->mutex); __wake_requests(mdsc, &session->s_waiting); + if (wake == 2) + kick_requests(mdsc, mds); mutex_unlock(&mdsc->mutex); } return; @@ -2695,18 +2770,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, session->s_state = CEPH_MDS_SESSION_RECONNECTING; session->s_seq = 0; - ceph_con_close(&session->s_con); - ceph_con_open(&session->s_con, - CEPH_ENTITY_TYPE_MDS, mds, - ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); - - /* replay unsafe requests */ - replay_unsafe_requests(mdsc, session); - - down_read(&mdsc->snap_rwsem); - dout("session %p state %s\n", session, - session_state_name(session->s_state)); + ceph_session_state_name(session->s_state)); spin_lock(&session->s_gen_ttl_lock); session->s_cap_gen++; @@ -2723,6 +2788,19 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, discard_cap_releases(mdsc, session); spin_unlock(&session->s_cap_lock); + /* trim unused caps to reduce MDS's cache rejoin time */ + shrink_dcache_parent(mdsc->fsc->sb->s_root); + + ceph_con_close(&session->s_con); + ceph_con_open(&session->s_con, + CEPH_ENTITY_TYPE_MDS, mds, + ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); + + /* replay unsafe requests */ + replay_unsafe_requests(mdsc, session); + + down_read(&mdsc->snap_rwsem); + /* traverse this session's caps */ s_nr_caps = session->s_nr_caps; err = ceph_pagelist_encode_32(pagelist, s_nr_caps); @@ -2791,7 +2869,6 @@ fail: mutex_unlock(&session->s_mutex); fail_nomsg: ceph_pagelist_release(pagelist); - kfree(pagelist); fail_nopagelist: pr_err("error %d preparing reconnect for mds%d\n", err, mds); return; @@ -2827,7 +2904,7 @@ static void check_new_map(struct ceph_mds_client *mdsc, ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", ceph_mds_state_name(newstate), ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", - session_state_name(s->s_state)); + ceph_session_state_name(s->s_state)); if (i >= newmap->m_max_mds || memcmp(ceph_mdsmap_get_addr(oldmap, i), @@ -2939,14 +3016,15 @@ static void handle_lease(struct ceph_mds_client *mdsc, if (dname.len != get_unaligned_le32(h+1)) goto bad; - mutex_lock(&session->s_mutex); - session->s_seq++; - /* lookup inode */ inode = ceph_find_inode(sb, vino); dout("handle_lease %s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action), vino.ino, inode, dname.len, dname.name); + + mutex_lock(&session->s_mutex); + session->s_seq++; + if (inode == NULL) { dout("handle_lease no inode %llx\n", vino.ino); goto release; diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index e00737cf523c..3288359353e9 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -202,9 +202,7 @@ struct ceph_mds_request { bool r_direct_is_hash; /* true if r_direct_hash is valid */ /* data payload is used for xattr ops */ - struct page **r_pages; - int r_num_pages; - int r_data_len; + struct ceph_pagelist *r_pagelist; /* what caps shall we drop? */ int r_inode_drop, r_inode_unless; @@ -332,6 +330,8 @@ ceph_get_mds_session(struct ceph_mds_session *s) return s; } +extern const char *ceph_session_state_name(int s); + extern void ceph_put_mds_session(struct ceph_mds_session *s); extern int ceph_send_msg_mds(struct ceph_mds_client *mdsc, diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 12b20744e386..b82f507979b8 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -95,6 +95,7 @@ struct ceph_fs_client { struct dentry *debugfs_congestion_kb; struct dentry *debugfs_bdi; struct dentry *debugfs_mdsc, *debugfs_mdsmap; + struct dentry *debugfs_mds_sessions; #endif #ifdef CONFIG_CEPH_FSCACHE @@ -714,7 +715,7 @@ extern void ceph_queue_vmtruncate(struct inode *inode); extern void ceph_queue_invalidate(struct inode *inode); extern void ceph_queue_writeback(struct inode *inode); -extern int ceph_do_getattr(struct inode *inode, int mask); +extern int ceph_do_getattr(struct inode *inode, int mask, bool force); extern int ceph_permission(struct inode *inode, int mask); extern int ceph_setattr(struct dentry *dentry, struct iattr *attr); extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, @@ -733,15 +734,23 @@ extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci); extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci); extern void __init ceph_xattr_init(void); extern void ceph_xattr_exit(void); +extern const struct xattr_handler *ceph_xattr_handlers[]; /* acl.c */ -extern const struct xattr_handler *ceph_xattr_handlers[]; +struct ceph_acls_info { + void *default_acl; + void *acl; + struct ceph_pagelist *pagelist; +}; #ifdef CONFIG_CEPH_FS_POSIX_ACL struct posix_acl *ceph_get_acl(struct inode *, int); int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type); -int ceph_init_acl(struct dentry *, struct inode *, struct inode *); +int ceph_pre_init_acls(struct inode *dir, umode_t *mode, + struct ceph_acls_info *info); +void ceph_init_inode_acls(struct inode *inode, struct ceph_acls_info *info); +void ceph_release_acls_info(struct ceph_acls_info *info); static inline void ceph_forget_all_cached_acls(struct inode *inode) { @@ -753,12 +762,18 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode) #define ceph_get_acl NULL #define ceph_set_acl NULL -static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode, - struct inode *dir) +static inline int ceph_pre_init_acls(struct inode *dir, umode_t *mode, + struct ceph_acls_info *info) { return 0; } - +static inline void ceph_init_inode_acls(struct inode *inode, + struct ceph_acls_info *info) +{ +} +static inline void ceph_release_acls_info(struct ceph_acls_info *info) +{ +} static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode) { return 0; diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 12f58d22e017..678b0d2bbbc4 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -1,4 +1,5 @@ #include <linux/ceph/ceph_debug.h> +#include <linux/ceph/pagelist.h> #include "super.h" #include "mds_client.h" @@ -284,8 +285,7 @@ static size_t ceph_vxattrs_name_size(struct ceph_vxattr *vxattrs) return ceph_dir_vxattrs_name_size; if (vxattrs == ceph_file_vxattrs) return ceph_file_vxattrs_name_size; - BUG(); - + BUG_ON(vxattrs); return 0; } @@ -736,24 +736,20 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, dout("getxattr %p ver=%lld index_ver=%lld\n", inode, ci->i_xattrs.version, ci->i_xattrs.index_version); - if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) && - (ci->i_xattrs.index_version >= ci->i_xattrs.version)) { - goto get_xattr; - } else { + if (ci->i_xattrs.version == 0 || + !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) { spin_unlock(&ci->i_ceph_lock); /* get xattrs from mds (if we don't already have them) */ - err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR); + err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true); if (err) return err; + spin_lock(&ci->i_ceph_lock); } - spin_lock(&ci->i_ceph_lock); - err = __build_xattrs(inode); if (err < 0) goto out; -get_xattr: err = -ENODATA; /* == ENOATTR */ xattr = __get_xattr(ci, name); if (!xattr) @@ -798,23 +794,18 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) dout("listxattr %p ver=%lld index_ver=%lld\n", inode, ci->i_xattrs.version, ci->i_xattrs.index_version); - if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) && - (ci->i_xattrs.index_version >= ci->i_xattrs.version)) { - goto list_xattr; - } else { + if (ci->i_xattrs.version == 0 || + !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) { spin_unlock(&ci->i_ceph_lock); - err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR); + err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true); if (err) return err; + spin_lock(&ci->i_ceph_lock); } - spin_lock(&ci->i_ceph_lock); - err = __build_xattrs(inode); if (err < 0) goto out; - -list_xattr: /* * Start with virtual dir xattr names (if any) (including * terminating '\0' characters for each). @@ -860,35 +851,25 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name, struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_request *req; struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_pagelist *pagelist = NULL; int err; - int i, nr_pages; - struct page **pages = NULL; - void *kaddr; - - /* copy value into some pages */ - nr_pages = calc_pages_for(0, size); - if (nr_pages) { - pages = kmalloc(sizeof(pages[0])*nr_pages, GFP_NOFS); - if (!pages) + + if (value) { + /* copy value into pagelist */ + pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); + if (!pagelist) return -ENOMEM; - err = -ENOMEM; - for (i = 0; i < nr_pages; i++) { - pages[i] = __page_cache_alloc(GFP_NOFS); - if (!pages[i]) { - nr_pages = i; - goto out; - } - kaddr = kmap(pages[i]); - memcpy(kaddr, value + i*PAGE_CACHE_SIZE, - min(PAGE_CACHE_SIZE, size-i*PAGE_CACHE_SIZE)); - } + + ceph_pagelist_init(pagelist); + err = ceph_pagelist_append(pagelist, value, size); + if (err) + goto out; + } else { + flags |= CEPH_XATTR_REMOVE; } dout("setxattr value=%.*s\n", (int)size, value); - if (!value) - flags |= CEPH_XATTR_REMOVE; - /* do request */ req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETXATTR, USE_AUTH_MDS); @@ -903,9 +884,8 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name, req->r_args.setxattr.flags = cpu_to_le32(flags); req->r_path2 = kstrdup(name, GFP_NOFS); - req->r_pages = pages; - req->r_num_pages = nr_pages; - req->r_data_len = size; + req->r_pagelist = pagelist; + pagelist = NULL; dout("xattr.ver (before): %lld\n", ci->i_xattrs.version); err = ceph_mdsc_do_request(mdsc, NULL, req); @@ -913,11 +893,8 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name, dout("xattr.ver (after): %lld\n", ci->i_xattrs.version); out: - if (pages) { - for (i = 0; i < nr_pages; i++) - __free_page(pages[i]); - kfree(pages); - } + if (pagelist) + ceph_pagelist_release(pagelist); return err; } @@ -968,7 +945,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, retry: issued = __ceph_caps_issued(ci, NULL); dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued)); - if (!(issued & CEPH_CAP_XATTR_EXCL)) + if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) goto do_sync; __build_xattrs(inode); @@ -1077,7 +1054,7 @@ retry: issued = __ceph_caps_issued(ci, NULL); dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued)); - if (!(issued & CEPH_CAP_XATTR_EXCL)) + if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) goto do_sync; __build_xattrs(inode); |