Hi, I have a need to ingest a large volume of Power BI audit logs into Fabric. The eventual target is one or more Lakehouse tables. I have one log file per day (as CSV), about 500 altogether. These files need extensive reshaping: pivoting attribute:value pairs into columns, parsing the text, and so on. They're also reasonably large: about 40 or 50 million total lines, and maybe 4 or 5 million distinct audit log entries (see image below of a single entry).
I was going to do this using a Gen2 dataflow, but I can't figure out how to get a Gen2 dataflow to ingest a folder of files. It works perfectly with our on-prem network file shares, but in Fabric the Gen2 dataflow seems to want only one file at a time, with no way to combine files under a Lakehouse Files section.
I'm a complete newbie on notebooks and really barely understand any Python (working to rectify that, going to take some time).
Here's a sample log file entry:
Things I've tried:
1. Using some of the sample code in the "Learn Fabric" paths to combine the files and publish to a Delta Lakehouse table. This doesn't really work: if I enable V-Order (VORDER), it scrambles the order of lines in the audit log, making it impossible to associate lines with the proper audit log ID. If I turn V-Order off, querying the resulting table returns "Failed to read parquet file because the column segment for column '_c0' is too large".
2. I've tried using code like the following to append the 500 individual CSV files into a single file. But it doesn't work: it creates a folder containing a ton of separate part files instead of a single large file.
Any ideas or guidance would be greatly appreciated - thanks!
Scott
I read a lot of file data that is fed into OneLake via API calls to external systems. Even though my data sources are .json files, the code below will work for .csv files as well.
## Imports used by the functions further down
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import StructType, StructField, StringType, DateType
from pyspark.sql.functions import to_date, to_timestamp, sequence
from delta.tables import *
##Define a Parameter Cell in the Notebook to be able to dynamically pass a file struct
_year = "9999"
_month = "5"
_day = "2"
##Generating the path to my files dynamically
##Instead of *.json you can use *.csv, as this will pick up any file ending in .csv under the specified path.
_root = "Files/XCM/TaskSignOff"
_path = "/" + _year + "/" + _month + "/" + _day + "/*.json"
_full = _root + _path
##If your CSV file has a defined schema, I recommend defining the schema in your notebook so you can control schema drift.
##I am calling out the expected elements/headers that will be in the files I am reading.
##By setting the schema, if the vendor adds a new field, we do not necessarily bring it in right away.
_schema = StructType(
    [
        StructField("taskId", StringType()),
        StructField("signOffId", StringType()),
        StructField("comments", StringType()),
        StructField("requiredComments", StringType()),
        StructField("requiredDate", StringType()),
        StructField("requiredName", StringType()),
        StructField("signOffBy", StringType()),
        StructField("signOffComments", StringType()),
        StructField("signOffDate", StringType()),
        StructField("signOffRequired", StringType()),
        StructField("signOffName", StringType()),
        StructField("statusName", StringType()),
        StructField("updatedBy", StringType()),
        StructField("updatedOn", StringType())
    ]
)
##Actual DataFrame ingestion
##Change format from 'json' to 'csv' for CSV files
##I also append the source path & file name to my column set
_df = spark.read.format('json').schema(_schema).load(_full).select("*", "_metadata.file_name", "_metadata.file_path")
##Display the results of the read (first 1000 rows)
display(_df)
##Write to Delta Table
##Write Initial Task List To Delta
_df.write.mode("append").format("delta").save("Tables/<Table_Name>")
When you write to a file from Spark, each of the worker threads holds part of the data, so each worker writes its own piece of the pie. That's why you end up with a bunch of files with GUIDs and other oddities for names. To write a single file you would have to call coalesce(1) (or repartition(1)) so that all of the data lands in one partition on a single worker before the write. It is a very memory-intensive operation, since that one worker has to hold everything.
Hope this helps out a little bit.
@Scott_Powell It sounds like you have quite the challenge on your hands, Scott!😉
For reading in the files, Spark's ability to read multiple CSV files into a single DataFrame is definitely a good way to get started. However, I highly suggest adding a unique ID field to each row so you can preserve order later on. (The error you mentioned sounds more like a corrupted Parquet structure at first glance.)
Since you mentioned extensive data reshaping is needed, we can work together on the transformations - whether it's parsing text fields, pivoting, or anything else. Feel free to connect and share some example rows and we can prototype the logic.
Finally, instead of outputting CSV, I would suggest writing the input CSVs out in Delta format; a Lakehouse architecture makes sense for your downstream workloads.
The main thought is to maintain data integrity, both in order and in values, as you move from raw CSVs to a transformed Delta table.