From 8f757888317a257bc9b634bb7e0b73ae0574b3a4 Mon Sep 17 00:00:00 2001 From: Apostolos Glenis Date: Wed, 18 Sep 2024 14:05:08 +0300 Subject: [PATCH 01/10] added initial xgboost --- exareme2/algorithms/flower/xgboost.json | 38 ++++ .../algorithms/flower/xgboost/__init__.py | 0 exareme2/algorithms/flower/xgboost/client.py | 173 ++++++++++++++++++ exareme2/algorithms/flower/xgboost/server.py | 36 ++++ 4 files changed, 247 insertions(+) create mode 100644 exareme2/algorithms/flower/xgboost.json create mode 100644 exareme2/algorithms/flower/xgboost/__init__.py create mode 100644 exareme2/algorithms/flower/xgboost/client.py create mode 100644 exareme2/algorithms/flower/xgboost/server.py diff --git a/exareme2/algorithms/flower/xgboost.json b/exareme2/algorithms/flower/xgboost.json new file mode 100644 index 000000000..ef3eee99d --- /dev/null +++ b/exareme2/algorithms/flower/xgboost.json @@ -0,0 +1,38 @@ +{ + "name": "xgboost", + "desc": "xgboost", + "label": "XGBoost on Flower", + "enabled": true, + "type": "flower", + "inputdata": { + "y": { + "label": "Variable (dependent)", + "desc": "A unique nominal variable. The variable is converted to binary by assigning 1 to the positive class and 0 to all other classes. ", + "types": [ + "int", + "text" + ], + "stattypes": [ + "nominal" + ], + "notblank": true, + "multiple": false + }, + "x": { + "label": "Covariates (independent)", + "desc": "One or more variables. Can be numerical or nominal. For nominal variables dummy encoding is used.", + "types": [ + "real", + "int", + "text" + ], + "stattypes": [ + "numerical", + "nominal" + ], + "notblank": true, + "multiple": true + }, + "validation": true + } +} diff --git a/exareme2/algorithms/flower/xgboost/__init__.py b/exareme2/algorithms/flower/xgboost/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/exareme2/algorithms/flower/xgboost/client.py b/exareme2/algorithms/flower/xgboost/client.py new file mode 100644 index 000000000..c775f28f9 --- /dev/null +++ b/exareme2/algorithms/flower/xgboost/client.py @@ -0,0 +1,173 @@ +import argparse +import warnings +from logging import INFO +from typing import Union + +import flwr as fl +import numpy as np +import pandas as pd +import xgboost as xgb +from flwr.common import Code +from flwr.common import EvaluateIns +from flwr.common import EvaluateRes +from flwr.common import FitIns +from flwr.common import FitRes +from flwr.common import GetParametersIns +from flwr.common import GetParametersRes +from flwr.common import Parameters +from flwr.common import Status +from flwr.common.logger import log +from sklearn import preprocessing + +from exareme2.algorithms.flower.inputdata_preprocessing import fetch_data +from exareme2.algorithms.flower.inputdata_preprocessing import get_input +from exareme2.algorithms.flower.inputdata_preprocessing import preprocess_data + +warnings.filterwarnings("ignore", category=UserWarning) + + +def transform_dataset_to_dmatrix(x, y) -> xgb.core.DMatrix: + new_data = xgb.DMatrix(x, label=y) + return new_data + + +# Hyper-parameters for xgboost training +num_local_round = 1 +params = { + "objective": "binary:logistic", + "eta": 0.1, # Learning rate + "max_depth": 8, + "eval_metric": "auc", + "nthread": 16, + "num_parallel_tree": 1, + "subsample": 1, + "tree_method": "hist", +} + + +# Define Flower client +class XgbClient(fl.client.Client): + def __init__(self, train_dmatrix, valid_dmatrix, num_train, num_val): + self.bst = None + self.config = None + + self.train_dmatrix = train_dmatrix + self.valid_dmatrix = 
valid_dmatrix
+
+        self.num_train = num_train
+        self.num_val = num_val
+
+    def get_parameters(self, ins: GetParametersIns) -> GetParametersRes:
+        _ = (self, ins)
+        return GetParametersRes(
+            status=Status(
+                code=Code.OK,
+                message="OK",
+            ),
+            parameters=Parameters(tensor_type="", tensors=[]),
+        )
+
+    def _local_boost(self):
+        # Update trees based on local training data.
+        for i in range(num_local_round):
+            self.bst.update(train_dmatrix, self.bst.num_boosted_rounds())
+
+        # Extract the last N=num_local_round trees for server aggregation
+        bst = self.bst[
+            self.bst.num_boosted_rounds()
+            - num_local_round : self.bst.num_boosted_rounds()
+        ]
+
+        return bst
+
+    def fit(self, ins: FitIns) -> FitRes:
+        if not self.bst:
+            # First round local training
+            log(INFO, "Start training at round 1")
+            bst = xgb.train(
+                params,
+                train_dmatrix,
+                num_boost_round=num_local_round,
+                evals=[(valid_dmatrix, "validate"), (train_dmatrix, "train")],
+            )
+            self.config = bst.save_config()
+            self.bst = bst
+        else:
+            for item in ins.parameters.tensors:
+                global_model = bytearray(item)
+
+            # Load global model into booster
+            self.bst.load_model(global_model)
+            self.bst.load_config(self.config)
+
+            bst = self._local_boost()
+
+        local_model = bst.save_raw("json")
+        local_model_bytes = bytes(local_model)
+
+        return FitRes(
+            status=Status(
+                code=Code.OK,
+                message="OK",
+            ),
+            parameters=Parameters(tensor_type="", tensors=[local_model_bytes]),
+            num_examples=self.num_train,
+            metrics={},
+        )
+
+    def evaluate(self, ins: EvaluateIns) -> EvaluateRes:
+        eval_results = self.bst.eval_set(
+            evals=[(valid_dmatrix, "valid")],
+            iteration=self.bst.num_boosted_rounds() - 1,
+        )
+        auc = round(float(eval_results.split("\t")[1].split(":")[1]), 4)
+
+        return EvaluateRes(
+            status=Status(
+                code=Code.OK,
+                message="OK",
+            ),
+            loss=0.0,
+            num_examples=self.num_val,
+            metrics={"AUC": auc},
+        )
+
+
+# Start Flower client
+# fl.client.start_client(server_address="127.0.0.1:8080", client=XgbClient().to_client())
+
+if __name__ == "__main__":
+    inputdata = get_input()
+    full_data = fetch_data(inputdata)
+    X_train, y_train = preprocess_data(inputdata, full_data)
+    # Hard-coded for now; later we can split X_train and y_train into separate train/validation sets
+    X_valid, y_valid = X_train, y_train
+
+    # Reformat data to DMatrix for xgboost
+    log(INFO, "Reformatting data...")
+    train_dmatrix = transform_dataset_to_dmatrix(X_train, y=y_train)
+    valid_dmatrix = transform_dataset_to_dmatrix(X_valid, y=y_valid)
+
+    num_train = X_train.shape[0]
+    num_val = X_valid.shape[0]
+
+    client = XgbClient(train_dmatrix, valid_dmatrix, num_train, num_val)
+
+    attempts = 0
+    max_attempts = int(log2(int(os.environ["TIMEOUT"])))
+    while True:
+        try:
+            fl.client.start_client(
+                server_address=os.environ["SERVER_ADDRESS"], client=client.to_client()
+            )
+            FLOWER_LOGGER.debug("Connection successful on attempt", attempts + 1)
+            break
+        except Exception as e:
+            FLOWER_LOGGER.warning(
+                f"Connection with the server failed. 
Attempt {attempts + 1} failed: {e}" + ) + time.sleep(pow(2, attempts)) + attempts += 1 + if attempts >= max_attempts: + FLOWER_LOGGER.error("Could not establish connection to the server.") + raise e diff --git a/exareme2/algorithms/flower/xgboost/server.py b/exareme2/algorithms/flower/xgboost/server.py new file mode 100644 index 000000000..687581b6b --- /dev/null +++ b/exareme2/algorithms/flower/xgboost/server.py @@ -0,0 +1,36 @@ +import flwr as fl +from flwr.server.strategy import FedXgbBagging + +# FL experimental settings +pool_size = 2 +NUM_OF_ROUNDS = 5 +num_clients_per_round = 2 +num_evaluate_clients = 2 + + +def evaluate_metrics_aggregation(eval_metrics): + """Return an aggregated metric (AUC) for evaluation.""" + total_num = sum([num for num, _ in eval_metrics]) + auc_aggregated = ( + sum([metrics["AUC"] * num for num, metrics in eval_metrics]) / total_num + ) + metrics_aggregated = {"AUC": auc_aggregated} + return metrics_aggregated + + +if __name__ == "__main__": + # Define strategy + strategy = FedXgbBagging( + fraction_fit=(float(num_clients_per_round) / pool_size), + min_fit_clients=num_clients_per_round, + min_available_clients=pool_size, + min_evaluate_clients=num_evaluate_clients, + fraction_evaluate=1.0, + evaluate_metrics_aggregation_fn=evaluate_metrics_aggregation, + ) + + fl.server.start_server( + server_address=os.environ["SERVER_ADDRESS"], + strategy=strategy, + config=fl.server.ServerConfig(num_rounds=NUM_OF_ROUNDS), + ) From fc74c3b6966218c7240e73704fd3c3a4dd3f2f6f Mon Sep 17 00:00:00 2001 From: Kostas Filippopolitis Date: Thu, 3 Oct 2024 13:45:00 +0300 Subject: [PATCH 02/10] TEMPORARY COMMIT --- .../flower/logistic_regression/client.py | 2 +- exareme2/algorithms/flower/xgboost/client.py | 13 +++--- exareme2/algorithms/flower/xgboost/server.py | 22 +++++++--- poetry.lock | 43 ++++++++++++++++++- pyproject.toml | 1 + .../flower/test_xgboost.py | 28 ++++++++++++ 6 files changed, 94 insertions(+), 15 deletions(-) create mode 100644 tests/algorithm_validation_tests/flower/test_xgboost.py diff --git a/exareme2/algorithms/flower/logistic_regression/client.py b/exareme2/algorithms/flower/logistic_regression/client.py index f2fdf73c6..1e0346a16 100644 --- a/exareme2/algorithms/flower/logistic_regression/client.py +++ b/exareme2/algorithms/flower/logistic_regression/client.py @@ -55,7 +55,7 @@ def evaluate(self, parameters, config): fl.client.start_client( server_address=os.environ["SERVER_ADDRESS"], client=client.to_client() ) - FLOWER_LOGGER.debug("Connection successful on attempt", attempts + 1) + FLOWER_LOGGER.debug("Connection successful on attempt") break except Exception as e: FLOWER_LOGGER.warning( diff --git a/exareme2/algorithms/flower/xgboost/client.py b/exareme2/algorithms/flower/xgboost/client.py index c775f28f9..c2acb355f 100644 --- a/exareme2/algorithms/flower/xgboost/client.py +++ b/exareme2/algorithms/flower/xgboost/client.py @@ -1,11 +1,10 @@ -import argparse +import os +import time import warnings from logging import INFO -from typing import Union +from math import log2 import flwr as fl -import numpy as np -import pandas as pd import xgboost as xgb from flwr.common import Code from flwr.common import EvaluateIns @@ -16,8 +15,8 @@ from flwr.common import GetParametersRes from flwr.common import Parameters from flwr.common import Status +from flwr.common.logger import FLOWER_LOGGER from flwr.common.logger import log -from sklearn import preprocessing from exareme2.algorithms.flower.inputdata_preprocessing import fetch_data from 
exareme2.algorithms.flower.inputdata_preprocessing import get_input @@ -83,7 +82,7 @@ def _local_boost(self): def fit(self, ins: FitIns) -> FitRes: if not self.bst: # First round local training - log(INFO, "Start training at round 1") + FLOWER_LOGGER.info("Start training at round 1") bst = xgb.train( params, train_dmatrix, @@ -160,7 +159,7 @@ def evaluate(self, ins: EvaluateIns) -> EvaluateRes: fl.client.start_client( server_address=os.environ["SERVER_ADDRESS"], client=client.to_client() ) - FLOWER_LOGGER.debug("Connection successful on attempt", attempts + 1) + FLOWER_LOGGER.debug("Connection successful on attempt") break except Exception as e: FLOWER_LOGGER.warning( diff --git a/exareme2/algorithms/flower/xgboost/server.py b/exareme2/algorithms/flower/xgboost/server.py index 687581b6b..b94abda90 100644 --- a/exareme2/algorithms/flower/xgboost/server.py +++ b/exareme2/algorithms/flower/xgboost/server.py @@ -1,6 +1,10 @@ +import os + import flwr as fl from flwr.server.strategy import FedXgbBagging +from exareme2.algorithms.flower.inputdata_preprocessing import post_result + # FL experimental settings pool_size = 2 NUM_OF_ROUNDS = 5 @@ -10,12 +14,18 @@ def evaluate_metrics_aggregation(eval_metrics): """Return an aggregated metric (AUC) for evaluation.""" - total_num = sum([num for num, _ in eval_metrics]) - auc_aggregated = ( - sum([metrics["AUC"] * num for num, metrics in eval_metrics]) / total_num - ) - metrics_aggregated = {"AUC": auc_aggregated} - return metrics_aggregated + + def evaluate(server_round, parameters, config): + total_num = sum([num for num, _ in eval_metrics]) + auc_aggregated = ( + sum([metrics["AUC"] * num for num, metrics in eval_metrics]) / total_num + ) + metrics_aggregated = {"AUC": auc_aggregated} + if server_round == NUM_OF_ROUNDS: + post_result({"metrics_aggregated": metrics_aggregated}) + return metrics_aggregated + + return evaluate if __name__ == "__main__": diff --git a/poetry.lock b/poetry.lock index 39f71279b..7d10230c9 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1575,6 +1575,17 @@ files = [ {file = "numpy-1.24.1.tar.gz", hash = "sha256:2386da9a471cc00a1f47845e27d916d5ec5346ae9696e01a8a34760858fe9dd2"}, ] +[[package]] +name = "nvidia-nccl-cu12" +version = "2.23.4" +description = "NVIDIA Collective Communication Library (NCCL) Runtime" +optional = false +python-versions = ">=3" +files = [ + {file = "nvidia_nccl_cu12-2.23.4-py3-none-manylinux2014_aarch64.whl", hash = "sha256:aa946c8327e22ced28e7cef508a334673abc42064ec85f02d005ba1785ea4cec"}, + {file = "nvidia_nccl_cu12-2.23.4-py3-none-manylinux2014_x86_64.whl", hash = "sha256:b097258d9aab2fa9f686e33c6fe40ae57b27df60cedbd15d139701bb5509e0c1"}, +] + [[package]] name = "packaging" version = "24.1" @@ -2990,6 +3001,36 @@ files = [ [package.dependencies] h11 = ">=0.9.0,<1" +[[package]] +name = "xgboost" +version = "2.1.1" +description = "XGBoost Python Package" +optional = false +python-versions = ">=3.8" +files = [ + {file = "xgboost-2.1.1-py3-none-macosx_10_15_x86_64.macosx_11_0_x86_64.macosx_12_0_x86_64.whl", hash = "sha256:4163ab55118628f605cfccf950e2d667150640f6fc746bb5a173bddfd935950f"}, + {file = "xgboost-2.1.1-py3-none-macosx_12_0_arm64.whl", hash = "sha256:40d1f647022f497c1b0f69073765baf50ff5802ca77c6bb1aca55a6bc65df00d"}, + {file = "xgboost-2.1.1-py3-none-manylinux2014_aarch64.whl", hash = "sha256:4c534818aa08ab327ac2239ef211ef78db65a8573d069bc9898f824830fa2308"}, + {file = "xgboost-2.1.1-py3-none-manylinux2014_x86_64.whl", hash = 
"sha256:deef471e8d353afa99e5cc0e2af7d99ace7013f40684fcf3eed9124de033265d"}, + {file = "xgboost-2.1.1-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:8f3246a6d839dceb4553d3e5ea64ed718f9c692f072ee8275eeb895b58e283e6"}, + {file = "xgboost-2.1.1-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:6475ca35dede1f87d1dc485b362caba08f69f6020f4440e97b167676a533850e"}, + {file = "xgboost-2.1.1-py3-none-win_amd64.whl", hash = "sha256:fcf8413f3c621e97fdaaa45abb7ae808319c88eff5447328eff14c419c7c6ae0"}, + {file = "xgboost-2.1.1.tar.gz", hash = "sha256:4b1729837f9f1ba88a32ef1be3f8efb860fee6454a68719b196dc88032c23d97"}, +] + +[package.dependencies] +numpy = "*" +nvidia-nccl-cu12 = {version = "*", markers = "platform_system == \"Linux\" and platform_machine != \"aarch64\""} +scipy = "*" + +[package.extras] +dask = ["dask", "distributed", "pandas"] +datatable = ["datatable"] +pandas = ["pandas (>=1.2)"] +plotting = ["graphviz", "matplotlib"] +pyspark = ["cloudpickle", "pyspark", "scikit-learn"] +scikit-learn = ["scikit-learn"] + [[package]] name = "zipp" version = "3.19.2" @@ -3008,4 +3049,4 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", [metadata] lock-version = "2.0" python-versions = "~3.8" -content-hash = "72b48e2af97e04691afbe14e1ed311b8e4709902fbf04f420bdaf0a5b82db9d3" +content-hash = "f6000f15d29fd5dd53fe6e5b4e90f61cf1f29b7511ba0519b00437f2d14aad8c" diff --git a/pyproject.toml b/pyproject.toml index 5f307b39e..0afb4df97 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ eventlet = "~0.33" patsy = "~0.5.3" flwr = "1.7.0" psutil = "^5.9.8" +xgboost = "^2.1.1" [tool.poetry.dev-dependencies] pytest = "~7.4" diff --git a/tests/algorithm_validation_tests/flower/test_xgboost.py b/tests/algorithm_validation_tests/flower/test_xgboost.py new file mode 100644 index 000000000..4c9a96874 --- /dev/null +++ b/tests/algorithm_validation_tests/flower/test_xgboost.py @@ -0,0 +1,28 @@ +def test_xgboost(get_algorithm_result): + input = { + "inputdata": { + "y": ["gender"], + "x": ["lefthippocampus"], + "data_model": "dementia:0.1", + "datasets": [ + "ppmi0", + "ppmi1", + "ppmi2", + "ppmi3", + "ppmi5", + "ppmi6", + "edsd6", + "ppmi7", + "ppmi8", + "ppmi9", + ], + "validation_datasets": ["ppmi_test"], + "filters": None, + }, + "parameters": None, + "test_case_num": 99, + } + input["type"] = "flower" + algorithm_result = get_algorithm_result("xgboost", input) + print(algorithm_result) + assert algorithm_result == {"accuracy": 0.63} From b9c6308f84c17e587d2539aededacd6886e53908 Mon Sep 17 00:00:00 2001 From: Apostolos Glenis Date: Thu, 3 Oct 2024 14:17:19 +0300 Subject: [PATCH 03/10] fixed xgboost strategy --- exareme2/algorithms/flower/xgboost/server.py | 50 +++++++++++++++----- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/exareme2/algorithms/flower/xgboost/server.py b/exareme2/algorithms/flower/xgboost/server.py index b94abda90..55d9b6739 100644 --- a/exareme2/algorithms/flower/xgboost/server.py +++ b/exareme2/algorithms/flower/xgboost/server.py @@ -12,25 +12,49 @@ num_evaluate_clients = 2 -def evaluate_metrics_aggregation(eval_metrics): - """Return an aggregated metric (AUC) for evaluation.""" +class Custom_fed_xgboost(FedXgbBagging): + def aggregate_evaluate( + self, + server_round: int, + results: list[tuple[ClientProxy, EvaluateRes]], + failures: list[Union[tuple[ClientProxy, EvaluateRes], BaseException]], + ) -> tuple[Optional[float], dict[str, Scalar]]: + """Aggregate evaluation metrics using average.""" + if not results: + return None, {} + 
# Do not aggregate if there are failures and failures are not accepted + if not self.accept_failures and failures: + return None, {} + + # Aggregate custom metrics if aggregation fn was provided + metrics_aggregated = {} + if self.evaluate_metrics_aggregation_fn: + eval_metrics = [(res.num_examples, res.metrics) for _, res in results] + metrics_aggregated = self.evaluate_metrics_aggregation_fn( + server_round, eval_metrics + ) + elif server_round == 1: # Only log this warning once + log(WARNING, "No evaluate_metrics_aggregation_fn provided") + + return 0, metrics_aggregated - def evaluate(server_round, parameters, config): - total_num = sum([num for num, _ in eval_metrics]) - auc_aggregated = ( - sum([metrics["AUC"] * num for num, metrics in eval_metrics]) / total_num - ) - metrics_aggregated = {"AUC": auc_aggregated} - if server_round == NUM_OF_ROUNDS: - post_result({"metrics_aggregated": metrics_aggregated}) - return metrics_aggregated - return evaluate +def evaluate_metrics_aggregation(server_round, eval_metrics): + """Return an aggregated metric (AUC) for evaluation.""" + + total_num = sum([num for num, _ in eval_metrics]) + auc_aggregated = ( + sum([metrics["AUC"] * num for num, metrics in eval_metrics]) / total_num + ) + metrics_aggregated = {"AUC": auc_aggregated} + if server_round == NUM_OF_ROUNDS: + post_result({"metrics_aggregated": metrics_aggregated}) + return metrics_aggregated if __name__ == "__main__": # Define strategy - strategy = FedXgbBagging( + strategy = Custom_fed_xgboost( fraction_fit=(float(num_clients_per_round) / pool_size), min_fit_clients=num_clients_per_round, min_available_clients=pool_size, From f6b9211980f606aabbbdd8917c283163ceca8aab Mon Sep 17 00:00:00 2001 From: Kostas Filippopolitis Date: Thu, 3 Oct 2024 14:40:33 +0300 Subject: [PATCH 04/10] Fixed xgboost server/client to use our custom logger. 
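For context, a minimal sketch of the two call styles being swapped in this
patch; both names come from flwr.common.logger, and the change aligns the
xgboost client/server with the FLOWER_LOGGER usage already present in
logistic_regression/client.py (the exareme2 handler wiring is assumed, not
shown in this series):

    # Before: module-level helper plus an explicit level constant.
    from logging import INFO
    from flwr.common.logger import log

    log(INFO, "Reformatting data...")

    # After: the named logger object, usable like any stdlib logging.Logger.
    from flwr.common.logger import FLOWER_LOGGER

    FLOWER_LOGGER.info("Reformatting data...")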
--- exareme2/algorithms/flower/xgboost/client.py | 4 +--- exareme2/algorithms/flower/xgboost/server.py | 17 +++++++++++++---- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/exareme2/algorithms/flower/xgboost/client.py b/exareme2/algorithms/flower/xgboost/client.py index c2acb355f..e32426c71 100644 --- a/exareme2/algorithms/flower/xgboost/client.py +++ b/exareme2/algorithms/flower/xgboost/client.py @@ -1,7 +1,6 @@ import os import time import warnings -from logging import INFO from math import log2 import flwr as fl @@ -16,7 +15,6 @@ from flwr.common import Parameters from flwr.common import Status from flwr.common.logger import FLOWER_LOGGER -from flwr.common.logger import log from exareme2.algorithms.flower.inputdata_preprocessing import fetch_data from exareme2.algorithms.flower.inputdata_preprocessing import get_input @@ -143,7 +141,7 @@ def evaluate(self, ins: EvaluateIns) -> EvaluateRes: X_valid, y_valid = X_train, y_train # Reformat data to DMatrix for xgboost - log(INFO, "Reformatting data...") + FLOWER_LOGGER.info("Reformatting data...") train_dmatrix = transform_dataset_to_dmatrix(X_train, y=y_train) valid_dmatrix = transform_dataset_to_dmatrix(X_valid, y=y_valid) diff --git a/exareme2/algorithms/flower/xgboost/server.py b/exareme2/algorithms/flower/xgboost/server.py index 55d9b6739..75ee42c89 100644 --- a/exareme2/algorithms/flower/xgboost/server.py +++ b/exareme2/algorithms/flower/xgboost/server.py @@ -1,6 +1,15 @@ import os +from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple +from typing import Union import flwr as fl +from flwr.common import EvaluateRes +from flwr.common import Scalar +from flwr.common.logger import FLOWER_LOGGER +from flwr.server.client_proxy import ClientProxy from flwr.server.strategy import FedXgbBagging from exareme2.algorithms.flower.inputdata_preprocessing import post_result @@ -16,9 +25,9 @@ class Custom_fed_xgboost(FedXgbBagging): def aggregate_evaluate( self, server_round: int, - results: list[tuple[ClientProxy, EvaluateRes]], - failures: list[Union[tuple[ClientProxy, EvaluateRes], BaseException]], - ) -> tuple[Optional[float], dict[str, Scalar]]: + results: List[Tuple[ClientProxy, EvaluateRes]], + failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]], + ) -> Tuple[Optional[float], Dict[str, Scalar]]: """Aggregate evaluation metrics using average.""" if not results: return None, {} @@ -34,7 +43,7 @@ def aggregate_evaluate( server_round, eval_metrics ) elif server_round == 1: # Only log this warning once - log(WARNING, "No evaluate_metrics_aggregation_fn provided") + FLOWER_LOGGER.warn("No evaluate_metrics_aggregation_fn provided") return 0, metrics_aggregated From 5b81d37ca60fa33c9690509478ef34ebaea14fd5 Mon Sep 17 00:00:00 2001 From: Apostolos Glenis Date: Tue, 15 Oct 2024 13:00:39 +0300 Subject: [PATCH 05/10] make code cleaner --- exareme2/algorithms/flower/xgboost/server.py | 56 ++++++-------------- 1 file changed, 15 insertions(+), 41 deletions(-) diff --git a/exareme2/algorithms/flower/xgboost/server.py b/exareme2/algorithms/flower/xgboost/server.py index 75ee42c89..06412c785 100644 --- a/exareme2/algorithms/flower/xgboost/server.py +++ b/exareme2/algorithms/flower/xgboost/server.py @@ -1,15 +1,6 @@ import os -from typing import Dict -from typing import List -from typing import Optional -from typing import Tuple -from typing import Union import flwr as fl -from flwr.common import EvaluateRes -from flwr.common import Scalar -from flwr.common.logger import 
FLOWER_LOGGER -from flwr.server.client_proxy import ClientProxy from flwr.server.strategy import FedXgbBagging from exareme2.algorithms.flower.inputdata_preprocessing import post_result @@ -21,49 +12,32 @@ num_evaluate_clients = 2 -class Custom_fed_xgboost(FedXgbBagging): - def aggregate_evaluate( - self, - server_round: int, - results: List[Tuple[ClientProxy, EvaluateRes]], - failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]], - ) -> Tuple[Optional[float], Dict[str, Scalar]]: - """Aggregate evaluation metrics using average.""" - if not results: - return None, {} - # Do not aggregate if there are failures and failures are not accepted - if not self.accept_failures and failures: - return None, {} - - # Aggregate custom metrics if aggregation fn was provided - metrics_aggregated = {} - if self.evaluate_metrics_aggregation_fn: - eval_metrics = [(res.num_examples, res.metrics) for _, res in results] - metrics_aggregated = self.evaluate_metrics_aggregation_fn( - server_round, eval_metrics - ) - elif server_round == 1: # Only log this warning once - FLOWER_LOGGER.warn("No evaluate_metrics_aggregation_fn provided") - - return 0, metrics_aggregated - - -def evaluate_metrics_aggregation(server_round, eval_metrics): +def evaluate_metrics_aggregation(eval_metrics): """Return an aggregated metric (AUC) for evaluation.""" - total_num = sum([num for num, _ in eval_metrics]) auc_aggregated = ( sum([metrics["AUC"] * num for num, metrics in eval_metrics]) / total_num ) metrics_aggregated = {"AUC": auc_aggregated} - if server_round == NUM_OF_ROUNDS: - post_result({"metrics_aggregated": metrics_aggregated}) return metrics_aggregated +class CustomFedXgbBagging(FedXgbBagging): + def __init__(self, num_rounds, **kwargs): + super().__init__(**kwargs) + self.num_rounds = num_rounds + + def aggregate_evaluate(self, rnd, results, failures): + aggregated_metrics = super().aggregate_evaluate(rnd, results, failures) + if rnd == self.num_rounds: + post_result({"metrics_aggregated": aggregated_metrics}) + return aggregated_metrics + + if __name__ == "__main__": # Define strategy - strategy = Custom_fed_xgboost( + strategy = CustomFedXgbBagging( + num_rounds=NUM_OF_ROUNDS, fraction_fit=(float(num_clients_per_round) / pool_size), min_fit_clients=num_clients_per_round, min_available_clients=pool_size, From db6a9ef1d4216a5f984c90d741e1a229138c5667 Mon Sep 17 00:00:00 2001 From: Apostolos Glenis Date: Tue, 15 Oct 2024 13:05:49 +0300 Subject: [PATCH 06/10] fixed test --- tests/algorithm_validation_tests/flower/test_xgboost.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/algorithm_validation_tests/flower/test_xgboost.py b/tests/algorithm_validation_tests/flower/test_xgboost.py index 4c9a96874..f359abd74 100644 --- a/tests/algorithm_validation_tests/flower/test_xgboost.py +++ b/tests/algorithm_validation_tests/flower/test_xgboost.py @@ -24,5 +24,7 @@ def test_xgboost(get_algorithm_result): } input["type"] = "flower" algorithm_result = get_algorithm_result("xgboost", input) + # {'metrics_aggregated': {'AUC': 0.7575790087463558}} print(algorithm_result) - assert algorithm_result == {"accuracy": 0.63} + auc_aggregated = algorithm_result["metrics_aggregated"]["AUC"] + assert auc_aggregated > 0.0 From 075b35101393f884d4df6ebc30129c8504345650 Mon Sep 17 00:00:00 2001 From: Apostolos Glenis Date: Mon, 4 Nov 2024 14:33:31 +0200 Subject: [PATCH 07/10] fixed bug in xgboost test --- tests/algorithm_validation_tests/flower/test_xgboost.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/tests/algorithm_validation_tests/flower/test_xgboost.py b/tests/algorithm_validation_tests/flower/test_xgboost.py index f359abd74..82aeca582 100644 --- a/tests/algorithm_validation_tests/flower/test_xgboost.py +++ b/tests/algorithm_validation_tests/flower/test_xgboost.py @@ -26,5 +26,5 @@ def test_xgboost(get_algorithm_result): algorithm_result = get_algorithm_result("xgboost", input) # {'metrics_aggregated': {'AUC': 0.7575790087463558}} print(algorithm_result) - auc_aggregated = algorithm_result["metrics_aggregated"]["AUC"] + auc_aggregated = algorithm_result["metrics_aggregated"][1]["AUC"] assert auc_aggregated > 0.0 From 5ba597de7a248fdfd25415db98f9ecce440362ab Mon Sep 17 00:00:00 2001 From: Apostolos Glenis Date: Sun, 12 Jan 2025 15:00:04 +0200 Subject: [PATCH 08/10] added extra tests --- exareme2/algorithms/flower/xgboost/server.py | 18 +++++++++++++++++- .../flower/test_xgboost.py | 2 ++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/exareme2/algorithms/flower/xgboost/server.py b/exareme2/algorithms/flower/xgboost/server.py index 06412c785..28916952a 100644 --- a/exareme2/algorithms/flower/xgboost/server.py +++ b/exareme2/algorithms/flower/xgboost/server.py @@ -26,11 +26,27 @@ class CustomFedXgbBagging(FedXgbBagging): def __init__(self, num_rounds, **kwargs): super().__init__(**kwargs) self.num_rounds = num_rounds + self.initial_auc = 0.0 def aggregate_evaluate(self, rnd, results, failures): aggregated_metrics = super().aggregate_evaluate(rnd, results, failures) + if rnd == 1: + self.initial_auc = aggregated_metrics["AUC"] if rnd == self.num_rounds: - post_result({"metrics_aggregated": aggregated_metrics}) + print(aggregated_metrics) + curr_auc = aggregated_metrics["AUC"] + auc_diff = curr_auc - self.initial_auc + auc_ascending = "" + if auc_diff > 0.0: + auc_ascending = "correct" + else: + auc_ascending = "not_correct" + post_result( + { + "metrics_aggregated": aggregated_metrics, + "auc_ascending": auc_ascending, + } + ) return aggregated_metrics diff --git a/tests/algorithm_validation_tests/flower/test_xgboost.py b/tests/algorithm_validation_tests/flower/test_xgboost.py index 82aeca582..439c3a479 100644 --- a/tests/algorithm_validation_tests/flower/test_xgboost.py +++ b/tests/algorithm_validation_tests/flower/test_xgboost.py @@ -27,4 +27,6 @@ def test_xgboost(get_algorithm_result): # {'metrics_aggregated': {'AUC': 0.7575790087463558}} print(algorithm_result) auc_aggregated = algorithm_result["metrics_aggregated"][1]["AUC"] + auc_ascending = algorithm_result["auc_ascending"] assert auc_aggregated > 0.0 + assert auc_ascending == "correct" From ee1805f006fb9db3950718280c5fb249d7583466 Mon Sep 17 00:00:00 2001 From: Apostolos Glenis Date: Sun, 12 Jan 2025 15:23:44 +0200 Subject: [PATCH 09/10] added extra tests --- exareme2/algorithms/flower/xgboost/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exareme2/algorithms/flower/xgboost/server.py b/exareme2/algorithms/flower/xgboost/server.py index 28916952a..728830f4d 100644 --- a/exareme2/algorithms/flower/xgboost/server.py +++ b/exareme2/algorithms/flower/xgboost/server.py @@ -34,10 +34,10 @@ def aggregate_evaluate(self, rnd, results, failures): self.initial_auc = aggregated_metrics["AUC"] if rnd == self.num_rounds: print(aggregated_metrics) - curr_auc = aggregated_metrics["AUC"] + curr_auc = aggregated_metrics[1]["AUC"] auc_diff = curr_auc - self.initial_auc auc_ascending = "" - if auc_diff > 0.0: + if auc_diff >= 0.0: auc_ascending = "correct" else: auc_ascending = 
"not_correct" From 87890cdbfa4657917eb58a7c45b302e194aef38f Mon Sep 17 00:00:00 2001 From: Apostolos Glenis Date: Wed, 22 Jan 2025 16:36:22 +0200 Subject: [PATCH 10/10] xgboost has new tests --- exareme2/algorithms/flower/xgboost/server.py | 21 ++++++++++++++----- .../flower/test_xgboost.py | 3 ++- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/exareme2/algorithms/flower/xgboost/server.py b/exareme2/algorithms/flower/xgboost/server.py index 728830f4d..13e415927 100644 --- a/exareme2/algorithms/flower/xgboost/server.py +++ b/exareme2/algorithms/flower/xgboost/server.py @@ -1,6 +1,8 @@ +import copy import os import flwr as fl +from flwr.common.logger import FLOWER_LOGGER from flwr.server.strategy import FedXgbBagging from exareme2.algorithms.flower.inputdata_preprocessing import post_result @@ -30,21 +32,30 @@ def __init__(self, num_rounds, **kwargs): def aggregate_evaluate(self, rnd, results, failures): aggregated_metrics = super().aggregate_evaluate(rnd, results, failures) + d2 = copy.deepcopy(aggregated_metrics) + curr_auc = d2[1]["AUC"] + if rnd == 1: - self.initial_auc = aggregated_metrics["AUC"] + # print(aggregated_metrics) + d3 = copy.deepcopy(aggregated_metrics) + curr_auc = d3[1]["AUC"] + self.initial_auc = curr_auc + if rnd == self.num_rounds: - print(aggregated_metrics) - curr_auc = aggregated_metrics[1]["AUC"] + FLOWER_LOGGER.debug("aggregated metrics is " + str(aggregated_metrics)) + auc_diff = curr_auc - self.initial_auc auc_ascending = "" - if auc_diff >= 0.0: + if auc_diff >= -0.05: auc_ascending = "correct" else: auc_ascending = "not_correct" + post_result( { - "metrics_aggregated": aggregated_metrics, + "AUC": curr_auc, "auc_ascending": auc_ascending, + "initial_auc": self.initial_auc, } ) return aggregated_metrics diff --git a/tests/algorithm_validation_tests/flower/test_xgboost.py b/tests/algorithm_validation_tests/flower/test_xgboost.py index 439c3a479..b806b03ac 100644 --- a/tests/algorithm_validation_tests/flower/test_xgboost.py +++ b/tests/algorithm_validation_tests/flower/test_xgboost.py @@ -26,7 +26,8 @@ def test_xgboost(get_algorithm_result): algorithm_result = get_algorithm_result("xgboost", input) # {'metrics_aggregated': {'AUC': 0.7575790087463558}} print(algorithm_result) - auc_aggregated = algorithm_result["metrics_aggregated"][1]["AUC"] + auc_aggregated = algorithm_result["AUC"] auc_ascending = algorithm_result["auc_ascending"] + assert auc_aggregated > 0.0 assert auc_ascending == "correct"