Skip to content

Video Analysis

Create a single, serializable, scene-first analysis object.

Overview

VideoAnalyzer runs global passes (transcription + scene detection), then for each detected scene runs the scene-VLM, audio classifier, and per-shot face tracker.

VideoAnalysis is a Pydantic model, so all BaseModel serialization methods are available — model_dump(), model_dump_json(), model_validate(), model_validate_json(). For the common file-I/O case, the convenience wrappers analysis.save(path) and VideoAnalysis.load(path) go through model_dump_json / model_validate_json with UTF-8 + parent directory creation.

The output is centered on analysis.scenes.samples (one payload per scene).

Basic Usage

from videopython.ai import VideoAnalyzer
from videopython.ai.video_analysis.models import VideoAnalysis

analyzer = VideoAnalyzer()
analysis = analyzer.analyze_path("video.mp4")

print(analysis.source.title)
# `analysis.scenes` is None when no scene samples were produced, so guard
# before indexing into `samples`.
if analysis.scenes:
    sample = analysis.scenes.samples[0]
    # `scene_description` is None when the scene VLM was disabled or failed.
    if sample.scene_description:
        print(sample.scene_description.caption)
        print(sample.scene_description.subjects)
        print(sample.scene_description.shot_type)
    # `faces` is None/empty when the face tracker did not produce tracks.
    if sample.faces:
        for track in sample.faces:
            print(f"track #{track.track_id}: {track.length} frames")

# Persist results
analysis.save("video_analysis.json")

# Load later. `load` is a classmethod (an alternate constructor), so call it
# on the class -- no existing analysis instance is needed.
loaded = VideoAnalysis.load("video_analysis.json")
print(loaded.run_info.mode)

Configure Analysis

Pick which analyzers run, and forward kwargs to their constructors via analyzer_params:

from videopython.ai import VideoAnalysisConfig, VideoAnalyzer

# Analyzer ids that should run for this configuration.
enabled = {
    "audio_to_text",
    "semantic_scene_detector",
    "scene_vlm",
    "face_tracker",
}

# Constructor kwargs forwarded to individual analyzers, keyed by analyzer id.
scene_vlm_kwargs = {"model_size": "9b"}  # default is "4b"
audio_to_text_kwargs = {
    "model_name": "large",
    "vocabulary": ["Klarna", "Allegro", "InPost"],  # brand-name biasing
}

config = VideoAnalysisConfig(
    enabled_analyzers=enabled,
    analyzer_params={
        "scene_vlm": scene_vlm_kwargs,
        "audio_to_text": audio_to_text_kwargs,
    },
)

analyzer = VideoAnalyzer(config=config, sampling="medium")
analysis = analyzer.analyze_path("video.mp4")

Sampling Presets

VideoAnalyzer(sampling=...) controls the per-scene SceneVLM frame budget. The preset tunes the per-scene frame cap, the log-curve scale/base used to size short scenes, and the threshold below which adjacent short scenes get merged into a single VLM call:

| sampling | per-scene frame cap | adjacent-merge threshold | typical use |
| --- | --- | --- | --- |
| "low" | 8 | 20s | quick previews, long videos |
| "medium" (default) | 30 | 10s | balanced default |
| "high" | 60 | 4s | rich analysis, talking-head depth |

model_size and sampling are orthogonal kwargs. model_size="4b" + sampling="high" is the rich-analysis pairing; model_size="4b" + sampling="low" is a fast preview.

Rich Understanding Preset

Use the built-in preset when you want broad understanding coverage across many video types:

from videopython.ai import VideoAnalysisConfig, VideoAnalyzer

# Start from the built-in preset, then run the analyzer on a file.
config = VideoAnalysisConfig.rich_understanding_preset()
analyzer = VideoAnalyzer(config=config)
analysis = analyzer.analyze_path("video.mp4")

The preset enables every analyzer (audio_to_text, audio_classifier, semantic_scene_detector, scene_vlm, face_tracker) and is equivalent to bare VideoAnalysisConfig().

Output Shape

  • analysis.audio.transcription — full Whisper transcription.
  • analysis.scenes.samples — list of SceneAnalysisSample, one per scene. Each sample carries:
      • scene timing (start_second, end_second, start_frame, end_frame)
      • scene_description: SceneDescription | None — the structured SceneVLM output (caption + subjects + shot_type). None when the VLM was disabled or its forward pass failed.
      • audio_classification: AudioClassification | None — events and clip-level predictions for the scene window.
      • faces: list[FaceTrack] | None — per-shot IoU-associated face tracks. One list per scene, each track carrying its own per-frame indices and bounding boxes.
  • analysis.run_info.stage_durations_seconds — wall-clock time per stage (whisper, scene_detection, scene_vlm, face_tracker, audio_classification, the enclosing scene_analysis stage that wraps all per-scene work, plus whisper_and_scene_detection_parallel when those two run together).

Classes

VideoAnalysisConfig

Bases: BaseModel

Execution config for scene-first analysis runs.

analyzer_params lets you forward keyword arguments to each predictor constructor, keyed by analyzer id. For example:

VideoAnalysisConfig(
    analyzer_params={
        "audio_to_text": {"model_name": "large"},
        "scene_vlm": {"model_size": "9b"},
    }
)
Source code in src/videopython/ai/video_analysis/models.py
class VideoAnalysisConfig(BaseModel):
    """Configuration describing which analyzers run and how they are built.

    ``enabled_analyzers`` is the set of analyzer ids to execute; it defaults
    to every id in ``ALL_ANALYZER_IDS``.  ``analyzer_params`` maps an analyzer
    id to the keyword arguments forwarded to that predictor's constructor::

        VideoAnalysisConfig(
            analyzer_params={
                "audio_to_text": {"model_name": "large"},
                "scene_vlm": {"model_size": "9b"},
            }
        )

    Ids that are not part of ``ALL_ANALYZER_IDS`` are rejected on validation.
    """

    enabled_analyzers: set[str] = Field(default_factory=lambda: set(ALL_ANALYZER_IDS))
    analyzer_params: dict[str, dict[str, Any]] = Field(default_factory=dict)

    @model_validator(mode="after")
    def _reject_unknown_analyzer_ids(self) -> VideoAnalysisConfig:
        """Raise ``ValueError`` when either field names an unknown analyzer id."""
        known_ids = set(ALL_ANALYZER_IDS)
        # ``enabled_analyzers`` is checked first, so its error is the one
        # reported when both fields contain unknown ids.
        for field_name, supplied_ids in (
            ("enabled_analyzers", self.enabled_analyzers),
            ("analyzer_params", self.analyzer_params),
        ):
            unknown_ids = sorted(set(supplied_ids) - known_ids)
            if unknown_ids:
                raise ValueError(f"Unknown analyzer ids in {field_name}: {unknown_ids}")
        return self

    def get_params(self, analyzer_id: str) -> dict[str, Any]:
        """Return a shallow copy of the kwargs for ``analyzer_id`` (empty if unset)."""
        registered = self.analyzer_params.get(analyzer_id, {})
        return dict(registered)

    @classmethod
    def rich_understanding_preset(cls) -> VideoAnalysisConfig:
        """Return the default config; kept as a backward-compatible alias."""
        default_config = cls()
        return default_config

get_params

get_params(analyzer_id: str) -> dict[str, Any]

Return kwargs dict for the given analyzer, defaulting to empty.

Source code in src/videopython/ai/video_analysis/models.py
def get_params(self, analyzer_id: str) -> dict[str, Any]:
    """Return a shallow copy of the kwargs for ``analyzer_id`` (empty if unset)."""
    registered = self.analyzer_params.get(analyzer_id, {})
    return dict(registered)

rich_understanding_preset classmethod

rich_understanding_preset() -> VideoAnalysisConfig

Backward-compatible alias for the default config.

Source code in src/videopython/ai/video_analysis/models.py
@classmethod
def rich_understanding_preset(cls) -> VideoAnalysisConfig:
    """Return the default config; kept as a backward-compatible alias.

    Takes no options and simply instantiates ``cls`` with its defaults.
    """
    default_config = cls()
    return default_config

VideoAnalyzer

Orchestrates scene-first analyzers and builds VideoAnalysis output.

sampling controls how aggressively the SceneVLM samples frames per scene. low is a fast preview pass for long videos, high keeps talking-head depth, medium is the previous default. The preset tunes the per-scene frame cap, the log-curve scale/base used to size short scenes, and the threshold below which adjacent scenes get merged into one VLM call.

sampling and the SceneVLM tier are orthogonal: small models can't make use of dense sampling, but the user owns that tradeoff.

Source code in src/videopython/ai/video_analysis/analyzer.py
class VideoAnalyzer:
    """Orchestrates scene-first analyzers and builds `VideoAnalysis` output.

    ``sampling`` controls how aggressively the SceneVLM samples frames per
    scene. ``low`` is a fast preview pass for long videos, ``high`` keeps
    talking-head depth, ``medium`` is the previous default. The preset
    tunes the per-scene frame cap, the log-curve scale/base used to size
    short scenes, and the threshold below which adjacent scenes get
    merged into one VLM call.

    ``sampling`` and the SceneVLM ``tier`` are orthogonal: small models
    can't make use of dense sampling, but the user owns that tradeoff.
    """

    def __init__(
        self,
        config: VideoAnalysisConfig | None = None,
        *,
        sampling: SamplingPreset = DEFAULT_SAMPLING_PRESET,
    ):
        """Store the run configuration and resolve the sampling preset.

        Args:
            config: Analysis configuration; a default ``VideoAnalysisConfig``
                is created when omitted (or falsy).
            sampling: Name of a preset in ``SAMPLING_PRESETS``.

        Raises:
            ValueError: If ``sampling`` is not a key of ``SAMPLING_PRESETS``.
        """
        is_known_preset = sampling in SAMPLING_PRESETS
        if not is_known_preset:
            supported = ", ".join(SAMPLING_PRESETS)
            raise ValueError(f"sampling must be one of: {supported}")

        self.config = config if config else VideoAnalysisConfig()
        self.sampling: SamplingPreset = sampling
        # Resolved once here so the per-scene stages do not repeat the lookup.
        self._sampling_profile = SAMPLING_PRESETS[sampling]

    def analyze_path(self, path: str | Path) -> VideoAnalysis:
        """Analyze the video file at ``path`` in scene-first mode.

        Metadata is read from the file, a source record is built from it
        (using the file stem as the fallback title) and the pipeline is run
        with no in-memory ``Video`` object.
        """
        video_path = Path(path)
        file_metadata = VideoMetadata.from_path(video_path)
        source_record = self._build_source(
            metadata=file_metadata,
            path_obj=video_path,
            duration_seconds=file_metadata.total_seconds,
            title_fallback=video_path.stem,
        )
        return self._analyze(
            video=None,
            source_path=video_path,
            metadata=file_metadata,
            source=source_record,
        )

    def analyze(self, video: Video, *, source_path: str | Path | None = None) -> VideoAnalysis:
        """Analyze an in-memory `Video`, optionally tied to its source file.

        Args:
            video: The loaded video; metadata and duration are taken from it.
            source_path: Optional path of the file the video came from. When
                given (and non-empty) it is recorded on the source and its
                stem is used as the fallback title.
        """
        origin: Path | None = None
        if source_path:
            origin = Path(source_path)

        video_metadata = VideoMetadata.from_video(video)
        source_record = self._build_source(
            metadata=video_metadata,
            path_obj=origin,
            duration_seconds=video.total_seconds,
            title_fallback=None if origin is None else origin.stem,
        )
        return self._analyze(video=video, source_path=origin, metadata=video_metadata, source=source_record)

    def _analyze(
        self,
        *,
        video: Video | None,
        source_path: Path | None,
        metadata: VideoMetadata,
        source: VideoAnalysisSource,
    ) -> VideoAnalysis:
        """Run the full scene-first pipeline and assemble the result.

        Runs the enabled global passes (transcription and scene detection),
        resolves the scene boundaries to analyze, delegates the per-scene work
        to ``_analyze_scenes`` and packs everything into a ``VideoAnalysis``.

        Args:
            video: In-memory video, or ``None`` when analyzing from a path.
            source_path: Path of the source file, or ``None`` when only an
                in-memory video is available.
            metadata: Metadata of the video being analyzed.
            source: Pre-built source record stored on the result.

        Returns:
            The analysis. ``audio`` is ``None`` when no transcription was
            produced and ``scenes`` is ``None`` when there are no scene samples.

        Raises:
            ValueError: If both ``source_path`` and ``video`` are ``None``.
        """
        # A path takes precedence: the run is labelled "path" whenever one is
        # given, even if an in-memory video was passed as well.
        mode = "path" if source_path is not None else "video"
        if source_path is None and video is None:
            raise ValueError("Either `source_path` or `video` must be provided")

        enabled = self.config.enabled_analyzers

        run_info = AnalysisRunInfo(
            created_at=stages.utc_now_iso(),
            mode=mode,
            library_version=stages.library_version(),
        )

        # Start of the window reported as run_info.total_duration_seconds.
        t_analysis_start = time.perf_counter()

        run_whisper = AUDIO_TO_TEXT in enabled
        run_scene_det = SEMANTIC_SCENE_DETECTOR in enabled

        transcription = None
        detected: list[SceneBoundary] | None = None

        # SceneVLM is loaded *after* Whisper/TransNetV2 finish (not concurrently)
        # because transformers' from_pretrained(torch_dtype="auto") mutates the
        # process-global torch.get_default_dtype() during model construction,
        # which corrupts Whisper's model weights if they're initialized at the
        # same time.
        if run_whisper and run_scene_det:
            # Both global passes are enabled: use the combined stage helper,
            # which receives run_info itself instead of a record_stage wrapper.
            transcription, detected = stages.run_whisper_and_scene_detection(
                config=self.config, source_path=source_path, video=video, run_info=run_info
            )
        else:
            # At most one global pass is enabled: run it on its own and time it
            # under its own stage name.
            if run_whisper:
                with stages.record_stage(run_info, "whisper"):
                    transcription = stages.run_whisper(config=self.config, source_path=source_path, video=video)

            if run_scene_det:
                with stages.record_stage(run_info, "scene_detection"):
                    detected = stages.run_scene_detection(config=self.config, source_path=source_path, video=video)

        if run_scene_det:
            stages.reset_transnetv2_torch_state()

        # Whisper and TransNetV2 are done -- free their GPU memory before
        # loading SceneVLM (~9GB). Python GC doesn't guarantee immediate
        # cleanup, so force it and release the CUDA cache.
        if run_whisper or run_scene_det:
            gc.collect()
            stages.release_gpu_cache()

        # Start from a single scene spanning the whole video; replace it with
        # the detector output when scene detection ran.
        scenes = self._default_scene_boundaries(metadata)
        if detected is not None:
            scenes = self._normalize_scene_boundaries(detected, metadata)

        # Normalization can drop every detected scene; fall back to the
        # whole-video scene again in that case.
        if not scenes:
            scenes = self._default_scene_boundaries(metadata)

        with stages.record_stage(run_info, "scene_analysis"):
            scene_section = self._analyze_scenes(
                source_path=source_path,
                video=video,
                metadata=metadata,
                scenes=scenes,
                run_info=run_info,
            )

        # The audio section only exists when a transcription was produced.
        audio_section = AudioAnalysisSection(transcription=transcription) if transcription is not None else None

        run_info.total_duration_seconds = time.perf_counter() - t_analysis_start
        logger.info("Total analysis completed in %.2fs", run_info.total_duration_seconds)
        return VideoAnalysis(
            source=source,
            config=self.config,
            run_info=run_info,
            audio=audio_section,
            # An empty sample list is reported as "no scenes" rather than an empty section.
            scenes=scene_section if scene_section.samples else None,
        )

    def _analyze_scenes(
        self,
        *,
        source_path: Path | None,
        video: Video | None,
        metadata: VideoMetadata,
        scenes: list[SceneBoundary],
        run_info: AnalysisRunInfo,
    ) -> SceneAnalysisSection:
        """Build one ``SceneAnalysisSample`` per scene using the enabled analyzers.

        Every analyzer is best-effort: a failure to construct an analyzer, or a
        failure while processing a scene, is logged as a warning and the
        corresponding field is simply left unset on the affected sample(s).

        Args:
            source_path: Path of the source file, if any.
            video: In-memory video, if any.
            metadata: Metadata of the video being analyzed.
            scenes: Scene boundaries to analyze, one sample is emitted per entry.
            run_info: Run record passed to ``stages.record_stage`` for timing.

        Returns:
            A ``SceneAnalysisSection`` holding the samples in scene order.
        """
        enabled = self.config.enabled_analyzers

        # --- Analyzer construction (each one is optional and may fail) ---
        scene_vlm: SceneVLM | None
        try:
            scene_vlm = SceneVLM(**self.config.get_params(SCENE_VLM)) if SCENE_VLM in enabled else None
        except (ImportError, OSError, RuntimeError, ValueError):
            logger.warning("Failed to initialize SceneVLM, skipping visual understanding", exc_info=True)
            scene_vlm = None

        try:
            audio_classifier = (
                AudioClassifier(**self.config.get_params(AUDIO_CLASSIFIER)) if AUDIO_CLASSIFIER in enabled else None
            )
        except (ImportError, OSError, RuntimeError, ValueError):
            logger.warning("Failed to initialize AudioClassifier, skipping audio classification", exc_info=True)
            audio_classifier = None

        face_tracker: FaceTracker | None = None
        if FACE_TRACKER in enabled:
            try:
                face_tracker = FaceTracker(**self.config.get_params(FACE_TRACKER))
            except (ImportError, OSError, RuntimeError, ValueError):
                logger.warning("Failed to initialize FaceTracker, skipping face tracks", exc_info=True)
                face_tracker = None

        # Load the audio track from the file once, up front. When this is not
        # possible (no path, or loading fails) each scene falls back to a
        # per-scene video clip further below.
        path_audio: Audio | None = None
        if audio_classifier is not None and source_path is not None:
            try:
                path_audio = Audio.from_path(source_path)
            except (OSError, RuntimeError, ValueError):
                logger.warning(
                    "Failed to load audio from path, audio classification will use clip fallback",
                    exc_info=True,
                )
                path_audio = None

        # --- Scene VLM: one batched call for all scenes ---
        # Pre-filled with None so every scene has a description slot even when
        # the VLM is disabled or the batched call fails.
        descriptions: list[SceneDescription | None] = [None] * len(scenes)
        if scene_vlm is not None:
            with stages.record_stage(run_info, "scene_vlm"):
                try:
                    # NOTE(review): descriptions[index] below assumes this returns
                    # exactly one entry per scene -- confirm against
                    # stages.run_scene_vlm_batched.
                    descriptions = stages.run_scene_vlm_batched(
                        scene_vlm=scene_vlm,
                        profile=self._sampling_profile,
                        sampling=self.sampling,
                        source_path=source_path,
                        video=video,
                        metadata=metadata,
                        scenes=scenes,
                    )
                except (IndexError, OSError, RuntimeError, ValueError):
                    logger.warning("Batched SceneVLM failed, skipping visual understanding", exc_info=True)

        # --- Per-scene audio classification and face tracking ---
        samples: list[SceneAnalysisSample] = []
        # A stage is only recorded for analyzers that are actually available.
        # NOTE(review): both contexts wrap the same loop, so presumably each
        # recorded duration spans the combined per-scene work of both
        # analyzers -- confirm against stages.record_stage.
        audio_ctx = (
            stages.record_stage(run_info, "audio_classification") if audio_classifier is not None else nullcontext()
        )
        face_ctx = stages.record_stage(run_info, "face_tracker") if face_tracker is not None else nullcontext()
        with audio_ctx, face_ctx:
            for index, scene in enumerate(scenes):
                sample = SceneAnalysisSample(
                    scene_index=index,
                    start_second=float(scene.start),
                    end_second=float(scene.end),
                    start_frame=int(scene.start_frame),
                    end_frame=int(scene.end_frame),
                    scene_description=descriptions[index],
                )

                if audio_classifier is not None:
                    try:
                        scene_clip: Video | None = None
                        # Without a preloaded audio track, try to load just this
                        # scene as a video clip; a failed load leaves the clip
                        # as None and the stage helper is still called.
                        if path_audio is None:
                            try:
                                scene_clip = self._load_scene_video_clip(
                                    source_path=source_path,
                                    video=video,
                                    start_second=scene.start,
                                    end_second=scene.end,
                                )
                            except (OSError, RuntimeError, ValueError):
                                scene_clip = None
                        sample.audio_classification = stages.run_scene_audio_classification(
                            audio_classifier=audio_classifier,
                            path_audio=path_audio,
                            scene_clip=scene_clip,
                            scene_start=scene.start,
                            scene_end=scene.end,
                        )
                    except (OSError, RuntimeError, ValueError):
                        # Leave audio_classification unset for this scene and move on.
                        logger.warning(
                            "AudioClassifier failed for scene %d (%.1f-%.1fs)",
                            index,
                            scene.start,
                            scene.end,
                            exc_info=True,
                        )

                if face_tracker is not None:
                    try:
                        sample.faces = stages.run_scene_face_tracker(
                            face_tracker=face_tracker,
                            source_path=source_path,
                            video=video,
                            metadata=metadata,
                            scene=scene,
                        )
                    except (IndexError, OSError, RuntimeError, ValueError):
                        # Leave faces unset for this scene and move on.
                        logger.warning(
                            "FaceTracker failed for scene %d (%.1f-%.1fs)",
                            index,
                            scene.start,
                            scene.end,
                            exc_info=True,
                        )

                samples.append(sample)

        return SceneAnalysisSection(samples=samples)

    def _load_scene_video_clip(
        self,
        *,
        source_path: Path | None,
        video: Video | None,
        start_second: float,
        end_second: float,
    ) -> Video | None:
        """Return the ``[start_second, end_second)`` portion of the video as a clip.

        The clip is read from ``source_path`` when one is available; otherwise
        it is cut out of the in-memory ``video``. Returns ``None`` for an empty
        or inverted time range.
        """
        if end_second <= start_second:
            return None

        if source_path is None:
            # Imported lazily, only for the in-memory code path.
            from videopython.editing.transforms import CutSeconds

            cutter = CutSeconds(start=start_second, end=end_second)
            return cutter.apply(stages.require_video(video))

        return Video.from_path(str(source_path), start_second=start_second, end_second=end_second)

    def _default_scene_boundaries(self, metadata: VideoMetadata) -> list[SceneBoundary]:
        """Return a single scene covering the whole video.

        Returns an empty list when the metadata reports no duration or no
        frames, since there is nothing to cover.
        """
        if metadata.total_seconds <= 0 or metadata.frame_count <= 0:
            return []

        whole_video = SceneBoundary(
            start=0.0,
            end=float(metadata.total_seconds),
            start_frame=0,
            end_frame=int(metadata.frame_count),
        )
        return [whole_video]

    def _normalize_scene_boundaries(self, scenes: list[SceneBoundary], metadata: VideoMetadata) -> list[SceneBoundary]:
        """Clamp detected scenes to the video extent and drop unusable ones.

        Times are clamped to ``[0, total_seconds]`` and frame indices to
        ``[0, frame_count]``. A scene with an empty time range is discarded.
        When the clamped frame range is empty it is re-derived from the clamped
        times and ``metadata.fps``; if it is still empty the scene is discarded.
        The result is ordered by ``(start, end)``.
        """
        duration = float(metadata.total_seconds)
        frame_limit = int(metadata.frame_count)

        def clamp_seconds(value: float) -> float:
            return max(0.0, min(duration, value))

        def clamp_frame(value: int) -> int:
            return max(0, min(frame_limit, value))

        cleaned: list[SceneBoundary] = []
        for boundary in scenes:
            begin = clamp_seconds(float(boundary.start))
            finish = clamp_seconds(float(boundary.end))
            if finish <= begin:
                continue

            first_frame = clamp_frame(int(boundary.start_frame))
            last_frame = clamp_frame(int(boundary.end_frame))
            if last_frame <= first_frame:
                # The supplied frame indices are unusable; rebuild them from
                # the timestamps, forcing at least one frame before clamping.
                first_frame = int(round(begin * metadata.fps))
                last_frame = max(first_frame + 1, int(round(finish * metadata.fps)))
                first_frame = clamp_frame(first_frame)
                last_frame = clamp_frame(last_frame)
                if last_frame <= first_frame:
                    continue

            cleaned.append(
                SceneBoundary(
                    start=round(begin, 6),
                    end=round(finish, 6),
                    start_frame=first_frame,
                    end_frame=last_frame,
                )
            )

        return sorted(cleaned, key=lambda boundary: (boundary.start, boundary.end))

    def _build_source(
        self,
        *,
        metadata: VideoMetadata,
        path_obj: Path | None,
        duration_seconds: float,
        title_fallback: str | None,
    ) -> VideoAnalysisSource:
        """Assemble the ``VideoAnalysisSource`` record for a run.

        Container/stream tags are only probed when a path is given. The title
        comes from the ``title`` tag when present and non-empty, otherwise from
        ``title_fallback``. The creation time is taken from the first key of
        ``_CREATION_TIME_TAG_KEYS`` that appears in the tags.
        """
        has_path = bool(path_obj)
        tags = self._extract_source_tags(path_obj) if has_path else {}

        # First matching creation-time tag wins; None when no key is present.
        raw_creation_time = None
        for candidate_key in _CREATION_TIME_TAG_KEYS:
            if candidate_key in tags:
                raw_creation_time = tags[candidate_key]
                break
        creation_time = _normalize_creation_time(raw_creation_time)
        geo = _parse_geo_metadata(tags)

        return VideoAnalysisSource(
            title=tags.get("title") or title_fallback,
            path=str(path_obj) if has_path else None,
            filename=path_obj.name if has_path else None,
            duration=duration_seconds,
            fps=metadata.fps,
            width=metadata.width,
            height=metadata.height,
            frame_count=metadata.frame_count,
            creation_time=creation_time,
            geo=geo,
            # An empty tag dict is stored as None rather than {}.
            raw_tags=tags or None,
        )

    def _extract_source_tags(self, path: Path | None) -> dict[str, str]:
        """Collect lower-cased format and stream tags reported by the ffmpeg probe.

        Format-level tags take precedence: a stream tag is only used for a key
        that has not been set yet. Returns an empty dict when no path is given
        or when probing fails.
        """
        if path is None:
            return {}

        try:
            probe_result = _ffmpeg.probe(path, extra_args=["-show_entries", "format_tags:stream_tags"])
        except (FFmpegProbeError, OSError):
            return {}

        collected: dict[str, str] = {}

        container_tags = probe_result.get("format", {}).get("tags", {})
        if isinstance(container_tags, dict):
            for name, value in container_tags.items():
                collected[str(name).lower()] = str(value)

        for stream in probe_result.get("streams", []):
            per_stream_tags = stream.get("tags", {})
            if isinstance(per_stream_tags, dict):
                for name, value in per_stream_tags.items():
                    # setdefault keeps whichever value was recorded first.
                    collected.setdefault(str(name).lower(), str(value))

        return collected

analyze_path

analyze_path(path: str | Path) -> VideoAnalysis

Analyze a video path in scene-first mode.

Source code in src/videopython/ai/video_analysis/analyzer.py
def analyze_path(self, path: str | Path) -> VideoAnalysis:
    """Analyze the video file at ``path`` in scene-first mode.

    Metadata is read from the file, a source record is built from it
    (using the file stem as the fallback title) and the pipeline is run
    with no in-memory ``Video`` object.
    """
    video_path = Path(path)
    file_metadata = VideoMetadata.from_path(video_path)
    source_record = self._build_source(
        metadata=file_metadata,
        path_obj=video_path,
        duration_seconds=file_metadata.total_seconds,
        title_fallback=video_path.stem,
    )
    return self._analyze(
        video=None,
        source_path=video_path,
        metadata=file_metadata,
        source=source_record,
    )

analyze

analyze(
    video: Video, *, source_path: str | Path | None = None
) -> VideoAnalysis

Analyze an in-memory Video object.

Source code in src/videopython/ai/video_analysis/analyzer.py
def analyze(self, video: Video, *, source_path: str | Path | None = None) -> VideoAnalysis:
    """Analyze an in-memory `Video`, optionally tied to its source file.

    Args:
        video: The loaded video; metadata and duration are taken from it.
        source_path: Optional path of the file the video came from. When
            given (and non-empty) it is recorded on the source and its
            stem is used as the fallback title.
    """
    origin: Path | None = None
    if source_path:
        origin = Path(source_path)

    video_metadata = VideoMetadata.from_video(video)
    source_record = self._build_source(
        metadata=video_metadata,
        path_obj=origin,
        duration_seconds=video.total_seconds,
        title_fallback=None if origin is None else origin.stem,
    )
    return self._analyze(video=video, source_path=origin, metadata=video_metadata, source=source_record)

VideoAnalysis

Bases: BaseModel

Serializable aggregate scene-first analysis result for one video.

Source code in src/videopython/ai/video_analysis/models.py
class VideoAnalysis(BaseModel):
    """Aggregate, serializable result of one scene-first analysis run."""

    source: VideoAnalysisSource
    config: VideoAnalysisConfig
    run_info: AnalysisRunInfo
    audio: AudioAnalysisSection | None = None
    scenes: SceneAnalysisSection | None = None

    def save(self, path: str | Path, *, indent: int | None = 2) -> None:
        """Write the analysis as UTF-8 JSON to ``path``.

        Missing parent directories are created first. ``indent`` is passed
        through to ``model_dump_json``.
        """
        destination = Path(path)
        destination.parent.mkdir(parents=True, exist_ok=True)
        payload = self.model_dump_json(indent=indent)
        destination.write_text(payload, encoding="utf-8")

    @classmethod
    def load(cls, path: str | Path) -> VideoAnalysis:
        """Read UTF-8 JSON from ``path`` and validate it into an instance of ``cls``."""
        raw_json = Path(path).read_text(encoding="utf-8")
        return cls.model_validate_json(raw_json)