Building a Multimodal Data Processing Pipeline with Kafka, Airflow, and SageMaker

Build a multimodal data processing pipeline using Apache Kafka, Apache Airflow, and Amazon SageMaker. This pipeline will handle various file types (image, video, audio, text, and documents) in parallel, process them through custom ML tasks, and store the results in a database.
Building a Multimodal Data Processing Pipeline with Kafka, Airflow, and SageMaker

In this article, we'll explore how to build a robust multimodal data processing pipeline similar to Mixpeek using Apache Kafka, Apache Airflow, and Amazon SageMaker. This pipeline will handle various file types (image, video, audio, text, and documents) in parallel, process them through custom ML tasks, and store the results in a database.

Architecture Overview

Let's start with a high-level overview of our architecture:

graph TD S3[Amazon S3] --> Kafka[Apache Kafka] Kafka --> Airflow[Apache Airflow] Airflow --> Extract[Extraction] Airflow --> Generate[Generation] Airflow --> Embed[Embedding] Extract --> SageMaker[Amazon SageMaker] Generate --> SageMaker Embed --> SageMaker SageMaker --> DB[Customer Database]

Components

  1. Amazon S3: Storage for incoming files
  2. Apache Kafka: Message queue for handling file events
  3. Apache Airflow: Workflow management and orchestration
  4. Amazon SageMaker: ML model hosting and inference
  5. Customer Database: Final storage for processed data

Step 1: Set Up S3 Event Notifications

First, we'll set up S3 to send notifications when new files are added. We'll use AWS Lambda to forward these notifications to Kafka.

import json
import boto3
from kafka import KafkaProducer

def lambda_handler(event, context):
    s3 = boto3.client('s3')
    kafka_producer = KafkaProducer(bootstrap_servers=['your_kafka_broker:9092'])
    
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        
        # Get file metadata
        metadata = s3.head_object(Bucket=bucket, Key=key)
        file_type = metadata['ContentType']
        
        # Send message to Kafka
        message = json.dumps({
            'bucket': bucket,
            'key': key,
            'file_type': file_type
        })
        kafka_producer.send('s3_events', message.encode('utf-8'))
    
    kafka_producer.close()
    return {
        'statusCode': 200,
        'body': json.dumps('Messages sent to Kafka successfully')
    }

Step 2: Set Up Kafka Consumer in Airflow

Next, we'll create an Airflow DAG that consumes messages from Kafka and triggers the appropriate processing tasks.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from kafka import KafkaConsumer
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 6, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'process_s3_files',
    default_args=default_args,
    description='Process S3 files based on Kafka messages',
    schedule_interval=timedelta(minutes=5),
)

def process_kafka_message():
    consumer = KafkaConsumer('s3_events', bootstrap_servers=['your_kafka_broker:9092'])
    for message in consumer:
        file_info = json.loads(message.value.decode('utf-8'))
        trigger_processing_tasks(file_info)

def trigger_processing_tasks(file_info):
    # Trigger appropriate tasks based on file type
    if file_info['file_type'].startswith('image'):
        process_image.execute(context={'file_info': file_info})
    elif file_info['file_type'].startswith('video'):
        process_video.execute(context={'file_info': file_info})
    # Add similar conditions for audio, text, and documents

process_kafka_messages = PythonOperator(
    task_id='process_kafka_messages',
    python_callable=process_kafka_message,
    dag=dag,
)

# Define task operators for each file type and processing step
process_image = PythonOperator(
    task_id='process_image',
    python_callable=process_image_file,
    provide_context=True,
    dag=dag,
)

# Define similar operators for video, audio, text, and documents

process_kafka_messages >> [process_image, process_video, process_audio, process_text, process_document]

Step 3: Implement Processing Tasks

Now, let's implement the processing tasks for each file type. We'll use SageMaker for ML inference.

import boto3
import sagemaker

def process_image_file(**context):
    file_info = context['file_info']
    s3_client = boto3.client('s3')
    sagemaker_runtime = boto3.client('sagemaker-runtime')

    # Download file from S3
    file_content = s3_client.get_object(Bucket=file_info['bucket'], Key=file_info['key'])['Body'].read()

    # Extraction
    extracted_data = extract_image_data(file_content)

    # Generation
    generated_data = generate_image_metadata(extracted_data)

    # Embedding
    embedded_data = create_image_embedding(extracted_data)

    # Store results in customer database
    store_results(file_info, extracted_data, generated_data, embedded_data)

def extract_image_data(file_content):
    # Use SageMaker endpoint for image extraction
    response = sagemaker_runtime.invoke_endpoint(
        EndpointName='image-extraction-endpoint',
        ContentType='application/x-image',
        Body=file_content
    )
    return json.loads(response['Body'].read().decode())

def generate_image_metadata(extracted_data):
    # Use SageMaker endpoint for metadata generation
    response = sagemaker_runtime.invoke_endpoint(
        EndpointName='image-metadata-generation-endpoint',
        ContentType='application/json',
        Body=json.dumps(extracted_data)
    )
    return json.loads(response['Body'].read().decode())

def create_image_embedding(extracted_data):
    # Use SageMaker endpoint for embedding creation
    response = sagemaker_runtime.invoke_endpoint(
        EndpointName='image-embedding-endpoint',
        ContentType='application/json',
        Body=json.dumps(extracted_data)
    )
    return json.loads(response['Body'].read().decode())

def store_results(file_info, extracted_data, generated_data, embedded_data):
    # Implement logic to store results in customer database
    pass

# Implement similar functions for video, audio, text, and document processing

Step 4: Set Up SageMaker Endpoints

To use custom ML models in SageMaker, you'll need to create and deploy endpoints for each task. Here's an example of how to deploy a custom model:

from sagemaker.pytorch import PyTorchModel

model_data = 's3://your-bucket/model.tar.gz'
role = 'arn:aws:iam::your-account-id:role/SageMakerRole'

pytorch_model = PyTorchModel(model_data=model_data,
                             role=role,
                             framework_version='1.8.0',
                             py_version='py3',
                             entry_point='inference.py')

predictor = pytorch_model.deploy(instance_type='ml.c5.xlarge',
                                 initial_instance_count=1,
                                 endpoint_name='your-endpoint-name')

Everything Together Now

This pipeline allows for parallel processing of multiple file types using custom ML tasks. The architecture is scalable and can be extended to handle more file types or processing steps as needed.

Here's a final diagram showing the complete flow:

graph TD S3[Amazon S3] --> Lambda[AWS Lambda] Lambda --> Kafka[Apache Kafka] Kafka --> Airflow[Apache Airflow] Airflow --> |Image| ExtractI[Extract Image] Airflow --> |Video| ExtractV[Extract Video] Airflow --> |Audio| ExtractA[Extract Audio] Airflow --> |Text| ExtractT[Extract Text] ExtractI --> GenerateI[Generate Image Metadata] ExtractV --> GenerateV[Generate Video Metadata] ExtractA --> GenerateA[Generate Audio Metadata] ExtractT --> GenerateT[Generate Text Metadata] GenerateI --> EmbedI[Embed Image] GenerateV --> EmbedV[Embed Video] GenerateA --> EmbedA[Embed Audio] GenerateT --> EmbedT[Embed Text] EmbedI --> SageMaker[Amazon SageMaker] EmbedV --> SageMaker EmbedA --> SageMaker EmbedT --> SageMaker SageMaker --> DB[Customer Database]

Top 3 Critical Challenges

  1. Data Consistency and Integrity:
    • Issue: If any step in the processing pipeline fails, it can lead to incomplete or inconsistent data in the final database.
    • Impact: This can result in unreliable analytics, flawed decision-making, and potential data corruption.
    • Solution: Implement transactional updates and rollback mechanisms. Ensure each processing step is idempotent and can be safely retried. Consider implementing a staging area before final database insertion to verify data integrity.
  2. Scalability and Performance Bottlenecks:
    • Issue: As data volume grows, certain components (e.g., SageMaker endpoints, Kafka brokers) may become overwhelmed, leading to increased latency or system failures.
    • Impact: This can result in delayed data processing, missed real-time insights, and potential data loss during high-traffic periods.
    • Solution: Implement auto-scaling for all components, especially SageMaker endpoints. Use Kafka partitioning effectively. Consider batch processing for non-real-time tasks. Regularly perform load testing to identify and address bottlenecks proactively.
  3. Operational Complexity and Maintenance Overhead:
    • Issue: Managing multiple complex systems (Kafka, Airflow, SageMaker) requires significant DevOps resources and expertise.
    • Impact: This can lead to increased operational costs, longer time-to-market for new features, and higher risk of configuration errors or system downtime.
    • Solution: Invest in comprehensive monitoring and alerting systems. Automate as much of the operational tasks as possible through Infrastructure as Code. Consider using managed services or platforms like Mixpeek that abstract away much of this complexity, allowing your team to focus on data insights rather than infrastructure management.

By focusing on these three critical areas, you can address the most significant risks to your multimodal data processing pipeline's reliability, performance, and manageability. Remember, while building such a system offers great flexibility, it also comes with substantial responsibilities. Platforms like Mixpeek can significantly reduce these challenges by providing a more integrated and managed solution.

Why Build This?

Use Cases

  1. Content Moderation: Automatically process and flag inappropriate content across various media types.
  2. Intelligent Search: Enable advanced search capabilities across different file types in your data lake.
  3. Recommendation Systems: Generate personalized recommendations based on user interactions with diverse content.
  4. Data Enrichment: Automatically enhance your datasets with AI-generated metadata and embeddings.
  5. Compliance and Legal: Scan documents and communications for potential legal or regulatory issues.

Benefits

  1. Replication:
    • The Kafka-based architecture allows for easy replication of data streams.
    • Multiple consumers can process the same data independently, enabling parallel workflows.
  2. Consistency:
    • Airflow ensures that processing tasks are executed in a consistent, ordered manner.
    • The use of SageMaker endpoints guarantees consistent ML model performance across all data processing.
  3. Scalability:
    • Each component (Kafka, Airflow, SageMaker) can be scaled independently based on workload.
    • Kafka can handle high-volume data ingestion.
    • Airflow can manage an increasing number of concurrent tasks.
    • SageMaker can auto-scale inference endpoints based on demand.
  4. Flexibility:
    • Easy to add new file types or processing steps by extending the Airflow DAG.
    • Custom ML models can be deployed and updated in SageMaker without disrupting the pipeline.

Simplifying with Mixpeek

While the system we've described is powerful and flexible, it requires significant setup and maintenance. Mixpeek offers a streamlined solution that accomplishes the same goals with much less complexity. Here's a glimpse of how Mixpeek simplifies this process:

Creating a Connection

from mixpeek import Mixpeek

mixpeek = Mixpeek('API_KEY')

mixpeek.connections.create(
    alias="my-mongo-test",
    engine="mongodb",
    details={
        "host": "your_host_address",
        "database": "your_database_name",
        "username": "your_username",
        "password": "your_password"
    }
)

This single API call replaces the need for manual setup of Kafka consumers and database connections.

Creating a Pipeline

💡
Note: we're sending the pipeline code as a string, but it's advised to use Github for CI/CD with the pipeline code: https://docs.mixpeek.com/pipelines/github
from mixpeek import Mixpeek
mixpeek = Mixpeek("API_KEY")

pipeline_id = mixpeek.pipelines.create(
  alias="VideoProcessingPipeline",
  code="""
def handler(event):
    mixpeek = Mixpeek("API_KEY")
    file_url = event.file_url(event['bucket'], event['key'])
    # process video into chunks
    processed_videos = mixpeek.tools.video.process(
        url=file_url,
        frame_interval=5,
        resolution=[720, 1280],
        return_base64=True
    )
    results = []
    for index, chunk in enumerate(processed_videos):
        print(f"embedding video chunk: {index}")
        # embed each chunk
        embed_response = mixpeek.embed.video(
            model_id="mixpeek/vuse-generic-v1",
            input=chunk['base64_string'],
            input_type="base64"
        )
        obj = {
            "embedding": embed_response['embedding'],
            "file_url": file_url,
            "metadata": {
                "time_start": chunk.start_time,
                "time_end": chunk.end_time,
                "duration": chunk.duration,
            }
        }
        results.append(obj)
    return results
  """,
  source={
    "connection_id": "conn_456",
    "filters": {
      "content_type": "video/*"
    }
  },
  destination={
    "connection_id": "conn_431",
    "collection": "video_embeddings",
    "metadata": {
      "type": "video_processing"
    }
  }
)

https://docs.mixpeek.com/use-cases/video_understanding

This single pipeline creation replaces the entire Airflow DAG setup, SageMaker endpoint management, and custom processing logic we implemented earlier.

By using Mixpeek, you can achieve the same multimodal data processing capabilities with significantly less code and infrastructure management. This allows you to focus on utilizing the processed data rather than maintaining the processing pipeline itself.

While building a custom solution gives you ultimate control, platforms like Mixpeek offer a compelling balance of power and simplicity, especially for teams that want to quickly implement advanced data processing without extensive DevOps overhead.

About the author
Ethan Steininger

Ethan Steininger

Probably outside.

Multimodal Makers | Mixpeek

Multimodal Pipelines for AI

Great! You’ve successfully signed up.

Welcome back! You've successfully signed in.

You've successfully subscribed to Multimodal Makers | Mixpeek.

Success! Check your email for magic link to sign-in.

Success! Your billing info has been updated.

Your billing was not updated.