diff --git a/src/DIRAC/Resources/Catalog/RucioFileCatalogClient.py b/src/DIRAC/Resources/Catalog/RucioFileCatalogClient.py
index abe6f91a61a..67b389f1f6f 100644
--- a/src/DIRAC/Resources/Catalog/RucioFileCatalogClient.py
+++ b/src/DIRAC/Resources/Catalog/RucioFileCatalogClient.py
@@ -60,6 +60,8 @@ class RucioFileCatalogClient(FileCatalogClientBase):
         "resolveDataset",
         "getLFNForPFN",
         "getUserDirectory",
+        "getFileUserMetadata",
+        "findFilesByMetadata",
     ]
 
     WRITE_METHODS = FileCatalogClientBase.WRITE_METHODS + [
@@ -78,6 +80,7 @@ class RucioFileCatalogClient(FileCatalogClientBase):
         "createDataset",
         "changePathOwner",
         "changePathMode",
+        "setMetadata",
     ]
 
     NO_LFN_METHODS = FileCatalogClientBase.NO_LFN_METHODS + [
@@ -85,6 +88,7 @@ class RucioFileCatalogClient(FileCatalogClientBase):
         "createUserDirectory",
         "createUserMapping",
         "removeUserDirectory",
+        "findFilesByMetadata",
     ]
 
     ADMIN_METHODS = FileCatalogClientBase.ADMIN_METHODS + [
@@ -697,3 +701,252 @@ def getDirectorySize(self, lfns, longOutput=False, rawFiles=False):
             except Exception as err:
                 return S_ERROR(str(err))
         return S_OK(resDict)
+
+    @staticmethod
+    def __asLfnList(lfns):
+        """Return ``lfns`` as a list of LFN strings.
+
+        A single string is wrapped in a one-element list; any other iterable (for
+        instance a dictionary keyed by LFN) contributes its elements, so that
+        callers never iterate over the characters of a lone path.
+        """
+        return [lfns] if isinstance(lfns, str) else list(lfns)
+
+    @checkCatalogArguments
+    def getFileUserMetadata(self, path):
+        """Get the metadata attached to the given file(s), including the metadata
+        inherited from their parents.
+
+        :param path: a single LFN or an iterable of LFNs
+        :return: S_OK with ``Successful`` (LFN -> metadata returned by Rucio) and
+                 ``Failed`` (LFN -> reason) dictionaries, S_ERROR on unexpected errors
+        """
+        resDict = {"Successful": {}, "Failed": {}}
+        for lfn in self.__asLfnList(path):
+            try:
+                did = self.__getDidsFromLfn(lfn)
+                # An exhausted iterator means Rucio returned nothing for this DID
+                meta = next(self.client.get_metadata_bulk(dids=[did], inherit=True, plugin="ALL"), None)
+                if meta is None:
+                    resDict["Failed"][lfn] = "No such file or directory"
+                elif meta["did_type"] == "FILE":  # Should we also return the metadata for the directories ?
+                    resDict["Successful"][lfn] = meta
+                else:
+                    resDict["Failed"][lfn] = "Not a file"
+            except DataIdentifierNotFound:
+                resDict["Failed"][lfn] = "No such file or directory"
+            except Exception as err:
+                return S_ERROR(str(err))
+        return S_OK(resDict)
+
+    @checkCatalogArguments
+    def getFileUserMetadataBulk(self, lfns):
+        """Get the metadata attached to a list of files, including the metadata
+        inherited from their parents.
+
+        :param lfns: iterable of LFNs
+        :return: S_OK with ``Successful`` (LFN -> metadata returned by Rucio) and
+                 ``Failed`` (LFN -> reason) dictionaries, S_ERROR on any error
+        """
+        resDict = {"Successful": {}, "Failed": {}}
+        # Rucio is queried by chunks of 1000 LFNs
+        for lfnList in breakListIntoChunks(lfns, 1000):
+            try:
+                dids = [self.__getDidsFromLfn(lfn) for lfn in lfnList]
+                for met in self.client.get_metadata_bulk(dids=dids, inherit=True):
+                    resDict["Successful"][met["name"]] = met
+            except Exception as err:
+                return S_ERROR(str(err))
+            # Whatever Rucio did not report back is flagged as missing
+            for lfn in lfnList:
+                if lfn not in resDict["Successful"]:
+                    resDict["Failed"][lfn] = "No such file or directory"
+        return S_OK(resDict)
+
+    @checkCatalogArguments
+    def setMetadataBulk(self, pathMetadataDict):
+        """Add metadata for the given paths.
+
+        :param dict pathMetadataDict: ``{path: {metadataName: value}}``
+        :return: S_OK with ``Successful`` (path -> True) and ``Failed`` dictionaries,
+                 S_ERROR if the DIDs cannot be built or the Rucio call raises
+        """
+        resDict = {"Successful": {}, "Failed": {}}
+        if not pathMetadataDict:
+            return S_OK(resDict)
+        try:
+            dids = []
+            for path, metadataDict in pathMetadataDict.items():
+                did = self.__getDidsFromLfn(path)
+                did["meta"] = metadataDict
+                dids.append(did)
+            self.client.set_dids_metadata_bulk(dids=dids, recursive=False)
+        except Exception as err:
+            return S_ERROR(str(err))
+        # The bulk call did not raise: report every path as successful
+        resDict["Successful"] = dict.fromkeys(pathMetadataDict, True)
+        return S_OK(resDict)
+
+    @checkCatalogArguments
+    def setMetadata(self, path, metadataDict):
+        """Add the same metadata to the given path(s).
+
+        :param path: a single LFN or an iterable of LFNs
+        :param dict metadataDict: ``{metadataName: value}``
+        :return: same structure as :py:meth:`setMetadataBulk`
+        """
+        return self.setMetadataBulk({lfn: metadataDict for lfn in self.__asLfnList(path)})
+
+    @checkCatalogArguments
+    def removeMetadata(self, path, metadata):
+        """Remove the specified metadata for the given file(s).
+
+        :param path: a single LFN or an iterable of LFNs
+        :param metadata: a metadata name or an iterable of metadata names
+        :return: S_OK() if every metadata was removed, otherwise S_ERROR. When single
+                 removals failed, the S_ERROR structure carries a ``FailedMetadata``
+                 entry mapping each metadata name to its error message.
+        """
+        failedMeta = {}
+        try:
+            metaNames = [metadata] if isinstance(metadata, str) else list(metadata)
+            for lfn in self.__asLfnList(path):
+                did = self.__getDidsFromLfn(lfn)
+                # TODO : Implement bulk delete_metadata method in Rucio
+                for meta in metaNames:
+                    try:
+                        self.client.delete_metadata(scope=did["scope"], name=did["name"], key=meta)
+                    except DataIdentifierNotFound:
+                        return S_ERROR(f"File {lfn} not found")
+                    except Exception as err:
+                        failedMeta[meta] = str(err)
+        except Exception as err:
+            return S_ERROR(str(err))
+
+        if failedMeta:
+            metaExample = next(iter(failedMeta))
+            result = S_ERROR(f"Failed to remove {len(failedMeta)} metadata, e.g. {failedMeta[metaExample]}")
+            result["FailedMetadata"] = failedMeta
+            return result
+        return S_OK()
+
+    def findFilesByMetadata(self, metadataFilterDict, path="/", timeout=120):
+        """Find the DIDs matching a metadata query in every scope of ``self.scopes``.
+
+        :param dict metadataFilterDict: DIRAC metadata query, e.g.
+               ``{"particle": {"in": ["proton", "electron"]}, "configuration_id": {"=": 14}}``
+        :param str path: currently unused
+        :param int timeout: currently unused
+        :return: S_OK(list of what ``list_dids`` yields for each scope) or S_ERROR
+        """
+        dids = []
+        try:
+            rucioFilters = self.__transform_DIRAC_filter_dict_to_Rucio_filter_dict([metadataFilterDict])
+            # No filter is left only when an "in" operator had no value: nothing can
+            # match, and an empty filter list must not be sent as an unconstrained query
+            if not rucioFilters:
+                return S_OK(dids)
+            for scope in self.scopes:
+                dids.extend(self.client.list_dids(scope=scope, filters=rucioFilters, did_type="all"))
+        except Exception as err:
+            return S_ERROR(str(err))
+        return S_OK(dids)
+
+    def __transform_DIRAC_operator_to_Rucio(self, DIRAC_dict):
+        """Transform a DIRAC metadata query dictionary into a Rucio filter dictionary.
+
+        Plain values are copied unchanged; ``{operator: operand}`` values become
+        ``key<suffix>: operand`` entries, ``=`` mapping to the bare key.
+
+        Example::
+
+            {"key1": "value1", "key2": {">": 10}, "key3": {"=": 10}}
+            -> {"key1": "value1", "key2.gt": 10, "key3": 10}
+
+        :param dict DIRAC_dict: DIRAC metadata query
+        :return: the equivalent Rucio filter dictionary
+        :raises ValueError: if an operator has no entry in the mapping below
+        """
+        operator_mapping = {
+            ">": ".gt",
+            "<": ".lt",
+            ">=": ".gte",
+            "<=": ".lte",
+            "=<": ".lte",
+            "!=": ".ne",
+            "=": "",
+        }
+
+        rucio_dict = {}
+        for key, value in DIRAC_dict.items():
+            if not isinstance(value, dict):
+                rucio_dict[key] = value
+                continue
+            for operator, operand in value.items():
+                if operator not in operator_mapping:
+                    # Silently dropping the condition would widen the query
+                    raise ValueError(f"Unsupported metadata operator {operator!r} for key {key!r}")
+                rucio_dict[f"{key}{operator_mapping[operator]}"] = operand
+        return rucio_dict
+
+    def __transform_dict_with_in_operateur(self, DIRAC_dict_with_in_operator_list):
+        """Expand the first ``in`` operator of each dictionary into one dictionary per value.
+
+        A single ``in`` per dictionary is expanded at each call: call again on the
+        returned list until the returned flag is False to expand all of them.
+
+        Example (one pass)::
+
+            [{"particle": {"in": ["proton", "electron"]}, "site": {"in": ["LaPalma", "paranal"]}}]
+            -> [{"particle": "proton", "site": {"in": ["LaPalma", "paranal"]}},
+                {"particle": "electron", "site": {"in": ["LaPalma", "paranal"]}}]
+
+        :param list DIRAC_dict_with_in_operator_list: DIRAC metadata query dictionaries
+        :return: tuple (expanded list, True if at least one ``in`` was expanded)
+        :raises TypeError: if the argument is not a list of dictionaries
+        """
+        if not isinstance(DIRAC_dict_with_in_operator_list, list):
+            raise TypeError("DIRAC_dict_with_in_operator_list must be a list of dictionaries")
+
+        combined_dict_list = []
+        in_found = False
+        for query in DIRAC_dict_with_in_operator_list:
+            if not isinstance(query, dict):
+                raise TypeError("Each element in DIRAC_dict_with_in_operator_list must be a dictionary")
+
+            # First key whose value carries an "in" operator, if any
+            in_key = next(
+                (key for key, value in query.items() if isinstance(value, dict) and "in" in value),
+                None,
+            )
+            if in_key is None:
+                combined_dict_list.append(query)
+                continue
+
+            in_found = True
+            # One copy of the query per value, the "in" entry being replaced by that value
+            combined_dict_list.extend({**query, in_key: val} for val in query[in_key]["in"])
+        return combined_dict_list, in_found
+
+    def __transform_DIRAC_filter_dict_to_Rucio_filter_dict(self, DIRAC_filter_dict_list):
+        """Transform a list of DIRAC filter dictionaries into a list of Rucio filter dictionaries.
+
+        Every ``in`` operator is expanded into one filter per value (all the
+        combinations when several keys use ``in``), then the remaining operators
+        are converted to their Rucio form.
+
+        Example::
+
+            [{"particle": {"in": ["proton", "electron"]}, "configuration_id": {"=": 14}}]
+            -> [{"particle": "proton", "configuration_id": 14},
+                {"particle": "electron", "configuration_id": 14}]
+
+        :param list DIRAC_filter_dict_list: DIRAC metadata query dictionaries
+        :return: list of Rucio filter dictionaries
+        """
+        expanded_filters = DIRAC_filter_dict_list
+        in_expanded = True
+        # Each pass expands at most one "in" per dictionary: loop until none is left
+        while in_expanded:
+            expanded_filters, in_expanded = self.__transform_dict_with_in_operateur(expanded_filters)
+        return [self.__transform_DIRAC_operator_to_Rucio(query) for query in expanded_filters]
diff --git a/src/DIRAC/Resources/Catalog/test/Test_RucioFileCatalogClient.py b/src/DIRAC/Resources/Catalog/test/Test_RucioFileCatalogClient.py
new file mode 100644
index 00000000000..537084062dc
--- /dev/null
+++ b/src/DIRAC/Resources/Catalog/test/Test_RucioFileCatalogClient.py
@@ -0,0 +1,173 @@
+import unittest
+from unittest.mock import MagicMock, patch
+
+from DIRAC.Resources.Catalog.RucioFileCatalogClient import RucioFileCatalogClient
+
+
+class TestRucioFileCatalogClient(unittest.TestCase):
+    def setUp(self):
+        # Patch before instantiating so that the constructor cannot reach a real Rucio client
+        patcher = patch.object(RucioFileCatalogClient, "client", new_callable=MagicMock)
+        patcher.start()
+        self.addCleanup(patcher.stop)
+        self.client = RucioFileCatalogClient()
+        self.client.scopes = ["test_scope"]
+
+    # Shortcuts to the name-mangled private helpers under test
+    def _toRucio(self, query):
+        return self.client._RucioFileCatalogClient__transform_DIRAC_operator_to_Rucio(query)
+
+    def _expandIn(self, queries):
+        return self.client._RucioFileCatalogClient__transform_dict_with_in_operateur(queries)
+
+    def _toRucioFilters(self, queries):
+        return self.client._RucioFileCatalogClient__transform_DIRAC_filter_dict_to_Rucio_filter_dict(queries)
+
+    def test_transform_DIRAC_operator_to_Rucio(self):
+        query = {"key1": "value1", "key2": {">": 10}, "key3": {"=": 10}}
+        expected = {"key1": "value1", "key2.gt": 10, "key3": 10}
+        self.assertEqual(self._toRucio(query), expected)
+
+    def test_transform_dict_with_in_operateur_2steps(self):
+        queries = [
+            {
+                "particle": {"in": ["proton", "electron"]},
+                "site": {"in": ["LaPalma", "paranal"]},
+                "configuration_id": {"=": 14},
+            }
+        ]
+        expected_intermediate = [
+            {"particle": "proton", "site": {"in": ["LaPalma", "paranal"]}, "configuration_id": {"=": 14}},
+            {"particle": "electron", "site": {"in": ["LaPalma", "paranal"]}, "configuration_id": {"=": 14}},
+        ]
+        expected_final = [
+            {"particle": "proton", "site": "LaPalma", "configuration_id": {"=": 14}},
+            {"particle": "proton", "site": "paranal", "configuration_id": {"=": 14}},
+            {"particle": "electron", "site": "LaPalma", "configuration_id": {"=": 14}},
+            {"particle": "electron", "site": "paranal", "configuration_id": {"=": 14}},
+        ]
+        intermediate, in_found = self._expandIn(queries)
+        self.assertTrue(in_found)
+        self.assertEqual(intermediate, expected_intermediate)
+        final, in_found = self._expandIn(intermediate)
+        self.assertTrue(in_found)
+        self.assertEqual(final, expected_final)
+        # Nothing is left to expand: the list comes back unchanged
+        self.assertEqual(self._expandIn(final), (expected_final, False))
+
+    def test_transform_DIRAC_operator_to_Rucio_simple_key_value(self):
+        query = {"key1": "value1", "key2": "value2"}
+        self.assertEqual(self._toRucio(query), {"key1": "value1", "key2": "value2"})
+
+    def test_transform_DIRAC_operator_to_Rucio_nested_dict_with_operators_gl(self):
+        query = {
+            "start": {">=": 10},
+            "end": {">": 5},
+            "pointingZ": {">=": 0.1},
+            "organization": "ViaCorp",
+            "data_levels": "DL3",
+        }
+        expected = {
+            "start.gte": 10,
+            "end.gt": 5,
+            "pointingZ.gte": 0.1,
+            "organization": "ViaCorp",
+            "data_levels": "DL3",
+        }
+        self.assertEqual(self._toRucio(query), expected)
+
+    def test_transform_DIRAC_operator_to_Rucio_nested_dict_with_operators_equals(self):
+        query = {"start": {"=": 10}, "pointingZ": {"=": 0.1}, "organization": "ViaCorp", "data_levels": "DL3"}
+        expected = {"start": 10, "pointingZ": 0.1, "organization": "ViaCorp", "data_levels": "DL3"}
+        self.assertEqual(self._toRucio(query), expected)
+
+    def test_transform_DIRAC_operator_to_Rucio_mixed_dict(self):
+        query = {"key1": "value1", "key2": {">": 10}, "key3": {"=": 10}}
+        expected = {"key1": "value1", "key2.gt": 10, "key3": 10}
+        self.assertEqual(self._toRucio(query), expected)
+
+    def test_transform_DIRAC_operator_to_Rucio_unknown_operator(self):
+        with self.assertRaises(ValueError):
+            self._toRucio({"key1": {"~": 10}})
+
+    def test_transform_DIRAC_operator_to_Rucio_in_operator(self):
+        programs = ["ctapipe-merge", "ctapipe-process", "ctapipe-apply-models"]
+        queries = [{"analysis_prog": {"in": programs}, "key1": "value1", "key3": {"=": 10}, "key4": {"<": 5}}]
+        expected = [{"analysis_prog": prog, "key1": "value1", "key3": 10, "key4.lt": 5} for prog in programs]
+        self.assertEqual(self._toRucioFilters(queries), expected)
+
+    def test_transform_DIRAC_operator_to_Rucio_2timesin_operator(self):
+        queries = [{"particle": {"in": ["proton", "electron"]}, "site": {"in": ["LaPalma", "paranal"]}}]
+        expected = [
+            {"particle": "proton", "site": "LaPalma"},
+            {"particle": "proton", "site": "paranal"},
+            {"particle": "electron", "site": "LaPalma"},
+            {"particle": "electron", "site": "paranal"},
+        ]
+        self.assertEqual(self._toRucioFilters(queries), expected)
+
+    def test_2timesin_mix_operator(self):
+        expected = [
+            {"particle": "proton", "site": "LaPalma", "configuration_id": 14},
+            {"particle": "proton", "site": "paranal", "configuration_id": 14},
+            {"particle": "electron", "site": "LaPalma", "configuration_id": 14},
+            {"particle": "electron", "site": "paranal", "configuration_id": 14},
+        ]
+        particles = {"in": ["proton", "electron"]}
+        sites = {"in": ["LaPalma", "paranal"]}
+        queries = [{"particle": particles, "site": sites, "configuration_id": {"=": 14}}]
+        self.assertEqual(self._toRucioFilters(queries), expected)
+        # Same query with the keys in another order
+        queries = [{"particle": particles, "configuration_id": {"=": 14}, "site": sites}]
+        self.assertEqual(self._toRucioFilters(queries), expected)
+
+    def test_transform_DIRAC_filter_dict_to_Rucio_filter_dict(self):
+        queries = [
+            {
+                "particle": {"in": ["proton", "electron"]},
+                "configuration_id": {"=": 14},
+                "site": {"in": ["LaPalma", "paranal"]},
+            }
+        ]
+        expected = [
+            {"particle": "proton", "configuration_id": 14, "site": "LaPalma"},
+            {"particle": "proton", "configuration_id": 14, "site": "paranal"},
+            {"particle": "electron", "configuration_id": 14, "site": "LaPalma"},
+            {"particle": "electron", "configuration_id": 14, "site": "paranal"},
+        ]
+        self.assertEqual(self._toRucioFilters(queries), expected)
+
+    def test_findFilesByMetadata(self):
+        self.client.client.list_dids.return_value = ["did1", "did2"]
+        result = self.client.findFilesByMetadata({"key1": "value1"})
+        self.assertTrue(result["OK"])
+        self.assertEqual(result["Value"], ["did1", "did2"])
+
+    def test_findFilesByMetadata_with_error(self):
+        self.client.client.list_dids.side_effect = Exception("Test error")
+        result = self.client.findFilesByMetadata({"key1": "value1"})
+        self.assertFalse(result["OK"])
+        self.assertIn("Test error", result["Message"])
+
+    def test_findFilesByMetadata_empty_in(self):
+        result = self.client.findFilesByMetadata({"key1": {"in": []}})
+        self.assertTrue(result["OK"])
+        self.assertEqual(result["Value"], [])
+        self.client.client.list_dids.assert_not_called()
+
+    def test_findFilesByMetadata_unknown_operator(self):
+        result = self.client.findFilesByMetadata({"key1": {"~": 10}})
+        self.assertFalse(result["OK"])
+        self.client.client.list_dids.assert_not_called()
+
+    def test_removeMetadata_reports_failures(self):
+        did = {"scope": "test_scope", "name": "/vo/file"}
+        self.client._RucioFileCatalogClient__getDidsFromLfn = MagicMock(return_value=did)
+        self.client.client.delete_metadata.side_effect = Exception("Test error")
+        result = self.client.removeMetadata("/vo/file", ["key1"])
+        self.assertFalse(result["OK"])
+        self.assertEqual(result["FailedMetadata"], {"key1": "Test error"})
+
+
+if __name__ == "__main__":
+    unittest.main()