Skip to content

Commit a723f07

Browse files
committed
clean slate implementation
1 parent 36dd4d2 commit a723f07

33 files changed

+976
-30
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ cmake-build-debug
44
cmake-build-release
55
.idea
66
result
7+
Makefile

examples/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ add_subdirectory(count)
77
add_subdirectory(ports)
88
add_subdirectory(hello)
99
add_subdirectory(power_train)
10+
add_subdirectory(multiport_mutation)
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
add_executable(mutation_multiports main.cc)
2+
target_link_libraries(mutation_multiports reactor-cpp)
3+
add_dependencies(examples mutation_multiports)
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#ifndef CONSUMER_HH // NOLINT
2+
#define CONSUMER_HH // NOLINT
3+
4+
#include <reactor-cpp/reactor-cpp.hh>
5+
#include <reactor-cpp/scopes.hh>
6+
7+
using namespace reactor;
8+
using namespace std::chrono_literals;
9+
10+
class Consumer final : public Reactor { // NOLINT
11+
class Inner : public Scope {
12+
Inner(Reactor* reactor, std::size_t index)
13+
: Scope(reactor)
14+
, index_(index) {}
15+
std::size_t index_ = 0;
16+
17+
void reaction_1(const Input<unsigned>& in) const {
18+
std::cout << "consumer: " << index_ << " received value:" << *in.get() << '\n';
19+
}
20+
21+
friend Consumer;
22+
};
23+
24+
Inner _lf_inner;
25+
Reaction handle{"handle", 1, this, [this]() { _lf_inner.reaction_1(this->in); }};
26+
27+
public:
28+
Consumer(const std::string& name, Environment* env, std::size_t index)
29+
: Reactor(name, env)
30+
, _lf_inner(this, index) {
31+
std::cout << "creating instance of consumer" << '\n';
32+
}
33+
~Consumer() override { std::cout << "Consumer Object is deleted" << '\n'; };
34+
35+
Input<unsigned> in{"in", this}; // NOLINT
36+
37+
void assemble() override {
38+
handle.declare_trigger(&in);
39+
}
40+
};
41+
42+
#endif // CONSUMER_HH
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#ifndef LOAD_BALANCER_HH // NOLINT
2+
#define LOAD_BALANCER_HH // NOLINT
3+
4+
#include <reactor-cpp/reactor-cpp.hh>
5+
6+
#include "reactor-cpp/mutations/multiport.hh"
7+
8+
using namespace reactor;
9+
using namespace std::chrono_literals;
10+
11+
class LoadBalancer final : public Reactor { // NOLINT
12+
class Inner : public MutableScope {
13+
explicit Inner(Reactor* reactor)
14+
: MutableScope(reactor) {}
15+
16+
// reaction bodies
17+
static void reaction_1(const Input<unsigned>& inbound, Output<unsigned>& scale_bank,
18+
Multiport<Output<unsigned>>& outbound) {
19+
if (std::rand() % 15 == 0) { // NOLINT
20+
scale_bank.set(std::rand() % 20 + 1); // NOLINT
21+
}
22+
const unsigned sel = std::rand() % outbound.size(); // NOLINT
23+
std::cout << "Sending out to:" << sel << '\n';
24+
outbound[sel].set(inbound.get());
25+
outbound[std::min(4ul, outbound.size() - 1)].set(inbound.get());
26+
}
27+
28+
29+
friend LoadBalancer;
30+
};
31+
32+
Inner _lf_inner;
33+
Reaction process{"process", 2, this, [this]() { Inner::reaction_1(this->inbound, this->scale_bank, this->out); }};
34+
35+
public:
36+
LoadBalancer(const std::string& name, Environment* env)
37+
: Reactor(name, env)
38+
, _lf_inner(this) {
39+
out.reserve(4);
40+
std::cout << "creating instance of load balancer" << '\n';
41+
for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) {
42+
out.create_new_port();
43+
}
44+
}
45+
~LoadBalancer() override = default;
46+
47+
ModifableMultiport<Output<unsigned>> out{"out", this}; // NOLINT
48+
std::size_t out_size_ = 0;
49+
Input<unsigned> inbound{"inbound", this}; // NOLINT
50+
Output<unsigned> scale_bank{"scale_bank", this}; // NOLINT
51+
52+
void assemble() override {
53+
std::cout << "assemble LoadBalancer\n";
54+
for (auto& _lf_port : out) {
55+
process.declare_antidependency(&_lf_port);
56+
}
57+
process.declare_trigger(&inbound);
58+
}
59+
};
60+
61+
#endif // LOAD_BALANCER_HH

examples/multiport_mutation/main.cc

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
#include <iostream>
2+
#include <memory>
3+
4+
#include <reactor-cpp/mutations/bank.hh>
5+
#include <reactor-cpp/mutations/connection.hh>
6+
7+
#include "./multiport_to_bank.hh"
8+
#include "./consumer.hh"
9+
#include "./load_balancer.hh"
10+
#include "./producer.hh"
11+
#include <reactor-cpp/reactor-cpp.hh>
12+
13+
class Deployment final : public Reactor { // NOLINT
14+
15+
std::unique_ptr<Producer> producer_;
16+
std::unique_ptr<LoadBalancer> load_balancer_;
17+
std::vector<std::unique_ptr<Consumer>> consumers_;
18+
19+
Reaction scale_bank{"scale_bank", 1, this,
20+
[this]() { this->_inner.reaction_1(this->scale, this->consumers_, load_balancer_->out); }};
21+
22+
23+
class Inner : public MutableScope {
24+
int state = 0;
25+
26+
public:
27+
Inner(Reactor* reactor)
28+
: MutableScope(reactor) {}
29+
void reaction_1(const Input<unsigned>& scale, std::vector<std::unique_ptr<Consumer>>& reactor_bank,
30+
ModifableMultiport<Output<unsigned>>& load_balancer) {
31+
std::size_t new_size = *scale.get();
32+
std::size_t old_size = reactor_bank.size();
33+
std::function lambda = [](Environment* env, std::size_t index) {
34+
std::string _lf_inst_name = "consumer_" + std::to_string(index);
35+
return std::make_unique<Consumer>(_lf_inst_name, env, index);
36+
};
37+
38+
std::function get_input_port = [](const std::unique_ptr<Consumer>& consumer) {
39+
return &consumer->in;
40+
};
41+
auto rescale = std::make_shared<ResizeMultiportToBank<unsigned, Consumer>>(
42+
&load_balancer,
43+
&reactor_bank,
44+
get_input_port,
45+
lambda,
46+
new_size);
47+
48+
add_to_transaction(rescale);
49+
50+
commit_transaction(true);
51+
}
52+
53+
friend LoadBalancer;
54+
};
55+
56+
Inner _inner;
57+
58+
public:
59+
Deployment(const std::string& name, Environment* env)
60+
: Reactor(name, env)
61+
, _inner(this)
62+
, producer_(std::make_unique<Producer>("producer", environment()))
63+
, load_balancer_(std::make_unique<LoadBalancer>("load_balancer", environment())) {
64+
std::cout << "creating instance of deployment" << '\n';
65+
consumers_.reserve(4);
66+
for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) {
67+
std::string _lf_inst_name = "consumer_" + std::to_string(_lf_idx);
68+
consumers_.push_back(std::make_unique<Consumer>(_lf_inst_name, environment(), _lf_idx));
69+
}
70+
}
71+
~Deployment() override = default;
72+
73+
Input<unsigned> scale{"scale", this}; // NOLINT
74+
75+
void assemble() override {
76+
for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) {
77+
environment()->draw_connection(load_balancer_->out[_lf_idx], consumers_[_lf_idx]->in, ConnectionProperties{});
78+
environment()->draw_connection(producer_->value, load_balancer_->inbound, ConnectionProperties{});
79+
}
80+
environment()->draw_connection(load_balancer_->scale_bank, scale, ConnectionProperties{});
81+
scale_bank.declare_trigger(&this->scale);
82+
}
83+
};
84+
85+
auto main() -> int {
86+
Environment env{4, true};
87+
auto deployment = std::make_unique<Deployment>("c1", &env);
88+
env.optimize();
89+
env.assemble();
90+
auto thread = env.startup();
91+
thread.join();
92+
return 0;
93+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
//
2+
// Created by tanneberger on 1/13/25.
3+
//
4+
5+
#ifndef MULTIPORT_TO_BANK_HH
6+
#define MULTIPORT_TO_BANK_HH
7+
8+
#include <reactor-cpp/mutations.hh>
9+
#include <reactor-cpp/multiport.hh>
10+
#include <reactor-cpp/port.hh>
11+
#include <reactor-cpp/mutations/multiport.hh>
12+
#include <reactor-cpp/mutations/bank.hh>
13+
#include <reactor-cpp/mutations/connection.hh>
14+
#include <reactor-cpp/reactor.hh>
15+
16+
#include "../../lib/mutations/bank.cc"
17+
#include "../../lib/mutations/connection.cc"
18+
#include "../../lib/mutations/multiport.cc"
19+
20+
#include <functional>
21+
22+
namespace reactor {
23+
24+
template<class PortType, class ReactorType>
25+
class ResizeMultiportToBank : public Mutation {
26+
ModifableMultiport<Output<PortType>>* multiport_;
27+
std::vector<std::unique_ptr<ReactorType>>* bank_;
28+
std::function<Input<PortType>*(const std::unique_ptr<ReactorType>&)> get_input_port_;
29+
std::function<std::unique_ptr<ReactorType>(Environment* env, std::size_t index)> create_lambda_;
30+
std::size_t new_size_ = 0;
31+
public:
32+
ResizeMultiportToBank(ModifableMultiport<Output<PortType>>* multiport,
33+
std::vector<std::unique_ptr<ReactorType>>* bank,
34+
std::function<Input<PortType>*(const std::unique_ptr<ReactorType>&)> get_input_port,
35+
std::function<std::unique_ptr<ReactorType>(Environment* env, std::size_t index)> create_lambda,
36+
std::size_t new_size) :
37+
multiport_(multiport), bank_(bank), get_input_port_(get_input_port), create_lambda_(create_lambda), new_size_(new_size) {}
38+
39+
~ResizeMultiportToBank() = default;
40+
auto run() -> MutationResult {
41+
if (multiport_->size() != bank_->size()) {
42+
return NotMatchingBankSize;
43+
}
44+
auto old_size = multiport_->size();
45+
46+
if (new_size_ > old_size) {
47+
// TODO: this is an assumption
48+
auto change_multiport_size =
49+
std::make_shared<MutationChangeOutputMultiportSize<unsigned>>(multiport_, new_size_);
50+
51+
change_multiport_size->run();
52+
53+
auto change_bank_size = std::make_shared<MutationChangeBankSize<std::unique_ptr<ReactorType>>>(
54+
bank_, (*bank_)[0]->environment(), new_size_, create_lambda_);
55+
56+
change_bank_size->run();
57+
58+
for (auto i = old_size; i < new_size_; i++) {
59+
auto add_conn = std::make_shared<MutationAddConnection<Output<PortType>, Input<PortType>>>(
60+
&(*multiport_)[i], get_input_port_((*bank_)[i]), (*bank_)[0]->environment(), true);
61+
62+
add_conn->run();
63+
}
64+
} else if (new_size_ < old_size) {
65+
for (auto i = old_size - 1; i >= new_size_; i--) {
66+
auto add_conn = std::make_shared<MutationAddConnection<Output<PortType>, Input<PortType>>>(
67+
&(*multiport_)[i], get_input_port_((*bank_)[i]), (*bank_)[0]->environment(), false);
68+
69+
add_conn->run();
70+
}
71+
72+
auto change_multiport_size =
73+
std::make_shared<MutationChangeOutputMultiportSize<unsigned>>(multiport_, new_size_);
74+
75+
change_multiport_size->run();
76+
77+
auto change_bank_size = std::make_shared<MutationChangeBankSize<std::unique_ptr<ReactorType>>>(
78+
bank_, (*bank_)[0]->environment(), new_size_, create_lambda_);
79+
80+
change_bank_size->run();
81+
}
82+
83+
84+
return Success;
85+
}
86+
87+
auto rollback() -> MutationResult {
88+
return Success;
89+
}
90+
};
91+
}
92+
93+
94+
95+
#endif //MULTIPORT_TO_BANK_HH
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#ifndef PRODUCER_HH // NOLINT
2+
#define PRODUCER_HH // NOLINT
3+
4+
#include <reactor-cpp/reactor-cpp.hh>
5+
6+
using namespace reactor;
7+
using namespace std::chrono_literals;
8+
9+
class Producer final : public Reactor { // NOLINT
10+
private:
11+
Timer timer{"timer", this, 1s, 1s};
12+
Reaction r_timer{"r_timer", 1, this, [this]() { _lf_inner.reaction_1(this->value); }};
13+
14+
class Inner : public Scope {
15+
unsigned int counter_ = 0;
16+
17+
void reaction_1([[maybe_unused]] Output<unsigned>& out) {
18+
std::cout << "producing value:" << counter_ << "\n";
19+
out.set(counter_++);
20+
}
21+
22+
explicit Inner(Reactor* reactor)
23+
: Scope(reactor) {}
24+
25+
friend Producer;
26+
};
27+
28+
Inner _lf_inner;
29+
30+
public:
31+
Producer(const std::string& name, Environment* env)
32+
: Reactor(name, env)
33+
, _lf_inner(this) {
34+
std::cout << "creating instance of producer\n";
35+
}
36+
Producer() = delete;
37+
~Producer() override = default;
38+
39+
Output<unsigned> value{"value", this}; // NOLINT
40+
41+
void assemble() override {
42+
r_timer.declare_trigger(&timer);
43+
r_timer.declare_antidependency(&value);
44+
}
45+
};
46+
47+
#endif // PRODUCER_HH

0 commit comments

Comments
 (0)