arunvi

AI Ready Apps: build RAG Data pipeline from Azure Blob Storage to SQL Database in Microsoft Fabric within minutes

Microsoft Fabric is a unified, secure, and user-friendly data platform equipped with features necessary for developing enterprise-grade applications with minimal or no coding required. Last year, the platform was enhanced by introducing SQL Database in Fabric, facilitating AI application development within Microsoft Fabric. In a previous blog post, we discussed how to build a chatbot using SQL Database in Fabric. This blog post will provide step-by-step instructions for creating a Retrieval Augmented Generation (RAG) pipeline to prepare your data for AI integration using SQL Database in Microsoft Fabric.

For this blog post, we will consider a use case involving Contoso Recruiting Agency, which aims to make more sense of the stored candidates' resumes by creating a searchable database of candidates to match specific skill requirements.


RAG pipeline activity workflow

Each recruiter uploads the resume (in PDF format) to their designated folder in Azure Blob storage. Whenever a file is uploaded, the RAG pipeline is triggered. The pipeline processes the file by extracting the text content, chunking the text, redacting any Personally Identifiable Information (PII), generating embeddings, and finally storing the embeddings in a SQL database within Fabric as a vector store. These stored embeddings can subsequently be utilized to develop AI applications such as smart search, recommendation engines, chatbots, and other tools.

Prerequisite

This blog post requires users to bring their own key (BYOK) for AI services, which also means creating these services outside of the Microsoft Fabric platform.

Dataset

Given the file formats supported by the Document Intelligence service, we use the PDF files from the Resume Dataset on Kaggle.

Steps

  1. Create a workspace named "IntelligentApp".
  2. Create a Lakehouse named "blob_filestorage".
  3. Create a SQL Database in Fabric named "datamart".
  4. Navigate to the workspace "IntelligentApp", click Import - Notebook - From this computer, and then import the notebook "cp_azblob_lakehouse.ipynb" from the cloned repository's notebook folder.

Import a Notebook

5. Attach the Lakehouse "blob_filestorage" to "cp_azblob_lakehouse" Notebook.
    • Open the notebook, on the explorer click 'Add sources'.
    • Select, 'Existing data sources'.
    • Select "blob_filestorage" from the OneLake catalog and then click 'Connect'.

Attach a Lakehouse to the Notebook

6. Create a User Data Function:
    • Navigate to the "IntelligentApp" workspace and click "+New item".
    • Search for "Function" and select "User data function(s)".
    • Provide the name "file_processor".
    • Click "New function".
    • Add the Lakehouse and the SQL Database as managed connections.
      • On the Functions Explorer screen, click "Manage Connections" and then select "+Add Data connection".
      • From the "OneLake catalog", select datamart (SQL database) and then click "Connect".
      • Repeat the previous step to add blob_filestorage (Lakehouse) as a managed connection.
    • Click "Library management" and add the following dependencies (click "+ Add from PyPI" to add each one). The dependencies are also listed in the "/functions/requirements.txt" file of the cloned repository. Ensure you are using fabric-user-data-functions version 0.2.28rc0 or higher.

Add User Data Function dependencies

    • In the function editor, replace existing content with the contents of "function\function_app.py" from the cloned repository.
    • Click "Publish" (on the top right) to deploy the function. After the functions are deployed, click "Refresh".
7. Create a Data pipeline by navigating to the workspace and then clicking "+ New Item".
    • Search for and select "Data pipeline".
    • Provide the name "blob_ingest_pipeline".
8. Create a data pipeline storage trigger by clicking the "Add trigger (preview)" button and providing the following configuration:
    • Source: Select "Azure Blob Storage events"
    • Storage account: "Connect to existing Azure Blob Storage account"
    • Subscription: Select your Azure subscription
    • Azure Blob Storage account: Select the blob storage account under your subscription.
    • Eventstream name: blob_ingest_stream

Create Event stream

Click "Next" to configure the event type and source.
  • Event Type(s): Select only the Microsoft.Storage.BlobCreated event. This will ensure that an event is generated each time a new blob object is uploaded.
Click "Next" to review the configuration. Then, click "Connect" to connect to the blob storage. A successful connection will be indicated by the status "Successful". Finally, click "Save".
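
For orientation, a Microsoft.Storage.BlobCreated event delivered through Azure Event Grid identifies the blob in its subject field, in the form /blobServices/default/containers/<container>/blobs/<blob path>. The sketch below shows how a container name and a file name can be derived from such a string. The helper name parse_blob_subject is hypothetical, and whether the pipeline trigger derives its FolderPath and FileName values in exactly this way is an assumption.

```python
def parse_blob_subject(subject: str) -> tuple[str, str]:
    """Split an Event Grid blob subject into (container, blob_path).

    Hypothetical helper for illustration; it is not part of the repository.
    """
    prefix = "/blobServices/default/containers/"
    if not subject.startswith(prefix):
        raise ValueError(f"unexpected subject: {subject!r}")
    # Everything before the first "/blobs/" is the container name.
    container, separator, blob_path = subject[len(prefix):].partition("/blobs/")
    if not separator or not container or not blob_path:
        raise ValueError(f"unexpected subject: {subject!r}")
    return container, blob_path


if __name__ == "__main__":
    print(parse_blob_subject(
        "/blobServices/default/containers/resume/blobs/candidate.pdf"
    ))
```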

On the Set alert screen, under Save location, configure the following settings.

  • Select "Create a new item".
  • New item name: blob_activator.
Click "Create" to create and save the alert.


Create Fabric Activator

Now that we have set up the stream, it's time to define the "blob_ingest_pipeline".

Pipeline definition

The pipeline can be defined in two ways, as outlined below.

Import template

Templates offer a quick way to begin building data pipelines. Importing a template brings in all required activities for orchestrating a pipeline.

To import a template:

  • Navigate to the Home menu of the data pipeline.
  • Click "Use a template".
  • From the "Pipeline templates" page, click "Import template".
  • Import the file "template/ AI-Develop RAG pipeline using SQL database in Fabric.zip" from the cloned repository.

Import Data pipeline from template

The imported data pipeline is preloaded with all necessary activities, variables, and connectors required for end-to-end orchestration.
Important: If you are using a template, you do not need to add variables or activities manually. Instead, proceed directly to configuring the values for the variables and for each activity's parameters in the pipeline, as detailed in the Blank Canvas section.

Blank Canvas

1. Establish pipeline variables: Click on the pipeline canvas, select the "Variables" menu, and then click "+ New" to add and configure values for the following variables:


Configure pipeline variables

| Name | Type | Value | Comment |
| --- | --- | --- | --- |
| fileName | String | @pipeline()?.TriggerEvent?.FileName | |
| container | String | @pipeline()?.TriggerEvent?.FolderPath | |
| source | String | @pipeline()?.TriggerEvent?.Source | |
| cognitiveServiceEndpoint | String | <<YOUR-MULTI-SERVICE-ACCOUNT-ENDPOINT>> | Replace <<YOUR-MULTI-SERVICE-ACCOUNT-ENDPOINT>> with the endpoint of your multi-service account. For example, https://myservices.cognitiveservices.azure.com/ |
| apiKey | String | <<YOUR-MULTI-SERVICE-ACCOUNT-APIKEY>> | Replace <<YOUR-MULTI-SERVICE-ACCOUNT-APIKEY>> with the API key of your multi-service account. |
| openAIEndpoint | String | <<YOUR-OPENAI-SERVICE-ENDPOINT>> | Replace <<YOUR-OPENAI-SERVICE-ENDPOINT>> with the endpoint of your Azure OpenAI account. For example, https://myopenaiservice.openai.azure.com/ |
| openAIKey | String | <<YOUR-OPENAI-APIKEY>> | Replace <<YOUR-OPENAI-APIKEY>> with the API key of your Azure OpenAI account. |
| embeddingModel | String | text-embedding-3-small | |
| recepientEmailAddress | String | <<to-email-address>> | Recipient's email address |
| senderEmailAddress | String | <<from-email-address>> | Sender's email address |

Get multi-services account endpoint

2. Add a "Notebook" activity. The notebook associated with this activity uses NotebookUtils to manage the file system. When the notebook runs, it creates a folder named after the container if one does not already exist, and then copies the file from Azure Blob Storage to that Lakehouse folder. Configure this activity as outlined below:
    • General tab,
      • Name: azureblob_to_lakehouse
    • Settings tab,
      • Notebook: cp_azblob_lakehouse
      • Base parameters: click "+New" to add the following parameters,

      • Name: fileName
        • Type: String
        • Value:
          @variables('fileName')
      • Name: container
        • Type: String
        • Value:
          @variables('container')
      •  Name: source
        • Type: String
        • Value:
          @variables('source')
Use On Success connector of the activity to link to the subsequent function (Extract Text) activity.
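
The notebook's behavior described in this step (create the container folder if it is missing, copy the file into it, and hand the resulting path to the next activity) can be approximated locally. The sketch below uses pathlib and shutil as stand-ins for the NotebookUtils file-system calls used in the real notebook. The function name and the "Files" root are hypothetical, and treating the destination path as the notebook's exit value is an assumption based on how the next activity consumes exitValue.

```python
import shutil
import tempfile
from pathlib import Path


def copy_into_container_folder(source_file: Path, files_root: Path, container: str) -> Path:
    """Copy source_file into files_root/<container>/, creating the folder if needed.

    Local stand-in for the notebook's logic. The returned path plays the role
    of the value the notebook would pass on to the next activity.
    """
    target_dir = files_root / container
    target_dir.mkdir(parents=True, exist_ok=True)  # no-op when the folder already exists
    destination = target_dir / source_file.name
    shutil.copy2(source_file, destination)
    return destination


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        sample = Path(tmp) / "candidate.pdf"
        sample.write_bytes(b"%PDF-1.4 demo")
        print(copy_into_container_folder(sample, Path(tmp) / "Files", "resume"))
```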

3. Add a "Functions" activity. The function "extract_text" associated with this activity uses the Azure AI Document Intelligence service to extract the text content from the file copied into the Lakehouse by the previous activity. Configure this activity as outlined below:

    • General tab,
      • Name: Extract Text
    • Settings tab,
      • Type: Fabric user data functions
      • Connection: Sign-in (if not already) using your workspace credentials.
      • Workspace: IntelligentApp (default selected)
      • User data functions: file_processor
      • Function: extract_text
               Parameters,
        • Name: filePath
          • Type: str
          • Value:
            @activity('azureblob_to_lakehouse').output.result.exitValue
        • Name: cognitiveServicesEndpoint
          • Type: str
          • Value:
            @variables('cognitiveServiceEndpoint')
        • Name: apiKey
          • Type: str
          • Value:
            @variables('apiKey')
Use On Completion connector of the activity to link to the subsequent If Condition (Text Extraction Results) activity.

4. Add an "If Condition" activity to verify the success of the text extraction in the previous step. If the extraction was unsuccessful, an email is sent to the configured recipient and the pipeline is terminated. Configure this activity as outlined below:

    • General tab,
      • Name: Text Extraction Results
    • Activities tab,
      • Expression:
        @empty(activity('Extract Text').error)
      • Case: False. Edit the "false" condition using the edit (pencil) icon, and add the following activities:
"Office 365 Outlook" activity: To send alert emails.
    • General Tab,
      • Name: Text Extraction Failure Email Alert
    • Settings Tab,
      • Signed in as: Sign in (if not already) using the same credentials as those of your workspace.
      • To:
        @variables('recepientEmailAddress')
      • Subject: Text Extraction Error
      • Body:
        <p>@{replace(string(activity('Extract Text').error.message), '\','')}</p>
 Advanced,
      • From:
        @variables('senderEmailAddress')
      • Importance: High
Use the On Success connector of the activity to link to the subsequent Fail activity.

"Fail" activity: To terminate the pipeline

    • General tab,
      • Name: Text Extraction Process Failure
    • Settings tab,
      • Fail message:
        @{replace(string(activity('Extract Text').error), '\','')}
      • Error code:
        @{activity('Extract Text').statuscode}
Return to the main canvas by clicking the pipeline name (blob_ingest_pipeline) and use the On Success connector of the "If Condition" activity to link to the subsequent Function (Generate Chunks) activity.
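
The control flow of this error-handling pattern can be modeled in a few lines of Python. The On Completion connector means the check always runs, @empty(...error) is true when no error was recorded, and the false branch sends the email and then fails the pipeline. The dictionary shape and the function name below are simplifications chosen for illustration, not the actual activity output schema.

```python
def route_after_activity(activity_output: dict) -> list[str]:
    """Return the ordered actions the pipeline would take after an activity.

    Mirrors @empty(activity('...').error): a missing or empty error means success.
    The dictionary shape is a simplification for illustration.
    """
    error = activity_output.get("error")
    if not error:
        return ["continue"]
    # Same clean-up as replace(string(...), '\', ''): drop backslashes.
    message = str(error.get("message", "")).replace("\\", "")
    return [f"send_email: {message}", "fail_pipeline"]


if __name__ == "__main__":
    print(route_after_activity({}))
    print(route_after_activity({"error": {"message": "file not found"}}))
```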

5. Add a "Function" activity. The function "chunk_text" associated with this activity uses the tiktoken tokenizer to generate chunks from the text extracted by the previous activity. Configure this activity as outlined below:

    • General tab,
      • Name: Generate Chunks
    • Settings tab,
      • Type: Fabric user data functions
      • Connection: If not already, sign in using the credentials that have complete access to your workspace.
      • Workspace: IntelligentApp (default selected)
      • User data functions: file_processor
      • Function: chunk_text
               Parameters,
        • Name: text
          • Type: str
          • Value:
            @activity('Extract Text').output.output
        • Name: maxToken
          • Type: int
          • Value:
            500
        • Name: encoding
          • Type: str
          • Value:
            cl100k_base
Use On Success connector of the activity to link to the subsequent Function (Redact PII Data) activity.
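
As a rough illustration of token-based chunking, the sketch below splits a text into pieces of at most max_tokens tokens. It is not the repository's chunk_text implementation. The function name chunk_by_tokens is hypothetical, and the tokenizer is passed in as a pair of encode and decode callables so that the example runs without extra packages.

```python
from typing import Callable, Sequence


def chunk_by_tokens(
    text: str,
    max_tokens: int,
    encode: Callable[[str], Sequence],
    decode: Callable[[Sequence], str],
) -> list[str]:
    """Split text into chunks of at most max_tokens tokens each."""
    if max_tokens <= 0:
        raise ValueError("max_tokens must be positive")
    tokens = encode(text)
    # Walk the token list in fixed-size windows and turn each window back into text.
    return [
        decode(tokens[start:start + max_tokens])
        for start in range(0, len(tokens), max_tokens)
    ]


if __name__ == "__main__":
    # Whitespace tokenizer used only so the sketch runs without extra packages.
    print(chunk_by_tokens("a b c d e", 2, str.split, " ".join))
```

With tiktoken installed, the same function could be called as chunk_by_tokens(text, 500, enc.encode, enc.decode), where enc = tiktoken.get_encoding("cl100k_base"), matching the maxToken and encoding values configured above. Fixed-size windows can cut a sentence in the middle, so a production chunker may prefer to split on sentence or paragraph boundaries first.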

6. Add a "Function" activity. The function "redact_text" associated with this activity uses the Azure AI Language service to redact PII data from the chunks generated by the preceding activity. The text is chunked before redaction to comply with the service limits of the PII detection feature. Configure this activity as outlined below:

  • General tab,
      • Name: Redact PII Data
  • Settings tab,
      • Type: Fabric user data functions
      • Connection: Sign-in (if not already) using the credentials that have complete access to your workspace.
      • Workspace: IntelligentApp (default selected)
      • User data functions: file_processor
      • Function: redact_text
               Parameters,
        • Name: text
          • Type: list
          • Value:
            @activity('Generate Chunks').output.output
        • Name: cognitiveServicesEndpoint
          • Type: str
          • Value:
            @variables('cognitiveServiceEndpoint')
        • Name: apiKey
          • Type: str
          • Value:
            @variables('apiKey')

Configure a User Defined Function

Use On Completion connector of the activity to link to the subsequent If Condition (PII Redaction Results) activity.
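
To make the redaction step more concrete, the sketch below shows the two mechanics involved: masking character spans that a PII detector has flagged, and grouping chunks into small batches so that each request stays within service limits. It does not call the Azure AI Language service. The (offset, length) tuples, the function names, and the batch size are all hypothetical values for illustration, and the actual per-request limits should be checked in the service documentation.

```python
def mask_spans(text: str, spans: list[tuple[int, int]], mask_char: str = "*") -> str:
    """Replace each (offset, length) span in text with mask characters."""
    chars = list(text)
    for offset, length in spans:
        end = min(offset + length, len(chars))
        for index in range(max(offset, 0), end):
            chars[index] = mask_char
    return "".join(chars)


def batched(items: list[str], size: int) -> list[list[str]]:
    """Group items into consecutive batches of at most size elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [items[start:start + size] for start in range(0, len(items), size)]


if __name__ == "__main__":
    # Spans are hard-coded here; a real detector would supply them.
    print(mask_spans("Call John at 555-0100", [(5, 4), (13, 8)]))
    print(batched(["chunk1", "chunk2", "chunk3"], 2))
```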

7. Add an "If Condition" activity to verify the success of the PII redaction in the previous step. If the redaction was unsuccessful, an email is sent to the configured recipient and the pipeline is terminated. Configure this activity as outlined below:

    • General tab,
      • Name: PII Redaction Results
    • Activities tab,
      • Expression:
        @empty(activity('Redact PII Data').error)
      • Case: False. Edit the "false" condition using the pencil icon, and add the following activities:
"Office 365 Outlook" activity: To send alert emails.
    • General tab,
      • Name: Redaction Failure Email Alert
    • Settings tab,
      • Signed in as: Sign in (if not already) using the same credentials as those of your workspace.
      • To:
        @variables('recepientEmailAddress')
      • Subject: Text Redaction Error
      • Body:
        <p>@{replace(string(activity('Redact PII Data').error.message), '\','')} </p>
              Advanced,
      • From:
        @variables('senderEmailAddress')
      • Importance: High
Use the On Success connector of the activity to link to the subsequent Fail activity.

"Fail" activity: To terminate the pipeline

    • General tab,
      • Name: Text Redaction Process Failure
    • Settings tab,
      • Fail message:
        @{replace(string(activity('Redact PII Data').error), '\','')}
      • Error code:
        @{activity('Redact PII Data').statuscode}
Return to the main canvas and use the On Success connector of the "If Condition" activity to link to the subsequent Function (Generate Embeddings) activity.

8. Add a "Function" activity. The function "generate_embeddings" associated with this activity uses an Azure OpenAI Service embedding model to convert the redacted chunks into embeddings. Configure this activity as outlined below:

    • General tab,
      • Name: Generate Embeddings
    • Settings tab,
      • Type: Fabric user data functions
      • Connection: Sign-in (if not already) using the credentials that have complete access to your workspace.
      • Workspace: IntelligentApp (default selected)
      • User data functions: file_processor
      • Function: generate_embeddings

               Parameters,

      • Name: text
        • Type: list
        • Value:
          @activity('Redact PII Data').output.output
      • Name: openAIServiceEndpoint
        • Type: str
        • Value:
          @variables('openAIEndpoint')
      • Name: embeddingModel
        • Type: str
        • Value:
          @variables('embeddingModel')
      • Name: openAIKey
        • Type: str
        • Value:
          @variables('openAIKey')
      • Name: fileName
        • Type: str
        • Value:
          @variables('fileName')
Use On Completion connector of the activity to link to the subsequent If Condition (Generate Embeddings Results) activity.
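
The shape of this step can be sketched as follows: each redacted chunk is paired with its embedding and with the file name it came from, so the rows can later be saved together. The embeddings call itself is abstracted behind an embed callable so the example runs offline. The function name build_embedding_rows and the row keys are assumptions for illustration and may differ from what the repository's generate_embeddings function returns.

```python
from typing import Callable


def build_embedding_rows(
    file_name: str,
    chunks: list[str],
    embed: Callable[[list[str]], list[list[float]]],
) -> list[dict]:
    """Pair each chunk with its embedding and the originating file name.

    embed stands in for a call to an embeddings API; the row keys are
    illustrative and may differ from the repository's function.
    """
    vectors = embed(chunks)
    if len(vectors) != len(chunks):
        raise ValueError("embedding count does not match chunk count")
    return [
        {"file_name": file_name, "chunk": chunk, "embedding": vector}
        for chunk, vector in zip(chunks, vectors)
    ]


def fake_embed(texts: list[str]) -> list[list[float]]:
    """Toy embedder returning [character count, word count] per text."""
    return [[float(len(text)), float(len(text.split()))] for text in texts]


if __name__ == "__main__":
    print(build_embedding_rows("candidate.pdf", ["python developer"], fake_embed))
```

In a real setup, embed would wrap a request to the embedding model named in the embeddingModel variable (text-embedding-3-small), using the endpoint and key from the pipeline variables.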

9. Add an "If Condition" activity to verify the success of the embeddings generation in the previous step. If the embeddings generation was unsuccessful, an email is sent to the configured recipient and the pipeline is terminated. Configure this activity as outlined below:

    • General tab,
      • Name: Generate Embeddings Results
    • Activities tab,
      • Expression:
        @empty(activity('Generate Embeddings').error)
      • Case: False. Edit the "false" condition using the pencil icon, and add the following activities:
"Office 365 Outlook" activity: To send alert emails.
    • General tab,
      • Name: Generate Embeddings Failure Email Alert
    • Settings tab,
      • Signed in as: Sign in (if not already) using the same credentials as those of your workspace.
      • To: @variables('recepientEmailAddress')
      • Subject: Generate Embeddings Error
      • Body:
        <p>@{replace(string(activity('Generate Embeddings').error.message), '\','')}</p>
          Advanced,
      • From:
        @variables('senderEmailAddress')
      • Importance: High
Use the On Success connector of the activity to link to the subsequent Fail activity.

"Fail" activity: To terminate the pipeline

    • General tab,
      • Name: Generate Embeddings Processing Failure
    • Settings tab,
      • Fail message:
        @{replace(string(activity('Generate Embeddings').error), '\','')}
      • Error code:
        @{activity('Generate Embeddings').statuscode}
Return to the main canvas and use the On Success connector of the "If Condition" activity to link to the subsequent Function (Create Database Objects) activity.

10. Add a "Function" activity. The function "create_table" associated with this activity executes a SQL command to create a documents table within the previously created "datamart" SQL database. Configure this activity as outlined below:

    • General tab,
      • Name: Create Database Objects
    • Settings tab,
      • Type: Fabric user data functions
      • Connection: Sign-in (if not already) using the credentials that have complete access to your workspace.
      • Workspace: IntelligentApp (default selected)
      • User data functions: file_processor
      • Function: create_table
Use On Success connector of the activity to link to the subsequent Function (Save Data) activity.

11. Add a "Function" activity. The function "insert_data" associated with this activity executes a SQL command to bulk insert rows into the documents table created in the previous activity. Configure this activity as outlined below:

    • General tab,
      • Name: Save Data
    • Settings tab,
      • Type: Fabric user data functions
      • Connection: Sign-in (if not already) using the credentials that have complete access to your workspace.
      • Workspace: IntelligentApp (default selected)
      • User data functions: file_processor
      • Function: insert_data
          Parameters,
        • Name: data
          • Type: list
          • Value:
            @activity('Generate Embeddings').output.output
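
The last two activities (create the table if needed, then insert all rows in one batch) follow a common pattern that can be tried locally. The sketch below uses Python's built-in sqlite3 module as a stand-in for SQL database in Fabric, so the SQL dialect differs from what the repository's functions run. The table layout, column names, and the choice to store each embedding as a JSON string are assumptions made for illustration.

```python
import json
import sqlite3


def save_rows(conn: sqlite3.Connection, rows: list[dict]) -> int:
    """Create the documents table if missing, insert all rows, return the row count."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            file_name TEXT NOT NULL,
            chunk TEXT NOT NULL,
            embedding TEXT NOT NULL  -- JSON array; stand-in for a vector column
        )
        """
    )
    # executemany sends all rows through one parameterized statement.
    conn.executemany(
        "INSERT INTO documents (file_name, chunk, embedding) VALUES (?, ?, ?)",
        [(row["file_name"], row["chunk"], json.dumps(row["embedding"])) for row in rows],
    )
    conn.commit()
    return conn.execute("SELECT COUNT(*) FROM documents").fetchone()[0]


if __name__ == "__main__":
    connection = sqlite3.connect(":memory:")
    sample = [{"file_name": "candidate.pdf", "chunk": "python developer", "embedding": [0.1, 0.2]}]
    print(save_rows(connection, sample))
```

Because the table is created with IF NOT EXISTS, running the pipeline for a second file appends rows to the same table instead of failing.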

Troubleshooting

  • When adding a Python library from PyPI to User Data Functions, you might see an error indicator, such as a squiggly line under the library name (e.g., "azure-ai-textanalytics"), similar to a spelling-mistake marker. Make sure the library name is spelled correctly, then ignore the error by tabbing out to the Version dropdown and selecting the correct version. This transient error should then resolve itself.
  • The imported pipeline may not be preloaded with the parameter values. For each activity in the pipeline, ensure that the parameter values are provided and correct. Refer to the Blank Canvas section for the required parameters and their values.

Execute pipeline (pipeline in action)

It's now time to put everything we have done so far together and see the pipeline in action.
  • Upload a PDF file,
    • Use Azure Storage Explorer or, alternatively, the Azure portal to create a Blob container named "resume".
    • Upload a PDF file from the Kaggle dataset.

Create a container and upload a PDF file

  • Review pipeline execution
    • From the pipeline's "Run" menu, select "View run history" and select the most recent pipeline run.
    • In the details view, check whether the status is "Succeeded".
    • In case of a failure, try to rerun the pipeline using the "Rerun" option.

Pipeline activity execution

  • Review Lakehouse
    • A folder with the same name as that of the container (resume) is created.
    • The PDF file is copied from Azure Blob Storage to the Lakehouse files.

PDF file copied to Lakehouse

  • Review database
    • The documents table should have been created automatically by the pipeline.
    • The redacted chunk data and the embeddings are stored in the documents table.

Save data in SQL Database in Fabric
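
Once chunks and embeddings are stored, a skill search over them comes down to ranking rows by vector similarity to a query embedding. The sketch below illustrates that idea in plain Python with cosine similarity. In practice this ranking would more likely run inside the database, and the row shape, function names, and two-dimensional toy vectors here are assumptions for illustration only.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_matches(query: list[float], rows: list[dict], k: int = 3) -> list[str]:
    """Return the chunks of the k rows most similar to the query embedding."""
    ranked = sorted(
        rows,
        key=lambda row: cosine_similarity(query, row["embedding"]),
        reverse=True,
    )
    return [row["chunk"] for row in ranked[:k]]


if __name__ == "__main__":
    demo_rows = [
        {"chunk": "python developer", "embedding": [1.0, 0.0]},
        {"chunk": "sales manager", "embedding": [0.0, 1.0]},
    ]
    print(top_matches([0.9, 0.1], demo_rows, k=1))
```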

Conclusion

We hope this blog post has provided useful insights for preparing your data for AI applications. Your feedback is important, and we look forward to seeing how you use this information to develop innovative AI solutions.

We would also like to express our sincere gratitude to Aaron Saidi for his invaluable contribution in reviewing this blog post. Thank you, Aaron, for your support.

Submit your feedback on Fabric Ideas and join the conversation on the Fabric Community