Skip to content

Commit eca7c94

Browse files
committed
Finish multifile
1 parent 4b72add commit eca7c94

File tree

4 files changed

+14
-272
lines changed

4 files changed

+14
-272
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ set(sources
1313
jitter.cpp
1414
run_cmd.cpp
1515
status_manager.cpp
16+
output_manager.cpp
17+
proc_manager.cpp
1618
tsp.cpp)
1719

1820
add_executable(tsp-hpc ${sources})

run_cmd.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace tsp
1010
{
11-
Run_cmd::Run_cmd(char *cmdline[], int start, int end) : proc_to_run(), is_openmpi(check_mpi()), rf_copy(nullptr), dash_rf(nullptr), rf_name()
11+
Run_cmd::Run_cmd(char *cmdline[], int start, int end) : is_openmpi(check_mpi(cmdline[0])), rf_copy(nullptr), dash_rf(nullptr), rf_name()
1212
{
1313
for (int i = start; i < end; i++)
1414
{
@@ -69,9 +69,9 @@ namespace tsp
6969
rf_stream.close();
7070
return rank_file;
7171
}
72-
bool Run_cmd::check_mpi()
72+
bool Run_cmd::check_mpi(const char *exe_name)
7373
{
74-
std::string prog_name(proc_to_run[0]);
74+
std::string prog_name(exe_name);
7575
if (prog_name == "mpirun" || prog_name == "mpiexec")
7676
{
7777
// OpenMPI does not respect parent process binding,

run_cmd.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,6 @@ namespace tsp
1919
char *dash_rf;
2020
std::filesystem::path rf_name;
2121
std::filesystem::path make_rankfile(std::vector<uint32_t> procs, uint32_t nslots);
22-
bool check_mpi();
22+
bool check_mpi(const char* exe_name);
2323
};
2424
}

tsp.cpp

Lines changed: 8 additions & 268 deletions
Original file line numberDiff line numberDiff line change
@@ -1,284 +1,24 @@
1+
#include <cinttypes>
2+
#include <filesystem>
3+
#include <string>
14
#include <cstdlib>
2-
#include <vector>
35
#include <iostream>
46
#include <fstream>
5-
#include <string>
6-
#include <filesystem>
7-
#include <stdexcept>
8-
#include <numeric>
9-
#include <algorithm>
10-
#include <iterator>
11-
#include <chrono>
127
#include <thread>
13-
#include <ranges>
14-
#include <cstring>
15-
#include <random>
16-
#include <iomanip>
178

189
#include <unistd.h>
19-
#include <sys/wait.h>
2010
#include <sys/types.h>
21-
#include <sys/stat.h>
22-
#include <fcntl.h>
11+
#include <sys/wait.h>
2312

2413
#include "functions.hpp"
2514
#include "jitter.hpp"
2615
#include "semaphore.hpp"
2716
#include "run_cmd.hpp"
2817
#include "status_manager.hpp"
18+
#include "proc_manager.hpp"
19+
#include "output_manager.hpp"
2920

30-
#define CGROUP_CPUSET_PATH_PREFIX "/sys/fs/cgroup/cpuset"
31-
#define CPUSET_FILE "/cpuset.cpus"
3221
#define BASE_WAIT_PERIOD 2
33-
#define OUTPUT_FILE_TEMPLATE "/tsp.o"
34-
#define ERROR_FILE_TEMPLATE "/tsp.e"
35-
36-
37-
class Tsp_Proc
38-
{
39-
public:
40-
pid_t pid;
41-
std::vector<uint32_t> allowed_cores;
42-
43-
Tsp_Proc(uint32_t nslots)
44-
{
45-
this->nslots = nslots;
46-
pid = getpid();
47-
my_path = std::filesystem::read_symlink("/proc/self/exe");
48-
// Open cgroups file
49-
cpuset_from_cgroup = get_cgroup();
50-
if (nslots > cpuset_from_cgroup.size())
51-
{
52-
throw std::runtime_error("More slots requested than available on the system, this process can never run.");
53-
}
54-
refresh_allowed_cores();
55-
};
56-
57-
bool allowed_to_run()
58-
{
59-
return allowed_cores.size() > nslots;
60-
}
61-
62-
void refresh_allowed_cores()
63-
{
64-
auto sibling_pids = get_siblings();
65-
std::vector<uint32_t> siblings_affinity;
66-
for (auto i : sibling_pids)
67-
{
68-
auto tmp = get_sibling_affinity(i);
69-
siblings_affinity.insert(siblings_affinity.end(), tmp.begin(), tmp.end());
70-
}
71-
std::sort(siblings_affinity.begin(), siblings_affinity.end());
72-
allowed_cores.clear();
73-
std::set_difference(cpuset_from_cgroup.begin(), cpuset_from_cgroup.end(),
74-
siblings_affinity.begin(), siblings_affinity.end(),
75-
std::inserter(allowed_cores, allowed_cores.begin()));
76-
}
77-
78-
~Tsp_Proc() {};
79-
80-
private:
81-
std::vector<uint32_t> cpuset_from_cgroup;
82-
std::filesystem::path my_path;
83-
uint32_t nslots;
84-
85-
std::vector<pid_t> get_siblings()
86-
{
87-
std::vector<pid_t> out;
88-
std::vector<std::string> skip_paths = {std::to_string(pid), "self", "thread-self"};
89-
// Find all the other versions of this application running
90-
for (const auto &entry : std::filesystem::directory_iterator("/proc"))
91-
{
92-
if (std::find(skip_paths.begin(), skip_paths.end(), entry.path().filename()) != skip_paths.end())
93-
{
94-
continue;
95-
}
96-
if (std::filesystem::exists(entry.path() / "exe"))
97-
{
98-
try
99-
{
100-
if (std::filesystem::read_symlink(entry.path() / "exe") == my_path)
101-
{
102-
out.push_back(std::stoul(entry.path().filename()));
103-
}
104-
}
105-
catch (std::filesystem::filesystem_error &e)
106-
{
107-
// process went away
108-
continue;
109-
}
110-
}
111-
}
112-
return out;
113-
};
114-
115-
std::vector<std::uint32_t> parse_cpuset_range(std::string in)
116-
{
117-
std::stringstream ss1(in);
118-
std::string token;
119-
std::vector<std::uint32_t> out;
120-
while (std::getline(ss1, token, ','))
121-
{
122-
if (token.find('-') == std::string::npos)
123-
{
124-
out.push_back(std::stoul(token));
125-
}
126-
else
127-
{
128-
std::stringstream ss2(token);
129-
std::string starts, ends;
130-
std::getline(ss2, starts, '-');
131-
std::getline(ss2, ends, '-');
132-
std::vector<std::uint32_t> tmp(std::stoul(ends) - std::stoul(starts) + 1);
133-
std::iota(tmp.begin(), tmp.end(), std::stoul(starts));
134-
out.insert(out.end(), tmp.begin(), tmp.end());
135-
}
136-
}
137-
return out;
138-
};
139-
140-
std::vector<uint32_t> get_sibling_affinity(pid_t pid)
141-
{
142-
std::vector<uint32_t> out;
143-
cpu_set_t mask;
144-
// Just return an empty vector if the semaphore file is present
145-
try
146-
{
147-
for (const auto &entry : std::filesystem::directory_iterator("/proc/" + std::to_string(pid) + "/fd"))
148-
if (std::filesystem::read_symlink(entry).string().find(SEMAPHORE_FILE_TEMPLATE) != std::string::npos)
149-
{
150-
// Semaphore present, ignore
151-
return out;
152-
}
153-
}
154-
155-
catch (std::filesystem::filesystem_error &e)
156-
{
157-
// Process went away
158-
return out;
159-
}
160-
if (sched_getaffinity(pid, sizeof(mask), &mask) == -1)
161-
{
162-
// Process may have been killed - so it isn't taking
163-
// resources any more
164-
return out;
165-
}
166-
for (const auto &i : cpuset_from_cgroup)
167-
{
168-
if (CPU_ISSET(i, &mask))
169-
{
170-
out.push_back(i);
171-
}
172-
}
173-
return out;
174-
};
175-
176-
std::vector<uint32_t> get_cgroup()
177-
{
178-
std::filesystem::path cgroup_fn(std::string("/proc/" + std::to_string(pid) + "/cgroup"));
179-
if (!std::filesystem::exists(cgroup_fn))
180-
{
181-
throw std::runtime_error("Cgroup file for process " + std::to_string(pid) + " not found");
182-
}
183-
std::string line;
184-
std::filesystem::path cpuset_path;
185-
// get cpuset path
186-
std::ifstream cgroup_file(cgroup_fn);
187-
if (cgroup_file.is_open())
188-
{
189-
while (std::getline(cgroup_file, line))
190-
{
191-
std::vector<std::string> seglist;
192-
std::string segment;
193-
std::stringstream ss(line);
194-
while (std::getline(ss, segment, ':'))
195-
{
196-
seglist.push_back(segment);
197-
};
198-
if (seglist[1] == "cpuset")
199-
{
200-
cpuset_path = CGROUP_CPUSET_PATH_PREFIX;
201-
cpuset_path += seglist[2];
202-
cpuset_path += CPUSET_FILE;
203-
}
204-
if (!cpuset_path.empty())
205-
{
206-
break;
207-
}
208-
}
209-
cgroup_file.close();
210-
}
211-
else
212-
{
213-
throw std::runtime_error("Unable to open cgroup file " + cgroup_fn.string());
214-
}
215-
// read cpuset file
216-
std::ifstream cpuset_file(cpuset_path);
217-
if (cpuset_file.is_open())
218-
{
219-
std::getline(cpuset_file, line);
220-
return parse_cpuset_range(line);
221-
}
222-
else
223-
{
224-
throw std::runtime_error("Unable to open cpuset file " + cpuset_path.string());
225-
}
226-
}
227-
};
228-
229-
class Output_handler
230-
{
231-
public:
232-
int stdout_fd = -1;
233-
int stderr_fd = -1;
234-
Output_handler(bool disappear, bool separate_stderr, const char *jobid)
235-
{
236-
if (disappear)
237-
{
238-
stdout_fd = open("/dev/null", O_WRONLY | O_CREAT, 0666);
239-
stderr_fd = open("/dev/null", O_WRONLY | O_CREAT, 0666);
240-
}
241-
else
242-
{
243-
asprintf(&stdout_fn, "%s" OUTPUT_FILE_TEMPLATE "%s", get_tmp(), jobid);
244-
stdout_fd = open(stdout_fn, O_WRONLY | O_CREAT, 0600);
245-
dup2(stdout_fd, 1);
246-
if (separate_stderr)
247-
{
248-
asprintf(&stderr_fn, "%s" ERROR_FILE_TEMPLATE "%s", get_tmp(), jobid);
249-
dup2(stderr_fd, 2);
250-
stderr_fd = open(stderr_fn, O_WRONLY | O_CREAT, 0600);
251-
}
252-
else
253-
{
254-
dup2(stdout_fd, 2);
255-
}
256-
}
257-
}
258-
~Output_handler()
259-
{
260-
if (stdout_fn != nullptr)
261-
{
262-
free(stdout_fn);
263-
}
264-
if (stderr_fn != nullptr)
265-
{
266-
free(stderr_fn);
267-
}
268-
if (stdout_fd != -1)
269-
{
270-
close(stdout_fd);
271-
}
272-
if (stderr_fd != -1)
273-
{
274-
close(stderr_fd);
275-
}
276-
}
277-
278-
private:
279-
char *stdout_fn = nullptr;
280-
char *stderr_fn = nullptr;
281-
};
28222

28323
int main(int argc, char *argv[])
28424
{
@@ -345,7 +85,7 @@ int main(int argc, char *argv[])
34585

34686
tsp::Run_cmd cmd(argv, optind, argc);
34787
tsp::Status_Manager stat(cmd, nslots);
348-
Tsp_Proc me(nslots);
88+
tsp::Tsp_Proc me(nslots);
34989

35090
cpu_set_t mask;
35191
CPU_ZERO(&mask);
@@ -382,7 +122,7 @@ int main(int argc, char *argv[])
382122
pid_t waited_on_pid;
383123
if (0 == (waited_on_pid = fork()))
384124
{
385-
Output_handler *handler = new Output_handler(disappear_output, separate_stderr, stat.jobid.c_str());
125+
tsp::Output_handler *handler = new tsp::Output_handler(disappear_output, separate_stderr, stat.jobid.c_str());
386126
if (cmd.is_openmpi)
387127
{
388128
setenv("OMPI_MCA_rmaps_base_mapping_policy", "", 1);

0 commit comments

Comments
 (0)