Skip to content

Commit 9942e50

Browse files
authored
Merge pull request #531 from lapkinvladimir/feature/add-jelly-support
Add Jelly RDF format support
2 parents c6db642 + 32bf6de commit 9942e50

File tree

7 files changed

+3221
-1468
lines changed

7 files changed

+3221
-1468
lines changed

kgx/parsers/jelly_parser.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""
2+
Jelly parser for KGX.
3+
4+
This class provides streaming parsing of Jelly format files,
5+
similar to CustomNTriplesParser but for Jelly format.
6+
"""
7+
8+
import gzip
9+
from typing import Generator, Optional, IO
10+
11+
from rdflib import URIRef
12+
13+
14+
class JellyParser:
    """
    Streaming parser for Jelly format files.

    Offers an interface similar to CustomNTriplesParser, but for the Jelly
    format: each statement read from the file is handed to the sink's
    ``triple`` method, and whatever that call yields is passed on to the
    caller.

    Parameters
    ----------
    sink
        Object exposing ``triple(s, p, o)``; the call must return an
        iterable, since its result is re-yielded by this parser.

    """

    def __init__(self, sink):
        self.sink = sink

    def parse(
        self,
        filename: str,
        compression: Optional[str] = None
    ) -> Generator:
        """
        Lazily parse a Jelly file and yield whatever the sink produces.

        Parameters
        ----------
        filename: str
            Path of the file to read
        compression: Optional[str]
            ``gz`` to read through gzip; any other value reads the file as-is

        Returns
        -------
        Generator
            The items yielded by ``sink.triple`` for every parsed statement

        """
        # The context manager closes the handle when the generator is
        # exhausted, closed early, or aborted by an exception.
        with self._open_file(filename, compression) as file_obj:
            yield from self._parse_jelly_stream(file_obj)

    def _open_file(self, filename: str, compression: Optional[str]) -> IO[bytes]:
        """
        Open ``filename`` for binary reading, via gzip when ``compression`` is ``gz``.
        """
        opener = gzip.open if compression == "gz" else open
        return opener(filename, "rb")

    def _parse_jelly_stream(self, file_obj: IO[bytes]) -> Generator:
        """
        Decode statements from an open binary stream and forward them to the sink.

        Only ``Triple`` and ``Quad`` items are forwarded; everything else
        produced by the decoder is skipped.
        """
        # Imported lazily so that pyjelly is only required once Jelly input
        # is actually parsed.
        from pyjelly.integrations.rdflib.parse import parse_jelly_flat, Triple, Quad

        for item in parse_jelly_flat(file_obj):
            if not isinstance(item, (Triple, Quad)):
                continue
            # Only subject, predicate and object are used; any further field
            # of the item (presumably a quad's graph) is dropped.
            s, p, o = item[:3]
            yield from self.sink.triple(s, p, o)

kgx/sink/rdf_sink.py

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class RdfSink(Sink):
3333
to an RDF serialization.
3434
3535
.. note::
36-
Currently only RDF N-Triples serialization is supported.
36+
Currently RDF N-Triples and Jelly serializations are supported.
3737
3838
Parameters
3939
----------
@@ -62,8 +62,10 @@ def __init__(
6262
**kwargs: Any,
6363
):
6464
super().__init__(owner)
65-
if format not in {"nt"}:
66-
raise ValueError(f"Only RDF N-Triples ('nt') serialization supported.")
65+
self.format = format
66+
if format not in {"nt", "jelly"}:
67+
raise ValueError(f"Unsupported RDF serialization format '{self.format}'")
68+
6769
self.DEFAULT = Namespace(self.prefix_manager.prefix_map[""])
6870
# self.OBO = Namespace('http://purl.obolibrary.org/obo/')
6971
self.OBAN = Namespace(self.prefix_manager.prefix_map["OBAN"])
@@ -80,11 +82,33 @@ def __init__(
8082
self.OBAN.association,
8183
}
8284
if compression == "gz":
83-
f = gzip.open(filename, "wb")
85+
self.FH = gzip.open(filename, "wb")
86+
else:
87+
self.FH = open(filename, "wb")
88+
89+
if self.format == "jelly":
90+
from pyjelly.serialize.streams import TripleStream, SerializerOptions
91+
from pyjelly.options import StreamParameters
92+
from pyjelly import jelly
93+
from pyjelly.serialize.ioutils import write_delimited
94+
95+
params = StreamParameters(
96+
generalized_statements=False,
97+
rdf_star=False,
98+
)
99+
100+
options = SerializerOptions(
101+
logical_type=jelly.LOGICAL_STREAM_TYPE_FLAT_TRIPLES,
102+
params=params,
103+
)
104+
105+
self._jelly_stream = TripleStream.for_rdflib(options=options)
106+
self._jelly_stream.enroll()
107+
108+
self._jelly_write = write_delimited
109+
84110
else:
85-
f = open(filename, "wb")
86-
self.FH = f
87-
self.encoding = "ascii"
111+
self.encoding = "ascii"
88112

89113
def set_reverse_predicate_mapping(self, m: Dict) -> None:
90114
"""
@@ -180,7 +204,12 @@ def _write_triple(self, s: URIRef, p: URIRef, o: Union[URIRef, Literal]) -> None
180204
The object
181205
182206
"""
183-
self.FH.write(_nt_row((s, p, o)).encode(self.encoding, "_rdflib_nt_escape"))
207+
if self.format == "jelly":
208+
frame = self._jelly_stream.triple((s, p, o))
209+
if frame:
210+
self._jelly_write(frame, self.FH)
211+
else:
212+
self.FH.write(_nt_row((s, p, o)).encode(self.encoding, "_rdflib_nt_escape"))
184213

185214
def write_edge(self, record: Dict) -> None:
186215
"""
@@ -567,4 +596,8 @@ def finalize(self) -> None:
567596
"""
568597
Perform any operations after writing the file.
569598
"""
599+
if self.format == "jelly":
600+
if frame := self._jelly_stream.flow.to_stream_frame():
601+
self._jelly_write(frame, self.FH)
602+
570603
self.FH.close()

kgx/source/rdf_source.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from kgx.prefix_manager import PrefixManager
1111
from kgx.config import get_logger
1212
from kgx.parsers.ntriples_parser import CustomNTriplesParser
13+
from kgx.parsers.jelly_parser import JellyParser
1314
from kgx.source.source import Source, DEFAULT_EDGE_PREDICATE
1415
from kgx.utils.graph_utils import curie_lookup
1516
from kgx.utils.kgx_utils import (
@@ -37,7 +38,7 @@ class RdfSource(Source):
3738
from RDF.
3839
3940
.. note::
40-
Currently only RDF N-Triples are supported.
41+
Currently only RDF N-Triples and Jelly are supported.
4142
4243
"""
4344

@@ -145,7 +146,7 @@ def parse(
145146
filename: str
146147
The filename to parse
147148
format: str
148-
The format (``nt``)
149+
The RDF serialization format (``nt`` or ``jelly``).
149150
compression: Optional[str]
150151
The compression type (``gz``)
151152
kwargs: Any
@@ -157,15 +158,24 @@ def parse(
157158
A generator for records
158159
159160
"""
160-
p = CustomNTriplesParser(self)
161-
162161
self.set_provenance_map(kwargs)
163162

164-
if compression == "gz":
165-
yield from p.parse(gzip.open(filename, "rb"))
163+
if format == "jelly":
164+
parser = JellyParser(self)
165+
yield from parser.parse(filename, compression)
166+
log.info(f"Done parsing {filename} (jelly)")
167+
168+
elif format == "nt":
169+
p = CustomNTriplesParser(self)
170+
171+
if compression == "gz":
172+
yield from p.parse(gzip.open(filename, "rb"))
173+
else:
174+
yield from p.parse(open(filename, "rb"))
175+
log.info(f"Done parsing {filename} (nt)")
176+
166177
else:
167-
yield from p.parse(open(filename, "rb"))
168-
log.info(f"Done parsing {filename}")
178+
raise ValueError(f"Unsupported format: {format}")
169179

170180
for n in self.reified_nodes:
171181
data = self.node_cache.pop(n)

kgx/transformer.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
"neo4j": NeoSource,
5151
"duckdb": DuckDbSource,
5252
"nt": RdfSource,
53+
"jelly": RdfSource,
5354
"owl": OwlSource,
5455
"sssom": SssomSource,
5556
"parquet": GraphSource,
@@ -62,6 +63,7 @@
6263
"jsonl": JsonlSink,
6364
"neo4j": NeoSink,
6465
"nt": RdfSink,
66+
"jelly": RdfSink,
6567
"null": NullSink,
6668
"sql": SqlSink,
6769
"tsv": TsvSink,

0 commit comments

Comments
 (0)