Skip to content
Merged
49 changes: 49 additions & 0 deletions kgx/parsers/jelly_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
Jelly parser for KGX.

This module provides JellyParser, a streaming parser for Jelly format files,
similar to CustomNTriplesParser but for Jelly format.
"""

import gzip
from typing import Generator, Optional, IO

from rdflib import URIRef


class JellyParser:
    """
    Streaming parser for Jelly format files.

    Similar to CustomNTriplesParser interface but for Jelly format: every
    statement read from the input is handed to ``sink.triple`` and whatever
    that call yields is passed on to the caller.

    Parameters
    ----------
    sink
        Object exposing a ``triple(s, p, o)`` method that returns an
        iterable (typically a generator) of records.

    """

    def __init__(self, sink):
        self.sink = sink

    def parse(
        self,
        filename: str,
        compression: Optional[str] = None,
    ) -> Generator:
        """
        Parse a Jelly file and stream the records produced by the sink.

        Parameters
        ----------
        filename: str
            Path of the Jelly file to read
        compression: Optional[str]
            The compression type (``gz``); any other value reads the
            file as-is

        Returns
        -------
        Generator
            A generator over everything ``sink.triple`` yields

        """
        # The context manager closes the handle when the stream is exhausted,
        # when parsing raises, or when the caller abandons the generator.
        with self._open_file(filename, compression) as file_obj:
            yield from self._parse_jelly_stream(file_obj)

    def _open_file(self, filename: str, compression: Optional[str]) -> IO[bytes]:
        """
        Open ``filename`` for binary reading.

        Parameters
        ----------
        filename: str
            Path of the file to open
        compression: Optional[str]
            ``gz`` opens the file through ``gzip``; anything else opens it
            as a plain binary file

        Returns
        -------
        IO[bytes]
            A binary file object; the caller is responsible for closing it

        """
        if compression == "gz":
            return gzip.open(filename, "rb")
        return open(filename, "rb")

    def _parse_jelly_stream(self, file_obj: IO[bytes]) -> Generator:
        """
        Decode a Jelly byte stream and forward each statement to the sink.

        Parameters
        ----------
        file_obj: IO[bytes]
            Binary file object positioned at the start of the Jelly data

        Returns
        -------
        Generator
            A generator over everything ``sink.triple`` yields

        """
        # Imported at call time rather than at module level; presumably so
        # that pyjelly is only required when Jelly input is actually parsed.
        from pyjelly.integrations.rdflib.parse import parse_jelly_flat, Triple, Quad

        for item in parse_jelly_flat(file_obj):
            # Anything in the stream that is not a statement is skipped.
            if not isinstance(item, (Triple, Quad)):
                continue
            # Only subject, predicate and object are forwarded; any further
            # component (a quad's fourth element) is ignored.
            s, p, o = item[:3]
            yield from self.sink.triple(s, p, o)
49 changes: 41 additions & 8 deletions kgx/sink/rdf_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class RdfSink(Sink):
to an RDF serialization.

.. note::
Currently only RDF N-Triples serialization is supported.
Currently RDF N-Triples and Jelly serializations are supported.

Parameters
----------
Expand Down Expand Up @@ -62,8 +62,10 @@ def __init__(
**kwargs: Any,
):
super().__init__(owner)
if format not in {"nt"}:
raise ValueError(f"Only RDF N-Triples ('nt') serialization supported.")
self.format = format
if format not in {"nt", "jelly"}:
raise ValueError(f"Unsupported RDF serialization format '{self.format}'")

self.DEFAULT = Namespace(self.prefix_manager.prefix_map[""])
# self.OBO = Namespace('http://purl.obolibrary.org/obo/')
self.OBAN = Namespace(self.prefix_manager.prefix_map["OBAN"])
Expand All @@ -80,11 +82,33 @@ def __init__(
self.OBAN.association,
}
if compression == "gz":
f = gzip.open(filename, "wb")
self.FH = gzip.open(filename, "wb")
else:
self.FH = open(filename, "wb")

if self.format == "jelly":
from pyjelly.serialize.streams import TripleStream, SerializerOptions
from pyjelly.options import StreamParameters
from pyjelly import jelly
from pyjelly.serialize.ioutils import write_delimited

params = StreamParameters(
generalized_statements=False,
rdf_star=False,
)

options = SerializerOptions(
logical_type=jelly.LOGICAL_STREAM_TYPE_FLAT_TRIPLES,
params=params,
)

self._jelly_stream = TripleStream.for_rdflib(options=options)
self._jelly_stream.enroll()

self._jelly_write = write_delimited

else:
f = open(filename, "wb")
self.FH = f
self.encoding = "ascii"
self.encoding = "ascii"

def set_reverse_predicate_mapping(self, m: Dict) -> None:
"""
Expand Down Expand Up @@ -180,7 +204,12 @@ def _write_triple(self, s: URIRef, p: URIRef, o: Union[URIRef, Literal]) -> None
The object

"""
self.FH.write(_nt_row((s, p, o)).encode(self.encoding, "_rdflib_nt_escape"))
if self.format == "jelly":
frame = self._jelly_stream.triple((s, p, o))
if frame:
self._jelly_write(frame, self.FH)
else:
self.FH.write(_nt_row((s, p, o)).encode(self.encoding, "_rdflib_nt_escape"))

def write_edge(self, record: Dict) -> None:
"""
Expand Down Expand Up @@ -567,4 +596,8 @@ def finalize(self) -> None:
"""
Perform any operations after writing the file.
"""
if self.format == "jelly":
if frame := self._jelly_stream.flow.to_stream_frame():
self._jelly_write(frame, self.FH)

self.FH.close()
26 changes: 18 additions & 8 deletions kgx/source/rdf_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from kgx.prefix_manager import PrefixManager
from kgx.config import get_logger
from kgx.parsers.ntriples_parser import CustomNTriplesParser
from kgx.parsers.jelly_parser import JellyParser
from kgx.source.source import Source, DEFAULT_EDGE_PREDICATE
from kgx.utils.graph_utils import curie_lookup
from kgx.utils.kgx_utils import (
Expand Down Expand Up @@ -37,7 +38,7 @@ class RdfSource(Source):
from RDF.

.. note::
Currently only RDF N-Triples are supported.
Currently only RDF N-Triples and Jelly are supported.

"""

Expand Down Expand Up @@ -145,7 +146,7 @@ def parse(
filename: str
The filename to parse
format: str
The format (``nt``)
The RDF serialization format (``nt`` or ``jelly``).
compression: Optional[str]
The compression type (``gz``)
kwargs: Any
Expand All @@ -157,15 +158,24 @@ def parse(
A generator for records

"""
p = CustomNTriplesParser(self)

self.set_provenance_map(kwargs)

if compression == "gz":
yield from p.parse(gzip.open(filename, "rb"))
if format == "jelly":
parser = JellyParser(self)
yield from parser.parse(filename, compression)
log.info(f"Done parsing {filename} (jelly)")

elif format == "nt":
p = CustomNTriplesParser(self)

if compression == "gz":
yield from p.parse(gzip.open(filename, "rb"))
else:
yield from p.parse(open(filename, "rb"))
log.info(f"Done parsing {filename} (nt)")

else:
yield from p.parse(open(filename, "rb"))
log.info(f"Done parsing {filename}")
raise ValueError(f"Unsupported format: {format}")

for n in self.reified_nodes:
data = self.node_cache.pop(n)
Expand Down
2 changes: 2 additions & 0 deletions kgx/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"neo4j": NeoSource,
"duckdb": DuckDbSource,
"nt": RdfSource,
"jelly": RdfSource,
"owl": OwlSource,
"sssom": SssomSource,
"parquet": GraphSource,
Expand All @@ -62,6 +63,7 @@
"jsonl": JsonlSink,
"neo4j": NeoSink,
"nt": RdfSink,
"jelly": RdfSink,
"null": NullSink,
"sql": SqlSink,
"tsv": TsvSink,
Expand Down
Loading