1616package org .ohdsi .webapi .cohortdefinition ;
1717
1818import org .ohdsi .circe .helper .ResourceHelper ;
19+ import org .ohdsi .cohortcharacterization .CCQueryBuilder ;
20+ import org .ohdsi .sql .BigQuerySparkTranslate ;
1921import org .ohdsi .sql .SqlRender ;
2022import org .ohdsi .sql .SqlSplit ;
2123import org .ohdsi .sql .SqlTranslate ;
24+ import org .ohdsi .webapi .cohortcharacterization .domain .CohortCharacterizationEntity ;
2225import org .ohdsi .webapi .common .generation .CancelableTasklet ;
26+ import org .ohdsi .webapi .common .generation .GenerationUtils ;
27+ import org .ohdsi .webapi .feanalysis .domain .FeAnalysisEntity ;
28+ import org .ohdsi .webapi .feanalysis .repository .FeAnalysisEntityRepository ;
2329import org .ohdsi .webapi .generationcache .GenerationCacheHelper ;
30+ import org .ohdsi .webapi .shiro .Entities .UserRepository ;
2431import org .ohdsi .webapi .source .Source ;
2532import org .ohdsi .webapi .source .SourceService ;
2633import org .ohdsi .webapi .util .CancelableJdbcTemplate ;
2734import org .ohdsi .webapi .util .SessionUtils ;
35+ import org .ohdsi .webapi .util .SourceUtils ;
2836import org .slf4j .LoggerFactory ;
2937import org .springframework .batch .core .scope .context .ChunkContext ;
3038import org .springframework .batch .core .step .tasklet .StoppableTasklet ;
39+ import org .springframework .beans .factory .annotation .Autowired ;
3140import org .springframework .transaction .support .TransactionTemplate ;
3241
42+ import com .google .common .collect .ImmutableList ;
43+ import com .odysseusinc .arachne .commons .types .DBMSType ;
44+
45+ import java .sql .SQLException ;
46+ import java .util .Arrays ;
47+ import java .util .HashSet ;
3348import java .util .Map ;
49+ import java .util .Set ;
50+ import java .util .stream .Collectors ;
51+ import java .util .stream .Stream ;
3452
3553import static org .ohdsi .webapi .Constants .Params .*;
3654
@@ -44,54 +62,151 @@ public class GenerateCohortTasklet extends CancelableTasklet implements Stoppabl
4462 private final GenerationCacheHelper generationCacheHelper ;
4563 private final CohortDefinitionRepository cohortDefinitionRepository ;
4664 private final SourceService sourceService ;
65+ private final FeAnalysisEntityRepository feAnalysisRepository ;
66+
67+ public GenerateCohortTasklet (final CancelableJdbcTemplate jdbcTemplate , final TransactionTemplate transactionTemplate ,
68+ final GenerationCacheHelper generationCacheHelper ,
69+ final CohortDefinitionRepository cohortDefinitionRepository , final SourceService sourceService ) {
70+ super (LoggerFactory .getLogger (GenerateCohortTasklet .class ), jdbcTemplate , transactionTemplate );
71+ this .generationCacheHelper = generationCacheHelper ;
72+ this .cohortDefinitionRepository = cohortDefinitionRepository ;
73+ this .sourceService = sourceService ;
74+ this .feAnalysisRepository = null ;
75+ }
4776
4877 public GenerateCohortTasklet (
4978 final CancelableJdbcTemplate jdbcTemplate ,
5079 final TransactionTemplate transactionTemplate ,
5180 final GenerationCacheHelper generationCacheHelper ,
5281 final CohortDefinitionRepository cohortDefinitionRepository ,
53- final SourceService sourceService
82+ final SourceService sourceService , final FeAnalysisEntityRepository feAnalysisRepository
5483 ) {
5584 super (LoggerFactory .getLogger (GenerateCohortTasklet .class ), jdbcTemplate , transactionTemplate );
5685 this .generationCacheHelper = generationCacheHelper ;
5786 this .cohortDefinitionRepository = cohortDefinitionRepository ;
5887 this .sourceService = sourceService ;
88+ this .feAnalysisRepository = feAnalysisRepository ;
5989 }
6090
6191 @ Override
6292 protected String [] prepareQueries (ChunkContext chunkContext , CancelableJdbcTemplate jdbcTemplate ) {
93+ Map <String , Object > jobParams = chunkContext .getStepContext ().getJobParameters ();
94+
95+ String [] defaultQueries = prepareQueriesDefault (jobParams , jdbcTemplate );
96+
97+ Boolean demographicStat = jobParams .get (DEMOGRAPHIC_STATS ) == null ? null
98+ : Boolean .valueOf ((String ) jobParams .get (DEMOGRAPHIC_STATS ));
99+
100+ if (demographicStat != null && demographicStat .booleanValue ()) {
101+ String [] demographicsQueries = prepareQueriesDemographic (chunkContext , jdbcTemplate );
102+ return Stream .concat (Arrays .stream (defaultQueries ), Arrays .stream (demographicsQueries )).toArray (String []::new );
103+ }
104+
105+ return defaultQueries ;
106+ }
107+
108+ private String [] prepareQueriesDemographic (ChunkContext chunkContext , CancelableJdbcTemplate jdbcTemplate ) {
109+ Map <String , Object > jobParams = chunkContext .getStepContext ().getJobParameters ();
110+ CohortCharacterizationEntity cohortCharacterization = new CohortCharacterizationEntity ();
111+
112+ Integer cohortDefinitionId = Integer .valueOf (jobParams .get (COHORT_DEFINITION_ID ).toString ());
113+ CohortDefinition cohortDefinition = cohortDefinitionRepository .findOneWithDetail (cohortDefinitionId );
114+
115+ cohortCharacterization .setCohortDefinitions (new HashSet <>(Arrays .asList (cohortDefinition )));
116+
117+ // Get FE Analysis Demographic (Gender, Age, Race,)
118+ Set <FeAnalysisEntity > feAnalysis = feAnalysisRepository .findByListIds (Arrays .asList (70 , 72 , 74 , 77 ));
119+
120+ // Set<CcFeAnalysisEntity> ccFeAnalysis = feAnalysis.stream().map(a -> {
121+ // CcFeAnalysisEntity ccA = new CcFeAnalysisEntity();
122+ // ccA.setCohortCharacterization(cohortCharacterization);
123+ // ccA.setFeatureAnalysis(a);
124+ // return ccA;
125+ // }).collect(Collectors.toSet());
126+
127+ cohortCharacterization .setFeatureAnalyses (feAnalysis );
128+
129+ final Long jobId = chunkContext .getStepContext ().getStepExecution ().getJobExecution ().getId ();
130+
131+ final Integer sourceId = Integer .valueOf (jobParams .get (SOURCE_ID ).toString ());
132+ final Source source = sourceService .findBySourceId (sourceId );
133+
134+ final String cohortTable = jobParams .get (TARGET_TABLE ).toString ();
135+ final String sessionId = jobParams .get (SESSION_ID ).toString ();
136+
137+ final String tempSchema = SourceUtils .getTempQualifier (source );
138+
139+ boolean includeAnnual = false ;
140+ boolean includeTemporal = false ;
141+
142+ CCQueryBuilder ccQueryBuilder = new CCQueryBuilder (cohortCharacterization , cohortTable , sessionId ,
143+ SourceUtils .getCdmQualifier (source ), SourceUtils .getResultsQualifier (source ),
144+ SourceUtils .getVocabularyQualifier (source ), tempSchema , jobId );
145+ String sql = ccQueryBuilder .build ();
146+
147+ /*
148+ * There is an issue with temp tables on sql server: Temp tables scope is
149+ * session or stored procedure. To execute PreparedStatement sql server
150+ * uses stored procedure <i>sp_executesql</i> and this is the reason why
151+ * multiple PreparedStatements cannot share the same local temporary
152+ * table.
153+ *
154+ * On the other side, temp tables cannot be re-used in the same
155+ * PreparedStatement, e.g. temp table cannot be created, used, dropped and
156+ * created again in the same PreparedStatement because sql optimizator
157+ * detects object already exists and fails. When is required to re-use
158+ * temp table it should be separated to several PreparedStatements.
159+ *
160+ * An option to use global temp tables also doesn't work since such tables
161+ * can be not supported / disabled.
162+ *
163+ * Therefore, there are two ways: - either precisely group SQLs into
164+ * statements so that temp tables aren't re-used in a single statement, -
165+ * or use ‘permanent temporary tables’
166+ *
167+ * The second option looks better since such SQL could be exported and
168+ * executed manually, which is not the case with the first option.
169+ */
170+ if (ImmutableList .of (DBMSType .MS_SQL_SERVER .getOhdsiDB (), DBMSType .PDW .getOhdsiDB ())
171+ .contains (source .getSourceDialect ())) {
172+ sql = sql .replaceAll ("#" , tempSchema + "." + sessionId + "_" ).replaceAll ("tempdb\\ .\\ ." , "" );
173+ }
174+ if (source .getSourceDialect ().equals ("spark" )) {
175+ try {
176+ sql = BigQuerySparkTranslate .sparkHandleInsert (sql , source .getSourceConnection ());
177+ } catch (SQLException e ) {
178+ e .printStackTrace ();
179+ }
180+ }
181+
182+ final String translatedSql = SqlTranslate .translateSql (sql , source .getSourceDialect (), sessionId , tempSchema );
183+ return SqlSplit .splitSql (translatedSql );
184+ }
185+
186+ private String [] prepareQueriesDefault (Map <String , Object > jobParams , CancelableJdbcTemplate jdbcTemplate ) {
187+ Integer cohortDefinitionId = Integer .valueOf (jobParams .get (COHORT_DEFINITION_ID ).toString ());
188+ Integer sourceId = Integer .parseInt (jobParams .get (SOURCE_ID ).toString ());
189+ String targetSchema = jobParams .get (TARGET_DATABASE_SCHEMA ).toString ();
190+ String sessionId = jobParams .getOrDefault (SESSION_ID , SessionUtils .sessionId ()).toString ();
191+
192+ CohortDefinition cohortDefinition = cohortDefinitionRepository .findOneWithDetail (cohortDefinitionId );
193+ Source source = sourceService .findBySourceId (sourceId );
194+
195+ CohortGenerationRequestBuilder generationRequestBuilder = new CohortGenerationRequestBuilder (sessionId ,
196+ targetSchema );
197+
198+ int designHash = this .generationCacheHelper .computeHash (cohortDefinition .getDetails ().getExpression ());
199+ CohortGenerationUtils .insertInclusionRules (cohortDefinition , source , designHash , targetSchema , sessionId ,
200+ jdbcTemplate );
201+
202+ GenerationCacheHelper .CacheResult res = generationCacheHelper .computeCacheIfAbsent (cohortDefinition , source ,
203+ generationRequestBuilder ,
204+ (resId , sqls ) -> generationCacheHelper .runCancelableCohortGeneration (jdbcTemplate , stmtCancel , sqls ));
63205
64- Map <String , Object > jobParams = chunkContext .getStepContext ().getJobParameters ();
65-
66- Integer cohortDefinitionId = Integer .valueOf (jobParams .get (COHORT_DEFINITION_ID ).toString ());
67- Integer sourceId = Integer .parseInt (jobParams .get (SOURCE_ID ).toString ());
68- String targetSchema = jobParams .get (TARGET_DATABASE_SCHEMA ).toString ();
69- String sessionId = jobParams .getOrDefault (SESSION_ID , SessionUtils .sessionId ()).toString ();
70-
71- CohortDefinition cohortDefinition = cohortDefinitionRepository .findOneWithDetail (cohortDefinitionId );
72- Source source = sourceService .findBySourceId (sourceId );
73-
74- CohortGenerationRequestBuilder generationRequestBuilder = new CohortGenerationRequestBuilder (
75- sessionId ,
76- targetSchema
77- );
78-
79- int designHash = this .generationCacheHelper .computeHash (cohortDefinition .getDetails ().getExpression ());
80- CohortGenerationUtils .insertInclusionRules (cohortDefinition , source , designHash , targetSchema , sessionId , jdbcTemplate );
81-
82- GenerationCacheHelper .CacheResult res = generationCacheHelper .computeCacheIfAbsent (
83- cohortDefinition ,
84- source ,
85- generationRequestBuilder ,
86- (resId , sqls ) -> generationCacheHelper .runCancelableCohortGeneration (jdbcTemplate , stmtCancel , sqls )
87- );
88-
89- String sql = SqlRender .renderSql (
90- copyGenerationIntoCohortTableSql ,
91- new String []{ RESULTS_DATABASE_SCHEMA , COHORT_DEFINITION_ID , DESIGN_HASH },
92- new String []{ targetSchema , cohortDefinition .getId ().toString (), res .getIdentifier ().toString () }
93- );
94- sql = SqlTranslate .translateSql (sql , source .getSourceDialect ());
95- return SqlSplit .splitSql (sql );
206+ String sql = SqlRender .renderSql (copyGenerationIntoCohortTableSql ,
207+ new String [] { RESULTS_DATABASE_SCHEMA , COHORT_DEFINITION_ID , DESIGN_HASH },
208+ new String [] { targetSchema , cohortDefinition .getId ().toString (), res .getIdentifier ().toString () });
209+ sql = SqlTranslate .translateSql (sql , source .getSourceDialect ());
210+ return SqlSplit .splitSql (sql );
96211 }
97212}
0 commit comments