
AI Understanding

Analyze videos, transcribe audio, and describe visual content.

Backend Support

| Class | local | openai | gemini | elevenlabs |
| --- | --- | --- | --- | --- |
| ImageToText | BLIP | GPT-4o | Gemini | - |
| AudioToText | Whisper | Whisper API | Gemini | - |
| LLMSummarizer | Ollama | GPT-4o | Gemini | - |
| ObjectDetector | YOLO | GPT-4o | Gemini | - |
| TextDetector | EasyOCR | GPT-4o | Gemini | - |
| FaceDetector | OpenCV | - | - | - |
| ShotTypeClassifier | - | GPT-4o | Gemini | - |
| CameraMotionDetector | OpenCV | - | - | - |
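
Every class above accepts an optional `backend` argument; when it is omitted, the configured default is used, falling back to `local`. Cloud backends read their API keys from the environment unless an `api_key` is passed explicitly. Below is a minimal sketch of explicit backend selection; the import paths are inferred from the source locations documented on this page and may differ if the package re-exports these classes:

```python
# Import paths inferred from the documented source locations
# (e.g. src/videopython/ai/understanding/audio.py); adjust if needed.
from videopython.ai.understanding.audio import AudioToText
from videopython.ai.understanding.image import ImageToText

# Explicit backend selection; api_key is optional if the matching
# environment variable is already set.
transcriber = AudioToText(backend="openai")
captioner = ImageToText(backend="gemini", api_key="YOUR_GEMINI_KEY")

# No backend argument: falls back to the config default, or "local".
local_transcriber = AudioToText()
```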

AudioToText

AudioToText

Transcription service for audio and video.

Source code in src/videopython/ai/understanding/audio.py
class AudioToText:
    """Transcription service for audio and video."""

    SUPPORTED_BACKENDS: list[str] = ["local", "openai", "gemini"]

    def __init__(
        self,
        backend: AudioToTextBackend | None = None,
        model_name: Literal["tiny", "base", "small", "medium", "large", "turbo"] = "small",
        enable_diarization: bool = False,
        device: str = "cpu",
        compute_type: str = "float32",
        api_key: str | None = None,
    ):
        """Initialize the audio-to-text transcriber.

        Args:
            backend: Backend to use. If None, uses config default or 'local'.
            model_name: Whisper model for local backend.
            enable_diarization: Enable speaker diarization (local backend only).
            device: Device for local backend ('cuda' or 'cpu').
            compute_type: Compute type for local backend.
            api_key: API key for cloud backends. If None, reads from environment.
        """
        resolved_backend: str = backend if backend is not None else get_default_backend("audio_to_text")
        if resolved_backend not in self.SUPPORTED_BACKENDS:
            raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

        self.backend: AudioToTextBackend = resolved_backend  # type: ignore[assignment]
        self.model_name = model_name
        self.enable_diarization = enable_diarization
        self.device = device
        self.compute_type = compute_type
        self.api_key = api_key

        self._model: Any = None

    def _init_local(self) -> None:
        """Initialize local Whisper model."""
        if self.enable_diarization:
            import whisperx  # type: ignore

            self._model = whisperx.load_model(self.model_name, device=self.device, compute_type=self.compute_type)
        else:
            import whisper

            self._model = whisper.load_model(name=self.model_name)

    def _process_transcription_result(self, transcription_result: dict) -> Transcription:
        """Process raw transcription result into Transcription object."""
        transcription_segments = []
        for segment in transcription_result["segments"]:
            transcription_words = [
                TranscriptionWord(word=word["word"], start=float(word["start"]), end=float(word["end"]))
                for word in segment.get("words", [])
            ]
            transcription_segment = TranscriptionSegment(
                start=segment["start"],
                end=segment["end"],
                text=segment["text"],
                words=transcription_words,
            )
            transcription_segments.append(transcription_segment)

        return Transcription(segments=transcription_segments)

    def _process_whisperx_result(self, whisperx_result: dict, audio_data) -> Transcription:
        """Process whisperx result with diarization."""
        import whisperx  # type: ignore

        model_a, metadata = whisperx.load_align_model(language_code=whisperx_result["language"], device=self.device)
        aligned_result = whisperx.align(
            whisperx_result["segments"],
            model_a,
            metadata,
            audio_data,
            self.device,
            return_char_alignments=False,
        )

        diarize_model = whisperx.diarize.DiarizationPipeline(device=self.device)
        diarize_segments = diarize_model(audio_data)
        result_with_speakers = whisperx.assign_word_speakers(diarize_segments, aligned_result)

        words = []
        for item in result_with_speakers["word_segments"]:
            words.append(
                TranscriptionWord(
                    word=item["word"],
                    start=item["start"],
                    end=item["end"],
                    speaker=item.get("speaker", None),
                )
            )

        return Transcription(words=words)

    async def _transcribe_local(self, audio: Audio) -> Transcription:
        """Transcribe using local Whisper model."""
        import whisper

        if self._model is None:
            await asyncio.to_thread(self._init_local)

        audio_mono = audio.to_mono().resample(whisper.audio.SAMPLE_RATE)

        def _run_whisper() -> Transcription:
            if self.enable_diarization:
                audio_data = audio_mono.data
                transcription_result = self._model.transcribe(audio_data)
                return self._process_whisperx_result(transcription_result, audio_data)
            else:
                transcription_result = self._model.transcribe(audio=audio_mono.data, word_timestamps=True)
                return self._process_transcription_result(transcription_result)

        return await asyncio.to_thread(_run_whisper)

    async def _transcribe_openai(self, audio: Audio) -> Transcription:
        """Transcribe using OpenAI Whisper API."""
        from openai import AsyncOpenAI

        api_key = get_api_key("openai", self.api_key)
        client = AsyncOpenAI(api_key=api_key)

        # Convert audio to file-like object (WAV format)
        # Save to temp file first, then read into BytesIO
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            audio.save(f.name)
            temp_path = f.name

        audio_bytes = io.BytesIO(Path(temp_path).read_bytes())
        audio_bytes.name = "audio.wav"
        Path(temp_path).unlink()  # Clean up temp file

        response = await client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_bytes,
            response_format="verbose_json",
            timestamp_granularities=["word", "segment"],
        )

        # Convert OpenAI response to Transcription
        segments = []
        for segment in response.segments or []:
            words = []
            # OpenAI may include words in segment
            for word in getattr(response, "words", []) or []:
                if segment.start <= word.start < segment.end:
                    words.append(
                        TranscriptionWord(
                            word=word.word,
                            start=word.start,
                            end=word.end,
                        )
                    )

            segments.append(
                TranscriptionSegment(
                    start=segment.start,
                    end=segment.end,
                    text=segment.text,
                    words=words,
                )
            )

        return Transcription(segments=segments)

    async def _transcribe_gemini(self, audio: Audio) -> Transcription:
        """Transcribe using Google Gemini."""
        import google.generativeai as genai

        api_key = get_api_key("gemini", self.api_key)
        genai.configure(api_key=api_key)

        # Save audio to temp file (Gemini needs file path or bytes)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            audio.save(f.name)
            temp_path = f.name

        model = genai.GenerativeModel("gemini-2.0-flash")

        def _run_gemini() -> str:
            # Upload audio file
            audio_file = genai.upload_file(temp_path)

            response = model.generate_content(
                [
                    audio_file,
                    "Transcribe this audio. Return only the transcription text, nothing else.",
                ]
            )
            return response.text

        try:
            transcription_text = await asyncio.to_thread(_run_gemini)
        finally:
            import os

            os.unlink(temp_path)

        # Gemini doesn't provide timestamps, create a single segment
        return Transcription(
            segments=[
                TranscriptionSegment(
                    start=0.0,
                    end=audio.metadata.duration_seconds,
                    text=transcription_text.strip(),
                    words=[],
                )
            ]
        )

    async def transcribe(self, media: Audio | Video) -> Transcription:
        """Transcribe audio or video to text.

        Args:
            media: Audio or Video to transcribe.

        Returns:
            Transcription object with segments of text and their timestamps.
        """
        if isinstance(media, Video):
            if media.audio.is_silent:
                return Transcription(segments=[])
            audio = media.audio
        elif isinstance(media, Audio):
            if media.is_silent:
                return Transcription(segments=[])
            audio = media
        else:
            raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

        if self.backend == "local":
            return await self._transcribe_local(audio)
        elif self.backend == "openai":
            return await self._transcribe_openai(audio)
        elif self.backend == "gemini":
            return await self._transcribe_gemini(audio)
        else:
            raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)

__init__

__init__(
    backend: AudioToTextBackend | None = None,
    model_name: Literal[
        "tiny", "base", "small", "medium", "large", "turbo"
    ] = "small",
    enable_diarization: bool = False,
    device: str = "cpu",
    compute_type: str = "float32",
    api_key: str | None = None,
)

Initialize the audio-to-text transcriber.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `backend` | `AudioToTextBackend \| None` | Backend to use. If None, uses config default or 'local'. | `None` |
| `model_name` | `Literal['tiny', 'base', 'small', 'medium', 'large', 'turbo']` | Whisper model for local backend. | `'small'` |
| `enable_diarization` | `bool` | Enable speaker diarization (local backend only). | `False` |
| `device` | `str` | Device for local backend ('cuda' or 'cpu'). | `'cpu'` |
| `compute_type` | `str` | Compute type for local backend. | `'float32'` |
| `api_key` | `str \| None` | API key for cloud backends. If None, reads from environment. | `None` |
Source code in src/videopython/ai/understanding/audio.py
def __init__(
    self,
    backend: AudioToTextBackend | None = None,
    model_name: Literal["tiny", "base", "small", "medium", "large", "turbo"] = "small",
    enable_diarization: bool = False,
    device: str = "cpu",
    compute_type: str = "float32",
    api_key: str | None = None,
):
    """Initialize the audio-to-text transcriber.

    Args:
        backend: Backend to use. If None, uses config default or 'local'.
        model_name: Whisper model for local backend.
        enable_diarization: Enable speaker diarization (local backend only).
        device: Device for local backend ('cuda' or 'cpu').
        compute_type: Compute type for local backend.
        api_key: API key for cloud backends. If None, reads from environment.
    """
    resolved_backend: str = backend if backend is not None else get_default_backend("audio_to_text")
    if resolved_backend not in self.SUPPORTED_BACKENDS:
        raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

    self.backend: AudioToTextBackend = resolved_backend  # type: ignore[assignment]
    self.model_name = model_name
    self.enable_diarization = enable_diarization
    self.device = device
    self.compute_type = compute_type
    self.api_key = api_key

    self._model: Any = None

transcribe async

transcribe(media: Audio | Video) -> Transcription

Transcribe audio or video to text.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `media` | `Audio \| Video` | Audio or Video to transcribe. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Transcription` | Transcription object with segments of text and their timestamps. |

Source code in src/videopython/ai/understanding/audio.py
async def transcribe(self, media: Audio | Video) -> Transcription:
    """Transcribe audio or video to text.

    Args:
        media: Audio or Video to transcribe.

    Returns:
        Transcription object with segments of text and their timestamps.
    """
    if isinstance(media, Video):
        if media.audio.is_silent:
            return Transcription(segments=[])
        audio = media.audio
    elif isinstance(media, Audio):
        if media.is_silent:
            return Transcription(segments=[])
        audio = media
    else:
        raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

    if self.backend == "local":
        return await self._transcribe_local(audio)
    elif self.backend == "openai":
        return await self._transcribe_openai(audio)
    elif self.backend == "gemini":
        return await self._transcribe_gemini(audio)
    else:
        raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
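
A short usage sketch for `transcribe`. It assumes `video` is an already loaded videopython `Video` instance; loading the video is outside the scope of this page:

```python
import asyncio

from videopython.ai.understanding.audio import AudioToText


async def print_transcript(video) -> None:
    # Local Whisper "base" model; switch backend or model as needed.
    transcriber = AudioToText(backend="local", model_name="base")
    transcription = await transcriber.transcribe(video)
    for segment in transcription.segments:
        print(f"[{segment.start:.2f}s-{segment.end:.2f}s] {segment.text}")


# asyncio.run(print_transcript(video))
```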

ImageToText

ImageToText

Generates text descriptions of images.

Source code in src/videopython/ai/understanding/image.py
class ImageToText:
    """Generates text descriptions of images."""

    SUPPORTED_BACKENDS: list[str] = ["local", "openai", "gemini"]

    def __init__(
        self,
        backend: ImageToTextBackend | None = None,
        device: str | None = None,
        num_dominant_colors: int = 5,
        api_key: str | None = None,
    ):
        """Initialize image-to-text model.

        Args:
            backend: Backend to use. If None, uses config default or 'local'.
            device: Device for local backend ('cuda' or 'cpu').
            num_dominant_colors: Number of dominant colors for color analysis.
            api_key: API key for cloud backends. If None, reads from environment.
        """
        resolved_backend: str = backend if backend is not None else get_default_backend("image_to_text")
        if resolved_backend not in self.SUPPORTED_BACKENDS:
            raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

        self.backend: ImageToTextBackend = resolved_backend  # type: ignore[assignment]
        self.device = device
        self.api_key = api_key
        self.color_analyzer = ColorAnalyzer(num_dominant_colors=num_dominant_colors)

        self._processor: Any = None
        self._model: Any = None

    def _init_local(self) -> None:
        """Initialize local BLIP model."""
        import torch
        from transformers import BlipForConditionalGeneration, BlipProcessor

        device = self.device
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"

        model_name = "Salesforce/blip-image-captioning-large"
        self._processor = BlipProcessor.from_pretrained(model_name)
        self._model = BlipForConditionalGeneration.from_pretrained(model_name)
        self._model.to(device)
        self.device = device

    def _image_to_base64(self, image: Image.Image) -> str:
        """Convert PIL Image to base64 string."""
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        return base64.b64encode(buffer.getvalue()).decode()

    async def _describe_local(
        self,
        image: np.ndarray | Image.Image,
        prompt: str | None,
    ) -> str:
        """Generate description using local BLIP model."""
        if self._model is None:
            await asyncio.to_thread(self._init_local)

        def _run_model() -> str:
            # Convert numpy array to PIL Image if needed
            pil_image = image
            if isinstance(image, np.ndarray):
                pil_image = Image.fromarray(image)

            inputs = self._processor(pil_image, prompt, return_tensors="pt").to(self.device)
            output = self._model.generate(**inputs, max_new_tokens=50)
            return self._processor.decode(output[0], skip_special_tokens=True)

        return await asyncio.to_thread(_run_model)

    async def _describe_openai(
        self,
        image: np.ndarray | Image.Image,
        prompt: str | None,
    ) -> str:
        """Generate description using OpenAI GPT-4o."""
        from openai import AsyncOpenAI

        api_key = get_api_key("openai", self.api_key)
        client = AsyncOpenAI(api_key=api_key)

        # Convert to PIL Image if needed
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        image_base64 = self._image_to_base64(image)

        system_prompt = "You are an image analysis assistant. Describe images concisely."
        user_prompt = prompt or "Describe this image in 1-2 sentences."

        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": system_prompt},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": user_prompt},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{image_base64}"},
                        },
                    ],
                },
            ],
            max_tokens=100,
        )

        return response.choices[0].message.content or ""

    async def _describe_gemini(
        self,
        image: np.ndarray | Image.Image,
        prompt: str | None,
    ) -> str:
        """Generate description using Google Gemini."""
        import google.generativeai as genai

        api_key = get_api_key("gemini", self.api_key)
        genai.configure(api_key=api_key)

        # Convert to PIL Image if needed
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        model = genai.GenerativeModel("gemini-2.0-flash")
        user_prompt = prompt or "Describe this image in 1-2 sentences."

        # Gemini's generate_content is sync, wrap in thread
        def _run_gemini() -> str:
            response = model.generate_content([user_prompt, image])
            return response.text

        return await asyncio.to_thread(_run_gemini)

    async def describe_image(
        self,
        image: np.ndarray | Image.Image,
        prompt: str | None = None,
    ) -> str:
        """Generate a text description of an image.

        Args:
            image: Image as numpy array (H, W, 3) in RGB format or PIL Image.
            prompt: Optional text prompt to guide the description.

        Returns:
            Text description of the image.
        """
        if self.backend == "local":
            return await self._describe_local(image, prompt)
        elif self.backend == "openai":
            return await self._describe_openai(image, prompt)
        elif self.backend == "gemini":
            return await self._describe_gemini(image, prompt)
        else:
            raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)

    async def describe_frame(
        self,
        video: Video,
        frame_index: int,
        prompt: str | None = None,
        extract_colors: bool = False,
        include_full_histogram: bool = False,
    ) -> FrameDescription:
        """Describe a specific frame from a video.

        Args:
            video: Video object.
            frame_index: Index of the frame to describe.
            prompt: Optional text prompt to guide the description.
            extract_colors: Whether to extract color features from the frame.
            include_full_histogram: Whether to include full HSV histogram.

        Returns:
            FrameDescription object with the frame description.
        """
        if frame_index < 0 or frame_index >= len(video.frames):
            raise ValueError(f"frame_index {frame_index} out of bounds for video with {len(video.frames)} frames")

        frame = video.frames[frame_index]
        description = await self.describe_image(frame, prompt)
        timestamp = frame_index / video.fps

        color_histogram = None
        if extract_colors:
            color_histogram = self.color_analyzer.extract_color_features(frame, include_full_histogram)

        return FrameDescription(
            frame_index=frame_index,
            timestamp=timestamp,
            description=description,
            color_histogram=color_histogram,
        )

    async def describe_frames(
        self,
        video: Video,
        frame_indices: list[int],
        prompt: str | None = None,
        extract_colors: bool = False,
        include_full_histogram: bool = False,
    ) -> list[FrameDescription]:
        """Describe multiple frames from a video.

        Args:
            video: Video object.
            frame_indices: List of frame indices to describe.
            prompt: Optional text prompt to guide the descriptions.
            extract_colors: Whether to extract color features.
            include_full_histogram: Whether to include full HSV histogram.

        Returns:
            List of FrameDescription objects.
        """
        # Process frames concurrently
        tasks = [
            self.describe_frame(video, idx, prompt, extract_colors, include_full_histogram) for idx in frame_indices
        ]
        return await asyncio.gather(*tasks)

    async def describe_scene(
        self,
        video: Video,
        scene: SceneDescription,
        frames_per_second: float = 1.0,
        prompt: str | None = None,
        extract_colors: bool = False,
        include_full_histogram: bool = False,
    ) -> list[FrameDescription]:
        """Describe frames from a scene, sampling at the specified rate.

        Args:
            video: Video object.
            scene: SceneDescription to analyze.
            frames_per_second: Frame sampling rate.
            prompt: Optional text prompt to guide the descriptions.
            extract_colors: Whether to extract color features.
            include_full_histogram: Whether to include full HSV histogram.

        Returns:
            List of FrameDescription objects for the sampled frames.
        """
        if frames_per_second <= 0:
            raise ValueError("frames_per_second must be positive")

        frame_interval = max(1, int(video.fps / frames_per_second))
        frame_indices = list(range(scene.start_frame, scene.end_frame, frame_interval))

        if not frame_indices:
            frame_indices = [scene.start_frame]

        return await self.describe_frames(video, frame_indices, prompt, extract_colors, include_full_histogram)

__init__

__init__(
    backend: ImageToTextBackend | None = None,
    device: str | None = None,
    num_dominant_colors: int = 5,
    api_key: str | None = None,
)

Initialize image-to-text model.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `backend` | `ImageToTextBackend \| None` | Backend to use. If None, uses config default or 'local'. | `None` |
| `device` | `str \| None` | Device for local backend ('cuda' or 'cpu'). | `None` |
| `num_dominant_colors` | `int` | Number of dominant colors for color analysis. | `5` |
| `api_key` | `str \| None` | API key for cloud backends. If None, reads from environment. | `None` |
Source code in src/videopython/ai/understanding/image.py
def __init__(
    self,
    backend: ImageToTextBackend | None = None,
    device: str | None = None,
    num_dominant_colors: int = 5,
    api_key: str | None = None,
):
    """Initialize image-to-text model.

    Args:
        backend: Backend to use. If None, uses config default or 'local'.
        device: Device for local backend ('cuda' or 'cpu').
        num_dominant_colors: Number of dominant colors for color analysis.
        api_key: API key for cloud backends. If None, reads from environment.
    """
    resolved_backend: str = backend if backend is not None else get_default_backend("image_to_text")
    if resolved_backend not in self.SUPPORTED_BACKENDS:
        raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

    self.backend: ImageToTextBackend = resolved_backend  # type: ignore[assignment]
    self.device = device
    self.api_key = api_key
    self.color_analyzer = ColorAnalyzer(num_dominant_colors=num_dominant_colors)

    self._processor: Any = None
    self._model: Any = None

describe_image async

describe_image(
    image: ndarray | Image, prompt: str | None = None
) -> str

Generate a text description of an image.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `image` | `ndarray \| Image` | Image as numpy array (H, W, 3) in RGB format or PIL Image. | *required* |
| `prompt` | `str \| None` | Optional text prompt to guide the description. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `str` | Text description of the image. |

Source code in src/videopython/ai/understanding/image.py
async def describe_image(
    self,
    image: np.ndarray | Image.Image,
    prompt: str | None = None,
) -> str:
    """Generate a text description of an image.

    Args:
        image: Image as numpy array (H, W, 3) in RGB format or PIL Image.
        prompt: Optional text prompt to guide the description.

    Returns:
        Text description of the image.
    """
    if self.backend == "local":
        return await self._describe_local(image, prompt)
    elif self.backend == "openai":
        return await self._describe_openai(image, prompt)
    elif self.backend == "gemini":
        return await self._describe_gemini(image, prompt)
    else:
        raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
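
A usage sketch for `describe_image`, assuming `frame` is an RGB numpy array of shape (H, W, 3), for example one element of `video.frames`:

```python
import asyncio

import numpy as np

from videopython.ai.understanding.image import ImageToText  # path inferred from docs


async def caption(frame: np.ndarray) -> str:
    captioner = ImageToText(backend="local")
    # The optional prompt steers the description.
    return await captioner.describe_image(frame, prompt="Focus on the main subject.")


# description = asyncio.run(caption(video.frames[0]))
```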

describe_frame async

describe_frame(
    video: Video,
    frame_index: int,
    prompt: str | None = None,
    extract_colors: bool = False,
    include_full_histogram: bool = False,
) -> FrameDescription

Describe a specific frame from a video.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `video` | `Video` | Video object. | *required* |
| `frame_index` | `int` | Index of the frame to describe. | *required* |
| `prompt` | `str \| None` | Optional text prompt to guide the description. | `None` |
| `extract_colors` | `bool` | Whether to extract color features from the frame. | `False` |
| `include_full_histogram` | `bool` | Whether to include full HSV histogram. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `FrameDescription` | FrameDescription object with the frame description. |

Source code in src/videopython/ai/understanding/image.py
async def describe_frame(
    self,
    video: Video,
    frame_index: int,
    prompt: str | None = None,
    extract_colors: bool = False,
    include_full_histogram: bool = False,
) -> FrameDescription:
    """Describe a specific frame from a video.

    Args:
        video: Video object.
        frame_index: Index of the frame to describe.
        prompt: Optional text prompt to guide the description.
        extract_colors: Whether to extract color features from the frame.
        include_full_histogram: Whether to include full HSV histogram.

    Returns:
        FrameDescription object with the frame description.
    """
    if frame_index < 0 or frame_index >= len(video.frames):
        raise ValueError(f"frame_index {frame_index} out of bounds for video with {len(video.frames)} frames")

    frame = video.frames[frame_index]
    description = await self.describe_image(frame, prompt)
    timestamp = frame_index / video.fps

    color_histogram = None
    if extract_colors:
        color_histogram = self.color_analyzer.extract_color_features(frame, include_full_histogram)

    return FrameDescription(
        frame_index=frame_index,
        timestamp=timestamp,
        description=description,
        color_histogram=color_histogram,
    )

describe_frames async

describe_frames(
    video: Video,
    frame_indices: list[int],
    prompt: str | None = None,
    extract_colors: bool = False,
    include_full_histogram: bool = False,
) -> list[FrameDescription]

Describe multiple frames from a video.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `video` | `Video` | Video object. | *required* |
| `frame_indices` | `list[int]` | List of frame indices to describe. | *required* |
| `prompt` | `str \| None` | Optional text prompt to guide the descriptions. | `None` |
| `extract_colors` | `bool` | Whether to extract color features. | `False` |
| `include_full_histogram` | `bool` | Whether to include full HSV histogram. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `list[FrameDescription]` | List of FrameDescription objects. |

Source code in src/videopython/ai/understanding/image.py
async def describe_frames(
    self,
    video: Video,
    frame_indices: list[int],
    prompt: str | None = None,
    extract_colors: bool = False,
    include_full_histogram: bool = False,
) -> list[FrameDescription]:
    """Describe multiple frames from a video.

    Args:
        video: Video object.
        frame_indices: List of frame indices to describe.
        prompt: Optional text prompt to guide the descriptions.
        extract_colors: Whether to extract color features.
        include_full_histogram: Whether to include full HSV histogram.

    Returns:
        List of FrameDescription objects.
    """
    # Process frames concurrently
    tasks = [
        self.describe_frame(video, idx, prompt, extract_colors, include_full_histogram) for idx in frame_indices
    ]
    return await asyncio.gather(*tasks)

describe_scene async

describe_scene(
    video: Video,
    scene: SceneDescription,
    frames_per_second: float = 1.0,
    prompt: str | None = None,
    extract_colors: bool = False,
    include_full_histogram: bool = False,
) -> list[FrameDescription]

Describe frames from a scene, sampling at the specified rate.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `video` | `Video` | Video object. | *required* |
| `scene` | `SceneDescription` | SceneDescription to analyze. | *required* |
| `frames_per_second` | `float` | Frame sampling rate. | `1.0` |
| `prompt` | `str \| None` | Optional text prompt to guide the descriptions. | `None` |
| `extract_colors` | `bool` | Whether to extract color features. | `False` |
| `include_full_histogram` | `bool` | Whether to include full HSV histogram. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `list[FrameDescription]` | List of FrameDescription objects for the sampled frames. |

Source code in src/videopython/ai/understanding/image.py
async def describe_scene(
    self,
    video: Video,
    scene: SceneDescription,
    frames_per_second: float = 1.0,
    prompt: str | None = None,
    extract_colors: bool = False,
    include_full_histogram: bool = False,
) -> list[FrameDescription]:
    """Describe frames from a scene, sampling at the specified rate.

    Args:
        video: Video object.
        scene: SceneDescription to analyze.
        frames_per_second: Frame sampling rate.
        prompt: Optional text prompt to guide the descriptions.
        extract_colors: Whether to extract color features.
        include_full_histogram: Whether to include full HSV histogram.

    Returns:
        List of FrameDescription objects for the sampled frames.
    """
    if frames_per_second <= 0:
        raise ValueError("frames_per_second must be positive")

    frame_interval = max(1, int(video.fps / frames_per_second))
    frame_indices = list(range(scene.start_frame, scene.end_frame, frame_interval))

    if not frame_indices:
        frame_indices = [scene.start_frame]

    return await self.describe_frames(video, frame_indices, prompt, extract_colors, include_full_histogram)
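
A sketch of manual frame sampling with `describe_frames`; `describe_scene` applies the same idea per detected scene. It assumes `video` is a loaded `Video` instance:

```python
from videopython.ai.understanding.image import ImageToText  # path inferred from docs


async def describe_keyframes(video) -> list:
    captioner = ImageToText()
    # Sample roughly one frame per second of video.
    step = max(1, int(video.fps))
    indices = list(range(0, len(video.frames), step))
    descriptions = await captioner.describe_frames(video, indices, extract_colors=True)
    for fd in descriptions:
        print(f"{fd.timestamp:.1f}s: {fd.description}")
    return descriptions
```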

LLMSummarizer

LLMSummarizer

Generates coherent summaries of video content using LLMs.

Source code in src/videopython/ai/understanding/text.py
class LLMSummarizer:
    """Generates coherent summaries of video content using LLMs."""

    SUPPORTED_BACKENDS: list[str] = ["local", "openai", "gemini"]

    def __init__(
        self,
        backend: LLMBackend | None = None,
        model: str | None = None,
        api_key: str | None = None,
        timeout: float = 30.0,
    ):
        """Initialize the LLM summarizer.

        Args:
            backend: Backend to use. If None, uses config default or 'local'.
            model: Model name (backend-specific). If None, uses default per backend.
            api_key: API key for cloud backends. If None, reads from environment.
            timeout: Request timeout in seconds.
        """
        resolved_backend: str = backend if backend is not None else get_default_backend("llm_summarizer")
        if resolved_backend not in self.SUPPORTED_BACKENDS:
            raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

        self.backend: LLMBackend = resolved_backend  # type: ignore[assignment]
        self.model = model
        self.api_key = api_key
        self.timeout = timeout

    def _get_model_name(self) -> str:
        """Get the model name for the current backend."""
        if self.model:
            return self.model

        if self.backend == "local":
            return "llama3.2"
        elif self.backend == "openai":
            return "gpt-4o"
        elif self.backend == "gemini":
            return "gemini-2.0-flash"
        else:
            return "llama3.2"

    async def _generate_local(self, prompt: str) -> str:
        """Generate text using local Ollama."""
        import ollama

        model = self._get_model_name()

        def _run_ollama() -> str:
            response = ollama.generate(
                model=model,
                prompt=prompt,
                options={"temperature": 0.3, "num_predict": 150},
            )
            return response["response"].strip()

        return await asyncio.to_thread(_run_ollama)

    async def _generate_openai(self, prompt: str) -> str:
        """Generate text using OpenAI."""
        from openai import AsyncOpenAI

        api_key = get_api_key("openai", self.api_key)
        client = AsyncOpenAI(api_key=api_key)

        model = self._get_model_name()

        response = await client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=150,
            temperature=0.3,
        )

        return response.choices[0].message.content or ""

    async def _generate_gemini(self, prompt: str) -> str:
        """Generate text using Google Gemini."""
        import google.generativeai as genai

        api_key = get_api_key("gemini", self.api_key)
        genai.configure(api_key=api_key)

        model_name = self._get_model_name()
        model = genai.GenerativeModel(model_name)

        def _run_gemini() -> str:
            response = model.generate_content(
                prompt,
                generation_config=genai.GenerationConfig(
                    temperature=0.3,
                    max_output_tokens=150,
                ),
            )
            return response.text

        return await asyncio.to_thread(_run_gemini)

    async def _generate(self, prompt: str) -> str:
        """Generate text using the configured backend."""
        if self.backend == "local":
            return await self._generate_local(prompt)
        elif self.backend == "openai":
            return await self._generate_openai(prompt)
        elif self.backend == "gemini":
            return await self._generate_gemini(prompt)
        else:
            raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)

    async def summarize_scene(self, frame_descriptions: list[tuple[float, str]]) -> str:
        """Generate a coherent summary of a scene from frame descriptions.

        Args:
            frame_descriptions: List of (timestamp, description) tuples for frames.

        Returns:
            2-3 sentence coherent summary of the scene.
        """
        if not frame_descriptions:
            return "Empty scene with no frames."

        frames_text = "\n".join([f"- At {ts:.2f}s: {desc}" for ts, desc in frame_descriptions])

        prompt = f"""You are analyzing a video scene. Below are descriptions of individual frames sampled from \
this scene:

{frames_text}

Task: Write a coherent 2-3 sentence summary of what happens in this scene. Focus on:
- Main actions or events
- Key objects or people present
- Any changes or progression within the scene

Remove redundancy and synthesize the information into a flowing narrative. Be concise and specific.

Summary:"""

        try:
            return await self._generate(prompt)
        except Exception:
            # Fallback: return concatenated descriptions
            return " ".join([desc for _, desc in frame_descriptions])

    async def summarize_video(self, scene_summaries: list[tuple[float, float, str]]) -> str:
        """Generate a high-level summary of the entire video from scene summaries.

        Args:
            scene_summaries: List of (start_time, end_time, summary) tuples for each scene.

        Returns:
            Paragraph describing the entire video narrative.
        """
        if not scene_summaries:
            return "Empty video with no scenes."

        scenes_text = "\n".join(
            [f"- Scene at {start:.2f}s-{end:.2f}s: {summary}" for start, end, summary in scene_summaries]
        )

        prompt = f"""You are analyzing a video. Below are summaries of different scenes in the video:

{scenes_text}

Task: Write a coherent paragraph (3-5 sentences) summarizing the entire video. Focus on:
- Overall narrative or theme
- Main events in chronological order
- Key subjects or topics covered
- The progression from beginning to end

Synthesize the scenes into a high-level overview that captures the video's essence.

Summary:"""

        try:
            return await self._generate(prompt)
        except Exception:
            # Fallback: return concatenated scene summaries
            return " ".join([summary for _, _, summary in scene_summaries])

    async def summarize_scene_description(self, scene_description: SceneDescription) -> str:
        """Generate summary from a SceneDescription object.

        Args:
            scene_description: SceneDescription object with frame descriptions.

        Returns:
            Coherent summary of the scene.
        """
        frame_descriptions = [(fd.timestamp, fd.description) for fd in scene_description.frame_descriptions]
        return await self.summarize_scene(frame_descriptions)

    async def summarize_video_description(self, video_description: VideoDescription) -> str:
        """Generate summary from a VideoDescription object.

        Args:
            video_description: VideoDescription object with scene descriptions.

        Returns:
            High-level summary of the entire video.
        """
        scene_summaries = [
            (sd.start, sd.end, sd.get_description_summary()) for sd in video_description.scene_descriptions
        ]
        return await self.summarize_video(scene_summaries)

__init__

__init__(
    backend: LLMBackend | None = None,
    model: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
)

Initialize the LLM summarizer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `backend` | `LLMBackend \| None` | Backend to use. If None, uses config default or 'local'. | `None` |
| `model` | `str \| None` | Model name (backend-specific). If None, uses default per backend. | `None` |
| `api_key` | `str \| None` | API key for cloud backends. If None, reads from environment. | `None` |
| `timeout` | `float` | Request timeout in seconds. | `30.0` |
Source code in src/videopython/ai/understanding/text.py
def __init__(
    self,
    backend: LLMBackend | None = None,
    model: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
):
    """Initialize the LLM summarizer.

    Args:
        backend: Backend to use. If None, uses config default or 'local'.
        model: Model name (backend-specific). If None, uses default per backend.
        api_key: API key for cloud backends. If None, reads from environment.
        timeout: Request timeout in seconds.
    """
    resolved_backend: str = backend if backend is not None else get_default_backend("llm_summarizer")
    if resolved_backend not in self.SUPPORTED_BACKENDS:
        raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

    self.backend: LLMBackend = resolved_backend  # type: ignore[assignment]
    self.model = model
    self.api_key = api_key
    self.timeout = timeout

summarize_scene async

summarize_scene(
    frame_descriptions: list[tuple[float, str]],
) -> str

Generate a coherent summary of a scene from frame descriptions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `frame_descriptions` | `list[tuple[float, str]]` | List of (timestamp, description) tuples for frames. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str` | 2-3 sentence coherent summary of the scene. |

Source code in src/videopython/ai/understanding/text.py
    async def summarize_scene(self, frame_descriptions: list[tuple[float, str]]) -> str:
        """Generate a coherent summary of a scene from frame descriptions.

        Args:
            frame_descriptions: List of (timestamp, description) tuples for frames.

        Returns:
            2-3 sentence coherent summary of the scene.
        """
        if not frame_descriptions:
            return "Empty scene with no frames."

        frames_text = "\n".join([f"- At {ts:.2f}s: {desc}" for ts, desc in frame_descriptions])

        prompt = f"""You are analyzing a video scene. Below are descriptions of individual frames sampled from \
this scene:

{frames_text}

Task: Write a coherent 2-3 sentence summary of what happens in this scene. Focus on:
- Main actions or events
- Key objects or people present
- Any changes or progression within the scene

Remove redundancy and synthesize the information into a flowing narrative. Be concise and specific.

Summary:"""

        try:
            return await self._generate(prompt)
        except Exception:
            # Fallback: return concatenated descriptions
            return " ".join([desc for _, desc in frame_descriptions])

summarize_video async

summarize_video(
    scene_summaries: list[tuple[float, float, str]],
) -> str

Generate a high-level summary of the entire video from scene summaries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `scene_summaries` | `list[tuple[float, float, str]]` | List of (start_time, end_time, summary) tuples for each scene. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str` | Paragraph describing the entire video narrative. |

Source code in src/videopython/ai/understanding/text.py
    async def summarize_video(self, scene_summaries: list[tuple[float, float, str]]) -> str:
        """Generate a high-level summary of the entire video from scene summaries.

        Args:
            scene_summaries: List of (start_time, end_time, summary) tuples for each scene.

        Returns:
            Paragraph describing the entire video narrative.
        """
        if not scene_summaries:
            return "Empty video with no scenes."

        scenes_text = "\n".join(
            [f"- Scene at {start:.2f}s-{end:.2f}s: {summary}" for start, end, summary in scene_summaries]
        )

        prompt = f"""You are analyzing a video. Below are summaries of different scenes in the video:

{scenes_text}

Task: Write a coherent paragraph (3-5 sentences) summarizing the entire video. Focus on:
- Overall narrative or theme
- Main events in chronological order
- Key subjects or topics covered
- The progression from beginning to end

Synthesize the scenes into a high-level overview that captures the video's essence.

Summary:"""

        try:
            return await self._generate(prompt)
        except Exception:
            # Fallback: return concatenated scene summaries
            return " ".join([summary for _, _, summary in scene_summaries])

summarize_scene_description async

summarize_scene_description(
    scene_description: SceneDescription,
) -> str

Generate summary from a SceneDescription object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `scene_description` | `SceneDescription` | SceneDescription object with frame descriptions. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str` | Coherent summary of the scene. |

Source code in src/videopython/ai/understanding/text.py
async def summarize_scene_description(self, scene_description: SceneDescription) -> str:
    """Generate summary from a SceneDescription object.

    Args:
        scene_description: SceneDescription object with frame descriptions.

    Returns:
        Coherent summary of the scene.
    """
    frame_descriptions = [(fd.timestamp, fd.description) for fd in scene_description.frame_descriptions]
    return await self.summarize_scene(frame_descriptions)

summarize_video_description async

summarize_video_description(
    video_description: VideoDescription,
) -> str

Generate summary from a VideoDescription object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `video_description` | `VideoDescription` | VideoDescription object with scene descriptions. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str` | High-level summary of the entire video. |

Source code in src/videopython/ai/understanding/text.py
async def summarize_video_description(self, video_description: VideoDescription) -> str:
    """Generate summary from a VideoDescription object.

    Args:
        video_description: VideoDescription object with scene descriptions.

    Returns:
        High-level summary of the entire video.
    """
    scene_summaries = [
        (sd.start, sd.end, sd.get_description_summary()) for sd in video_description.scene_descriptions
    ]
    return await self.summarize_video(scene_summaries)
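
A usage sketch for `summarize_scene`. The (timestamp, description) pairs would normally come from `ImageToText`; the two frame descriptions below are made up for illustration:

```python
import asyncio

from videopython.ai.understanding.text import LLMSummarizer

frame_descriptions = [
    (0.0, "A dog runs across a sunny park."),
    (1.0, "The dog jumps and catches a frisbee."),
]

# Local Ollama backend, with its default model named explicitly.
summarizer = LLMSummarizer(backend="local", model="llama3.2")
summary = asyncio.run(summarizer.summarize_scene(frame_descriptions))
print(summary)
```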

SceneDetector

SceneDetector

Detects scene changes in videos using histogram comparison.

Scene changes are detected by comparing the color histograms of consecutive frames. When the histogram difference exceeds a threshold, a scene boundary is detected.

Source code in src/videopython/ai/understanding/video.py
class SceneDetector:
    """Detects scene changes in videos using histogram comparison.

    Scene changes are detected by comparing the color histograms of consecutive frames.
    When the histogram difference exceeds a threshold, a scene boundary is detected.
    """

    def __init__(self, threshold: float = 0.3, min_scene_length: float = 0.5):
        """Initialize the scene detector.

        Args:
            threshold: Sensitivity for scene change detection (0.0 to 1.0).
                      Lower values detect more scene changes. Default: 0.3
            min_scene_length: Minimum scene duration in seconds. Scenes shorter than
                            this will be merged with adjacent scenes. Default: 0.5
        """
        if not 0.0 <= threshold <= 1.0:
            raise ValueError("threshold must be between 0.0 and 1.0")
        if min_scene_length < 0:
            raise ValueError("min_scene_length must be non-negative")

        self.threshold = threshold
        self.min_scene_length = min_scene_length
        self.color_analyzer = ColorAnalyzer()

    def _calculate_histogram_difference(self, frame1: np.ndarray, frame2: np.ndarray) -> float:
        """Calculate histogram difference between two frames.

        Args:
            frame1: First frame (H, W, 3) in RGB format
            frame2: Second frame (H, W, 3) in RGB format

        Returns:
            Difference score between 0.0 (identical) and 1.0 (completely different)
        """
        return self.color_analyzer.calculate_histogram_difference(frame1, frame2)

    def detect(self, video: Video) -> list[SceneDescription]:
        """Detect scenes in a video.

        Args:
            video: Video object to analyze

        Returns:
            List of SceneDescription objects representing detected scenes, ordered by time.
            Frame descriptions are not populated - use VideoAnalyzer for full analysis.
        """
        if len(video.frames) == 0:
            return []

        if len(video.frames) == 1:
            # Single frame video is one scene
            return [SceneDescription(start=0.0, end=video.total_seconds, start_frame=0, end_frame=1)]

        # Calculate frame differences
        scene_boundaries = [0]  # First frame is always a scene start

        for i in range(1, len(video.frames)):
            difference = self._calculate_histogram_difference(video.frames[i - 1], video.frames[i])

            if difference > self.threshold:
                scene_boundaries.append(i)

        # Last frame index (exclusive)
        scene_boundaries.append(len(video.frames))

        # Create SceneDescription objects
        scenes = []
        for i in range(len(scene_boundaries) - 1):
            start_frame = scene_boundaries[i]
            end_frame = scene_boundaries[i + 1]

            start_time = start_frame / video.fps
            end_time = end_frame / video.fps

            scenes.append(
                SceneDescription(
                    start=start_time,
                    end=end_time,
                    start_frame=start_frame,
                    end_frame=end_frame,
                )
            )

        # Merge scenes that are too short
        if self.min_scene_length > 0:
            scenes = self._merge_short_scenes(scenes)

        return scenes

    def _merge_short_scenes(self, scenes: list[SceneDescription]) -> list[SceneDescription]:
        """Merge scenes that are shorter than min_scene_length.

        Args:
            scenes: List of scenes to process

        Returns:
            List of scenes with short scenes merged into adjacent ones
        """
        if not scenes:
            return scenes

        merged = [scenes[0]]

        for scene in scenes[1:]:
            last_scene = merged[-1]

            # If the last scene is too short, merge it with current scene
            if last_scene.duration < self.min_scene_length:
                # Merge by extending the previous scene to include this one
                merged[-1] = SceneDescription(
                    start=last_scene.start,
                    end=scene.end,
                    start_frame=last_scene.start_frame,
                    end_frame=scene.end_frame,
                )
            else:
                merged.append(scene)

        # Handle edge case: if the final scene is too short, merge it backward
        if len(merged) > 1 and merged[-1].duration < self.min_scene_length:
            second_last = merged[-2]
            last = merged[-1]
            merged[-2] = SceneDescription(
                start=second_last.start,
                end=last.end,
                start_frame=second_last.start_frame,
                end_frame=last.end_frame,
            )
            merged.pop()

        return merged

__init__

__init__(
    threshold: float = 0.3, min_scene_length: float = 0.5
)

Initialize the scene detector.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `threshold` | `float` | Sensitivity for scene change detection (0.0 to 1.0). Lower values detect more scene changes. | `0.3` |
| `min_scene_length` | `float` | Minimum scene duration in seconds. Scenes shorter than this will be merged with adjacent scenes. | `0.5` |
Source code in src/videopython/ai/understanding/video.py
def __init__(self, threshold: float = 0.3, min_scene_length: float = 0.5):
    """Initialize the scene detector.

    Args:
        threshold: Sensitivity for scene change detection (0.0 to 1.0).
                  Lower values detect more scene changes. Default: 0.3
        min_scene_length: Minimum scene duration in seconds. Scenes shorter than
                        this will be merged with adjacent scenes. Default: 0.5
    """
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold must be between 0.0 and 1.0")
    if min_scene_length < 0:
        raise ValueError("min_scene_length must be non-negative")

    self.threshold = threshold
    self.min_scene_length = min_scene_length
    self.color_analyzer = ColorAnalyzer()

detect

detect(video: Video) -> list[SceneDescription]

Detect scenes in a video.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `video` | `Video` | Video object to analyze | *required* |

Returns:

| Type | Description |
| --- | --- |
| `list[SceneDescription]` | List of SceneDescription objects representing detected scenes, ordered by time. Frame descriptions are not populated - use VideoAnalyzer for full analysis. |

Source code in src/videopython/ai/understanding/video.py
def detect(self, video: Video) -> list[SceneDescription]:
    """Detect scenes in a video.

    Args:
        video: Video object to analyze

    Returns:
        List of SceneDescription objects representing detected scenes, ordered by time.
        Frame descriptions are not populated - use VideoAnalyzer for full analysis.
    """
    if len(video.frames) == 0:
        return []

    if len(video.frames) == 1:
        # Single frame video is one scene
        return [SceneDescription(start=0.0, end=video.total_seconds, start_frame=0, end_frame=1)]

    # Calculate frame differences
    scene_boundaries = [0]  # First frame is always a scene start

    for i in range(1, len(video.frames)):
        difference = self._calculate_histogram_difference(video.frames[i - 1], video.frames[i])

        if difference > self.threshold:
            scene_boundaries.append(i)

    # Last frame index (exclusive)
    scene_boundaries.append(len(video.frames))

    # Create SceneDescription objects
    scenes = []
    for i in range(len(scene_boundaries) - 1):
        start_frame = scene_boundaries[i]
        end_frame = scene_boundaries[i + 1]

        start_time = start_frame / video.fps
        end_time = end_frame / video.fps

        scenes.append(
            SceneDescription(
                start=start_time,
                end=end_time,
                start_frame=start_frame,
                end_frame=end_frame,
            )
        )

    # Merge scenes that are too short
    if self.min_scene_length > 0:
        scenes = self._merge_short_scenes(scenes)

    return scenes
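
Unlike the rest of the understanding API, `detect` is synchronous. A usage sketch, assuming `video` is a loaded `Video` instance:

```python
from videopython.ai.understanding.video import SceneDetector  # path inferred from docs

detector = SceneDetector(threshold=0.4, min_scene_length=1.0)
scenes = detector.detect(video)  # list[SceneDescription], ordered by time
for scene in scenes:
    print(
        f"Scene {scene.start:.2f}s-{scene.end:.2f}s "
        f"(frames {scene.start_frame}-{scene.end_frame})"
    )
```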

VideoAnalyzer

VideoAnalyzer

Comprehensive video analysis combining scene detection, frame understanding, and transcription.

Source code in src/videopython/ai/understanding/video.py
class VideoAnalyzer:
    """Comprehensive video analysis combining scene detection, frame understanding, and transcription."""

    def __init__(
        self,
        scene_threshold: float = 0.3,
        min_scene_length: float = 0.5,
        device: str | None = None,
        detection_backend: ImageToTextBackend | None = None,
        api_key: str | None = None,
    ):
        """Initialize the video analyzer.

        Args:
            scene_threshold: Threshold for scene change detection (0.0-1.0)
            min_scene_length: Minimum scene duration in seconds
            device: Device for ImageToText model ('cuda', 'cpu', or None for auto)
            detection_backend: Backend for object/text detection ('local', 'openai', 'gemini')
            api_key: API key for cloud backends
        """
        self.scene_detector = SceneDetector(threshold=scene_threshold, min_scene_length=min_scene_length)
        self.image_to_text = ImageToText(device=device)
        self.detection_backend = detection_backend
        self.api_key = api_key

    async def analyze(
        self,
        video: Video,
        frames_per_second: float = 1.0,
        transcribe: bool = False,
        transcription_model: Literal["tiny", "base", "small", "medium", "large", "turbo"] = "base",
        description_prompt: str | None = None,
        extract_colors: bool = False,
        include_full_histogram: bool = False,
        detect_objects: bool = False,
        detect_faces: bool = False,
        detect_text: bool = False,
        detect_shot_type: bool = False,
        generate_summaries: bool = False,
    ) -> VideoDescription:
        """Perform comprehensive video analysis.

        Args:
            video: Video object to analyze
            frames_per_second: Frame sampling rate for visual analysis (default: 1.0 fps)
            transcribe: Whether to generate audio transcription (default: False)
            transcription_model: Whisper model to use if transcribe=True (default: "base")
            description_prompt: Optional prompt to guide frame descriptions
            extract_colors: Whether to extract color features from frames (default: False)
            include_full_histogram: Whether to include full HSV histogram in color features (default: False)
            detect_objects: Whether to detect objects in frames (default: False)
            detect_faces: Whether to detect faces in frames (default: False)
            detect_text: Whether to detect text (OCR) in frames (default: False)
            detect_shot_type: Whether to classify shot type (cloud backends only) (default: False)
            generate_summaries: Whether to generate LLM summaries for scenes (default: False)

        Returns:
            VideoDescription object with complete analysis
        """
        # Step 1: Detect scenes (returns SceneDescription objects with timing only)
        scene_descriptions = self.scene_detector.detect(video)

        # Step 2: Set up frame analyzer if any detection is enabled
        frame_analyzer = None
        if detect_objects or detect_faces or detect_text or detect_shot_type:
            frame_analyzer = FrameAnalyzer(
                backend=self.detection_backend,
                api_key=self.api_key,
                object_detection=detect_objects,
                face_detection=detect_faces,
                text_detection=detect_text,
                shot_type_detection=detect_shot_type,
            )

        # Step 3: Analyze frames from each scene and populate frame_descriptions
        for scene_desc in scene_descriptions:
            frame_descriptions = await self.image_to_text.describe_scene(
                video,
                scene_desc,
                frames_per_second=frames_per_second,
                prompt=description_prompt,
                extract_colors=extract_colors,
                include_full_histogram=include_full_histogram,
            )

            # Run detection on each frame if enabled
            if frame_analyzer:
                for fd in frame_descriptions:
                    frame = video.frames[fd.frame_index]
                    await frame_analyzer.analyze_frame(frame, fd)

            scene_desc.frame_descriptions = frame_descriptions

            # Populate scene-level aggregations
            if detect_objects:
                scene_desc.detected_entities = _aggregate_detected_entities(frame_descriptions)

            if extract_colors:
                scene_desc.dominant_colors = _aggregate_dominant_colors(frame_descriptions)

        # Step 4: Optional transcription
        transcription = None
        if transcribe:
            from videopython.ai.understanding.audio import AudioToText

            transcriber = AudioToText(model_name=transcription_model)
            transcription = await transcriber.transcribe(video)

        # Create VideoDescription and distribute transcription to scenes
        video_description = VideoDescription(scene_descriptions=scene_descriptions, transcription=transcription)
        if transcription:
            video_description.distribute_transcription()

        # Step 5: Generate summaries if requested
        if generate_summaries:
            from videopython.ai.understanding.text import LLMSummarizer

            summarizer = LLMSummarizer(backend=self.detection_backend, api_key=self.api_key)
            for scene_desc in scene_descriptions:
                scene_desc.summary = await summarizer.summarize_scene_description(scene_desc)

        return video_description

    async def analyze_scenes_only(self, video: Video) -> list[SceneDescription]:
        """Analyze video scenes without transcription (convenience method).

        Args:
            video: Video object to analyze

        Returns:
            List of SceneDescription objects
        """
        understanding = await self.analyze(video, transcribe=False)
        return understanding.scene_descriptions
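
Example (sketch): a minimal end-to-end run of VideoAnalyzer. The Video import path and the Video.from_path loader are assumptions shown only for illustration; substitute the actual loader from the core package.

import asyncio

from videopython.ai.understanding.video import VideoAnalyzer
from videopython.base.video import Video  # assumed import path; use the real core Video class


async def main() -> None:
    video = Video.from_path("example.mp4")  # hypothetical loader, for illustration only

    analyzer = VideoAnalyzer(scene_threshold=0.3, min_scene_length=0.5)
    description = await analyzer.analyze(
        video,
        frames_per_second=1.0,  # sample one frame per second for visual analysis
        transcribe=True,        # also run Whisper transcription on the audio track
        detect_objects=True,    # enable per-frame object detection
    )

    for scene in description.scene_descriptions:
        print(f"{scene.start:.2f}s to {scene.end:.2f}s", scene.detected_entities)


asyncio.run(main())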

__init__

__init__(
    scene_threshold: float = 0.3,
    min_scene_length: float = 0.5,
    device: str | None = None,
    detection_backend: ImageToTextBackend | None = None,
    api_key: str | None = None,
)

Initialize the video analyzer.

Parameters:

scene_threshold (float): Threshold for scene change detection (0.0-1.0). Default: 0.3
min_scene_length (float): Minimum scene duration in seconds. Default: 0.5
device (str | None): Device for ImageToText model ('cuda', 'cpu', or None for auto). Default: None
detection_backend (ImageToTextBackend | None): Backend for object/text detection ('local', 'openai', 'gemini'). Default: None
api_key (str | None): API key for cloud backends. Default: None

Source code in src/videopython/ai/understanding/video.py
def __init__(
    self,
    scene_threshold: float = 0.3,
    min_scene_length: float = 0.5,
    device: str | None = None,
    detection_backend: ImageToTextBackend | None = None,
    api_key: str | None = None,
):
    """Initialize the video analyzer.

    Args:
        scene_threshold: Threshold for scene change detection (0.0-1.0)
        min_scene_length: Minimum scene duration in seconds
        device: Device for ImageToText model ('cuda', 'cpu', or None for auto)
        detection_backend: Backend for object/text detection ('local', 'openai', 'gemini')
        api_key: API key for cloud backends
    """
    self.scene_detector = SceneDetector(threshold=scene_threshold, min_scene_length=min_scene_length)
    self.image_to_text = ImageToText(device=device)
    self.detection_backend = detection_backend
    self.api_key = api_key

analyze async

analyze(
    video: Video,
    frames_per_second: float = 1.0,
    transcribe: bool = False,
    transcription_model: Literal[
        "tiny", "base", "small", "medium", "large", "turbo"
    ] = "base",
    description_prompt: str | None = None,
    extract_colors: bool = False,
    include_full_histogram: bool = False,
    detect_objects: bool = False,
    detect_faces: bool = False,
    detect_text: bool = False,
    detect_shot_type: bool = False,
    generate_summaries: bool = False,
) -> VideoDescription

Perform comprehensive video analysis.

Parameters:

video (Video): Video object to analyze. Required.
frames_per_second (float): Frame sampling rate for visual analysis, in fps. Default: 1.0
transcribe (bool): Whether to generate audio transcription. Default: False
transcription_model (Literal['tiny', 'base', 'small', 'medium', 'large', 'turbo']): Whisper model to use if transcribe=True. Default: 'base'
description_prompt (str | None): Optional prompt to guide frame descriptions. Default: None
extract_colors (bool): Whether to extract color features from frames. Default: False
include_full_histogram (bool): Whether to include full HSV histogram in color features. Default: False
detect_objects (bool): Whether to detect objects in frames. Default: False
detect_faces (bool): Whether to detect faces in frames. Default: False
detect_text (bool): Whether to detect text (OCR) in frames. Default: False
detect_shot_type (bool): Whether to classify shot type (cloud backends only). Default: False
generate_summaries (bool): Whether to generate LLM summaries for scenes. Default: False

Returns:

VideoDescription: VideoDescription object with complete analysis

Source code in src/videopython/ai/understanding/video.py
async def analyze(
    self,
    video: Video,
    frames_per_second: float = 1.0,
    transcribe: bool = False,
    transcription_model: Literal["tiny", "base", "small", "medium", "large", "turbo"] = "base",
    description_prompt: str | None = None,
    extract_colors: bool = False,
    include_full_histogram: bool = False,
    detect_objects: bool = False,
    detect_faces: bool = False,
    detect_text: bool = False,
    detect_shot_type: bool = False,
    generate_summaries: bool = False,
) -> VideoDescription:
    """Perform comprehensive video analysis.

    Args:
        video: Video object to analyze
        frames_per_second: Frame sampling rate for visual analysis (default: 1.0 fps)
        transcribe: Whether to generate audio transcription (default: False)
        transcription_model: Whisper model to use if transcribe=True (default: "base")
        description_prompt: Optional prompt to guide frame descriptions
        extract_colors: Whether to extract color features from frames (default: False)
        include_full_histogram: Whether to include full HSV histogram in color features (default: False)
        detect_objects: Whether to detect objects in frames (default: False)
        detect_faces: Whether to detect faces in frames (default: False)
        detect_text: Whether to detect text (OCR) in frames (default: False)
        detect_shot_type: Whether to classify shot type (cloud backends only) (default: False)
        generate_summaries: Whether to generate LLM summaries for scenes (default: False)

    Returns:
        VideoDescription object with complete analysis
    """
    # Step 1: Detect scenes (returns SceneDescription objects with timing only)
    scene_descriptions = self.scene_detector.detect(video)

    # Step 2: Set up frame analyzer if any detection is enabled
    frame_analyzer = None
    if detect_objects or detect_faces or detect_text or detect_shot_type:
        frame_analyzer = FrameAnalyzer(
            backend=self.detection_backend,
            api_key=self.api_key,
            object_detection=detect_objects,
            face_detection=detect_faces,
            text_detection=detect_text,
            shot_type_detection=detect_shot_type,
        )

    # Step 3: Analyze frames from each scene and populate frame_descriptions
    for scene_desc in scene_descriptions:
        frame_descriptions = await self.image_to_text.describe_scene(
            video,
            scene_desc,
            frames_per_second=frames_per_second,
            prompt=description_prompt,
            extract_colors=extract_colors,
            include_full_histogram=include_full_histogram,
        )

        # Run detection on each frame if enabled
        if frame_analyzer:
            for fd in frame_descriptions:
                frame = video.frames[fd.frame_index]
                await frame_analyzer.analyze_frame(frame, fd)

        scene_desc.frame_descriptions = frame_descriptions

        # Populate scene-level aggregations
        if detect_objects:
            scene_desc.detected_entities = _aggregate_detected_entities(frame_descriptions)

        if extract_colors:
            scene_desc.dominant_colors = _aggregate_dominant_colors(frame_descriptions)

    # Step 4: Optional transcription
    transcription = None
    if transcribe:
        from videopython.ai.understanding.audio import AudioToText

        transcriber = AudioToText(model_name=transcription_model)
        transcription = await transcriber.transcribe(video)

    # Create VideoDescription and distribute transcription to scenes
    video_description = VideoDescription(scene_descriptions=scene_descriptions, transcription=transcription)
    if transcription:
        video_description.distribute_transcription()

    # Step 5: Generate summaries if requested
    if generate_summaries:
        from videopython.ai.understanding.text import LLMSummarizer

        summarizer = LLMSummarizer(backend=self.detection_backend, api_key=self.api_key)
        for scene_desc in scene_descriptions:
            scene_desc.summary = await summarizer.summarize_scene_description(scene_desc)

    return video_description

analyze_scenes_only async

analyze_scenes_only(video: Video) -> list[SceneDescription]

Analyze video scenes without transcription (convenience method).

Parameters:

video (Video): Video object to analyze. Required.

Returns:

list[SceneDescription]: List of SceneDescription objects

Source code in src/videopython/ai/understanding/video.py
async def analyze_scenes_only(self, video: Video) -> list[SceneDescription]:
    """Analyze video scenes without transcription (convenience method).

    Args:
        video: Video object to analyze

    Returns:
        List of SceneDescription objects
    """
    understanding = await self.analyze(video, transcribe=False)
    return understanding.scene_descriptions

Detection Classes

ObjectDetector

ObjectDetector

Detects objects in images using YOLO (local) or vision LLMs (cloud).

Source code in src/videopython/ai/understanding/detection.py
class ObjectDetector:
    """Detects objects in images using YOLO (local) or vision LLMs (cloud)."""

    SUPPORTED_BACKENDS: list[str] = ["local", "openai", "gemini"]

    def __init__(
        self,
        backend: ImageToTextBackend | None = None,
        model_size: str = "n",
        confidence_threshold: float = 0.25,
        api_key: str | None = None,
    ):
        """Initialize object detector.

        Args:
            backend: Backend to use ('local' for YOLO, 'openai'/'gemini' for vision LLMs).
            model_size: YOLO model size for local backend ('n', 's', 'm', 'l', 'x').
            confidence_threshold: Minimum confidence for detections (0-1).
            api_key: API key for cloud backends.
        """
        resolved_backend: str = backend if backend is not None else get_default_backend("image_to_text")
        if resolved_backend not in self.SUPPORTED_BACKENDS:
            raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

        self.backend: ImageToTextBackend = resolved_backend  # type: ignore[assignment]
        self.model_size = model_size
        self.confidence_threshold = confidence_threshold
        self.api_key = api_key
        self._model: Any = None

    def _init_yolo(self) -> None:
        """Initialize YOLO model."""
        from ultralytics import YOLO

        self._model = YOLO(f"yolo11{self.model_size}.pt")

    def _image_to_base64(self, image: Image.Image) -> str:
        """Convert PIL Image to base64 string."""
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        return base64.b64encode(buffer.getvalue()).decode()

    async def _detect_local(self, image: np.ndarray | Image.Image) -> list[DetectedObject]:
        """Detect objects using YOLO."""
        if self._model is None:
            await asyncio.to_thread(self._init_yolo)

        def _run_detection() -> list[DetectedObject]:
            # Convert PIL to numpy if needed
            if isinstance(image, Image.Image):
                img_array = np.array(image)
            else:
                img_array = image

            results = self._model(img_array, conf=self.confidence_threshold, verbose=False)
            detected_objects = []

            for result in results:
                boxes = result.boxes
                if boxes is None:
                    continue

                img_h, img_w = result.orig_shape

                for i in range(len(boxes)):
                    # Get box coordinates (xyxy format)
                    x1, y1, x2, y2 = boxes.xyxy[i].tolist()
                    conf = float(boxes.conf[i])
                    cls_id = int(boxes.cls[i])
                    label = self._model.names[cls_id]

                    # Normalize coordinates to [0, 1]
                    bbox = BoundingBox(
                        x=x1 / img_w,
                        y=y1 / img_h,
                        width=(x2 - x1) / img_w,
                        height=(y2 - y1) / img_h,
                    )

                    detected_objects.append(
                        DetectedObject(
                            label=label,
                            confidence=conf,
                            bounding_box=bbox,
                        )
                    )

            return detected_objects

        return await asyncio.to_thread(_run_detection)

    async def _detect_openai(self, image: np.ndarray | Image.Image) -> list[DetectedObject]:
        """Detect objects using OpenAI GPT-4o."""
        from openai import AsyncOpenAI

        api_key = get_api_key("openai", self.api_key)
        client = AsyncOpenAI(api_key=api_key)

        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        image_base64 = self._image_to_base64(image)

        prompt = """Analyze this image and detect all visible objects.
Return a JSON array of objects with this exact format:
[{"label": "object name", "confidence": 0.95, "bbox": {"x": 0.1, "y": 0.2, "width": 0.3, "height": 0.4}}]

Where bbox coordinates are normalized (0-1) relative to image dimensions:
- x: left edge of bounding box
- y: top edge of bounding box
- width: width of box
- height: height of box

Only include objects you're confident about. Return empty array [] if no objects detected.
Return ONLY the JSON array, no other text."""

        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_base64}"}},
                    ],
                }
            ],
            max_tokens=1000,
        )

        return self._parse_detection_response(response.choices[0].message.content or "[]")

    async def _detect_gemini(self, image: np.ndarray | Image.Image) -> list[DetectedObject]:
        """Detect objects using Google Gemini."""
        import google.generativeai as genai

        api_key = get_api_key("gemini", self.api_key)
        genai.configure(api_key=api_key)

        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        model = genai.GenerativeModel("gemini-2.0-flash")

        prompt = """Analyze this image and detect all visible objects.
Return a JSON array of objects with this exact format:
[{"label": "object name", "confidence": 0.95, "bbox": {"x": 0.1, "y": 0.2, "width": 0.3, "height": 0.4}}]

Where bbox coordinates are normalized (0-1) relative to image dimensions:
- x: left edge of bounding box
- y: top edge of bounding box
- width: width of box
- height: height of box

Only include objects you're confident about. Return empty array [] if no objects detected.
Return ONLY the JSON array, no other text."""

        def _run_gemini() -> str:
            response = model.generate_content([prompt, image])
            return response.text

        response_text = await asyncio.to_thread(_run_gemini)
        return self._parse_detection_response(response_text)

    def _parse_detection_response(self, response: str) -> list[DetectedObject]:
        """Parse JSON response from cloud backends into DetectedObject list."""
        try:
            # Clean up response - remove markdown code blocks if present
            response = response.strip()
            if response.startswith("```"):
                lines = response.split("\n")
                response = "\n".join(lines[1:-1] if lines[-1] == "```" else lines[1:])
            response = response.strip()

            data = json.loads(response)
            detected_objects = []

            for obj in data:
                bbox = None
                if "bbox" in obj and obj["bbox"]:
                    bbox = BoundingBox(
                        x=float(obj["bbox"]["x"]),
                        y=float(obj["bbox"]["y"]),
                        width=float(obj["bbox"]["width"]),
                        height=float(obj["bbox"]["height"]),
                    )

                detected_objects.append(
                    DetectedObject(
                        label=obj["label"],
                        confidence=float(obj.get("confidence", 0.8)),
                        bounding_box=bbox,
                    )
                )

            return detected_objects
        except (json.JSONDecodeError, KeyError, TypeError):
            return []

    async def detect(self, image: np.ndarray | Image.Image) -> list[DetectedObject]:
        """Detect objects in an image.

        Args:
            image: Image as numpy array (H, W, 3) in RGB format or PIL Image.

        Returns:
            List of DetectedObject instances.
        """
        if self.backend == "local":
            return await self._detect_local(image)
        elif self.backend == "openai":
            return await self._detect_openai(image)
        elif self.backend == "gemini":
            return await self._detect_gemini(image)
        else:
            raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
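
Example (sketch): detecting objects in a single frame with the local YOLO backend. The image path is a placeholder; cloud backends ('openai', 'gemini') use the same detect() call but require an API key.

import asyncio

from PIL import Image

from videopython.ai.understanding.detection import ObjectDetector


async def main() -> None:
    image = Image.open("frame.png")  # placeholder path; RGB numpy arrays (H, W, 3) also work

    detector = ObjectDetector(backend="local", model_size="n", confidence_threshold=0.4)
    objects = await detector.detect(image)

    for obj in objects:
        box = obj.bounding_box
        if box is not None:
            print(f"{obj.label} ({obj.confidence:.2f}) at x={box.x:.2f}, y={box.y:.2f}")


asyncio.run(main())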

__init__

__init__(
    backend: ImageToTextBackend | None = None,
    model_size: str = "n",
    confidence_threshold: float = 0.25,
    api_key: str | None = None,
)

Initialize object detector.

Parameters:

backend (ImageToTextBackend | None): Backend to use ('local' for YOLO, 'openai'/'gemini' for vision LLMs). Default: None
model_size (str): YOLO model size for local backend ('n', 's', 'm', 'l', 'x'). Default: 'n'
confidence_threshold (float): Minimum confidence for detections (0-1). Default: 0.25
api_key (str | None): API key for cloud backends. Default: None

Source code in src/videopython/ai/understanding/detection.py
def __init__(
    self,
    backend: ImageToTextBackend | None = None,
    model_size: str = "n",
    confidence_threshold: float = 0.25,
    api_key: str | None = None,
):
    """Initialize object detector.

    Args:
        backend: Backend to use ('local' for YOLO, 'openai'/'gemini' for vision LLMs).
        model_size: YOLO model size for local backend ('n', 's', 'm', 'l', 'x').
        confidence_threshold: Minimum confidence for detections (0-1).
        api_key: API key for cloud backends.
    """
    resolved_backend: str = backend if backend is not None else get_default_backend("image_to_text")
    if resolved_backend not in self.SUPPORTED_BACKENDS:
        raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

    self.backend: ImageToTextBackend = resolved_backend  # type: ignore[assignment]
    self.model_size = model_size
    self.confidence_threshold = confidence_threshold
    self.api_key = api_key
    self._model: Any = None

detect async

detect(image: ndarray | Image) -> list[DetectedObject]

Detect objects in an image.

Parameters:

image (ndarray | Image): Image as numpy array (H, W, 3) in RGB format or PIL Image. Required.

Returns:

list[DetectedObject]: List of DetectedObject instances.

Source code in src/videopython/ai/understanding/detection.py
async def detect(self, image: np.ndarray | Image.Image) -> list[DetectedObject]:
    """Detect objects in an image.

    Args:
        image: Image as numpy array (H, W, 3) in RGB format or PIL Image.

    Returns:
        List of DetectedObject instances.
    """
    if self.backend == "local":
        return await self._detect_local(image)
    elif self.backend == "openai":
        return await self._detect_openai(image)
    elif self.backend == "gemini":
        return await self._detect_gemini(image)
    else:
        raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)

FaceDetector

FaceDetector

Detects faces in images using OpenCV DNN.

Source code in src/videopython/ai/understanding/detection.py
class FaceDetector:
    """Detects faces in images using OpenCV DNN."""

    def __init__(self, confidence_threshold: float = 0.5):
        """Initialize face detector.

        Args:
            confidence_threshold: Minimum confidence for detections (0-1).
        """
        self.confidence_threshold = confidence_threshold
        self._net: Any = None
        self._model_loaded = False

    def _init_model(self) -> None:
        """Initialize OpenCV DNN face detector."""
        import cv2

        # Use OpenCV's built-in DNN face detector
        self._net = cv2.dnn.readNetFromCaffe(
            cv2.data.haarcascades + "/../deploy.prototxt",
            cv2.data.haarcascades + "/../res10_300x300_ssd_iter_140000.caffemodel",
        )
        self._model_loaded = True

    def _init_cascade(self) -> None:
        """Initialize OpenCV Haar cascade as fallback."""
        import cv2

        self._cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")
        self._model_loaded = True

    async def detect(self, image: np.ndarray | Image.Image) -> int:
        """Detect faces in an image.

        Args:
            image: Image as numpy array (H, W, 3) in RGB format or PIL Image.

        Returns:
            Number of faces detected.
        """

        def _run_detection() -> int:
            import cv2

            # Convert PIL to numpy if needed
            if isinstance(image, Image.Image):
                img_array = np.array(image)
            else:
                img_array = image

            # Convert RGB to BGR for OpenCV
            if len(img_array.shape) == 3 and img_array.shape[2] == 3:
                img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
            else:
                img_bgr = img_array

            # Use Haar cascade (simpler, more reliable)
            if not self._model_loaded:
                self._init_cascade()

            gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
            faces = self._cascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(30, 30),
            )

            return len(faces)

        return await asyncio.to_thread(_run_detection)
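
Example (sketch): counting faces in a frame. The image path is a placeholder; note that detect() returns only an integer count (the current implementation takes the Haar cascade path shown above, not bounding boxes).

import asyncio

import numpy as np
from PIL import Image

from videopython.ai.understanding.detection import FaceDetector


async def main() -> None:
    frame = np.array(Image.open("frame.png"))  # placeholder path; a PIL Image works as well

    detector = FaceDetector(confidence_threshold=0.5)
    n_faces = await detector.detect(frame)  # integer count only
    print(f"Faces detected: {n_faces}")


asyncio.run(main())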

__init__

__init__(confidence_threshold: float = 0.5)

Initialize face detector.

Parameters:

confidence_threshold (float): Minimum confidence for detections (0-1). Default: 0.5
Source code in src/videopython/ai/understanding/detection.py
def __init__(self, confidence_threshold: float = 0.5):
    """Initialize face detector.

    Args:
        confidence_threshold: Minimum confidence for detections (0-1).
    """
    self.confidence_threshold = confidence_threshold
    self._net: Any = None
    self._model_loaded = False

detect async

detect(image: ndarray | Image) -> int

Detect faces in an image.

Parameters:

image (ndarray | Image): Image as numpy array (H, W, 3) in RGB format or PIL Image. Required.

Returns:

int: Number of faces detected.

Source code in src/videopython/ai/understanding/detection.py
async def detect(self, image: np.ndarray | Image.Image) -> int:
    """Detect faces in an image.

    Args:
        image: Image as numpy array (H, W, 3) in RGB format or PIL Image.

    Returns:
        Number of faces detected.
    """

    def _run_detection() -> int:
        import cv2

        # Convert PIL to numpy if needed
        if isinstance(image, Image.Image):
            img_array = np.array(image)
        else:
            img_array = image

        # Convert RGB to BGR for OpenCV
        if len(img_array.shape) == 3 and img_array.shape[2] == 3:
            img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
        else:
            img_bgr = img_array

        # Use Haar cascade (simpler, more reliable)
        if not self._model_loaded:
            self._init_cascade()

        gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
        faces = self._cascade.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30),
        )

        return len(faces)

    return await asyncio.to_thread(_run_detection)

TextDetector

TextDetector

Detects text in images using EasyOCR (local) or vision LLMs (cloud).

Source code in src/videopython/ai/understanding/detection.py
class TextDetector:
    """Detects text in images using EasyOCR (local) or vision LLMs (cloud)."""

    SUPPORTED_BACKENDS: list[str] = ["local", "openai", "gemini"]

    def __init__(
        self,
        backend: ImageToTextBackend | None = None,
        languages: list[str] | None = None,
        api_key: str | None = None,
    ):
        """Initialize text detector.

        Args:
            backend: Backend to use ('local' for EasyOCR, 'openai'/'gemini' for vision LLMs).
            languages: List of language codes for EasyOCR (default: ['en']).
            api_key: API key for cloud backends.
        """
        resolved_backend: str = backend if backend is not None else get_default_backend("image_to_text")
        if resolved_backend not in self.SUPPORTED_BACKENDS:
            raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

        self.backend: ImageToTextBackend = resolved_backend  # type: ignore[assignment]
        self.languages = languages or ["en"]
        self.api_key = api_key
        self._reader: Any = None

    def _init_easyocr(self) -> None:
        """Initialize EasyOCR reader."""
        import easyocr

        self._reader = easyocr.Reader(self.languages, gpu=False)

    def _image_to_base64(self, image: Image.Image) -> str:
        """Convert PIL Image to base64 string."""
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        return base64.b64encode(buffer.getvalue()).decode()

    async def _detect_local(self, image: np.ndarray | Image.Image) -> list[str]:
        """Detect text using EasyOCR."""
        if self._reader is None:
            await asyncio.to_thread(self._init_easyocr)

        def _run_ocr() -> list[str]:
            # Convert PIL to numpy if needed
            if isinstance(image, Image.Image):
                img_array = np.array(image)
            else:
                img_array = image

            results = self._reader.readtext(img_array)
            # Extract just the text from results (each result is [bbox, text, confidence])
            return [text for _, text, _ in results if text.strip()]

        return await asyncio.to_thread(_run_ocr)

    async def _detect_openai(self, image: np.ndarray | Image.Image) -> list[str]:
        """Detect text using OpenAI GPT-4o."""
        from openai import AsyncOpenAI

        api_key = get_api_key("openai", self.api_key)
        client = AsyncOpenAI(api_key=api_key)

        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        image_base64 = self._image_to_base64(image)

        prompt = """Extract all visible text from this image.
Return a JSON array of strings, where each string is a distinct piece of text found in the image.
Example: ["STOP", "Main Street", "Open 24 Hours"]
Return empty array [] if no text is found.
Return ONLY the JSON array, no other text."""

        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_base64}"}},
                    ],
                }
            ],
            max_tokens=500,
        )

        return self._parse_text_response(response.choices[0].message.content or "[]")

    async def _detect_gemini(self, image: np.ndarray | Image.Image) -> list[str]:
        """Detect text using Google Gemini."""
        import google.generativeai as genai

        api_key = get_api_key("gemini", self.api_key)
        genai.configure(api_key=api_key)

        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        model = genai.GenerativeModel("gemini-2.0-flash")

        prompt = """Extract all visible text from this image.
Return a JSON array of strings, where each string is a distinct piece of text found in the image.
Example: ["STOP", "Main Street", "Open 24 Hours"]
Return empty array [] if no text is found.
Return ONLY the JSON array, no other text."""

        def _run_gemini() -> str:
            response = model.generate_content([prompt, image])
            return response.text

        response_text = await asyncio.to_thread(_run_gemini)
        return self._parse_text_response(response_text)

    def _parse_text_response(self, response: str) -> list[str]:
        """Parse JSON response from cloud backends into string list."""
        try:
            response = response.strip()
            if response.startswith("```"):
                lines = response.split("\n")
                response = "\n".join(lines[1:-1] if lines[-1] == "```" else lines[1:])
            response = response.strip()

            data = json.loads(response)
            return [str(text) for text in data if text]
        except (json.JSONDecodeError, TypeError):
            return []

    async def detect(self, image: np.ndarray | Image.Image) -> list[str]:
        """Detect text in an image.

        Args:
            image: Image as numpy array (H, W, 3) in RGB format or PIL Image.

        Returns:
            List of detected text strings.
        """
        if self.backend == "local":
            return await self._detect_local(image)
        elif self.backend == "openai":
            return await self._detect_openai(image)
        elif self.backend == "gemini":
            return await self._detect_gemini(image)
        else:
            raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
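
Example (sketch): running OCR on a frame with the local EasyOCR backend. The image path is a placeholder.

import asyncio

from PIL import Image

from videopython.ai.understanding.detection import TextDetector


async def main() -> None:
    image = Image.open("sign.png")  # placeholder path

    detector = TextDetector(backend="local", languages=["en"])
    texts = await detector.detect(image)
    print(texts)  # e.g. ["STOP", "Main Street"]


asyncio.run(main())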

__init__

__init__(
    backend: ImageToTextBackend | None = None,
    languages: list[str] | None = None,
    api_key: str | None = None,
)

Initialize text detector.

Parameters:

backend (ImageToTextBackend | None): Backend to use ('local' for EasyOCR, 'openai'/'gemini' for vision LLMs). Default: None
languages (list[str] | None): List of language codes for EasyOCR; None means ['en']. Default: None
api_key (str | None): API key for cloud backends. Default: None

Source code in src/videopython/ai/understanding/detection.py
def __init__(
    self,
    backend: ImageToTextBackend | None = None,
    languages: list[str] | None = None,
    api_key: str | None = None,
):
    """Initialize text detector.

    Args:
        backend: Backend to use ('local' for EasyOCR, 'openai'/'gemini' for vision LLMs).
        languages: List of language codes for EasyOCR (default: ['en']).
        api_key: API key for cloud backends.
    """
    resolved_backend: str = backend if backend is not None else get_default_backend("image_to_text")
    if resolved_backend not in self.SUPPORTED_BACKENDS:
        raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

    self.backend: ImageToTextBackend = resolved_backend  # type: ignore[assignment]
    self.languages = languages or ["en"]
    self.api_key = api_key
    self._reader: Any = None

detect async

detect(image: ndarray | Image) -> list[str]

Detect text in an image.

Parameters:

image (ndarray | Image): Image as numpy array (H, W, 3) in RGB format or PIL Image. Required.

Returns:

list[str]: List of detected text strings.

Source code in src/videopython/ai/understanding/detection.py
async def detect(self, image: np.ndarray | Image.Image) -> list[str]:
    """Detect text in an image.

    Args:
        image: Image as numpy array (H, W, 3) in RGB format or PIL Image.

    Returns:
        List of detected text strings.
    """
    if self.backend == "local":
        return await self._detect_local(image)
    elif self.backend == "openai":
        return await self._detect_openai(image)
    elif self.backend == "gemini":
        return await self._detect_gemini(image)
    else:
        raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)

ShotTypeClassifier

ShotTypeClassifier

Classifies shot types using vision LLMs.

Source code in src/videopython/ai/understanding/detection.py
class ShotTypeClassifier:
    """Classifies shot types using vision LLMs."""

    SUPPORTED_BACKENDS: list[str] = ["openai", "gemini"]
    SHOT_TYPES: list[str] = ["extreme-wide", "wide", "medium", "medium-close-up", "close-up", "extreme-close-up"]

    def __init__(
        self,
        backend: ImageToTextBackend | None = None,
        api_key: str | None = None,
    ):
        """Initialize shot type classifier.

        Args:
            backend: Backend to use ('openai' or 'gemini').
            api_key: API key for cloud backends.
        """
        resolved_backend: str = backend if backend is not None else get_default_backend("image_to_text")
        # Default to openai if local is configured (no local backend for shot type)
        if resolved_backend == "local":
            resolved_backend = "openai"
        if resolved_backend not in self.SUPPORTED_BACKENDS:
            raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

        self.backend: ImageToTextBackend = resolved_backend  # type: ignore[assignment]
        self.api_key = api_key

    def _image_to_base64(self, image: Image.Image) -> str:
        """Convert PIL Image to base64 string."""
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        return base64.b64encode(buffer.getvalue()).decode()

    async def _classify_openai(self, image: np.ndarray | Image.Image) -> str | None:
        """Classify shot type using OpenAI GPT-4o."""
        from openai import AsyncOpenAI

        api_key = get_api_key("openai", self.api_key)
        client = AsyncOpenAI(api_key=api_key)

        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        image_base64 = self._image_to_base64(image)

        prompt = f"""Classify the cinematographic shot type of this image.
Choose exactly one from: {", ".join(self.SHOT_TYPES)}

Definitions:
- extreme-wide: Very distant view, landscape or establishing shot
- wide: Full scene visible, subjects appear small
- medium: Subject from waist/knees up
- medium-close-up: Subject from chest up
- close-up: Face or object fills most of frame
- extreme-close-up: Detail shot, part of face or small object

Return ONLY the shot type label, nothing else."""

        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_base64}"}},
                    ],
                }
            ],
            max_tokens=50,
        )

        result = (response.choices[0].message.content or "").strip().lower()
        return result if result in self.SHOT_TYPES else None

    async def _classify_gemini(self, image: np.ndarray | Image.Image) -> str | None:
        """Classify shot type using Google Gemini."""
        import google.generativeai as genai

        api_key = get_api_key("gemini", self.api_key)
        genai.configure(api_key=api_key)

        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        model = genai.GenerativeModel("gemini-2.0-flash")

        prompt = f"""Classify the cinematographic shot type of this image.
Choose exactly one from: {", ".join(self.SHOT_TYPES)}

Definitions:
- extreme-wide: Very distant view, landscape or establishing shot
- wide: Full scene visible, subjects appear small
- medium: Subject from waist/knees up
- medium-close-up: Subject from chest up
- close-up: Face or object fills most of frame
- extreme-close-up: Detail shot, part of face or small object

Return ONLY the shot type label, nothing else."""

        def _run_gemini() -> str:
            response = model.generate_content([prompt, image])
            return response.text

        result = (await asyncio.to_thread(_run_gemini)).strip().lower()
        return result if result in self.SHOT_TYPES else None

    async def classify(self, image: np.ndarray | Image.Image) -> str | None:
        """Classify the shot type of an image.

        Args:
            image: Image as numpy array (H, W, 3) in RGB format or PIL Image.

        Returns:
            Shot type string or None if classification failed.
        """
        if self.backend == "openai":
            return await self._classify_openai(image)
        elif self.backend == "gemini":
            return await self._classify_gemini(image)
        else:
            raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
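
Example (sketch): classifying the shot type of a frame. This class has no local backend, so an API key for the chosen cloud backend must be available (read from the environment by default, or passed via api_key). The image path is a placeholder.

import asyncio

from PIL import Image

from videopython.ai.understanding.detection import ShotTypeClassifier


async def main() -> None:
    image = Image.open("frame.png")  # placeholder path

    classifier = ShotTypeClassifier(backend="openai")  # or "gemini"
    shot_type = await classifier.classify(image)
    print(shot_type)  # one of SHOT_TYPES, or None if the model's answer was not recognized


asyncio.run(main())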

__init__

__init__(
    backend: ImageToTextBackend | None = None,
    api_key: str | None = None,
)

Initialize shot type classifier.

Parameters:

backend (ImageToTextBackend | None): Backend to use ('openai' or 'gemini'). Default: None
api_key (str | None): API key for cloud backends. Default: None

Source code in src/videopython/ai/understanding/detection.py
def __init__(
    self,
    backend: ImageToTextBackend | None = None,
    api_key: str | None = None,
):
    """Initialize shot type classifier.

    Args:
        backend: Backend to use ('openai' or 'gemini').
        api_key: API key for cloud backends.
    """
    resolved_backend: str = backend if backend is not None else get_default_backend("image_to_text")
    # Default to openai if local is configured (no local backend for shot type)
    if resolved_backend == "local":
        resolved_backend = "openai"
    if resolved_backend not in self.SUPPORTED_BACKENDS:
        raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

    self.backend: ImageToTextBackend = resolved_backend  # type: ignore[assignment]
    self.api_key = api_key

classify async

classify(image: ndarray | Image) -> str | None

Classify the shot type of an image.

Parameters:

image (ndarray | Image): Image as numpy array (H, W, 3) in RGB format or PIL Image. Required.

Returns:

str | None: Shot type string or None if classification failed.

Source code in src/videopython/ai/understanding/detection.py
async def classify(self, image: np.ndarray | Image.Image) -> str | None:
    """Classify the shot type of an image.

    Args:
        image: Image as numpy array (H, W, 3) in RGB format or PIL Image.

    Returns:
        Shot type string or None if classification failed.
    """
    if self.backend == "openai":
        return await self._classify_openai(image)
    elif self.backend == "gemini":
        return await self._classify_gemini(image)
    else:
        raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)

CameraMotionDetector

CameraMotionDetector

Detects camera motion between frames using optical flow.

Source code in src/videopython/ai/understanding/detection.py
class CameraMotionDetector:
    """Detects camera motion between frames using optical flow."""

    MOTION_TYPES: list[str] = ["static", "pan", "tilt", "zoom", "complex"]

    def __init__(
        self,
        motion_threshold: float = 2.0,
        zoom_threshold: float = 0.1,
    ):
        """Initialize camera motion detector.

        Args:
            motion_threshold: Minimum average flow magnitude to consider as motion.
            zoom_threshold: Threshold for detecting zoom (relative change in flow magnitude from center).
        """
        self.motion_threshold = motion_threshold
        self.zoom_threshold = zoom_threshold

    async def detect(
        self,
        frame1: np.ndarray | Image.Image,
        frame2: np.ndarray | Image.Image,
    ) -> str:
        """Detect camera motion between two consecutive frames.

        Args:
            frame1: First frame as numpy array or PIL Image.
            frame2: Second frame as numpy array or PIL Image.

        Returns:
            Motion type: 'static', 'pan', 'tilt', 'zoom', or 'complex'.
        """

        def _analyze_motion() -> str:
            import cv2

            # Convert to numpy if needed
            if isinstance(frame1, Image.Image):
                img1 = np.array(frame1)
            else:
                img1 = frame1

            if isinstance(frame2, Image.Image):
                img2 = np.array(frame2)
            else:
                img2 = frame2

            # Convert to grayscale
            if len(img1.shape) == 3:
                gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
            else:
                gray1 = img1

            if len(img2.shape) == 3:
                gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY)
            else:
                gray2 = img2

            # Calculate optical flow using Farneback method
            flow = cv2.calcOpticalFlowFarneback(
                gray1,
                gray2,
                None,
                pyr_scale=0.5,
                levels=3,
                winsize=15,
                iterations=3,
                poly_n=5,
                poly_sigma=1.2,
                flags=0,
            )

            # Analyze flow vectors
            flow_x = flow[..., 0]
            flow_y = flow[..., 1]

            # Calculate magnitude
            magnitude = np.sqrt(flow_x**2 + flow_y**2)
            avg_magnitude = np.mean(magnitude)

            if avg_magnitude < self.motion_threshold:
                return "static"

            # Calculate mean flow direction
            mean_flow_x = np.mean(flow_x)
            mean_flow_y = np.mean(flow_y)

            # Check for zoom by analyzing flow from center
            h, w = gray1.shape
            cy, cx = h // 2, w // 2

            # Sample flow at different distances from center
            center_region = magnitude[cy - h // 4 : cy + h // 4, cx - w // 4 : cx + w // 4]
            edge_region_top = magnitude[: h // 4, :]
            edge_region_bottom = magnitude[-h // 4 :, :]
            edge_region_left = magnitude[:, : w // 4]
            edge_region_right = magnitude[:, -w // 4 :]

            center_mag = np.mean(center_region) if center_region.size > 0 else 0
            edge_mag = np.mean(
                [
                    np.mean(edge_region_top) if edge_region_top.size > 0 else 0,
                    np.mean(edge_region_bottom) if edge_region_bottom.size > 0 else 0,
                    np.mean(edge_region_left) if edge_region_left.size > 0 else 0,
                    np.mean(edge_region_right) if edge_region_right.size > 0 else 0,
                ]
            )

            # Zoom detection: edges move more than center (zoom in) or vice versa
            if edge_mag > 0 and abs(edge_mag - center_mag) / edge_mag > self.zoom_threshold:
                return "zoom"

            # Determine dominant motion direction
            abs_x = abs(mean_flow_x)
            abs_y = abs(mean_flow_y)

            if abs_x > abs_y * 1.5:
                return "pan"  # Horizontal motion
            elif abs_y > abs_x * 1.5:
                return "tilt"  # Vertical motion
            else:
                return "complex"  # Mixed motion

        return await asyncio.to_thread(_analyze_motion)
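
Example (sketch): classifying camera motion between two consecutive frames. The frame paths are placeholders; in practice the frames would typically be consecutive entries from a video's frame array.

import asyncio

import numpy as np
from PIL import Image

from videopython.ai.understanding.detection import CameraMotionDetector


async def main() -> None:
    # Placeholder paths; any two consecutive RGB frames work.
    frame1 = np.array(Image.open("frame_000.png"))
    frame2 = np.array(Image.open("frame_001.png"))

    detector = CameraMotionDetector(motion_threshold=2.0, zoom_threshold=0.1)
    motion = await detector.detect(frame1, frame2)
    print(motion)  # 'static', 'pan', 'tilt', 'zoom', or 'complex'


asyncio.run(main())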

__init__

__init__(
    motion_threshold: float = 2.0,
    zoom_threshold: float = 0.1,
)

Initialize camera motion detector.

Parameters:

motion_threshold (float): Minimum average flow magnitude to consider as motion. Default: 2.0
zoom_threshold (float): Threshold for detecting zoom (relative change in flow magnitude from center). Default: 0.1

Source code in src/videopython/ai/understanding/detection.py
def __init__(
    self,
    motion_threshold: float = 2.0,
    zoom_threshold: float = 0.1,
):
    """Initialize camera motion detector.

    Args:
        motion_threshold: Minimum average flow magnitude to consider as motion.
        zoom_threshold: Threshold for detecting zoom (relative change in flow magnitude from center).
    """
    self.motion_threshold = motion_threshold
    self.zoom_threshold = zoom_threshold

detect async

detect(
    frame1: ndarray | Image, frame2: ndarray | Image
) -> str

Detect camera motion between two consecutive frames.

Parameters:

frame1 (ndarray | Image): First frame as numpy array or PIL Image. Required.
frame2 (ndarray | Image): Second frame as numpy array or PIL Image. Required.

Returns:

str: Motion type: 'static', 'pan', 'tilt', 'zoom', or 'complex'.

Source code in src/videopython/ai/understanding/detection.py
async def detect(
    self,
    frame1: np.ndarray | Image.Image,
    frame2: np.ndarray | Image.Image,
) -> str:
    """Detect camera motion between two consecutive frames.

    Args:
        frame1: First frame as numpy array or PIL Image.
        frame2: Second frame as numpy array or PIL Image.

    Returns:
        Motion type: 'static', 'pan', 'tilt', 'zoom', or 'complex'.
    """

    def _analyze_motion() -> str:
        import cv2

        # Convert to numpy if needed
        if isinstance(frame1, Image.Image):
            img1 = np.array(frame1)
        else:
            img1 = frame1

        if isinstance(frame2, Image.Image):
            img2 = np.array(frame2)
        else:
            img2 = frame2

        # Convert to grayscale
        if len(img1.shape) == 3:
            gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
        else:
            gray1 = img1

        if len(img2.shape) == 3:
            gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY)
        else:
            gray2 = img2

        # Calculate optical flow using Farneback method
        flow = cv2.calcOpticalFlowFarneback(
            gray1,
            gray2,
            None,
            pyr_scale=0.5,
            levels=3,
            winsize=15,
            iterations=3,
            poly_n=5,
            poly_sigma=1.2,
            flags=0,
        )

        # Analyze flow vectors
        flow_x = flow[..., 0]
        flow_y = flow[..., 1]

        # Calculate magnitude
        magnitude = np.sqrt(flow_x**2 + flow_y**2)
        avg_magnitude = np.mean(magnitude)

        if avg_magnitude < self.motion_threshold:
            return "static"

        # Calculate mean flow direction
        mean_flow_x = np.mean(flow_x)
        mean_flow_y = np.mean(flow_y)

        # Check for zoom by analyzing flow from center
        h, w = gray1.shape
        cy, cx = h // 2, w // 2

        # Sample flow at different distances from center
        center_region = magnitude[cy - h // 4 : cy + h // 4, cx - w // 4 : cx + w // 4]
        edge_region_top = magnitude[: h // 4, :]
        edge_region_bottom = magnitude[-h // 4 :, :]
        edge_region_left = magnitude[:, : w // 4]
        edge_region_right = magnitude[:, -w // 4 :]

        center_mag = np.mean(center_region) if center_region.size > 0 else 0
        edge_mag = np.mean(
            [
                np.mean(edge_region_top) if edge_region_top.size > 0 else 0,
                np.mean(edge_region_bottom) if edge_region_bottom.size > 0 else 0,
                np.mean(edge_region_left) if edge_region_left.size > 0 else 0,
                np.mean(edge_region_right) if edge_region_right.size > 0 else 0,
            ]
        )

        # Zoom detection: edges move more than center (zoom in) or vice versa
        if edge_mag > 0 and abs(edge_mag - center_mag) / edge_mag > self.zoom_threshold:
            return "zoom"

        # Determine dominant motion direction
        abs_x = abs(mean_flow_x)
        abs_y = abs(mean_flow_y)

        if abs_x > abs_y * 1.5:
            return "pan"  # Horizontal motion
        elif abs_y > abs_x * 1.5:
            return "tilt"  # Vertical motion
        else:
            return "complex"  # Mixed motion

    return await asyncio.to_thread(_analyze_motion)

CombinedFrameAnalyzer

CombinedFrameAnalyzer

Analyzes frames using a single vision API call for efficiency.

For cloud backends (OpenAI/Gemini), combines object detection, OCR, face counting, and shot type classification into a single API call instead of multiple calls.

Uses structured outputs (JSON schema) to ensure valid responses.

Source code in src/videopython/ai/understanding/detection.py
class CombinedFrameAnalyzer:
    """Analyzes frames using a single vision API call for efficiency.

    For cloud backends (OpenAI/Gemini), combines object detection, OCR, face counting,
    and shot type classification into a single API call instead of multiple calls.

    Uses structured outputs (JSON schema) to ensure valid responses.
    """

    SUPPORTED_BACKENDS: list[str] = ["openai", "gemini"]
    SHOT_TYPES: list[str] = ["extreme-wide", "wide", "medium", "medium-close-up", "close-up", "extreme-close-up"]

    # JSON Schema for structured output
    RESPONSE_SCHEMA: dict[str, Any] = {
        "type": "object",
        "properties": {
            "objects": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "label": {"type": "string", "description": "Object name/class"},
                        "confidence": {"type": "number", "description": "Detection confidence 0-1"},
                        "bbox": {
                            "type": "object",
                            "properties": {
                                "x": {"type": "number", "description": "Left edge, normalized 0-1"},
                                "y": {"type": "number", "description": "Top edge, normalized 0-1"},
                                "width": {"type": "number", "description": "Width, normalized 0-1"},
                                "height": {"type": "number", "description": "Height, normalized 0-1"},
                            },
                            "required": ["x", "y", "width", "height"],
                        },
                    },
                    "required": ["label", "confidence"],
                },
            },
            "text": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Text strings found in the image via OCR",
            },
            "face_count": {"type": "integer", "description": "Number of human faces detected"},
            "shot_type": {
                "type": "string",
                "enum": ["extreme-wide", "wide", "medium", "medium-close-up", "close-up", "extreme-close-up"],
                "description": "Cinematographic shot type classification",
            },
        },
        "required": ["objects", "text", "face_count", "shot_type"],
    }

    def __init__(
        self,
        backend: ImageToTextBackend | None = None,
        api_key: str | None = None,
    ):
        """Initialize combined frame analyzer.

        Args:
            backend: Backend to use ('openai' or 'gemini').
            api_key: API key for cloud backends.
        """
        resolved_backend: str = backend if backend is not None else get_default_backend("image_to_text")
        if resolved_backend == "local":
            raise UnsupportedBackendError(
                "local", self.SUPPORTED_BACKENDS + [" (use individual detectors for local backend)"]
            )
        if resolved_backend not in self.SUPPORTED_BACKENDS:
            raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

        self.backend: ImageToTextBackend = resolved_backend  # type: ignore[assignment]
        self.api_key = api_key

    def _image_to_base64(self, image: Image.Image) -> str:
        """Convert PIL Image to base64 string."""
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        return base64.b64encode(buffer.getvalue()).decode()

    def _get_combined_prompt(self) -> str:
        """Get the prompt for combined analysis."""
        return """Analyze this image and extract:
1. All visible objects with their bounding boxes (normalized 0-1 coordinates)
2. Any text visible in the image (OCR)
3. Count of human faces
4. Cinematographic shot type classification

Shot type definitions:
- extreme-wide: Very distant view, landscape or establishing shot
- wide: Full scene visible, subjects appear small
- medium: Subject from waist/knees up
- medium-close-up: Subject from chest up
- close-up: Face or object fills most of frame
- extreme-close-up: Detail shot, part of face or small object"""

    def _parse_response(self, data: dict[str, Any]) -> CombinedFrameAnalysis:
        """Parse structured response into CombinedFrameAnalysis."""
        try:
            # Parse objects
            detected_objects = []
            for obj in data.get("objects", []):
                bbox = None
                if "bbox" in obj and obj["bbox"]:
                    bbox = BoundingBox(
                        x=float(obj["bbox"].get("x", 0)),
                        y=float(obj["bbox"].get("y", 0)),
                        width=float(obj["bbox"].get("width", 0)),
                        height=float(obj["bbox"].get("height", 0)),
                    )
                detected_objects.append(
                    DetectedObject(
                        label=obj.get("label", "unknown"),
                        confidence=float(obj.get("confidence", 0.8)),
                        bounding_box=bbox,
                    )
                )

            # Parse text
            detected_text = [str(t) for t in data.get("text", []) if t]

            # Parse face count
            face_count = int(data.get("face_count", 0))

            # Parse shot type
            shot_type = data.get("shot_type", "").lower() if data.get("shot_type") else None
            if shot_type and shot_type not in self.SHOT_TYPES:
                shot_type = None

            return CombinedFrameAnalysis(
                detected_objects=detected_objects,
                detected_text=detected_text,
                face_count=face_count,
                shot_type=shot_type,
            )
        except (KeyError, TypeError, ValueError):
            return CombinedFrameAnalysis(
                detected_objects=[],
                detected_text=[],
                face_count=0,
                shot_type=None,
            )

    async def _analyze_openai(self, image: np.ndarray | Image.Image) -> CombinedFrameAnalysis:
        """Analyze image using OpenAI GPT-4o with structured outputs."""
        from openai import AsyncOpenAI

        api_key = get_api_key("openai", self.api_key)
        client = AsyncOpenAI(api_key=api_key)

        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        image_base64 = self._image_to_base64(image)
        prompt = self._get_combined_prompt()

        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_base64}"}},
                    ],
                }
            ],
            response_format={
                "type": "json_schema",
                "json_schema": {
                    "name": "frame_analysis",
                    "strict": True,
                    "schema": self.RESPONSE_SCHEMA,
                },
            },
            max_tokens=1000,
        )

        content = response.choices[0].message.content or "{}"
        data = json.loads(content)
        return self._parse_response(data)

    async def _analyze_gemini(self, image: np.ndarray | Image.Image) -> CombinedFrameAnalysis:
        """Analyze image using Google Gemini with structured outputs."""
        import google.generativeai as genai

        api_key = get_api_key("gemini", self.api_key)
        genai.configure(api_key=api_key)

        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        prompt = self._get_combined_prompt()

        def _run_gemini() -> dict[str, Any]:
            model = genai.GenerativeModel(
                "gemini-2.0-flash",
                generation_config=genai.GenerationConfig(
                    response_mime_type="application/json",
                    response_schema=self.RESPONSE_SCHEMA,
                ),
            )
            response = model.generate_content([prompt, image])
            return json.loads(response.text)

        data = await asyncio.to_thread(_run_gemini)
        return self._parse_response(data)

    async def analyze(self, image: np.ndarray | Image.Image) -> CombinedFrameAnalysis:
        """Analyze an image with a single API call.

        Args:
            image: Image as numpy array (H, W, 3) in RGB format or PIL Image.

        Returns:
            CombinedFrameAnalysis with all detection results.
        """
        if self.backend == "openai":
            return await self._analyze_openai(image)
        elif self.backend == "gemini":
            return await self._analyze_gemini(image)
        else:
            raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
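
A minimal usage sketch for this analyzer follows. The class name CombinedFrameAnalyzer and the import path are assumptions inferred from the source file location (src/videopython/ai/understanding/detection.py); the constructor arguments and the analyze() call match the API documented below.

import asyncio

from PIL import Image

# Assumed export name and module path -- adjust to the actual package layout.
from videopython.ai.understanding.detection import CombinedFrameAnalyzer


async def main() -> None:
    # An OpenAI API key is required; api_key=None is assumed to be resolved
    # from the environment via get_api_key.
    analyzer = CombinedFrameAnalyzer(backend="openai")
    frame = Image.open("frame.png")  # hypothetical frame extracted from a video
    result = await analyzer.analyze(frame)
    print(result.face_count, result.shot_type)
    for obj in result.detected_objects:
        print(obj.label, obj.confidence)


asyncio.run(main())

Note that requesting the "local" backend raises UnsupportedBackendError here; the individual detector classes cover the local case instead.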

__init__

__init__(
    backend: ImageToTextBackend | None = None,
    api_key: str | None = None,
)

Initialize combined frame analyzer.

Parameters:

    backend (ImageToTextBackend | None): Backend to use ('openai' or 'gemini'). Default: None.
    api_key (str | None): API key for cloud backends. Default: None.
Source code in src/videopython/ai/understanding/detection.py
def __init__(
    self,
    backend: ImageToTextBackend | None = None,
    api_key: str | None = None,
):
    """Initialize combined frame analyzer.

    Args:
        backend: Backend to use ('openai' or 'gemini').
        api_key: API key for cloud backends.
    """
    resolved_backend: str = backend if backend is not None else get_default_backend("image_to_text")
    if resolved_backend == "local":
        raise UnsupportedBackendError(
            "local", self.SUPPORTED_BACKENDS + [" (use individual detectors for local backend)"]
        )
    if resolved_backend not in self.SUPPORTED_BACKENDS:
        raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

    self.backend: ImageToTextBackend = resolved_backend  # type: ignore[assignment]
    self.api_key = api_key

analyze async

analyze(image: ndarray | Image) -> CombinedFrameAnalysis

Analyze an image with a single API call.

Parameters:

    image (ndarray | Image): Image as numpy array (H, W, 3) in RGB format or PIL Image. Required.

Returns:

    CombinedFrameAnalysis: CombinedFrameAnalysis with all detection results.

Source code in src/videopython/ai/understanding/detection.py
async def analyze(self, image: np.ndarray | Image.Image) -> CombinedFrameAnalysis:
    """Analyze an image with a single API call.

    Args:
        image: Image as numpy array (H, W, 3) in RGB format or PIL Image.

    Returns:
        CombinedFrameAnalysis with all detection results.
    """
    if self.backend == "openai":
        return await self._analyze_openai(image)
    elif self.backend == "gemini":
        return await self._analyze_gemini(image)
    else:
        raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
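
For orientation, the structured payload that _parse_response consumes has roughly the following shape. The field names mirror the parsing code above; the values are illustrative only.

# Illustrative only -- keys follow _parse_response; values are made up.
example_payload = {
    "objects": [
        {
            "label": "person",
            "confidence": 0.92,
            "bbox": {"x": 0.10, "y": 0.15, "width": 0.30, "height": 0.70},
        }
    ],
    "text": ["EXIT"],
    "face_count": 1,
    "shot_type": "medium",
}

A missing confidence defaults to 0.8, an unrecognized shot_type is dropped to None, and any parsing error yields an empty CombinedFrameAnalysis, as the try/except in _parse_response shows.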

Scene Data Classes

These classes are used by SceneDetector and VideoAnalyzer to represent analysis results:
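
Taken together, the types nest as follows (a schematic sketch of selected fields, based on the attribute types documented below):

VideoDescription
    transcription: Transcription | None              (full-video transcript)
    scene_descriptions: list[SceneDescription]
        transcription: Transcription | None          (per-scene slice after distribute_transcription)
        frame_descriptions: list[FrameDescription]
            detected_objects: list[DetectedObject] | None
                bounding_box: BoundingBox | None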

SceneDescription

SceneDescription dataclass

A self-contained description of a video scene.

A scene is a continuous segment of video where the visual content remains relatively consistent, bounded by scene changes or transitions. This class combines timing information with visual analysis, transcription, and other metadata.

Attributes:

    start (float): Scene start time in seconds
    end (float): Scene end time in seconds
    start_frame (int): Index of the first frame in this scene
    end_frame (int): Index of the last frame in this scene (exclusive)
    frame_descriptions (list[FrameDescription]): List of descriptions for frames sampled from this scene
    transcription (Transcription | None): Optional transcription of speech within this scene
    summary (str | None): Optional LLM-generated summary of the scene
    scene_type (str | None): Optional classification (e.g., "dialogue", "action", "transition")
    detected_entities (list[str] | None): Optional list of entities/objects detected in the scene
    dominant_colors (list[tuple[int, int, int]] | None): Optional dominant colors aggregated across the scene

Source code in src/videopython/base/description.py
@dataclass
class SceneDescription:
    """A self-contained description of a video scene.

    A scene is a continuous segment of video where the visual content remains relatively consistent,
    bounded by scene changes or transitions. This class combines timing information with
    visual analysis, transcription, and other metadata.

    Attributes:
        start: Scene start time in seconds
        end: Scene end time in seconds
        start_frame: Index of the first frame in this scene
        end_frame: Index of the last frame in this scene (exclusive)
        frame_descriptions: List of descriptions for frames sampled from this scene
        transcription: Optional transcription of speech within this scene
        summary: Optional LLM-generated summary of the scene
        scene_type: Optional classification (e.g., "dialogue", "action", "transition")
        detected_entities: Optional list of entities/objects detected in the scene
        dominant_colors: Optional dominant colors aggregated across the scene
    """

    start: float
    end: float
    start_frame: int
    end_frame: int
    frame_descriptions: list[FrameDescription] = field(default_factory=list)
    transcription: Transcription | None = None
    summary: str | None = None
    scene_type: str | None = None
    detected_entities: list[str] | None = None
    dominant_colors: list[tuple[int, int, int]] | None = None

    @property
    def duration(self) -> float:
        """Duration of the scene in seconds."""
        return self.end - self.start

    @property
    def frame_count(self) -> int:
        """Number of frames in this scene."""
        return self.end_frame - self.start_frame

    @property
    def num_frames_described(self) -> int:
        """Number of frames that were described in this scene."""
        return len(self.frame_descriptions)

    def get_frame_indices(self, num_samples: int = 3) -> list[int]:
        """Get evenly distributed frame indices from this scene.

        Args:
            num_samples: Number of frames to sample from the scene

        Returns:
            List of frame indices evenly distributed throughout the scene
        """
        if num_samples <= 0:
            raise ValueError("num_samples must be positive")

        if num_samples == 1:
            # Return middle frame
            return [self.start_frame + self.frame_count // 2]

        # Get evenly spaced frames including start and end
        step = (self.end_frame - self.start_frame - 1) / (num_samples - 1)
        return [int(self.start_frame + i * step) for i in range(num_samples)]

    def get_description_summary(self) -> str:
        """Get a summary of all frame descriptions concatenated.

        Returns:
            Single string with all frame descriptions joined
        """
        return " ".join([fd.description for fd in self.frame_descriptions])

    def get_transcription_text(self) -> str:
        """Get the full transcription text for this scene.

        Returns:
            Concatenated transcription text, or empty string if no transcription
        """
        if not self.transcription or not self.transcription.segments:
            return ""
        return " ".join(segment.text for segment in self.transcription.segments)

duration property

duration: float

Duration of the scene in seconds.

frame_count property

frame_count: int

Number of frames in this scene.

num_frames_described property

num_frames_described: int

Number of frames that were described in this scene.

get_frame_indices

get_frame_indices(num_samples: int = 3) -> list[int]

Get evenly distributed frame indices from this scene.

Parameters:

    num_samples (int): Number of frames to sample from the scene. Default: 3.

Returns:

    list[int]: List of frame indices evenly distributed throughout the scene

Source code in src/videopython/base/description.py
def get_frame_indices(self, num_samples: int = 3) -> list[int]:
    """Get evenly distributed frame indices from this scene.

    Args:
        num_samples: Number of frames to sample from the scene

    Returns:
        List of frame indices evenly distributed throughout the scene
    """
    if num_samples <= 0:
        raise ValueError("num_samples must be positive")

    if num_samples == 1:
        # Return middle frame
        return [self.start_frame + self.frame_count // 2]

    # Get evenly spaced frames including start and end
    step = (self.end_frame - self.start_frame - 1) / (num_samples - 1)
    return [int(self.start_frame + i * step) for i in range(num_samples)]
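
As a worked example of the spacing above: with start_frame=100, end_frame=200, and num_samples=4, the step is (200 - 100 - 1) / (4 - 1) = 33.0, so the sampled indices are [int(100 + i * 33.0) for i in range(4)] = [100, 133, 166, 199]. For num_samples >= 2 the first and last frames of the scene are always included.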

get_description_summary

get_description_summary() -> str

Get a summary of all frame descriptions concatenated.

Returns:

    str: Single string with all frame descriptions joined

Source code in src/videopython/base/description.py
def get_description_summary(self) -> str:
    """Get a summary of all frame descriptions concatenated.

    Returns:
        Single string with all frame descriptions joined
    """
    return " ".join([fd.description for fd in self.frame_descriptions])

get_transcription_text

get_transcription_text() -> str

Get the full transcription text for this scene.

Returns:

    str: Concatenated transcription text, or empty string if no transcription

Source code in src/videopython/base/description.py
def get_transcription_text(self) -> str:
    """Get the full transcription text for this scene.

    Returns:
        Concatenated transcription text, or empty string if no transcription
    """
    if not self.transcription or not self.transcription.segments:
        return ""
    return " ".join(segment.text for segment in self.transcription.segments)

VideoDescription

VideoDescription dataclass

Complete understanding of a video including visual and audio analysis.

Attributes:

    scene_descriptions (list[SceneDescription]): List of scene descriptions with frame analysis and per-scene transcription
    transcription (Transcription | None): Optional full audio transcription for the entire video

Source code in src/videopython/base/description.py
@dataclass
class VideoDescription:
    """Complete understanding of a video including visual and audio analysis.

    Attributes:
        scene_descriptions: List of scene descriptions with frame analysis and per-scene transcription
        transcription: Optional full audio transcription for the entire video
    """

    scene_descriptions: list[SceneDescription]
    transcription: Transcription | None = None

    @property
    def num_scenes(self) -> int:
        """Number of scenes detected in the video."""
        return len(self.scene_descriptions)

    @property
    def total_frames_analyzed(self) -> int:
        """Total number of frames analyzed across all scenes."""
        return sum(sd.num_frames_described for sd in self.scene_descriptions)

    def distribute_transcription(self) -> None:
        """Distribute the video-level transcription to each scene.

        Slices the full transcription at word-level granularity and assigns
        relevant words/segments to each SceneDescription based on time overlap.
        Modifies scene_descriptions in place.
        """
        if not self.transcription:
            return

        for sd in self.scene_descriptions:
            sd.transcription = self.transcription.slice(sd.start, sd.end)

    def get_scene_summary(self, scene_index: int) -> str:
        """Get a text summary of a specific scene.

        Args:
            scene_index: Index of the scene to summarize

        Returns:
            Text summary of the scene including timing, descriptions, and transcription
        """
        if scene_index < 0 or scene_index >= len(self.scene_descriptions):
            raise ValueError(f"scene_index {scene_index} out of bounds (0-{len(self.scene_descriptions) - 1})")

        sd = self.scene_descriptions[scene_index]
        summary = f"Scene {scene_index + 1} ({sd.start:.2f}s - {sd.end:.2f}s, {sd.duration:.2f}s): "
        summary += sd.get_description_summary()

        # Include scene-level transcription if available
        scene_transcript = sd.get_transcription_text()
        if scene_transcript:
            summary += f" [Speech: {scene_transcript}]"

        return summary

    def get_full_summary(self) -> str:
        """Get a complete text summary of the entire video.

        Returns:
            Multi-line string with scene summaries and optional transcription
        """
        lines = [f"Video Analysis - {self.num_scenes} scenes, {self.total_frames_analyzed} frames analyzed\n"]

        for i in range(len(self.scene_descriptions)):
            lines.append(self.get_scene_summary(i))

        if self.transcription and self.transcription.segments:
            lines.append("\nFull Transcription:")
            for segment in self.transcription.segments:
                lines.append(f"  [{segment.start:.2f}s - {segment.end:.2f}s]: {segment.text}")

        return "\n".join(lines)

num_scenes property

num_scenes: int

Number of scenes detected in the video.

total_frames_analyzed property

total_frames_analyzed: int

Total number of frames analyzed across all scenes.

distribute_transcription

distribute_transcription() -> None

Distribute the video-level transcription to each scene.

Slices the full transcription at word-level granularity and assigns relevant words/segments to each SceneDescription based on time overlap. Modifies scene_descriptions in place.

Source code in src/videopython/base/description.py
def distribute_transcription(self) -> None:
    """Distribute the video-level transcription to each scene.

    Slices the full transcription at word-level granularity and assigns
    relevant words/segments to each SceneDescription based on time overlap.
    Modifies scene_descriptions in place.
    """
    if not self.transcription:
        return

    for sd in self.scene_descriptions:
        sd.transcription = self.transcription.slice(sd.start, sd.end)

get_scene_summary

get_scene_summary(scene_index: int) -> str

Get a text summary of a specific scene.

Parameters:

    scene_index (int): Index of the scene to summarize. Required.

Returns:

    str: Text summary of the scene including timing, descriptions, and transcription

Source code in src/videopython/base/description.py
def get_scene_summary(self, scene_index: int) -> str:
    """Get a text summary of a specific scene.

    Args:
        scene_index: Index of the scene to summarize

    Returns:
        Text summary of the scene including timing, descriptions, and transcription
    """
    if scene_index < 0 or scene_index >= len(self.scene_descriptions):
        raise ValueError(f"scene_index {scene_index} out of bounds (0-{len(self.scene_descriptions) - 1})")

    sd = self.scene_descriptions[scene_index]
    summary = f"Scene {scene_index + 1} ({sd.start:.2f}s - {sd.end:.2f}s, {sd.duration:.2f}s): "
    summary += sd.get_description_summary()

    # Include scene-level transcription if available
    scene_transcript = sd.get_transcription_text()
    if scene_transcript:
        summary += f" [Speech: {scene_transcript}]"

    return summary

get_full_summary

get_full_summary() -> str

Get a complete text summary of the entire video.

Returns:

    str: Multi-line string with scene summaries and optional transcription

Source code in src/videopython/base/description.py
def get_full_summary(self) -> str:
    """Get a complete text summary of the entire video.

    Returns:
        Multi-line string with scene summaries and optional transcription
    """
    lines = [f"Video Analysis - {self.num_scenes} scenes, {self.total_frames_analyzed} frames analyzed\n"]

    for i in range(len(self.scene_descriptions)):
        lines.append(self.get_scene_summary(i))

    if self.transcription and self.transcription.segments:
        lines.append("\nFull Transcription:")
        for segment in self.transcription.segments:
            lines.append(f"  [{segment.start:.2f}s - {segment.end:.2f}s]: {segment.text}")

    return "\n".join(lines)

FrameDescription

FrameDescription dataclass

Represents a description of a video frame.

Attributes:

    frame_index (int): Index of the frame in the video
    timestamp (float): Time in seconds when this frame appears
    description (str): Text description of what's in the frame
    color_histogram (ColorHistogram | None): Optional color features extracted from the frame
    detected_objects (list[DetectedObject] | None): Optional list of objects detected in the frame
    detected_text (list[str] | None): Optional list of text strings found via OCR
    detected_faces (int | None): Optional count of faces detected in the frame
    shot_type (str | None): Optional shot classification (e.g., "close-up", "medium", "wide")
    camera_motion (str | None): Optional camera motion type (e.g., "static", "pan", "tilt", "zoom")

Source code in src/videopython/base/description.py
@dataclass
class FrameDescription:
    """Represents a description of a video frame.

    Attributes:
        frame_index: Index of the frame in the video
        timestamp: Time in seconds when this frame appears
        description: Text description of what's in the frame
        color_histogram: Optional color features extracted from the frame
        detected_objects: Optional list of objects detected in the frame
        detected_text: Optional list of text strings found via OCR
        detected_faces: Optional count of faces detected in the frame
        shot_type: Optional shot classification (e.g., "close-up", "medium", "wide")
        camera_motion: Optional camera motion type (e.g., "static", "pan", "tilt", "zoom")
    """

    frame_index: int
    timestamp: float
    description: str
    color_histogram: ColorHistogram | None = None
    detected_objects: list[DetectedObject] | None = None
    detected_text: list[str] | None = None
    detected_faces: int | None = None
    shot_type: str | None = None
    camera_motion: str | None = None
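
For instance (illustrative values; import path assumed from the source location above):

from videopython.base.description import FrameDescription

frame = FrameDescription(
    frame_index=150,
    timestamp=6.25,
    description="A cyclist crosses an intersection at dusk.",
    detected_faces=1,
    shot_type="wide",
    camera_motion="pan",
)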

BoundingBox

BoundingBox dataclass

A bounding box for detected objects in an image.

Coordinates are normalized to [0, 1] range relative to image dimensions.

Attributes:

    x (float): Left edge of the box (0 = left edge of image)
    y (float): Top edge of the box (0 = top edge of image)
    width (float): Width of the box
    height (float): Height of the box

Source code in src/videopython/base/description.py
@dataclass
class BoundingBox:
    """A bounding box for detected objects in an image.

    Coordinates are normalized to [0, 1] range relative to image dimensions.

    Attributes:
        x: Left edge of the box (0 = left edge of image)
        y: Top edge of the box (0 = top edge of image)
        width: Width of the box
        height: Height of the box
    """

    x: float
    y: float
    width: float
    height: float

    @property
    def center(self) -> tuple[float, float]:
        """Center point of the bounding box."""
        return (self.x + self.width / 2, self.y + self.height / 2)

    @property
    def area(self) -> float:
        """Area of the bounding box (normalized)."""
        return self.width * self.height

center property

center: tuple[float, float]

Center point of the bounding box.

area property

area: float

Area of the bounding box (normalized).
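
A quick check of the derived properties (illustrative values; import path assumed from the source location above):

from videopython.base.description import BoundingBox

box = BoundingBox(x=0.25, y=0.25, width=0.5, height=0.5)
print(box.center)  # (0.5, 0.5)
print(box.area)    # 0.25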

DetectedObject

DetectedObject dataclass

An object detected in a video frame.

Attributes:

    label (str): Name/class of the detected object (e.g., "person", "car", "dog")
    confidence (float): Detection confidence score between 0 and 1
    bounding_box (BoundingBox | None): Optional bounding box location of the object

Source code in src/videopython/base/description.py
@dataclass
class DetectedObject:
    """An object detected in a video frame.

    Attributes:
        label: Name/class of the detected object (e.g., "person", "car", "dog")
        confidence: Detection confidence score between 0 and 1
        bounding_box: Optional bounding box location of the object
    """

    label: str
    confidence: float
    bounding_box: BoundingBox | None = None
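
And a final sketch combining the two (illustrative values; the 0.5 threshold is arbitrary and the import path is assumed from the source location above):

from videopython.base.description import BoundingBox, DetectedObject

objects = [
    DetectedObject(label="person", confidence=0.93, bounding_box=BoundingBox(x=0.1, y=0.2, width=0.3, height=0.6)),
    DetectedObject(label="dog", confidence=0.42),
]
confident = [obj for obj in objects if obj.confidence >= 0.5]
print([obj.label for obj in confident])  # ['person']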