Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spike/rdmp 282 cohort temp tables #2131

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fix application restart not closing all windows
- Improve cohort deprecation override test
- Add Filters for CatalogueItems to Dashboard graphs
- Add ability to use cohort temp table during extractions

## [8.4.2] - 2024-12-18

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) The University of Dundee 2018-2019
// Copyright (c) The University of Dundee 2018-2025
// This file is part of the Research Data Management Platform (RDMP).
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
Expand All @@ -7,6 +7,7 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -88,6 +89,9 @@ public class ExecuteDatasetExtractionSource : IPluginDataFlowSource<DataTable>,
"Exclusion list. A collection of Catalogues which will never be considered for HASH JOIN even when UseHashJoins is enabled. Being on this list takes precedence for a Catalogue even if it is on UseHashJoinsForCatalogues.")]
public Catalogue[] DoNotUseHashJoinsForCatalogues { get; set; }

[DemandsInitialization("When performing an extracton, copy the cohort into a temporary table to improve extraction speed", defaultValue: false)]
public bool UseTempTablesWhenExtractingCohort { get; set; }


/// <summary>
/// This is a dictionary containing all the CatalogueItems used in the query, the underlying datatype in the origin database and the
Expand All @@ -99,13 +103,18 @@ public class ExecuteDatasetExtractionSource : IPluginDataFlowSource<DataTable>,

private DbDataCommandDataFlowSource _hostedSource;

private IExternalCohortTable _externalCohortTable;
private string _whereSQL;
private DbConnection _con;
private string _uuid;
protected virtual void Initialize(ExtractDatasetCommand request)
{
Request = request;

if (request == ExtractDatasetCommand.EmptyCommand)
return;

_externalCohortTable = request.ExtractableCohort.ExternalCohortTable;
_whereSQL = request.ExtractableCohort.WhereSQL();
_timeSpentValidating = new Stopwatch();
_timeSpentCalculatingDISTINCT = new Stopwatch();
_timeSpentBuckettingDates = new Stopwatch();
Expand Down Expand Up @@ -156,6 +165,71 @@ private void Initialize(ExtractGlobalsCommand request)

private RowPeeker _peeker = new();

private static readonly Random random = new Random();

private static string RandomString(int length)
{
const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
return new string(Enumerable.Repeat(chars, length)
.Select(s => s[random.Next(s.Length)]).ToArray());
}

private void CreateCohortTempTable(DbConnection con, IDataLoadEventListener listener)
{
_uuid = $"#{RandomString(24)}";
var sql = "";
var db = _externalCohortTable.Discover();
switch (db.Server.DatabaseType)
{
case DatabaseType.MicrosoftSQLServer:
sql = $"""
SELECT *
INTO {_uuid}
FROM(
SELECT * FROM {_externalCohortTable.TableName}
WHERE {_whereSQL}
) as cohortTempTable
""";
break;
case DatabaseType.MySql:
sql = $"""
CREATE TEMPORARY TABLE {_uuid} ENGINE=MEMORY
as (SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL})
""";
break;
case DatabaseType.Oracle:
sql= $"""
CREATE TEMPORARY TABLE {_uuid} SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL}
""";
break;
case DatabaseType.PostgreSql:
sql= $"""
CREATE TEMP TABLE {_uuid} AS
SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL}
""";
break;
default:
listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, $"Unable to create temporary table for cohort. Original cohort table will be used"));
return;


}
listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"About to copy the cohort into a temporary table using the SQL: {sql}"));

using var cmd = db.Server.GetCommand(sql, con);
cmd.CommandTimeout = ExecutionTimeout;
try
{
cmd.ExecuteNonQuery();
}
catch (Exception ex) {
listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, $"Unable to create temporary table for cohort. Original cohort table will be used",ex));
_uuid = null;
}
listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Cohort successfully copied to temporary table"));

}

public virtual DataTable GetChunk(IDataLoadEventListener listener, GracefulCancellationToken cancellationToken)
{
// we are in the Global Commands case, let's return an empty DataTable (not null)
Expand Down Expand Up @@ -185,21 +259,31 @@ public virtual DataTable GetChunk(IDataLoadEventListener listener, GracefulCance

if (_hostedSource == null)
{
StartAudit(Request.QueryBuilder.SQL);
if (UseTempTablesWhenExtractingCohort)
{
_con = DatabaseCommandHelper.GetConnection(Request.GetDistinctLiveDatabaseServer().Builder);
_con.Open();
CreateCohortTempTable(_con, listener);
}
var cmdSql = GetCommandSQL(listener);
StartAudit(cmdSql);

if (Request.DatasetBundle.DataSet.DisableExtraction)
throw new Exception(
$"Cannot extract {Request.DatasetBundle.DataSet} because DisableExtraction is set to true");

_hostedSource = new DbDataCommandDataFlowSource(GetCommandSQL(listener),
_hostedSource = UseTempTablesWhenExtractingCohort ? new DbDataCommandDataFlowSource(cmdSql,
$"ExecuteDatasetExtraction {Request.DatasetBundle.DataSet}",
Request.GetDistinctLiveDatabaseServer().Builder,
_con,
ExecutionTimeout) : new DbDataCommandDataFlowSource(cmdSql,
$"ExecuteDatasetExtraction {Request.DatasetBundle.DataSet}",
Request.GetDistinctLiveDatabaseServer().Builder,
ExecutionTimeout)
{
// If we are running in batches then always allow empty extractions
AllowEmptyResultSets = AllowEmptyExtractions || Request.IsBatchResume,
BatchSize = BatchSize
};
{
// If we are running in batches then always allow empty extractions
AllowEmptyResultSets = AllowEmptyExtractions || Request.IsBatchResume,
BatchSize = BatchSize
};
}

DataTable chunk = null;
Expand Down Expand Up @@ -451,7 +535,10 @@ private string GetCommandSQL(IDataLoadEventListener listener)

listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
$"/*Decided on extraction SQL:*/{Environment.NewLine}{sql}"));

if (UseTempTablesWhenExtractingCohort && _uuid is not null)
{
sql = sql.Replace(_externalCohortTable.TableName, _uuid);
}
return sql;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ public DbDataCommandDataFlowSource(string sql, string taskBeingPerformed, DbConn
BatchSize = 10000;
}

public DbDataCommandDataFlowSource(string sql, string taskBeingPerformed, DbConnection con,
int timeout)
{
Sql = sql;
_taskBeingPerformed = taskBeingPerformed;
_con = con;
_timeout = timeout;

BatchSize = 10000;
}

private int _numberOfColumns;

private bool _firstChunk = true;
Expand All @@ -59,8 +70,11 @@ public DataTable GetChunk(IDataLoadEventListener job, GracefulCancellationToken
{
if (_reader == null)
{
_con = DatabaseCommandHelper.GetConnection(_builder);
_con.Open();
_con = _con ==null?DatabaseCommandHelper.GetConnection(_builder):_con;
if(_con != null && _con.State == ConnectionState.Closed)
{
_con.Open();
}

job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
$"Running SQL:{Environment.NewLine}{Sql}"));
Expand Down Expand Up @@ -209,11 +223,13 @@ private void CloseReader(IDataLoadEventListener listener)
public DataTable TryGetPreview()
{
var chunk = new DataTable();
using var con = DatabaseCommandHelper.GetConnection(_builder);
con.Open();
using var da = DatabaseCommandHelper.GetDataAdapter(DatabaseCommandHelper.GetCommand(Sql, con));
_con = _con == null ? DatabaseCommandHelper.GetConnection(_builder) : _con;
if (_con != null && _con.State == ConnectionState.Closed)
{
_con.Open();
}
using var da = DatabaseCommandHelper.GetDataAdapter(DatabaseCommandHelper.GetCommand(Sql, _con));
var read = da.Fill(0, 100, chunk);

return read == 0 ? null : chunk;
}
}