Skip to content

AI Understanding

Analyze videos, transcribe audio, and describe visual content.

For a single aggregate, serializable analysis object across multiple analyzers, see Video Analysis.

Local Model Support

| Class | Local Model Family |
| --- | --- |
| SceneVLM | Qwen3.5 |
| AudioToText | Whisper |
| AudioClassifier | AST |
| SemanticSceneDetector | TransNetV2 |

AudioToText

AudioToText

Transcription service for audio and video using local Whisper models.

Uses openai-whisper for transcription (with word-level timestamps) and pyannote-audio for optional speaker diarization.

Source code in src/videopython/ai/understanding/audio.py
class AudioToText:
    """Transcription service for audio and video using local Whisper models.

    Uses openai-whisper for transcription (with word-level timestamps) and
    pyannote-audio for optional speaker diarization. Both models are loaded
    lazily on first use, not at construction time.
    """

    # HuggingFace pipeline id used when diarization is enabled.
    PYANNOTE_DIARIZATION_MODEL = "pyannote/speaker-diarization-community-1"

    def __init__(
        self,
        model_name: Literal["tiny", "base", "small", "medium", "large", "turbo"] = "small",
        enable_diarization: bool = False,
        device: str | None = None,
    ):
        """Configure the transcriber without loading any model weights.

        Args:
            model_name: Whisper checkpoint size to load on first transcription.
            enable_diarization: If True, also run pyannote diarization and
                attach a speaker label to every transcribed word.
            device: Requested device, or None for automatic selection.
        """
        self.model_name = model_name
        self.enable_diarization = enable_diarization
        # NOTE(review): MPS is excluded here (mps_allowed=False) — presumably
        # Whisper is unreliable on MPS; confirm before enabling it.
        self.device = select_device(device, mps_allowed=False)
        log_device_initialization(
            "AudioToText",
            requested_device=device,
            resolved_device=self.device,
        )
        # Lazily-initialized handles (see _init_local / _init_diarization).
        self._model: Any = None
        self._diarization_pipeline: Any = None

    def _init_local(self) -> None:
        """Initialize local Whisper model."""
        import whisper

        self._model = whisper.load_model(name=self.model_name, device=self.device)

    def _init_diarization(self) -> None:
        """Initialize pyannote speaker diarization pipeline."""
        import torch
        from pyannote.audio import Pipeline  # type: ignore[import-untyped]

        self._diarization_pipeline = Pipeline.from_pretrained(self.PYANNOTE_DIARIZATION_MODEL)
        self._diarization_pipeline.to(torch.device(self.device))

    def _process_transcription_result(self, transcription_result: dict) -> Transcription:
        """Process raw transcription result into a Transcription object.

        Expects the openai-whisper result shape: a "segments" list whose items
        carry "start"/"end"/"text" and (with word_timestamps=True) a "words"
        list; the top-level "language" key is optional.
        """
        transcription_segments: list[TranscriptionSegment] = []
        for segment in transcription_result["segments"]:
            transcription_words = [
                TranscriptionWord(word=word["word"], start=float(word["start"]), end=float(word["end"]))
                for word in segment.get("words", [])
            ]
            transcription_segment = TranscriptionSegment(
                start=segment["start"],
                end=segment["end"],
                text=segment["text"],
                words=transcription_words,
            )
            transcription_segments.append(transcription_segment)

        return Transcription(segments=transcription_segments, language=transcription_result.get("language"))

    @staticmethod
    def _assign_speakers_to_words(
        words: list[TranscriptionWord],
        diarization_result: Any,
    ) -> list[TranscriptionWord]:
        """Assign speaker labels to words based on diarization segment overlap.

        For each word, finds the diarization segment with the greatest time overlap
        and assigns that speaker. Words with no overlapping diarization segment get
        the nearest speaker by midpoint distance.
        """
        speaker_segments: list[tuple[float, float, str]] = []
        # pyannote-audio 4.x returns DiarizeOutput; use exclusive_speaker_diarization
        # (no overlapping turns) for cleaner word assignment.
        annotation = getattr(diarization_result, "exclusive_speaker_diarization", diarization_result)
        for turn, _, speaker in annotation.itertracks(yield_label=True):
            speaker_segments.append((turn.start, turn.end, speaker))

        if not speaker_segments:
            return words

        result = []
        for word in words:
            best_speaker: str | None = None
            best_overlap = 0.0

            # Primary rule: the speaker turn with maximal time overlap wins.
            for seg_start, seg_end, speaker in speaker_segments:
                overlap = max(0.0, min(word.end, seg_end) - max(word.start, seg_start))
                if overlap > best_overlap:
                    best_overlap = overlap
                    best_speaker = speaker

            # Fallback: no turn overlaps this word at all, so pick the turn
            # whose midpoint is closest to the word's midpoint.
            if best_speaker is None:
                word_mid = (word.start + word.end) / 2.0
                best_dist = float("inf")
                for seg_start, seg_end, speaker in speaker_segments:
                    seg_mid = (seg_start + seg_end) / 2.0
                    dist = abs(word_mid - seg_mid)
                    if dist < best_dist:
                        best_dist = dist
                        best_speaker = speaker

            result.append(
                TranscriptionWord(
                    word=word.word,
                    start=word.start,
                    end=word.end,
                    speaker=best_speaker,
                )
            )
        return result

    def _transcribe_with_diarization(self, audio_mono: Audio) -> Transcription:
        """Transcribe with word timestamps and assign speakers via pyannote."""
        import numpy as np
        import torch

        if self._diarization_pipeline is None:
            self._init_diarization()

        audio_data = audio_mono.data
        transcription_result = self._model.transcribe(audio=audio_data, word_timestamps=True)

        # pyannote takes an in-memory dict of a (channels, samples) float
        # tensor plus the sample rate; unsqueeze adds the channel axis.
        waveform = torch.from_numpy(audio_data.astype(np.float32)).unsqueeze(0)
        diarization_result = self._diarization_pipeline(
            {"waveform": waveform, "sample_rate": audio_mono.metadata.sample_rate}
        )

        transcription = self._process_transcription_result(transcription_result)

        # Flatten segment words: the diarized output is word-level only.
        all_words: list[TranscriptionWord] = []
        for seg in transcription.segments:
            all_words.extend(seg.words)

        if all_words:
            all_words = self._assign_speakers_to_words(all_words, diarization_result)

        # NOTE(review): this path returns a words-only Transcription (no
        # segments), unlike the non-diarized path — confirm callers handle
        # both shapes.
        return Transcription(words=all_words, language=transcription.language)

    def _transcribe_local(self, audio: Audio) -> Transcription:
        """Transcribe using local Whisper model."""
        import whisper

        if self._model is None:
            self._init_local()

        # Whisper expects mono input at its native sample rate.
        audio_mono = audio.to_mono().resample(whisper.audio.SAMPLE_RATE)

        if self.enable_diarization:
            return self._transcribe_with_diarization(audio_mono)

        transcription_result = self._model.transcribe(audio=audio_mono.data, word_timestamps=True)
        return self._process_transcription_result(transcription_result)

    def transcribe(self, media: Audio | Video) -> Transcription:
        """Transcribe audio or video to text.

        Args:
            media: Audio, or Video whose audio track will be transcribed.

        Returns:
            Transcription; empty (no segments) when the input is silent.

        Raises:
            TypeError: If `media` is neither Audio nor Video.
        """
        if isinstance(media, Video):
            # Silent inputs short-circuit before any model is loaded.
            if media.audio.is_silent:
                return Transcription(segments=[])
            audio = media.audio
        elif isinstance(media, Audio):
            if media.is_silent:
                return Transcription(segments=[])
            audio = media
        else:
            raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

        return self._transcribe_local(audio)

transcribe

transcribe(media: Audio | Video) -> Transcription

Transcribe audio or video to text.

Source code in src/videopython/ai/understanding/audio.py
def transcribe(self, media: Audio | Video) -> Transcription:
    """Transcribe audio or video to text."""
    # Resolve the audio track first, then apply the silence short-circuit once.
    if isinstance(media, Video):
        audio = media.audio
    elif isinstance(media, Audio):
        audio = media
    else:
        raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

    if audio.is_silent:
        return Transcription(segments=[])
    return self._transcribe_local(audio)

AudioClassifier

Detect and classify sounds, music, and audio events with timestamps using Audio Spectrogram Transformer (AST), a state-of-the-art model achieving 0.485 mAP on AudioSet.

Basic Usage

from videopython.ai import AudioClassifier
from videopython.base import Video

classifier = AudioClassifier(confidence_threshold=0.3)
video = Video.from_path("video.mp4")

result = classifier.classify(video)

# Clip-level predictions (overall audio content)
for label, confidence in result.clip_predictions.items():
    print(f"{label}: {confidence:.2f}")

# Timestamped events
for event in result.events:
    print(f"{event.start:.1f}s - {event.end:.1f}s: {event.label} ({event.confidence:.2f})")

AudioClassifier

Audio event and sound classification using AST.

Source code in src/videopython/ai/understanding/audio.py
class AudioClassifier:
    """Audio event and sound classification using AST.

    Runs the Audio Spectrogram Transformer over sliding windows of the input
    and reports both timestamped events and clip-level label probabilities.
    The model is loaded lazily on first classification.
    """

    SUPPORTED_MODELS: list[str] = ["MIT/ast-finetuned-audioset-10-10-0.4593"]
    # Input is resampled to this rate before feature extraction.
    AST_SAMPLE_RATE: int = 16000
    # Sliding-window size and hop (seconds) for inputs longer than one chunk.
    AST_CHUNK_SECONDS: float = 10.0
    AST_HOP_SECONDS: float = 5.0

    def __init__(
        self,
        model_name: str = "MIT/ast-finetuned-audioset-10-10-0.4593",
        confidence_threshold: float = 0.3,
        top_k: int = 10,
        device: str | None = None,
    ):
        """Configure the classifier without loading model weights.

        Args:
            model_name: HuggingFace model id; must be in SUPPORTED_MODELS.
            confidence_threshold: Minimum per-class probability for a label to
                be reported, both as an event and in the clip summary.
            top_k: Number of highest-probability classes considered per chunk
                and for the clip-level predictions.
            device: Requested device, or None for automatic selection.

        Raises:
            ValueError: If `model_name` is not supported.
        """
        if model_name not in self.SUPPORTED_MODELS:
            raise ValueError(f"Model '{model_name}' not supported. Supported: {self.SUPPORTED_MODELS}")

        self.model_name = model_name
        self.confidence_threshold = confidence_threshold
        self.top_k = top_k
        self.device = select_device(device, mps_allowed=True)
        log_device_initialization(
            "AudioClassifier",
            requested_device=device,
            resolved_device=self.device,
        )

        # Lazily populated by _init_local.
        self._model: Any = None
        self._processor: Any = None
        self._labels: list[str] = []

    def _init_local(self) -> None:
        """Initialize local AST model from HuggingFace."""
        from transformers import ASTFeatureExtractor, ASTForAudioClassification

        self._processor = ASTFeatureExtractor.from_pretrained(self.model_name)
        self._model = ASTForAudioClassification.from_pretrained(self.model_name)
        self._model.to(self.device)
        self._model.eval()

        # Index-ordered label list so logits line up with self._labels.
        self._labels = [self._model.config.id2label[i] for i in range(len(self._model.config.id2label))]

    def _merge_events(self, events: list[AudioEvent], gap_threshold: float = 0.5) -> list[AudioEvent]:
        """Merge consecutive events of the same class.

        Events with the same label separated by at most `gap_threshold`
        seconds are fused into one event carrying the max confidence.
        """
        if not events:
            return []

        # Group events by label so merging only ever fuses like with like.
        events_by_label: dict[str, list[AudioEvent]] = {}
        for event in events:
            if event.label not in events_by_label:
                events_by_label[event.label] = []
            events_by_label[event.label].append(event)

        merged = []
        for label, label_events in events_by_label.items():
            sorted_events = sorted(label_events, key=lambda e: e.start)
            current = sorted_events[0]

            for next_event in sorted_events[1:]:
                if next_event.start - current.end <= gap_threshold:
                    # Extend the running event; keep the stronger confidence.
                    current = AudioEvent(
                        start=current.start,
                        end=next_event.end,
                        label=label,
                        confidence=max(current.confidence, next_event.confidence),
                    )
                else:
                    merged.append(current)
                    current = next_event

            merged.append(current)

        return sorted(merged, key=lambda e: e.start)

    def _classify_local(self, audio: Audio) -> AudioClassification:
        """Classify audio using local AST model with sliding window."""
        import numpy as np
        import torch

        if self._model is None:
            self._init_local()

        # AST expects mono audio at AST_SAMPLE_RATE.
        audio_processed = audio.to_mono().resample(self.AST_SAMPLE_RATE)
        audio_data = audio_processed.data.astype(np.float32)

        chunk_samples = int(self.AST_CHUNK_SECONDS * self.AST_SAMPLE_RATE)
        hop_samples = int(self.AST_HOP_SECONDS * self.AST_SAMPLE_RATE)
        total_samples = len(audio_data)

        all_chunk_probs = []
        chunk_times = []

        # Short inputs become a single chunk; longer inputs get overlapping
        # windows, with the trailing window zero-padded to full length.
        if total_samples <= chunk_samples:
            chunks = [(0, audio_data)]
        else:
            chunks = []
            start = 0
            while start < total_samples:
                end = min(start + chunk_samples, total_samples)
                chunk = audio_data[start:end]
                if len(chunk) < chunk_samples:
                    chunk = np.pad(chunk, (0, chunk_samples - len(chunk)))
                chunks.append((start, chunk))
                start += hop_samples

        for start_sample, chunk in chunks:
            start_time = start_sample / self.AST_SAMPLE_RATE

            inputs = self._processor(
                chunk,
                sampling_rate=self.AST_SAMPLE_RATE,
                return_tensors="pt",
            )
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            with torch.no_grad():
                outputs = self._model(**inputs)
                logits = outputs.logits[0]
                # Sigmoid, not softmax: AudioSet classification is multi-label,
                # so each class gets an independent probability.
                probs = torch.sigmoid(logits).cpu().numpy()

            all_chunk_probs.append(probs)
            chunk_times.append(start_time)

        chunk_probs_array = np.array(all_chunk_probs)

        # Per-chunk events: keep the top_k classes that clear the threshold.
        events = []
        for start_time, probs in zip(chunk_times, chunk_probs_array):
            end_time = start_time + self.AST_CHUNK_SECONDS
            top_indices = np.argsort(probs)[-self.top_k :][::-1]

            for class_idx in top_indices:
                confidence = float(probs[class_idx])
                if confidence >= self.confidence_threshold:
                    label = self._labels[class_idx]
                    events.append(
                        AudioEvent(
                            start=start_time,
                            # Clamp padded final chunk to the real audio end.
                            end=min(end_time, total_samples / self.AST_SAMPLE_RATE),
                            label=label,
                            confidence=confidence,
                        )
                    )

        merged_events = self._merge_events(events)

        # Clip-level summary: average each class probability across chunks,
        # then keep the top_k classes above the threshold.
        clip_preds = np.mean(chunk_probs_array, axis=0)
        top_clip_indices = np.argsort(clip_preds)[-self.top_k :][::-1]
        clip_predictions = {
            self._labels[idx]: float(clip_preds[idx])
            for idx in top_clip_indices
            if clip_preds[idx] >= self.confidence_threshold
        }

        return AudioClassification(events=merged_events, clip_predictions=clip_predictions)

    def classify(self, media: Audio | Video) -> AudioClassification:
        """Classify audio events in audio or video.

        Args:
            media: Audio, or Video whose audio track will be classified.

        Returns:
            AudioClassification; empty when the input is silent.

        Raises:
            TypeError: If `media` is neither Audio nor Video.
        """
        if isinstance(media, Video):
            # Silent inputs short-circuit before any model is loaded.
            if media.audio.is_silent:
                return AudioClassification(events=[], clip_predictions={})
            audio = media.audio
        elif isinstance(media, Audio):
            if media.is_silent:
                return AudioClassification(events=[], clip_predictions={})
            audio = media
        else:
            raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

        return self._classify_local(audio)

classify

classify(media: Audio | Video) -> AudioClassification

Classify audio events in audio or video.

Source code in src/videopython/ai/understanding/audio.py
def classify(self, media: Audio | Video) -> AudioClassification:
    """Classify audio events in audio or video."""
    # Resolve the audio track first, then apply the silence short-circuit once.
    if isinstance(media, Video):
        audio = media.audio
    elif isinstance(media, Audio):
        audio = media
    else:
        raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

    if audio.is_silent:
        return AudioClassification(events=[], clip_predictions={})
    return self._classify_local(audio)

SceneVLM

SceneVLM supports both Qwen3.5 2B and 4B model variants. Device selection is automatic by default (cuda -> cpu).

SceneVLM

Generates scene captions with local Qwen3.5.

Source code in src/videopython/ai/understanding/image.py
class SceneVLM:
    """Generates scene captions with local Qwen3.5."""

    # Default pixel budget per image for scene captioning. Qwen3.5 tiles
    # images into patches; fewer pixels = fewer vision tokens = faster
    # inference. 384x384 = 147456 is plenty for scene-level captioning.
    DEFAULT_MAX_IMAGE_PIXELS: int = 384 * 384

    def __init__(
        self,
        model_name: str | None = None,
        device: str | None = None,
        max_new_tokens: int = 128,
        temperature: float = 0.0,
        model_size: Literal["2b", "4b"] = DEFAULT_SCENE_VLM_MODEL_SIZE,
        max_image_pixels: int | None = None,
    ):
        """Configure the captioner; model weights load lazily on first use.

        Args:
            model_name: Explicit HuggingFace model id, or None to derive it
                from `model_size` via SCENE_VLM_MODEL_IDS.
            device: Requested device; resolved in _init_local (None = auto).
            max_new_tokens: Generation length cap per caption.
            temperature: 0 means greedy decoding; >0 enables sampling.
            model_size: Size key selecting the default model id.
            max_image_pixels: Per-image pixel budget for downscaling, or None
                for DEFAULT_MAX_IMAGE_PIXELS.

        Raises:
            ValueError: If `model_size` is not a supported size key.
        """
        if model_size not in SCENE_VLM_MODEL_IDS:
            supported = ", ".join(sorted(SCENE_VLM_MODEL_IDS))
            raise ValueError(f"model_size must be one of: {supported}")

        self.model_size = model_size
        self.model_name = model_name or SCENE_VLM_MODEL_IDS[model_size]
        self.device = device
        self.max_new_tokens = max_new_tokens
        self.temperature = temperature
        self.max_image_pixels = max_image_pixels if max_image_pixels is not None else self.DEFAULT_MAX_IMAGE_PIXELS
        self._processor: Any = None
        self._model: Any = None

    def _init_local(self) -> None:
        """Initialize local Qwen3.5 model."""
        import torch
        from transformers import AutoModelForImageTextToText, AutoProcessor  # type: ignore[attr-defined]

        t0 = time.perf_counter()
        requested_device = self.device
        resolved_device = select_device(self.device, mps_allowed=True)

        self._processor = AutoProcessor.from_pretrained(self.model_name)
        # Save and restore default dtype -- transformers torch_dtype="auto" can
        # mutate torch.get_default_dtype(), which breaks concurrent models
        # (e.g. Whisper) that expect float32.
        saved_dtype = torch.get_default_dtype()
        try:
            self._model = AutoModelForImageTextToText.from_pretrained(self.model_name, torch_dtype="auto")
        finally:
            torch.set_default_dtype(saved_dtype)
        self._model.to(resolved_device)
        self._model.eval()
        # Record the resolved device so later tensor moves use the same one.
        self.device = resolved_device

        log_device_initialization(
            "SceneVLM",
            requested_device=requested_device,
            resolved_device=resolved_device,
        )
        logger.info("SceneVLM model weights loaded in %.2fs", time.perf_counter() - t0)

    def _downscale_image(self, img: Image.Image) -> Image.Image:
        """Downscale image to fit within max_image_pixels budget, preserving aspect ratio."""
        w, h = img.size
        pixels = w * h
        if pixels <= self.max_image_pixels:
            return img
        # sqrt of the pixel ratio scales both axes so area fits the budget.
        scale = (self.max_image_pixels / pixels) ** 0.5
        new_w = max(1, int(w * scale))
        new_h = max(1, int(h * scale))
        return img.resize((new_w, new_h), Image.LANCZOS)

    def _generation_config_for_run(self) -> Any | None:
        """Build a per-call copy of the model's generation config.

        Returns None when the model exposes no usable generation_config;
        otherwise a copy set up for sampling (temperature > 0) or greedy
        decoding, never mutating the model's shared config.
        """
        base_config = getattr(self._model, "generation_config", None)
        if base_config is None or not hasattr(base_config, "to_dict"):
            return None

        # Round-trip through a dict to get an independent copy.
        config = base_config.__class__.from_dict(base_config.to_dict())
        if self.temperature > 0:
            config.do_sample = True
            config.temperature = self.temperature
            return config

        config.do_sample = False
        # Greedy path: reset sampling knobs to neutral values so stale
        # checkpoint defaults cannot leak into generation.
        for name, value in (("temperature", 1.0), ("top_p", 1.0), ("top_k", 50)):
            if hasattr(config, name):
                setattr(config, name, value)
        return config

    def analyze_frame(
        self,
        image: np.ndarray | Image.Image,
        prompt: str | None = None,
    ) -> str:
        """Analyze one frame and return a plain-text caption."""
        frame = Image.fromarray(image) if isinstance(image, np.ndarray) else image
        return self.analyze_scene([frame], prompt=prompt)

    def analyze_scene(
        self,
        images: list[np.ndarray | Image.Image],
        prompt: str | None = None,
    ) -> str:
        """Analyze a scene with multiple frames and return a plain-text caption.

        Raises:
            ValueError: If `images` is empty.
        """
        if not images:
            raise ValueError("`images` must contain at least one frame")

        pil_images = [
            self._downscale_image(Image.fromarray(img) if isinstance(img, np.ndarray) else img) for img in images
        ]
        user_prompt = prompt or _DEFAULT_PROMPT
        # One user message: all frames first, then the text prompt.
        content: list[dict[str, Any]] = [{"type": "image", "image": img} for img in pil_images]
        content.append({"type": "text", "text": user_prompt})
        messages = [{"role": "user", "content": content}]
        outputs = self._generate_from_message_batch([messages])
        # Collapse all whitespace runs to single spaces.
        caption = " ".join(outputs[0].split()).strip()
        return caption or "No scene description"

    def _generate_from_message_batch(self, messages_batch: list[list[dict[str, Any]]]) -> list[str]:
        """Run batch generation for one or more multimodal chat messages."""
        import torch

        if self._model is None:
            self._init_local()

        texts = [
            self._processor.apply_chat_template(msg, tokenize=False, add_generation_prompt=True, enable_thinking=False)
            for msg in messages_batch
        ]

        processor_kwargs: dict[str, Any] = {
            "text": texts,
            "padding": True,
            "return_tensors": "pt",
        }

        try:
            from qwen_vl_utils import process_vision_info  # type: ignore
        except ImportError:
            # Fallback without qwen_vl_utils: collect PIL images straight from
            # the message content ourselves.
            image_inputs = [
                [
                    item["image"]
                    for part in message
                    for item in part.get("content", [])
                    if isinstance(item, dict) and item.get("type") == "image" and "image" in item
                ]
                for message in messages_batch
            ]
            # NOTE(review): flat list when every message has exactly one image,
            # nested lists otherwise — presumably what the processor expects
            # for each case; confirm against the processor docs.
            if all(len(items) == 1 for items in image_inputs):
                processor_kwargs["images"] = [items[0] for items in image_inputs]
            else:
                processor_kwargs["images"] = image_inputs
        else:
            image_inputs, video_inputs = process_vision_info(messages_batch)
            processor_kwargs["images"] = image_inputs
            if video_inputs is not None:
                processor_kwargs["videos"] = video_inputs

        # Only used for the timing log below.
        num_images = sum(
            len(items) if isinstance(items, list) else 1 for items in (processor_kwargs.get("images") or [])
        )

        inputs = self._processor(**processor_kwargs)
        inputs = inputs.to(self.device) if hasattr(inputs, "to") else {k: v.to(self.device) for k, v in inputs.items()}

        generation_config = self._generation_config_for_run()
        if generation_config is not None:
            generation_config.max_new_tokens = self.max_new_tokens
            generation_kwargs: dict[str, Any] = {"generation_config": generation_config}
        elif self.temperature > 0:
            generation_kwargs = {
                "max_new_tokens": self.max_new_tokens,
                "do_sample": True,
                "temperature": self.temperature,
            }
        else:
            generation_kwargs = {"max_new_tokens": self.max_new_tokens, "do_sample": False}

        t0 = time.perf_counter()
        with torch.no_grad():
            output_ids = self._model.generate(**inputs, **generation_kwargs)
        logger.info(
            "SceneVLM inference: %.2fs, %d images, %d messages", time.perf_counter() - t0, num_images, len(texts)
        )

        # generate() returns prompt + completion; slice off the prompt tokens
        # per sequence before decoding.
        generated_ids_trimmed = [
            out_ids[len(in_ids) :] for in_ids, out_ids in zip(inputs["input_ids"], output_ids, strict=False)
        ]
        output_texts = self._processor.batch_decode(generated_ids_trimmed, skip_special_tokens=True)
        return [text.strip() for text in output_texts]

analyze_frame

analyze_frame(
    image: ndarray | Image, prompt: str | None = None
) -> str

Analyze one frame and return a plain-text caption.

Source code in src/videopython/ai/understanding/image.py
def analyze_frame(
    self,
    image: np.ndarray | Image.Image,
    prompt: str | None = None,
) -> str:
    """Analyze one frame and return a plain-text caption."""
    # Normalize to PIL, then delegate to the multi-frame path.
    if isinstance(image, np.ndarray):
        frame = Image.fromarray(image)
    else:
        frame = image
    return self.analyze_scene([frame], prompt=prompt)

analyze_scene

analyze_scene(
    images: list[ndarray | Image], prompt: str | None = None
) -> str

Analyze a scene with multiple frames and return a plain-text caption.

Source code in src/videopython/ai/understanding/image.py
def analyze_scene(
    self,
    images: list[np.ndarray | Image.Image],
    prompt: str | None = None,
) -> str:
    """Analyze a scene with multiple frames and return a plain-text caption."""
    if not images:
        raise ValueError("`images` must contain at least one frame")

    # Normalize each frame to a downscaled PIL image.
    frames: list[Image.Image] = []
    for img in images:
        pil = Image.fromarray(img) if isinstance(img, np.ndarray) else img
        frames.append(self._downscale_image(pil))

    # Build one user message: all frames followed by the text prompt.
    content: list[dict[str, Any]] = []
    for frame in frames:
        content.append({"type": "image", "image": frame})
    content.append({"type": "text", "text": prompt or _DEFAULT_PROMPT})

    raw = self._generate_from_message_batch([[{"role": "user", "content": content}]])[0]
    caption = " ".join(raw.split()).strip()
    return caption if caption else "No scene description"

SemanticSceneDetector

ML-based scene boundary detection using TransNetV2. More accurate than histogram-based detection, especially for gradual transitions like fades and dissolves.

from videopython.ai import SemanticSceneDetector

detector = SemanticSceneDetector(threshold=0.5, min_scene_length=1.0)
scenes = detector.detect_streaming("video.mp4")

for scene in scenes:
    print(f"Scene: {scene.start:.1f}s - {scene.end:.1f}s ({scene.duration:.1f}s)")

SemanticSceneDetector

ML-based scene detection using TransNetV2.

TransNetV2 is a neural network specifically designed for shot boundary detection, providing more accurate scene boundaries than histogram-based methods, especially for gradual transitions.

Uses the transnetv2-pytorch package with pretrained weights.

Example

>>> from videopython.ai.understanding import SemanticSceneDetector
>>> detector = SemanticSceneDetector()
>>> scenes = detector.detect_streaming("video.mp4")
>>> for scene in scenes:
...     print(f"Scene: {scene.start:.2f}s - {scene.end:.2f}s")

Source code in src/videopython/ai/understanding/temporal.py
class SemanticSceneDetector:
    """ML-based scene detection using TransNetV2.

    TransNetV2 is a neural network specifically designed for shot boundary
    detection, providing more accurate scene boundaries than histogram-based
    methods, especially for gradual transitions.

    Uses the transnetv2-pytorch package with pretrained weights.

    Example:
        >>> from videopython.ai.understanding import SemanticSceneDetector
        >>> detector = SemanticSceneDetector()
        >>> scenes = detector.detect_streaming("video.mp4")
        >>> for scene in scenes:
        ...     print(f"Scene: {scene.start:.2f}s - {scene.end:.2f}s")
    """

    def __init__(
        self,
        threshold: float = 0.5,
        min_scene_length: float = 0.5,
        device: str | None = None,
    ):
        """Initialize the semantic scene detector.

        Args:
            threshold: Confidence threshold for scene boundaries (0.0-1.0).
                Higher values = fewer, more confident boundaries.
            min_scene_length: Minimum scene duration in seconds.
            device: Device to run on ('cuda', 'mps', 'cpu', or None for auto).
                Note: MPS may have numerical inconsistencies; use 'cpu' for
                reproducible results.

        Raises:
            ValueError: If threshold is outside [0.0, 1.0] or
                min_scene_length is negative.
        """
        if not 0.0 <= threshold <= 1.0:
            raise ValueError("threshold must be between 0.0 and 1.0")
        if min_scene_length < 0:
            raise ValueError("min_scene_length must be non-negative")

        self.threshold = threshold
        self.min_scene_length = min_scene_length
        # Model is loaded lazily by _load_model on the first detection call.
        self._device: str | None = device
        self._model: Any = None

    def _load_model(self) -> None:
        """Load the TransNetV2 model with pretrained weights (idempotent)."""
        if self._model is not None:
            return

        from transnetv2_pytorch import TransNetV2

        requested_device = self._device
        device = select_device(self._device, mps_allowed=True)
        log_device_initialization(
            "SemanticSceneDetector",
            requested_device=requested_device,
            resolved_device=device,
        )
        self._model = TransNetV2(device=device)
        self._model.eval()

    def detect(self, video: Video) -> list[SceneBoundary]:
        """Detect scenes in a video using ML-based boundary detection.

        Note: This method requires saving video to a temporary file for
        TransNetV2 processing. For better performance, use detect_streaming()
        with a file path directly.

        Args:
            video: Video object to analyze.

        Returns:
            List of SceneBoundary objects representing detected scenes.
        """
        import os
        import tempfile

        if len(video.frames) == 0:
            return []

        if len(video.frames) == 1:
            return [SceneBoundary(start=0.0, end=video.total_seconds, start_frame=0, end_frame=1)]

        # Use a TemporaryDirectory instead of NamedTemporaryFile(delete=True):
        # re-opening an open temp file by name fails on Windows, while a plain
        # file inside a temp dir is portable and cleaned up on exit.
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_path = os.path.join(tmp_dir, "scene_detection.mp4")
            video.save(tmp_path)
            return self.detect_streaming(tmp_path)

    def detect_streaming(
        self,
        path: str | Path,
        start_second: float | None = None,
        end_second: float | None = None,
    ) -> list[SceneBoundary]:
        """Detect scenes from a video file.

        Uses TransNetV2 with pretrained weights for accurate shot boundary
        detection.

        Args:
            path: Path to video file.
            start_second: Optional start time for analysis (not yet supported).
            end_second: Optional end time for analysis (not yet supported).

        Returns:
            List of SceneBoundary objects representing detected scenes.
        """
        if start_second is not None or end_second is not None:
            import warnings

            warnings.warn(
                "start_second and end_second are not yet supported by SemanticSceneDetector. Processing entire video.",
                UserWarning,
                stacklevel=2,
            )

        self._load_model()

        # TransNetV2's detect_scenes handles decoding and inference internally.
        raw_scenes = self._model.detect_scenes(str(path), threshold=self.threshold)

        # Convert raw dicts into SceneBoundary objects.
        scenes = []
        for scene_data in raw_scenes:
            scenes.append(
                SceneBoundary(
                    start=float(scene_data["start_time"]),
                    end=float(scene_data["end_time"]),
                    start_frame=scene_data["start_frame"],
                    end_frame=scene_data["end_frame"],
                )
            )

        if self.min_scene_length > 0:
            scenes = self._merge_short_scenes(scenes)

        return scenes

    def _merge_short_scenes(self, scenes: list[SceneBoundary]) -> list[SceneBoundary]:
        """Merge scenes that are shorter than min_scene_length.

        Args:
            scenes: List of scenes to process.

        Returns:
            List of scenes with short scenes merged into adjacent ones.
        """
        if not scenes:
            return scenes

        merged = [scenes[0]]

        # Forward pass: a too-short scene absorbs the next one.
        for scene in scenes[1:]:
            last_scene = merged[-1]

            if last_scene.duration < self.min_scene_length:
                merged[-1] = SceneBoundary(
                    start=last_scene.start,
                    end=scene.end,
                    start_frame=last_scene.start_frame,
                    end_frame=scene.end_frame,
                )
            else:
                merged.append(scene)

        # A too-short trailing scene is folded back into its predecessor.
        if len(merged) > 1 and merged[-1].duration < self.min_scene_length:
            second_last = merged[-2]
            last = merged[-1]
            merged[-2] = SceneBoundary(
                start=second_last.start,
                end=last.end,
                start_frame=second_last.start_frame,
                end_frame=last.end_frame,
            )
            merged.pop()

        return merged

    @classmethod
    def detect_from_path(
        cls,
        path: str | Path,
        threshold: float = 0.5,
        min_scene_length: float = 0.5,
        device: str | None = None,
    ) -> list[SceneBoundary]:
        """Convenience method for one-shot scene detection.

        Args:
            path: Path to video file.
            threshold: Scene boundary threshold (0.0-1.0).
            min_scene_length: Minimum scene duration in seconds.
            device: Device to run on, or None for automatic selection.
                Previously this helper always used auto-selection; the
                default preserves that behavior.

        Returns:
            List of SceneBoundary objects representing detected scenes.
        """
        detector = cls(threshold=threshold, min_scene_length=min_scene_length, device=device)
        return detector.detect_streaming(path)

__init__

__init__(
    threshold: float = 0.5,
    min_scene_length: float = 0.5,
    device: str | None = None,
)

Initialize the semantic scene detector.

Parameters:

Name Type Description Default
threshold float

Confidence threshold for scene boundaries (0.0-1.0). Higher values = fewer, more confident boundaries.

0.5
min_scene_length float

Minimum scene duration in seconds.

0.5
device str | None

Device to run on ('cuda', 'mps', 'cpu', or None for auto). Note: MPS may have numerical inconsistencies; use 'cpu' for reproducible results.

None
Source code in src/videopython/ai/understanding/temporal.py
def __init__(
    self,
    threshold: float = 0.5,
    min_scene_length: float = 0.5,
    device: str | None = None,
):
    """Initialize the semantic scene detector.

    Args:
        threshold: Confidence threshold for scene boundaries (0.0-1.0).
            Higher values = fewer, more confident boundaries.
        min_scene_length: Minimum scene duration in seconds.
        device: Device to run on ('cuda', 'mps', 'cpu', or None for auto).
            Note: MPS may have numerical inconsistencies; use 'cpu' for
            reproducible results.
    """
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold must be between 0.0 and 1.0")
    if min_scene_length < 0:
        raise ValueError("min_scene_length must be non-negative")

    self.threshold = threshold
    self.min_scene_length = min_scene_length
    self._device: str | None = device
    self._model: Any = None

detect

detect(video: Video) -> list[SceneBoundary]

Detect scenes in a video using ML-based boundary detection.

Note: This method requires saving video to a temporary file for TransNetV2 processing. For better performance, use detect_streaming() with a file path directly.

Parameters:

Name Type Description Default
video Video

Video object to analyze.

required

Returns:

Type Description
list[SceneBoundary]

List of SceneBoundary objects representing detected scenes.

Source code in src/videopython/ai/understanding/temporal.py
def detect(self, video: Video) -> list[SceneBoundary]:
    """Detect scenes in a video using ML-based boundary detection.

    Note: This method requires saving video to a temporary file for
    TransNetV2 processing. For better performance, use detect_streaming()
    with a file path directly.

    Args:
        video: Video object to analyze.

    Returns:
        List of SceneBoundary objects representing detected scenes.
    """
    import os
    import tempfile

    # Degenerate inputs: no frames -> no scenes; one frame -> one scene.
    if len(video.frames) == 0:
        return []

    if len(video.frames) == 1:
        return [SceneBoundary(start=0.0, end=video.total_seconds, start_frame=0, end_frame=1)]

    # Save video to a temp file for TransNetV2 processing. A TemporaryDirectory
    # is used instead of NamedTemporaryFile(delete=True) because the saved file
    # must be re-opened by path, which fails on Windows while the
    # NamedTemporaryFile handle is still held open.
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_path = os.path.join(tmp_dir, "video.mp4")
        video.save(tmp_path)
        return self.detect_streaming(tmp_path)

detect_streaming

detect_streaming(
    path: str | Path,
    start_second: float | None = None,
    end_second: float | None = None,
) -> list[SceneBoundary]

Detect scenes from a video file.

Uses TransNetV2 with pretrained weights for accurate shot boundary detection.

Parameters:

Name Type Description Default
path str | Path

Path to video file.

required
start_second float | None

Optional start time for analysis (not yet supported).

None
end_second float | None

Optional end time for analysis (not yet supported).

None

Returns:

Type Description
list[SceneBoundary]

List of SceneBoundary objects representing detected scenes.

Source code in src/videopython/ai/understanding/temporal.py
def detect_streaming(
    self,
    path: str | Path,
    start_second: float | None = None,
    end_second: float | None = None,
) -> list[SceneBoundary]:
    """Detect scenes from a video file.

    Uses TransNetV2 with pretrained weights for accurate shot boundary
    detection.

    Args:
        path: Path to video file.
        start_second: Optional start time for analysis (not yet supported).
        end_second: Optional end time for analysis (not yet supported).

    Returns:
        List of SceneBoundary objects representing detected scenes.
    """
    # Time-windowed analysis is not implemented yet; warn rather than fail.
    if not (start_second is None and end_second is None):
        import warnings

        warnings.warn(
            "start_second and end_second are not yet supported by SemanticSceneDetector. Processing entire video.",
            UserWarning,
            stacklevel=2,
        )

    self._load_model()

    # TransNetV2's detect_scenes handles decoding and prediction internally.
    raw_scenes = self._model.detect_scenes(str(path), threshold=self.threshold)

    # Convert raw result dicts into SceneBoundary objects.
    scenes = [
        SceneBoundary(
            start=float(entry["start_time"]),
            end=float(entry["end_time"]),
            start_frame=entry["start_frame"],
            end_frame=entry["end_frame"],
        )
        for entry in raw_scenes
    ]

    if self.min_scene_length > 0:
        scenes = self._merge_short_scenes(scenes)

    return scenes

detect_from_path classmethod

detect_from_path(
    path: str | Path,
    threshold: float = 0.5,
    min_scene_length: float = 0.5,
) -> list[SceneBoundary]

Convenience method for one-shot scene detection.

Parameters:

Name Type Description Default
path str | Path

Path to video file.

required
threshold float

Scene boundary threshold (0.0-1.0).

0.5
min_scene_length float

Minimum scene duration in seconds.

0.5

Returns:

Type Description
list[SceneBoundary]

List of SceneBoundary objects representing detected scenes.

Source code in src/videopython/ai/understanding/temporal.py
@classmethod
def detect_from_path(
    cls,
    path: str | Path,
    threshold: float = 0.5,
    min_scene_length: float = 0.5,
) -> list[SceneBoundary]:
    """One-shot scene detection: build a detector and analyze a file.

    Args:
        path: Path to video file.
        threshold: Scene boundary threshold (0.0-1.0).
        min_scene_length: Minimum scene duration in seconds.

    Returns:
        List of SceneBoundary objects representing detected scenes.
    """
    return cls(threshold=threshold, min_scene_length=min_scene_length).detect_streaming(path)

Scene Data Classes

These classes are used by scene and audio analyzers to represent analysis results:

SceneBoundary

SceneBoundary dataclass

Timing information for a detected scene.

A lightweight structure representing scene boundaries detected by SceneDetector. This is a backbone type - higher-level scene analysis belongs in orchestration packages.

Attributes:

Name Type Description
start float

Scene start time in seconds

end float

Scene end time in seconds

start_frame int

Index of the first frame in this scene

end_frame int

Index one past the last frame in this scene (the end index is exclusive)

Source code in src/videopython/base/description.py
@dataclass
class SceneBoundary:
    """Timing information for a detected scene.

    A lightweight structure representing scene boundaries detected by SceneDetector.
    This is a backbone type - higher-level scene analysis belongs in orchestration packages.

    Attributes:
        start: Scene start time in seconds
        end: Scene end time in seconds
        start_frame: Index of the first frame in this scene
        end_frame: Index one past the last frame in this scene (exclusive)
    """

    start: float
    end: float
    start_frame: int
    end_frame: int

    @property
    def duration(self) -> float:
        """Duration of the scene in seconds."""
        return self.end - self.start

    @property
    def frame_count(self) -> int:
        """Number of frames in this scene."""
        return self.end_frame - self.start_frame

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        return {key: getattr(self, key) for key in ("start", "end", "start_frame", "end_frame")}

    @classmethod
    def from_dict(cls, data: dict) -> "SceneBoundary":
        """Create SceneBoundary from dictionary."""
        return cls(**{key: data[key] for key in ("start", "end", "start_frame", "end_frame")})

duration property

duration: float

Duration of the scene in seconds.

frame_count property

frame_count: int

Number of frames in this scene.

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Serialize this scene boundary into a plain dictionary."""
    fields = ("start", "end", "start_frame", "end_frame")
    return {name: getattr(self, name) for name in fields}

from_dict classmethod

from_dict(data: dict) -> 'SceneBoundary'

Create SceneBoundary from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> "SceneBoundary":
    """Build a SceneBoundary from its dictionary representation."""
    keys = ("start", "end", "start_frame", "end_frame")
    return cls(**{key: data[key] for key in keys})

BoundingBox

BoundingBox dataclass

A bounding box for detected objects in an image.

Coordinates are normalized to [0, 1] range relative to image dimensions.

Attributes:

Name Type Description
x float

Left edge of the box (0 = left edge of image)

y float

Top edge of the box (0 = top edge of image)

width float

Width of the box

height float

Height of the box

Source code in src/videopython/base/description.py
@dataclass
class BoundingBox:
    """A bounding box for detected objects in an image.

    Coordinates are normalized to [0, 1] range relative to image dimensions.

    Attributes:
        x: Left edge of the box (0 = left edge of image)
        y: Top edge of the box (0 = top edge of image)
        width: Width of the box
        height: Height of the box
    """

    x: float
    y: float
    width: float
    height: float

    @property
    def center(self) -> tuple[float, float]:
        """Center point of the bounding box."""
        cx = self.x + self.width / 2
        cy = self.y + self.height / 2
        return (cx, cy)

    @property
    def area(self) -> float:
        """Area of the bounding box (normalized)."""
        return self.width * self.height

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        return {key: getattr(self, key) for key in ("x", "y", "width", "height")}

    @classmethod
    def from_dict(cls, data: dict) -> BoundingBox:
        """Create BoundingBox from dictionary."""
        return cls(**{key: data[key] for key in ("x", "y", "width", "height")})

center property

center: tuple[float, float]

Center point of the bounding box.

area property

area: float

Area of the bounding box (normalized).

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Serialize the box coordinates into a plain dictionary."""
    return {name: getattr(self, name) for name in ("x", "y", "width", "height")}

from_dict classmethod

from_dict(data: dict) -> BoundingBox

Create BoundingBox from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> BoundingBox:
    """Build a BoundingBox from its dictionary representation."""
    return cls(**{name: data[name] for name in ("x", "y", "width", "height")})

DetectedObject

DetectedObject dataclass

An object detected in a video frame.

Attributes:

Name Type Description
label str

Name/class of the detected object (e.g., "person", "car", "dog")

confidence float

Detection confidence score between 0 and 1

bounding_box BoundingBox | None

Optional bounding box location of the object

Source code in src/videopython/base/description.py
@dataclass
class DetectedObject:
    """An object detected in a video frame.

    Attributes:
        label: Name/class of the detected object (e.g., "person", "car", "dog")
        confidence: Detection confidence score between 0 and 1
        bounding_box: Optional bounding box location of the object
    """

    label: str
    confidence: float
    bounding_box: BoundingBox | None = None

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        box = self.bounding_box
        return {
            "label": self.label,
            "confidence": self.confidence,
            "bounding_box": box.to_dict() if box else None,
        }

    @classmethod
    def from_dict(cls, data: dict) -> DetectedObject:
        """Create DetectedObject from dictionary."""
        raw_box = data.get("bounding_box")
        return cls(
            label=data["label"],
            confidence=data["confidence"],
            bounding_box=BoundingBox.from_dict(raw_box) if raw_box else None,
        )

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Serialize the detection into a plain dictionary."""
    box = self.bounding_box
    return {
        "label": self.label,
        "confidence": self.confidence,
        "bounding_box": box.to_dict() if box else None,
    }

from_dict classmethod

from_dict(data: dict) -> DetectedObject

Create DetectedObject from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> DetectedObject:
    """Build a DetectedObject from its dictionary representation."""
    raw_box = data.get("bounding_box")
    return cls(
        label=data["label"],
        confidence=data["confidence"],
        bounding_box=BoundingBox.from_dict(raw_box) if raw_box else None,
    )

DetectedText

DetectedText dataclass

Text detected in a video frame.

Attributes:

Name Type Description
text str

OCR text content

confidence float

Detection confidence score between 0 and 1

bounding_box BoundingBox | None

Optional normalized bounding box for the text region

Source code in src/videopython/base/description.py
@dataclass
class DetectedText:
    """Text detected in a video frame.

    Attributes:
        text: OCR text content
        confidence: Detection confidence score between 0 and 1
        bounding_box: Optional normalized bounding box for the text region
    """

    text: str
    confidence: float
    bounding_box: BoundingBox | None = None

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        box = self.bounding_box
        return {
            "text": self.text,
            "confidence": self.confidence,
            "bounding_box": box.to_dict() if box else None,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "DetectedText":
        """Create DetectedText from dictionary."""
        raw_box = data.get("bounding_box")
        return cls(
            text=data["text"],
            confidence=data["confidence"],
            bounding_box=BoundingBox.from_dict(raw_box) if raw_box else None,
        )

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Serialize the OCR result into a plain dictionary."""
    box = self.bounding_box
    return {
        "text": self.text,
        "confidence": self.confidence,
        "bounding_box": box.to_dict() if box else None,
    }

from_dict classmethod

from_dict(data: dict) -> 'DetectedText'

Create DetectedText from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> "DetectedText":
    """Build a DetectedText from its dictionary representation."""
    raw_box = data.get("bounding_box")
    return cls(
        text=data["text"],
        confidence=data["confidence"],
        bounding_box=BoundingBox.from_dict(raw_box) if raw_box else None,
    )

AudioEvent

AudioEvent dataclass

A detected audio event with timestamp.

Attributes:

Name Type Description
start float

Start time in seconds

end float

End time in seconds

label str

Name of the detected sound (e.g., "Music", "Speech", "Dog bark")

confidence float

Detection confidence score between 0 and 1

Source code in src/videopython/base/description.py
@dataclass
class AudioEvent:
    """A detected audio event with timestamp.

    Attributes:
        start: Start time in seconds
        end: End time in seconds
        label: Name of the detected sound (e.g., "Music", "Speech", "Dog bark")
        confidence: Detection confidence score between 0 and 1
    """

    start: float
    end: float
    label: str
    confidence: float

    @property
    def duration(self) -> float:
        """Duration of the audio event in seconds."""
        return self.end - self.start

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        return {key: getattr(self, key) for key in ("start", "end", "label", "confidence")}

    @classmethod
    def from_dict(cls, data: dict) -> AudioEvent:
        """Create AudioEvent from dictionary."""
        return cls(**{key: data[key] for key in ("start", "end", "label", "confidence")})

duration property

duration: float

Duration of the audio event in seconds.

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Serialize the audio event into a plain dictionary."""
    return {key: getattr(self, key) for key in ("start", "end", "label", "confidence")}

from_dict classmethod

from_dict(data: dict) -> AudioEvent

Create AudioEvent from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> AudioEvent:
    """Build an AudioEvent from its dictionary representation."""
    return cls(**{key: data[key] for key in ("start", "end", "label", "confidence")})

AudioClassification

AudioClassification dataclass

Complete audio classification results.

Attributes:

Name Type Description
events list[AudioEvent]

List of detected audio events with timestamps

clip_predictions dict[str, float]

Overall class probabilities for the entire audio clip

Source code in src/videopython/base/description.py
@dataclass
class AudioClassification:
    """Complete audio classification results.

    Attributes:
        events: List of detected audio events with timestamps
        clip_predictions: Overall class probabilities for the entire audio clip
    """

    events: list[AudioEvent]
    clip_predictions: dict[str, float] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        serialized_events = [entry.to_dict() for entry in self.events]
        return {"events": serialized_events, "clip_predictions": self.clip_predictions}

    @classmethod
    def from_dict(cls, data: dict) -> "AudioClassification":
        """Create AudioClassification from dictionary."""
        raw_events = data.get("events", [])
        raw_preds = data.get("clip_predictions", {})
        return cls(
            events=[AudioEvent.from_dict(entry) for entry in raw_events],
            clip_predictions={label: float(score) for label, score in raw_preds.items()},
        )

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Serialize the classification results into a plain dictionary."""
    serialized = [entry.to_dict() for entry in self.events]
    return {"events": serialized, "clip_predictions": self.clip_predictions}

from_dict classmethod

from_dict(data: dict) -> 'AudioClassification'

Create AudioClassification from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> "AudioClassification":
    """Build an AudioClassification from its dictionary representation."""
    raw_events = data.get("events", [])
    raw_preds = data.get("clip_predictions", {})
    return cls(
        events=[AudioEvent.from_dict(entry) for entry in raw_events],
        clip_predictions={label: float(score) for label, score in raw_preds.items()},
    )

DetectedAction

DetectedAction dataclass

An action/activity detected in a video segment.

Attributes:

Name Type Description
label str

Name of the detected action (e.g., "walking", "running", "dancing")

confidence float

Detection confidence score between 0 and 1

start_frame int | None

Start frame index of the action

end_frame int | None

End frame index of the action (exclusive)

start_time float | None

Start time in seconds

end_time float | None

End time in seconds

Source code in src/videopython/base/description.py
@dataclass
class DetectedAction:
    """An action/activity detected in a video segment.

    Attributes:
        label: Name of the detected action (e.g., "walking", "running", "dancing")
        confidence: Detection confidence score between 0 and 1
        start_frame: Start frame index of the action
        end_frame: End frame index of the action (exclusive)
        start_time: Start time in seconds
        end_time: End time in seconds
    """

    label: str
    confidence: float
    start_frame: int | None = None
    end_frame: int | None = None
    start_time: float | None = None
    end_time: float | None = None

    @property
    def duration(self) -> float | None:
        """Duration of the action in seconds, or None if timing is unknown."""
        if self.start_time is None or self.end_time is None:
            return None
        return self.end_time - self.start_time

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        keys = ("label", "confidence", "start_frame", "end_frame", "start_time", "end_time")
        return {key: getattr(self, key) for key in keys}

    @classmethod
    def from_dict(cls, data: dict) -> "DetectedAction":
        """Create DetectedAction from dictionary."""
        optional = ("start_frame", "end_frame", "start_time", "end_time")
        kwargs = {"label": data["label"], "confidence": data["confidence"]}
        kwargs.update({key: data.get(key) for key in optional})
        return cls(**kwargs)

duration property

duration: float | None

Duration of the action in seconds.

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Serialize the detected action into a plain dictionary."""
    keys = ("label", "confidence", "start_frame", "end_frame", "start_time", "end_time")
    return {key: getattr(self, key) for key in keys}

from_dict classmethod

from_dict(data: dict) -> 'DetectedAction'

Create DetectedAction from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> "DetectedAction":
    """Build a DetectedAction from its dictionary representation."""
    optional = ("start_frame", "end_frame", "start_time", "end_time")
    kwargs = {"label": data["label"], "confidence": data["confidence"]}
    kwargs.update({key: data.get(key) for key in optional})
    return cls(**kwargs)