
Commit 9571cda

committed: fix for s3 export

1 parent f64944e · commit 9571cda

File tree

2 files changed (+39, −12 lines)


combine/settings.py

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@


 # Combine Version
-COMBINE_VERSION = 'v0.7'
+COMBINE_VERSION = 'v0.7.1'


 # Build paths inside the project like this: os.path.join(BASE_DIR, ...)

core/spark/console.py

Lines changed: 38 additions & 11 deletions
@@ -29,6 +29,9 @@
 from xml2kvp import XML2kvp


+
+
+
 ############################################################################
 # Background Tasks
 ############################################################################
@@ -47,6 +50,11 @@ def export_records_as_xml(spark, ct_id):
         ct_id (int): CombineBackgroundTask id
     '''

+    # init logging support
+    spark.sparkContext.setLogLevel('INFO')
+    log4jLogger = spark.sparkContext._jvm.org.apache.log4j
+    logger = log4jLogger.LogManager.getLogger(__name__)
+
     # hydrate CombineBackgroundTask
     ct = CombineBackgroundTask.objects.get(pk=int(ct_id))

@@ -60,23 +68,42 @@ def export_records_as_xml(spark, ct_id):
     spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", settings.AWS_ACCESS_KEY_ID)
     spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", settings.AWS_SECRET_ACCESS_KEY)

-    # determine column subset
-    col_subset = ['*']
+    # init dfs and col_set across all published sets
+    dfs = []
+    col_set = set()

-    # loop through keys and export
-    rdds = []
+    # loop through published sets (includes non-set Records)
     for folder_name, job_ids in ct.task_params['job_dict'].items():

-        # handle single job_id
-        if len(job_ids) == 1:
-            rdds.extend([get_job_as_df(spark, job_ids[0]).select(col_subset).rdd])
+        # get dfs and columns
+        for job_id in job_ids:

-        # handle multiple jobs
-        else:
-            rdds.extend([ get_job_as_df(spark, job_id).select(col_subset).rdd for job_id in job_ids ])
+            print("Adding job #%s" % job_id)
+
+            # get df
+            df = get_job_as_df(spark, job_id)
+
+            # add to total set of columns
+            col_set.update(df.columns)
+
+            # append to dfs
+            dfs.append(df)
+
+    # convert col_set to list
+    col_set = list(col_set)
+    logger.info("column final set: %s" % col_set)
+
+    # add empty columns to dfs where needed
+    n_dfs = []
+    for df in dfs:
+        n_df = df
+        for col in col_set:
+            if col not in df.columns:
+                n_df = n_df.withColumn(col, lit('').cast(StringType()))
+        n_dfs.append(n_df)

     # get union of all RDDs to write
-    rdd_to_write = spark.sparkContext.union(rdds)
+    rdd_to_write = spark.sparkContext.union([ df.select(col_set).rdd for df in n_dfs ])

     # repartition
     rdd_to_write = rdd_to_write.repartition(math.ceil(rdd_to_write.count() / settings.TARGET_RECORDS_PER_PARTITION))
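
For context on the core/spark/console.py change: the old code projected every Job DataFrame with col_subset = ['*'] and unioned the resulting RDDs, which can fail when the Jobs' DataFrames do not share the same columns. The new code collects the union of all column names across Jobs, pads each DataFrame with empty string columns it lacks, and only then unions the RDDs. Below is a minimal standalone sketch of that schema-alignment pattern in PySpark; the DataFrames, column names, and app name are hypothetical, and Combine's get_job_as_df and CombineBackgroundTask are not used here.

# Minimal sketch (not Combine code): pad DataFrames to a shared column set
# before unioning their RDDs, mirroring the approach in export_records_as_xml.
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("schema_align_sketch").getOrCreate()

# hypothetical per-job DataFrames with overlapping but unequal columns
df_a = spark.createDataFrame([("rec1", "<mods/>")], ["record_id", "document"])
df_b = spark.createDataFrame([("rec2", "<mods/>", "parse error")],
                             ["record_id", "document", "error"])
dfs = [df_a, df_b]

# collect the union of all column names across DataFrames
col_set = set()
for df in dfs:
    col_set.update(df.columns)
col_set = list(col_set)

# add empty string columns where a DataFrame is missing one
n_dfs = []
for df in dfs:
    n_df = df
    for col in col_set:
        if col not in df.columns:
            n_df = n_df.withColumn(col, lit('').cast(StringType()))
    n_dfs.append(n_df)

# select the same column order everywhere, then union the underlying RDDs
rdd_to_write = spark.sparkContext.union([df.select(col_set).rdd for df in n_dfs])
print(rdd_to_write.count())  # 2

Selecting col_set on every padded DataFrame keeps each Row's fields in the same order, so the unioned RDD is consistent for the downstream write.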
