|
1 |
| -from typing import Optional, Union |
| 1 | +from logging import warning |
| 2 | +from typing import Optional, Union, Dict |
2 | 3 |
|
3 | 4 | from rdflib import Namespace, URIRef
|
4 | 5 |
|
5 | 6 |
|
6 | 7 | class CurieNamespace(Namespace):
|
7 |
| - def __new__(cls, prefix: str, value: Union[str, URIRef]): |
8 |
| - value = str(value) |
9 |
| - try: |
10 |
| - rt = str.__new__(cls, value) |
11 |
| - except UnicodeDecodeError: |
12 |
| - rt = str.__new__(cls, value, 'utf-8') |
| 8 | + # We would prefer to use curies.Converter here, but there doesn't appear to be any way to build it incrementally |
| 9 | + catalog: Dict[str, "CurieNamespace"] = dict() |
| 10 | + |
| 11 | + @classmethod |
| 12 | + def to_curie(cls, uri: Union[str, URIRef]) -> str: |
| 13 | + uri = str(uri) |
| 14 | + candidate_ns = "" |
| 15 | + for prefix, ns in cls.catalog.items(): |
| 16 | + if uri.startswith(ns) and len(ns) > len(candidate_ns): |
| 17 | + candidate_ns = ns |
| 18 | + if candidate_ns: |
| 19 | + return candidate_ns.curie(uri[len(candidate_ns):]) |
| 20 | + return None |
| 21 | + |
| 22 | + @classmethod |
| 23 | + def to_uri(cls, curie: str) -> Optional[URIRef]: |
| 24 | + prefix, localname = curie.split(':', 1) |
| 25 | + ns = CurieNamespace.catalog.get(prefix, None) |
| 26 | + return ns[localname] if ns else None |
| 27 | + |
| 28 | + def __new__(cls, prefix: str, ns: Union[str, bytes, URIRef]) -> "CurieNamespace": |
| 29 | + rt = Namespace.__new__(cls, str(ns) if not isinstance(ns, bytes) else ns) |
13 | 30 | rt.prefix = prefix
|
| 31 | + if prefix in CurieNamespace.catalog: |
| 32 | + if CurieNamespace.catalog[prefix] != str(rt): |
| 33 | + # prefix is bound to a different namespace |
| 34 | + warning(f"Prefix: {prefix} already references {CurieNamespace.catalog[prefix]} - not updated to {rt}") |
| 35 | + else: |
| 36 | + CurieNamespace.catalog[prefix] = rt |
14 | 37 | return rt
|
15 | 38 |
|
16 | 39 | def curie(self, reference: Optional[str] = '') -> str:
|
|
0 commit comments