spartan27244
Resolver I

Pass File list to notebook from copy data in a pipeline

I have a Copy Data activity that copies files arriving via SFTP each day to my OneLake. I need to move those files to another folder once the copy is complete. Since there is no activity for that, I have a PySpark notebook that takes parameters (the list of files copied and the folder to move them to) and performs the operation. My problem is that I cannot figure out how to get the list of files from the Copy Data activity to pass to the notebook. If that output exists, what does it look like, how do I get it, and how do I parse it in my notebook? Copilot suggested passing the dynamic expression "@activity('Copy data1').output.fileList", but it cannot provide details and I cannot find any official documentation. I have also found a post suggesting that it's not possible.
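For context, the notebook side would be simple if the pipeline could hand the list over. Something along these lines is what I had in mind (a sketch only: files_moved and archive_folder are hypothetical base parameters, and the JSON shape is assumed, since the Copy Data output is exactly what I cannot pin down):

# Parameter cell - values would come from the pipeline's Notebook activity base parameters
files_moved = "[]"                   # hypothetical: JSON array of file names, passed as a string
archive_folder = "/inbound/archive"  # hypothetical: folder to move the files to

import json

file_list = json.loads(files_moved)  # back to a Python list
print(f"Would move {len(file_list)} file(s) to {archive_folder}: {file_list}")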

1 ACCEPTED SOLUTION
spartan27244
Resolver I

I was able to accomplish my task using a notebook, since the Copy Data activity does not expose a list of the copied files anywhere.

 

import fnmatch
import paramiko
import notebookutils

transport = paramiko.Transport((ftpserver, 22))
transport.connect(username=ftpuserid, password=ftppwd)
sftp = paramiko.SFTPClient.from_transport(transport)

fileList = sftp.listdir(FromFolder)

for file in fileList:
    if fnmatch.fnmatch(file, Mask):
        # read the file into memory because the SFTP library has no direct access to the lakehouse
        with sftp.open(FromFolder + "/" + file, "r") as remote_file:
            file_data = remote_file.read()   # read the whole file into memory
            str_data = file_data.decode("latin-1")
            notebookutils.fs.put(ToFolder + "/" + file, str_data, True)            
       
        # ensure the archive folder exists on the SFTP server
        try:
            sftp.listdir(ArcFolder)          # raises IOError if the folder does not exist
        except IOError:
            sftp.mkdir(ArcFolder)
       
        sftp.rename(FromFolder + "/" + file, ArcFolder + "/" + file)
       
   
sftp.close()
transport.close()
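The names used above (ftpserver, ftpuserid, ftppwd, FromFolder, Mask, ToFolder, ArcFolder) are notebook parameters supplied by the pipeline's Notebook activity. A parameter cell along these lines would define their defaults (all values shown here are placeholders, and the password would ideally come from Key Vault via notebookutils rather than a literal):

# Parameter cell - placeholder defaults, overridden by the pipeline's base parameters
ftpserver  = "sftp.example.com"   # SFTP host (placeholder)
ftpuserid  = "svc_user"           # SFTP user (placeholder)
ftppwd     = notebookutils.credentials.getSecret("https://YOUR_KV_ENDPOINT_HERE/", "sftp-pass")  # assumed secret name
FromFolder = "/inbound"           # source folder on the SFTP server (placeholder)
Mask       = "*.csv"              # fnmatch pattern for the files to pick up (placeholder)
ToFolder   = "Files/landing"      # lakehouse Files path written via notebookutils.fs.put (placeholder)
ArcFolder  = "/inbound/archive"   # archive folder on the SFTP server (placeholder)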


14 REPLIES

Hi @spartan27244 ,

Thanks for the update. Glad to hear you were able to accomplish the task using a notebook-based approach. If this is the solution that worked for you, please accept your reply as the solution to help other community members who may face a similar issue in the future.

Best Regards, 
Menaka.
Community Support Team

OldDogNewTricks
Advocate IV

I understand.  Go ahead and provide Kudos and mark as the answer if you feel that is the right thing to do.

Hi @spartan27244 

Thanks for reaching out to the Microsoft Fabric community forum.

 

I would also like to take a moment to thank @OldDogNewTricks for actively participating in the community forum and for the solutions you've been sharing. Your contributions make a real difference.
If this post helps, then please mark it as a solution so that other members can find it more quickly.

Best Regards, 
Menaka.
Community Support Team  

 

OldDogNewTricks
Advocate IV

I just noticed my replies appear out of order because of the default sort on replies.  You will probably want to sort by "Oldest to Newest" to have the replies in the correct order.  Sorry, there was a character limit on replies, hence the multiple replies.

Feel free to kudos all of them if they are helpful 😉

Thanks for all of the code. I am already using most of it; I just have bosses who don't know Fabric but like to see pipelines with nice blocks of predefined activities. Sometimes they just don't listen and you have to prove to them that "it can't be done that way".

OldDogNewTricks
Advocate IV

This function will archive folders on the SFTP folder itself:
def sftp_archive_files(
        creds: dict
        ,src_path: str
        ,files: list
        ,archive_path: str
        ) -> Tuple [int, int, str]:
    """
    Connect to SFTP server and archive (move) previously processed files using a list of file names.

    Parameters:
        creds                      = Dictionary object that contains the endpoint, username, and password for this sftp site
        src_path                   = Path on sftp to search
        files                      = List of files (from previous handling function)
        archive_path               = Path to move file to on SFTP server
   
    Return:
        files_touched              = Number of files touched during the process
        files_moved                = Number of files moved on the SFTP server
        message                    = Message summarizing the action(s) taken
   
    Example usage:
        files_touched, files_moved, msg = sftp_archive_files(sftp_creds, '/ven-directdivision', ['File1.txt','File2.txt'], '/ven-directdivision/archive')
    """

    # Connect to the SFTP server using paramiko
    transport = paramiko.Transport((creds['endpoint'],creds['port']))
    transport.connect(username=creds['username'], password=creds['password'])

    files_touched = 0
    files_moved = 0

    with paramiko.SFTPClient.from_transport(transport) as sftp:
        sftp.chdir(src_path)
       
        for file in files:
            files_touched += 1

            file_name = file
            file_path = f"{src_path}/{file_name}"

            archive_full_path = f"{archive_path}/{file_name}"
            print(f'Moving: {file_path} to {archive_full_path}')
           
            sftp.rename(file_path, archive_full_path)

            files_moved += 1

    transport.close()

    msg = f'Files touched: {files_touched}, Files moved: {files_moved}'
   
    return(files_touched, files_moved, msg)

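    # --- Continuation of sftp_get_files_all: the function header is in the next reply below (the code was split across replies by the character limit) ---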
    with paramiko.SFTPClient.from_transport(transport) as sftp:
        sftp.chdir(src_path)
        files = sftp.listdir_attr()

        for file_attr in files:
            files_touched += 1

            file_name = file_attr.filename
           
            # Check if the file name contains the mask
            if filename_mask in file_name:
                file_path = f"{src_path}/{file_name}"
                print(f'Downloading file: {file_path}...')

                # download file into memory, put file into dataframe, append dataframe to list of dataframes
                df = sftp_download_file(sftp, file_path, src_file_delim, src_file_encoding, src_file_head_row, transport)
                df['FileName'] = file_name
                lst_df.append(df)
                lst_files.append(file_name)
               
                files_downloaded += 1
               
       
        # Combine all DataFrames into a single result DataFrame
        if lst_df:
            df_result = pd.concat(lst_df, ignore_index=True)
            print("All files successfully combined into a single DataFrame.")
        else:
            df_result = pd.DataFrame()
            print("No files matched the conditions.")

    transport.close()
    return(files_touched, files_downloaded, df_result, lst_files)

 

Hopefully this helps!  We are using these functions to ingest hundreds of files daily.
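Putting the pieces together, a calling cell might look something like the following (a sketch only: the Key Vault prefix, SFTP paths, and lakehouse output path are placeholders, and writing under /lakehouse/default/Files assumes the notebook has a default lakehouse attached):

# Sketch: chaining the helper functions end to end (all literal values are placeholders)
sftp_creds = get_ftp_creds('ftp-com-example')                 # hypothetical Key Vault prefix

files_touched, files_downloaded, df_result, lst_files = sftp_get_files_all(
    creds = sftp_creds
    ,src_path = '/ven-directdivision'
    ,filename_mask = 'file_name_sample'
    ,src_file_delim = '|'
)

if not df_result.empty:
    # assumes a default lakehouse is attached, so its Files section is mounted at /lakehouse/default/Files
    df_result.to_csv('/lakehouse/default/Files/landing/combined.csv', index=False)

    # archive the processed files on the SFTP server itself
    touched, moved, msg = sftp_archive_files(sftp_creds, '/ven-directdivision', lst_files, '/ven-directdivision/archive')
    print(msg)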

OldDogNewTricks
Advocate IV

This function downloads all files in a folder location that meet the specified file mask:
def sftp_get_files_all(
        creds: dict
        ,src_path: str
        ,filename_mask: str
        ,src_file_delim: Optional[str] = None
        ,src_file_encoding: Optional[str] = None
        ,src_file_head_row: Optional[int] = 1
        ) -> Tuple [int, int, pd.DataFrame, list]:
    """
    Connect to the SFTP server using paramiko, download all files in the folder that match the file mask, and combine them into a single dataframe for downstream processing.

    Parameter:
        creds                      = Dictionary object that contains the endpoint, username, and password for this sftp site
        src_path                   = Path on sftp to search
        filename_mask              = Format of filename
        src_file_delim (opt)       = Delimiter to use, default is comma
        src_file_encoding (opt)    = Source file encoding, default is utf-8
        src_file_head_row (opt)    = Header row of the source file, default is 1 (first row contains headers)

    Return:
        files_touched               = number of files touched during the process
        files_downloaded            = number of files downloaded during the process
        df_result                   = Pandas dataframe that combines all files
        lst_files                   = list of files that met the criteria, for downstream processing

    Example usage:
        files_touched, files_downloaded, df_result, lst_files = sftp_get_files_all(
            creds = sftp_creds
            ,src_path = sftp_src_path
            ,filename_mask = 'file_name_sample'
            ,src_file_delim = '|'
            ,src_file_encoding = 'utf-8'
            ,src_file_head_row = 1
        )
    """

    # Check for default values
    if src_file_delim is None:
        src_file_delim = ','                # csv default
   
    if src_file_encoding is None:
        src_file_encoding = 'utf-8'         # utf-8 default

    if src_file_head_row is not None:
        src_file_head_row -= 1              # subtracting 1, argument for underlying function uses 0 to indicate first row, None = no headers

    # Connect to the SFTP server using paramiko
    transport = paramiko.Transport((creds['endpoint'],creds['port']))
    transport.connect(username=creds['username'], password=creds['password'])

    files_touched = 0
    files_downloaded = 0
    lst_df = []
    lst_files = []

OldDogNewTricks
Advocate IV

This function downloads a single file and puts it into a Pandas dataframe:
def sftp_download_file(
    sftp
    ,file_path: str
    ,src_file_delim: str
    ,src_file_encoding: str
    ,src_file_head_row: int
    ,trans_object
    ) -> pd.DataFrame:
    """
    Download a file from the SFTP server into a Pandas DataFrame.

    @sftp                   = Paramiko SFTP object
    @file_path              = Full path of the file on the SFTP server
    @src_file_delim         = Delimiter for source file
    @src_file_encoding      = Encoding for source file
    @src_file_head_row      = Header row for the source file
    @trans_object           = Paramiko transport object, for closing connection upon error

    Returns:
        pd.DataFrame with the parsed file contents (an empty DataFrame if the read fails)
    """

    with sftp.open(file_path, 'rb') as f:
        data = f.read()
        try:
            df_file_data = pd.read_csv(BytesIO(data), encoding = src_file_encoding, sep = src_file_delim, header = src_file_head_row)  # Read into a temporary DataFrame
            return df_file_data
        except Exception as e:
            print(f'\n---Error in file download with file: {file_path}.  Exception: {e}---\n')
            trans_object.close()            # if there is an error, close the connection
            return pd.DataFrame()

I would like to write the file to my lakehouse exactly as it was read, with no interpretation as to delimiters, etc.

Hi @spartan27244 ,

Thanks for reaching out to the Microsoft Fabric community forum.


If your goal is to preserve the file exactly as it was received, without interpreting delimiters, encodings, or structure, you can simply treat it as a binary file and write it directly to your Lakehouse storage. This avoids any transformation, parsing, or data loss, and the file will remain byte-for-byte identical to what was on the SFTP server.
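For example, something along these lines in the notebook (a sketch, reusing the sftp client and the FromFolder / file variables from the earlier snippets, and assuming a default lakehouse is attached so its Files section is available at /lakehouse/default/Files; the folder name is a placeholder):

import os

target_dir = "/lakehouse/default/Files/landing"   # hypothetical lakehouse folder
os.makedirs(target_dir, exist_ok=True)

# read the remote file as raw bytes - no decoding, no parsing
with sftp.open(FromFolder + "/" + file, "rb") as remote_file:
    raw_bytes = remote_file.read()

# write the bytes to the lakehouse exactly as received
with open(f"{target_dir}/{file}", "wb") as local_file:
    local_file.write(raw_bytes)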

If this post was helpful, please give us Kudos and consider accepting it as the solution to help other members find it more easily.
Best Regards, 
Menaka.
Community Support Team  



OldDogNewTricks
Advocate IV

I store the endpoint (including port) in KeyVault along with creds.  This function returns the relevant details in a dictionary based on how I store them in KeyVault:


import notebookutils as nbu

from typing import Optional, Tuple, Any, List, Union, Dict
import paramiko
import pandas as pd
from io import BytesIO


def get_ftp_creds(prefix: str) -> dict:
    """
    Function that takes a prefix and returns the 4 necessary datapoints from key vault (endpoint - which stores host and port, username, password).
    Returns a dictionary with the endpoint, username, password, and port values

    Parameters:
        prefix          String that identifies the resources to get values for

    Returns:
        dict            Dictionary object with relevant sftp connection values
   
    Example usage:
        ftp_data = get_ftp_creds('ftp-com-ech')
       
        endpoint = ftp_data['endpoint']
        username = ftp_data['username']
        password = ftp_data['password']
        port = ftp_data['port']          
    """
   
    host, port = nbu.credentials.getSecret('https://YOUR_KV_ENDPOINT_HERE/', f'{prefix}-endpoint').split(":")

    ftp_data = {}
    ftp_data['endpoint'] = host
    ftp_data['username'] = nbu.credentials.getSecret('https://YOUR_KV_ENDPOINT_HERE/', f'{prefix}-user')
    ftp_data['password'] = nbu.credentials.getSecret('https://YOUR_KV_ENDPOINT_HERE/', f'{prefix}-pass')
    ftp_data['port'] = int(port)
   
    return(ftp_data)

OldDogNewTricks
Advocate IV

I will admit that I am not a pipeline expert, but from what I can tell, there is no ability to either 1) archive files as an option within the Copy activity or 2) get the list of files out of the Copy activity.

Are you using an on-premises gateway to access said FTP/SFTP?
 
If you are NOT using an on-premises gateway, I have a solution for you using pure notebooks. If you are using an on-premises gateway, I unfortunately do not have a solution for you, because pipelines are the only thing in MS Fabric that can use those gateway connections.
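One way to confirm what the Copy activity actually returns is to pass its entire output into the notebook as a string base parameter, using the dynamic expression @string(activity('Copy data1').output), and dump it there. Something like this (copy_output is a hypothetical parameter name; in my experience the output carries counters and durations rather than a per-file list):

import json

# Parameter cell - populated by the pipeline with @string(activity('Copy data1').output)
copy_output = "{}"   # hypothetical base parameter

# pretty-print whatever the Copy activity reported
print(json.dumps(json.loads(copy_output), indent=2))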

I am not using a gateway. I can do it completely with notebooks to handle the FTP connection and so on. This is really the reason I have never used Data Factory and pipelines in the past: just not enough flexibility. Now I'm saddled with the same thing in Fabric. Oh well.
