AI Understanding

Analyze videos, transcribe audio, describe visual content, and track faces per shot.

For a single aggregate, serializable analysis object across multiple analyzers, see Video Analysis.

Local Model Support

| Class | Local Model Family |
| --- | --- |
| SceneVLM | Qwen3.5 (4B / 9B / 27B) |
| AudioToText | Whisper |
| AudioClassifier | AST |
| SemanticSceneDetector | TransNetV2 |
| FaceTracker | YOLOv8-face |

AudioToText

Anti-hallucination knobs

Three Whisper decoder kwargs are surfaced for tuning on noisy or sparse-speech audio:

from videopython.ai import AudioToText

# Defaults: condition_on_previous_text=False (the cascading-hallucination fix),
# no_speech_threshold=0.6, logprob_threshold=-1.0.
transcriber = AudioToText()

# Tighter no-speech gate to drop more low-confidence windows on a film with
# heavy ambient music.
transcriber = AudioToText(no_speech_threshold=0.85)

# Restore Whisper's upstream default conditioning (e.g. for clean podcasts
# where cross-window context helps disambiguate homophones).
transcriber = AudioToText(condition_on_previous_text=True)

Brand-name vocabulary biasing

Bias Whisper's first-window decoder toward a caller-supplied list of brand names, product names, or proper nouns via the native initial_prompt channel. This recovers near-mishears (e.g. "carna" for Klarna, "in post" for InPost) on brand-monitoring inputs without adding any model dependencies.

from videopython.ai import AudioToText
from videopython.base import Video

video = Video.from_path("brand_review.mp4")

# Constructor default: applies to every transcribe() call on this instance.
transcriber = AudioToText(vocabulary=["Klarna", "Allegro", "InPost"])
result = transcriber.transcribe(video)

# Per-call override: useful when one transcriber serves multiple tenants.
result = transcriber.transcribe(video, vocabulary=["Pyszne", "Wolt"])

The list is normalized at construction (whitespace stripped, case-insensitive dedup, casing of the first occurrence preserved). Whisper reserves ~224 tokens for the prompt; longer lists are trimmed from the tail with a single WARNING log line naming the count dropped.
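
The normalization rules above can be sketched in a few lines. This is a minimal illustration, not the library's own helper: the name normalize_vocabulary_sketch is hypothetical, dropping blank entries is an assumption, and the token-budget trimming is not modeled.

```python
from __future__ import annotations


def normalize_vocabulary_sketch(terms: list[str] | None) -> list[str]:
    """Strip whitespace and dedupe case-insensitively, keeping the first casing seen."""
    seen: set[str] = set()
    normalized: list[str] = []
    for term in terms or []:
        cleaned = term.strip()
        if not cleaned:
            continue  # assumption: blank entries are dropped
        key = cleaned.casefold()
        if key in seen:
            continue  # a later duplicate never replaces the first occurrence
        seen.add(key)
        normalized.append(cleaned)
    return normalized


print(normalize_vocabulary_sketch([" Klarna ", "klarna", "InPost", "", "INPOST"]))
# prints ['Klarna', 'InPost']
```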

VideoDubber and LocalDubbingPipeline accept the same vocabulary kwarg; it threads through to the underlying transcriber. Within VideoAnalyzer, pass it via analyzer_params:

from videopython.ai import VideoAnalyzer
from videopython.ai.video_analysis import VideoAnalysisConfig

config = VideoAnalysisConfig(
    analyzer_params={"audio_to_text": {"vocabulary": ["Klarna", "Allegro"]}}
)
analysis = VideoAnalyzer(config=config).analyze_path("brand_review.mp4")

Vocabulary biasing recovers names Whisper almost heard correctly. It will not catch names the model has no prior for; an LLM correction pass would be needed to close that gap.

Per-segment confidence

TranscriptionSegment carries three optional confidence fields populated from the raw Whisper output: avg_logprob, no_speech_prob, and compression_ratio. They are None when unavailable, e.g. on the diarization-only path (diarize_transcription), which rebuilds segments from words without the overlap match, or on transcripts loaded from formats that don't carry the metadata.

These signals feed the dubbing pipeline's transcript-quality gate (median avg_logprob is one of three reject flags) and Qwen3's confidence-aware translation prompt (segments below threshold get a low_confidence hint). They are also useful for downstream callers that want to drop low-quality segments before further processing.

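from videopython.ai import AudioToText
from videopython.base import Video

video = Video.from_path("video.mp4")
result = AudioToText().transcribe(video)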
for segment in result.segments:
    if segment.avg_logprob is not None and segment.avg_logprob < -1.0:
        print(f"low confidence: {segment.text!r}")
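
A median-based quality check and a per-segment filter could look roughly like the sketch below. The Segment dataclass is a stand-in for TranscriptionSegment with only the fields used here, the function names are hypothetical, and the -1.0 threshold is illustrative. The dubbing pipeline's actual gate is not shown in this page and may differ.

```python
from __future__ import annotations

from dataclasses import dataclass
from statistics import median


@dataclass
class Segment:
    """Stand-in for TranscriptionSegment (assumption: only these two fields matter here)."""

    text: str
    avg_logprob: float | None = None


def median_logprob(segments: list[Segment]) -> float | None:
    """Median avg_logprob over segments that carry the field, or None if none do."""
    values = [s.avg_logprob for s in segments if s.avg_logprob is not None]
    return median(values) if values else None


def keep_confident(segments: list[Segment], threshold: float = -1.0) -> list[Segment]:
    """Keep segments at or above the threshold; segments without a score are kept."""
    return [s for s in segments if s.avg_logprob is None or s.avg_logprob >= threshold]


segments = [
    Segment("clear speech", -0.2),
    Segment("mumbled filler", -1.4),
    Segment("loaded from SRT", None),
    Segment("decent", -0.6),
]
print(median_logprob(segments))  # prints -0.6
print([s.text for s in keep_confident(segments)])
# prints ['clear speech', 'loaded from SRT', 'decent']
```

Keeping unscored segments is a design choice in this sketch: a missing score means "unknown", not "low confidence".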

AudioToText

Transcription service for audio and video using local Whisper models.

Uses openai-whisper for transcription (with word-level timestamps) and pyannote-audio for optional speaker diarization. By default, Silero VAD runs before Whisper to gate language detection on a 30s window built from voiced regions only — fixes Whisper's tendency to lock onto the wrong language when the file opens with silence, music, or non-vocal credits. Disable with enable_vad=False to reproduce pre-0.27 behaviour.

Three Whisper decoder kwargs are surfaced for anti-hallucination tuning:

  • condition_on_previous_text defaults to False (Whisper's own default is True). With conditioning on, a single hallucinated filler phrase cascades through the rest of the file because each window's decoder is primed by the previous window's decoded text. Turning it off is the most commonly recommended fix for that failure mode; the cost on clean audio is small (slightly less context for ambiguous homophones across sentence boundaries).
  • no_speech_threshold and logprob_threshold are forwarded with Whisper's documented defaults (0.6 and -1.0); raising no_speech_threshold biases toward dropping low-confidence windows instead of emitting filler.

vocabulary biases Whisper's first-window decoder toward a caller-supplied list of brand names, product names, or proper nouns via the native initial_prompt channel. It recovers near-mishears (e.g. "carna" for Klarna) without new model dependencies, but will not catch zero-prior names. A per-call override is available on transcribe().
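
The voiced-window language detection described above builds its 30s input by walking the VAD spans in order until the budget is used up. The sketch below shows only that selection step, in seconds instead of samples. The name select_voiced_window is hypothetical, and the real method goes on to slice the sample array and run Whisper's language detection on the result.

```python
def select_voiced_window(
    voiced_spans: list[tuple[float, float]], budget_seconds: float = 30.0
) -> list[tuple[float, float]]:
    """Take voiced spans in order until the budget is spent, truncating the last one."""
    selected: list[tuple[float, float]] = []
    remaining = budget_seconds
    for start, end in voiced_spans:
        if remaining <= 0:
            break
        duration = min(end - start, remaining)
        selected.append((start, start + duration))
        remaining -= duration
    return selected


# A file that opens with 40 s of music, followed by three bursts of speech.
spans = [(40.0, 52.0), (55.0, 70.0), (90.0, 120.0)]
print(select_voiced_window(spans))
# prints [(40.0, 52.0), (55.0, 70.0), (90.0, 93.0)]
```

None of the selected audio comes from the first 40 seconds, which is the window Whisper's own auto-detection would have inspected.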

Source code in src/videopython/ai/understanding/audio.py
class AudioToText:
    """Transcription service for audio and video using local Whisper models.

    Uses openai-whisper for transcription (with word-level timestamps) and
    pyannote-audio for optional speaker diarization. By default, Silero VAD
    runs before Whisper to gate language detection on a 30s window built from
    voiced regions only — fixes Whisper's tendency to lock onto the wrong
    language when the file opens with silence, music, or non-vocal credits.
    Disable with ``enable_vad=False`` to reproduce pre-0.27 behaviour.

    Three Whisper decoder kwargs are surfaced for anti-hallucination tuning:

    - ``condition_on_previous_text`` defaults to ``False`` (Whisper's own
      default is ``True``). With conditioning on, a single hallucinated filler
      phrase cascades through the rest of the file because each window's
      decoder is primed by the previous window's decoded text. Turning it off
      is the most commonly recommended fix for that failure mode; the cost on
      clean audio is small (slightly less context for ambiguous homophones
      across sentence boundaries).
    - ``no_speech_threshold`` and ``logprob_threshold`` are forwarded with
      Whisper's documented defaults (``0.6`` and ``-1.0``); raising
      ``no_speech_threshold`` biases toward dropping low-confidence windows
      instead of emitting filler.

    ``vocabulary`` biases Whisper's first-window decoder toward a caller-
    supplied list of brand names, product names, or proper nouns via the
    native ``initial_prompt`` channel. Recovers near-mishears (e.g. Klarna
    → "carna") without new model deps; will not catch zero-prior names.
    Per-call override is available on :meth:`transcribe`.
    """

    PYANNOTE_DIARIZATION_MODEL = "pyannote/speaker-diarization-community-1"

    def __init__(
        self,
        model_name: Literal["tiny", "base", "small", "medium", "large", "turbo"] = "turbo",
        enable_diarization: bool = False,
        enable_vad: bool = True,
        condition_on_previous_text: bool = False,
        no_speech_threshold: float = 0.6,
        logprob_threshold: float | None = -1.0,
        vocabulary: list[str] | None = None,
        device: str | None = None,
    ):
        self.model_name = model_name
        self.enable_diarization = enable_diarization
        self.enable_vad = enable_vad
        self.condition_on_previous_text = condition_on_previous_text
        self.no_speech_threshold = no_speech_threshold
        self.logprob_threshold = logprob_threshold
        self.vocabulary = _normalize_vocabulary(vocabulary)
        self.device = select_device(device, mps_allowed=False)
        log_device_initialization(
            "AudioToText",
            requested_device=device,
            resolved_device=self.device,
        )
        self._model: Any = None
        self._diarization_pipeline: Any = None
        self._vad_model: Any = None

    def _transcribe_kwargs(self, language: str | None, vocabulary: list[str]) -> dict[str, Any]:
        """Kwargs threaded into ``whisper.Whisper.transcribe`` from both call sites.
        ``initial_prompt`` is omitted entirely on the no-vocab path."""
        kwargs: dict[str, Any] = {
            "word_timestamps": True,
            "language": language,
            "condition_on_previous_text": self.condition_on_previous_text,
            "no_speech_threshold": self.no_speech_threshold,
            "logprob_threshold": self.logprob_threshold,
        }
        prompt = _build_initial_prompt(vocabulary)
        if prompt is not None:
            kwargs["initial_prompt"] = prompt
        return kwargs

    def _init_local(self) -> None:
        """Initialize local Whisper model."""
        import whisper

        self._model = whisper.load_model(name=self.model_name, device=self.device)

    def _init_diarization(self) -> None:
        """Initialize pyannote speaker diarization pipeline."""
        import torch
        from pyannote.audio import Pipeline

        self._diarization_pipeline = Pipeline.from_pretrained(self.PYANNOTE_DIARIZATION_MODEL)
        self._diarization_pipeline.to(torch.device(self.device))

    def _init_vad(self) -> None:
        """Initialize Silero VAD model.

        The model is ~2 MB and CPU-fast (~5-15s for a 90 min movie); we keep
        it on CPU regardless of ``self.device`` since dispatch overhead would
        outweigh inference cost.
        """
        from silero_vad import load_silero_vad

        self._vad_model = load_silero_vad()

    def unload(self) -> None:
        """Release the Whisper, diarization, and VAD models so the next call re-initializes.

        Used by low-memory dubbing to free VRAM between pipeline stages.
        """
        self._model = None
        self._diarization_pipeline = None
        self._vad_model = None
        release_device_memory(self.device)

    def _process_transcription_result(self, transcription_result: dict[str, Any]) -> Transcription:
        """Process raw transcription result into a Transcription object."""
        transcription_segments = []
        for segment in transcription_result["segments"]:
            transcription_words = [
                TranscriptionWord(word=word["word"], start=float(word["start"]), end=float(word["end"]))
                for word in segment.get("words", [])
            ]
            transcription_segment = TranscriptionSegment(
                start=segment["start"],
                end=segment["end"],
                text=segment["text"],
                words=transcription_words,
                avg_logprob=segment.get("avg_logprob"),
                no_speech_prob=segment.get("no_speech_prob"),
                compression_ratio=segment.get("compression_ratio"),
            )
            transcription_segments.append(transcription_segment)

        return Transcription(segments=transcription_segments, language=transcription_result.get("language"))

    @staticmethod
    def _assign_speakers_to_words(
        words: list[TranscriptionWord],
        diarization_result: Any,
    ) -> list[TranscriptionWord]:
        """Assign speaker labels to words based on diarization segment overlap.

        For each word, finds the diarization segment with the greatest time overlap
        and assigns that speaker. Words with no overlapping diarization segment get
        the nearest speaker by midpoint distance.
        """
        speaker_segments: list[tuple[float, float, str]] = []
        # pyannote-audio 4.x returns DiarizeOutput; use exclusive_speaker_diarization
        # (no overlapping turns) for cleaner word assignment.
        annotation = getattr(diarization_result, "exclusive_speaker_diarization", diarization_result)
        for turn, _, speaker in annotation.itertracks(yield_label=True):
            speaker_segments.append((turn.start, turn.end, speaker))

        if not speaker_segments:
            return words

        result = []
        for word in words:
            best_speaker: str | None = None
            best_overlap = 0.0

            for seg_start, seg_end, speaker in speaker_segments:
                overlap = max(0.0, min(word.end, seg_end) - max(word.start, seg_start))
                if overlap > best_overlap:
                    best_overlap = overlap
                    best_speaker = speaker

            if best_speaker is None:
                word_mid = (word.start + word.end) / 2.0
                best_dist = float("inf")
                for seg_start, seg_end, speaker in speaker_segments:
                    seg_mid = (seg_start + seg_end) / 2.0
                    dist = abs(word_mid - seg_mid)
                    if dist < best_dist:
                        best_dist = dist
                        best_speaker = speaker

            result.append(
                TranscriptionWord(
                    word=word.word,
                    start=word.start,
                    end=word.end,
                    speaker=best_speaker,
                )
            )
        return result

    def diarize_transcription(self, audio: Audio, transcription: Transcription) -> Transcription:
        """Attach speaker labels to a pre-computed transcription using pyannote.

        Useful when callers have a transcription (e.g. pre-computed and edited)
        but no speakers, and want per-speaker voice cloning in dubbing without
        re-running Whisper. Runs pyannote standalone on ``audio`` and overlays
        speakers onto the supplied transcription's words.

        Requires word-level timings: at least one segment must contain more
        than one word. Transcriptions loaded from SRT (one synthetic word per
        segment) will not produce useful speakers and are rejected.
        """
        import numpy as np
        import torch

        all_words: list[TranscriptionWord] = list(transcription.words)
        if not all_words:
            raise ValueError("Cannot diarize a transcription with no words.")

        if not any(len(seg.words) > 1 for seg in transcription.segments):
            raise ValueError(
                "Cannot diarize a transcription without word-level timings. "
                "Supplied transcription has at most one word per segment "
                "(e.g. loaded from SRT). Provide a transcription with "
                "word-level timings, or omit `transcription` to let the "
                "pipeline transcribe and diarize from scratch."
            )

        if self._diarization_pipeline is None:
            self._init_diarization()

        import whisper

        audio_mono = audio.to_mono().resample(whisper.audio.SAMPLE_RATE)
        waveform = torch.from_numpy(audio_mono.data.astype(np.float32)).unsqueeze(0)
        diarization_result = self._diarization_pipeline(
            {"waveform": waveform, "sample_rate": audio_mono.metadata.sample_rate}
        )

        all_words = self._assign_speakers_to_words(all_words, diarization_result)
        return Transcription(words=all_words, language=transcription.language)

    def _run_vad(self, audio_mono: Audio) -> list[tuple[float, float]]:
        """Return voiced spans in seconds using Silero VAD.

        Audio must already be mono at ``whisper.audio.SAMPLE_RATE`` (16 kHz),
        which is one of Silero's two supported rates.
        """
        import numpy as np
        import torch

        if self._vad_model is None:
            self._init_vad()

        from silero_vad import get_speech_timestamps

        waveform = torch.from_numpy(audio_mono.data.astype(np.float32))
        timestamps = get_speech_timestamps(
            waveform,
            self._vad_model,
            sampling_rate=audio_mono.metadata.sample_rate,
            return_seconds=True,
        )
        return [(float(ts["start"]), float(ts["end"])) for ts in timestamps]

    def _detect_language(self, audio_mono: Audio, voiced_spans: list[tuple[float, float]]) -> str:
        """Run Whisper language detection on a 30s window of voiced audio.

        Whisper's auto-detection only inspects the first 30s of input. When
        the file opens with silence/music/credits, that window contains no
        speech and detection picks the closest-looking thing (typically
        English). Concatenating voiced spans up to 30s and running
        ``model.detect_language()`` on the resulting mel fixes this.
        """
        import numpy as np
        import torch
        import whisper

        sample_rate = audio_mono.metadata.sample_rate
        chunks: list[np.ndarray] = []
        remaining = whisper.audio.N_SAMPLES
        for start, end in voiced_spans:
            if remaining <= 0:
                break
            chunk = audio_mono.data[int(start * sample_rate) : int(end * sample_rate)][:remaining]
            chunks.append(chunk)
            remaining -= len(chunk)

        voiced_audio = np.concatenate(chunks).astype(np.float32) if chunks else np.zeros(0, dtype=np.float32)
        padded = whisper.audio.pad_or_trim(torch.from_numpy(voiced_audio))
        mel = whisper.audio.log_mel_spectrogram(padded, n_mels=self._model.dims.n_mels).to(self._model.device)

        _, probs = self._model.detect_language(mel)
        return max(probs, key=probs.get)

    def _transcribe_with_diarization(
        self, audio_mono: Audio, language: str | None, vocabulary: list[str]
    ) -> Transcription:
        """Transcribe with word timestamps and assign speakers via pyannote."""
        import numpy as np
        import torch

        if self._diarization_pipeline is None:
            self._init_diarization()

        audio_data = audio_mono.data
        transcription_result = self._model.transcribe(audio=audio_data, **self._transcribe_kwargs(language, vocabulary))

        waveform = torch.from_numpy(audio_data.astype(np.float32)).unsqueeze(0)
        diarization_result = self._diarization_pipeline(
            {"waveform": waveform, "sample_rate": audio_mono.metadata.sample_rate}
        )

        transcription = self._process_transcription_result(transcription_result)

        # Capture original Whisper segments before flattening to words. The
        # diarization rebuild via Transcription(words=...) regroups by speaker,
        # which loses the per-segment confidence M1.3 plumbed through. We
        # re-attach by max-overlap match below so M2's confidence-aware
        # translation prompts have signal on the diarized path too.
        whisper_segments = transcription.segments

        all_words: list[TranscriptionWord] = []
        for seg in transcription.segments:
            all_words.extend(seg.words)

        if all_words:
            all_words = self._assign_speakers_to_words(all_words, diarization_result)

        rebuilt = Transcription(words=all_words, language=transcription.language)
        _attach_confidence_by_overlap(rebuilt.segments, whisper_segments)
        return rebuilt

    def _transcribe_local(self, audio: Audio, vocabulary: list[str]) -> Transcription:
        """Transcribe using local Whisper model.

        When ``enable_vad`` is True (default), Silero VAD locates voiced
        regions and a 30s voiced window is used for Whisper language
        detection -- avoiding the well-known failure where Whisper locks
        onto the wrong language because the first 30s of input is silence
        or music. The detected language is then passed into
        ``transcribe()`` so chunked decoding stays consistent. If VAD
        finds no speech, an empty Transcription is returned without
        invoking Whisper.
        """
        import whisper

        if self._model is None:
            self._init_local()

        audio_mono = audio.to_mono().resample(whisper.audio.SAMPLE_RATE)

        language: str | None = None
        if self.enable_vad:
            voiced_spans = self._run_vad(audio_mono)
            if not voiced_spans:
                return Transcription(segments=[])
            language = self._detect_language(audio_mono, voiced_spans)

        if self.enable_diarization:
            return self._transcribe_with_diarization(audio_mono, language, vocabulary)

        transcription_result = self._model.transcribe(
            audio=audio_mono.data, **self._transcribe_kwargs(language, vocabulary)
        )
        return self._process_transcription_result(transcription_result)

    def transcribe(self, media: Audio | Video, vocabulary: list[str] | None = None) -> Transcription:
        """Transcribe audio or video to text.

        ``vocabulary`` overrides the constructor default for this call only;
        a per-call list wins over the instance's vocabulary so one
        :class:`AudioToText` instance can serve multiple tenants. Pass
        ``None`` (the default) to use the constructor's list.
        """
        if isinstance(media, Video):
            if media.audio.is_silent:
                return Transcription(segments=[])
            audio = media.audio
        elif isinstance(media, Audio):
            if media.is_silent:
                return Transcription(segments=[])
            audio = media
        else:
            raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

        effective_vocab = self.vocabulary if vocabulary is None else _normalize_vocabulary(vocabulary)
        return self._transcribe_local(audio, effective_vocab)

unload

unload() -> None

Release the Whisper, diarization, and VAD models so the next call re-initializes.

Used by low-memory dubbing to free VRAM between pipeline stages.
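
The lazy-load and unload cycle can be illustrated with a small stand-alone pattern. LazyModel and its loader argument are hypothetical names for this sketch; the library's unload() additionally calls release_device_memory, which is not modeled here.

```python
from typing import Any, Callable


class LazyModel:
    """Holds a model that is built on first use and can be dropped on demand."""

    def __init__(self, loader: Callable[[], Any]):
        self._loader = loader
        self._model: Any = None
        self.load_count = 0

    def get(self) -> Any:
        # Build the model only when it is first needed (or after unload()).
        if self._model is None:
            self._model = self._loader()
            self.load_count += 1
        return self._model

    def unload(self) -> None:
        # Dropping the reference makes the model eligible for garbage collection.
        self._model = None


holder = LazyModel(loader=lambda: object())
first = holder.get()
same = holder.get()      # reuses the loaded model
holder.unload()
second = holder.get()    # triggers a fresh load
print(first is same, first is second, holder.load_count)
# prints True False 2
```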

Source code in src/videopython/ai/understanding/audio.py
def unload(self) -> None:
    """Release the Whisper, diarization, and VAD models so the next call re-initializes.

    Used by low-memory dubbing to free VRAM between pipeline stages.
    """
    self._model = None
    self._diarization_pipeline = None
    self._vad_model = None
    release_device_memory(self.device)

diarize_transcription

diarize_transcription(
    audio: Audio, transcription: Transcription
) -> Transcription

Attach speaker labels to a pre-computed transcription using pyannote.

Useful when callers have a transcription (e.g. pre-computed and edited) but no speakers, and want per-speaker voice cloning in dubbing without re-running Whisper. Runs pyannote standalone on audio and overlays speakers onto the supplied transcription's words.

Requires word-level timings: at least one segment must contain more than one word. Transcriptions loaded from SRT (one synthetic word per segment) will not produce useful speakers and are rejected.
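
The speaker overlay works per word: pick the diarization turn with the greatest time overlap, and fall back to the nearest turn by midpoint distance when nothing overlaps. A minimal sketch of that rule follows, using plain tuples in place of TranscriptionWord and pyannote turns; the function name assign_speaker is hypothetical.

```python
from __future__ import annotations


def assign_speaker(
    word_start: float, word_end: float, turns: list[tuple[float, float, str]]
) -> str | None:
    """Return the speaker with the largest overlap, else the nearest turn by midpoint."""
    best_speaker: str | None = None
    best_overlap = 0.0
    for start, end, speaker in turns:
        overlap = max(0.0, min(word_end, end) - max(word_start, start))
        if overlap > best_overlap:
            best_overlap, best_speaker = overlap, speaker
    if best_speaker is None and turns:
        # No turn overlaps the word: compare the word midpoint to each turn midpoint.
        word_mid = (word_start + word_end) / 2.0
        best_speaker = min(turns, key=lambda t: abs(word_mid - (t[0] + t[1]) / 2.0))[2]
    return best_speaker


turns = [(0.0, 2.0, "SPEAKER_00"), (2.5, 5.0, "SPEAKER_01")]
print(assign_speaker(1.8, 2.1, turns))  # overlaps SPEAKER_00 only: prints SPEAKER_00
print(assign_speaker(3.0, 3.4, turns))  # inside the second turn: prints SPEAKER_01
print(assign_speaker(2.1, 2.4, turns))  # falls in the gap: prints SPEAKER_00
```

Because the fallback compares midpoints, not edges, a word in a gap goes to the turn whose centre is closer, which is not always the turn whose boundary is closer.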

Source code in src/videopython/ai/understanding/audio.py
def diarize_transcription(self, audio: Audio, transcription: Transcription) -> Transcription:
    """Attach speaker labels to a pre-computed transcription using pyannote.

    Useful when callers have a transcription (e.g. pre-computed and edited)
    but no speakers, and want per-speaker voice cloning in dubbing without
    re-running Whisper. Runs pyannote standalone on ``audio`` and overlays
    speakers onto the supplied transcription's words.

    Requires word-level timings: at least one segment must contain more
    than one word. Transcriptions loaded from SRT (one synthetic word per
    segment) will not produce useful speakers and are rejected.
    """
    import numpy as np
    import torch

    all_words: list[TranscriptionWord] = list(transcription.words)
    if not all_words:
        raise ValueError("Cannot diarize a transcription with no words.")

    if not any(len(seg.words) > 1 for seg in transcription.segments):
        raise ValueError(
            "Cannot diarize a transcription without word-level timings. "
            "Supplied transcription has at most one word per segment "
            "(e.g. loaded from SRT). Provide a transcription with "
            "word-level timings, or omit `transcription` to let the "
            "pipeline transcribe and diarize from scratch."
        )

    if self._diarization_pipeline is None:
        self._init_diarization()

    import whisper

    audio_mono = audio.to_mono().resample(whisper.audio.SAMPLE_RATE)
    waveform = torch.from_numpy(audio_mono.data.astype(np.float32)).unsqueeze(0)
    diarization_result = self._diarization_pipeline(
        {"waveform": waveform, "sample_rate": audio_mono.metadata.sample_rate}
    )

    all_words = self._assign_speakers_to_words(all_words, diarization_result)
    return Transcription(words=all_words, language=transcription.language)

transcribe

transcribe(
    media: Audio | Video,
    vocabulary: list[str] | None = None,
) -> Transcription

Transcribe audio or video to text.

vocabulary overrides the constructor default for this call only; a per-call list wins over the instance's vocabulary so one AudioToText instance can serve multiple tenants. Pass None (the default) to use the constructor's list.
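
The override rule hinges on an `is None` check, so an empty list counts as an explicit override and is not treated as "use the default". The sketch below isolates that resolution step. The name resolve_vocabulary is hypothetical, and normalization of the per-call list is left out.

```python
from __future__ import annotations


def resolve_vocabulary(instance_vocab: list[str], call_vocab: list[str] | None) -> list[str]:
    """A per-call list wins whenever one is passed; only None falls back to the instance list."""
    return instance_vocab if call_vocab is None else call_vocab


default = ["Klarna", "Allegro"]
print(resolve_vocabulary(default, None))      # prints ['Klarna', 'Allegro']
print(resolve_vocabulary(default, ["Wolt"]))  # prints ['Wolt']
print(resolve_vocabulary(default, []))        # prints []
```

Under this reading, passing an empty list is a way to switch biasing off for a single call while leaving the instance default untouched.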

Source code in src/videopython/ai/understanding/audio.py
def transcribe(self, media: Audio | Video, vocabulary: list[str] | None = None) -> Transcription:
    """Transcribe audio or video to text.

    ``vocabulary`` overrides the constructor default for this call only;
    a per-call list wins over the instance's vocabulary so one
    :class:`AudioToText` instance can serve multiple tenants. Pass
    ``None`` (the default) to use the constructor's list.
    """
    if isinstance(media, Video):
        if media.audio.is_silent:
            return Transcription(segments=[])
        audio = media.audio
    elif isinstance(media, Audio):
        if media.is_silent:
            return Transcription(segments=[])
        audio = media
    else:
        raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

    effective_vocab = self.vocabulary if vocabulary is None else _normalize_vocabulary(vocabulary)
    return self._transcribe_local(audio, effective_vocab)

AudioClassifier

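Detect and classify sounds, music, and audio events with timestamps using the Audio Spectrogram Transformer (AST). The AST authors report up to 0.485 mAP on AudioSet; the default checkpoint used here, MIT/ast-finetuned-audioset-10-10-0.4593, encodes its own score (0.4593 mAP) in its name.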

Basic Usage

from videopython.ai import AudioClassifier
from videopython.base import Video

classifier = AudioClassifier(confidence_threshold=0.3)
video = Video.from_path("video.mp4")

result = classifier.classify(video)

# Clip-level predictions (overall audio content)
for label, confidence in result.clip_predictions.items():
    print(f"{label}: {confidence:.2f}")

# Timestamped events
for event in result.events:
    print(f"{event.start:.1f}s - {event.end:.1f}s: {event.label} ({event.confidence:.2f})")
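
The timestamped events come from scoring overlapping 10s windows (5s hop) and then merging same-label detections that sit within 0.5s of each other. The sketch below reproduces those two steps without any model. Event is a stand-in for AudioEvent, and window_starts and merge_events are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Event:
    """Stand-in for AudioEvent."""

    start: float
    end: float
    label: str
    confidence: float


def window_starts(duration: float, chunk: float = 10.0, hop: float = 5.0) -> list[float]:
    """Start times of the sliding windows; a single window if the clip fits in one chunk."""
    if duration <= chunk:
        return [0.0]
    starts: list[float] = []
    t = 0.0
    while t < duration:
        starts.append(t)
        t += hop
    return starts


def merge_events(events: list[Event], gap: float = 0.5) -> list[Event]:
    """Merge same-label events separated by at most `gap` seconds, keeping max confidence."""
    by_label: dict[str, list[Event]] = {}
    for event in events:
        by_label.setdefault(event.label, []).append(event)

    merged: list[Event] = []
    for label, group in by_label.items():
        group = sorted(group, key=lambda e: e.start)
        current = group[0]
        for nxt in group[1:]:
            if nxt.start - current.end <= gap:
                current = Event(current.start, nxt.end, label, max(current.confidence, nxt.confidence))
            else:
                merged.append(current)
                current = nxt
        merged.append(current)
    return sorted(merged, key=lambda e: e.start)


print(window_starts(22.0))  # prints [0.0, 5.0, 10.0, 15.0, 20.0]

raw = [
    Event(0.0, 10.0, "Speech", 0.8),
    Event(5.0, 15.0, "Speech", 0.6),
    Event(10.0, 20.0, "Music", 0.5),
    Event(20.0, 22.0, "Speech", 0.4),
]
print([(e.label, e.start, e.end) for e in merge_events(raw)])
# prints [('Speech', 0.0, 15.0), ('Music', 10.0, 20.0), ('Speech', 20.0, 22.0)]
```

Overlapping windows are what make adjacent detections contiguous, so a sound that spans two windows surfaces as one event and not two.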

AudioClassifier

Audio event and sound classification using AST.

Source code in src/videopython/ai/understanding/audio.py
class AudioClassifier:
    """Audio event and sound classification using AST."""

    SUPPORTED_MODELS: list[str] = ["MIT/ast-finetuned-audioset-10-10-0.4593"]
    AST_SAMPLE_RATE: int = 16000
    AST_CHUNK_SECONDS: float = 10.0
    AST_HOP_SECONDS: float = 5.0

    def __init__(
        self,
        model_name: str = "MIT/ast-finetuned-audioset-10-10-0.4593",
        confidence_threshold: float = 0.3,
        top_k: int = 10,
        device: str | None = None,
    ):
        if model_name not in self.SUPPORTED_MODELS:
            raise ValueError(f"Model '{model_name}' not supported. Supported: {self.SUPPORTED_MODELS}")

        self.model_name = model_name
        self.confidence_threshold = confidence_threshold
        self.top_k = top_k
        self.device = select_device(device, mps_allowed=True)
        log_device_initialization(
            "AudioClassifier",
            requested_device=device,
            resolved_device=self.device,
        )

        self._model: Any = None
        self._processor: Any = None
        self._labels: list[str] = []

    def _init_local(self) -> None:
        """Initialize local AST model from HuggingFace."""
        from transformers import ASTFeatureExtractor, ASTForAudioClassification

        self._processor = ASTFeatureExtractor.from_pretrained(self.model_name)
        self._model = ASTForAudioClassification.from_pretrained(self.model_name)
        self._model.to(self.device)
        self._model.eval()

        self._labels = [self._model.config.id2label[i] for i in range(len(self._model.config.id2label))]

    def unload(self) -> None:
        """Release the AST model so the next classify() re-initializes.

        Used by low-memory dubbing to free VRAM between pipeline stages.
        """
        self._model = None
        self._processor = None
        release_device_memory(self.device)

    def _merge_events(self, events: list[AudioEvent], gap_threshold: float = 0.5) -> list[AudioEvent]:
        """Merge consecutive events of the same class."""
        if not events:
            return []

        events_by_label: dict[str, list[AudioEvent]] = {}
        for event in events:
            if event.label not in events_by_label:
                events_by_label[event.label] = []
            events_by_label[event.label].append(event)

        merged = []
        for label, label_events in events_by_label.items():
            sorted_events = sorted(label_events, key=lambda e: e.start)
            current = sorted_events[0]

            for next_event in sorted_events[1:]:
                if next_event.start - current.end <= gap_threshold:
                    current = AudioEvent(
                        start=current.start,
                        end=next_event.end,
                        label=label,
                        confidence=max(current.confidence, next_event.confidence),
                    )
                else:
                    merged.append(current)
                    current = next_event

            merged.append(current)

        return sorted(merged, key=lambda e: e.start)

    def _classify_local(self, audio: Audio) -> AudioClassification:
        """Classify audio using local AST model with sliding window."""
        import numpy as np
        import torch

        if self._model is None:
            self._init_local()

        audio_processed = audio.to_mono().resample(self.AST_SAMPLE_RATE)
        audio_data = audio_processed.data.astype(np.float32)

        chunk_samples = int(self.AST_CHUNK_SECONDS * self.AST_SAMPLE_RATE)
        hop_samples = int(self.AST_HOP_SECONDS * self.AST_SAMPLE_RATE)
        total_samples = len(audio_data)

        all_chunk_probs = []
        chunk_times = []

        if total_samples <= chunk_samples:
            chunks = [(0, audio_data)]
        else:
            chunks = []
            start = 0
            while start < total_samples:
                end = min(start + chunk_samples, total_samples)
                chunk = audio_data[start:end]
                if len(chunk) < chunk_samples:
                    chunk = np.pad(chunk, (0, chunk_samples - len(chunk)))
                chunks.append((start, chunk))
                start += hop_samples

        for start_sample, chunk in chunks:
            start_time = start_sample / self.AST_SAMPLE_RATE

            inputs = self._processor(
                chunk,
                sampling_rate=self.AST_SAMPLE_RATE,
                return_tensors="pt",
            )
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            with torch.no_grad():
                outputs = self._model(**inputs)
                logits = outputs.logits[0]
                probs = torch.sigmoid(logits).cpu().numpy()

            all_chunk_probs.append(probs)
            chunk_times.append(start_time)

        chunk_probs_array = np.array(all_chunk_probs)

        events = []
        for start_time, probs in zip(chunk_times, chunk_probs_array):
            end_time = start_time + self.AST_CHUNK_SECONDS
            top_indices = np.argsort(probs)[-self.top_k :][::-1]

            for class_idx in top_indices:
                confidence = float(probs[class_idx])
                if confidence >= self.confidence_threshold:
                    label = self._labels[class_idx]
                    events.append(
                        AudioEvent(
                            start=start_time,
                            end=min(end_time, total_samples / self.AST_SAMPLE_RATE),
                            label=label,
                            confidence=confidence,
                        )
                    )

        merged_events = self._merge_events(events)

        clip_preds = np.mean(chunk_probs_array, axis=0)
        top_clip_indices = np.argsort(clip_preds)[-self.top_k :][::-1]
        clip_predictions = {
            self._labels[idx]: float(clip_preds[idx])
            for idx in top_clip_indices
            if clip_preds[idx] >= self.confidence_threshold
        }

        return AudioClassification(events=merged_events, clip_predictions=clip_predictions)

    def classify(self, media: Audio | Video) -> AudioClassification:
        """Classify audio events in audio or video."""
        if isinstance(media, Video):
            if media.audio.is_silent:
                return AudioClassification(events=[], clip_predictions={})
            audio = media.audio
        elif isinstance(media, Audio):
            if media.is_silent:
                return AudioClassification(events=[], clip_predictions={})
            audio = media
        else:
            raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

        return self._classify_local(audio)
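
To make the windowing in _classify_local concrete, here is a minimal sketch of how the chunk start offsets and tail padding fall out of the loop above. The helper name window_starts and the example numbers (16 kHz audio, 10 s chunks, 5 s hop) are illustrative assumptions, not the values of the library's AST_* constants.

```python
def window_starts(total_samples: int, chunk_samples: int, hop_samples: int) -> list[tuple[int, int]]:
    """Return (start, pad) pairs mirroring the chunking loop in _classify_local.

    `pad` is the number of zero samples appended to reach chunk_samples.
    """
    if total_samples <= chunk_samples:
        # Short clips are passed through as a single, unpadded chunk.
        return [(0, 0)]
    windows = []
    start = 0
    while start < total_samples:
        end = min(start + chunk_samples, total_samples)
        windows.append((start, chunk_samples - (end - start)))
        start += hop_samples
    return windows


# Assumed example values: 16 kHz audio, 10 s chunks, 5 s hop, 23 s clip.
SR = 16_000
windows = window_starts(23 * SR, 10 * SR, 5 * SR)
for start, pad in windows:
    print(f"start={start / SR:.0f}s pad={pad / SR:.0f}s")
```

With these assumed numbers the last two windows are partly padding, so the tail of the clip is scored by more than one overlapping window.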

unload

unload() -> None

Release the AST model so the next classify() re-initializes.

Used by low-memory dubbing to free VRAM between pipeline stages.

Source code in src/videopython/ai/understanding/audio.py
def unload(self) -> None:
    """Release the AST model so the next classify() re-initializes.

    Used by low-memory dubbing to free VRAM between pipeline stages.
    """
    self._model = None
    self._processor = None
    release_device_memory(self.device)

classify

classify(media: Audio | Video) -> AudioClassification

Classify audio events in audio or video.

Source code in src/videopython/ai/understanding/audio.py
def classify(self, media: Audio | Video) -> AudioClassification:
    """Classify audio events in audio or video."""
    if isinstance(media, Video):
        if media.audio.is_silent:
            return AudioClassification(events=[], clip_predictions={})
        audio = media.audio
    elif isinstance(media, Audio):
        if media.is_silent:
            return AudioClassification(events=[], clip_predictions={})
        audio = media
    else:
        raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

    return self._classify_local(audio)
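
The clip-level predictions returned alongside the events are an average over windows followed by a top-k and threshold filter. The sketch below reproduces that arithmetic in plain Python; the function name clip_predictions, the three labels, and the probabilities are made up for illustration, and the library itself does this with NumPy arrays.

```python
def clip_predictions(chunk_probs, labels, top_k, threshold):
    """Average per-window probabilities, then keep the top_k labels at or above threshold."""
    n_chunks = len(chunk_probs)
    means = [sum(chunk[i] for chunk in chunk_probs) / n_chunks for i in range(len(labels))]
    # Rank label indices by mean probability, highest first.
    ranked = sorted(range(len(labels)), key=lambda i: means[i], reverse=True)[:top_k]
    return {labels[i]: means[i] for i in ranked if means[i] >= threshold}


labels = ["Speech", "Music", "Dog"]  # hypothetical label set
chunk_probs = [
    [0.9, 0.2, 0.0],
    [0.7, 0.4, 0.0],
    [0.2, 0.6, 0.3],
]
preds = clip_predictions(chunk_probs, labels, top_k=2, threshold=0.3)
print(preds)
```

A sound that is present in only one window (the "Dog" column here) is diluted by the averaging and drops out of the clip-level result, while the per-window events list can still report it for the window where it occurred.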

SceneVLM

SceneVLM supports Qwen3.5 dense vision-capable variants via the model_size kwarg: "4b" (default, ~8 GB FP16), "9b" (~18 GB FP16), "27b" (~54 GB FP16, needs ≥48 GB). Device selection is automatic by default (cuda -> mps -> cpu).

analyze_scene() and analyze_frame() return a structured SceneDescription with three fields: a one-sentence caption, an open-ended list of subjects, and a shot_type drawn from a closed enum. The class uses few-shot JSON prompting with one parse retry; if both attempts fail to parse, the raw text of the first response becomes the caption, subjects is returned empty, and shot_type is None.

from videopython.ai import SceneVLM

vlm = SceneVLM(model_size="4b")  # default
description = vlm.analyze_frame(frame_array)

print(description.caption)     # "A man in a cap speaks into a microphone."
print(description.subjects)    # ["man", "microphone", "cap"]
print(description.shot_type)   # "medium"

SceneVLM.unload() releases the model and processor, matching the low_memory behavior of the dubbing pipeline's translator backends.

SceneVLM

Generates structured scene descriptions with local Qwen3.5.

model_size maps to Qwen3.5 dense vision-capable variants:

4b   -> Qwen/Qwen3.5-4B  (~8 GB FP16; default)
9b   -> Qwen/Qwen3.5-9B  (~18 GB FP16; 24 GB GPU when solo)
27b  -> Qwen/Qwen3.5-27B (~54 GB FP16; needs ≥48 GB)

All sizes return a fully-populated SceneDescription. JSON parse failures fall back to raw-text-as-caption with empty subjects and None shot_type; that path is the rare exception, not a tier.

Source code in src/videopython/ai/understanding/image.py
class SceneVLM:
    """Generates structured scene descriptions with local Qwen3.5.

    ``model_size`` maps to Qwen3.5 dense vision-capable variants:

        4b   -> Qwen/Qwen3.5-4B  (~8 GB FP16; default)
        9b   -> Qwen/Qwen3.5-9B  (~18 GB FP16; 24 GB GPU when solo)
        27b  -> Qwen/Qwen3.5-27B (~54 GB FP16; needs ≥48 GB)

    All sizes return a fully-populated ``SceneDescription``. JSON parse
    failures fall back to raw-text-as-caption with empty subjects and
    None shot_type; that path is the rare exception, not a tier.
    """

    DEFAULT_MAX_IMAGE_PIXELS: int = 384 * 384

    def __init__(
        self,
        model_name: str | None = None,
        device: str | None = None,
        max_new_tokens: int = 256,
        temperature: float = 0.0,
        model_size: SceneVLMModelSize = DEFAULT_SCENE_VLM_MODEL_SIZE,
        max_image_pixels: int | None = None,
    ):
        if model_size not in SCENE_VLM_MODEL_IDS:
            supported = ", ".join(SCENE_VLM_MODEL_IDS)
            raise ValueError(f"model_size must be one of: {supported}")

        self.model_size: SceneVLMModelSize = model_size
        self.model_name = model_name or SCENE_VLM_MODEL_IDS[model_size]
        self.device = device
        self.max_new_tokens = max_new_tokens
        self.temperature = temperature
        self.max_image_pixels = max_image_pixels if max_image_pixels is not None else self.DEFAULT_MAX_IMAGE_PIXELS
        self._processor: Any = None
        self._model: Any = None

        if model_size == "27b":
            self._warn_if_vram_under_large_model_floor()

    @staticmethod
    def _warn_if_vram_under_large_model_floor() -> None:
        """Loud WARNING when ``model_size='27b'`` is requested on a small card.

        Does not raise -- a knowledgeable user may run the 27B model with
        their own quantization layer or accept device off-loading. The
        warning makes the eventual OOM (deep inside ``from_pretrained``)
        easier to diagnose.
        """
        try:
            import torch

            if not torch.cuda.is_available():
                logger.warning(
                    "SceneVLM model_size='27b' requested but CUDA is not "
                    "available. 27B FP16 weights are ~54 GB; running on "
                    "CPU/MPS is likely to OOM."
                )
                return

            free_bytes, _total = torch.cuda.mem_get_info()
            free_gb = free_bytes / (1024**3)
            if free_gb < _LARGE_MODEL_VRAM_WARN_GB:
                logger.warning(
                    "SceneVLM model_size='27b' requested with %.1f GB free VRAM. "
                    "Qwen3.5-27B FP16 needs ~54 GB for weights alone -- expect "
                    "OOM during from_pretrained unless you wired up "
                    "quantization or device offloading.",
                    free_gb,
                )
        except ImportError:
            pass

    def _init_local(self) -> None:
        """Initialize local Qwen3.5 model."""
        import torch
        from transformers import AutoModelForImageTextToText, AutoProcessor

        t0 = time.perf_counter()
        requested_device = self.device
        resolved_device = select_device(self.device, mps_allowed=True)

        self._processor = AutoProcessor.from_pretrained(self.model_name)
        # Save and restore default dtype -- transformers torch_dtype="auto" can
        # mutate torch.get_default_dtype(), which breaks concurrent models
        # (e.g. Whisper) that expect float32.
        saved_dtype = torch.get_default_dtype()
        try:
            self._model = AutoModelForImageTextToText.from_pretrained(self.model_name, torch_dtype="auto")
        finally:
            torch.set_default_dtype(saved_dtype)
        self._model.to(resolved_device)
        self._model.eval()
        self.device = resolved_device

        log_device_initialization(
            "SceneVLM",
            requested_device=requested_device,
            resolved_device=resolved_device,
        )
        logger.info(
            "SceneVLM(%s, model_size=%s) model weights loaded in %.2fs",
            self.model_name,
            self.model_size,
            time.perf_counter() - t0,
        )

    def unload(self) -> None:
        """Release model + processor for ``low_memory`` parity with other stages.

        Mirrors ``MarianTranslator.unload`` / ``Qwen3Translator.unload``. Safe
        to call before ``_init_local`` -- it just clears already-None fields.
        """
        self._model = None
        self._processor = None
        release_device_memory(self.device)

    def _downscale_image(self, img: Image.Image) -> Image.Image:
        """Downscale image to fit within max_image_pixels budget, preserving aspect ratio."""
        w, h = img.size
        pixels = w * h
        if pixels <= self.max_image_pixels:
            return img
        scale = (self.max_image_pixels / pixels) ** 0.5
        new_w = max(1, int(w * scale))
        new_h = max(1, int(h * scale))
        return img.resize((new_w, new_h), Image.LANCZOS)

    def _generation_config_for_run(self) -> Any | None:
        base_config = getattr(self._model, "generation_config", None)
        if base_config is None or not hasattr(base_config, "to_dict"):
            return None

        config = base_config.__class__.from_dict(base_config.to_dict())
        if self.temperature > 0:
            config.do_sample = True
            config.temperature = self.temperature
            return config

        config.do_sample = False
        for name, value in (("temperature", 1.0), ("top_p", 1.0), ("top_k", 50)):
            if hasattr(config, name):
                setattr(config, name, value)
        return config

    def analyze_frame(
        self,
        image: np.ndarray | Image.Image,
        prompt: str | None = None,
    ) -> SceneDescription:
        """Analyze one frame and return a structured scene description."""
        frame = Image.fromarray(image) if isinstance(image, np.ndarray) else image
        return self.analyze_scene([frame], prompt=prompt)

    def analyze_scene(
        self,
        images: list[np.ndarray | Image.Image],
        prompt: str | None = None,
    ) -> SceneDescription:
        """Analyze a scene with multiple frames and return a structured description.

        Uses few-shot JSON prompting with one parse-retry. If both attempts
        fail to produce valid JSON, falls back to a raw-text caption with
        empty subjects and ``shot_type=None``.
        """
        if not images:
            raise ValueError("`images` must contain at least one frame")

        pil_images = [
            self._downscale_image(Image.fromarray(img) if isinstance(img, np.ndarray) else img) for img in images
        ]

        user_prompt = prompt or _STRUCTURED_PROMPT
        raw_first = self._generate_one(pil_images, user_prompt)
        parsed = _try_parse_scene_json(raw_first)
        if parsed is not None:
            return parsed

        logger.info("SceneVLM JSON parse failed on first attempt; retrying with tightened prompt")
        raw_retry = self._generate_one(pil_images, _RETRY_PROMPT)
        parsed = _try_parse_scene_json(raw_retry)
        if parsed is not None:
            return parsed

        # Final fallback: surface the raw text as a caption so the scene
        # still gets *something* useful, just without structured fields.
        fallback_text = " ".join(raw_first.split()).strip() or "No scene description"
        logger.warning("SceneVLM JSON parse failed after retry; using raw-text fallback")
        return SceneDescription(caption=fallback_text, subjects=[], shot_type=None)

    def _generate_one(self, pil_images: list[Image.Image], user_prompt: str) -> str:
        content: list[dict[str, Any]] = [{"type": "image", "image": img} for img in pil_images]
        content.append({"type": "text", "text": user_prompt})
        messages = [{"role": "user", "content": content}]
        outputs = self._generate_from_message_batch([messages])
        return outputs[0]

    def _generate_from_message_batch(self, messages_batch: list[list[dict[str, Any]]]) -> list[str]:
        """Run batch generation for one or more multimodal chat messages."""
        import torch
        from qwen_vl_utils import process_vision_info

        if self._model is None:
            self._init_local()

        texts = [
            self._processor.apply_chat_template(msg, tokenize=False, add_generation_prompt=True, enable_thinking=False)
            for msg in messages_batch
        ]

        image_inputs, video_inputs = process_vision_info(messages_batch)
        processor_kwargs: dict[str, Any] = {
            "text": texts,
            "padding": True,
            "return_tensors": "pt",
            "images": image_inputs,
        }
        if video_inputs is not None:
            processor_kwargs["videos"] = video_inputs

        num_images = sum(
            len(items) if isinstance(items, list) else 1 for items in (processor_kwargs.get("images") or [])
        )

        inputs = self._processor(**processor_kwargs)
        inputs = inputs.to(self.device) if hasattr(inputs, "to") else {k: v.to(self.device) for k, v in inputs.items()}

        generation_config = self._generation_config_for_run()
        if generation_config is not None:
            generation_config.max_new_tokens = self.max_new_tokens
            generation_kwargs: dict[str, Any] = {"generation_config": generation_config}
        elif self.temperature > 0:
            generation_kwargs = {
                "max_new_tokens": self.max_new_tokens,
                "do_sample": True,
                "temperature": self.temperature,
            }
        else:
            generation_kwargs = {"max_new_tokens": self.max_new_tokens, "do_sample": False}

        t0 = time.perf_counter()
        with torch.no_grad():
            output_ids = self._model.generate(**inputs, **generation_kwargs)
        logger.info(
            "SceneVLM inference: %.2fs, %d images, %d messages", time.perf_counter() - t0, num_images, len(texts)
        )

        generated_ids_trimmed = [
            out_ids[len(in_ids) :] for in_ids, out_ids in zip(inputs["input_ids"], output_ids, strict=False)
        ]
        output_texts = self._processor.batch_decode(generated_ids_trimmed, skip_special_tokens=True)
        return [text.strip() for text in output_texts]
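
The pixel budget applied by _downscale_image is easiest to see with numbers. The following sketch repeats only the size arithmetic, without PIL; the helper name downscaled_size is hypothetical, and the default budget is the 384 * 384 value of DEFAULT_MAX_IMAGE_PIXELS shown above.

```python
def downscaled_size(w: int, h: int, max_pixels: int = 384 * 384) -> tuple[int, int]:
    """Mirror the size computation in _downscale_image without touching PIL."""
    pixels = w * h
    if pixels <= max_pixels:
        # Already within budget: the image is returned unchanged.
        return w, h
    # Scaling both sides by sqrt(budget / pixels) scales the area by budget / pixels.
    scale = (max_pixels / pixels) ** 0.5
    return max(1, int(w * scale)), max(1, int(h * scale))


for size in [(320, 240), (1920, 1080), (3840, 2160)]:
    new_w, new_h = downscaled_size(*size)
    print(size, "->", (new_w, new_h), f"{new_w * new_h} px")
```

Because both sides are scaled by the same factor and then truncated, the aspect ratio is preserved up to rounding and the result stays at or below the budget for ordinary frame sizes.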

unload

unload() -> None

Release model + processor for low_memory parity with other stages.

Mirrors MarianTranslator.unload / Qwen3Translator.unload. Safe to call before _init_local -- it just clears already-None fields.

Source code in src/videopython/ai/understanding/image.py
def unload(self) -> None:
    """Release model + processor for ``low_memory`` parity with other stages.

    Mirrors ``MarianTranslator.unload`` / ``Qwen3Translator.unload``. Safe
    to call before ``_init_local`` -- it just clears already-None fields.
    """
    self._model = None
    self._processor = None
    release_device_memory(self.device)

analyze_frame

analyze_frame(
    image: ndarray | Image, prompt: str | None = None
) -> SceneDescription

Analyze one frame and return a structured scene description.

Source code in src/videopython/ai/understanding/image.py
def analyze_frame(
    self,
    image: np.ndarray | Image.Image,
    prompt: str | None = None,
) -> SceneDescription:
    """Analyze one frame and return a structured scene description."""
    frame = Image.fromarray(image) if isinstance(image, np.ndarray) else image
    return self.analyze_scene([frame], prompt=prompt)

analyze_scene

analyze_scene(
    images: list[ndarray | Image], prompt: str | None = None
) -> SceneDescription

Analyze a scene with multiple frames and return a structured description.

Uses few-shot JSON prompting with one parse-retry. If both attempts fail to produce valid JSON, falls back to a raw-text caption with empty subjects and shot_type=None.

Source code in src/videopython/ai/understanding/image.py
def analyze_scene(
    self,
    images: list[np.ndarray | Image.Image],
    prompt: str | None = None,
) -> SceneDescription:
    """Analyze a scene with multiple frames and return a structured description.

    Uses few-shot JSON prompting with one parse-retry. If both attempts
    fail to produce valid JSON, falls back to a raw-text caption with
    empty subjects and ``shot_type=None``.
    """
    if not images:
        raise ValueError("`images` must contain at least one frame")

    pil_images = [
        self._downscale_image(Image.fromarray(img) if isinstance(img, np.ndarray) else img) for img in images
    ]

    user_prompt = prompt or _STRUCTURED_PROMPT
    raw_first = self._generate_one(pil_images, user_prompt)
    parsed = _try_parse_scene_json(raw_first)
    if parsed is not None:
        return parsed

    logger.info("SceneVLM JSON parse failed on first attempt; retrying with tightened prompt")
    raw_retry = self._generate_one(pil_images, _RETRY_PROMPT)
    parsed = _try_parse_scene_json(raw_retry)
    if parsed is not None:
        return parsed

    # Final fallback: surface the raw text as a caption so the scene
    # still gets *something* useful, just without structured fields.
    fallback_text = " ".join(raw_first.split()).strip() or "No scene description"
    logger.warning("SceneVLM JSON parse failed after retry; using raw-text fallback")
    return SceneDescription(caption=fallback_text, subjects=[], shot_type=None)
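
The control flow of analyze_scene (parse, retry once, fall back to raw text) can be exercised without a model. This is a minimal sketch under stated assumptions: parse_scene is a hypothetical stand-in for _try_parse_scene_json, whose real validation rules are not shown here, and describe takes a fake generate callable in place of the VLM.

```python
import json
from typing import Callable, Optional


def parse_scene(raw: str) -> Optional[dict]:
    """Hypothetical stand-in for _try_parse_scene_json: dict on success, None on failure."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict) or "caption" not in data:
        return None
    return data


def describe(generate: Callable[[str], str]) -> dict:
    """Try the main prompt, retry once with a stricter prompt, then fall back to raw text."""
    raw_first = generate("structured prompt")
    parsed = parse_scene(raw_first)
    if parsed is not None:
        return parsed
    parsed = parse_scene(generate("retry prompt"))
    if parsed is not None:
        return parsed
    # The fallback uses the FIRST response, whitespace-collapsed, as in analyze_scene.
    caption = " ".join(raw_first.split()) or "No scene description"
    return {"caption": caption, "subjects": [], "shot_type": None}


# Fake model that never emits JSON, to exercise the fallback path.
result = describe(lambda prompt: "  A man\n speaks into a microphone. ")
print(result)
```

Callers that need the structured fields can treat an empty subjects list together with shot_type of None as a signal that the fallback path was taken.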

SemanticSceneDetector

ML-based scene boundary detection using TransNetV2. More accurate than histogram-based detection, especially for gradual transitions like fades and dissolves.

from videopython.ai import SemanticSceneDetector

detector = SemanticSceneDetector(threshold=0.5, min_scene_length=1.0)
scenes = detector.detect_streaming("video.mp4")

for scene in scenes:
    print(f"Scene: {scene.start:.1f}s - {scene.end:.1f}s ({scene.duration:.1f}s)")

SemanticSceneDetector

ML-based scene detection using TransNetV2.

TransNetV2 is a neural network specifically designed for shot boundary detection, providing more accurate scene boundaries than histogram-based methods, especially for gradual transitions.

Uses the transnetv2-pytorch package with pretrained weights.

Example

from videopython.ai.understanding import SemanticSceneDetector

detector = SemanticSceneDetector()
scenes = detector.detect_streaming("video.mp4")
for scene in scenes:
    print(f"Scene: {scene.start:.2f}s - {scene.end:.2f}s")

Source code in src/videopython/ai/understanding/temporal.py
class SemanticSceneDetector:
    """ML-based scene detection using TransNetV2.

    TransNetV2 is a neural network specifically designed for shot boundary
    detection, providing more accurate scene boundaries than histogram-based
    methods, especially for gradual transitions.

    Uses the transnetv2-pytorch package with pretrained weights.

    Example:
        >>> from videopython.ai.understanding import SemanticSceneDetector
        >>> detector = SemanticSceneDetector()
        >>> scenes = detector.detect_streaming("video.mp4")
        >>> for scene in scenes:
        ...     print(f"Scene: {scene.start:.2f}s - {scene.end:.2f}s")
    """

    def __init__(
        self,
        threshold: float = 0.5,
        min_scene_length: float = 0.5,
        device: str | None = None,
    ):
        """Initialize the semantic scene detector.

        Args:
            threshold: Confidence threshold for scene boundaries (0.0-1.0).
                Higher values = fewer, more confident boundaries.
            min_scene_length: Minimum scene duration in seconds.
            device: Device to run on ('cuda', 'mps', 'cpu', or None for auto).
                Note: MPS may have numerical inconsistencies; use 'cpu' for
                reproducible results.
        """
        if not 0.0 <= threshold <= 1.0:
            raise ValueError("threshold must be between 0.0 and 1.0")
        if min_scene_length < 0:
            raise ValueError("min_scene_length must be non-negative")

        self.threshold = threshold
        self.min_scene_length = min_scene_length
        self.device: str | None = device
        self._model: Any = None

    def _init_local(self) -> None:
        """Load the TransNetV2 model with pretrained weights."""
        if self._model is not None:
            return

        from transnetv2_pytorch import TransNetV2

        requested_device = self.device
        device = select_device(self.device, mps_allowed=True)
        log_device_initialization(
            "SemanticSceneDetector",
            requested_device=requested_device,
            resolved_device=device,
        )
        self.device = device
        self._model = TransNetV2(device=device)
        self._model.eval()

    def unload(self) -> None:
        """Release the TransNetV2 model so the next call re-initializes."""
        self._model = None
        release_device_memory(self.device)

    def detect(self, video: Video) -> list[SceneBoundary]:
        """Detect scenes in a video using ML-based boundary detection.

        Note: This method requires saving video to a temporary file for
        TransNetV2 processing. For better performance, use detect_streaming()
        with a file path directly.

        Args:
            video: Video object to analyze.

        Returns:
            List of SceneBoundary objects representing detected scenes.
        """
        import tempfile

        if len(video.frames) == 0:
            return []

        if len(video.frames) == 1:
            return [SceneBoundary(start=0.0, end=video.total_seconds, start_frame=0, end_frame=1)]

        # Save video to temp file for TransNetV2 processing
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=True) as tmp:
            video.save(tmp.name)
            return self.detect_streaming(tmp.name)

    def detect_streaming(self, path: str | Path) -> list[SceneBoundary]:
        """Detect scenes from a video file.

        Uses TransNetV2 with pretrained weights for accurate shot boundary
        detection.

        Args:
            path: Path to video file.

        Returns:
            List of SceneBoundary objects representing detected scenes.
        """
        self._init_local()

        # Use TransNetV2's detect_scenes which handles everything internally
        raw_scenes = self._model.detect_scenes(str(path), threshold=self.threshold)

        # Convert to SceneBoundary objects
        scenes = []
        for scene_data in raw_scenes:
            start_frame = scene_data["start_frame"]
            end_frame = scene_data["end_frame"]
            start_time = float(scene_data["start_time"])
            end_time = float(scene_data["end_time"])

            scenes.append(
                SceneBoundary(
                    start=start_time,
                    end=end_time,
                    start_frame=start_frame,
                    end_frame=end_frame,
                )
            )

        if self.min_scene_length > 0:
            scenes = self._merge_short_scenes(scenes)

        return scenes

    def _merge_short_scenes(self, scenes: list[SceneBoundary]) -> list[SceneBoundary]:
        """Merge scenes that are shorter than min_scene_length.

        Args:
            scenes: List of scenes to process.

        Returns:
            List of scenes with short scenes merged into adjacent ones.
        """
        if not scenes:
            return scenes

        merged = [scenes[0]]

        for scene in scenes[1:]:
            last_scene = merged[-1]

            if last_scene.duration < self.min_scene_length:
                merged[-1] = SceneBoundary(
                    start=last_scene.start,
                    end=scene.end,
                    start_frame=last_scene.start_frame,
                    end_frame=scene.end_frame,
                )
            else:
                merged.append(scene)

        if len(merged) > 1 and merged[-1].duration < self.min_scene_length:
            second_last = merged[-2]
            last = merged[-1]
            merged[-2] = SceneBoundary(
                start=second_last.start,
                end=last.end,
                start_frame=second_last.start_frame,
                end_frame=last.end_frame,
            )
            merged.pop()

        return merged

    @classmethod
    def detect_from_path(
        cls,
        path: str | Path,
        threshold: float = 0.5,
        min_scene_length: float = 0.5,
    ) -> list[SceneBoundary]:
        """Convenience method for one-shot scene detection.

        Args:
            path: Path to video file.
            threshold: Scene boundary threshold (0.0-1.0).
            min_scene_length: Minimum scene duration in seconds.

        Returns:
            List of SceneBoundary objects representing detected scenes.
        """
        detector = cls(threshold=threshold, min_scene_length=min_scene_length)
        return detector.detect_streaming(path)
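
The merge rule in _merge_short_scenes is worth tracing once by hand. The sketch below copies its logic onto a hypothetical Scene dataclass that carries only start and end times (the real SceneBoundary also tracks frame indices); the scene list is made up.

```python
from dataclasses import dataclass


@dataclass
class Scene:
    """Hypothetical stand-in for SceneBoundary with only the time fields."""

    start: float
    end: float

    @property
    def duration(self) -> float:
        return self.end - self.start


def merge_short(scenes: list[Scene], min_len: float) -> list[Scene]:
    if not scenes:
        return scenes
    merged = [scenes[0]]
    for scene in scenes[1:]:
        last = merged[-1]
        if last.duration < min_len:
            # The accumulated scene is still too short: absorb the next one.
            merged[-1] = Scene(last.start, scene.end)
        else:
            merged.append(scene)
    # A short final scene has no successor, so fold it into its predecessor.
    if len(merged) > 1 and merged[-1].duration < min_len:
        tail = merged.pop()
        merged[-1] = Scene(merged[-1].start, tail.end)
    return merged


scenes = [Scene(0.0, 0.2), Scene(0.2, 3.0), Scene(3.0, 6.0), Scene(6.0, 6.25)]
merged = merge_short(scenes, min_len=0.5)
print([(s.start, s.end) for s in merged])
```

Short scenes are merged forward into the scene that follows them; only a short final scene is merged backward. If every scene in a video is short, the result can still contain a single scene below min_scene_length.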

__init__

__init__(
    threshold: float = 0.5,
    min_scene_length: float = 0.5,
    device: str | None = None,
)

Initialize the semantic scene detector.

Parameters:

Name Type Description Default
threshold float

Confidence threshold for scene boundaries (0.0-1.0). Higher values = fewer, more confident boundaries.

0.5
min_scene_length float

Minimum scene duration in seconds.

0.5
device str | None

Device to run on ('cuda', 'mps', 'cpu', or None for auto). Note: MPS may have numerical inconsistencies; use 'cpu' for reproducible results.

None
Source code in src/videopython/ai/understanding/temporal.py
def __init__(
    self,
    threshold: float = 0.5,
    min_scene_length: float = 0.5,
    device: str | None = None,
):
    """Initialize the semantic scene detector.

    Args:
        threshold: Confidence threshold for scene boundaries (0.0-1.0).
            Higher values = fewer, more confident boundaries.
        min_scene_length: Minimum scene duration in seconds.
        device: Device to run on ('cuda', 'mps', 'cpu', or None for auto).
            Note: MPS may have numerical inconsistencies; use 'cpu' for
            reproducible results.
    """
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold must be between 0.0 and 1.0")
    if min_scene_length < 0:
        raise ValueError("min_scene_length must be non-negative")

    self.threshold = threshold
    self.min_scene_length = min_scene_length
    self.device: str | None = device
    self._model: Any = None
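
To illustrate why a higher threshold yields fewer boundaries, here is a toy sketch. It assumes the model produces one boundary probability per frame and that a frame counts as a cut when its probability reaches the threshold; the actual decision is made inside the transnetv2-pytorch detect_scenes call and may differ in detail. The function name cut_frames and the probabilities are made up.

```python
def cut_frames(probs: list[float], threshold: float) -> list[int]:
    """Return indices of frames whose boundary probability reaches the threshold."""
    return [i for i, p in enumerate(probs) if p >= threshold]


# Made-up per-frame boundary probabilities: one weak and one strong candidate cut.
probs = [0.01, 0.02, 0.55, 0.03, 0.01, 0.92, 0.04]
for threshold in (0.3, 0.5, 0.9):
    print(threshold, cut_frames(probs, threshold))
```

In this toy data the weak candidate at frame 2 survives at 0.3 and 0.5 but is dropped at 0.9, which is the trade-off the threshold parameter controls.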

unload

unload() -> None

Release the TransNetV2 model so the next call re-initializes.

Source code in src/videopython/ai/understanding/temporal.py
def unload(self) -> None:
    """Release the TransNetV2 model so the next call re-initializes."""
    self._model = None
    release_device_memory(self.device)

detect

detect(video: Video) -> list[SceneBoundary]

Detect scenes in a video using ML-based boundary detection.

Note: This method requires saving video to a temporary file for TransNetV2 processing. For better performance, use detect_streaming() with a file path directly.

Parameters:

Name Type Description Default
video Video

Video object to analyze.

required

Returns:

Type Description
list[SceneBoundary]

List of SceneBoundary objects representing detected scenes.

Source code in src/videopython/ai/understanding/temporal.py
def detect(self, video: Video) -> list[SceneBoundary]:
    """Detect scenes in a video using ML-based boundary detection.

    Note: This method requires saving video to a temporary file for
    TransNetV2 processing. For better performance, use detect_streaming()
    with a file path directly.

    Args:
        video: Video object to analyze.

    Returns:
        List of SceneBoundary objects representing detected scenes.
    """
    import tempfile

    if len(video.frames) == 0:
        return []

    if len(video.frames) == 1:
        return [SceneBoundary(start=0.0, end=video.total_seconds, start_frame=0, end_frame=1)]

    # Save video to temp file for TransNetV2 processing
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=True) as tmp:
        video.save(tmp.name)
        return self.detect_streaming(tmp.name)

detect_streaming

detect_streaming(path: str | Path) -> list[SceneBoundary]

Detect scenes from a video file.

Uses TransNetV2 with pretrained weights for accurate shot boundary detection.

Parameters:

Name Type Description Default
path str | Path

Path to video file.

required

Returns:

Type Description
list[SceneBoundary]

List of SceneBoundary objects representing detected scenes.

Source code in src/videopython/ai/understanding/temporal.py
def detect_streaming(self, path: str | Path) -> list[SceneBoundary]:
    """Detect scenes from a video file.

    Uses TransNetV2 with pretrained weights for accurate shot boundary
    detection.

    Args:
        path: Path to video file.

    Returns:
        List of SceneBoundary objects representing detected scenes.
    """
    self._init_local()

    # Use TransNetV2's detect_scenes which handles everything internally
    raw_scenes = self._model.detect_scenes(str(path), threshold=self.threshold)

    # Convert to SceneBoundary objects
    scenes = []
    for scene_data in raw_scenes:
        start_frame = scene_data["start_frame"]
        end_frame = scene_data["end_frame"]
        start_time = float(scene_data["start_time"])
        end_time = float(scene_data["end_time"])

        scenes.append(
            SceneBoundary(
                start=start_time,
                end=end_time,
                start_frame=start_frame,
                end_frame=end_frame,
            )
        )

    if self.min_scene_length > 0:
        scenes = self._merge_short_scenes(scenes)

    return scenes

detect_from_path classmethod

detect_from_path(
    path: str | Path,
    threshold: float = 0.5,
    min_scene_length: float = 0.5,
) -> list[SceneBoundary]

Convenience method for one-shot scene detection.

Parameters:

Name Type Description Default
path str | Path

Path to video file.

required
threshold float

Scene boundary threshold (0.0-1.0).

0.5
min_scene_length float

Minimum scene duration in seconds.

0.5

Returns:

Type Description
list[SceneBoundary]

List of SceneBoundary objects representing detected scenes.

Source code in src/videopython/ai/understanding/temporal.py
@classmethod
def detect_from_path(
    cls,
    path: str | Path,
    threshold: float = 0.5,
    min_scene_length: float = 0.5,
) -> list[SceneBoundary]:
    """Convenience method for one-shot scene detection.

    Args:
        path: Path to video file.
        threshold: Scene boundary threshold (0.0-1.0).
        min_scene_length: Minimum scene duration in seconds.

    Returns:
        List of SceneBoundary objects representing detected scenes.
    """
    detector = cls(threshold=threshold, min_scene_length=min_scene_length)
    return detector.detect_streaming(path)

FaceTracker

FaceTracker runs YOLOv8-face detection and stitches detections into per-shot tracks via IoU association — no embedding re-id, so a track does not survive across shot boundaries. Two surfaces:

  • track_shot(frames, frame_indices) returns a list of FaceTrack objects with stable ids within the shot. This is the API the analyzer uses.
  • detect_and_track(frame, frame_index) / track_video(frames) are the legacy single-subject smoothed-position APIs used by FaceTrackingCrop (see AI Transforms).

from videopython.ai import FaceTracker

tracker = FaceTracker(backend="auto")
tracks = tracker.track_shot(frames)

for track in tracks:
    print(f"track #{track.track_id}: {track.length} frames, "
          f"first frame {track.frame_indices[0]}")

FaceTracker

Face tracking utility with per-frame smoothing and per-shot tracks.

Two surfaces:

  • detect_and_track(frame, frame_index) / track_video(frames) — legacy single-subject API used by FaceTrackingCrop. Returns a smoothed (cx, cy, w, h) tuple.
  • track_shot(frames, frame_indices) — new per-shot multi-track API returning list[FaceTrack]. Used by the analysis pipeline (M5) and lip-sync (M6) to bind detections to subjects across the frames of one shot. IoU-only association — tracks do not survive across shot boundaries.
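The IoU-only association mentioned above depends on an intersection-over-union score between two bounding boxes. The source calls a private helper, `_bbox_iou`, whose body is not shown on this page, so the following is only a minimal sketch of the standard IoU computation. The `Box` type and the `bbox_iou` name are hypothetical stand-ins, not the library's own definitions.

```python
from typing import NamedTuple


class Box(NamedTuple):
    """Axis-aligned box (hypothetical stand-in, not the library's type)."""

    x: float  # left edge
    y: float  # top edge
    width: float
    height: float


def bbox_iou(a: Box, b: Box) -> float:
    """Return intersection-over-union of two boxes, 0.0 if they are disjoint."""
    # Overlap along each axis, clamped at zero when the boxes do not touch.
    overlap_w = max(0.0, min(a.x + a.width, b.x + b.width) - max(a.x, b.x))
    overlap_h = max(0.0, min(a.y + a.height, b.y + b.height) - max(a.y, b.y))
    intersection = overlap_w * overlap_h
    union = a.width * a.height + b.width * b.height - intersection
    return intersection / union if union > 0 else 0.0
```

A score near 1.0 means the two detections almost coincide, which is why a minimum IoU works as a cheap "same face as in the previous frame" test within one shot.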
Source code in src/videopython/ai/understanding/faces.py
class FaceTracker:
    """Face tracking utility with per-frame smoothing and per-shot tracks.

    Two surfaces:

    - ``detect_and_track(frame, frame_index)`` / ``track_video(frames)`` —
      legacy single-subject API used by ``FaceTrackingCrop``. Returns a
      smoothed ``(cx, cy, w, h)`` tuple.
    - ``track_shot(frames, frame_indices)`` — new per-shot multi-track API
      returning ``list[FaceTrack]``. Used by the analysis pipeline (M5)
      and lip-sync (M6) to bind detections to subjects across the
      frames of one shot. IoU-only association — tracks do not survive
      across shot boundaries.
    """

    def __init__(
        self,
        selection_strategy: Literal["largest", "centered", "index"] = "largest",
        face_index: int = 0,
        smoothing: float = 0.8,
        detection_interval: int = 3,
        min_face_size: int = 30,
        backend: Literal["cpu", "gpu", "auto"] = "auto",
        sample_rate: int = 1,
        batch_size: int = 16,
        iou_match_threshold: float = DEFAULT_IOU_MATCH_THRESHOLD,
        max_missed_frames: int = DEFAULT_MAX_MISSED_FRAMES,
    ):
        """Initialize face tracker.

        Args:
            selection_strategy: How to select which face to track (legacy
                single-subject API).
                - "largest": Track the face with the largest bounding box.
                - "centered": Track the face closest to frame center.
                - "index": Track the face at a specific index (sorted by area).
            face_index: Index of face to track when using "index" strategy.
            smoothing: Exponential moving average factor (0-1). Higher = smoother.
            detection_interval: Run detection every N frames, interpolate between.
            min_face_size: Minimum face size in pixels for detection.
            backend: Detection backend - "cpu", "gpu", or "auto".
            sample_rate: For GPU backend, detect every Nth frame and interpolate.
                Only used by track_video(). Default 1 (every frame).
            batch_size: Batch size for GPU detection. Default 16.
            iou_match_threshold: Minimum IoU between consecutive detections to
                continue an existing per-shot track. Used by ``track_shot``.
            max_missed_frames: How many consecutive frames a per-shot track
                can go without a detection before it's closed.
        """
        self.selection_strategy = selection_strategy
        self.face_index = face_index
        self.smoothing = smoothing
        self.detection_interval = detection_interval
        self.min_face_size = min_face_size
        self.backend: Literal["cpu", "gpu", "auto"] = backend
        self.sample_rate = sample_rate
        self.batch_size = batch_size
        self.iou_match_threshold = iou_match_threshold
        self.max_missed_frames = max_missed_frames

        self._detector: _FaceDetector | None = None
        self._last_position: tuple[float, float] | None = None
        self._last_size: tuple[float, float] | None = None
        self._smoothed_position: tuple[float, float] | None = None
        self._smoothed_size: tuple[float, float] | None = None
        logger.info("FaceTracker initialized with backend=%s", self.backend)

    def _init_detector(self) -> None:
        """Initialize face detector lazily."""
        self._detector = _FaceDetector(
            min_face_size=self.min_face_size,
            backend=self.backend,
        )

    def _select_face(
        self,
        faces: list[DetectedFace],
        frame_width: int,
        frame_height: int,
    ) -> tuple[float, float, float, float] | None:
        """Select a face based on the configured strategy.

        Args:
            faces: List of DetectedFace objects.
            frame_width: Width of the frame.
            frame_height: Height of the frame.

        Returns:
            Tuple of (center_x, center_y, width, height) in normalized coords, or None.
        """
        faces_with_box = [(f, f.bounding_box) for f in faces if f.bounding_box is not None]
        if not faces_with_box:
            return None

        if self.selection_strategy == "largest":
            _, bbox = faces_with_box[0]
        elif self.selection_strategy == "centered":
            frame_center = (0.5, 0.5)
            _, bbox = min(
                faces_with_box,
                key=lambda fb: ((fb[1].center[0] - frame_center[0]) ** 2 + (fb[1].center[1] - frame_center[1]) ** 2),
            )
        elif self.selection_strategy == "index":
            idx = self.face_index if self.face_index < len(faces_with_box) else 0
            _, bbox = faces_with_box[idx]
        else:
            _, bbox = faces_with_box[0]

        return (bbox.center[0], bbox.center[1], bbox.width, bbox.height)

    def detect_and_track(
        self,
        frame: np.ndarray,
        frame_index: int,
    ) -> tuple[float, float, float, float] | None:
        """Detect face in frame and return smoothed position.

        Args:
            frame: Video frame as numpy array (H, W, 3).
            frame_index: Index of current frame.

        Returns:
            Tuple of (center_x, center_y, width, height) in normalized coords,
            or None if no face detected and no fallback available.
        """
        if self._detector is None:
            self._init_detector()
            assert self._detector is not None

        h, w = frame.shape[:2]

        if frame_index % self.detection_interval == 0:
            faces = self._detector.detect(frame)
            face_info = self._select_face(faces, w, h)
            if face_info is not None:
                self._last_position = (face_info[0], face_info[1])
                self._last_size = (face_info[2], face_info[3])
        elif self._last_position is not None and self._last_size is not None:
            face_info = (*self._last_position, *self._last_size)
        else:
            face_info = None

        return self._smooth(face_info)

    def _smooth(
        self,
        face_info: tuple[float, float, float, float] | None,
    ) -> tuple[float, float, float, float] | None:
        """Apply EMA smoothing, or replay the last smoothed value when no detection.

        Returns ``None`` when no detection has been seen yet.
        """
        if face_info is not None:
            cx, cy, fw, fh = face_info
            if self._smoothed_position is None:
                self._smoothed_position = (cx, cy)
                self._smoothed_size = (fw, fh)
            else:
                assert self._smoothed_size is not None
                alpha = 1 - self.smoothing
                self._smoothed_position = (
                    self._smoothed_position[0] * self.smoothing + cx * alpha,
                    self._smoothed_position[1] * self.smoothing + cy * alpha,
                )
                self._smoothed_size = (
                    self._smoothed_size[0] * self.smoothing + fw * alpha,
                    self._smoothed_size[1] * self.smoothing + fh * alpha,
                )
            return (*self._smoothed_position, *self._smoothed_size)

        if self._smoothed_position is not None and self._smoothed_size is not None:
            return (*self._smoothed_position, *self._smoothed_size)
        return None

    def reset(self) -> None:
        """Reset tracker state for a new video."""
        self._last_position = None
        self._last_size = None
        self._smoothed_position = None
        self._smoothed_size = None

    @staticmethod
    def _interpolate_bbox(
        bbox1: tuple[float, float, float, float],
        bbox2: tuple[float, float, float, float],
        t: float,
    ) -> tuple[float, float, float, float]:
        """Linearly interpolate between two bounding boxes."""
        return (
            bbox1[0] + (bbox2[0] - bbox1[0]) * t,
            bbox1[1] + (bbox2[1] - bbox1[1]) * t,
            bbox1[2] + (bbox2[2] - bbox1[2]) * t,
            bbox1[3] + (bbox2[3] - bbox1[3]) * t,
        )

    def track_video(
        self,
        frames: np.ndarray,
    ) -> list[tuple[float, float, float, float] | None]:
        """Track face through entire video using optimized batch detection.

        Optimized for GPU backends with frame sampling and interpolation
        for smooth tracking with reduced computation.

        Args:
            frames: Video frames array of shape (N, H, W, 3).

        Returns:
            List of face positions (cx, cy, w, h) for each frame, or None if
            no face detected and no fallback available.
        """
        if self._detector is None:
            self._init_detector()
            assert self._detector is not None

        n_frames = len(frames)
        if n_frames == 0:
            return []

        h, w = frames[0].shape[:2]

        execution_device_getter = getattr(self._detector, "execution_device", None)
        if callable(execution_device_getter):
            resolved = execution_device_getter()
            backend_execution_device = resolved if resolved in {"cpu", "cuda"} else None
        else:
            backend_execution_device = None
        if backend_execution_device is None:
            backend_execution_device = "cuda" if self.backend == "gpu" else "cpu"

        use_sampled_interpolation = self.sample_rate > 1 and backend_execution_device == "cuda"

        if use_sampled_interpolation:
            sample_indices = list(range(0, n_frames, self.sample_rate))
            if sample_indices[-1] != n_frames - 1:
                sample_indices.append(n_frames - 1)
        else:
            sample_indices = list(range(n_frames))

        sampled_frames = [frames[i] for i in sample_indices]

        sampled_detections: list[list[DetectedFace]] = []
        for batch_start in range(0, len(sampled_frames), self.batch_size):
            batch_end = min(batch_start + self.batch_size, len(sampled_frames))
            batch = sampled_frames[batch_start:batch_end]
            batch_results = self._detector.detect_batch(batch)
            sampled_detections.extend(batch_results)

        sampled_faces: list[tuple[float, float, float, float] | None] = []
        for faces in sampled_detections:
            face_info = self._select_face(faces, w, h)
            sampled_faces.append(face_info)

        if not use_sampled_interpolation:
            self.reset()
            return [self._smooth(face_info) for face_info in sampled_faces]

        all_positions: list[tuple[float, float, float, float] | None] = [None] * n_frames

        for idx, sample_idx in enumerate(sample_indices):
            all_positions[sample_idx] = sampled_faces[idx]

        for i in range(len(sample_indices) - 1):
            start_idx = sample_indices[i]
            end_idx = sample_indices[i + 1]
            start_face = sampled_faces[i]
            end_face = sampled_faces[i + 1]

            if start_face is None and end_face is None:
                continue
            elif start_face is None:
                for j in range(start_idx, end_idx):
                    all_positions[j] = end_face
            elif end_face is None:
                for j in range(start_idx + 1, end_idx + 1):
                    all_positions[j] = start_face
            else:
                gap = end_idx - start_idx
                for j in range(start_idx + 1, end_idx):
                    t = (j - start_idx) / gap
                    all_positions[j] = self._interpolate_bbox(start_face, end_face, t)

        self.reset()
        return [self._smooth(face_info) for face_info in all_positions]

    def track_shot(
        self,
        frames: list[np.ndarray] | np.ndarray,
        frame_indices: list[int] | None = None,
    ) -> list[FaceTrack]:
        """Per-shot multi-track association via IoU.

        Detection is run on every input frame (caller is expected to have
        already chosen the sampling cadence -- the analysis pipeline
        passes one frame per scene-VLM sample, lip-sync passes every
        frame in the shot). Tracks are stitched together greedily by
        best IoU above ``iou_match_threshold``; tracks with no match for
        ``max_missed_frames`` consecutive frames are closed and won't
        accept future associations.

        Track ids are integers starting at 1 within this shot. They are
        **not** stable across shots — embedding re-id is deferred.

        Args:
            frames: Frames in the shot (list or stacked ndarray).
            frame_indices: Source-video frame indices. Defaults to
                ``range(len(frames))`` when omitted.

        Returns:
            List of ``FaceTrack`` objects, one per distinct subject
            tracked in the shot.
        """
        if isinstance(frames, np.ndarray):
            frame_list = [frames[i] for i in range(frames.shape[0])] if frames.ndim == 4 else [frames]
        else:
            frame_list = list(frames)

        if not frame_list:
            return []

        if frame_indices is None:
            frame_indices = list(range(len(frame_list)))
        if len(frame_indices) != len(frame_list):
            raise ValueError("frame_indices length must match frames length")

        if self._detector is None:
            self._init_detector()
            assert self._detector is not None

        per_frame_detections: list[list[DetectedFace]] = []
        for batch_start in range(0, len(frame_list), self.batch_size):
            batch = frame_list[batch_start : batch_start + self.batch_size]
            per_frame_detections.extend(self._detector.detect_batch(batch))

        active: list[_OpenTrack] = []
        finished: list[_OpenTrack] = []
        next_id = 1

        for relative_idx, faces in enumerate(per_frame_detections):
            absolute_idx = frame_indices[relative_idx]
            available = [face for face in faces if face.bounding_box is not None]
            assignments: dict[int, DetectedFace] = {}

            for track in active:
                best_face: DetectedFace | None = None
                best_iou = self.iou_match_threshold
                last_box = track.last_box
                if last_box is None:
                    continue
                for face in available:
                    if face in assignments.values() or face.bounding_box is None:
                        continue
                    iou = _bbox_iou(last_box, face.bounding_box)
                    if iou > best_iou:
                        best_iou = iou
                        best_face = face
                if best_face is not None:
                    assignments[track.track_id] = best_face

            for track in active:
                if track.track_id in assignments:
                    face = assignments[track.track_id]
                    assert face.bounding_box is not None
                    track.frame_indices.append(absolute_idx)
                    track.boxes.append(face.bounding_box)
                    track.confidences.append(face.confidence)
                    track.last_box = face.bounding_box
                    track.missed = 0
                else:
                    track.missed += 1

            for face in available:
                if face in assignments.values() or face.bounding_box is None:
                    continue
                track = _OpenTrack(track_id=next_id, last_box=face.bounding_box)
                next_id += 1
                track.frame_indices.append(absolute_idx)
                track.boxes.append(face.bounding_box)
                track.confidences.append(face.confidence)
                active.append(track)

            still_active: list[_OpenTrack] = []
            for track in active:
                if track.missed > self.max_missed_frames:
                    finished.append(track)
                else:
                    still_active.append(track)
            active = still_active

        finished.extend(active)

        return [
            FaceTrack(
                track_id=track.track_id,
                frame_indices=track.frame_indices,
                boxes=track.boxes,
                confidences=track.confidences,
            )
            for track in finished
            if track.frame_indices
        ]

__init__

__init__(
    selection_strategy: Literal[
        "largest", "centered", "index"
    ] = "largest",
    face_index: int = 0,
    smoothing: float = 0.8,
    detection_interval: int = 3,
    min_face_size: int = 30,
    backend: Literal["cpu", "gpu", "auto"] = "auto",
    sample_rate: int = 1,
    batch_size: int = 16,
    iou_match_threshold: float = DEFAULT_IOU_MATCH_THRESHOLD,
    max_missed_frames: int = DEFAULT_MAX_MISSED_FRAMES,
)

Initialize face tracker.

Parameters:

Name Type Description Default
selection_strategy Literal['largest', 'centered', 'index']

How to select which face to track (legacy single-subject API).
  • "largest": Track the face with the largest bounding box.
  • "centered": Track the face closest to frame center.
  • "index": Track the face at a specific index (sorted by area).

'largest'
face_index int

Index of face to track when using "index" strategy.

0
smoothing float

Exponential moving average factor (0-1). Higher = smoother.

0.8
detection_interval int

Run detection every N frames; detect_and_track reuses the last detected position on the frames in between.

3
min_face_size int

Minimum face size in pixels for detection.

30
backend Literal['cpu', 'gpu', 'auto']

Detection backend - "cpu", "gpu", or "auto".

'auto'
sample_rate int

For GPU backend, detect every Nth frame and interpolate. Only used by track_video(). Default 1 (every frame).

1
batch_size int

Batch size for GPU detection. Default 16.

16
iou_match_threshold float

Minimum IoU between consecutive detections to continue an existing per-shot track. Used by track_shot.

DEFAULT_IOU_MATCH_THRESHOLD
max_missed_frames int

How many consecutive frames a per-shot track can go without a detection before it's closed.

DEFAULT_MAX_MISSED_FRAMES
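The three selection_strategy values can be illustrated with a small standalone sketch. It is not the library's code: `Face` and `select_face` are hypothetical names, and the sketch sorts by area explicitly, whereas the source's "largest" branch simply takes the first detection and so appears to assume the detector already returns faces in that order.

```python
from typing import NamedTuple, Optional


class Face(NamedTuple):
    """Face box in normalized coordinates (hypothetical stand-in type)."""

    cx: float  # center x, 0..1
    cy: float  # center y, 0..1
    w: float
    h: float


def select_face(
    faces: list[Face], strategy: str = "largest", face_index: int = 0
) -> Optional[Face]:
    """Pick one face according to the strategy, or None when there are none."""
    if not faces:
        return None
    # Largest-first ordering, used by both "largest" and "index".
    by_area = sorted(faces, key=lambda f: f.w * f.h, reverse=True)
    if strategy == "centered":
        # Smallest squared distance to the frame center (0.5, 0.5).
        return min(faces, key=lambda f: (f.cx - 0.5) ** 2 + (f.cy - 0.5) ** 2)
    if strategy == "index":
        # Out-of-range indices fall back to the first (largest) face.
        idx = face_index if face_index < len(by_area) else 0
        return by_area[idx]
    return by_area[0]
```

The out-of-range fallback mirrors the source's `_select_face`, which silently uses index 0 rather than raising.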
Source code in src/videopython/ai/understanding/faces.py
def __init__(
    self,
    selection_strategy: Literal["largest", "centered", "index"] = "largest",
    face_index: int = 0,
    smoothing: float = 0.8,
    detection_interval: int = 3,
    min_face_size: int = 30,
    backend: Literal["cpu", "gpu", "auto"] = "auto",
    sample_rate: int = 1,
    batch_size: int = 16,
    iou_match_threshold: float = DEFAULT_IOU_MATCH_THRESHOLD,
    max_missed_frames: int = DEFAULT_MAX_MISSED_FRAMES,
):
    """Initialize face tracker.

    Args:
        selection_strategy: How to select which face to track (legacy
            single-subject API).
            - "largest": Track the face with the largest bounding box.
            - "centered": Track the face closest to frame center.
            - "index": Track the face at a specific index (sorted by area).
        face_index: Index of face to track when using "index" strategy.
        smoothing: Exponential moving average factor (0-1). Higher = smoother.
        detection_interval: Run detection every N frames, interpolate between.
        min_face_size: Minimum face size in pixels for detection.
        backend: Detection backend - "cpu", "gpu", or "auto".
        sample_rate: For GPU backend, detect every Nth frame and interpolate.
            Only used by track_video(). Default 1 (every frame).
        batch_size: Batch size for GPU detection. Default 16.
        iou_match_threshold: Minimum IoU between consecutive detections to
            continue an existing per-shot track. Used by ``track_shot``.
        max_missed_frames: How many consecutive frames a per-shot track
            can go without a detection before it's closed.
    """
    self.selection_strategy = selection_strategy
    self.face_index = face_index
    self.smoothing = smoothing
    self.detection_interval = detection_interval
    self.min_face_size = min_face_size
    self.backend: Literal["cpu", "gpu", "auto"] = backend
    self.sample_rate = sample_rate
    self.batch_size = batch_size
    self.iou_match_threshold = iou_match_threshold
    self.max_missed_frames = max_missed_frames

    self._detector: _FaceDetector | None = None
    self._last_position: tuple[float, float] | None = None
    self._last_size: tuple[float, float] | None = None
    self._smoothed_position: tuple[float, float] | None = None
    self._smoothed_size: tuple[float, float] | None = None
    logger.info("FaceTracker initialized with backend=%s", self.backend)

detect_and_track

detect_and_track(
    frame: ndarray, frame_index: int
) -> tuple[float, float, float, float] | None

Detect face in frame and return smoothed position.

Parameters:

Name Type Description Default
frame ndarray

Video frame as numpy array (H, W, 3).

required
frame_index int

Index of current frame.

required

Returns:

Type Description
tuple[float, float, float, float] | None

Tuple of (center_x, center_y, width, height) in normalized coords, or None if no face detected and no fallback available.

Source code in src/videopython/ai/understanding/faces.py
def detect_and_track(
    self,
    frame: np.ndarray,
    frame_index: int,
) -> tuple[float, float, float, float] | None:
    """Detect face in frame and return smoothed position.

    Args:
        frame: Video frame as numpy array (H, W, 3).
        frame_index: Index of current frame.

    Returns:
        Tuple of (center_x, center_y, width, height) in normalized coords,
        or None if no face detected and no fallback available.
    """
    if self._detector is None:
        self._init_detector()
        assert self._detector is not None

    h, w = frame.shape[:2]

    if frame_index % self.detection_interval == 0:
        faces = self._detector.detect(frame)
        face_info = self._select_face(faces, w, h)
        if face_info is not None:
            self._last_position = (face_info[0], face_info[1])
            self._last_size = (face_info[2], face_info[3])
    elif self._last_position is not None and self._last_size is not None:
        face_info = (*self._last_position, *self._last_size)
    else:
        face_info = None

    return self._smooth(face_info)
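The smoothing step that detect_and_track delegates to can be sketched in isolation. This is a simplified, standalone version of the exponential moving average, assuming a single state tuple instead of the separate position and size fields the class keeps. The `EmaSmoother` name is hypothetical.

```python
from typing import Optional

Box = tuple[float, float, float, float]  # (cx, cy, w, h)


class EmaSmoother:
    """Exponential moving average over (cx, cy, w, h) tuples."""

    def __init__(self, smoothing: float = 0.8) -> None:
        self.smoothing = smoothing  # weight kept from the previous estimate
        self.state: Optional[Box] = None

    def update(self, value: Optional[Box]) -> Optional[Box]:
        if value is None:
            # No detection: replay the last smoothed value (None if never seen).
            return self.state
        if self.state is None:
            # First detection seeds the average directly.
            self.state = value
        else:
            alpha = 1 - self.smoothing  # weight given to the new measurement
            cx, cy, w, h = (
                old * self.smoothing + new * alpha
                for old, new in zip(self.state, value)
            )
            self.state = (cx, cy, w, h)
        return self.state
```

With the default smoothing of 0.8, each new detection contributes only 20% of the output, which damps jitter at the cost of some lag when the subject moves quickly.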

reset

reset() -> None

Reset tracker state for a new video.

Source code in src/videopython/ai/understanding/faces.py
def reset(self) -> None:
    """Reset tracker state for a new video."""
    self._last_position = None
    self._last_size = None
    self._smoothed_position = None
    self._smoothed_size = None

track_video

track_video(
    frames: ndarray,
) -> list[tuple[float, float, float, float] | None]

Track face through entire video using optimized batch detection.

Optimized for GPU backends with frame sampling and interpolation for smooth tracking with reduced computation.

Parameters:

Name Type Description Default
frames ndarray

Video frames array of shape (N, H, W, 3).

required

Returns:

Type Description
list[tuple[float, float, float, float] | None]

List of face positions (cx, cy, w, h) for each frame, or None if no face detected and no fallback available.

Source code in src/videopython/ai/understanding/faces.py
def track_video(
    self,
    frames: np.ndarray,
) -> list[tuple[float, float, float, float] | None]:
    """Track face through entire video using optimized batch detection.

    Optimized for GPU backends with frame sampling and interpolation
    for smooth tracking with reduced computation.

    Args:
        frames: Video frames array of shape (N, H, W, 3).

    Returns:
        List of face positions (cx, cy, w, h) for each frame, or None if
        no face detected and no fallback available.
    """
    if self._detector is None:
        self._init_detector()
        assert self._detector is not None

    n_frames = len(frames)
    if n_frames == 0:
        return []

    h, w = frames[0].shape[:2]

    execution_device_getter = getattr(self._detector, "execution_device", None)
    if callable(execution_device_getter):
        resolved = execution_device_getter()
        backend_execution_device = resolved if resolved in {"cpu", "cuda"} else None
    else:
        backend_execution_device = None
    if backend_execution_device is None:
        backend_execution_device = "cuda" if self.backend == "gpu" else "cpu"

    use_sampled_interpolation = self.sample_rate > 1 and backend_execution_device == "cuda"

    if use_sampled_interpolation:
        sample_indices = list(range(0, n_frames, self.sample_rate))
        if sample_indices[-1] != n_frames - 1:
            sample_indices.append(n_frames - 1)
    else:
        sample_indices = list(range(n_frames))

    sampled_frames = [frames[i] for i in sample_indices]

    sampled_detections: list[list[DetectedFace]] = []
    for batch_start in range(0, len(sampled_frames), self.batch_size):
        batch_end = min(batch_start + self.batch_size, len(sampled_frames))
        batch = sampled_frames[batch_start:batch_end]
        batch_results = self._detector.detect_batch(batch)
        sampled_detections.extend(batch_results)

    sampled_faces: list[tuple[float, float, float, float] | None] = []
    for faces in sampled_detections:
        face_info = self._select_face(faces, w, h)
        sampled_faces.append(face_info)

    if not use_sampled_interpolation:
        self.reset()
        return [self._smooth(face_info) for face_info in sampled_faces]

    all_positions: list[tuple[float, float, float, float] | None] = [None] * n_frames

    for idx, sample_idx in enumerate(sample_indices):
        all_positions[sample_idx] = sampled_faces[idx]

    for i in range(len(sample_indices) - 1):
        start_idx = sample_indices[i]
        end_idx = sample_indices[i + 1]
        start_face = sampled_faces[i]
        end_face = sampled_faces[i + 1]

        if start_face is None and end_face is None:
            continue
        elif start_face is None:
            for j in range(start_idx, end_idx):
                all_positions[j] = end_face
        elif end_face is None:
            for j in range(start_idx + 1, end_idx + 1):
                all_positions[j] = start_face
        else:
            gap = end_idx - start_idx
            for j in range(start_idx + 1, end_idx):
                t = (j - start_idx) / gap
                all_positions[j] = self._interpolate_bbox(start_face, end_face, t)

    self.reset()
    return [self._smooth(face_info) for face_info in all_positions]
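The sampled path of track_video (taken only when sample_rate > 1 and detection runs on CUDA) can be sketched without a detector. The sketch below follows the gap-filling rules in the listing above but leaves out detection and the final smoothing pass. The function names are hypothetical.

```python
from typing import Optional

Box = tuple[float, float, float, float]  # (cx, cy, w, h)


def sample_indices(n_frames: int, sample_rate: int) -> list[int]:
    """Every Nth frame index, always including the final frame."""
    if n_frames == 0:
        return []
    indices = list(range(0, n_frames, sample_rate))
    if indices[-1] != n_frames - 1:
        indices.append(n_frames - 1)
    return indices


def lerp_box(a: Box, b: Box, t: float) -> Box:
    """Linear interpolation between two boxes, t in [0, 1]."""
    return (
        a[0] + (b[0] - a[0]) * t,
        a[1] + (b[1] - a[1]) * t,
        a[2] + (b[2] - a[2]) * t,
        a[3] + (b[3] - a[3]) * t,
    )


def fill_gaps(
    n_frames: int, indices: list[int], sampled: list[Optional[Box]]
) -> list[Optional[Box]]:
    """Expand per-sample boxes to one entry per frame."""
    positions: list[Optional[Box]] = [None] * n_frames
    for idx, box in zip(indices, sampled):
        positions[idx] = box
    for i in range(len(indices) - 1):
        start, end = indices[i], indices[i + 1]
        a, b = sampled[i], sampled[i + 1]
        if a is None and b is None:
            continue  # nothing known on either side of the gap
        if a is None:
            for j in range(start, end):
                positions[j] = b  # back-fill with the later detection
        elif b is None:
            for j in range(start + 1, end + 1):
                positions[j] = a  # hold the earlier detection
        else:
            for j in range(start + 1, end):
                positions[j] = lerp_box(a, b, (j - start) / (end - start))
    return positions
```

Appending the last frame to the sample set means every frame lies between two samples, so the interpolation never has to extrapolate past the end of the clip.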

track_shot

track_shot(
    frames: list[ndarray] | ndarray,
    frame_indices: list[int] | None = None,
) -> list[FaceTrack]

Per-shot multi-track association via IoU.

Detection is run on every input frame (caller is expected to have already chosen the sampling cadence -- the analysis pipeline passes one frame per scene-VLM sample, lip-sync passes every frame in the shot). Tracks are stitched together greedily by best IoU above iou_match_threshold; tracks with no match for max_missed_frames consecutive frames are closed and won't accept future associations.

Track ids are integers starting at 1 within this shot. They are not stable across shots — embedding re-id is deferred.
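The greedy association described above can be sketched on plain box tuples, with no detector involved. This is an illustrative reduction, not the library's code: `Track`, `iou`, and `associate` are hypothetical names, boxes are assumed to be (x, y, width, height), and the default threshold values are placeholders because the actual DEFAULT_IOU_MATCH_THRESHOLD and DEFAULT_MAX_MISSED_FRAMES values are not shown on this page.

```python
from dataclasses import dataclass, field

Box = tuple[float, float, float, float]  # (x, y, width, height), assumed layout


def iou(a: Box, b: Box) -> float:
    """Intersection-over-union of two boxes."""
    ow = max(0.0, min(a[0] + a[2], b[0] + b[2]) - max(a[0], b[0]))
    oh = max(0.0, min(a[1] + a[3], b[1] + b[3]) - max(a[1], b[1]))
    inter = ow * oh
    union = a[2] * a[3] + b[2] * b[3] - inter
    return inter / union if union > 0 else 0.0


@dataclass
class Track:
    track_id: int
    last_box: Box
    frame_indices: list[int] = field(default_factory=list)
    missed: int = 0


def associate(
    per_frame_boxes: list[list[Box]],
    iou_threshold: float = 0.3,  # placeholder value, not the library default
    max_missed: int = 2,  # placeholder value, not the library default
) -> list[Track]:
    active: list[Track] = []
    finished: list[Track] = []
    next_id = 1  # ids start at 1 within the shot
    for frame_idx, boxes in enumerate(per_frame_boxes):
        taken: set[int] = set()  # detections already claimed in this frame
        for track in active:
            best_j, best_score = None, iou_threshold
            for j, box in enumerate(boxes):
                if j in taken:
                    continue
                score = iou(track.last_box, box)
                if score > best_score:
                    best_j, best_score = j, score
            if best_j is None:
                track.missed += 1
            else:
                taken.add(best_j)
                track.last_box = boxes[best_j]
                track.frame_indices.append(frame_idx)
                track.missed = 0
        # Unclaimed detections open new tracks.
        for j, box in enumerate(boxes):
            if j not in taken:
                active.append(Track(next_id, box, [frame_idx]))
                next_id += 1
        # Close tracks that have gone unmatched for too long.
        finished += [t for t in active if t.missed > max_missed]
        active = [t for t in active if t.missed <= max_missed]
    return finished + active
```

Because matching is greedy in track order, an earlier track claims its best detection first. That is cheaper than an optimal assignment and is usually adequate when faces within one shot overlap little.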

Parameters:

Name Type Description Default
frames list[ndarray] | ndarray

Frames in the shot (list or stacked ndarray).

required
frame_indices list[int] | None

Source-video frame indices. Defaults to range(len(frames)) when omitted.

None

Returns:

Type Description
list[FaceTrack]

List of FaceTrack objects, one per distinct subject tracked in the shot.

Source code in src/videopython/ai/understanding/faces.py
def track_shot(
    self,
    frames: list[np.ndarray] | np.ndarray,
    frame_indices: list[int] | None = None,
) -> list[FaceTrack]:
    """Per-shot multi-track association via IoU.

    Detection is run on every input frame (caller is expected to have
    already chosen the sampling cadence -- the analysis pipeline
    passes one frame per scene-VLM sample, lip-sync passes every
    frame in the shot). Tracks are stitched together greedily by
    best IoU above ``iou_match_threshold``; tracks with no match for
    ``max_missed_frames`` consecutive frames are closed and won't
    accept future associations.

    Track ids are integers starting at 1 within this shot. They are
    **not** stable across shots — embedding re-id is deferred.

    Args:
        frames: Frames in the shot (list or stacked ndarray).
        frame_indices: Source-video frame indices. Defaults to
            ``range(len(frames))`` when omitted.

    Returns:
        List of ``FaceTrack`` objects, one per distinct subject
        tracked in the shot.
    """
    if isinstance(frames, np.ndarray):
        frame_list = [frames[i] for i in range(frames.shape[0])] if frames.ndim == 4 else [frames]
    else:
        frame_list = list(frames)

    if not frame_list:
        return []

    if frame_indices is None:
        frame_indices = list(range(len(frame_list)))
    if len(frame_indices) != len(frame_list):
        raise ValueError("frame_indices length must match frames length")

    if self._detector is None:
        self._init_detector()
        assert self._detector is not None

    per_frame_detections: list[list[DetectedFace]] = []
    for batch_start in range(0, len(frame_list), self.batch_size):
        batch = frame_list[batch_start : batch_start + self.batch_size]
        per_frame_detections.extend(self._detector.detect_batch(batch))

    active: list[_OpenTrack] = []
    finished: list[_OpenTrack] = []
    next_id = 1

    for relative_idx, faces in enumerate(per_frame_detections):
        absolute_idx = frame_indices[relative_idx]
        available = [face for face in faces if face.bounding_box is not None]
        assignments: dict[int, DetectedFace] = {}

        for track in active:
            best_face: DetectedFace | None = None
            best_iou = self.iou_match_threshold
            last_box = track.last_box
            if last_box is None:
                continue
            for face in available:
                if face in assignments.values() or face.bounding_box is None:
                    continue
                iou = _bbox_iou(last_box, face.bounding_box)
                if iou > best_iou:
                    best_iou = iou
                    best_face = face
            if best_face is not None:
                assignments[track.track_id] = best_face

        for track in active:
            if track.track_id in assignments:
                face = assignments[track.track_id]
                assert face.bounding_box is not None
                track.frame_indices.append(absolute_idx)
                track.boxes.append(face.bounding_box)
                track.confidences.append(face.confidence)
                track.last_box = face.bounding_box
                track.missed = 0
            else:
                track.missed += 1

        for face in available:
            if face in assignments.values() or face.bounding_box is None:
                continue
            track = _OpenTrack(track_id=next_id, last_box=face.bounding_box)
            next_id += 1
            track.frame_indices.append(absolute_idx)
            track.boxes.append(face.bounding_box)
            track.confidences.append(face.confidence)
            active.append(track)

        still_active: list[_OpenTrack] = []
        for track in active:
            if track.missed > self.max_missed_frames:
                finished.append(track)
            else:
                still_active.append(track)
        active = still_active

    finished.extend(active)

    return [
        FaceTrack(
            track_id=track.track_id,
            frame_indices=track.frame_indices,
            boxes=track.boxes,
            confidences=track.confidences,
        )
        for track in finished
        if track.frame_indices
    ]
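
The helper `_bbox_iou` is not shown on this page. As a minimal sketch of the association step described in the docstring, assuming boxes are plain `(x, y, width, height)` tuples in normalized coordinates, one frame of greedy matching could look like the following. `iou_xywh` and `greedy_match` are hypothetical names for illustration, not library API, and faces are claimed by list index rather than by object.

```python
Box = tuple[float, float, float, float]  # (x, y, width, height), normalized


def iou_xywh(a: Box, b: Box) -> float:
    """Intersection-over-union of two axis-aligned boxes."""
    ax, ay, aw, ah = a
    bx, by, bw, bh = b
    inter_w = max(0.0, min(ax + aw, bx + bw) - max(ax, bx))
    inter_h = max(0.0, min(ay + ah, by + bh) - max(ay, by))
    inter = inter_w * inter_h
    union = aw * ah + bw * bh - inter
    return inter / union if union > 0 else 0.0


def greedy_match(track_boxes: dict[int, Box], faces: list[Box], threshold: float) -> dict[int, int]:
    """Map track id -> index of the best unclaimed face with IoU above threshold."""
    claimed: set[int] = set()
    matches: dict[int, int] = {}
    for track_id, last_box in track_boxes.items():
        best_idx, best_iou = None, threshold
        for idx, face in enumerate(faces):
            if idx in claimed:
                continue  # each detection feeds at most one track
            score = iou_xywh(last_box, face)
            if score > best_iou:
                best_idx, best_iou = idx, score
        if best_idx is not None:
            claimed.add(best_idx)
            matches[track_id] = best_idx
    return matches


tracks = {1: (0.10, 0.10, 0.20, 0.20), 2: (0.60, 0.60, 0.20, 0.20)}
faces = [(0.62, 0.61, 0.20, 0.20), (0.11, 0.10, 0.20, 0.20), (0.40, 0.05, 0.10, 0.10)]
matches = greedy_match(tracks, faces, threshold=0.3)
# Faces left over after matching would each open a new track.
unmatched = [i for i in range(len(faces)) if i not in matches.values()]
```

Because the match is greedy in track order, an earlier track can claim a face that a later track overlaps more strongly; a globally optimal assignment would need a different algorithm.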

Scene Data Classes

These classes are used by scene and audio analyzers to represent analysis results:

SceneBoundary

SceneBoundary dataclass

Timing information for a detected scene.

A lightweight structure representing scene boundaries returned by scene detectors (e.g. videopython.ai.SemanticSceneDetector). This is a backbone type — higher-level scene analysis lives in orchestration packages.

Attributes:

Name         Type   Description
start        float  Scene start time in seconds
end          float  Scene end time in seconds
start_frame  int    Index of the first frame in this scene
end_frame    int    Index one past the last frame in this scene (exclusive)

Source code in src/videopython/base/description.py
@dataclass
class SceneBoundary:
    """Timing information for a detected scene.

    A lightweight structure representing scene boundaries returned by
    scene detectors (e.g. ``videopython.ai.SemanticSceneDetector``). This
    is a backbone type — higher-level scene analysis lives in orchestration
    packages.

    Attributes:
        start: Scene start time in seconds
        end: Scene end time in seconds
        start_frame: Index of the first frame in this scene
        end_frame: Index one past the last frame in this scene (exclusive)
    """

    start: float
    end: float
    start_frame: int
    end_frame: int

    @property
    def duration(self) -> float:
        """Duration of the scene in seconds."""
        return self.end - self.start

    @property
    def frame_count(self) -> int:
        """Number of frames in this scene."""
        return self.end_frame - self.start_frame

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "start": self.start,
            "end": self.end,
            "start_frame": self.start_frame,
            "end_frame": self.end_frame,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "SceneBoundary":
        """Create SceneBoundary from dictionary."""
        return cls(
            start=data["start"],
            end=data["end"],
            start_frame=data["start_frame"],
            end_frame=data["end_frame"],
        )

duration property

duration: float

Duration of the scene in seconds.

frame_count property

frame_count: int

Number of frames in this scene.

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary for JSON serialization."""
    return {
        "start": self.start,
        "end": self.end,
        "start_frame": self.start_frame,
        "end_frame": self.end_frame,
    }

from_dict classmethod

from_dict(data: dict[str, Any]) -> 'SceneBoundary'

Create SceneBoundary from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "SceneBoundary":
    """Create SceneBoundary from dictionary."""
    return cls(
        start=data["start"],
        end=data["end"],
        start_frame=data["start_frame"],
        end_frame=data["end_frame"],
    )
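
Since `frame_count` is computed as `end_frame - start_frame`, scenes behave as half-open ranges `[start_frame, end_frame)`. The sketch below illustrates that convention with a minimal stand-in dataclass that mirrors the documented fields. `boundaries_from_cuts` is a hypothetical helper, and deriving times as `frame / fps` is an assumption for illustration, not something this page specifies.

```python
from dataclasses import dataclass


@dataclass
class SceneBoundary:  # minimal stand-in mirroring the fields documented above
    start: float
    end: float
    start_frame: int
    end_frame: int  # exclusive

    @property
    def frame_count(self) -> int:
        return self.end_frame - self.start_frame


def boundaries_from_cuts(cuts: list[int], total_frames: int, fps: float) -> list[SceneBoundary]:
    """Turn sorted cut frame indices into half-open [start_frame, end_frame) scenes."""
    edges = [0, *cuts, total_frames]
    return [
        SceneBoundary(start=a / fps, end=b / fps, start_frame=a, end_frame=b)
        for a, b in zip(edges, edges[1:])
        if b > a  # skip empty scenes caused by duplicate cuts
    ]


scenes = boundaries_from_cuts(cuts=[48, 120], total_frames=240, fps=24.0)
```

With an exclusive `end_frame`, each scene's `end_frame` equals the next scene's `start_frame`, and the frame counts add up to the total without double counting.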

SceneDescription

SceneDescription dataclass

Structured visual scene description from the SceneVLM.

The v1 schema is intentionally narrow (caption + subjects + shot_type). Wider schemas lower the JSON parse rate on small models, and there is no eval data yet to justify that cost. Fields will be added in v2 as parse-rate measurements justify them: closed enums first, open lists last.

Attributes:

Name       Type        Description
caption    str         One-sentence summary of the scene.
subjects   list[str]   Open list of named subjects visible in the frames.
shot_type  str | None  Closed enum framing the camera distance, or None when JSON parsing fell back to raw text.

Source code in src/videopython/base/description.py
@dataclass
class SceneDescription:
    """Structured visual scene description from the SceneVLM.

    The v1 schema is intentionally narrow (caption + subjects + shot_type).
    Wider schemas lower the JSON parse rate on small models, and there is
    no eval data yet to justify that cost. Fields will be added in v2 as
    parse-rate measurements justify them: closed enums first, open lists last.

    Attributes:
        caption: One-sentence summary of the scene.
        subjects: Open list of named subjects visible in the frames.
        shot_type: Closed enum framing the camera distance, or None
            when JSON parsing fell back to raw text.
    """

    caption: str
    subjects: list[str] = field(default_factory=list)
    shot_type: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "caption": self.caption,
            "subjects": list(self.subjects),
            "shot_type": self.shot_type,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "SceneDescription":
        """Create SceneDescription from dictionary."""
        return cls(
            caption=str(data["caption"]),
            subjects=[str(s) for s in data.get("subjects", [])],
            shot_type=data.get("shot_type"),
        )
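
The `shot_type` field is documented as `None` when JSON parsing fell back to raw text. The parsing code itself is not shown on this page, so the following is only a sketch of how such a fallback might work. `parse_vlm_output` and the `SHOT_TYPES` values are hypothetical, and the dataclass is a stand-in with the three documented fields.

```python
import json
from dataclasses import dataclass, field
from typing import Optional

# Hypothetical enum values; the real closed set is not listed on this page.
SHOT_TYPES = {"close-up", "medium", "wide"}


@dataclass
class SceneDescription:  # stand-in with the three documented fields
    caption: str
    subjects: list[str] = field(default_factory=list)
    shot_type: Optional[str] = None


def parse_vlm_output(raw: str) -> SceneDescription:
    """Parse model output as JSON; fall back to the raw text as the caption."""
    try:
        data = json.loads(raw)
        if not isinstance(data, dict) or "caption" not in data:
            raise ValueError("missing caption")
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return SceneDescription(caption=raw.strip())
    shot_type = data.get("shot_type")
    return SceneDescription(
        caption=str(data["caption"]),
        subjects=[str(s) for s in (data.get("subjects") or [])],
        # Values outside the closed set are dropped rather than passed through.
        shot_type=shot_type if isinstance(shot_type, str) and shot_type in SHOT_TYPES else None,
    )


ok = parse_vlm_output('{"caption": "A chef plates pasta.", "subjects": ["chef"], "shot_type": "medium"}')
fallback = parse_vlm_output("A chef plates pasta in a kitchen.")
```

In this sketch a caller can distinguish structured from fallback results by checking whether `shot_type` is `None`, although a structured reply with an unrecognized shot type would look the same.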

BoundingBox

BoundingBox

Bases: BaseModel

A bounding box for detected objects or crop regions in an image.

Coordinates are normalized to [0, 1] relative to image dimensions. Promoted to a Pydantic model so it can be embedded directly into Operation fields (e.g. KenBurns.start_region) and validated / serialised as part of an op's JSON wire format.

Source code in src/videopython/base/description.py
class BoundingBox(BaseModel):
    """A bounding box for detected objects or crop regions in an image.

    Coordinates are normalized to ``[0, 1]`` relative to image dimensions.
    Promoted to a Pydantic model so it can be embedded directly into
    ``Operation`` fields (e.g. ``KenBurns.start_region``) and validated /
    serialised as part of an op's JSON wire format.
    """

    model_config = ConfigDict(extra="forbid", frozen=True)

    x: float = Field(description="Left edge of the box, 0=left of the image.")
    y: float = Field(description="Top edge of the box, 0=top of the image.")
    width: float = Field(description="Width of the box, normalized to image width.")
    height: float = Field(description="Height of the box, normalized to image height.")

    @property
    def center(self) -> tuple[float, float]:
        """Center point of the bounding box."""
        return (self.x + self.width / 2, self.y + self.height / 2)

    @property
    def area(self) -> float:
        """Area of the bounding box (normalized)."""
        return self.width * self.height

    def to_dict(self) -> dict[str, Any]:
        """Backwards-compat alias for ``model_dump()``."""
        return self.model_dump()

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> BoundingBox:
        """Backwards-compat alias for ``model_validate(data)``."""
        return cls.model_validate(data)

center property

center: tuple[float, float]

Center point of the bounding box.

area property

area: float

Area of the bounding box (normalized).

to_dict

to_dict() -> dict[str, Any]

Backwards-compat alias for model_dump().

Source code in src/videopython/base/description.py
def to_dict(self) -> dict[str, Any]:
    """Backwards-compat alias for ``model_dump()``."""
    return self.model_dump()

from_dict classmethod

from_dict(data: dict[str, Any]) -> BoundingBox

Backwards-compat alias for model_validate(data).

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> BoundingBox:
    """Backwards-compat alias for ``model_validate(data)``."""
    return cls.model_validate(data)
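
Because coordinates are normalized, a box must be scaled by the frame size before it can be used for cropping or drawing. The sketch below shows one way to do that with plain floats. `to_pixel_rect` is a hypothetical helper, not part of the library, and it clamps the result because the `Field` declarations shown above carry no range constraints.

```python
def to_pixel_rect(
    x: float, y: float, width: float, height: float, image_width: int, image_height: int
) -> tuple[int, int, int, int]:
    """Convert a normalized (x, y, width, height) box to integer pixel
    (left, top, right, bottom), clamped to the image bounds."""
    left = max(0, min(image_width, round(x * image_width)))
    top = max(0, min(image_height, round(y * image_height)))
    right = max(0, min(image_width, round((x + width) * image_width)))
    bottom = max(0, min(image_height, round((y + height) * image_height)))
    return left, top, right, bottom


rect = to_pixel_rect(0.25, 0.5, 0.5, 0.25, image_width=1920, image_height=1080)
# A box that spills past the edge is clipped to the frame.
clipped = to_pixel_rect(0.9, 0.9, 0.5, 0.5, image_width=100, image_height=100)
```

With a real `BoundingBox` instance the call would presumably pass `box.x`, `box.y`, `box.width`, and `box.height`.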

DetectedObject

DetectedObject dataclass

An object detected in a video frame.

Attributes:

Name          Type                Description
label         str                 Name/class of the detected object (e.g., "person", "car", "dog")
confidence    float               Detection confidence score between 0 and 1
bounding_box  BoundingBox | None  Optional bounding box location of the object

Source code in src/videopython/base/description.py
@dataclass
class DetectedObject:
    """An object detected in a video frame.

    Attributes:
        label: Name/class of the detected object (e.g., "person", "car", "dog")
        confidence: Detection confidence score between 0 and 1
        bounding_box: Optional bounding box location of the object
    """

    label: str
    confidence: float
    bounding_box: BoundingBox | None = None

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "label": self.label,
            "confidence": self.confidence,
            "bounding_box": self.bounding_box.to_dict() if self.bounding_box else None,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> DetectedObject:
        """Create DetectedObject from dictionary."""
        return cls(
            label=data["label"],
            confidence=data["confidence"],
            bounding_box=BoundingBox.from_dict(data["bounding_box"]) if data.get("bounding_box") else None,
        )

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary for JSON serialization."""
    return {
        "label": self.label,
        "confidence": self.confidence,
        "bounding_box": self.bounding_box.to_dict() if self.bounding_box else None,
    }

from_dict classmethod

from_dict(data: dict[str, Any]) -> DetectedObject

Create DetectedObject from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> DetectedObject:
    """Create DetectedObject from dictionary."""
    return cls(
        label=data["label"],
        confidence=data["confidence"],
        bounding_box=BoundingBox.from_dict(data["bounding_box"]) if data.get("bounding_box") else None,
    )
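
The nested `bounding_box` is serialized as either a dictionary or `None`, which maps to JSON `null`. The following sketch walks a list of detections through a JSON round trip to show that both cases survive. `Box` and `Detection` are hypothetical stdlib stand-ins for `BoundingBox` and `DetectedObject`, so the example does not depend on Pydantic.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Box:  # hypothetical stand-in for BoundingBox
    x: float
    y: float
    width: float
    height: float


@dataclass
class Detection:  # hypothetical stand-in for DetectedObject
    label: str
    confidence: float
    bounding_box: Optional[Box] = None

    def to_dict(self) -> dict[str, Any]:
        box = self.bounding_box
        return {
            "label": self.label,
            "confidence": self.confidence,
            "bounding_box": asdict(box) if box is not None else None,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Detection":
        raw_box = data.get("bounding_box")  # missing key and null both yield None
        return cls(
            label=data["label"],
            confidence=data["confidence"],
            bounding_box=Box(**raw_box) if raw_box else None,
        )


detections = [Detection("dog", 0.91, Box(0.1, 0.2, 0.3, 0.4)), Detection("car", 0.55)]
payload = json.dumps([d.to_dict() for d in detections])
restored = [Detection.from_dict(item) for item in json.loads(payload)]
```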

DetectedFace

DetectedFace dataclass

A face detected in a video frame.

Attributes:

Name          Type                Description
bounding_box  BoundingBox | None  Bounding box location of the face (normalized 0-1 coordinates). May be None for cloud backends that only return face counts.
confidence    float               Detection confidence score between 0 and 1

Source code in src/videopython/base/description.py
@dataclass
class DetectedFace:
    """A face detected in a video frame.

    Attributes:
        bounding_box: Bounding box location of the face (normalized 0-1 coordinates).
            May be None for cloud backends that only return face counts.
        confidence: Detection confidence score between 0 and 1
    """

    bounding_box: BoundingBox | None = None
    confidence: float = 1.0

    @property
    def center(self) -> tuple[float, float] | None:
        """Center point of the face bounding box, or None if no bounding box."""
        return self.bounding_box.center if self.bounding_box else None

    @property
    def area(self) -> float | None:
        """Area of the face bounding box (normalized), or None if no bounding box."""
        return self.bounding_box.area if self.bounding_box else None

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "bounding_box": self.bounding_box.to_dict() if self.bounding_box else None,
            "confidence": self.confidence,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> DetectedFace:
        """Create DetectedFace from dictionary."""
        return cls(
            bounding_box=BoundingBox.from_dict(data["bounding_box"]) if data.get("bounding_box") else None,
            confidence=data.get("confidence", 1.0),
        )

center property

center: tuple[float, float] | None

Center point of the face bounding box, or None if no bounding box.

area property

area: float | None

Area of the face bounding box (normalized), or None if no bounding box.

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary for JSON serialization."""
    return {
        "bounding_box": self.bounding_box.to_dict() if self.bounding_box else None,
        "confidence": self.confidence,
    }

from_dict classmethod

from_dict(data: dict[str, Any]) -> DetectedFace

Create DetectedFace from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> DetectedFace:
    """Create DetectedFace from dictionary."""
    return cls(
        bounding_box=BoundingBox.from_dict(data["bounding_box"]) if data.get("bounding_box") else None,
        confidence=data.get("confidence", 1.0),
    )
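
Since `area` can be `None`, code that ranks faces needs to filter before comparing. As a small sketch, picking the most prominent face in a frame might look like this. `Face` is a hypothetical stand-in that stores the area directly, and `largest_face` with its `min_confidence` default is an illustrative helper, not library API.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Face:  # hypothetical stand-in for DetectedFace
    area: Optional[float]  # None when the backend returned no bounding box
    confidence: float = 1.0


def largest_face(faces: list[Face], min_confidence: float = 0.5) -> Optional[Face]:
    """Return the biggest sufficiently confident face that has a box, or None."""
    # Compare against None explicitly: an area of 0.0 is falsy but still a valid value.
    candidates = [f for f in faces if f.area is not None and f.confidence >= min_confidence]
    return max(candidates, key=lambda f: f.area, default=None)


faces = [Face(None, 0.9), Face(0.02, 0.95), Face(0.10, 0.4), Face(0.05, 0.8)]
best = largest_face(faces)
```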

DetectedText

DetectedText dataclass

Text detected in a video frame.

Attributes:

Name          Type                Description
text          str                 OCR text content
confidence    float               Detection confidence score between 0 and 1
bounding_box  BoundingBox | None  Optional normalized bounding box for the text region

Source code in src/videopython/base/description.py
@dataclass
class DetectedText:
    """Text detected in a video frame.

    Attributes:
        text: OCR text content
        confidence: Detection confidence score between 0 and 1
        bounding_box: Optional normalized bounding box for the text region
    """

    text: str
    confidence: float
    bounding_box: BoundingBox | None = None

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "text": self.text,
            "confidence": self.confidence,
            "bounding_box": self.bounding_box.to_dict() if self.bounding_box else None,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "DetectedText":
        """Create DetectedText from dictionary."""
        return cls(
            text=data["text"],
            confidence=data["confidence"],
            bounding_box=BoundingBox.from_dict(data["bounding_box"]) if data.get("bounding_box") else None,
        )

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary for JSON serialization."""
    return {
        "text": self.text,
        "confidence": self.confidence,
        "bounding_box": self.bounding_box.to_dict() if self.bounding_box else None,
    }

from_dict classmethod

from_dict(data: dict[str, Any]) -> 'DetectedText'

Create DetectedText from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DetectedText":
    """Create DetectedText from dictionary."""
    return cls(
        text=data["text"],
        confidence=data["confidence"],
        bounding_box=BoundingBox.from_dict(data["bounding_box"]) if data.get("bounding_box") else None,
    )

FaceTrack

FaceTrack dataclass

A face tracked across consecutive frames within a single shot.

Tracks are produced by IoU association — no embedding re-id, so a track does not survive across shot/scene boundaries. frame_indices and boxes are parallel lists of equal length.

Attributes:

Name           Type               Description
track_id       int                Stable id within the shot the track was produced in. Not globally unique across scenes.
frame_indices  list[int]          Source-video frame indices for each detection.
boxes          list[BoundingBox]  Per-frame bounding boxes (normalized 0-1 coords).
confidences    list[float]        Per-frame detection confidence in [0, 1].

Source code in src/videopython/base/description.py
@dataclass
class FaceTrack:
    """A face tracked across consecutive frames within a single shot.

    Tracks are produced by IoU association — no embedding re-id, so a
    track does not survive across shot/scene boundaries. ``frame_indices``
    and ``boxes`` are parallel lists of equal length.

    Attributes:
        track_id: Stable id within the shot the track was produced in.
            Not globally unique across scenes.
        frame_indices: Source-video frame indices for each detection.
        boxes: Per-frame bounding boxes (normalized 0-1 coords).
        confidences: Per-frame detection confidence in [0, 1].
    """

    track_id: int
    frame_indices: list[int]
    boxes: list[BoundingBox]
    confidences: list[float] = field(default_factory=list)

    @property
    def length(self) -> int:
        """Number of frames in this track."""
        return len(self.frame_indices)

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "track_id": self.track_id,
            "frame_indices": list(self.frame_indices),
            "boxes": [box.to_dict() for box in self.boxes],
            "confidences": list(self.confidences),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "FaceTrack":
        """Create FaceTrack from dictionary."""
        return cls(
            track_id=int(data["track_id"]),
            frame_indices=[int(i) for i in data.get("frame_indices", [])],
            boxes=[BoundingBox.from_dict(b) for b in data.get("boxes", [])],
            confidences=[float(c) for c in data.get("confidences", [])],
        )

length property

length: int

Number of frames in this track.
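
A track's `frame_indices` are not guaranteed to be contiguous: the tracker tolerates some missed frames, and callers may sample frames at a stride. The helpers below are a hypothetical sketch, working on the plain lists a `FaceTrack` carries, for locating such gaps and summarizing confidence. Neither function is part of the library.

```python
from typing import Optional


def track_gaps(frame_indices: list[int], stride: int = 1) -> list[tuple[int, int]]:
    """Return (previous, next) index pairs where more than `stride` frames elapsed."""
    return [(a, b) for a, b in zip(frame_indices, frame_indices[1:]) if b - a > stride]


def mean_confidence(confidences: list[float]) -> Optional[float]:
    """Average detection confidence, or None when the list is empty."""
    return sum(confidences) / len(confidences) if confidences else None


gaps = track_gaps([10, 11, 12, 15, 16])
sampled_gaps = track_gaps([0, 5, 10, 20], stride=5)
average = mean_confidence([0.9, 0.8, 0.7])
```

`mean_confidence` returns `None` for an empty list because `confidences` defaults to an empty list in the dataclass.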

AudioEvent

AudioEvent dataclass

A detected audio event with timestamp.

Attributes:

Name        Type   Description
start       float  Start time in seconds
end         float  End time in seconds
label       str    Name of the detected sound (e.g., "Music", "Speech", "Dog bark")
confidence  float  Detection confidence score between 0 and 1

Source code in src/videopython/base/description.py
@dataclass
class AudioEvent:
    """A detected audio event with timestamp.

    Attributes:
        start: Start time in seconds
        end: End time in seconds
        label: Name of the detected sound (e.g., "Music", "Speech", "Dog bark")
        confidence: Detection confidence score between 0 and 1
    """

    start: float
    end: float
    label: str
    confidence: float

    @property
    def duration(self) -> float:
        """Duration of the audio event in seconds."""
        return self.end - self.start

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "start": self.start,
            "end": self.end,
            "label": self.label,
            "confidence": self.confidence,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> AudioEvent:
        """Create AudioEvent from dictionary."""
        return cls(
            start=data["start"],
            end=data["end"],
            label=data["label"],
            confidence=data["confidence"],
        )

duration property

duration: float

Duration of the audio event in seconds.

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary for JSON serialization."""
    return {
        "start": self.start,
        "end": self.end,
        "label": self.label,
        "confidence": self.confidence,
    }

from_dict classmethod

from_dict(data: dict[str, Any]) -> AudioEvent

Create AudioEvent from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> AudioEvent:
    """Create AudioEvent from dictionary."""
    return cls(
        start=data["start"],
        end=data["end"],
        label=data["label"],
        confidence=data["confidence"],
    )
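
Classifiers that score audio in windows often emit several back-to-back events with the same label. As a sketch of one possible post-processing step, same-label events separated by a short gap can be merged. `merge_adjacent`, its `max_gap` default, and the choice to keep the maximum confidence are all assumptions for illustration, and the dataclass is a stand-in with the four documented fields.

```python
from dataclasses import dataclass


@dataclass
class AudioEvent:  # stand-in with the four documented fields
    start: float
    end: float
    label: str
    confidence: float


def merge_adjacent(events: list[AudioEvent], max_gap: float = 0.5) -> list[AudioEvent]:
    """Merge same-label events separated by at most `max_gap` seconds."""
    merged: list[AudioEvent] = []
    # Sorting by (label, start) puts mergeable events next to each other.
    for event in sorted(events, key=lambda e: (e.label, e.start)):
        last = merged[-1] if merged else None
        if last is not None and last.label == event.label and event.start - last.end <= max_gap:
            last.end = max(last.end, event.end)
            last.confidence = max(last.confidence, event.confidence)
        else:
            # Copy so the caller's events are never mutated.
            merged.append(AudioEvent(event.start, event.end, event.label, event.confidence))
    return sorted(merged, key=lambda e: e.start)


events = [
    AudioEvent(0.0, 2.0, "Music", 0.7),
    AudioEvent(2.25, 4.0, "Music", 0.9),
    AudioEvent(1.0, 1.5, "Speech", 0.8),
    AudioEvent(8.0, 9.0, "Music", 0.6),
]
merged = merge_adjacent(events)
```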

AudioClassification

AudioClassification dataclass

Complete audio classification results.

Attributes:

Name              Type               Description
events            list[AudioEvent]   List of detected audio events with timestamps
clip_predictions  dict[str, float]   Overall class probabilities for the entire audio clip

Source code in src/videopython/base/description.py
@dataclass
class AudioClassification:
    """Complete audio classification results.

    Attributes:
        events: List of detected audio events with timestamps
        clip_predictions: Overall class probabilities for the entire audio clip
    """

    events: list[AudioEvent]
    clip_predictions: dict[str, float] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "events": [event.to_dict() for event in self.events],
            "clip_predictions": self.clip_predictions,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "AudioClassification":
        """Create AudioClassification from dictionary."""
        return cls(
            events=[AudioEvent.from_dict(event) for event in data.get("events", [])],
            clip_predictions={k: float(v) for k, v in data.get("clip_predictions", {}).items()},
        )

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary for JSON serialization."""
    return {
        "events": [event.to_dict() for event in self.events],
        "clip_predictions": self.clip_predictions,
    }

from_dict classmethod

from_dict(data: dict[str, Any]) -> 'AudioClassification'

Create AudioClassification from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "AudioClassification":
    """Create AudioClassification from dictionary."""
    return cls(
        events=[AudioEvent.from_dict(event) for event in data.get("events", [])],
        clip_predictions={k: float(v) for k, v in data.get("clip_predictions", {}).items()},
    )
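
`clip_predictions` is a plain label-to-probability mapping, so summarizing a clip is a matter of sorting it. The helper below is a hypothetical sketch that returns the highest-scoring labels above a floor; `top_labels`, `k`, and `min_prob` are illustrative names and defaults, not library API.

```python
def top_labels(
    clip_predictions: dict[str, float], k: int = 3, min_prob: float = 0.1
) -> list[tuple[str, float]]:
    """Return up to `k` (label, probability) pairs with probability >= min_prob,
    ordered from most to least likely."""
    ranked = sorted(clip_predictions.items(), key=lambda item: item[1], reverse=True)
    return [(label, prob) for label, prob in ranked if prob >= min_prob][:k]


predictions = {"Music": 0.82, "Speech": 0.40, "Dog bark": 0.05, "Applause": 0.12}
top_two = top_labels(predictions, k=2)
top_default = top_labels(predictions)
```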