Skip to content

Commit

Permalink
Merge pull request ceph#61561 from athanatos/sjust/wip-crimson-recove…
Browse files Browse the repository at this point in the history
…ry-69412

crimson: take obc lock during push commit on primary

Reviewed-by: Matan Breizman <[email protected]>
  • Loading branch information
athanatos authored Jan 31, 2025
2 parents afcfc4a + 6093f91 commit 44b51db
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 335 deletions.
10 changes: 10 additions & 0 deletions src/crimson/osd/object_context_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ class ObjectContextLoader {
return *this;
}

void lock_excl_sync() {
target_state.lock_excl_sync();
}

ObjectContextRef &get_obc() {
ceph_assert(!target_state.is_empty());
ceph_assert(target_state.obc->is_loaded());
Expand Down Expand Up @@ -219,6 +223,12 @@ class ObjectContextLoader {
return ret;
}

Manager get_obc_manager(ObjectContextRef obc) {
Manager ret = get_obc_manager(obc->obs.oi.soid, false);
ret.set_state_obc(ret.target_state, obc);
return ret;
}

Manager get_obc_manager(
Orderer &orderer, const hobject_t &oid, bool resolve_clone = true) {
Manager ret = get_obc_manager(oid, resolve_clone);
Expand Down
71 changes: 39 additions & 32 deletions src/crimson/osd/recovery_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "crimson/common/coroutine.h"
#include "crimson/common/exception.h"
#include "crimson/common/log.h"
#include "crimson/osd/recovery_backend.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/pg_backend.h"
Expand All @@ -13,23 +14,20 @@
#include "messages/MOSDFastDispatchOp.h"
#include "osd/osd_types.h"

namespace {
seastar::logger& logger() {
return crimson::get_logger(ceph_subsys_osd);
}
}
SET_SUBSYS(osd);

hobject_t RecoveryBackend::get_temp_recovery_object(
const hobject_t& target,
eversion_t version) const
{
LOG_PREFIX(RecoveryBackend::get_temp_recovery_object);
hobject_t hoid =
target.make_temp_hobject(fmt::format("temp_recovering_{}_{}_{}_{}",
pg.get_info().pgid,
version,
pg.get_info().history.same_interval_since,
target.snap));
logger().debug("{} {}", __func__, hoid);
DEBUGDPP("{}", pg, hoid);
return hoid;
}

Expand Down Expand Up @@ -123,7 +121,8 @@ void RecoveryBackend::handle_backfill_finish(
MOSDPGBackfill& m,
crimson::net::ConnectionXcoreRef conn)
{
logger().debug("{}", __func__);
LOG_PREFIX(RecoveryBackend::handle_backfill_finish);
DEBUGDPP("", pg);
ceph_assert(!pg.is_primary());
ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 1);
auto reply = crimson::make_message<MOSDPGBackfill>(
Expand All @@ -146,7 +145,8 @@ RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_backfill_progress(
MOSDPGBackfill& m)
{
logger().debug("{}", __func__);
LOG_PREFIX(RecoveryBackend::handle_backfill_progress);
DEBUGDPP("", pg);
ceph_assert(!pg.is_primary());
ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 2);

Expand All @@ -156,7 +156,7 @@ RecoveryBackend::handle_backfill_progress(
m.stats,
m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS,
t);
logger().debug("RecoveryBackend::handle_backfill_progress: do_transaction...");
DEBUGDPP("submitting transaction", pg);
return shard_services.get_store().do_transaction(
pg.get_collection_ref(), std::move(t)).or_terminate();
}
Expand All @@ -165,7 +165,8 @@ RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_backfill_finish_ack(
MOSDPGBackfill& m)
{
logger().debug("{}", __func__);
LOG_PREFIX(RecoveryBackend::handle_backfill_finish_ack);
DEBUGDPP("", pg);
ceph_assert(pg.is_primary());
ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 3);
auto recovery_handler = pg.get_recovery_handler();
Expand All @@ -178,9 +179,10 @@ RecoveryBackend::handle_backfill(
MOSDPGBackfill& m,
crimson::net::ConnectionXcoreRef conn)
{
logger().debug("{}", __func__);
LOG_PREFIX(RecoveryBackend::handle_backfill);
DEBUGDPP("", pg);
if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
logger().debug("{}: discarding {}", __func__, m);
DEBUGDPP("discarding {}", pg, m);
return seastar::now();
}
switch (m.op) {
Expand All @@ -201,7 +203,8 @@ RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_backfill_remove(
MOSDPGBackfillRemove& m)
{
logger().debug("{} m.ls={}", __func__, m.ls);
LOG_PREFIX(RecoveryBackend::handle_backfill_remove);
DEBUGDPP("m.ls={}", pg, m.ls);
assert(m.get_type() == MSG_OSD_PG_BACKFILL_REMOVE);

ObjectStore::Transaction t;
Expand All @@ -211,7 +214,7 @@ RecoveryBackend::handle_backfill_remove(
pg.remove_maybe_snapmapped_object(t, soid);
});
}
logger().debug("RecoveryBackend::handle_backfill_remove: do_transaction...");
DEBUGDPP("submitting transaction", pg);
co_await interruptor::make_interruptible(
shard_services.get_store().do_transaction(
pg.get_collection_ref(), std::move(t)).or_terminate());
Expand All @@ -223,25 +226,26 @@ RecoveryBackend::scan_for_backfill(
[[maybe_unused]] const std::int64_t min,
const std::int64_t max)
{
logger().debug("{} starting from {}", __func__, start);
LOG_PREFIX(RecoveryBackend::scan_for_backfill);
DEBUGDPP("starting from {}", pg, start);
auto version_map = seastar::make_lw_shared<std::map<hobject_t, eversion_t>>();
return backend->list_objects(start, max).then_interruptible(
[this, start, version_map] (auto&& ret) {
[FNAME, this, start, version_map] (auto&& ret) {
auto&& [objects, next] = std::move(ret);
return seastar::do_with(
std::move(objects),
[this, version_map](auto &objects) {
[FNAME, this, version_map](auto &objects) {
return interruptor::parallel_for_each(objects,
[this, version_map] (const hobject_t& object)
[FNAME, this, version_map] (const hobject_t& object)
-> interruptible_future<> {
crimson::osd::ObjectContextRef obc;
if (pg.is_primary()) {
obc = pg.obc_registry.maybe_get_cached_obc(object);
}
if (obc) {
if (obc->obs.exists) {
logger().debug("scan_for_backfill found (primary): {} {}",
object, obc->obs.oi.version);
DEBUGDPP("found (primary): {} {}",
pg, object, obc->obs.oi.version);
version_map->emplace(object, obc->obs.oi.version);
} else {
// if the object does not exist here, it must have been removed
Expand All @@ -251,25 +255,25 @@ RecoveryBackend::scan_for_backfill(
return seastar::now();
} else {
return backend->load_metadata(object).safe_then_interruptible(
[version_map, object] (auto md) {
[FNAME, this, version_map, object] (auto md) {
if (md->os.exists) {
logger().debug("scan_for_backfill found: {} {}",
object, md->os.oi.version);
DEBUGDPP("found: {} {}", pg,
object, md->os.oi.version);
version_map->emplace(object, md->os.oi.version);
}
return seastar::now();
}, PGBackend::load_metadata_ertr::assert_all{});
}
});
}).then_interruptible([version_map, start=std::move(start), next=std::move(next), this] {
}).then_interruptible([FNAME, this, version_map, start=std::move(start), next=std::move(next)] {
BackfillInterval bi;
bi.begin = std::move(start);
bi.end = std::move(next);
bi.version = pg.get_info().last_update;
bi.objects = std::move(*version_map);
logger().debug("{} BackfillInterval filled, leaving, {}",
"scan_for_backfill",
bi);
DEBUGDPP("{} BackfillInterval filled, leaving, {}",
"scan_for_backfill",
pg, bi);
return seastar::make_ready_future<BackfillInterval>(std::move(bi));
});
});
Expand All @@ -280,7 +284,8 @@ RecoveryBackend::handle_scan_get_digest(
MOSDPGScan& m,
crimson::net::ConnectionXcoreRef conn)
{
logger().debug("{}", __func__);
LOG_PREFIX(RecoveryBackend::handle_scan_get_digest);
DEBUGDPP("", pg);
if (false /* FIXME: check for backfill too full */) {
std::ignore = shard_services.start_operation<crimson::osd::LocalPeeringEvent>(
// TODO: abstract start_background_recovery
Expand Down Expand Up @@ -316,7 +321,8 @@ RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_scan_digest(
MOSDPGScan& m)
{
logger().debug("{}", __func__);
LOG_PREFIX(RecoveryBackend::handle_scan_digest);
DEBUGDPP("", pg);
// Check that from is in backfill_targets vector
ceph_assert(pg.is_backfill_target(m.from));

Expand All @@ -342,9 +348,10 @@ RecoveryBackend::handle_scan(
MOSDPGScan& m,
crimson::net::ConnectionXcoreRef conn)
{
logger().debug("{}", __func__);
LOG_PREFIX(RecoveryBackend::handle_scan);
DEBUGDPP("", pg);
if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
logger().debug("{}: discarding {}", __func__, m);
DEBUGDPP("discarding {}", pg, m);
return seastar::now();
}
switch (m.op) {
Expand All @@ -360,7 +367,7 @@ RecoveryBackend::handle_scan(
}

RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_recovery_op(
RecoveryBackend::handle_backfill_op(
Ref<MOSDFastDispatchOp> m,
crimson::net::ConnectionXcoreRef conn)
{
Expand Down
6 changes: 5 additions & 1 deletion src/crimson/osd/recovery_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class RecoveryBackend {

virtual interruptible_future<> handle_recovery_op(
Ref<MOSDFastDispatchOp> m,
crimson::net::ConnectionXcoreRef conn);
crimson::net::ConnectionXcoreRef conn) = 0;

virtual interruptible_future<> recover_object(
const hobject_t& soid,
Expand Down Expand Up @@ -268,6 +268,10 @@ class RecoveryBackend {

void clean_up(ceph::os::Transaction& t, interrupt_cause_t why);
virtual seastar::future<> on_stop() = 0;

virtual interruptible_future<> handle_backfill_op(
Ref<MOSDFastDispatchOp> m,
crimson::net::ConnectionXcoreRef conn);
private:
void handle_backfill_finish(
MOSDPGBackfill& m,
Expand Down
Loading

0 comments on commit 44b51db

Please sign in to comment.