Synapse Minimal Test Framework – Part 2: Simple pipeline test

In this five-part series I talk about the Synapse Minimal Test Framework: the why, the how and the whatnot.

Want to dive straight into the code? All code below can be found on GitHub in jpaarhuis/synapse-minimal-test-framework, a minimal test framework to test Azure Synapse pipelines with MSTest (CI included).

Testing a simple pipeline using input and output

A good general practice when writing tests is the AAA (Arrange, Act, Assert) pattern. We use it here to test a pipeline that unzips a file from one location and extracts its contents to a second location. For this pipeline we can provide an input zip file and check the extracted output file.

  1. Arrange: Upload the zip file to Azure Storage and delete the extracted file left over from a previous test run
  2. Act: Execute the pipeline
  3. Assert: Check that the pipeline succeeded, the extracted file is present and the zip file has been removed

Steps 1 and 3 write files to and read files from Azure Storage. The framework's StorageHelper takes care of that.

using Azure.Identity;
using Azure.Storage.Blobs;

public class StorageHelper
{
    // Local folder with the test input files, resolved relative to the working directory of the test run
    private const string testFilesFolder = "TestFiles";
    private readonly BlobServiceClient blobServiceClient;

    public StorageHelper(string storageUrl)
    {
        // DefaultAzureCredential tries several credential sources in turn (environment, managed identity, Visual Studio, Azure CLI, ...)
        var tokenCredential = new DefaultAzureCredential();
        blobServiceClient = new BlobServiceClient(new Uri(storageUrl), tokenCredential);
    }

    public async Task UploadFileAsync(string container, string folder, string testFile)
    {
        var containerClient = blobServiceClient.GetBlobContainerClient(container);

        using var fileStream = File.OpenRead(Path.Combine(testFilesFolder, testFile));

        // UploadBlobAsync fails if the blob already exists, so remove any previous copy first
        await DeleteBlobIfExistsAsync(container, folder, testFile);
        await containerClient.UploadBlobAsync(BlobName(folder, testFile), fileStream);
    }

    public async Task DeleteBlobIfExistsAsync(string container, string folder, string blobFile)
    {
        var containerClient = blobServiceClient.GetBlobContainerClient(container);

        await containerClient.DeleteBlobIfExistsAsync(BlobName(folder, blobFile));
    }

    public async Task<bool> BlobExistsAsync(string container, string folder, string blobFile)
    {
        var containerClient = blobServiceClient.GetBlobContainerClient(container);

        var blobClient = containerClient.GetBlobClient(BlobName(folder, blobFile));

        return (await blobClient.ExistsAsync()).Value;
    }

    // Blob names always use '/' as the virtual folder separator,
    // whereas Path.Combine would produce '\' on Windows
    private static string BlobName(string folder, string file) => $"{folder}/{file}";

    // ...
}

For step 2 (running the pipeline) the PipelineHelper starts a Synapse pipeline run and polls its status until the run completes.

using Azure;
using Azure.Analytics.Synapse.Artifacts;
using Azure.Analytics.Synapse.Artifacts.Models;
using Azure.Identity;

public class PipelineHelper
{
    private const int pollingIntervalSeconds = 2;
    private readonly PipelineClient pipelineClient;
    private readonly PipelineRunClient pipelineRunClient;

    public PipelineHelper(string synapseDevUrl)
    {
        var tokenCredential = new DefaultAzureCredential();

        pipelineClient = new PipelineClient(new Uri(synapseDevUrl), tokenCredential);
        pipelineRunClient = new PipelineRunClient(new Uri(synapseDevUrl), tokenCredential);
    }

    public async Task<PipelineRunResult> RunAndAwaitAsync(string pipelineName, TimeSpan timeout, IDictionary<string, object>? parameters = null)
    {
        if (timeout.TotalMinutes > 120 || timeout.TotalMinutes < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(timeout), "Timeout should be between 0 and 120 minutes");
        }

        // Start the pipeline run and keep its id for polling
        var response = await pipelineClient.CreatePipelineRunAsync(pipelineName, parameters: parameters);
        string runId = response.Value.RunId;

        Response<PipelineRun> run;

        // Maximum number of status polls that fit within the timeout
        var retries = (int)Math.Ceiling(timeout.TotalSeconds / pollingIntervalSeconds);

        // Poll until the run leaves an active state or the retries are used up;
        // on timeout the last observed status (e.g. "InProgress") is returned
        do
        {
            await Task.Delay(TimeSpan.FromSeconds(pollingIntervalSeconds));
            run = await pipelineRunClient.GetPipelineRunAsync(runId);
            retries--;
        } while (retries > 0
            && (run.Value.Status == "Queued"
                || run.Value.Status == "InProgress"
                || run.Value.Status == "Canceling"));

        return new PipelineRunResult(runId, run.Value.Status);
    }

    // ...
}
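The polling part of RunAndAwaitAsync can be exercised without a Synapse workspace by pulling it out into a dependency-free sketch. Here getStatus is a hypothetical stand-in for a call to GetPipelineRunAsync(runId), and the names PollUntilDoneAsync, IsActive and maxPolls are illustrative, not part of the framework.

```csharp
using System;
using System.Threading.Tasks;

// Run states that mean "keep polling" (the same ones RunAndAwaitAsync checks)
static bool IsActive(string status) =>
    status is "Queued" or "InProgress" or "Canceling";

// Polls getStatus every `interval` until it returns a non-active status or
// maxPolls polls have been made, then returns the last status seen.
static async Task<string> PollUntilDoneAsync(
    Func<Task<string>> getStatus, TimeSpan interval, int maxPolls)
{
    if (maxPolls < 1)
        throw new ArgumentOutOfRangeException(nameof(maxPolls), "At least one poll is required");

    string status;
    var polls = 0;
    do
    {
        await Task.Delay(interval);
        status = await getStatus();
        polls++;
    } while (polls < maxPolls && IsActive(status));

    return status;
}
```

Mapped back to PipelineHelper, maxPolls corresponds to the timeout divided by pollingIntervalSeconds. Returning the last status on timeout, rather than looping on, means a run that is still "InProgress" simply fails the test's "Succeeded" assertion.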

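Next, add the zip file to the test project in the TestFiles folder. Because StorageHelper opens it with a path relative to the working directory of the test run, make sure the file is copied to the build output (Copy to Output Directory).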
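If you don't have a suitable zip file at hand, one can be generated with System.IO.Compression. This is a minimal sketch, assuming pl_unzip extracts the archive entries under their own names so that an entry called unzipped.txt ends up as the blob the test checks for; the helper CreateTestZip and the file content are hypothetical.

```csharp
using System;
using System.IO;
using System.IO.Compression;

// Writes a zip archive at zipPath containing a single text entry.
// Hypothetical helper for producing TestFiles/test.zip; not part of the framework.
static void CreateTestZip(string zipPath, string entryName, string content)
{
    var directory = Path.GetDirectoryName(Path.GetFullPath(zipPath));
    if (directory is not null) Directory.CreateDirectory(directory);

    // ZipArchiveMode.Create refuses to open an existing file, so start clean
    File.Delete(zipPath);

    using var archive = ZipFile.Open(zipPath, ZipArchiveMode.Create);
    var entry = archive.CreateEntry(entryName);
    // Disposed before the archive, so the entry is flushed before the zip is finalized
    using var writer = new StreamWriter(entry.Open());
    writer.Write(content);
}
```

For example, CreateTestZip("TestFiles/test.zip", "unzipped.txt", "hello") produces an archive you can commit to the TestFiles folder.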

Finally, write the test.

    [TestMethod]
    public async Task When_unzipping_Expect_file_unzipped_and_zip_removed()
    {
        // arrange: upload the test zip and remove the output of a previous run
        await storageHelper.UploadFileAsync("mycontainer", "zipped", "test.zip");
        await storageHelper.DeleteBlobIfExistsAsync("mycontainer", "unzipped", "unzipped.txt");

        // act: run the pl_unzip pipeline with a 10-minute timeout
        var result = await pipelineHelper.RunAndAwaitAsync("pl_unzip", TimeSpan.FromMinutes(10));

        // assert: the run succeeded, the file is unzipped and the zip file is removed
        var unzippedExists = await storageHelper.BlobExistsAsync("mycontainer", "unzipped", "unzipped.txt");
        var zipExists = await storageHelper.BlobExistsAsync("mycontainer", "zipped", "test.zip");

        Assert.AreEqual("Succeeded", result.Status, "Pipeline run did not succeed");
        Assert.IsTrue(unzippedExists, "Unzipped file not found");
        Assert.IsFalse(zipExists, "Zip file was not removed");
    }
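The test uses storageHelper and pipelineHelper fields that are initialized elsewhere in the test class, which this post does not show. One way to feed in their URLs, assuming they are supplied as environment variables (for example by the CI pipeline), is a small fail-fast guard like the sketch below. The variable names STORAGE_URL and SYNAPSE_DEV_URL and the helper GetRequiredSetting are hypothetical.

```csharp
using System;

// Reads a required setting from an environment variable and fails fast with a
// clear message when it is missing or empty. Hypothetical helper, not part of the framework.
static string GetRequiredSetting(string name) =>
    Environment.GetEnvironmentVariable(name) is { Length: > 0 } value
        ? value
        : throw new InvalidOperationException($"Environment variable '{name}' is not set.");

// Possible use in an MSTest [ClassInitialize] method (variable names are assumptions):
// storageHelper = new StorageHelper(GetRequiredSetting("STORAGE_URL"));
// pipelineHelper = new PipelineHelper(GetRequiredSetting("SYNAPSE_DEV_URL"));
```

Failing with a named variable in the message makes a misconfigured CI run much easier to diagnose than a null reference inside the Azure SDK.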

Run it with dotnet test, or from the Test Explorer in VS Code or Visual Studio.

Next

In part 3 we take things to the next level: we test a pipeline with parameters and check the results using the Synapse serverless SQL pool (Built-in).

