-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.nf
186 lines (157 loc) · 6.85 KB
/
main.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/bin/env nextflow

// Enable DSL2 syntax (required for the `include`/`workflow` constructs below)
nextflow.enable.dsl = 2

// ----------------------------------------------------------------------------
// Pipeline parameters. Each can be overridden on the command line as
// `--<name> <value>` (e.g. `nextflow run . --key state --alwayssample`).
//
// NOTE(review): `params.inputUrl`, `params.ngroups`, `params.branch`,
// `params.raw`, and `params.s3pub` are not referenced anywhere in this file —
// presumably they are consumed by the included modules; confirm before
// removing or renaming any of them.
// ----------------------------------------------------------------------------
params.insertApi = false // By default, don't insert via the API
params.insertDbstan = false // By default, don't insert runs or stanfits into the DB
params.testtracts = false // By default, run all tracts (not just the test set)
params.alwayssample = false // By default, fall back to the optimizer for states
params.alwaysoptimize = false // By default, use the sampler for states
params.inputUrl = false // By default, don't use a custom input URL
params.n = -1 // By default, run all tracts (-1 means no limit for `take`)
params.ngroups = 10000000 // By default, each tract gets its own NF process
params.branch = "latest" // Branch of model to run - must be on Docker Hub
params.key = "fips" // "fips" for county runs, "state" for state runs
params.raw = false // Output raw `covidestim-result` object as .RDS?
params.s3pub = false // Don't upload to S3 by default
params.s3commit = false // Don't make static files available publicly, by default
params.splicedate = false // By default, don't do any custom date splicing
                          // for state-level runs. This still means that
                          // CTP data will prefill JHU data.

// Where a set of backup LM objects for creating synthetic intervals resides.
params.syntheticBackup = 's3://covidestim/synthetic-backup/backup.RDS'
include {combinedVaxData; jhuStateVaxData} from './src/inputs'
include {filterTestTracts; splitTractData} from './src/inputs-utils'
include {runTractSampler; runTractOptimizer} from './src/modelrunners'
include {makeSyntheticIntervals} from './src/synthetic-intervals'
include {publishStateResults; publishCountyResults; insertResults} from './src/outputs'
include {commitStaticFiles} from './src/commitStaticFiles'
// Concatenate every CSV file emitted on `chan` into one file named `fname`,
// stored in `params.outdir`. `keepHeader: true` with `skip: 1` keeps the
// header row from the first file and drops the first line of each subsequent
// file, so the combined CSV carries exactly one header.
def collectCSVs(chan, fname) {
    def opts = [
        name:       fname,
        storeDir:   params.outdir,
        keepHeader: true,
        skip:       1
    ]

    chan.collectFile(opts)
}
// Joins two channels and emits a third channel of lists, of the form
// [key, chan1Item, chan2Item]
//
// Assumption: chan1 and chan2 are both emitting file paths. The join key is
// each path's "simple name" (filename without extension), so the two
// channels must emit files that pair up one-to-one by simple name; the
// strict join options below make any violation a hard error instead of a
// silent drop.
//
// Fix: the two intermediate variables were previously assigned without
// `def`, which in Groovy/Nextflow makes them script-binding (global)
// variables — unsafe if this helper is ever called more than once or from
// concurrently-evaluated code. They are now properly function-local.
def tupleChan(chan1, chan2) {
    // Key each emitted path by its simple name: [name, path]
    def chan1Keyed = chan1.flatten().map{ [it.getSimpleName(), it] }
    def chan2Keyed = chan2.flatten().map{ [it.getSimpleName(), it] }

    // failOnDuplicate: a key seen twice in one channel is an error.
    // failOnMismatch:  a key present in only one channel is an error.
    chan1Keyed.join(chan2Keyed, failOnDuplicate: true, failOnMismatch: true)
}
// Merge a channel of JSON files — each expected to contain a JSON array —
// into a single combined JSON array, written to `params.outdir` as `fname`.
//
// Throws if any input file parses to an empty array (i.e. contributes
// nothing to the accumulator), since that likely signals an upstream
// problem.
def collectJSONs(chan, fname) {
    def parser = new groovy.json.JsonSlurper()

    chan.reduce( [] ) { acc, jsonFile ->
        def items = parser.parse(jsonFile)

        // `addAll` returns `false` when it leaves `acc` unchanged, which
        // only happens when `items` is empty — treat that as an error.
        if (!acc.addAll(items))
            throw new Exception("No items were added to accumulator")

        acc
    }
    .map{ merged -> groovy.json.JsonOutput.toJson(merged) }
    .collectFile( name: fname, storeDir: params.outdir )
}
// Entry workflow: clean input data, split it per tract, run the model on
// each tract, collapse per-tract outputs into combined files, and publish /
// insert / commit the results depending on `params.key` and the s3 flags.
workflow {

    main:

    // Choose which data cleaning process to use based on whether state-level
    // or county-level data is desired ("fips" = counties, anything else =
    // states).
    generateData = params.key == "fips" ? combinedVaxData : jhuStateVaxData

    // Rules for choosing which runner is to be used:
    //   --alwayssample   => force the sampler for every tract
    //   --alwaysoptimize => force the optimizer for every tract
    //   otherwise        => optimizer for counties ("fips"), sampler for states
    // `runner` records the chosen runner's NAME as a string, because later
    // code cannot branch on the `runTract` process object itself (see the
    // comment above the if/else below).
    runner = ""
    if (params.alwayssample) {
        runTract = runTractSampler
        runner = "runTractSampler"
    } else if (params.alwaysoptimize) {
        runTract = runTractOptimizer
        runner = "runTractOptimizer"
    } else {
        runTract = params.key == "fips" ? runTractOptimizer : runTractSampler
        runner = params.key == "fips" ? "runTractOptimizer" : "runTractSampler"
    }

    // Optionally narrow the input to the test tracts before splitting into
    // one file-pair per tract.
    //
    // NOTE(review): `inputData` is assigned but never read — the join below
    // references `splitTractData.out` directly. Presumably these statements
    // exist only to invoke the process chain; confirm before simplifying.
    if (params.testtracts)
        inputData = generateData | filterTestTracts | splitTractData //| flatten | take(params.n) | runTract
    else
        inputData = generateData | splitTractData // | flatten | take(params.n) | runTract

    // Pair each tract's timeseries file with its metadata file (joined by
    // simple filename), keep only the first `params.n` pairs (-1 = keep all),
    // and run the chosen model runner on each pair.
    tupleChan(splitTractData.out.timeseries, splitTractData.out.metadata) |
        take(params.n) | runTract

    // You can't refer directly to the `runTract` object for some reason, so
    // this branch is here simply to refer to the correct object when collapsing
    // all of the summary/warning/optvals/method csv's into four large .csv
    // files (plus one combined post-run metadata JSON).
    if (runner == "runTractOptimizer") {
        summary  = collectCSVs(runTractOptimizer.out.summary, 'summary.csv')
        warning  = collectCSVs(runTractOptimizer.out.warning, 'warning.csv')
        optvals  = collectCSVs(runTractOptimizer.out.optvals, 'optvals.csv')
        method   = collectCSVs(runTractOptimizer.out.method, 'method.csv' )
        metadata = collectJSONs(runTractOptimizer.out.metadata, 'postrun_metadata.json')
    } else {
        summary  = collectCSVs(runTractSampler.out.summary, 'summary.csv')
        warning  = collectCSVs(runTractSampler.out.warning, 'warning.csv')
        optvals  = collectCSVs(runTractSampler.out.optvals, 'optvals.csv')
        method   = collectCSVs(runTractSampler.out.method, 'method.csv' )
        metadata = collectJSONs(runTractSampler.out.metadata, 'postrun_metadata.json')
    }

    // Invoke one of the two publishing functions, which reformat the output
    // data for web consumption/DB insertion.
    if (params.key == "fips") {
        // County runs: publish/insert the collapsed summary directly.
        input   = combinedVaxData.out.data
        rejects = combinedVaxData.out.rejects

        publishCountyResults(summary, input, rejects, warning, optvals, metadata)
        insertResults(summary, input, metadata, method)

        // Optionally make the static files publicly available.
        if (params.s3commit) {
            commitStaticFiles(
                publishCountyResults.out.messagepack,
                publishCountyResults.out.summary_gzip,
                params.key
            )
        }
    } else {
        // State runs: first generate synthetic intervals from the summary
        // (using the backup LM objects), then publish/insert the synthetic
        // summary rather than the raw one.
        input   = jhuStateVaxData.out.data
        rejects = jhuStateVaxData.out.rejects

        makeSyntheticIntervals(
            summary,
            metadata,
            file(params.syntheticBackup)
        )

        summarySynthetic = collectCSVs(
            makeSyntheticIntervals.out.summary,
            'summary_synthetic.csv'
        )

        publishStateResults(
            summarySynthetic,
            input,
            rejects,
            warning,
            optvals,
            method,
            makeSyntheticIntervals.out.metadata
        )

        insertResults(
            summarySynthetic,
            input,
            makeSyntheticIntervals.out.metadata,
            method
        )

        // Optionally make the static files publicly available.
        if (params.s3commit) {
            commitStaticFiles(
                publishStateResults.out.messagepack,
                publishStateResults.out.summary_gzip,
                params.key
            )
        }

        // Post-synthetic-interval metadata, collapsed into one JSON file.
        final_metadata = collectJSONs(makeSyntheticIntervals.out.metadata, 'final_metadata.json')
    }

    // Collect the list of rejected states or counties which were NOT run
    // by any of the runTract* processes.
    collectCSVs(rejects, 'rejects.csv')

    emit:
    // Emit the following channels as output from this workflow:
    summary = summary
    warning = warning
    optvals = optvals
    rejects = rejects
}