Ceph Learning — OSD Read/Write Flow and Source Code Analysis (Part 1)
Requests arrive from the client. Earlier posts covered the client-side implementations of the object-store and block-store libraries and how their API requests are sent (Ceph Learning — Librados and Osdc Source Code Analysis; Ceph Learning — Client Read/Write Operation Analysis; Ceph Learning — Librbd Block Storage Library and RBD Read/Write Flow Source Code Analysis). Once a request has been encapsulated, the messaging module (Ceph Learning — Ceph Network Communication Mechanism and Source Code Analysis) sends it, together with its related information, to the server side, where the data is actually operated on. On the server side this work is done by the OSD and OS (ObjectStore) modules; this post covers the OSD module first.
- Main classes of the OSD module
- The OSD class
- The PrimaryLogPG class
- The PGBackend class
- OSD read/write call flow
The overall call flow is shown in the figure below.
As before, the latest version differs from earlier ones: some modules have been simplified and some classes renamed. We first introduce the classes involved in the figure, then analyze the main call flow through their key functions.
Main Classes of the OSD Module
(Figure borrowed from another source.) Note that ReplicatedPG has been removed in the latest version and replaced by the PrimaryLogPG class.
The OSD Class
OSD and OSDService are the core classes; at the top level they are responsible for the work of an OSD node. A message from a client first arrives at the OSD class; after the OSD class processes it, the PrimaryLogPG class (formerly ReplicatedPG) takes over. In the read/write path, the OSD class's main job is to wrap the Message as an OpRequest, check whether the epoch (map version) needs updating, obtain the PG handle, perform PG-related checks, and finally put the request onto a queue.
The PrimaryLogPG Class
This class inherits from PG and from PGBackend::Listener (an abstract class). The PG class maintains the relevant state and implements PG-level functionality; its core is a state machine built with the boost statechart library that drives PG state transitions. PrimaryLogPG implements data reads and writes within a PG.
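The state-machine idea can be illustrated with a hand-rolled sketch. This is not boost::statechart, and it covers only a tiny simplified subset: the state and event names are loosely modeled on PG's (Reset, Peering, Active; ActMap/AdvMap/Activate), but the transition table here is illustrative, not the real one:

```cpp
// Toy event-driven state machine, a sketch of how PG states advance.
// The real PG machine (boost::statechart) has many more states, nested
// substates, and events; this only shows the shape of the mechanism.
enum class PgState { Reset, Peering, Active };
enum class PgEvent { ActMap, Activate, AdvMap };

PgState on_event(PgState s, PgEvent e) {
    switch (s) {
    case PgState::Reset:
        if (e == PgEvent::ActMap) return PgState::Peering;   // start peering
        break;
    case PgState::Peering:
        if (e == PgEvent::Activate) return PgState::Active;  // peering done
        if (e == PgEvent::AdvMap)   return PgState::Reset;   // map changed, restart
        break;
    case PgState::Active:
        if (e == PgEvent::AdvMap)   return PgState::Reset;   // interval change
        break;
    }
    return s;  // unhandled events leave the state unchanged
}
```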
The PGBackend Class
Its main job is to replicate request data, in the form of transactions, to the other OSDs of a PG (note: the operation on the primary OSD itself is performed by PrimaryLogPG).
It has two subclasses, ReplicatedBackend and ECBackend, corresponding to the two types of PG implementation (replicated and erasure-coded).
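This split is a classic abstract-interface design: the PG code drives either replication strategy through one base class. The sketch below is illustrative only (the `Toy*` names and the single method are invented; the real interface centers on submit_transaction and is far richer):

```cpp
#include <memory>
#include <string>

// Toy sketch of the PGBackend hierarchy: one abstract base, two concrete
// strategies, chosen by pool type. Not Ceph's actual API.
class ToyPGBackend {
public:
    virtual ~ToyPGBackend() = default;
    // Label describing how a write would reach the other OSDs of the PG.
    virtual std::string replication_strategy() const = 0;
};

class ToyReplicatedBackend : public ToyPGBackend {
public:
    std::string replication_strategy() const override {
        return "full-copy to every replica OSD";
    }
};

class ToyECBackend : public ToyPGBackend {
public:
    std::string replication_strategy() const override {
        return "erasure-coded shards to k+m OSDs";
    }
};

// The PG picks its backend once, based on the pool's type.
std::unique_ptr<ToyPGBackend> make_backend(bool is_erasure) {
    if (is_erasure)
        return std::make_unique<ToyECBackend>();
    return std::make_unique<ToyReplicatedBackend>();
}
```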
OSD Read/Write Call Flow
1) OSD::ms_fast_dispatch is the entry point for receiving a Message; it is called by the network module's receive thread. Its main work: check whether the service is stopping, wrap the Message into an OpRequest, obtain the Session, obtain the latest OSDMap, and finally call dispatch_session_waiting to move on to the next step.
```cpp
void OSD::ms_fast_dispatch(Message *m)
{
  FUNCTRACE();
  if (service.is_stopping()) {  // if the service is stopping, return immediately
    m->put();
    return;
  }
  // wrap the Message into an OpRequest
  OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
  ...
  if (m->get_connection()->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT) ||
      m->get_type() != CEPH_MSG_OSD_OP) {
    // queue it directly via enqueue_op
    enqueue_op(
      static_cast<MOSDFastDispatchOp*>(m)->get_spg(),
      op,
      static_cast<MOSDFastDispatchOp*>(m)->get_map_epoch());
  } else {
    // get the Session, which holds the state of a Connection
    Session *session = static_cast<Session*>(m->get_connection()->get_priv());
    if (session) {
      {
        Mutex::Locker l(session->session_dispatch_lock);
        op->get();
        session->waiting_on_map.push_back(*op);  // append the request to waiting_on_map
        OSDMapRef nextmap = service.get_nextmap_reserved();  // get the latest OSDMap
        dispatch_session_waiting(session, nextmap);  // process queued requests in a loop
        service.release_map(nextmap);
      }
      session->put();
    }
  }
  OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false);
}
```

2) OSD::dispatch_session_waiting loops over the entries in waiting_on_map, checks them against the OSDMap, resolves their pgid, and finally calls enqueue_op.
```cpp
void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
  assert(session->session_dispatch_lock.is_locked());
  auto i = session->waiting_on_map.begin();
  while (i != session->waiting_on_map.end()) {  // loop over waiting_on_map
    OpRequestRef op = &(*i);
    assert(ms_can_fast_dispatch(op->get_req()));
    const MOSDFastDispatchOp *m =
      static_cast<const MOSDFastDispatchOp*>(op->get_req());
    if (m->get_min_epoch() > osdmap->get_epoch()) {  // our OSDMap is too old
      break;
    }
    session->waiting_on_map.erase(i++);
    op->put();
    spg_t pgid;
    if (m->get_type() == CEPH_MSG_OSD_OP) {
      pg_t actual_pgid = osdmap->raw_pg_to_pg(
        static_cast<const MOSDOp*>(m)->get_pg());
      // get_primary_shard resolves the pgid (PG shard on its primary OSD)
      if (!osdmap->get_primary_shard(actual_pgid, &pgid)) {
        continue;
      }
    } else {
      pgid = m->get_spg();
    }
    enqueue_op(pgid, op, m->get_map_epoch());  // on success, hand off to enqueue_op
  }
  if (session->waiting_on_map.empty()) {
    clear_session_waiting_on_map(session);
  } else {
    register_session_waiting_on_map(session);
  }
}
```

3) OSD::enqueue_op's main job is to put the request onto the op_shardedwq queue.
```cpp
void OSD::enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch)
{
  ...
  op->osd_trace.event("enqueue op");
  op->osd_trace.keyval("priority", op->get_req()->get_priority());
  op->osd_trace.keyval("cost", op->get_req()->get_cost());
  op->mark_queued_for_pg();
  logger->tinc(l_osd_op_before_queue_op_lat, latency);
  // push onto the op_shardedwq queue
  op_shardedwq.queue(
    OpQueueItem(
      unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(pg, op)),
      op->get_req()->get_cost(),
      op->get_req()->get_priority(),
      op->get_req()->get_recv_stamp(),
      op->get_req()->get_source().num(),
      epoch));
}
```

4) OSD::dequeue_op updates the OSDMap if needed and calls do_request to enter the PG processing path.
```cpp
void OSD::dequeue_op(PGRef pg, OpRequestRef op, ThreadPool::TPHandle &handle)
{
  ...
  logger->tinc(l_osd_op_before_dequeue_op_lat, latency);
  Session *session = static_cast<Session *>(
    op->get_req()->get_connection()->get_priv());
  if (session) {
    // share the newer OSDMap if needed
    maybe_share_map(session, op, pg->get_osdmap());
    session->put();
  }
  if (pg->is_deleting())  // the PG is being deleted: return immediately
    return;
  op->mark_reached_pg();
  op->osd_trace.event("dequeue_op");
  pg->do_request(op, handle);  // hand the request to the PG's do_request
  // finish
  dout(10) << "dequeue_op " << op << " finish" << dendl;
  OID_EVENT_TRACE_WITH_MSG(op->get_req(), "DEQUEUE_OP_END", false);
}
```

5) PrimaryLogPG::do_request mainly checks the PG's state and handles the request differently according to the message type.
```cpp
void PrimaryLogPG::do_request(OpRequestRef& op, ThreadPool::TPHandle &handle)
{
  ...
  // make sure we have a new enough map
  auto p = waiting_for_map.find(op->get_source());
  ...
  if (can_discard_request(op)) {  // can this request be dropped?
    return;
  }
  ...
  if (!is_peered()) {  // the PG has not finished peering yet
    // Delay unless PGBackend says it's ok
    if (pgbackend->can_handle_while_inactive(op)) {
      bool handled = pgbackend->handle_message(op);  // the backend can handle it now
      assert(handled);
      return;
    } else {
      waiting_for_peered.push_back(op);  // otherwise queue it on waiting_for_peered
      op->mark_delayed("waiting for peered");
      return;
    }
  }
  ...
  // the PG is peered and no flushes are in progress
  assert(is_peered() && flushes_in_progress == 0);
  if (pgbackend->handle_message(op))
    return;
  // dispatch on the message type
  switch (op->get_req()->get_type()) {
  case CEPH_MSG_OSD_OP:
  case CEPH_MSG_OSD_BACKOFF:
    if (!is_active()) {  // the PG is peered but not yet active
      dout(20) << " peered, not active, waiting for active on " << op << dendl;
      waiting_for_active.push_back(op);  // queue until active
      op->mark_delayed("waiting for active");
      return;
    }
    switch (op->get_req()->get_type()) {
    case CEPH_MSG_OSD_OP:
      // verify client features: on a cache pool, an op without the
      // CEPH_FEATURE_OSD_CACHEPOOL feature bit gets an error reply
      if ((pool.info.has_tiers() || pool.info.is_tier()) &&
          !op->has_feature(CEPH_FEATURE_OSD_CACHEPOOL)) {
        osd->reply_op_error(op, -EOPNOTSUPP);
        return;
      }
      do_op(op);  // handle it in do_op
      break;
    case CEPH_MSG_OSD_BACKOFF:
      // object-level backoff acks handled in osdop context
      handle_backoff(op);
      break;
    }
    break;
  ...  // other message types
  default:
    assert(0 == "bad message type in do_request");
  }
}
```

6) PrimaryLogPG::do_op is long and complex; here we only follow the call flow. Its main job is to decode the operation, validate its various parameters, check the state of the target object and of its head, snap, and clone objects, and call the functions that obtain the object and operation contexts (ObjectContext, OpContext).
```cpp
void PrimaryLogPG::do_op(OpRequestRef& op)
{
  FUNCTRACE();
  // NOTE: take a non-const pointer here; we must be careful not to
  // change anything that will break other reads on m (operator<<).
  MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
  assert(m->get_type() == CEPH_MSG_OSD_OP);
  // decode the op's fields from the bufferlist
  if (m->finish_decode()) {
    op->reset_desc(); // for TrackedOp
    m->clear_payload();
  }
  ...
  if ((m->get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
                         CEPH_OSD_FLAG_LOCALIZE_READS)) &&
      op->may_read() &&
      !(op->may_write() || op->may_cache())) {
    // balanced reads; any replica will do (primary or replica may serve it)
    if (!(is_primary() || is_replica())) {
      osd->handle_misdirected_op(this, op);
      return;
    }
  } else {
    // normal case; must be primary
    if (!is_primary()) {
      osd->handle_misdirected_op(this, op);
      return;
    }
  }
  if (!op_has_sufficient_caps(op)) {
    osd->reply_op_error(op, -EPERM);
    return;
  }
  // if the op contains a PG-level operation, handle it in do_pg_op
  if (op->includes_pg_op()) {
    return do_pg_op(op);
  }
  // object name too long?
  if (m->get_oid().name.size() > cct->_conf->osd_max_object_name_len) {
    dout(4) << "do_op name is longer than "
            << cct->_conf->osd_max_object_name_len
            << " bytes" << dendl;
    osd->reply_op_error(op, -ENAMETOOLONG);
    return;
  }
  ...
  // blacklisted? (the client that sent the request is blacklisted)
  if (get_osdmap()->is_blacklisted(m->get_source_addr())) {
    dout(10) << "do_op " << m->get_source_addr() << " is blacklisted" << dendl;
    osd->reply_op_error(op, -EBLACKLISTED);
    return;
  }
  ...
  // missing object? (is the head object unreadable?)
  if (is_unreadable_object(head)) {
    if (!is_primary()) {
      osd->reply_op_error(op, -EAGAIN);
      return;
    }
    if (can_backoff &&
        (g_conf->osd_backoff_on_degraded ||
         (g_conf->osd_backoff_on_unfound && missing_loc.is_unfound(head)))) {
      add_backoff(session, head, head);
      maybe_kick_recovery(head);
    } else {
      wait_for_unreadable_object(head, op);  // queue until recovery completes
    }
    return;
  }
  // degraded object? (ordered write while the head object is recovering)
  if (write_ordered && is_degraded_or_backfilling_object(head)) {
    if (can_backoff && g_conf->osd_backoff_on_degraded) {
      add_backoff(session, head, head);
      maybe_kick_recovery(head);
    } else {
      wait_for_degraded_object(head, op);  // queue and wait
    }
    return;
  }
  // ordered write while a scrub (consistency check) is in progress
  if (write_ordered && scrubber.is_chunky_scrub_active() &&
      scrubber.write_blocked_by_scrub(head)) {
    dout(20) << __func__ << ": waiting for scrub" << dendl;
    waiting_for_scrub.push_back(op);
    op->mark_delayed("waiting for scrub");
    return;
  }
  ...
  // ordered write while the object is blocked on a full cache
  if (write_ordered && objects_blocked_on_cache_full.count(head)) {
    block_write_on_full_cache(head, op);
    return;
  }
  ...
  // io blocked on obc? (is the object blocked?)
  if (!m->has_flag(CEPH_OSD_FLAG_FLUSH) &&
      maybe_await_blocked_head(oid, op)) {
    return;
  }
  // call find_object_context to obtain the ObjectContext
  int r = find_object_context(oid, &obc, can_create,
                              m->has_flag(CEPH_OSD_FLAG_MAP_SNAP_CLONE),
                              &missing_oid);
  // if a hit_set exists, record the access
  bool in_hit_set = false;
  if (hit_set) {
    if (obc.get()) {
      if (obc->obs.oi.soid != hobject_t() && hit_set->contains(obc->obs.oi.soid))
        in_hit_set = true;
    } else {
      if (missing_oid != hobject_t() && hit_set->contains(missing_oid))
        in_hit_set = true;
    }
    if (!op->hitset_inserted) {
      hit_set->insert(oid);
      op->hitset_inserted = true;
      if (hit_set->is_full() ||
          hit_set_start_stamp + pool.info.hit_set_period <= m->get_recv_stamp()) {
        hit_set_persist();
      }
    }
  }
  // if agent_state is set, choose the cache agent's mode
  if (agent_state) {
    if (agent_choose_mode(false, op))
      return;
  }
  ...
  op->mark_started();
  execute_ctx(ctx);  // execute the actual operations
  utime_t prepare_latency = ceph_clock_now();
  prepare_latency -= op->get_dequeued_time();
  osd->logger->tinc(l_osd_op_prepare_lat, prepare_latency);
  if (op->may_read() && op->may_write()) {
    osd->logger->tinc(l_osd_op_rw_prepare_lat, prepare_latency);
  } else if (op->may_read()) {
    osd->logger->tinc(l_osd_op_r_prepare_lat, prepare_latency);
  } else if (op->may_write() || op->may_cache()) {
    osd->logger->tinc(l_osd_op_w_prepare_lat, prepare_latency);
  }
  // force recovery of the oldest missing object if too many logs
  maybe_force_recovery();
}
```

7) PrimaryLogPG::find_object_context obtains the object context for the various cases by calling PrimaryLogPG::get_object_context.
```cpp
/* If we return an error, and set *pmissing, then promoting that
 * object may help.
 *
 * If we return -EAGAIN, we will always set *pmissing to the missing
 * object to wait for.
 *
 * If we return an error but do not set *pmissing, then we know the
 * object does not exist.
 */
// obtain an object's ObjectContext
int PrimaryLogPG::find_object_context(const hobject_t& oid,
                                      ObjectContextRef *pobc,
                                      bool can_create,
                                      bool map_snapid_to_clone,
                                      hobject_t *pmissing)
{
  FUNCTRACE();
  assert(oid.pool == static_cast<int64_t>(info.pgid.pool()));
  // want the head?
  if (oid.snap == CEPH_NOSNAP) {
    // the head object is wanted: call get_object_context directly
    ObjectContextRef obc = get_object_context(oid, can_create);
    if (!obc) {
      if (pmissing)
        *pmissing = oid;
      return -ENOENT;
    }
    dout(10) << "find_object_context " << oid
             << " @" << oid.snap
             << " oi=" << obc->obs.oi
             << dendl;
    *pobc = obc;
    return 0;
  }
  hobject_t head = oid.get_head();
  // we want a snap
  // if we are not mapping a snapid to a clone and the snapshot has been
  // removed, return -ENOENT directly
  if (!map_snapid_to_clone && pool.info.is_removed_snap(oid.snap)) {
    dout(10) << __func__ << " snap " << oid.snap << " is removed" << dendl;
    return -ENOENT;
  }
  // call get_snapset_context to obtain the SnapSetContext
  SnapSetContext *ssc = get_snapset_context(oid, can_create);
  if (!ssc || !(ssc->exists || can_create)) {
    dout(20) << __func__ << " " << oid << " no snapset" << dendl;
    if (pmissing)
      *pmissing = head; // start by getting the head
    if (ssc)
      put_snapset_context(ssc);
    return -ENOENT;
  }
  // the map_snapid_to_clone case
  if (map_snapid_to_clone) {
    dout(10) << "find_object_context " << oid << " @" << oid.snap
             << " snapset " << ssc->snapset
             << " map_snapid_to_clone=true" << dendl;
    if (oid.snap > ssc->snapset.seq) {
      // the snapshot is newer than what the OSD has recorded, so the oid
      // still maps to the head object: return the head's context
      // already must be readable
      ObjectContextRef obc = get_object_context(head, false);
      dout(10) << "find_object_context " << oid << " @" << oid.snap
               << " snapset " << ssc->snapset
               << " maps to head" << dendl;
      *pobc = obc;
      put_snapset_context(ssc);
      return (obc && obc->obs.exists) ? 0 : -ENOENT;
    } else {
      // otherwise search the snapset's clone list
      vector<snapid_t>::const_iterator citer = std::find(
        ssc->snapset.clones.begin(),
        ssc->snapset.clones.end(),
        oid.snap);
      if (citer == ssc->snapset.clones.end()) {
        dout(10) << "find_object_context " << oid << " @" << oid.snap
                 << " snapset " << ssc->snapset
                 << " maps to nothing" << dendl;
        put_snapset_context(ssc);
        return -ENOENT;
      }
      ...
      // the clone exists but is currently missing
      if (pg_log.get_missing().is_missing(oid)) {
        dout(10) << "find_object_context " << oid << " @" << oid.snap
                 << " snapset " << ssc->snapset
                 << " " << oid << " is missing" << dendl;
        if (pmissing)
          *pmissing = oid;
        put_snapset_context(ssc);
        return -EAGAIN;
      }
      ...
    }
  }
  ...  // remaining find_object_context cases
}
```

8) get_object_context actually obtains the context: it first looks in the cache and, on a miss, reads the object-info attribute through the backend. It also calls get_snapset_context to obtain the SnapSetContext.
```cpp
ObjectContextRef PrimaryLogPG::get_object_context(
  const hobject_t& soid,
  bool can_create,
  const map<string, bufferlist> *attrs)
{
  ...
  // look in the cache first
  ObjectContextRef obc = object_contexts.lookup(soid);
  osd->logger->inc(l_osd_object_ctx_cache_total);
  if (obc) {
    osd->logger->inc(l_osd_object_ctx_cache_hit);
    dout(10) << __func__ << ": found obc in cache: " << obc << dendl;
  } else {
    dout(10) << __func__ << ": obc NOT found in cache: " << soid << dendl;
    // check disk
    bufferlist bv;
    if (attrs) {
      assert(attrs->count(OI_ATTR));
      bv = attrs->find(OI_ATTR)->second;
    } else {
      // not in the cache: read the object-info attribute from the backend
      int r = pgbackend->objects_get_attr(soid, OI_ATTR, &bv);
      if (r < 0) {
        if (!can_create) {
          dout(10) << __func__ << ": no obc for soid "
                   << soid << " and !can_create" << dendl;
          return ObjectContextRef(); // -ENOENT!
        }
        dout(10) << __func__ << ": no obc for soid "
                 << soid << " but can_create" << dendl;
        // new object.
        object_info_t oi(soid);
        // call get_snapset_context to obtain the SnapSetContext
        SnapSetContext *ssc = get_snapset_context(soid, true, 0, false);
        assert(ssc);
        obc = create_object_context(oi, ssc);
        dout(10) << __func__ << ": " << obc << " " << soid
                 << " " << obc->rwstate
                 << " oi: " << obc->obs.oi
                 << " ssc: " << obc->ssc
                 << " snapset: " << obc->ssc->snapset << dendl;
        return obc;
      }
    }
    ...
  }
}
```

9) PrimaryLogPG::get_snapset_context looks up the SnapSetContext in the snapset_contexts map and, if it is not there, reads the snapset attribute (SS_ATTR) from the backend and decodes it.
```cpp
SnapSetContext *PrimaryLogPG::get_snapset_context(
  const hobject_t& oid,
  bool can_create,
  const map<string, bufferlist> *attrs,
  bool oid_existed)
{
  Mutex::Locker l(snapset_contexts_lock);
  SnapSetContext *ssc;
  map<hobject_t, SnapSetContext*>::iterator p =
    snapset_contexts.find(oid.get_snapdir());
  if (p != snapset_contexts.end()) {
    if (can_create || p->second->exists) {
      ssc = p->second;
    } else {
      return NULL;
    }
  } else {
    bufferlist bv;
    if (!attrs) {
      int r = -ENOENT;
      if (!(oid.is_head() && !oid_existed)) {
        r = pgbackend->objects_get_attr(oid.get_head(), SS_ATTR, &bv);
      }
      if (r < 0 && !can_create)
        return NULL;
    } else {
      assert(attrs->count(SS_ATTR));
      bv = attrs->find(SS_ATTR)->second;
    }
    ssc = new SnapSetContext(oid.get_snapdir());
    _register_snapset_context(ssc);
    if (bv.length()) {
      bufferlist::iterator bvp = bv.begin();
      try {
        ssc->snapset.decode(bvp);
      } catch (buffer::error& e) {
        dout(0) << __func__ << " Can't decode snapset: " << e << dendl;
        return NULL;
      }
      ssc->exists = true;
    } else {
      ssc->exists = false;
    }
  }
  assert(ssc);
  ssc->ref++;
  return ssc;
}
```

10) execute_ctx is called from do_op. It checks the object's state and gathers context information, and calls prepare_transaction to pack the operations into a transaction. For a read it invokes the read path (synchronous or asynchronous). For a write it calls calc_trim_to to decide whether old PG log entries should be trimmed, issue_repop(repop, ctx) to send the replication requests to the replicas, and eval_repop(repop) to check whether the replicas' replies have all arrived.
```cpp
void PrimaryLogPG::execute_ctx(OpContext *ctx)
{
  FUNCTRACE();
  dout(10) << __func__ << " " << ctx << dendl;
  ctx->reset_obs(ctx->obc);
  ctx->update_log_only = false; // reset in case finish_copyfrom() is re-running execute_ctx
  OpRequestRef op = ctx->op;
  const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
  ObjectContextRef obc = ctx->obc;
  const hobject_t& soid = obc->obs.oi.soid;
  // this method must be idempotent since we may call it several times
  // before we finally apply the resulting transaction.
  ctx->op_t.reset(new PGTransaction);
  if (op->may_write() || op->may_cache()) {
    // snap
    if (!(m->has_flag(CEPH_OSD_FLAG_ENFORCE_SNAPC)) &&
        pool.info.is_pool_snaps_mode()) {
      // pool-wide snapshot mode: use pool's snapc
      ctx->snapc = pool.snapc;
    } else {
      // client-specified snapc (self-managed snapshots, e.g. RBD)
      ctx->snapc.seq = m->get_snap_seq();
      ctx->snapc.snaps = m->get_snaps();
      filter_snapc(ctx->snapc.snaps);
    }
    if ((m->has_flag(CEPH_OSD_FLAG_ORDERSNAP)) &&
        ctx->snapc.seq < obc->ssc->snapset.seq) {
      // the client's snap seq is older than the server's: return an error
      dout(10) << " ORDERSNAP flag set and snapc seq " << ctx->snapc.seq
               << " < snapset seq " << obc->ssc->snapset.seq
               << " on " << obc->obs.oi.soid << dendl;
      reply_ctx(ctx, -EOLDSNAPC);
      return;
    }
    ...
  }
  if (!ctx->user_at_version)
    ctx->user_at_version = obc->obs.oi.user_version;
  dout(30) << __func__ << " user_at_version " << ctx->user_at_version << dendl;
  // for a read, take the ondisk_read_lock on the ObjectContext
  if (op->may_read()) {
    dout(10) << " taking ondisk_read_lock" << dendl;
    obc->ondisk_read_lock();
  }
  {
#ifdef WITH_LTTNG
    osd_reqid_t reqid = ctx->op->get_reqid();
#endif
    tracepoint(osd, prepare_tx_enter, reqid.name._type,
               reqid.name._num, reqid.tid, reqid.inc);
  }
  // pack the operations into the transaction ctx->op_t
  int result = prepare_transaction(ctx);
  {
#ifdef WITH_LTTNG
    osd_reqid_t reqid = ctx->op->get_reqid();
#endif
    tracepoint(osd, prepare_tx_exit, reqid.name._type,
               reqid.name._num, reqid.tid, reqid.inc);
  }
  if (op->may_read()) {
    dout(10) << " dropping ondisk_read_lock" << dendl;
    obc->ondisk_read_unlock();
  }
  bool pending_async_reads = !ctx->pending_async_reads.empty();
  if (result == -EINPROGRESS || pending_async_reads) {
    // come back later.
    if (pending_async_reads) {
      assert(pool.info.is_erasure());
      in_progress_async_reads.push_back(make_pair(op, ctx));
      ctx->start_async_reads(this);  // asynchronous read path
    }
    return;
  }
  if (result == -EAGAIN) {
    // clean up after the ctx
    close_op_ctx(ctx);
    return;
  }
  bool successful_write = !ctx->op_t->empty() && op->may_write() && result >= 0;
  // prepare the reply
  ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0,
                               successful_write, op->qos_resp);
  // read or error?
  if ((ctx->op_t->empty() || result < 0) && !ctx->update_log_only) {
    // finish side-effects
    if (result >= 0)
      do_osd_op_effects(ctx, m->get_connection());
    complete_read_ctx(result, ctx);  // synchronous read path
    return;
  }
  ctx->reply->set_reply_versions(ctx->at_version, ctx->user_at_version);
  assert(op->may_write() || op->may_cache());
  // trim log?
  calc_trim_to();  // decide whether old PG log entries should be trimmed
  ...
  issue_repop(repop, ctx);  // send the replication requests to the replicas
  eval_repop(repop);        // check whether the replicas have all replied
  repop->put();
}
```

11) PrimaryLogPG::issue_repop sends the request to the replica OSDs for processing.
```cpp
void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx)
{
  FUNCTRACE();
  const hobject_t& soid = ctx->obs->oi.soid;
  dout(7) << "issue_repop rep_tid " << repop->rep_tid
          << " o " << soid
          << dendl;
  repop->v = ctx->at_version;
  if (ctx->at_version > eversion_t()) {
    for (set<pg_shard_t>::iterator i = actingbackfill.begin();
         i != actingbackfill.end();
         ++i) {
      if (*i == get_primary()) continue;
      pg_info_t &pinfo = peer_info[*i];
      // keep peer_info up to date
      if (pinfo.last_complete == pinfo.last_update)
        pinfo.last_complete = ctx->at_version;
      pinfo.last_update = ctx->at_version;
    }
  }
  // prepare for the write: take ondisk_write_lock on the affected objects
  ctx->obc->ondisk_write_lock();
  ctx->op_t->add_obc(ctx->obc);
  if (ctx->clone_obc) {
    ctx->clone_obc->ondisk_write_lock();
    ctx->op_t->add_obc(ctx->clone_obc);
  }
  if (ctx->head_obc) {
    ctx->head_obc->ondisk_write_lock();
    ctx->op_t->add_obc(ctx->head_obc);
  }
  Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
  Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
  Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
    ctx->obc,
    ctx->clone_obc,
    ctx->head_obc);
  if (!(ctx->log.empty())) {
    assert(ctx->at_version >= projected_last_update);
    projected_last_update = ctx->at_version;
  }
  for (auto &&entry: ctx->log) {
    projected_log.add(entry);
  }
  // submit the transaction to the backend; PGBackend has two subclasses,
  // ReplicatedBackend and ECBackend, for the two PG implementations
  pgbackend->submit_transaction(
    soid,
    ctx->delta_stats,
    ctx->at_version,
    std::move(ctx->op_t),
    pg_trim_to,
    min_last_complete_ondisk,
    ctx->log,
    ctx->updated_hset_history,
    onapplied_sync,
    on_all_applied,
    on_all_commit,
    repop->rep_tid,
    ctx->reqid,
    ctx->op);
}
```

12) ReplicatedBackend::submit_transaction ultimately calls the network layer to send the update requests to the replica OSDs, and calls queue_transactions to apply the change on this PG's primary OSD.
```cpp
void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &roll_forward_to,
  const vector<pg_log_entry_t> &_log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_acked,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  parent->apply_stats(soid, delta_stats);
  vector<pg_log_entry_t> log_entries(_log_entries);
  ObjectStore::Transaction op_t;
  PGTransactionUPtr t(std::move(_t));
  set<hobject_t> added, removed;
  generate_transaction(
    t,
    coll,
    log_entries,
    &op_t,
    &added,
    &removed);
  assert(added.size() <= 1);
  assert(removed.size() <= 1);
  // build the InProgressOp record for this in-flight operation
  auto insert_res = in_progress_ops.insert(
    make_pair(
      tid,
      InProgressOp(tid, on_all_commit, on_all_acked, orig_op, at_version)));
  assert(insert_res.second);
  InProgressOp &op = insert_res.first->second;
  op.waiting_for_applied.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
  op.waiting_for_commit.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
  // send the request out to the replica OSDs
  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    at_version,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);
  add_temp_objs(added);
  clear_temp_objs(removed);
  parent->log_operation(
    log_entries,
    hset_history,
    trim_to,
    at_version,
    true,
    op_t);
  op_t.register_on_applied_sync(on_local_applied_sync);
  op_t.register_on_applied(
    parent->bless_context(new C_OSD_OnOpApplied(this, &op)));
  op_t.register_on_commit(
    parent->bless_context(new C_OSD_OnOpCommit(this, &op)));
  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));
  // queue the transaction locally: this performs the operation on the
  // local objects of this PG's primary OSD
  parent->queue_transactions(tls, op.op);
}
```

13) The queue_transactions call then descends into the OS (ObjectStore) layer.
The called function is declared in PrimaryLogPG.h, and osd->store is defined as:

```cpp
ObjectStore *store;
```
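Putting steps 11–13 together: the primary records which shards it is still waiting on (the waiting_for_applied / waiting_for_commit sets in submit_transaction above) and completes the client op only once every ack has arrived. A toy model of that bookkeeping (the names `ToyInProgressOp` and `handle_commit_ack` are illustrative, not Ceph's):

```cpp
#include <set>

// Toy model of in-flight replication bookkeeping: the primary notes every
// shard it sent the transaction to, erases a shard when its commit ack
// arrives, and replies to the client only when the set is empty.
struct ToyInProgressOp {
    std::set<int> waiting_for_commit;  // OSD ids we still need acks from
    bool replied = false;
};

// Returns true if this ack completed the op (the client reply can be sent).
bool handle_commit_ack(ToyInProgressOp& op, int from_osd) {
    op.waiting_for_commit.erase(from_osd);
    if (op.waiting_for_commit.empty() && !op.replied) {
        op.replied = true;  // all replicas durable: reply to the client
        return true;
    }
    return false;
}
```

In the real code this role is played by the C_OSD_OnOpCommit / C_OSD_RepopCommit callbacks registered in submit_transaction and checked by eval_repop.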
Summary
This post introduced the main classes of the OSD module (OSD, PrimaryLogPG, PGBackend) and traced the read/write call flow from ms_fast_dispatch down to submit_transaction. The next step, queue_transactions, enters the ObjectStore (OS) layer, which a later post will cover.