Synapse Minimal Test Framework – Part 3: Advanced pipeline test

In this five-part series I talk about the Synapse Minimal Test Framework: the why, the how and whatnot.

Want to dive straight into the code?

Want to skip reading? All code below can be found on GitHub: jpaarhuis/synapse-minimal-test-framework (minimal test framework to test Azure Synapse pipelines with MSTest, CI included).

Testing a complex pipeline using a prepared input file, pipeline parameters and SQL Serverless

In Part 2, we demonstrated how to use our minimal test framework to test a simple Azure Synapse pipeline that unzips a file from one blob container to another. In this article, we go a step further and test a more complex pipeline that takes parameters and writes to a Delta Lake table, then verify the outcome with a SQL query on an external table (SQL Serverless).

For this example we’re using the AAA pattern again (as described in Part 2) to do the following:

  1. Arrange: Put a JSON input file in place with a unique value
  2. Act: Execute the pipeline with a custom pipeline parameter to use the JSON generated in the previous step
  3. Assert: Check whether the pipeline ran correctly and check if the records are updated with the unique value

Last time I showed how the StorageHelper and PipelineHelper work. This time we use a SqlHelper class to do specific data checks. It has a retry-on-timeout mechanism, since SQL Serverless can take some time to initialize and may throw a timeout exception in the meantime. The helper waits 60 seconds between attempts and retries up to three times before giving up.

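using System;
using System.Data;
using System.Threading.Tasks;
using Azure.Identity;
using Microsoft.Data.SqlClient;

public class SqlHelper
{
    // HRESULT exposed by SqlException.ErrorCode. Note: this value (0x80131904) appears to be
    // shared by SqlException in general, so the filter in RetryOnTimeout may also retry
    // non-timeout SQL errors; SqlException.Number == -2 identifies a client-side timeout.
    private const int SQL_TIMEOUT_ERROR_CODE = -2146232060;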
    private string ondemandSqlServer;

    public SqlHelper(string synapseDevUrl)
    {
        var client = new Azure.Analytics.Synapse.Artifacts.WorkspaceClient(new Uri(synapseDevUrl), new DefaultAzureCredential());
        var workspace = client.Get();

        ondemandSqlServer = workspace.Value.ConnectivityEndpoints["sqlOnDemand"];
    }

    public DataTable RetrieveQueryResults(string dbName, string sql)
    {
        string sqlConnectionString = $"Server={ondemandSqlServer}; Authentication=Active Directory Default; Encrypt=True; Database={dbName};";

        var ds = new DataSet();

        RetryOnTimeout(() => {
            using var connection = new SqlConnection(sqlConnectionString);
            connection.Open();

            var command = new SqlCommand(sql, connection);
            var adapter = new SqlDataAdapter(command);

            adapter.Fill(ds);
        });
        
        return ds.Tables[0];
    }

    private void RetryOnTimeout(Action sqlMethod)
    {
        var timeout = false;
        var retries = 0;

        do
        {
            if (timeout)
            {
                // wait 60 seconds before retrying
                Task.Delay(60 * 1000).Wait();
            }

            timeout = false;
            retries++;
            try
            {
                sqlMethod();
            }
            catch (SqlException ex) when (ex.ErrorCode == SQL_TIMEOUT_ERROR_CODE)
            {
                timeout = true;
            }
        } while (timeout && retries <= 3);

        if (timeout)
        {
            throw new TimeoutException("SQL query timed out");
        }
    }
}
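To see how many attempts the loop above actually makes, here is a minimal sketch of the same retry shape with the error check and the wait passed in, so it can be exercised without a SQL connection. `RetrySketch`, `Run`, `isTransient`, `wait` and `maxRetries` are hypothetical names for illustration and are not part of the framework.

```csharp
using System;

public static class RetrySketch
{
    // Hypothetical helper: same loop shape as RetryOnTimeout, but the
    // transient-error check and the wait are injected by the caller.
    // Returns the number of attempts that were needed.
    public static int Run(Action action, Func<Exception, bool> isTransient, Action wait, int maxRetries = 3)
    {
        var attempts = 0;
        while (true)
        {
            attempts++;
            try
            {
                action();
                return attempts;
            }
            catch (Exception ex) when (isTransient(ex))
            {
                // After the first attempt plus maxRetries retries, give up.
                if (attempts > maxRetries)
                {
                    throw new TimeoutException($"Operation still failing after {attempts} attempts", ex);
                }

                // SqlHelper waits 60 seconds here; the caller decides in this sketch.
                wait();
            }
        }
    }
}
```

With `maxRetries = 3` this mirrors the `retries <= 3` condition: one initial attempt plus at most three retries, so four attempts and three waits in the worst case.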

The test will then look like this:

    [TestMethod]
    public async Task When_appending_to_delta_lake_Expect_new_lines_in_external_table()
    {
        string uniqueUnixTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString();

        // arrange : upload test file with unique value to expect in assert
        await storageHelper.UploadFileAsync("mycontainer", "", "mytest.json", new BinaryData(new
        {
            ColumnName = "Edition",
            ColumnValue = uniqueUnixTime,
            Ids = "9782503574318,9782503585390",
            User = "AutomatedTest"
        }));

        // act : run pl_json_append_delta_lake pipeline
        var parameters = new Dictionary<string, object>() { ["triggerFile"] = "mytest.json" };
        var result = await pipelineHelper.RunAndAwaitAsync("pl_json_append_delta_lake", TimeSpan.FromMinutes(10), parameters);

        // assert: check that new lines are in external table
        string sql = @$"
            SELECT Edition, Notes FROM [default].[dbo].[sat_topics]
            WHERE ISBN IN ('9782503574318', '9782503585390')
            AND Edition = '{uniqueUnixTime}'
            AND Notes = 'AutomatedTest'
        ";
        var table = sqlHelper.RetrieveQueryResults("DataExplorationDB", sql);

        Assert.AreEqual("Succeeded", result.Status);
        Assert.AreEqual(2, table.Rows.Count);
    }
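For reference, this is roughly what the arrange step uploads. The sketch assumes that `BinaryData` serializes the anonymous object with the System.Text.Json defaults, which is how its object constructor is documented. `TriggerFileSketch` and `BuildPayload` are hypothetical names used only for this illustration.

```csharp
using System;
using System.Text.Json;

public static class TriggerFileSketch
{
    // Hypothetical helper: builds the same anonymous object as the test
    // and returns the JSON text that default serialization produces.
    public static string BuildPayload(string uniqueValue)
    {
        var payload = new
        {
            ColumnName = "Edition",
            ColumnValue = uniqueValue,
            Ids = "9782503574318,9782503585390",
            User = "AutomatedTest"
        };

        // Default options: property names as declared, no indentation.
        return JsonSerializer.Serialize(payload);
    }
}
```

Calling `TriggerFileSketch.BuildPayload("1700000000000")` returns `{"ColumnName":"Edition","ColumnValue":"1700000000000","Ids":"9782503574318,9782503585390","User":"AutomatedTest"}`. The two ISBNs in `Ids` are the ones the assert query filters on, which is why two rows are expected.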

Running the test gives me a successful result.

Next

In the next part, we will explore how to automate the testing process using GitHub Actions. We will walk through the steps to create a workflow file that builds the test project and runs the tests. By automating our testing process with GitHub Actions, we can check our pipelines on every change and catch issues early in the development cycle.

