diff options
Diffstat (limited to 'fs/ceph/caps.c')
-rw-r--r-- | fs/ceph/caps.c | 323 |
1 files changed, 205 insertions, 118 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 16e6ded0b7f2..baea866a6751 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -987,96 +987,127 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) __cap_delay_cancel(mdsc, ci); } +struct cap_msg_args { + struct ceph_mds_session *session; + u64 ino, cid, follows; + u64 flush_tid, oldest_flush_tid, size, max_size; + u64 xattr_version; + struct ceph_buffer *xattr_buf; + struct timespec atime, mtime, ctime; + int op, caps, wanted, dirty; + u32 seq, issue_seq, mseq, time_warp_seq; + u32 flags; + kuid_t uid; + kgid_t gid; + umode_t mode; + bool inline_data; +}; + /* * Build and send a cap message to the given MDS. * * Caller should be holding s_mutex. */ -static int send_cap_msg(struct ceph_mds_session *session, - u64 ino, u64 cid, int op, - int caps, int wanted, int dirty, - u32 seq, u64 flush_tid, u64 oldest_flush_tid, - u32 issue_seq, u32 mseq, u64 size, u64 max_size, - struct timespec *mtime, struct timespec *atime, - struct timespec *ctime, u32 time_warp_seq, - kuid_t uid, kgid_t gid, umode_t mode, - u64 xattr_version, - struct ceph_buffer *xattrs_buf, - u64 follows, bool inline_data) +static int send_cap_msg(struct cap_msg_args *arg) { struct ceph_mds_caps *fc; struct ceph_msg *msg; void *p; size_t extra_len; + struct timespec zerotime = {0}; dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu" - " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op), - cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted), - ceph_cap_string(dirty), - seq, issue_seq, flush_tid, oldest_flush_tid, - mseq, follows, size, max_size, - xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); + " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op), + arg->cid, arg->ino, ceph_cap_string(arg->caps), + ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty), + arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid, + arg->mseq, arg->follows, arg->size, arg->max_size, + arg->xattr_version, + arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0); /* flock buffer size + inline version + inline data size + * osd_epoch_barrier + oldest_flush_tid */ - extra_len = 4 + 8 + 4 + 4 + 8; + extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4; msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len, GFP_NOFS, false); if (!msg) return -ENOMEM; - msg->hdr.version = cpu_to_le16(6); - msg->hdr.tid = cpu_to_le64(flush_tid); + msg->hdr.version = cpu_to_le16(10); + msg->hdr.tid = cpu_to_le64(arg->flush_tid); fc = msg->front.iov_base; memset(fc, 0, sizeof(*fc)); - fc->cap_id = cpu_to_le64(cid); - fc->op = cpu_to_le32(op); - fc->seq = cpu_to_le32(seq); - fc->issue_seq = cpu_to_le32(issue_seq); - fc->migrate_seq = cpu_to_le32(mseq); - fc->caps = cpu_to_le32(caps); - fc->wanted = cpu_to_le32(wanted); - fc->dirty = cpu_to_le32(dirty); - fc->ino = cpu_to_le64(ino); - fc->snap_follows = cpu_to_le64(follows); - - fc->size = cpu_to_le64(size); - fc->max_size = cpu_to_le64(max_size); - if (mtime) - ceph_encode_timespec(&fc->mtime, mtime); - if (atime) - ceph_encode_timespec(&fc->atime, atime); - if (ctime) - ceph_encode_timespec(&fc->ctime, ctime); - fc->time_warp_seq = cpu_to_le32(time_warp_seq); - - fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid)); - fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid)); - fc->mode = cpu_to_le32(mode); + fc->cap_id = cpu_to_le64(arg->cid); + fc->op = cpu_to_le32(arg->op); + fc->seq = cpu_to_le32(arg->seq); + fc->issue_seq = cpu_to_le32(arg->issue_seq); + fc->migrate_seq = cpu_to_le32(arg->mseq); + fc->caps = cpu_to_le32(arg->caps); + fc->wanted = cpu_to_le32(arg->wanted); + fc->dirty = cpu_to_le32(arg->dirty); + fc->ino = cpu_to_le64(arg->ino); + fc->snap_follows = cpu_to_le64(arg->follows); + + fc->size = cpu_to_le64(arg->size); + fc->max_size = cpu_to_le64(arg->max_size); + ceph_encode_timespec(&fc->mtime, &arg->mtime); + ceph_encode_timespec(&fc->atime, &arg->atime); + ceph_encode_timespec(&fc->ctime, &arg->ctime); + fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq); + + fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid)); + fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid)); + fc->mode = cpu_to_le32(arg->mode); + + fc->xattr_version = cpu_to_le64(arg->xattr_version); + if (arg->xattr_buf) { + msg->middle = ceph_buffer_get(arg->xattr_buf); + fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len); + msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len); + } p = fc + 1; - /* flock buffer size */ + /* flock buffer size (version 2) */ ceph_encode_32(&p, 0); - /* inline version */ - ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE); + /* inline version (version 4) */ + ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE); /* inline data size */ ceph_encode_32(&p, 0); - /* osd_epoch_barrier */ + /* osd_epoch_barrier (version 5) */ ceph_encode_32(&p, 0); - /* oldest_flush_tid */ - ceph_encode_64(&p, oldest_flush_tid); + /* oldest_flush_tid (version 6) */ + ceph_encode_64(&p, arg->oldest_flush_tid); - fc->xattr_version = cpu_to_le64(xattr_version); - if (xattrs_buf) { - msg->middle = ceph_buffer_get(xattrs_buf); - fc->xattr_len = cpu_to_le32(xattrs_buf->vec.iov_len); - msg->hdr.middle_len = cpu_to_le32(xattrs_buf->vec.iov_len); - } + /* + * caller_uid/caller_gid (version 7) + * + * Currently, we don't properly track which caller dirtied the caps + * last, and force a flush of them when there is a conflict. For now, + * just set this to 0:0, to emulate how the MDS has worked up to now. + */ + ceph_encode_32(&p, 0); + ceph_encode_32(&p, 0); + + /* pool namespace (version 8) (mds always ignores this) */ + ceph_encode_32(&p, 0); - ceph_con_send(&session->s_con, msg); + /* + * btime and change_attr (version 9) + * + * We just zero these out for now, as the MDS ignores them unless + * the requisite feature flags are set (which we don't do yet). + */ + ceph_encode_timespec(p, &zerotime); + p += sizeof(struct ceph_timespec); + ceph_encode_64(&p, 0); + + /* Advisory flags (version 10) */ + ceph_encode_32(&p, arg->flags); + + ceph_con_send(&arg->session->s_con, msg); return 0; } @@ -1115,27 +1146,17 @@ void ceph_queue_caps_release(struct inode *inode) * caller should hold snap_rwsem (read), s_mutex. */ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, - int op, int used, int want, int retain, int flushing, - u64 flush_tid, u64 oldest_flush_tid) + int op, bool sync, int used, int want, int retain, + int flushing, u64 flush_tid, u64 oldest_flush_tid) __releases(cap->ci->i_ceph_lock) { struct ceph_inode_info *ci = cap->ci; struct inode *inode = &ci->vfs_inode; - u64 cap_id = cap->cap_id; - int held, revoking, dropping, keep; - u64 follows, size, max_size; - u32 seq, issue_seq, mseq, time_warp_seq; - struct timespec mtime, atime, ctime; + struct cap_msg_args arg; + int held, revoking, dropping; int wake = 0; - umode_t mode; - kuid_t uid; - kgid_t gid; - struct ceph_mds_session *session; - u64 xattr_version = 0; - struct ceph_buffer *xattr_blob = NULL; int delayed = 0; int ret; - bool inline_data; held = cap->issued | cap->implemented; revoking = cap->implemented & ~cap->issued; @@ -1148,7 +1169,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, ceph_cap_string(revoking)); BUG_ON((retain & CEPH_CAP_PIN) == 0); - session = cap->session; + arg.session = cap->session; /* don't release wanted unless we've waited a bit. */ if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 && @@ -1177,40 +1198,51 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, cap->implemented &= cap->issued | used; cap->mds_wanted = want; - follows = flushing ? ci->i_head_snapc->seq : 0; - - keep = cap->implemented; - seq = cap->seq; - issue_seq = cap->issue_seq; - mseq = cap->mseq; - size = inode->i_size; - ci->i_reported_size = size; - max_size = ci->i_wanted_max_size; - ci->i_requested_max_size = max_size; - mtime = inode->i_mtime; - atime = inode->i_atime; - ctime = inode->i_ctime; - time_warp_seq = ci->i_time_warp_seq; - uid = inode->i_uid; - gid = inode->i_gid; - mode = inode->i_mode; + arg.ino = ceph_vino(inode).ino; + arg.cid = cap->cap_id; + arg.follows = flushing ? ci->i_head_snapc->seq : 0; + arg.flush_tid = flush_tid; + arg.oldest_flush_tid = oldest_flush_tid; + + arg.size = inode->i_size; + ci->i_reported_size = arg.size; + arg.max_size = ci->i_wanted_max_size; + ci->i_requested_max_size = arg.max_size; if (flushing & CEPH_CAP_XATTR_EXCL) { __ceph_build_xattrs_blob(ci); - xattr_blob = ci->i_xattrs.blob; - xattr_version = ci->i_xattrs.version; + arg.xattr_version = ci->i_xattrs.version; + arg.xattr_buf = ci->i_xattrs.blob; + } else { + arg.xattr_buf = NULL; } - inline_data = ci->i_inline_version != CEPH_INLINE_NONE; + arg.mtime = inode->i_mtime; + arg.atime = inode->i_atime; + arg.ctime = inode->i_ctime; + + arg.op = op; + arg.caps = cap->implemented; + arg.wanted = want; + arg.dirty = flushing; + + arg.seq = cap->seq; + arg.issue_seq = cap->issue_seq; + arg.mseq = cap->mseq; + arg.time_warp_seq = ci->i_time_warp_seq; + + arg.uid = inode->i_uid; + arg.gid = inode->i_gid; + arg.mode = inode->i_mode; + + arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE; + arg.flags = 0; + if (sync) + arg.flags |= CEPH_CLIENT_CAPS_SYNC; spin_unlock(&ci->i_ceph_lock); - ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, - op, keep, want, flushing, seq, - flush_tid, oldest_flush_tid, issue_seq, mseq, - size, max_size, &mtime, &atime, &ctime, time_warp_seq, - uid, gid, mode, xattr_version, xattr_blob, - follows, inline_data); + ret = send_cap_msg(&arg); if (ret < 0) { dout("error sending cap msg, must requeue %p\n", inode); delayed = 1; @@ -1227,15 +1259,42 @@ static inline int __send_flush_snap(struct inode *inode, struct ceph_cap_snap *capsnap, u32 mseq, u64 oldest_flush_tid) { - return send_cap_msg(session, ceph_vino(inode).ino, 0, - CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, - capsnap->dirty, 0, capsnap->cap_flush.tid, - oldest_flush_tid, 0, mseq, capsnap->size, 0, - &capsnap->mtime, &capsnap->atime, - &capsnap->ctime, capsnap->time_warp_seq, - capsnap->uid, capsnap->gid, capsnap->mode, - capsnap->xattr_version, capsnap->xattr_blob, - capsnap->follows, capsnap->inline_data); + struct cap_msg_args arg; + + arg.session = session; + arg.ino = ceph_vino(inode).ino; + arg.cid = 0; + arg.follows = capsnap->follows; + arg.flush_tid = capsnap->cap_flush.tid; + arg.oldest_flush_tid = oldest_flush_tid; + + arg.size = capsnap->size; + arg.max_size = 0; + arg.xattr_version = capsnap->xattr_version; + arg.xattr_buf = capsnap->xattr_blob; + + arg.atime = capsnap->atime; + arg.mtime = capsnap->mtime; + arg.ctime = capsnap->ctime; + + arg.op = CEPH_CAP_OP_FLUSHSNAP; + arg.caps = capsnap->issued; + arg.wanted = 0; + arg.dirty = capsnap->dirty; + + arg.seq = 0; + arg.issue_seq = 0; + arg.mseq = mseq; + arg.time_warp_seq = capsnap->time_warp_seq; + + arg.uid = capsnap->uid; + arg.gid = capsnap->gid; + arg.mode = capsnap->mode; + + arg.inline_data = capsnap->inline_data; + arg.flags = 0; + + return send_cap_msg(&arg); } /* @@ -1858,9 +1917,9 @@ ack: sent++; /* __send_cap drops i_ceph_lock */ - delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used, - want, retain, flushing, - flush_tid, oldest_flush_tid); + delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false, + cap_used, want, retain, flushing, + flush_tid, oldest_flush_tid); goto retry; /* retake i_ceph_lock and restart our cap scan. */ } @@ -1924,9 +1983,9 @@ retry: &flush_tid, &oldest_flush_tid); /* __send_cap drops i_ceph_lock */ - delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, - (cap->issued | cap->implemented), - flushing, flush_tid, oldest_flush_tid); + delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true, + used, want, (cap->issued | cap->implemented), + flushing, flush_tid, oldest_flush_tid); if (delayed) { spin_lock(&ci->i_ceph_lock); @@ -1996,7 +2055,7 @@ static int unsafe_request_wait(struct inode *inode) } spin_unlock(&ci->i_unsafe_lock); - dout("unsafe_requeset_wait %p wait on tid %llu %llu\n", + dout("unsafe_request_wait %p wait on tid %llu %llu\n", inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL); if (req1) { ret = !wait_for_completion_timeout(&req1->r_safe_completion, @@ -2119,7 +2178,7 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc, inode, cap, cf->tid, ceph_cap_string(cf->caps)); ci->i_ceph_flags |= CEPH_I_NODELAY; ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - __ceph_caps_used(ci), + false, __ceph_caps_used(ci), __ceph_caps_wanted(ci), cap->issued | cap->implemented, cf->caps, cf->tid, oldest_flush_tid); @@ -2479,6 +2538,27 @@ static void check_max_size(struct inode *inode, loff_t endoff) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); } +int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got) +{ + int ret, err = 0; + + BUG_ON(need & ~CEPH_CAP_FILE_RD); + BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)); + ret = ceph_pool_perm_check(ci, need); + if (ret < 0) + return ret; + + ret = try_get_cap_refs(ci, need, want, 0, true, got, &err); + if (ret) { + if (err == -EAGAIN) { + ret = 0; + } else if (err < 0) { + ret = err; + } + } + return ret; +} + /* * Wait for caps, and take cap references. If we can't get a WR cap * due to a small max_size, make sure we check_max_size (and possibly @@ -2507,9 +2587,15 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, if (err < 0) ret = err; } else { - ret = wait_event_interruptible(ci->i_cap_wq, - try_get_cap_refs(ci, need, want, endoff, - true, &_got, &err)); + DEFINE_WAIT_FUNC(wait, woken_wake_function); + add_wait_queue(&ci->i_cap_wq, &wait); + + while (!try_get_cap_refs(ci, need, want, endoff, + true, &_got, &err)) + wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT); + + remove_wait_queue(&ci->i_cap_wq, &wait); + if (err == -EAGAIN) continue; if (err < 0) @@ -3570,6 +3656,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, cap->cap_id = le64_to_cpu(h->cap_id); cap->mseq = mseq; cap->seq = seq; + cap->issue_seq = seq; spin_lock(&session->s_cap_lock); list_add_tail(&cap->session_caps, &session->s_cap_releases); |