In this blog, we will act in the persona of an AVEVA customer who needs to retrieve operations data from AVEVA Data Hub into a Microsoft Fabric Lakehouse.
Note: While this scenario is using AVEVA Data Hub, the concepts translate to interacting with any REST API that uses pagination.
Figure: Complete architecture of the design pattern for retrieving files from a REST API
Parameters allow the user to specify multiple conditions such as the AVEVA Data Hub data view, dataset time range, interval of records, file format returned, and number of records each request can return.
Figure: List of Data Pipeline parameters
Variables are used to manage properties that need to change throughout the execution of the Data Pipeline.
Figure: List of Data Pipeline variables
Before we can connect to the AVEVA Data Hub data view REST endpoint and retrieve data as a parquet file, we must obtain a Bearer Token to use within the Copy activity source.
Figure: Detailed view of the settings for the Web activity Get Bearer Token
Using Web activity, we can create a new Web v2 connection to the REST API endpoint.
NOTE: In the future, we will be able to specify OAuth2.0 as an Authentication Method, which will allow us to securely store the Client ID and Client Secret obtained from AVEVA Data Hub.
@concat('grant_type=',pipeline().parameters.grant_type,'&client_id=',pipeline().parameters.client_id,'&client_secret=',uriComponent(pipeline().parameters.client_secret))
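For readers who want to see what the expression above produces, here is a minimal Python sketch of the same string assembly. The function name and the sample credential values are hypothetical, and `urllib.parse.quote` is used only as an approximation of `uriComponent`; the two may differ on a few punctuation characters.

```python
from urllib.parse import quote


def build_token_body(grant_type: str, client_id: str, client_secret: str) -> str:
    """Assemble the form-encoded request body the way the expression does.

    Only the client secret is URL-encoded, mirroring the uriComponent() call.
    """
    return (
        "grant_type=" + grant_type
        + "&client_id=" + client_id
        + "&client_secret=" + quote(client_secret, safe="")
    )


# Hypothetical values, for illustration only.
body = build_token_body("client_credentials", "my-client-id", "s3cr/et+value=")
```

Encoding the secret matters because characters such as `+`, `/`, and `=` would otherwise be misread inside a form-encoded body.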
The output of the Web activity contains several fields that matter later in the Data Pipeline. The most important is access_token, which is sent as part of the header in the Copy activity and in subsequent Web activities. expires_in tells us how long the access_token remains valid. If we are doing a historical load with many pages, there is a risk that the token expires before the process finishes; we discuss how to mitigate this risk next. token_type tells us that we are using a Bearer Token.
We will be referring to this auth token throughout the pipeline, and for easy reusability, we want to capture this token and assign it to a pipeline variable.
Figure: Initializing the BearerToken variable
@concat('Authorization: Bearer ',activity('Get Bearer Token').output.access_token)
NOTE: In the future, we will have the capability to specify Secure Input/Output of the Set Variable activity, hiding sensitive information.
NOTE: The Copy and Web activities treat headers differently, so we have to do some string manipulation to fit each activity. In this case, we remove "Authorization: " for the Web activity because that activity already provides it in the Name field.
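| Activity | Authorization header |
| --- | --- |
| Web activity | Name: Authorization; Value: the BearerToken variable with "Authorization: " removed |
| Copy activity | The full BearerToken variable value ("Authorization: Bearer <access_token>") |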
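The difference between the two header shapes can be sketched in Python as follows. This is an illustration only: the function names are hypothetical, and the dictionary simply stands in for the Web activity's separate Name and Value fields.

```python
PREFIX = "Authorization: "


def copy_activity_header(access_token: str) -> str:
    """Full header line, as stored in the BearerToken variable."""
    return PREFIX + "Bearer " + access_token


def web_activity_header(bearer_token_variable: str) -> dict:
    """Name/value pair: the prefix is dropped because the name carries it."""
    return {"Authorization": bearer_token_variable.replace(PREFIX, "")}


# Hypothetical token value, for illustration only.
stored = copy_activity_header("abc123")
```

Storing the longer form and trimming it where needed keeps a single variable as the source of truth for both activities.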
The Bearer Token expires after a certain amount of time. In this scenario, the Web activity that returns the Bearer Token includes an additional field, expires_in, which specifies how many seconds the token is valid for. Our process might run longer than this, and we want to avoid losing access and causing the Copy activity to fail. To handle this, we create a variable that holds the future date and time at which the token expires. During our data loading stage (Until loop), we compare this value with the current date and time to determine whether we need to generate a new token.
To get the future date, we can create a variable called TokenExpires_in and set the value to:
@string(ticks(addseconds(utcnow(),sub(int(activity('Get Bearer Token').output.expires_in),300))))
Ticks: Return the ticks property value for a specified timestamp. A tick is a 100-nanosecond interval.
NOTE: We are allowing for a buffer of 300 seconds (5 minutes).
Within the Data Loading Stage (Until activity) of our Pipeline, we will start each iteration off with an If Condition activity to check if the current time is greater than or equal to the time our token will expire. If it is, then we will resubmit the web request to obtain a new token and update our BearerToken variable.
@greaterOrEquals(ticks(utcnow()), int(variables('TokenExpires_in')))
NOTE: Variable TokenExpires_in is described above.
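The expiry calculation and the refresh check can be sketched together in Python. This is a minimal illustration, assuming ticks are counted in 100-nanosecond intervals from midnight on January 1, 0001 (the .NET convention); the function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

TICKS_PER_SECOND = 10_000_000          # one tick = 100 nanoseconds
EPOCH = datetime(1, 1, 1, tzinfo=timezone.utc)  # assumed tick origin
BUFFER_SECONDS = 300                   # the 5-minute safety margin


def to_ticks(moment: datetime) -> int:
    """Convert a timezone-aware datetime to an integer tick count."""
    delta = moment - EPOCH
    whole_seconds = delta.days * 86_400 + delta.seconds
    return whole_seconds * TICKS_PER_SECOND + delta.microseconds * 10


def token_expiry_ticks(now: datetime, expires_in: int) -> int:
    """Tick value at which the token should be treated as expired."""
    return to_ticks(now + timedelta(seconds=expires_in - BUFFER_SECONDS))


def needs_new_token(now: datetime, expiry_ticks: int) -> bool:
    """Mirror of the If Condition: current ticks >= stored expiry ticks."""
    return to_ticks(now) >= expiry_ticks
```

Comparing integers rather than formatted timestamps avoids any dependence on date string formats, which is presumably why the pipeline stores ticks.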
The AVEVA Data Hub REST API uses the concept of Pagination. This means that each web request will return only a subset of the payload along with a custom header, Next-Page, containing the endpoint to the next subset of data.
In Microsoft Fabric preview, native pagination is supported by a Copy activity only when your source is using the Connection Type of REST. However, we are unable to use this REST option because this is expecting a JSON object to be returned. In our scenario, we are requesting to pull back a parquet file. Luckily, the other Connection Type available to us is HTTP which allows for File Format to be specified.
With some creative Pipeline design, we can mimic the functionality of the native REST pagination by nesting our data movement logic inside an Until activity. The idea is to pass the current page into the Copy activity and then, following the Copy activity, use a Web activity to capture the Next-Page header. We have to use the Web activity because, in the current state, the Copy activity doesn't expose the response's custom headers. This process repeats until the Web activity no longer returns a Next-Page header, at which point we break out of the loop and complete the process.
Prior to interacting with the next page, we must start with the initial endpoint, or first page.
To do this, we are going to leverage the variable: next-page. As the name might imply, we will reuse this variable to hold the relative path for the next page, obtained downstream in the pipeline. However, the initial Copy activity will leverage the value defined from this Set Variable activity: Set First-Page.
@concat('/api/v1/Tenants/',pipeline().parameters.Tenants,'/Namespaces/',pipeline().parameters.Namespaces,'/DataViews/',pipeline().parameters.DataViews,'/Data/Interpolated?startIndex=',pipeline().parameters.startIndex,'&endIndex=',pipeline().parameters.endIndex,'&interval=',pipeline().parameters.interval,'&form=',pipeline().parameters.form)
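As an illustration of what this expression evaluates to, the following Python sketch builds the same relative path from the pipeline parameters. The function name and the sample tenant and namespace identifiers are hypothetical.

```python
def first_page_path(tenant: str, namespace: str, data_view: str,
                    start_index: str, end_index: str,
                    interval: str, form: str) -> str:
    """Build the relative path of the first page from the parameter values."""
    return (
        f"/api/v1/Tenants/{tenant}/Namespaces/{namespace}"
        f"/DataViews/{data_view}/Data/Interpolated"
        f"?startIndex={start_index}&endIndex={end_index}"
        f"&interval={interval}&form={form}"
    )


# Hypothetical identifiers, for illustration only.
first_page = first_page_path(
    "tenant-1", "namespace-1", "Emissions",
    "2021-06-23T05:00:00Z", "2023-06-23T05:00:00Z",
    "00.00:01:00", "parquet",
)
```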
The Until activity allows us to keep submitting requests to the REST API until some condition is met. Here, that condition is that the request no longer returns the Next-Page custom header. Once this header no longer exists, we set the next-page variable to an empty string. As covered above, the next-page variable is initially set to the first endpoint, so it is non-empty when the loop starts. Together, these two facts let us base the Until activity's conditional expression on whether the next-page variable is empty.
A lot of logic is nested within the Until activity.
In order to get the Next-Page Custom Header, we have to use a Web activity since the Copy activity does not output the request response in the current state. The activity Output of the Web activity: Get Next Page is referenced in the Set Variable: Set Next Page.
The Next-Page Custom Header is a complete URL. Since our Copy activity and Web Activities use the Web and Web v2 connectors, we need to remove the base URL and set the next-page variable to the URL’s Relative Path only. Also, we need to add the logic to replace the default &count= with the value of the parameter pagecount.
@replace(replace(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page'],
substring(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page'], 0,
add(indexOf(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page'], '.com'), length('.com'))), ''),
substring(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page'],
lastindexof(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page'], '&count='),
sub(
length(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page']),
lastindexof(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page'], '&count=')
)),
concat('&count=', pipeline().parameters.pagecount)
)
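Because the nested expression is hard to read, here is a Python sketch of the same two steps: trimming the base URL and swapping the trailing count. It assumes, as the expression does, that the host ends in ".com" and that "&count=" is the last query parameter; the function name and sample URL are hypothetical.

```python
def to_relative_path(next_page_url: str, page_count: int,
                     domain_suffix: str = ".com") -> str:
    """Reduce a full Next-Page URL to a relative path with a custom count."""
    # Step 1: drop everything up to and including the first ".com".
    cut = next_page_url.index(domain_suffix) + len(domain_suffix)
    relative = next_page_url[cut:]
    # Step 2: replace the trailing "&count=..." with the requested value.
    count_at = relative.rindex("&count=")
    return relative[:count_at] + f"&count={page_count}"


# Hypothetical header value, for illustration only.
sample = ("https://example.com/api/v1/DataViews/Emissions/Data/Interpolated"
          "?form=parquet&continuationtoken=abc&count=1000")
relative_path = to_relative_path(sample, 100000)
```

Both `index` and `rindex` raise `ValueError` when the marker is missing, which is loosely comparable to the pipeline activity failing on an unexpected header value.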
The AVEVA Data Hub REST API stops returning the custom header, Next-Page, once we have reached the end of the requested data. Currently, the expression builder cannot check whether a field exists before referencing it, and referencing a field that doesn't exist causes the activity to fail. We can use this failure as a signal: the On Failure activity dependency routes the pipeline's flow to a Set Variable activity that sets the next-page variable to an empty string, which breaks out of the loop.
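The overall loop, including the "missing header means stop" signal, can be sketched in Python. This is a simplified illustration rather than the pipeline itself: FAKE_API is a hypothetical in-memory stand-in for the REST endpoint, and a single fetch call plays the role of both the Copy activity (file body) and the Web activity (response headers).

```python
from typing import Callable, Dict, List, Tuple

Response = Tuple[bytes, Dict[str, str]]

# Hypothetical stand-in for the paginated API: path -> (body, headers).
FAKE_API: Dict[str, Response] = {
    "/data?page=1": (b"page-1", {"Next-Page": "/data?page=2"}),
    "/data?page=2": (b"page-2", {"Next-Page": "/data?page=3"}),
    "/data?page=3": (b"page-3", {}),  # last page: no Next-Page header
}


def fetch(path: str) -> Response:
    return FAKE_API[path]


def load_all_pages(first_page: str,
                   fetch_fn: Callable[[str], Response]) -> List[bytes]:
    """Keep requesting pages until next_page becomes an empty string."""
    files: List[bytes] = []
    next_page = first_page            # the "Set First-Page" step
    while next_page != "":            # the Until condition
        body, headers = fetch_fn(next_page)
        files.append(body)
        try:
            next_page = headers["Next-Page"]
        except KeyError:              # the "On Failure" branch
            next_page = ""
    return files
```

Calling `load_all_pages("/data?page=1", fetch)` walks all three fake pages and then exits, since the final response carries no Next-Page header.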
The Until activity iterates sequentially. To introduce some concurrency within this activity, we can use the Invoke Pipeline activity with the Wait on Completion option disabled. This hands the longer-running work to another pipeline run while the loop moves on immediately to subsequent activities. Since the Until loop cycles faster than the child pipeline finishes, multiple child pipelines run Copy activities simultaneously, which improves overall throughput.
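The fire-and-forget idea can be illustrated in Python with a thread pool. This is an analogy under stated assumptions, not how Fabric schedules pipeline runs: copy_page is a hypothetical stand-in for the child pipeline, and the sketch collects results at the end only so that the outcome can be inspected, which the pipeline itself does not do.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def copy_page(page_number: int) -> str:
    """Hypothetical child task: pretend the copy outlasts a loop iteration."""
    time.sleep(0.01)
    return f"page-{page_number}.parquet"


def run_loop(page_total: int, max_workers: int = 4) -> List[str]:
    """Submit each page without waiting, then gather results afterwards."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit() returns immediately, like Invoke Pipeline with
        # Wait on Completion disabled; the loop moves straight on.
        futures = [pool.submit(copy_page, n) for n in range(page_total)]
        return [future.result() for future in futures]
```

The loop itself stays sequential; only the slow copy work overlaps, which is the same trade-off the pipeline design makes.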
The source of the Copy activity uses a Web connector with the Connection Type of HTTP, which allows us to specify the File Format. For the best performance, we can use the File Format of Binary. This also allows various file formats to be specified in the parameter form, so we can choose the file format at execution time.
For the Destination, we use the built-in integration with our Workspace to push these files to a Lakehouse Files destination. This gives efficient data loading and avoids the need to maintain Schema Mapping, which can be handled by another downstream process.
In Fabric Data Pipelines, as well as in Azure Data Factory and Azure Synapse Analytics Pipelines, we cannot reference a variable within the same Set Variable activity that sets that variable.
If we try, we receive the following error:
The expression contains self referencing variable. A variable cannot reference itself in the expression.
As a workaround, we create a secondary variable (temp) and use a Set Variable activity to assign it a value computed from the primary variable. Following this Set Variable Temp activity, we add another Set Variable activity that sets the primary variable to the value of the temp variable.
In our scenario, our primary variable is pagecount, and our secondary temp variable is temppagecount.
At the start of our Data Pipeline, we want to initialize our pagecount variable to 0 because we are passing this variable into the Invoke Pipeline as a parameter.
Inside the Until activity, after the Invoke Pipeline activity, we want to add 1 to the pagecount variable. To do this, we are going to create the Set Variable: Increment Page Count which will add 1 to the current value of the pagecount variable and set this value to the variable, temppagecount.
Finally, we update the current value of pagecount to our newly incremented temppagecount value.
In the child pipeline where we issue the Copy Command, we are also going to create the File Path and File Name used in the Destination by using string manipulation of the relative path.
Relative Path Example:
/api/v1/Tenants/<>/Namespaces/<>/DataViews/Emissions/Data/Interpolated?startIndex=2021-06-23T05:00:00Z&endIndex=2023-06-23T05:00:00Z&interval=00.00:01:00&form=parquet&continuationtoken=<>&count=100000
The File Path Format: Dataview/YYYY/MM/DD
The File Name Format : Dataview-YYYY-MM-DD-StartIndex-EndIndex-PageCount.form
@decodeUriComponent(concat(substring(pipeline().parameters.RelativeURL,
add(indexOf(pipeline().parameters.RelativeURL,'/DataViews/' ), length('/DataViews/')),
sub(indexOf(pipeline().parameters.RelativeURL,'/Data/'), add(indexOf(pipeline().parameters.RelativeURL,'/DataViews/' ),
length('/DataViews/')))),'/', replace(split(substring(pipeline().parameters.RelativeURL,
add(indexOf(pipeline().parameters.RelativeURL,'?startIndex=' ),length('?startIndex=')),
sub(indexOf(pipeline().parameters.RelativeURL,'&endIndex='),
add(indexOf(pipeline().parameters.RelativeURL,'?startIndex=' ),
length('?startIndex=')))), 'T')[0], '-', '/')))
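The file path expression above is easier to follow as a Python sketch. The helper names are hypothetical, `urllib.parse.unquote` stands in for `decodeUriComponent`, and the sample URL uses made-up tenant and namespace identifiers.

```python
from urllib.parse import unquote


def between(text: str, start_marker: str, end_marker: str) -> str:
    """Return the text between the first occurrence of each marker."""
    start = text.index(start_marker) + len(start_marker)
    end = text.index(end_marker)
    return text[start:end]


def file_path(relative_url: str) -> str:
    """Build 'DataView/YYYY/MM/DD' from the relative URL."""
    data_view = between(relative_url, "/DataViews/", "/Data/")
    start_index = between(relative_url, "?startIndex=", "&endIndex=")
    # Keep only the date portion and turn its dashes into folder separators.
    date_part = start_index.split("T")[0].replace("-", "/")
    return unquote(f"{data_view}/{date_part}")


# Hypothetical identifiers, for illustration only.
sample_url = ("/api/v1/Tenants/t1/Namespaces/n1/DataViews/Emissions"
              "/Data/Interpolated?startIndex=2021-06-23T05:00:00Z"
              "&endIndex=2023-06-23T05:00:00Z&interval=00.00:01:00"
              "&form=parquet&continuationtoken=abc&count=100000")
path = file_path(sample_url)
```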
@replace(decodeUriComponent(concat(substring(pipeline().parameters.RelativeURL,
add(indexOf(pipeline().parameters.RelativeURL,'/DataViews/' ),
length('/DataViews/')), sub(indexOf(pipeline().parameters.RelativeURL,'/Data/'),
add(indexOf(pipeline().parameters.RelativeURL,'/DataViews/' ),
length('/DataViews/')))), '-', substring(pipeline().parameters.RelativeURL,
add(indexOf(pipeline().parameters.RelativeURL,'?startIndex=' ),
length('?startIndex=')),sub(indexOf(pipeline().parameters.RelativeURL,'&endIndex='),
add(indexOf(pipeline().parameters.RelativeURL,'?startIndex=' ),
length('?startIndex=')))), '-', substring(pipeline().parameters.RelativeURL,
add(indexOf(pipeline().parameters.RelativeURL,'&endIndex=' ),
length('&endIndex=')),
sub(indexOf(pipeline().parameters.RelativeURL,'&interval='),
add(indexOf(pipeline().parameters.RelativeURL,'&endIndex=' ),
length('&endIndex=')))), '-', pipeline().parameters.pagecount, '.parquet')),' ', '-')
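A Python sketch of the file name logic follows. It takes a different route from the expression, using the standard library's query-string parser instead of index arithmetic, and it assumes the intent is to join the data view, the full start index, the full end index, and the page count. The function name and sample identifiers are hypothetical.

```python
from urllib.parse import parse_qs, unquote, urlsplit


def file_name(relative_url: str, page_count: int) -> str:
    """Build 'DataView-StartIndex-EndIndex-PageCount.parquet'."""
    parts = urlsplit(relative_url)
    query = parse_qs(parts.query)     # values are URL-decoded here
    # The data view sits between '/DataViews/' and '/Data/' in the path.
    data_view = unquote(parts.path.split("/DataViews/")[1].split("/Data/")[0])
    start_index = query["startIndex"][0]
    end_index = query["endIndex"][0]
    name = f"{data_view}-{start_index}-{end_index}-{page_count}.parquet"
    # Mirror the outer replace(): spaces become dashes.
    return name.replace(" ", "-")


# Hypothetical identifiers, for illustration only.
name_url = ("/api/v1/Tenants/t1/Namespaces/n1/DataViews/Emissions"
            "/Data/Interpolated?startIndex=2021-06-23T05:00:00Z"
            "&endIndex=2023-06-23T05:00:00Z&interval=00.00:01:00"
            "&form=parquet&continuationtoken=abc&count=100000")
name = file_name(name_url, 7)
```

Including the page count in the name keeps files from different pages of the same request from overwriting one another.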
After running this pipeline, we will see the parquet files within our Lakehouse Files, given the file path and name that we specified. Now that we have landed these files from the AVEVA Data Hub into the Lakehouse Files, we can leverage Data pipelines, Dataflows Gen2, or Notebooks to manipulate and move these files to either Lakehouse Tables, KQL Database, or the Warehouse.
Have any questions or feedback? Leave a comment below!