diff options
Diffstat (limited to 'fs/ceph')
-rw-r--r-- | fs/ceph/messenger.c | 20 | ||||
-rw-r--r-- | fs/ceph/messenger.h | 4 | ||||
-rw-r--r-- | fs/ceph/osd_client.c | 118 | ||||
-rw-r--r-- | fs/ceph/osd_client.h | 8 |
4 files changed, 100 insertions, 50 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index f708803e6857..81bc779adb90 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c @@ -1985,30 +1985,30 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) } /* - * Revoke a page vector that we may be reading data into + * Revoke a message that we may be reading data into */ -void ceph_con_revoke_pages(struct ceph_connection *con, struct page **pages) +void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) { mutex_lock(&con->mutex); - if (con->in_msg && con->in_msg->pages == pages) { + if (con->in_msg && con->in_msg == msg) { + unsigned front_len = le32_to_cpu(con->in_hdr.front_len); + unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len); unsigned data_len = le32_to_cpu(con->in_hdr.data_len); /* skip rest of message */ - dout("con_revoke_pages %p msg %p pages %p revoked\n", con, - con->in_msg, pages); - if (con->in_msg_pos.data_pos < data_len) - con->in_base_pos = con->in_msg_pos.data_pos - data_len; - else + dout("con_revoke_pages %p msg %p revoked\n", con, msg); con->in_base_pos = con->in_base_pos - sizeof(struct ceph_msg_header) - + front_len - + middle_len - + data_len - sizeof(struct ceph_msg_footer); - con->in_msg->pages = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; con->in_tag = CEPH_MSGR_TAG_READY; } else { dout("con_revoke_pages %p msg %p pages %p no-op\n", - con, con->in_msg, pages); + con, con->in_msg, msg); } mutex_unlock(&con->mutex); } diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h index dca2d32b40de..c26a3d8aa78c 100644 --- a/fs/ceph/messenger.h +++ b/fs/ceph/messenger.h @@ -226,8 +226,8 @@ extern void ceph_con_open(struct ceph_connection *con, extern void ceph_con_close(struct ceph_connection *con); extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg); -extern void ceph_con_revoke_pages(struct ceph_connection *con, - struct page **pages); +extern void ceph_con_revoke_message(struct ceph_connection *con, + struct ceph_msg *msg); extern void ceph_con_keepalive(struct ceph_connection *con); extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); extern void ceph_con_put(struct ceph_connection *con); diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index 44abe299c69f..df2106839713 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c @@ -13,6 +13,8 @@ #include "decode.h" #include "auth.h" +#define OSD_REPLY_RESERVE_FRONT_LEN 512 + const static struct ceph_connection_operations osd_con_ops; static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); @@ -73,6 +75,16 @@ static void calc_layout(struct ceph_osd_client *osdc, req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages); } +static void remove_replies(struct ceph_osd_request *req) +{ + int i; + int max = ARRAY_SIZE(req->replies); + + for (i=0; i<max; i++) { + if (req->replies[i]) + ceph_msg_put(req->replies[i]); + } +} /* * requests @@ -87,12 +99,13 @@ void ceph_osdc_release_request(struct kref *kref) ceph_msg_put(req->r_request); if (req->r_reply) ceph_msg_put(req->r_reply); - if (req->r_con_filling_pages) { + remove_replies(req); + if (req->r_con_filling_msg) { dout("release_request revoking pages %p from con %p\n", - req->r_pages, req->r_con_filling_pages); - ceph_con_revoke_pages(req->r_con_filling_pages, - req->r_pages); - ceph_con_put(req->r_con_filling_pages); + req->r_pages, req->r_con_filling_msg); + ceph_con_revoke_message(req->r_con_filling_msg, + req->r_reply); + ceph_con_put(req->r_con_filling_msg); } if (req->r_own_pages) ceph_release_page_vector(req->r_pages, @@ -104,6 +117,60 @@ void ceph_osdc_release_request(struct kref *kref) kfree(req); } +static int alloc_replies(struct ceph_osd_request *req, int num_reply) +{ + int i; + int max = ARRAY_SIZE(req->replies); + + BUG_ON(num_reply > max); + + for (i=0; i<num_reply; i++) { + req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL); + if (IS_ERR(req->replies[i])) { + int j; + int err = PTR_ERR(req->replies[i]); + for (j = 0; j<=i; j++) { + ceph_msg_put(req->replies[j]); + } + return err; + } + } + + for (; i<max; i++) { + req->replies[i] = NULL; + } + + req->cur_reply = 0; + + return 0; +} + +static struct ceph_msg *__get_next_reply(struct ceph_connection *con, + struct ceph_osd_request *req, + int front_len) +{ + struct ceph_msg *reply; + if (req->r_con_filling_msg) { + dout("revoking reply msg %p from old con %p\n", req->r_reply, + req->r_con_filling_msg); + ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); + ceph_con_put(req->r_con_filling_msg); + req->cur_reply = 0; + } + reply = req->replies[req->cur_reply]; + if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) { + /* maybe we can allocate it now? */ + reply = ceph_msg_new(0, front_len, 0, 0, NULL); + if (!reply || IS_ERR(reply)) { + pr_err(" reply alloc failed, front_len=%d\n", front_len); + return ERR_PTR(-ENOMEM); + } + } + req->r_con_filling_msg = ceph_con_get(con); + req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */ + return ceph_msg_get(reply); +} + /* * build new request AND message, calculate layout, and adjust file * extent as needed. @@ -147,7 +214,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (req == NULL) return ERR_PTR(-ENOMEM); - err = ceph_msgpool_resv(&osdc->msgpool_op_reply, num_reply); + err = alloc_replies(req, num_reply); if (err) { ceph_osdc_put_request(req); return ERR_PTR(-ENOMEM); @@ -173,7 +240,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, else msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL); if (IS_ERR(msg)) { - ceph_msgpool_resv(&osdc->msgpool_op_reply, -num_reply); ceph_osdc_put_request(req); return ERR_PTR(PTR_ERR(msg)); } @@ -471,8 +537,6 @@ static void __unregister_request(struct ceph_osd_client *osdc, rb_erase(&req->r_node, &osdc->requests); osdc->num_requests--; - ceph_msgpool_resv(&osdc->msgpool_op_reply, -req->r_num_prealloc_reply); - if (req->r_osd) { /* make sure the original request isn't in flight. */ ceph_con_revoke(&req->r_osd->o_con, req->r_request); @@ -724,12 +788,12 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, flags = le32_to_cpu(rhead->flags); /* - * if this connection filled our pages, drop our reference now, to + * if this connection filled our message, drop our reference now, to * avoid a (safe but slower) revoke later. */ - if (req->r_con_filling_pages == con && req->r_pages == msg->pages) { - dout(" got pages, dropping con_filling_pages ref %p\n", con); - req->r_con_filling_pages = NULL; + if (req->r_con_filling_msg == con && req->r_reply == msg) { + dout(" got pages, dropping con_filling_msg ref %p\n", con); + req->r_con_filling_msg = NULL; ceph_con_put(con); } @@ -998,7 +1062,7 @@ bad: * find those pages. * 0 = success, -1 failure. */ -static int prepare_pages(struct ceph_connection *con, +static int __prepare_pages(struct ceph_connection *con, struct ceph_msg_header *hdr, struct ceph_osd_request *req, u64 tid, @@ -1017,20 +1081,10 @@ static int prepare_pages(struct ceph_connection *con, osdc = osd->o_osdc; - dout("prepare_pages on msg %p want %d\n", m, want); - dout("prepare_pages tid %llu has %d pages, want %d\n", + dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m, tid, req->r_num_pages, want); if (unlikely(req->r_num_pages < want)) goto out; - - if (req->r_con_filling_pages) { - dout("revoking pages %p from old con %p\n", req->r_pages, - req->r_con_filling_pages); - ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages); - ceph_con_put(req->r_con_filling_pages); - } - req->r_con_filling_pages = ceph_con_get(con); - req->r_reply = ceph_msg_get(m); /* for duration of read over socket */ m->pages = req->r_pages; m->nr_pages = req->r_num_pages; ret = 0; /* success */ @@ -1164,13 +1218,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true); if (err < 0) goto out_mempool; - err = ceph_msgpool_init(&osdc->msgpool_op_reply, 512, 0, false); - if (err < 0) - goto out_msgpool; return 0; -out_msgpool: - ceph_msgpool_destroy(&osdc->msgpool_op); out_mempool: mempool_destroy(osdc->req_mempool); out: @@ -1186,7 +1235,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) } mempool_destroy(osdc->req_mempool); ceph_msgpool_destroy(&osdc->msgpool_op); - ceph_msgpool_destroy(&osdc->msgpool_op_reply); } /* @@ -1323,17 +1371,17 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, if (!req) { *skip = 1; m = NULL; - dout("prepare_pages unknown tid %llu\n", tid); + dout("alloc_msg unknown tid %llu\n", tid); goto out; } - m = ceph_msgpool_get(&osdc->msgpool_op_reply, front); - if (!m) { + m = __get_next_reply(con, req, front); + if (!m || IS_ERR(m)) { *skip = 1; goto out; } if (data_len > 0) { - err = prepare_pages(con, hdr, req, tid, m); + err = __prepare_pages(con, hdr, req, tid, m); if (err < 0) { *skip = 1; ceph_msg_put(m); diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h index 4162c6810a8f..8d533d9406ff 100644 --- a/fs/ceph/osd_client.h +++ b/fs/ceph/osd_client.h @@ -44,7 +44,7 @@ struct ceph_osd_request { struct ceph_osd *r_osd; struct ceph_pg r_pgid; - struct ceph_connection *r_con_filling_pages; + struct ceph_connection *r_con_filling_msg; struct ceph_msg *r_request, *r_reply; int r_result; @@ -75,6 +75,9 @@ struct ceph_osd_request { struct page **r_pages; /* pages for data payload */ int r_pages_from_pool; int r_own_pages; /* if true, i own page list */ + + struct ceph_msg *replies[2]; + int cur_reply; }; struct ceph_osd_client { @@ -98,8 +101,7 @@ struct ceph_osd_client { mempool_t *req_mempool; - struct ceph_msgpool msgpool_op; - struct ceph_msgpool msgpool_op_reply; + struct ceph_msgpool msgpool_op; }; extern int ceph_osdc_init(struct ceph_osd_client *osdc, |