In this five-part series I will be talking about the Synapse Minimal Test Framework: the why, the how, and everything in between:
- Part 1: Let’s get testing – Introduction, considerations, and approach
- Part 2: Simple pipeline test – Testing a simple unzip pipeline using Storage
- Part 3: Advanced pipeline test – Testing a complex pipeline using SQL
- Part 4: CI with Github – Automate testing using Github Actions
- Part 5: CI with Azure DevOps – Automate testing using Azure DevOps Pipelines
Want to dive straight into the code?
Want to skip reading? All code below can be found on GitHub: jpaarhuis/synapse-minimal-test-framework: Minimal test framework to test Azure Synapse pipelines with MSTest (CI included) (github.com)
Testing a simple pipeline using input and output
A generally good practice when writing tests is to use the AAA (Arrange, Act, Assert) pattern. We use this pattern to test a pipeline that unzips a file from one location and extracts it to a second location. For this pipeline we can provide an input zip file and check for the extracted output file.
- Arrange: Put the zip file in place in Azure Storage and clear the output file from the previous test run
- Act: Execute the pipeline
- Assert: Check whether the pipeline succeeded and the extracted file is present
For steps 1 and 3 (writing files to and reading files from Azure Storage) we need access to storage. The StorageHelper from the framework helps with that.
/// <summary>
/// Test helper for arranging and asserting blobs in an Azure Storage account.
/// Authenticates with <c>DefaultAzureCredential</c>.
/// </summary>
public class StorageHelper
{
    // Local folder (relative to the working directory) that holds the test input files.
    private const string testFilesFolder = "TestFiles";
    private BlobServiceClient blobServiceClient;

    /// <summary>Creates a helper bound to the storage account at <paramref name="storageUrl"/>.</summary>
    /// <param name="storageUrl">Absolute URL of the blob service endpoint.</param>
    public StorageHelper(string storageUrl)
    {
        var tokenCredential = new DefaultAzureCredential();
        blobServiceClient = new BlobServiceClient(new Uri(storageUrl), tokenCredential);
    }

    /// <summary>
    /// Uploads a file from the local test files folder to <c>folder/testFile</c> in the
    /// given container, deleting any blob that already exists under that name first.
    /// </summary>
    /// <param name="container">Name of the blob container.</param>
    /// <param name="folder">Virtual folder inside the container.</param>
    /// <param name="testFile">File name, both locally and as the blob's file name.</param>
    public async Task UploadFileAsync(string container, string folder, string testFile)
    {
        var containerClient = blobServiceClient.GetBlobContainerClient(container);

        // Path.Combine is correct here: this is a path on the local file system.
        // Open read-only so read-only test files (e.g. checked out by CI) can be uploaded.
        using var fileStream = File.OpenRead(Path.Combine(testFilesFolder, testFile));

        await DeleteBlobIfExistsAsync(container, folder, testFile);
        await containerClient.UploadBlobAsync(BlobPath(folder, testFile), fileStream);
    }

    /// <summary>Deletes the blob <c>folder/blobFile</c> from the container if it exists.</summary>
    /// <param name="container">Name of the blob container.</param>
    /// <param name="folder">Virtual folder inside the container.</param>
    /// <param name="blobFile">File name of the blob.</param>
    public async Task DeleteBlobIfExistsAsync(string container, string folder, string blobFile)
    {
        var containerClient = blobServiceClient.GetBlobContainerClient(container);
        await containerClient.DeleteBlobIfExistsAsync(BlobPath(folder, blobFile));
    }

    /// <summary>Checks whether the blob <c>folder/blobFile</c> exists in the container.</summary>
    /// <param name="container">Name of the blob container.</param>
    /// <param name="folder">Virtual folder inside the container.</param>
    /// <param name="blobFile">File name of the blob.</param>
    /// <returns><c>true</c> when the blob exists; otherwise <c>false</c>.</returns>
    public async Task<bool> BlobExistsAsync(string container, string folder, string blobFile)
    {
        var containerClient = blobServiceClient.GetBlobContainerClient(container);
        var blobClient = containerClient.GetBlobClient(BlobPath(folder, blobFile));
        return await blobClient.ExistsAsync();
    }

    // Builds a blob name with '/' as the delimiter. Path.Combine would use '\' on
    // Windows, producing a different blob name than the same test run on Linux/macOS.
    private static string BlobPath(string folder, string blobFile)
    {
        if (string.IsNullOrEmpty(folder))
        {
            return blobFile;
        }

        return $"{folder.TrimEnd('/')}/{blobFile}";
    }
    // ...
}
For step 2 (execute the pipeline) we need the PipelineHelper to kick off Synapse pipelines.
/// <summary>
/// Test helper that starts Synapse pipeline runs and polls them until they finish
/// or a timeout elapses. Authenticates with <c>DefaultAzureCredential</c>.
/// </summary>
public class PipelineHelper
{
    // Seconds to wait between two status polls.
    private const int pollingIntervalSeconds = 2;
    private readonly PipelineClient pipelineClient;
    private readonly PipelineRunClient pipelineRunClient;

    /// <summary>Creates a helper bound to the Synapse workspace at <paramref name="synapseDevUrl"/>.</summary>
    /// <param name="synapseDevUrl">Absolute URL of the Synapse development endpoint.</param>
    public PipelineHelper(string synapseDevUrl)
    {
        var tokenCredential = new DefaultAzureCredential();
        pipelineClient = new PipelineClient(new Uri(synapseDevUrl), tokenCredential);
        pipelineRunClient = new PipelineRunClient(new Uri(synapseDevUrl), tokenCredential);
    }

    /// <summary>
    /// Starts the named pipeline and polls its run status every
    /// <c>pollingIntervalSeconds</c> seconds until the run leaves the
    /// "Queued"/"InProgress"/"Canceling" states or <paramref name="timeout"/> is used up.
    /// </summary>
    /// <param name="pipelineName">Name of the pipeline to run.</param>
    /// <param name="timeout">Maximum time to keep polling; must be between 0 and 120 minutes.</param>
    /// <param name="parameters">Optional pipeline parameters.</param>
    /// <returns>
    /// The run id and the last status observed. When the timeout is reached first, that
    /// status is still one of the non-terminal states listed above.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="timeout"/> is negative or longer than 120 minutes.
    /// </exception>
    public async Task<PipelineRunResult> RunAndAwaitAsync(string pipelineName, TimeSpan timeout, IDictionary<string, object>? parameters = null)
    {
        if (timeout.TotalMinutes > 120 || timeout.TotalMinutes < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(timeout), "Timeout should be between 0 and 120 minutes");
        }

        var response = await pipelineClient.CreatePipelineRunAsync(pipelineName, parameters: parameters);
        string runId = response.Value.RunId;

        Response<PipelineRun> run;

        // Number of polls that fit in the timeout. The do/while below always polls at
        // least once, so a zero timeout still returns a real status instead of looping forever.
        var remainingPolls = (int)Math.Ceiling(timeout.TotalSeconds / pollingIntervalSeconds);
        do
        {
            await Task.Delay(TimeSpan.FromSeconds(pollingIntervalSeconds));
            run = await pipelineRunClient.GetPipelineRunAsync(runId);
            remainingPolls--;
        } while (remainingPolls > 0 && IsStillRunning(run.Value.Status));

        return new PipelineRunResult(runId, run.Value.Status);
    }

    // True while the run is in one of the states this helper keeps polling on.
    private static bool IsStillRunning(string status)
    {
        return status == "Queued"
            || status == "InProgress"
            || status == "Canceling";
    }
    // ...
}
Then we can add the zip file to the solution in the TestFiles folder.
And finally write the test.
/// <summary>
/// Runs the pl_unzip pipeline against a freshly uploaded test.zip and verifies that
/// the run succeeded, unzipped.txt was produced, and the source zip is gone.
/// </summary>
[TestMethod]
public async Task When_unzipping_Expect_file_unzipped_and_zip_removed()
{
    const string container = "mycontainer";
    const string zippedFolder = "zipped";
    const string unzippedFolder = "unzipped";
    const string zipFile = "test.zip";
    const string extractedFile = "unzipped.txt";

    // arrange : upload the input zip and remove output left over from a previous run
    await storageHelper.UploadFileAsync(container, zippedFolder, zipFile);
    await storageHelper.DeleteBlobIfExistsAsync(container, unzippedFolder, extractedFile);

    // act : run the pl_unzip pipeline and wait for it to finish
    var runResult = await pipelineHelper.RunAndAwaitAsync("pl_unzip", TimeSpan.FromMinutes(10));

    // assert : the run succeeded, the extracted file exists and the zip was removed
    bool extractedFilePresent = await storageHelper.BlobExistsAsync(container, unzippedFolder, extractedFile);
    bool zipStillPresent = await storageHelper.BlobExistsAsync(container, zippedFolder, zipFile);

    Assert.AreEqual("Succeeded", runResult.Status);
    Assert.IsTrue(extractedFilePresent);
    Assert.IsFalse(zipStillPresent);
}
Finally, run it with dotnet test, or use the Test Explorer in VS Code or Visual Studio.
Next
In part 3 we will take things to the next level: we will test a pipeline with parameters and check the results using Synapse Serverless SQL (Built-in).
Leave a Reply