Scott_Powell
Advocate III

Help with notebooks / multiple .csv files for complete newbie

Hi, I have a need to ingest a ton of Power BI audit logs into Fabric. The eventual target is a Lakehouse table or tables. I have one log file per day (as csv), about 500 altogether. These files need extensive reshaping - things like pivoting attribute:value pairs into columns, parsing the text, etc. They're also reasonably large - about 40 or 50 million total lines, and maybe 4 or 5 million distinct audit log entries (see the image below of a single entry).
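Just to illustrate the kind of reshaping I mean (the column names here are made up), each entry arrives as many attribute:value rows and I need one row per entry with the attributes as columns - something along these lines, I think:

# Illustration only - hypothetical column names and a tiny made-up sample
from pyspark.sql import functions as F

long_df = spark.createDataFrame(
    [("id1", "Activity", "ViewReport"),
     ("id1", "UserId", "scott"),
     ("id2", "Activity", "Export")],
    ["entryId", "attribute", "value"],
)

# Pivot the attribute names into columns, one row per audit entry
wide_df = (
    long_df
    .groupBy("entryId")
    .pivot("attribute")
    .agg(F.first("value"))
)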

 

I was going to do this using a Gen2 dataflow, but I can't figure out how to get a Gen2 dataflow to ingest a folder of files. It works perfectly with our on-prem network file shares, but in Fabric the Gen2 dataflow seems to want only one file at a time, with no way to combine files under a Lakehouse Files section.

 

I'm a complete newbie on notebooks and really barely understand any Python (working to rectify that, going to take some time).

 

Here's a sample log file entry:

[Screenshot: a single Power BI audit log entry]

Things I've tried:

1. Using some of the sample code in the "learn Fabric" paths to combine the files and publish to a Delta lakehouse table. This doesn't really work - if I set V-Order on, it screws up the order of lines in the audit log, making it impossible to associate lines with the proper audit log ID. If I turn V-Order off, querying the resulting table returns "Failed to read parquet file because the column segment for column '_c0' is too large".

 

2. I've tried using code like the following to append the 500 individual csv files into a single file. But it doesn't work - it creates a folder containing a ton of separate files rather than a single large file.

 

df = spark.read.format("csv").load('Files/Power BI audit logs/Audit log*.csv')
df.write.mode("overwrite").format("csv").save("Files/singleFile")

 

Any ideas or guidance would be greatly appreciated - thanks!

Scott

 

 

2 REPLIES
jwinchell40
Resolver I

@Scott_Powell -

 

I read a lot of file data that is fed into OneLake via API calls to external systems. Even though my data sources are .json files, the code below will work for .csv files as well.

 

#Imports and Includes to use different functions down the line
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import StructType, StructField, StringType, DateType
from pyspark.sql.functions import to_date,to_timestamp,sequence
from delta.tables import *

##Define a Parameter Cell in the Notebook to be able to dynamically pass a file struct
_year = "9999"
_month = "5"
_day = "2"

##Generating the path to my files dynamically
##Instead of *.json you can use *.csv - this will pick up any file ending in .csv under the specified path.

_root = "Files/XCM/TaskSignOff"
_path = "/" + _year + "/" + _month + "/" + _day + "/*.json"
_full = _root + _path

##If your CSV files have a defined schema, I recommend defining the schema in your Notebook so you can control schema drift.
##I am calling out the expected elements/headers that will be in the files I am reading.
##By setting the schema, if the vendor adds a new field we do not necessarily bring it in right away.

_schema = StructType(
    [
        StructField("taskId",StringType()),
        StructField("signOffId",StringType()),
        StructField("comments",StringType()),
        StructField("requiredComments",StringType()),
        StructField("requiredDate",StringType()),
        StructField("requiredName",StringType()),
        StructField("signOffBy",StringType()),
        StructField("signOffComments",StringType()),
        StructField("signOffDate",StringType()),
        StructField("signOffRequired",StringType()),
        StructField("signOffName",StringType()),
        StructField("statusName",StringType()),
        StructField("updatedBy",StringType()),
        StructField("updatedOn",StringType())
    ]
)

##Actual DataFrame ingestion
##Change the format from 'json' to 'csv' for csv files
##I also append the source path & file name to my column set
_df = spark.read.format('json').schema(_schema).load(_full).select("*", "_metadata.file_name", "_metadata.file_path")

##Display the results of the read (first 1000 rows)
display(_df)

##Write to Delta Table
##Write Initial Task List To Delta
_df.write.mode("append").format("delta").save("Tables/<Table_Name>")

 

When you write to a file from Spark, each worker task holds part of the data, so each worker writes its own piece of the pie. That's why you end up with a folder full of files with GUIDs and other oddities for names. To write a single file you have to get all of the data onto one worker first - for example by coalescing down to a single partition (or collecting everything to the driver) - and then write from there. It is a very memory-intensive operation.
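If you really do need one physical file, a rough sketch like this (untested against your data, and memory heavy at 40-50 million lines) forces everything into one partition before writing:

# Force all of the data into a single partition so only one worker writes output.
# Slow and memory intensive for large data - a Delta table is usually the better target.
_single = _df.coalesce(1)
_single.write.mode("overwrite").format("csv").option("header", "true").save("Files/singleFile")

# Note: Spark still writes a folder at Files/singleFile, but it will contain
# a single part-*.csv file inside it.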

 

Hope this helps out a little bit.

puneetvijwani
Resolver IV

@Scott_Powell It sounds like you have quite the challenge on your hands, Scott!😉

For reading in the files, Spark's ability to read multiple csv files into a single DataFrame is definitely a good way to get started. However, I highly suggest adding a unique ID field to each row so you can preserve order later on. (The error you mentioned sounds more like a corrupted parquet structure error at first glance.)
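As a rough sketch (untested, and the frame/column names are just placeholders), something like this gives each row a per-file line number once the source file name has been captured:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Assumes raw_df already carries a "file_name" column (e.g. from _metadata.file_name
# or input_file_name()). row_number() then gives a per-file line identifier.
w = Window.partitionBy("file_name").orderBy(F.monotonically_increasing_id())
df_with_id = raw_df.withColumn("line_no", F.row_number().over(w))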

Since you mentioned extensive data reshaping is needed, we can work together on the transformations - whether it's parsing text fields, pivoting, or anything else. Feel free to connect and share some example rows and we can prototype the logic.

 

Finally, instead of outputting CSV, I would suggest writing the ingested csv data out in Delta format - a Delta table in the Lakehouse makes sense for your downstream workloads.
The main thought is to maintain data integrity - both in order and values - as you move from raw CSVs to a transformed Delta table.



