-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathjsonld2compactjsonldldj.py
executable file
·121 lines (104 loc) · 4.98 KB
/
jsonld2compactjsonldldj.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import argparse
import json
import sys
import requests
from multiprocessing import Pool,current_process
import ijson.backends.yajl2_cffi as ijson
from pyld import jsonld
def init_mp(c, rf, url, pr, bn):
    """Store the shared compaction settings as module-level globals.

    Used as the ``initializer`` of the worker pool so that every worker
    process holds its own copy of the settings read by ``compact_object``.

    Args:
        c: The JSON-LD context handed to the compaction call.
        rf: Name of the field that marks an object as a record; any falsy
            value is stored as ``None``.
        url: Context URL written into each compacted record; any falsy
            value is stored as ``None``.
        pr: Prefix (may include a directory) for the output file names.
        bn: Truthy to route blank-node records into separate files.
    """
    global context, context_url, record_field, pathprefix, node
    context = c
    pathprefix = pr
    # Normalise the flag and the two optional settings: falsy input becomes
    # False / None so later checks can rely on exact values.
    node = bool(bn)
    context_url = url or None
    record_field = rf or None
def compact_object(jsonobject):
    """Compact one JSON-LD record and append it to this worker's output file.

    Reads the module-level settings ``context``, ``context_url``,
    ``record_field``, ``pathprefix`` and ``node`` set by ``init_mp``.

    Args:
        jsonobject: A dict, or a list holding exactly one dict. Anything
            else is ignored. When ``record_field`` is set, dicts that lack
            that field are ignored as well.

    Returns:
        None. The compacted record is written as one JSON line to
        ``<pathprefix><process name>.ldj``, or to
        ``<pathprefix><process name>-bnodes.ldj`` when blank-node splitting
        is enabled and the record's ``@id``/``id`` starts with ``_:``.
    """
    # A single-element list is treated as the record it wraps.
    if isinstance(jsonobject, list) and len(jsonobject) == 1:
        jsonobject = jsonobject[0]
    if not isinstance(jsonobject, dict):
        return

    # Without a record field every dict counts as a record; otherwise the
    # field has to be present in the object.
    is_record = record_field is None or (record_field and record_field in jsonobject)
    if not is_record:
        return

    compacted = jsonld.compact(jsonobject, context, {'skipExpansion': True})
    if context_url:
        # Reference the context by URL instead of the value left by compaction.
        compacted['@context'] = context_url

    # Only string identifiers can be blank-node labels; the isinstance guard
    # keeps a non-string id from raising inside the worker and losing the record.
    is_bnode = node and any(
        isinstance(identifier, str) and identifier.startswith("_:")
        for identifier in (compacted.get("@id"), compacted.get("id"))
    )
    suffix = "-bnodes.ldj" if is_bnode else ".ldj"

    # The file name contains the worker's process name, so each worker
    # appends to its own file (avoids race conditions between workers).
    with open(pathprefix + str(current_process().name) + suffix, "a") as fileout:
        fileout.write(json.dumps(compacted, indent=None) + "\n")
def yield_obj(path, basepath):
    """Stream objects from a JSON file without loading the whole file.

    The file is parsed event by event; every time a map ends at the prefix
    ``basepath`` the object built so far is yielded and a fresh builder is
    started.

    Args:
        path: Path of the JSON file, opened in binary mode.
        basepath: Parser prefix at which a finished map counts as one
            object (e.g. ``"item.item"``).

    Yields:
        The value assembled by the object builder for each finished map.
    """
    with open(path, "rb") as fin:
        builder = ijson.common.ObjectBuilder()
        for prefix, event, val in ijson.parse(fin):
            try:
                builder.event(event, val)
            except Exception:
                # Best effort: show what was built so far and carry on.
                # Catching Exception rather than using a bare except lets
                # KeyboardInterrupt and SystemExit still stop the run.
                if hasattr(builder, "value"):
                    print(builder.value)
            if prefix == basepath and event == "end_map":
                if hasattr(builder, "value"):
                    yield builder.value
                # Start over so the next object is built from scratch.
                builder = ijson.common.ObjectBuilder()
def run():
    """Parse the command-line arguments and start the conversion.

    ``-context-url`` and ``-input`` are mandatory; argparse exits with a
    usage error when either is missing. All parsed values are forwarded to
    ``process``.
    """
    parser = argparse.ArgumentParser(
        prog='jsonld2compactjsonldldj',
        description='Transforms a given JSON-LD record array to line-delimited, compact JSON-LD records',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    # Temporarily remove the default optional group so that the custom
    # "required arguments" group is listed before it in the help output.
    optional_arguments = parser._action_groups.pop()
    required_arguments = parser.add_argument_group('required arguments')

    required_arguments.add_argument(
        '-context-url', type=str, dest='context_url', required=True,
        help='A JSON-LD context URL that should be set to reference to the JSON-LD context (instead of inline the JSON-LD context)')
    # Marked required: without an input path process() would only fail
    # later, when trying to open None.
    required_arguments.add_argument(
        '-input', type=str, required=True,
        help='the input JSON-LD record array')

    optional_arguments.add_argument(
        '-record-field', type=str, dest='record_field',
        help='A field that should be contained in all records, e.g., a record identifier (this field will be used to identify records)')
    optional_arguments.add_argument(
        '-prefix', type=str, default="chunks/record-", dest="prefix",
        help="a prefix for the multiprocessed outputfiles, could also be a path")
    optional_arguments.add_argument(
        '-split-bnodes', dest="bnode", action="store_true",
        help="activate this switch to seperate bnodes into extra chunks")
    optional_arguments.add_argument(
        '-workers', dest="worker", default=8, type=int,
        help="number of workers to use")
    optional_arguments.add_argument(
        '-ldj', dest="ldj", default=False, action="store_true",
        help="switch on if you want to process line-delimited json, instead of json-arrays")
    parser._action_groups.append(optional_arguments)

    args = parser.parse_args()
    process(args.input, args.record_field, args.context_url, args.prefix,
            args.bnode, args.worker, args.ldj)
# put this into a function to be able to use jsonld2compactjsonldldj as a lib
def process(input, record_field, context_url, pathprefix, bnode, worker, ldj):
    """Fetch the JSON-LD context and compact all input records in a worker pool.

    Args:
        input: Path of the input file.
        record_field: Field name identifying records, or ``None`` to treat
            every object as a record.
        context_url: URL the JSON-LD context is downloaded from; also
            handed to the workers.
        pathprefix: Prefix (may include a directory) of the output files.
        bnode: Truthy to write blank-node records to separate files.
        worker: Number of worker processes.
        ldj: Truthy if ``input`` is line-delimited JSON; otherwise the file
            is streamed as nested JSON arrays.

    Raises:
        SystemExit: With status -1 when the context request does not
            return a successful response.
    """
    # Bound the request so an unresponsive server cannot hang the run forever.
    context_timeout = 60
    r = requests.get(context_url, timeout=context_timeout)
    if r.ok:
        jsonldcontext = r.json()
        sys.stderr.write("got context from "+context_url+"\n")
    else:
        sys.stderr.write("unable to get context from {}. aborting\n".format(context_url))
        # sys.exit instead of the site-provided exit(), which is not
        # guaranteed to exist when this module is used as a library.
        sys.exit(-1)

    def report_failure(err):
        # The results of apply_async are never collected, so without this
        # callback a failing worker (e.g. unwritable output path) would go
        # unnoticed.
        sys.stderr.write("worker failed to process a record: {!r}\n".format(err))

    pool = Pool(worker, initializer=init_mp,
                initargs=(jsonldcontext, record_field, context_url, pathprefix, bnode,))
    if ldj:
        with open(input, "r") as inp:
            for line in inp:
                try:
                    jline = json.loads(line)
                except ValueError:
                    # Skip lines that are not valid JSON.
                    continue
                pool.apply_async(compact_object, (jline,), error_callback=report_failure)
    else:
        # item.item = go down 2 (array-)levels as in jsonld-1.1 spec
        for obj in yield_obj(input, "item.item"):
            pool.apply_async(compact_object, (obj,), error_callback=report_failure)
    # No more tasks will be submitted; wait for the queued ones to finish.
    pool.close()
    pool.join()
# Start the command-line interface only when the file is executed as a
# script; importing it as a module does not call run().
if __name__ == "__main__":
    run()