Cohort templates #134

Draft: wants to merge 95 commits into base: develop

95 commits
19b11b3
Initial classes and code for creating references with rxNorm template…
azimov Mar 26, 2024
cd2910d
Cohort generation of templates works and is tested
azimov Mar 26, 2024
6db260f
Added ATC class cohort templates
azimov Mar 26, 2024
9aba45f
Template code works with large ints
azimov Mar 27, 2024
1b64c1a
Snomed templates
azimov Apr 18, 2024
73d3692
Merge branch 'develop' into cohort_templates
azimov Oct 24, 2024
a7f7d05
Implementation/creation of tables
azimov Mar 28, 2025
cca9ff8
Improved incremental mode
azimov Mar 28, 2025
ec86dba
Missing parameter
azimov Mar 28, 2025
47ca66a
refactor to support datetime on different platforms and handle some o…
azimov Mar 28, 2025
85782f8
fixing some issues in cohort sample
azimov Mar 29, 2025
1ac52c5
fixing some issues in cohort sample
azimov Mar 29, 2025
bec2f13
fixing some issues in cohort sample
azimov Mar 29, 2025
804d8ec
removed subset deprecation warnings
azimov Apr 1, 2025
51fb102
Modified unit tests to use posix time to get around difficult support…
azimov Apr 1, 2025
b659228
Added unit tests around checksum table function
azimov Apr 1, 2025
1408d18
Milisecond precision removes need for Sys.sleep call
azimov Apr 2, 2025
7db23ed
Milisecond precision removes need for Sys.sleep call
azimov Apr 2, 2025
364adec
Fixes for broken subset tests
azimov Apr 8, 2025
3c7dc02
Fixes for broken subset operation tests
azimov Apr 8, 2025
be6aeb0
Alter results data model to support checksums
azimov Apr 8, 2025
c0425c7
Doc string
azimov Apr 8, 2025
3d5f03d
Platform handling of start and end time stamps
azimov Apr 8, 2025
3eb08e9
Don't report time for checksums
azimov Apr 9, 2025
cc964ff
Merge branch 'db_cohort_checksums' into cohort_templates
azimov Apr 30, 2025
0a844be
First attempt at integrating checksums in db with templates - broken …
azimov May 1, 2025
33435b2
r bind oddities
azimov May 1, 2025
169628e
Documentation
azimov May 5, 2025
4f2c3a5
fix snomed function
azimov May 5, 2025
2f1ab3e
fix snomed function
azimov May 5, 2025
d105d0d
fix snomed function sql
azimov May 5, 2025
145de72
fix snomed function sql
azimov May 5, 2025
a28e65e
checksums fix for templates
azimov May 6, 2025
3cfb21e
snomed sql fixes
azimov May 6, 2025
6c5f1a9
Checksum table as big int
azimov May 6, 2025
9f7612e
Attempt to fix checksum insert on spark insert
azimov May 6, 2025
2f09e71
Attempt to fix checksum insert on spark insert
azimov May 6, 2025
7863e83
Attempt to fix checksum insert on spark insert
azimov May 6, 2025
451e990
Attempt to fix checksum insert on spark insert
azimov May 6, 2025
9565910
more insert fixes
azimov May 6, 2025
46eb893
delete cohorts prior to generating
azimov May 6, 2025
9357fca
Simplified drug era queries to workaround spark issue
azimov May 6, 2025
ec255d5
Fix snomed group query
azimov May 6, 2025
42612b0
More consistent date time conversions for start and end time
azimov May 6, 2025
8e55329
Fix snomed references
azimov May 6, 2025
3610bcd
Alter snomet definition
azimov May 6, 2025
cb99d75
sql fix
azimov May 6, 2025
d2451a6
snomed exec fix
azimov May 6, 2025
e55ac10
snomed exec fix
azimov May 6, 2025
f419ce5
snomed reference fix
azimov May 6, 2025
860844f
snomed reference fix
azimov May 6, 2025
243901a
snomed definition fix
azimov May 7, 2025
fadd44d
SNOMED refs
azimov May 7, 2025
82decf8
Cohort table fixed in snomed defs
azimov May 7, 2025
50c7465
Fixing snomed cohort names
azimov May 7, 2025
88b5291
Fixing snomed cohort names on spark
azimov May 7, 2025
3147d29
Fixing snomed cohort names on spark
azimov May 7, 2025
ced50cc
Fixing snomed cohort names on spark
azimov May 7, 2025
f9ff1ef
Merged era definition for atc cohorts in parameterized form
azimov May 10, 2025
da88be9
Cohort param bug
azimov May 10, 2025
d33973c
Cohort param bugs
azimov May 10, 2025
b774f0a
Refactored implementation to simplify API and created basic sql test
azimov May 12, 2025
f1c20e8
* All template execution code is in one place
azimov May 12, 2025
4589007
Tidied up template implementation functions
azimov May 12, 2025
a97790e
broken implementation tests
azimov May 13, 2025
75fa66d
Working SNOMED implementation
azimov May 13, 2025
d46cd21
Generation of template cohorts now happens after base cohorts
azimov May 13, 2025
296b31e
Implementation of templates fixed (no tests for ATC due to Eunomia)
azimov May 13, 2025
eac79b5
Docstrings
azimov May 13, 2025
165bca7
Union and intersection cohorts
azimov May 13, 2025
2414b79
Refined cohort definitions
azimov May 13, 2025
d54a1ef
fix for definition of merged atc cohorts
azimov May 13, 2025
f97773d
fix for definition of non-era merged atc cohorts
azimov May 13, 2025
e4d901b
fix for definition of non-era merged atc cohorts
azimov May 13, 2025
b138cf4
fix for definition of non-era merged atc cohorts
azimov May 13, 2025
c539afc
Test covearge for ATC cohorts
azimov May 14, 2025
6cf33e8
Serialization with refactoring to support common practices
azimov May 14, 2025
f4e5cfc
Fixes for saving and loading
azimov May 14, 2025
ab07b59
Documentation
azimov May 15, 2025
3b91b49
PR comments
azimov May 21, 2025
a4812e9
Checksum computation now strips new line characters and proceding and…
azimov May 23, 2025
4d49c09
1 char change for docs
azimov May 23, 2025
29f5b50
Merge branch 'develop' into db_cohort_checksums
anthonysena May 28, 2025
579cf14
Merge branch 'develop' into db_cohort_checksums
anthonysena May 30, 2025
c39d625
Add back fix for cohort censor stats
anthonysena May 30, 2025
f1f2c22
Merge branch 'develop' into cohort_templates
azimov Jun 5, 2025
fecad2f
Merge branch 'db_cohort_checksums' into cohort_templates
azimov Jun 5, 2025
9e97e28
Doc file added
azimov Jun 5, 2025
c612345
Fix for sqlArgs active field
azimov Jun 10, 2025
6c6cc99
saving template json fixed
azimov Jun 10, 2025
4dc7129
saving template json fixed
azimov Jun 10, 2025
4da0c3c
saving templates
azimov Jun 10, 2025
bd11dc6
Loading tempaltes fixed
azimov Jun 10, 2025
22e26e1
Template gen fixes
azimov Jun 12, 2025
5a33751
Intersection removed
azimov Jun 13, 2025
8 changes: 8 additions & 0 deletions NAMESPACE
@@ -2,22 +2,30 @@

export(CohortSubsetDefinition)
export(CohortSubsetOperator)
export(CohortTemplateDefinition)
export(DemographicSubsetOperator)
export(LimitSubsetOperator)
export(SubsetCohortWindow)
export(SubsetOperator)
export(addCohortSubsetDefinition)
export(addCohortTemplateDefintion)
export(addSqlCohortDefinition)
export(checkAndFixCohortDefinitionSetDataTypes)
export(computeChecksum)
export(createAtcCohortTemplateDefinition)
export(createCohortSubset)
export(createCohortSubsetDefinition)
export(createCohortTables)
export(createCohortTemplateDefintion)
export(createDemographicSubset)
export(createEmptyCohortDefinitionSet)
export(createEmptyNegativeControlOutcomeCohortSet)
export(createLimitSubset)
export(createResultsDataModel)
export(createRxNormCohortTemplateDefinition)
export(createSnomedCohortTemplateDefinition)
export(createSubsetCohortWindow)
export(createUnionCohortTemplate)
export(dropCohortStatsTables)
export(exportCohortStatsTables)
export(generateCohortSet)
3 changes: 3 additions & 0 deletions NEWS.md
@@ -1,6 +1,9 @@
CohortGenerator 0.12.0
======================

- Creation of cohort_checksum tables that enable verification of generated cohorts and incremental execution in distributed
environments

- Backwards-compatible extension to CohortSubsetOperators and cohortSubsetWindows to allow windowing logic of any
length

412 changes: 281 additions & 131 deletions R/CohortConstruction.R

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions R/CohortDefinitionSet.R
@@ -194,6 +194,8 @@ checkAndFixCohortDefinitionSetDataTypes <- function(x, fixDataTypes = TRUE, emit
#'
#' @param sqlFolder The name of the folder that will hold the SQL representation
#' of the cohort.
#' @param templateFolder Defines the folder that stores SQL template cohorts that can be loaded as part of the definition.
#' JSON files in this folder are loaded into the cohort definition set.
#'
#' @param cohortFileNameFormat Defines the format string for naming the cohort
#' JSON and SQL files. The format string follows the
@@ -221,6 +223,7 @@ getCohortDefinitionSet <- function(settingsFileName = "Cohorts.csv",
cohortFileNameFormat = "%s",
cohortFileNameValue = c("cohortId"),
subsetJsonFolder = "inst/cohort_subset_definitions/",
templateFolder = "inst/cohort_template_definitions/",
packageName = NULL,
warnOnMissingJson = TRUE,
verbose = FALSE) {
@@ -299,6 +302,8 @@ getCohortDefinitionSet <- function(settingsFileName = "Cohorts.csv",
}

cohortDefinitionSet <- cbind(settings, fileData)
cohortDefinitionSet <- loadTemplateDefinitionsFolder(cohortDefinitionSet, templateFolder)

# Loading cohort subset definitions with their associated targets
if (loadSubsets & nrow(subsetsToLoad) > 0) {
if (dir.exists(subsetJsonFolder)) {
@@ -355,6 +360,8 @@ getCohortDefinitionSet <- function(settingsFileName = "Cohorts.csv",
#' in conjunction with the cohortFileNameFormat parameter.
#'
#' @param subsetJsonFolder Defines the folder to store the subset JSON
#' @param templateFolder Defines the folder in which SQL template cohorts are saved as part of the definition.
#' SQL will be copied to this location when `saveCohortDefinitionSet` is called.
#'
#' @param verbose When TRUE, logging messages are emitted to indicate export
#' progress.
@@ -367,12 +374,24 @@ saveCohortDefinitionSet <- function(cohortDefinitionSet,
cohortFileNameFormat = "%s",
cohortFileNameValue = c("cohortId"),
subsetJsonFolder = "inst/cohort_subset_definitions/",
templateFolder = "inst/cohort_template_definitions/",
verbose = FALSE) {
checkmate::assertDataFrame(cohortDefinitionSet, min.rows = 1, col.names = "named")
checkmate::assert_vector(cohortFileNameValue)
checkmate::assert_true(length(cohortFileNameValue) > 0)
assertSettingsColumns(names(cohortDefinitionSet))
checkmate::assert_true(all(cohortFileNameValue %in% names(cohortDefinitionSet)))

templateDefinitions <- getTemplateDefinitions(cohortDefinitionSet)
if (length(templateDefinitions) > 0) {
saveCohortTemplateDefinitions(templateDefinitions, templateFolder)
if (all(cohortDefinitionSet$isTemplatedCohort))
return(invisible())
# Don't save templates as regular cohorts
cohortDefinitionSet <- cohortDefinitionSet |>
dplyr::filter(!.data$isTemplatedCohort)
}

settingsFolder <- dirname(settingsFileName)
if (!dir.exists(settingsFolder)) {
dir.create(settingsFolder, recursive = TRUE)
79 changes: 48 additions & 31 deletions R/CohortSample.R
@@ -31,10 +31,10 @@
countSql <- "SELECT COUNT(DISTINCT SUBJECT_ID) as cnt FROM @cohort_database_schema.@target_table
WHERE cohort_definition_id = @target_cohort_id"
count <- DatabaseConnector::renderTranslateQuerySql(connection,
- countSql,
- cohort_database_schema = cohortDatabaseSchema,
- target_cohort_id = targetCohortId,
- target_table = targetTable
+ countSql,
+ cohort_database_schema = cohortDatabaseSchema,
+ target_cohort_id = targetCohortId,
+ target_table = targetTable
) %>%
dplyr::pull()

@@ -64,11 +64,16 @@
targetTable,
outputCohortId,
outputTable,
+ checksumTable,
  cohortDatabaseSchema,
  outputDatabaseSchema,
  sampleTable,
  seed,
- tempEmulationSchema) {
+ tempEmulationSchema,
+ checksum,
+ incremental,
+ recordKeepingFile) {
+ startTime <- lubridate::now()
randSampleTableName <- paste0("#SAMPLE_TABLE_", seed)
DatabaseConnector::insertTable(
connection = connection,
@@ -80,9 +85,8 @@
)

execSql <- SqlRender::readSql(system.file("sql", "sql_server", "sampling", "RandomSample.sql", package = "CohortGenerator"))
- DatabaseConnector::renderTranslateExecuteSql(connection,
+ execSql <- SqlRender::render(
  execSql,
- tempEmulationSchema = tempEmulationSchema,
random_sample_table = randSampleTableName,
target_cohort_id = targetCohortId,
output_cohort_id = outputCohortId,
@@ -91,6 +95,18 @@
output_table = outputTable,
target_table = targetTable
)
+ execSql <- SqlRender::translate(execSql,
+ targetDialect = DatabaseConnector::dbms(connection))
+
+ .runCohortSql(connection = connection,
+ sql = execSql,
+ startTime = startTime,
+ resultsDatabaseSchema = cohortDatabaseSchema,
+ cohortChecksumTable = checksumTable,
+ incremental = incremental,
+ cohortId = outputCohortId,
+ checksum = checksum,
+ recordKeepingFile = recordKeepingFile)$generationStatus
}


@@ -169,11 +185,11 @@ sampleCohortDefinitionSet <- function(cohortDefinitionSet,
checkmate::assertIntegerish(seed, min.len = 1)
checkmate::assertDataFrame(cohortDefinitionSet, min.rows = 1, col.names = "named")
checkmate::assertNames(colnames(cohortDefinitionSet),
- must.include = c(
- "cohortId",
- "cohortName",
- "sql"
- )
+ must.include = c(
+ "cohortId",
+ "cohortName",
+ "sql"
+ )
)

if (is.null(n) && is.null(sampleFraction)) {
@@ -204,6 +220,10 @@ sampleCohortDefinitionSet <- function(cohortDefinitionSet,
}

.checkCohortTables(connection, cohortDatabaseSchema, cohortTableNames)
+ computedChecksums <- getLastGeneratedCohortChecksums(connection = connection,
+ cohortDatabaseSchema = cohortDatabaseSchema,
+ cohortTableNames = cohortTableNames)
+
sampledCohorts <-
base::Map(function(seed, targetCohortId) {
sampledCohortDefinition <- cohortDefinitionSet %>%
@@ -240,12 +260,14 @@ sampleCohortDefinitionSet <- function(cohortDefinitionSet,
)
}

- if (incremental && !isTaskRequired(
- cohortId = outputCohortId,
- seed = seed,
- checksum = computeChecksum(paste0(sampledCohortDefinition$sql, n, seed, outputCohortId)),
- recordKeepingFile = recordKeepingFile
- )) {
+ sampleChecksum <- computeChecksum(paste0(sampledCohortDefinition$sql, n, seed, outputCohortId))
+ cohortComputed <- computedChecksums |>
+ dplyr::filter(.data$checksum == sampleChecksum,
+ .data$cohortDefinitionId == outputCohortId) |>
+ dplyr::count() |>
+ dplyr::pull() > 0
+
+ if (incremental && cohortComputed) {
sampledCohortDefinition$status <- "skipped"
return(sampledCohortDefinition)
}
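
Review note: the hunk above replaces the file-based `isTaskRequired()` check with a lookup against checksums already stored in the database. A minimal base-R sketch of that lookup follows. The data frame is a hypothetical stand-in for whatever `getLastGeneratedCohortChecksums()` returns (only the `cohortDefinitionId` and `checksum` columns referenced in the diff are assumed), and `isAlreadyGenerated` is an illustrative helper, not a function in this PR.

```r
# Hypothetical stand-in for the result of getLastGeneratedCohortChecksums();
# only the two columns referenced in the diff are assumed to exist.
computedChecksums <- data.frame(
  cohortDefinitionId = c(1001, 1002),
  checksum = c("aaa", "bbb"),
  stringsAsFactors = FALSE
)

# Illustrative helper: TRUE when a row matches both the cohort id and the checksum,
# which is the condition under which the diff marks a sampled cohort as "skipped".
isAlreadyGenerated <- function(computedChecksums, cohortId, checksum) {
  any(
    computedChecksums$cohortDefinitionId == cohortId &
      computedChecksums$checksum == checksum
  )
}

incremental <- TRUE
skipUnchanged <- incremental && isAlreadyGenerated(computedChecksums, 1001, "aaa")
skipChanged <- incremental && isAlreadyGenerated(computedChecksums, 1001, "bbb")
```

Under these assumptions `skipUnchanged` is `TRUE` and `skipChanged` is `FALSE`: a cohort whose definition (and hence checksum) changed is regenerated even if its id is already present.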
@@ -265,33 +287,28 @@ sampleCohortDefinitionSet <- function(cohortDefinitionSet,
rlang::inform(paste0("No entries found for ", targetCohortId, " was it generated?"))
return(sampledCohortDefinition)
}

- # Called only for side effects
- .sampleCohort(
+ sampledCohortDefinition$status <- .sampleCohort(
  connection = connection,
  targetCohortId = targetCohortId,
  targetTable = cohortTableNames$cohortTable,
  outputCohortId = outputCohortId,
  outputTable = cohortTableNames$cohortSampleTable,
+ checksumTable = cohortTableNames$cohortChecksumTable,
  cohortDatabaseSchema = cohortDatabaseSchema,
  outputDatabaseSchema = outputDatabaseSchema,
  sampleTable = sampleTable,
  seed = seed + targetCohortId, # Seed is unique to each target cohort
- tempEmulationSchema = tempEmulationSchema
+ tempEmulationSchema = tempEmulationSchema,
+ checksum = sampleChecksum,
+ incremental = incremental,
+ recordKeepingFile = recordKeepingFile
  )

- sampledCohortDefinition$status <- "generated"
- if (incremental) {
- recordTasksDone(
- cohortId = sampledCohortDefinition$cohortId,
- seed = seed,
- checksum = computeChecksum(paste0(sampledCohortDefinition$sql, n, seed, outputCohortId)),
- recordKeepingFile = recordKeepingFile
- )
- }
return(sampledCohortDefinition)
}, seed, cohortIds) %>%
- dplyr::bind_rows()
+
+ dplyr::bind_rows()


attr(sampledCohorts, "isSampledCohortDefinition") <- TRUE
9 changes: 7 additions & 2 deletions R/CohortTables.R
@@ -35,6 +35,7 @@
#' inclusion rule statistics.
#' @param cohortCensorStatsTable Name of the censor stats table, one of the tables for storing
#' inclusion rule statistics.
#' @param cohortChecksumTable Name of the cohort checksum table, which stores the checksum of each generated cohort and the start and end times of its generation.
#'
#' @returns
#' A list of the table names as specified in the parameters to this function.
@@ -46,15 +47,17 @@ getCohortTableNames <- function(cohortTable = "cohort",
cohortInclusionResultTable = paste0(cohortTable, "_inclusion_result"),
cohortInclusionStatsTable = paste0(cohortTable, "_inclusion_stats"),
cohortSummaryStatsTable = paste0(cohortTable, "_summary_stats"),
- cohortCensorStatsTable = paste0(cohortTable, "_censor_stats")) {
+ cohortCensorStatsTable = paste0(cohortTable, "_censor_stats"),
+ cohortChecksumTable = paste0(cohortTable, "_checksum")) {
return(list(
cohortTable = cohortTable,
cohortSampleTable = cohortSampleTable,
cohortInclusionTable = cohortInclusionTable,
cohortInclusionResultTable = cohortInclusionResultTable,
cohortInclusionStatsTable = cohortInclusionStatsTable,
cohortSummaryStatsTable = cohortSummaryStatsTable,
- cohortCensorStatsTable = cohortCensorStatsTable
+ cohortCensorStatsTable = cohortCensorStatsTable,
+ cohortChecksumTable = cohortChecksumTable
))
}

@@ -121,13 +124,15 @@ createCohortTables <- function(connectionDetails = NULL,
create_cohort_inclusion_stats_table = createTableFlagList$cohortInclusionStatsTable,
create_cohort_summary_stats_table = createTableFlagList$cohortSummaryStatsTable,
create_cohort_censor_stats_table = createTableFlagList$cohortCensorStatsTable,
create_cohort_checksum_table = createTableFlagList$cohortChecksumTable,
cohort_table = cohortTableNames$cohortTable,
cohort_sample_table = cohortTableNames$cohortSampleTable,
cohort_inclusion_table = cohortTableNames$cohortInclusionTable,
cohort_inclusion_result_table = cohortTableNames$cohortInclusionResultTable,
cohort_inclusion_stats_table = cohortTableNames$cohortInclusionStatsTable,
cohort_summary_stats_table = cohortTableNames$cohortSummaryStatsTable,
cohort_censor_stats_table = cohortTableNames$cohortCensorStatsTable,
cohort_checksum_table = cohortTableNames$cohortChecksumTable,
warnOnMissingParameters = TRUE
)
sql <- SqlRender::translate(
8 changes: 7 additions & 1 deletion R/Incremental.R
@@ -22,6 +22,7 @@
#' to store in a record keeping file. This function leverages the md5
#' hash from the digest package
#'
#'
#' @param val The value to hash. It is converted to a character to perform
#' the hash.
#'
@@ -30,7 +31,12 @@
#'
#' @export
computeChecksum <- function(val) {
- return(sapply(as.character(val), digest::digest, algo = "md5", serialize = FALSE))
+ val <- as.character(val)
+ # strip whitespace
+ val <- gsub("[\r\n]", "", val)
+ val <- trimws(val)
+ hashes <- sapply(val, digest::digest, algo = "md5", serialize = FALSE, USE.NAMES = FALSE)
+ return(hashes)
}
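
Review note: the practical effect of the normalization added above is that SQL differing only in line endings or leading/trailing whitespace hashes to the same value. The sketch below copies the new function body into a standalone function (named `normalizedChecksum` here purely for illustration) so it can be tried without installing this branch; it assumes the `digest` package is available.

```r
library(digest)

# Standalone copy of the normalization in the hunk above:
# remove CR/LF characters, trim leading/trailing whitespace, then md5 the string.
normalizedChecksum <- function(val) {
  val <- as.character(val)
  val <- gsub("[\r\n]", "", val)
  val <- trimws(val)
  sapply(val, digest::digest, algo = "md5", serialize = FALSE, USE.NAMES = FALSE)
}

sqlUnix <- "SELECT 1 FROM cohort;\n"
sqlWindows <- "  SELECT 1 FROM cohort;\r\n"
sqlDifferent <- "SELECT 2 FROM cohort;"

# Same statement, different line endings and padding
sameHash <- normalizedChecksum(sqlUnix) == normalizedChecksum(sqlWindows)
# Genuinely different statement
differentHash <- normalizedChecksum(sqlUnix) != normalizedChecksum(sqlDifferent)
```

One thing to be aware of: newline characters inside the string are removed rather than replaced with a space, so `"SELECT\n1"` is hashed as `"SELECT1"` and will not match `"SELECT 1"`.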

#' Is a task required when running in incremental mode
13 changes: 9 additions & 4 deletions R/SubsetDefinitions.R
@@ -92,7 +92,8 @@ CohortSubsetDefinition <- R6::R6Class(
#' Returns vector of join, logic, having statements returned by subset operations
#' @param targetOutputPair Target output pair
getSubsetQuery = function(targetOutputPair) {
- checkmate::assertIntegerish(targetOutputPair, len = 2)
+ checkmate::assertNumeric(targetOutputPair, len = 2)
+ checkmate::assertTRUE(all(targetOutputPair %% 1 == 0))
checkmate::assertFALSE(targetOutputPair[[1]] == targetOutputPair[[2]])

targetTable <- "#cohort_sub_base"
@@ -133,7 +134,8 @@ CohortSubsetDefinition <- R6::R6Class(
#' @param cohortDefinitionSet Cohort definition set containing base names
#' @param targetOutputPair Target output pair
getSubsetCohortName = function(cohortDefinitionSet, targetOutputPair) {
- checkmate::assertIntegerish(targetOutputPair, len = 2)
+ checkmate::assertNumeric(targetOutputPair, len = 2)
+ checkmate::assertTRUE(all(targetOutputPair %% 1 == 0))
checkmate::assertFALSE(targetOutputPair[[1]] == targetOutputPair[[2]])
checkmate::assertTRUE(targetOutputPair[[1]] %in% cohortDefinitionSet$cohortId)
checkmate::assertTRUE(isCohortDefinitionSet(cohortDefinitionSet))
@@ -156,7 +158,9 @@ CohortSubsetDefinition <- R6::R6Class(
#' Set the targetOutputPairs to be added to a cohort definition set
#' @param targetIds list of cohort ids to apply subsetting operations to
setTargetOutputPairs = function(targetIds) {
- checkmate::assertIntegerish(targetIds, min.len = 1, upper = 10e11)
+ checkmate::assertNumeric(targetIds, min.len = 1)
+ checkmate::assertTRUE(all(targetIds %% 1 == 0))
+
definitionId <- self$definitionId
targetOutputPairs <- list()

@@ -190,7 +194,8 @@ CohortSubsetDefinition <- R6::R6Class(
targetOutputPairs,
function(targetOutputPair) {
targetOutputPair <- as.numeric(targetOutputPair)
- checkmate::assertIntegerish(targetOutputPair, len = 2, upper = 10e11)
+ checkmate::assertNumeric(targetOutputPair, len = 2)
+ checkmate::assertTRUE(all(targetOutputPair %% 1 == 0))
checkmate::assertFALSE(targetOutputPair[[1]] == targetOutputPair[[2]])
targetOutputPair
}
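
Review note: the four hunks in this file swap `assertIntegerish()` for a numeric check plus `%% 1 == 0` and drop the `upper = 10e11` bound, presumably so that cohort ids too large for R's 32-bit integer type are accepted (the commit list mentions "Template code works with large ints"). A small base-R sketch of the whole-number test is below; `isWholeNumber` and `largeCohortId` are illustrative names, not part of the package.

```r
# Illustrative helper: TRUE when every element is a finite-precision whole number,
# regardless of whether it fits in a 32-bit integer.
isWholeNumber <- function(x) {
  is.numeric(x) && !anyNA(x) && all(x %% 1 == 0)
}

# A cohort id beyond .Machine$integer.max (2147483647) but exactly representable as a double
largeCohortId <- 1e12 + 7

acceptsLargeId <- isWholeNumber(c(1, largeCohortId))
rejectsFraction <- !isWholeNumber(c(1, 1.5))

# Coercing the same id to integer loses it, which is why ids are kept as doubles
coercedId <- suppressWarnings(as.integer(largeCohortId))
```

Here `acceptsLargeId` and `rejectsFraction` are both `TRUE`, while `coercedId` is `NA`. Doubles represent whole numbers exactly only up to 2^53, so ids above that would need a different representation.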