summaryrefslogtreecommitdiff
path: root/fs/nfs/localio.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/nfs/localio.c')
-rw-r--r--fs/nfs/localio.c236
1 files changed, 177 insertions, 59 deletions
diff --git a/fs/nfs/localio.c b/fs/nfs/localio.c
index 4b8618cf114c..5c21caeae075 100644
--- a/fs/nfs/localio.c
+++ b/fs/nfs/localio.c
@@ -35,6 +35,7 @@ struct nfs_local_kiocb {
struct bio_vec *bvec;
struct nfs_pgio_header *hdr;
struct work_struct work;
+ void (*aio_complete_work)(struct work_struct *);
struct nfsd_file *localio;
};
@@ -48,9 +49,14 @@ struct nfs_local_fsync_ctx {
static bool localio_enabled __read_mostly = true;
module_param(localio_enabled, bool, 0644);
+static bool localio_O_DIRECT_semantics __read_mostly = false;
+module_param(localio_O_DIRECT_semantics, bool, 0644);
+MODULE_PARM_DESC(localio_O_DIRECT_semantics,
+ "LOCALIO will use O_DIRECT semantics to filesystem.");
+
static inline bool nfs_client_is_local(const struct nfs_client *clp)
{
- return !!test_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
+ return !!rcu_access_pointer(clp->cl_uuid.net);
}
bool nfs_server_is_local(const struct nfs_client *clp)
@@ -116,30 +122,6 @@ const struct rpc_program nfslocalio_program = {
};
/*
- * nfs_local_enable - enable local i/o for an nfs_client
- */
-static void nfs_local_enable(struct nfs_client *clp)
-{
- spin_lock(&clp->cl_localio_lock);
- set_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
- trace_nfs_local_enable(clp);
- spin_unlock(&clp->cl_localio_lock);
-}
-
-/*
- * nfs_local_disable - disable local i/o for an nfs_client
- */
-void nfs_local_disable(struct nfs_client *clp)
-{
- spin_lock(&clp->cl_localio_lock);
- if (test_and_clear_bit(NFS_CS_LOCAL_IO, &clp->cl_flags)) {
- trace_nfs_local_disable(clp);
- nfs_uuid_invalidate_one_client(&clp->cl_uuid);
- }
- spin_unlock(&clp->cl_localio_lock);
-}
-
-/*
* nfs_init_localioclient - Initialise an NFS localio client connection
*/
static struct rpc_clnt *nfs_init_localioclient(struct nfs_client *clp)
@@ -178,7 +160,7 @@ static bool nfs_server_uuid_is_local(struct nfs_client *clp)
rpc_shutdown_client(rpcclient_localio);
/* Server is only local if it initialized required struct members */
- if (status || !clp->cl_uuid.net || !clp->cl_uuid.dom)
+ if (status || !rcu_access_pointer(clp->cl_uuid.net) || !clp->cl_uuid.dom)
return false;
return true;
@@ -194,44 +176,64 @@ void nfs_local_probe(struct nfs_client *clp)
/* Disallow localio if disabled via sysfs or AUTH_SYS isn't used */
if (!localio_enabled ||
clp->cl_rpcclient->cl_auth->au_flavor != RPC_AUTH_UNIX) {
- nfs_local_disable(clp);
+ nfs_localio_disable_client(clp);
return;
}
if (nfs_client_is_local(clp)) {
/* If already enabled, disable and re-enable */
- nfs_local_disable(clp);
+ nfs_localio_disable_client(clp);
}
if (!nfs_uuid_begin(&clp->cl_uuid))
return;
if (nfs_server_uuid_is_local(clp))
- nfs_local_enable(clp);
+ nfs_localio_enable_client(clp);
nfs_uuid_end(&clp->cl_uuid);
}
EXPORT_SYMBOL_GPL(nfs_local_probe);
+void nfs_local_probe_async_work(struct work_struct *work)
+{
+ struct nfs_client *clp =
+ container_of(work, struct nfs_client, cl_local_probe_work);
+
+ nfs_local_probe(clp);
+}
+
+void nfs_local_probe_async(struct nfs_client *clp)
+{
+ queue_work(nfsiod_workqueue, &clp->cl_local_probe_work);
+}
+EXPORT_SYMBOL_GPL(nfs_local_probe_async);
+
+static inline struct nfsd_file *nfs_local_file_get(struct nfsd_file *nf)
+{
+ return nfs_to->nfsd_file_get(nf);
+}
+
+static inline void nfs_local_file_put(struct nfsd_file *nf)
+{
+ nfs_to->nfsd_file_put(nf);
+}
+
/*
- * nfs_local_open_fh - open a local filehandle in terms of nfsd_file
+ * __nfs_local_open_fh - open a local filehandle in terms of nfsd_file.
*
- * Returns a pointer to a struct nfsd_file or NULL
+ * Returns a pointer to a struct nfsd_file or ERR_PTR.
+ * Caller must release returned nfsd_file with nfs_to_nfsd_file_put_local().
*/
-struct nfsd_file *
-nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
- struct nfs_fh *fh, const fmode_t mode)
+static struct nfsd_file *
+__nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
+ struct nfs_fh *fh, struct nfs_file_localio *nfl,
+ const fmode_t mode)
{
struct nfsd_file *localio;
- int status;
-
- if (!nfs_server_is_local(clp))
- return NULL;
- if (mode & ~(FMODE_READ | FMODE_WRITE))
- return NULL;
localio = nfs_open_local_fh(&clp->cl_uuid, clp->cl_rpcclient,
- cred, fh, mode);
+ cred, fh, nfl, mode);
if (IS_ERR(localio)) {
- status = PTR_ERR(localio);
+ int status = PTR_ERR(localio);
trace_nfs_local_open_fh(fh, mode, status);
switch (status) {
case -ENOMEM:
@@ -240,10 +242,59 @@ nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
/* Revalidate localio, will disable if unsupported */
nfs_local_probe(clp);
}
- return NULL;
}
return localio;
}
+
+/*
+ * nfs_local_open_fh - open a local filehandle in terms of nfsd_file.
+ * First checking if the open nfsd_file is already cached, otherwise
+ * must __nfs_local_open_fh and insert the nfsd_file in nfs_file_localio.
+ *
+ * Returns a pointer to a struct nfsd_file or NULL.
+ */
+struct nfsd_file *
+nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
+ struct nfs_fh *fh, struct nfs_file_localio *nfl,
+ const fmode_t mode)
+{
+ struct nfsd_file *nf, *new, __rcu **pnf;
+
+ if (!nfs_server_is_local(clp))
+ return NULL;
+ if (mode & ~(FMODE_READ | FMODE_WRITE))
+ return NULL;
+
+ if (mode & FMODE_WRITE)
+ pnf = &nfl->rw_file;
+ else
+ pnf = &nfl->ro_file;
+
+ new = NULL;
+ rcu_read_lock();
+ nf = rcu_dereference(*pnf);
+ if (!nf) {
+ rcu_read_unlock();
+ new = __nfs_local_open_fh(clp, cred, fh, nfl, mode);
+ if (IS_ERR(new))
+ return NULL;
+ /* try to swap in the pointer */
+ spin_lock(&clp->cl_uuid.lock);
+ nf = rcu_dereference_protected(*pnf, 1);
+ if (!nf) {
+ nf = new;
+ new = NULL;
+ rcu_assign_pointer(*pnf, nf);
+ }
+ spin_unlock(&clp->cl_uuid.lock);
+ rcu_read_lock();
+ }
+ nf = nfs_local_file_get(nf);
+ rcu_read_unlock();
+ if (new)
+ nfs_to_nfsd_file_put_local(new);
+ return nf;
+}
EXPORT_SYMBOL_GPL(nfs_local_open_fh);
static struct bio_vec *
@@ -285,10 +336,19 @@ nfs_local_iocb_alloc(struct nfs_pgio_header *hdr,
kfree(iocb);
return NULL;
}
- init_sync_kiocb(&iocb->kiocb, file);
+
+ if (localio_O_DIRECT_semantics &&
+ test_bit(NFS_IOHDR_ODIRECT, &hdr->flags)) {
+ iocb->kiocb.ki_filp = file;
+ iocb->kiocb.ki_flags = IOCB_DIRECT;
+ } else
+ init_sync_kiocb(&iocb->kiocb, file);
+
iocb->kiocb.ki_pos = hdr->args.offset;
iocb->hdr = hdr;
iocb->kiocb.ki_flags &= ~IOCB_APPEND;
+ iocb->aio_complete_work = NULL;
+
return iocb;
}
@@ -328,7 +388,7 @@ nfs_local_pgio_done(struct nfs_pgio_header *hdr, long status)
hdr->res.op_status = NFS4_OK;
hdr->task.tk_status = 0;
} else {
- hdr->res.op_status = nfs4_stat_to_errno(status);
+ hdr->res.op_status = nfs_localio_errno_to_nfs4_stat(status);
hdr->task.tk_status = status;
}
}
@@ -338,11 +398,23 @@ nfs_local_pgio_release(struct nfs_local_kiocb *iocb)
{
struct nfs_pgio_header *hdr = iocb->hdr;
- nfs_to_nfsd_file_put_local(iocb->localio);
+ nfs_local_file_put(iocb->localio);
nfs_local_iocb_free(iocb);
nfs_local_hdr_release(hdr, hdr->task.tk_ops);
}
+/*
+ * Complete the I/O from iocb->kiocb.ki_complete()
+ *
+ * Note that this function can be called from a bottom half context,
+ * hence we need to queue the rpc_call_done() etc to a workqueue
+ */
+static inline void nfs_local_pgio_aio_complete(struct nfs_local_kiocb *iocb)
+{
+ INIT_WORK(&iocb->work, iocb->aio_complete_work);
+ queue_work(nfsiod_workqueue, &iocb->work);
+}
+
static void
nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
{
@@ -365,6 +437,23 @@ nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
status > 0 ? status : 0, hdr->res.eof);
}
+static void nfs_local_read_aio_complete_work(struct work_struct *work)
+{
+ struct nfs_local_kiocb *iocb =
+ container_of(work, struct nfs_local_kiocb, work);
+
+ nfs_local_pgio_release(iocb);
+}
+
+static void nfs_local_read_aio_complete(struct kiocb *kiocb, long ret)
+{
+ struct nfs_local_kiocb *iocb =
+ container_of(kiocb, struct nfs_local_kiocb, kiocb);
+
+ nfs_local_read_done(iocb, ret);
+ nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_read_aio_complete_work */
+}
+
static void nfs_local_call_read(struct work_struct *work)
{
struct nfs_local_kiocb *iocb =
@@ -379,10 +468,10 @@ static void nfs_local_call_read(struct work_struct *work)
nfs_local_iter_init(&iter, iocb, READ);
status = filp->f_op->read_iter(&iocb->kiocb, &iter);
- WARN_ON_ONCE(status == -EIOCBQUEUED);
-
- nfs_local_read_done(iocb, status);
- nfs_local_pgio_release(iocb);
+ if (status != -EIOCBQUEUED) {
+ nfs_local_read_done(iocb, status);
+ nfs_local_pgio_release(iocb);
+ }
revert_creds(save_cred);
}
@@ -410,6 +499,11 @@ nfs_do_local_read(struct nfs_pgio_header *hdr,
nfs_local_pgio_init(hdr, call_ops);
hdr->res.eof = false;
+ if (iocb->kiocb.ki_flags & IOCB_DIRECT) {
+ iocb->kiocb.ki_complete = nfs_local_read_aio_complete;
+ iocb->aio_complete_work = nfs_local_read_aio_complete_work;
+ }
+
INIT_WORK(&iocb->work, nfs_local_call_read);
queue_work(nfslocaliod_workqueue, &iocb->work);
@@ -534,6 +628,24 @@ nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
nfs_local_pgio_done(hdr, status);
}
+static void nfs_local_write_aio_complete_work(struct work_struct *work)
+{
+ struct nfs_local_kiocb *iocb =
+ container_of(work, struct nfs_local_kiocb, work);
+
+ nfs_local_vfs_getattr(iocb);
+ nfs_local_pgio_release(iocb);
+}
+
+static void nfs_local_write_aio_complete(struct kiocb *kiocb, long ret)
+{
+ struct nfs_local_kiocb *iocb =
+ container_of(kiocb, struct nfs_local_kiocb, kiocb);
+
+ nfs_local_write_done(iocb, ret);
+ nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_write_aio_complete_work */
+}
+
static void nfs_local_call_write(struct work_struct *work)
{
struct nfs_local_kiocb *iocb =
@@ -552,11 +664,11 @@ static void nfs_local_call_write(struct work_struct *work)
file_start_write(filp);
status = filp->f_op->write_iter(&iocb->kiocb, &iter);
file_end_write(filp);
- WARN_ON_ONCE(status == -EIOCBQUEUED);
-
- nfs_local_write_done(iocb, status);
- nfs_local_vfs_getattr(iocb);
- nfs_local_pgio_release(iocb);
+ if (status != -EIOCBQUEUED) {
+ nfs_local_write_done(iocb, status);
+ nfs_local_vfs_getattr(iocb);
+ nfs_local_pgio_release(iocb);
+ }
revert_creds(save_cred);
current->flags = old_flags;
@@ -592,10 +704,16 @@ nfs_do_local_write(struct nfs_pgio_header *hdr,
case NFS_FILE_SYNC:
iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC;
}
+
nfs_local_pgio_init(hdr, call_ops);
nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);
+ if (iocb->kiocb.ki_flags & IOCB_DIRECT) {
+ iocb->kiocb.ki_complete = nfs_local_write_aio_complete;
+ iocb->aio_complete_work = nfs_local_write_aio_complete_work;
+ }
+
INIT_WORK(&iocb->work, nfs_local_call_write);
queue_work(nfslocaliod_workqueue, &iocb->work);
@@ -626,8 +744,8 @@ int nfs_local_doio(struct nfs_client *clp, struct nfsd_file *localio,
if (status != 0) {
if (status == -EAGAIN)
- nfs_local_disable(clp);
- nfs_to_nfsd_file_put_local(localio);
+ nfs_localio_disable_client(clp);
+ nfs_local_file_put(localio);
hdr->task.tk_status = status;
nfs_local_hdr_release(hdr, call_ops);
}
@@ -668,7 +786,7 @@ nfs_local_commit_done(struct nfs_commit_data *data, int status)
data->task.tk_status = 0;
} else {
nfs_reset_boot_verifier(data->inode);
- data->res.op_status = nfs4_stat_to_errno(status);
+ data->res.op_status = nfs_localio_errno_to_nfs4_stat(status);
data->task.tk_status = status;
}
}
@@ -678,7 +796,7 @@ nfs_local_release_commit_data(struct nfsd_file *localio,
struct nfs_commit_data *data,
const struct rpc_call_ops *call_ops)
{
- nfs_to_nfsd_file_put_local(localio);
+ nfs_local_file_put(localio);
call_ops->rpc_call_done(&data->task, data);
call_ops->rpc_release(data);
}