Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.0] feat: method findFileByMetadata #8046

Open
wants to merge 3 commits into
base: rel-v8r0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 203 additions & 0 deletions src/DIRAC/Resources/Catalog/RucioFileCatalogClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class RucioFileCatalogClient(FileCatalogClientBase):
"resolveDataset",
"getLFNForPFN",
"getUserDirectory",
"getFileUserMetadata",
"findFilesByMetadata",

]

WRITE_METHODS = FileCatalogClientBase.WRITE_METHODS + [
Expand All @@ -78,13 +81,15 @@ class RucioFileCatalogClient(FileCatalogClientBase):
"createDataset",
"changePathOwner",
"changePathMode",
"setMetadata",
]

# Extends the list inherited from FileCatalogClientBase with the method names below.
# NOTE(review): going by the attribute name, these are presumably the methods whose
# first argument is not a set of LFNs - confirm against FileCatalogClientBase.
NO_LFN_METHODS = FileCatalogClientBase.NO_LFN_METHODS + [
"getUserDirectory",
"createUserDirectory",
"createUserMapping",
"removeUserDirectory",
"findFilesByMetadata",
]

ADMIN_METHODS = FileCatalogClientBase.ADMIN_METHODS + [
Expand Down Expand Up @@ -697,3 +702,201 @@ def getDirectorySize(self, lfns, longOutput=False, rawFiles=False):
except Exception as err:
return S_ERROR(str(err))
return S_OK(resDict)

@checkCatalogArguments
def getFileUserMetadata(self, path):
    """Get the metadata attached to each given file, but also to all its parents.

    :param path: iterable of LFNs (iterating a dict yields its keys)
    :return: S_OK({"Successful": {lfn: metadata}, "Failed": {lfn: reason}}),
             or S_ERROR as soon as an unexpected exception is raised
    """
    resDict = {"Successful": {}, "Failed": {}}
    # Process every requested LFN. Looking only at the first one silently
    # dropped the others and raised StopIteration on an empty input.
    for lfn in path:
        try:
            did = self.__getDidsFromLfn(lfn)
            # The result is consumed as an iterator; default to None so that an
            # empty answer is reported for this LFN instead of aborting the call.
            meta = next(self.client.get_metadata_bulk(dids=[did], inherit=True, plugin="ALL"), None)
            if meta is None:
                resDict["Failed"][lfn] = "No such file or directory"
            elif meta["did_type"] == "FILE":  # Should we also return the metadata for the directories ?
                resDict["Successful"][lfn] = meta
            else:
                resDict["Failed"][lfn] = "Not a file"
        except DataIdentifierNotFound:
            resDict["Failed"][lfn] = "No such file or directory"
        except Exception as err:
            return S_ERROR(str(err))
    return S_OK(resDict)

@checkCatalogArguments
def getFileUserMetadataBulk(self, lfns):
    """Fetch the metadata of a list of files, including what their parents carry.

    The LFNs are handled in chunks of 1000. Every metadata record returned
    is stored under its "name" entry; any LFN of the chunk that has no record
    afterwards is reported as failed.

    :param lfns: LFNs to look up
    :return: S_OK({"Successful": {name: metadata}, "Failed": {lfn: reason}}),
             or S_ERROR on the first exception
    """
    outcome = {"Successful": {}, "Failed": {}}
    found = outcome["Successful"]
    missing = outcome["Failed"]
    for chunk in breakListIntoChunks(lfns, 1000):
        try:
            chunkDids = [self.__getDidsFromLfn(entry) for entry in chunk]
            for record in self.client.get_metadata_bulk(dids=chunkDids, inherit=True):
                found[record["name"]] = record
            for entry in chunk:
                if entry not in found:
                    missing[entry] = "No such file or directory"
        except Exception as err:
            return S_ERROR(str(err))
    return S_OK(outcome)

@checkCatalogArguments
def setMetadataBulk(self, pathMetadataDict):
    """Add metadata for the given paths.

    :param dict pathMetadataDict: {path: {metadata name: value}}
    :return: S_OK({"Successful": {path: True}, "Failed": {}}) once the single
             bulk request went through, S_ERROR if anything raised
    """
    resDict = {"Successful": {}, "Failed": {}}
    dids = []
    for path, metadataDict in pathMetadataDict.items():
        try:
            did = self.__getDidsFromLfn(path)
            did["meta"] = metadataDict
            dids.append(did)
        except Exception as err:
            return S_ERROR(str(err))
    if dids:  # nothing to send for an empty request
        try:
            self.client.set_dids_metadata_bulk(dids=dids, recursive=False)
        except Exception as err:
            return S_ERROR(str(err))
    # All paths travel in one request, so reaching this point means each of
    # them was accepted. Report them, otherwise callers inspecting
    # "Successful"/"Failed" would find no trace of the paths they submitted.
    for path in pathMetadataDict:
        resDict["Successful"][path] = True
    return S_OK(resDict)

@checkCatalogArguments
def setMetadata(self, path, metadataDict):
    """Add the same metadata to every given path.

    :param path: iterable of paths (iterating a dict yields its keys)
    :param dict metadataDict: {metadata name: value} applied to each path
    :return: whatever setMetadataBulk returns, or an empty
             Successful/Failed S_OK structure when no path was given
    """
    # Cover all the paths: taking only the first one silently ignored the
    # others and raised StopIteration when the input was empty.
    pathMetadataDict = {lfn: metadataDict for lfn in path}
    if not pathMetadataDict:
        return S_OK({"Successful": {}, "Failed": {}})
    return self.setMetadataBulk(pathMetadataDict)

@checkCatalogArguments
def removeMetadata(self, path, metadata):
    """Remove the specified metadata for the given file.

    :param path: LFN of the file
    :param metadata: iterable of metadata keys to delete
    :return: S_OK() when every key was deleted. S_ERROR when the file is not
             found or when at least one key could not be deleted; in the
             latter case the result carries a "FailedMetadata" entry mapping
             each failing key to its error message.
    """
    # NOTE(review): path is handed to __getDidsFromLfn exactly as received;
    # confirm what @checkCatalogArguments passes in here.
    try:
        did = self.__getDidsFromLfn(path)
        failedMeta = {}
        # TODO : Implement bulk delete_metadata method in Rucio
        for meta in metadata:
            try:
                self.client.delete_metadata(scope=did["scope"], name=did["name"], key=meta)
            except DataIdentifierNotFound:
                return S_ERROR(f"File {path} not found")
            except Exception as err:
                # Keep going so that one bad key does not block the others
                failedMeta[meta] = str(err)
    except Exception as err:
        return S_ERROR(str(err))

    if failedMeta:
        metaExample = next(iter(failedMeta))
        result = S_ERROR(f"Failed to remove {len(failedMeta)} metadata, e.g. {failedMeta[metaExample]}")
        result["FailedMetadata"] = failedMeta
        # This error used to be built and then dropped, so partial failures
        # were reported as a success.
        return result
    return S_OK()

def findFilesByMetadata(self, metadataFilterDict, path="/", timeout=120):
    """Look up, in every scope of self.scopes, the DIDs matching a metadata query.

    :param dict metadataFilterDict: DIRAC style metadata query
    :param path: accepted but not used by this implementation
    :param timeout: accepted but not used by this implementation
    :return: S_OK(list of everything list_dids returned, scope after scope),
             or S_ERROR on the first failing scope
    """
    rucioFilters = self.__transform_DIRAC_filter_dict_to_Rucio_filter_dict([metadataFilterDict])
    matches = []
    for scope in self.scopes:
        try:
            found = self.client.list_dids(scope=scope, filters=rucioFilters, did_type="all")
            matches.extend(found)
        except Exception as err:
            return S_ERROR(str(err))
    return S_OK(matches)

def __transform_DIRAC_operator_to_Rucio(self, DIRAC_dict):
    """Rewrite a DIRAC metadata query as a flat Rucio filter dictionary.

    Plain values are copied unchanged. A value given as {operator: operand}
    becomes one entry per operator, the key being suffixed according to the
    operator ('=' adds no suffix). Operators that are not in the table are
    skipped.

    Example:
        {'key1': 'value1', 'key2': {'>': 10}, 'key3': {'=': 10}}
        -> {'key1': 'value1', 'key2.gt': 10, 'key3': 10}

    :param dict DIRAC_dict: DIRAC metadata query
    :return: dict usable as a Rucio filter
    """
    suffix_by_operator = {
        ">": ".gt",
        "<": ".lt",
        ">=": ".gte",
        "<=": ".lte",
        "=<": ".lte",
        "!=": ".ne",
        "=": "",
    }

    converted = {}
    for key, value in DIRAC_dict.items():
        if not isinstance(value, dict):
            converted[key] = value
            continue
        for operator, operand in value.items():
            suffix = suffix_by_operator.get(operator)
            # '' is a valid suffix (equality), only None means "unknown operator"
            if suffix is not None:
                converted[f"{key}{suffix}"] = operand
    return converted

def __transform_dict_with_in_operateur(self, DIRAC_dict_with_in_operator_list):
    """Expand one 'in' operator per query dictionary into one dictionary per value.

    For each dictionary of the list, the first key whose value is a dict
    holding 'in' is replaced, in as many copies as there are values, by each
    value in turn; all other keys are kept as they are. Dictionaries without
    any 'in' are passed through untouched. An empty 'in' list therefore
    yields no dictionary at all for that query. Only one 'in' is expanded per
    dictionary and per call: call again while the returned flag is True.

    Example:
        [{'particle': {'in': ['proton', 'electron']}, 'site': {'in': ['LaPalma', 'paranal']}}]
        -> ([{'particle': 'proton', 'site': {'in': ['LaPalma', 'paranal']}},
             {'particle': 'electron', 'site': {'in': ['LaPalma', 'paranal']}}], True)

    :param list DIRAC_dict_with_in_operator_list: list of DIRAC query dictionaries
    :return: (list of dictionaries, True if at least one 'in' was expanded)
    :raises TypeError: if the argument is not a list of dictionaries
    """
    if not isinstance(DIRAC_dict_with_in_operator_list, list):
        raise TypeError("DIRAC_dict_with_in_operator_list must be a list of dictionaries")

    combined_dict_list = []
    in_found = False

    for query in DIRAC_dict_with_in_operator_list:
        if not isinstance(query, dict):
            raise TypeError("Each element in DIRAC_dict_with_in_operator_list must be a dictionary")

        for key, value in query.items():
            if isinstance(value, dict) and "in" in value:
                in_found = True
                # Expand right where the key is found rather than testing the
                # key's truthiness afterwards: a falsy key ('' or 0) used to
                # be left unexpanded while still being flagged, which made
                # the caller loop forever.
                for candidate in value["in"]:
                    expanded = query.copy()
                    expanded[key] = candidate
                    combined_dict_list.append(expanded)
                break
        else:
            # No 'in' operator in this dictionary
            combined_dict_list.append(query)

    return combined_dict_list, in_found

def __transform_DIRAC_filter_dict_to_Rucio_filter_dict(self, DIRAC_filter_dict_list):
    """Turn a list of DIRAC filter dictionaries into a list of Rucio filter dictionaries.

    The 'in' operators are expanded first, one pass after the other until a
    pass reports that nothing was left to expand; each resulting dictionary
    then has its operators converted.

    Example:
        [{'particle': {'in': ['proton', 'electron']}, 'configuration_id': {'=': 14}}]
        -> [{'particle': 'proton', 'configuration_id': 14},
            {'particle': 'electron', 'configuration_id': 14}]

    :param list DIRAC_filter_dict_list: list of DIRAC filter dictionaries
    :return: list of Rucio filter dictionaries
    """
    expanded_filters = DIRAC_filter_dict_list
    while True:
        expanded_filters, expanded_something = self.__transform_dict_with_in_operateur(expanded_filters)
        if not expanded_something:
            break
    return [self.__transform_DIRAC_operator_to_Rucio(dirac_filter) for dirac_filter in expanded_filters]
129 changes: 129 additions & 0 deletions src/DIRAC/Resources/Catalog/test/Test_RucioFileCatalogClient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import unittest
from unittest.mock import MagicMock, patch
from DIRAC.Resources.Catalog.RucioFileCatalogClient import RucioFileCatalogClient

class TestRucioFileCatalogClient(unittest.TestCase):
    """Unit tests for the metadata query helpers of RucioFileCatalogClient.

    The private helpers are reached through their name-mangled attribute
    (``_RucioFileCatalogClient__<name>``).
    """

    def setUp(self):
        # Start the patch before the catalog object is built, so that nothing
        # executed while constructing it sees the unpatched ``client``
        # attribute. addCleanup undoes the patch even if the rest of setUp
        # fails, which a tearDown method would not do.
        self.patcher = patch.object(RucioFileCatalogClient, "client", new_callable=MagicMock)
        self.patcher.start()
        self.addCleanup(self.patcher.stop)
        self.client = RucioFileCatalogClient()
        self.client.scopes = ["test_scope"]

    def test_transform_DIRAC_operator_to_Rucio(self):
        DIRAC_dict = {
            "key1": "value1",
            "key2": {">": 10},
            "key3": {"=": 10},
        }
        expected_output = {
            "key1": "value1",
            "key2.gt": 10,
            "key3": 10,
        }
        result = self.client._RucioFileCatalogClient__transform_DIRAC_operator_to_Rucio(DIRAC_dict)
        self.assertEqual(result, expected_output)

    def test_transform_dict_with_in_operateur_2steps(self):
        # Each call expands a single 'in' operator, hence the two steps
        DIRAC_dict_with_in_operator_list = [
            {
                "particle": {"in": ["proton", "electron"]},
                "site": {"in": ["LaPalma", "paranal"]},
                "configuration_id": {"=": 14},
            }
        ]
        expected_intermediate_output = [
            {"particle": "proton", "site": {"in": ["LaPalma", "paranal"]}, "configuration_id": {"=": 14}},
            {"particle": "electron", "site": {"in": ["LaPalma", "paranal"]}, "configuration_id": {"=": 14}},
        ]
        expected_final_output = [
            {"particle": "proton", "site": "LaPalma", "configuration_id": {"=": 14}},
            {"particle": "proton", "site": "paranal", "configuration_id": {"=": 14}},
            {"particle": "electron", "site": "LaPalma", "configuration_id": {"=": 14}},
            {"particle": "electron", "site": "paranal", "configuration_id": {"=": 14}},
        ]
        result_intermediate, _ = self.client._RucioFileCatalogClient__transform_dict_with_in_operateur(
            DIRAC_dict_with_in_operator_list
        )
        self.assertEqual(result_intermediate, expected_intermediate_output)
        result_final, _ = self.client._RucioFileCatalogClient__transform_dict_with_in_operateur(
            result_intermediate
        )
        self.assertEqual(result_final, expected_final_output)

    def test_transform_DIRAC_operator_to_Rucio_simple_key_value(self):
        input_dict = {"key1": "value1", "key2": "value2"}
        expected_output = {"key1": "value1", "key2": "value2"}
        result = self.client._RucioFileCatalogClient__transform_DIRAC_operator_to_Rucio(input_dict)
        self.assertEqual(result, expected_output)

    def test_transform_DIRAC_operator_to_Rucio_nested_dict_with_operators_gl(self):
        input_dict = {
            "start": {">=": 10},
            "end": {">": 5},
            "pointingZ": {">=": 0.1},
            "organization": "ViaCorp",
            "data_levels": "DL3",
        }
        expected_output = {
            "start.gte": 10,
            "end.gt": 5,
            "pointingZ.gte": 0.1,
            "organization": "ViaCorp",
            "data_levels": "DL3",
        }
        result = self.client._RucioFileCatalogClient__transform_DIRAC_operator_to_Rucio(input_dict)
        self.assertEqual(result, expected_output)

    def test_transform_DIRAC_operator_to_Rucio_nested_dict_with_operators_equals(self):
        input_dict = {
            "start": {"=": 10},
            "pointingZ": {"=": 0.1},
            "organization": "ViaCorp",
            "data_levels": "DL3",
        }
        expected_output = {
            "start": 10,
            "pointingZ": 0.1,
            "organization": "ViaCorp",
            "data_levels": "DL3",
        }
        result = self.client._RucioFileCatalogClient__transform_DIRAC_operator_to_Rucio(input_dict)
        self.assertEqual(result, expected_output)

    def test_transform_DIRAC_operator_to_Rucio_mixed_dict(self):
        input_dict = {"key1": "value1", "key2": {">": 10}, "key3": {"=": 10}}
        expected_output = {"key1": "value1", "key2.gt": 10, "key3": 10}
        result = self.client._RucioFileCatalogClient__transform_DIRAC_operator_to_Rucio(input_dict)
        self.assertEqual(result, expected_output)

    def test_transform_DIRAC_operator_to_Rucio_in_operator(self):
        input_dict = [
            {
                "analysis_prog": {"in": ["ctapipe-merge", "ctapipe-process", "ctapipe-apply-models"]},
                "key1": "value1",
                "key3": {"=": 10},
                "key4": {"<": 5},
            }
        ]
        expected_intermediate = [
            {"key1": "value1", "key3": 10, "key4.lt": 5, "analysis_prog": "ctapipe-merge"},
            {"key1": "value1", "key3": 10, "key4.lt": 5, "analysis_prog": "ctapipe-process"},
            {"key1": "value1", "key3": 10, "key4.lt": 5, "analysis_prog": "ctapipe-apply-models"},
        ]
        result_interm = self.client._RucioFileCatalogClient__transform_DIRAC_filter_dict_to_Rucio_filter_dict(
            input_dict
        )
        self.assertEqual(result_interm, expected_intermediate)

    def test_transform_DIRAC_operator_to_Rucio_2timesin_operator(self):
        input_dict = [{"particle": {"in": ["proton", "electron"]}, "site": {"in": ["LaPalma", "paranal"]}}]
        expected = [
            {"particle": "proton", "site": "LaPalma"},
            {"particle": "proton", "site": "paranal"},
            {"particle": "electron", "site": "LaPalma"},
            {"particle": "electron", "site": "paranal"},
        ]
        result = self.client._RucioFileCatalogClient__transform_DIRAC_filter_dict_to_Rucio_filter_dict(input_dict)
        self.assertEqual(result, expected)

    def test_2timesin_mix_operator(self):
        # Same query twice, with the plain '=' key placed after, then between, the 'in' keys
        input_dict = [
            {
                "particle": {"in": ["proton", "electron"]},
                "site": {"in": ["LaPalma", "paranal"]},
                "configuration_id": {"=": 14},
            }
        ]
        expected = [
            {"particle": "proton", "site": "LaPalma", "configuration_id": 14},
            {"particle": "proton", "site": "paranal", "configuration_id": 14},
            {"particle": "electron", "site": "LaPalma", "configuration_id": 14},
            {"particle": "electron", "site": "paranal", "configuration_id": 14},
        ]
        result = self.client._RucioFileCatalogClient__transform_DIRAC_filter_dict_to_Rucio_filter_dict(input_dict)
        self.assertEqual(result, expected)

        input_dict = [
            {
                "particle": {"in": ["proton", "electron"]},
                "configuration_id": {"=": 14},
                "site": {"in": ["LaPalma", "paranal"]},
            }
        ]
        expected = [
            {"particle": "proton", "configuration_id": 14, "site": "LaPalma"},
            {"particle": "proton", "configuration_id": 14, "site": "paranal"},
            {"particle": "electron", "configuration_id": 14, "site": "LaPalma"},
            {"particle": "electron", "configuration_id": 14, "site": "paranal"},
        ]
        result = self.client._RucioFileCatalogClient__transform_DIRAC_filter_dict_to_Rucio_filter_dict(input_dict)
        self.assertEqual(result, expected)

    def test_transform_DIRAC_filter_dict_to_Rucio_filter_dict(self):
        DIRAC_filter_dict_list = [
            {
                "particle": {"in": ["proton", "electron"]},
                "configuration_id": {"=": 14},
                "site": {"in": ["LaPalma", "paranal"]},
            }
        ]
        expected_output = [
            {"particle": "proton", "configuration_id": 14, "site": "LaPalma"},
            {"particle": "proton", "configuration_id": 14, "site": "paranal"},
            {"particle": "electron", "configuration_id": 14, "site": "LaPalma"},
            {"particle": "electron", "configuration_id": 14, "site": "paranal"},
        ]
        result = self.client._RucioFileCatalogClient__transform_DIRAC_filter_dict_to_Rucio_filter_dict(
            DIRAC_filter_dict_list
        )
        self.assertEqual(result, expected_output)

    def test_findFilesByMetadata(self):
        self.client.client.list_dids.return_value = ["did1", "did2"]
        metadataFilterDict = {"key1": "value1"}
        result = self.client.findFilesByMetadata(metadataFilterDict)
        self.assertTrue(result["OK"])
        self.assertEqual(result["Value"], ["did1", "did2"])

    def test_findFilesByMetadata_with_error(self):
        self.client.client.list_dids.side_effect = Exception("Test error")
        metadataFilterDict = {"key1": "value1"}
        result = self.client.findFilesByMetadata(metadataFilterDict)
        self.assertFalse(result["OK"])
        self.assertIn("Test error", result["Message"])

if __name__ == '__main__':
unittest.main()
Loading