revathi_np
Regular Visitor

Structured Streaming: multiple queries are not picking up the next batch after processing the first

I'm running multiple streaming queries in a single Spark job: each query reads Parquet files from a source folder and upserts them to a Delta table in the lakehouse inside the foreachBatch function. It worked fine initially, and all streams are active and running without any errors, but they are not picking up the next batch even when new files are added to the source. What could be the reason for this?
import os

# One streaming query per control-table row: read Parquet from the source
# folder and upsert to the corresponding Delta table via foreachBatch.
for row in control_df.collect():
    table = os.path.basename(row.TargetFiles)
    source_path = f"{self.data_lake_container}/{row.SourceFiles}"
    schema = self.infer_schema_from_file(source_path, "parquet")

    df_read = spark.readStream \
        .schema(schema) \
        .format("parquet") \
        .option("recursiveFileLookup", "true") \
        .load(source_path)

    # Write streaming data to the destination, with a per-table checkpoint.
    query = df_read.writeStream \
        .foreachBatch(self.streamwriter) \
        .trigger(processingTime="60 seconds") \
        .queryName(table) \
        .option("checkpointLocation", f"{self.data_lake_container}/{self.checkpoint_path}/{table}") \
        .start()

stream_count = 0
for q in spark.streams.active:
    stream_count += 1
    print(f"ID: {q.id},\nName: {q.name},\nStatus: {q.status},\nLast Progress: {q.lastProgress}\n\n")
print(stream_count)
self.logger.info(f"Created streams for {stream_count} tables in {self.target_lakehouse} lakehouse")

# Note: this blocks only on the last query created in the loop.
query.awaitTermination()
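
(For context, the foreachBatch handler self.streamwriter is not shown in the post. Below is a minimal sketch of what such a Delta upsert handler might look like, assuming a MERGE keyed on a hypothetical id column; the key column and target path are illustrative assumptions, not from the original code.)

from delta.tables import DeltaTable

# Minimal sketch of a foreachBatch upsert handler. The key column "id" and
# the target path are illustrative assumptions, not from the original post.
# (In the original job this is a bound method, self.streamwriter.)
def streamwriter(batch_df, batch_id):
    spark = batch_df.sparkSession
    target_path = "Tables/my_target_table"  # assumed Delta table location
    if DeltaTable.isDeltaTable(spark, target_path):
        # Upsert: update matching rows, insert new ones.
        (DeltaTable.forPath(spark, target_path).alias("t")
            .merge(batch_df.alias("s"), "t.id = s.id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())
    else:
        # First batch: create the Delta table if it does not exist yet.
        batch_df.write.format("delta").save(target_path)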
2 REPLIES
Anonymous
Not applicable

Hi @revathi_np ,

 

I'm following up to ask whether the problem has been solved.

 

If so, could you accept the correct answer as the solution, or share your own solution, to help other members find it faster?

 

Thank you very much for your cooperation!

 

Best Regards,
Yang
Community Support Team

 

If any post helped, please consider accepting it as the solution to help other members find it more quickly.
If I have misunderstood your needs or you still have problems, please feel free to let us know. Thanks a lot!

Anonymous
Not applicable

Hi @revathi_np ,

 

I have a few suggestions:

 

In your code the processingTime trigger is set to 60 seconds, so new files are only picked up when the next trigger fires rather than immediately. Please try increasing the trigger interval, for example as in the sketch below.
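
For example (the 120-second value here is just illustrative):

# Illustrative sketch only: the same query with a longer trigger interval.
query = df_read.writeStream \
    .foreachBatch(self.streamwriter) \
    .trigger(processingTime="120 seconds") \
    .queryName(table) \
    .option("checkpointLocation", f"{self.data_lake_container}/{self.checkpoint_path}/{table}") \
    .start()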

 

Also make sure that new files added to the source directory have unique names and are not overwritten in place. The file source tracks files it has already processed by path, so if a file name is reused, Spark may not detect the change. You can check whether new data is actually being detected with the diagnostic sketch below.
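
A quick way to check (numInputRows stays at 0 when a trigger fires but no new files were found):

# Diagnostic sketch: print how many rows each active query ingested
# in its most recent micro-batch.
for q in spark.streams.active:
    if q.lastProgress:
        print(q.name, q.lastProgress["numInputRows"])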

 

If none of this solves your problem, as an alternative you can run a separate Spark job that processes all the files in the source (including the new ones) at once; one way to do that is sketched below.
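
One option for that kind of one-shot run is the availableNow trigger (Spark 3.3+), which processes everything currently in the source and then stops. This is an alternative suggestion, not part of the original job:

# Sketch: one-shot run over all files currently in the source, then stop.
# Requires Spark 3.3 or later for availableNow.
query = df_read.writeStream \
    .foreachBatch(self.streamwriter) \
    .trigger(availableNow=True) \
    .option("checkpointLocation", f"{self.data_lake_container}/{self.checkpoint_path}/{table}") \
    .start()
query.awaitTermination()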

 

Best Regards,
Yang
Community Support Team

 

If any post helped, please consider accepting it as the solution to help other members find it more quickly.
If I have misunderstood your needs or you still have problems, please feel free to let us know. Thanks a lot!
