diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py
index 2c0e3aab8..c55000a17 100644
--- a/lithops/serverless/backends/one/config.py
+++ b/lithops/serverless/backends/one/config.py
@@ -164,6 +164,7 @@ class OneConfigError(Exception):
         'kubecfg_path': '/tmp/kube_config',
         'oneke_config_path': None,
         'delete': False,
+        'minimum_nodes': 0
     })
diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py
index f578dbd6c..eb9fc6ef2 100644
--- a/lithops/serverless/backends/one/one.py
+++ b/lithops/serverless/backends/one/one.py
@@ -45,6 +45,11 @@ class OpenNebula(KubernetesBackend):
     def __init__(self, one_config, internal_storage):
         logger.info("Initializing OpenNebula backend")
 
+        # Overwrite config values
+        self.name = 'one'
+        self.timeout = one_config['timeout']
+        self.minimum_nodes = one_config['minimum_nodes']
+
         logger.debug("Initializing Oneflow python client")
         self.client = oneflow.OneFlowClient()
 
@@ -53,67 +58,66 @@ def __init__(self, one_config, internal_storage):
         # service_template_id: instantiate master node
         if 'service_template_id' in one_config:
-            one_config['service_id'] = self._instantiate_oneke(
+            self.service_id = self._instantiate_oneke(
                 one_config['service_template_id'],
                 one_config['oneke_config'],
                 one_config['oneke_config_path']
             )
-            self._wait_for_oneke(one_config['service_id'], one_config['timeout'])
-        elif 'service_id' not in one_config:
+            self._wait_for_oneke('RUNNING')
+        elif 'service_id' in one_config:
+            self.service_id = one_config['service_id']
+        else:
             raise OneError(
                 "OpenNebula backend must contain 'service_template_id' or 'service_id'"
             )
-
-        # Check OneKE status
-        self._check_oneke(one_config['service_id'])
+        self._check_oneke()
 
         # Get and Save kubeconfig from OneKE
-        kubecfg = self._get_kube_config(one_config['service_id'])
+        kubecfg = self._get_kube_config()
         with open(one_config['kubecfg_path'], 'w') as file:
             file.write(kubecfg)
-
-        # Overwrite config values
-        self.name = 'one'
         self.kubecfg_path = one_config['kubecfg_path']
 
         super().__init__(one_config, internal_storage)
 
     def invoke(self, docker_image_name, runtime_memory, job_payload):
-        # Get current nodes
         super()._get_nodes()
-        logger.info(f"Found {len(self.nodes)} nodes")
-        logger.info(f"Nodes: {self.nodes}")
 
-        # TODO: add dynamic scaling logic
+        pods, scale_nodes, chunksize, worker_processes = self._granularity(
+            job_payload['total_calls']
+        )
+
+        # Scale nodes
+        # TODO: Add logic to see if it's worth waiting for the cooldown or not
+        if scale_nodes > len(self.nodes):
+            self._scale_oneke(self.nodes, scale_nodes)
+
+        # Setup granularity
+        job_payload['max_workers'] = pods
+        job_payload['chunksize'] = chunksize
+        job_payload['worker_processes'] = worker_processes
 
         super().invoke(docker_image_name, runtime_memory, job_payload)
 
     def clear(self, job_keys=None):
         super().clear(job_keys)
-        # TODO: if all are deteleted -> suspend OneKE VMs (scale down) and
-        # delete them after X minutes
-        pass
+        # Scale down only if the cooldown time has passed
+        state = self._get_latest_state()
+        if state != 'COOLDOWN':
+            super()._get_nodes()
+            self._scale_oneke(self.nodes, self.minimum_nodes)
 
-    def _check_oneke(self, service_id):
+    def _check_oneke(self):
         logger.info("Checking OpenNebula OneKE service status")
 
         # Check service status
-        _service_json = self.client.servicepool[service_id].info()
-        logs = _service_json[str(service_id)]['TEMPLATE']['BODY'].get('log', [])
-        state = self._get_latest_state(logs)
-        if state is None:
-            raise OneError("No state found in logs")
+        state = self._get_latest_state()
         if state != 'RUNNING':
             raise OneError(f"OpenNebula OneKE service is not 'RUNNING': {state}")
 
         # Check VMs status
-        vm_ids = {
-            node['vm_info']['VM']['ID']
-            for role in _service_json[str(service_id)]['TEMPLATE']['BODY']['roles']
-            for node in role['nodes']
-        }
-        self._check_vms_status(vm_ids)
+        self._check_vms_status()
 
     def _instantiate_oneke(self, service_template_id, oneke_config, oneke_config_path):
@@ -133,40 +137,40 @@ def _instantiate_oneke(self, service_template_id, oneke_config, oneke_config_pat
         return service_id
 
-    def _wait_for_oneke(self, service_id, timeout):
+    def _wait_for_oneke(self, state):
         start_time = time.time()
-        minutes_timeout = int(timeout/60)
+        minutes_timeout = int(self.timeout/60)
         logger.info(
-            f"Waiting for OneKE service to become 'RUNNING'. "
-            f"Be patient, this process can take up to {minutes_timeout} minutes"
+            f"Waiting for OneKE service to become {state}. "
+            f"Wait time: {minutes_timeout} minutes"
        )
         while True:
-            _service_json = self.client.servicepool[service_id].info()
-            logs = _service_json[str(service_id)]['TEMPLATE']['BODY'].get('log', [])
-            if logs:
-                state = self._get_latest_state(logs)
-                # Check OneKE deployment status
-                if state == 'FAILED_DEPLOYING':
-                    raise OneError("OneKE deployment has failed")
-                elif state == 'RUNNING':
-                    break
-            # Check timeout
             elapsed_time = time.time() - start_time
-            if elapsed_time > timeout:
+            if elapsed_time > self.timeout:
                 raise OneError(
-                    f"Deployment timed out after {timeout} seconds. "
-                    f"You can try again once OneKE is in RUNNING state with the service_id option"
+                    f"Can't reach {state} state. OneKE timed out after {self.timeout} seconds. "
+                    f"You can try again once OneKE is in `'RUNNING'` state with the `service_id` option"
                 )
-            time.sleep(10)
-        logger.info(f"OneKE service is RUNNING after {int(elapsed_time)} seconds")
+            # Check OneKE deployment status
+            current_state = self._get_latest_state()
+            if current_state == 'FAILED_DEPLOYING':
+                raise OneError("OneKE deployment has failed")
+            elif current_state == 'FAILED_SCALING':
+                raise OneError("OneKE scaling has failed")
+            elif current_state == state:
+                break
 
-    def _get_kube_config(self, service_id):
+            time.sleep(5)
+        logger.info(f"OneKE service is {state} after {int(elapsed_time)} seconds")
+
+
+    def _get_kube_config(self):
         # Get master VM ID
         master_vm_id = None
-        _service_json = self.client.servicepool[service_id].info()
-        roles = _service_json[str(service_id)]['TEMPLATE']['BODY']['roles']
+        _service_json = self.client.servicepool[self.service_id].info()
+        roles = _service_json[str(self.service_id)]['TEMPLATE']['BODY']['roles']
         for role in roles:
             if role['name'] == 'master':
                 master_vm_id = role['nodes'][0]['vm_info']['VM']['ID']
@@ -174,8 +178,9 @@ def _get_kube_config(self, service_id):
         if master_vm_id is None:
             raise OneError(
                 "Master VM ID not found. "
-                "Please change the name of the master node to 'master' and try again"
+                "Please change the name of the master node role to 'master' and try again"
             )
+
         # Get kubeconfig
         vm = self.pyone.vm.info(int(master_vm_id))
         encoded_kubeconfig = vm.USER_TEMPLATE.get('ONEKE_KUBECONFIG')
@@ -184,16 +189,23 @@
         return decoded_kubeconfig
 
-    def _get_latest_state(self, logs):
+    def _get_latest_state(self):
+        _service_json = self.client.servicepool[self.service_id].info()
+        logs = _service_json[str(self.service_id)]['TEMPLATE']['BODY'].get('log', [])
         for log in reversed(logs):
             if 'New state:' in log['message']:
                 return log['message'].split(':')[-1].strip()
-        return None
+        raise OneError("No state found in logs")
 
-    def _check_vms_status(self, vm_ids):
+    def _check_vms_status(self):
+        _service_json = self.client.servicepool[self.service_id].info()
+        vm_ids = {
+            node['vm_info']['VM']['ID']
+            for role in _service_json[str(self.service_id)]['TEMPLATE']['BODY']['roles']
+            for node in role['nodes']
+        }
         if len(vm_ids) == 0:
-            # TODO: scale up OneKE VMs to default size
             raise OneError("No VMs found in OneKE service")
         for vm_id in vm_ids:
             vm = self.pyone.vm.info(int(vm_id))
@@ -206,4 +218,45 @@ def _check_vms_status(self, vm_ids):
                 f"VM {vm_id} fails validation: "
                 f"STATE={state_desc} (code {state}), "
                 f"LCM_STATE={lcm_state_desc} (code {lcm_state})"
-            )
\ No newline at end of file
+            )
+
+
+    def _granularity(self, total_functions):
+        MAX_PODS_PER_NODE = 1
+        MAX_FUNCTIONS_PER_POD = 25
+
+        # Calculate number of WORKERS (PODs)
+        # TODO: current number of pods depends on node resources
+        current_pods = len(self.nodes) // MAX_PODS_PER_NODE
+        req_pods = (
+            total_functions + MAX_FUNCTIONS_PER_POD - 1
+        ) // MAX_FUNCTIONS_PER_POD
+        pods = max(req_pods, current_pods)
+
+        # Calculate number of NODES
+        # TODO: 1 node can have multiple pods
+        nodes = max(pods, len(self.nodes))
+
+        # Calculate number of function executors per WORKER (POD)
+        # TODO: current number of executors depends on POD resources
+        worker_processes = 1
+
+        # Calculate number of functions per WORKER (POD)
+        # TODO: depends on worker_processes
+        chunksize = 1
+
+        logger.info(
+            f"Pods: {pods}, Nodes: {nodes}, "
+            f"Chunksize: {chunksize}, Worker Processes: {worker_processes}"
+        )
+        return pods, nodes, chunksize, worker_processes
+
+
+    def _scale_oneke(self, nodes, scale_nodes):
+        logger.info(f"Scaling workers from {len(nodes)} to {scale_nodes} nodes")
+        # Ensure the service can be scaled
+        state = self._get_latest_state()
+        if len(self.nodes) == 0 and state == 'COOLDOWN':
+            self._wait_for_oneke('RUNNING')
+        self.client.servicepool[self.service_id].role["worker"].scale(int(scale_nodes))
+        self._wait_for_oneke('COOLDOWN')
diff --git a/lithops/serverless/backends/one/one_config.yaml b/lithops/serverless/backends/one/one_config.yaml
index e501e409c..3c09f004b 100644
--- a/lithops/serverless/backends/one/one_config.yaml
+++ b/lithops/serverless/backends/one/one_config.yaml
@@ -8,7 +8,8 @@ one:
     public_network_id: # ID for Public vnet
     private_network_id: # ID for Private vnet (if not passed: create a new one)
-    delete: # if set to True, OneKE VMs will be deleted after all the jobs are finished
+    delete: # if set to True, OneKE will be deleted after all the jobs are finished
+    minimum_nodes: # minimum number of nodes in OneKE
    oneke_config_path: # PATH to OneKE JSON config
    timeout: # time to wait for OneKE to be ready