11"""Rucio plugin for Snakemake."""
22
3+ from __future__ import annotations
4+
35import dataclasses
46import inspect
7+ import random
58import re
69from collections .abc import Iterable , Sequence
710from urllib .parse import urlparse
@@ -188,7 +191,7 @@ def is_valid_query(cls, query: str) -> StorageQueryValidationResult:
188191 valid = False ,
189192 reason = f"cannot be parsed as URL ({ exc } )" ,
190193 )
191- if parsed .scheme != "rucio" :
194+ if not parsed .scheme :
192195 return StorageQueryValidationResult (
193196 query = query ,
194197 valid = False ,
@@ -215,6 +218,22 @@ def __post_init__(self) -> None:
215218 parsed = urlparse (self .query )
216219 self .scope = parsed .netloc
217220 self .file = parsed .path .lstrip ("/" )
221+ if not self .retrieve :
222+ streaming_url = self ._get_streaming_url ()
223+ if streaming_url is not None :
224+ # The snakemake code assumes that the query is equal to the
225+ # remote path when retrieve is set to False.
226+ # While this works for https and s3, it does not work for Rucio
227+ # because the Rucio query is not a valid URL and the tools that
228+ # use the streaming URL are not aware of Rucio.
229+ # https://github.com/snakemake/snakemake/blob/6dee2b55fbfff3bdad33cecdbeb8bd55ff4586bc/src/snakemake/io/__init__.py#L1752-L1761
230+ # Therefore, we set the query to the streaming URL.
231+ self .query = streaming_url
232+ # When a job fails, it is displayed using the local_path(),
233+ # even though the path in the job script as executed is the query (see above).
234+ # Therefore, we set the local path to the streaming URL for
235+ # easy-to-understand error messages.
236+ self .set_local_path (streaming_url )
218237
219238 @property
220239 def client (self ) -> rucio .client .Client :
@@ -232,6 +251,9 @@ async def inventory(self, cache: IOCacheStorageInterface) -> None:
232251 # the given IOCache object, using self.cache_key() as key.
233252 # Optionally, this can take a custom local suffix, needed e.g. when you want
234253 # to cache more items than the current query: self.cache_key(local_suffix=...)
254+ if not self .retrieve :
255+ return
256+
235257 if self .get_inventory_parent () in cache .exists_in_storage :
236258 # record has been inventorized before
237259 return
@@ -308,6 +330,21 @@ def size(self) -> int:
308330 did = self .client .get_did (scope = self .scope , name = self .file )
309331 return did ["bytes" ]
310332
@retry_decorator
def _get_streaming_url(self) -> str | None:
    """Pick one replica URL of this file that can be used for streaming.

    The replicas of the DID given by ``self.scope`` and ``self.file`` are
    listed, restricted by the provider's ``download_rse`` setting passed as
    the RSE expression. Every URL reported for every RSE of every replica
    is a candidate.

    Returns:
        One candidate URL chosen at random, or ``None`` when no URL was
        listed at all.
    """
    listing = self.client.list_replicas(
        dids=[{"scope": self.scope, "name": self.file}],
        rse_expression=self.provider.settings.download_rse,
    )

    # Flatten the per-RSE URL lists of all replicas into one candidate pool.
    candidates = []
    for replica in listing:
        for site_urls in replica["rses"].values():
            candidates.extend(site_urls)

    if not candidates:
        return None
    # Any listed URL will do; the pick is not security-sensitive.
    return random.choice(candidates)  # noqa: S311
347+
311348 @retry_decorator
312349 def retrieve_object (self ) -> None :
313350 """Download the file to self.local_path()."""
0 commit comments