Skip to content

Video

The Video class is the core data structure in videopython.

Video

Video

Source code in src/videopython/base/video.py
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
class Video:
    """In-memory video: a uint8 RGB frame array plus an accompanying ``Audio`` track."""

    def __init__(self, frames: np.ndarray, fps: int | float, audio: Audio | None = None):
        """Initialize a video from raw frames.

        Args:
            frames: Frame array of shape (num_frames, height, width, 3), dtype uint8.
            fps: Frames per second.
            audio: Optional audio track. If None, a silent stereo 44.1 kHz track
                matching the video duration is created.
        """
        self.frames = frames
        self.fps = fps
        if audio:
            self.audio = audio
        else:
            self.audio = Audio.create_silent(
                duration_seconds=round(self.total_seconds, 2), stereo=True, sample_rate=44100
            )

    @classmethod
    def from_path(
        cls,
        path: str,
        read_batch_size: int = 100,
        start_second: float | None = None,
        end_second: float | None = None,
        fps: float | None = None,
        width: int | None = None,
        height: int | None = None,
    ) -> Video:
        """Load a video (and its audio) from file via FFmpeg.

        Args:
            path: Path to the video file.
            read_batch_size: Number of frames read from the FFmpeg pipe per batch.
            start_second: Optional segment start time in seconds.
            end_second: Optional segment end time in seconds.
            fps: Optional target frame rate (resampled via FFmpeg if it differs).
            width: Optional target width in pixels (scaled via FFmpeg).
            height: Optional target height in pixels (scaled via FFmpeg).

        Returns:
            A new Video containing the requested segment.

        Raises:
            ValueError: On invalid time bounds or if no frames could be read.
            VideoLoadError: On FFmpeg or I/O failure.
        """
        try:
            # Get video metadata using VideoMetadata.from_path
            metadata = VideoMetadata.from_path(path)

            out_width = width if width is not None else metadata.width
            out_height = height if height is not None else metadata.height
            out_fps = fps if fps is not None else metadata.fps
            total_duration = metadata.total_seconds

            # Validate time bounds
            if start_second is not None and start_second < 0:
                raise ValueError("start_second must be non-negative")
            if end_second is not None and end_second > total_duration:
                raise ValueError(f"end_second ({end_second}) exceeds video duration ({total_duration})")
            if start_second is not None and end_second is not None and start_second >= end_second:
                raise ValueError("start_second must be less than end_second")

            # Estimate memory usage and warn for large videos
            segment_duration = total_duration
            if start_second is not None and end_second is not None:
                segment_duration = end_second - start_second
            elif end_second is not None:
                segment_duration = end_second
            elif start_second is not None:
                segment_duration = total_duration - start_second

            estimated_frames = int(segment_duration * out_fps)
            estimated_bytes = estimated_frames * out_height * out_width * 3
            estimated_gb = estimated_bytes / (1024**3)
            if estimated_gb > 10:
                warnings.warn(
                    f"Loading this video will use ~{estimated_gb:.1f}GB of RAM. "
                    f"For large videos, consider using FrameIterator for memory-efficient streaming.",
                    ResourceWarning,
                    stacklevel=2,
                )

            # Build FFmpeg command with improved segment handling
            ffmpeg_cmd = ["ffmpeg"]

            # Add seek option BEFORE input for more efficient seeking
            if start_second is not None:
                ffmpeg_cmd.extend(["-ss", str(start_second)])

            ffmpeg_cmd.extend(["-i", path])

            # Add duration AFTER input for more precise timing
            if end_second is not None and start_second is not None:
                duration = end_second - start_second
                ffmpeg_cmd.extend(["-t", str(duration)])
            elif end_second is not None:
                ffmpeg_cmd.extend(["-t", str(end_second)])

            # Apply video filters for resize and fps resampling
            vf_filters: list[str] = []
            if width is not None or height is not None:
                vf_filters.append(f"scale={out_width}:{out_height}")
            if fps is not None and fps != metadata.fps:
                vf_filters.append(f"fps={out_fps}")
            if vf_filters:
                ffmpeg_cmd.extend(["-vf", ",".join(vf_filters)])

            # Output format settings - removed problematic -vsync 0
            ffmpeg_cmd.extend(
                [
                    "-f",
                    "rawvideo",
                    "-pix_fmt",
                    "rgb24",
                    "-vcodec",
                    "rawvideo",
                    "-avoid_negative_ts",
                    "make_zero",  # Handle timing issues
                    "-y",
                    "pipe:1",
                ]
            )

            # Start FFmpeg process with stderr redirected to avoid deadlock
            process = subprocess.Popen(
                ffmpeg_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,  # Redirect stderr to avoid deadlock
                bufsize=10**8,  # Use large buffer for efficient I/O
            )

            # Calculate frame size in bytes
            frame_size = out_width * out_height * 3  # 3 bytes per pixel for RGB

            # Estimate frame count for pre-allocation
            if start_second is not None and end_second is not None:
                estimated_duration = end_second - start_second
            elif end_second is not None:
                estimated_duration = end_second
            elif start_second is not None:
                estimated_duration = total_duration - start_second
            else:
                estimated_duration = total_duration

            # Add buffer to handle frame rate variations and rounding
            estimated_frames = int(estimated_duration * out_fps * FRAME_BUFFER_MULTIPLIER) + FRAME_BUFFER_PADDING

            # Pre-allocate numpy array
            frames = np.empty((estimated_frames, out_height, out_width, 3), dtype=np.uint8)
            frames_read = 0

            try:
                while frames_read < estimated_frames:
                    # Calculate remaining frames to read
                    remaining_frames = estimated_frames - frames_read
                    batch_size = min(read_batch_size, remaining_frames)

                    # Read batch of data
                    batch_data = process.stdout.read(frame_size * batch_size)  # type: ignore

                    if not batch_data:
                        break

                    # Convert to numpy array
                    batch_frames = np.frombuffer(batch_data, dtype=np.uint8)

                    # Calculate how many complete frames we got
                    complete_frames = len(batch_frames) // (out_height * out_width * 3)

                    if complete_frames == 0:
                        break

                    # Only keep complete frames
                    complete_data = batch_frames[: complete_frames * out_height * out_width * 3]
                    batch_frames_array = complete_data.reshape(complete_frames, out_height, out_width, 3)

                    # Check if we have room in pre-allocated array
                    if frames_read + complete_frames > estimated_frames:
                        # Need to expand array - this should be rare with our buffer
                        new_size = max(estimated_frames * 2, frames_read + complete_frames + 100)
                        new_frames = np.empty((new_size, out_height, out_width, 3), dtype=np.uint8)
                        new_frames[:frames_read] = frames[:frames_read]
                        frames = new_frames
                        estimated_frames = new_size

                    # Store batch in pre-allocated array
                    end_idx = frames_read + complete_frames
                    frames[frames_read:end_idx] = batch_frames_array
                    frames_read += complete_frames

            finally:
                # Ensure process is properly terminated
                if process.poll() is None:
                    process.terminate()
                    try:
                        process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        process.kill()
                        process.wait()

                # Clean up pipes
                if process.stdout:
                    process.stdout.close()

            # Check if FFmpeg had an error (non-zero return code)
            if process.returncode not in (0, None) and frames_read == 0:
                raise ValueError(f"FFmpeg failed to process video (return code: {process.returncode})")

            if frames_read == 0:
                raise ValueError("No frames were read from the video")

            # Trim the pre-allocated array to actual frames read
            frames = frames[:frames_read]  # type: ignore

            # Load audio for the specified segment
            try:
                audio = Audio.from_path(path)
                # Slice audio to match the video segment
                if start_second is not None or end_second is not None:
                    audio_start = start_second if start_second is not None else 0
                    audio_end = end_second if end_second is not None else audio.metadata.duration_seconds
                    audio = audio.slice(start_seconds=audio_start, end_seconds=audio_end)
            except (AudioLoadError, FileNotFoundError, subprocess.CalledProcessError):
                warnings.warn(f"No audio found for `{path}`, adding silent track.")
                # Create silent audio based on actual frames read
                segment_duration = frames_read / out_fps
                audio = Audio.create_silent(duration_seconds=round(segment_duration, 2), stereo=True, sample_rate=44100)

            return cls(frames=frames, fps=out_fps, audio=audio)

        except VideoMetadataError:
            raise
        except subprocess.CalledProcessError as e:
            raise VideoLoadError(f"FFmpeg failed: {e}")
        except (OSError, IOError) as e:
            raise VideoLoadError(f"I/O error: {e}")

    @classmethod
    def from_frames(cls, frames: np.ndarray, fps: float) -> Video:
        """Create a Video from a frame array.

        Accepts RGB (3-channel) or RGBA (4-channel, alpha is dropped) frames.

        Args:
            frames: Array of shape (num_frames, height, width, channels).
            fps: Frames per second.

        Raises:
            ValueError: If the array is not 4-dimensional or has an unsupported
                number of channels.
        """
        if frames.ndim != 4:
            raise ValueError(f"Unsupported number of dimensions: {frames.shape}!")
        elif frames.shape[-1] == 4:
            frames = frames[:, :, :, :3]
        elif frames.shape[-1] != 3:
            # This branch rejects the channel count, not the dimensionality,
            # so the error message must say so (previously it repeated the
            # "number of dimensions" wording from the ndim check above).
            raise ValueError(f"Unsupported number of channels: {frames.shape}!")
        return cls(frames=frames, fps=fps)

    @classmethod
    def from_image(cls, image: np.ndarray, fps: float = 24.0, length_seconds: float = 1.0) -> Video:
        """Create a still video by repeating a single image.

        Args:
            image: Image array of shape (height, width, channels) or
                (1, height, width, channels).
            fps: Frames per second of the resulting video.
            length_seconds: Duration of the resulting video in seconds.

        Returns:
            New Video of ``round(length_seconds * fps)`` identical frames.
        """
        if len(image.shape) == 3:
            image = np.expand_dims(image, axis=0)
        frames = np.repeat(image, round(length_seconds * fps), axis=0)
        return cls(frames=frames, fps=fps)

    def copy(self) -> Video:
        """Return a deep copy of the frames with the same (shared) audio track."""
        copied = Video.from_frames(self.frames.copy(), self.fps)
        copied.audio = self.audio  # Audio objects are immutable, no need to copy
        return copied

    def is_loaded(self) -> bool:
        """Return True when frames, fps, and audio are all present."""
        return self.fps is not None and self.frames is not None and self.audio is not None

    def split(self, frame_index: int | None = None) -> tuple[Video, Video]:
        """Split the video (and its audio) into two parts at ``frame_index``.

        Args:
            frame_index: Frame at which to split. Defaults to the midpoint
                when omitted. ``0`` is a valid split point and yields an
                empty first part.

        Returns:
            Tuple of (video before frame_index, video from frame_index on).

        Raises:
            ValueError: If frame_index is outside [0, len(frames)].
        """
        # Must test `is not None`, not truthiness: frame_index=0 is a valid,
        # explicit split point and previously fell through to the midpoint.
        if frame_index is not None:
            if not (0 <= frame_index <= len(self.frames)):
                raise ValueError(f"frame_idx must be between 0 and {len(self.frames)}, got {frame_index}")
        else:
            frame_index = len(self.frames) // 2

        split_videos = (
            self.from_frames(self.frames[:frame_index], self.fps),
            self.from_frames(self.frames[frame_index:], self.fps),
        )

        # Split audio at the corresponding time point
        split_time = frame_index / self.fps
        split_videos[0].audio = self.audio.slice(start_seconds=0, end_seconds=split_time)
        split_videos[1].audio = self.audio.slice(start_seconds=split_time)

        return split_videos

    def save(
        self,
        filename: str | Path | None = None,
        format: ALLOWED_VIDEO_FORMATS = "mp4",
        preset: ALLOWED_VIDEO_PRESETS = "medium",
        crf: int = 23,
    ) -> Path:
        """Save video to file.

        Args:
            filename: Output filename. If None, generates random name
            format: Output format (mp4, avi, mov, mkv, webm)
            preset: Encoding speed/compression tradeoff. Slower presets give smaller
                files at the same quality. Options from fastest to smallest:
                ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow
            crf: Constant Rate Factor (0-51). Lower = better quality, larger file.
                Default 23 is visually lossless for most content. Range 18-28 recommended.

        Returns:
            Path to saved video file

        Raises:
            RuntimeError: If video is not loaded
            ValueError: If format or preset is not supported
        """
        if not self.is_loaded():
            raise RuntimeError("Video is not loaded, cannot save!")

        if format.lower() not in get_args(ALLOWED_VIDEO_FORMATS):
            raise ValueError(
                f"Unsupported format: {format}. Allowed formats are: {', '.join(get_args(ALLOWED_VIDEO_FORMATS))}"
            )

        if preset not in get_args(ALLOWED_VIDEO_PRESETS):
            raise ValueError(
                f"Unsupported preset: {preset}. Allowed presets are: {', '.join(get_args(ALLOWED_VIDEO_PRESETS))}"
            )

        frame_height, frame_width = self.frame_shape[:2]
        if frame_width % 2 != 0 or frame_height % 2 != 0:
            raise ValueError(
                "Current save pipeline uses libx264 with yuv420p, which requires even frame dimensions. "
                f"Got {frame_width}x{frame_height}. "
                "Resize, crop, or pad to an even width and height before saving."
            )

        if filename is None:
            filename = Path(generate_random_name(suffix=f".{format}"))
        else:
            filename = Path(filename).with_suffix(f".{format}")
            filename.parent.mkdir(parents=True, exist_ok=True)

        # Save audio to temporary WAV file
        with tempfile.NamedTemporaryFile(suffix=".wav") as temp_audio:
            self.audio.save(temp_audio.name, format="wav")

            # Calculate exact duration
            duration = len(self.frames) / self.fps

            # Construct FFmpeg command (stream raw video via stdin)
            ffmpeg_command = [
                "ffmpeg",
                "-y",
                "-hide_banner",
                "-loglevel",
                "error",
                # Raw video input settings
                "-f",
                "rawvideo",
                "-pixel_format",
                "rgb24",
                "-video_size",
                f"{self.frame_shape[1]}x{self.frame_shape[0]}",
                "-framerate",
                str(self.fps),
                "-i",
                "pipe:0",
                # Audio input
                "-i",
                temp_audio.name,
                # Video encoding settings
                "-c:v",
                "libx264",
                "-preset",
                preset,
                "-crf",
                str(crf),
                # Audio settings
                "-c:a",
                "aac",
                "-b:a",
                "192k",
                # Output settings
                "-pix_fmt",
                "yuv420p",
                "-movflags",
                "+faststart",  # Enable fast start for web playback
                "-t",
                str(duration),
                "-vsync",
                "cfr",
                str(filename),
            ]

            process = subprocess.Popen(
                ffmpeg_command,
                stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )

            try:
                if process.stdin is None:
                    raise RuntimeError("Failed to open FFmpeg stdin pipe for video data")

                frames = self.frames
                if frames.dtype != np.uint8 or not frames.flags["C_CONTIGUOUS"]:
                    frames = np.ascontiguousarray(frames, dtype=np.uint8)

                buffer = memoryview(frames)
                try:
                    process.stdin.write(buffer)
                    process.stdin.close()
                except BrokenPipeError as e:
                    stderr = process.stderr.read() if process.stderr is not None else b""
                    returncode = process.wait()
                    raise RuntimeError(
                        f"FFmpeg terminated while receiving video data (code {returncode}): "
                        f"{stderr.decode(errors='ignore')}"
                    ) from e

                stderr = process.stderr.read() if process.stderr is not None else b""
                returncode = process.wait()

                if returncode != 0:
                    raise RuntimeError(f"FFmpeg failed with code {returncode}: {stderr.decode(errors='ignore')}")

                return filename
            finally:
                if process.poll() is None:
                    process.kill()

    def add_audio(self, audio: Audio, overlay: bool = True) -> Video:
        """Add audio to video, returning a new Video instance.

        Args:
            audio: Audio to add
            overlay: If True, overlay on existing audio; if False, replace it

        Returns:
            New Video with the audio added
        """
        video_duration = self.total_seconds
        audio_duration = audio.metadata.duration_seconds

        if audio_duration > video_duration:
            audio = audio.slice(start_seconds=0, end_seconds=video_duration)
        elif audio_duration < video_duration:
            silence_duration = video_duration - audio_duration
            silence = Audio.create_silent(
                duration_seconds=silence_duration,
                stereo=audio.metadata.channels == 2,
                sample_rate=audio.metadata.sample_rate,
            )
            audio = audio.concat(silence)

        new_video = self.copy()
        if new_video.audio.is_silent:
            new_video.audio = audio
        elif overlay:
            new_video.audio = new_video.audio.overlay(audio, position=0.0)
        else:
            new_video.audio = audio
        return new_video

    def add_audio_from_file(self, path: str, overlay: bool = True) -> Video:
        """Add audio from file, returning a new Video instance.

        Args:
            path: Path to audio file
            overlay: If True, overlay on existing audio; if False, replace it

        Returns:
            New Video with the audio added

        Raises:
            AudioLoadError: If audio file cannot be loaded
            FileNotFoundError: If audio file does not exist
        """
        new_audio = Audio.from_path(path)
        return self.add_audio(new_audio, overlay)

    def __add__(self, other: Video) -> Video:
        """Concatenate two videos (frames and audio) with matching fps/resolution.

        Raises:
            ValueError: If fps or frame shapes differ.
        """
        if self.fps != other.fps:
            raise ValueError("FPS of videos do not match!")
        elif self.frame_shape != other.frame_shape:
            raise ValueError(f"Resolutions do not match: {self.frame_shape} vs {other.frame_shape}")
        new_video = self.from_frames(np.r_["0,2", self.frames, other.frames], fps=self.fps)
        new_video.audio = self.audio.concat(other.audio)
        return new_video

    def __str__(self) -> str:
        """Return the string form of this video's metadata."""
        return str(self.metadata)

    def __getitem__(self, val: slice) -> Video:
        """Slice the video by frame indices; audio is sliced to the same span.

        Raises:
            ValueError: If indexed with anything other than a slice.
        """
        if not isinstance(val, slice):
            raise ValueError("Only slices are supported for video indexing!")

        # Sub-slice video frames
        sliced = self.from_frames(self.frames[val], fps=self.fps)

        # Normalize slice bounds via slice.indices: handles None, negative,
        # and out-of-range values, and fixes the `stop=0` case which
        # previously fell back to the full length (empty frames but
        # full-length audio).
        start, stop, _ = val.indices(len(self.frames))

        # Slice audio to match video duration
        audio_start = start / self.fps
        audio_end = stop / self.fps
        sliced.audio = self.audio.slice(start_seconds=audio_start, end_seconds=audio_end)
        return sliced

    @property
    def video_shape(self) -> tuple[int, int, int, int]:
        """Full array shape: (num_frames, height, width, channels)."""
        return self.frames.shape

    @property
    def frame_shape(self) -> tuple[int, int, int]:
        """Single-frame shape: (height, width, channels)."""
        return self.frames.shape[1:]

    @property
    def total_seconds(self) -> float:
        """Video duration in seconds, rounded to 4 decimal places."""
        return round(self.frames.shape[0] / self.fps, 4)

    @property
    def metadata(self) -> VideoMetadata:
        """Metadata derived from the current frames/fps/audio."""
        return VideoMetadata.from_video(self)

    # Fluent API for video transformations
    # These methods mirror the VideoMetadata fluent API

    def cut(self, start: float, end: float) -> Video:
        """Cut video to a time range.

        Args:
            start: Start time in seconds.
            end: End time in seconds.

        Returns:
            New Video with the specified time range.
        """
        from videopython.base.transforms import CutSeconds

        return CutSeconds(start, end).apply(self)

    def cut_frames(self, start: int, end: int) -> Video:
        """Cut video to a frame range.

        Args:
            start: Start frame index (inclusive).
            end: End frame index (exclusive).

        Returns:
            New Video with the specified frame range.
        """
        from videopython.base.transforms import CutFrames

        return CutFrames(start, end).apply(self)

    def resize(
        self,
        width: int | None = None,
        height: int | None = None,
        round_to_even: bool = True,
    ) -> Video:
        """Resize video.

        If only width or height is provided, the other dimension is calculated
        to preserve aspect ratio.

        Args:
            width: Target width in pixels.
            height: Target height in pixels.
            round_to_even: If True (default), snap output width/height to even numbers.

        Returns:
            New Video with the specified dimensions.
        """
        from videopython.base.transforms import Resize

        return Resize(width=width, height=height, round_to_even=round_to_even).apply(self)

    def crop(self, width: int, height: int) -> Video:
        """Crop video to specified dimensions (center crop).

        Args:
            width: Target width in pixels.
            height: Target height in pixels.

        Returns:
            New Video with the specified dimensions.
        """
        from videopython.base.transforms import Crop

        return Crop(width=width, height=height).apply(self)

    def resample_fps(self, fps: float) -> Video:
        """Resample video to a different frame rate.

        Args:
            fps: Target frames per second.

        Returns:
            New Video with the specified frame rate.
        """
        from videopython.base.transforms import ResampleFPS

        return ResampleFPS(fps=fps).apply(self)

    def transition_to(self, other: Video, transition: object) -> Video:
        """Combine with another video using a transition.

        Args:
            other: Video to transition to.
            transition: Transition to apply (e.g., FadeTransition, BlurTransition).

        Returns:
            New Video combining both videos with the transition effect.
        """
        from videopython.base.transitions import Transition

        if not isinstance(transition, Transition):
            raise TypeError(f"Expected Transition, got {type(transition).__name__}")
        return transition.apply((self, other))

    def ken_burns(
        self,
        start_region: "BoundingBox",
        end_region: "BoundingBox",
        easing: Literal["linear", "ease_in", "ease_out", "ease_in_out"] = "linear",
        start: float | None = None,
        stop: float | None = None,
    ) -> Video:
        """Apply Ken Burns pan-and-zoom effect.

        Creates cinematic movement by smoothly transitioning between two regions.

        Args:
            start_region: Starting crop region (BoundingBox with normalized 0-1 coordinates).
            end_region: Ending crop region (BoundingBox with normalized 0-1 coordinates).
            easing: Animation easing - "linear", "ease_in", "ease_out", or "ease_in_out".
            start: Optional start time in seconds for the effect.
            stop: Optional stop time in seconds for the effect.

        Returns:
            New Video with Ken Burns effect applied.
        """
        from videopython.base.effects import KenBurns

        return KenBurns(start_region=start_region, end_region=end_region, easing=easing).apply(
            self, start=start, stop=stop
        )

    def picture_in_picture(
        self,
        overlay: Video,
        position: tuple[float, float] = (0.7, 0.7),
        scale: float = 0.25,
        border_width: int = 0,
        border_color: tuple[int, int, int] = (255, 255, 255),
        corner_radius: int = 0,
        opacity: float = 1.0,
        audio_mode: Literal["main", "overlay", "mix"] = "main",
        audio_mix: tuple[float, float] = (1.0, 1.0),
    ) -> Video:
        """Overlay another video as picture-in-picture.

        Args:
            overlay: Video to overlay on this video.
            position: Normalized (x, y) center position, (0,0)=top-left, (1,1)=bottom-right.
            scale: Overlay size relative to main video width (0.25 = 25%).
            border_width: Border width in pixels (default 0).
            border_color: Border color as RGB tuple (default white).
            corner_radius: Rounded corner radius in pixels (default 0).
            opacity: Overlay transparency from 0 to 1 (default 1.0).
            audio_mode: Audio handling - "main" (default), "overlay", or "mix".
            audio_mix: Volume factors (main, overlay) for mix mode, default (1.0, 1.0).

        Returns:
            New Video with picture-in-picture overlay.
        """
        from videopython.base.transforms import PictureInPicture

        return PictureInPicture(
            overlay=overlay,
            position=position,
            scale=scale,
            border_width=border_width,
            border_color=border_color,
            corner_radius=corner_radius,
            opacity=opacity,
            audio_mode=audio_mode,
            audio_mix=audio_mix,
        ).apply(self)

video_shape property

video_shape: tuple[int, int, int, int]

frame_shape property

frame_shape: tuple[int, int, int]

total_seconds property

total_seconds: float

metadata property

metadata: VideoMetadata

__init__

__init__(
    frames: ndarray,
    fps: int | float,
    audio: Audio | None = None,
)
Source code in src/videopython/base/video.py
def __init__(self, frames: np.ndarray, fps: int | float, audio: Audio | None = None):
    """Store frames and fps; create a silent audio track when none is given.

    Args:
        frames: Frame array of shape (num_frames, height, width, 3) — TODO confirm layout.
        fps: Frames per second.
        audio: Optional audio track; defaults to a silent stereo 44.1 kHz track
            matching the video duration.
    """
    self.frames = frames
    self.fps = fps
    if audio:
        self.audio = audio
    else:
        # Silent track duration matches the video, rounded to 2 decimal places.
        self.audio = Audio.create_silent(
            duration_seconds=round(self.total_seconds, 2), stereo=True, sample_rate=44100
        )

from_path classmethod

from_path(
    path: str,
    read_batch_size: int = 100,
    start_second: float | None = None,
    end_second: float | None = None,
    fps: float | None = None,
    width: int | None = None,
    height: int | None = None,
) -> Video
Source code in src/videopython/base/video.py
@classmethod
def from_path(
    cls,
    path: str,
    read_batch_size: int = 100,
    start_second: float | None = None,
    end_second: float | None = None,
    fps: float | None = None,
    width: int | None = None,
    height: int | None = None,
) -> Video:
    """Load a video segment from disk by decoding it with FFmpeg.

    Frames are streamed from FFmpeg's stdout as raw rgb24 bytes into a
    pre-allocated numpy array; audio for the same segment is loaded
    separately, and a silent track is substituted when none is found.

    Args:
        path: Path to the video file.
        read_batch_size: Number of frames to read from the FFmpeg pipe per batch.
        start_second: Optional segment start in seconds (must be non-negative).
        end_second: Optional segment end in seconds (must not exceed duration).
        fps: Optional target frame rate; resampling is applied only when it
            differs from the source rate.
        width: Optional output width in pixels.
        height: Optional output height in pixels.

    Returns:
        Video holding the decoded RGB frames and the matching audio segment.

    Raises:
        ValueError: If the time bounds are invalid or no frames could be read.
        VideoMetadataError: If ffprobe cannot read the file's metadata.
        VideoLoadError: If FFmpeg fails or an I/O error occurs.
    """
    try:
        # Get video metadata using VideoMetadata.from_path
        metadata = VideoMetadata.from_path(path)

        out_width = width if width is not None else metadata.width
        out_height = height if height is not None else metadata.height
        out_fps = fps if fps is not None else metadata.fps
        total_duration = metadata.total_seconds

        # Validate time bounds
        if start_second is not None and start_second < 0:
            raise ValueError("start_second must be non-negative")
        if end_second is not None and end_second > total_duration:
            raise ValueError(f"end_second ({end_second}) exceeds video duration ({total_duration})")
        if start_second is not None and end_second is not None and start_second >= end_second:
            raise ValueError("start_second must be less than end_second")

        # Estimate memory usage and warn for large videos
        segment_duration = total_duration
        if start_second is not None and end_second is not None:
            segment_duration = end_second - start_second
        elif end_second is not None:
            segment_duration = end_second
        elif start_second is not None:
            segment_duration = total_duration - start_second

        estimated_frames = int(segment_duration * out_fps)
        estimated_bytes = estimated_frames * out_height * out_width * 3
        estimated_gb = estimated_bytes / (1024**3)
        if estimated_gb > 10:
            warnings.warn(
                f"Loading this video will use ~{estimated_gb:.1f}GB of RAM. "
                f"For large videos, consider using FrameIterator for memory-efficient streaming.",
                ResourceWarning,
                stacklevel=2,
            )

        # Build FFmpeg command with improved segment handling
        ffmpeg_cmd = ["ffmpeg"]

        # Add seek option BEFORE input for more efficient seeking
        if start_second is not None:
            ffmpeg_cmd.extend(["-ss", str(start_second)])

        ffmpeg_cmd.extend(["-i", path])

        # Add duration AFTER input for more precise timing
        if end_second is not None and start_second is not None:
            duration = end_second - start_second
            ffmpeg_cmd.extend(["-t", str(duration)])
        elif end_second is not None:
            ffmpeg_cmd.extend(["-t", str(end_second)])

        # Apply video filters for resize and fps resampling
        vf_filters: list[str] = []
        if width is not None or height is not None:
            vf_filters.append(f"scale={out_width}:{out_height}")
        if fps is not None and fps != metadata.fps:
            vf_filters.append(f"fps={out_fps}")
        if vf_filters:
            ffmpeg_cmd.extend(["-vf", ",".join(vf_filters)])

        # Output format settings - removed problematic -vsync 0
        ffmpeg_cmd.extend(
            [
                "-f",
                "rawvideo",
                "-pix_fmt",
                "rgb24",
                "-vcodec",
                "rawvideo",
                "-avoid_negative_ts",
                "make_zero",  # Handle timing issues
                "-y",
                "pipe:1",
            ]
        )

        # Start FFmpeg process with stderr redirected to avoid deadlock
        process = subprocess.Popen(
            ffmpeg_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,  # Redirect stderr to avoid deadlock
            bufsize=10**8,  # Use large buffer for efficient I/O
        )

        # Calculate frame size in bytes
        frame_size = out_width * out_height * 3  # 3 bytes per pixel for RGB

        # Estimate frame count for pre-allocation
        if start_second is not None and end_second is not None:
            estimated_duration = end_second - start_second
        elif end_second is not None:
            estimated_duration = end_second
        elif start_second is not None:
            estimated_duration = total_duration - start_second
        else:
            estimated_duration = total_duration

        # Add buffer to handle frame rate variations and rounding
        # (FRAME_BUFFER_MULTIPLIER / FRAME_BUFFER_PADDING are module-level constants)
        estimated_frames = int(estimated_duration * out_fps * FRAME_BUFFER_MULTIPLIER) + FRAME_BUFFER_PADDING

        # Pre-allocate numpy array
        frames = np.empty((estimated_frames, out_height, out_width, 3), dtype=np.uint8)
        frames_read = 0

        try:
            while frames_read < estimated_frames:
                # Calculate remaining frames to read
                remaining_frames = estimated_frames - frames_read
                batch_size = min(read_batch_size, remaining_frames)

                # Read batch of data
                batch_data = process.stdout.read(frame_size * batch_size)  # type: ignore

                if not batch_data:
                    break

                # Convert to numpy array
                batch_frames = np.frombuffer(batch_data, dtype=np.uint8)

                # Calculate how many complete frames we got
                complete_frames = len(batch_frames) // (out_height * out_width * 3)

                if complete_frames == 0:
                    break

                # Only keep complete frames
                complete_data = batch_frames[: complete_frames * out_height * out_width * 3]
                batch_frames_array = complete_data.reshape(complete_frames, out_height, out_width, 3)

                # Check if we have room in pre-allocated array
                if frames_read + complete_frames > estimated_frames:
                    # Need to expand array - this should be rare with our buffer
                    new_size = max(estimated_frames * 2, frames_read + complete_frames + 100)
                    new_frames = np.empty((new_size, out_height, out_width, 3), dtype=np.uint8)
                    new_frames[:frames_read] = frames[:frames_read]
                    frames = new_frames
                    estimated_frames = new_size

                # Store batch in pre-allocated array
                end_idx = frames_read + complete_frames
                frames[frames_read:end_idx] = batch_frames_array
                frames_read += complete_frames

        finally:
            # Ensure process is properly terminated
            if process.poll() is None:
                process.terminate()
                try:
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    process.kill()
                    process.wait()

            # Clean up pipes
            if process.stdout:
                process.stdout.close()

        # Check if FFmpeg had an error (non-zero return code)
        if process.returncode not in (0, None) and frames_read == 0:
            raise ValueError(f"FFmpeg failed to process video (return code: {process.returncode})")

        if frames_read == 0:
            raise ValueError("No frames were read from the video")

        # Trim the pre-allocated array to actual frames read
        frames = frames[:frames_read]  # type: ignore

        # Load audio for the specified segment
        try:
            audio = Audio.from_path(path)
            # Slice audio to match the video segment
            if start_second is not None or end_second is not None:
                audio_start = start_second if start_second is not None else 0
                audio_end = end_second if end_second is not None else audio.metadata.duration_seconds
                audio = audio.slice(start_seconds=audio_start, end_seconds=audio_end)
        except (AudioLoadError, FileNotFoundError, subprocess.CalledProcessError):
            warnings.warn(f"No audio found for `{path}`, adding silent track.")
            # Create silent audio based on actual frames read
            segment_duration = frames_read / out_fps
            audio = Audio.create_silent(duration_seconds=round(segment_duration, 2), stereo=True, sample_rate=44100)

        return cls(frames=frames, fps=out_fps, audio=audio)

    except VideoMetadataError:
        raise
    except subprocess.CalledProcessError as e:
        raise VideoLoadError(f"FFmpeg failed: {e}")
    except (OSError, IOError) as e:
        raise VideoLoadError(f"I/O error: {e}")

from_frames classmethod

from_frames(frames: ndarray, fps: float) -> Video
Source code in src/videopython/base/video.py
@classmethod
def from_frames(cls, frames: np.ndarray, fps: float) -> Video:
    """Create a Video from a raw frames array.

    RGBA input is accepted; the alpha channel is dropped.

    Args:
        frames: Array of shape (n_frames, height, width, channels) where
            channels is 3 (RGB) or 4 (RGBA).
        fps: Frame rate for the resulting video.

    Returns:
        New Video built from the given frames.

    Raises:
        ValueError: If the array is not 4-dimensional or the channel count
            is neither 3 nor 4.
    """
    if frames.ndim != 4:
        raise ValueError(f"Unsupported number of dimensions: {frames.shape}!")
    elif frames.shape[-1] == 4:
        frames = frames[:, :, :, :3]  # Drop the alpha channel.
    elif frames.shape[-1] != 3:
        # Fixed error message: this branch rejects the channel count, not the
        # number of dimensions (the previous message repeated the ndim text).
        raise ValueError(f"Unsupported number of channels: {frames.shape}!")
    return cls(frames=frames, fps=fps)

from_image classmethod

from_image(
    image: ndarray,
    fps: float = 24.0,
    length_seconds: float = 1.0,
) -> Video
Source code in src/videopython/base/video.py
@classmethod
def from_image(cls, image: np.ndarray, fps: float = 24.0, length_seconds: float = 1.0) -> Video:
    """Create a still video by repeating one image for the given duration.

    Args:
        image: Single frame (H, W, C) or an already-stacked (1, H, W, C) array.
        fps: Frame rate of the resulting video.
        length_seconds: Duration of the resulting video in seconds.

    Returns:
        New Video whose frames are copies of `image`.
    """
    # Promote a single (H, W, C) frame to a one-frame stack before repeating.
    stacked = image[np.newaxis] if image.ndim == 3 else image
    n_frames = round(length_seconds * fps)
    return cls(frames=np.repeat(stacked, n_frames, axis=0), fps=fps)

save

save(
    filename: str | Path | None = None,
    format: ALLOWED_VIDEO_FORMATS = "mp4",
    preset: ALLOWED_VIDEO_PRESETS = "medium",
    crf: int = 23,
) -> Path

Save video to file.

Parameters:

Name Type Description Default
filename str | Path | None

Output filename. If None, generates random name

None
format ALLOWED_VIDEO_FORMATS

Output format (mp4, avi, mov, mkv, webm)

'mp4'
preset ALLOWED_VIDEO_PRESETS

Encoding speed/compression tradeoff. Slower presets give smaller files at the same quality. Options from fastest to smallest: ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow

'medium'
crf int

Constant Rate Factor (0-51). Lower = better quality, larger file. Default 23 is visually lossless for most content. Range 18-28 recommended.

23

Returns:

Type Description
Path

Path to saved video file

Raises:

Type Description
RuntimeError

If video is not loaded

ValueError

If format or preset is not supported

Source code in src/videopython/base/video.py
def save(
    self,
    filename: str | Path | None = None,
    format: ALLOWED_VIDEO_FORMATS = "mp4",
    preset: ALLOWED_VIDEO_PRESETS = "medium",
    crf: int = 23,
) -> Path:
    """Save video to file.

    Args:
        filename: Output filename. If None, generates random name
        format: Output format (mp4, avi, mov, mkv, webm)
        preset: Encoding speed/compression tradeoff. Slower presets give smaller
            files at the same quality. Options from fastest to smallest:
            ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow
        crf: Constant Rate Factor (0-51). Lower = better quality, larger file.
            Default 23 is visually lossless for most content. Range 18-28 recommended.

    Returns:
        Path to saved video file

    Raises:
        RuntimeError: If video is not loaded
        ValueError: If format or preset is not supported
    """
    if not self.is_loaded():
        raise RuntimeError("Video is not loaded, cannot save!")

    if format.lower() not in get_args(ALLOWED_VIDEO_FORMATS):
        raise ValueError(
            f"Unsupported format: {format}. Allowed formats are: {', '.join(get_args(ALLOWED_VIDEO_FORMATS))}"
        )

    if preset not in get_args(ALLOWED_VIDEO_PRESETS):
        raise ValueError(
            f"Unsupported preset: {preset}. Allowed presets are: {', '.join(get_args(ALLOWED_VIDEO_PRESETS))}"
        )

    # yuv420p output (set below) requires even dimensions, as the error explains.
    frame_height, frame_width = self.frame_shape[:2]
    if frame_width % 2 != 0 or frame_height % 2 != 0:
        raise ValueError(
            "Current save pipeline uses libx264 with yuv420p, which requires even frame dimensions. "
            f"Got {frame_width}x{frame_height}. "
            "Resize, crop, or pad to an even width and height before saving."
        )

    if filename is None:
        filename = Path(generate_random_name(suffix=f".{format}"))
    else:
        filename = Path(filename).with_suffix(f".{format}")
        filename.parent.mkdir(parents=True, exist_ok=True)

    # Save audio to temporary WAV file
    with tempfile.NamedTemporaryFile(suffix=".wav") as temp_audio:
        self.audio.save(temp_audio.name, format="wav")

        # Calculate exact duration
        duration = len(self.frames) / self.fps

        # Construct FFmpeg command (stream raw video via stdin)
        ffmpeg_command = [
            "ffmpeg",
            "-y",
            "-hide_banner",
            "-loglevel",
            "error",
            # Raw video input settings
            "-f",
            "rawvideo",
            "-pixel_format",
            "rgb24",
            "-video_size",
            f"{self.frame_shape[1]}x{self.frame_shape[0]}",
            "-framerate",
            str(self.fps),
            "-i",
            "pipe:0",
            # Audio input
            "-i",
            temp_audio.name,
            # Video encoding settings
            "-c:v",
            "libx264",
            "-preset",
            preset,
            "-crf",
            str(crf),
            # Audio settings
            "-c:a",
            "aac",
            "-b:a",
            "192k",
            # Output settings
            "-pix_fmt",
            "yuv420p",
            "-movflags",
            "+faststart",  # Enable fast start for web playback
            "-t",
            str(duration),
            "-vsync",
            "cfr",
            str(filename),
        ]

        process = subprocess.Popen(
            ffmpeg_command,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
        )

        try:
            if process.stdin is None:
                raise RuntimeError("Failed to open FFmpeg stdin pipe for video data")

            # FFmpeg expects contiguous uint8 rgb24 bytes on stdin.
            frames = self.frames
            if frames.dtype != np.uint8 or not frames.flags["C_CONTIGUOUS"]:
                frames = np.ascontiguousarray(frames, dtype=np.uint8)

            # Zero-copy view over the frame array, written in a single call.
            buffer = memoryview(frames)
            try:
                process.stdin.write(buffer)
                process.stdin.close()
            except BrokenPipeError as e:
                stderr = process.stderr.read() if process.stderr is not None else b""
                returncode = process.wait()
                raise RuntimeError(
                    f"FFmpeg terminated while receiving video data (code {returncode}): "
                    f"{stderr.decode(errors='ignore')}"
                ) from e

            stderr = process.stderr.read() if process.stderr is not None else b""
            returncode = process.wait()

            if returncode != 0:
                raise RuntimeError(f"FFmpeg failed with code {returncode}: {stderr.decode(errors='ignore')}")

            return filename
        finally:
            # Guarantee the child process is reaped even on an exception above.
            if process.poll() is None:
                process.kill()

copy

copy() -> Video
Source code in src/videopython/base/video.py
def copy(self) -> Video:
    """Return a new Video with copied frames, same fps, and shared audio."""
    duplicate = Video.from_frames(self.frames.copy(), self.fps)
    # Audio objects are immutable, so sharing the reference is safe.
    duplicate.audio = self.audio
    return duplicate

split

split(
    frame_index: int | None = None,
) -> tuple[Video, Video]
Source code in src/videopython/base/video.py
def split(self, frame_index: int | None = None) -> tuple[Video, Video]:
    """Split the video (and its audio) into two parts at `frame_index`.

    Args:
        frame_index: Frame at which to split; must satisfy
            0 <= frame_index <= number of frames. Defaults to the midpoint.

    Returns:
        Tuple of (first part, second part); audio is split at the
        corresponding timestamp.

    Raises:
        ValueError: If frame_index is out of range.
    """
    # Bug fix: the previous truthiness check (`if frame_index:`) treated an
    # explicit 0 as "not provided" and silently split at the midpoint.
    if frame_index is None:
        frame_index = len(self.frames) // 2
    elif not (0 <= frame_index <= len(self.frames)):
        raise ValueError(f"frame_idx must be between 0 and {len(self.frames)}, got {frame_index}")

    split_videos = (
        self.from_frames(self.frames[:frame_index], self.fps),
        self.from_frames(self.frames[frame_index:], self.fps),
    )

    # Split audio at the corresponding time point
    split_time = frame_index / self.fps
    split_videos[0].audio = self.audio.slice(start_seconds=0, end_seconds=split_time)
    split_videos[1].audio = self.audio.slice(start_seconds=split_time)

    return split_videos

add_audio

add_audio(audio: Audio, overlay: bool = True) -> Video

Add audio to video, returning a new Video instance.

Parameters:

Name Type Description Default
audio Audio

Audio to add

required
overlay bool

If True, overlay on existing audio; if False, replace it

True

Returns:

Type Description
Video

New Video with the audio added

Source code in src/videopython/base/video.py
def add_audio(self, audio: Audio, overlay: bool = True) -> Video:
    """Add audio to video, returning a new Video instance.

    Args:
        audio: Audio to add; it is trimmed or silence-padded to the video length.
        overlay: If True, overlay on existing audio; if False, replace it.

    Returns:
        New Video with the audio added.
    """
    video_duration = self.total_seconds
    audio_duration = audio.metadata.duration_seconds

    # Force the incoming track to the exact video duration.
    if audio_duration > video_duration:
        audio = audio.slice(start_seconds=0, end_seconds=video_duration)
    elif audio_duration < video_duration:
        padding = Audio.create_silent(
            duration_seconds=video_duration - audio_duration,
            stereo=audio.metadata.channels == 2,
            sample_rate=audio.metadata.sample_rate,
        )
        audio = audio.concat(padding)

    result = self.copy()
    # Overlaying onto a silent track is pointless, so silence is always replaced.
    if overlay and not result.audio.is_silent:
        result.audio = result.audio.overlay(audio, position=0.0)
    else:
        result.audio = audio
    return result

add_audio_from_file

add_audio_from_file(
    path: str, overlay: bool = True
) -> Video

Add audio from file, returning a new Video instance.

Parameters:

Name Type Description Default
path str

Path to audio file

required
overlay bool

If True, overlay on existing audio; if False, replace it

True

Returns:

Type Description
Video

New Video with the audio added

Raises:

Type Description
AudioLoadError

If audio file cannot be loaded

FileNotFoundError

If audio file does not exist

Source code in src/videopython/base/video.py
def add_audio_from_file(self, path: str, overlay: bool = True) -> Video:
    """Load an audio file and add it to this video.

    Args:
        path: Path to audio file.
        overlay: If True, overlay on existing audio; if False, replace it.

    Returns:
        New Video with the audio added.

    Raises:
        AudioLoadError: If audio file cannot be loaded.
        FileNotFoundError: If audio file does not exist.
    """
    loaded = Audio.from_path(path)
    return self.add_audio(loaded, overlay)

is_loaded

is_loaded() -> bool
Source code in src/videopython/base/video.py
def is_loaded(self) -> bool:
    """Report whether fps, frames, and audio are all present (not None)."""
    return all(attr is not None for attr in (self.fps, self.frames, self.audio))

cut

cut(start: float, end: float) -> Video

Cut video to a time range.

Parameters:

Name Type Description Default
start float

Start time in seconds.

required
end float

End time in seconds.

required

Returns:

Type Description
Video

New Video with the specified time range.

Source code in src/videopython/base/video.py
def cut(self, start: float, end: float) -> Video:
    """Trim the video to a time range.

    Args:
        start: Start time in seconds.
        end: End time in seconds.

    Returns:
        New Video covering only the requested interval.
    """
    from videopython.base.transforms import CutSeconds

    cutter = CutSeconds(start, end)
    return cutter.apply(self)

cut_frames

cut_frames(start: int, end: int) -> Video

Cut video to a frame range.

Parameters:

Name Type Description Default
start int

Start frame index (inclusive).

required
end int

End frame index (exclusive).

required

Returns:

Type Description
Video

New Video with the specified frame range.

Source code in src/videopython/base/video.py
def cut_frames(self, start: int, end: int) -> Video:
    """Trim the video to a frame range.

    Args:
        start: Start frame index (inclusive).
        end: End frame index (exclusive).

    Returns:
        New Video containing only the requested frames.
    """
    from videopython.base.transforms import CutFrames

    frame_cutter = CutFrames(start, end)
    return frame_cutter.apply(self)

resize

resize(
    width: int | None = None,
    height: int | None = None,
    round_to_even: bool = True,
) -> Video

Resize video.

If only width or height is provided, the other dimension is calculated to preserve aspect ratio.

Parameters:

Name Type Description Default
width int | None

Target width in pixels.

None
height int | None

Target height in pixels.

None
round_to_even bool

If True (default), snap output width/height to even numbers.

True

Returns:

Type Description
Video

New Video with the specified dimensions.

Source code in src/videopython/base/video.py
def resize(
    self,
    width: int | None = None,
    height: int | None = None,
    round_to_even: bool = True,
) -> Video:
    """Return a resized copy of the video.

    When only one of width/height is given, the other is derived so the
    aspect ratio is preserved.

    Args:
        width: Target width in pixels.
        height: Target height in pixels.
        round_to_even: If True (default), snap output width/height to even numbers.

    Returns:
        New Video with the requested dimensions.
    """
    from videopython.base.transforms import Resize

    resizer = Resize(width=width, height=height, round_to_even=round_to_even)
    return resizer.apply(self)

crop

crop(width: int, height: int) -> Video

Crop video to specified dimensions (center crop).

Parameters:

Name Type Description Default
width int

Target width in pixels.

required
height int

Target height in pixels.

required

Returns:

Type Description
Video

New Video with the specified dimensions.

Source code in src/videopython/base/video.py
def crop(self, width: int, height: int) -> Video:
    """Center-crop the video to the given dimensions.

    Args:
        width: Target width in pixels.
        height: Target height in pixels.

    Returns:
        New Video with the requested dimensions.
    """
    from videopython.base.transforms import Crop

    cropper = Crop(width=width, height=height)
    return cropper.apply(self)

resample_fps

resample_fps(fps: float) -> Video

Resample video to a different frame rate.

Parameters:

Name Type Description Default
fps float

Target frames per second.

required

Returns:

Type Description
Video

New Video with the specified frame rate.

Source code in src/videopython/base/video.py
def resample_fps(self, fps: float) -> Video:
    """Resample the video to a different frame rate.

    Args:
        fps: Target frames per second.

    Returns:
        New Video at the requested frame rate.
    """
    from videopython.base.transforms import ResampleFPS

    resampler = ResampleFPS(fps=fps)
    return resampler.apply(self)

transition_to

transition_to(other: Video, transition: object) -> Video

Combine with another video using a transition.

Parameters:

Name Type Description Default
other Video

Video to transition to.

required
transition object

Transition to apply (e.g., FadeTransition, BlurTransition).

required

Returns:

Type Description
Video

New Video combining both videos with the transition effect.

Source code in src/videopython/base/video.py
def transition_to(self, other: Video, transition: object) -> Video:
    """Combine with another video using a transition.

    Args:
        other: Video to transition to.
        transition: Transition to apply (e.g., FadeTransition, BlurTransition).

    Returns:
        New Video combining both videos with the transition effect.

    Raises:
        TypeError: If `transition` is not a Transition instance.
    """
    from videopython.base.transitions import Transition

    if isinstance(transition, Transition):
        return transition.apply((self, other))
    raise TypeError(f"Expected Transition, got {type(transition).__name__}")

ken_burns

ken_burns(
    start_region: "BoundingBox",
    end_region: "BoundingBox",
    easing: Literal[
        "linear", "ease_in", "ease_out", "ease_in_out"
    ] = "linear",
    start: float | None = None,
    stop: float | None = None,
) -> Video

Apply Ken Burns pan-and-zoom effect.

Creates cinematic movement by smoothly transitioning between two regions.

Parameters:

Name Type Description Default
start_region 'BoundingBox'

Starting crop region (BoundingBox with normalized 0-1 coordinates).

required
end_region 'BoundingBox'

Ending crop region (BoundingBox with normalized 0-1 coordinates).

required
easing Literal['linear', 'ease_in', 'ease_out', 'ease_in_out']

Animation easing - "linear", "ease_in", "ease_out", or "ease_in_out".

'linear'
start float | None

Optional start time in seconds for the effect.

None
stop float | None

Optional stop time in seconds for the effect.

None

Returns:

Type Description
Video

New Video with Ken Burns effect applied.

Source code in src/videopython/base/video.py
def ken_burns(
    self,
    start_region: "BoundingBox",
    end_region: "BoundingBox",
    easing: Literal["linear", "ease_in", "ease_out", "ease_in_out"] = "linear",
    start: float | None = None,
    stop: float | None = None,
) -> Video:
    """Apply a Ken Burns pan-and-zoom effect between two crop regions.

    Args:
        start_region: Starting crop region (BoundingBox, normalized 0-1 coordinates).
        end_region: Ending crop region (BoundingBox, normalized 0-1 coordinates).
        easing: Animation easing - "linear", "ease_in", "ease_out", or "ease_in_out".
        start: Optional start time in seconds for the effect.
        stop: Optional stop time in seconds for the effect.

    Returns:
        New Video with the effect applied.
    """
    from videopython.base.effects import KenBurns

    effect = KenBurns(start_region=start_region, end_region=end_region, easing=easing)
    return effect.apply(self, start=start, stop=stop)

picture_in_picture

picture_in_picture(
    overlay: Video,
    position: tuple[float, float] = (0.7, 0.7),
    scale: float = 0.25,
    border_width: int = 0,
    border_color: tuple[int, int, int] = (255, 255, 255),
    corner_radius: int = 0,
    opacity: float = 1.0,
    audio_mode: Literal["main", "overlay", "mix"] = "main",
    audio_mix: tuple[float, float] = (1.0, 1.0),
) -> Video

Overlay another video as picture-in-picture.

Parameters:

Name Type Description Default
overlay Video

Video to overlay on this video.

required
position tuple[float, float]

Normalized (x, y) center position, (0,0)=top-left, (1,1)=bottom-right.

(0.7, 0.7)
scale float

Overlay size relative to main video width (0.25 = 25%).

0.25
border_width int

Border width in pixels (default 0).

0
border_color tuple[int, int, int]

Border color as RGB tuple (default white).

(255, 255, 255)
corner_radius int

Rounded corner radius in pixels (default 0).

0
opacity float

Overlay transparency from 0 to 1 (default 1.0).

1.0
audio_mode Literal['main', 'overlay', 'mix']

Audio handling - "main" (default), "overlay", or "mix".

'main'
audio_mix tuple[float, float]

Volume factors (main, overlay) for mix mode, default (1.0, 1.0).

(1.0, 1.0)

Returns:

Type Description
Video

New Video with picture-in-picture overlay.

Source code in src/videopython/base/video.py
def picture_in_picture(
    self,
    overlay: Video,
    position: tuple[float, float] = (0.7, 0.7),
    scale: float = 0.25,
    border_width: int = 0,
    border_color: tuple[int, int, int] = (255, 255, 255),
    corner_radius: int = 0,
    opacity: float = 1.0,
    audio_mode: Literal["main", "overlay", "mix"] = "main",
    audio_mix: tuple[float, float] = (1.0, 1.0),
) -> Video:
    """Overlay another video on top of this one as picture-in-picture.

    Args:
        overlay: Video to overlay on this video.
        position: Normalized (x, y) center position, (0,0)=top-left, (1,1)=bottom-right.
        scale: Overlay size relative to main video width (0.25 = 25%).
        border_width: Border width in pixels (default 0).
        border_color: Border color as RGB tuple (default white).
        corner_radius: Rounded corner radius in pixels (default 0).
        opacity: Overlay transparency from 0 to 1 (default 1.0).
        audio_mode: Audio handling - "main" (default), "overlay", or "mix".
        audio_mix: Volume factors (main, overlay) for mix mode, default (1.0, 1.0).

    Returns:
        New Video with the picture-in-picture overlay applied.
    """
    from videopython.base.transforms import PictureInPicture

    pip_transform = PictureInPicture(
        overlay=overlay,
        position=position,
        scale=scale,
        border_width=border_width,
        border_color=border_color,
        corner_radius=corner_radius,
        opacity=opacity,
        audio_mode=audio_mode,
        audio_mix=audio_mix,
    )
    return pip_transform.apply(self)

VideoMetadata

Get video metadata without loading frames into memory:

from videopython.base import VideoMetadata

metadata = VideoMetadata.from_path("video.mp4")
print(f"Duration: {metadata.total_seconds}s")
print(f"Resolution: {metadata.width}x{metadata.height}")
print(f"FPS: {metadata.fps}")
print(f"Total frames: {metadata.frame_count}")

VideoMetadata dataclass

Class to store video metadata.

Source code in src/videopython/base/video.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
@dataclass
class VideoMetadata:
    """Class to store video metadata.

    Holds the spatial dimensions, frame rate, frame count, and duration of a
    video, and provides "predictive" counterparts of the Video fluent API
    (`cut`, `resize`, `speed_change`, ...) that transform only the metadata so
    pipelines can be validated without decoding any frames.
    """

    height: int
    width: int
    fps: float
    frame_count: int
    total_seconds: float

    def __str__(self) -> str:
        return f"{self.width}x{self.height} @ {self.fps}fps, {self.total_seconds} seconds"

    def __repr__(self) -> str:
        return self.__str__()

    def _replace(self, **changes: Any) -> VideoMetadata:
        """Return a copy of this metadata with the given fields overridden.

        Centralizes the five-field copy construction used by every
        metadata-transforming method.
        """
        values = {
            "height": self.height,
            "width": self.width,
            "fps": self.fps,
            "frame_count": self.frame_count,
            "total_seconds": self.total_seconds,
        }
        values.update(changes)
        return VideoMetadata(**values)

    def get_frame_shape(self) -> np.ndarray:
        """Returns frame shape as (height, width, channels)."""
        return np.array((self.height, self.width, 3))

    def get_video_shape(self) -> np.ndarray:
        """Returns video shape as (frames, height, width, channels)."""
        return np.array((self.frame_count, self.height, self.width, 3))

    @staticmethod
    def _run_ffprobe(video_path: str | Path) -> dict:
        """Run ffprobe and return parsed JSON output.

        Raises:
            VideoMetadataError: If ffprobe fails or emits unparsable output.
        """
        cmd = [
            "ffprobe",
            "-v",
            "error",
            "-select_streams",
            "v:0",
            "-show_entries",
            "stream=width,height,r_frame_rate,nb_frames",
            "-show_entries",
            "format=duration",
            "-print_format",
            "json",
            str(video_path),
        ]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            return json.loads(result.stdout)
        except subprocess.CalledProcessError as e:
            # Chain the original error so the ffprobe exit status is preserved.
            raise VideoMetadataError(f"FFprobe error: {e.stderr}") from e
        except json.JSONDecodeError as e:
            raise VideoMetadataError(f"Error parsing FFprobe output: {e}") from e

    @classmethod
    def from_path(cls, video_path: str | Path) -> VideoMetadata:
        """Creates VideoMetadata object from video file using ffprobe.

        Raises:
            FileNotFoundError: If the file does not exist.
            VideoMetadataError: If required metadata is missing or malformed.
        """
        if not Path(video_path).exists():
            raise FileNotFoundError(f"Video file not found: {video_path}")

        probe_data = cls._run_ffprobe(video_path)

        try:
            stream_info = probe_data["streams"][0]

            width = int(stream_info["width"])
            height = int(stream_info["height"])

            try:
                # r_frame_rate is a rational string like "30000/1001".
                fps_fraction = Fraction(stream_info["r_frame_rate"])
                fps = float(fps_fraction)
            except (ValueError, ZeroDivisionError) as e:
                raise VideoMetadataError(f"Invalid frame rate: {stream_info['r_frame_rate']}") from e

            if "nb_frames" in stream_info and stream_info["nb_frames"].isdigit():
                frame_count = int(stream_info["nb_frames"])
            else:
                # Some containers omit nb_frames; estimate from duration * fps.
                duration = float(probe_data["format"]["duration"])
                frame_count = int(round(duration * fps))

            total_seconds = round(frame_count / fps, 4)

            return cls(height=height, width=width, fps=fps, frame_count=frame_count, total_seconds=total_seconds)

        except KeyError as e:
            raise VideoMetadataError(f"Missing required metadata field: {e}") from e
        except (TypeError, IndexError) as e:
            raise VideoMetadataError(f"Invalid metadata structure: {e}") from e

    @classmethod
    def from_video(cls, video: Video) -> VideoMetadata:
        """Creates VideoMetadata object from Video instance."""
        frame_count, height, width, _ = video.frames.shape
        total_seconds = round(frame_count / video.fps, 4)

        return cls(height=height, width=width, fps=video.fps, frame_count=frame_count, total_seconds=total_seconds)

    def can_be_merged_with(self, other_format: VideoMetadata) -> bool:
        """Check if videos can be merged (same dimensions, same rounded fps)."""
        return (
            self.height == other_format.height
            and self.width == other_format.width
            and round(self.fps) == round(other_format.fps)
        )

    def with_duration(self, seconds: float) -> VideoMetadata:
        """Return new metadata with updated duration.

        Args:
            seconds: New duration in seconds.

        Returns:
            New VideoMetadata with updated duration and frame count.
        """
        return self._replace(
            frame_count=round(self.fps * seconds),
            total_seconds=round(seconds, 4),
        )

    def with_dimensions(self, width: int, height: int) -> VideoMetadata:
        """Return new metadata with updated dimensions.

        Args:
            width: New width in pixels.
            height: New height in pixels.

        Returns:
            New VideoMetadata with updated dimensions.
        """
        return self._replace(height=height, width=width)

    def with_fps(self, fps: float) -> VideoMetadata:
        """Return new metadata with updated fps.

        Args:
            fps: New frames per second.

        Returns:
            New VideoMetadata with updated fps (duration stays same).
        """
        return self._replace(fps=fps, frame_count=round(fps * self.total_seconds))

    def can_be_downsampled_to(self, target_format: VideoMetadata) -> bool:
        """Checks if video can be downsampled to target_format."""
        return (
            self.height >= target_format.height
            and self.width >= target_format.width
            and round(self.fps) >= round(target_format.fps)
            and self.total_seconds >= target_format.total_seconds
        )

    # Fluent API for operation validation
    # These methods mirror the Video fluent API but only transform metadata

    def cut(self, start: float, end: float) -> VideoMetadata:
        """Predict metadata after cutting by time range.

        Args:
            start: Start time in seconds.
            end: End time in seconds.

        Returns:
            New VideoMetadata with updated duration.

        Raises:
            ValueError: If the time range is empty, negative, or out of bounds.
        """
        if end <= start:
            raise ValueError(f"End time ({end}) must be greater than start time ({start})")
        if start < 0:
            raise ValueError(f"Start time ({start}) cannot be negative")
        if end > self.total_seconds:
            raise ValueError(f"End time ({end}) exceeds video duration ({self.total_seconds})")
        # Mirror CutSeconds.apply() semantics: convert times to frame indices using
        # round() before slicing so metadata validation matches runtime output.
        start_frame = round(start * self.fps)
        end_frame = round(end * self.fps)
        return self.cut_frames(start_frame, end_frame)

    def cut_frames(self, start: int, end: int) -> VideoMetadata:
        """Predict metadata after cutting by frame range.

        Args:
            start: Start frame index (inclusive).
            end: End frame index (exclusive).

        Returns:
            New VideoMetadata with updated duration.

        Raises:
            ValueError: If the frame range is empty, negative, or out of bounds.
        """
        if end <= start:
            raise ValueError(f"End frame ({end}) must be greater than start frame ({start})")
        if start < 0:
            raise ValueError(f"Start frame ({start}) cannot be negative")
        if end > self.frame_count:
            raise ValueError(f"End frame ({end}) exceeds frame count ({self.frame_count})")
        duration = round((end - start) / self.fps, 4)
        return self.with_duration(duration)

    def resize(
        self,
        width: int | None = None,
        height: int | None = None,
        round_to_even: bool = True,
    ) -> VideoMetadata:
        """Predict metadata after resizing.

        If only width or height is provided, the other dimension is calculated
        to preserve aspect ratio.

        Args:
            width: Target width in pixels.
            height: Target height in pixels.
            round_to_even: If True (default), snap output width/height to even numbers.

        Returns:
            New VideoMetadata with updated dimensions.

        Raises:
            ValueError: If neither dimension is given, or a given dimension
                is not positive.
        """
        if width is None and height is None:
            raise ValueError("Must provide width or height")
        # Reject non-positive dimensions explicitly instead of letting a zero
        # fall through truthiness checks into the wrong aspect-ratio branch.
        if width is not None and width <= 0:
            raise ValueError(f"Width ({width}) must be positive")
        if height is not None and height <= 0:
            raise ValueError(f"Height ({height}) must be positive")

        def _snap(value: int) -> int:
            return _round_dimension_to_even(value) if round_to_even else value

        if width is not None and height is not None:
            return self.with_dimensions(_snap(width), _snap(height))
        elif width is not None:
            ratio = width / self.width
            new_height = round(self.height * ratio)
            return self.with_dimensions(_snap(width), _snap(new_height))
        else:  # height only
            ratio = height / self.height  # type: ignore[operator]
            new_width = round(self.width * ratio)
            return self.with_dimensions(_snap(new_width), _snap(height))  # type: ignore[arg-type]

    def crop(self, width: int, height: int) -> VideoMetadata:
        """Predict metadata after cropping.

        Args:
            width: Target width in pixels.
            height: Target height in pixels.

        Returns:
            New VideoMetadata with updated dimensions.

        Raises:
            ValueError: If a dimension is not positive or exceeds the video size.
        """
        if width <= 0:
            raise ValueError(f"Crop width ({width}) must be positive")
        if height <= 0:
            raise ValueError(f"Crop height ({height}) must be positive")
        if width > self.width:
            raise ValueError(f"Crop width ({width}) exceeds video width ({self.width})")
        if height > self.height:
            raise ValueError(f"Crop height ({height}) exceeds video height ({self.height})")
        return self.with_dimensions(width, height)

    def resample_fps(self, fps: float) -> VideoMetadata:
        """Predict metadata after resampling frame rate.

        Args:
            fps: Target frames per second.

        Returns:
            New VideoMetadata with updated fps.

        Raises:
            ValueError: If fps is not positive.
        """
        if fps <= 0:
            raise ValueError(f"FPS ({fps}) must be positive")
        return self.with_fps(fps)

    def speed_change(self, speed: float) -> VideoMetadata:
        """Predict metadata after speed change.

        Mirrors runtime frame-count semantics: int(frame_count / speed),
        matching SpeedChange.apply() behavior.

        Args:
            speed: Speed multiplier (e.g. 2.0 = double speed, 0.5 = half speed).

        Returns:
            New VideoMetadata with updated duration and frame count.

        Raises:
            ValueError: If speed is not positive or would leave zero frames.
        """
        if speed <= 0:
            raise ValueError(f"Speed ({speed}) must be positive")
        new_frame_count = int(self.frame_count / speed)
        if new_frame_count == 0:
            raise ValueError(f"Speed {speed}x would result in 0 frames")
        new_seconds = round(new_frame_count / self.fps, 4)
        return self._replace(frame_count=new_frame_count, total_seconds=new_seconds)

    def reverse(self) -> VideoMetadata:
        """Predict metadata after reversing video. No metadata changes."""
        return self._replace()

    def freeze_frame(
        self,
        timestamp: float,
        duration: float = 2.0,
        position: str = "after",
    ) -> VideoMetadata:
        """Predict metadata after freeze frame insertion.

        Mirrors the frame-count logic in FreezeFrame.apply().

        Args:
            timestamp: Time in seconds to capture the frame from.
            duration: How long to hold the frozen frame in seconds.
            position: Where to insert: "before", "after", or "replace".

        Raises:
            ValueError: If timestamp/duration are out of range or position is invalid.
        """
        if timestamp < 0:
            raise ValueError(f"timestamp must be >= 0, got {timestamp}")
        if timestamp >= self.total_seconds:
            raise ValueError(f"timestamp ({timestamp}) must be less than video duration ({self.total_seconds})")
        if duration <= 0:
            raise ValueError(f"duration must be > 0, got {duration}")

        freeze_count = round(duration * self.fps)

        if position in ("after", "before"):
            new_frame_count = self.frame_count + freeze_count
        elif position == "replace":
            # "replace" overwrites frames starting at the timestamp, clamped to
            # the end of the video, then extends by the full freeze length.
            frame_idx = round(timestamp * self.fps)
            replace_end = min(frame_idx + freeze_count, self.frame_count)
            replaced = replace_end - frame_idx
            new_frame_count = self.frame_count - replaced + freeze_count
        else:
            raise ValueError(f"Invalid position: {position}")

        new_total_seconds = round(new_frame_count / self.fps, 4)
        return self._replace(frame_count=new_frame_count, total_seconds=new_total_seconds)

    def silence_removal(
        self,
        min_silence_duration: float = 1.0,
        padding: float = 0.15,
        mode: str = "cut",
        speed_factor: float = 3.0,
        transcription: Any = None,
    ) -> VideoMetadata:
        """Predict metadata after silence removal.

        Replicates the silence-gap detection logic from SilenceRemoval.apply()
        but only computes frame counts.

        Args:
            min_silence_duration: Only remove/speed-up gaps longer than this (seconds).
            padding: Seconds of silence to preserve around speech boundaries.
            mode: "cut" to hard-cut silence, "speed_up" to speed up silent sections.
            speed_factor: Speed multiplier for silent sections (only used with mode="speed_up").
            transcription: Transcription object with word-level timestamps.

        Raises:
            ValueError: If mode is not "cut" or "speed_up".
        """
        # No transcription (or no words) means nothing to detect: return a copy.
        if transcription is None or not hasattr(transcription, "words") or not transcription.words:
            return self._replace()

        # Build speech ranges from word timestamps (with padding), merging
        # overlapping/adjacent ranges as we go.
        speech_ranges: list[tuple[float, float]] = []
        for word in transcription.words:
            start = max(0, word.start - padding)
            end = min(self.total_seconds, word.end + padding)
            if speech_ranges and start <= speech_ranges[-1][1]:
                speech_ranges[-1] = (speech_ranges[-1][0], max(speech_ranges[-1][1], end))
            else:
                speech_ranges.append((start, end))

        # Identify silence gaps between speech ranges (and trailing silence).
        silence_ranges: list[tuple[float, float]] = []
        prev_end = 0.0
        for s_start, s_end in speech_ranges:
            if s_start - prev_end >= min_silence_duration:
                silence_ranges.append((prev_end, s_start))
            prev_end = s_end
        if self.total_seconds - prev_end >= min_silence_duration:
            silence_ranges.append((prev_end, self.total_seconds))

        if not silence_ranges:
            return self._replace()

        if mode == "cut":
            # Mirror _apply_cut keep-range logic exactly
            keep_frames = 0
            prev_frame = 0
            for s_start, s_end in silence_ranges:
                cut_start = round(s_start * self.fps)
                cut_end = round(s_end * self.fps)
                if cut_start > prev_frame:
                    keep_frames += cut_start - prev_frame
                prev_frame = cut_end
            if prev_frame < self.frame_count:
                keep_frames += self.frame_count - prev_frame
            new_frame_count = keep_frames
        elif mode == "speed_up":
            saved_frames = 0
            for s_start, s_end in silence_ranges:
                gap_frames = round((s_end - s_start) * self.fps)
                # Sped-up sections keep at least one frame, matching runtime.
                sped_up_frames = max(1, round(gap_frames / speed_factor))
                saved_frames += gap_frames - sped_up_frames
            new_frame_count = self.frame_count - saved_frames
        else:
            raise ValueError(f"Invalid mode: {mode}")

        new_frame_count = max(1, new_frame_count)
        new_total_seconds = round(new_frame_count / self.fps, 4)
        return self._replace(frame_count=new_frame_count, total_seconds=new_total_seconds)

    def crop_to_aspect_even(self, target_aspect: tuple[int, int] | list[int] = (9, 16)) -> VideoMetadata:
        """Predict metadata after aspect-ratio crop with even output dimensions.

        Mirrors the output dimension logic used by AI crop transforms like
        ``FaceTrackingCrop``.

        Raises:
            ValueError: If target_aspect is not a 2-item sequence of positive numbers.
        """
        if not isinstance(target_aspect, (tuple, list)) or len(target_aspect) != 2:
            raise ValueError("target_aspect must be a 2-item tuple/list of positive integers")

        try:
            aspect_w = int(target_aspect[0])
            aspect_h = int(target_aspect[1])
        except (TypeError, ValueError) as e:
            raise ValueError("target_aspect must contain numeric values") from e

        if aspect_w <= 0 or aspect_h <= 0:
            raise ValueError("target_aspect values must be positive")

        target_ratio = aspect_w / aspect_h
        frame_ratio = self.width / self.height

        def _make_even(value: int) -> int:
            return value - (value % 2)

        # Crop the dimension that is "too wide" relative to the target ratio,
        # keeping the other at full (even) size.
        if target_ratio < frame_ratio:
            out_h = _make_even(self.height)
            out_w = _make_even(int(out_h * target_ratio))
        else:
            out_w = _make_even(self.width)
            out_h = _make_even(int(out_w / target_ratio))

        return self.with_dimensions(out_w, out_h)

    def transition_to(self, other: VideoMetadata, effect_time: float = 0.0) -> VideoMetadata:
        """Predict metadata after transition to another video.

        Args:
            other: Metadata of the video to transition to.
            effect_time: Duration of the transition effect in seconds.

        Returns:
            New VideoMetadata for the combined video.

        Raises:
            ValueError: If videos have incompatible dimensions or fps.
        """
        if not self.can_be_merged_with(other):
            raise ValueError(
                f"Cannot merge videos: {self.width}x{self.height}@{round(self.fps)}fps "
                f"vs {other.width}x{other.height}@{round(other.fps)}fps"
            )
        combined_duration = self.total_seconds + other.total_seconds - effect_time
        return self.with_duration(combined_duration)

get_frame_shape

get_frame_shape() -> np.ndarray

Returns frame shape.

Source code in src/videopython/base/video.py
def get_frame_shape(self) -> np.ndarray:
    """Returns frame shape."""
    return np.array((self.height, self.width, 3))

get_video_shape

get_video_shape() -> np.ndarray

Returns video shape.

Source code in src/videopython/base/video.py
def get_video_shape(self) -> np.ndarray:
    """Returns video shape."""
    return np.array((self.frame_count, self.height, self.width, 3))

from_path classmethod

from_path(video_path: str | Path) -> VideoMetadata

Creates VideoMetadata object from video file using ffprobe.

Source code in src/videopython/base/video.py
@classmethod
def from_path(cls, video_path: str | Path) -> VideoMetadata:
    """Creates VideoMetadata object from video file using ffprobe."""
    if not Path(video_path).exists():
        raise FileNotFoundError(f"Video file not found: {video_path}")

    probe_data = cls._run_ffprobe(video_path)

    try:
        stream_info = probe_data["streams"][0]

        width = int(stream_info["width"])
        height = int(stream_info["height"])

        try:
            fps_fraction = Fraction(stream_info["r_frame_rate"])
            fps = float(fps_fraction)
        except (ValueError, ZeroDivisionError):
            raise VideoMetadataError(f"Invalid frame rate: {stream_info['r_frame_rate']}")

        if "nb_frames" in stream_info and stream_info["nb_frames"].isdigit():
            frame_count = int(stream_info["nb_frames"])
        else:
            duration = float(probe_data["format"]["duration"])
            frame_count = int(round(duration * fps))

        total_seconds = round(frame_count / fps, 4)

        return cls(height=height, width=width, fps=fps, frame_count=frame_count, total_seconds=total_seconds)

    except KeyError as e:
        raise VideoMetadataError(f"Missing required metadata field: {e}")
    except (TypeError, IndexError) as e:
        raise VideoMetadataError(f"Invalid metadata structure: {e}")

from_video classmethod

from_video(video: Video) -> VideoMetadata

Creates VideoMetadata object from Video instance.

Source code in src/videopython/base/video.py
@classmethod
def from_video(cls, video: Video) -> VideoMetadata:
    """Creates VideoMetadata object from Video instance."""
    frame_count, height, width, _ = video.frames.shape
    total_seconds = round(frame_count / video.fps, 4)

    return cls(height=height, width=width, fps=video.fps, frame_count=frame_count, total_seconds=total_seconds)

can_be_merged_with

can_be_merged_with(other_format: VideoMetadata) -> bool

Check if videos can be merged.

Source code in src/videopython/base/video.py
def can_be_merged_with(self, other_format: VideoMetadata) -> bool:
    """Check if videos can be merged."""
    return (
        self.height == other_format.height
        and self.width == other_format.width
        and round(self.fps) == round(other_format.fps)
    )

with_duration

with_duration(seconds: float) -> VideoMetadata

Return new metadata with updated duration.

Parameters:

Name Type Description Default
seconds float

New duration in seconds.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated duration and frame count.

Source code in src/videopython/base/video.py
def with_duration(self, seconds: float) -> VideoMetadata:
    """Return new metadata with updated duration.

    Args:
        seconds: New duration in seconds.

    Returns:
        New VideoMetadata with updated duration and frame count.
    """
    return VideoMetadata(
        height=self.height,
        width=self.width,
        fps=self.fps,
        frame_count=round(self.fps * seconds),
        total_seconds=round(seconds, 4),
    )

with_dimensions

with_dimensions(width: int, height: int) -> VideoMetadata

Return new metadata with updated dimensions.

Parameters:

Name Type Description Default
width int

New width in pixels.

required
height int

New height in pixels.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated dimensions.

Source code in src/videopython/base/video.py
def with_dimensions(self, width: int, height: int) -> VideoMetadata:
    """Return new metadata with updated dimensions.

    Args:
        width: New width in pixels.
        height: New height in pixels.

    Returns:
        New VideoMetadata with updated dimensions.
    """
    return VideoMetadata(
        height=height,
        width=width,
        fps=self.fps,
        frame_count=self.frame_count,
        total_seconds=self.total_seconds,
    )

with_fps

with_fps(fps: float) -> VideoMetadata

Return new metadata with updated fps.

Parameters:

Name Type Description Default
fps float

New frames per second.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated fps (duration stays same).

Source code in src/videopython/base/video.py
def with_fps(self, fps: float) -> VideoMetadata:
    """Return new metadata with updated fps.

    Args:
        fps: New frames per second.

    Returns:
        New VideoMetadata with updated fps (duration stays same).
    """
    return VideoMetadata(
        height=self.height,
        width=self.width,
        fps=fps,
        frame_count=round(fps * self.total_seconds),
        total_seconds=self.total_seconds,
    )

can_be_downsampled_to

can_be_downsampled_to(target_format: VideoMetadata) -> bool

Checks if video can be downsampled to target_format.

Source code in src/videopython/base/video.py
def can_be_downsampled_to(self, target_format: VideoMetadata) -> bool:
    """Checks if video can be downsampled to target_format."""
    return (
        self.height >= target_format.height
        and self.width >= target_format.width
        and round(self.fps) >= round(target_format.fps)
        and self.total_seconds >= target_format.total_seconds
    )

cut

cut(start: float, end: float) -> VideoMetadata

Predict metadata after cutting by time range.

Parameters:

Name Type Description Default
start float

Start time in seconds.

required
end float

End time in seconds.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated duration.

Source code in src/videopython/base/video.py
def cut(self, start: float, end: float) -> VideoMetadata:
    """Predict metadata after cutting by time range.

    Args:
        start: Start time in seconds.
        end: End time in seconds.

    Returns:
        New VideoMetadata with updated duration.
    """
    if end <= start:
        raise ValueError(f"End time ({end}) must be greater than start time ({start})")
    if start < 0:
        raise ValueError(f"Start time ({start}) cannot be negative")
    if end > self.total_seconds:
        raise ValueError(f"End time ({end}) exceeds video duration ({self.total_seconds})")
    # Mirror CutSeconds.apply() semantics: convert times to frame indices using
    # round() before slicing so metadata validation matches runtime output.
    start_frame = round(start * self.fps)
    end_frame = round(end * self.fps)
    return self.cut_frames(start_frame, end_frame)

cut_frames

cut_frames(start: int, end: int) -> VideoMetadata

Predict metadata after cutting by frame range.

Parameters:

Name Type Description Default
start int

Start frame index (inclusive).

required
end int

End frame index (exclusive).

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated duration.

Source code in src/videopython/base/video.py
def cut_frames(self, start: int, end: int) -> VideoMetadata:
    """Predict metadata after cutting by frame range.

    Args:
        start: Start frame index (inclusive).
        end: End frame index (exclusive).

    Returns:
        New VideoMetadata with updated duration.
    """
    if end <= start:
        raise ValueError(f"End frame ({end}) must be greater than start frame ({start})")
    if start < 0:
        raise ValueError(f"Start frame ({start}) cannot be negative")
    if end > self.frame_count:
        raise ValueError(f"End frame ({end}) exceeds frame count ({self.frame_count})")
    duration = round((end - start) / self.fps, 4)
    return self.with_duration(duration)

resize

resize(
    width: int | None = None,
    height: int | None = None,
    round_to_even: bool = True,
) -> VideoMetadata

Predict metadata after resizing.

If only width or height is provided, the other dimension is calculated to preserve aspect ratio.

Parameters:

Name Type Description Default
width int | None

Target width in pixels.

None
height int | None

Target height in pixels.

None
round_to_even bool

If True (default), snap output width/height to even numbers.

True

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated dimensions.

Source code in src/videopython/base/video.py
def resize(
    self,
    width: int | None = None,
    height: int | None = None,
    round_to_even: bool = True,
) -> VideoMetadata:
    """Predict metadata after resizing.

    If only width or height is provided, the other dimension is calculated
    to preserve aspect ratio.

    Args:
        width: Target width in pixels.
        height: Target height in pixels.
        round_to_even: If True (default), snap output width/height to even numbers.

    Returns:
        New VideoMetadata with updated dimensions.
    """
    if width is None and height is None:
        raise ValueError("Must provide width or height")

    def _snap(value: int) -> int:
        return _round_dimension_to_even(value) if round_to_even else value

    if width and height:
        return self.with_dimensions(_snap(width), _snap(height))
    elif width:
        ratio = width / self.width
        new_height = round(self.height * ratio)
        return self.with_dimensions(_snap(width), _snap(new_height))
    else:  # height only
        ratio = height / self.height  # type: ignore[operator]
        new_width = round(self.width * ratio)
        return self.with_dimensions(_snap(new_width), _snap(height))  # type: ignore[arg-type]

crop

crop(width: int, height: int) -> VideoMetadata

Predict metadata after cropping.

Parameters:

Name Type Description Default
width int

Target width in pixels.

required
height int

Target height in pixels.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated dimensions.

Source code in src/videopython/base/video.py
def crop(self, width: int, height: int) -> VideoMetadata:
    """Predict metadata after cropping.

    Args:
        width: Target width in pixels.
        height: Target height in pixels.

    Returns:
        New VideoMetadata with updated dimensions.
    """
    if width > self.width:
        raise ValueError(f"Crop width ({width}) exceeds video width ({self.width})")
    if height > self.height:
        raise ValueError(f"Crop height ({height}) exceeds video height ({self.height})")
    return self.with_dimensions(width, height)

resample_fps

resample_fps(fps: float) -> VideoMetadata

Predict metadata after resampling frame rate.

Parameters:

Name Type Description Default
fps float

Target frames per second.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated fps.

Source code in src/videopython/base/video.py
def resample_fps(self, fps: float) -> VideoMetadata:
    """Predict metadata after resampling frame rate.

    Args:
        fps: Target frames per second.

    Returns:
        New VideoMetadata with updated fps.
    """
    if fps <= 0:
        raise ValueError(f"FPS ({fps}) must be positive")
    return self.with_fps(fps)

speed_change

speed_change(speed: float) -> VideoMetadata

Predict metadata after speed change.

Mirrors runtime frame-count semantics: int(frame_count / speed), matching SpeedChange.apply() behavior.

Parameters:

Name Type Description Default
speed float

Speed multiplier (e.g. 2.0 = double speed, 0.5 = half speed).

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated duration and frame count.

Source code in src/videopython/base/video.py
def speed_change(self, speed: float) -> VideoMetadata:
    """Predict metadata after speed change.

    Mirrors runtime frame-count semantics: int(frame_count / speed),
    matching SpeedChange.apply() behavior.

    Args:
        speed: Speed multiplier (e.g. 2.0 = double speed, 0.5 = half speed).

    Returns:
        New VideoMetadata with updated duration and frame count.
    """
    if speed <= 0:
        raise ValueError(f"Speed ({speed}) must be positive")
    new_frame_count = int(self.frame_count / speed)
    if new_frame_count == 0:
        raise ValueError(f"Speed {speed}x would result in 0 frames")
    new_seconds = round(new_frame_count / self.fps, 4)
    return VideoMetadata(
        height=self.height,
        width=self.width,
        fps=self.fps,
        frame_count=new_frame_count,
        total_seconds=new_seconds,
    )

reverse

reverse() -> VideoMetadata

Predict metadata after reversing video. No metadata changes.

Source code in src/videopython/base/video.py
def reverse(self) -> VideoMetadata:
    """Predict metadata after reversing the video.

    Reversal only reorders frames, so every field is carried over
    unchanged; a fresh instance is returned to match the convention of
    the other prediction methods.
    """
    unchanged_fields = {
        "height": self.height,
        "width": self.width,
        "fps": self.fps,
        "frame_count": self.frame_count,
        "total_seconds": self.total_seconds,
    }
    return VideoMetadata(**unchanged_fields)

freeze_frame

freeze_frame(
    timestamp: float,
    duration: float = 2.0,
    position: str = "after",
) -> VideoMetadata

Predict metadata after freeze frame insertion.

Mirrors the frame-count logic in FreezeFrame.apply().

Parameters:

Name Type Description Default
timestamp float

Time in seconds to capture the frame from.

required
duration float

How long to hold the frozen frame in seconds.

2.0
position str

Where to insert: "before", "after", or "replace".

'after'
Source code in src/videopython/base/video.py
def freeze_frame(
    self,
    timestamp: float,
    duration: float = 2.0,
    position: str = "after",
) -> VideoMetadata:
    """Predict metadata after freeze frame insertion.

    Mirrors the frame-count logic in FreezeFrame.apply().

    Args:
        timestamp: Time in seconds to capture the frame from.
        duration: How long to hold the frozen frame in seconds.
        position: Where to insert: "before", "after", or "replace".

    Returns:
        New VideoMetadata reflecting the inserted or replacing frames.

    Raises:
        ValueError: If the timestamp is out of range, the duration is not
            positive, or the position is unknown.
    """
    if timestamp < 0:
        raise ValueError(f"timestamp must be >= 0, got {timestamp}")
    if timestamp >= self.total_seconds:
        raise ValueError(f"timestamp ({timestamp}) must be less than video duration ({self.total_seconds})")
    if duration <= 0:
        raise ValueError(f"duration must be > 0, got {duration}")

    held_frames = round(duration * self.fps)

    if position == "replace":
        # Frozen frames overwrite the stretch starting at the capture
        # point, clamped to the end of the clip.
        capture_idx = round(timestamp * self.fps)
        overwrite_end = min(capture_idx + held_frames, self.frame_count)
        overwritten = overwrite_end - capture_idx
        frames_after = self.frame_count - overwritten + held_frames
    elif position in ("before", "after"):
        # Insertion simply extends the clip by the held frames.
        frames_after = self.frame_count + held_frames
    else:
        raise ValueError(f"Invalid position: {position}")

    return VideoMetadata(
        height=self.height,
        width=self.width,
        fps=self.fps,
        frame_count=frames_after,
        total_seconds=round(frames_after / self.fps, 4),
    )

silence_removal

silence_removal(
    min_silence_duration: float = 1.0,
    padding: float = 0.15,
    mode: str = "cut",
    speed_factor: float = 3.0,
    transcription: Any = None,
) -> VideoMetadata

Predict metadata after silence removal.

Replicates the silence-gap detection logic from SilenceRemoval.apply() but only computes frame counts.

Parameters:

Name Type Description Default
min_silence_duration float

Only remove/speed-up gaps longer than this (seconds).

1.0
padding float

Seconds of silence to preserve around speech boundaries.

0.15
mode str

"cut" to hard-cut silence, "speed_up" to speed up silent sections.

'cut'
speed_factor float

Speed multiplier for silent sections (only used with mode="speed_up").

3.0
transcription Any

Transcription object with word-level timestamps.

None
Source code in src/videopython/base/video.py
def silence_removal(
    self,
    min_silence_duration: float = 1.0,
    padding: float = 0.15,
    mode: str = "cut",
    speed_factor: float = 3.0,
    transcription: Any = None,
) -> VideoMetadata:
    """Predict metadata after silence removal.

    Replicates the silence-gap detection logic from SilenceRemoval.apply()
    but only computes frame counts.

    Args:
        min_silence_duration: Only remove/speed-up gaps longer than this (seconds).
        padding: Seconds of silence to preserve around speech boundaries.
        mode: "cut" to hard-cut silence, "speed_up" to speed up silent sections.
        speed_factor: Speed multiplier for silent sections (only used with mode="speed_up").
        transcription: Transcription object with word-level timestamps.

    Returns:
        New VideoMetadata with the predicted frame count and duration.
        Unchanged metadata is returned when no transcription is available
        or when no silence gap qualifies.

    Raises:
        ValueError: If ``mode`` is neither "cut" nor "speed_up".
    """
    # Without word-level timestamps there is nothing to detect; return an
    # unchanged copy of the current metadata.
    if transcription is None or not hasattr(transcription, "words") or not transcription.words:
        return VideoMetadata(
            height=self.height,
            width=self.width,
            fps=self.fps,
            frame_count=self.frame_count,
            total_seconds=self.total_seconds,
        )

    # Build speech ranges from word timestamps (with padding), merging
    # overlapping/adjacent ranges as we go.
    # NOTE(review): assumes transcription.words is ordered by start time —
    # the merge only compares against the last accepted range. TODO confirm.
    speech_ranges: list[tuple[float, float]] = []
    for word in transcription.words:
        start = max(0, word.start - padding)
        end = min(self.total_seconds, word.end + padding)
        if speech_ranges and start <= speech_ranges[-1][1]:
            speech_ranges[-1] = (speech_ranges[-1][0], max(speech_ranges[-1][1], end))
        else:
            speech_ranges.append((start, end))

    # Identify silence gaps: the spans between consecutive speech ranges
    # (plus leading/trailing silence) that meet the minimum duration.
    silence_ranges: list[tuple[float, float]] = []
    prev_end = 0.0
    for s_start, s_end in speech_ranges:
        if s_start - prev_end >= min_silence_duration:
            silence_ranges.append((prev_end, s_start))
        prev_end = s_end
    if self.total_seconds - prev_end >= min_silence_duration:
        silence_ranges.append((prev_end, self.total_seconds))

    # No qualifying gaps: metadata is unaffected.
    if not silence_ranges:
        return VideoMetadata(
            height=self.height,
            width=self.width,
            fps=self.fps,
            frame_count=self.frame_count,
            total_seconds=self.total_seconds,
        )

    if mode == "cut":
        # Mirror _apply_cut keep-range logic exactly: count the frames
        # between/around the silence ranges that survive the cut.
        keep_frames = 0
        prev_frame = 0
        for s_start, s_end in silence_ranges:
            cut_start = round(s_start * self.fps)
            cut_end = round(s_end * self.fps)
            if cut_start > prev_frame:
                keep_frames += cut_start - prev_frame
            prev_frame = cut_end
        if prev_frame < self.frame_count:
            keep_frames += self.frame_count - prev_frame
        new_frame_count = keep_frames
    elif mode == "speed_up":
        # Each silent gap shrinks to gap/speed_factor frames (at least 1).
        saved_frames = 0
        for s_start, s_end in silence_ranges:
            gap_frames = round((s_end - s_start) * self.fps)
            sped_up_frames = max(1, round(gap_frames / speed_factor))
            saved_frames += gap_frames - sped_up_frames
        new_frame_count = self.frame_count - saved_frames
    else:
        raise ValueError(f"Invalid mode: {mode}")

    # Guard against degenerate inputs collapsing the whole clip.
    new_frame_count = max(1, new_frame_count)
    new_total_seconds = round(new_frame_count / self.fps, 4)
    return VideoMetadata(
        height=self.height,
        width=self.width,
        fps=self.fps,
        frame_count=new_frame_count,
        total_seconds=new_total_seconds,
    )

crop_to_aspect_even

crop_to_aspect_even(
    target_aspect: tuple[int, int] | list[int] = (9, 16),
) -> VideoMetadata

Predict metadata after aspect-ratio crop with even output dimensions.

Mirrors the output dimension logic used by AI crop transforms like FaceTrackingCrop.

Source code in src/videopython/base/video.py
def crop_to_aspect_even(self, target_aspect: tuple[int, int] | list[int] = (9, 16)) -> VideoMetadata:
    """Predict metadata after an aspect-ratio crop with even output dimensions.

    Mirrors the output dimension logic used by AI crop transforms like
    ``FaceTrackingCrop``.

    Args:
        target_aspect: Desired (width, height) aspect ratio.

    Returns:
        New VideoMetadata with the cropped, even dimensions.

    Raises:
        ValueError: If ``target_aspect`` is not a 2-item sequence of
            positive numeric values.
    """
    if not isinstance(target_aspect, (tuple, list)) or len(target_aspect) != 2:
        raise ValueError("target_aspect must be a 2-item tuple/list of positive integers")

    try:
        ratio_w = int(target_aspect[0])
        ratio_h = int(target_aspect[1])
    except (TypeError, ValueError) as e:
        raise ValueError("target_aspect must contain numeric values") from e

    if ratio_w <= 0 or ratio_h <= 0:
        raise ValueError("target_aspect values must be positive")

    wanted_ratio = ratio_w / ratio_h
    current_ratio = self.width / self.height

    def _even(value: int) -> int:
        # Drop a pixel when odd so video codecs get even dimensions.
        return value - (value % 2)

    if wanted_ratio < current_ratio:
        # Target is narrower than the frame: keep full height, crop width.
        out_h = _even(self.height)
        out_w = _even(int(out_h * wanted_ratio))
    else:
        # Target is wider (or equal): keep full width, crop height.
        out_w = _even(self.width)
        out_h = _even(int(out_w / wanted_ratio))

    return self.with_dimensions(out_w, out_h)

transition_to

transition_to(
    other: VideoMetadata, effect_time: float = 0.0
) -> VideoMetadata

Predict metadata after transition to another video.

Parameters:

Name Type Description Default
other VideoMetadata

Metadata of the video to transition to.

required
effect_time float

Duration of the transition effect in seconds.

0.0

Returns:

Type Description
VideoMetadata

New VideoMetadata for the combined video.

Raises:

Type Description
ValueError

If videos have incompatible dimensions or fps.

Source code in src/videopython/base/video.py
def transition_to(self, other: VideoMetadata, effect_time: float = 0.0) -> VideoMetadata:
    """Predict metadata after transitioning into another video.

    Args:
        other: Metadata of the video to transition to.
        effect_time: Duration of the transition effect in seconds.

    Returns:
        New VideoMetadata for the combined video.

    Raises:
        ValueError: If videos have incompatible dimensions or fps.
    """
    if self.can_be_merged_with(other):
        # The transition overlaps both clips for effect_time seconds,
        # so that span is only counted once in the combined duration.
        merged_seconds = self.total_seconds + other.total_seconds - effect_time
        return self.with_duration(merged_seconds)
    raise ValueError(
        f"Cannot merge videos: {self.width}x{self.height}@{round(self.fps)}fps "
        f"vs {other.width}x{other.height}@{round(other.fps)}fps"
    )

FrameIterator

Memory-efficient frame iterator for streaming video frames without loading the entire video into memory. Useful for processing very long videos.

from videopython.base import FrameIterator

# Stream frames one at a time - O(1) memory usage
with FrameIterator("long_video.mp4") as frames:
    for frame_idx, frame in frames:
        # frame is a numpy array (H, W, 3) in RGB format
        process_frame(frame)

# With time bounds
with FrameIterator("video.mp4", start_second=10.0, end_second=60.0) as frames:
    for frame_idx, frame in frames:
        process_frame(frame)

FrameIterator

Memory-efficient frame iterator using ffmpeg streaming.

Yields frames one at a time, keeping memory usage constant regardless of video length. Supports context manager protocol for resource cleanup.

This is useful for operations that only need to process frames sequentially, such as scene detection, without loading the entire video into memory.

Example

>>> with FrameIterator("video.mp4") as frames:
...     for idx, frame in frames:
...         process(frame)

Source code in src/videopython/base/video.py
class FrameIterator:
    """Memory-efficient frame iterator using ffmpeg streaming.

    Yields frames one at a time, keeping memory usage constant regardless
    of video length. Supports context manager protocol for resource cleanup.

    This is useful for operations that only need to process frames sequentially,
    such as scene detection, without loading the entire video into memory.

    Example:
        >>> with FrameIterator("video.mp4") as frames:
        ...     for idx, frame in frames:
        ...         process(frame)
    """

    def __init__(
        self,
        path: str | Path,
        start_second: float | None = None,
        end_second: float | None = None,
        vf_filters: list[str] | None = None,
        output_fps: float | None = None,
        output_width: int | None = None,
        output_height: int | None = None,
    ):
        """Initialize the frame iterator.

        Args:
            path: Path to video file
            start_second: Optional start time in seconds (seek before reading)
            end_second: Optional end time in seconds (stop reading after this)
            vf_filters: Optional list of ffmpeg -vf filter expressions to apply
                during decode (e.g. ``["scale=1280:720", "fps=30"]``).
            output_fps: Override output fps (adds fps filter if not in vf_filters).
            output_width: Override output width for frame size calculation.
            output_height: Override output height for frame size calculation.
        """
        self.path = Path(path)
        if not self.path.exists():
            raise FileNotFoundError(f"Video file not found: {path}")

        self.metadata = VideoMetadata.from_path(path)
        self.start_second = start_second if start_second is not None else 0.0
        self.end_second = end_second
        self._process: subprocess.Popen | None = None

        # Build -vf filter chain
        self._vf_filters = list(vf_filters) if vf_filters else []
        if output_fps is not None and not any(f.startswith("fps=") for f in self._vf_filters):
            self._vf_filters.append(f"fps={output_fps}")

        # Output dimensions (after filters)
        self.output_width = output_width or self.metadata.width
        self.output_height = output_height or self.metadata.height
        self.output_fps = output_fps or self.metadata.fps
        self._frame_size = self.output_width * self.output_height * 3

    def _build_ffmpeg_command(self) -> list[str]:
        """Build ffmpeg command for frame streaming."""
        cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error"]

        if self.start_second > 0:
            cmd.extend(["-ss", str(self.start_second)])

        cmd.extend(["-i", str(self.path)])

        if self.end_second is not None:
            duration = self.end_second - self.start_second
            cmd.extend(["-t", str(duration)])

        if self._vf_filters:
            cmd.extend(["-vf", ",".join(self._vf_filters)])

        cmd.extend(
            [
                "-f",
                "rawvideo",
                "-pix_fmt",
                "rgb24",
                "-vcodec",
                "rawvideo",
                "-y",
                "pipe:1",
            ]
        )
        return cmd

    def __iter__(self) -> Generator[tuple[int, np.ndarray], None, None]:
        """Yield (frame_index, frame) tuples.

        Frame indices are absolute indices in the original video,
        accounting for any start_second offset.
        """
        cmd = self._build_ffmpeg_command()

        self._process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=self._frame_size * 2,
        )

        # Calculate starting frame index based on start_second
        start_frame = int(self.start_second * self.output_fps)
        frame_idx = start_frame

        try:
            while True:
                raw_frame = self._process.stdout.read(self._frame_size)  # type: ignore
                if len(raw_frame) != self._frame_size:
                    break

                frame = np.frombuffer(raw_frame, dtype=np.uint8).copy()
                frame = frame.reshape(self.output_height, self.output_width, 3)

                yield frame_idx, frame
                frame_idx += 1
        finally:
            self._cleanup()

    def _cleanup(self) -> None:
        """Clean up ffmpeg process."""
        if self._process is not None:
            if self._process.poll() is None:
                self._process.terminate()
                try:
                    self._process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    self._process.kill()
                    self._process.wait()
            if self._process.stdout:
                self._process.stdout.close()
            self._process = None

    def __enter__(self) -> "FrameIterator":
        return self

    def __exit__(self, *args: object) -> None:
        self._cleanup()

__init__

__init__(
    path: str | Path,
    start_second: float | None = None,
    end_second: float | None = None,
    vf_filters: list[str] | None = None,
    output_fps: float | None = None,
    output_width: int | None = None,
    output_height: int | None = None,
)

Initialize the frame iterator.

Parameters:

Name Type Description Default
path str | Path

Path to video file

required
start_second float | None

Optional start time in seconds (seek before reading)

None
end_second float | None

Optional end time in seconds (stop reading after this)

None
vf_filters list[str] | None

Optional list of ffmpeg -vf filter expressions to apply during decode (e.g. ["scale=1280:720", "fps=30"]).

None
output_fps float | None

Override output fps (adds fps filter if not in vf_filters).

None
output_width int | None

Override output width for frame size calculation.

None
output_height int | None

Override output height for frame size calculation.

None
Source code in src/videopython/base/video.py
def __init__(
    self,
    path: str | Path,
    start_second: float | None = None,
    end_second: float | None = None,
    vf_filters: list[str] | None = None,
    output_fps: float | None = None,
    output_width: int | None = None,
    output_height: int | None = None,
):
    """Initialize the frame iterator.

    Args:
        path: Path to video file
        start_second: Optional start time in seconds (seek before reading)
        end_second: Optional end time in seconds (stop reading after this)
        vf_filters: Optional list of ffmpeg -vf filter expressions to apply
            during decode (e.g. ``["scale=1280:720", "fps=30"]``).
        output_fps: Override output fps (adds fps filter if not in vf_filters).
        output_width: Override output width for frame size calculation.
        output_height: Override output height for frame size calculation.
    """
    self.path = Path(path)
    if not self.path.exists():
        raise FileNotFoundError(f"Video file not found: {path}")

    self.metadata = VideoMetadata.from_path(path)
    self.start_second = start_second if start_second is not None else 0.0
    self.end_second = end_second
    self._process: subprocess.Popen | None = None

    # Build -vf filter chain
    self._vf_filters = list(vf_filters) if vf_filters else []
    if output_fps is not None and not any(f.startswith("fps=") for f in self._vf_filters):
        self._vf_filters.append(f"fps={output_fps}")

    # Output dimensions (after filters)
    self.output_width = output_width or self.metadata.width
    self.output_height = output_height or self.metadata.height
    self.output_fps = output_fps or self.metadata.fps
    self._frame_size = self.output_width * self.output_height * 3

__iter__

__iter__() -> Generator[tuple[int, np.ndarray], None, None]

Yield (frame_index, frame) tuples.

Frame indices are absolute indices in the original video, accounting for any start_second offset.

Source code in src/videopython/base/video.py
def __iter__(self) -> Generator[tuple[int, np.ndarray], None, None]:
    """Yield (frame_index, frame) tuples.

    Frame indices are absolute indices in the original video,
    accounting for any start_second offset.
    """
    cmd = self._build_ffmpeg_command()

    self._process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=self._frame_size * 2,
    )

    # Calculate starting frame index based on start_second
    start_frame = int(self.start_second * self.output_fps)
    frame_idx = start_frame

    try:
        while True:
            raw_frame = self._process.stdout.read(self._frame_size)  # type: ignore
            if len(raw_frame) != self._frame_size:
                break

            frame = np.frombuffer(raw_frame, dtype=np.uint8).copy()
            frame = frame.reshape(self.output_height, self.output_width, 3)

            yield frame_idx, frame
            frame_idx += 1
    finally:
        self._cleanup()