Commit

OpenNebula/one-aiops#70: basic scaling for one backend
MarioRobres committed Jun 28, 2024
1 parent a61e458 commit 7983dd7
Showing 3 changed files with 113 additions and 58 deletions.
1 change: 1 addition & 0 deletions lithops/serverless/backends/one/config.py
@@ -164,6 +164,7 @@ class OneConfigError(Exception):
'kubecfg_path': '/tmp/kube_config',
'oneke_config_path': None,
'delete': False,
+    'minimum_nodes': 0
})


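The new `minimum_nodes` key defaults to 0, so an idle service may be scaled all the way down to zero workers. A minimal sketch of how a user could raise that floor from the Lithops side (the `service_id` value is a placeholder; the `one` keys mirror `one_config.yaml` below):

```python
import lithops

# Hypothetical user config: keep at least 2 OneKE worker nodes alive
# between jobs instead of letting clear() shrink the worker role to 0.
config = {
    'lithops': {'backend': 'one'},
    'one': {
        'service_id': '123',   # placeholder: an already-deployed OneKE service
        'minimum_nodes': 2,
        'timeout': 600,
    }
}

fexec = lithops.FunctionExecutor(config=config)
```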
167 changes: 110 additions & 57 deletions lithops/serverless/backends/one/one.py
@@ -45,6 +45,11 @@ class OpenNebula(KubernetesBackend):
def __init__(self, one_config, internal_storage):
logger.info("Initializing OpenNebula backend")

+        # Overwrite config values
+        self.name = 'one'
+        self.timeout = one_config['timeout']
+        self.minimum_nodes = one_config['minimum_nodes']
+
logger.debug("Initializing Oneflow python client")
self.client = oneflow.OneFlowClient()

@@ -53,67 +58,66 @@ def __init__(self, one_config, internal_storage):

# service_template_id: instantiate master node
if 'service_template_id' in one_config:
-            one_config['service_id'] = self._instantiate_oneke(
+            self.service_id = self._instantiate_oneke(
one_config['service_template_id'],
one_config['oneke_config'],
one_config['oneke_config_path']
)
-            self._wait_for_oneke(one_config['service_id'], one_config['timeout'])
-        elif 'service_id' not in one_config:
+            self._wait_for_oneke('RUNNING')
+        elif 'service_id' in one_config:
+            self.service_id = one_config['service_id']
+        else:
raise OneError(
"OpenNebula backend must contain 'service_template_id' or 'service_id'"
)

# Check OneKE status
-        self._check_oneke(one_config['service_id'])
+        self._check_oneke()

# Get and Save kubeconfig from OneKE
-        kubecfg = self._get_kube_config(one_config['service_id'])
+        kubecfg = self._get_kube_config()
with open(one_config['kubecfg_path'], 'w') as file:
file.write(kubecfg)

-        # Overwrite config values
-        self.name = 'one'
self.kubecfg_path = one_config['kubecfg_path']

super().__init__(one_config, internal_storage)


def invoke(self, docker_image_name, runtime_memory, job_payload):
# Get current nodes
super()._get_nodes()
logger.info(f"Found {len(self.nodes)} nodes")
logger.info(f"Nodes: {self.nodes}")
-        # TODO: add dynamic scaling logic
+        pods, scale_nodes, chunksize, worker_processes = self._granularity(
+            job_payload['total_calls']
+        )
+
+        # Scale nodes
+        # TODO: Add logic to see if it's worth waiting for the cooldown or not
+        if scale_nodes > len(self.nodes):
+            self._scale_oneke(self.nodes, scale_nodes)
+
+        # Setup granularity
+        job_payload['max_workers'] = pods
+        job_payload['chunksize'] = chunksize
+        job_payload['worker_processes'] = worker_processes
super().invoke(docker_image_name, runtime_memory, job_payload)
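To make the sizing concrete: with the constants in `_granularity` below (1 pod per node, 25 functions per pod), a 100-task map on a 1-node cluster requests 4 pods, scales the worker role from 1 to 4 nodes, and dispatches with `max_workers=4`, `chunksize=1`, `worker_processes=1`. A sketch of the caller's view, reusing the hypothetical executor from the config example above:

```python
def double(x):
    return 2 * x

# invoke() receives total_calls=100, scales OneKE transparently,
# then delegates the job to the Kubernetes backend.
futures = fexec.map(double, range(100))
print(fexec.get_result(futures)[:5])   # [0, 2, 4, 6, 8]
```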


def clear(self, job_keys=None):
super().clear(job_keys)
-        # TODO: if all are deleted -> suspend OneKE VMs (scale down) and
-        # delete them after X minutes
-        pass
+        # scale down only if the cooldown time has passed
+        state = self._get_latest_state()
+        if state != 'COOLDOWN':
+            super()._get_nodes()
+            self._scale_oneke(self.nodes, self.minimum_nodes)
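Note the asymmetry with `invoke()`: scaling up blocks until the service reaches `'COOLDOWN'`, whereas `clear()` simply skips the scale-down when a previous operation's cooldown is still in effect, so job teardown never stalls on OneFlow. With the default `minimum_nodes: 0`, a successful `clear()` returns the worker role to zero nodes.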


-    def _check_oneke(self, service_id):
+    def _check_oneke(self):
logger.info("Checking OpenNebula OneKE service status")

# Check service status
-        _service_json = self.client.servicepool[service_id].info()
-        logs = _service_json[str(service_id)]['TEMPLATE']['BODY'].get('log', [])
-        state = self._get_latest_state(logs)
-        if state is None:
-            raise OneError("No state found in logs")
+        state = self._get_latest_state()
if state != 'RUNNING':
raise OneError(f"OpenNebula OneKE service is not 'RUNNING': {state}")

# Check VMs status
-        vm_ids = {
-            node['vm_info']['VM']['ID']
-            for role in _service_json[str(service_id)]['TEMPLATE']['BODY']['roles']
-            for node in role['nodes']
-        }
-        self._check_vms_status(vm_ids)
+        self._check_vms_status()
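For reference, the service document returned by the OneFlow client nests VM IDs fairly deep; the shape this file relies on looks roughly as follows (abridged sketch, values hypothetical):

```python
# client.servicepool[sid].info() returns a dict keyed by the service ID string.
_service_json = {
    '123': {
        'TEMPLATE': {
            'BODY': {
                'log': [{'message': 'New state: RUNNING'}],
                'roles': [
                    {'name': 'master', 'nodes': [{'vm_info': {'VM': {'ID': '42'}}}]},
                    {'name': 'worker', 'nodes': [{'vm_info': {'VM': {'ID': '43'}}}]},
                ],
            }
        }
    }
}
```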


def _instantiate_oneke(self, service_template_id, oneke_config, oneke_config_path):
@@ -133,49 +137,50 @@ def _instantiate_oneke(self, service_template_id, oneke_config, oneke_config_path):
return service_id


-    def _wait_for_oneke(self, service_id, timeout):
+    def _wait_for_oneke(self, state):
start_time = time.time()
-        minutes_timeout = int(timeout/60)
+        minutes_timeout = int(self.timeout/60)
logger.info(
f"Waiting for OneKE service to become 'RUNNING'. "
f"Be patient, this process can take up to {minutes_timeout} minutes"
f"Waiting for OneKE service to become {state}. "
f"Wait time: {minutes_timeout} minutes"
)
while True:
-            _service_json = self.client.servicepool[service_id].info()
-            logs = _service_json[str(service_id)]['TEMPLATE']['BODY'].get('log', [])
-            if logs:
-                state = self._get_latest_state(logs)
-                # Check OneKE deployment status
-                if state == 'FAILED_DEPLOYING':
-                    raise OneError("OneKE deployment has failed")
-                elif state == 'RUNNING':
-                    break

# Check timeout
elapsed_time = time.time() - start_time
-            if elapsed_time > timeout:
+            if elapsed_time > self.timeout:
raise OneError(
f"Deployment timed out after {timeout} seconds. "
f"You can try again once OneKE is in RUNNING state with the service_id option"
f"Can't reach {state} state. OneKE timed out after {self.timeout} seconds. "
f"You can try again once OneKE is in `'RUNNING'` state with the `service_id` option"
)
-            time.sleep(10)
-        logger.info(f"OneKE service is RUNNING after {int(elapsed_time)} seconds")
+
+            # Check OneKE deployment status
+            current_state = self._get_latest_state()
+            if current_state == 'FAILED_DEPLOYING':
+                raise OneError("OneKE deployment has failed")
+            elif current_state == 'FAILED_SCALING':
+                raise OneError("OneKE scaling has failed")
+            elif current_state == state:
+                break
+
+            time.sleep(5)
+        logger.info(f"OneKE service is {state} after {int(elapsed_time)} seconds")


-    def _get_kube_config(self, service_id):
+    def _get_kube_config(self):
# Get master VM ID
master_vm_id = None
-        _service_json = self.client.servicepool[service_id].info()
-        roles = _service_json[str(service_id)]['TEMPLATE']['BODY']['roles']
+        _service_json = self.client.servicepool[self.service_id].info()
+        roles = _service_json[str(self.service_id)]['TEMPLATE']['BODY']['roles']
for role in roles:
if role['name'] == 'master':
master_vm_id = role['nodes'][0]['vm_info']['VM']['ID']
break
if master_vm_id is None:
raise OneError(
"Master VM ID not found. "
"Please change the name of the master node to 'master' and try again"
"Please change the name of the master node role to 'master' and try again"
)

# Get kubeconfig
vm = self.pyone.vm.info(int(master_vm_id))
encoded_kubeconfig = vm.USER_TEMPLATE.get('ONEKE_KUBECONFIG')
@@ -184,16 +189,23 @@ def _get_kube_config(self, service_id):
return decoded_kubeconfig


-    def _get_latest_state(self, logs):
+    def _get_latest_state(self):
+        _service_json = self.client.servicepool[self.service_id].info()
+        logs = _service_json[str(self.service_id)]['TEMPLATE']['BODY'].get('log', [])
for log in reversed(logs):
if 'New state:' in log['message']:
return log['message'].split(':')[-1].strip()
-        return None
+        raise OneError("No state found in logs")


-    def _check_vms_status(self, vm_ids):
+    def _check_vms_status(self):
+        _service_json = self.client.servicepool[self.service_id].info()
+        vm_ids = {
+            node['vm_info']['VM']['ID']
+            for role in _service_json[str(self.service_id)]['TEMPLATE']['BODY']['roles']
+            for node in role['nodes']
+        }
if len(vm_ids) == 0:
-            # TODO: scale up OneKE VMs to default size
raise OneError("No VMs found in OneKE service")
for vm_id in vm_ids:
vm = self.pyone.vm.info(int(vm_id))
@@ -206,4 +218,45 @@ def _check_vms_status(self, vm_ids):
f"VM {vm_id} fails validation: "
f"STATE={state_desc} (code {state}), "
f"LCM_STATE={lcm_state_desc} (code {lcm_state})"
-                )
+            )


+    def _granularity(self, total_functions):
+        MAX_PODS_PER_NODE = 1
+        MAX_FUNCTIONS_PER_POD = 25
+
+        # Calculate number of WORKERS (PODs)
+        # TODO: current number of pods depends on node resources
+        current_pods = len(self.nodes) // MAX_PODS_PER_NODE
+        req_pods = (
+            total_functions + MAX_FUNCTIONS_PER_POD - 1
+        ) // MAX_FUNCTIONS_PER_POD
+        pods = max(req_pods, current_pods)
+
+        # Calculate number of NODES
+        # TODO: 1 node can have multiple pods
+        nodes = max(pods, len(self.nodes))
+
+        # Calculate number of function executors per WORKER (POD)
+        # TODO: current number of executors depends on POD resources
+        worker_processes = 1
+
+        # Calculate number of functions per WORKER (POD)
+        # TODO: depends on worker_processes
+        chunksize = 1
+
+        logger.info(
+            f"Pods: {pods}, Nodes: {nodes}, "
+            f"Chunksize: {chunksize}, Worker Processes: {worker_processes}"
+        )
+        return pods, nodes, chunksize, worker_processes
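A quick worked example of the arithmetic above (constants copied from the method, ceiling division written out):

```python
MAX_PODS_PER_NODE = 1
MAX_FUNCTIONS_PER_POD = 25

total_functions, current_nodes = 100, 2
current_pods = current_nodes // MAX_PODS_PER_NODE   # 2
req_pods = (total_functions + MAX_FUNCTIONS_PER_POD - 1) // MAX_FUNCTIONS_PER_POD  # ceil -> 4
pods = max(req_pods, current_pods)                  # 4
nodes = max(pods, current_nodes)                    # 4, so invoke() scales 2 -> 4
```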


+    def _scale_oneke(self, nodes, scale_nodes):
+        logger.info(f"Scaling workers from {len(nodes)} to {scale_nodes} nodes")
+        # Ensure the service can be scaled
+        state = self._get_latest_state()
+        if len(self.nodes) == 0 and state == 'COOLDOWN':
+            self._wait_for_oneke('RUNNING')
+        self.client.servicepool[self.service_id].role["worker"].scale(int(scale_nodes))
+        self._wait_for_oneke('COOLDOWN')
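OneFlow parks a service in `'COOLDOWN'` after an elasticity operation, which is why the scale call waits for that state instead of `'RUNNING'`, and why a service idling at zero workers in cooldown is first waited back to `'RUNNING'` before a new request. A hypothetical call sequence against this backend:

```python
# Sketch: scale the worker role up by hand (normally invoke() does this).
backend._get_nodes()                     # refresh backend.nodes (KubernetesBackend)
backend._scale_oneke(backend.nodes, 4)   # worker role -> 4, returns on COOLDOWN
```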
3 changes: 2 additions & 1 deletion lithops/serverless/backends/one/one_config.yaml
@@ -8,7 +8,8 @@ one:
public_network_id: # ID for Public vnet
private_network_id: # ID for Private vnet (if not passed: create a new one)

-delete: # if set to True, OneKE VMs will be deleted after all the jobs are finished
+delete: # if set to True, OneKE will be deleted after all the jobs are finished
+minimum_nodes: # minimum number of nodes in OneKE

oneke_config_path: # PATH to OneKE JSON config
timeout: # time to wait for OneKE to be ready
