-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfsi_thread.h
386 lines (357 loc) · 15.2 KB
/
fsi_thread.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
#ifndef FSI_THREAD_H
#define FSI_THREAD_H
/*! \ingroup PMsolver */
#include <atomic>
#include <thread>
#include <map>
#include <list>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <iostream>
#include <cassert>
#pragma GCC diagnostic push
//#pragma GCC diagnostic ignored "-Wshadow"
extern std::condition_variable cv_parent;
// MPI_TAG-related
const unsigned int mask_shift = 26; // this is as big as it gets AFAIK - size limiting factor
const unsigned int MEX_MASK = 0x3U << mask_shift;
const unsigned int influence_mask = 0;
const unsigned int sourcevel_mask = 1U << mask_shift;
const unsigned int wingwakeinf_mask = 2U << mask_shift;
const unsigned int wakeinfluence_mask = 3U << mask_shift;
const unsigned int time_pulse = sourcevel_mask - 1; // all bits set to the right
const unsigned int do_finalize = time_pulse - 1; // tell MPI children to finalize and exit
const unsigned int influence_index = 0;
const unsigned int sourcevel_index = 1;
const unsigned int wingwakeinf_index = 2;
const unsigned int wakeinfluence_index = 3;
/*!
* \brief The mex_thread class
This header file provides definitions for the thread class, which is used to
provide cohesive multithread support with statistical counters.
Synchronization code for POSIX type and the POSIX threads in MPI
Synchronization is done in base class method wherever possible (except false sharing sync)
Calculations done in subclasses.
POSIX thread model:
- instantiate objects - one for each type of calculation
- start all child threads with mex_thread_main(), then wait
- at time step, call mex_thread_init() method from subclass, which calls
- mex_parent() method from base class, which awakens child threads to call
- mex_thread_run() for each object
MPI thread model:
- instantiate objects - one for each type of calculation
- start all child threads with mex_thread_main(), then wait
- at time step, call mex_thread_init() method from subclass, which calls
- mex_parent() method from base class, which awakens POSIX child threads to call
- mex_thread_run() for each object, loops, getting order number and sending to MPI child
- MPI child invokes compute() method according to calculation type, sending data
and posting generic MPI reads
OpenMP thread model:
- instantiate objects - one for each type of calculation
- at time step, call mex_thread_init() method from subclass, which does calculations
*/
class mex_thread {
public:
std::atomic<int> order, // next doublet order number to process
threads_finished; // number of threads done with time step processing
/*!
* \brief mex_thread
* \param pmexname ASCII name for mex type for general display use
* \param pnthreads number of threads
* \param pnx number of chord-wise elements
* \param pny number of span-wise elements
* \param pC colocation grid
* \param pB boundary grid, usually
* \param pworkinc work increment
*/
mex_thread(const char *pmexname,
const int pnthreads,
const int pnx,
const int pny,
const double * const pC,
const double * const pB,
int pworkinc
) : mexname(pmexname), C(pC), B(pB), nthreads(pnthreads), nx(pnx), ny(pny),
workinc(pworkinc) {}
virtual ~mex_thread() {}
/*!
* \brief mex_thread_run calculates doublets for POSIX thread type;
* requests. recieves and saves calculations from MPI for MPI thread type
* \param partner_rank MPI rank of corresponding remote MPI partner
*
* Implemented only in subclass
*
* Passed MPI partner rank (ignored by POSIX thread type).
* For MPI, sends data to MPI thread and awaits response,
* repeating until all work is done for at time step. Runs lock step with MPI compute
* Each MPI thread has a corresponding POSIX thread.
*
* Deposits calculations in buffer allocated by alloc_output() method
*
* Unused by OpenMP, which instead calculates in mex_thread_init()
*/
virtual void mex_thread_run(const int partner_rank) = 0;
void mex_parent();
bool has_unfinished_workers() { /*! avoid wait deadlock in POSIX parent thread */
return threads_finished < nthreads;
}
/*!
* \brief is_last_thread does last-thread processing, awakening parent if so
* \return true if last thread
*
* Synchronizes using atomic threads_finished
*/
bool is_last_thread() {
int old = threads_finished.fetch_add(1); // declare thread done, getting # already done
if (old >= nthreads - 1) { // if last thread to finish
// avoid race: parent decides to wait, parent notified (below), parent waits
extern std::mutex parent_mtx;
std::unique_lock<std::mutex> lck(parent_mtx);
try {
cv_parent.notify_one(); // awaken parent
}
catch (std::exception& e) {
std::cerr << "exception caught when cv_parent.notify_all() "
"Exception: " << e.what() << '\n';
}
}
return old >= nthreads - 1;
}
/*!
* \brief calc_wake_workinc calculate work increment for wakes based on work to do, workfactor, #threads.
* Divides work evenly between threads, then multiply that workfactor. ex: 1 means that all work
* should be done by each thread at once, evenly divided. 0.5 means that each thread should do half of
* that before requesting more work.
* \param ndoublets
* \param workfactor - fraction of ndoublets/nthreads to set workinc to.
* Double ranging between 0.0 and 1.0. If 0, always return 1 regardless of ndoublets
* \return computed workinc integer for each thread at the current time step, minimally 1
*/
inline int calc_wake_workinc(const int ndoublets, const double workfactor) {
if (workfactor < .000000001) return 1;
int winc = 1.0 * ndoublets / nthreads * workfactor;
#if 0
std::cerr << "ndoublets=" << 1.0 * ndoublets << " ndoublets/nthreads=" <<
1.0 * ndoublets / nthreads << " ndoublets/nthreads * workfactor=" <<
1.0 * ndoublets / nthreads * workfactor <<
" winc=" << winc << std::endl;
#endif
if (winc > 1) return winc;
//else if (nthreads % winc != 0) // not evenly divisible
// winc++; // round up TODO what's the point?
return 1; // prevent infinite loop when zero
}
#ifdef FSI_OPENMP
/*!
* \brief startend for OpenMP only calculates starting and ending order numbers for active thread
* \param tstart returned starting order number
* \param tend returned ending order number
* \param size number of doubles this time step
* \param thrdid thread index (zero-based)
* \param nthrds number of threads actually allocated by OpenMP
* Note: this does not evenly balance work, but leaves last thread with remainder
* Note: this is not used if the atomic mad grab for order number technique is used
*/
inline void startend(int &tstart, int &tend, const int size, const int thrdid, const int nthrds) {
int tinc = size / nthrds;
if (size % nthrds != 0) tinc++;
tstart = thrdid * tinc;
if (tstart > size) {
tstart = 0;
tend = 0;
return;
}
tend = tstart + tinc;
if (tend > size) tend = size;
}
#endif
const char * const mexname;
protected:
const double * const C, * const B;
const int nthreads;
const int nx, ny; // geometry
int workinc;
};
class influence_thread: public mex_thread {
public:
influence_thread(const char *pmexname, const int pnthreads,
const int pnx, const int pny, // of colocation grid
const double * const pC,
const double * const pB,
int pworkinc, int pgridsize
) : mex_thread (pmexname, pnthreads, pnx, pny, pC, pB, pworkinc),
mn(nx * ny), nmn(ny * mn), m2(nx + 2), mnmn(mn * mn), nx23((nx + 2) * 3),
gridsize(pgridsize), outsize(2 * mnmn) {}
/*!
* \brief mex_thread_init does init for "influence" calculations
*
* Either
* - initiates calculations by calling back in to mex_parent, (POSIX, MPI models)
* - or does calculations itself (OpenMP)
*/
void mex_thread_init(const double, double *);
/*!
* \brief compute do computations starting with order number iord given preloaded grids
* \param iord order number
* \param coeff coefficient
*/
void compute(int iord, const double coeff);
void mex_thread_run(const int) override;
inline double * doublet(double *AB,
const double CXmcnc, const double CYmcnc, const double CZmcnc,
const double *B,
const int m, const int n);
double *alloc_output() { return new double[outsize]; }
protected:
const int mn, nmn, m2, mnmn, nx23, gridsize, outsize;
double Miu = 1.0, *AB = nullptr;
};
class sourcevel_thread: public mex_thread {
public:
sourcevel_thread(const char *pmexname, int pnthreads, const int nts,
const int pnx, const int pny, // of colocation grid
const double * const pC,
const double * const pB,
int pworkinc, double workfactor
) : mex_thread (pmexname, pnthreads, pnx, pny, pC, pB, pworkinc),
nts(nts),
n1(ny + 1), m2(nx + 2),
sigsize(nx * ny), outsize(3 * nts * n1),
workfactor(workfactor) {}
void mex_thread_init(const int, const double *, double *);
void compute(int iord, const double * const pcoeff, const int t);
void mex_thread_run(const int) override;
double *alloc_output() { return new double[outsize]; }
inline int coloc(const int ord) {
const int it = ord % t1,
in = ord / t1;
assert(it < t + 1);
assert(in < ny + 1);
return 3 * (it + in * nts); // indexes uwakeS(nts, n1)
}
protected:
inline void doublet(double * const pwakeS,
const double CXitin, const double CYitin, const double CZitin,
const double *B,
const double *psmn,
const int m, const int n, const int m2);
const int nts,
n1, m2,
sigsize, outsize;
double workfactor;
int t = 0, t1 = 0, t1n1 = 0; // time-related variables
const double *sig = nullptr;
double *uwakeS = nullptr;
};
class wingwakeinf_thread: public mex_thread {
public:
wingwakeinf_thread(const char *pmexname, const int pnthreads, const int nts,
const int pnx, const int pny, // of colocation grid
const double * const pC,
const double * const pB,
int pworkinc, double workfactor
) : mex_thread (pmexname, pnthreads, pnx, pny, pC, pB, pworkinc),
nts(nts), workfactor(workfactor),
n1(ny + 1), nx2(nx + 2), nts3(nts * 3),
sigsize(nx * ny + nts * ny), outsize(3 * nts * n1) {}
void mex_thread_init(const int, const double *, const double *, double *);
void compute(int iord, const double * const pcoeff, const int t);
void mex_thread_run(const int) override;
inline void doublet(double * const uw,
const double CXitin, const double CYitin, const double CZitin,
const double *pB,
const double *pMU, const double * const MUE,
const int t);
double *alloc_output() { return new double[outsize]; }
inline int coloc(const int ord) {
const int it = ord % t1,
in = ord / t1;
assert(it < t + 1);
assert(in < ny + 1);
return 3 * (it + in * nts); // indexes uwakeS(nts, n1)
}
protected:
const int nts;
double workfactor;
const int n1, nx2, nts3, sigsize, outsize;
int t = 0, t1 = 0, t1n1 = 0;
const double *sig = nullptr, *sig2 = nullptr;
double *uwakeS = nullptr;
};
class wakeinfluence_thread: public mex_thread {
public:
wakeinfluence_thread(const char *pmexname, const int pnthreads, const int nts,
const int pnx, const int pny, // of colocation grid
const double * const pC,
const double * const pB,
int pworkinc
) : mex_thread (pmexname, pnthreads, pnx, pny, pC, pB, pworkinc),
nts(nts),
mn(nx * ny), nmn(mn * ny), nts3(nts * 3),
sigsize(nts * ny), outsize(nx * ny * nts * ny) {}
inline double * doublet(double * const,
const double xicjc, const double yicjc, const double zicjc,
const double * const W,
const double * const MUEW,
const int tm1, const int n);
void mex_thread_init(const int, const double *, double *);
void compute(int iord, const double * const pcoeff, const int t);
void mex_thread_run(const int) override;
double *alloc_output() { return new double[outsize]; }
protected:
const int nts,
mn, nmn, nts3,
sigsize, outsize;
int t = 0;
const double *sig = nullptr;
double *A = nullptr;
};
typedef std::chrono::microseconds mex_thread_time_units_t;
#ifdef FSI_STATS // accumulate statistical counters and append to XML
// manages stop watch
class timecheck {
public:
timecheck() : elapsed(0), is_running(false) {}
inline void start(std::chrono::steady_clock::time_point now) {
start_time = now;
is_running = true;
}
inline void stop(std::chrono::steady_clock::time_point now) {
elapsed += std::chrono::duration_cast<mex_thread_time_units_t>(now - start_time);
is_running = false;
}
mex_thread_time_units_t elapsed;
std::chrono::steady_clock::time_point start_time;
bool is_running;
};
extern timecheck mex_time;
class mex_stat;
typedef std::map<const mex_thread *, mex_stat *> mex_stat_map_t;
#ifdef FSI_OPENMP
extern std::map<const int, mex_stat_map_t *> thread_stat_map;
#else
extern std::map<const std::thread::id, mex_stat_map_t *> thread_stat_map;
#endif
// mapping: threads[id] -> pmex -> mex_stat
// so each thread has stats per mex routine
class mex_stat {
public:
mex_stat() : call_count(0), doublets(0), nsyncs(0), elapsed(0), openmp_denied(0) {}
int call_count; // #times mex routine is called
int doublets; // #doublets processed
int nsyncs; // #times mex pthread synchronizes in asynch mode
mex_thread_time_units_t elapsed; // total time active in named units
int openmp_denied; // total openmp threads requested - received
};
// starts/stops timer
#define TSTART(name, now) name.start(now)
#define TSTOP(name, now) name.stop(now)
#else // no stats
#define TSTART(...)
#define TSTOP(...)
#endif
//std::atomic<int> mex_thread::order; // this will define the order for SOURCEVEL workers
//std::atomic<int> mex_thread::threads_finished; // this will define the order for SOURCEVEL workers
#pragma GCC diagnostic pop
#endif // FSI_THREAD_H