diff --git a/pychunkedgraph/ingest/cli.py b/pychunkedgraph/ingest/cli.py index 66dd36e97..7061cd0f7 100644 --- a/pychunkedgraph/ingest/cli.py +++ b/pychunkedgraph/ingest/cli.py @@ -157,6 +157,10 @@ def ingest_chunk(queue: str, chunk_info): def ingest_chunk_local(graph_id: str, chunk_info, n_threads: int): """Manually ingest a chunk on a local machine.""" from .create.abstract_layers import add_layer + from .cluster import _create_atomic_chunk - cg = ChunkedGraph(graph_id=graph_id) - add_layer(cg, chunk_info[0], chunk_info[1:], n_threads=n_threads) + if chunk_info[0] == 2: + _create_atomic_chunk(chunk_info[1:]) + else: + cg = ChunkedGraph(graph_id=graph_id) + add_layer(cg, chunk_info[0], chunk_info[1:], n_threads=n_threads) diff --git a/pychunkedgraph/io/edges.py b/pychunkedgraph/io/edges.py index df0538db7..d9f72d9fe 100644 --- a/pychunkedgraph/io/edges.py +++ b/pychunkedgraph/io/edges.py @@ -37,13 +37,15 @@ def deserialize(edges_message: EdgesMsg) -> Tuple[np.ndarray, np.ndarray, np.nda def _parse_edges(compressed: List[bytes]) -> List[Dict]: + result = [] + if(len(compressed) == 0): + return result zdc = zstd.ZstdDecompressor() try: n_threads = int(os.environ.get("ZSTD_THREADS", 1)) except ValueError: n_threads = 1 decompressed = zdc.multi_decompress_to_buffer(compressed, threads=n_threads) - result = [] for content in decompressed: chunk_edges = ChunkEdgesMsg() chunk_edges.ParseFromString(memoryview(content))