# main.py (forked from mssalvador/WorkflowCleaning)
# the usual import statements
import os
import sys
import importlib
import pyspark

# Make the project packages importable: prefer the zipped archives when they are
# present (e.g. in a packaged deployment), otherwise fall back to the local source trees.
package_dict = {
    'semisupervised.zip': './semisupervised',
    'cleaning.zip': './cleaning',
    'classification.zip': './classification',
    'shared.zip': './shared',
    'examples.zip': './examples',
}
for zip_file, path in package_dict.items():
    if os.path.exists(zip_file):
        sys.path.insert(0, zip_file)
    else:
        sys.path.insert(0, path)

if __name__ == '__main__':
    from shared.OwnArguments import OwnArguments

    # Parse the command-line options through the project's own argument wrapper.
    arguments = OwnArguments()
    arguments.add_argument('--cluster_path', types=str, required=True, dest='cluster_path')
    arguments.add_argument('--job', types=str, required=True, dest='job_name')
    arguments.add_argument('--job_args', dest='job_args', nargs='*')
    arguments.add_argument('--input_data', dest='input_data', types=str)
    arguments.add_argument('--features', dest='features', types=str, nargs='*')
    arguments.add_argument('--id', dest='id', types=str, nargs='*')
    arguments.add_argument('--labels', dest='labels', types=str, nargs='*', required=False)
    arguments.parse_arguments()

    # Collect the parsed values into a single keyword dict for the job's run().
    all_args = dict()
    if arguments.job_args:
        # Each --job_args entry is expected as key=value and becomes an algorithm parameter.
        all_args['algo_params'] = dict(arg.split('=') for arg in arguments.job_args)
    all_args['input_data'] = arguments.input_data
    all_args['features'] = arguments.features
    all_args['id'] = arguments.id
    all_args['labels'] = arguments.labels
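
    # Illustration (not part of the original script): with a hypothetical invocation such as
    #   --job cleaning --cluster_path file:/tmp --input_data data.csv \
    #   --features f1 f2 --id row_id --job_args k=5 max_iter=10
    # the dict above would contain
    #   {'algo_params': {'k': '5', 'max_iter': '10'}, 'input_data': 'data.csv',
    #    'features': ['f1', 'f2'], 'id': ['row_id'], 'labels': None}
    # i.e. every --job_args entry is split on '=' and all values stay strings
    # (labels is None assuming the wrapper defaults missing options to None).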

    # dtu_cluster_path = 'file:///home/micsas/workspace/distributions/dist_workflow'
    # local_path = "file:/home/svanhmic/workspace/DABAI/Workflows/dist_workflow"
    # visma_cluster_path = 'file:/home/ml/deployments/workflows'
    py_files = ['/shared.zip', '/examples.zip', '/cleaning.zip', '/classification.zip', '/semisupervised.zip']

    # Build a Spark configuration. Note that the active code path below does not pass
    # spark_conf (or py_files) to the SparkContext; the commented-out constructor shows
    # how they would be used on a cluster. ('spark.executors' is also not a standard
    # Spark property; 'spark.executor.instances' is the usual key.)
    spark_conf = pyspark.SparkConf(loadDefaults=False)
    (spark_conf
        .set('spark.executor.cores', 4)
        .set('spark.executor.memory', '1G')
        .set('spark.executors', 2))

    sc = pyspark.SparkContext(appName=arguments.job_name)
    # Import the job package whose name matches --job (cleaning, classification or semisupervised).
    job_module = importlib.import_module('{:s}'.format(arguments.job_name))
    # sc = pyspark.SparkContext(
    #     appName=arguments.job_name,
    #     pyFiles=[arguments.cluster_path + py_file for py_file in py_files],
    #     conf=spark_conf)
    # job_module = importlib.import_module('{:s}'.format(arguments.job_name))
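
    # For context (an assumption, not taken from this repository): on a cluster the
    # archives in py_files would normally be shipped either through the commented-out
    # SparkContext(pyFiles=...) call above or via spark-submit's --py-files option,
    # for example:
    #   spark-submit --py-files shared.zip,cleaning.zip,classification.zip,semisupervised.zip,examples.zip \
    #       main.py --job cleaning --cluster_path file:/tmp ...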

    try:
        # Run the selected job; run() is expected to return a Spark DataFrame.
        data_frame = job_module.run(sc, **all_args)
        # data_frame.printSchema()
        # data_frame.show()
        rdd = data_frame.toJSON()  # .saveAsTextFile('hdfs:///tmp/cleaning.txt')
        js = rdd.collect()
        # print(js)

        # Emit the result as a single JSON document on stdout, keyed by job type.
        if arguments.job_name == 'cleaning':
            print("""{"cluster":[""" + ','.join(js) + """]}""")
        elif arguments.job_name == 'classification':
            print("""{"classification":[""" + ','.join(js) + """]}""")
        elif arguments.job_name == 'semisupervised':
            print("""{"semisuper":[""" + ','.join(js) + """]}""")
    except TypeError as te:
        print('Did not run', te)  # TODO: make this more loggable...
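
For reference, main.py assumes that each package named by --job (cleaning, classification or semisupervised) exposes a run(sc, **kwargs) function that accepts the keys assembled in all_args and returns a Spark DataFrame, since toJSON() is called on the result. The sketch below is a hypothetical minimal job module illustrating that contract; the module name, the CSV reading and the column handling are assumptions for illustration, not code from this repository.

# hypothetical_job.py - illustrative sketch of the run() contract main.py expects.
from pyspark.sql import SQLContext


def run(sc, input_data=None, features=None, id=None, labels=None, algo_params=None):
    """Minimal job: load the input file and return it as a Spark DataFrame."""
    sql_context = SQLContext(sc)

    # Read the data set passed via --input_data (CSV is an assumption here).
    data_frame = sql_context.read.csv(input_data, header=True, inferSchema=True)

    # A real job would use features, id, labels and algo_params,
    # e.g. to assemble a feature vector and fit a model.
    selected = (id or []) + (features or []) + (labels or [])
    if selected:
        data_frame = data_frame.select(*selected)
    return data_frame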