I have a Data Pipeline that ingests several tables from an on-premises SQL Server into my Lakehouse, and the tables will be reloaded incrementally every day. Each table has a date field, and every day I'm going to delete the last X months of data of each table from the Lakehouse and reload them. I have a Data Warehouse in Fabric with a table A that lists the tables I have and want to reload, with the fields table_name and months_to_reload.
Data ingestion has to be done with Dataflow Gen2 because my data is on-premises and, for now, only Dataflow Gen2 can reach it. My Data Pipeline has a ForEach activity that iterates over the records in table A. For each record, there are 2 steps:
1. Remove Lakehouse data for table_name from months_to_reload months ago.
2. Read from source (SQL Server on-premise) the data for table_name from months_to_reload months ago and write it to Lakehouse table_name.
My problem is that I can't pass variables to Dataflow Gen2 to tell it to read table_name filtered to the last months_to_reload months.
How can I set up a Data Pipeline with a dynamic Dataflow Gen2, to which I can pass the name of the table I want to ingest and the date filters? If this cannot be done, I will need to create an individual Data Pipeline for each of the tables I want to ingest, and I have 100. There must be a way to ingest tables dynamically by passing table_name as a variable to Dataflow Gen2: the ingestion flow is always the same, and I can also store the date field names in table A and pass them as variables.
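The per-record body of that ForEach loop can be sketched in plain Python to pin down what "months_to_reload months ago" means. This is a minimal, hypothetical sketch: it assumes table A also stores a date_column per table (as suggested above), that a "month" is a calendar month, and that step 1 runs as a Spark SQL DELETE against a Delta table in the Lakehouse. `months_ago` and `build_reload_statements` are illustrative names, not Fabric APIs.

```python
import calendar
from datetime import date


def months_ago(today: date, months: int) -> date:
    """Return the date `months` calendar months before `today`.

    The day is clamped to the last day of the target month, so 31 March
    minus one month gives the last day of February.
    """
    total = today.year * 12 + (today.month - 1) - months
    year, month_index = divmod(total, 12)
    month = month_index + 1
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, min(today.day, last_day))


def build_reload_statements(table_name: str, date_column: str,
                            months_to_reload: int, today: date) -> tuple[str, str]:
    """Build the two statements for one row of table A (hypothetical schema).

    Step 1 deletes the reload window from the Lakehouse table; step 2 reads the
    same window from the on-premises source. Names are assumed to come from the
    trusted metadata table, so they are interpolated directly.
    """
    cutoff = months_ago(today, months_to_reload).isoformat()
    delete_sql = f"DELETE FROM {table_name} WHERE {date_column} >= '{cutoff}'"
    select_sql = f"SELECT * FROM dbo.{table_name} WHERE {date_column} >= '{cutoff}'"
    return delete_sql, select_sql


# Simulated rows of table A: (table_name, months_to_reload, date_column)
table_a = [("sales", 3, "order_date"), ("stock", 1, "snapshot_date")]
for name, months, column in table_a:
    for statement in build_reload_statements(name, column, months, date(2024, 3, 31)):
        print(statement)
```

Clamping the day avoids invalid dates such as 31 February; whether the window should instead start on the first day of the month is a business decision the sketch does not make for you.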
@amaaiia If I understand you correctly, how about having another table in the LH that contains 1 value only, namely the months_to_reload value?
So for each delete-update cycle, in the DFg2, you read in this value in its own M query. Your other queries then can access the value. When the main query is done with its processing, you have another query that updates the months_to_reload value and publishes it back to the LH in replace mode.
Would that do the trick or am I missing something?
Hi @Element115, yes, for months_to_reload that would work. In fact, I already have a warehouse table with the list of tables I need to ingest and a months_to_reload column (table_name, months_to_reload). The problem is that I want the set of tables to ingest to be dynamic. For now, I have to define them in the dataflow one by one; I want a ForEach activity to iterate through the warehouse table that lists the tables and ingest all of them.
@amaaiia How about this approach:
CREATE OR ALTER VIEW vTABLE_NAMES
AS
SELECT TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'dbo'
;
If you run this SQL script from your SQL analytics endpoint, it will create a view in your LH. Then, from your DF, all you have to do is create a query that uses this view as its source. The view returns a single column listing all the tables in the dbo schema of the LH. Now your M code can reference this table and Bob's your uncle.
Hi @Element115
So imagine my view has 10 rows (10 tables to ingest from my SQL Server). Once my query (let's say Q1) gets the 10 rows, how can I tell the DF to create a new query for each row of Q1's result, so that each new query (QN) reads data from my SQL Server and saves it to a Lakehouse table in append mode?
@amaaiia Before I answer, let me clarify something...
1__do you want to perform an incremental refresh on the tables?
2__are these tables always going to be the same? In other words, once you have created your 10 tables, will it always be these 10 tables receiving data, or might you need to create 1 or more new tables in future?
Hi @Element115
1. Yes
2. No, more tables can be added
Just to sum up, I have a source_tables table in a warehouse with all the necessary information about my tables, as follows:
Then, in my Pipeline, I first ingest all the tables with one DFg2 (I have to add them manually one by one, and whenever there is a new table to ingest, I have to update the DFg2 to add it as a new query), and then I iterate over each row of source_tables and run the remaining steps (notebooks and other activities) on each table, one by one. It's as if I have to keep the DFg2 and the source_tables table in sync so that both contain the same tables. The DFg2 is not inside my ForEach activity because I can't pass it the source table name, the destination info (lakehouse, table_name, connection parameters...), etc. as parameters, so only the rest of the activities are inside the ForEach (not good practice, I think, because the DFg2 and source_tables work separately).
I would really like to automate the table ingestion, so that if I added a new table to my source_tables table, the Pipeline would load it automatically, and the DFg2 could read the table list from source_tables so that the ingestion queries would be created automatically.
Or did I miss something?
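Until the ingestion itself can be driven by source_tables, one way to catch the two lists drifting apart is a set comparison at the start of the pipeline (for example in a notebook). A minimal sketch, assuming you can obtain both lists of names, e.g. source_tables and the tables present in the Lakehouse after the DFg2 run; `find_drift` is a hypothetical helper.

```python
def find_drift(metadata_tables, ingested_tables):
    """Compare the table list in source_tables with the tables the dataflow produced.

    Returns (missing, unexpected): tables listed in metadata but not ingested,
    and tables ingested but not listed in metadata. The comparison is
    case-insensitive because default SQL Server collations are.
    """
    meta = {t.lower() for t in metadata_tables}
    ingested = {t.lower() for t in ingested_tables}
    return sorted(meta - ingested), sorted(ingested - meta)


missing, unexpected = find_drift(["Sales", "Stock", "Customers"], ["sales", "stock", "legacy"])
print("Add to the dataflow:", missing)
print("Not in source_tables:", unexpected)
```

This only detects the mismatch; it does not create the missing DFg2 queries.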
Hi @Element115, thanks for the response. Is it possible to ingest data from an on-prem SQL Server through a gateway? I didn't mention it, but I need the data to be accessed through a gateway, and I think DFg2 is the only way to do that for the moment.
I could
Look what I found!
Use this capability to make your data flows general-purpose, flexible, and reusable. You can parameterize data flow settings and expressions with these parameters.
I am assuming the Fabric Data Factory is just a rebranding of Azure Data Factory, so it should work the same.
Parameterizing mapping data flows - Azure Data Factory & Azure Synapse | Microsoft Learn
Yes, it is possible to ingest data from an on-prem SQL Server with only one DFg2 (but only if the firewall is configured properly; otherwise you'll need 2 DFg2s chained together), which is what we are doing too. And unless this has changed, according to the Microsoft docs, DFg2 is currently the only way to do so until Microsoft releases the new OPGW (on-prem gateway). The ETA is supposed to be sometime in Q1. Then it will be possible to use a Copy activity inside a pipeline instead of a DFg2, if I remember correctly. In any case, we wait expectantly.
But first you have to figure out how to configure your firewall. You also need to know how the data actually moves between a DFg2 and the GW. Here is the excellent explanation from @pqian_MSFT: Re: Dataflow refresh started failing today MashupE... - Page 2 - Microsoft Fabric Community
And here is my post explaining about the firewall config: DATAFLOW and ON-PREM DB connectivity solution - Microsoft Fabric Community
But in a nutshell, the firewall should let you specify that outgoing TCP traffic on port 1433 is allowed only to the following destinations (the wildcard FQDNs below); if that doesn't work, the destination can be set to 0.0.0.0, which means the outgoing traffic is not restricted to any specific IP address:
Protocol: TCP
Endpoints: *.datawarehouse.pbidedicated.windows.net, *.datawarehouse.fabric.microsoft.com, *.dfs.fabric.microsoft.com
Port: 1433
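To sanity-check which outgoing connections the rule above would allow, the matching can be sketched offline (no connection is opened). This assumes a leading `*.` means "any subdomain of", which is how many firewalls interpret it but not all; `is_allowed` is a hypothetical helper, not part of any firewall product.

```python
ALLOWED_PATTERNS = (
    "*.datawarehouse.pbidedicated.windows.net",
    "*.datawarehouse.fabric.microsoft.com",
    "*.dfs.fabric.microsoft.com",
)
ALLOWED_PORT = 1433


def is_allowed(host: str, port: int, protocol: str = "TCP") -> bool:
    """Return True if an outgoing connection matches the rule above.

    A pattern '*.example.com' matches any host ending in '.example.com'
    (one or more extra labels), but not 'example.com' itself.
    """
    if protocol.upper() != "TCP" or port != ALLOWED_PORT:
        return False
    host = host.lower().rstrip(".")  # normalise case and a trailing root dot
    return any(host.endswith(pattern[1:]) for pattern in ALLOWED_PATTERNS)


print(is_allowed("abc.datawarehouse.fabric.microsoft.com", 1433))
print(is_allowed("abc.datawarehouse.fabric.microsoft.com", 443))
```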
Sorry, maybe I didn't ask the question correctly. Yes, I know that with DFg2 I can do ingests via gateway, in fact, that's how I'm doing it now and I have everything configured to work.
My question was: since you suggest doing the ingestion with notebooks, because otherwise I can't parameterise the process when the number of tables is variable, can I ingest through the gateway from notebooks? I think the answer is no, and in that case the solution you gave me, doing the whole process with notebooks starting from the ingestion, does not work for me.
Am I wrong?
So it looks like you'll have to do everything using the M language in a DFg2 after all.
Yes, but the table ingestion is not dynamic. Every time I have a new table, I have to add it manually to my DFg2 and publish it together with the rest of the tables.
Actually, now that I think a little more on this... here is a potential solution. You need to ingest everything into one staging table using M in a DFg2. So the output of DFg2 is going to be one table containing the data of all the tables you need to ingest for this cycle.
Prerequisites: at the source, i.e. the on-prem DB, create a metadata table to keep track of all the tables that need processing.
Here is the algo:
0__load the metadata table
1__iterate over each source table (using the each _ keyword) and extract the data needed as per some incremental timestamp
2__the data ingested from each table is itself a table, so it goes into a Power Query table where each row has one column holding the source table name and a second column (of type Table) holding the data just ingested during that iteration
3__the output of the M script should be a table of tables, like so:
TABLE_NAME | DATA_FROM_SOURCE_TABLE_AS_TABLE
TABLE_0 | {[c0, c1, ..., cn], [d0, d1, ..., dn]}
TABLE_1 | {[c0, c1, ..., cn], [d0, d1, ..., dn]}
... | ...
TABLE_n | {[c0, c1, ..., cn], [d0, d1, ..., dn]}
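Steps 0 to 3 can be mirrored in Python to make the shape of that output concrete. This is only an analogue of the M logic (in M the nested column would typically be built with something like Table.AddColumn and each): `fetch_rows` stands in for the source query and is hypothetical, as are the metadata columns watermark_column and last_loaded.

```python
from datetime import date

# Step 0: the metadata table (hypothetical columns).
metadata = [
    {"table_name": "TABLE_0", "watermark_column": "modified", "last_loaded": date(2024, 1, 1)},
    {"table_name": "TABLE_1", "watermark_column": "modified", "last_loaded": date(2024, 2, 1)},
]

# Stand-in for the on-prem source: table name -> list of row dicts.
SOURCE = {
    "TABLE_0": [{"id": 1, "modified": date(2023, 12, 30)}, {"id": 2, "modified": date(2024, 1, 5)}],
    "TABLE_1": [{"id": 7, "modified": date(2024, 2, 3)}],
}


def fetch_rows(table_name, watermark_column, since):
    """Hypothetical incremental source query: rows changed on or after `since`."""
    return [r for r in SOURCE[table_name] if r[watermark_column] >= since]


def build_table_of_tables(metadata):
    """Steps 1-3: one output row per source table, the second column holding its data."""
    return [
        {
            "TABLE_NAME": m["table_name"],
            "DATA_FROM_SOURCE_TABLE_AS_TABLE": fetch_rows(
                m["table_name"], m["watermark_column"], m["last_loaded"]
            ),
        }
        for m in metadata
    ]


for row in build_table_of_tables(metadata):
    print(row["TABLE_NAME"], len(row["DATA_FROM_SOURCE_TABLE_AS_TABLE"]))
```

Because the loop is driven by the metadata rows, adding a table to the metadata table adds a row to the output without editing the query.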
Your DFg2 has one output as a table. Publish that to a staging table in your LH. Use a PySpark notebook that now can get access to this data, and have a Python script extract each source table from that one staging table and write the rows out to their corresponding LH production table.
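A Dataflow Gen2 destination most likely cannot publish a nested Table column as-is, so one assumption is that the dataflow serialises each nested table to a JSON text column before writing the staging table. Under that assumption, the notebook side reduces to grouping staging rows by table name and decoding the payload, sketched here in plain Python; `split_staging` and the (table_name, payload) layout are hypothetical, and in a real notebook each group would then be written with Spark to its production table.

```python
import json
from collections import defaultdict


def split_staging(staging_rows):
    """Group staging rows by TABLE_NAME and decode each JSON payload.

    Each staging row is assumed to be (table_name, payload), where payload is
    a JSON array of row objects. Returns {table_name: [row dict, ...]}.
    """
    per_table = defaultdict(list)
    for table_name, payload in staging_rows:
        per_table[table_name].extend(json.loads(payload))
    return dict(per_table)


staging = [
    ("TABLE_0", '[{"c0": 1, "c1": "a"}, {"c0": 2, "c1": "b"}]'),
    ("TABLE_1", '[{"d0": 10}]'),
]
for name, rows in split_staging(staging).items():
    # In a notebook, this is where each group would be appended to its LH table.
    print(name, rows)
```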
I've heard that in 1-2 weeks, on-prem SQL Server ingestion through the gateway will be available for the Copy activity. I think this update will be very helpful for me, because the source and destination can be parameterised in a Copy activity. I think I'm going to wait until this feature comes out.
@amaaiia I just stumbled upon this page: Lakehouse management API - Microsoft Fabric | Microsoft Learn and read it quickly, but if I am not mistaken, you can load data to a lakehouse table using the lakehouse REST API.
So, couldn't you, at the source DB, export the data you want in the LH as multiple CSV files, put these files on a secure file server, and then use either Python locally or Power Automate to upload the CSV files to the LH?
Or if T-SQL allows, do all this from a stored procedure connecting to the LH REST API.
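The export half of that idea can be sketched with Python's standard csv module. A minimal sketch, assuming the rows are already available as Python dicts (for example fetched by a local script) and that one CSV file per table is the desired layout; `export_tables_to_csv` is a hypothetical helper, and the upload to the Lakehouse is not shown.

```python
import csv
import os
import tempfile


def export_tables_to_csv(tables, out_dir):
    """Write each table (name -> list of row dicts) to <out_dir>/<name>.csv.

    Column order follows the keys of the first row; empty tables are skipped.
    Returns the list of file paths written.
    """
    os.makedirs(out_dir, exist_ok=True)
    written = []
    for name, rows in tables.items():
        if not rows:
            continue
        path = os.path.join(out_dir, f"{name}.csv")
        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
            writer.writeheader()
            writer.writerows(rows)
        written.append(path)
    return written


with tempfile.TemporaryDirectory() as tmp:
    print(export_tables_to_csv({"sales": [{"id": 1, "amount": 9.5}], "empty": []}, tmp))
```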
Hi @amaaiia
We haven't heard back from you since the last response and were just checking to see whether you have a resolution yet. If you do, please share it with the community, as it can be helpful to others.
If you have any questions about the current thread, please let us know and we will try our best to help you.
If you have a question about a different issue, please open a new thread.
Thanks.
EDIT_0: actually, perhaps this might not work due to the huge amount of data that has to be passed around. I don't know the limits put on the HTTP Request action of Power Automate.
------------------------------------------------------------------------------------------------
But I've got another idea! How about creating a Power Automate flow that ingests the data from the on-prem DB, and then, inside the pipeline, using a Webhook activity to call that Power Automate flow and pass the data to your Notebook, so you can do everything in PySpark? That way there is no need to change your firewall rules, as Power Automate uses the gateway.
Calling Microsoft Flow from your application | Power Automate Blog
Chris Webb's BI Blog: Calling Microsoft Flow From Power Query And Power BI (crossjoin.co.uk)
I asked Bing Copilot and got this:
You can use a Fabric pipeline Webhook activity to call an endpoint and wait for it to complete [2]. This webhook can be configured to receive data from a Power Automate flow [3].
The Power Automate flow can be set up to get data from an on-premises database [1]. However, you need to ensure that the necessary ports are open for remote access and securely manage ... [1].
Remember, the exact steps may vary based on your specific environment and database. Always ensure you’re following best practices for security and compliance in your organization.
Oh! I see what you mean. I just asked Bing Copilot and it says it's possible from a PySpark notebook. But, as one commenter says on the LinkedIn page below, this would mean opening your DB port 1433 to incoming traffic, and if it's a production DB, it is doubtful your sysadmin will agree due to the security risk. Here's what Copilot said:
Yes, it is possible to connect to an on-premises database from a Fabric PySpark Notebook [1]. You would typically use JDBC to establish the connection and PySpark to read the data [2]. However, you should ensure that the necessary ports are open for remote access [1] and securely manage your connection details [1]. Please note that the exact steps may vary based on your specific environment and database.
Here’s the Python import statement for PySpark and JDBC:
from pyspark.sql import SparkSession
This imports the SparkSession class, which is the entry point to any functionality in Spark. You can then use it to create a SparkSession and connect to your database using JDBC. Please replace 'jdbc:your_database' and 'your_table' with your actual database JDBC URL and table name.
spark = SparkSession.builder.getOrCreate()  # reuses the notebook's existing session
# Read one table over JDBC; driver, user and password options go in the same call
df = spark.read.format('jdbc').options(
    url='jdbc:your_database',
    dbtable='your_table',
).load()
Remember to also include your driver, user, and password options as needed. Please consult your database documentation for the exact details.
If the on-premises database is behind a firewall, you would need to ensure that the necessary ports ... [1]. This is essential for data transfer and communication from external sources [1].
In some cases, you might need to set up a secure or DMZ subnet within your network, which is separat... [2]. You would then create an additional route in the custom route table to 0.0.0.0/0 and set the Next ho... [2].
Remember, the exact steps may vary based on your specific environment and firewall settings. Always ensure you’re following best practices for security and compliance in your organization.
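Putting the URL, driver and credentials together for SQL Server, a small helper could build the reader options and push the months_to_reload filter down to the source through Spark's `query` option (used instead of `dbtable`, not alongside it). A sketch under assumptions: sqlserver-host, SourceDb and the credentials are placeholders, the Microsoft JDBC driver must be available to Spark, and port 1433 must be reachable from the notebook, which is exactly the security concern raised above.

```python
def sqlserver_jdbc_options(host, database, user, password, query, port=1433):
    """Build options for spark.read.format("jdbc").options(**opts).load().

    Uses the `query` option (not `dbtable`) so the date filter runs on SQL Server.
    """
    return {
        "url": f"jdbc:sqlserver://{host}:{port};databaseName={database};encrypt=true",
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "user": user,
        "password": password,
        "query": query,
    }


opts = sqlserver_jdbc_options(
    host="sqlserver-host",   # placeholder
    database="SourceDb",     # placeholder
    user="reader",           # placeholder; keep real credentials in a secret store
    password="***",
    query="SELECT * FROM dbo.sales WHERE order_date >= '2023-12-31'",
)
print(opts["url"])
# In a Fabric notebook (not run here):
# df = spark.read.format("jdbc").options(**opts).load()
```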