Skip to content

Video Analysis

Create a single, serializable analysis object that aggregates multiple AI understanding results.

Overview

VideoAnalyzer orchestrates audio, temporal, motion, and frame analyzers and returns VideoAnalysis.

VideoAnalysis can be serialized with:

  • to_dict() / from_dict()
  • to_json() / from_json()
  • save() / load()

The path-based flow (analyze_path) is designed for bounded frame memory usage by preferring streaming/chunked frame access.

Basic Usage

from videopython.ai import VideoAnalyzer

analyzer = VideoAnalyzer()
analysis = analyzer.analyze_path("video.mp4")

print(analysis.source.title)
print(analysis.summary)

# Persist results
analysis.save("video_analysis.json")

# Load later
loaded = analysis.load("video_analysis.json")
print(loaded.run_info.mode)

Configure Analysis

from videopython.ai import VideoAnalysisConfig, VideoAnalyzer

config = VideoAnalysisConfig(
    frame_sampling_mode="hybrid",
    frames_per_second=1.0,
    max_frames=240,
    max_memory_mb=512,  # Optional memory budget for sampled frames
    frame_chunk_size=24,
    action_scope="adaptive",  # "video" | "scene" | "adaptive"
    max_action_scenes=16,
    best_effort=True,
)

analyzer = VideoAnalyzer(config=config)
analysis = analyzer.analyze_path("video.mp4")

Rich Understanding Preset

Use the built-in preset when you want broad understanding coverage across many video types:

from videopython.ai import VideoAnalysisConfig, VideoAnalyzer

config = VideoAnalysisConfig.rich_understanding_preset()
analysis = VideoAnalyzer(config=config).analyze_path("video.mp4")

The preset enables all analyzers and keeps resource usage bounded with adaptive defaults.

Notes on New Output Fields

  • FrameSamplingReport.effective_max_frames shows the effective cap after applying max_frames and optional max_memory_mb.
  • FrameAnalysisSample.text_regions contains structured OCR detections (text, confidence, bounding_box) in addition to the existing plain text list.
  • summary now includes richer aggregate signals such as top actions/objects, OCR term frequencies, face presence ratio, and motion distributions when available.

Classes

VideoAnalysisConfig dataclass

Serializable execution plan for VideoAnalyzer.

Source code in src/videopython/ai/video_analysis.py
@dataclass
class VideoAnalysisConfig:
    """Serializable execution plan for `VideoAnalyzer`.

    Validates analyzer ids and numeric/enum fields at construction time
    (`__post_init__`) and round-trips losslessly through `to_dict` /
    `from_dict`.
    """

    # Analyzer ids to run; defaults to the full built-in set.
    enabled_analyzers: set[str] = field(
        default_factory=lambda: {
            AUDIO_TO_TEXT,
            AUDIO_CLASSIFIER,
            SEMANTIC_SCENE_DETECTOR,
            ACTION_RECOGNIZER,
            MOTION_ANALYZER,
            CAMERA_MOTION_DETECTOR,
            OBJECT_DETECTOR,
            FACE_DETECTOR,
            TEXT_DETECTOR,
        }
    )
    # Ids whose failures are tolerated (step recorded as skipped, not failed).
    optional_analyzers: set[str] = field(default_factory=set)
    # Per-analyzer constructor kwargs, keyed by analyzer id.
    analyzer_params: dict[str, dict[str, Any]] = field(default_factory=dict)
    frame_sampling_mode: str = "hybrid"  # uniform | scene_boundary | scene_representative | hybrid
    frames_per_second: float = 0.5
    max_frames: int | None = 120
    include_scene_boundaries: bool = True
    scene_representative_offset: float = 0.5  # relative position inside a scene, 0.0-1.0
    camera_motion_stride: int = 1
    frame_chunk_size: int = 16
    max_memory_mb: int | None = None  # optional memory budget for sampled frames
    best_effort: bool = True
    fail_fast: bool = False
    include_geo: bool = True
    redact_geo: bool = False
    action_scope: str = "adaptive"  # "video" | "scene" | "adaptive"
    max_action_scenes: int | None = 12

    def __post_init__(self) -> None:
        """Validate analyzer ids and field ranges; raise ValueError on bad input."""
        unknown_enabled = sorted(set(self.enabled_analyzers) - set(ALL_ANALYZER_IDS))
        if unknown_enabled:
            raise ValueError(f"Unknown analyzer ids in enabled_analyzers: {unknown_enabled}")

        unknown_optional = sorted(set(self.optional_analyzers) - set(ALL_ANALYZER_IDS))
        if unknown_optional:
            raise ValueError(f"Unknown analyzer ids in optional_analyzers: {unknown_optional}")

        if self.frame_sampling_mode not in {"uniform", "scene_boundary", "scene_representative", "hybrid"}:
            raise ValueError(
                "frame_sampling_mode must be one of: uniform, scene_boundary, scene_representative, hybrid"
            )
        if self.frames_per_second < 0:
            raise ValueError("frames_per_second must be >= 0")
        if self.max_frames is not None and self.max_frames < 1:
            raise ValueError("max_frames must be >= 1 or None")
        if not 0.0 <= self.scene_representative_offset <= 1.0:
            raise ValueError("scene_representative_offset must be between 0.0 and 1.0")
        if self.camera_motion_stride < 1:
            raise ValueError("camera_motion_stride must be >= 1")
        if self.frame_chunk_size < 1:
            raise ValueError("frame_chunk_size must be >= 1")
        if self.max_memory_mb is not None and self.max_memory_mb < 1:
            raise ValueError("max_memory_mb must be >= 1 or None")
        if self.action_scope not in {"video", "scene", "adaptive"}:
            raise ValueError("action_scope must be one of: video, scene, adaptive")
        if self.max_action_scenes is not None and self.max_action_scenes < 1:
            raise ValueError("max_action_scenes must be >= 1 or None")

    @classmethod
    def rich_understanding_preset(cls) -> "VideoAnalysisConfig":
        """High-coverage preset for richer cross-domain video understanding."""
        return cls(
            enabled_analyzers=set(ALL_ANALYZER_IDS),
            optional_analyzers={IMAGE_TO_TEXT, TEXT_DETECTOR, ACTION_RECOGNIZER},
            frame_sampling_mode="hybrid",
            frames_per_second=1.0,
            max_frames=240,
            include_scene_boundaries=True,
            scene_representative_offset=0.5,
            camera_motion_stride=2,
            frame_chunk_size=24,
            best_effort=True,
            fail_fast=False,
            action_scope="adaptive",
            max_action_scenes=16,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable mapping (set fields emitted as sorted lists)."""
        return {
            "enabled_analyzers": sorted(self.enabled_analyzers),
            "optional_analyzers": sorted(self.optional_analyzers),
            "analyzer_params": self.analyzer_params,
            "frame_sampling_mode": self.frame_sampling_mode,
            "frames_per_second": self.frames_per_second,
            "max_frames": self.max_frames,
            "include_scene_boundaries": self.include_scene_boundaries,
            "scene_representative_offset": self.scene_representative_offset,
            "camera_motion_stride": self.camera_motion_stride,
            "frame_chunk_size": self.frame_chunk_size,
            "max_memory_mb": self.max_memory_mb,
            "best_effort": self.best_effort,
            "fail_fast": self.fail_fast,
            "include_geo": self.include_geo,
            "redact_geo": self.redact_geo,
            "action_scope": self.action_scope,
            "max_action_scenes": self.max_action_scenes,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "VideoAnalysisConfig":
        """Rebuild a config from `to_dict` output.

        Missing keys fall back to the dataclass's declared defaults via a
        default-constructed instance, rather than duplicated literals, so the
        two sources of defaults can never drift apart.
        """
        defaults = cls()
        enabled_raw = data.get("enabled_analyzers")
        optional_raw = data.get("optional_analyzers")
        return cls(
            enabled_analyzers=set(enabled_raw) if enabled_raw is not None else defaults.enabled_analyzers,
            optional_analyzers=set(optional_raw) if optional_raw is not None else defaults.optional_analyzers,
            analyzer_params=data.get("analyzer_params", {}),
            frame_sampling_mode=data.get("frame_sampling_mode", defaults.frame_sampling_mode),
            frames_per_second=float(data.get("frames_per_second", defaults.frames_per_second)),
            max_frames=data.get("max_frames", defaults.max_frames),
            include_scene_boundaries=bool(data.get("include_scene_boundaries", defaults.include_scene_boundaries)),
            scene_representative_offset=float(
                data.get("scene_representative_offset", defaults.scene_representative_offset)
            ),
            camera_motion_stride=int(data.get("camera_motion_stride", defaults.camera_motion_stride)),
            frame_chunk_size=int(data.get("frame_chunk_size", defaults.frame_chunk_size)),
            max_memory_mb=data.get("max_memory_mb", defaults.max_memory_mb),
            best_effort=bool(data.get("best_effort", defaults.best_effort)),
            fail_fast=bool(data.get("fail_fast", defaults.fail_fast)),
            include_geo=bool(data.get("include_geo", defaults.include_geo)),
            redact_geo=bool(data.get("redact_geo", defaults.redact_geo)),
            action_scope=data.get("action_scope", defaults.action_scope),
            max_action_scenes=data.get("max_action_scenes", defaults.max_action_scenes),
        )

rich_understanding_preset classmethod

rich_understanding_preset() -> 'VideoAnalysisConfig'

High-coverage preset for richer cross-domain video understanding.

Source code in src/videopython/ai/video_analysis.py
@classmethod
def rich_understanding_preset(cls) -> "VideoAnalysisConfig":
    """High-coverage preset for richer cross-domain video understanding."""
    # Assemble the preset as a plain mapping, then hand it to the constructor
    # so __post_init__ validation runs exactly as for any other config.
    preset = {
        "enabled_analyzers": set(ALL_ANALYZER_IDS),
        "optional_analyzers": {IMAGE_TO_TEXT, TEXT_DETECTOR, ACTION_RECOGNIZER},
        "frame_sampling_mode": "hybrid",
        "frames_per_second": 1.0,
        "max_frames": 240,
        "include_scene_boundaries": True,
        "scene_representative_offset": 0.5,
        "camera_motion_stride": 2,
        "frame_chunk_size": 24,
        "best_effort": True,
        "fail_fast": False,
        "action_scope": "adaptive",
        "max_action_scenes": 16,
    }
    return cls(**preset)

VideoAnalyzer

Orchestrates understanding analyzers and builds VideoAnalysis output.

Source code in src/videopython/ai/video_analysis.py
(Source listing corresponds to lines 607–1513 of src/videopython/ai/video_analysis.py; the per-line number gutter from the documentation rendering has been removed.)
class VideoAnalyzer:
    """Orchestrates understanding analyzers and builds `VideoAnalysis` output."""

    def __init__(self, config: VideoAnalysisConfig | None = None):
        """Store the supplied config, defaulting to a fresh `VideoAnalysisConfig`."""
        self.config = VideoAnalysisConfig() if config is None else config

    def analyze_path(self, path: str | Path) -> VideoAnalysis:
        """Analyze a video path with bounded frame memory usage."""
        video_path = Path(path)
        video_metadata = VideoMetadata.from_path(video_path)
        return self._analyze(
            video=None,
            source_path=video_path,
            metadata=video_metadata,
            source=self._build_source_from_path(video_path, video_metadata),
        )

    def analyze(self, video: Video, *, source_path: str | Path | None = None) -> VideoAnalysis:
        """Analyze an in-memory Video object."""
        # Note: the source descriptor receives the caller's raw path value,
        # while _analyze gets the normalized Path (or None).
        video_metadata = VideoMetadata.from_video(video)
        analysis_source = self._build_source_from_video(
            video=video, source_path=source_path, metadata=video_metadata
        )
        resolved_path = Path(source_path) if source_path else None
        return self._analyze(
            video=video,
            source_path=resolved_path,
            metadata=video_metadata,
            source=analysis_source,
        )

    def _analyze(
        self,
        *,
        video: Video | None,
        source_path: Path | None,
        metadata: VideoMetadata,
        source: VideoAnalysisSource,
    ) -> VideoAnalysis:
        """Run every enabled analyzer and assemble the aggregate `VideoAnalysis`.

        Exactly one input drives the run: `source_path` ("path" mode, preferred
        for bounded memory via streaming/chunked access) or `video` ("video"
        mode, fully in-memory). Each step's outcome (status, duration, error)
        is recorded in `steps` keyed by analyzer id.
        """
        mode = "path" if source_path is not None else "video"
        if source_path is None and video is None:
            raise ValueError("Either `source_path` or `video` must be provided")
        started = time.perf_counter()
        steps: dict[str, AnalysisStepStatus] = {}

        # Pre-record analyzers disabled by config so the report covers all ids.
        for analyzer_id in ALL_ANALYZER_IDS:
            if analyzer_id not in self.config.enabled_analyzers:
                steps[analyzer_id] = AnalysisStepStatus(status="skipped", warning="Disabled in config")

        run_info = AnalysisRunInfo(
            created_at=_utc_now_iso(),
            mode=mode,
            library_version=_library_version(),
            elapsed_seconds=None,
        )

        audio_section = AudioAnalysisSection()
        temporal_section = TemporalAnalysisSection()
        motion_section = MotionAnalysisSection()

        # Audio track decoded lazily and shared by both audio steps below.
        audio_cache: Audio | None = None

        def get_path_audio() -> Audio:
            # Decode audio from `source_path` at most once per run.
            nonlocal audio_cache
            if audio_cache is None:
                if source_path is None:
                    raise RuntimeError("Path audio requested for in-memory analysis without source path")
                audio_cache = Audio.from_path(source_path)
            return audio_cache

        # --- Audio: transcription ----------------------------------------
        if AUDIO_TO_TEXT in self.config.enabled_analyzers:
            audio_input: Audio | Video
            if source_path is not None:
                audio_input = get_path_audio()
            else:
                assert video is not None
                audio_input = video
            transcription = self._run_step(
                steps,
                AUDIO_TO_TEXT,
                lambda: AudioToText(**self._analyzer_kwargs(AUDIO_TO_TEXT)).transcribe(audio_input),
                optional=AUDIO_TO_TEXT in self.config.optional_analyzers,
            )
            if transcription is not None:
                audio_section.transcription = transcription

        # --- Audio: event classification ---------------------------------
        if AUDIO_CLASSIFIER in self.config.enabled_analyzers:
            classifier_input: Audio | Video
            if source_path is not None:
                classifier_input = get_path_audio()
            else:
                assert video is not None
                classifier_input = video
            classification = self._run_step(
                steps,
                AUDIO_CLASSIFIER,
                lambda: AudioClassifier(**self._analyzer_kwargs(AUDIO_CLASSIFIER)).classify(classifier_input),
                optional=AUDIO_CLASSIFIER in self.config.optional_analyzers,
            )
            if classification is not None:
                audio_section.classification = classification

        # --- Temporal: scene boundaries (also feed frame sampling below) --
        scenes: list[SceneBoundary] = []
        if SEMANTIC_SCENE_DETECTOR in self.config.enabled_analyzers:
            scene_detector = SemanticSceneDetector(**self._analyzer_kwargs(SEMANTIC_SCENE_DETECTOR))
            scenes_result = self._run_step(
                steps,
                SEMANTIC_SCENE_DETECTOR,
                lambda: scene_detector.detect_streaming(source_path)
                if source_path is not None
                else scene_detector.detect(_require_video(video)),
                optional=SEMANTIC_SCENE_DETECTOR in self.config.optional_analyzers,
            )
            if scenes_result is not None:
                scenes = scenes_result
                temporal_section.scenes = scenes_result

        # --- Temporal: action recognition (scope decided per config) ------
        if ACTION_RECOGNIZER in self.config.enabled_analyzers:
            action_recognizer = ActionRecognizer(**self._analyzer_kwargs(ACTION_RECOGNIZER))
            actions_result = self._run_step(
                steps,
                ACTION_RECOGNIZER,
                lambda: self._run_action_recognition(
                    action_recognizer=action_recognizer,
                    source_path=source_path,
                    video=video,
                    scenes=scenes,
                ),
                optional=ACTION_RECOGNIZER in self.config.optional_analyzers,
            )
            if actions_result is not None:
                temporal_section.actions = actions_result

        # --- Motion analysis ----------------------------------------------
        if MOTION_ANALYZER in self.config.enabled_analyzers:
            motion_analyzer = MotionAnalyzer(**self._analyzer_kwargs(MOTION_ANALYZER))
            if source_path is not None:
                # Path mode yields (timestamp, motion) pairs sampled from the file.
                motion_timeline = self._run_step(
                    steps,
                    MOTION_ANALYZER,
                    lambda: motion_analyzer.analyze_video_path(
                        source_path,
                        # Floor at 0.1 fps so path-mode sampling always makes progress.
                        frames_per_second=max(float(self.config.frames_per_second), 0.1),
                    ),
                    optional=MOTION_ANALYZER in self.config.optional_analyzers,
                )
                if motion_timeline is not None:
                    motion_section.motion_timeline = [
                        MotionTimelineSample(timestamp=float(ts), motion=motion) for ts, motion in motion_timeline
                    ]
                    motion_section.video_motion = [sample.motion for sample in motion_section.motion_timeline]
            else:
                motion_result = self._run_step(
                    steps,
                    MOTION_ANALYZER,
                    lambda: motion_analyzer.analyze_video(_require_video(video)),
                    optional=MOTION_ANALYZER in self.config.optional_analyzers,
                )
                if motion_result is not None:
                    motion_section.video_motion = motion_result

        # --- Frame sampling plan -------------------------------------------
        effective_max_frames = self._effective_max_frames(metadata)
        frame_indices = self._plan_frame_indices(
            metadata=metadata,
            scenes=scenes,
            effective_max_frames=effective_max_frames,
        )
        sampling = FrameSamplingReport(
            mode=self.config.frame_sampling_mode,
            frames_per_second=self.config.frames_per_second,
            max_frames=self.config.max_frames,
            sampled_indices=frame_indices,
            sampled_timestamps=[round(idx / metadata.fps, 6) for idx in frame_indices],
            access_mode=None,
            effective_max_frames=effective_max_frames,
        )

        # --- Per-frame analyzers (objects, faces, OCR, captions, camera) ---
        frame_steps_runtime: dict[str, dict[str, Any]] = {}
        camera_samples: list[CameraMotionSample] = []
        frame_samples: list[FrameAnalysisSample] = []
        frame_work_ids = (
            OBJECT_DETECTOR,
            FACE_DETECTOR,
            TEXT_DETECTOR,
            IMAGE_TO_TEXT,
            CAMERA_MOTION_DETECTOR,
        )
        frame_work_enabled = any(step_id in self.config.enabled_analyzers for step_id in frame_work_ids)

        if frame_indices and frame_work_enabled:
            frame_steps_runtime = self._initialize_frame_steps(steps)
            if frame_steps_runtime:
                access_mode = self._choose_frame_access_mode(len(frame_indices), metadata.frame_count)
                sampling.access_mode = access_mode
                if source_path is not None:
                    frame_samples, camera_samples = self._process_path_samples(
                        path=source_path,
                        metadata=metadata,
                        frame_indices=frame_indices,
                        frame_steps_runtime=frame_steps_runtime,
                        steps=steps,
                    )
                else:
                    frame_samples, camera_samples = self._process_video_samples(
                        video=video,
                        frame_indices=frame_indices,
                        frame_steps_runtime=frame_steps_runtime,
                        steps=steps,
                    )
                self._finalize_frame_steps(frame_steps_runtime=frame_steps_runtime, steps=steps)
        elif frame_work_enabled:
            # Frame analyzers were enabled but no frames could be sampled.
            for step_id in frame_work_ids:
                if step_id in self.config.enabled_analyzers and step_id not in steps:
                    steps[step_id] = AnalysisStepStatus(status="skipped", warning="No frames sampled")

        if camera_samples:
            motion_section.camera_motion_samples = camera_samples

        frames_section = FrameAnalysisSection(sampling=sampling, samples=frame_samples)

        run_info.elapsed_seconds = time.perf_counter() - started

        summary = self._build_summary(
            temporal_section=temporal_section,
            audio_section=audio_section,
            motion_section=motion_section,
            frame_samples=frame_samples,
        )

        # Empty sections collapse to None so serialized output stays lean.
        return VideoAnalysis(
            source=source,
            config=self.config,
            run_info=run_info,
            steps=steps,
            audio=audio_section if (audio_section.transcription or audio_section.classification) else None,
            temporal=temporal_section if (temporal_section.scenes or temporal_section.actions) else None,
            motion=motion_section
            if (motion_section.video_motion or motion_section.motion_timeline or motion_section.camera_motion_samples)
            else None,
            frames=frames_section,
            summary=summary,
        )

    def _run_step(
        self,
        steps: dict[str, AnalysisStepStatus],
        step_id: str,
        func: Any,
        *,
        optional: bool,
    ) -> Any:
        """Execute one analyzer step, recording its status and duration.

        Returns the step result, or None when a failure was swallowed
        (optional step, or best-effort mode without fail-fast).
        """
        step_start = time.perf_counter()
        try:
            outcome = func()
        except Exception as exc:
            elapsed = time.perf_counter() - step_start
            # Optional steps degrade to "skipped"; required ones are "failed".
            steps[step_id] = AnalysisStepStatus(
                status="skipped" if optional else "failed",
                duration_seconds=elapsed,
                error=str(exc),
            )
            if self._should_raise(optional=optional):
                raise
            return None
        else:
            steps[step_id] = AnalysisStepStatus(
                status="succeeded",
                duration_seconds=time.perf_counter() - step_start,
            )
            return outcome

    def _should_raise(self, *, optional: bool) -> bool:
        """Failures propagate only for required steps when fail-fast is set or best-effort is off."""
        return (not optional) and (self.config.fail_fast or not self.config.best_effort)

    def _analyzer_kwargs(self, analyzer_id: str) -> dict[str, Any]:
        """Return a defensive copy of the configured constructor kwargs for one analyzer."""
        params = self.config.analyzer_params.get(analyzer_id)
        return dict(params) if params else {}

    def _run_action_recognition(
        self,
        *,
        action_recognizer: ActionRecognizer,
        source_path: Path | None,
        video: Video | None,
        scenes: list[SceneBoundary],
    ) -> list[DetectedAction]:
        """Recognize actions over the whole video or per-scene, per `action_scope`."""

        def recognize_whole() -> list[DetectedAction]:
            # Full-video pass; path-based when a file is available.
            if source_path is not None:
                return action_recognizer.recognize_path(source_path)
            return action_recognizer.recognize(_require_video(video))

        scope = self.config.action_scope
        if scope == "video":
            return recognize_whole()

        # Decide whether scene-level recognition applies.
        if scope == "scene":
            scene_scoped = bool(scenes)
        else:  # "adaptive" — only remaining value (validated in config)
            cap = self.config.max_action_scenes
            scene_scoped = bool(scenes) and (cap is None or len(scenes) <= cap)

        if not scene_scoped:
            return recognize_whole()

        chosen = self._select_action_scenes(scenes)
        if not chosen:
            return recognize_whole()

        if source_path is not None:
            detected: list[DetectedAction] = []
            for boundary in chosen:
                detected.extend(
                    action_recognizer.recognize_path(
                        source_path,
                        start_second=boundary.start,
                        end_second=boundary.end,
                    )
                )
            return detected

        return self._recognize_actions_on_video_scenes(
            action_recognizer=action_recognizer,
            video=_require_video(video),
            scenes=chosen,
        )

    def _select_action_scenes(self, scenes: list[SceneBoundary]) -> list[SceneBoundary]:
        """Keep non-degenerate scenes, evenly subsampled down to `max_action_scenes`."""
        usable = [s for s in scenes if s.end > s.start and s.end_frame > s.start_frame]
        cap = self.config.max_action_scenes
        if cap is None or len(usable) <= cap:
            return usable
        # Evenly spaced indices across the usable scenes (endpoints included).
        chosen_idx = np.linspace(0, len(usable) - 1, cap, dtype=int)
        return [usable[i] for i in chosen_idx]

    def _recognize_actions_on_video_scenes(
        self,
        *,
        action_recognizer: ActionRecognizer,
        video: Video,
        scenes: list[SceneBoundary],
    ) -> list[DetectedAction]:
        """Run action recognition on per-scene sub-clips of an in-memory video.

        Each scene's frame range is clamped to the video's bounds, recognized as
        an independent clip, and the resulting actions are shifted from
        clip-relative coordinates back into whole-video frame indices and
        timestamps before being collected.
        """
        actions: list[DetectedAction] = []
        frame_count = len(video.frames)
        if frame_count <= 0:
            return actions

        for scene in scenes:
            # Clamp the scene's frame range into [0, frame_count); guarantee at
            # least one frame (end_frame > start_frame) for the clip slice.
            start_frame = max(0, min(frame_count - 1, int(scene.start_frame)))
            end_frame = max(start_frame + 1, min(frame_count, int(scene.end_frame)))
            clip = Video.from_frames(video.frames[start_frame:end_frame], video.fps)
            clip_actions = action_recognizer.recognize(clip)
            for action in clip_actions:
                # Translate clip-relative frames/times to video coordinates.
                # Missing (None) values fall back to the scene's own bounds.
                action.start_frame = (
                    start_frame if action.start_frame is None else min(frame_count, start_frame + action.start_frame)
                )
                action.end_frame = (
                    end_frame if action.end_frame is None else min(frame_count, start_frame + action.end_frame)
                )
                action.start_time = scene.start if action.start_time is None else scene.start + action.start_time
                action.end_time = (
                    scene.end if action.end_time is None else min(video.total_seconds, scene.start + action.end_time)
                )
            actions.extend(clip_actions)

        return actions

    def _build_summary(
        self,
        *,
        temporal_section: TemporalAnalysisSection,
        audio_section: AudioAnalysisSection,
        motion_section: MotionAnalysisSection,
        frame_samples: list[FrameAnalysisSample],
    ) -> dict[str, Any]:
        """Aggregate cross-analyzer signals into a compact summary dict.

        Always includes basic counts; richer keys (top actions/objects, OCR
        term frequencies, face presence ratio, motion distributions) are added
        only when the corresponding section produced data.
        """
        summary: dict[str, Any] = {
            "scene_count": len(temporal_section.scenes),
            "action_count": len(temporal_section.actions),
            "frame_sample_count": len(frame_samples),
            "audio_events_count": len(audio_section.classification.events) if audio_section.classification else 0,
        }

        if temporal_section.actions:
            # Single pass over actions: count labels and track the best
            # confidence per label (replaces the quadratic per-label max scan).
            action_counts: Counter[str] = Counter()
            action_conf: dict[str, float] = {}
            for action in temporal_section.actions:
                if not action.label:
                    continue
                action_counts[action.label] += 1
                confidence = float(action.confidence)
                if confidence > action_conf.get(action.label, float("-inf")):
                    action_conf[action.label] = confidence
            summary["top_actions"] = [
                {
                    "label": label,
                    "count": count,
                    "max_confidence": round(action_conf[label], 4),
                }
                for label, count in action_counts.most_common(5)
            ]

        if frame_samples:
            # Fraction of sampled frames with at least one detected face.
            face_present_count = sum(1 for sample in frame_samples if sample.faces)
            summary["face_presence_ratio"] = round(face_present_count / len(frame_samples), 4)

            object_labels: list[tuple[str, float]] = []
            for sample in frame_samples:
                for obj in sample.objects or []:
                    object_labels.append((obj.label, float(obj.confidence)))

            if object_labels:
                object_counts = Counter(label for label, _ in object_labels)
                object_conf: dict[str, float] = {}
                for label, conf in object_labels:
                    object_conf[label] = max(object_conf.get(label, 0.0), conf)
                summary["top_objects"] = [
                    {
                        "label": label,
                        "count": count,
                        "max_confidence": round(object_conf[label], 4),
                    }
                    for label, count in object_counts.most_common(10)
                ]

            # OCR term frequencies: lowercase alphanumeric tokens, length >= 3.
            text_tokens: Counter[str] = Counter()
            for sample in frame_samples:
                for text_item in sample.text or []:
                    for token in re.findall(r"[A-Za-z0-9]{3,}", text_item.lower()):
                        text_tokens[token] += 1
            if text_tokens:
                summary["top_ocr_terms"] = [
                    {"term": token, "count": count} for token, count in text_tokens.most_common(10)
                ]

        # Prefer explicit camera-motion labels; otherwise fall back to the
        # generic motion-type distribution from the timeline.
        if motion_section.camera_motion_samples:
            camera_counts = Counter(item.label for item in motion_section.camera_motion_samples)
            summary["camera_motion_distribution"] = dict(camera_counts)
        elif motion_section.motion_timeline:
            motion_counts = Counter(item.motion.motion_type for item in motion_section.motion_timeline)
            summary["motion_type_distribution"] = dict(motion_counts)

        return summary

    def _initialize_frame_steps(self, steps: dict[str, AnalysisStepStatus]) -> dict[str, dict[str, Any]]:
        """Instantiate every enabled per-frame analyzer and return its runtime bookkeeping.

        Args:
            steps: Mutable step-status map; construction failures are recorded
                here by ``_create_analyzer``.

        Returns:
            Mapping of analyzer id to its runtime dict (see ``_frame_runtime``)
            for each analyzer that was constructed successfully.
        """
        runtime: dict[str, dict[str, Any]] = {}

        # One pass over all per-frame analyzers; the previous version duplicated
        # this identical loop body across two separate tuples.
        for analyzer_id, analyzer_cls in (
            (OBJECT_DETECTOR, ObjectDetector),
            (TEXT_DETECTOR, TextDetector),
            (FACE_DETECTOR, FaceDetector),
            (IMAGE_TO_TEXT, ImageToText),
            (CAMERA_MOTION_DETECTOR, CameraMotionDetector),
        ):
            if analyzer_id not in self.config.enabled_analyzers:
                continue

            optional = analyzer_id in self.config.optional_analyzers
            analyzer = self._create_analyzer(analyzer_cls, analyzer_id=analyzer_id, steps=steps, optional=optional)
            if analyzer is not None:
                runtime[analyzer_id] = self._frame_runtime(analyzer, optional=optional)

        return runtime

    def _frame_runtime(self, analyzer: Any, *, optional: bool) -> dict[str, Any]:
        return {
            "analyzer": analyzer,
            "optional": optional,
            "started": time.perf_counter(),
            "processed": 0,
            "error": None,
        }

    def _create_analyzer(
        self,
        analyzer_cls: Any,
        *,
        analyzer_id: str,
        steps: dict[str, AnalysisStepStatus],
        optional: bool,
    ) -> Any | None:
        """Instantiate one analyzer, recording a step failure instead of raising when tolerated.

        Returns the analyzer instance, or ``None`` when construction failed and
        the failure may be swallowed (optional analyzer or best-effort mode).
        """
        construction_start = time.perf_counter()
        try:
            return analyzer_cls(**self._analyzer_kwargs(analyzer_id))
        except Exception as exc:
            elapsed = time.perf_counter() - construction_start
            # Optional analyzers degrade to "skipped" rather than "failed".
            steps[analyzer_id] = AnalysisStepStatus(
                status="skipped" if optional else "failed",
                duration_seconds=elapsed,
                error=str(exc),
            )
            if self._should_raise(optional=optional):
                raise
            return None

    def _finalize_frame_steps(
        self,
        *,
        frame_steps_runtime: dict[str, dict[str, Any]],
        steps: dict[str, AnalysisStepStatus],
    ) -> None:
        """Convert per-analyzer runtime bookkeeping into final step statuses."""
        for analyzer_id, runtime in frame_steps_runtime.items():
            elapsed = time.perf_counter() - runtime["started"]
            details = {"processed_samples": int(runtime["processed"])}
            failure = runtime["error"]

            if failure is None:
                steps[analyzer_id] = AnalysisStepStatus(
                    status="succeeded",
                    duration_seconds=elapsed,
                    details=details,
                )
            else:
                # A failed optional analyzer is reported as skipped, not failed.
                steps[analyzer_id] = AnalysisStepStatus(
                    status="skipped" if bool(runtime["optional"]) else "failed",
                    duration_seconds=elapsed,
                    error=failure,
                    details=details,
                )

    def _process_path_samples(
        self,
        *,
        path: Path,
        metadata: VideoMetadata,
        frame_indices: list[int],
        frame_steps_runtime: dict[str, dict[str, Any]],
        steps: dict[str, AnalysisStepStatus],
    ) -> tuple[list[FrameAnalysisSample], list[CameraMotionSample]]:
        """Analyze sampled frames from disk, choosing streaming or chunked access.

        The access-mode decision is delegated to ``_choose_frame_access_mode`` so
        the sampling-density threshold lives in exactly one place (it was
        previously duplicated inline here).
        """
        mode = self._choose_frame_access_mode(len(frame_indices), metadata.frame_count)
        processor = (
            self._process_path_samples_streaming if mode == "streaming" else self._process_path_samples_chunked
        )
        return processor(
            path=path,
            metadata=metadata,
            frame_indices=frame_indices,
            frame_steps_runtime=frame_steps_runtime,
            steps=steps,
        )

    def _process_path_samples_streaming(
        self,
        *,
        path: Path,
        metadata: VideoMetadata,
        frame_indices: list[int],
        frame_steps_runtime: dict[str, dict[str, Any]],
        steps: dict[str, AnalysisStepStatus],
    ) -> tuple[list[FrameAnalysisSample], list[CameraMotionSample]]:
        """Analyze sampled frames using a single sequential decode pass.

        Only the span between the first and last requested frame index is
        decoded; frames whose index is not in ``frame_indices`` are skipped.
        Suited to dense sampling, where one pass beats per-index seeks.

        Returns:
            The per-frame analysis samples and any camera-motion samples
            produced along the way.
        """
        samples: list[FrameAnalysisSample] = []
        camera_samples: list[CameraMotionSample] = []
        if not frame_indices:
            return samples, camera_samples

        target_indices = sorted(frame_indices)
        target_pos = 0

        # Bound the decode pass to the time span that actually contains targets.
        start_second = target_indices[0] / metadata.fps
        end_second = min(metadata.total_seconds, (target_indices[-1] + 1) / metadata.fps)

        # Sliding window of recent frames for camera-motion detection.
        camera_window: deque[tuple[int, float, np.ndarray]] = deque(maxlen=max(self.config.camera_motion_stride, 1) + 1)

        with FrameIterator(path, start_second=start_second, end_second=end_second) as iterator:
            for frame_idx, frame in iterator:
                # Advance past any targets the iterator may have jumped over.
                while target_pos < len(target_indices) and target_indices[target_pos] < frame_idx:
                    target_pos += 1
                if target_pos >= len(target_indices):
                    break
                if frame_idx != target_indices[target_pos]:
                    continue

                sample, camera_sample = self._analyze_sampled_frame(
                    frame=frame,
                    frame_index=frame_idx,
                    fps=metadata.fps,
                    frame_steps_runtime=frame_steps_runtime,
                    camera_window=camera_window,
                    steps=steps,
                )
                samples.append(sample)
                if camera_sample is not None:
                    camera_samples.append(camera_sample)

                target_pos += 1
                # Stop decoding as soon as the last target has been processed.
                if target_pos >= len(target_indices):
                    break

        return samples, camera_samples

    def _process_path_samples_chunked(
        self,
        *,
        path: Path,
        metadata: VideoMetadata,
        frame_indices: list[int],
        frame_steps_runtime: dict[str, dict[str, Any]],
        steps: dict[str, AnalysisStepStatus],
    ) -> tuple[list[FrameAnalysisSample], list[CameraMotionSample]]:
        """Analyze sampled frames by decoding them in fixed-size index batches."""
        collected: list[FrameAnalysisSample] = []
        motion_events: list[CameraMotionSample] = []
        window_capacity = max(self.config.camera_motion_stride, 1) + 1
        sliding_window: deque[tuple[int, float, np.ndarray]] = deque(maxlen=window_capacity)

        batch = self._effective_frame_chunk_size(metadata)
        for offset in range(0, len(frame_indices), batch):
            batch_indices = frame_indices[offset : offset + batch]
            decoded = extract_frames_at_indices(path, batch_indices)
            for frame_idx, frame in zip(batch_indices, decoded):
                frame_sample, motion_sample = self._analyze_sampled_frame(
                    frame=frame,
                    frame_index=frame_idx,
                    fps=metadata.fps,
                    frame_steps_runtime=frame_steps_runtime,
                    camera_window=sliding_window,
                    steps=steps,
                )
                collected.append(frame_sample)
                if motion_sample is not None:
                    motion_events.append(motion_sample)

        return collected, motion_events

    def _process_video_samples(
        self,
        *,
        video: Video | None,
        frame_indices: list[int],
        frame_steps_runtime: dict[str, dict[str, Any]],
        steps: dict[str, AnalysisStepStatus],
    ) -> tuple[list[FrameAnalysisSample], list[CameraMotionSample]]:
        """Analyze sampled frames of an in-memory video; no-op when *video* is None."""
        if video is None:
            return [], []

        window_capacity = max(self.config.camera_motion_stride, 1) + 1
        sliding_window: deque[tuple[int, float, np.ndarray]] = deque(maxlen=window_capacity)
        collected: list[FrameAnalysisSample] = []
        motion_events: list[CameraMotionSample] = []

        for index in frame_indices:
            frame_sample, motion_sample = self._analyze_sampled_frame(
                frame=video.frames[index],
                frame_index=index,
                fps=video.fps,
                frame_steps_runtime=frame_steps_runtime,
                camera_window=sliding_window,
                steps=steps,
            )
            collected.append(frame_sample)
            if motion_sample is not None:
                motion_events.append(motion_sample)

        return collected, motion_events

    def _analyze_sampled_frame(
        self,
        *,
        frame: np.ndarray,
        frame_index: int,
        fps: float,
        frame_steps_runtime: dict[str, dict[str, Any]],
        camera_window: deque[tuple[int, float, np.ndarray]],
        steps: dict[str, AnalysisStepStatus],
    ) -> tuple[FrameAnalysisSample, CameraMotionSample | None]:
        """Run every active frame analyzer on one sampled frame.

        Args:
            frame: Decoded frame pixels.
            frame_index: Index of the frame within the source video.
            fps: Source frame rate, used to derive the sample timestamp.
            frame_steps_runtime: Per-analyzer bookkeeping from
                ``_initialize_frame_steps``; ``processed``/``error`` are mutated in place.
            camera_window: Sliding window of recent frames shared across calls;
                camera motion is detected across ``camera_motion_stride`` samples.
            steps: Step-status map; kept for interface symmetry with the other
                processing helpers (failures surface via ``frame_steps_runtime``).

        Returns:
            The populated sample and, when the camera-motion window is full, a
            ``CameraMotionSample`` spanning the window — otherwise ``None``.
        """
        timestamp = round(frame_index / fps, 6)
        sample = FrameAnalysisSample(
            timestamp=timestamp,
            frame_index=frame_index,
            objects=[],
            faces=[],
            text=[],
            text_regions=[],
        )
        step_results: dict[str, str] = {}

        # Per-frame analyzers: an analyzer whose runtime already carries an
        # error is skipped for the rest of the run.
        for analyzer_id in (
            OBJECT_DETECTOR,
            FACE_DETECTOR,
            TEXT_DETECTOR,
            IMAGE_TO_TEXT,
        ):
            runtime = frame_steps_runtime.get(analyzer_id)
            if runtime is None or runtime["error"] is not None:
                continue

            analyzer = runtime["analyzer"]
            try:
                if analyzer_id == OBJECT_DETECTOR:
                    sample.objects = analyzer.detect(frame)
                elif analyzer_id == FACE_DETECTOR:
                    sample.faces = analyzer.detect(frame)
                elif analyzer_id == TEXT_DETECTOR:
                    # Prefer structured OCR regions when the detector supports them.
                    if hasattr(analyzer, "detect_detailed"):
                        detailed = analyzer.detect_detailed(frame)
                        sample.text_regions = detailed
                        sample.text = [item.text for item in detailed]
                    else:
                        sample.text = analyzer.detect(frame)
                elif analyzer_id == IMAGE_TO_TEXT:
                    sample.image_caption = analyzer.describe_image(frame)

                runtime["processed"] += 1
                step_results[analyzer_id] = "ok"
            except Exception as exc:
                runtime["error"] = str(exc)
                step_results[analyzer_id] = "error"
                if self._should_raise(optional=bool(runtime["optional"])):
                    raise

        # Camera motion compares the oldest and newest frame in the window once
        # it spans at least `camera_motion_stride` samples.
        camera_sample: CameraMotionSample | None = None
        camera_runtime = frame_steps_runtime.get(CAMERA_MOTION_DETECTOR)
        if camera_runtime is not None and camera_runtime["error"] is None:
            camera_window.append((frame_index, timestamp, frame))
            stride = max(int(self.config.camera_motion_stride), 1)
            if len(camera_window) >= stride + 1:
                _, first_timestamp, first_frame = camera_window[0]
                _, last_timestamp, last_frame = camera_window[-1]
                try:
                    label = camera_runtime["analyzer"].detect(first_frame, last_frame)
                    camera_runtime["processed"] += 1
                    step_results[CAMERA_MOTION_DETECTOR] = "ok"
                    camera_sample = CameraMotionSample(start=first_timestamp, end=last_timestamp, label=label)
                except Exception as exc:
                    camera_runtime["error"] = str(exc)
                    step_results[CAMERA_MOTION_DETECTOR] = "error"
                    if self._should_raise(optional=bool(camera_runtime["optional"])):
                        raise

        # Guard against analyzers returning None instead of an empty list.
        sample.objects = sample.objects if sample.objects is not None else []
        sample.faces = sample.faces if sample.faces is not None else []
        sample.text = sample.text if sample.text is not None else []
        sample.text_regions = sample.text_regions if sample.text_regions is not None else []

        # Keep payload compact.
        if step_results:
            sample.step_results = step_results

        return sample, camera_sample

    def _choose_frame_access_mode(self, sampled_frames: int, total_frames: int) -> str:
        if total_frames <= 0:
            return "chunked"
        density = sampled_frames / total_frames
        return "streaming" if density >= 0.20 else "chunked"

    def _effective_max_frames(self, metadata: VideoMetadata) -> int | None:
        """Return the sampled-frame cap after combining explicit and memory limits.

        ``None`` means no cap applies. When ``max_memory_mb`` is set, half the
        budget is treated as available for raw RGB frames; the rest is reserved
        for model tensors and transient buffers.
        """
        candidate_limits: list[int] = []

        explicit_cap = self.config.max_frames
        if explicit_cap is not None:
            candidate_limits.append(int(explicit_cap))

        memory_mb = self.config.max_memory_mb
        if memory_mb is not None:
            bytes_per_frame = metadata.width * metadata.height * 3  # uint8 RGB
            if bytes_per_frame > 0:
                budget_bytes = int(memory_mb * 1024 * 1024)
                usable_bytes = max(bytes_per_frame, int(budget_bytes * 0.5))
                candidate_limits.append(max(1, usable_bytes // bytes_per_frame))

        if candidate_limits:
            return max(1, min(candidate_limits))
        return None

    def _effective_frame_chunk_size(self, metadata: VideoMetadata) -> int:
        """Clamp the configured chunk size to the effective frame cap (always >= 1)."""
        configured = max(1, int(self.config.frame_chunk_size))
        cap = self._effective_max_frames(metadata)
        return configured if cap is None else max(1, min(configured, cap))

    def _apply_max_frames_limit(self, indices: list[int], max_frames: int | None) -> list[int]:
        if max_frames is None or len(indices) <= max_frames:
            return indices
        picks = np.linspace(0, len(indices) - 1, max_frames, dtype=int)
        return [indices[i] for i in picks]

    def _plan_frame_indices(
        self,
        *,
        metadata: VideoMetadata,
        scenes: list[SceneBoundary],
        effective_max_frames: int | None = None,
    ) -> list[int]:
        """Plan which frame indices to sample for the configured sampling mode.

        Combines uniform time-based sampling with scene-boundary and
        scene-representative picks, always yields at least frame 0, and caps the
        result via ``_apply_max_frames_limit``.
        """
        if metadata.frame_count <= 0:
            return []

        mode = self.config.frame_sampling_mode
        picks: set[int] = set()
        last_frame = metadata.frame_count - 1

        # Uniform sampling at the configured frames-per-second rate.
        if mode in {"uniform", "hybrid"}:
            target_fps = max(float(self.config.frames_per_second), 0.0)
            if target_fps > 0:
                step = max(int(round(metadata.fps / target_fps)), 1)
                picks.update(range(0, metadata.frame_count, step))

        # Scene-driven sampling: boundary frames and/or one representative per scene.
        if scenes and mode in {"scene_boundary", "scene_representative", "hybrid"}:
            if self.config.include_scene_boundaries:
                for scene in scenes:
                    picks.add(max(0, min(last_frame, scene.start_frame)))

            if mode in {"scene_representative", "hybrid"}:
                offset = min(max(self.config.scene_representative_offset, 0.0), 1.0)
                for scene in scenes:
                    span = max(scene.end_frame - scene.start_frame, 1)
                    candidate = scene.start_frame + int(round(offset * (span - 1)))
                    picks.add(max(0, min(last_frame, candidate)))

        if not picks:
            picks.add(0)

        return self._apply_max_frames_limit(sorted(picks), effective_max_frames)

    def _build_source_from_path(self, path: Path, metadata: VideoMetadata) -> VideoAnalysisSource:
        """Assemble source metadata for a video on disk, honoring geo-privacy settings."""
        probe_tags = self._extract_source_tags(path)
        sanitized_tags = _sanitize_raw_tags(probe_tags, redact_geo=self.config.redact_geo)
        raw_creation = next((probe_tags[key] for key in _CREATION_TIME_TAG_KEYS if key in probe_tags), None)

        location: GeoMetadata | None = None
        if self.config.include_geo and not self.config.redact_geo:
            location = _parse_geo_metadata(probe_tags)

        return VideoAnalysisSource(
            title=probe_tags.get("title") or path.stem,
            path=str(path),
            filename=path.name,
            duration=metadata.total_seconds,
            fps=metadata.fps,
            width=metadata.width,
            height=metadata.height,
            frame_count=metadata.frame_count,
            creation_time=_normalize_creation_time(raw_creation),
            geo=location,
            raw_tags=sanitized_tags or None,
        )

    def _build_source_from_video(
        self,
        *,
        video: Video,
        source_path: str | Path | None,
        metadata: VideoMetadata,
    ) -> VideoAnalysisSource:
        """Assemble source metadata for an in-memory video, probing tags only when a path is known."""
        resolved_path = Path(source_path) if source_path is not None else None
        probe_tags = self._extract_source_tags(resolved_path) if resolved_path else {}
        sanitized_tags = _sanitize_raw_tags(probe_tags, redact_geo=self.config.redact_geo)
        raw_creation = next((probe_tags[key] for key in _CREATION_TIME_TAG_KEYS if key in probe_tags), None)

        location: GeoMetadata | None = None
        if self.config.include_geo and not self.config.redact_geo:
            location = _parse_geo_metadata(probe_tags)

        # Fall back to the filename stem when the container provides no title.
        title = probe_tags.get("title") if probe_tags else None
        if title is None and resolved_path is not None:
            title = resolved_path.stem

        return VideoAnalysisSource(
            title=title,
            path=str(resolved_path) if resolved_path else None,
            filename=resolved_path.name if resolved_path else None,
            duration=video.total_seconds,
            fps=metadata.fps,
            width=metadata.width,
            height=metadata.height,
            frame_count=metadata.frame_count,
            creation_time=_normalize_creation_time(raw_creation),
            geo=location,
            raw_tags=sanitized_tags or None,
        )

    def _extract_source_tags(self, path: Path | None) -> dict[str, str]:
        if path is None:
            return {}

        cmd = [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format_tags:stream_tags",
            "-of",
            "json",
            str(path),
        ]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            payload = json.loads(result.stdout)
        except Exception:
            return {}

        tags: dict[str, str] = {}

        format_tags = payload.get("format", {}).get("tags", {})
        if isinstance(format_tags, dict):
            tags.update({str(k).lower(): str(v) for k, v in format_tags.items()})

        for stream in payload.get("streams", []):
            stream_tags = stream.get("tags", {})
            if not isinstance(stream_tags, dict):
                continue
            for key, value in stream_tags.items():
                lowered = str(key).lower()
                tags.setdefault(lowered, str(value))

        return tags

analyze_path

analyze_path(path: str | Path) -> VideoAnalysis

Analyze a video path with bounded frame memory usage.

Source code in src/videopython/ai/video_analysis.py
def analyze_path(self, path: str | Path) -> VideoAnalysis:
    """Analyze a video file on disk while keeping frame memory usage bounded."""
    resolved = Path(path)
    probed = VideoMetadata.from_path(resolved)
    return self._analyze(
        video=None,
        source_path=resolved,
        metadata=probed,
        source=self._build_source_from_path(resolved, probed),
    )

analyze

analyze(
    video: Video, *, source_path: str | Path | None = None
) -> VideoAnalysis

Analyze an in-memory Video object.

Source code in src/videopython/ai/video_analysis.py
def analyze(self, video: Video, *, source_path: str | Path | None = None) -> VideoAnalysis:
    """Analyze an in-memory Video object."""
    probed = VideoMetadata.from_video(video)
    built_source = self._build_source_from_video(video=video, source_path=source_path, metadata=probed)
    return self._analyze(
        video=video,
        source_path=Path(source_path) if source_path else None,
        metadata=probed,
        source=built_source,
    )

VideoAnalysis dataclass

Serializable aggregate analysis result for one video.

Source code in src/videopython/ai/video_analysis.py
@dataclass
class VideoAnalysis:
    """Serializable aggregate analysis result for one video."""

    source: VideoAnalysisSource
    config: VideoAnalysisConfig
    run_info: AnalysisRunInfo
    steps: dict[str, AnalysisStepStatus] = field(default_factory=dict)
    audio: AudioAnalysisSection | None = None
    temporal: TemporalAnalysisSection | None = None
    motion: MotionAnalysisSection | None = None
    frames: FrameAnalysisSection | None = None
    summary: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize the full analysis into plain built-in types."""

        def section(value: Any) -> dict[str, Any] | None:
            # Optional sections serialize to None when absent.
            return value.to_dict() if value else None

        return {
            "source": self.source.to_dict(),
            "config": self.config.to_dict(),
            "run_info": self.run_info.to_dict(),
            "steps": {name: status.to_dict() for name, status in self.steps.items()},
            "audio": section(self.audio),
            "temporal": section(self.temporal),
            "motion": section(self.motion),
            "frames": section(self.frames),
            "summary": self.summary,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "VideoAnalysis":
        """Reconstruct a ``VideoAnalysis`` from the output of ``to_dict``."""

        def section(section_cls: Any, key: str) -> Any:
            payload = data.get(key)
            return section_cls.from_dict(payload) if payload else None

        return cls(
            source=VideoAnalysisSource.from_dict(data["source"]),
            config=VideoAnalysisConfig.from_dict(data["config"]),
            run_info=AnalysisRunInfo.from_dict(data["run_info"]),
            steps={name: AnalysisStepStatus.from_dict(raw) for name, raw in data.get("steps", {}).items()},
            audio=section(AudioAnalysisSection, "audio"),
            temporal=section(TemporalAnalysisSection, "temporal"),
            motion=section(MotionAnalysisSection, "motion"),
            frames=section(FrameAnalysisSection, "frames"),
            summary=data.get("summary"),
        )

    def to_json(self, *, indent: int | None = 2) -> str:
        """Render the analysis as a JSON string."""
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=indent)

    @classmethod
    def from_json(cls, text: str) -> "VideoAnalysis":
        """Parse a ``VideoAnalysis`` from a JSON string produced by ``to_json``."""
        return cls.from_dict(json.loads(text))

    def save(self, path: str | Path, *, indent: int | None = 2) -> None:
        """Write the analysis as JSON to *path*, creating parent directories."""
        destination = Path(path)
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(self.to_json(indent=indent), encoding="utf-8")

    @classmethod
    def load(cls, path: str | Path) -> "VideoAnalysis":
        """Read a ``VideoAnalysis`` back from a JSON file saved with ``save``."""
        return cls.from_json(Path(path).read_text(encoding="utf-8"))