amaaiia
Resolver I

Use Dataflow Gen2 to ingest multiple tables dynamically

I have a Data Pipeline that ingests several tables from an on-premises SQL Server into my Lakehouse, and the tables will be reloaded incrementally every day. They have a date field, and every day I'm going to delete X months of data for each table from the Lakehouse. I have a Data Warehouse in Fabric with a table A that holds the list of tables I want to reload, with fields table_name and months_to_reload.

 

Data ingestion has to be done with Dataflow Gen2 because my data is on-premises and, for now, that is the only option. My Data Pipeline has a ForEach that iterates over the records in table A. For each record, there are 2 steps:
1. Remove from the Lakehouse the data for table_name from the last months_to_reload months (a sketch of this step follows below).
2. Read from the source (on-premises SQL Server) the data for table_name from the last months_to_reload months and write it to the Lakehouse table table_name.
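
A minimal Spark SQL sketch of step 1 inside a notebook activity, assuming the pipeline passes table_name, the date column name and months_to_reload as parameters (the concrete names below are placeholders, not from the original post):

from datetime import date
from dateutil.relativedelta import relativedelta
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Values that would arrive as notebook parameters from the ForEach item (placeholders)
table_name = "sales_orders"
date_column = "order_date"
months_to_reload = 3

# Step 1: delete the last months_to_reload months from the Lakehouse table
cutoff = date.today() - relativedelta(months=months_to_reload)
spark.sql(f"DELETE FROM {table_name} WHERE {date_column} >= DATE'{cutoff.isoformat()}'")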

 

My problem is that I can't pass variables to Dataflow Gen2 to tell it to read table table_name filtered to the last months_to_reload months.

 

How can I set up a Data Pipeline with a dynamic Dataflow Gen2 to which I can pass the name of the table I want to ingest and the date filters? If this cannot be done, I need to create an individual Data Pipeline for each of the tables I want to ingest, and I have 100. There must be a way to ingest tables dynamically by passing table_name as a variable to Dataflow Gen2: the data ingestion flow is always the same, and I can also pass the date field names as variables stored in table A.

21 REPLIES
Element115
Power Participant

@amaaiia If I understand you correctly, how about having another table in the LH that contains 1 value only, namely the months_to_reload  value? 

So for each delete-update cycle, in the DFg2, you read in this value in its own M query. Your other queries then can access the value. When the main query is done with its processing, you have another query that updates the months_to_reload value and publishes it back to the LH in replace mode.  
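
For illustration only, the same read-use-update pattern sketched in PySpark (inside the DFg2 itself this would be an M query; the table and column names here are assumptions, not from the reply above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the single-value config table from the LH ("reload_config" is a placeholder name)
months_to_reload = spark.table("reload_config").first()["months_to_reload"]

# ... the delete/reload logic uses months_to_reload here ...

# After the cycle, publish the (possibly updated) value back in replace mode
spark.createDataFrame([(months_to_reload,)], "months_to_reload INT") \
    .write.mode("overwrite").saveAsTable("reload_config")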

 

Would that do the trick or am I missing something?

Hi @Element115, yes, for months_to_reload that would be OK. In fact, I already have a warehouse table with the list of tables I need to ingest and a months_to_reload column (table_name, months_to_reload). The problem I still have is that I want the set of tables to ingest to be dynamic. For now, I have to define them in the dataflow one by one; I want a ForEach activity to iterate through the table list in my warehouse and ingest every table listed there.

@amaaiia How about this approach:

 

CREATE OR ALTER VIEW vTABLE_NAMES 
AS 
    SELECT TABLE_NAME 
    FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_SCHEMA = 'dbo'
;

 

 

If you run this SQL script from your SQL analytics endpoint, it will create a view in your LH. Then from your DF, all you have to do is create a query that uses this view as its source. The view returns a single column listing all the tables in the dbo schema of the LH. Now your M code can reference this table and Bob's your uncle.
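
If the same list is needed on the notebook side discussed later in the thread, a quick sketch run inside a Fabric notebook with the Lakehouse attached (this is an alternative to the view, not part of the reply above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Notebook-side equivalent of the view: list the tables of the attached Lakehouse
# so downstream logic can iterate over them dynamically.
table_names = [row.tableName for row in spark.sql("SHOW TABLES").collect()]
print(table_names)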

Hi @Element115 

So imagine my view has 10 rows (10 tables to ingest from my SQL Server). Once my query (let's say Q1) gets the 10 rows, how can I tell DF to create a new query from each row of the result of Q1? So each new query (QN) gets data from my SQL Server and saves it in a Lakehouse table in append mode?

@amaaiia Before I answer, let me clarify something... 

 

1__do you want to perform an incremental refresh on the tables?  

2__are these tables always going to be the same? In other words, once you have created your 10 tables, will it always be these same 10 tables receiving data, or would you have to create one or more new tables in the future?

Hi @Element115 

1. Yes

2. No, more tables can be added

 

Just to sum up, I have a source_tables table in a warehouse with all the necessary information about my tables, as follows:

amaaiia_3-1710484237181.png

 

Then in my Pipeline, I first ingest all tables in one DFg2 (I have to add them manually one by one, and whenever there is a new table to ingest, I have to update the DFg2 to add it as a new query). Then I iterate over each row of source_tables to ingest data and run some other operations (notebooks and other activities) on each table one by one. It's as if I have to keep the DFg2 and the source_tables table synchronised so they contain the same tables. The DFg2 is not inside my ForEach activity because I can't pass the source table name, the destination info (lakehouse, table_name, connection parameters...), etc. as parameters, so only the rest of the activities are inside the ForEach (not good practice, I think, because the DFg2 and source_tables work separately).

amaaiia_2-1710483990302.png

I really would like to automate the table ingestion, so that if I added a new table to my source_tables table, the Pipeline would load it automatically, and the DFg2 could check the table list from source_tables so the queries to ingest the data would be created automatically.

 

Ok so here is how I would do it. I would only use Python (a PySpark notebook) for the entire ETL cycle because, if I understand you correctly, you don't know the total number of tables in advance: one day there could suddenly be one or more new tables to process in addition to the ones already in the LH, which means you would need new DFs for these new tables because of how the DF data destination feature works.

Or you could try to do it in one DF, but the complexity of the M code required is a headache I wouldn't wish on anyone: you would need to merge and append data from different tables into one working table (that is, one M query), persist another query as a table that keeps track of the offsets where each table's data chunk ends in the working table, and then read the data back out of that single working table, using the offsets from the second query, with another PySpark notebook that finally appends the data to its respective table in the LH.

Going back to the main issue: for each table you have, you need a corresponding DF, because when you publish to a data destination it is the tabular output of that DF that gets persisted to the LH, and since it is not possible to put 'Choose data destination' in a loop inside a DF, you need one DF per incoming table. That is way easier to do with PySpark, IMHO.
 
Ok so, all ETL done with one PySpark notebook (a rough sketch follows the list):
 
0__in the pipeline, run the PySpark notebook
1__if this is the first run ever, populate the LH with a metadata table; otherwise append any new table's metadata to it
2__read the LH metadata table
3__for each table name (because of memory considerations, use a staging LH instead of loading everything into memory if there is too much data):
if it does not exist in the LH --> load the table from the on-prem source
if it exists in the LH --> load incrementally, i.e. only new data, into the staging LH
4__transform the data accordingly
5__persist the data to the production LH
6__do additional stuff in the pipeline
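
A rough PySpark sketch of that flow, assuming the metadata table follows the source_tables layout shown earlier plus an assumed date_field column, and assuming the notebook can reach the source directly (the gateway limitation discussed further down applies); the connection details are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 2__read the LH metadata table (one row per source table)
meta = spark.table("source_tables").collect()

# 3__for each table name, load either fully or incrementally
for row in meta:
    tbl, date_col, months = row["table_name"], row["date_field"], row["months_to_reload"]

    # Pull only the slice to reload; the WHERE clause is pushed down to SQL Server.
    # URL, credentials and driver are placeholders.
    src = (
        spark.read.format("jdbc")
        .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>")
        .option("query", f"SELECT * FROM {tbl} WHERE {date_col} >= DATEADD(month, -{months}, GETDATE())")
        .option("user", "<user>")
        .option("password", "<password>")
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .load()
    )

    if spark.catalog.tableExists(tbl):
        # Incremental cycle: drop the window being reloaded, then append the fresh slice
        spark.sql(f"DELETE FROM {tbl} WHERE {date_col} >= add_months(current_date(), -{months})")
        src.write.mode("append").saveAsTable(tbl)      # 5__persist to the LH
    else:
        # First run for this table: create it in the LH
        src.write.mode("overwrite").saveAsTable(tbl)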

 

Or did I miss something?

Hi @Element115, thanks for the response. Is it possible to ingest data from SQL Server on-prem through the gateway? I didn't mention it, but I need the data to be accessed through the gateway, and I think DFg2 is the only way for the moment.

 


Look what I found!  

Use this capability to make your data flows general-purpose, flexible, and reusable. You can parameterize data flow settings and expressions with these parameters.

I am assuming the Fabric Data Factory is just a rebranding of Azure Data Factory, so it should work the same.

Parameterizing mapping data flows - Azure Data Factory & Azure Synapse | Microsoft Learn

Yes, it is possible to ingest data from an on-prem SQL Server with only one DFg2 (but only if the firewall is configured properly, otherwise you'll need 2 DFg2s chained together), which is what we are doing too. And unless this has changed, per the Microsoft docs, DFg2 is currently the only way to do so, until Microsoft releases the new OPGW (on-prem gateway) support. ETA is supposed to be sometime in Q1. Then it will be possible to use a Copy activity inside a pipeline instead of a DFg2, if I remember correctly. In any case, we wait expectantly.

 

But first you have to figure out how to config your firewall.  Also, you need to know how the data actually moves between a DFg2 and the GW. Here is the excellent explanation from @pqian_MSFT :  Re: Dataflow refresh started failing today MashupE... - Page 2 - Microsoft Fabric Community

 

And here is my post explaining about the firewall config: DATAFLOW and ON-PREM DB connectivity solution - Microsoft Fabric Community 

 

But in a nutshell, the firewall should allow you to specify that, for outgoing TCP traffic, you want to open port 1433, and only for the following destinations (the wildcard FQDNs below); if that doesn't work, the destination can be set to 0.0.0.0, which means outgoing traffic is not restricted to any specific IP address:

 

Protocol: TCP
Endpoints: *.datawarehouse.pbidedicated.windows.net, *.datawarehouse.fabric.microsoft.com, *.dfs.fabric.microsoft.com
Port: 1433

 

Sorry, maybe I didn't ask the question correctly. Yes, I know that with DFg2 I can do ingests via gateway, in fact, that's how I'm doing it now and I have everything configured to work.

 

My question was: since you tell me to do the ingestion with notebooks (because otherwise I can't parameterise this process, as the number of tables is variable), can I do the ingestion through the gateway from notebooks? I think the answer is no, and in that case I understand that your suggestion of doing the whole process, including the ingestion, with notebooks doesn't work for me.

 

Am I wrong?

So it looks like you'll have to do everything using the M language in a DFg2 after all.  

Yes, but the table ingestion is not dynamic. Every time I have a new table, I have to add it manually to my DFg2 and publish it with the rest of the tables.

Actually, now that I think a little more on this... here is a potential solution.  You need to ingest everything into one staging table using M in a DFg2.  So the output of DFg2 is going to be one table containing the data of all the tables you need to ingest for this cycle. 

 

Prerequisites:  at the source, ie the on-prem DB, create a metadata table to keep track of all the tables that need processing. 

 

Here is the algo:

 

0__load the metadata table

1__iterate over each source table (using the each _ keyword) and extract the data needed as per some incremental timestamp

2__the ingested data from each table is itself a table, so put it into a Power Query table where each row consists of one column holding the source table name and a second column (of type Table) holding the data just ingested during that iteration

3__the output of the M script should be a table of tables, like so:

 

TABLE_NAME DATA_FROM_SOURCE_TABLE_AS_TABLE
TABLE_0 {[c0, c1, ..., cn], [d0, d1, ..., dn]}
TABLE_1 {[c0, c1, ..., cn], [d0, d1, ..., dn]}
... ...
TABLE_n {[c0, c1, ..., cn], [d0, d1, ..., dn]}

 

Your DFg2 has one output as a table. Publish that to a staging table in your LH.  Use a PySpark notebook that now can get access to this data, and have a Python script extract each source table from that one staging table and write the rows out to their corresponding LH production table.
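
A minimal PySpark sketch of that last extraction step, assuming the staging table is flattened so each row carries a source_table_name column (rather than a nested Table value) and that the staged rows share a schema; the table and column names are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Staging table written by the DFg2, with a routing column identifying the source table
staging = spark.table("staging_ingest")
table_names = [r["source_table_name"]
               for r in staging.select("source_table_name").distinct().collect()]

for tbl in table_names:
    # Rows for this source table only, without the routing column
    rows = staging.filter(staging["source_table_name"] == tbl).drop("source_table_name")
    # Append to the corresponding production Lakehouse table
    rows.write.mode("append").saveAsTable(tbl)

If the source tables have different schemas, the per-table payload would instead need to be serialized (for example to JSON) in the staging table and parsed back out here.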

I've heard that in 1-2 weeks, on-prem SQL Server data ingestion through the gateway will be available for the Copy activity. I think this update will be very helpful for me, because source and destination can be parameterised in a Copy activity. I think I'm going to wait until this feature is released.

@amaaiia I just stumbled upon this page:  Lakehouse management API - Microsoft Fabric | Microsoft Learn  and read it quickly, but if I am not mistaken, you can load data to a lakehouse table using the lakehouse REST API.  

 

So, couldn't you export the data you want in the LH from the source DB as multiple CSV files, put these files on a secure file server, and then use either Python locally to upload the CSV files to the LH, or Power Automate to do the same?

 

Or if T-SQL allows, do all this from a stored procedure connecting to the LH REST API.
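
A hedged Python sketch of the upload idea; the Lakehouse "load table" route and payload shape below are my reading of the linked management API docs and should be verified there, and the token, IDs and file path are placeholders:

import requests

# Placeholders: a valid Microsoft Entra access token and the workspace/lakehouse IDs
token = "<access-token>"
workspace_id = "<workspace-id>"
lakehouse_id = "<lakehouse-id>"
table_name = "sales_orders"

# Assumed endpoint shape for loading a CSV already uploaded to the Lakehouse Files area
url = (f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}"
       f"/lakehouses/{lakehouse_id}/tables/{table_name}/load")
body = {
    "relativePath": f"Files/exports/{table_name}.csv",
    "pathType": "File",
    "mode": "Append",
    "formatOptions": {"format": "Csv", "header": True, "delimiter": ","},
}

resp = requests.post(url, json=body, headers={"Authorization": f"Bearer {token}"})
resp.raise_for_status()  # the load runs asynchronously; a 202 response means it was accepted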

Hi @amaaiia 


We haven't heard from you since the last response and wanted to check whether you have found a resolution yet. If you have, please share it with the community, as it can be helpful to others.
If you have any question relating to the current thread, please let us know and we will do our best to help.
If you have a question on a different issue, we request that you open a new thread.


Thanks.

EDIT_0:  actually, perhaps this might not work due to the huge amount of data that has to be passed around. I don't know the limits put on the HTTP Request action of Power Automate. 

------------------------------------------------------------------------------------------------

But I got another idea! How about creating a Power Automate flow that ingests the data from the on-prem DB; then, inside the pipeline, you would use a Webhook activity to connect to that Power Automate flow, pass the data to your Notebook, and do everything in PySpark? That way there is no need to change your firewall rules, as Power Automate uses the gateway.

 

Calling Microsoft Flow from your application | Power Automate Blog

Chris Webb's BI Blog: Calling Microsoft Flow From Power Query And Power BI (crossjoin.co.uk)

 

I asked Bing Copilot and got this:

 

Oh! I see what you mean. I just asked Bing Copilot and it says it's possible from a PySpark notebook. But, as one commenter says on the LinkedIn page below, this would mean opening your DB port 1433 to incoming traffic, and if it's a production DB, it is doubtful your sysadmin will agree, due to the security risk. Here's what Copilot said:

 

Here’s the Python import statement for PySpark and JDBC:

from pyspark.sql import SparkSession

This will import the SparkSession module, which is the entry point to any functionality in Spark. You can then use it to create a SparkSession and connect to your database using JDBC. Please replace 'jdbc:your_database' and 'your_table' with your actual database JDBC URL and table name.

spark = SparkSession.builder.getOrCreate()
# Replace the URL, table, credentials and driver with your own values.
df = spark.read.format('jdbc').options(
    url='jdbc:your_database',        # e.g. 'jdbc:sqlserver://host:1433;databaseName=db'
    dbtable='your_table',
    user='your_user',
    password='your_password',
    driver='com.microsoft.sqlserver.jdbc.SQLServerDriver',
).load()

Remember to also include your driver, user, and password options as needed. Please consult your database documentation for the exact details.

 

Hi @amaaiia 

 

We haven't heard from you since the last response and wanted to check whether you have found a resolution yet. If you have, please share it with the community, as it can be helpful to others.
Otherwise, please reply with more details and we will try to help.


Thanks.
