Skip to content

Commit

Permalink
feat: Added dataset parameters to dataflow debug settings and added t…
Browse files Browse the repository at this point in the history
…ests (#228)

* Added dataset parameters to dataflow debug settings and added tests

* Update src/Arcus.Testing.Integration.DataFactory/TemporaryDataFlowDebugSession.cs

Co-authored-by: Stijn Moreels <[email protected]>

* Update src/Arcus.Testing.Integration.DataFactory/TemporaryDataFlowDebugSession.cs

Co-authored-by: Stijn Moreels <[email protected]>

* Refactor DataFlow handling and add dataset parameter support:
- Reorganized using directives in TemporaryDataFlowDebugSession.cs.
- Modified RunDataFlowOptions to change DataSetParameters type, add null checks, and handle nested dictionaries.
- Enhanced TemporaryDataFactoryDataFlow with new properties, methods, and dataset parameter handling.
- Updated RunDataFlowTests with a new test for dataset parameters and removed obsolete tests.

* Removed deprecated test

* Updated integration DataFactory documentation to include new method available in options to pass dataset parameter(s)

* Added 2 parameters to source dataset and 1 parameter to sink dataset

* Moved Name param on top of the public param list

* Update docs/preview/02-Features/06-Integration/01-data-factory.mdx

Co-authored-by: Stijn Moreels <[email protected]>

* Update docs/preview/02-Features/06-Integration/01-data-factory.mdx

Co-authored-by: Stijn Moreels <[email protected]>

* Update src/Arcus.Testing.Integration.DataFactory/TemporaryDataFlowDebugSession.cs

Co-authored-by: Stijn Moreels <[email protected]>

* Renamed datasetname param by sourceOrSinkName

* Small improvement

* Refactor DataFlow options handling and update tests

- Introduced new classes to encapsulate DataSet options for source and sink in Azure DataFactory.
- Updated methods to utilize these new classes, enhancing flexibility and configurability.
- Refactored and removed redundant methods, consolidating functionality.
- Updated tests to reflect these changes and added a helper method for generating randomized DataSet parameter keys and values.

* Removed commented method

* Update src/Arcus.Testing.Tests.Integration/Integration/DataFactory/RunDataFlowTests.cs

Co-authored-by: Stijn Moreels <[email protected]>

* Renamed `SetSourceDataSetParameterKeyValues` and `SetSinkDataSetParameterKeyValues` to `AddFolderPathParameters`

* - Removed SourceDataSetParameterKeyValues and SinkDataSetParameterKeyValues properties.
- Updated methods to use dataFlowOptions directly.
- Simplified UploadToSourceAsync method signature and filePath construction.

* Removed the unnecessary `null` argument from `UploadToSourceAsync`
method calls in `RunDataFlowTests.cs`

* Update src/Arcus.Testing.Tests.Integration/Integration/DataFactory/Fixture/TemporaryDataFactoryDataFlow.cs

Co-authored-by: Stijn Moreels <[email protected]>

* Update src/Arcus.Testing.Tests.Integration/Integration/DataFactory/RunDataFlowTests.cs

Co-authored-by: Stijn Moreels <[email protected]>

* - Removed SinkDataSetParameterValue from TemporaryDataFactoryDataFlow.
- Consolidated AddDataSetParameters into ApplyOptions with new parameter.
- Modified ApplyOptions signatures to remove ref and add new argument.
- Directly pass dataset parameter key-values to ApplyOptions.

* - Made `tempDataFlowOptions` optional in `CreateWithCsvSinkSourceAsync`.
- Removed `sourceDataSetParameterKeyValues` and `sinkDataSetParameterKeyValues` from `ApplyOptions` method calls and definitions.
- Updated `ApplyOptions` to iterate over `SourceDataSetParameterKeyValues` and `SinkDataSetParameterKeyValues` directly.
- Adjusted `RunDataFlowTests` to reflect the optional `tempDataFlowOptions` parameter.

---------

Co-authored-by: Stijn Moreels <[email protected]>
  • Loading branch information
ClementVaillantCodit and stijnmoreels authored Dec 4, 2024
1 parent ab26e30 commit b826f6d
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 41 deletions.
5 changes: 5 additions & 0 deletions docs/preview/02-Features/06-Integration/01-data-factory.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ await session.RunDataFlowAsync(..., options =>
// Adds a parameter to the data flow upon starting.
options.AddDataFlowParameter("<name>", "<value>");

// Adds a dataset parameter to the data flow upon starting.
// 👀 Note that the source or sink name should be the "Output stream name" of the source or sink dataset in the DataFlow, not the actual DataSet name.
// For more info: https://learn.microsoft.com/en-us/azure/data-factory/data-flow-source#source-settings
options.AddDataSetParameter(“<soure-or-sink-name>", “<parameter-name>", “<parameter-value>");

// Add additional linked services to the debug session.
// 💡 This can be useful to add, for example, an additional Key vault linked services for certain authentication types of datasets.
options.AddLinkedService("datafactory_sales_keyvaultLS");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public static async Task<TemporaryDataFlowDebugSession> StartDebugSessionAsync(
}

logger.LogTrace("[Test:Setup] Starting Azure DataFactory '{Name}' DataFlow debug session... (might take up to 3 min to start up)", resource.Id.Name);
ArmOperation<DataFactoryDataFlowCreateDebugSessionResult> result =
ArmOperation<DataFactoryDataFlowCreateDebugSessionResult> result =
await resource.CreateDataFlowDebugSessionAsync(WaitUntil.Completed, new DataFactoryDataFlowDebugSessionContent
{
TimeToLiveInMinutes = options.TimeToLiveInMinutes
Expand Down Expand Up @@ -247,7 +247,7 @@ public async Task<DataFlowRunResult> RunDataFlowAsync(
configureOptions?.Invoke(options);

await StartDataFlowAsync(dataFlowName, options);

return await GetDataFlowResultAsync(dataFlowName, targetSinkName, options);
}

Expand Down Expand Up @@ -277,12 +277,27 @@ private async Task StartDataFlowAsync(string dataFlowName, RunDataFlowOptions op
private DataFlowDebugPackageDebugSettings CreateDebugSettings(RunDataFlowOptions options)
{
var settings = new DataFlowDebugPackageDebugSettings();

foreach (KeyValuePair<string, BinaryData> parameter in options.DataFlowParameters)
{
_logger.LogTrace("[Test:Setup] Add DataFlow parameter '{Name}' to debug session", parameter.Key);
settings.Parameters[parameter.Key] = parameter.Value;
}

foreach (KeyValuePair<string, IDictionary<string, object>> datasetParameter in options.DataSetParameters)
{
foreach (KeyValuePair<string, object> parameter in datasetParameter.Value)
{
_logger.LogTrace("[Test:Setup] Add DataSet parameter '{ParameterName}' for DataSet '{DataSetName}' to debug session",
parameter.Key,
datasetParameter.Key
);
}
}

string jsonString = System.Text.Json.JsonSerializer.Serialize(options.DataSetParameters);
settings.DatasetParameters = BinaryData.FromString(jsonString);

return settings;
}

Expand Down Expand Up @@ -343,7 +358,7 @@ private async Task<DataFlowRunResult> GetDataFlowResultAsync(string dataFlowName
{
_logger.LogTrace("[Test] Run DataFlow '{DataFlowName}' until result is available on sink '{SinkName}'", dataFlowName, targetSinkName);

ArmOperation<DataFactoryDataFlowDebugCommandResult> result =
ArmOperation<DataFactoryDataFlowDebugCommandResult> result =
await DataFactory.ExecuteDataFlowDebugSessionCommandAsync(WaitUntil.Completed, new DataFlowDebugCommandContent
{
Command = "executePreviewQuery",
Expand Down Expand Up @@ -380,7 +395,7 @@ public async ValueTask DisposeAsync()
}
}

/// <summary>
/// <summary>
/// Represents the run options when calling the <see cref="TemporaryDataFlowDebugSession.RunDataFlowAsync(string,string,Action{RunDataFlowOptions})"/>.
/// </summary>
public class RunDataFlowOptions
Expand All @@ -389,22 +404,57 @@ public class RunDataFlowOptions

internal Collection<string> LinkedServiceNames { get; } = new();
internal IDictionary<string, BinaryData> DataFlowParameters { get; } = new Dictionary<string, BinaryData>();
internal IDictionary<string, IDictionary<string, object>> DataSetParameters { get; } = new Dictionary<string, IDictionary<string, object>>();

/// <summary>
/// Adds a parameter to the DataFlow to run.
/// </summary>
/// <exception cref="ArgumentException">Thrown when the <paramref name="name"/> is blank.</exception>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="value"/> is null.</exception>
public RunDataFlowOptions AddDataFlowParameter(string name, object value)
{
if (string.IsNullOrWhiteSpace(name))
{
throw new ArgumentException("DataFlow parameter name should not be blank", nameof(name));
}

ArgumentNullException.ThrowIfNull(value);

DataFlowParameters[name] = BinaryData.FromObjectAsJson(value);
return this;
}

/// <summary>
/// Adds a parameter to a DataSet that is part of the targeted DataFlow.
/// </summary>
/// <remarks>
/// The <paramref name="sourceOrSinkName"/> should be the "Output stream name" of the source or sink dataset in the DataFlow, not than the actual DataSet name, see <a href="https://learn.microsoft.com/en-us/azure/data-factory/data-flow-source#source-settings" />.
/// </remarks>///
/// <exception cref="ArgumentException">Thrown when the <paramref name="sourceOrSinkName"/> or the <paramref name="parameterName"/> is blank.</exception>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="parameterValue"/> is null.</exception>
public RunDataFlowOptions AddDataSetParameter(string sourceOrSinkName, string parameterName, object parameterValue)
{
if (string.IsNullOrWhiteSpace(sourceOrSinkName))
{
throw new ArgumentException("Source or Sink name should not be blank", nameof(sourceOrSinkName));
}

if (string.IsNullOrWhiteSpace(parameterName))
{
throw new ArgumentException("DataSet parameter name should not be blank", nameof(parameterName));
}

ArgumentNullException.ThrowIfNull(parameterValue);

if (!DataSetParameters.ContainsKey(sourceOrSinkName))
{
DataSetParameters[sourceOrSinkName] = new Dictionary<string, object>();
}

DataSetParameters[sourceOrSinkName].Add(parameterName, parameterValue);
return this;
}

/// <summary>
/// Gets or sets the limit of rows for the preview response of the DataFlow run (default: 100 rows).
/// </summary>
Expand Down
Loading

0 comments on commit b826f6d

Please sign in to comment.