diff --git a/src/crimson/osd/object_context_loader.h b/src/crimson/osd/object_context_loader.h index 49f8f1572bf3b..5f8da15e02767 100644 --- a/src/crimson/osd/object_context_loader.h +++ b/src/crimson/osd/object_context_loader.h @@ -163,6 +163,10 @@ class ObjectContextLoader { return *this; } + void lock_excl_sync() { + target_state.lock_excl_sync(); + } + ObjectContextRef &get_obc() { ceph_assert(!target_state.is_empty()); ceph_assert(target_state.obc->is_loaded()); @@ -219,6 +223,12 @@ class ObjectContextLoader { return ret; } + Manager get_obc_manager(ObjectContextRef obc) { + Manager ret = get_obc_manager(obc->obs.oi.soid, false); + ret.set_state_obc(ret.target_state, obc); + return ret; + } + Manager get_obc_manager( Orderer &orderer, const hobject_t &oid, bool resolve_clone = true) { Manager ret = get_obc_manager(oid, resolve_clone); diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc index aa13b9594c77f..f1dd797d254f8 100644 --- a/src/crimson/osd/recovery_backend.cc +++ b/src/crimson/osd/recovery_backend.cc @@ -5,6 +5,7 @@ #include "crimson/common/coroutine.h" #include "crimson/common/exception.h" +#include "crimson/common/log.h" #include "crimson/osd/recovery_backend.h" #include "crimson/osd/pg.h" #include "crimson/osd/pg_backend.h" @@ -13,23 +14,20 @@ #include "messages/MOSDFastDispatchOp.h" #include "osd/osd_types.h" -namespace { - seastar::logger& logger() { - return crimson::get_logger(ceph_subsys_osd); - } -} +SET_SUBSYS(osd); hobject_t RecoveryBackend::get_temp_recovery_object( const hobject_t& target, eversion_t version) const { + LOG_PREFIX(RecoveryBackend::get_temp_recovery_object); hobject_t hoid = target.make_temp_hobject(fmt::format("temp_recovering_{}_{}_{}_{}", pg.get_info().pgid, version, pg.get_info().history.same_interval_since, target.snap)); - logger().debug("{} {}", __func__, hoid); + DEBUGDPP("{}", pg, hoid); return hoid; } @@ -123,7 +121,8 @@ void RecoveryBackend::handle_backfill_finish( MOSDPGBackfill& m, crimson::net::ConnectionXcoreRef conn) { - logger().debug("{}", __func__); + LOG_PREFIX(RecoveryBackend::handle_backfill_finish); + DEBUGDPP("", pg); ceph_assert(!pg.is_primary()); ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 1); auto reply = crimson::make_message( @@ -146,7 +145,8 @@ RecoveryBackend::interruptible_future<> RecoveryBackend::handle_backfill_progress( MOSDPGBackfill& m) { - logger().debug("{}", __func__); + LOG_PREFIX(RecoveryBackend::handle_backfill_progress); + DEBUGDPP("", pg); ceph_assert(!pg.is_primary()); ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 2); @@ -156,7 +156,7 @@ RecoveryBackend::handle_backfill_progress( m.stats, m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS, t); - logger().debug("RecoveryBackend::handle_backfill_progress: do_transaction..."); + DEBUGDPP("submitting transaction", pg); return shard_services.get_store().do_transaction( pg.get_collection_ref(), std::move(t)).or_terminate(); } @@ -165,7 +165,8 @@ RecoveryBackend::interruptible_future<> RecoveryBackend::handle_backfill_finish_ack( MOSDPGBackfill& m) { - logger().debug("{}", __func__); + LOG_PREFIX(RecoveryBackend::handle_backfill_finish_ack); + DEBUGDPP("", pg); ceph_assert(pg.is_primary()); ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 3); auto recovery_handler = pg.get_recovery_handler(); @@ -178,9 +179,10 @@ RecoveryBackend::handle_backfill( MOSDPGBackfill& m, crimson::net::ConnectionXcoreRef conn) { - logger().debug("{}", __func__); + 
LOG_PREFIX(RecoveryBackend::handle_backfill); + DEBUGDPP("", pg); if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) { - logger().debug("{}: discarding {}", __func__, m); + DEBUGDPP("discarding {}", pg, m); return seastar::now(); } switch (m.op) { @@ -201,7 +203,8 @@ RecoveryBackend::interruptible_future<> RecoveryBackend::handle_backfill_remove( MOSDPGBackfillRemove& m) { - logger().debug("{} m.ls={}", __func__, m.ls); + LOG_PREFIX(RecoveryBackend::handle_backfill_remove); + DEBUGDPP("m.ls={}", pg, m.ls); assert(m.get_type() == MSG_OSD_PG_BACKFILL_REMOVE); ObjectStore::Transaction t; @@ -211,7 +214,7 @@ RecoveryBackend::handle_backfill_remove( pg.remove_maybe_snapmapped_object(t, soid); }); } - logger().debug("RecoveryBackend::handle_backfill_remove: do_transaction..."); + DEBUGDPP("submitting transaction", pg); co_await interruptor::make_interruptible( shard_services.get_store().do_transaction( pg.get_collection_ref(), std::move(t)).or_terminate()); @@ -223,16 +226,17 @@ RecoveryBackend::scan_for_backfill( [[maybe_unused]] const std::int64_t min, const std::int64_t max) { - logger().debug("{} starting from {}", __func__, start); + LOG_PREFIX(RecoveryBackend::scan_for_backfill); + DEBUGDPP("starting from {}", pg, start); auto version_map = seastar::make_lw_shared>(); return backend->list_objects(start, max).then_interruptible( - [this, start, version_map] (auto&& ret) { + [FNAME, this, start, version_map] (auto&& ret) { auto&& [objects, next] = std::move(ret); return seastar::do_with( std::move(objects), - [this, version_map](auto &objects) { + [FNAME, this, version_map](auto &objects) { return interruptor::parallel_for_each(objects, - [this, version_map] (const hobject_t& object) + [FNAME, this, version_map] (const hobject_t& object) -> interruptible_future<> { crimson::osd::ObjectContextRef obc; if (pg.is_primary()) { @@ -240,8 +244,8 @@ RecoveryBackend::scan_for_backfill( } if (obc) { if (obc->obs.exists) { - logger().debug("scan_for_backfill found (primary): {} {}", - object, obc->obs.oi.version); + DEBUGDPP("found (primary): {} {}", + pg, object, obc->obs.oi.version); version_map->emplace(object, obc->obs.oi.version); } else { // if the object does not exist here, it must have been removed @@ -251,25 +255,25 @@ RecoveryBackend::scan_for_backfill( return seastar::now(); } else { return backend->load_metadata(object).safe_then_interruptible( - [version_map, object] (auto md) { + [FNAME, this, version_map, object] (auto md) { if (md->os.exists) { - logger().debug("scan_for_backfill found: {} {}", - object, md->os.oi.version); + DEBUGDPP("found: {} {}", pg, + object, md->os.oi.version); version_map->emplace(object, md->os.oi.version); } return seastar::now(); }, PGBackend::load_metadata_ertr::assert_all{}); } }); - }).then_interruptible([version_map, start=std::move(start), next=std::move(next), this] { + }).then_interruptible([FNAME, this, version_map, start=std::move(start), next=std::move(next)] { BackfillInterval bi; bi.begin = std::move(start); bi.end = std::move(next); bi.version = pg.get_info().last_update; bi.objects = std::move(*version_map); - logger().debug("{} BackfillInterval filled, leaving, {}", - "scan_for_backfill", - bi); + DEBUGDPP("{} BackfillInterval filled, leaving, {}", + "scan_for_backfill", + pg, bi); return seastar::make_ready_future(std::move(bi)); }); }); @@ -280,7 +284,8 @@ RecoveryBackend::handle_scan_get_digest( MOSDPGScan& m, crimson::net::ConnectionXcoreRef conn) { - logger().debug("{}", __func__); + LOG_PREFIX(RecoveryBackend::handle_scan_get_digest); + 
DEBUGDPP("", pg); if (false /* FIXME: check for backfill too full */) { std::ignore = shard_services.start_operation( // TODO: abstract start_background_recovery @@ -316,7 +321,8 @@ RecoveryBackend::interruptible_future<> RecoveryBackend::handle_scan_digest( MOSDPGScan& m) { - logger().debug("{}", __func__); + LOG_PREFIX(RecoveryBackend::handle_scan_digest); + DEBUGDPP("", pg); // Check that from is in backfill_targets vector ceph_assert(pg.is_backfill_target(m.from)); @@ -342,9 +348,10 @@ RecoveryBackend::handle_scan( MOSDPGScan& m, crimson::net::ConnectionXcoreRef conn) { - logger().debug("{}", __func__); + LOG_PREFIX(RecoveryBackend::handle_scan); + DEBUGDPP("", pg); if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) { - logger().debug("{}: discarding {}", __func__, m); + DEBUGDPP("discarding {}", pg, m); return seastar::now(); } switch (m.op) { @@ -360,7 +367,7 @@ RecoveryBackend::handle_scan( } RecoveryBackend::interruptible_future<> -RecoveryBackend::handle_recovery_op( +RecoveryBackend::handle_backfill_op( Ref m, crimson::net::ConnectionXcoreRef conn) { diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h index 21154cb710679..2d755d54789fe 100644 --- a/src/crimson/osd/recovery_backend.h +++ b/src/crimson/osd/recovery_backend.h @@ -77,7 +77,7 @@ class RecoveryBackend { virtual interruptible_future<> handle_recovery_op( Ref m, - crimson::net::ConnectionXcoreRef conn); + crimson::net::ConnectionXcoreRef conn) = 0; virtual interruptible_future<> recover_object( const hobject_t& soid, @@ -268,6 +268,10 @@ class RecoveryBackend { void clean_up(ceph::os::Transaction& t, interrupt_cause_t why); virtual seastar::future<> on_stop() = 0; + + virtual interruptible_future<> handle_backfill_op( + Ref m, + crimson::net::ConnectionXcoreRef conn); private: void handle_backfill_finish( MOSDPGBackfill& m, diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index 0d6c9d3823665..d9a4481200a28 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -6,17 +6,14 @@ #include #include +#include "crimson/common/log.h" #include "crimson/osd/pg.h" #include "crimson/osd/pg_backend.h" #include "osd/osd_types_fmt.h" #include "replicated_recovery_backend.h" #include "msg/Message.h" -namespace { - seastar::logger& logger() { - return crimson::get_logger(ceph_subsys_osd); - } -} +SET_SUBSYS(osd); using std::less; using std::map; @@ -27,34 +24,39 @@ ReplicatedRecoveryBackend::recover_object( const hobject_t& soid, eversion_t need) { - logger().debug("{}: {}, {}", __func__, soid, need); + LOG_PREFIX(ReplicatedRecoveryBackend::recover_object); + DEBUGDPP("{}, {}", pg, soid, need); // always add_recovering(soid) before recover_object(soid) assert(is_recovering(soid)); // start tracking the recovery of soid - return maybe_pull_missing_obj(soid, need).then_interruptible([this, soid, need] { - logger().debug("recover_object: loading obc: {}", soid); - return pg.obc_loader.with_obc(soid, - [this, soid, need](auto head, auto obc) { - if (!obc->obs.exists) { - // XXX: this recovery must be triggered by backfills and the corresponding - // object must have been deleted by some client request after the object - // is enqueued for push but before the lock is acquired by the recovery. - // - // Abort the recovery in this case, a "recover_delete" must have been - // added for this object by the client request that deleted it. 
- return interruptor::now(); - } - logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid); - auto& recovery_waiter = get_recovering(soid); - recovery_waiter.obc = obc; - return maybe_push_shards(head, soid, need); - }, false).handle_error_interruptible( - crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) { - // TODO: may need eio handling? - logger().error("recover_object saw error code {}, ignoring object {}", - code, soid); - return seastar::now(); - })); + return maybe_pull_missing_obj( + soid, need + ).then_interruptible([FNAME, this, soid, need] { + DEBUGDPP("loading obc: {}", pg, soid); + return pg.obc_loader.with_obc( + soid, + [FNAME, this, soid, need](auto head, auto obc) { + if (!obc->obs.exists) { + // XXX: this recovery must be triggered by backfills and the corresponding + // object must have been deleted by some client request after the object + // is enqueued for push but before the lock is acquired by the recovery. + // + // Abort the recovery in this case, a "recover_delete" must have been + // added for this object by the client request that deleted it. + return interruptor::now(); + } + DEBUGDPP("loaded obc: {}", pg, obc->obs.oi.soid); + auto& recovery_waiter = get_recovering(soid); + recovery_waiter.obc = obc; + return maybe_push_shards(head, soid, need); + }, false).handle_error_interruptible( + crimson::osd::PG::load_obc_ertr::all_same_way( + [FNAME, this, soid](auto& code) { + // TODO: may need eio handling? + ERRORDPP("saw error code {}, ignoring object {}", + pg, code, soid); + return seastar::now(); + })); }); } @@ -116,7 +118,8 @@ ReplicatedRecoveryBackend::maybe_pull_missing_obj( const hobject_t& soid, eversion_t need) { - logger().debug("{}: {}, {}", __func__, soid, need); + LOG_PREFIX(ReplicatedRecoveryBackend::maybe_pull_missing_obj); + DEBUGDPP("{}, {}", pg, soid, need); pg_missing_tracker_t local_missing = pg.get_local_missing(); if (!local_missing.is_missing(soid)) { // object is not missing, don't pull @@ -159,12 +162,13 @@ ReplicatedRecoveryBackend::push_delete( const hobject_t& soid, eversion_t need) { - logger().debug("{}: {}, {}", __func__, soid, need); + LOG_PREFIX(ReplicatedRecoveryBackend::push_delete); + DEBUGDPP("{}, {}", pg, soid, need); epoch_t min_epoch = pg.get_last_peering_reset(); assert(pg.get_acting_recovery_backfill().size() > 0); return interruptor::parallel_for_each(pg.get_acting_recovery_backfill(), - [this, soid, need, min_epoch](pg_shard_t shard) + [FNAME, this, soid, need, min_epoch](pg_shard_t shard) -> interruptible_future<> { if (shard == pg.get_pg_whoami()) return seastar::make_ready_future<>(); @@ -172,7 +176,7 @@ ReplicatedRecoveryBackend::push_delete( if (iter == pg.get_shard_missing().end()) return seastar::make_ready_future<>(); if (iter->second.is_missing(soid)) { - logger().debug("push_delete: will remove {} from {}", soid, shard); + DEBUGDPP("will remove {} from {}", pg, soid, shard); pg.begin_peer_recover(shard, soid); spg_t target_pg(pg.get_info().pgid.pgid, shard.shard); auto msg = crimson::make_message( @@ -194,7 +198,8 @@ RecoveryBackend::interruptible_future<> ReplicatedRecoveryBackend::handle_recovery_delete( Ref m) { - logger().debug("{}: {}", __func__, *m); + LOG_PREFIX(ReplicatedRecoveryBackend::handle_recovery_delete); + DEBUGDPP("{}", pg, *m); auto& p = m->objects.front(); //TODO: only one delete per message for now. 
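
push_delete() above fans a delete out to every shard in get_acting_recovery_backfill() that still records the object as missing, skipping the local shard, before begin_peer_recover() and the MOSDPGRecoveryDelete send. A stand-alone sketch of just that selection loop, with an int shard id and a trivial missing-set standing in for pg_shard_t and the peer missing maps (all names here are illustrative):

#include <cstdio>
#include <map>
#include <set>
#include <string>

using shard_id = int;

struct MissingSet {
  std::set<std::string> objects;
  bool is_missing(const std::string& soid) const { return objects.count(soid) > 0; }
};

void push_delete_sketch(shard_id whoami,
                        const std::set<shard_id>& acting_recovery_backfill,
                        const std::map<shard_id, MissingSet>& shard_missing,
                        const std::string& soid) {
  for (shard_id shard : acting_recovery_backfill) {
    if (shard == whoami) {
      continue;                      // never push a delete to ourselves
    }
    auto iter = shard_missing.find(shard);
    if (iter == shard_missing.end() || !iter->second.is_missing(soid)) {
      continue;                      // peer already converged on this object
    }
    // the real backend calls pg.begin_peer_recover() and sends MOSDPGRecoveryDelete here
    std::printf("will remove %s from shard %d\n", soid.c_str(), shard);
  }
}

int main() {
  push_delete_sketch(0, {0, 1, 2}, {{1, {{"rbd_data.1234"}}}, {2, {}}}, "rbd_data.1234");
}
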
return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch()) @@ -218,14 +223,15 @@ ReplicatedRecoveryBackend::on_local_recover_persist( bool is_delete, epoch_t epoch_frozen) { - logger().debug("{}", __func__); + LOG_PREFIX(ReplicatedRecoveryBackend::on_local_recover_persist); + DEBUGDPP("", pg); return seastar::do_with( ceph::os::Transaction(), - [this, soid, &_recovery_info, is_delete, epoch_frozen](auto &t) { + [FNAME, this, soid, &_recovery_info, is_delete, epoch_frozen](auto &t) { return pg.get_recovery_handler()->on_local_recover( soid, _recovery_info, is_delete, t - ).then_interruptible([this, &t] { - logger().debug("ReplicatedRecoveryBackend::{}: do_transaction...", __func__); + ).then_interruptible([FNAME, this, &t] { + DEBUGDPP("submitting transaction", pg); return shard_services.get_store().do_transaction(coll, std::move(t)); }).then_interruptible( [this, epoch_frozen, last_complete = pg.get_info().last_complete] { @@ -241,17 +247,18 @@ ReplicatedRecoveryBackend::local_recover_delete( eversion_t need, epoch_t epoch_to_freeze) { - logger().debug("{}: {}, {}", __func__, soid, need); - return backend->load_metadata(soid).safe_then_interruptible([this] + LOG_PREFIX(ReplicatedRecoveryBackend::local_recover_delete); + DEBUGDPP("{}, {}", pg, soid, need); + return backend->load_metadata(soid).safe_then_interruptible([FNAME, this] (auto lomt) -> interruptible_future<> { if (lomt->os.exists) { return seastar::do_with(ceph::os::Transaction(), - [this, lomt = std::move(lomt)](auto& txn) mutable { + [FNAME, this, lomt = std::move(lomt)](auto& txn) mutable { return interruptor::async([this, lomt=std::move(lomt), &txn] { pg.remove_maybe_snapmapped_object(txn, lomt->os.oi.soid); }).then_interruptible( - [this, &txn]() mutable { - logger().debug("ReplicatedRecoveryBackend::local_recover_delete: do_transaction..."); + [FNAME, this, &txn]() mutable { + DEBUGDPP("submitting transaction", pg); return shard_services.get_store().do_transaction(coll, std::move(txn)); }); @@ -285,13 +292,14 @@ RecoveryBackend::interruptible_future<> ReplicatedRecoveryBackend::recover_delete( const hobject_t &soid, eversion_t need) { - logger().debug("{}: {}, {}", __func__, soid, need); + LOG_PREFIX(ReplicatedRecoveryBackend::recover_delete); + DEBUGDPP("{}, {}", pg, soid, need); epoch_t cur_epoch = pg.get_osdmap_epoch(); return seastar::do_with(object_stat_sum_t(), - [this, soid, need, cur_epoch](auto& stat_diff) { + [FNAME, this, soid, need, cur_epoch](auto& stat_diff) { return local_recover_delete(soid, need, cur_epoch).then_interruptible( - [this, &stat_diff, cur_epoch, soid, need]() + [FNAME, this, &stat_diff, cur_epoch, soid, need]() -> interruptible_future<> { if (!pg.has_reset_since(cur_epoch)) { bool object_missing = false; @@ -299,8 +307,9 @@ ReplicatedRecoveryBackend::recover_delete( if (shard == pg.get_pg_whoami()) continue; if (pg.get_shard_missing(shard)->is_missing(soid)) { - logger().debug("recover_delete: soid {} needs to deleted from replca {}", - soid, shard); + DEBUGDPP( + "soid {} needs to be deleted from replica {}", + pg, soid, shard); object_missing = true; break; } @@ -330,7 +339,8 @@ ReplicatedRecoveryBackend::prep_push_to_replica( eversion_t need, pg_shard_t pg_shard) { - logger().debug("{}: {}, {}", __func__, soid, need); + LOG_PREFIX(ReplicatedRecoveryBackend::prep_push_to_replica); + DEBUGDPP("{}, {}", pg, soid, need); auto& recovery_waiter = get_recovering(soid); auto& obc = recovery_waiter.obc; @@ -347,8 +357,8 @@ ReplicatedRecoveryBackend::prep_push_to_replica( // try to base push 
off of clones that succeed/preceed poid // we need the head (and current SnapSet) locally to do that. if (pg.get_local_missing().is_missing(head)) { - logger().debug("{} missing head {}, pushing raw clone", - __func__, head); + DEBUGDPP("missing head {}, pushing raw clone", + pg, head); if (obc->obs.oi.size) { subsets.data_subset.insert(0, obc->obs.oi.size); } @@ -361,8 +371,7 @@ ReplicatedRecoveryBackend::prep_push_to_replica( auto ssc = obc->ssc; ceph_assert(ssc); push_info_ss = ssc->snapset; - logger().debug("push_to_replica snapset is {}", - ssc->snapset); + DEBUGDPP("snapset is {}", pg, ssc->snapset); subsets = crimson::osd::calc_clone_subsets( ssc->snapset, soid, @@ -375,8 +384,7 @@ ReplicatedRecoveryBackend::prep_push_to_replica( // base this on partially on replica's clones? auto ssc = obc->ssc; ceph_assert(ssc); - logger().debug("push_to_replica snapset is {}", - ssc->snapset); + DEBUGDPP("snapset is {}", pg, ssc->snapset); subsets = crimson::osd::calc_head_subsets( obc->obs.oi.size, ssc->snapset, soid, @@ -399,7 +407,8 @@ ReplicatedRecoveryBackend::prep_push( const crimson::osd::subsets_t& subsets, const SnapSet push_info_ss) { - logger().debug("{}: {}, {}", __func__, soid, need); + LOG_PREFIX(ReplicatedRecoveryBackend::prep_push); + DEBUGDPP("{}, {}", pg, soid, need); auto& recovery_waiter = get_recovering(soid); auto& obc = recovery_waiter.obc; @@ -439,8 +448,10 @@ void ReplicatedRecoveryBackend::prepare_pull( PullOp& pull_op, pull_info_t& pull_info, const hobject_t& soid, - eversion_t need) { - logger().debug("{}: {}, {}", __func__, soid, need); + eversion_t need) +{ + LOG_PREFIX(ReplicatedRecoveryBackend::prepare_pull); + DEBUGDPP("{}, {}", pg, soid, need); pg_missing_tracker_t local_missing = pg.get_local_missing(); const auto missing_iter = local_missing.get_items().find(soid); @@ -471,6 +482,7 @@ ObjectRecoveryInfo ReplicatedRecoveryBackend::set_recovery_info( const hobject_t& soid, const crimson::osd::SnapSetContextRef ssc) { + LOG_PREFIX(ReplicatedRecoveryBackend::set_recovery_info); pg_missing_tracker_t local_missing = pg.get_local_missing(); const auto missing_iter = local_missing.get_items().find(soid); ObjectRecoveryInfo recovery_info; @@ -481,7 +493,7 @@ ObjectRecoveryInfo ReplicatedRecoveryBackend::set_recovery_info( auto subsets = crimson::osd::calc_clone_subsets( ssc->snapset, soid, local_missing, pg.get_info().last_backfill); crimson::osd::set_subsets(subsets, recovery_info); - logger().debug("{}: pulling {}", __func__, recovery_info); + DEBUGDPP("pulling {}", pg, recovery_info); ceph_assert(ssc->snapset.clone_size.count(soid.snap)); recovery_info.size = ssc->snapset.clone_size[soid.snap]; } else { @@ -504,40 +516,41 @@ ReplicatedRecoveryBackend::build_push_op( const ObjectRecoveryProgress& progress, object_stat_sum_t* stat) { - logger().debug("{} {} @{}", - __func__, recovery_info.soid, recovery_info.version); + LOG_PREFIX(ReplicatedRecoveryBackend::build_push_op); + DEBUGDPP("{} @{}", pg, recovery_info.soid, recovery_info.version); return seastar::do_with(ObjectRecoveryProgress(progress), uint64_t(crimson::common::local_conf() ->osd_recovery_max_chunk), recovery_info.version, PushOp(), - [this, &recovery_info, &progress, stat] + [FNAME, this, &recovery_info, &progress, stat] (auto& new_progress, auto& available, auto& v, auto& push_op) { return read_metadata_for_push_op(recovery_info.soid, progress, new_progress, v, &push_op - ).then_interruptible([&](eversion_t local_ver) mutable { + ).then_interruptible([&, FNAME](eversion_t local_ver) mutable { // If 
requestor didn't know the version, use ours if (v == eversion_t()) { v = local_ver; } else if (v != local_ver) { - logger().error("build_push_op: {} push {} v{} failed because local copy is {}", - pg.get_pgid(), recovery_info.soid, recovery_info.version, local_ver); + ERRORDPP( + "push {} v{} failed because local copy is {}", + pg, recovery_info.soid, recovery_info.version, local_ver); // TODO: bail out } return read_omap_for_push_op(recovery_info.soid, progress, new_progress, available, &push_op); - }).then_interruptible([this, &recovery_info, &progress, + }).then_interruptible([FNAME, this, &recovery_info, &progress, &available, &push_op]() mutable { - logger().debug("build_push_op: available: {}, copy_subset: {}", - available, recovery_info.copy_subset); + DEBUGDPP("available: {}, copy_subset: {}", + pg, available, recovery_info.copy_subset); return read_object_for_push_op(recovery_info.soid, recovery_info.copy_subset, progress.data_recovered_to, available, &push_op); - }).then_interruptible([&recovery_info, &v, &progress, + }).then_interruptible([FNAME, this, &recovery_info, &v, &progress, &new_progress, stat, &push_op] (uint64_t recovered_to) mutable { new_progress.data_recovered_to = recovered_to; @@ -559,9 +572,8 @@ ReplicatedRecoveryBackend::build_push_op( push_op.recovery_info = recovery_info; push_op.after_progress = new_progress; push_op.before_progress = progress; - logger().debug("build_push_op: push_op version:" - " {}, push_op data length: {}", - push_op.version, push_op.data.length()); + DEBUGDPP("push_op version: {}, push_op data length: {}", + pg, push_op.version, push_op.data.length()); return seastar::make_ready_future(std::move(push_op)); }); }); @@ -575,7 +587,8 @@ ReplicatedRecoveryBackend::read_metadata_for_push_op( eversion_t ver, PushOp* push_op) { - logger().debug("{}, {}", __func__, oid); + LOG_PREFIX(ReplicatedRecoveryBackend::read_metadata_for_push_op); + DEBUGDPP("{}", pg, oid); if (!progress.first) { return seastar::make_ready_future(ver); } @@ -584,30 +597,30 @@ ReplicatedRecoveryBackend::read_metadata_for_push_op( coll, ghobject_t(oid), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED ).handle_error_interruptible( crimson::os::FuturizedStore::Shard::read_errorator::all_same_way( - [oid] (const std::error_code& e) { - logger().debug("read_metadata_for_push_op, error {} when getting omap header: {}", e, oid); + [FNAME, this, oid] (const std::error_code& e) { + DEBUGDPP("error {} when getting omap header: {}", pg, e, oid); return seastar::make_ready_future(); })), interruptor::make_interruptible( store->get_attrs(coll, ghobject_t(oid), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED) ).handle_error_interruptible( crimson::os::FuturizedStore::Shard::get_attrs_ertr::all_same_way( - [oid] (const std::error_code& e) { - logger().debug("read_metadata_for_push_op, error {} when getting attrs: {}", e, oid); + [FNAME, this, oid] (const std::error_code& e) { + DEBUGDPP("error {} when getting attrs: {}", pg, e, oid); return seastar::make_ready_future(); })) - )).then_unpack_interruptible([&new_progress, push_op](auto bl, auto attrs) { + )).then_unpack_interruptible([FNAME, this, &new_progress, push_op](auto bl, auto attrs) { if (bl.length() == 0) { - logger().warn("read_metadata_for_push_op: fail to read omap header"); + WARNDPP("fail to read omap header", pg); } else if (attrs.empty()) { - logger().error("read_metadata_for_push_op: fail to read attrs"); + ERRORDPP("fail to read attrs", pg); return eversion_t{}; } push_op->omap_header.claim_append(std::move(bl)); for (auto&& [key, val] : attrs) { 
push_op->attrset.emplace(std::move(key), std::move(val)); } - logger().debug("read_metadata_for_push_op: {}", push_op->attrset[OI_ATTR]); + DEBUGDPP("{}", pg, push_op->attrset[OI_ATTR]); object_info_t oi; oi.decode_no_oid(push_op->attrset[OI_ATTR]); new_progress.first = false; @@ -623,6 +636,7 @@ ReplicatedRecoveryBackend::read_object_for_push_op( uint64_t max_len, PushOp* push_op) { + LOG_PREFIX(ReplicatedRecoveryBackend::read_object_for_push_op); if (max_len == 0 || copy_subset.empty()) { push_op->data_included.clear(); return seastar::make_ready_future(offset); @@ -668,8 +682,8 @@ ReplicatedRecoveryBackend::read_object_for_push_op( recovered_to = push_op->data_included.range_end(); } return seastar::make_ready_future(recovered_to); - }, PGBackend::read_errorator::all_same_way([](auto e) { - logger().debug("build_push_op: read exception"); + }, PGBackend::read_errorator::all_same_way([FNAME, this](auto e) { + DEBUGDPP("read exception", pg); return seastar::make_exception_future(e); })); } @@ -757,16 +771,18 @@ ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t& soid) const RecoveryBackend::interruptible_future<> ReplicatedRecoveryBackend::handle_pull(Ref m) { - logger().debug("{}: {}", __func__, *m); + LOG_PREFIX(ReplicatedRecoveryBackend::handle_pull); + DEBUGDPP("{}", pg, *m); if (pg.can_discard_replica_op(*m)) { - logger().debug("{}: discarding {}", __func__, *m); + DEBUGDPP("discarding {}", pg, *m); return seastar::now(); } - return seastar::do_with(m->take_pulls(), [this, from=m->from](auto& pulls) { - return interruptor::parallel_for_each(pulls, - [this, from](auto& pull_op) { + return seastar::do_with(m->take_pulls(), [FNAME, this, from=m->from](auto& pulls) { + return interruptor::parallel_for_each( + pulls, + [FNAME, this, from](auto& pull_op) { const hobject_t& soid = pull_op.soid; - logger().debug("handle_pull: {}", soid); + DEBUGDPP("{}", pg, soid); return backend->stat(coll, ghobject_t(soid)).then_interruptible( [this, &pull_op](auto st) { ObjectRecoveryInfo &recovery_info = pull_op.recovery_info; @@ -803,12 +819,13 @@ RecoveryBackend::interruptible_future ReplicatedRecoveryBackend::_handle_pull_response( pg_shard_t from, PushOp& push_op, - PullOp* response, - ceph::os::Transaction* t) + PullOp* response) { - logger().debug("handle_pull_response {} {} data.size() is {} data_included: {}", - push_op.recovery_info, push_op.after_progress, - push_op.data.length(), push_op.data_included); + LOG_PREFIX(ReplicatedRecoveryBackend::handle_pull); + DEBUGDPP("{} {} data.size() is {} data_included: {}", + pg, push_op.recovery_info, push_op.after_progress, + push_op.data.length(), push_op.data_included); + ceph::os::Transaction t; const hobject_t &hoid = push_op.soid; auto& recovery_waiter = get_recovering(hoid); @@ -823,12 +840,10 @@ ReplicatedRecoveryBackend::_handle_pull_response( if (pull_info.recovery_info.version == eversion_t()) pull_info.recovery_info.version = push_op.version; - auto prepare_waiter = interruptor::make_interruptible( - seastar::make_ready_future<>()); if (pull_info.recovery_progress.first) { - prepare_waiter = pg.obc_loader.with_obc( + auto fut = pg.obc_loader.with_obc( pull_info.recovery_info.soid, - [this, &pull_info, &recovery_waiter, &push_op](auto, auto obc) { + [FNAME, this, &pull_info, &recovery_waiter, &push_op](auto, auto obc) { pull_info.obc = obc; recovery_waiter.obc = obc; obc->obs.oi.decode_no_oid(push_op.attrset.at(OI_ATTR), @@ -843,7 +858,7 @@ ReplicatedRecoveryBackend::_handle_pull_response( obc->ssc->snapset = 
SnapSet(ss_attr_iter->second); obc->ssc->exists = true; } catch (const buffer::error&) { - logger().warn("unable to decode SnapSet"); + WARNDPP("unable to decode SnapSet", pg); throw crimson::osd::invalid_argument(); } assert(!pull_info.obc->ssc->exists || @@ -857,55 +872,59 @@ ReplicatedRecoveryBackend::_handle_pull_response( } return crimson::osd::PG::load_obc_ertr::now(); }, false).handle_error_interruptible(crimson::ct_error::assert_all{}); + co_await std::move(fut); }; - return prepare_waiter.then_interruptible( - [this, &pull_info, &push_op, t, response]() mutable { - const bool first = pull_info.recovery_progress.first; - pull_info.recovery_progress = push_op.after_progress; - logger().debug("new recovery_info {}, new progress {}", - pull_info.recovery_info, pull_info.recovery_progress); - interval_set data_zeros; - { - uint64_t offset = push_op.before_progress.data_recovered_to; - uint64_t length = (push_op.after_progress.data_recovered_to - - push_op.before_progress.data_recovered_to); - if (length) { - data_zeros.insert(offset, length); - } + + const bool first = pull_info.recovery_progress.first; + pull_info.recovery_progress = push_op.after_progress; + DEBUGDPP("new recovery_info {}, new progress {}", + pg, pull_info.recovery_info, pull_info.recovery_progress); + interval_set data_zeros; + { + uint64_t offset = push_op.before_progress.data_recovered_to; + uint64_t length = (push_op.after_progress.data_recovered_to - + push_op.before_progress.data_recovered_to); + if (length) { + data_zeros.insert(offset, length); } - auto [usable_intervals, data] = - trim_pushed_data(pull_info.recovery_info.copy_subset, - push_op.data_included, push_op.data); - bool complete = pull_info.is_complete(); - bool clear_omap = !push_op.before_progress.omap_complete; - return submit_push_data(pull_info.recovery_info, - first, complete, clear_omap, + } + auto [usable_intervals, data] = + trim_pushed_data(pull_info.recovery_info.copy_subset, + push_op.data_included, push_op.data); + bool complete = pull_info.is_complete(); + bool clear_omap = !push_op.before_progress.omap_complete; + co_await submit_push_data(pull_info.recovery_info, + first, complete, clear_omap, std::move(data_zeros), std::move(usable_intervals), std::move(data), std::move(push_op.omap_header), - push_op.attrset, std::move(push_op.omap_entries), t) - .then_interruptible( - [this, response, &pull_info, &push_op, complete, - t, bytes_recovered=data.length()]() - -> RecoveryBackend::interruptible_future { - pull_info.stat.num_keys_recovered += push_op.omap_entries.size(); - pull_info.stat.num_bytes_recovered += bytes_recovered; - - if (complete) { - pull_info.stat.num_objects_recovered++; - return pg.get_recovery_handler()->on_local_recover( - push_op.soid, get_recovering(push_op.soid).pull_info->recovery_info, - false, *t - ).then_interruptible([] { - return true; - }); - } else { - response->soid = push_op.soid; - response->recovery_info = pull_info.recovery_info; - response->recovery_progress = pull_info.recovery_progress; - return seastar::make_ready_future(false); - } - }); - }); + push_op.attrset, std::move(push_op.omap_entries), &t); + + const auto bytes_recovered = data.length(); + pull_info.stat.num_keys_recovered += push_op.omap_entries.size(); + pull_info.stat.num_bytes_recovered += bytes_recovered; + + if (complete) { + pull_info.stat.num_objects_recovered++; + auto manager = pg.obc_loader.get_obc_manager( + recovery_waiter.obc); + manager.lock_excl_sync(); /* cannot already be locked */ + co_await 
pg.get_recovery_handler()->on_local_recover( + push_op.soid, get_recovering(push_op.soid).pull_info->recovery_info, + false, t + ); + DEBUGDPP("submitting transaction, complete", pg); + co_await interruptor::make_interruptible( + shard_services.get_store().do_transaction(coll, std::move(t))); + } else { + response->soid = push_op.soid; + response->recovery_info = pull_info.recovery_info; + response->recovery_progress = pull_info.recovery_progress; + DEBUGDPP("submitting transaction, incomplete", pg); + co_await interruptor::make_interruptible( + shard_services.get_store().do_transaction(coll, std::move(t))); + } + + co_return complete; } void ReplicatedRecoveryBackend::recalc_subsets( @@ -923,56 +942,47 @@ RecoveryBackend::interruptible_future<> ReplicatedRecoveryBackend::handle_pull_response( Ref m) { + LOG_PREFIX(ReplicatedRecoveryBackend::handle_pull_response); if (pg.can_discard_replica_op(*m)) { - logger().debug("{}: discarding {}", __func__, *m); - return seastar::now(); + DEBUGDPP("discarding {}", pg, *m); + co_return; } - const PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now. + PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now. if (push_op.version == eversion_t()) { // replica doesn't have it! pg.get_recovery_handler()->on_failed_recover({ m->from }, push_op.soid, get_recovering(push_op.soid).pull_info->recovery_info.version); - return seastar::make_exception_future<>( - std::runtime_error(fmt::format( - "Error on pushing side {} when pulling obj {}", - m->from, push_op.soid))); + throw std::runtime_error( + fmt::format( + "Error on pushing side {} when pulling obj {}", + m->from, push_op.soid)); } - logger().debug("{}: {}", __func__, *m); - return seastar::do_with(PullOp(), [this, m](auto& response) { - return seastar::do_with(ceph::os::Transaction(), m.get(), - [this, &response](auto& t, auto& m) { - pg_shard_t from = m->from; - PushOp& push_op = m->pushes[0]; // only one push per message for now - return _handle_pull_response(from, push_op, &response, &t - ).then_interruptible( - [this, &t](bool complete) { - epoch_t epoch_frozen = pg.get_osdmap_epoch(); - logger().debug("ReplicatedRecoveryBackend::handle_pull_response: do_transaction..."); - return shard_services.get_store().do_transaction(coll, std::move(t)) - .then([this, epoch_frozen, complete, - last_complete = pg.get_info().last_complete] { - pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete); - return seastar::make_ready_future(complete); - }); - }); - }).then_interruptible([this, m, &response](bool complete) { - if (complete) { - auto& push_op = m->pushes[0]; - get_recovering(push_op.soid).set_pulled(); - return seastar::make_ready_future<>(); - } else { - auto reply = crimson::make_message(); - reply->from = pg.get_pg_whoami(); - reply->set_priority(m->get_priority()); - reply->pgid = pg.get_info().pgid; - reply->map_epoch = m->map_epoch; - reply->min_epoch = m->min_epoch; - reply->set_pulls({std::move(response)}); - return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch()); - } - }); - }); + DEBUGDPP("{}", pg, *m); + PullOp response; + + pg_shard_t from = m->from; + + epoch_t epoch_frozen = pg.get_osdmap_epoch(); + const bool complete = co_await _handle_pull_response( + from, push_op, &response); + + if (complete) { + pg.get_recovery_handler()->_committed_pushed_object( + epoch_frozen, pg.get_info().last_complete); + get_recovering(push_op.soid).set_pulled(); + } else { + auto reply = crimson::make_message(); + 
reply->from = pg.get_pg_whoami(); + reply->set_priority(m->get_priority()); + reply->pgid = pg.get_info().pgid; + reply->map_epoch = m->map_epoch; + reply->min_epoch = m->min_epoch; + reply->set_pulls({std::move(response)}); + co_await interruptor::make_interruptible( + shard_services.send_to_osd( + m->from.osd, std::move(reply), pg.get_osdmap_epoch())); + } } RecoveryBackend::interruptible_future<> @@ -982,7 +992,8 @@ ReplicatedRecoveryBackend::_handle_push( PushReplyOp *response, ceph::os::Transaction *t) { - logger().debug("{}", __func__); + LOG_PREFIX(ReplicatedRecoveryBackend::_handle_push); + DEBUGDPP("{}", pg); bool first = push_op.before_progress.first; interval_set data_zeros; @@ -1021,23 +1032,16 @@ RecoveryBackend::interruptible_future<> ReplicatedRecoveryBackend::handle_push( Ref m) { - if (pg.can_discard_replica_op(*m)) { - logger().debug("{}: discarding {}", __func__, *m); - return seastar::now(); - } - if (pg.is_primary()) { - return handle_pull_response(m); - } - - logger().debug("{}: {}", __func__, *m); - return seastar::do_with(PushReplyOp(), [this, m](auto& response) { + LOG_PREFIX(ReplicatedRecoveryBackend::handle_push); + DEBUGDPP("{}", pg, *m); + return seastar::do_with(PushReplyOp(), [FNAME, this, m](auto& response) { PushOp& push_op = m->pushes[0]; // TODO: only one push per message for now return seastar::do_with(ceph::os::Transaction(), - [this, m, &push_op, &response](auto& t) { + [FNAME, this, m, &push_op, &response](auto& t) { return _handle_push(m->from, push_op, &response, &t).then_interruptible( - [this, &t] { + [FNAME, this, &t] { epoch_t epoch_frozen = pg.get_osdmap_epoch(); - logger().debug("ReplicatedRecoveryBackend::handle_push: do_transaction..."); + DEBUGDPP("submitting transaction", pg); return interruptor::make_interruptible( shard_services.get_store().do_transaction(coll, std::move(t))).then_interruptible( [this, epoch_frozen, last_complete = pg.get_info().last_complete] { @@ -1065,12 +1069,13 @@ ReplicatedRecoveryBackend::_handle_push_reply( pg_shard_t peer, const PushReplyOp &op) { + LOG_PREFIX(ReplicatedRecoveryBackend::handle_push); const hobject_t& soid = op.soid; - logger().debug("{}, soid {}, from {}", __func__, soid, peer); + DEBUGDPP("soid {}, from {}", pg, soid, peer); auto recovering_iter = recovering.find(soid); if (recovering_iter == recovering.end() || !recovering_iter->second->pushing.count(peer)) { - logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer); + DEBUGDPP("huh, i wasn't pushing {} to osd.{}", pg, soid, peer); return seastar::make_ready_future>(); } else { auto& push_info = recovering_iter->second->pushing[peer]; @@ -1103,7 +1108,8 @@ RecoveryBackend::interruptible_future<> ReplicatedRecoveryBackend::handle_push_reply( Ref m) { - logger().debug("{}: {}", __func__, *m); + LOG_PREFIX(ReplicatedRecoveryBackend::handle_push_reply); + DEBUGDPP("{}", pg, *m); auto from = m->from; auto& push_reply = m->replies[0]; //TODO: only one reply per message @@ -1133,7 +1139,8 @@ ReplicatedRecoveryBackend::trim_pushed_data( const interval_set &intervals_received, ceph::bufferlist data_received) { - logger().debug("{}", __func__); + LOG_PREFIX(ReplicatedRecoveryBackend::trim_pushed_data); + DEBUGDPP("", pg); // what i have is only a subset of what i want if (intervals_received.subset_of(copy_subset)) { return {intervals_received, data_received}; @@ -1166,12 +1173,12 @@ ReplicatedRecoveryBackend::prep_push_target( bool clear_omap, ObjectStore::Transaction* t, const map>& attrs, - bufferlist&& omap_header) + bufferlist omap_header) 
{ + LOG_PREFIX(ReplicatedRecoveryBackend::prep_push_target); if (!first) { - return seastar::make_ready_future( - get_temp_recovery_object(recovery_info.soid, - recovery_info.version)); + co_return get_temp_recovery_object(recovery_info.soid, + recovery_info.version); } ghobject_t target_oid; @@ -1181,10 +1188,11 @@ ReplicatedRecoveryBackend::prep_push_target( } else { target_oid = ghobject_t(get_temp_recovery_object(recovery_info.soid, recovery_info.version)); - logger().debug("{}: Adding oid {} in the temp collection", - __func__, target_oid); + DEBUGDPP("Adding oid {} in the temp collection", + pg, target_oid); add_temp_obj(target_oid.hobj); } + // create a new object if (!complete || !recovery_info.object_exist) { t->remove(coll->get_cid(), target_oid); @@ -1196,6 +1204,7 @@ ReplicatedRecoveryBackend::prep_push_target( oi.expected_write_size, oi.alloc_hint_flags); } + if (complete) { // remove xattr and update later if overwrite on original object t->rmattrs(coll->get_cid(), target_oid); @@ -1209,109 +1218,102 @@ ReplicatedRecoveryBackend::prep_push_target( t->omap_setheader(coll->get_cid(), target_oid, omap_header); } if (complete || !recovery_info.object_exist) { - return seastar::make_ready_future(target_oid.hobj); + co_return target_oid.hobj; } + // clone overlap content in local object if using a new object - return interruptor::make_interruptible(store->stat(coll, ghobject_t(recovery_info.soid))) - .then_interruptible( - [this, &recovery_info, t, target_oid] (auto st) { - // TODO: pg num bytes counting - uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size); - interval_set local_intervals_included, local_intervals_excluded; - if (local_size) { - local_intervals_included.insert(0, local_size); - local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset); - local_intervals_included.subtract(local_intervals_excluded); - } - for (auto [off, len] : local_intervals_included) { - logger().debug(" clone_range {} {}~{}", - recovery_info.soid, off, len); - t->clone_range(coll->get_cid(), ghobject_t(recovery_info.soid), - target_oid, off, len, off); - } - return seastar::make_ready_future(target_oid.hobj); - }); + auto st = co_await interruptor::make_interruptible( + store->stat(coll, ghobject_t(recovery_info.soid))); + + // TODO: pg num bytes counting + uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size); + interval_set local_intervals_included, local_intervals_excluded; + if (local_size) { + local_intervals_included.insert(0, local_size); + local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset); + local_intervals_included.subtract(local_intervals_excluded); + } + for (auto [off, len] : local_intervals_included) { + DEBUGDPP("clone_range {} {}~{}", + pg, recovery_info.soid, off, len); + t->clone_range(coll->get_cid(), ghobject_t(recovery_info.soid), + target_oid, off, len, off); + } + co_return target_oid.hobj; } + RecoveryBackend::interruptible_future<> ReplicatedRecoveryBackend::submit_push_data( const ObjectRecoveryInfo &recovery_info, bool first, bool complete, bool clear_omap, - interval_set&& data_zeros, - interval_set&& intervals_included, - bufferlist&& data_included, - bufferlist&& omap_header, + interval_set data_zeros, + interval_set intervals_included, + bufferlist data_included, + bufferlist omap_header, const map> &attrs, - map&& omap_entries, + map omap_entries, ObjectStore::Transaction *t) { - logger().debug("{}", __func__); - return 
prep_push_target(recovery_info, first, complete, - clear_omap, t, attrs, - std::move(omap_header)).then_interruptible( - [this, - &recovery_info, t, - first, complete, - data_zeros=std::move(data_zeros), - intervals_included=std::move(intervals_included), - data_included=std::move(data_included), - omap_entries=std::move(omap_entries), - &attrs](auto target_oid) mutable { - - uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL; - // Punch zeros for data, if fiemap indicates nothing but it is marked dirty - if (!data_zeros.empty()) { - data_zeros.intersection_of(recovery_info.copy_subset); - assert(intervals_included.subset_of(data_zeros)); - data_zeros.subtract(intervals_included); - - logger().debug("submit_push_data recovering object {} copy_subset: {} " - "intervals_included: {} data_zeros: {}", - recovery_info.soid, recovery_info.copy_subset, - intervals_included, data_zeros); - - for (auto [start, len] : data_zeros) { - t->zero(coll->get_cid(), ghobject_t(target_oid), start, len); - } - } - uint64_t off = 0; - for (auto [start, len] : intervals_included) { - bufferlist bit; - bit.substr_of(data_included, off, len); - t->write(coll->get_cid(), ghobject_t(target_oid), - start, len, bit, fadvise_flags); - off += len; + LOG_PREFIX(ReplicatedRecoveryBackend::submit_push_data); + DEBUGDPP("", pg); + auto target_oid = co_await prep_push_target( + recovery_info, first, complete, + clear_omap, t, attrs, + std::move(omap_header)); + + uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL; + // Punch zeros for data, if fiemap indicates nothing but it is marked dirty + if (!data_zeros.empty()) { + data_zeros.intersection_of(recovery_info.copy_subset); + assert(intervals_included.subset_of(data_zeros)); + data_zeros.subtract(intervals_included); + + DEBUGDPP("recovering object {} copy_subset: {} " + "intervals_included: {} data_zeros: {}", + pg, recovery_info.soid, recovery_info.copy_subset, + intervals_included, data_zeros); + + for (auto [start, len] : data_zeros) { + t->zero(coll->get_cid(), ghobject_t(target_oid), start, len); } + } + uint64_t off = 0; + for (auto [start, len] : intervals_included) { + bufferlist bit; + bit.substr_of(data_included, off, len); + t->write(coll->get_cid(), ghobject_t(target_oid), + start, len, bit, fadvise_flags); + off += len; + } - if (!omap_entries.empty()) - t->omap_setkeys(coll->get_cid(), ghobject_t(target_oid), omap_entries); - if (!attrs.empty()) - t->setattrs(coll->get_cid(), ghobject_t(target_oid), attrs); + if (!omap_entries.empty()) + t->omap_setkeys(coll->get_cid(), ghobject_t(target_oid), omap_entries); + if (!attrs.empty()) + t->setattrs(coll->get_cid(), ghobject_t(target_oid), attrs); - if (complete) { - if (!first) { - logger().debug("submit_push_data: Removing oid {} from the temp collection", - target_oid); - clear_temp_obj(target_oid); - t->remove(coll->get_cid(), ghobject_t(recovery_info.soid)); - t->collection_move_rename(coll->get_cid(), ghobject_t(target_oid), - coll->get_cid(), ghobject_t(recovery_info.soid)); - } - submit_push_complete(recovery_info, t); + if (complete) { + if (!first) { + DEBUGDPP("Removing oid {} from the temp collection", + pg, target_oid); + clear_temp_obj(target_oid); + t->remove(coll->get_cid(), ghobject_t(recovery_info.soid)); + t->collection_move_rename(coll->get_cid(), ghobject_t(target_oid), + coll->get_cid(), ghobject_t(recovery_info.soid)); } - logger().debug("submit_push_data: done"); - return seastar::make_ready_future<>(); - }); + submit_push_complete(recovery_info, t); + } } void 
ReplicatedRecoveryBackend::submit_push_complete( const ObjectRecoveryInfo &recovery_info, ObjectStore::Transaction *t) { + LOG_PREFIX(ReplicatedRecoveryBackend::submit_push_complete); for (const auto& [oid, extents] : recovery_info.clone_subset) { for (const auto& [off, len] : extents) { - logger().debug(" clone_range {} {}~{}", oid, off, len); + DEBUGDPP("clone_range {} {}~{}", pg, oid, off, len); t->clone_range(coll->get_cid(), ghobject_t(oid), ghobject_t(recovery_info.soid), off, len, off); } @@ -1336,8 +1338,9 @@ ReplicatedRecoveryBackend::handle_recovery_op( Ref m, crimson::net::ConnectionXcoreRef conn) { + LOG_PREFIX(ReplicatedRecoveryBackend::handle_recovery_op); if (pg.can_discard_replica_op(*m)) { - logger().debug("{}: discarding {}", __func__, *m); + DEBUGDPP("discarding {}", pg, *m); return seastar::now(); } @@ -1345,7 +1348,12 @@ ReplicatedRecoveryBackend::handle_recovery_op( case MSG_OSD_PG_PULL: return handle_pull(boost::static_pointer_cast(m)); case MSG_OSD_PG_PUSH: - return handle_push(boost::static_pointer_cast(m)); + if (pg.is_primary()) { + return handle_pull_response( + boost::static_pointer_cast(m)); + } else { + return handle_push(boost::static_pointer_cast(m)); + } case MSG_OSD_PG_PUSH_REPLY: return handle_push_reply( boost::static_pointer_cast(m)); @@ -1356,8 +1364,8 @@ ReplicatedRecoveryBackend::handle_recovery_op( return handle_recovery_delete_reply( boost::static_pointer_cast(m)); default: - // delegate to parent class for handling backend-agnostic recovery ops. - return RecoveryBackend::handle_recovery_op(std::move(m), conn); + // delegate backfill messages to parent class + return handle_backfill_op(std::move(m), conn); } } diff --git a/src/crimson/osd/replicated_recovery_backend.h b/src/crimson/osd/replicated_recovery_backend.h index adf7188c89185..e8c3a44ece9b2 100644 --- a/src/crimson/osd/replicated_recovery_backend.h +++ b/src/crimson/osd/replicated_recovery_backend.h @@ -79,8 +79,7 @@ class ReplicatedRecoveryBackend : public RecoveryBackend { interruptible_future _handle_pull_response( pg_shard_t from, PushOp& push_op, - PullOp* response, - ceph::os::Transaction* t); + PullOp* response); void recalc_subsets( ObjectRecoveryInfo& recovery_info, crimson::osd::SnapSetContextRef ssc); @@ -93,12 +92,12 @@ class ReplicatedRecoveryBackend : public RecoveryBackend { bool first, bool complete, bool clear_omap, - interval_set&& data_zeros, - interval_set&& intervals_included, - ceph::bufferlist&& data_included, - ceph::bufferlist&& omap_header, + interval_set data_zeros, + interval_set intervals_included, + ceph::bufferlist data_included, + ceph::bufferlist omap_header, const std::map> &attrs, - std::map&& omap_entries, + std::map omap_entries, ceph::os::Transaction *t); void submit_push_complete( const ObjectRecoveryInfo &recovery_info, @@ -178,7 +177,7 @@ class ReplicatedRecoveryBackend : public RecoveryBackend { bool clear_omap, ObjectStore::Transaction* t, const std::map> &attrs, - bufferlist&& omap_header); + bufferlist omap_header); using interruptor = crimson::interruptible::interruptor< crimson::osd::IOInterruptCondition>; };
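
The largest structural change in replicated_recovery_backend.cc is rewriting _handle_pull_response(), handle_pull_response(), prep_push_target() and submit_push_data() from nested .then_interruptible() continuation chains into coroutines: the ceph::os::Transaction becomes a local instead of an out-parameter, and the bufferlist/map arguments switch from rvalue references to by-value (see the header hunk just above), presumably so they live in the coroutine frame across suspension points. The sketch below shows the shape of that conversion using a minimal eager-completion task type; task, ready, load_metadata, read_object and do_transaction are illustrative stand-ins, not crimson's interruptible futures.

#include <coroutine>
#include <cstdio>
#include <exception>
#include <string>

template <typename T>
struct ready {                        // an already-completed "future"
  T value;
  bool await_ready() const noexcept { return true; }
  void await_suspend(std::coroutine_handle<>) const noexcept {}
  T await_resume() { return std::move(value); }
};

struct task {
  struct promise_type {
    task get_return_object() {
      return task{std::coroutine_handle<promise_type>::from_promise(*this)};
    }
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }
    void return_void() {}
    void unhandled_exception() { std::terminate(); }
  };
  std::coroutine_handle<promise_type> handle;
  ~task() { if (handle) handle.destroy(); }
  void run() { handle.resume(); }     // all awaitables here are ready, so this runs to completion
};

ready<std::string> load_metadata()  { return {"object_info"}; }
ready<int>         read_object()    { return {4096}; }
ready<bool>        do_transaction() { return {true}; }

// before: load_metadata().then([](auto md) { return read_object().then(...); });
// after: straight-line co_await, with the transaction handled at the end of the same body
task handle_pull_response_sketch(std::string soid /* by value: copied into the frame */) {
  auto md    = co_await load_metadata();
  auto bytes = co_await read_object();
  co_await do_transaction();
  std::printf("recovered %s (%s, %d bytes)\n", soid.c_str(), md.c_str(), bytes);
}

int main() {
  handle_pull_response_sketch("rbd_data.1234").run();
}
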
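
The object_context_loader.h hunk at the top of this patch adds a Manager overload that wraps an already-loaded ObjectContextRef plus Manager::lock_excl_sync(), and _handle_pull_response() now uses the pair to hold the object context exclusively while on_local_recover() and the final do_transaction() run. A stand-alone sketch of that RAII shape, with std::shared_ptr and std::shared_mutex standing in for crimson's ObjectContext and its lock (the real types and locking rules are richer than this):

#include <cassert>
#include <memory>
#include <shared_mutex>
#include <string>

struct ObjectContext {
  std::string oid;
  std::shared_mutex lock;
};
using ObjectContextRef = std::shared_ptr<ObjectContext>;

class Manager {
  ObjectContextRef obc;
  bool locked = false;
public:
  explicit Manager(ObjectContextRef o) : obc(std::move(o)) {}
  // mirrors lock_excl_sync(): only valid when nothing can be contending the obc
  void lock_excl_sync() {
    locked = obc->lock.try_lock();
    assert(locked && "lock_excl_sync() requires an uncontended object context");
  }
  ObjectContext& get_obc() { return *obc; }
  ~Manager() {
    if (locked) {
      obc->lock.unlock();
    }
  }
};

int main() {
  auto obc = std::make_shared<ObjectContext>();
  obc->oid = "rbd_data.1234";
  Manager manager(obc);      // analogous to pg.obc_loader.get_obc_manager(obc)
  manager.lock_excl_sync();  // cannot already be locked
  // ... apply the recovery transaction while the exclusive lock is held ...
}
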
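
On the dispatch side, RecoveryBackend::handle_recovery_op() becomes pure virtual, the backfill message handling moves into a separate RecoveryBackend::handle_backfill_op(), and ReplicatedRecoveryBackend::handle_recovery_op() now routes MSG_OSD_PG_PUSH to handle_pull_response() when this OSD is the primary (a push arriving at the primary is the reply to its own pull) while delegating everything it does not recognize to handle_backfill_op(). A compressed sketch of that split, with an enum and printf standing in for the message types and handlers:

#include <cstdio>

enum class msg_t { pull, push, push_reply, scan, backfill };

struct RecoveryBase {
  virtual ~RecoveryBase() = default;
  virtual void handle_recovery_op(msg_t m) = 0;   // backend-specific, now pure virtual
  void handle_backfill_op(msg_t m) {              // backend-agnostic backfill traffic
    std::printf("base: backfill-class message %d\n", static_cast<int>(m));
  }
};

struct ReplicatedRecovery : RecoveryBase {
  bool primary;
  explicit ReplicatedRecovery(bool p) : primary(p) {}
  void handle_recovery_op(msg_t m) override {
    switch (m) {
    case msg_t::pull:
      std::printf("handle_pull\n");
      break;
    case msg_t::push:
      if (primary) {
        std::printf("handle_pull_response\n");    // our own pull coming back
      } else {
        std::printf("handle_push\n");
      }
      break;
    case msg_t::push_reply:
      std::printf("handle_push_reply\n");
      break;
    default:
      handle_backfill_op(m);                      // scan, backfill, ...
      break;
    }
  }
};

int main() {
  ReplicatedRecovery(true).handle_recovery_op(msg_t::push);    // primary
  ReplicatedRecovery(false).handle_recovery_op(msg_t::push);   // replica
  ReplicatedRecovery(false).handle_recovery_op(msg_t::scan);
}
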
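
prep_push_target() and submit_push_data(), both now coroutines, keep the temp-object protocol from before the rewrite: the first chunk of a multi-chunk push creates a temp_recovering_* object, later chunks keep writing to it, and only the final chunk renames it over the real object, while a push that completes in one chunk writes the target in place. A simplified stand-alone sketch of that control flow, where Transaction is just a list of strings and the object_exist / clear-omap / clone-overlap paths are left out:

#include <cstdio>
#include <string>
#include <vector>

struct Transaction { std::vector<std::string> ops; };   // stand-in for ObjectStore::Transaction

std::string prep_push_target_sketch(const std::string& soid, bool first,
                                    bool complete, Transaction* t) {
  const std::string temp = "temp_recovering_" + soid;
  if (!first) {
    return temp;                       // later chunks keep writing to the temp object
  }
  if (complete) {
    return soid;                       // the whole object fit in one push: write in place
  }
  t->ops.push_back("touch " + temp);   // first of several chunks: create the temp object
  return temp;
}

void submit_push_data_sketch(const std::string& soid, bool first, bool complete,
                             Transaction* t) {
  const std::string target = prep_push_target_sketch(soid, first, complete, t);
  t->ops.push_back("write " + target);
  if (complete && !first) {            // final chunk of a multi-chunk push
    t->ops.push_back("rename " + target + " -> " + soid);
  }
}

int main() {
  Transaction t;
  submit_push_data_sketch("obj", true, false, &t);    // chunk 1
  submit_push_data_sketch("obj", false, false, &t);   // chunk 2
  submit_push_data_sketch("obj", false, true, &t);    // final chunk
  for (const auto& op : t.ops) {
    std::printf("%s\n", op.c_str());
  }
}
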
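
Earlier in this file, build_push_op() still caps each PushOp at osd_recovery_max_chunk bytes and records where it stopped in after_progress, so a large object is pushed across several round trips (read_metadata_for_push_op() only runs on the first chunk, read_object_for_push_op() advances the data cursor). A stand-alone sketch of that cursor arithmetic, with the ObjectRecoveryProgress bookkeeping reduced to two fields and the omap/attrs handling ignored:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <vector>

struct Progress {
  uint64_t data_recovered_to = 0;
  bool data_complete = false;
};

Progress build_push_op_sketch(const std::vector<uint8_t>& object,
                              Progress before, uint64_t max_chunk,
                              std::vector<uint8_t>* chunk_out) {
  Progress after = before;
  const uint64_t begin = before.data_recovered_to;
  const uint64_t end = std::min<uint64_t>(object.size(), begin + max_chunk);
  chunk_out->assign(object.begin() + static_cast<std::ptrdiff_t>(begin),
                    object.begin() + static_cast<std::ptrdiff_t>(end));
  after.data_recovered_to = end;
  after.data_complete = (end == object.size());
  return after;
}

int main() {
  std::vector<uint8_t> object(10 * 1024, 0xab);   // a 10 KiB object
  Progress progress;
  std::vector<uint8_t> chunk;
  int pushes = 0;
  while (!progress.data_complete) {
    progress = build_push_op_sketch(object, progress, 4096, &chunk);
    ++pushes;
  }
  std::printf("recovered in %d push ops\n", pushes);  // 3 pushes with a 4 KiB chunk
}
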