Hi, I have a need to ingest a ton of Power BI audit logs into Fabric. The eventual target is a Lakehouse table or tables. I have one log file per day (as csv), about 500 altogether. These files need extensive reshaping - things like pivoting attribute:value pairs into columns, parsing text, etc. They're also reasonably large - about 40 or 50 million total lines, and maybe 4 or 5 million distinct audit log entries (see image below of a single entry).
I was going to do this using a Gen2 dataflow, but I can't figure out how to get a Gen2 dataflow to ingest a folder of files. This works perfectly with our on-prem network file shares, but in Fabric the Gen2 dataflow seems to want only one file at a time - I can't find a way to combine files under a Lakehouse Files section.
I'm a complete newbie on notebooks and really barely understand any Python (working to rectify that, going to take some time).
Here's a sample log file entry:
Things I've tried:
1. Using some of the sample code in the "learn fabric" paths to combine the files and publish to a Delta lakehouse table. This doesn't really work - if VORDER is on, it screws up the order of lines in the audit log, making it impossible to associate lines with the proper audit log ID. If I turn VORDER off, querying the resulting table returns "Failed to read parquet file because the column segment for column '_c0' is too large".
2. I've tried using code like the following to append the 500 individual csv files into a single file. But it doesn't work - instead it seems to create a folder containing a ton of separate files rather than one large file.
Any ideas or guidance would be greatly appreciated - thanks!
Scott
I read a lot of file data that is fed into OneLake via API calls to external systems. Even though my data sources are .json files, the code below will work for .csv files as well.
#Imports and Includes to use different functions down the line
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import StructType, StructField, StringType, DateType
from pyspark.sql.functions import to_date,to_timestamp,sequence
from delta.tables import *
##Define a Parameter Cell in the Notebook to be able to dynamically pass a file struct
_year = "9999"
_month = "5"
_day = "2"
##Generating the path to my files dynamically
##Instead of *.json you can use *.csv, as this will match any file ending in .csv under the specified path.
_root = "Files/XCM/TaskSignOff"
_path = "/" + _year + "/" + "/" + _month + "/" + _day + "/*.json"
_full = _root + _path
##If your CSV files have a defined schema, I recommend defining the schema in your Notebook so you can control schema drift.
##I am calling out the expected elements/headers that will be in the files I am reading. By setting the schema, if the vendor adds a new field, we do not necessarily bring it in right away.
_schema = StructType(
    [
        StructField("taskId", StringType()),
        StructField("signOffId", StringType()),
        StructField("comments", StringType()),
        StructField("requiredComments", StringType()),
        StructField("requiredDate", StringType()),
        StructField("requiredName", StringType()),
        StructField("signOffBy", StringType()),
        StructField("signOffComments", StringType()),
        StructField("signOffDate", StringType()),
        StructField("signOffRequired", StringType()),
        StructField("signOffName", StringType()),
        StructField("statusName", StringType()),
        StructField("updatedBy", StringType()),
        StructField("updatedOn", StringType())
    ]
)
##Actual Dataframe ingestion
##Change format from 'json' to 'csv'
##I also append the source path & file name to my column set
_df = spark.read.format('json').schema(_schema).load(_full).select("*","_metadata.file_name","_metadata.file_path")
##Display the results of the read (first 1000 rows)
display(_df)
##Write to Delta Table
##Write Initial Task List To Delta
_df.write.mode("append").format("delta").save("Tables/<Table_Name>")
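For Scott's folder of daily audit logs, a minimal sketch of the CSV variant might look like the following - the folder path, header/inferSchema options, and output handling here are assumptions for illustration, not his actual setup:

##Hypothetical example: read an entire folder of daily audit log CSVs into one DataFrame
_csv_path = "Files/AuditLogs/*.csv"     ##placeholder path - point this at your Lakehouse Files folder
_df_audit = (
    spark.read.format("csv")
    .option("header", "true")           ##assumption: the files have a header row
    .option("inferSchema", "true")      ##or pass an explicit schema, as shown above
    .load(_csv_path)
    .select("*", "_metadata.file_name", "_metadata.file_path")
)
display(_df_audit)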
When you write a file from Spark, each of the worker tasks holds part of the data, so each worker writes its own piece of the pie. The result is a folder full of part files with GUIDs and other oddities for names. To get a single file you have to force all of the data onto one partition first, for example with coalesce(1) or repartition(1), and that can be a very memory-intensive operation.
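If you really do need one physical CSV file, a rough sketch of that approach (output path is a placeholder) would be:

##Hypothetical example: force a single output file - memory-intensive for large data
_single = _df.coalesce(1)   ##pulls all rows into one partition on one worker
_single.write.mode("overwrite").option("header", "true").format("csv").save("Files/Combined/audit_logs_single")
##Spark still creates a folder at that path, but it will contain only one part-*.csv file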
Hope this helps out a little bit.
@Scott_Powell It sounds like you have quite the challenge on your hands, Scott!😉
For reading in the files, Spark's ability to read multiple csv files into a single DataFrame is definitely a good way to get started. However, I highly suggest adding a unique ID field to each row so you can preserve the original order later on. (The error you mentioned sounds more like a corrupted parquet structure error at first glance.)
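As a rough sketch of that idea (the path and header option are placeholders, and note that monotonically_increasing_id only guarantees increasing values within each partition, which is usually sufficient for small daily files):

from pyspark.sql.functions import monotonically_increasing_id

##Hypothetical example: tag every row with its source file and a load-order id
df_raw = (
    spark.read.format("csv")
    .option("header", "false")              ##assumption: raw audit lines with no header row
    .load("Files/AuditLogs/*.csv")          ##placeholder path
    .select("*", "_metadata.file_name")     ##keep the source file name for traceability
    .withColumn("row_id", monotonically_increasing_id())
)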
Since you mentioned extensive data reshaping is needed, we can work together on the transformations - whether it's parsing text fields, pivoting, or anything else. Feel free to connect and share some example rows and we can prototype the logic.
Finally, instead of outputting CSV, I would suggest writing the ingested data to Delta format in the Lakehouse - that makes more sense for your downstream workloads.
The main thought is to maintain data integrity - both order and values - as you move from raw CSVs to a transformed Delta table.