Commit fe51fea (parent: 1c27876)

UCP/PROTO: Add CPU performance type
When there are multiple parallel stages and each of them consumes CPU time, we need to accumulate this CPU time rather than taking just the maximum.
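To make the rationale concrete, a standalone sketch follows (not part of the change; the struct mirrors ucs_linear_func_t and the stage costs are invented). Wall-clock time of parallel stages is governed by the slowest stage, but the CPU is busy in each of them, so its time must be summed rather than maxed:

/* Standalone model of the estimate (hypothetical, for illustration only):
 * linear_func_t mirrors ucs_linear_func_t, time(n) = c + m * n. */
#include <stdio.h>

typedef struct {
    double c; /* constant overhead, seconds */
    double m; /* cost per byte, seconds/byte */
} linear_func_t;

static double eval(linear_func_t f, double size)
{
    return f.c + f.m * size;
}

int main(void)
{
    /* two made-up parallel stages, e.g. memory copy and network send */
    linear_func_t stage_cpu[2] = {{2e-6, 1e-10}, {1e-6, 5e-11}};
    double        msg_size     = 65536.0;
    double        cpu_sum      = 0.0;
    double        cpu_max      = 0.0;
    double        t;
    int           i;

    for (i = 0; i < 2; ++i) {
        t        = eval(stage_cpu[i], msg_size);
        cpu_sum += t;
        if (t > cpu_max) {
            cpu_max = t;
        }
    }

    /* the CPU works in every stage, so the sum is the honest estimate */
    printf("max: %.3f us, sum: %.3f us\n", cpu_max * 1e6, cpu_sum * 1e6);
    return 0;
}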

File tree

10 files changed: +147, -66 lines


.clang-format

+1 line changed

@@ -73,6 +73,7 @@ ForEachMacros: ['_UCS_BITMAP_FOR_EACH_WORD',
                 'kh_foreach',
                 'kh_foreach_key',
                 'kh_foreach_value',
+                'UCP_PROTO_PERF_TYPE_FOREACH',
                 'ucp_unpacked_address_for_each',
                 'ucs_array_for_each',
                 'UCS_BITMAP_FOR_EACH_BIT',

src/ucp/proto/proto.c

+32 -1 lines changed

@@ -78,7 +78,8 @@ const ucp_proto_t *ucp_protocols[] = {

 const char *ucp_proto_perf_type_names[] = {
     [UCP_PROTO_PERF_TYPE_SINGLE] = "single",
-    [UCP_PROTO_PERF_TYPE_MULTI]  = "multi"
+    [UCP_PROTO_PERF_TYPE_MULTI]  = "multi",
+    [UCP_PROTO_PERF_TYPE_CPU]    = "cpu"
 };

 const char *ucp_operation_names[] = {
@@ -131,3 +132,33 @@ void ucp_proto_default_query(const ucp_proto_query_params_t *params,
     ucs_strncpy_safe(attr->desc, params->proto->desc, sizeof(attr->desc));
     ucs_strncpy_safe(attr->config, "", sizeof(attr->config));
 }
+
+void ucp_proto_perf_set(ucs_linear_func_t perf[UCP_PROTO_PERF_TYPE_LAST],
+                        ucs_linear_func_t func)
+{
+    ucp_proto_perf_type_t perf_type;
+
+    UCP_PROTO_PERF_TYPE_FOREACH(perf_type) {
+        perf[perf_type] = func;
+    }
+}
+
+void ucp_proto_perf_copy(ucs_linear_func_t dest[UCP_PROTO_PERF_TYPE_LAST],
+                         const ucs_linear_func_t src[UCP_PROTO_PERF_TYPE_LAST])
+{
+    ucp_proto_perf_type_t perf_type;
+
+    UCP_PROTO_PERF_TYPE_FOREACH(perf_type) {
+        dest[perf_type] = src[perf_type];
+    }
+}
+
+void ucp_proto_perf_add(ucs_linear_func_t perf[UCP_PROTO_PERF_TYPE_LAST],
+                        ucs_linear_func_t func)
+{
+    ucp_proto_perf_type_t perf_type;
+
+    UCP_PROTO_PERF_TYPE_FOREACH(perf_type) {
+        ucs_linear_func_add_inplace(&perf[perf_type], func);
+    }
+}

src/ucp/proto/proto.h

+29 -16 lines changed

@@ -113,16 +113,33 @@ typedef struct {
  * +---------+---------------+
  */
 typedef enum {
+    UCP_PROTO_PERF_TYPE_FIRST,
+
     /* Time to complete this operation assuming it's the only one. */
-    UCP_PROTO_PERF_TYPE_SINGLE,
+    UCP_PROTO_PERF_TYPE_SINGLE = UCP_PROTO_PERF_TYPE_FIRST,

     /* Time to complete this operation after all previous ones complete. */
     UCP_PROTO_PERF_TYPE_MULTI,

+    /* CPU time the operation consumes (it would be less than or equal to the
+     * SINGLE and MULTI times).
+     */
+    UCP_PROTO_PERF_TYPE_CPU,
+
     UCP_PROTO_PERF_TYPE_LAST
 } ucp_proto_perf_type_t;


+/*
+ * Iterate over performance types.
+ *
+ * @param _perf_type  Performance type iterator variable.
+ */
+#define UCP_PROTO_PERF_TYPE_FOREACH(_perf_type) \
+    for (_perf_type = UCP_PROTO_PERF_TYPE_FIRST; \
+         _perf_type < UCP_PROTO_PERF_TYPE_LAST; ++(_perf_type))
+
+
 /*
  * Performance estimation for a range of message sizes.
  */
@@ -312,20 +329,16 @@ unsigned ucp_protocols_count(void);
 void ucp_proto_default_query(const ucp_proto_query_params_t *params,
                              ucp_proto_query_attr_t *attr);

-static inline void
-ucp_proto_perf_copy(ucs_linear_func_t dest[UCP_PROTO_PERF_TYPE_LAST],
-                    const ucs_linear_func_t src[UCP_PROTO_PERF_TYPE_LAST])
-{
-    dest[UCP_PROTO_PERF_TYPE_SINGLE] = src[UCP_PROTO_PERF_TYPE_SINGLE];
-    dest[UCP_PROTO_PERF_TYPE_MULTI]  = src[UCP_PROTO_PERF_TYPE_MULTI];
-}
-
-static inline void
-ucp_proto_perf_add(ucs_linear_func_t perf[UCP_PROTO_PERF_TYPE_LAST],
-                   ucs_linear_func_t func)
-{
-    ucs_linear_func_add_inplace(&perf[UCP_PROTO_PERF_TYPE_SINGLE], func);
-    ucs_linear_func_add_inplace(&perf[UCP_PROTO_PERF_TYPE_MULTI], func);
-}
+
+void ucp_proto_perf_set(ucs_linear_func_t perf[UCP_PROTO_PERF_TYPE_LAST],
+                        ucs_linear_func_t func);
+
+
+void ucp_proto_perf_copy(ucs_linear_func_t dest[UCP_PROTO_PERF_TYPE_LAST],
+                         const ucs_linear_func_t src[UCP_PROTO_PERF_TYPE_LAST]);
+
+
+void ucp_proto_perf_add(ucs_linear_func_t perf[UCP_PROTO_PERF_TYPE_LAST],
+                        ucs_linear_func_t func);

 #endif
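For readers unfamiliar with these helpers, here is a brief usage sketch. It is not part of the commit: the function name and the cost numbers are invented, and it assumes it lives in a UCX source file that already includes this header and the UCS logging and linear-function headers.

/* Illustrative only: fills all perf types with the same base estimate and
 * then adds a constant overhead to every type at once. */
static void example_fill_perf(ucs_linear_func_t perf[UCP_PROTO_PERF_TYPE_LAST])
{
    ucp_proto_perf_type_t perf_type;

    /* same base estimate for SINGLE, MULTI and CPU: 1us + size / 10GB/s */
    ucp_proto_perf_set(perf, ucs_linear_func_make(1e-6, 1.0 / 10e9));

    /* add a constant 0.5us overhead to every performance type at once */
    ucp_proto_perf_add(perf, ucs_linear_func_make(0.5e-6, 0.0));

    /* the new iterator macro visits SINGLE, MULTI and CPU */
    UCP_PROTO_PERF_TYPE_FOREACH(perf_type) {
        ucs_trace("perf[%d]: %.2e + %.2e * size", perf_type,
                  perf[perf_type].c, perf[perf_type].m);
    }
}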

src/ucp/proto/proto_debug.h

+4 -2 lines changed

@@ -36,10 +36,12 @@
  * of different types. See ucp_proto_perf_type_t */
 #define UCP_PROTO_PERF_FUNC_TYPES_FMT \
     UCP_PROTO_PERF_FUNC_FMT(single) \
-    UCP_PROTO_PERF_FUNC_FMT(multi)
+    UCP_PROTO_PERF_FUNC_FMT(multi) \
+    UCP_PROTO_PERF_FUNC_FMT(cpu)
 #define UCP_PROTO_PERF_FUNC_TYPES_ARG(_perf_func) \
     UCP_PROTO_PERF_FUNC_ARG((&(_perf_func)[UCP_PROTO_PERF_TYPE_SINGLE])), \
-    UCP_PROTO_PERF_FUNC_ARG((&(_perf_func)[UCP_PROTO_PERF_TYPE_MULTI]))
+    UCP_PROTO_PERF_FUNC_ARG((&(_perf_func)[UCP_PROTO_PERF_TYPE_MULTI])), \
+    UCP_PROTO_PERF_FUNC_ARG((&(_perf_func)[UCP_PROTO_PERF_TYPE_CPU]))


 /*

src/ucp/proto/proto_init.c

+60 -28 lines changed

@@ -62,6 +62,9 @@ void ucp_proto_common_add_ppln_range(const ucp_proto_init_params_t *init_params,
     ppln_range->perf[UCP_PROTO_PERF_TYPE_MULTI] =
             frag_range->perf[UCP_PROTO_PERF_TYPE_MULTI];

+    ppln_range->perf[UCP_PROTO_PERF_TYPE_CPU] =
+            frag_range->perf[UCP_PROTO_PERF_TYPE_CPU];
+
     ppln_range->max_length = max_length;

     ucp_proto_perf_range_add_data(ppln_range);
@@ -96,6 +99,8 @@ void ucp_proto_perf_range_add_data(const ucp_proto_perf_range_t *range)
                                  range->perf[UCP_PROTO_PERF_TYPE_SINGLE]);
     ucp_proto_perf_node_add_data(range->node, "mult",
                                  range->perf[UCP_PROTO_PERF_TYPE_MULTI]);
+    ucp_proto_perf_node_add_data(range->node, "cpu",
+                                 range->perf[UCP_PROTO_PERF_TYPE_CPU]);
 }

 ucs_status_t
@@ -183,23 +188,24 @@ ucp_proto_perf_envelope_make(const ucp_proto_perf_list_t *perf_list,
 }

 ucs_status_t
-ucp_proto_init_parallel_stages(const ucp_proto_init_params_t *params,
+ucp_proto_init_parallel_stages(const ucp_proto_common_init_params_t *params,
                                size_t range_start, size_t range_end,
                                size_t frag_size, double bias,
                                const ucp_proto_perf_range_t **stages,
                                unsigned num_stages)
 {
-    ucp_proto_caps_t *caps      = params->caps;
+    ucp_proto_caps_t *caps      = params->super.caps;
     ucs_linear_func_t bias_func = ucs_linear_func_make(0.0, 1.0 - bias);
-    UCS_ARRAY_DEFINE_ONSTACK(stage_list, ucp_proto_perf_list, 4);
-    UCS_ARRAY_DEFINE_ONSTACK(concave, ucp_proto_perf_envelope, 4);
-    const ucs_linear_func_t *single_perf, *multi_perf;
+    UCS_ARRAY_DEFINE_ONSTACK(stage_list, ucp_proto_perf_list, 16);
+    UCS_ARRAY_DEFINE_ONSTACK(concave, ucp_proto_perf_envelope, 16);
+    ucs_linear_func_t perf[UCP_PROTO_PERF_TYPE_LAST];
+    ucs_linear_func_t sum_single_perf, sum_cpu_perf;
     const ucp_proto_perf_range_t **stage_elem;
     ucp_proto_perf_envelope_elem_t *elem;
     ucp_proto_perf_node_t *stage_node;
+    ucp_proto_perf_type_t perf_type;
     ucp_proto_perf_range_t *range;
     ucs_linear_func_t *perf_elem;
-    ucs_linear_func_t sum_perf;
     char frag_size_str[64];
     ucs_status_t status;
     char range_str[64];
@@ -211,18 +217,30 @@ ucp_proto_init_parallel_stages(const ucp_proto_common_init_params_t *params,
               frag_size_str, bias * 100.0);

     ucs_log_indent(1);
-    sum_perf = UCS_LINEAR_FUNC_ZERO;
+    sum_single_perf = UCS_LINEAR_FUNC_ZERO;
+    sum_cpu_perf    = UCS_LINEAR_FUNC_ZERO;
     ucs_carray_for_each(stage_elem, stages, num_stages) {
-        /* Single-fragment is adding overheads and transfer time */
-        single_perf = &(*stage_elem)->perf[UCP_PROTO_PERF_TYPE_SINGLE];
-        ucs_linear_func_add_inplace(&sum_perf, *single_perf);
+        UCP_PROTO_PERF_TYPE_FOREACH(perf_type) {
+            perf[perf_type] = (*stage_elem)->perf[perf_type];
+            /* For multi-fragment protocols, we need to apply the fragment
+             * size to the performance function linear factor.
+             */
+            if (!(params->flags & UCP_PROTO_COMMON_INIT_FLAG_SINGLE_FRAG)) {
+                perf[perf_type].m += perf[perf_type].c / frag_size;
+            }
+        }

-        /* account for the overhead of each fragment of a multi-fragment message */
-        multi_perf = &(*stage_elem)->perf[UCP_PROTO_PERF_TYPE_MULTI];
-        perf_elem  = ucs_array_append(ucp_proto_perf_list, &stage_list,
-                                      status = UCS_ERR_NO_MEMORY; goto out);
-        perf_elem->c = multi_perf->c;
-        perf_elem->m = multi_perf->m + (multi_perf->c / frag_size);
+        /* Summarize single and CPU time */
+        ucs_linear_func_add_inplace(&sum_single_perf,
+                                    perf[UCP_PROTO_PERF_TYPE_SINGLE]);
+        ucs_linear_func_add_inplace(&sum_cpu_perf,
+                                    perf[UCP_PROTO_PERF_TYPE_CPU]);
+
+        /* Add all multi perf ranges to envelope array */
+        perf_elem  = ucs_array_append(ucp_proto_perf_list, &stage_list,
+                                      status = UCS_ERR_NO_MEMORY;
+                                      goto out);
+        *perf_elem = perf[UCP_PROTO_PERF_TYPE_MULTI];

         ucs_trace("stage[%zu] %s " UCP_PROTO_PERF_FUNC_TYPES_FMT
                   UCP_PROTO_PERF_FUNC_FMT(perf_elem),
@@ -232,6 +250,12 @@ ucp_proto_init_parallel_stages(const ucp_proto_common_init_params_t *params,
                   UCP_PROTO_PERF_FUNC_ARG(perf_elem));
     }

+    /* Add CPU time as another parallel stage */
+    perf_elem  = ucs_array_append(ucp_proto_perf_list, &stage_list,
+                                  status = UCS_ERR_NO_MEMORY;
+                                  goto out);
+    *perf_elem = sum_cpu_perf;
+
     /* Multi-fragment is pipelining overheads and network transfer */
     status = ucp_proto_perf_envelope_make(&stage_list, range_start, range_end,
                                           0, &concave);
@@ -242,16 +266,19 @@ ucp_proto_init_parallel_stages(const ucp_proto_common_init_params_t *params,
     ucs_array_for_each(elem, &concave) {
         range             = &caps->ranges[caps->num_ranges];
         range->max_length = elem->max_length;
-        range->node       = ucp_proto_perf_node_new_data(params->proto_name, "");
+        range->node       = ucp_proto_perf_node_new_data(params->super.proto_name,
+                                                         "");

         /* "single" performance estimation is sum of "stages" with the bias */
         range->perf[UCP_PROTO_PERF_TYPE_SINGLE] =
-                ucs_linear_func_compose(bias_func, sum_perf);
+                ucs_linear_func_compose(bias_func, sum_single_perf);

         /* "multiple" performance estimation is concave envelope of "stages" */
-        multi_perf = &ucs_array_elem(&stage_list, elem->index);
-        range->perf[UCP_PROTO_PERF_TYPE_MULTI] =
-                ucs_linear_func_compose(bias_func, *multi_perf);
+        range->perf[UCP_PROTO_PERF_TYPE_MULTI] = ucs_linear_func_compose(
+                bias_func, ucs_array_elem(&stage_list, elem->index));
+
+        /* CPU overhead is the sum of all stages */
+        range->perf[UCP_PROTO_PERF_TYPE_CPU] = sum_cpu_perf;

         ucp_proto_perf_range_add_data(range);

@@ -331,11 +358,13 @@ ucp_proto_common_init_send_perf(const ucp_proto_common_init_params_t *params,
         ucp_proto_perf_node_own_child(send_perf->node, &child_perf_node);
     }

-    /* Add constant CPU overhead */
-    send_overhead.c += tl_perf->send_pre_overhead;
-    send_perf->perf[UCP_PROTO_PERF_TYPE_SINGLE] = send_overhead;
-    send_perf->perf[UCP_PROTO_PERF_TYPE_MULTI] = send_overhead;
-    send_perf->perf[UCP_PROTO_PERF_TYPE_MULTI].c += tl_perf->send_post_overhead;
+    send_overhead.c += tl_perf->send_pre_overhead;
+    send_perf->perf[UCP_PROTO_PERF_TYPE_SINGLE] = send_overhead;
+
+    send_overhead.c += tl_perf->send_post_overhead;
+    send_perf->perf[UCP_PROTO_PERF_TYPE_MULTI] = send_overhead;
+    send_perf->perf[UCP_PROTO_PERF_TYPE_CPU]   = send_overhead;
+
     ucp_proto_perf_range_add_data(send_perf);

     return UCS_OK;
@@ -368,6 +397,7 @@ ucp_proto_common_init_xfer_perf(const ucp_proto_common_init_params_t *params,
     xfer_perf->perf[UCP_PROTO_PERF_TYPE_SINGLE].c += tl_perf->latency +
                                                      tl_perf->sys_latency;
     xfer_perf->perf[UCP_PROTO_PERF_TYPE_MULTI] = xfer_time;
+    xfer_perf->perf[UCP_PROTO_PERF_TYPE_CPU]   = UCS_LINEAR_FUNC_ZERO;

     /*
      * Add the latency of response/ACK back from the receiver.
@@ -448,6 +478,8 @@ ucp_proto_common_init_recv_perf(const ucp_proto_common_init_params_t *params,

     recv_perf->perf[UCP_PROTO_PERF_TYPE_SINGLE] = recv_overhead;
     recv_perf->perf[UCP_PROTO_PERF_TYPE_MULTI]  = recv_overhead;
+    recv_perf->perf[UCP_PROTO_PERF_TYPE_CPU]    = UCS_LINEAR_FUNC_ZERO;
+
     ucp_proto_perf_range_add_data(recv_perf);

     return UCS_OK;
@@ -503,8 +535,8 @@ ucp_proto_common_init_caps(const ucp_proto_common_init_params_t *params,
     parallel_stages[2] = &recv_perf;

     /* Add ranges representing sending single fragment */
-    status = ucp_proto_init_parallel_stages(&params->super, 0, frag_size,
-                                            frag_size, 0.0, parallel_stages, 3);
+    status = ucp_proto_init_parallel_stages(params, 0, frag_size, frag_size,
+                                            0.0, parallel_stages, 3);
     if (status != UCS_OK) {
         goto out_deref_recv_perf;
     }
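The FOREACH loop above folds the constant per-fragment cost into the slope of the linear function, perf[perf_type].m += perf[perf_type].c / frag_size. A standalone sketch of that arithmetic (the values are invented, not taken from the commit) shows that the folded function matches the cost of sending the fragments back-to-back:

/* Standalone arithmetic check (hypothetical values, illustration only) */
#include <stdio.h>

int main(void)
{
    double c         = 2e-6;      /* constant cost per fragment, seconds */
    double m         = 1e-10;     /* cost per byte, seconds/byte */
    double frag_size = 8192.0;
    double msg_size  = 1048576.0; /* a whole number of fragments */
    double num_frags = msg_size / frag_size;

    /* cost of sending every fragment back-to-back */
    double per_frag_total = num_frags * (c + m * frag_size);

    /* same cost expressed as a linear function of the message size */
    double folded_slope = m + c / frag_size;
    double folded_total = folded_slope * msg_size;

    printf("per-fragment: %.3f us, folded: %.3f us\n",
           per_frag_total * 1e6, folded_total * 1e6);
    return 0;
}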

src/ucp/proto/proto_init.h

+1 -1 lines changed

@@ -79,7 +79,7 @@ ucp_proto_perf_envelope_make(const ucp_proto_perf_list_t *perf_list,
  * @param [in] num_stages  Number of parallel stages in the protocol.
  */
 ucs_status_t
-ucp_proto_init_parallel_stages(const ucp_proto_init_params_t *params,
+ucp_proto_init_parallel_stages(const ucp_proto_common_init_params_t *params,
                                size_t range_start, size_t range_end,
                                size_t frag_size, double bias,
                                const ucp_proto_perf_range_t **stages,

src/ucp/proto/proto_reconfig.c

+1 -4 lines changed

@@ -70,7 +70,6 @@ static ucs_status_t
 ucp_proto_reconfig_init(const ucp_proto_init_params_t *init_params)
 {
     ucp_proto_perf_range_t *perf_range = &init_params->caps->ranges[0];
-    ucp_proto_perf_type_t perf_type;

     /* Default reconfiguration protocol is a fallback for any case protocol
      * selection is unsuccessful. The protocol keeps queuing requests until they
@@ -85,9 +84,7 @@ ucp_proto_reconfig_init(const ucp_proto_init_params_t *init_params)

     /* Set the performance estimation as worse than any other protocol */
     perf_range->max_length = SIZE_MAX;
-    for (perf_type = 0; perf_type < UCP_PROTO_PERF_TYPE_LAST; ++perf_type) {
-        perf_range->perf[perf_type] = ucs_linear_func_make(INFINITY, 0);
-    }
+    ucp_proto_perf_set(perf_range->perf, ucs_linear_func_make(INFINITY, 0));

     perf_range->node = ucp_proto_perf_node_new_data("dummy", "");
     return UCS_OK;

src/ucp/rma/amo_sw.c

+2 -1 lines changed

@@ -393,7 +393,8 @@ ucp_proto_amo_sw_init(const ucp_proto_init_params_t *init_params, unsigned flags
         .super.hdr_size   = 0,
         .super.send_op    = UCT_EP_OP_AM_BCOPY,
         .super.memtype_op = UCT_EP_OP_GET_SHORT,
-        .super.flags      = flags | UCP_PROTO_COMMON_INIT_FLAG_CAP_SEG_SIZE,
+        .super.flags      = flags | UCP_PROTO_COMMON_INIT_FLAG_SINGLE_FRAG |
+                            UCP_PROTO_COMMON_INIT_FLAG_CAP_SEG_SIZE,
         .lane_type        = UCP_LANE_TYPE_AM,
         .tl_cap_flags     = 0
     };

src/ucp/rndv/proto_rndv.c

+10 -5 lines changed

@@ -275,7 +275,8 @@ ucp_proto_rndv_ctrl_init(const ucp_proto_rndv_ctrl_init_params_t *params)
     ucs_trace("rndv" UCP_PROTO_TIME_FMT(ctrl_latency),
               UCP_PROTO_TIME_ARG(ctrl_latency));
     ctrl_perf.perf[UCP_PROTO_PERF_TYPE_SINGLE] =
-    ctrl_perf.perf[UCP_PROTO_PERF_TYPE_MULTI] = ucs_linear_func_add3(
+    ctrl_perf.perf[UCP_PROTO_PERF_TYPE_MULTI]  =
+    ctrl_perf.perf[UCP_PROTO_PERF_TYPE_CPU]    = ucs_linear_func_add3(
             memreg_time, ucs_linear_func_make(ctrl_latency, 0.0),
             params->unpack_time);
     ucp_proto_perf_range_add_data(&ctrl_perf);
@@ -306,9 +307,9 @@ ucp_proto_rndv_ctrl_init(const ucp_proto_rndv_ctrl_init_params_t *params)

     parallel_stages[0] = &ctrl_perf;
     parallel_stages[1] = &remote_perf;
-    status = ucp_proto_init_parallel_stages(&params->super.super,
-                                            min_length, range_max_length,
-                                            SIZE_MAX, params->perf_bias,
+    status = ucp_proto_init_parallel_stages(&params->super, min_length,
+                                            range_max_length, SIZE_MAX,
+                                            params->perf_bias,
                                             parallel_stages, 2);
     if (status != UCS_OK) {
         goto out_deref_perf_node;
@@ -400,7 +401,9 @@ ucp_proto_rndv_ack_perf(const ucp_proto_init_params_t *init_params,

     ack_perf[UCP_PROTO_PERF_TYPE_SINGLE] =
             ucs_linear_func_make(send_time + receive_time, 0);
-    ack_perf[UCP_PROTO_PERF_TYPE_MULTI] = ucs_linear_func_make(send_time, 0);
+    ack_perf[UCP_PROTO_PERF_TYPE_MULTI] =
+    ack_perf[UCP_PROTO_PERF_TYPE_CPU]   =
+            ucs_linear_func_make(send_time, 0);

     return UCS_OK;
 }
@@ -440,6 +443,8 @@ ucs_status_t ucp_proto_rndv_ack_init(const ucp_proto_init_params_t *init_params,
                                  ack_perf[UCP_PROTO_PERF_TYPE_SINGLE]);
     ucp_proto_perf_node_add_data(ack_perf_node, "mult",
                                  ack_perf[UCP_PROTO_PERF_TYPE_MULTI]);
+    ucp_proto_perf_node_add_data(ack_perf_node, "cpu",
+                                 ack_perf[UCP_PROTO_PERF_TYPE_CPU]);

     /* Copy basic capabilities from bulk protocol */
     init_params->caps->cfg_thresh = bulk_caps->cfg_thresh;
