Skip to content

Commit 2d32542

Browse files
committed
Back on track. Fix locking/database sync issues. Remove unneeded class variables. More sensible sqlite usage.
1 parent f5fc5e6 commit 2d32542

13 files changed

+262
-191
lines changed

jitter.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55

66
namespace tsp {
77
Jitter::Jitter(std::chrono::milliseconds limit)
8-
: rng(std::mt19937(std::random_device{}())),
9-
dist(std::uniform_int_distribution<int64_t>(-abs(limit).count(),
8+
: rng_(std::mt19937(std::random_device{}())),
9+
dist_(std::uniform_int_distribution<int64_t>(-abs(limit).count(),
1010
abs(limit).count())) {}
1111
std::chrono::milliseconds Jitter::get() {
12-
return std::chrono::milliseconds(dist(rng));
12+
return std::chrono::milliseconds(dist_(rng_));
1313
}
1414
} // namespace tsp

jitter.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class Jitter {
1111
std::chrono::milliseconds get();
1212

1313
private:
14-
std::mt19937 rng;
15-
std::uniform_int_distribution<int64_t> dist;
14+
std::mt19937 rng_;
15+
std::uniform_int_distribution<int64_t> dist_;
1616
};
1717
} // namespace tsp

locker.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,20 @@
1212
#include "functions.hpp"
1313

1414
namespace tsp {
15+
16+
std::array<sighandler_t, NSIG> prev_sigs;
17+
1518
int lockfile_fd = -1;
1619
void handle_signal(int sig) {
1720
if (lockfile_fd != -1) {
1821
flock(lockfile_fd, LOCK_UN);
1922
close(lockfile_fd);
2023
}
24+
prev_sigs[sig](sig);
2125
}
2226

2327
Locker::Locker() {
24-
lockfile_fd = open(lock_file_path.c_str(), O_WRONLY | O_CREAT, 0600);
28+
lockfile_fd = open(lock_file_path_.c_str(), O_RDONLY | O_CREAT, 0600);
2529
if (lockfile_fd == -1) {
2630
die_with_err_errno("Unable to open lockfile", lockfile_fd);
2731
}

locker.hpp

+11-17
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,18 @@
11
#pragma once
22

33
#include <string>
4-
#include <array>
5-
#include <unistd.h>
6-
#include <signal.h>
74

85
#include "functions.hpp"
96

10-
namespace tsp
11-
{
12-
class Locker
13-
{
14-
public:
15-
Locker();
16-
~Locker();
17-
void lock();
18-
void unlock();
7+
namespace tsp {
8+
class Locker {
9+
public:
10+
Locker();
11+
~Locker();
12+
void lock();
13+
void unlock();
1914

20-
private:
21-
const std::string lock_file_path{get_tmp() / ".affinity_lock_file.lock"};
22-
std::array<sighandler_t, NSIG> prev_sigs;
23-
};
24-
}
15+
private:
16+
const std::string lock_file_path_{get_tmp() / ".affinity_lock_file.lock"};
17+
};
18+
} // namespace tsp

output_manager.cpp

+20-20
Original file line numberDiff line numberDiff line change
@@ -16,49 +16,49 @@
1616
namespace tsp {
1717
Output_handler::Output_handler(bool disappear, bool separate_stderr,
1818
std::string jobid, bool rw)
19-
: stdout_fn{get_tmp() / (out_file_template + jobid)},
20-
stderr_fn{get_tmp() / (err_file_template + jobid)}, disappear_{disappear},
19+
: stdout_fn_{get_tmp() / (out_file_template + jobid)},
20+
stderr_fn_{get_tmp() / (err_file_template + jobid)}, disappear_{disappear},
2121
separate_stderr_{separate_stderr} {}
2222
void Output_handler::init_pipes() {
2323
if (disappear_) {
24-
stdout_fd = open("/dev/null", O_WRONLY | O_CREAT, 0666);
25-
stderr_fd = open("/dev/null", O_WRONLY | O_CREAT, 0666);
24+
stdout_fd_ = open("/dev/null", O_WRONLY | O_CREAT, 0666);
25+
stderr_fd_ = open("/dev/null", O_WRONLY | O_CREAT, 0666);
2626
} else {
27-
stdout_fd = open(stdout_fn.c_str(), O_WRONLY | O_CREAT, 0600);
27+
stdout_fd_ = open(stdout_fn_.c_str(), O_WRONLY | O_CREAT, 0600);
2828

2929
if (separate_stderr_) {
30-
stderr_fd = open(stderr_fn.c_str(), O_WRONLY | O_CREAT, 0600);
30+
stderr_fd_ = open(stderr_fn_.c_str(), O_WRONLY | O_CREAT, 0600);
3131
} else {
32-
stderr_fd = stdout_fd;
32+
stderr_fd_ = stdout_fd_;
3333
}
3434
}
35-
dup2(stdout_fd, 1);
36-
dup2(stderr_fd, 2);
35+
dup2(stdout_fd_, 1);
36+
dup2(stderr_fd_, 2);
3737
}
3838

3939
std::pair<std::string, std::string> Output_handler::get_output() {
40-
std::ifstream stdout_stream(stdout_fn);
40+
std::ifstream stdout_stream(stdout_fn_);
4141
std::stringstream ss_out{};
4242
ss_out << stdout_stream.rdbuf();
4343
stdout_stream.close();
44-
std::filesystem::remove(stdout_fn);
45-
out_bufs.first = ss_out.str();
44+
//std::filesystem::remove(stdout_fn);
45+
out_bufs_.first = ss_out.str();
4646

47-
std::ifstream stderr_stream(stderr_fn);
47+
std::ifstream stderr_stream(stderr_fn_);
4848
std::stringstream ss_err{};
4949
ss_err << stdout_stream.rdbuf();
5050
stderr_stream.close();
51-
std::filesystem::remove(stderr_fn);
52-
out_bufs.second = ss_err.str();
51+
std::filesystem::remove(stderr_fn_);
52+
out_bufs_.second = ss_err.str();
5353

54-
return out_bufs;
54+
return out_bufs_;
5555
}
5656
Output_handler::~Output_handler() {
57-
if (stdout_fd != -1) {
58-
close(stdout_fd);
57+
if (stdout_fd_ != -1) {
58+
close(stdout_fd_);
5959
}
60-
if (stderr_fd != -1) {
61-
close(stderr_fd);
60+
if (stderr_fd_ != -1) {
61+
close(stderr_fd_);
6262
}
6363
}
6464
} // namespace tsp

output_manager.hpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,19 @@ constexpr std::string err_file_template{"tsp.e"};
77
namespace tsp {
88
class Output_handler {
99
public:
10-
int stdout_fd;
11-
int stderr_fd;
1210
Output_handler(bool disappear, bool separate_stderr, std::string jobid,
1311
bool rw);
1412
void init_pipes();
1513
std::pair<std::string, std::string> get_output();
1614
~Output_handler();
1715

1816
private:
19-
std::string stdout_fn{};
20-
std::string stderr_fn{};
17+
std::string stdout_fn_{};
18+
std::string stderr_fn_{};
19+
int stdout_fd_;
20+
int stderr_fd_;
2121
bool disappear_;
2222
bool separate_stderr_;
23-
std::pair<std::string, std::string> out_bufs{};
23+
std::pair<std::string, std::string> out_bufs_{};
2424
};
2525
} // namespace tsp

proc_affinity.cpp

+20-29
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,21 @@
55
#include <numeric>
66

77
#include "functions.hpp"
8+
#include "status_manager.hpp"
89

910
namespace tsp {
1011

11-
Proc_affinity::Proc_affinity(uint32_t nslots, pid_t pid)
12-
: nslots(nslots), pid(pid),
13-
my_path(std::filesystem::read_symlink("/proc/self/exe")),
14-
cpuset_from_cgroup(get_cgroup()) {
12+
Proc_affinity::Proc_affinity(Status_Manager &sm, uint32_t nslots, pid_t pid)
13+
: sm_(sm), nslots_(nslots), pid_(pid),
14+
my_path_(std::filesystem::read_symlink("/proc/self/exe")),
15+
cpuset_from_cgroup_(get_cgroup()) {
1516
// Open cgroups file
16-
if (nslots > cpuset_from_cgroup.size()) {
17+
if (nslots > cpuset_from_cgroup_.size()) {
1718
die_with_err("More slots requested than available on the system, this "
1819
"process can never run.",
1920
-1);
2021
}
21-
CPU_ZERO(&mask);
22+
CPU_ZERO(&mask_);
2223
}
2324

2425
std::vector<uint32_t> Proc_affinity::bind() {
@@ -31,52 +32,42 @@ std::vector<uint32_t> Proc_affinity::bind() {
3132
}
3233
std::sort(siblings_affinity.begin(), siblings_affinity.end());
3334
std::vector<uint32_t> allowed_cores;
34-
std::set_difference(cpuset_from_cgroup.begin(), cpuset_from_cgroup.end(),
35+
std::set_difference(cpuset_from_cgroup_.begin(), cpuset_from_cgroup_.end(),
3536
siblings_affinity.begin(), siblings_affinity.end(),
3637
std::inserter(allowed_cores, allowed_cores.begin()));
3738

38-
for (auto i = 0ul; i < nslots; i++) {
39-
CPU_SET(allowed_cores[i], &mask);
39+
for (auto i = 0ul; i < nslots_; i++) {
40+
CPU_SET(allowed_cores[i], &mask_);
4041
out.push_back(allowed_cores[i]);
4142
}
42-
if (sched_setaffinity(0, sizeof(cpu_set_t), &mask) == -1) {
43+
if (sched_setaffinity(0, sizeof(cpu_set_t), &mask_) == -1) {
4344
die_with_err_errno("Unable to set CPU affinity", -1);
4445
}
4546
return out;
4647
}
4748

4849
std::vector<pid_t> Proc_affinity::get_siblings() {
49-
std::vector<pid_t> out;
50-
// Find all the other versions of this application running
51-
for (const auto &entry : std::filesystem::directory_iterator("/proc")) {
52-
if (std::find(skip_paths.begin(), skip_paths.end(),
53-
entry.path().filename()) != skip_paths.end()) {
54-
continue;
55-
}
56-
if (std::filesystem::exists(entry.path() / "exe")) {
57-
try {
58-
if (std::filesystem::read_symlink(entry.path() / "exe") == my_path) {
59-
out.push_back(std::stoul(entry.path().filename()));
60-
}
61-
} catch (std::filesystem::filesystem_error &e) {
62-
// process went away
63-
continue;
64-
}
50+
auto all_tsp_pids = sm_.get_running_job_pids();
51+
for (std::vector<pid_t>::iterator it = all_tsp_pids.begin();
52+
it != all_tsp_pids.end();) {
53+
if (*it == pid_) {
54+
all_tsp_pids.erase(it);
55+
} else {
56+
it++;
6557
}
6658
}
67-
return out;
59+
return all_tsp_pids;
6860
};
6961

7062
std::vector<uint32_t> Proc_affinity::get_sibling_affinity(pid_t pid) {
7163
std::vector<uint32_t> out;
7264
cpu_set_t mask;
73-
// Just return an empty vector if the semaphore file is present
7465
if (sched_getaffinity(pid, sizeof(mask), &mask) == -1) {
7566
// Process may have been killed - so it isn't taking
7667
// resources any more
7768
return out;
7869
}
79-
for (const auto &i : cpuset_from_cgroup) {
70+
for (const auto &i : cpuset_from_cgroup_) {
8071
if (CPU_ISSET(i, &mask)) {
8172
out.push_back(i);
8273
}

proc_affinity.hpp

+9-8
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,22 @@
88
#include <unistd.h>
99
#include <vector>
1010

11+
#include "status_manager.hpp"
12+
1113
namespace tsp {
1214
class Proc_affinity {
1315
public:
14-
Proc_affinity(uint32_t nslots, pid_t pid);
16+
Proc_affinity(Status_Manager &sm, uint32_t nslots, pid_t pid);
1517
std::vector<uint32_t> bind();
1618

1719
private:
18-
const uint32_t nslots;
19-
const pid_t pid;
20-
const std::filesystem::path my_path;
21-
const std::vector<uint32_t> cpuset_from_cgroup;
22-
cpu_set_t mask;
20+
Status_Manager &sm_;
21+
const uint32_t nslots_;
22+
const pid_t pid_;
23+
const std::filesystem::path my_path_;
24+
const std::vector<uint32_t> cpuset_from_cgroup_;
25+
cpu_set_t mask_;
2326
std::vector<pid_t> get_siblings();
2427
std::vector<uint32_t> get_sibling_affinity(pid_t pid);
25-
std::vector<std::string> skip_paths = {std::to_string(pid), "self",
26-
"thread-self"};
2728
};
2829
} // namespace tsp

run_cmd.cpp

+36-12
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <cstring>
55
#include <filesystem>
66
#include <fstream>
7+
#include <ranges>
78
#include <sys/types.h>
89
#include <sys/wait.h>
910
#include <unistd.h>
@@ -12,38 +13,57 @@
1213

1314
namespace tsp {
1415
Run_cmd::Run_cmd(char *cmdline[], int start, int end)
15-
: is_openmpi(check_mpi(cmdline[0])), rf_name(get_tmp()) {
16+
: is_openmpi(check_mpi(cmdline[start])) {
1617
for (int i = start; i < end; i++) {
17-
proc_to_run.push_back(cmdline[i]);
18+
proc_to_run_.emplace_back(cmdline[i]);
1819
}
19-
proc_to_run.push_back(nullptr);
2020
}
21+
2122
Run_cmd::~Run_cmd() {
2223
if (is_openmpi) {
23-
if (!rf_name.empty()) {
24-
std::filesystem::remove(rf_name);
24+
if (!rf_name_.empty()) {
25+
std::filesystem::remove(rf_name_);
26+
}
27+
if (argv_holder_ != nullptr) {
28+
free(argv_holder_);
2529
}
2630
}
2731
}
2832

2933
std::string Run_cmd::print() {
3034
std::stringstream out;
31-
for (const auto &i : proc_to_run) {
35+
for (const auto &i : proc_to_run_) {
3236
out << i << " ";
3337
}
3438
return out.str();
3539
}
3640

41+
const char *Run_cmd::get_argv_0() { return proc_to_run_[0].c_str(); }
42+
43+
char **Run_cmd::get_argv() {
44+
// Do it the old fashioned way
45+
if (argv_holder_ == nullptr) {
46+
if (nullptr == (argv_holder_ = static_cast<char **>(
47+
malloc((proc_to_run_.size() + 1) * sizeof(char *))))) {
48+
die_with_err_errno("Malloc failed", -1);
49+
}
50+
for (const auto &[i, p] : std::views::enumerate(proc_to_run_)) {
51+
argv_holder_[i] = p.data();
52+
}
53+
argv_holder_[proc_to_run_.size()] = nullptr;
54+
}
55+
return argv_holder_;
56+
}
57+
3758
void Run_cmd::add_rankfile(std::vector<uint32_t> procs, uint32_t nslots) {
3859
make_rankfile(procs, nslots);
39-
proc_to_run.insert(proc_to_run.begin() + 1, rf_name.string().data());
40-
proc_to_run.insert(proc_to_run.begin() + 1, std::string("rf").data());
41-
proc_to_run.push_back(nullptr);
60+
proc_to_run_.emplace(proc_to_run_.begin() + 1, rf_name_);
61+
proc_to_run_.emplace(proc_to_run_.begin() + 1, "-rf");
4262
}
4363

4464
void Run_cmd::make_rankfile(std::vector<uint32_t> procs, uint32_t nslots) {
45-
rf_name /= std::to_string(getpid()) + "_rankfile.txt";
46-
std::ofstream rf_stream(rf_name);
65+
rf_name_ = get_tmp() / (std::to_string(getpid()) + "_rankfile.txt");
66+
std::ofstream rf_stream(rf_name_);
4767
if (rf_stream.is_open()) {
4868
for (uint32_t i = 0; i < nslots; i++) {
4969
rf_stream << "rank " + std::to_string(i) +
@@ -55,7 +75,11 @@ void Run_cmd::make_rankfile(std::vector<uint32_t> procs, uint32_t nslots) {
5575
}
5676
bool Run_cmd::check_mpi(const char *exe_name) {
5777
std::string prog_name(exe_name);
58-
if (prog_name == "mpirun" || prog_name == "mpiexec") {
78+
std::string prog_test(prog_name);
79+
if (prog_name.starts_with('/')) {
80+
prog_test = std::filesystem::path(prog_name).filename().string();
81+
}
82+
if (prog_test == "mpirun" || prog_test == "mpiexec") {
5983
// OpenMPI does not respect parent process binding,
6084
// so we need to check if we're attempting to run
6185
// OpenMPI, and if we are, we need to construct a

0 commit comments

Comments
 (0)