Hi Team,
We have a solution (POC) where we query MongoDB to do analytics in Azure Synapse Analytics using Synapse Link. We are now implementing similar functionality in Microsoft Fabric, but since there is no Synapse Link in Microsoft Fabric, what would be the best approach to implement this in Fabric?
Input:
The data in Azure Cosmos DB for MongoDB is multi-level JSON data.
The data is continuously updated or inserted with new records from the source system.
We need to load and flatten the JSON data and store it in tables so we can write queries for analysis.
Note: we don't have mirroring for Azure Cosmos DB for MongoDB.
Can someone provide an optimized solution to implement this in Microsoft Fabric?
Hello @sathyasai ,
Use a notebook to keep this simple, as multiple Dataflows would be needed to write to different tables. Here is sample code you can use; it works for any number of levels and writes a separate table for each level.
Please mark the question as solved and give kudos if this is helpful.
from pyspark.sql.functions import col, explode

json_path = "path_to_json_file"
df = spark.read.option("multiline", "true").json(json_path)

# Define a function to flatten and write each level
def flatten_and_write(df, level, parent_field=None):
    """
    Flattens a specific level of the DataFrame and writes it to a Delta table.

    Args:
        df (DataFrame): Input DataFrame to process.
        level (int): Current hierarchy level.
        parent_field (str): Field to explode or flatten.

    Returns:
        DataFrame: Flattened DataFrame for the next level.
    """
    if parent_field:
        # Explode the nested field while keeping all other columns
        df = df.select(
            *[col(c) for c in df.columns if c != parent_field],  # Keep all other columns
            explode(col(parent_field)).alias(f"level{level}_item")  # Explode nested field
        )

    # Write the current level to a table
    table_name = f"table_name_level{level}"  # Name tables dynamically based on level
    df.write.format("delta").mode("overwrite").saveAsTable(table_name)
    return df

# Loop through the levels and flatten iteratively
current_df = df  # Start with the original DataFrame
nested_fields = ["nested_array", "nested_item.deep_nested_field"]  # Replace with actual nested fields
for level, field in enumerate(nested_fields, start=1):
    current_df = flatten_and_write(current_df, level, parent_field=field)
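For illustration, here is a minimal usage sketch with a hypothetical two-level document (a customer with an "orders" array, where each order has an "items" array). The field and table names are assumptions, not your actual schema; the point is that from level 2 onward the nested field is addressed through the alias produced by the previous level (e.g. level1_item.items).

# Minimal usage sketch with hypothetical sample data (field names are assumptions)
sample_json = '{"customer": "A", "orders": [{"id": 1, "items": [{"sku": "x", "qty": 2}, {"sku": "y", "qty": 1}]}]}'
sample_df = spark.read.json(spark.sparkContext.parallelize([sample_json]))

# Level 1 explodes the "orders" array; level 2 explodes the "items" array
# inside the "level1_item" alias created by level 1.
nested_fields = ["orders", "level1_item.items"]

current_df = sample_df
for level, field in enumerate(nested_fields, start=1):
    current_df = flatten_and_write(current_df, level, parent_field=field)
# Result: table_name_level1 holds one row per order, table_name_level2 one row per item.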
Hello @sathyasai ,
Thank you for reaching out to the Microsoft Fabric community forum regarding your query about flattening multi-level JSON data from Azure Cosmos DB for MongoDB in Microsoft Fabric.
The suggestions provided by @nilendraFabric, including the use of Spark notebooks and pipelines for continuous ingestion, were aimed at helping you implement the required functionality effectively.
Could you please let us know if the proposed solution resolved your issue? If so, please mark the helpful reply and accept it as the solution. This will help other community members with similar problems find the answer faster.
Thank you.
Thank you so much for the input. I have a few questions regarding flattening the data.
My JSON has 10 to 12 levels of hierarchy and they need to be stored in different tables. Can you guide me on a better way to flatten the data into multiple levels and store each level in its respective table in the data warehouse?
I tried using Dataflow Gen2; I can flatten the data into multiple levels, but when storing the data I can only point to a single table.
If I use Dataflow Gen2, should I use multiple Dataflow Gen2 flows to flatten the different levels and store them in the respective tables?
What would be the optimized solution?
Hi @sathyasai ,
You can implement continuous ingestion and flattening of multi-level JSON data stored in Azure Cosmos DB for MongoDB via pipelines or Spark notebooks, then store the flattened results in a Fabric lakehouse or warehouse for analytics.
Create a pipeline or use a notebook to pull data from the Cosmos DB for MongoDB API at scheduled intervals. Although Fabric doesn't currently offer mirroring or a Synapse Link equivalent for MongoDB, you can still connect to the database using the MongoDB integration in a pipeline or with Spark code to query the source.
If you need further help please let me know; we have done the same thing in one of our projects.
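As a rough illustration of the notebook route, the sketch below reads a collection with the MongoDB Spark connector and lands the raw documents in a lakehouse table that the flattening code above can then process. It assumes the connector library (v10+) is installed in the Fabric Spark environment; the connection string, database, collection, and table name are placeholders you would replace with your own.

# Sketch only: assumes the MongoDB Spark connector is available in the
# Fabric Spark environment; all connection details below are placeholders.
raw_df = (
    spark.read.format("mongodb")
    .option("connection.uri", "<cosmos-db-for-mongodb-connection-string>")
    .option("database", "<database_name>")
    .option("collection", "<collection_name>")
    .load()
)

# Land the raw documents in the lakehouse; a scheduled pipeline or notebook run
# can re-execute this to pick up newly inserted or updated records.
raw_df.write.format("delta").mode("append").saveAsTable("raw_mongo_documents")

For a true incremental load you would typically filter on a watermark such as a last-modified timestamp rather than appending everything on each run, but that depends on your document model.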