diff --git a/CHANGELOG.md b/CHANGELOG.md index 5cc94376cd..70f315472a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fix application restart not closing all windows - Improve cohort deprecation override test - Add Filters for CatalogueItems to Dashboard graphs +- Add ability to use cohort temp table during extractions ## [8.4.2] - 2024-12-18 diff --git a/Rdmp.Core/DataExport/DataExtraction/Pipeline/Sources/ExecuteDatasetExtractionSource.cs b/Rdmp.Core/DataExport/DataExtraction/Pipeline/Sources/ExecuteDatasetExtractionSource.cs index d6e602bb4b..3daced40fa 100644 --- a/Rdmp.Core/DataExport/DataExtraction/Pipeline/Sources/ExecuteDatasetExtractionSource.cs +++ b/Rdmp.Core/DataExport/DataExtraction/Pipeline/Sources/ExecuteDatasetExtractionSource.cs @@ -1,4 +1,4 @@ -// Copyright (c) The University of Dundee 2018-2019 +// Copyright (c) The University of Dundee 2018-2025 // This file is part of the Research Data Management Platform (RDMP). // RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. // RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; using System.Data; +using System.Data.Common; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; @@ -88,6 +89,9 @@ public class ExecuteDatasetExtractionSource : IPluginDataFlowSource, "Exclusion list. A collection of Catalogues which will never be considered for HASH JOIN even when UseHashJoins is enabled. Being on this list takes precedence for a Catalogue even if it is on UseHashJoinsForCatalogues.")] public Catalogue[] DoNotUseHashJoinsForCatalogues { get; set; } + [DemandsInitialization("When performing an extracton, copy the cohort into a temporary table to improve extraction speed", defaultValue: false)] + public bool UseTempTablesWhenExtractingCohort { get; set; } + /// /// This is a dictionary containing all the CatalogueItems used in the query, the underlying datatype in the origin database and the @@ -99,13 +103,18 @@ public class ExecuteDatasetExtractionSource : IPluginDataFlowSource, private DbDataCommandDataFlowSource _hostedSource; + private IExternalCohortTable _externalCohortTable; + private string _whereSQL; + private DbConnection _con; + private string _uuid; protected virtual void Initialize(ExtractDatasetCommand request) { Request = request; if (request == ExtractDatasetCommand.EmptyCommand) return; - + _externalCohortTable = request.ExtractableCohort.ExternalCohortTable; + _whereSQL = request.ExtractableCohort.WhereSQL(); _timeSpentValidating = new Stopwatch(); _timeSpentCalculatingDISTINCT = new Stopwatch(); _timeSpentBuckettingDates = new Stopwatch(); @@ -156,6 +165,71 @@ private void Initialize(ExtractGlobalsCommand request) private RowPeeker _peeker = new(); + private static readonly Random random = new Random(); + + private static string RandomString(int length) + { + const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + return new string(Enumerable.Repeat(chars, length) + .Select(s => s[random.Next(s.Length)]).ToArray()); + } + + private void CreateCohortTempTable(DbConnection con, IDataLoadEventListener listener) + { + _uuid = $"#{RandomString(24)}"; + var sql = ""; + var db = _externalCohortTable.Discover(); + switch (db.Server.DatabaseType) + { + case DatabaseType.MicrosoftSQLServer: + sql = $""" + SELECT * + INTO {_uuid} + FROM( + SELECT * FROM {_externalCohortTable.TableName} + WHERE {_whereSQL} + ) as cohortTempTable + """; + break; + case DatabaseType.MySql: + sql = $""" + CREATE TEMPORARY TABLE {_uuid} ENGINE=MEMORY + as (SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL}) + """; + break; + case DatabaseType.Oracle: + sql= $""" + CREATE TEMPORARY TABLE {_uuid} SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL} + """; + break; + case DatabaseType.PostgreSql: + sql= $""" + CREATE TEMP TABLE {_uuid} AS + SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL} + """; + break; + default: + listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, $"Unable to create temporary table for cohort. Original cohort table will be used")); + return; + + + } + listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"About to copy the cohort into a temporary table using the SQL: {sql}")); + + using var cmd = db.Server.GetCommand(sql, con); + cmd.CommandTimeout = ExecutionTimeout; + try + { + cmd.ExecuteNonQuery(); + } + catch (Exception ex) { + listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, $"Unable to create temporary table for cohort. Original cohort table will be used",ex)); + _uuid = null; + } + listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Cohort successfully copied to temporary table")); + + } + public virtual DataTable GetChunk(IDataLoadEventListener listener, GracefulCancellationToken cancellationToken) { // we are in the Global Commands case, let's return an empty DataTable (not null) @@ -185,21 +259,31 @@ public virtual DataTable GetChunk(IDataLoadEventListener listener, GracefulCance if (_hostedSource == null) { - StartAudit(Request.QueryBuilder.SQL); + if (UseTempTablesWhenExtractingCohort) + { + _con = DatabaseCommandHelper.GetConnection(Request.GetDistinctLiveDatabaseServer().Builder); + _con.Open(); + CreateCohortTempTable(_con, listener); + } + var cmdSql = GetCommandSQL(listener); + StartAudit(cmdSql); if (Request.DatasetBundle.DataSet.DisableExtraction) throw new Exception( $"Cannot extract {Request.DatasetBundle.DataSet} because DisableExtraction is set to true"); - _hostedSource = new DbDataCommandDataFlowSource(GetCommandSQL(listener), + _hostedSource = UseTempTablesWhenExtractingCohort ? new DbDataCommandDataFlowSource(cmdSql, $"ExecuteDatasetExtraction {Request.DatasetBundle.DataSet}", - Request.GetDistinctLiveDatabaseServer().Builder, + _con, + ExecutionTimeout) : new DbDataCommandDataFlowSource(cmdSql, + $"ExecuteDatasetExtraction {Request.DatasetBundle.DataSet}", + Request.GetDistinctLiveDatabaseServer().Builder, ExecutionTimeout) - { - // If we are running in batches then always allow empty extractions - AllowEmptyResultSets = AllowEmptyExtractions || Request.IsBatchResume, - BatchSize = BatchSize - }; + { + // If we are running in batches then always allow empty extractions + AllowEmptyResultSets = AllowEmptyExtractions || Request.IsBatchResume, + BatchSize = BatchSize + }; } DataTable chunk = null; @@ -451,7 +535,10 @@ private string GetCommandSQL(IDataLoadEventListener listener) listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"/*Decided on extraction SQL:*/{Environment.NewLine}{sql}")); - + if (UseTempTablesWhenExtractingCohort && _uuid is not null) + { + sql = sql.Replace(_externalCohortTable.TableName, _uuid); + } return sql; } diff --git a/Rdmp.Core/DataLoad/Engine/Pipeline/Sources/DbDataCommandDataFlowSource.cs b/Rdmp.Core/DataLoad/Engine/Pipeline/Sources/DbDataCommandDataFlowSource.cs index 67213043cc..440b848446 100644 --- a/Rdmp.Core/DataLoad/Engine/Pipeline/Sources/DbDataCommandDataFlowSource.cs +++ b/Rdmp.Core/DataLoad/Engine/Pipeline/Sources/DbDataCommandDataFlowSource.cs @@ -49,6 +49,17 @@ public DbDataCommandDataFlowSource(string sql, string taskBeingPerformed, DbConn BatchSize = 10000; } + public DbDataCommandDataFlowSource(string sql, string taskBeingPerformed, DbConnection con, + int timeout) + { + Sql = sql; + _taskBeingPerformed = taskBeingPerformed; + _con = con; + _timeout = timeout; + + BatchSize = 10000; + } + private int _numberOfColumns; private bool _firstChunk = true; @@ -59,8 +70,11 @@ public DataTable GetChunk(IDataLoadEventListener job, GracefulCancellationToken { if (_reader == null) { - _con = DatabaseCommandHelper.GetConnection(_builder); - _con.Open(); + _con = _con ==null?DatabaseCommandHelper.GetConnection(_builder):_con; + if(_con != null && _con.State == ConnectionState.Closed) +{ + _con.Open(); + } job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Running SQL:{Environment.NewLine}{Sql}")); @@ -209,11 +223,13 @@ private void CloseReader(IDataLoadEventListener listener) public DataTable TryGetPreview() { var chunk = new DataTable(); - using var con = DatabaseCommandHelper.GetConnection(_builder); - con.Open(); - using var da = DatabaseCommandHelper.GetDataAdapter(DatabaseCommandHelper.GetCommand(Sql, con)); + _con = _con == null ? DatabaseCommandHelper.GetConnection(_builder) : _con; + if (_con != null && _con.State == ConnectionState.Closed) + { + _con.Open(); + } + using var da = DatabaseCommandHelper.GetDataAdapter(DatabaseCommandHelper.GetCommand(Sql, _con)); var read = da.Fill(0, 100, chunk); - return read == 0 ? null : chunk; } } \ No newline at end of file