Skip to content
78 changes: 52 additions & 26 deletions openfl/databases/tensor_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(self) -> None:
self.tensor_db = pd.DataFrame(
{col: pd.Series(dtype=dtype) for col, dtype in types_dict.items()}
)
self.secondary_db = self.tensor_db
self._bind_convenience_methods()

self.mutex = Lock()
Expand Down Expand Up @@ -93,6 +94,13 @@ def clean_up(self, remove_older_than: int = 1) -> None:
(self.tensor_db["round"].astype(int) > current_round - remove_older_than)
| self.tensor_db["report"]
].reset_index(drop=True)
self.secondary_db = self.tensor_db[
~self.tensor_db["tags"].apply(
lambda x: any(
keyword in item for item in x for keyword in ["collaborator", "metric"]
)
)
].reset_index(drop=True)
Comment on lines +97 to +103
Copy link
Copy Markdown
Collaborator

@MasterSkepticista MasterSkepticista May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand, this snippet removes collaborator metrics on the aggregator side (which are merely a couple of floats for each collaborator). I am not sure if this is related to scaling issues, or memory usage in general. Do you have any pre/post analysis in support of this proposal?

Copy link
Copy Markdown
Collaborator Author

@porteratzo porteratzo May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, as evidence, I profiled get_cached_tensor with and without filtering these rows and measured an ~80% speed-up: image

As an example I have this calculation
image

So the number of collaborator tensor rows grows substantially with more collaborators, and that count remains constant throughout the experiment. The metric rows, although they increase relatively little per round, can — given many reported metrics, many collaborators, and many rounds — grow to be a significant part of the tensor_db.

Most calls to get_tensor_from_cache only really care about the aggregator tensors, so removing all the collaborator model rows and metric rows makes a large improvement to query time.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, two experiment suggestions:

  1. Could you enable memory profiling and check the memory trends with/without proposed changes?
  2. What is the time taken for 10 rounds of histology (assuming 2 collaborators) with/without proposed changes?

If either of these two results shows a significant improvement, we can prioritize merging this PR.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes are most impactful with more collaborators and also depend on model size, so the following experiment was run on a custom modified workspace derived from MNIST, with 30 collaborators using ResNet-50 for 10 rounds.

Without changes
real 39m51.381s

With changes
real 34m12.750s

So that is around a 13% reduction in time to completion.


def cache_tensor(self, tensor_key_dict: Dict[TensorKey, np.ndarray]) -> None:
"""Insert a tensor into TensorDB (dataframe).
Expand All @@ -105,26 +113,35 @@ def cache_tensor(self, tensor_key_dict: Dict[TensorKey, np.ndarray]) -> None:
None
"""
entries_to_add = []
with self.mutex:
for tensor_key, nparray in tensor_key_dict.items():
tensor_name, origin, fl_round, report, tags = tensor_key
entries_to_add.append(
pd.DataFrame(
[
[
tensor_name,
origin,
fl_round,
report,
tags,
nparray,
]
],
columns=list(self.tensor_db.columns),
)
)

self.tensor_db = pd.concat([self.tensor_db, *entries_to_add], ignore_index=True)
for tensor_key, nparray in tensor_key_dict.items():
tensor_name, origin, fl_round, report, tags = tensor_key
entries_to_add.append(
{
"tensor_name": tensor_name,
"origin": origin,
"round": fl_round,
"report": report,
"tags": tags,
"nparray": nparray,
}
)

if len(entries_to_add) > 0:
new_data = pd.DataFrame(entries_to_add)
with self.mutex:
self.tensor_db = pd.concat([self.tensor_db, new_data], ignore_index=True)
filtered_new_data = new_data[
~new_data["tags"].apply(
lambda x: any(
keyword in item for item in x for keyword in ["collaborator", "metric"]
)
)
].reset_index(drop=True)
if len(filtered_new_data) > 0:
self.secondary_db = pd.concat(
[self.secondary_db, filtered_new_data], ignore_index=True
)

def get_tensor_from_cache(self, tensor_key: TensorKey) -> Optional[np.ndarray]:
"""Perform a lookup of the tensor_key in the TensorDB.
Expand All @@ -139,13 +156,22 @@ def get_tensor_from_cache(self, tensor_key: TensorKey) -> Optional[np.ndarray]:
tensor_name, origin, fl_round, report, tags = tensor_key

# TODO come up with easy way to ignore compression
df = self.tensor_db[
(self.tensor_db["tensor_name"] == tensor_name)
& (self.tensor_db["origin"] == origin)
& (self.tensor_db["round"] == fl_round)
& (self.tensor_db["report"] == report)
& (self.tensor_db["tags"] == tags)
]
if any(keyword in item for item in tags for keyword in ["collaborator", "metric"]):
df = self.tensor_db[
(self.tensor_db["tensor_name"] == tensor_name)
& (self.tensor_db["origin"] == origin)
& (self.tensor_db["round"] == fl_round)
& (self.tensor_db["report"] == report)
& (self.tensor_db["tags"] == tags)
]
else:
df = self.secondary_db[
(self.secondary_db["tensor_name"] == tensor_name)
& (self.secondary_db["origin"] == origin)
& (self.secondary_db["round"] == fl_round)
& (self.secondary_db["report"] == report)
& (self.secondary_db["tags"] == tags)
]

if len(df) == 0:
return None
Expand Down
Loading