Video is all around us, from TikTok and YouTube to surveillance footage and lecture recordings. Indexing these videos so they become searchable is computationally expensive as soon as they run longer than about a minute. Now compound that across thousands of 1-minute clips, or even a handful of 10-minute videos.

By distributing the indexing workload to serverless functions, we're able to get closer to our goal of:

Index Time = Search Time

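Once we get here, indexing time no longer grows with the length of the video. With one function per chunk running in parallel, the wall-clock time is roughly the time it takes to index a single chunk, so we can theoretically achieve constant running time, or O(1) time complexity, with respect to video length. Let's walk through how we're going to achieve this.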
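To make the constant-time claim concrete, here is a rough back-of-the-envelope model, not a benchmark. It assumes every chunk takes the same time to index and ignores cold starts and upload time. The function `wall_clock_seconds` and the 12-second figure are made up for this sketch.

```python
import math

def wall_clock_seconds(num_chunks, seconds_per_chunk, workers):
    """Idealized wall-clock indexing time when chunks are processed in parallel."""
    if num_chunks == 0:
        return 0.0
    # Each worker handles one chunk at a time, so chunks are processed in waves
    waves = math.ceil(num_chunks / workers)
    return waves * seconds_per_chunk

# A 10 minute video cut into 30 second chunks is 20 chunks.
# seconds_per_chunk=12.0 is an assumed figure, not a measurement.
sequential = wall_clock_seconds(20, 12.0, workers=1)
parallel = wall_clock_seconds(20, 12.0, workers=20)
longer_video = wall_clock_seconds(200, 12.0, workers=200)
```

With one worker per chunk, `parallel` and `longer_video` come out the same even though the second video is ten times longer, which is the sense in which indexing time stays constant.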

Let's Build it

As an example, we'll be using two different video sources:

  • Ten 1-minute clips
  • One 10-minute video

1. Chunk the video into N intervals and invoke a Lambda function for each chunk

This script splits the video file into chunks of the specified size. For each chunk, it invokes an already-deployed Lambda function using the boto3 library and the AWS Lambda API, passing the chunk's byte range so that the chunks can be processed in parallel.

import json
import math
import os

import boto3

# Name of an already-deployed Lambda function that indexes one chunk.
# Hypothetical: replace it with the name of your own function.
INDEXER_FUNCTION = 'index_video_chunk'

def chunk_video(video_file, chunk_size):
    # Create the Lambda client once and reuse it for every chunk
    lambda_client = boto3.client('lambda')

    # Get the file size and calculate the number of chunks
    file_size = os.path.getsize(video_file)
    num_chunks = math.ceil(file_size / chunk_size)

    # Iterate over the chunks
    for i in range(num_chunks):
        # Calculate the chunk start and end byte offsets
        start = i * chunk_size
        end = min((i + 1) * chunk_size, file_size)

        # Hand the chunk to its own Lambda invocation
        invoke_lambda_function(lambda_client, video_file, i, start, end)

def invoke_lambda_function(lambda_client, video_file, index, start, end):
    # Send the byte range rather than the bytes themselves, because
    # invocation payloads are size-limited. This assumes the function can
    # read the file itself, for example from an S3 bucket.
    payload = {
        'video_file': video_file,
        'chunk_index': index,
        'start': start,
        'end': end
    }

    # InvocationType='Event' invokes asynchronously, so the call returns
    # right away and the chunks are processed in parallel
    lambda_client.invoke(
        FunctionName=INDEXER_FUNCTION,
        InvocationType='Event',
        Payload=json.dumps(payload).encode('utf-8')
    )

# Chunk the video file into 1MB intervals
chunk_video('video.mp4', 1024 * 1024)
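Cutting on byte offsets is simple, but a slice from the middle of an MP4 is generally not decodable on its own, so in practice chunks are usually cut on time boundaries. Below is a minimal sketch of computing those boundaries, assuming you already know the video duration in seconds. `segment_intervals` is a hypothetical helper, and the actual cutting (for example with ffmpeg) is left out.

```python
def segment_intervals(duration_s, interval_s=30):
    """Return (start, end) pairs in seconds that cover the whole video."""
    if interval_s <= 0:
        raise ValueError("interval_s must be positive")
    intervals = []
    start = 0
    while start < duration_s:
        # The last segment may be shorter than interval_s
        end = min(start + interval_s, duration_s)
        intervals.append((start, end))
        start = end
    return intervals

one_minute_clip = segment_intervals(60)
ten_minute_video = segment_intervals(600)
```

Each pair can then be handed to its own function invocation in the same way as the byte ranges above.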

2. Save each chunk to conduct audio and video extractions

This function will return an object with the transcription and a timestamp in Unix time. You can customize this function to fit your specific needs. For example, you may want to include additional information in the return object, such as the language of the transcription or the confidence level of the transcription.

import datetime

import speech_recognition as sr

def transcribe_video(audio_file):
    # sr.AudioFile reads WAV, AIFF, or FLAC files, so extract the chunk's
    # audio track to one of those formats before calling this function

    # Initialize the recognizer
    r = sr.Recognizer()

    # Open the audio file
    with sr.AudioFile(audio_file) as source:
        # Read the audio data from the file
        audio_data = r.record(source)

    # Transcribe the audio data to text
    transcription = r.recognize_google(audio_data)

    # Create the return object with the transcription and timestamp
    transcription_object = {
        'transcription': transcription,
        'timestamp': datetime.datetime.now().timestamp()
    }

    return transcription_object
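`sr.AudioFile` reads WAV, AIFF, and FLAC files rather than video containers, so the audio track has to be extracted first. One way to do that is sketched here, under the assumption that the `ffmpeg` binary is installed and on the PATH. `build_audio_extract_command` and `extract_audio` are hypothetical helper names.

```python
import subprocess

def build_audio_extract_command(video_path, wav_path):
    """Build an ffmpeg command that writes the audio track as 16 kHz mono WAV."""
    return [
        "ffmpeg",
        "-y",                    # overwrite the output file if it exists
        "-i", video_path,        # input video
        "-vn",                   # drop the video stream
        "-acodec", "pcm_s16le",  # uncompressed 16-bit PCM
        "-ar", "16000",          # 16 kHz sample rate
        "-ac", "1",              # mono
        wav_path,
    ]

def extract_audio(video_path, wav_path):
    # Raises CalledProcessError if ffmpeg exits with a non-zero status
    subprocess.run(build_audio_extract_command(video_path, wav_path), check=True)
    return wav_path

command = build_audio_extract_command("video_file_segment_0-30.mp4", "segment_0-30.wav")
```

The resulting WAV file is what gets passed to the transcription function.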

3. Store the transcription, timestamp and filenames as an array of objects

Each object describes one segment. The timestamp_interval value marks the end of each 30-second interval, in milliseconds:

[
  {
    "transcription": "lorem ipsum...",
    "timestamp": 1673280415,
    "timestamp_interval": 30000,
    "filename": "video_file_segment_0-30.mp4"
  }
]
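Assembling one of these objects could look like the sketch below. It assumes `timestamp_interval` is the segment's end offset in milliseconds and that filenames follow the `video_file_segment_<start>-<end>.mp4` pattern from the example. `build_segment_record` is a hypothetical helper.

```python
import time

def build_segment_record(transcription, start_s, end_s, created_at=None):
    """Build one storage record for a transcribed segment."""
    return {
        "transcription": transcription,
        # Unix time in whole seconds; defaults to the current time
        "timestamp": int(time.time()) if created_at is None else int(created_at),
        # Assumption: end of the segment, in milliseconds from the start of the video
        "timestamp_interval": end_s * 1000,
        "filename": f"video_file_segment_{start_s}-{end_s}.mp4",
    }

records = [build_segment_record("lorem ipsum...", 0, 30, created_at=1673280415)]
```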

4. Run each transcription text and N frames through a vector embedding model

Embeddings allow us to perform natural language search on our transcribed videos and the content in the videos themselves.

For more information on transformers check out vectorsearch.dev

Here, we'll use the popular Sentence Transformers library, whose models accept strings and return vectors:

from sentence_transformers import SentenceTransformer

model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

def transform_transcriptions(transcription_objects, transformer):
  transformed_transcriptions = []
  for obj in transcription_objects:

    # Get the transcription value for the current object
    transcription = obj['transcription']

    # Use the transformer that was passed in to turn the text into a vector
    transcription_vector = transformer.encode(transcription).tolist()

    # Append the transformed transcription to the list
    transformed_transcriptions.append(transcription_vector)

  return transformed_transcriptions

# Example: transform_transcriptions(transcription_objects, model)
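Conceptually, searching over these vectors means ranking them by similarity to a query vector. A vector engine does this at scale, but the idea fits in a few lines of plain Python. The toy 3-dimensional vectors below are made up for illustration (real all-MiniLM-L6-v2 vectors have 384 dimensions), and `rank_by_similarity` is a hypothetical helper.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)

def rank_by_similarity(query, vectors):
    """Return indices of vectors sorted from most to least similar to query."""
    scores = [cosine_similarity(query, v) for v in vectors]
    return sorted(range(len(vectors)), key=lambda i: scores[i], reverse=True)

toy_vectors = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.9, 0.1, 0.0]]
ranking = rank_by_similarity([1.0, 0.0, 0.0], toy_vectors)
```

Here `ranking` puts the identical vector first, the nearly parallel one second, and the orthogonal one last.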

This time we'll use the same library to convert the video frames into their corresponding vector representation. all-MiniLM-L6-v2 only accepts text, so the frames need an image-capable model such as CLIP:

from PIL import Image
from sentence_transformers import SentenceTransformer
import cv2

image_model = SentenceTransformer('clip-ViT-B-32')

def transform_video_frames(video_path, transformer, num_frames=10):
  # Open the video file using OpenCV
  video = cv2.VideoCapture(video_path)

  # Initialize a list to store the transformed frames
  transformed_frames = []

  # Calculate the interval between frames (at least 1 for very short videos)
  video_length = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
  interval = max(1, video_length // num_frames)

  # Iterate through the video frames
  for i in range(video_length):
    # Read the next frame and stop if the video cannot be decoded further
    ok, frame = video.read()
    if not ok:
      break

    # Check if the interval has been reached and we still need frames
    if i % interval == 0 and len(transformed_frames) < num_frames:
      # OpenCV returns BGR arrays; convert to an RGB PIL image for the model
      image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

      # Transform the frame using the transformer that was passed in
      transformed_frame = transformer.encode(image).tolist()

      # Append the transformed frame to the list
      transformed_frames.append(transformed_frame)

  # Close the video file
  video.release()

  return transformed_frames

# Example: transform_video_frames('video_file_segment_0-30.mp4', image_model)

5. Store results in a vector search engine

Now we have our full objects, like:

[
  {
    "transcription": "lorem ipsum...",
    "timestamp": 1673280415,
    "timestamp_interval": 30000,
    "filename": "video_file_segment_0-30.mp4",
    "embeddings": {
      "frames": [
        [1, 2, 3, 4, 5],
        [6, 7, 8, 9, 10],
        [11, 12, 13, 14, 15]
      ],
      "transcription": [1, 2, 3, 4, 5]
    }
  }
]

Notice how we have an array of frame embeddings in embeddings.frames. This allows us to traverse the entire list of frames, and because we sampled N frames (10 by default) at a fixed interval, we can work out which frame matches each index: frame = index * interval, where interval is the total number of frames divided by N.
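Below is a sketch of mapping an embedding's position back to a frame number and a time offset. It assumes frames were sampled every `total_frames // num_frames` frames, as in `transform_video_frames`, and that the frame rate is known (OpenCV exposes it as `cv2.CAP_PROP_FPS`). The helper names are hypothetical.

```python
def sampling_interval(total_frames, num_frames=10):
    """Gap between sampled frames; at least 1 so short videos still work."""
    return max(1, total_frames // num_frames)

def embedding_index_to_frame(index, total_frames, num_frames=10):
    """Frame number that produced the embedding stored at the given index."""
    return index * sampling_interval(total_frames, num_frames)

def embedding_index_to_seconds(index, total_frames, fps, num_frames=10):
    """Offset in seconds, from the start of the video, of that frame."""
    return embedding_index_to_frame(index, total_frames, num_frames) / fps

# A 30 second segment at 30 fps has 900 frames, so the interval is 90 frames
frame_number = embedding_index_to_frame(3, total_frames=900)
offset_seconds = embedding_index_to_seconds(3, total_frames=900, fps=30)
```

This lets a search hit on a frame embedding jump straight to the matching moment in the segment.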

6. Search for terms and phrases

Now we can finally search for terms and phrases across the entire stream of videos, spanning both the audio and the imagery. We'll be using OpenSearch:

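# Encode the query with the same model that produced the stored vectors,
# since each k-NN field only accepts vectors of its own dimension
vector_query = model.encode("18 wheeler with a green front").tolist()

# A knn clause targets a single field, so the two fields are combined with bool/should
search_body = {
    "size": 2,
    "query": {
        "bool": {
            "should": [
                {
                    "knn": {
                        "embeddings.frames": {
                            "vector": vector_query,
                            "k": 2
                        }
                    }
                },
                {
                    "knn": {
                        "embeddings.transcription": {
                            "vector": vector_query,
                            "k": 2
                        }
                    }
                }
            ]
        }
    }
}

Note: you will have to create the k-NN index first, see more here: https://opensearch.org/docs/latest/search-plugins/knn/index/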
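For reference, a k-NN index body for the transcription vectors might look like the sketch below. The index name `video-segments` is made up, 384 is the output dimension of all-MiniLM-L6-v2, and the frames field is left out because its dimension depends on whichever model encodes the frames. Treat it as a starting point and check the linked docs for the options your OpenSearch version supports.

```python
import json

INDEX_NAME = "video-segments"  # hypothetical index name

knn_index_body = {
    "settings": {
        "index": {"knn": True}  # enable k-NN search on this index
    },
    "mappings": {
        "properties": {
            "transcription": {"type": "text"},
            "timestamp": {"type": "long"},
            "timestamp_interval": {"type": "long"},
            "filename": {"type": "keyword"},
            "embeddings": {
                "properties": {
                    "transcription": {
                        "type": "knn_vector",
                        "dimension": 384,
                    }
                }
            },
        }
    },
}

# The body is plain JSON, so it can be sent with any HTTP or OpenSearch client
serialized = json.dumps(knn_index_body)
```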

All This Messiness and Complexity in One API Call

from mixpeek import Mixpeek

mix = Mixpeek(
    api_key="mixpeek_api_key",
    access_key="aws_access_key",
    secret_key="aws_secret_key",
    region="region"
)

# this opens up a listener stream on your S3 bucket
# so as you upload video files, they are incrementally indexed
mix.index_bucket("mixpeek-public-demo", stream=True)

Where Else Can we Use This?

  • Media Streaming - Incrementally indexing streams of video content
  • Memory Recall - Using some kind of continuous recording device (like a GoPro) to retrieve memories quickly
  • Surveillance Footage - Identify key pieces of historical recordings
About the author
Ethan Steininger

Former GTM Lead of MongoDB's NLP platform, Atlas Search. Occasionally off the grid in his self-converted camper van.
