Test data are loaded on the globalworker. (#489)
* Add Mipdb to GlobalWorker: Integrate Mipdb into GlobalWorker to enable dataset monitoring provided by the Worker Landscape Aggregator (WLA).

* Added test datasets.
Excluded this test data from the Exareme2 flow.

* Update Deployment Logic:
- Kubernetes: Ensure GlobalWorker is deployed even in single worker deployments.
- Flower: Configure the server to run on the GlobalWorker.
KFilippopolitis authored Jul 18, 2024
1 parent bddfea1 commit 644ef1b
Showing 37 changed files with 984 additions and 274 deletions.
110 changes: 36 additions & 74 deletions .github/workflows/prod_env_tests.yml
@@ -203,83 +203,45 @@ jobs:
       - name: Wait for pods to get healthy
         run: timeout 300 bash -c 'while true; do if kubectl get pods --no-headers | awk '\''{if ($2 != "1/1" && $2 != "2/2" && $2 != "3/3" && $2 != "4/4") exit 1;}'\''; then echo "All pods are ready!"; break; else kubectl get pods -o wide; sleep 20; fi done'

-      - name: Initialize MONETDB from mipdb container
+      - name: Load data models into localworkers and globalworker
         run: |
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb init'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb init'
-      - name: Load dementia data model into localworkers
-        run: |
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-data-model /opt/data/dementia_v_0_1/CDEsMetadata.json'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-data-model /opt/data/dementia_v_0_1/CDEsMetadata.json'
-      - name: Load dementia dataset csvs with suffix even numbers into localworker 1
-        run: |
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/edsd0.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/ppmi0.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/desd-synthdata0.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/edsd2.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/ppmi2.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/desd-synthdata2.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/edsd4.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/ppmi4.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/desd-synthdata4.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/edsd6.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/ppmi6.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/desd-synthdata6.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/edsd8.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/ppmi8.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/desd-synthdata8.csv -d dementia -v 0.1'
-      - name: Load dementia dataset csvs with suffix odd numbers into localworker 2
-        run: |
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/edsd1.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/ppmi1.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/desd-synthdata1.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/edsd3.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/ppmi3.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/desd-synthdata3.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/edsd5.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/ppmi5.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/desd-synthdata5.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/edsd7.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/ppmi7.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/desd-synthdata7.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/edsd9.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/ppmi9.csv -d dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/dementia_v_0_1/desd-synthdata9.csv -d dementia -v 0.1'
-      - name: Load tbi data model into localworkers
-        run: |
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-data-model /opt/data/tbi_v_0_1/CDEsMetadata.json'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-data-model /opt/data/tbi_v_0_1/CDEsMetadata.json'
-      - name: Load tbi dataset csvs with suffix of even numbers into localworker 1
-        run: |
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi0.csv -d tbi -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi2.csv -d tbi -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi4.csv -d tbi -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi6.csv -d tbi -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi8.csv -d tbi -v 0.1'
-      - name: Load tbi dataset csvs with suffix of odd numbers into localworker 2
+          LOCALWORKER1=$(kubectl get pods -o json | jq -r '.items[] | select(.spec.nodeName=="localworker1") | .metadata.name')
+          LOCALWORKER2=$(kubectl get pods -o json | jq -r '.items[] | select(.spec.nodeName=="localworker2") | .metadata.name')
+          GLOBALWORKER=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r ".items[0].metadata.name")
+          for POD in $LOCALWORKER1 $LOCALWORKER2 $GLOBALWORKER; do
+            kubectl exec $POD -c db-importer -- sh -c 'mipdb init'
+            for model in dementia_v_0_1 tbi_v_0_1 longitudinal_dementia_v_0_1; do
+              kubectl exec $POD -c db-importer -- sh -c "mipdb add-data-model /opt/data/${model}/CDEsMetadata.json"
+            done
+          done
+      - name: Load Dataset CSVs into Localworkers and Globalworker
         run: |
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi1.csv -d tbi -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi3.csv -d tbi -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi5.csv -d tbi -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi7.csv -d tbi -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi9.csv -d tbi -v 0.1'
+          LOCALWORKER1=$(kubectl get pods -o json | jq -r '.items[] | select(.spec.nodeName=="localworker1") | .metadata.name')
+          LOCALWORKER2=$(kubectl get pods -o json | jq -r '.items[] | select(.spec.nodeName=="localworker2") | .metadata.name')
+          GLOBALWORKER=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r ".items[0].metadata.name")
+          for model in dementia_v_0_1 tbi_v_0_1 longitudinal_dementia_v_0_1; do
+            for filepath in $(kubectl exec $GLOBALWORKER -c db-importer -- ls /opt/data/${model}); do
+              filepath=/opt/data/${model}/${filepath}
+              if [[ $filepath == *test.csv ]]; then
+                echo "Loading file: $filepath at $GLOBALWORKER"
+                kubectl exec $GLOBALWORKER -c db-importer -- mipdb add-dataset $filepath -d ${model%_v_*} -v 0.1
+              elif [[ $filepath == *.csv ]]; then
+                filename=$(basename $filepath)
+                suffix=$(echo $filename | grep -o '[0-9]*' | tail -1)
+                if (( suffix % 2 == 0 )); then
+                  POD_NAME=$LOCALWORKER2
+                else
+                  POD_NAME=$LOCALWORKER1
+                fi
+                echo "Loading file: $filepath at $POD_NAME"
+                kubectl exec $POD_NAME -c db-importer -- mipdb add-dataset $filepath -d ${model%_v_*} -v 0.1
+              fi
+            done
+          done
-      - name: Load longitudinal dementia data model into localworkers
-        run: |
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-data-model /opt/data/longitudinal_dementia_v_0_1/CDEsMetadata.json'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-data-model /opt/data/longitudinal_dementia_v_0_1/CDEsMetadata.json'
-      - name: Load longitudinal dementia datasets csvs into localworkers
-        run: |
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[0].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/longitudinal_dementia_v_0_1/longitudinal_dementia0.csv -d longitudinal_dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/longitudinal_dementia_v_0_1/longitudinal_dementia1.csv -d longitudinal_dementia -v 0.1'
-          kubectl exec $(kubectl get pods -l=nodeType=localworker -o json | jq -r '.items[1].metadata.name') -c db-importer -- sh -c 'mipdb add-dataset /opt/data/longitudinal_dementia_v_0_1/longitudinal_dementia2.csv -d longitudinal_dementia -v 0.1'
       - name: Controller logs
         run: kubectl logs -l app=exareme2-controller --tail -1
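Note on the new loading step: it routes each CSV by filename. Files matching *test.csv are loaded only on the globalworker (these are the test datasets the commit excludes from the Exareme2 flow), while every other CSV is sharded across the two localworkers by the last number in its name, with even suffixes now going to localworker 2, the reverse of the per-file steps this replaces. A minimal Python rendering of the same rule, assuming every non-test CSV carries a numeric suffix (filenames are illustrative):

    import re

    def route_dataset(csv_name: str) -> str:
        # Mirrors the shell logic above: *test.csv goes to the globalworker;
        # otherwise the last run of digits in the name picks the localworker.
        if csv_name.endswith("test.csv"):
            return "GLOBALWORKER"
        suffix = int(re.findall(r"[0-9]+", csv_name)[-1])
        return "LOCALWORKER2" if suffix % 2 == 0 else "LOCALWORKER1"

    assert route_dataset("edsd0.csv") == "LOCALWORKER2"
    assert route_dataset("ppmi3.csv") == "LOCALWORKER1"
    assert route_dataset("dementia_test.csv") == "GLOBALWORKER"  # illustrative name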
17 changes: 1 addition & 16 deletions exareme2/algorithms/flower/inputdata_preprocessing.py
@@ -39,8 +39,7 @@ def apply_inputdata(df: pd.DataFrame, inputdata: Inputdata) -> pd.DataFrame:
     return df


-def fetch_client_data(inputdata) -> pd.DataFrame:
-    FLOWER_LOGGER.error(f"BROOO {os.getenv('CSV_PATHS')}")
+def fetch_data(inputdata) -> pd.DataFrame:
     dataframes = [
         pd.read_csv(f"{os.getenv('DATA_PATH')}{csv_path}")
         for csv_path in os.getenv("CSV_PATHS").split(",")
@@ -49,20 +48,6 @@ def fetch_client_data(inputdata) -> pd.DataFrame:
     return apply_inputdata(df, inputdata)


-def fetch_server_data(inputdata) -> pd.DataFrame:
-    data_folder = Path(
-        f"{os.getenv('DATA_PATH')}/{inputdata.data_model.split(':')[0]}_v_0_1"
-    )
-    print(f"Loading data from folder: {data_folder}")
-    dataframes = [
-        pd.read_csv(data_folder / f"{dataset}.csv")
-        for dataset in inputdata.datasets
-        if (data_folder / f"{dataset}.csv").exists()
-    ]
-    df = pd.concat(dataframes, ignore_index=True)
-    return apply_inputdata(df, inputdata)
-
-
 def preprocess_data(inputdata, full_data):
     # Ensure x and y are specified and correct
     if not inputdata.x or not inputdata.y:
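With fetch_server_data removed, client and server share one loading path driven by two environment variables. A sketch of what the unified fetch_data effectively does, with illustrative values; the df = pd.concat(...) line sits in the folded part of the hunk and is assumed to match the removed fetch_server_data:

    import os
    import pandas as pd

    # Illustrative values; in practice the worker sets these when it
    # spawns the Flower process.
    os.environ["DATA_PATH"] = "/opt/data/"
    os.environ["CSV_PATHS"] = "dementia_v_0_1/edsd0.csv,dementia_v_0_1/ppmi0.csv"

    dataframes = [
        pd.read_csv(f"{os.getenv('DATA_PATH')}{csv_path}")
        for csv_path in os.getenv("CSV_PATHS").split(",")
    ]
    df = pd.concat(dataframes, ignore_index=True)  # assumed, as in fetch_server_data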
4 changes: 2 additions & 2 deletions exareme2/algorithms/flower/logistic_regression/client.py
@@ -11,7 +11,7 @@
 from utils import set_initial_params
 from utils import set_model_params

-from exareme2.algorithms.flower.inputdata_preprocessing import fetch_client_data
+from exareme2.algorithms.flower.inputdata_preprocessing import fetch_data
 from exareme2.algorithms.flower.inputdata_preprocessing import get_input
 from exareme2.algorithms.flower.inputdata_preprocessing import preprocess_data

@@ -42,7 +42,7 @@ def evaluate(self, parameters, config):
 if __name__ == "__main__":
     model = LogisticRegression(penalty="l2", max_iter=1, warm_start=True)
     inputdata = get_input()
-    full_data = fetch_client_data(inputdata)
+    full_data = fetch_data(inputdata)
     X_train, y_train = preprocess_data(inputdata, full_data)
     set_initial_params(model, X_train, full_data, inputdata)

4 changes: 2 additions & 2 deletions exareme2/algorithms/flower/logistic_regression/server.py
@@ -6,7 +6,7 @@
 from utils import set_initial_params
 from utils import set_model_params

-from exareme2.algorithms.flower.inputdata_preprocessing import fetch_server_data
+from exareme2.algorithms.flower.inputdata_preprocessing import fetch_data
 from exareme2.algorithms.flower.inputdata_preprocessing import get_input
 from exareme2.algorithms.flower.inputdata_preprocessing import post_result
 from exareme2.algorithms.flower.inputdata_preprocessing import preprocess_data
@@ -35,7 +35,7 @@ def evaluate(server_round, parameters, config):
 if __name__ == "__main__":
     model = LogisticRegression()
     inputdata = get_input()
-    full_data = fetch_server_data(inputdata)
+    full_data = fetch_data(inputdata)
     X_train, y_train = preprocess_data(inputdata, full_data)
     set_initial_params(model, X_train, full_data, inputdata)
     strategy = fl.server.strategy.FedAvg(
3 changes: 2 additions & 1 deletion exareme2/controller/celery/tasks_handler.py
@@ -310,14 +310,15 @@ def start_flower_client(
         )

     def start_flower_server(
-        self, request_id, algorithm_name, number_of_clients, server_address
+        self, request_id, algorithm_name, number_of_clients, server_address, csv_paths
     ) -> WorkerTaskResult:
         return self._queue_task(
             task_signature=TASK_SIGNATURES["start_flower_server"],
             request_id=request_id,
             algorithm_name=algorithm_name,
             number_of_clients=number_of_clients,
             server_address=server_address,
+            csv_paths=csv_paths,
         )

     def stop_flower_server(
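For reference, a hedged usage example of the extended signature; the handler construction and all values are illustrative, not taken from the repository:

    # Assume `handler` is the controller's celery tasks handler instance.
    result = handler.start_flower_server(
        request_id="req-42",
        algorithm_name="logistic_regression",
        number_of_clients=2,
        server_address="172.17.0.5:8080",
        csv_paths=["/opt/data/dementia_v_0_1/dementia_test.csv"],
    )
    # `result` is a WorkerTaskResult; callers block on it with
    # .get(timeout=...), as the flower-level handler below does.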
19 changes: 10 additions & 9 deletions exareme2/controller/services/flower/controller.py
@@ -70,14 +70,12 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto):
             for worker in workers_info
         ]

-        server_task_handler, server_ip = task_handlers[0], workers_info[0].ip
-        if len(task_handlers) > 1:
-            global_worker = self.worker_landscape_aggregator.get_global_worker()
-            server_task_handler = self._create_worker_tasks_handler(
-                request_id, global_worker
-            )
-            server_ip = global_worker.ip
-
+        global_worker = self.worker_landscape_aggregator.get_global_worker()
+        server_task_handler = self._create_worker_tasks_handler(
+            request_id, global_worker
+        )
+        server_ip = global_worker.ip
+        server_id = global_worker.id
         # Garbage Collect
         server_task_handler.garbage_collect()
         for handler in task_handlers:
@@ -92,7 +90,10 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto):

         try:
             server_pid = server_task_handler.start_flower_server(
-                algorithm_name, len(task_handlers), str(server_address)
+                algorithm_name,
+                len(task_handlers),
+                str(server_address),
+                csv_paths_per_worker_id[server_id],
             )
             clients_pids = {
                 handler.start_flower_client(
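The net effect of these two hunks: the Flower server is now always placed on the global worker (previously it defaulted to the first worker and moved to the global worker only when more than one task handler existed), and it is started with that worker's own CSV paths. csv_paths_per_worker_id is only referenced here; a plausible shape, stated as an assumption:

    # Assumed shape of csv_paths_per_worker_id (its construction is not
    # shown in this diff); worker ids and paths are illustrative.
    csv_paths_per_worker_id = {
        "globalworker": ["/opt/data/dementia_v_0_1/dementia_test.csv"],
        "localworker1": ["/opt/data/dementia_v_0_1/edsd1.csv"],
        "localworker2": ["/opt/data/dementia_v_0_1/edsd0.csv"],
    }
    server_csv_paths = csv_paths_per_worker_id[server_id]  # server_id = global_worker.id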
8 changes: 6 additions & 2 deletions exareme2/controller/services/flower/tasks_handler.py
@@ -41,10 +41,14 @@ def start_flower_client(
         ).get(timeout=self._tasks_timeout)

     def start_flower_server(
-        self, algorithm_name: str, number_of_clients: int, server_address
+        self, algorithm_name: str, number_of_clients: int, server_address, csv_paths
     ) -> int:
         return self._worker_tasks_handler.start_flower_server(
-            self._request_id, algorithm_name, number_of_clients, server_address
+            self._request_id,
+            algorithm_name,
+            number_of_clients,
+            server_address,
+            csv_paths,
         ).get(timeout=self._tasks_timeout)

     def stop_flower_server(self, pid: int, algorithm_name: str):