diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/ceph/messenger.c | 107 | ||||
-rw-r--r-- | net/ceph/msgpool.c | 27 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 363 | ||||
-rw-r--r-- | net/ceph/pagelist.c | 20 |
4 files changed, 348 insertions, 169 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 0a187196aeed..88e35830198c 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -156,7 +156,6 @@ static bool con_flag_test_and_set(struct ceph_connection *con, /* Slab caches for frequently-allocated structures */ static struct kmem_cache *ceph_msg_cache; -static struct kmem_cache *ceph_msg_data_cache; /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; @@ -235,23 +234,11 @@ static int ceph_msgr_slab_init(void) if (!ceph_msg_cache) return -ENOMEM; - BUG_ON(ceph_msg_data_cache); - ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0); - if (ceph_msg_data_cache) - return 0; - - kmem_cache_destroy(ceph_msg_cache); - ceph_msg_cache = NULL; - - return -ENOMEM; + return 0; } static void ceph_msgr_slab_exit(void) { - BUG_ON(!ceph_msg_data_cache); - kmem_cache_destroy(ceph_msg_data_cache); - ceph_msg_data_cache = NULL; - BUG_ON(!ceph_msg_cache); kmem_cache_destroy(ceph_msg_cache); ceph_msg_cache = NULL; @@ -1141,16 +1128,13 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor) static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length) { struct ceph_msg_data_cursor *cursor = &msg->cursor; - struct ceph_msg_data *data; BUG_ON(!length); BUG_ON(length > msg->data_length); - BUG_ON(list_empty(&msg->data)); + BUG_ON(!msg->num_data_items); - cursor->data_head = &msg->data; cursor->total_resid = length; - data = list_first_entry(&msg->data, struct ceph_msg_data, links); - cursor->data = data; + cursor->data = msg->data; __ceph_msg_data_cursor_init(cursor); } @@ -1231,8 +1215,7 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, if (!cursor->resid && cursor->total_resid) { WARN_ON(!cursor->last_piece); - BUG_ON(list_is_last(&cursor->data->links, cursor->data_head)); - cursor->data = list_next_entry(cursor->data, links); + cursor->data++; __ceph_msg_data_cursor_init(cursor); new_piece = true; } @@ -1248,9 +1231,6 @@ static size_t sizeof_footer(struct ceph_connection *con) static void prepare_message_data(struct ceph_msg *msg, u32 data_len) { - BUG_ON(!msg); - BUG_ON(!data_len); - /* Initialize data cursor */ ceph_msg_data_cursor_init(msg, (size_t)data_len); @@ -1590,7 +1570,7 @@ static int write_partial_message_data(struct ceph_connection *con) dout("%s %p msg %p\n", __func__, con, msg); - if (list_empty(&msg->data)) + if (!msg->num_data_items) return -EINVAL; /* @@ -2347,8 +2327,7 @@ static int read_partial_msg_data(struct ceph_connection *con) u32 crc = 0; int ret; - BUG_ON(!msg); - if (list_empty(&msg->data)) + if (!msg->num_data_items) return -EIO; if (do_datacrc) @@ -3256,32 +3235,16 @@ bool ceph_con_keepalive_expired(struct ceph_connection *con, return false; } -static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) +static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg) { - struct ceph_msg_data *data; - - if (WARN_ON(!ceph_msg_data_type_valid(type))) - return NULL; - - data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS); - if (!data) - return NULL; - - data->type = type; - INIT_LIST_HEAD(&data->links); - - return data; + BUG_ON(msg->num_data_items >= msg->max_data_items); + return &msg->data[msg->num_data_items++]; } static void ceph_msg_data_destroy(struct ceph_msg_data *data) { - if (!data) - return; - - WARN_ON(!list_empty(&data->links)); if (data->type == CEPH_MSG_DATA_PAGELIST) ceph_pagelist_release(data->pagelist); - kmem_cache_free(ceph_msg_data_cache, data); } void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages, @@ -3292,13 +3255,12 @@ void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages, BUG_ON(!pages); BUG_ON(!length); - data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES); - BUG_ON(!data); + data = ceph_msg_data_add(msg); + data->type = CEPH_MSG_DATA_PAGES; data->pages = pages; data->length = length; data->alignment = alignment & ~PAGE_MASK; - list_add_tail(&data->links, &msg->data); msg->data_length += length; } EXPORT_SYMBOL(ceph_msg_data_add_pages); @@ -3311,11 +3273,11 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg, BUG_ON(!pagelist); BUG_ON(!pagelist->length); - data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST); - BUG_ON(!data); + data = ceph_msg_data_add(msg); + data->type = CEPH_MSG_DATA_PAGELIST; + refcount_inc(&pagelist->refcnt); data->pagelist = pagelist; - list_add_tail(&data->links, &msg->data); msg->data_length += pagelist->length; } EXPORT_SYMBOL(ceph_msg_data_add_pagelist); @@ -3326,12 +3288,11 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos, { struct ceph_msg_data *data; - data = ceph_msg_data_create(CEPH_MSG_DATA_BIO); - BUG_ON(!data); + data = ceph_msg_data_add(msg); + data->type = CEPH_MSG_DATA_BIO; data->bio_pos = *bio_pos; data->bio_length = length; - list_add_tail(&data->links, &msg->data); msg->data_length += length; } EXPORT_SYMBOL(ceph_msg_data_add_bio); @@ -3342,11 +3303,10 @@ void ceph_msg_data_add_bvecs(struct ceph_msg *msg, { struct ceph_msg_data *data; - data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS); - BUG_ON(!data); + data = ceph_msg_data_add(msg); + data->type = CEPH_MSG_DATA_BVECS; data->bvec_pos = *bvec_pos; - list_add_tail(&data->links, &msg->data); msg->data_length += bvec_pos->iter.bi_size; } EXPORT_SYMBOL(ceph_msg_data_add_bvecs); @@ -3355,8 +3315,8 @@ EXPORT_SYMBOL(ceph_msg_data_add_bvecs); * construct a new message with given type, size * the new msg has a ref count of 1. */ -struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, - bool can_fail) +struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items, + gfp_t flags, bool can_fail) { struct ceph_msg *m; @@ -3370,7 +3330,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, INIT_LIST_HEAD(&m->list_head); kref_init(&m->kref); - INIT_LIST_HEAD(&m->data); /* front */ if (front_len) { @@ -3385,6 +3344,15 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, } m->front_alloc_len = m->front.iov_len = front_len; + if (max_data_items) { + m->data = kmalloc_array(max_data_items, sizeof(*m->data), + flags); + if (!m->data) + goto out2; + + m->max_data_items = max_data_items; + } + dout("ceph_msg_new %p front %d\n", m, front_len); return m; @@ -3401,6 +3369,13 @@ out: } return NULL; } +EXPORT_SYMBOL(ceph_msg_new2); + +struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, + bool can_fail) +{ + return ceph_msg_new2(type, front_len, 0, flags, can_fail); +} EXPORT_SYMBOL(ceph_msg_new); /* @@ -3496,13 +3471,14 @@ static void ceph_msg_free(struct ceph_msg *m) { dout("%s %p\n", __func__, m); kvfree(m->front.iov_base); + kfree(m->data); kmem_cache_free(ceph_msg_cache, m); } static void ceph_msg_release(struct kref *kref) { struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); - struct ceph_msg_data *data, *next; + int i; dout("%s %p\n", __func__, m); WARN_ON(!list_empty(&m->list_head)); @@ -3515,11 +3491,8 @@ static void ceph_msg_release(struct kref *kref) m->middle = NULL; } - list_for_each_entry_safe(data, next, &m->data, links) { - list_del_init(&data->links); - ceph_msg_data_destroy(data); - } - m->data_length = 0; + for (i = 0; i < m->num_data_items; i++) + ceph_msg_data_destroy(&m->data[i]); if (m->pool) ceph_msgpool_put(m->pool, m); diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c index 72571535883f..e3ecb80cd182 100644 --- a/net/ceph/msgpool.c +++ b/net/ceph/msgpool.c @@ -14,7 +14,8 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg) struct ceph_msgpool *pool = arg; struct ceph_msg *msg; - msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true); + msg = ceph_msg_new2(pool->type, pool->front_len, pool->max_data_items, + gfp_mask, true); if (!msg) { dout("msgpool_alloc %s failed\n", pool->name); } else { @@ -35,11 +36,13 @@ static void msgpool_free(void *element, void *arg) } int ceph_msgpool_init(struct ceph_msgpool *pool, int type, - int front_len, int size, bool blocking, const char *name) + int front_len, int max_data_items, int size, + const char *name) { dout("msgpool %s init\n", name); pool->type = type; pool->front_len = front_len; + pool->max_data_items = max_data_items; pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool); if (!pool->pool) return -ENOMEM; @@ -53,18 +56,21 @@ void ceph_msgpool_destroy(struct ceph_msgpool *pool) mempool_destroy(pool->pool); } -struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, - int front_len) +struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len, + int max_data_items) { struct ceph_msg *msg; - if (front_len > pool->front_len) { - dout("msgpool_get %s need front %d, pool size is %d\n", - pool->name, front_len, pool->front_len); - WARN_ON(1); + if (front_len > pool->front_len || + max_data_items > pool->max_data_items) { + pr_warn_ratelimited("%s need %d/%d, pool %s has %d/%d\n", + __func__, front_len, max_data_items, pool->name, + pool->front_len, pool->max_data_items); + WARN_ON_ONCE(1); /* try to alloc a fresh message */ - return ceph_msg_new(pool->type, front_len, GFP_NOFS, false); + return ceph_msg_new2(pool->type, front_len, max_data_items, + GFP_NOFS, false); } msg = mempool_alloc(pool->pool, GFP_NOFS); @@ -80,6 +86,9 @@ void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg) msg->front.iov_len = pool->front_len; msg->hdr.front_len = cpu_to_le32(pool->front_len); + msg->data_length = 0; + msg->num_data_items = 0; + kref_init(&msg->kref); /* retake single ref */ mempool_free(msg, pool->pool); } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 60934bd8796c..d23a9f81f3d7 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -126,6 +126,9 @@ static void ceph_osd_data_init(struct ceph_osd_data *osd_data) osd_data->type = CEPH_OSD_DATA_TYPE_NONE; } +/* + * Consumes @pages if @own_pages is true. + */ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, struct page **pages, u64 length, u32 alignment, bool pages_from_pool, bool own_pages) @@ -138,6 +141,9 @@ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, osd_data->own_pages = own_pages; } +/* + * Consumes a ref on @pagelist. + */ static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, struct ceph_pagelist *pagelist) { @@ -362,6 +368,8 @@ static void ceph_osd_data_release(struct ceph_osd_data *osd_data) num_pages = calc_pages_for((u64)osd_data->alignment, (u64)osd_data->length); ceph_release_page_vector(osd_data->pages, num_pages); + } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { + ceph_pagelist_release(osd_data->pagelist); } ceph_osd_data_init(osd_data); } @@ -402,6 +410,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, case CEPH_OSD_OP_LIST_WATCHERS: ceph_osd_data_release(&op->list_watchers.response_data); break; + case CEPH_OSD_OP_COPY_FROM: + ceph_osd_data_release(&op->copy_from.osd_data); + break; default: break; } @@ -606,12 +617,15 @@ static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc) return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0); } -int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) +static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp, + int num_request_data_items, + int num_reply_data_items) { struct ceph_osd_client *osdc = req->r_osdc; struct ceph_msg *msg; int msg_size; + WARN_ON(req->r_request || req->r_reply); WARN_ON(ceph_oid_empty(&req->r_base_oid)); WARN_ON(ceph_oloc_empty(&req->r_base_oloc)); @@ -633,9 +647,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) msg_size += 4 + 8; /* retry_attempt, features */ if (req->r_mempool) - msg = ceph_msgpool_get(&osdc->msgpool_op, 0); + msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size, + num_request_data_items); else - msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true); + msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size, + num_request_data_items, gfp, true); if (!msg) return -ENOMEM; @@ -648,9 +664,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) msg_size += req->r_num_ops * sizeof(struct ceph_osd_op); if (req->r_mempool) - msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); + msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size, + num_reply_data_items); else - msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true); + msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size, + num_reply_data_items, gfp, true); if (!msg) return -ENOMEM; @@ -658,7 +676,6 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) return 0; } -EXPORT_SYMBOL(ceph_osdc_alloc_messages); static bool osd_req_opcode_valid(u16 opcode) { @@ -671,6 +688,65 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE) } } +static void get_num_data_items(struct ceph_osd_request *req, + int *num_request_data_items, + int *num_reply_data_items) +{ + struct ceph_osd_req_op *op; + + *num_request_data_items = 0; + *num_reply_data_items = 0; + + for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) { + switch (op->op) { + /* request */ + case CEPH_OSD_OP_WRITE: + case CEPH_OSD_OP_WRITEFULL: + case CEPH_OSD_OP_SETXATTR: + case CEPH_OSD_OP_CMPXATTR: + case CEPH_OSD_OP_NOTIFY_ACK: + case CEPH_OSD_OP_COPY_FROM: + *num_request_data_items += 1; + break; + + /* reply */ + case CEPH_OSD_OP_STAT: + case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_LIST_WATCHERS: + *num_reply_data_items += 1; + break; + + /* both */ + case CEPH_OSD_OP_NOTIFY: + *num_request_data_items += 1; + *num_reply_data_items += 1; + break; + case CEPH_OSD_OP_CALL: + *num_request_data_items += 2; + *num_reply_data_items += 1; + break; + + default: + WARN_ON(!osd_req_opcode_valid(op->op)); + break; + } + } +} + +/* + * oid, oloc and OSD op opcode(s) must be filled in before this function + * is called. + */ +int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) +{ + int num_request_data_items, num_reply_data_items; + + get_num_data_items(req, &num_request_data_items, &num_reply_data_items); + return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items, + num_reply_data_items); +} +EXPORT_SYMBOL(ceph_osdc_alloc_messages); + /* * This is an osd op init function for opcodes that have no data or * other information associated with them. It also serves as a @@ -767,22 +843,19 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req, EXPORT_SYMBOL(osd_req_op_extent_dup_last); int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, - u16 opcode, const char *class, const char *method) + const char *class, const char *method) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, - opcode, 0); + struct ceph_osd_req_op *op; struct ceph_pagelist *pagelist; size_t payload_len = 0; size_t size; - BUG_ON(opcode != CEPH_OSD_OP_CALL); + op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0); - pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); + pagelist = ceph_pagelist_alloc(GFP_NOFS); if (!pagelist) return -ENOMEM; - ceph_pagelist_init(pagelist); - op->cls.class_name = class; size = strlen(class); BUG_ON(size > (size_t) U8_MAX); @@ -815,12 +888,10 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR); - pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); + pagelist = ceph_pagelist_alloc(GFP_NOFS); if (!pagelist) return -ENOMEM; - ceph_pagelist_init(pagelist); - payload_len = strlen(name); op->xattr.name_len = payload_len; ceph_pagelist_append(pagelist, name, payload_len); @@ -900,12 +971,6 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg, static u32 osd_req_encode_op(struct ceph_osd_op *dst, const struct ceph_osd_req_op *src) { - if (WARN_ON(!osd_req_opcode_valid(src->op))) { - pr_err("unrecognized osd opcode %d\n", src->op); - - return 0; - } - switch (src->op) { case CEPH_OSD_OP_STAT: break; @@ -955,6 +1020,14 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst, case CEPH_OSD_OP_CREATE: case CEPH_OSD_OP_DELETE: break; + case CEPH_OSD_OP_COPY_FROM: + dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid); + dst->copy_from.src_version = + cpu_to_le64(src->copy_from.src_version); + dst->copy_from.flags = src->copy_from.flags; + dst->copy_from.src_fadvise_flags = + cpu_to_le32(src->copy_from.src_fadvise_flags); + break; default: pr_err("unsupported osd opcode %s\n", ceph_osd_op_name(src->op)); @@ -1038,7 +1111,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (flags & CEPH_OSD_FLAG_WRITE) req->r_data_offset = off; - r = ceph_osdc_alloc_messages(req, GFP_NOFS); + if (num_ops > 1) + /* + * This is a special case for ceph_writepages_start(), but it + * also covers ceph_uninline_data(). If more multi-op request + * use cases emerge, we will need a separate helper. + */ + r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0); + else + r = ceph_osdc_alloc_messages(req, GFP_NOFS); if (r) goto fail; @@ -1845,48 +1926,55 @@ static bool should_plug_request(struct ceph_osd_request *req) return true; } -static void setup_request_data(struct ceph_osd_request *req, - struct ceph_msg *msg) +/* + * Keep get_num_data_items() in sync with this function. + */ +static void setup_request_data(struct ceph_osd_request *req) { - u32 data_len = 0; - int i; + struct ceph_msg *request_msg = req->r_request; + struct ceph_msg *reply_msg = req->r_reply; + struct ceph_osd_req_op *op; - if (!list_empty(&msg->data)) + if (req->r_request->num_data_items || req->r_reply->num_data_items) return; - WARN_ON(msg->data_length); - for (i = 0; i < req->r_num_ops; i++) { - struct ceph_osd_req_op *op = &req->r_ops[i]; - + WARN_ON(request_msg->data_length || reply_msg->data_length); + for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) { switch (op->op) { /* request */ case CEPH_OSD_OP_WRITE: case CEPH_OSD_OP_WRITEFULL: WARN_ON(op->indata_len != op->extent.length); - ceph_osdc_msg_data_add(msg, &op->extent.osd_data); + ceph_osdc_msg_data_add(request_msg, + &op->extent.osd_data); break; case CEPH_OSD_OP_SETXATTR: case CEPH_OSD_OP_CMPXATTR: WARN_ON(op->indata_len != op->xattr.name_len + op->xattr.value_len); - ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); + ceph_osdc_msg_data_add(request_msg, + &op->xattr.osd_data); break; case CEPH_OSD_OP_NOTIFY_ACK: - ceph_osdc_msg_data_add(msg, + ceph_osdc_msg_data_add(request_msg, &op->notify_ack.request_data); break; + case CEPH_OSD_OP_COPY_FROM: + ceph_osdc_msg_data_add(request_msg, + &op->copy_from.osd_data); + break; /* reply */ case CEPH_OSD_OP_STAT: - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->raw_data_in); break; case CEPH_OSD_OP_READ: - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->extent.osd_data); break; case CEPH_OSD_OP_LIST_WATCHERS: - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->list_watchers.response_data); break; @@ -1895,25 +1983,23 @@ static void setup_request_data(struct ceph_osd_request *req, WARN_ON(op->indata_len != op->cls.class_len + op->cls.method_len + op->cls.indata_len); - ceph_osdc_msg_data_add(msg, &op->cls.request_info); + ceph_osdc_msg_data_add(request_msg, + &op->cls.request_info); /* optional, can be NONE */ - ceph_osdc_msg_data_add(msg, &op->cls.request_data); + ceph_osdc_msg_data_add(request_msg, + &op->cls.request_data); /* optional, can be NONE */ - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->cls.response_data); break; case CEPH_OSD_OP_NOTIFY: - ceph_osdc_msg_data_add(msg, + ceph_osdc_msg_data_add(request_msg, &op->notify.request_data); - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->notify.response_data); break; } - - data_len += op->indata_len; } - - WARN_ON(data_len != msg->data_length); } static void encode_pgid(void **p, const struct ceph_pg *pgid) @@ -1961,7 +2047,7 @@ static void encode_request_partial(struct ceph_osd_request *req, req->r_data_offset || req->r_snapc); } - setup_request_data(req, msg); + setup_request_data(req); encode_spgid(&p, &req->r_t.spgid); /* actual spg */ ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */ @@ -3001,11 +3087,21 @@ static void linger_submit(struct ceph_osd_linger_request *lreq) struct ceph_osd_client *osdc = lreq->osdc; struct ceph_osd *osd; + down_write(&osdc->lock); + linger_register(lreq); + if (lreq->is_watch) { + lreq->reg_req->r_ops[0].watch.cookie = lreq->linger_id; + lreq->ping_req->r_ops[0].watch.cookie = lreq->linger_id; + } else { + lreq->reg_req->r_ops[0].notify.cookie = lreq->linger_id; + } + calc_target(osdc, &lreq->t, NULL, false); osd = lookup_create_osd(osdc, lreq->t.osd, true); link_linger(osd, lreq); send_linger(lreq); + up_write(&osdc->lock); } static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq) @@ -4318,9 +4414,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc, lreq->notify_id, notify_id); } else if (!completion_done(&lreq->notify_finish_wait)) { struct ceph_msg_data *data = - list_first_entry_or_null(&msg->data, - struct ceph_msg_data, - links); + msg->num_data_items ? &msg->data[0] : NULL; if (data) { if (lreq->preply_pages) { @@ -4476,6 +4570,23 @@ alloc_linger_request(struct ceph_osd_linger_request *lreq) ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); + return req; +} + +static struct ceph_osd_request * +alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode) +{ + struct ceph_osd_request *req; + + req = alloc_linger_request(lreq); + if (!req) + return NULL; + + /* + * Pass 0 for cookie because we don't know it yet, it will be + * filled in by linger_submit(). + */ + osd_req_op_watch_init(req, 0, 0, watch_opcode); if (ceph_osdc_alloc_messages(req, GFP_NOIO)) { ceph_osdc_put_request(req); @@ -4514,27 +4625,19 @@ ceph_osdc_watch(struct ceph_osd_client *osdc, lreq->t.flags = CEPH_OSD_FLAG_WRITE; ktime_get_real_ts64(&lreq->mtime); - lreq->reg_req = alloc_linger_request(lreq); + lreq->reg_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_WATCH); if (!lreq->reg_req) { ret = -ENOMEM; goto err_put_lreq; } - lreq->ping_req = alloc_linger_request(lreq); + lreq->ping_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_PING); if (!lreq->ping_req) { ret = -ENOMEM; goto err_put_lreq; } - down_write(&osdc->lock); - linger_register(lreq); /* before osd_req_op_* */ - osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id, - CEPH_OSD_WATCH_OP_WATCH); - osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id, - CEPH_OSD_WATCH_OP_PING); linger_submit(lreq); - up_write(&osdc->lock); - ret = linger_reg_commit_wait(lreq); if (ret) { linger_cancel(lreq); @@ -4599,11 +4702,10 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which, op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0); - pl = kmalloc(sizeof(*pl), GFP_NOIO); + pl = ceph_pagelist_alloc(GFP_NOIO); if (!pl) return -ENOMEM; - ceph_pagelist_init(pl); ret = ceph_pagelist_encode_64(pl, notify_id); ret |= ceph_pagelist_encode_64(pl, cookie); if (payload) { @@ -4641,12 +4743,12 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc, ceph_oloc_copy(&req->r_base_oloc, oloc); req->r_flags = CEPH_OSD_FLAG_READ; - ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, + payload_len); if (ret) goto out_put_req; - ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, - payload_len); + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); if (ret) goto out_put_req; @@ -4670,11 +4772,10 @@ static int osd_req_op_notify_init(struct ceph_osd_request *req, int which, op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0); op->notify.cookie = cookie; - pl = kmalloc(sizeof(*pl), GFP_NOIO); + pl = ceph_pagelist_alloc(GFP_NOIO); if (!pl) return -ENOMEM; - ceph_pagelist_init(pl); ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */ ret |= ceph_pagelist_encode_32(pl, timeout); ret |= ceph_pagelist_encode_32(pl, payload_len); @@ -4733,29 +4834,30 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc, goto out_put_lreq; } + /* + * Pass 0 for cookie because we don't know it yet, it will be + * filled in by linger_submit(). + */ + ret = osd_req_op_notify_init(lreq->reg_req, 0, 0, 1, timeout, + payload, payload_len); + if (ret) + goto out_put_lreq; + /* for notify_id */ pages = ceph_alloc_page_vector(1, GFP_NOIO); if (IS_ERR(pages)) { ret = PTR_ERR(pages); goto out_put_lreq; } - - down_write(&osdc->lock); - linger_register(lreq); /* before osd_req_op_* */ - ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1, - timeout, payload, payload_len); - if (ret) { - linger_unregister(lreq); - up_write(&osdc->lock); - ceph_release_page_vector(pages, 1); - goto out_put_lreq; - } ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify, response_data), pages, PAGE_SIZE, 0, false, true); - linger_submit(lreq); - up_write(&osdc->lock); + ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO); + if (ret) + goto out_put_lreq; + + linger_submit(lreq); ret = linger_reg_commit_wait(lreq); if (!ret) ret = linger_notify_finish_wait(lreq); @@ -4881,10 +4983,6 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, ceph_oloc_copy(&req->r_base_oloc, oloc); req->r_flags = CEPH_OSD_FLAG_READ; - ret = ceph_osdc_alloc_messages(req, GFP_NOIO); - if (ret) - goto out_put_req; - pages = ceph_alloc_page_vector(1, GFP_NOIO); if (IS_ERR(pages)) { ret = PTR_ERR(pages); @@ -4896,6 +4994,10 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, response_data), pages, PAGE_SIZE, 0, false, true); + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + ceph_osdc_start_request(osdc, req, false); ret = ceph_osdc_wait_request(osdc, req); if (ret >= 0) { @@ -4958,11 +5060,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, ceph_oloc_copy(&req->r_base_oloc, oloc); req->r_flags = flags; - ret = ceph_osdc_alloc_messages(req, GFP_NOIO); - if (ret) - goto out_put_req; - - ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); + ret = osd_req_op_cls_init(req, 0, class, method); if (ret) goto out_put_req; @@ -4973,6 +5071,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, osd_req_op_cls_response_data_pages(req, 0, &resp_page, *resp_len, 0, false, false); + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + ceph_osdc_start_request(osdc, req, false); ret = ceph_osdc_wait_request(osdc, req); if (ret >= 0) { @@ -5021,11 +5123,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) goto out_map; err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, - PAGE_SIZE, 10, true, "osd_op"); + PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op"); if (err < 0) goto out_mempool; err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, - PAGE_SIZE, 10, true, "osd_op_reply"); + PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, + "osd_op_reply"); if (err < 0) goto out_msgpool; @@ -5168,6 +5271,80 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, } EXPORT_SYMBOL(ceph_osdc_writepages); +static int osd_req_op_copy_from_init(struct ceph_osd_request *req, + u64 src_snapid, u64 src_version, + struct ceph_object_id *src_oid, + struct ceph_object_locator *src_oloc, + u32 src_fadvise_flags, + u32 dst_fadvise_flags, + u8 copy_from_flags) +{ + struct ceph_osd_req_op *op; + struct page **pages; + void *p, *end; + + pages = ceph_alloc_page_vector(1, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags); + op->copy_from.snapid = src_snapid; + op->copy_from.src_version = src_version; + op->copy_from.flags = copy_from_flags; + op->copy_from.src_fadvise_flags = src_fadvise_flags; + + p = page_address(pages[0]); + end = p + PAGE_SIZE; + ceph_encode_string(&p, end, src_oid->name, src_oid->name_len); + encode_oloc(&p, end, src_oloc); + op->indata_len = PAGE_SIZE - (end - p); + + ceph_osd_data_pages_init(&op->copy_from.osd_data, pages, + op->indata_len, 0, false, true); + return 0; +} + +int ceph_osdc_copy_from(struct ceph_osd_client *osdc, + u64 src_snapid, u64 src_version, + struct ceph_object_id *src_oid, + struct ceph_object_locator *src_oloc, + u32 src_fadvise_flags, + struct ceph_object_id *dst_oid, + struct ceph_object_locator *dst_oloc, + u32 dst_fadvise_flags, + u8 copy_from_flags) +{ + struct ceph_osd_request *req; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL); + if (!req) + return -ENOMEM; + + req->r_flags = CEPH_OSD_FLAG_WRITE; + + ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc); + ceph_oid_copy(&req->r_t.base_oid, dst_oid); + + ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid, + src_oloc, src_fadvise_flags, + dst_fadvise_flags, copy_from_flags); + if (ret) + goto out; + + ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); + if (ret) + goto out; + + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + +out: + ceph_osdc_put_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_osdc_copy_from); + int __init ceph_osdc_setup(void) { size_t size = sizeof(struct ceph_osd_request) + @@ -5295,7 +5472,7 @@ static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr) u32 front_len = le32_to_cpu(hdr->front_len); u32 data_len = le32_to_cpu(hdr->data_len); - m = ceph_msg_new(type, front_len, GFP_NOIO, false); + m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false); if (!m) return NULL; diff --git a/net/ceph/pagelist.c b/net/ceph/pagelist.c index 2ea0564771d2..65e34f78b05d 100644 --- a/net/ceph/pagelist.c +++ b/net/ceph/pagelist.c @@ -6,6 +6,26 @@ #include <linux/highmem.h> #include <linux/ceph/pagelist.h> +struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags) +{ + struct ceph_pagelist *pl; + + pl = kmalloc(sizeof(*pl), gfp_flags); + if (!pl) + return NULL; + + INIT_LIST_HEAD(&pl->head); + pl->mapped_tail = NULL; + pl->length = 0; + pl->room = 0; + INIT_LIST_HEAD(&pl->free_list); + pl->num_pages_free = 0; + refcount_set(&pl->refcnt, 1); + + return pl; +} +EXPORT_SYMBOL(ceph_pagelist_alloc); + static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl) { if (pl->mapped_tail) { |