diff options
author | Jiri Kosina <jkosina@suse.cz> | 2011-04-26 10:22:15 +0200 |
---|---|---|
committer | Jiri Kosina <jkosina@suse.cz> | 2011-04-26 10:22:59 +0200 |
commit | 07f9479a40cc778bc1462ada11f95b01360ae4ff (patch) | |
tree | 0676cf38df3844004bb3ebfd99dfa67a4a8998f5 /net/ceph/osd_client.c | |
parent | 9d5e6bdb3013acfb311ab407eeca0b6a6a3dedbf (diff) | |
parent | cd2e49e90f1cae7726c9a2c54488d881d7f1cd1c (diff) | |
download | lwn-07f9479a40cc778bc1462ada11f95b01360ae4ff.tar.gz lwn-07f9479a40cc778bc1462ada11f95b01360ae4ff.zip |
Merge branch 'master' into for-next
Fast-forwarded to current state of Linus' tree as there are patches to be
applied for files that didn't exist on the old branch.
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r-- | net/ceph/osd_client.c | 634 |
1 files changed, 496 insertions, 138 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 3e20a122ffa2..5a80f41c0cba 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -22,10 +22,15 @@ #define OSD_OPREPLY_FRONT_LEN 512 static const struct ceph_connection_operations osd_con_ops; -static int __kick_requests(struct ceph_osd_client *osdc, - struct ceph_osd *kickosd); -static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); +static void send_queued(struct ceph_osd_client *osdc); +static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); +static void __register_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req); +static void __unregister_linger_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req); +static int __send_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req); static int op_needs_trail(int op) { @@ -34,6 +39,7 @@ static int op_needs_trail(int op) case CEPH_OSD_OP_SETXATTR: case CEPH_OSD_OP_CMPXATTR: case CEPH_OSD_OP_CALL: + case CEPH_OSD_OP_NOTIFY: return 1; default: return 0; @@ -209,6 +215,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, init_completion(&req->r_completion); init_completion(&req->r_safe_completion); INIT_LIST_HEAD(&req->r_unsafe_item); + INIT_LIST_HEAD(&req->r_linger_item); + INIT_LIST_HEAD(&req->r_linger_osd); req->r_flags = flags; WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); @@ -315,6 +323,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req, break; case CEPH_OSD_OP_STARTSYNC: break; + case CEPH_OSD_OP_NOTIFY: + { + __le32 prot_ver = cpu_to_le32(src->watch.prot_ver); + __le32 timeout = cpu_to_le32(src->watch.timeout); + + BUG_ON(!req->r_trail); + + ceph_pagelist_append(req->r_trail, + &prot_ver, sizeof(prot_ver)); + ceph_pagelist_append(req->r_trail, + &timeout, sizeof(timeout)); + } + case CEPH_OSD_OP_NOTIFY_ACK: + case CEPH_OSD_OP_WATCH: + dst->watch.cookie = cpu_to_le64(src->watch.cookie); + dst->watch.ver = cpu_to_le64(src->watch.ver); + dst->watch.flag = src->watch.flag; + break; default: pr_err("unrecognized osd opcode %d\n", dst->op); WARN_ON(1); @@ -529,6 +555,51 @@ __lookup_request_ge(struct ceph_osd_client *osdc, return NULL; } +/* + * Resubmit requests pending on the given osd. + */ +static void __kick_osd_requests(struct ceph_osd_client *osdc, + struct ceph_osd *osd) +{ + struct ceph_osd_request *req, *nreq; + int err; + + dout("__kick_osd_requests osd%d\n", osd->o_osd); + err = __reset_osd(osdc, osd); + if (err == -EAGAIN) + return; + + list_for_each_entry(req, &osd->o_requests, r_osd_item) { + list_move(&req->r_req_lru_item, &osdc->req_unsent); + dout("requeued %p tid %llu osd%d\n", req, req->r_tid, + osd->o_osd); + if (!req->r_linger) + req->r_flags |= CEPH_OSD_FLAG_RETRY; + } + + list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, + r_linger_osd) { + /* + * reregister request prior to unregistering linger so + * that r_osd is preserved. + */ + BUG_ON(!list_empty(&req->r_req_lru_item)); + __register_request(osdc, req); + list_add(&req->r_req_lru_item, &osdc->req_unsent); + list_add(&req->r_osd_item, &req->r_osd->o_requests); + __unregister_linger_request(osdc, req); + dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, + osd->o_osd); + } +} + +static void kick_osd_requests(struct ceph_osd_client *osdc, + struct ceph_osd *kickosd) +{ + mutex_lock(&osdc->request_mutex); + __kick_osd_requests(osdc, kickosd); + mutex_unlock(&osdc->request_mutex); +} /* * If the osd connection drops, we need to resubmit all requests. @@ -543,7 +614,8 @@ static void osd_reset(struct ceph_connection *con) dout("osd_reset osd%d\n", osd->o_osd); osdc = osd->o_osdc; down_read(&osdc->map_sem); - kick_requests(osdc, osd); + kick_osd_requests(osdc, osd); + send_queued(osdc); up_read(&osdc->map_sem); } @@ -561,6 +633,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) atomic_set(&osd->o_ref, 1); osd->o_osdc = osdc; INIT_LIST_HEAD(&osd->o_requests); + INIT_LIST_HEAD(&osd->o_linger_requests); INIT_LIST_HEAD(&osd->o_osd_lru); osd->o_incarnation = 1; @@ -650,7 +723,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) int ret = 0; dout("__reset_osd %p osd%d\n", osd, osd->o_osd); - if (list_empty(&osd->o_requests)) { + if (list_empty(&osd->o_requests) && + list_empty(&osd->o_linger_requests)) { __remove_osd(osdc, osd); } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], &osd->o_con.peer_addr, @@ -723,15 +797,14 @@ static void __cancel_osd_timeout(struct ceph_osd_client *osdc) * Register request, assign tid. If this is the first request, set up * the timeout event. */ -static void register_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) +static void __register_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) { - mutex_lock(&osdc->request_mutex); req->r_tid = ++osdc->last_tid; req->r_request->hdr.tid = cpu_to_le64(req->r_tid); INIT_LIST_HEAD(&req->r_req_lru_item); - dout("register_request %p tid %lld\n", req, req->r_tid); + dout("__register_request %p tid %lld\n", req, req->r_tid); __insert_request(osdc, req); ceph_osdc_get_request(req); osdc->num_requests++; @@ -740,6 +813,13 @@ static void register_request(struct ceph_osd_client *osdc, dout(" first request, scheduling timeout\n"); __schedule_osd_timeout(osdc); } +} + +static void register_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + mutex_lock(&osdc->request_mutex); + __register_request(osdc, req); mutex_unlock(&osdc->request_mutex); } @@ -758,9 +838,13 @@ static void __unregister_request(struct ceph_osd_client *osdc, ceph_con_revoke(&req->r_osd->o_con, req->r_request); list_del_init(&req->r_osd_item); - if (list_empty(&req->r_osd->o_requests)) + if (list_empty(&req->r_osd->o_requests) && + list_empty(&req->r_osd->o_linger_requests)) { + dout("moving osd to %p lru\n", req->r_osd); __move_osd_to_lru(osdc, req->r_osd); - req->r_osd = NULL; + } + if (list_empty(&req->r_linger_item)) + req->r_osd = NULL; } ceph_osdc_put_request(req); @@ -781,20 +865,73 @@ static void __cancel_request(struct ceph_osd_request *req) ceph_con_revoke(&req->r_osd->o_con, req->r_request); req->r_sent = 0; } - list_del_init(&req->r_req_lru_item); } +static void __register_linger_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + dout("__register_linger_request %p\n", req); + list_add_tail(&req->r_linger_item, &osdc->req_linger); + list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests); +} + +static void __unregister_linger_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + dout("__unregister_linger_request %p\n", req); + if (req->r_osd) { + list_del_init(&req->r_linger_item); + list_del_init(&req->r_linger_osd); + + if (list_empty(&req->r_osd->o_requests) && + list_empty(&req->r_osd->o_linger_requests)) { + dout("moving osd to %p lru\n", req->r_osd); + __move_osd_to_lru(osdc, req->r_osd); + } + if (list_empty(&req->r_osd_item)) + req->r_osd = NULL; + } +} + +void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + mutex_lock(&osdc->request_mutex); + if (req->r_linger) { + __unregister_linger_request(osdc, req); + ceph_osdc_put_request(req); + } + mutex_unlock(&osdc->request_mutex); +} +EXPORT_SYMBOL(ceph_osdc_unregister_linger_request); + +void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + if (!req->r_linger) { + dout("set_request_linger %p\n", req); + req->r_linger = 1; + /* + * caller is now responsible for calling + * unregister_linger_request + */ + ceph_osdc_get_request(req); + } +} +EXPORT_SYMBOL(ceph_osdc_set_request_linger); + /* * Pick an osd (the first 'up' osd in the pg), allocate the osd struct * (as needed), and set the request r_osd appropriately. If there is - * no up osd, set r_osd to NULL. + * no up osd, set r_osd to NULL. Move the request to the appropriate list + * (unsent, homeless) or leave on in-flight lru. * * Return 0 if unchanged, 1 if changed, or negative on error. * * Caller should hold map_sem for read and request_mutex. */ -static int __map_osds(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) +static int __map_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) { struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; struct ceph_pg pgid; @@ -802,11 +939,13 @@ static int __map_osds(struct ceph_osd_client *osdc, int o = -1, num = 0; int err; - dout("map_osds %p tid %lld\n", req, req->r_tid); + dout("map_request %p tid %lld\n", req, req->r_tid); err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, &req->r_file_layout, osdc->osdmap); - if (err) + if (err) { + list_move(&req->r_req_lru_item, &osdc->req_notarget); return err; + } pgid = reqhead->layout.ol_pgid; req->r_pgid = pgid; @@ -823,7 +962,7 @@ static int __map_osds(struct ceph_osd_client *osdc, (req->r_osd == NULL && o == -1)) return 0; /* no change */ - dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n", + dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n", req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, req->r_osd ? req->r_osd->o_osd : -1); @@ -841,10 +980,12 @@ static int __map_osds(struct ceph_osd_client *osdc, if (!req->r_osd && o >= 0) { err = -ENOMEM; req->r_osd = create_osd(osdc); - if (!req->r_osd) + if (!req->r_osd) { + list_move(&req->r_req_lru_item, &osdc->req_notarget); goto out; + } - dout("map_osds osd %p is osd%d\n", req->r_osd, o); + dout("map_request osd %p is osd%d\n", req->r_osd, o); req->r_osd->o_osd = o; req->r_osd->o_con.peer_name.num = cpu_to_le64(o); __insert_osd(osdc, req->r_osd); @@ -855,6 +996,9 @@ static int __map_osds(struct ceph_osd_client *osdc, if (req->r_osd) { __remove_osd_from_lru(req->r_osd); list_add(&req->r_osd_item, &req->r_osd->o_requests); + list_move(&req->r_req_lru_item, &osdc->req_unsent); + } else { + list_move(&req->r_req_lru_item, &osdc->req_notarget); } err = 1; /* osd or pg changed */ @@ -869,16 +1013,6 @@ static int __send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { struct ceph_osd_request_head *reqhead; - int err; - - err = __map_osds(osdc, req); - if (err < 0) - return err; - if (req->r_osd == NULL) { - dout("send_request %p no up osds in pg\n", req); - ceph_monc_request_next_osdmap(&osdc->client->monc); - return 0; - } dout("send_request %p tid %llu to osd%d flags %d\n", req, req->r_tid, req->r_osd->o_osd, req->r_flags); @@ -898,6 +1032,21 @@ static int __send_request(struct ceph_osd_client *osdc, } /* + * Send any requests in the queue (req_unsent). + */ +static void send_queued(struct ceph_osd_client *osdc) +{ + struct ceph_osd_request *req, *tmp; + + dout("send_queued\n"); + mutex_lock(&osdc->request_mutex); + list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) { + __send_request(osdc, req); + } + mutex_unlock(&osdc->request_mutex); +} + +/* * Timeout callback, called every N seconds when 1 or more osd * requests has been active for more than N seconds. When this * happens, we ping all OSDs with requests who have timed out to @@ -916,30 +1065,13 @@ static void handle_timeout(struct work_struct *work) unsigned long keepalive = osdc->client->options->osd_keepalive_timeout * HZ; unsigned long last_stamp = 0; - struct rb_node *p; struct list_head slow_osds; - dout("timeout\n"); down_read(&osdc->map_sem); ceph_monc_request_next_osdmap(&osdc->client->monc); mutex_lock(&osdc->request_mutex); - for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { - req = rb_entry(p, struct ceph_osd_request, r_node); - - if (req->r_resend) { - int err; - - dout("osdc resending prev failed %lld\n", req->r_tid); - err = __send_request(osdc, req); - if (err) - dout("osdc failed again on %lld\n", req->r_tid); - else - req->r_resend = false; - continue; - } - } /* * reset osds that appear to be _really_ unresponsive. this @@ -963,7 +1095,7 @@ static void handle_timeout(struct work_struct *work) BUG_ON(!osd); pr_warning(" tid %llu timed out on osd%d, will reset osd\n", req->r_tid, osd->o_osd); - __kick_requests(osdc, osd); + __kick_osd_requests(osdc, osd); } /* @@ -991,7 +1123,7 @@ static void handle_timeout(struct work_struct *work) __schedule_osd_timeout(osdc); mutex_unlock(&osdc->request_mutex); - + send_queued(osdc); up_read(&osdc->map_sem); } @@ -1035,7 +1167,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, numops * sizeof(struct ceph_osd_op)) goto bad; dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); - /* lookup */ mutex_lock(&osdc->request_mutex); req = __lookup_request(osdc, tid); @@ -1079,6 +1210,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, dout("handle_reply tid %llu flags %d\n", tid, flags); + if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) + __register_linger_request(osdc, req); + /* either this is a read, or we got the safe response */ if (result < 0 || (flags & CEPH_OSD_FLAG_ONDISK) || @@ -1099,6 +1233,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, } done: + dout("req=%p req->r_linger=%d\n", req, req->r_linger); ceph_osdc_put_request(req); return; @@ -1109,108 +1244,83 @@ bad: ceph_msg_dump(msg); } - -static int __kick_requests(struct ceph_osd_client *osdc, - struct ceph_osd *kickosd) +static void reset_changed_osds(struct ceph_osd_client *osdc) { - struct ceph_osd_request *req; struct rb_node *p, *n; - int needmap = 0; - int err; - dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); - if (kickosd) { - err = __reset_osd(osdc, kickosd); - if (err == -EAGAIN) - return 1; - } else { - for (p = rb_first(&osdc->osds); p; p = n) { - struct ceph_osd *osd = - rb_entry(p, struct ceph_osd, o_node); - - n = rb_next(p); - if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || - memcmp(&osd->o_con.peer_addr, - ceph_osd_addr(osdc->osdmap, - osd->o_osd), - sizeof(struct ceph_entity_addr)) != 0) - __reset_osd(osdc, osd); - } + for (p = rb_first(&osdc->osds); p; p = n) { + struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); + + n = rb_next(p); + if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || + memcmp(&osd->o_con.peer_addr, + ceph_osd_addr(osdc->osdmap, + osd->o_osd), + sizeof(struct ceph_entity_addr)) != 0) + __reset_osd(osdc, osd); } +} + +/* + * Requeue requests whose mapping to an OSD has changed. If requests map to + * no osd, request a new map. + * + * Caller should hold map_sem for read and request_mutex. + */ +static void kick_requests(struct ceph_osd_client *osdc) +{ + struct ceph_osd_request *req, *nreq; + struct rb_node *p; + int needmap = 0; + int err; + dout("kick_requests\n"); + mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { req = rb_entry(p, struct ceph_osd_request, r_node); - - if (req->r_resend) { - dout(" r_resend set on tid %llu\n", req->r_tid); - __cancel_request(req); - goto kick; - } - if (req->r_osd && kickosd == req->r_osd) { - __cancel_request(req); - goto kick; + err = __map_request(osdc, req); + if (err < 0) + continue; /* error */ + if (req->r_osd == NULL) { + dout("%p tid %llu maps to no osd\n", req, req->r_tid); + needmap++; /* request a newer map */ + } else if (err > 0) { + dout("%p tid %llu requeued on osd%d\n", req, req->r_tid, + req->r_osd ? req->r_osd->o_osd : -1); + if (!req->r_linger) + req->r_flags |= CEPH_OSD_FLAG_RETRY; } + } + + list_for_each_entry_safe(req, nreq, &osdc->req_linger, + r_linger_item) { + dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); - err = __map_osds(osdc, req); + err = __map_request(osdc, req); if (err == 0) - continue; /* no change */ - if (err < 0) { - /* - * FIXME: really, we should set the request - * error and fail if this isn't a 'nofail' - * request, but that's a fair bit more - * complicated to do. So retry! - */ - dout(" setting r_resend on %llu\n", req->r_tid); - req->r_resend = true; - continue; - } + continue; /* no change and no osd was specified */ + if (err < 0) + continue; /* hrm! */ if (req->r_osd == NULL) { dout("tid %llu maps to no valid osd\n", req->r_tid); needmap++; /* request a newer map */ continue; } -kick: - dout("kicking %p tid %llu osd%d\n", req, req->r_tid, + dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, req->r_osd ? req->r_osd->o_osd : -1); - req->r_flags |= CEPH_OSD_FLAG_RETRY; - err = __send_request(osdc, req); - if (err) { - dout(" setting r_resend on %llu\n", req->r_tid); - req->r_resend = true; - } + __unregister_linger_request(osdc, req); + __register_request(osdc, req); } - - return needmap; -} - -/* - * Resubmit osd requests whose osd or osd address has changed. Request - * a new osd map if osds are down, or we are otherwise unable to determine - * how to direct a request. - * - * Close connections to down osds. - * - * If @who is specified, resubmit requests for that specific osd. - * - * Caller should hold map_sem for read and request_mutex. - */ -static void kick_requests(struct ceph_osd_client *osdc, - struct ceph_osd *kickosd) -{ - int needmap; - - mutex_lock(&osdc->request_mutex); - needmap = __kick_requests(osdc, kickosd); mutex_unlock(&osdc->request_mutex); if (needmap) { dout("%d requests for down osds, need new map\n", needmap); ceph_monc_request_next_osdmap(&osdc->client->monc); } - } + + /* * Process updated osd map. * @@ -1263,6 +1373,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ceph_osdmap_destroy(osdc->osdmap); osdc->osdmap = newmap; } + kick_requests(osdc); + reset_changed_osds(osdc); } else { dout("ignoring incremental map %u len %d\n", epoch, maplen); @@ -1300,6 +1412,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) osdc->osdmap = newmap; if (oldmap) ceph_osdmap_destroy(oldmap); + kick_requests(osdc); } p += maplen; nr_maps--; @@ -1308,8 +1421,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) done: downgrade_write(&osdc->map_sem); ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); - if (newmap) - kick_requests(osdc, NULL); + send_queued(osdc); up_read(&osdc->map_sem); wake_up_all(&osdc->client->auth_wq); return; @@ -1322,6 +1434,223 @@ bad: } /* + * watch/notify callback event infrastructure + * + * These callbacks are used both for watch and notify operations. + */ +static void __release_event(struct kref *kref) +{ + struct ceph_osd_event *event = + container_of(kref, struct ceph_osd_event, kref); + + dout("__release_event %p\n", event); + kfree(event); +} + +static void get_event(struct ceph_osd_event *event) +{ + kref_get(&event->kref); +} + +void ceph_osdc_put_event(struct ceph_osd_event *event) +{ + kref_put(&event->kref, __release_event); +} +EXPORT_SYMBOL(ceph_osdc_put_event); + +static void __insert_event(struct ceph_osd_client *osdc, + struct ceph_osd_event *new) +{ + struct rb_node **p = &osdc->event_tree.rb_node; + struct rb_node *parent = NULL; + struct ceph_osd_event *event = NULL; + + while (*p) { + parent = *p; + event = rb_entry(parent, struct ceph_osd_event, node); + if (new->cookie < event->cookie) + p = &(*p)->rb_left; + else if (new->cookie > event->cookie) + p = &(*p)->rb_right; + else + BUG(); + } + + rb_link_node(&new->node, parent, p); + rb_insert_color(&new->node, &osdc->event_tree); +} + +static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, + u64 cookie) +{ + struct rb_node **p = &osdc->event_tree.rb_node; + struct rb_node *parent = NULL; + struct ceph_osd_event *event = NULL; + + while (*p) { + parent = *p; + event = rb_entry(parent, struct ceph_osd_event, node); + if (cookie < event->cookie) + p = &(*p)->rb_left; + else if (cookie > event->cookie) + p = &(*p)->rb_right; + else + return event; + } + return NULL; +} + +static void __remove_event(struct ceph_osd_event *event) +{ + struct ceph_osd_client *osdc = event->osdc; + + if (!RB_EMPTY_NODE(&event->node)) { + dout("__remove_event removed %p\n", event); + rb_erase(&event->node, &osdc->event_tree); + ceph_osdc_put_event(event); + } else { + dout("__remove_event didn't remove %p\n", event); + } +} + +int ceph_osdc_create_event(struct ceph_osd_client *osdc, + void (*event_cb)(u64, u64, u8, void *), + int one_shot, void *data, + struct ceph_osd_event **pevent) +{ + struct ceph_osd_event *event; + + event = kmalloc(sizeof(*event), GFP_NOIO); + if (!event) + return -ENOMEM; + + dout("create_event %p\n", event); + event->cb = event_cb; + event->one_shot = one_shot; + event->data = data; + event->osdc = osdc; + INIT_LIST_HEAD(&event->osd_node); + kref_init(&event->kref); /* one ref for us */ + kref_get(&event->kref); /* one ref for the caller */ + init_completion(&event->completion); + + spin_lock(&osdc->event_lock); + event->cookie = ++osdc->event_count; + __insert_event(osdc, event); + spin_unlock(&osdc->event_lock); + + *pevent = event; + return 0; +} +EXPORT_SYMBOL(ceph_osdc_create_event); + +void ceph_osdc_cancel_event(struct ceph_osd_event *event) +{ + struct ceph_osd_client *osdc = event->osdc; + + dout("cancel_event %p\n", event); + spin_lock(&osdc->event_lock); + __remove_event(event); + spin_unlock(&osdc->event_lock); + ceph_osdc_put_event(event); /* caller's */ +} +EXPORT_SYMBOL(ceph_osdc_cancel_event); + + +static void do_event_work(struct work_struct *work) +{ + struct ceph_osd_event_work *event_work = + container_of(work, struct ceph_osd_event_work, work); + struct ceph_osd_event *event = event_work->event; + u64 ver = event_work->ver; + u64 notify_id = event_work->notify_id; + u8 opcode = event_work->opcode; + + dout("do_event_work completing %p\n", event); + event->cb(ver, notify_id, opcode, event->data); + complete(&event->completion); + dout("do_event_work completed %p\n", event); + ceph_osdc_put_event(event); + kfree(event_work); +} + + +/* + * Process osd watch notifications + */ +void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) +{ + void *p, *end; + u8 proto_ver; + u64 cookie, ver, notify_id; + u8 opcode; + struct ceph_osd_event *event; + struct ceph_osd_event_work *event_work; + + p = msg->front.iov_base; + end = p + msg->front.iov_len; + + ceph_decode_8_safe(&p, end, proto_ver, bad); + ceph_decode_8_safe(&p, end, opcode, bad); + ceph_decode_64_safe(&p, end, cookie, bad); + ceph_decode_64_safe(&p, end, ver, bad); + ceph_decode_64_safe(&p, end, notify_id, bad); + + spin_lock(&osdc->event_lock); + event = __find_event(osdc, cookie); + if (event) { + get_event(event); + if (event->one_shot) + __remove_event(event); + } + spin_unlock(&osdc->event_lock); + dout("handle_watch_notify cookie %lld ver %lld event %p\n", + cookie, ver, event); + if (event) { + event_work = kmalloc(sizeof(*event_work), GFP_NOIO); + if (!event_work) { + dout("ERROR: could not allocate event_work\n"); + goto done_err; + } + INIT_WORK(&event_work->work, do_event_work); + event_work->event = event; + event_work->ver = ver; + event_work->notify_id = notify_id; + event_work->opcode = opcode; + if (!queue_work(osdc->notify_wq, &event_work->work)) { + dout("WARNING: failed to queue notify event work\n"); + goto done_err; + } + } + + return; + +done_err: + complete(&event->completion); + ceph_osdc_put_event(event); + return; + +bad: + pr_err("osdc handle_watch_notify corrupt msg\n"); + return; +} + +int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout) +{ + int err; + + dout("wait_event %p\n", event); + err = wait_for_completion_interruptible_timeout(&event->completion, + timeout * HZ); + ceph_osdc_put_event(event); + if (err > 0) + err = 0; + dout("wait_event %p returns %d\n", event, err); + return err; +} +EXPORT_SYMBOL(ceph_osdc_wait_event); + +/* * Register request, send initial attempt. */ int ceph_osdc_start_request(struct ceph_osd_client *osdc, @@ -1347,18 +1676,27 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, * the request still han't been touched yet. */ if (req->r_sent == 0) { - rc = __send_request(osdc, req); - if (rc) { - if (nofail) { - dout("osdc_start_request failed send, " - " marking %lld\n", req->r_tid); - req->r_resend = true; - rc = 0; - } else { - __unregister_request(osdc, req); + rc = __map_request(osdc, req); + if (rc < 0) + goto out_unlock; + if (req->r_osd == NULL) { + dout("send_request %p no up osds in pg\n", req); + ceph_monc_request_next_osdmap(&osdc->client->monc); + } else { + rc = __send_request(osdc, req); + if (rc) { + if (nofail) { + dout("osdc_start_request failed send, " + " will retry %lld\n", req->r_tid); + rc = 0; + } else { + __unregister_request(osdc, req); + } } } } + +out_unlock: mutex_unlock(&osdc->request_mutex); up_read(&osdc->map_sem); return rc; @@ -1441,9 +1779,15 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) INIT_LIST_HEAD(&osdc->osd_lru); osdc->requests = RB_ROOT; INIT_LIST_HEAD(&osdc->req_lru); + INIT_LIST_HEAD(&osdc->req_unsent); + INIT_LIST_HEAD(&osdc->req_notarget); + INIT_LIST_HEAD(&osdc->req_linger); osdc->num_requests = 0; INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); + spin_lock_init(&osdc->event_lock); + osdc->event_tree = RB_ROOT; + osdc->event_count = 0; schedule_delayed_work(&osdc->osds_timeout_work, round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); @@ -1463,6 +1807,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) "osd_op_reply"); if (err < 0) goto out_msgpool; + + osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); + if (IS_ERR(osdc->notify_wq)) { + err = PTR_ERR(osdc->notify_wq); + osdc->notify_wq = NULL; + goto out_msgpool; + } return 0; out_msgpool: @@ -1476,6 +1827,8 @@ EXPORT_SYMBOL(ceph_osdc_init); void ceph_osdc_stop(struct ceph_osd_client *osdc) { + flush_workqueue(osdc->notify_wq); + destroy_workqueue(osdc->notify_wq); cancel_delayed_work_sync(&osdc->timeout_work); cancel_delayed_work_sync(&osdc->osds_timeout_work); if (osdc->osdmap) { @@ -1483,6 +1836,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) osdc->osdmap = NULL; } remove_old_osds(osdc, 1); + WARN_ON(!RB_EMPTY_ROOT(&osdc->osds)); mempool_destroy(osdc->req_mempool); ceph_msgpool_destroy(&osdc->msgpool_op); ceph_msgpool_destroy(&osdc->msgpool_op_reply); @@ -1591,6 +1945,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) case CEPH_MSG_OSD_OPREPLY: handle_reply(osdc, msg, con); break; + case CEPH_MSG_WATCH_NOTIFY: + handle_watch_notify(osdc, msg); + break; default: pr_err("received unknown message type %d %s\n", type, @@ -1684,6 +2041,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, switch (type) { case CEPH_MSG_OSD_MAP: + case CEPH_MSG_WATCH_NOTIFY: return ceph_msg_new(type, front, GFP_NOFS); case CEPH_MSG_OSD_OPREPLY: return get_reply(con, hdr, skip); |