Skip to content

Video

The Video class is the core data structure in videopython.

Video

Video

Source code in src/videopython/base/video.py
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
class Video:
    """Core in-memory video representation: RGB frames, fps, and an audio track.

    Frames are stored as a uint8 numpy array of shape
    (num_frames, height, width, 3). Every Video always carries an Audio
    object; a silent stereo track is synthesized when none is supplied.
    """

    def __init__(self, frames: np.ndarray, fps: int | float, audio: Audio | None = None):
        """Initialize a Video.

        Args:
            frames: RGB frames with shape (num_frames, height, width, 3).
            fps: Frames per second.
            audio: Optional audio track. If omitted (or falsy), a silent
                stereo 44.1kHz track matching the video duration is created.
        """
        self.frames = frames
        self.fps = fps
        if audio:
            self.audio = audio
        else:
            self.audio = Audio.create_silent(
                duration_seconds=round(self.total_seconds, 2), stereo=True, sample_rate=44100
            )

    @classmethod
    def from_path(
        cls, path: str, read_batch_size: int = 100, start_second: float | None = None, end_second: float | None = None
    ) -> Video:
        """Load a video (and its audio) from disk by piping raw RGB frames out of FFmpeg.

        Args:
            path: Path to the video file.
            read_batch_size: Number of frames to read from the FFmpeg pipe per batch.
            start_second: Optional segment start in seconds (must be non-negative).
            end_second: Optional segment end in seconds (must not exceed duration).

        Returns:
            Video holding the decoded frames; audio is loaded from the file or
            replaced with a silent track when the file has no audio stream.

        Raises:
            ValueError: If the time bounds are invalid or no frames could be read.
            VideoLoadError: If FFmpeg fails or an I/O error occurs.
        """
        try:
            # Get video metadata using VideoMetadata.from_path
            metadata = VideoMetadata.from_path(path)

            width = metadata.width
            height = metadata.height
            fps = metadata.fps
            total_duration = metadata.total_seconds

            # Validate time bounds
            if start_second is not None and start_second < 0:
                raise ValueError("start_second must be non-negative")
            if end_second is not None and end_second > total_duration:
                raise ValueError(f"end_second ({end_second}) exceeds video duration ({total_duration})")
            if start_second is not None and end_second is not None and start_second >= end_second:
                raise ValueError("start_second must be less than end_second")

            # Duration of the requested segment (whole video when no bounds given).
            segment_duration = total_duration
            if start_second is not None and end_second is not None:
                segment_duration = end_second - start_second
            elif end_second is not None:
                segment_duration = end_second
            elif start_second is not None:
                segment_duration = total_duration - start_second

            # Estimate memory usage and warn for large videos
            estimated_frames = int(segment_duration * fps)
            estimated_bytes = estimated_frames * height * width * 3
            estimated_gb = estimated_bytes / (1024**3)
            if estimated_gb > 10:
                warnings.warn(
                    f"Loading this video will use ~{estimated_gb:.1f}GB of RAM. "
                    f"For large videos, consider using FrameIterator for memory-efficient streaming.",
                    ResourceWarning,
                    stacklevel=2,
                )

            # Build FFmpeg command with improved segment handling
            ffmpeg_cmd = ["ffmpeg"]

            # Add seek option BEFORE input for more efficient seeking
            if start_second is not None:
                ffmpeg_cmd.extend(["-ss", str(start_second)])

            ffmpeg_cmd.extend(["-i", path])

            # Add duration AFTER input for more precise timing
            if end_second is not None and start_second is not None:
                duration = end_second - start_second
                ffmpeg_cmd.extend(["-t", str(duration)])
            elif end_second is not None:
                ffmpeg_cmd.extend(["-t", str(end_second)])

            # Output format settings - removed problematic -vsync 0
            ffmpeg_cmd.extend(
                [
                    "-f",
                    "rawvideo",
                    "-pix_fmt",
                    "rgb24",
                    "-vcodec",
                    "rawvideo",
                    "-avoid_negative_ts",
                    "make_zero",  # Handle timing issues
                    "-y",
                    "pipe:1",
                ]
            )

            # Start FFmpeg process with stderr redirected to avoid deadlock
            process = subprocess.Popen(
                ffmpeg_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,  # Redirect stderr to avoid deadlock
                bufsize=10**8,  # Use large buffer for efficient I/O
            )

            # Calculate frame size in bytes
            frame_size = width * height * 3  # 3 bytes per pixel for RGB

            # Frame-count estimate for pre-allocation reuses the segment
            # duration computed above; the multiplier/padding buffer absorbs
            # frame-rate variations and rounding.
            estimated_frames = int(segment_duration * fps * FRAME_BUFFER_MULTIPLIER) + FRAME_BUFFER_PADDING

            # Pre-allocate numpy array
            frames = np.empty((estimated_frames, height, width, 3), dtype=np.uint8)
            frames_read = 0

            try:
                while frames_read < estimated_frames:
                    # Calculate remaining frames to read
                    remaining_frames = estimated_frames - frames_read
                    batch_size = min(read_batch_size, remaining_frames)

                    # Read batch of data
                    batch_data = process.stdout.read(frame_size * batch_size)  # type: ignore

                    if not batch_data:
                        break

                    # Convert to numpy array
                    batch_frames = np.frombuffer(batch_data, dtype=np.uint8)

                    # Calculate how many complete frames we got
                    complete_frames = len(batch_frames) // (height * width * 3)

                    if complete_frames == 0:
                        break

                    # Only keep complete frames
                    complete_data = batch_frames[: complete_frames * height * width * 3]
                    batch_frames_array = complete_data.reshape(complete_frames, height, width, 3)

                    # Check if we have room in pre-allocated array
                    if frames_read + complete_frames > estimated_frames:
                        # Need to expand array - this should be rare with our buffer
                        new_size = max(estimated_frames * 2, frames_read + complete_frames + 100)
                        new_frames = np.empty((new_size, height, width, 3), dtype=np.uint8)
                        new_frames[:frames_read] = frames[:frames_read]
                        frames = new_frames
                        estimated_frames = new_size

                    # Store batch in pre-allocated array
                    end_idx = frames_read + complete_frames
                    frames[frames_read:end_idx] = batch_frames_array
                    frames_read += complete_frames

            finally:
                # Ensure process is properly terminated
                if process.poll() is None:
                    process.terminate()
                    try:
                        process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        process.kill()
                        process.wait()

                # Clean up pipes
                if process.stdout:
                    process.stdout.close()

            # Check if FFmpeg had an error (non-zero return code)
            if process.returncode not in (0, None) and frames_read == 0:
                raise ValueError(f"FFmpeg failed to process video (return code: {process.returncode})")

            if frames_read == 0:
                raise ValueError("No frames were read from the video")

            # Trim the pre-allocated array to actual frames read
            frames = frames[:frames_read]  # type: ignore

            # Load audio for the specified segment
            try:
                audio = Audio.from_path(path)
                # Slice audio to match the video segment
                if start_second is not None or end_second is not None:
                    audio_start = start_second if start_second is not None else 0
                    audio_end = end_second if end_second is not None else audio.metadata.duration_seconds
                    audio = audio.slice(start_seconds=audio_start, end_seconds=audio_end)
            except (AudioLoadError, FileNotFoundError, subprocess.CalledProcessError):
                warnings.warn(f"No audio found for `{path}`, adding silent track.")
                # Create silent audio based on actual frames read
                segment_duration = frames_read / fps
                audio = Audio.create_silent(duration_seconds=round(segment_duration, 2), stereo=True, sample_rate=44100)

            return cls(frames=frames, fps=fps, audio=audio)

        except VideoMetadataError:
            raise
        except subprocess.CalledProcessError as e:
            raise VideoLoadError(f"FFmpeg failed: {e}")
        except (OSError, IOError) as e:
            raise VideoLoadError(f"I/O error: {e}")

    @classmethod
    def from_frames(cls, frames: np.ndarray, fps: float) -> Video:
        """Build a Video from a frame array, dropping an alpha channel if present.

        Args:
            frames: Array with shape (num_frames, height, width, 3 or 4).
            fps: Frames per second.

        Raises:
            ValueError: If the array is not 4-dimensional or has an
                unsupported channel count.
        """
        if frames.ndim != 4:
            raise ValueError(f"Unsupported number of dimensions: {frames.shape}!")
        elif frames.shape[-1] == 4:
            frames = frames[:, :, :, :3]
        elif frames.shape[-1] != 3:
            # The last axis is channels, not dimensions - report it accurately.
            raise ValueError(f"Unsupported number of channels: {frames.shape[-1]}!")
        return cls(frames=frames, fps=fps)

    @classmethod
    def from_image(cls, image: np.ndarray, fps: float = 24.0, length_seconds: float = 1.0) -> Video:
        """Build a still video by repeating a single image for `length_seconds`."""
        if len(image.shape) == 3:
            image = np.expand_dims(image, axis=0)
        frames = np.repeat(image, round(length_seconds * fps), axis=0)
        return cls(frames=frames, fps=fps)

    def copy(self) -> Video:
        """Return a deep copy of the frames with the same audio object attached."""
        copied = Video.from_frames(self.frames.copy(), self.fps)
        copied.audio = self.audio  # Audio objects are immutable, no need to copy
        return copied

    def is_loaded(self) -> bool:
        """Return True when frames, fps and audio are all present."""
        return self.fps is not None and self.frames is not None and self.audio is not None

    def split(self, frame_index: int | None = None) -> tuple[Video, Video]:
        """Split into two videos at `frame_index` (defaults to the midpoint).

        Audio is sliced at the corresponding time point so both halves stay
        in sync with their frames.
        """
        # `is not None` so an explicit frame_index=0 splits at the start
        # instead of silently falling back to the midpoint (0 is falsy).
        if frame_index is not None:
            if not (0 <= frame_index <= len(self.frames)):
                raise ValueError(f"frame_idx must be between 0 and {len(self.frames)}, got {frame_index}")
        else:
            frame_index = len(self.frames) // 2

        split_videos = (
            self.from_frames(self.frames[:frame_index], self.fps),
            self.from_frames(self.frames[frame_index:], self.fps),
        )

        # Split audio at the corresponding time point
        split_time = frame_index / self.fps
        split_videos[0].audio = self.audio.slice(start_seconds=0, end_seconds=split_time)
        split_videos[1].audio = self.audio.slice(start_seconds=split_time)

        return split_videos

    def save(
        self,
        filename: str | Path | None = None,
        format: ALLOWED_VIDEO_FORMATS = "mp4",
        preset: ALLOWED_VIDEO_PRESETS = "medium",
        crf: int = 23,
    ) -> Path:
        """Save video to file.

        Args:
            filename: Output filename. If None, generates random name
            format: Output format (mp4, avi, mov, mkv, webm)
            preset: Encoding speed/compression tradeoff. Slower presets give smaller
                files at the same quality. Options from fastest to smallest:
                ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow
            crf: Constant Rate Factor (0-51). Lower = better quality, larger file.
                Default 23 is visually lossless for most content. Range 18-28 recommended.

        Returns:
            Path to saved video file

        Raises:
            RuntimeError: If video is not loaded
            ValueError: If format or preset is not supported
        """
        if not self.is_loaded():
            raise RuntimeError("Video is not loaded, cannot save!")

        if format.lower() not in get_args(ALLOWED_VIDEO_FORMATS):
            raise ValueError(
                f"Unsupported format: {format}. Allowed formats are: {', '.join(get_args(ALLOWED_VIDEO_FORMATS))}"
            )

        if preset not in get_args(ALLOWED_VIDEO_PRESETS):
            raise ValueError(
                f"Unsupported preset: {preset}. Allowed presets are: {', '.join(get_args(ALLOWED_VIDEO_PRESETS))}"
            )

        frame_height, frame_width = self.frame_shape[:2]
        if frame_width % 2 != 0 or frame_height % 2 != 0:
            raise ValueError(
                "Current save pipeline uses libx264 with yuv420p, which requires even frame dimensions. "
                f"Got {frame_width}x{frame_height}. "
                "Resize, crop, or pad to an even width and height before saving."
            )

        if filename is None:
            filename = Path(generate_random_name(suffix=f".{format}"))
        else:
            filename = Path(filename).with_suffix(f".{format}")
            filename.parent.mkdir(parents=True, exist_ok=True)

        # Save audio to temporary WAV file
        with tempfile.NamedTemporaryFile(suffix=".wav") as temp_audio:
            self.audio.save(temp_audio.name, format="wav")

            # Calculate exact duration
            duration = len(self.frames) / self.fps

            # Construct FFmpeg command (stream raw video via stdin)
            ffmpeg_command = [
                "ffmpeg",
                "-y",
                "-hide_banner",
                "-loglevel",
                "error",
                # Raw video input settings
                "-f",
                "rawvideo",
                "-pixel_format",
                "rgb24",
                "-video_size",
                f"{self.frame_shape[1]}x{self.frame_shape[0]}",
                "-framerate",
                str(self.fps),
                "-i",
                "pipe:0",
                # Audio input
                "-i",
                temp_audio.name,
                # Video encoding settings
                "-c:v",
                "libx264",
                "-preset",
                preset,
                "-crf",
                str(crf),
                # Audio settings
                "-c:a",
                "aac",
                "-b:a",
                "192k",
                # Output settings
                "-pix_fmt",
                "yuv420p",
                "-movflags",
                "+faststart",  # Enable fast start for web playback
                "-t",
                str(duration),
                "-vsync",
                "cfr",
                str(filename),
            ]

            process = subprocess.Popen(
                ffmpeg_command,
                stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )

            try:
                if process.stdin is None:
                    raise RuntimeError("Failed to open FFmpeg stdin pipe for video data")

                frames = self.frames
                if frames.dtype != np.uint8 or not frames.flags["C_CONTIGUOUS"]:
                    frames = np.ascontiguousarray(frames, dtype=np.uint8)

                buffer = memoryview(frames)
                try:
                    process.stdin.write(buffer)
                    process.stdin.close()
                except BrokenPipeError as e:
                    stderr = process.stderr.read() if process.stderr is not None else b""
                    returncode = process.wait()
                    raise RuntimeError(
                        f"FFmpeg terminated while receiving video data (code {returncode}): "
                        f"{stderr.decode(errors='ignore')}"
                    ) from e

                stderr = process.stderr.read() if process.stderr is not None else b""
                returncode = process.wait()

                if returncode != 0:
                    raise RuntimeError(f"FFmpeg failed with code {returncode}: {stderr.decode(errors='ignore')}")

                return filename
            finally:
                if process.poll() is None:
                    process.kill()

    def add_audio(self, audio: Audio, overlay: bool = True) -> Video:
        """Add audio to video, returning a new Video instance.

        Args:
            audio: Audio to add
            overlay: If True, overlay on existing audio; if False, replace it

        Returns:
            New Video with the audio added
        """
        video_duration = self.total_seconds
        audio_duration = audio.metadata.duration_seconds

        if audio_duration > video_duration:
            audio = audio.slice(start_seconds=0, end_seconds=video_duration)
        elif audio_duration < video_duration:
            silence_duration = video_duration - audio_duration
            silence = Audio.create_silent(
                duration_seconds=silence_duration,
                stereo=audio.metadata.channels == 2,
                sample_rate=audio.metadata.sample_rate,
            )
            audio = audio.concat(silence)

        new_video = self.copy()
        if new_video.audio.is_silent:
            new_video.audio = audio
        elif overlay:
            new_video.audio = new_video.audio.overlay(audio, position=0.0)
        else:
            new_video.audio = audio
        return new_video

    def add_audio_from_file(self, path: str, overlay: bool = True) -> Video:
        """Add audio from file, returning a new Video instance.

        Args:
            path: Path to audio file
            overlay: If True, overlay on existing audio; if False, replace it

        Returns:
            New Video with the audio added

        Raises:
            AudioLoadError: If audio file cannot be loaded
            FileNotFoundError: If audio file does not exist
        """
        new_audio = Audio.from_path(path)
        return self.add_audio(new_audio, overlay)

    def __add__(self, other: Video) -> Video:
        """Concatenate two videos (frames and audio); fps and resolution must match."""
        if self.fps != other.fps:
            raise ValueError("FPS of videos do not match!")
        elif self.frame_shape != other.frame_shape:
            raise ValueError(f"Resolutions do not match: {self.frame_shape} vs {other.frame_shape}")
        new_video = self.from_frames(np.r_["0,2", self.frames, other.frames], fps=self.fps)
        new_video.audio = self.audio.concat(other.audio)
        return new_video

    def __str__(self) -> str:
        return str(self.metadata)

    def __getitem__(self, val: slice) -> Video:
        """Slice the video by frame index, keeping audio in sync.

        NOTE(review): a slice step other than 1 is applied to the frames but
        not reflected in the audio slice below, so stepped slices produce
        audio longer than the video - confirm before relying on steps.
        """
        if not isinstance(val, slice):
            raise ValueError("Only slices are supported for video indexing!")

        # Sub-slice video frames
        sliced = self.from_frames(self.frames[val], fps=self.fps)

        # Handle slicing bounds for audio
        start = val.start if val.start else 0
        stop = val.stop if val.stop else len(self.frames)
        if start < 0:
            start = len(self.frames) + start
        if stop < 0:
            stop = len(self.frames) + stop

        # Slice audio to match video duration
        audio_start = start / self.fps
        audio_end = stop / self.fps
        sliced.audio = self.audio.slice(start_seconds=audio_start, end_seconds=audio_end)
        return sliced

    @property
    def video_shape(self) -> tuple[int, int, int, int]:
        """Full array shape: (num_frames, height, width, channels)."""
        return self.frames.shape

    @property
    def frame_shape(self) -> tuple[int, int, int]:
        """Single-frame shape: (height, width, channels)."""
        return self.frames.shape[1:]

    @property
    def total_seconds(self) -> float:
        """Video duration in seconds, rounded to 4 decimal places."""
        return round(self.frames.shape[0] / self.fps, 4)

    @property
    def metadata(self) -> VideoMetadata:
        """Metadata snapshot derived from the current frames/fps."""
        return VideoMetadata.from_video(self)

    # Fluent API for video transformations
    # These methods mirror the VideoMetadata fluent API

    def cut(self, start: float, end: float) -> Video:
        """Cut video to a time range.

        Args:
            start: Start time in seconds.
            end: End time in seconds.

        Returns:
            New Video with the specified time range.
        """
        from videopython.base.transforms import CutSeconds

        return CutSeconds(start, end).apply(self)

    def cut_frames(self, start: int, end: int) -> Video:
        """Cut video to a frame range.

        Args:
            start: Start frame index (inclusive).
            end: End frame index (exclusive).

        Returns:
            New Video with the specified frame range.
        """
        from videopython.base.transforms import CutFrames

        return CutFrames(start, end).apply(self)

    def resize(
        self,
        width: int | None = None,
        height: int | None = None,
        round_to_even: bool = True,
    ) -> Video:
        """Resize video.

        If only width or height is provided, the other dimension is calculated
        to preserve aspect ratio.

        Args:
            width: Target width in pixels.
            height: Target height in pixels.
            round_to_even: If True (default), snap output width/height to even numbers.

        Returns:
            New Video with the specified dimensions.
        """
        from videopython.base.transforms import Resize

        return Resize(width=width, height=height, round_to_even=round_to_even).apply(self)

    def crop(self, width: int, height: int) -> Video:
        """Crop video to specified dimensions (center crop).

        Args:
            width: Target width in pixels.
            height: Target height in pixels.

        Returns:
            New Video with the specified dimensions.
        """
        from videopython.base.transforms import Crop

        return Crop(width=width, height=height).apply(self)

    def resample_fps(self, fps: float) -> Video:
        """Resample video to a different frame rate.

        Args:
            fps: Target frames per second.

        Returns:
            New Video with the specified frame rate.
        """
        from videopython.base.transforms import ResampleFPS

        return ResampleFPS(fps=fps).apply(self)

    def transition_to(self, other: Video, transition: object) -> Video:
        """Combine with another video using a transition.

        Args:
            other: Video to transition to.
            transition: Transition to apply (e.g., FadeTransition, BlurTransition).

        Returns:
            New Video combining both videos with the transition effect.
        """
        from videopython.base.transitions import Transition

        if not isinstance(transition, Transition):
            raise TypeError(f"Expected Transition, got {type(transition).__name__}")
        return transition.apply((self, other))

    def ken_burns(
        self,
        start_region: "BoundingBox",
        end_region: "BoundingBox",
        easing: Literal["linear", "ease_in", "ease_out", "ease_in_out"] = "linear",
        start: float | None = None,
        stop: float | None = None,
    ) -> Video:
        """Apply Ken Burns pan-and-zoom effect.

        Creates cinematic movement by smoothly transitioning between two regions.

        Args:
            start_region: Starting crop region (BoundingBox with normalized 0-1 coordinates).
            end_region: Ending crop region (BoundingBox with normalized 0-1 coordinates).
            easing: Animation easing - "linear", "ease_in", "ease_out", or "ease_in_out".
            start: Optional start time in seconds for the effect.
            stop: Optional stop time in seconds for the effect.

        Returns:
            New Video with Ken Burns effect applied.
        """
        from videopython.base.effects import KenBurns

        return KenBurns(start_region=start_region, end_region=end_region, easing=easing).apply(
            self, start=start, stop=stop
        )

    def picture_in_picture(
        self,
        overlay: Video,
        position: tuple[float, float] = (0.7, 0.7),
        scale: float = 0.25,
        border_width: int = 0,
        border_color: tuple[int, int, int] = (255, 255, 255),
        corner_radius: int = 0,
        opacity: float = 1.0,
        audio_mode: Literal["main", "overlay", "mix"] = "main",
        audio_mix: tuple[float, float] = (1.0, 1.0),
    ) -> Video:
        """Overlay another video as picture-in-picture.

        Args:
            overlay: Video to overlay on this video.
            position: Normalized (x, y) center position, (0,0)=top-left, (1,1)=bottom-right.
            scale: Overlay size relative to main video width (0.25 = 25%).
            border_width: Border width in pixels (default 0).
            border_color: Border color as RGB tuple (default white).
            corner_radius: Rounded corner radius in pixels (default 0).
            opacity: Overlay transparency from 0 to 1 (default 1.0).
            audio_mode: Audio handling - "main" (default), "overlay", or "mix".
            audio_mix: Volume factors (main, overlay) for mix mode, default (1.0, 1.0).

        Returns:
            New Video with picture-in-picture overlay.
        """
        from videopython.base.transforms import PictureInPicture

        return PictureInPicture(
            overlay=overlay,
            position=position,
            scale=scale,
            border_width=border_width,
            border_color=border_color,
            corner_radius=corner_radius,
            opacity=opacity,
            audio_mode=audio_mode,
            audio_mix=audio_mix,
        ).apply(self)

video_shape property

video_shape: tuple[int, int, int, int]

frame_shape property

frame_shape: tuple[int, int, int]

total_seconds property

total_seconds: float

metadata property

metadata: VideoMetadata

__init__

__init__(
    frames: ndarray,
    fps: int | float,
    audio: Audio | None = None,
)
Source code in src/videopython/base/video.py
def __init__(self, frames: np.ndarray, fps: int | float, audio: Audio | None = None):
    """Store frames and fps; synthesize a silent stereo 44.1kHz track when no audio is given."""
    self.frames = frames
    self.fps = fps
    # A falsy audio argument (None) triggers silent-track creation sized to the video duration.
    if audio:
        self.audio = audio
    else:
        self.audio = Audio.create_silent(
            duration_seconds=round(self.total_seconds, 2), stereo=True, sample_rate=44100
        )

from_path classmethod

from_path(
    path: str,
    read_batch_size: int = 100,
    start_second: float | None = None,
    end_second: float | None = None,
) -> Video
Source code in src/videopython/base/video.py
@classmethod
def from_path(
    cls, path: str, read_batch_size: int = 100, start_second: float | None = None, end_second: float | None = None
) -> Video:
    """Load a video (and its audio) from disk into memory.

    Frames are decoded by streaming raw RGB24 bytes from an FFmpeg pipe, so
    no temporary files are created. Optionally only a [start_second,
    end_second] segment is loaded; the audio track is sliced to match.

    Args:
        path: Path to the video file.
        read_batch_size: Number of frames to read from the FFmpeg pipe per batch.
        start_second: Optional segment start time in seconds (seeked before decode).
        end_second: Optional segment end time in seconds.

    Returns:
        Video holding the decoded frames, with the matching audio segment
        (a silent track is substituted when the file has no audio).

    Raises:
        ValueError: If the time bounds are invalid or no frames could be read.
        VideoMetadataError: If the file's metadata cannot be probed.
        VideoLoadError: If FFmpeg fails or an I/O error occurs.
    """
    try:
        # Get video metadata using VideoMetadata.from_path
        metadata = VideoMetadata.from_path(path)

        width = metadata.width
        height = metadata.height
        fps = metadata.fps
        total_duration = metadata.total_seconds

        # Validate time bounds
        if start_second is not None and start_second < 0:
            raise ValueError("start_second must be non-negative")
        if end_second is not None and end_second > total_duration:
            raise ValueError(f"end_second ({end_second}) exceeds video duration ({total_duration})")
        if start_second is not None and end_second is not None and start_second >= end_second:
            raise ValueError("start_second must be less than end_second")

        # Estimate memory usage and warn for large videos
        segment_duration = total_duration
        if start_second is not None and end_second is not None:
            segment_duration = end_second - start_second
        elif end_second is not None:
            segment_duration = end_second
        elif start_second is not None:
            segment_duration = total_duration - start_second

        # 3 bytes per pixel (RGB24), no compression once in memory.
        estimated_frames = int(segment_duration * fps)
        estimated_bytes = estimated_frames * height * width * 3
        estimated_gb = estimated_bytes / (1024**3)
        if estimated_gb > 10:
            warnings.warn(
                f"Loading this video will use ~{estimated_gb:.1f}GB of RAM. "
                f"For large videos, consider using FrameIterator for memory-efficient streaming.",
                ResourceWarning,
                stacklevel=2,
            )

        # Build FFmpeg command with improved segment handling
        ffmpeg_cmd = ["ffmpeg"]

        # Add seek option BEFORE input for more efficient seeking
        if start_second is not None:
            ffmpeg_cmd.extend(["-ss", str(start_second)])

        ffmpeg_cmd.extend(["-i", path])

        # Add duration AFTER input for more precise timing
        if end_second is not None and start_second is not None:
            duration = end_second - start_second
            ffmpeg_cmd.extend(["-t", str(duration)])
        elif end_second is not None:
            ffmpeg_cmd.extend(["-t", str(end_second)])

        # Output format settings - removed problematic -vsync 0
        ffmpeg_cmd.extend(
            [
                "-f",
                "rawvideo",
                "-pix_fmt",
                "rgb24",
                "-vcodec",
                "rawvideo",
                "-avoid_negative_ts",
                "make_zero",  # Handle timing issues
                "-y",
                "pipe:1",
            ]
        )

        # Start FFmpeg process with stderr redirected to avoid deadlock
        process = subprocess.Popen(
            ffmpeg_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,  # Redirect stderr to avoid deadlock
            bufsize=10**8,  # Use large buffer for efficient I/O
        )

        # Calculate frame size in bytes
        frame_size = width * height * 3  # 3 bytes per pixel for RGB

        # Estimate frame count for pre-allocation
        # (recomputed separately from the memory-warning estimate above so a
        # safety buffer can be applied here)
        if start_second is not None and end_second is not None:
            estimated_duration = end_second - start_second
        elif end_second is not None:
            estimated_duration = end_second
        elif start_second is not None:
            estimated_duration = total_duration - start_second
        else:
            estimated_duration = total_duration

        # Add buffer to handle frame rate variations and rounding
        estimated_frames = int(estimated_duration * fps * FRAME_BUFFER_MULTIPLIER) + FRAME_BUFFER_PADDING

        # Pre-allocate numpy array
        frames = np.empty((estimated_frames, height, width, 3), dtype=np.uint8)
        frames_read = 0

        try:
            while frames_read < estimated_frames:
                # Calculate remaining frames to read
                remaining_frames = estimated_frames - frames_read
                batch_size = min(read_batch_size, remaining_frames)

                # Read batch of data
                batch_data = process.stdout.read(frame_size * batch_size)  # type: ignore

                if not batch_data:
                    break

                # Convert to numpy array
                batch_frames = np.frombuffer(batch_data, dtype=np.uint8)

                # Calculate how many complete frames we got
                complete_frames = len(batch_frames) // (height * width * 3)

                if complete_frames == 0:
                    break

                # Only keep complete frames
                complete_data = batch_frames[: complete_frames * height * width * 3]
                batch_frames_array = complete_data.reshape(complete_frames, height, width, 3)

                # Check if we have room in pre-allocated array
                if frames_read + complete_frames > estimated_frames:
                    # Need to expand array - this should be rare with our buffer
                    new_size = max(estimated_frames * 2, frames_read + complete_frames + 100)
                    new_frames = np.empty((new_size, height, width, 3), dtype=np.uint8)
                    new_frames[:frames_read] = frames[:frames_read]
                    frames = new_frames
                    estimated_frames = new_size

                # Store batch in pre-allocated array
                end_idx = frames_read + complete_frames
                frames[frames_read:end_idx] = batch_frames_array
                frames_read += complete_frames

        finally:
            # Ensure process is properly terminated
            if process.poll() is None:
                process.terminate()
                try:
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    process.kill()
                    process.wait()

            # Clean up pipes
            if process.stdout:
                process.stdout.close()

        # Check if FFmpeg had an error (non-zero return code)
        if process.returncode not in (0, None) and frames_read == 0:
            raise ValueError(f"FFmpeg failed to process video (return code: {process.returncode})")

        if frames_read == 0:
            raise ValueError("No frames were read from the video")

        # Trim the pre-allocated array to actual frames read
        frames = frames[:frames_read]  # type: ignore

        # Load audio for the specified segment
        # NOTE: the full audio track is loaded first and then sliced.
        try:
            audio = Audio.from_path(path)
            # Slice audio to match the video segment
            if start_second is not None or end_second is not None:
                audio_start = start_second if start_second is not None else 0
                audio_end = end_second if end_second is not None else audio.metadata.duration_seconds
                audio = audio.slice(start_seconds=audio_start, end_seconds=audio_end)
        except (AudioLoadError, FileNotFoundError, subprocess.CalledProcessError):
            warnings.warn(f"No audio found for `{path}`, adding silent track.")
            # Create silent audio based on actual frames read
            segment_duration = frames_read / fps
            audio = Audio.create_silent(duration_seconds=round(segment_duration, 2), stereo=True, sample_rate=44100)

        return cls(frames=frames, fps=fps, audio=audio)

    except VideoMetadataError:
        raise
    except subprocess.CalledProcessError as e:
        raise VideoLoadError(f"FFmpeg failed: {e}")
    except (OSError, IOError) as e:
        raise VideoLoadError(f"I/O error: {e}")

from_frames classmethod

from_frames(frames: ndarray, fps: float) -> Video
Source code in src/videopython/base/video.py
@classmethod
def from_frames(cls, frames: np.ndarray, fps: float) -> Video:
    """Build a Video from a frame array.

    Args:
        frames: Array of shape (n_frames, height, width, channels) with
            3 (RGB) or 4 (RGBA) channels; an alpha channel is dropped.
        fps: Frames per second.

    Returns:
        New Video holding the (possibly alpha-stripped) frames.

    Raises:
        ValueError: If the array is not 4-dimensional or has an
            unsupported channel count.
    """
    if frames.ndim != 4:
        raise ValueError(f"Unsupported number of dimensions: {frames.shape}!")
    elif frames.shape[-1] == 4:
        # Drop the alpha channel; frames are stored as RGB.
        frames = frames[:, :, :, :3]
    elif frames.shape[-1] != 3:
        # Bug fix: this branch used to report "number of dimensions" even
        # though the dimensionality is fine -- the channel count is wrong.
        raise ValueError(f"Unsupported number of channels: {frames.shape[-1]} (shape {frames.shape})!")
    return cls(frames=frames, fps=fps)

from_image classmethod

from_image(
    image: ndarray,
    fps: float = 24.0,
    length_seconds: float = 1.0,
) -> Video
Source code in src/videopython/base/video.py
@classmethod
def from_image(cls, image: np.ndarray, fps: float = 24.0, length_seconds: float = 1.0) -> Video:
    """Create a still video by repeating one image for the given duration.

    Args:
        image: Single frame of shape (height, width, channels), or a
            one-frame batch of shape (1, height, width, channels).
        fps: Frames per second of the resulting video.
        length_seconds: Desired duration in seconds.

    Returns:
        New Video whose frames are copies of the input image.
    """
    if image.ndim == 3:
        # Promote a single frame to a batch of one.
        image = image[np.newaxis, ...]
    n_frames = round(length_seconds * fps)
    return cls(frames=np.repeat(image, n_frames, axis=0), fps=fps)

save

save(
    filename: str | Path | None = None,
    format: ALLOWED_VIDEO_FORMATS = "mp4",
    preset: ALLOWED_VIDEO_PRESETS = "medium",
    crf: int = 23,
) -> Path

Save video to file.

Parameters:

Name Type Description Default
filename str | Path | None

Output filename. If None, generates random name

None
format ALLOWED_VIDEO_FORMATS

Output format (mp4, avi, mov, mkv, webm)

'mp4'
preset ALLOWED_VIDEO_PRESETS

Encoding speed/compression tradeoff. Slower presets give smaller files at the same quality. Options, ordered from fastest encoding to smallest output file: ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow

'medium'
crf int

Constant Rate Factor (0-51). Lower = better quality, larger file. Default 23 gives good quality for most content; values near 18 are close to visually lossless. Range 18-28 recommended.

23

Returns:

Type Description
Path

Path to saved video file

Raises:

Type Description
RuntimeError

If video is not loaded

ValueError

If format or preset is not supported

Source code in src/videopython/base/video.py
def save(
    self,
    filename: str | Path | None = None,
    format: ALLOWED_VIDEO_FORMATS = "mp4",
    preset: ALLOWED_VIDEO_PRESETS = "medium",
    crf: int = 23,
) -> Path:
    """Save video to file.

    Frames are streamed as raw RGB24 over FFmpeg's stdin; the audio track is
    written to a temporary WAV file and muxed in as AAC.

    Args:
        filename: Output filename. If None, generates random name
        format: Output format (mp4, avi, mov, mkv, webm)
        preset: Encoding speed/compression tradeoff. Slower presets give smaller
            files at the same quality. Options, ordered from fastest encoding to
            smallest file: ultrafast, superfast, veryfast, faster, fast, medium,
            slow, slower, veryslow
        crf: Constant Rate Factor (0-51). Lower = better quality, larger file.
            Default 23 gives good quality for most content; values near 18 are
            close to visually lossless. Range 18-28 recommended.

    Returns:
        Path to saved video file

    Raises:
        RuntimeError: If video is not loaded
        ValueError: If format or preset is not supported
    """
    if not self.is_loaded():
        raise RuntimeError("Video is not loaded, cannot save!")

    if format.lower() not in get_args(ALLOWED_VIDEO_FORMATS):
        raise ValueError(
            f"Unsupported format: {format}. Allowed formats are: {', '.join(get_args(ALLOWED_VIDEO_FORMATS))}"
        )

    if preset not in get_args(ALLOWED_VIDEO_PRESETS):
        raise ValueError(
            f"Unsupported preset: {preset}. Allowed presets are: {', '.join(get_args(ALLOWED_VIDEO_PRESETS))}"
        )

    # yuv420p chroma subsampling requires even width and height; fail early
    # with a clear message instead of a cryptic FFmpeg error.
    frame_height, frame_width = self.frame_shape[:2]
    if frame_width % 2 != 0 or frame_height % 2 != 0:
        raise ValueError(
            "Current save pipeline uses libx264 with yuv420p, which requires even frame dimensions. "
            f"Got {frame_width}x{frame_height}. "
            "Resize, crop, or pad to an even width and height before saving."
        )

    if filename is None:
        filename = Path(generate_random_name(suffix=f".{format}"))
    else:
        # NOTE: with_suffix replaces any existing extension with the requested format.
        filename = Path(filename).with_suffix(f".{format}")
        filename.parent.mkdir(parents=True, exist_ok=True)

    # Save audio to temporary WAV file
    with tempfile.NamedTemporaryFile(suffix=".wav") as temp_audio:
        self.audio.save(temp_audio.name, format="wav")

        # Calculate exact duration
        duration = len(self.frames) / self.fps

        # Construct FFmpeg command (stream raw video via stdin)
        ffmpeg_command = [
            "ffmpeg",
            "-y",
            "-hide_banner",
            "-loglevel",
            "error",
            # Raw video input settings
            "-f",
            "rawvideo",
            "-pixel_format",
            "rgb24",
            "-video_size",
            f"{self.frame_shape[1]}x{self.frame_shape[0]}",
            "-framerate",
            str(self.fps),
            "-i",
            "pipe:0",
            # Audio input
            "-i",
            temp_audio.name,
            # Video encoding settings
            "-c:v",
            "libx264",
            "-preset",
            preset,
            "-crf",
            str(crf),
            # Audio settings
            "-c:a",
            "aac",
            "-b:a",
            "192k",
            # Output settings
            "-pix_fmt",
            "yuv420p",
            "-movflags",
            "+faststart",  # Enable fast start for web playback
            "-t",
            str(duration),
            # Force constant frame rate output
            "-vsync",
            "cfr",
            str(filename),
        ]

        process = subprocess.Popen(
            ffmpeg_command,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
        )

        try:
            if process.stdin is None:
                raise RuntimeError("Failed to open FFmpeg stdin pipe for video data")

            # FFmpeg expects contiguous uint8 RGB24 bytes on stdin.
            frames = self.frames
            if frames.dtype != np.uint8 or not frames.flags["C_CONTIGUOUS"]:
                frames = np.ascontiguousarray(frames, dtype=np.uint8)

            # memoryview is a zero-copy view over the frame buffer.
            buffer = memoryview(frames)
            try:
                process.stdin.write(buffer)
                process.stdin.close()
            except BrokenPipeError as e:
                stderr = process.stderr.read() if process.stderr is not None else b""
                returncode = process.wait()
                raise RuntimeError(
                    f"FFmpeg terminated while receiving video data (code {returncode}): "
                    f"{stderr.decode(errors='ignore')}"
                ) from e

            stderr = process.stderr.read() if process.stderr is not None else b""
            returncode = process.wait()

            if returncode != 0:
                raise RuntimeError(f"FFmpeg failed with code {returncode}: {stderr.decode(errors='ignore')}")

            return filename
        finally:
            if process.poll() is None:
                process.kill()

copy

copy() -> Video
Source code in src/videopython/base/video.py
def copy(self) -> Video:
    """Return an independent copy of this video.

    The frame array is duplicated; the audio object is shared because
    Audio is immutable.
    """
    duplicate = Video.from_frames(self.frames.copy(), self.fps)
    duplicate.audio = self.audio
    return duplicate

split

split(
    frame_index: int | None = None,
) -> tuple[Video, Video]
Source code in src/videopython/base/video.py
def split(self, frame_index: int | None = None) -> tuple[Video, Video]:
    """Split the video (and its audio) into two parts at a frame index.

    Args:
        frame_index: Split point; the first part gets frames [0, frame_index)
            and the second [frame_index, end). Defaults to the middle frame
            when None.

    Returns:
        Tuple of (first part, second part), each with the matching audio slice.

    Raises:
        ValueError: If frame_index is outside [0, len(frames)].
    """
    # Bug fix: compare against None explicitly -- `if frame_index:` treated a
    # requested split at frame 0 as "not provided" and split at the middle.
    if frame_index is not None:
        if not (0 <= frame_index <= len(self.frames)):
            raise ValueError(f"frame_idx must be between 0 and {len(self.frames)}, got {frame_index}")
    else:
        frame_index = len(self.frames) // 2

    split_videos = (
        self.from_frames(self.frames[:frame_index], self.fps),
        self.from_frames(self.frames[frame_index:], self.fps),
    )

    # Split audio at the corresponding time point
    split_time = frame_index / self.fps
    split_videos[0].audio = self.audio.slice(start_seconds=0, end_seconds=split_time)
    split_videos[1].audio = self.audio.slice(start_seconds=split_time)

    return split_videos

add_audio

add_audio(audio: Audio, overlay: bool = True) -> Video

Add audio to video, returning a new Video instance.

Parameters:

Name Type Description Default
audio Audio

Audio to add

required
overlay bool

If True, overlay on existing audio; if False, replace it

True

Returns:

Type Description
Video

New Video with the audio added

Source code in src/videopython/base/video.py
def add_audio(self, audio: Audio, overlay: bool = True) -> Video:
    """Add audio to video, returning a new Video instance.

    The supplied track is trimmed or padded with silence so its duration
    matches the video exactly before being attached.

    Args:
        audio: Audio to add
        overlay: If True, overlay on existing audio; if False, replace it

    Returns:
        New Video with the audio added
    """
    video_duration = self.total_seconds
    audio_duration = audio.metadata.duration_seconds

    # Fit the track to the video's duration.
    if audio_duration > video_duration:
        audio = audio.slice(start_seconds=0, end_seconds=video_duration)
    elif audio_duration < video_duration:
        padding = Audio.create_silent(
            duration_seconds=video_duration - audio_duration,
            stereo=audio.metadata.channels == 2,
            sample_rate=audio.metadata.sample_rate,
        )
        audio = audio.concat(padding)

    result = self.copy()
    if overlay and not result.audio.is_silent:
        result.audio = result.audio.overlay(audio, position=0.0)
    else:
        # Either replacing outright, or the existing track is silent and
        # carries nothing worth mixing in -- take the new track as-is.
        result.audio = audio
    return result

add_audio_from_file

add_audio_from_file(
    path: str, overlay: bool = True
) -> Video

Add audio from file, returning a new Video instance.

Parameters:

Name Type Description Default
path str

Path to audio file

required
overlay bool

If True, overlay on existing audio; if False, replace it

True

Returns:

Type Description
Video

New Video with the audio added

Raises:

Type Description
AudioLoadError

If audio file cannot be loaded

FileNotFoundError

If audio file does not exist

Source code in src/videopython/base/video.py
def add_audio_from_file(self, path: str, overlay: bool = True) -> Video:
    """Add an audio track loaded from a file, returning a new Video.

    Args:
        path: Path to audio file
        overlay: If True, overlay on existing audio; if False, replace it

    Returns:
        New Video with the audio added

    Raises:
        AudioLoadError: If audio file cannot be loaded
        FileNotFoundError: If audio file does not exist
    """
    return self.add_audio(Audio.from_path(path), overlay)

is_loaded

is_loaded() -> bool
Source code in src/videopython/base/video.py
def is_loaded(self) -> bool:
    """Report whether fps, frames, and audio are all present."""
    return all(attr is not None for attr in (self.fps, self.frames, self.audio))

cut

cut(start: float, end: float) -> Video

Cut video to a time range.

Parameters:

Name Type Description Default
start float

Start time in seconds.

required
end float

End time in seconds.

required

Returns:

Type Description
Video

New Video with the specified time range.

Source code in src/videopython/base/video.py
def cut(self, start: float, end: float) -> Video:
    """Trim the video to the time window from start to end.

    Args:
        start: Start time in seconds.
        end: End time in seconds.

    Returns:
        New Video covering only the requested time range.
    """
    from videopython.base.transforms import CutSeconds

    transform = CutSeconds(start, end)
    return transform.apply(self)

cut_frames

cut_frames(start: int, end: int) -> Video

Cut video to a frame range.

Parameters:

Name Type Description Default
start int

Start frame index (inclusive).

required
end int

End frame index (exclusive).

required

Returns:

Type Description
Video

New Video with the specified frame range.

Source code in src/videopython/base/video.py
def cut_frames(self, start: int, end: int) -> Video:
    """Keep only the frames in the half-open range [start, end).

    Args:
        start: Start frame index (inclusive).
        end: End frame index (exclusive).

    Returns:
        New Video containing only the selected frames.
    """
    from videopython.base.transforms import CutFrames

    transform = CutFrames(start, end)
    return transform.apply(self)

resize

resize(
    width: int | None = None,
    height: int | None = None,
    round_to_even: bool = True,
) -> Video

Resize video.

If only width or height is provided, the other dimension is calculated to preserve aspect ratio.

Parameters:

Name Type Description Default
width int | None

Target width in pixels.

None
height int | None

Target height in pixels.

None
round_to_even bool

If True (default), snap output width/height to even numbers.

True

Returns:

Type Description
Video

New Video with the specified dimensions.

Source code in src/videopython/base/video.py
def resize(
    self,
    width: int | None = None,
    height: int | None = None,
    round_to_even: bool = True,
) -> Video:
    """Resize the video to new pixel dimensions.

    When only one of width/height is given, the other is derived so the
    aspect ratio is preserved.

    Args:
        width: Target width in pixels.
        height: Target height in pixels.
        round_to_even: If True (default), snap output width/height to even numbers.

    Returns:
        New Video with the requested dimensions.
    """
    from videopython.base.transforms import Resize

    transform = Resize(width=width, height=height, round_to_even=round_to_even)
    return transform.apply(self)

crop

crop(width: int, height: int) -> Video

Crop video to specified dimensions (center crop).

Parameters:

Name Type Description Default
width int

Target width in pixels.

required
height int

Target height in pixels.

required

Returns:

Type Description
Video

New Video with the specified dimensions.

Source code in src/videopython/base/video.py
def crop(self, width: int, height: int) -> Video:
    """Center-crop the video to the given dimensions.

    Args:
        width: Target width in pixels.
        height: Target height in pixels.

    Returns:
        New Video with the requested dimensions.
    """
    from videopython.base.transforms import Crop

    transform = Crop(width=width, height=height)
    return transform.apply(self)

resample_fps

resample_fps(fps: float) -> Video

Resample video to a different frame rate.

Parameters:

Name Type Description Default
fps float

Target frames per second.

required

Returns:

Type Description
Video

New Video with the specified frame rate.

Source code in src/videopython/base/video.py
def resample_fps(self, fps: float) -> Video:
    """Change the frame rate by resampling frames.

    Args:
        fps: Target frames per second.

    Returns:
        New Video at the requested frame rate.
    """
    from videopython.base.transforms import ResampleFPS

    transform = ResampleFPS(fps=fps)
    return transform.apply(self)

transition_to

transition_to(other: Video, transition: object) -> Video

Combine with another video using a transition.

Parameters:

Name Type Description Default
other Video

Video to transition to.

required
transition object

Transition to apply (e.g., FadeTransition, BlurTransition).

required

Returns:

Type Description
Video

New Video combining both videos with the transition effect.

Source code in src/videopython/base/video.py
def transition_to(self, other: Video, transition: object) -> Video:
    """Combine this video with another using a transition effect.

    Args:
        other: Video to transition to.
        transition: Transition to apply (e.g., FadeTransition, BlurTransition).

    Returns:
        New Video combining both videos with the transition effect.

    Raises:
        TypeError: If transition is not a Transition instance.
    """
    from videopython.base.transitions import Transition

    if isinstance(transition, Transition):
        return transition.apply((self, other))
    raise TypeError(f"Expected Transition, got {type(transition).__name__}")

ken_burns

ken_burns(
    start_region: "BoundingBox",
    end_region: "BoundingBox",
    easing: Literal[
        "linear", "ease_in", "ease_out", "ease_in_out"
    ] = "linear",
    start: float | None = None,
    stop: float | None = None,
) -> Video

Apply Ken Burns pan-and-zoom effect.

Creates cinematic movement by smoothly transitioning between two regions.

Parameters:

Name Type Description Default
start_region 'BoundingBox'

Starting crop region (BoundingBox with normalized 0-1 coordinates).

required
end_region 'BoundingBox'

Ending crop region (BoundingBox with normalized 0-1 coordinates).

required
easing Literal['linear', 'ease_in', 'ease_out', 'ease_in_out']

Animation easing - "linear", "ease_in", "ease_out", or "ease_in_out".

'linear'
start float | None

Optional start time in seconds for the effect.

None
stop float | None

Optional stop time in seconds for the effect.

None

Returns:

Type Description
Video

New Video with Ken Burns effect applied.

Source code in src/videopython/base/video.py
def ken_burns(
    self,
    start_region: "BoundingBox",
    end_region: "BoundingBox",
    easing: Literal["linear", "ease_in", "ease_out", "ease_in_out"] = "linear",
    start: float | None = None,
    stop: float | None = None,
) -> Video:
    """Apply a Ken Burns pan-and-zoom effect.

    Smoothly interpolates the visible crop from start_region to end_region,
    producing cinematic camera movement.

    Args:
        start_region: Starting crop region (BoundingBox with normalized 0-1 coordinates).
        end_region: Ending crop region (BoundingBox with normalized 0-1 coordinates).
        easing: Animation easing - "linear", "ease_in", "ease_out", or "ease_in_out".
        start: Optional start time in seconds for the effect.
        stop: Optional stop time in seconds for the effect.

    Returns:
        New Video with the Ken Burns effect applied.
    """
    from videopython.base.effects import KenBurns

    effect = KenBurns(start_region=start_region, end_region=end_region, easing=easing)
    return effect.apply(self, start=start, stop=stop)

picture_in_picture

picture_in_picture(
    overlay: Video,
    position: tuple[float, float] = (0.7, 0.7),
    scale: float = 0.25,
    border_width: int = 0,
    border_color: tuple[int, int, int] = (255, 255, 255),
    corner_radius: int = 0,
    opacity: float = 1.0,
    audio_mode: Literal["main", "overlay", "mix"] = "main",
    audio_mix: tuple[float, float] = (1.0, 1.0),
) -> Video

Overlay another video as picture-in-picture.

Parameters:

Name Type Description Default
overlay Video

Video to overlay on this video.

required
position tuple[float, float]

Normalized (x, y) center position, (0,0)=top-left, (1,1)=bottom-right.

(0.7, 0.7)
scale float

Overlay size relative to main video width (0.25 = 25%).

0.25
border_width int

Border width in pixels (default 0).

0
border_color tuple[int, int, int]

Border color as RGB tuple (default white).

(255, 255, 255)
corner_radius int

Rounded corner radius in pixels (default 0).

0
opacity float

Overlay transparency from 0 to 1 (default 1.0).

1.0
audio_mode Literal['main', 'overlay', 'mix']

Audio handling - "main" (default), "overlay", or "mix".

'main'
audio_mix tuple[float, float]

Volume factors (main, overlay) for mix mode, default (1.0, 1.0).

(1.0, 1.0)

Returns:

Type Description
Video

New Video with picture-in-picture overlay.

Source code in src/videopython/base/video.py
def picture_in_picture(
    self,
    overlay: Video,
    position: tuple[float, float] = (0.7, 0.7),
    scale: float = 0.25,
    border_width: int = 0,
    border_color: tuple[int, int, int] = (255, 255, 255),
    corner_radius: int = 0,
    opacity: float = 1.0,
    audio_mode: Literal["main", "overlay", "mix"] = "main",
    audio_mix: tuple[float, float] = (1.0, 1.0),
) -> Video:
    """Overlay another video on top of this one as picture-in-picture.

    Args:
        overlay: Video to overlay on this video.
        position: Normalized (x, y) center position, (0,0)=top-left, (1,1)=bottom-right.
        scale: Overlay size relative to main video width (0.25 = 25%).
        border_width: Border width in pixels (default 0).
        border_color: Border color as RGB tuple (default white).
        corner_radius: Rounded corner radius in pixels (default 0).
        opacity: Overlay transparency from 0 to 1 (default 1.0).
        audio_mode: Audio handling - "main" (default), "overlay", or "mix".
        audio_mix: Volume factors (main, overlay) for mix mode, default (1.0, 1.0).

    Returns:
        New Video with the picture-in-picture overlay applied.
    """
    from videopython.base.transforms import PictureInPicture

    pip = PictureInPicture(
        overlay=overlay,
        position=position,
        scale=scale,
        border_width=border_width,
        border_color=border_color,
        corner_radius=corner_radius,
        opacity=opacity,
        audio_mode=audio_mode,
        audio_mix=audio_mix,
    )
    return pip.apply(self)

VideoMetadata

Get video metadata without loading frames into memory:

from videopython.base import VideoMetadata

metadata = VideoMetadata.from_path("video.mp4")
print(f"Duration: {metadata.total_seconds}s")
print(f"Resolution: {metadata.width}x{metadata.height}")
print(f"FPS: {metadata.fps}")
print(f"Total frames: {metadata.frame_count}")

VideoMetadata dataclass

Class to store video metadata.

Source code in src/videopython/base/video.py
@dataclass
class VideoMetadata:
    """Class to store video metadata."""

    # Frame dimensions in pixels plus timing information for one video stream.
    height: int
    width: int
    fps: float
    frame_count: int
    total_seconds: float

    def __str__(self) -> str:
        return f"{self.width}x{self.height} @ {self.fps}fps, {self.total_seconds} seconds"

    def __repr__(self) -> str:
        return self.__str__()

    def get_frame_shape(self) -> np.ndarray:
        """Returns frame shape."""
        return np.array((self.height, self.width, 3))

    def get_video_shape(self) -> np.ndarray:
        """Returns video shape."""
        return np.array((self.frame_count, self.height, self.width, 3))

    @staticmethod
    def _run_ffprobe(video_path: str | Path) -> dict:
        """Run ffprobe and return parsed JSON output."""
        cmd = [
            "ffprobe",
            "-v",
            "error",
            "-select_streams",
            "v:0",
            "-show_entries",
            "stream=width,height,r_frame_rate,nb_frames",
            "-show_entries",
            "format=duration",
            "-print_format",
            "json",
            str(video_path),
        ]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            return json.loads(result.stdout)
        except subprocess.CalledProcessError as e:
            # Chain the original error so the ffprobe traceback is preserved.
            raise VideoMetadataError(f"FFprobe error: {e.stderr}") from e
        except json.JSONDecodeError as e:
            raise VideoMetadataError(f"Error parsing FFprobe output: {e}") from e

    @classmethod
    def from_path(cls, video_path: str | Path) -> VideoMetadata:
        """Creates VideoMetadata object from video file using ffprobe."""
        if not Path(video_path).exists():
            raise FileNotFoundError(f"Video file not found: {video_path}")

        probe_data = cls._run_ffprobe(video_path)

        try:
            stream_info = probe_data["streams"][0]

            width = int(stream_info["width"])
            height = int(stream_info["height"])

            try:
                fps_fraction = Fraction(stream_info["r_frame_rate"])
                fps = float(fps_fraction)
            except (ValueError, ZeroDivisionError) as e:
                raise VideoMetadataError(f"Invalid frame rate: {stream_info['r_frame_rate']}") from e

            # Prefer the container's exact frame count; fall back to an
            # estimate from duration * fps when nb_frames is absent/non-numeric.
            if "nb_frames" in stream_info and stream_info["nb_frames"].isdigit():
                frame_count = int(stream_info["nb_frames"])
            else:
                duration = float(probe_data["format"]["duration"])
                frame_count = int(round(duration * fps))

            total_seconds = round(frame_count / fps, 2)

            return cls(height=height, width=width, fps=fps, frame_count=frame_count, total_seconds=total_seconds)

        except KeyError as e:
            raise VideoMetadataError(f"Missing required metadata field: {e}") from e
        except (TypeError, IndexError) as e:
            raise VideoMetadataError(f"Invalid metadata structure: {e}") from e

    @classmethod
    def from_video(cls, video: Video) -> VideoMetadata:
        """Creates VideoMetadata object from Video instance."""
        frame_count, height, width, _ = video.frames.shape
        total_seconds = round(frame_count / video.fps, 2)

        return cls(height=height, width=width, fps=video.fps, frame_count=frame_count, total_seconds=total_seconds)

    def can_be_merged_with(self, other_format: VideoMetadata) -> bool:
        """Check if videos can be merged."""
        return (
            self.height == other_format.height
            and self.width == other_format.width
            and round(self.fps) == round(other_format.fps)
        )

    def with_duration(self, seconds: float) -> VideoMetadata:
        """Return new metadata with updated duration.

        Args:
            seconds: New duration in seconds.

        Returns:
            New VideoMetadata with updated duration and frame count.
        """
        return VideoMetadata(
            height=self.height,
            width=self.width,
            fps=self.fps,
            frame_count=round(self.fps * seconds),
            total_seconds=seconds,
        )

    def with_dimensions(self, width: int, height: int) -> VideoMetadata:
        """Return new metadata with updated dimensions.

        Args:
            width: New width in pixels.
            height: New height in pixels.

        Returns:
            New VideoMetadata with updated dimensions.
        """
        return VideoMetadata(
            height=height,
            width=width,
            fps=self.fps,
            frame_count=self.frame_count,
            total_seconds=self.total_seconds,
        )

    def with_fps(self, fps: float) -> VideoMetadata:
        """Return new metadata with updated fps.

        Args:
            fps: New frames per second.

        Returns:
            New VideoMetadata with updated fps (duration stays same).
        """
        return VideoMetadata(
            height=self.height,
            width=self.width,
            fps=fps,
            frame_count=round(fps * self.total_seconds),
            total_seconds=self.total_seconds,
        )

    def can_be_downsampled_to(self, target_format: VideoMetadata) -> bool:
        """Checks if video can be downsampled to target_format."""
        return (
            self.height >= target_format.height
            and self.width >= target_format.width
            and round(self.fps) >= round(target_format.fps)
            and self.total_seconds >= target_format.total_seconds
        )

    # Fluent API for operation validation
    # These methods mirror the Video fluent API but only transform metadata

    def cut(self, start: float, end: float) -> VideoMetadata:
        """Predict metadata after cutting by time range.

        Args:
            start: Start time in seconds.
            end: End time in seconds.

        Returns:
            New VideoMetadata with updated duration.
        """
        if end <= start:
            raise ValueError(f"End time ({end}) must be greater than start time ({start})")
        if start < 0:
            raise ValueError(f"Start time ({start}) cannot be negative")
        if end > self.total_seconds:
            raise ValueError(f"End time ({end}) exceeds video duration ({self.total_seconds})")
        # Mirror CutSeconds.apply() semantics: convert times to frame indices using
        # round() before slicing so metadata validation matches runtime output.
        start_frame = round(start * self.fps)
        end_frame = round(end * self.fps)
        return self.cut_frames(start_frame, end_frame)

    def cut_frames(self, start: int, end: int) -> VideoMetadata:
        """Predict metadata after cutting by frame range.

        Args:
            start: Start frame index (inclusive).
            end: End frame index (exclusive).

        Returns:
            New VideoMetadata with updated duration.
        """
        if end <= start:
            raise ValueError(f"End frame ({end}) must be greater than start frame ({start})")
        if start < 0:
            raise ValueError(f"Start frame ({start}) cannot be negative")
        if end > self.frame_count:
            raise ValueError(f"End frame ({end}) exceeds frame count ({self.frame_count})")
        duration = (end - start) / self.fps
        return self.with_duration(duration)

    def resize(
        self,
        width: int | None = None,
        height: int | None = None,
        round_to_even: bool = True,
    ) -> VideoMetadata:
        """Predict metadata after resizing.

        If only width or height is provided, the other dimension is calculated
        to preserve aspect ratio.

        Args:
            width: Target width in pixels.
            height: Target height in pixels.
            round_to_even: If True (default), snap output width/height to even numbers.

        Returns:
            New VideoMetadata with updated dimensions.
        """
        if width is None and height is None:
            raise ValueError("Must provide width or height")

        def _snap(value: int) -> int:
            return _round_dimension_to_even(value) if round_to_even else value

        # Explicit None checks: a provided-but-falsy value (0) must not be
        # silently treated as "not given".
        if width is not None and height is not None:
            return self.with_dimensions(_snap(width), _snap(height))
        elif width is not None:
            ratio = width / self.width
            new_height = round(self.height * ratio)
            return self.with_dimensions(_snap(width), _snap(new_height))
        else:  # height only
            ratio = height / self.height  # type: ignore[operator]
            new_width = round(self.width * ratio)
            return self.with_dimensions(_snap(new_width), _snap(height))  # type: ignore[arg-type]

    def crop(self, width: int, height: int) -> VideoMetadata:
        """Predict metadata after cropping.

        Args:
            width: Target width in pixels.
            height: Target height in pixels.

        Returns:
            New VideoMetadata with updated dimensions.
        """
        if width > self.width:
            raise ValueError(f"Crop width ({width}) exceeds video width ({self.width})")
        if height > self.height:
            raise ValueError(f"Crop height ({height}) exceeds video height ({self.height})")
        return self.with_dimensions(width, height)

    def resample_fps(self, fps: float) -> VideoMetadata:
        """Predict metadata after resampling frame rate.

        Args:
            fps: Target frames per second.

        Returns:
            New VideoMetadata with updated fps.
        """
        if fps <= 0:
            raise ValueError(f"FPS ({fps}) must be positive")
        return self.with_fps(fps)

    def speed_change(self, speed: float) -> VideoMetadata:
        """Predict metadata after speed change.

        Mirrors runtime frame-count semantics: int(frame_count / speed),
        matching SpeedChange.apply() behavior.

        Args:
            speed: Speed multiplier (e.g. 2.0 = double speed, 0.5 = half speed).

        Returns:
            New VideoMetadata with updated duration and frame count.
        """
        if speed <= 0:
            raise ValueError(f"Speed ({speed}) must be positive")
        # int() truncation (not round) matches SpeedChange.apply().
        new_frame_count = int(self.frame_count / speed)
        if new_frame_count == 0:
            raise ValueError(f"Speed {speed}x would result in 0 frames")
        new_seconds = round(new_frame_count / self.fps, 4)
        return VideoMetadata(
            height=self.height,
            width=self.width,
            fps=self.fps,
            frame_count=new_frame_count,
            total_seconds=new_seconds,
        )

    def crop_to_aspect_even(self, target_aspect: tuple[int, int] | list[int] = (9, 16)) -> VideoMetadata:
        """Predict metadata after aspect-ratio crop with even output dimensions.

        Mirrors the output dimension logic used by AI crop transforms like
        ``FaceTrackingCrop``.
        """
        if not isinstance(target_aspect, (tuple, list)) or len(target_aspect) != 2:
            raise ValueError("target_aspect must be a 2-item tuple/list of positive integers")

        try:
            aspect_w = int(target_aspect[0])
            aspect_h = int(target_aspect[1])
        except (TypeError, ValueError) as e:
            raise ValueError("target_aspect must contain numeric values") from e

        if aspect_w <= 0 or aspect_h <= 0:
            raise ValueError("target_aspect values must be positive")

        target_ratio = aspect_w / aspect_h
        frame_ratio = self.width / self.height

        def _make_even(value: int) -> int:
            return value - (value % 2)

        # Fit the target ratio inside the frame, then snap both output
        # dimensions down to even numbers (codec-friendly sizes).
        if target_ratio < frame_ratio:
            out_h = _make_even(self.height)
            out_w = _make_even(int(out_h * target_ratio))
        else:
            out_w = _make_even(self.width)
            out_h = _make_even(int(out_w / target_ratio))

        return self.with_dimensions(out_w, out_h)

    def transition_to(self, other: VideoMetadata, effect_time: float = 0.0) -> VideoMetadata:
        """Predict metadata after transition to another video.

        Args:
            other: Metadata of the video to transition to.
            effect_time: Duration of the transition effect in seconds.

        Returns:
            New VideoMetadata for the combined video.

        Raises:
            ValueError: If videos have incompatible dimensions or fps.
        """
        if not self.can_be_merged_with(other):
            raise ValueError(
                f"Cannot merge videos: {self.width}x{self.height}@{round(self.fps)}fps "
                f"vs {other.width}x{other.height}@{round(other.fps)}fps"
            )
        # The transition overlaps the two clips by effect_time seconds.
        combined_duration = self.total_seconds + other.total_seconds - effect_time
        return self.with_duration(combined_duration)

get_frame_shape

get_frame_shape() -> np.ndarray

Returns frame shape.

Source code in src/videopython/base/video.py
def get_frame_shape(self) -> np.ndarray:
    """Return the (height, width, channels) shape of a single RGB frame."""
    frame_dims = (self.height, self.width, 3)
    return np.array(frame_dims)

get_video_shape

get_video_shape() -> np.ndarray

Returns video shape.

Source code in src/videopython/base/video.py
def get_video_shape(self) -> np.ndarray:
    """Return the (frames, height, width, channels) shape of the full video array."""
    video_dims = (self.frame_count, self.height, self.width, 3)
    return np.array(video_dims)

from_path classmethod

from_path(video_path: str | Path) -> VideoMetadata

Creates VideoMetadata object from video file using ffprobe.

Source code in src/videopython/base/video.py
@classmethod
def from_path(cls, video_path: str | Path) -> VideoMetadata:
    """Creates VideoMetadata object from video file using ffprobe.

    Args:
        video_path: Path to the video file on disk.

    Returns:
        VideoMetadata describing the first video stream.

    Raises:
        FileNotFoundError: If the file does not exist.
        VideoMetadataError: If ffprobe output is missing or malformed.
    """
    if not Path(video_path).exists():
        raise FileNotFoundError(f"Video file not found: {video_path}")

    probe_data = cls._run_ffprobe(video_path)

    try:
        stream_info = probe_data["streams"][0]

        width = int(stream_info["width"])
        height = int(stream_info["height"])

        try:
            fps_fraction = Fraction(stream_info["r_frame_rate"])
            fps = float(fps_fraction)
        except (ValueError, ZeroDivisionError) as e:
            # Chain the parse error so the original traceback is preserved.
            raise VideoMetadataError(f"Invalid frame rate: {stream_info['r_frame_rate']}") from e

        # Prefer the container's exact frame count; fall back to an estimate
        # from duration * fps when nb_frames is absent or non-numeric.
        if "nb_frames" in stream_info and stream_info["nb_frames"].isdigit():
            frame_count = int(stream_info["nb_frames"])
        else:
            duration = float(probe_data["format"]["duration"])
            frame_count = int(round(duration * fps))

        total_seconds = round(frame_count / fps, 2)

        return cls(height=height, width=width, fps=fps, frame_count=frame_count, total_seconds=total_seconds)

    except KeyError as e:
        raise VideoMetadataError(f"Missing required metadata field: {e}") from e
    except (TypeError, IndexError) as e:
        raise VideoMetadataError(f"Invalid metadata structure: {e}") from e

from_video classmethod

from_video(video: Video) -> VideoMetadata

Creates VideoMetadata object from Video instance.

Source code in src/videopython/base/video.py
@classmethod
def from_video(cls, video: Video) -> VideoMetadata:
    """Creates VideoMetadata object from Video instance."""
    n_frames, frame_h, frame_w, _ = video.frames.shape
    return cls(
        height=frame_h,
        width=frame_w,
        fps=video.fps,
        frame_count=n_frames,
        total_seconds=round(n_frames / video.fps, 2),
    )

can_be_merged_with

can_be_merged_with(other_format: VideoMetadata) -> bool

Check if videos can be merged.

Source code in src/videopython/base/video.py
def can_be_merged_with(self, other_format: VideoMetadata) -> bool:
    """Check if videos can be merged."""
    # Same pixel dimensions, and fps equal after rounding to whole frames.
    same_size = (self.height, self.width) == (other_format.height, other_format.width)
    same_fps = round(self.fps) == round(other_format.fps)
    return same_size and same_fps

with_duration

with_duration(seconds: float) -> VideoMetadata

Return new metadata with updated duration.

Parameters:

Name Type Description Default
seconds float

New duration in seconds.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated duration and frame count.

Source code in src/videopython/base/video.py
def with_duration(self, seconds: float) -> VideoMetadata:
    """Return a copy of this metadata with a new duration.

    Args:
        seconds: New duration in seconds.

    Returns:
        New VideoMetadata with updated duration and frame count.
    """
    # Frame count follows the new duration at the current fps.
    updated_frames = round(self.fps * seconds)
    return VideoMetadata(
        width=self.width,
        height=self.height,
        fps=self.fps,
        frame_count=updated_frames,
        total_seconds=seconds,
    )

with_dimensions

with_dimensions(width: int, height: int) -> VideoMetadata

Return new metadata with updated dimensions.

Parameters:

Name Type Description Default
width int

New width in pixels.

required
height int

New height in pixels.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated dimensions.

Source code in src/videopython/base/video.py
def with_dimensions(self, width: int, height: int) -> VideoMetadata:
    """Return a copy of this metadata with new pixel dimensions.

    Args:
        width: New width in pixels.
        height: New height in pixels.

    Returns:
        New VideoMetadata with updated dimensions.
    """
    # Timing fields are untouched; only the frame size changes.
    return VideoMetadata(
        width=width,
        height=height,
        fps=self.fps,
        frame_count=self.frame_count,
        total_seconds=self.total_seconds,
    )

with_fps

with_fps(fps: float) -> VideoMetadata

Return new metadata with updated fps.

Parameters:

Name Type Description Default
fps float

New frames per second.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated fps (duration stays same).

Source code in src/videopython/base/video.py
def with_fps(self, fps: float) -> VideoMetadata:
    """Return a copy of this metadata with a new frame rate.

    Args:
        fps: New frames per second.

    Returns:
        New VideoMetadata with updated fps; duration is preserved, so the
        frame count is recomputed from the new rate.
    """
    recomputed_frames = round(fps * self.total_seconds)
    return VideoMetadata(
        width=self.width,
        height=self.height,
        fps=fps,
        frame_count=recomputed_frames,
        total_seconds=self.total_seconds,
    )

can_be_downsampled_to

can_be_downsampled_to(target_format: VideoMetadata) -> bool

Checks if video can be downsampled to target_format.

Source code in src/videopython/base/video.py
def can_be_downsampled_to(self, target_format: VideoMetadata) -> bool:
    """Checks if video can be downsampled to target_format."""
    # Every axis (spatial, temporal, duration) must be at least as large
    # as the target's.
    if self.height < target_format.height or self.width < target_format.width:
        return False
    if round(self.fps) < round(target_format.fps):
        return False
    return self.total_seconds >= target_format.total_seconds

cut

cut(start: float, end: float) -> VideoMetadata

Predict metadata after cutting by time range.

Parameters:

Name Type Description Default
start float

Start time in seconds.

required
end float

End time in seconds.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated duration.

Source code in src/videopython/base/video.py
def cut(self, start: float, end: float) -> VideoMetadata:
    """Predict metadata after cutting the video to a time range.

    Args:
        start: Start time in seconds.
        end: End time in seconds.

    Returns:
        New VideoMetadata with updated duration.
    """
    if end <= start:
        raise ValueError(f"End time ({end}) must be greater than start time ({start})")
    if start < 0:
        raise ValueError(f"Start time ({start}) cannot be negative")
    if end > self.total_seconds:
        raise ValueError(f"End time ({end}) exceeds video duration ({self.total_seconds})")
    # Same round()-based time-to-frame conversion as CutSeconds.apply(), so
    # this prediction matches the runtime result exactly.
    frame_bounds = (round(start * self.fps), round(end * self.fps))
    return self.cut_frames(*frame_bounds)

cut_frames

cut_frames(start: int, end: int) -> VideoMetadata

Predict metadata after cutting by frame range.

Parameters:

Name Type Description Default
start int

Start frame index (inclusive).

required
end int

End frame index (exclusive).

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated duration.

Source code in src/videopython/base/video.py
def cut_frames(self, start: int, end: int) -> VideoMetadata:
    """Predict metadata after cutting the video to a frame range.

    Args:
        start: Start frame index (inclusive).
        end: End frame index (exclusive).

    Returns:
        New VideoMetadata with updated duration.
    """
    if end <= start:
        raise ValueError(f"End frame ({end}) must be greater than start frame ({start})")
    if start < 0:
        raise ValueError(f"Start frame ({start}) cannot be negative")
    if end > self.frame_count:
        raise ValueError(f"End frame ({end}) exceeds frame count ({self.frame_count})")
    kept_frames = end - start
    return self.with_duration(kept_frames / self.fps)

resize

resize(
    width: int | None = None,
    height: int | None = None,
    round_to_even: bool = True,
) -> VideoMetadata

Predict metadata after resizing.

If only width or height is provided, the other dimension is calculated to preserve aspect ratio.

Parameters:

Name Type Description Default
width int | None

Target width in pixels.

None
height int | None

Target height in pixels.

None
round_to_even bool

If True (default), snap output width/height to even numbers.

True

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated dimensions.

Source code in src/videopython/base/video.py
def resize(
    self,
    width: int | None = None,
    height: int | None = None,
    round_to_even: bool = True,
) -> VideoMetadata:
    """Predict metadata after resizing.

    If only width or height is provided, the other dimension is calculated
    to preserve aspect ratio.

    Args:
        width: Target width in pixels.
        height: Target height in pixels.
        round_to_even: If True (default), snap output width/height to even numbers.

    Returns:
        New VideoMetadata with updated dimensions.
    """
    if width is None and height is None:
        raise ValueError("Must provide width or height")

    def _snap(value: int) -> int:
        return _round_dimension_to_even(value) if round_to_even else value

    # Explicit None checks: a provided-but-falsy value (0) must not be
    # silently treated as "not given".
    if width is not None and height is not None:
        return self.with_dimensions(_snap(width), _snap(height))
    elif width is not None:
        ratio = width / self.width
        new_height = round(self.height * ratio)
        return self.with_dimensions(_snap(width), _snap(new_height))
    else:  # height only
        ratio = height / self.height  # type: ignore[operator]
        new_width = round(self.width * ratio)
        return self.with_dimensions(_snap(new_width), _snap(height))  # type: ignore[arg-type]

crop

crop(width: int, height: int) -> VideoMetadata

Predict metadata after cropping.

Parameters:

Name Type Description Default
width int

Target width in pixels.

required
height int

Target height in pixels.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated dimensions.

Source code in src/videopython/base/video.py
def crop(self, width: int, height: int) -> VideoMetadata:
    """Predict metadata after cropping to the given size.

    Args:
        width: Target width in pixels.
        height: Target height in pixels.

    Returns:
        New VideoMetadata with updated dimensions.
    """
    # A crop can only shrink the frame, never grow it.
    if width > self.width:
        msg = f"Crop width ({width}) exceeds video width ({self.width})"
        raise ValueError(msg)
    if height > self.height:
        msg = f"Crop height ({height}) exceeds video height ({self.height})"
        raise ValueError(msg)
    return self.with_dimensions(width, height)

resample_fps

resample_fps(fps: float) -> VideoMetadata

Predict metadata after resampling frame rate.

Parameters:

Name Type Description Default
fps float

Target frames per second.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated fps.

Source code in src/videopython/base/video.py
def resample_fps(self, fps: float) -> VideoMetadata:
    """Predict metadata after resampling to a new frame rate.

    Args:
        fps: Target frames per second.

    Returns:
        New VideoMetadata with updated fps.
    """
    if fps <= 0:
        msg = f"FPS ({fps}) must be positive"
        raise ValueError(msg)
    return self.with_fps(fps)

speed_change

speed_change(speed: float) -> VideoMetadata

Predict metadata after speed change.

Mirrors runtime frame-count semantics: int(frame_count / speed), matching SpeedChange.apply() behavior.

Parameters:

Name Type Description Default
speed float

Speed multiplier (e.g. 2.0 = double speed, 0.5 = half speed).

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated duration and frame count.

Source code in src/videopython/base/video.py
def speed_change(self, speed: float) -> VideoMetadata:
    """Predict metadata after changing playback speed.

    Uses int(frame_count / speed) for the new frame count, mirroring
    SpeedChange.apply() so predictions match runtime output.

    Args:
        speed: Speed multiplier (e.g. 2.0 = double speed, 0.5 = half speed).

    Returns:
        New VideoMetadata with updated duration and frame count.
    """
    if speed <= 0:
        raise ValueError(f"Speed ({speed}) must be positive")
    # int() truncation (not round) is intentional - see docstring.
    remaining_frames = int(self.frame_count / speed)
    if remaining_frames == 0:
        raise ValueError(f"Speed {speed}x would result in 0 frames")
    return VideoMetadata(
        width=self.width,
        height=self.height,
        fps=self.fps,
        frame_count=remaining_frames,
        total_seconds=round(remaining_frames / self.fps, 4),
    )

crop_to_aspect_even

crop_to_aspect_even(
    target_aspect: tuple[int, int] | list[int] = (9, 16),
) -> VideoMetadata

Predict metadata after aspect-ratio crop with even output dimensions.

Mirrors the output dimension logic used by AI crop transforms like FaceTrackingCrop.

Source code in src/videopython/base/video.py
def crop_to_aspect_even(self, target_aspect: tuple[int, int] | list[int] = (9, 16)) -> VideoMetadata:
    """Predict metadata after aspect-ratio crop with even output dimensions.

    Mirrors the output dimension logic used by AI crop transforms like
    ``FaceTrackingCrop``.
    """
    if not isinstance(target_aspect, (tuple, list)) or len(target_aspect) != 2:
        raise ValueError("target_aspect must be a 2-item tuple/list of positive integers")

    try:
        aspect_w, aspect_h = int(target_aspect[0]), int(target_aspect[1])
    except (TypeError, ValueError) as e:
        raise ValueError("target_aspect must contain numeric values") from e

    if min(aspect_w, aspect_h) <= 0:
        raise ValueError("target_aspect values must be positive")

    wanted_ratio = aspect_w / aspect_h
    current_ratio = self.width / self.height

    def _even(value: int) -> int:
        return value - (value % 2)

    # Fit the wanted ratio inside the current frame, then snap both output
    # dimensions down to even numbers.
    if wanted_ratio < current_ratio:
        new_h = _even(self.height)
        new_w = _even(int(new_h * wanted_ratio))
    else:
        new_w = _even(self.width)
        new_h = _even(int(new_w / wanted_ratio))

    return self.with_dimensions(new_w, new_h)

transition_to

transition_to(
    other: VideoMetadata, effect_time: float = 0.0
) -> VideoMetadata

Predict metadata after transition to another video.

Parameters:

Name Type Description Default
other VideoMetadata

Metadata of the video to transition to.

required
effect_time float

Duration of the transition effect in seconds.

0.0

Returns:

Type Description
VideoMetadata

New VideoMetadata for the combined video.

Raises:

Type Description
ValueError

If videos have incompatible dimensions or fps.

Source code in src/videopython/base/video.py
def transition_to(self, other: VideoMetadata, effect_time: float = 0.0) -> VideoMetadata:
    """Predict metadata after transitioning into another video.

    Args:
        other: Metadata of the video to transition to.
        effect_time: Duration of the transition effect in seconds.

    Returns:
        New VideoMetadata for the combined video.

    Raises:
        ValueError: If videos have incompatible dimensions or fps.
    """
    if not self.can_be_merged_with(other):
        raise ValueError(
            f"Cannot merge videos: {self.width}x{self.height}@{round(self.fps)}fps "
            f"vs {other.width}x{other.height}@{round(other.fps)}fps"
        )
    # The transition overlaps the two clips by effect_time seconds.
    combined = self.total_seconds + other.total_seconds - effect_time
    return self.with_duration(combined)

FrameIterator

Memory-efficient frame iterator for streaming video frames without loading the entire video into memory. Useful for processing very long videos.

from videopython.base import FrameIterator

# Stream frames one at a time - O(1) memory usage
with FrameIterator("long_video.mp4") as frames:
    for frame_idx, frame in frames:
        # frame is a numpy array (H, W, 3) in RGB format
        process_frame(frame)

# With time bounds
with FrameIterator("video.mp4", start_second=10.0, end_second=60.0) as frames:
    for frame_idx, frame in frames:
        process_frame(frame)

FrameIterator

Memory-efficient frame iterator using ffmpeg streaming.

Yields frames one at a time, keeping memory usage constant regardless of video length. Supports context manager protocol for resource cleanup.

This is useful for operations that only need to process frames sequentially, such as scene detection, without loading the entire video into memory.

Example

>>> with FrameIterator("video.mp4") as frames:
...     for idx, frame in frames:
...         process(frame)

Source code in src/videopython/base/video.py
class FrameIterator:
    """Memory-efficient frame iterator using ffmpeg streaming.

    Yields frames one at a time, keeping memory usage constant regardless
    of video length. Supports context manager protocol for resource cleanup.

    This is useful for operations that only need to process frames sequentially,
    such as scene detection, without loading the entire video into memory.

    Example:
        >>> with FrameIterator("video.mp4") as frames:
        ...     for idx, frame in frames:
        ...         process(frame)
    """

    def __init__(
        self,
        path: str | Path,
        start_second: float | None = None,
        end_second: float | None = None,
    ):
        """Initialize the frame iterator.

        Args:
            path: Path to video file
            start_second: Optional start time in seconds (seek before reading)
            end_second: Optional end time in seconds (stop reading after this)
        """
        self.path = Path(path)
        if not self.path.exists():
            raise FileNotFoundError(f"Video file not found: {path}")

        self.metadata = VideoMetadata.from_path(path)
        self.start_second = start_second if start_second is not None else 0.0
        self.end_second = end_second
        self._process: subprocess.Popen | None = None
        self._frame_size = self.metadata.width * self.metadata.height * 3

    def _build_ffmpeg_command(self) -> list[str]:
        """Build ffmpeg command for frame streaming."""
        cmd = ["ffmpeg"]

        if self.start_second > 0:
            cmd.extend(["-ss", str(self.start_second)])

        cmd.extend(["-i", str(self.path)])

        if self.end_second is not None:
            duration = self.end_second - self.start_second
            cmd.extend(["-t", str(duration)])

        cmd.extend(
            [
                "-f",
                "rawvideo",
                "-pix_fmt",
                "rgb24",
                "-vcodec",
                "rawvideo",
                "-y",
                "pipe:1",
            ]
        )
        return cmd

    def __iter__(self) -> Generator[tuple[int, np.ndarray], None, None]:
        """Yield (frame_index, frame) tuples.

        Frame indices are absolute indices in the original video,
        accounting for any start_second offset.
        """
        cmd = self._build_ffmpeg_command()

        self._process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            bufsize=self._frame_size * 2,
        )

        # Calculate starting frame index based on start_second
        start_frame = int(self.start_second * self.metadata.fps)
        frame_idx = start_frame

        try:
            while True:
                raw_frame = self._process.stdout.read(self._frame_size)  # type: ignore
                if len(raw_frame) != self._frame_size:
                    break

                frame = np.frombuffer(raw_frame, dtype=np.uint8).copy()
                frame = frame.reshape(self.metadata.height, self.metadata.width, 3)

                yield frame_idx, frame
                frame_idx += 1
        finally:
            self._cleanup()

    def _cleanup(self) -> None:
        """Clean up ffmpeg process."""
        if self._process is not None:
            if self._process.poll() is None:
                self._process.terminate()
                try:
                    self._process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    self._process.kill()
                    self._process.wait()
            if self._process.stdout:
                self._process.stdout.close()
            self._process = None

    def __enter__(self) -> "FrameIterator":
        return self

    def __exit__(self, *args: object) -> None:
        self._cleanup()

__init__

__init__(
    path: str | Path,
    start_second: float | None = None,
    end_second: float | None = None,
)

Initialize the frame iterator.

Parameters:

Name Type Description Default
path str | Path

Path to video file

required
start_second float | None

Optional start time in seconds (seek before reading)

None
end_second float | None

Optional end time in seconds (stop reading after this)

None
Source code in src/videopython/base/video.py
def __init__(
    self,
    path: str | Path,
    start_second: float | None = None,
    end_second: float | None = None,
):
    """Initialize the frame iterator.

    Args:
        path: Path to video file
        start_second: Optional start time in seconds (seek before reading)
        end_second: Optional end time in seconds (stop reading after this)
    """
    self.path = Path(path)
    if not self.path.exists():
        raise FileNotFoundError(f"Video file not found: {path}")

    # Probe the container once; width/height/fps are needed to size and
    # index the raw frames streamed from ffmpeg.
    self.metadata = VideoMetadata.from_path(path)
    self.start_second = start_second if start_second is not None else 0.0
    self.end_second = end_second
    # Populated when iteration starts; None while no ffmpeg process runs.
    self._process: subprocess.Popen | None = None
    # Bytes per raw RGB24 frame: width * height pixels, 3 bytes each.
    self._frame_size = self.metadata.width * self.metadata.height * 3

__iter__

__iter__() -> Generator[tuple[int, np.ndarray], None, None]

Yield (frame_index, frame) tuples.

Frame indices are absolute indices in the original video, accounting for any start_second offset.

Source code in src/videopython/base/video.py
def __iter__(self) -> Generator[tuple[int, np.ndarray], None, None]:
    """Yield (frame_index, frame) tuples.

    Frame indices are absolute indices in the original video,
    accounting for any start_second offset.
    """
    cmd = self._build_ffmpeg_command()

    # Stream raw RGB24 bytes from ffmpeg's stdout; stderr is discarded.
    # A buffer of two frames keeps the pipe fed without unbounded growth.
    self._process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        bufsize=self._frame_size * 2,
    )

    # Calculate starting frame index based on start_second
    start_frame = int(self.start_second * self.metadata.fps)
    frame_idx = start_frame

    try:
        while True:
            raw_frame = self._process.stdout.read(self._frame_size)  # type: ignore
            # A short read means EOF (or a truncated trailing frame): stop.
            if len(raw_frame) != self._frame_size:
                break

            # .copy() detaches the array from the read buffer so the yielded
            # frame remains valid after the next read overwrites the buffer.
            frame = np.frombuffer(raw_frame, dtype=np.uint8).copy()
            frame = frame.reshape(self.metadata.height, self.metadata.width, 3)

            yield frame_idx, frame
            frame_idx += 1
    finally:
        # Runs on exhaustion and when the consumer abandons the generator.
        self._cleanup()