Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const std::string NORMAL_STRATEGY("normal");
const std::string ALL_STRATEGY("all");
const std::string HANDLER1("handler1");
const std::string HANDLER2("handler2");
const std::string strategy_info("info");

template <typename Entry> std::vector<std::string> make_names(const std::vector<Entry>& entries) {
std::vector<std::string> result;
Expand Down Expand Up @@ -148,9 +149,9 @@ TEST_F(FlushHistoryTest, empty_history) {
}

TEST_F(FlushHistoryTest, track_flushes) {
_flush_history.start_flush(HANDLER1, "a1", 3s, 5);
_flush_history.start_flush(HANDLER2, "a2", 1s, 6);
_flush_history.start_flush(HANDLER1, "a3", 4s, 7);
_flush_history.start_flush(HANDLER1, "a1", strategy_info, 3s, 5);
_flush_history.start_flush(HANDLER2, "a2", strategy_info, 1s, 6);
_flush_history.start_flush(HANDLER1, "a3", strategy_info, 4s, 7);
_flush_history.flush_done(6);
_flush_history.flush_done(5);
_flush_history.prune_done(6);
Expand All @@ -165,12 +166,12 @@ TEST_F(FlushHistoryTest, track_flushes) {
}

TEST_F(FlushHistoryTest, tracks_pending_flushes) {
_flush_history.add_pending_flush(HANDLER1, "a1", 3s);
_flush_history.add_pending_flush(HANDLER2, "a2", 1s);
_flush_history.add_pending_flush(HANDLER2, "a3", 4s);
_flush_history.add_pending_flush(HANDLER1, "a4", 7s);
_flush_history.start_flush(HANDLER1, "a1", 3s, 5);
_flush_history.start_flush(HANDLER2, "a2", 1s, 6);
_flush_history.add_pending_flush(HANDLER1, "a1", strategy_info, 3s);
_flush_history.add_pending_flush(HANDLER2, "a2", strategy_info, 1s);
_flush_history.add_pending_flush(HANDLER2, "a3", strategy_info, 4s);
_flush_history.add_pending_flush(HANDLER1, "a4", strategy_info, 7s);
_flush_history.start_flush(HANDLER1, "a1", strategy_info, 3s, 5);
_flush_history.start_flush(HANDLER2, "a2", strategy_info, 1s, 6);
_flush_history.flush_done(6);
_flush_history.prune_done(6);
auto view = _flush_history.make_view();
Expand All @@ -184,7 +185,7 @@ TEST_F(FlushHistoryTest, tracks_pending_flushes) {
}

TEST_F(FlushHistoryTest, pending_flushes_can_be_cleared) {
_flush_history.add_pending_flush(HANDLER1, "a1", 3s);
_flush_history.add_pending_flush(HANDLER1, "a1", strategy_info, 3s);
_flush_history.clear_pending_flushes();
auto view = _flush_history.make_view();
EXPECT_TRUE(view->pending().empty());
Expand All @@ -210,12 +211,12 @@ TEST_F(FlushHistoryTest, flush_strategy_can_be_changed) {
* { "all", id = 43, priority_strategy = true } started 2 flushes: handler2.a2 and handler1.a3
* { "normal", id = 44, priority_strategy = false } started no flushes
*/
_flush_history.start_flush(HANDLER1, "a1", 3s, 5);
_flush_history.start_flush(HANDLER1, "a1", strategy_info, 3s, 5);
_flush_history.set_strategy(ALL_STRATEGY, 43, true);
_flush_history.add_pending_flush(HANDLER2, "a2", 1s);
_flush_history.add_pending_flush(HANDLER1, "a3", 4s);
_flush_history.start_flush(HANDLER2, "a2", 1s, 6);
_flush_history.start_flush(HANDLER1, "a3", 4s, 7);
_flush_history.add_pending_flush(HANDLER2, "a2", strategy_info, 1s);
_flush_history.add_pending_flush(HANDLER1, "a3", strategy_info, 4s);
_flush_history.start_flush(HANDLER2, "a2", strategy_info, 1s, 6);
_flush_history.start_flush(HANDLER1, "a3", strategy_info, 4s, 7);
_flush_history.set_strategy(NORMAL_STRATEGY, 44, false);

/*
Expand Down
37 changes: 25 additions & 12 deletions searchcore/src/tests/proton/flushengine/flushengine_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ class SimpleStrategy : public IFlushStrategy {
enum class OrderBy { INDEX_OF, SERIAL };
std::vector<IFlushTarget::SP> _targets;
OrderBy _orderBy;
bool _priority_strategy;

struct CompareIndexOf {
CompareIndexOf(const SimpleStrategy& flush) : _flush(flush) {}
Expand All @@ -362,8 +363,8 @@ class SimpleStrategy : public IFlushStrategy {
const SimpleStrategy& _flush;
};

FlushContext::List getFlushTargets(const FlushContext::List& targetList, const flushengine::TlsStatsMap&,
const flushengine::ActiveFlushStats&) const override {
FlushStrategyResult getFlushTargets(const FlushContext::List& targetList, const flushengine::TlsStatsMap&,
const flushengine::ActiveFlushStats&) const override {
FlushContext::List fv(targetList);
if (_orderBy == OrderBy::INDEX_OF) {
std::sort(fv.begin(), fv.end(), CompareIndexOf(*this));
Expand All @@ -372,17 +373,28 @@ class SimpleStrategy : public IFlushStrategy {
return a->getTarget()->getFlushedSerialNum() < b->getTarget()->getFlushedSerialNum();
});
}
return fv;
return FlushStrategyResult(std::move(fv), name(), _id, _priority_strategy, order_name());
}

std::string name() const override { return "flush_simple"; }

std::string order_name() const noexcept {
switch (_orderBy) {
case OrderBy::INDEX_OF:
return "index_of";
case OrderBy::SERIAL:
default:
return "serial";
}
}

bool compare(const IFlushTarget::SP& lhs, const IFlushTarget::SP& rhs) const {
LOG(info, "SimpleStrategy::compare(%p, %p)", lhs.get(), rhs.get());
return indexOf(lhs) < indexOf(rhs);
}

SimpleStrategy(OrderBy orderBy) noexcept : _targets(), _orderBy(orderBy) {}
SimpleStrategy(OrderBy orderBy, bool priority_strategy) noexcept
: _targets(), _orderBy(orderBy), _priority_strategy(priority_strategy) {}

uint32_t indexOf(const IFlushTarget::SP& target) const {
IFlushTarget* raw = target.get();
Expand All @@ -407,10 +419,10 @@ class SimpleStrategy : public IFlushStrategy {

class NoFlushStrategy : public SimpleStrategy {
public:
NoFlushStrategy() noexcept : SimpleStrategy(OrderBy::INDEX_OF) {}
FlushContext::List getFlushTargets(const FlushContext::List&, const flushengine::TlsStatsMap&,
const flushengine::ActiveFlushStats&) const override {
return {};
NoFlushStrategy() noexcept : SimpleStrategy(OrderBy::INDEX_OF, false) {}
FlushStrategyResult getFlushTargets(const FlushContext::List&, const flushengine::TlsStatsMap&,
const flushengine::ActiveFlushStats&) const override {
return FlushStrategyResult({}, name(), _id, false, name());
}
std::string name() const override { return "flush_nothing"; }
};
Expand Down Expand Up @@ -446,7 +458,8 @@ struct Fixture {
engine(tlsStatsFactory, strategy, numThreads, idleInterval, std::numeric_limits<uint64_t>::max()) {}

Fixture(uint32_t numThreads, vespalib::duration idleInterval)
: Fixture(numThreads, idleInterval, std::make_shared<SimpleStrategy>(SimpleStrategy::OrderBy::INDEX_OF)) {}
: Fixture(numThreads, idleInterval,
std::make_shared<SimpleStrategy>(SimpleStrategy::OrderBy::INDEX_OF, false)) {}

~Fixture();

Expand Down Expand Up @@ -737,7 +750,7 @@ TEST(FlushEngineTest, require_that_concurrency_works) {
}

TEST(FlushEngineTest, require_that_there_is_room_for_one_and_only_one_high_pri_target) {
Fixture f(2, 1ms, std::make_unique<SimpleStrategy>(SimpleStrategy::OrderBy::SERIAL));
Fixture f(2, 1ms, std::make_unique<SimpleStrategy>(SimpleStrategy::OrderBy::SERIAL, false));
auto target1 = std::make_shared<SimpleTarget>("target1", 1, false);
auto target2 = std::make_shared<SimpleTarget>("target2", 2, false);
auto target3 = std::make_shared<HighPriorityTarget>("target3", 3, false);
Expand Down Expand Up @@ -770,7 +783,7 @@ TEST(FlushEngineTest, require_that_there_is_room_for_one_and_only_one_high_pri_t
}

TEST(FlushEngineTest, require_that_high_priority_does_not_jump_the_queue) {
Fixture f(2, 1ms, std::make_unique<SimpleStrategy>(SimpleStrategy::OrderBy::SERIAL));
Fixture f(2, 1ms, std::make_unique<SimpleStrategy>(SimpleStrategy::OrderBy::SERIAL, false));
auto target1 = std::make_shared<SimpleTarget>("target1", 1, false);
auto target2 = std::make_shared<SimpleTarget>("target2", 2, false);
auto target3 = std::make_shared<SimpleTarget>("target3", 3, false);
Expand Down Expand Up @@ -872,7 +885,7 @@ TEST(FlushEngineTest, require_that_oldest_serial_is_updated_when_finishing_prior
auto target1 = std::make_shared<SimpleTarget>("target1", 10, true);
auto handler = f.addSimpleHandler({target1});
f.assertOldestSerial(*handler, 10);
f.engine.set_strategy(std::make_shared<SimpleStrategy>(SimpleStrategy::OrderBy::INDEX_OF)).wait();
f.engine.set_strategy(std::make_shared<SimpleStrategy>(SimpleStrategy::OrderBy::INDEX_OF, true)).wait();
EXPECT_EQ(20u, handler->_oldestSerial);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <cassert>

using namespace proton;
using proton::flushengine::FlushStrategyResult;
using search::SerialNum;
using searchcorespi::IFlushTarget;

Expand Down Expand Up @@ -201,7 +202,7 @@ struct FlushStrategyFixture {
[[nodiscard]] FlushContext::List getFlushTargets(const FlushContext::List& targetList,
const flushengine::TlsStatsMap& tlsStatsMap) const {
flushengine::ActiveFlushStats active_flushes;
return strategy.getFlushTargets(targetList, tlsStatsMap, active_flushes);
return strategy.getFlushTargets(targetList, tlsStatsMap, active_flushes).list();
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ using DiskGain = IFlushTarget::DiskGain;

class MyFlushHandler : public IFlushHandler {
SerialNum _current_serial;

public:
MyFlushHandler(const std::string& name) noexcept : IFlushHandler(name), _current_serial(100) {}
~MyFlushHandler() override;
Expand Down Expand Up @@ -99,10 +100,10 @@ class ContextBuilder {
return flushengine::TlsStatsMap(std::move(map));
}
FlushContext::List flush_targets(const IFlushStrategy& strategy) const {
return strategy.getFlushTargets(list(), tlsStats(), _active_flushes);
return strategy.getFlushTargets(list(), tlsStats(), _active_flushes).list();
}
[[nodiscard]] StringList flush_target_names(const IFlushStrategy& strategy) const {
auto ctx_list = flush_targets(strategy);
auto ctx_list = flush_targets(strategy);
StringList target_names;
target_names.reserve(ctx_list.size());
for (auto& ctx : ctx_list) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ std::shared_ptr<FlushStrategyIdNotifier> make_flush_strategy_id_notifier() {
return notifier;
}

const std::string strategy_info("info");

} // namespace

class PrepareRestart2RpcHandlerTest : public ::testing::Test {
Expand Down Expand Up @@ -476,13 +478,13 @@ TEST_F(PrepareRestart2RpcHandlerTest, poll_sequence) {
* { "prepare_restart", id = 201, priority_strategy = true } started 2 flushes: handler2.a2 and handler1.a3
* and set 1 flush as pending: handler1.a4
*/
_history->start_flush(HANDLER1, "a1", 3s, 5);
_history->start_flush(HANDLER1, "a1", strategy_info, 3s, 5);
_history->set_strategy(PREPARE_RESTART_STRATEGY, 201, true);
_history->add_pending_flush(HANDLER2, "a2", 1s);
_history->add_pending_flush(HANDLER1, "a3", 4s);
_history->add_pending_flush(HANDLER1, "a4", 1s);
_history->start_flush(HANDLER2, "a2", 1s, 6);
_history->start_flush(HANDLER1, "a3", 4s, 7);
_history->add_pending_flush(HANDLER2, "a2", strategy_info, 1s);
_history->add_pending_flush(HANDLER1, "a3", strategy_info, 4s);
_history->add_pending_flush(HANDLER1, "a4", strategy_info, 1s);
_history->start_flush(HANDLER2, "a2", strategy_info, 1s, 6);
_history->start_flush(HANDLER1, "a3", strategy_info, 4s, 7);

auto future = test_handler(1, 0s);
future.wait();
Expand All @@ -502,7 +504,7 @@ TEST_F(PrepareRestart2RpcHandlerTest, poll_sequence) {
*/
_history->flush_done(6);
_history->prune_done(6);
_history->start_flush(HANDLER1, "a4", 1s, 8);
_history->start_flush(HANDLER1, "a4", strategy_info, 1s, 8);
_history->set_strategy(MEMORY_STRATEGY, 202, false);
_return_handler->alloc_req();
future = test_handler(1, 0s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ void EventLogger::transactionLogReplayComplete(const string& domainName, vespali
doTransactionLogReplayComplete(domainName, elapsedTime, "transactionlog.replay.complete");
}

void EventLogger::flushInit(const string& name) {
void EventLogger::flushInit(const string& name, const std::string& strategy_name, const std::string& strategy_info) {
JSONStringer jstr;
jstr.beginObject();
jstr.appendKey("name").appendString(name);
jstr.appendKey("strategy_name").appendString(strategy_name);
jstr.appendKey("strategy_info").appendString(strategy_info);
jstr.endObject();
EV_STATE("flush.init", jstr.str().c_str());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class EventLogger {
static void transactionLogReplayStart(const string& domainName, SerialNum first, SerialNum last);
static void transactionLogReplayProgress(const string& domainName, float progress, SerialNum first,
SerialNum last, SerialNum current);
static void flushInit(const string& name);
static void flushInit(const string& name, const std::string& strategy_name, const std::string& strategy_info);
static void flushStart(const string& name, int64_t beforeMemory, int64_t afterMemory, int64_t toFreeMemory,
SerialNum unflushed, SerialNum current);
static void flushComplete(const string& name, vespalib::duration elapsedTime, SerialNum flushed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ vespa_add_library(searchcore_flushengine STATIC
flush_strategy_history_entry.cpp
flush_strategy_id_listener.cpp
flush_strategy_id_notifier.cpp
flush_strategy_result.cpp
flush_target_candidate.cpp
flush_target_candidates.cpp
flushtargetproxy.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <algorithm>

using proton::flushengine::FlushStrategyResult;
using search::SerialNum;
using searchcorespi::IFlushTarget;

Expand All @@ -24,21 +25,22 @@ bool CompareTarget::operator()(const FlushContext::SP& lfc, const FlushContext::
}

std::string strategy_name("flush_all");
std::string strategy_info("all");

} // namespace

FlushAllStrategy::FlushAllStrategy() : IFlushStrategy() {
}

FlushContext::List FlushAllStrategy::getFlushTargets(const FlushContext::List& targetList,
const flushengine::TlsStatsMap&,
const flushengine::ActiveFlushStats&) const {
FlushStrategyResult FlushAllStrategy::getFlushTargets(const FlushContext::List& targetList,
const flushengine::TlsStatsMap&,
const flushengine::ActiveFlushStats&) const {
if (targetList.empty()) {
return {};
return FlushStrategyResult({}, strategy_name, _id, true, strategy_info);
}
FlushContext::List fv(targetList);
std::sort(fv.begin(), fv.end(), CompareTarget());
return fv;
return FlushStrategyResult(std::move(fv), strategy_name, _id, true, strategy_info);
}

std::string FlushAllStrategy::name() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ class FlushAllStrategy : public IFlushStrategy {
public:
FlushAllStrategy();

FlushContext::List getFlushTargets(const FlushContext::List& targetList, const flushengine::TlsStatsMap&,
const flushengine::ActiveFlushStats&) const override;
flushengine::FlushStrategyResult getFlushTargets(const FlushContext::List& targetList,
const flushengine::TlsStatsMap&,
const flushengine::ActiveFlushStats&) const override;
std::string name() const override;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void FlushHistory::strategy_flush_done(uint32_t strategy_id, time_point now) {
}

void FlushHistory::start_flush(const std::string& handler_name, const std::string& target_name,
duration last_flush_duration, uint32_t id) {
const std::string& strategy_info, duration last_flush_duration, uint32_t id) {
// Note: this member function is called when queueing flush engine task, initFlush has already completed.
auto name = build_name(handler_name, target_name);
std::lock_guard guard(_mutex);
Expand All @@ -122,7 +122,8 @@ void FlushHistory::start_flush(const std::string& handler_name, const std::strin
} else {
auto now = steady_clock::now();
active_it = _active.emplace_hint(
active_it, id, FlushHistoryEntry(name, _active_strategy, now, last_flush_duration, ++_pending_id));
active_it, id,
FlushHistoryEntry(name, _active_strategy, strategy_info, now, last_flush_duration, ++_pending_id));
}
_active_strategy.start_flush();
auto now = steady_clock::now();
Expand Down Expand Up @@ -152,17 +153,19 @@ void FlushHistory::prune_done(uint32_t id) {
}

void FlushHistory::add_pending_flush(const std::string& handler_name, const std::string& target_name,
duration last_flush_duration) {
const std::string& strategy_info, duration last_flush_duration) {
// Called when priority flush strategy is used.
auto name = build_name(handler_name, target_name);
std::lock_guard guard(_mutex);
auto pending_it = _pending.lower_bound(name);
auto now = steady_clock::now();
if (pending_it != _pending.end() && pending_it->first == name) {
pending_it->second = FlushHistoryEntry(name, _active_strategy, now, last_flush_duration, ++_pending_id);
pending_it->second =
FlushHistoryEntry(name, _active_strategy, strategy_info, now, last_flush_duration, ++_pending_id);
} else {
_pending.emplace_hint(pending_it, name,
FlushHistoryEntry(name, _active_strategy, now, last_flush_duration, ++_pending_id));
_pending.emplace_hint(
pending_it, name,
FlushHistoryEntry(name, _active_strategy, strategy_info, now, last_flush_duration, ++_pending_id));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ class FlushHistory {
~FlushHistory();
FlushHistory& operator=(const FlushHistory&) = delete;
FlushHistory& operator=(FlushHistory&&) = delete;
void start_flush(const std::string& handler_name, const std::string& target_name, duration last_flush_duration,
uint32_t id);
void start_flush(const std::string& handler_name, const std::string& target_name,
const std::string& strategy_info, duration last_flush_duration, uint32_t id);
void flush_done(uint32_t id);
void prune_done(uint32_t id);
void add_pending_flush(const std::string& handler_name, const std::string& target_name,
duration last_flush_duration);
const std::string& strategy_info, duration last_flush_duration);
void drop_pending_flush(const std::string& handler_name, const std::string& target_name);
void clear_pending_flushes();
void set_strategy(std::string stategy, uint32_t strategy_id, bool priority_strategy);
Expand Down
Loading