
Video Analysis

Create a single, serializable, scene-first analysis object.

Overview

VideoAnalyzer runs global passes (transcription + scene detection), then analyzes each scene window.

VideoAnalysis can be serialized with:

  • to_dict() / from_dict()
  • to_json() / from_json()
  • save() / load()

The output is centered on analysis.scenes.samples (one payload per scene).

Basic Usage

from videopython.ai import VideoAnalyzer

analyzer = VideoAnalyzer()
analysis = analyzer.analyze_path("video.mp4")

print(analysis.source.title)
print(len(analysis.scenes.samples) if analysis.scenes else 0)
print(analysis.scenes.samples[0].visual_segments if analysis.scenes else [])

# Persist results
analysis.save("video_analysis.json")

# Load later (load() is a classmethod; VideoAnalysis.load(...) works the same way)
loaded = analysis.load("video_analysis.json")
print(loaded.run_info.mode)

Configure Analysis

from videopython.ai import VideoAnalysisConfig, VideoAnalyzer

config = VideoAnalysisConfig(
    enabled_analyzers={"audio_to_text", "semantic_scene_detector", "scene_vlm"},
)

analyzer = VideoAnalyzer(config=config)
analysis = analyzer.analyze_path("video.mp4")

Rich Understanding Preset

Use the built-in preset when you want broad understanding coverage across many video types:

from videopython.ai import VideoAnalysisConfig, VideoAnalyzer

config = VideoAnalysisConfig.rich_understanding_preset()
analysis = VideoAnalyzer(config=config).analyze_path("video.mp4")

The preset is equivalent to VideoAnalysisConfig().

Notes on Output Fields

  • Use enabled_analyzers to run a subset of steps.
  • Full transcription is available at analysis.audio.transcription.
  • Scene payload lives in analysis.scenes.samples.
  • Each sample includes:
      • scene timing (start_second, end_second, optional frame bounds)
      • chunked visual segments from scene VLM (visual_segments)
      • each visual segment covers up to _SCENE_VLM_MAX_SEGMENT_SECONDS and uses _SCENE_VLM_FRAMES_PER_SEGMENT frames
      • scene-scoped actions and audio classification
  • Scene VLM model size and max segment window are hard-coded module constants by design.

Classes

VideoAnalysisConfig dataclass

Execution config for scene-first analysis runs.

analyzer_params lets you forward keyword arguments to each predictor constructor keyed by analyzer id. For example::

VideoAnalysisConfig(
    analyzer_params={
        "audio_to_text": {"model_name": "large"},
        "scene_vlm": {"model_size": "2b"},
    }
)
Source code in src/videopython/ai/video_analysis.py
@dataclass
class VideoAnalysisConfig:
    """Execution config for scene-first analysis runs.

    ``analyzer_params`` lets you forward keyword arguments to each predictor
    constructor keyed by analyzer id.  For example::

        VideoAnalysisConfig(
            analyzer_params={
                "audio_to_text": {"model_name": "large"},
                "scene_vlm": {"model_size": "2b"},
            }
        )
    """

    # Defaults to every known analyzer id; validated in __post_init__.
    enabled_analyzers: set[str] = field(default_factory=lambda: set(ALL_ANALYZER_IDS))
    # Per-analyzer constructor kwargs, keyed by analyzer id.
    analyzer_params: dict[str, dict[str, Any]] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Reject analyzer ids that are not in ``ALL_ANALYZER_IDS``.

        Raises:
            ValueError: if ``enabled_analyzers`` or ``analyzer_params``
                contains an unknown analyzer id.
        """
        unknown_enabled = sorted(set(self.enabled_analyzers) - set(ALL_ANALYZER_IDS))
        if unknown_enabled:
            raise ValueError(f"Unknown analyzer ids in enabled_analyzers: {unknown_enabled}")
        unknown_params = sorted(set(self.analyzer_params) - set(ALL_ANALYZER_IDS))
        if unknown_params:
            raise ValueError(f"Unknown analyzer ids in analyzer_params: {unknown_params}")

    def get_params(self, analyzer_id: str) -> dict[str, Any]:
        """Return kwargs dict for the given analyzer, defaulting to empty."""
        return dict(self.analyzer_params.get(analyzer_id, {}))

    @classmethod
    def rich_understanding_preset(cls) -> "VideoAnalysisConfig":
        """Backward-compatible alias for the default config."""
        return cls()

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-ready dict.

        ``enabled_analyzers`` is emitted sorted for deterministic output;
        ``analyzer_params`` is included only when non-empty and is copied
        per analyzer so callers mutating the result cannot corrupt this
        config (consistent with ``get_params``).
        """
        result: dict[str, Any] = {
            "enabled_analyzers": sorted(self.enabled_analyzers),
        }
        if self.analyzer_params:
            # Shallow-copy each kwargs dict: previously the live dicts were
            # aliased into the serialized payload.
            result["analyzer_params"] = {
                analyzer_id: dict(params) for analyzer_id, params in self.analyzer_params.items()
            }
        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "VideoAnalysisConfig":
        """Rebuild a config from ``to_dict`` output.

        Missing ``enabled_analyzers`` means "all analyzers". Nested kwargs
        dicts are copied so the config does not alias the caller's data.
        """
        enabled_raw = data.get("enabled_analyzers")
        raw_params = data.get("analyzer_params", {})
        return cls(
            enabled_analyzers=set(enabled_raw) if enabled_raw is not None else set(ALL_ANALYZER_IDS),
            analyzer_params={analyzer_id: dict(params) for analyzer_id, params in raw_params.items()},
        )

get_params

get_params(analyzer_id: str) -> dict[str, Any]

Return kwargs dict for the given analyzer, defaulting to empty.

Source code in src/videopython/ai/video_analysis.py
def get_params(self, analyzer_id: str) -> dict[str, Any]:
    """Return a fresh kwargs dict for *analyzer_id*; empty if not configured.

    The copy keeps callers from mutating the stored configuration.
    """
    configured = self.analyzer_params.get(analyzer_id, {})
    return {**configured}

rich_understanding_preset classmethod

rich_understanding_preset() -> 'VideoAnalysisConfig'

Backward-compatible alias for the default config.

Source code in src/videopython/ai/video_analysis.py
@classmethod
def rich_understanding_preset(cls) -> "VideoAnalysisConfig":
    """Preset kept for backward compatibility; identical to ``cls()``."""
    return cls()

VideoAnalyzer

Orchestrates scene-first analyzers and builds VideoAnalysis output.

Source code in src/videopython/ai/video_analysis.py
class VideoAnalyzer:
    """Orchestrates scene-first analyzers and builds `VideoAnalysis` output.

    Global passes (Whisper transcription and semantic scene detection) run
    first -- concurrently when both are enabled -- then each scene window
    is analyzed (batched SceneVLM captioning plus per-scene audio
    classification). Individual analyzers are best-effort: failures are
    logged and the rest of the pipeline continues.
    """

    def __init__(self, config: VideoAnalysisConfig | None = None):
        # No config means "run every analyzer with default parameters".
        self.config = config or VideoAnalysisConfig()

    def analyze_path(self, path: str | Path) -> VideoAnalysis:
        """Analyze a video path in scene-first mode."""
        path_obj = Path(path)
        metadata = VideoMetadata.from_path(path_obj)
        source = self._build_source(
            metadata=metadata,
            path_obj=path_obj,
            duration_seconds=metadata.total_seconds,
            title_fallback=path_obj.stem,
        )
        return self._analyze(video=None, source_path=path_obj, metadata=metadata, source=source)

    def analyze(self, video: Video, *, source_path: str | Path | None = None) -> VideoAnalysis:
        """Analyze an in-memory `Video` object.

        When `source_path` is also given, analyzers that can stream from
        disk will use the path; duration and metadata still come from the
        in-memory video.
        """
        path_obj = Path(source_path) if source_path else None
        metadata = VideoMetadata.from_video(video)
        source = self._build_source(
            metadata=metadata,
            path_obj=path_obj,
            duration_seconds=video.total_seconds,
            title_fallback=path_obj.stem if path_obj is not None else None,
        )
        return self._analyze(
            video=video,
            source_path=path_obj,
            metadata=metadata,
            source=source,
        )

    def _analyze(
        self,
        *,
        video: Video | None,
        source_path: Path | None,
        metadata: VideoMetadata,
        source: VideoAnalysisSource,
    ) -> VideoAnalysis:
        """Shared pipeline behind `analyze` and `analyze_path`.

        At least one of `source_path` / `video` must be provided; when a
        path is present it takes precedence for streaming-capable steps and
        the run is recorded as mode="path".
        """
        mode = "path" if source_path is not None else "video"
        if source_path is None and video is None:
            raise ValueError("Either `source_path` or `video` must be provided")

        enabled = self.config.enabled_analyzers

        run_info = AnalysisRunInfo(
            created_at=_utc_now_iso(),
            mode=mode,
            library_version=_library_version(),
        )

        t_analysis_start = time.perf_counter()

        run_whisper = AUDIO_TO_TEXT in enabled
        run_scene_det = SEMANTIC_SCENE_DETECTOR in enabled

        transcription: Transcription | None = None
        detected: list[SceneBoundary] | None = None

        # Whisper and TransNetV2 operate on independent data (audio vs video
        # frames) and both fit comfortably in GPU memory together. Run them
        # concurrently via threads -- the GIL is released during GPU compute
        # and ffmpeg I/O so real parallelism is achieved.
        #
        # SceneVLM is loaded *after* Whisper/TransNetV2 finish (not concurrently)
        # because transformers' from_pretrained(torch_dtype="auto") mutates the
        # process-global torch.get_default_dtype() during model construction,
        # which corrupts Whisper's model weights if they're initialized at the
        # same time.
        if run_whisper and run_scene_det:
            transcription, detected = self._run_whisper_and_scene_detection(source_path=source_path, video=video)
        else:
            if run_whisper:
                t0 = time.perf_counter()
                transcription = self._run_whisper(source_path=source_path, video=video)
                logger.info("Whisper transcription completed in %.2fs", time.perf_counter() - t0)

            if run_scene_det:
                t0 = time.perf_counter()
                detected = self._run_scene_detection(source_path=source_path, video=video)
                logger.info("Scene detection completed in %.2fs", time.perf_counter() - t0)

        if run_scene_det:
            self._reset_transnetv2_torch_state()

        # Whisper and TransNetV2 are done -- free their GPU memory before
        # loading SceneVLM (~9GB). Python GC doesn't guarantee immediate
        # cleanup, so force it and release the CUDA cache.
        if run_whisper or run_scene_det:
            gc.collect()
            self._release_gpu_cache()

        # Fall back to one whole-video scene when detection was disabled,
        # failed (detected is None), or produced nothing usable.
        scenes = self._default_scene_boundaries(metadata)
        if detected is not None:
            scenes = self._normalize_scene_boundaries(detected, metadata)

        if not scenes:
            scenes = self._default_scene_boundaries(metadata)

        t0 = time.perf_counter()
        scene_section = self._analyze_scenes(
            source_path=source_path,
            video=video,
            metadata=metadata,
            scenes=scenes,
            preloaded_scene_vlm=None,
        )
        logger.info("Scene analysis completed in %.2fs", time.perf_counter() - t0)

        audio_section = AudioAnalysisSection(transcription=transcription) if transcription is not None else None

        logger.info("Total analysis completed in %.2fs", time.perf_counter() - t_analysis_start)
        return VideoAnalysis(
            source=source,
            config=self.config,
            run_info=run_info,
            audio=audio_section,
            # Empty sections are collapsed to None in the serialized output.
            scenes=scene_section if scene_section.samples else None,
        )

    def _run_whisper(self, *, source_path: Path | None, video: Video | None) -> Transcription | None:
        """Best-effort transcription; returns None (and logs) on any failure."""
        try:
            return AudioToText(**self.config.get_params(AUDIO_TO_TEXT)).transcribe(
                Audio.from_path(source_path) if source_path is not None else _require_video(video)
            )
        except Exception:
            logger.warning("AudioToText failed, skipping transcription", exc_info=True)
            return None

    def _run_scene_detection(self, *, source_path: Path | None, video: Video | None) -> list[SceneBoundary] | None:
        """Best-effort scene detection; None signals "use default boundaries".

        Streams from disk when a path is available, otherwise detects on the
        in-memory video.
        """
        try:
            scene_detector = SemanticSceneDetector(**self.config.get_params(SEMANTIC_SCENE_DETECTOR))
            return (
                scene_detector.detect_streaming(source_path)
                if source_path is not None
                else scene_detector.detect(_require_video(video))
            )
        except Exception:
            logger.warning("SemanticSceneDetector failed, using default scene boundaries", exc_info=True)
            return None

    def _run_whisper_and_scene_detection(
        self, *, source_path: Path | None, video: Video | None
    ) -> tuple[Transcription | None, list[SceneBoundary] | None]:
        """Run transcription and scene detection concurrently on two threads.

        Safe to parallelize: the two models read independent data (audio vs
        frames) -- see the rationale comment in `_analyze`. Each worker is
        already exception-safe and returns None on failure.
        """
        with ThreadPoolExecutor(max_workers=2) as pool:
            t0 = time.perf_counter()
            whisper_future = pool.submit(self._run_whisper, source_path=source_path, video=video)
            scene_future = pool.submit(self._run_scene_detection, source_path=source_path, video=video)

            transcription = whisper_future.result()
            detected = scene_future.result()
            elapsed = time.perf_counter() - t0
            logger.info("Whisper + scene detection (parallel) completed in %.2fs", elapsed)

        return transcription, detected

    @staticmethod
    def _reset_transnetv2_torch_state() -> None:
        """Reset global torch state that TransNetV2 sets during init.

        TransNetV2 sets torch.use_deterministic_algorithms(True) and
        cudnn.benchmark=False globally. Reset to defaults so subsequent
        models (especially SceneVLM's convolution-heavy ViT) can use cuDNN
        autotuner and non-deterministic kernels for better throughput.
        """
        try:
            import torch

            torch.use_deterministic_algorithms(False)
            if torch.backends.cudnn.is_available():
                torch.backends.cudnn.deterministic = False
                torch.backends.cudnn.benchmark = True
        except ImportError:
            pass

    @staticmethod
    def _release_gpu_cache() -> None:
        """Force-release unused GPU memory back to the CUDA allocator."""
        try:
            import torch

            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        except ImportError:
            pass

    def _analyze_scenes(
        self,
        *,
        source_path: Path | None,
        video: Video | None,
        metadata: VideoMetadata,
        scenes: list[SceneBoundary],
        preloaded_scene_vlm: SceneVLM | None = None,
    ) -> SceneAnalysisSection:
        """Build one `SceneAnalysisSample` per scene boundary.

        Captions come from a single batched SceneVLM pass; audio
        classification runs per scene. Analyzer-construction failures
        downgrade to skipping that analyzer rather than aborting.
        `preloaded_scene_vlm` lets a caller reuse an already-loaded model.
        """
        enabled = self.config.enabled_analyzers

        scene_vlm: SceneVLM | None
        if preloaded_scene_vlm is not None:
            scene_vlm = preloaded_scene_vlm
        else:
            try:
                scene_vlm = SceneVLM(**self.config.get_params(SCENE_VLM)) if SCENE_VLM in enabled else None
            except Exception:
                logger.warning("Failed to initialize SceneVLM, skipping visual understanding", exc_info=True)
                scene_vlm = None

        try:
            audio_classifier = (
                AudioClassifier(**self.config.get_params(AUDIO_CLASSIFIER)) if AUDIO_CLASSIFIER in enabled else None
            )
        except Exception:
            logger.warning("Failed to initialize AudioClassifier, skipping audio classification", exc_info=True)
            audio_classifier = None

        # Load the full audio track once up front; scene slices are cut from
        # it later. If this fails, each scene falls back to a video sub-clip.
        path_audio: Audio | None = None
        if audio_classifier is not None and source_path is not None:
            try:
                path_audio = Audio.from_path(source_path)
            except Exception:
                logger.warning(
                    "Failed to load audio from path, audio classification will use clip fallback", exc_info=True
                )
                path_audio = None

        # -- Batched SceneVLM: collect all timestamps, extract frames once, run one forward pass --
        captions: list[str | None] = [None] * len(scenes)
        if scene_vlm is not None:
            try:
                captions = self._run_scene_vlm_batched(
                    scene_vlm=scene_vlm,
                    source_path=source_path,
                    video=video,
                    metadata=metadata,
                    scenes=scenes,
                )
            except Exception:
                logger.warning("Batched SceneVLM failed, skipping visual understanding", exc_info=True)

        samples: list[SceneAnalysisSample] = []
        t_audio_total = 0.0
        for index, scene in enumerate(scenes):
            sample = SceneAnalysisSample(
                scene_index=index,
                start_second=float(scene.start),
                end_second=float(scene.end),
                start_frame=int(scene.start_frame),
                end_frame=int(scene.end_frame),
                caption=captions[index],
            )

            if audio_classifier is not None:
                t0 = time.perf_counter()
                try:
                    # Clip fallback is only needed when the full-track audio
                    # could not be loaded above.
                    scene_clip: Video | None = None
                    if path_audio is None:
                        try:
                            scene_clip = self._load_scene_video_clip(
                                source_path=source_path,
                                video=video,
                                start_second=scene.start,
                                end_second=scene.end,
                            )
                        except Exception:
                            scene_clip = None
                    sample.audio_classification = self._run_scene_audio_classification(
                        audio_classifier=audio_classifier,
                        path_audio=path_audio,
                        scene_clip=scene_clip,
                        scene_start=scene.start,
                        scene_end=scene.end,
                    )
                except Exception:
                    logger.warning(
                        "AudioClassifier failed for scene %d (%.1f-%.1fs)", index, scene.start, scene.end, exc_info=True
                    )
                t_audio_total += time.perf_counter() - t0

            samples.append(sample)

        if audio_classifier is not None:
            logger.info("AudioClassifier inference total: %.2fs across %d scenes", t_audio_total, len(scenes))

        return SceneAnalysisSection(samples=samples)

    def _run_scene_vlm_batched(
        self,
        *,
        scene_vlm: SceneVLM,
        source_path: Path | None,
        video: Video | None,
        metadata: VideoMetadata,
        scenes: list[SceneBoundary],
    ) -> list[str | None]:
        """Extract frames for all scenes in one ffmpeg call, then caption each group.

        Adjacent short scenes (< _SCENE_VLM_GROUP_THRESHOLD seconds) are merged
        into a single VLM call to reduce per-call overhead.
        """
        # Group adjacent short scenes to reduce VLM call count.
        # Each group is a list of scene indices that share one VLM call.
        groups: list[list[int]] = []
        current_group: list[int] = []
        current_group_duration = 0.0
        for i, scene in enumerate(scenes):
            dur = max(0.0, scene.end - scene.start)
            if current_group and current_group_duration + dur > _SCENE_VLM_GROUP_THRESHOLD:
                groups.append(current_group)
                current_group = [i]
                current_group_duration = dur
            else:
                current_group.append(i)
                current_group_duration += dur
        if current_group:
            groups.append(current_group)

        # Compute timestamps for each group (treating merged scenes as one span)
        group_timestamps: list[list[float]] = []
        all_timestamps: list[float] = []
        for group in groups:
            span_start = scenes[group[0]].start
            span_end = scenes[group[-1]].end
            duration = max(0.0, span_end - span_start)
            # Logarithmic frame budget: longer spans get more frames, but
            # growth tapers off and is capped at _SCENE_VLM_MAX_FRAMES.
            frame_count = min(
                _SCENE_VLM_MAX_FRAMES,
                max(1, math.ceil(_SCENE_VLM_FRAME_SCALE * math.log(duration / _SCENE_VLM_FRAME_BASE + 1))),
            )
            timestamps = self._sample_timestamps(start_second=span_start, end_second=span_end, frame_count=frame_count)
            group_timestamps.append(timestamps)
            all_timestamps.extend(timestamps)

        if not all_timestamps:
            return [None] * len(scenes)

        # Extract all frames in a single ffmpeg call
        if source_path is not None:
            all_frames_array = extract_frames_at_times(source_path, all_timestamps)
            all_frames: list[np.ndarray | Image.Image] = list(all_frames_array)
        else:
            # In-memory path: map timestamps to frame indices, clamped to the
            # valid range.
            current_video = _require_video(video)
            max_frame = max(len(current_video.frames) - 1, 0)
            indices = [max(0, min(max_frame, int(ts * metadata.fps))) for ts in all_timestamps]
            all_frames = [current_video.frames[idx] for idx in indices]

        # Caption each group and assign to all scenes in that group
        captions: list[str | None] = [None] * len(scenes)
        offset = 0
        for group, timestamps in zip(groups, group_timestamps):
            frame_count = len(timestamps)
            group_frames = all_frames[offset : offset + frame_count]
            offset += frame_count
            if not group_frames:
                continue
            caption: str | None = None
            try:
                # Empty captions are normalized to None.
                caption = scene_vlm.analyze_scene(group_frames) or None
            except Exception:
                logger.warning(
                    "SceneVLM failed for scenes %d-%d (%.1f-%.1fs)",
                    group[0],
                    group[-1],
                    scenes[group[0]].start,
                    scenes[group[-1]].end,
                    exc_info=True,
                )
                caption = None
            for i in group:
                captions[i] = caption
        logger.info("SceneVLM: %d groups from %d scenes", len(groups), len(scenes))
        return captions

    def _run_scene_audio_classification(
        self,
        *,
        audio_classifier: AudioClassifier,
        path_audio: Audio | None,
        scene_clip: Video | None,
        scene_start: float,
        scene_end: float,
    ) -> AudioClassification | None:
        """Classify audio within one scene window.

        Prefers slicing the preloaded full-track audio; falls back to the
        scene video clip. Event times are shifted from scene-relative to
        absolute video time. Returns None for empty windows or when no
        audio source is available.
        """
        if scene_end <= scene_start:
            return None

        if path_audio is not None:
            scene_media: Audio | Video = path_audio.slice(start_seconds=scene_start, end_seconds=scene_end)
        elif scene_clip is not None:
            scene_media = scene_clip
        else:
            return None

        classification = audio_classifier.classify(scene_media)
        offset_events = [
            AudioEvent(
                start=scene_start + event.start,
                end=scene_start + event.end,
                label=event.label,
                confidence=event.confidence,
            )
            for event in classification.events
        ]
        return AudioClassification(events=offset_events, clip_predictions=classification.clip_predictions)

    @staticmethod
    def _sample_timestamps(*, start_second: float, end_second: float, frame_count: int) -> list[float]:
        """Return `frame_count` timestamps spread evenly inside the window.

        Empty/inverted windows yield []; a single sample lands at the
        midpoint; all samples are nudged `epsilon` away from the exact
        boundaries.
        """
        duration = max(0.0, end_second - start_second)
        if duration <= 0.0:
            return []

        count = max(1, frame_count)
        if count == 1:
            return [start_second + (duration * 0.5)]

        # Center samples inside the interval so we avoid exact boundaries.
        step = duration / float(count + 1)
        timestamps = [start_second + (step * (idx + 1)) for idx in range(count)]
        epsilon = 1e-3
        return [min(end_second - epsilon, max(start_second + epsilon, ts)) for ts in timestamps]

    def _load_scene_video_clip(
        self,
        *,
        source_path: Path | None,
        video: Video | None,
        start_second: float,
        end_second: float,
    ) -> Video | None:
        """Cut a scene sub-clip from disk (preferred) or the in-memory video.

        Returns None for empty/inverted windows.
        """
        if end_second <= start_second:
            return None
        if source_path is not None:
            return Video.from_path(str(source_path), start_second=start_second, end_second=end_second)
        return _require_video(video).cut(start_second, end_second)

    def _default_scene_boundaries(self, metadata: VideoMetadata) -> list[SceneBoundary]:
        """Return a single boundary spanning the whole video.

        Degenerate metadata (zero duration or frame count) yields [].
        """
        if metadata.total_seconds <= 0 or metadata.frame_count <= 0:
            return []
        return [
            SceneBoundary(
                start=0.0,
                end=float(metadata.total_seconds),
                start_frame=0,
                end_frame=int(metadata.frame_count),
            )
        ]

    def _normalize_scene_boundaries(self, scenes: list[SceneBoundary], metadata: VideoMetadata) -> list[SceneBoundary]:
        """Sanitize detector output against the video's real extent.

        Clamps times and frames into range, drops empty spans, rebuilds
        frame bounds from fps when the detector's frames are inconsistent,
        and returns the result sorted by (start, end).
        """
        normalized: list[SceneBoundary] = []
        max_time = float(metadata.total_seconds)
        max_frame = int(metadata.frame_count)

        for item in scenes:
            start = max(0.0, min(max_time, float(item.start)))
            end = max(0.0, min(max_time, float(item.end)))
            if end <= start:
                continue

            start_frame = int(item.start_frame)
            end_frame = int(item.end_frame)
            start_frame = max(0, min(max_frame, start_frame))
            end_frame = max(0, min(max_frame, end_frame))
            if end_frame <= start_frame:
                # Frame bounds collapsed after clamping: re-derive from the
                # (already clamped) timestamps and fps.
                start_frame = int(round(start * metadata.fps))
                end_frame = max(start_frame + 1, int(round(end * metadata.fps)))
                start_frame = max(0, min(max_frame, start_frame))
                end_frame = max(0, min(max_frame, end_frame))
                if end_frame <= start_frame:
                    continue

            normalized.append(
                SceneBoundary(
                    start=round(start, 6),
                    end=round(end, 6),
                    start_frame=start_frame,
                    end_frame=end_frame,
                )
            )

        normalized.sort(key=lambda scene: (scene.start, scene.end))
        return normalized

    def _build_source(
        self,
        *,
        metadata: VideoMetadata,
        path_obj: Path | None,
        duration_seconds: float,
        title_fallback: str | None,
    ) -> VideoAnalysisSource:
        """Assemble the `VideoAnalysisSource` header for the analysis.

        Container tags (via ffprobe) supply creation time, geo data, and the
        title when available; a tag-derived title wins over `title_fallback`
        (typically the filename stem).
        """
        tags = self._extract_source_tags(path_obj) if path_obj else {}
        # First matching creation-time tag key wins.
        creation_time = _normalize_creation_time(
            next((tags[key] for key in _CREATION_TIME_TAG_KEYS if key in tags), None)
        )
        geo = _parse_geo_metadata(tags)
        title = tags.get("title") or title_fallback

        return VideoAnalysisSource(
            title=title,
            path=str(path_obj) if path_obj else None,
            filename=path_obj.name if path_obj else None,
            duration=duration_seconds,
            fps=metadata.fps,
            width=metadata.width,
            height=metadata.height,
            frame_count=metadata.frame_count,
            creation_time=creation_time,
            geo=geo,
            raw_tags=tags or None,
        )

    def _extract_source_tags(self, path: Path | None) -> dict[str, str]:
        """Read container/stream metadata tags via ffprobe.

        Keys are lower-cased; format-level tags take precedence over
        stream-level ones (stream tags only fill gaps via setdefault).
        Returns {} on any failure (missing ffprobe, bad file, bad JSON).
        """
        if path is None:
            return {}

        cmd = [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format_tags:stream_tags",
            "-of",
            "json",
            str(path),
        ]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            payload = json.loads(result.stdout)
        except Exception:
            return {}

        tags: dict[str, str] = {}

        format_tags = payload.get("format", {}).get("tags", {})
        if isinstance(format_tags, dict):
            tags.update({str(k).lower(): str(v) for k, v in format_tags.items()})

        for stream in payload.get("streams", []):
            stream_tags = stream.get("tags", {})
            if not isinstance(stream_tags, dict):
                continue
            for key, value in stream_tags.items():
                lowered = str(key).lower()
                tags.setdefault(lowered, str(value))

        return tags

analyze_path

analyze_path(path: str | Path) -> VideoAnalysis

Analyze a video path in scene-first mode.

Source code in src/videopython/ai/video_analysis.py
def analyze_path(self, path: str | Path) -> VideoAnalysis:
    """Run scene-first analysis on the video file located at *path*."""
    video_path = Path(path)
    meta = VideoMetadata.from_path(video_path)
    src = self._build_source(
        metadata=meta,
        path_obj=video_path,
        duration_seconds=meta.total_seconds,
        title_fallback=video_path.stem,
    )
    return self._analyze(video=None, source_path=video_path, metadata=meta, source=src)

analyze

analyze(
    video: Video, *, source_path: str | Path | None = None
) -> VideoAnalysis

Analyze an in-memory Video object.

Source code in src/videopython/ai/video_analysis.py
def analyze(self, video: Video, *, source_path: str | Path | None = None) -> VideoAnalysis:
    """Run scene-first analysis on an in-memory `Video` object."""
    resolved_path = Path(source_path) if source_path else None
    fallback_title = resolved_path.stem if resolved_path is not None else None
    meta = VideoMetadata.from_video(video)
    src = self._build_source(
        metadata=meta,
        path_obj=resolved_path,
        duration_seconds=video.total_seconds,
        title_fallback=fallback_title,
    )
    return self._analyze(video=video, source_path=resolved_path, metadata=meta, source=src)

VideoAnalysis dataclass

Serializable aggregate scene-first analysis result for one video.

Source code in src/videopython/ai/video_analysis.py
@dataclass
class VideoAnalysis:
    """Serializable aggregate scene-first analysis result for one video.

    Optional sections (``audio``, ``scenes``) serialize as None when absent;
    round-trips via dict, JSON string, or a file on disk.
    """

    source: VideoAnalysisSource
    config: VideoAnalysisConfig
    run_info: AnalysisRunInfo
    audio: AudioAnalysisSection | None = None
    scenes: SceneAnalysisSection | None = None

    def to_dict(self) -> dict[str, Any]:
        """Convert to a JSON-ready nested dict."""
        audio_payload = self.audio.to_dict() if self.audio else None
        scenes_payload = self.scenes.to_dict() if self.scenes else None
        return {
            "source": self.source.to_dict(),
            "config": self.config.to_dict(),
            "run_info": self.run_info.to_dict(),
            "audio": audio_payload,
            "scenes": scenes_payload,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "VideoAnalysis":
        """Rebuild an analysis from `to_dict` output."""
        audio_payload = data.get("audio")
        scenes_payload = data.get("scenes")
        audio = AudioAnalysisSection.from_dict(audio_payload) if audio_payload else None
        scenes = SceneAnalysisSection.from_dict(scenes_payload) if scenes_payload else None
        return cls(
            source=VideoAnalysisSource.from_dict(data["source"]),
            config=VideoAnalysisConfig.from_dict(data["config"]),
            run_info=AnalysisRunInfo.from_dict(data["run_info"]),
            audio=audio,
            scenes=scenes,
        )

    def to_json(self, *, indent: int | None = 2) -> str:
        """Serialize to a JSON string (non-ASCII preserved)."""
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=indent)

    @classmethod
    def from_json(cls, text: str) -> "VideoAnalysis":
        """Parse a JSON string produced by `to_json`."""
        return cls.from_dict(json.loads(text))

    def save(self, path: str | Path, *, indent: int | None = 2) -> None:
        """Write the analysis as UTF-8 JSON, creating parent dirs as needed."""
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(self.to_json(indent=indent), encoding="utf-8")

    @classmethod
    def load(cls, path: str | Path) -> "VideoAnalysis":
        """Read an analysis previously written with `save`."""
        text = Path(path).read_text(encoding="utf-8")
        return cls.from_json(text)