Skip to content

Commit 5da7c4d

Browse files
authored
ILP-based DAG Optimizer (skypilot-org#637)
* Refactor optimizer * Remove unnecessary import * yapf * Minor fix * Add NotImplementedError * ILP-based optimization * yapf * Add pulp in setup.py * Minor * Rename vars & Annotate types * Minor fix * Minor * Minor fix * yapf * Fix type annotation * yapf * [Minor] Address comment * Add type alias & enhance comments * yapf * Fix minor error in dag_lib.Dag * Add is_chain to Dag * Address comments * yapf * yapf * Address comments * Add total in optimizer msg * Add a comment in is_chain * Address reviews & Fix egress msg * yapf * Minor fix * Fix egress msg * yapf * obj -> objective * pass yapf * cost -> cost/time * Add random DAG generator * Add random DAG generator * Change variable names * Minor fix * yapf on test_random_dag.py * Add docstring * Rename * _optimize_cost -> _optimize_objective * Minor * Default num_tasks to 10 * Add docstrings & Fix variable names * yapf * Minor * Improve test_optimizer_random_dag * yapf * Fix optimizer * Add docstring about ILP objective * fix typo * yapf * Minor * Add monkeyptach * Fix docstring * yapf
1 parent 4ece4e6 commit 5da7c4d

File tree

3 files changed

+287
-4
lines changed

3 files changed

+287
-4
lines changed

sky/optimizer.py

+152-3
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def optimize(dag: 'dag_lib.Dag',
8585
# This function is effectful: mutates every node in 'dag' by setting
8686
# node.best_resources if it is None.
8787
dag = Optimizer._add_dummy_source_sink_nodes(dag)
88-
optimized_dag, unused_best_plan = Optimizer._optimize_cost(
88+
optimized_dag, unused_best_plan = Optimizer._optimize_objective(
8989
dag,
9090
minimize_cost=minimize == OptimizeTarget.COST,
9191
blocked_launchable_resources=blocked_launchable_resources,
@@ -339,6 +339,154 @@ def _optimize_by_dp(
339339
best_resources = dp_point_backs[node][best_resources]
340340
return best_plan, best_total_objective
341341

342+
@staticmethod
343+
def _optimize_by_ilp(
344+
graph,
345+
topo_order: List[Task],
346+
node_to_cost_map: _TaskToCostMap,
347+
minimize_cost: bool = True,
348+
) -> Tuple[Dict[Task, resources_lib.Resources], float]:
349+
"""Optimizes a general DAG using an ILP solver.
350+
351+
Notations:
352+
V: the set of nodes (tasks).
353+
E: the set of edges (dependencies).
354+
k: node -> [r.cost for r in node.resources].
355+
F: (node i, node j) -> the egress cost/time between node i and j.
356+
c: node -> one-hot decision vector. c[node][i] = 1 means
357+
the node is assigned to the i-th resource.
358+
e: (node i, node j) -> linearization of c[node i] x c[node j].
359+
e[node i][node j][a][b] = 1 means node i and node j are assigned
360+
to the a-th and the b-th resources, respectively.
361+
362+
Objective:
363+
For cost optimization,
364+
minimize_{c} sum(c[v]^T @ k[v] for each v in V) +
365+
sum(c[u]^T @ F[u][v] @ c[v] for each u, v in E)
366+
s.t. sum(c[v] == 1) for each v in V
367+
which is equivalent (linearized) to,
368+
minimize_{c, e} sum(c[v]^T @ k[v] for each v in V) +
369+
sum(e[u][v]^T @ F[u][v] for each u, v in E)
370+
s.t. sum(c[v] == 1) for each v in V (i.e., c is one-hot)
371+
sum(e[u][v] == 1) for each u, v in E (i.e., e is one-hot)
372+
e[u][v] = flatten(c[u] @ c[v]^T) for each u, v in E
373+
The first term of the objective indicates the execution cost
374+
of the task v, and the second term indicates the egress cost
375+
of the parent task u to the task v.
376+
377+
For time optimization,
378+
minimize_{c} finish_time[sink_node]
379+
s.t. finish_time[v] >= c[v]^T @ k[v] + finish_time[u] +
380+
c[u]^T @ F[u][v] @ c[v]
381+
for each u, v in E
382+
sum(c[v] == 1) for each v in V
383+
which is equivalent (linearized) to,
384+
minimize_{c, e} finish_time[sink_node]
385+
s.t. finish_time[v] >= c[v]^T @ k[v] + finish_time[u] +
386+
e[u][v]^T @ F[u][v]
387+
for each u, v in E
388+
sum(c[v] == 1) for each v in V (i.e., c is one-hot)
389+
sum(e[u][v] == 1) for each u, v in E (i.e., e is one-hot)
390+
e[u][v] = flatten(c[u] @ c[v]^T) for each u, v in E
391+
The first term of the objective indicates the execution time
392+
of the task v, and the other two terms indicate that the task v
393+
starts executing no sooner than its parent tasks are finished and
394+
the output data from the parents has arrived to the task v.
395+
"""
396+
import pulp # pylint: disable=import-outside-toplevel
397+
398+
if minimize_cost:
399+
prob = pulp.LpProblem('Sky-Cost-Optimization', pulp.LpMinimize)
400+
else:
401+
prob = pulp.LpProblem('Sky-Runtime-Optimization', pulp.LpMinimize)
402+
403+
# Prepare the constants.
404+
V = topo_order # pylint: disable=invalid-name
405+
E = graph.edges() # pylint: disable=invalid-name
406+
k = {
407+
node: list(resource_cost_map.values())
408+
for node, resource_cost_map in node_to_cost_map.items()
409+
}
410+
F = collections.defaultdict(dict) # pylint: disable=invalid-name
411+
for u, v in E:
412+
F[u][v] = []
413+
for r_u in node_to_cost_map[u].keys():
414+
for r_v in node_to_cost_map[v].keys():
415+
F[u][v].append(
416+
Optimizer._egress_cost_or_time(minimize_cost, u, r_u, v,
417+
r_v))
418+
419+
# Define the decision variables.
420+
c = {
421+
v: pulp.LpVariable.matrix(v.name, (range(len(k[v])),), cat='Binary')
422+
for v in V
423+
}
424+
425+
e = collections.defaultdict(dict)
426+
for u, v in E:
427+
num_vars = len(c[u]) * len(c[v])
428+
e[u][v] = pulp.LpVariable.matrix(f'({u.name}->{v.name})',
429+
(range(num_vars),),
430+
cat='Binary')
431+
432+
# Formulate the constraints.
433+
# 1. c[v] is an one-hot vector.
434+
for v in V:
435+
prob += pulp.lpSum(c[v]) == 1
436+
437+
# 2. e[u][v] is an one-hot vector.
438+
for u, v in E:
439+
prob += pulp.lpSum(e[u][v]) == 1
440+
441+
# 3. e[u][v] linearizes c[u] x c[v].
442+
for u, v in E:
443+
e_uv = e[u][v] # 1-d one-hot vector
444+
N_u = len(c[u]) # pylint: disable=invalid-name
445+
N_v = len(c[v]) # pylint: disable=invalid-name
446+
447+
for row in range(N_u):
448+
prob += pulp.lpSum(
449+
e_uv[N_v * row + col] for col in range(N_v)) == c[u][row]
450+
451+
for col in range(N_v):
452+
prob += pulp.lpSum(
453+
e_uv[N_v * row + col] for row in range(N_u)) == c[v][col]
454+
455+
# Formulate the objective.
456+
if minimize_cost:
457+
objective = 0
458+
for v in V:
459+
objective += pulp.lpDot(c[v], k[v])
460+
for u, v in E:
461+
objective += pulp.lpDot(e[u][v], F[u][v])
462+
else:
463+
# We need additional decision variables.
464+
finish_time = {
465+
v: pulp.LpVariable(f'lat({v})', lowBound=0) for v in V
466+
}
467+
for u, v in E:
468+
prob += finish_time[v] >= (pulp.lpDot(
469+
c[v], k[v]) + finish_time[u] + pulp.lpDot(e[u][v], F[u][v]))
470+
sink_node = V[-1]
471+
objective = finish_time[sink_node]
472+
prob += objective
473+
474+
# Solve the optimization problem.
475+
prob.solve(solver=pulp.PULP_CBC_CMD(msg=False))
476+
assert prob.status != pulp.LpStatusInfeasible, \
477+
'Cannot solve the optimization problem'
478+
best_total_objective = prob.objective.value()
479+
480+
# Find the best plan for the DAG.
481+
# node -> best resources
482+
best_plan = {}
483+
for node, variables in c.items():
484+
selected = [variable.value() for variable in variables].index(1)
485+
best_resources = list(node_to_cost_map[node].keys())[selected]
486+
node.best_resources = best_resources
487+
best_plan[node] = best_resources
488+
return best_plan, best_total_objective
489+
342490
@staticmethod
343491
def _compute_total_time(
344492
graph,
@@ -510,7 +658,7 @@ def _print_candidates(node_to_candidate_map: _TaskToPerCloudCandidates):
510658
f'To list more details, run \'sky show-gpus {acc_name}\'.')
511659

512660
@staticmethod
513-
def _optimize_cost(
661+
def _optimize_objective(
514662
dag: 'dag_lib.Dag',
515663
minimize_cost: bool = True,
516664
blocked_launchable_resources: Optional[List[
@@ -540,7 +688,8 @@ def _optimize_cost(
540688
best_plan, best_total_objective = Optimizer._optimize_by_dp(
541689
topo_order, node_to_cost_map, minimize_cost)
542690
else:
543-
raise NotImplementedError('Currently Sky only supports chain DAGs.')
691+
best_plan, best_total_objective = Optimizer._optimize_by_ilp(
692+
graph, topo_order, node_to_cost_map, minimize_cost)
544693

545694
if minimize_cost:
546695
total_time = Optimizer._compute_total_time(graph, topo_order,

sky/setup_files/setup.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@
4343
# This is used by ray. The latest 1.44.0 will generate an error
4444
# `Fork support is only compatible with the epoll1 and poll
4545
# polling strategies`
46-
'grpcio<=1.43.0'
46+
'grpcio<=1.43.0',
47+
'pulp',
4748
]
4849

4950
extras_require = {

tests/test_optimizer_random_dag.py

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import copy
2+
import random
3+
4+
import numpy as np
5+
import sky
6+
7+
CLOUDS = {
8+
'AWS': sky.AWS(),
9+
'GCP': sky.GCP(),
10+
'Azure': sky.Azure(),
11+
}
12+
ALL_INSTANCE_TYPES = sum(sky.list_accelerators(gpus_only=True).values(), [])
13+
GCP_INSTANCE_TYPES = list(sky.GCP._ON_DEMAND_PRICES.keys())
14+
15+
DUMMY_NODES = [
16+
sky.optimizer._DUMMY_SOURCE_NAME,
17+
sky.optimizer._DUMMY_SINK_NAME,
18+
]
19+
20+
21+
def generate_random_dag(
22+
num_tasks: int,
23+
seed: int = 0,
24+
max_num_nodes: int = 10,
25+
max_num_parents: int = 5,
26+
max_num_candidate_resources: int = 5,
27+
max_task_runtime: int = 3600,
28+
max_data_size: int = 1000,
29+
) -> sky.Dag:
30+
"""Generates a random Sky DAG to test Sky optimizer."""
31+
random.seed(seed)
32+
with sky.Dag() as dag:
33+
for i in range(num_tasks):
34+
op = sky.Task(name=f'task{i}')
35+
task_runtime = random.random() * max_task_runtime
36+
op.set_time_estimator(lambda _: task_runtime)
37+
op.num_nodes = random.randint(1, max_num_nodes)
38+
39+
if i == 0:
40+
num_parents = 0
41+
else:
42+
num_parents = random.randint(0, min(i, max_num_parents))
43+
44+
if num_parents == 0:
45+
src_cloud = random.choice(['s3:', 'gs:', None])
46+
src_volume = random.randint(0, max_data_size)
47+
else:
48+
parents = random.choices(dag.tasks[:-1], k=num_parents)
49+
for parent in parents:
50+
parent >> op
51+
# NOTE: Sky only takes single input data source
52+
src_cloud = parents[0]
53+
# Sky uses the parent's output data size
54+
src_volume = None
55+
56+
if src_cloud is not None:
57+
op.set_inputs(src_cloud, src_volume)
58+
op.set_outputs('CLOUD', random.randint(0, max_data_size))
59+
60+
num_candidates = random.randint(1, max_num_candidate_resources)
61+
candidate_instance_types = random.choices(ALL_INSTANCE_TYPES,
62+
k=num_candidates)
63+
op.set_resources({
64+
sky.Resources(
65+
cloud=CLOUDS[candidate.cloud],
66+
instance_type=candidate.instance_type \
67+
if candidate.cloud != 'GCP' \
68+
else random.choice(GCP_INSTANCE_TYPES),
69+
accelerators={
70+
candidate.accelerator_name: candidate.accelerator_count},
71+
)
72+
for candidate in candidate_instance_types
73+
})
74+
return dag
75+
76+
77+
def find_min_objective(dag: sky.Dag, minimize_cost: bool) -> float:
78+
"""Manually finds the minimum objective value."""
79+
graph = dag.get_graph()
80+
topo_order = dag.tasks
81+
82+
def _optimize_by_brute_force(tasks, plan):
83+
"""Optimizes a Sky DAG in a brute-force manner."""
84+
# NOTE: Here we assume that the Sky DAG is topologically sorted.
85+
task = tasks[0]
86+
min_objective = np.inf
87+
for resources in task.get_resources():
88+
assert task.name in DUMMY_NODES or resources.is_launchable()
89+
plan[task] = resources
90+
if len(tasks) == 1:
91+
if minimize_cost:
92+
objective = sky.Optimizer._compute_total_cost(
93+
graph, topo_order, plan)
94+
else:
95+
objective = sky.Optimizer._compute_total_time(
96+
graph, topo_order, plan)
97+
else:
98+
objective = _optimize_by_brute_force(tasks[1:], plan)
99+
if objective < min_objective:
100+
min_objective = objective
101+
return min_objective
102+
103+
return _optimize_by_brute_force(topo_order, {})
104+
105+
106+
def compare_optimization_results(dag: sky.Dag, minimize_cost: bool):
107+
copy_dag = copy.deepcopy(dag)
108+
109+
_, optimizer_plan = sky.Optimizer._optimize_objective(dag, minimize_cost)
110+
if minimize_cost:
111+
objective = sky.Optimizer._compute_total_cost(dag.get_graph(),
112+
dag.tasks, optimizer_plan)
113+
else:
114+
objective = sky.Optimizer._compute_total_time(dag.get_graph(),
115+
dag.tasks, optimizer_plan)
116+
117+
min_objective = find_min_objective(copy_dag, minimize_cost)
118+
assert objective == min_objective
119+
120+
121+
def test_optimizer(monkeypatch):
122+
enabled_clouds = list(CLOUDS.values())
123+
monkeypatch.setattr(
124+
'sky.global_user_state.get_enabled_clouds',
125+
lambda: enabled_clouds,
126+
)
127+
monkeypatch.setattr('sky.check.check', lambda *_args, **_kwargs: None)
128+
129+
dag = generate_random_dag(num_tasks=5, seed=0)
130+
dag = sky.Optimizer._add_dummy_source_sink_nodes(dag)
131+
132+
compare_optimization_results(dag, minimize_cost=True)
133+
compare_optimization_results(dag, minimize_cost=False)

0 commit comments

Comments
 (0)