Skip to content

Video

The Video class is the core data structure in videopython.

Video

Video

Source code in src/videopython/base/video.py
class Video:
    """In-memory video: a frame array, a frame rate and an audio track.

    ``frames`` is indexed by frame along its first axis; ``from_frames``
    requires a 4-dimensional array whose last axis holds 3 channels.
    Every instance carries an ``audio`` attribute: when no audio is supplied
    a silent track matching the video duration is created.
    """

    def __init__(self, frames: np.ndarray, fps: int | float, audio: Audio | None = None):
        """Store frames and fps and attach ``audio`` or a silent placeholder.

        Args:
            frames: Frame array; the first axis is the frame index.
            fps: Frames per second.
            audio: Audio track. If None, a silent stereo 44100 Hz track lasting
                ``total_seconds`` (rounded to 2 decimals) is created.
        """
        self.frames = frames
        self.fps = fps
        # Compare against None explicitly so that a supplied Audio object is
        # never replaced just because it happens to evaluate as falsy.
        if audio is not None:
            self.audio = audio
        else:
            self.audio = Audio.create_silent(
                duration_seconds=round(self.total_seconds, 2), stereo=True, sample_rate=44100
            )

    @classmethod
    def from_path(
        cls,
        path: str,
        read_batch_size: int = 100,
        start_second: float | None = None,
        end_second: float | None = None,
        fps: float | None = None,
        width: int | None = None,
        height: int | None = None,
    ) -> Video:
        """Decode a video file into a Video.

        Every argument is forwarded unchanged to ``_video_io.decode_video``;
        the frames, fps and audio it returns are used to build the instance.

        Args:
            path: Path to the video file.
            read_batch_size: Forwarded to the decoder.
            start_second: Forwarded to the decoder.
            end_second: Forwarded to the decoder.
            fps: Forwarded to the decoder.
            width: Forwarded to the decoder.
            height: Forwarded to the decoder.

        Returns:
            Video built from the decoded frames, fps and audio.
        """
        frames, out_fps, audio = _video_io.decode_video(
            path,
            read_batch_size=read_batch_size,
            start_second=start_second,
            end_second=end_second,
            fps=fps,
            width=width,
            height=height,
        )
        return cls(frames=frames, fps=out_fps, audio=audio)

    @classmethod
    def from_frames(cls, frames: np.ndarray, fps: float) -> Video:
        """Create a Video from a 4-dimensional frame array.

        When the last axis has size 4, only its first three entries are kept.

        Args:
            frames: 4-dimensional array whose last axis has size 3 or 4.
            fps: Frames per second.

        Raises:
            ValueError: If ``frames`` is not 4-dimensional or its last axis is
                neither 3 nor 4.
        """
        if frames.ndim != 4:
            raise ValueError(f"Unsupported number of dimensions: {frames.shape}!")
        elif frames.shape[-1] == 4:
            frames = frames[:, :, :, :3]
        elif frames.shape[-1] != 3:
            raise ValueError(f"Unsupported number of dimensions: {frames.shape}!")
        return cls(frames=frames, fps=fps)

    @classmethod
    def from_image(cls, image: np.ndarray, fps: float = 24.0, length_seconds: float = 1.0) -> Video:
        """Create a Video that repeats a single image.

        Args:
            image: Image array. A 3-dimensional image gets a leading frame axis.
            fps: Frames per second.
            length_seconds: Duration; the image is repeated
                ``round(length_seconds * fps)`` times.
        """
        if len(image.shape) == 3:
            image = np.expand_dims(image, axis=0)
        frames = np.repeat(image, round(length_seconds * fps), axis=0)
        return cls(frames=frames, fps=fps)

    def copy(self) -> Video:
        """Return a new Video with copied frames and the same audio object."""
        copied = Video.from_frames(self.frames.copy(), self.fps)
        copied.audio = self.audio  # Audio objects are immutable, no need to copy
        return copied

    def is_loaded(self) -> bool:
        """Return True when fps, frames and audio are all set (not None)."""
        return self.fps is not None and self.frames is not None and self.audio is not None

    def split(self, frame_index: int | None = None) -> tuple[Video, Video]:
        """Split the video in two at ``frame_index``.

        Args:
            frame_index: Index of the first frame of the second part. If None,
                the video is split in the middle. ``0`` and ``len(frames)`` are
                valid and yield an empty first or second part respectively.

        Returns:
            Tuple ``(first, second)``; each part gets the matching audio slice.

        Raises:
            ValueError: If ``frame_index`` is outside ``[0, len(frames)]``.
        """
        frame_total = len(self.frames)
        # Test for None explicitly: a plain truthiness check would treat an
        # explicit index of 0 as "not given" and split at the midpoint instead.
        if frame_index is None:
            frame_index = frame_total // 2
        elif not (0 <= frame_index <= frame_total):
            raise ValueError(f"frame_idx must be between 0 and {frame_total}, got {frame_index}")

        split_videos = (
            self.from_frames(self.frames[:frame_index], self.fps),
            self.from_frames(self.frames[frame_index:], self.fps),
        )

        # Split audio at the corresponding time point
        split_time = frame_index / self.fps
        split_videos[0].audio = self.audio.slice(start_seconds=0, end_seconds=split_time)
        split_videos[1].audio = self.audio.slice(start_seconds=split_time)

        return split_videos

    def save(
        self,
        filename: str | Path | None = None,
        format: ALLOWED_VIDEO_FORMATS = "mp4",
        preset: ALLOWED_VIDEO_PRESETS = "medium",
        crf: int = 23,
    ) -> Path:
        """Save video to file.

        Args:
            filename: Output filename. If None, generates random name
            format: Output format (mp4, avi, mov, mkv, webm)
            preset: Encoding speed/compression tradeoff. Slower presets give smaller
                files at the same quality. Options from fastest to smallest:
                ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow
            crf: Constant Rate Factor (0-51). Lower = better quality, larger file.
                Defaults to 23. Range 18-28 recommended.

        Returns:
            Path to saved video file

        Raises:
            RuntimeError: If video is not loaded
            ValueError: If format or preset is not supported
        """
        if not self.is_loaded():
            raise RuntimeError("Video is not loaded, cannot save!")

        return _video_io.encode_video(
            self.frames,
            self.fps,
            self.audio,
            filename=filename,
            format=format,
            preset=preset,
            crf=crf,
        )

    def add_audio(self, audio: Audio, overlay: bool = True) -> Video:
        """Add audio to video, returning a new Video instance.

        The audio is first fitted to the video duration: longer audio is cut,
        shorter audio is padded with silence at the end.

        Args:
            audio: Audio to add
            overlay: If True, overlay on existing audio; if False, replace it.
                A silent existing track is always replaced.

        Returns:
            New Video with the audio added
        """
        video_duration = self.total_seconds
        audio_duration = audio.metadata.duration_seconds

        if audio_duration > video_duration:
            audio = audio.slice(start_seconds=0, end_seconds=video_duration)
        elif audio_duration < video_duration:
            silence_duration = video_duration - audio_duration
            silence = Audio.create_silent(
                duration_seconds=silence_duration,
                stereo=audio.metadata.channels == 2,
                sample_rate=audio.metadata.sample_rate,
            )
            audio = audio.concat(silence)

        new_video = self.copy()
        if new_video.audio.is_silent or not overlay:
            new_video.audio = audio
        else:
            new_video.audio = new_video.audio.overlay(audio, position=0.0)
        return new_video

    def add_audio_from_file(self, path: str, overlay: bool = True) -> Video:
        """Add audio from file, returning a new Video instance.

        Args:
            path: Path to audio file
            overlay: If True, overlay on existing audio; if False, replace it

        Returns:
            New Video with the audio added

        Raises:
            AudioLoadError: If audio file cannot be loaded
            FileNotFoundError: If audio file does not exist
        """
        new_audio = Audio.from_path(path)
        return self.add_audio(new_audio, overlay)

    def __add__(self, other: Video) -> Video:
        """Concatenate two videos (frames and audio) into a new Video.

        Raises:
            ValueError: If the fps or the frame shapes of the videos differ.
        """
        if self.fps != other.fps:
            raise ValueError("FPS of videos do not match!")
        elif self.frame_shape != other.frame_shape:
            raise ValueError(f"Resolutions do not match: {self.frame_shape} vs {other.frame_shape}")
        # Frame shapes were checked above, so a plain concatenation along the
        # frame axis is enough.
        new_video = self.from_frames(np.concatenate((self.frames, other.frames), axis=0), fps=self.fps)
        new_video.audio = self.audio.concat(other.audio)
        return new_video

    def __str__(self) -> str:
        """Return the string form of the video metadata."""
        return str(self.metadata)

    def __getitem__(self, val: slice) -> Video:
        """Return the sub-video selected by a frame slice.

        The audio is cut to the time span covered by the selected frame range.

        Raises:
            ValueError: If ``val`` is not a slice.
        """
        if not isinstance(val, slice):
            raise ValueError("Only slices are supported for video indexing!")

        # Sub-slice video frames
        sliced = self.from_frames(self.frames[val], fps=self.fps)

        # Let the slice normalise its own bounds: this resolves None and
        # negative values, clamps out-of-range values and keeps an explicit
        # stop of 0 (which must not be mistaken for "until the end").
        start, stop, step = val.indices(len(self.frames))
        if step < 0:
            # A reversed slice walks from `start` down to `stop + 1`; the
            # covered frame range in forward order is [stop + 1, start + 1).
            start, stop = stop + 1, start + 1
        stop = max(start, stop)

        # Slice audio to match video duration
        audio_start = start / self.fps
        audio_end = stop / self.fps
        sliced.audio = self.audio.slice(start_seconds=audio_start, end_seconds=audio_end)
        return sliced

    @property
    def video_shape(self) -> tuple[int, int, int, int]:
        """Shape of the whole frame array."""
        return self.frames.shape

    @property
    def frame_shape(self) -> tuple[int, int, int]:
        """Shape of a single frame (frame array shape without the first axis)."""
        return self.frames.shape[1:]

    @property
    def total_seconds(self) -> float:
        """Duration in seconds: frame count divided by fps, rounded to 4 decimals."""
        return round(self.frames.shape[0] / self.fps, 4)

    @property
    def metadata(self) -> VideoMetadata:
        """VideoMetadata derived from this video."""
        return VideoMetadata.from_video(self)

video_shape property

video_shape: tuple[int, int, int, int]

frame_shape property

frame_shape: tuple[int, int, int]

total_seconds property

total_seconds: float

metadata property

metadata: VideoMetadata

__init__

__init__(
    frames: ndarray,
    fps: int | float,
    audio: Audio | None = None,
)
Source code in src/videopython/base/video.py
def __init__(self, frames: np.ndarray, fps: int | float, audio: Audio | None = None):
    """Store frames and fps and attach ``audio`` or a silent placeholder.

    Args:
        frames: Frame array; the first axis is the frame index.
        fps: Frames per second.
        audio: Audio track. If None, a silent stereo 44100 Hz track lasting
            ``total_seconds`` (rounded to 2 decimals) is created.
    """
    self.frames = frames
    self.fps = fps
    # Compare against None explicitly so that a supplied Audio object is
    # never replaced just because it happens to evaluate as falsy.
    if audio is not None:
        self.audio = audio
    else:
        self.audio = Audio.create_silent(
            duration_seconds=round(self.total_seconds, 2), stereo=True, sample_rate=44100
        )

from_path classmethod

from_path(
    path: str,
    read_batch_size: int = 100,
    start_second: float | None = None,
    end_second: float | None = None,
    fps: float | None = None,
    width: int | None = None,
    height: int | None = None,
) -> Video
Source code in src/videopython/base/video.py
@classmethod
def from_path(
    cls,
    path: str,
    read_batch_size: int = 100,
    start_second: float | None = None,
    end_second: float | None = None,
    fps: float | None = None,
    width: int | None = None,
    height: int | None = None,
) -> Video:
    """Decode a video file into a Video.

    Every argument is forwarded unchanged to ``_video_io.decode_video``;
    the frames, fps and audio it returns are used to build the instance.
    """
    decode_options = {
        "read_batch_size": read_batch_size,
        "start_second": start_second,
        "end_second": end_second,
        "fps": fps,
        "width": width,
        "height": height,
    }
    decoded_frames, decoded_fps, decoded_audio = _video_io.decode_video(path, **decode_options)
    return cls(frames=decoded_frames, fps=decoded_fps, audio=decoded_audio)

from_frames classmethod

from_frames(frames: ndarray, fps: float) -> Video
Source code in src/videopython/base/video.py
@classmethod
def from_frames(cls, frames: np.ndarray, fps: float) -> Video:
    """Create a Video from a 4-dimensional frame array.

    When the last axis has size 4, only its first three entries are kept.

    Raises:
        ValueError: If ``frames`` is not 4-dimensional or its last axis is
            neither 3 nor 4.
    """
    if frames.ndim != 4:
        raise ValueError(f"Unsupported number of dimensions: {frames.shape}!")

    channel_count = frames.shape[-1]
    if channel_count == 4:
        # Keep the first three channels only.
        frames = frames[..., :3]
    elif channel_count != 3:
        raise ValueError(f"Unsupported number of dimensions: {frames.shape}!")

    return cls(frames=frames, fps=fps)

from_image classmethod

from_image(
    image: ndarray,
    fps: float = 24.0,
    length_seconds: float = 1.0,
) -> Video
Source code in src/videopython/base/video.py
@classmethod
def from_image(cls, image: np.ndarray, fps: float = 24.0, length_seconds: float = 1.0) -> Video:
    """Create a Video that repeats a single image.

    A 3-dimensional image gets a leading frame axis; the result holds
    ``round(length_seconds * fps)`` repetitions along that axis.
    """
    repeat_count = round(length_seconds * fps)
    if image.ndim == 3:
        image = image[np.newaxis, ...]
    return cls(frames=np.repeat(image, repeat_count, axis=0), fps=fps)

save

save(
    filename: str | Path | None = None,
    format: ALLOWED_VIDEO_FORMATS = "mp4",
    preset: ALLOWED_VIDEO_PRESETS = "medium",
    crf: int = 23,
) -> Path

Save video to file.

Parameters:

Name Type Description Default
filename str | Path | None

Output filename. If None, generates random name

None
format ALLOWED_VIDEO_FORMATS

Output format (mp4, avi, mov, mkv, webm)

'mp4'
preset ALLOWED_VIDEO_PRESETS

Encoding speed/compression tradeoff. Slower presets give smaller files at the same quality. Options from fastest to smallest: ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow

'medium'
crf int

Constant Rate Factor (0-51). Lower = better quality, larger file. The default of 23 is a reasonable quality/size trade-off for most content; values around 18 are generally considered visually lossless. Range 18-28 recommended.

23

Returns:

Type Description
Path

Path to saved video file

Raises:

Type Description
RuntimeError

If video is not loaded

ValueError

If format or preset is not supported

Source code in src/videopython/base/video.py
def save(
    self,
    filename: str | Path | None = None,
    format: ALLOWED_VIDEO_FORMATS = "mp4",
    preset: ALLOWED_VIDEO_PRESETS = "medium",
    crf: int = 23,
) -> Path:
    """Encode the video (frames, fps and audio) to a file.

    Args:
        filename: Output filename. If None, generates random name
        format: Output format (mp4, avi, mov, mkv, webm)
        preset: Encoding speed/compression tradeoff. Slower presets give smaller
            files at the same quality. Options from fastest to smallest:
            ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow
        crf: Constant Rate Factor (0-51). Lower = better quality, larger file.
            Defaults to 23. Range 18-28 recommended.

    Returns:
        Path to saved video file

    Raises:
        RuntimeError: If video is not loaded
        ValueError: If format or preset is not supported
    """
    if self.is_loaded():
        encode_options = {"filename": filename, "format": format, "preset": preset, "crf": crf}
        return _video_io.encode_video(self.frames, self.fps, self.audio, **encode_options)

    raise RuntimeError("Video is not loaded, cannot save!")

copy

copy() -> Video
Source code in src/videopython/base/video.py
def copy(self) -> Video:
    """Return a new Video with copied frames and the same audio object."""
    duplicate = Video.from_frames(self.frames.copy(), self.fps)
    # Audio objects are immutable, so the track is shared rather than copied.
    duplicate.audio = self.audio
    return duplicate

split

split(
    frame_index: int | None = None,
) -> tuple[Video, Video]
Source code in src/videopython/base/video.py
def split(self, frame_index: int | None = None) -> tuple[Video, Video]:
    """Split the video in two at ``frame_index``.

    Args:
        frame_index: Index of the first frame of the second part. If None,
            the video is split in the middle. ``0`` and ``len(frames)`` are
            valid and yield an empty first or second part respectively.

    Returns:
        Tuple ``(first, second)``; each part gets the matching audio slice.

    Raises:
        ValueError: If ``frame_index`` is outside ``[0, len(frames)]``.
    """
    frame_total = len(self.frames)
    # Test for None explicitly: a plain truthiness check would treat an
    # explicit index of 0 as "not given" and split at the midpoint instead.
    if frame_index is None:
        frame_index = frame_total // 2
    elif not (0 <= frame_index <= frame_total):
        raise ValueError(f"frame_idx must be between 0 and {frame_total}, got {frame_index}")

    split_videos = (
        self.from_frames(self.frames[:frame_index], self.fps),
        self.from_frames(self.frames[frame_index:], self.fps),
    )

    # Split audio at the corresponding time point
    split_time = frame_index / self.fps
    split_videos[0].audio = self.audio.slice(start_seconds=0, end_seconds=split_time)
    split_videos[1].audio = self.audio.slice(start_seconds=split_time)

    return split_videos

add_audio

add_audio(audio: Audio, overlay: bool = True) -> Video

Add audio to video, returning a new Video instance.

Parameters:

Name Type Description Default
audio Audio

Audio to add

required
overlay bool

If True, overlay on existing audio; if False, replace it

True

Returns:

Type Description
Video

New Video with the audio added

Source code in src/videopython/base/video.py
def add_audio(self, audio: Audio, overlay: bool = True) -> Video:
    """Return a new Video carrying ``audio``, fitted to the video duration.

    Audio longer than the video is cut; shorter audio is padded with silence
    at the end.

    Args:
        audio: Audio to add
        overlay: If True, overlay on existing audio; if False, replace it.
            A silent existing track is always replaced.

    Returns:
        New Video with the audio added
    """
    target_length = self.total_seconds
    source_length = audio.metadata.duration_seconds

    if source_length > target_length:
        audio = audio.slice(start_seconds=0, end_seconds=target_length)
    elif source_length < target_length:
        # Pad with silence that matches the incoming audio's layout.
        padding = Audio.create_silent(
            duration_seconds=target_length - source_length,
            stereo=audio.metadata.channels == 2,
            sample_rate=audio.metadata.sample_rate,
        )
        audio = audio.concat(padding)

    result = self.copy()
    if result.audio.is_silent or not overlay:
        result.audio = audio
    else:
        result.audio = result.audio.overlay(audio, position=0.0)
    return result

add_audio_from_file

add_audio_from_file(
    path: str, overlay: bool = True
) -> Video

Add audio from file, returning a new Video instance.

Parameters:

Name Type Description Default
path str

Path to audio file

required
overlay bool

If True, overlay on existing audio; if False, replace it

True

Returns:

Type Description
Video

New Video with the audio added

Raises:

Type Description
AudioLoadError

If audio file cannot be loaded

FileNotFoundError

If audio file does not exist

Source code in src/videopython/base/video.py
def add_audio_from_file(self, path: str, overlay: bool = True) -> Video:
    """Load audio from ``path`` and add it via ``add_audio``.

    Args:
        path: Path to audio file
        overlay: If True, overlay on existing audio; if False, replace it

    Returns:
        New Video with the audio added

    Raises:
        AudioLoadError: If audio file cannot be loaded
        FileNotFoundError: If audio file does not exist
    """
    return self.add_audio(Audio.from_path(path), overlay)

is_loaded

is_loaded() -> bool
Source code in src/videopython/base/video.py
def is_loaded(self) -> bool:
    """Return True when fps, frames and audio are all set (not None)."""
    return not (self.fps is None or self.frames is None or self.audio is None)

VideoMetadata

Get video metadata without loading frames into memory:

from videopython.base import VideoMetadata

# Probe the file with ffprobe; frames are not loaded into memory.
metadata = VideoMetadata.from_path("video.mp4")
# Each value below is a field of the VideoMetadata dataclass.
print(f"Duration: {metadata.total_seconds}s")
print(f"Resolution: {metadata.width}x{metadata.height}")
print(f"FPS: {metadata.fps}")
print(f"Total frames: {metadata.frame_count}")

VideoMetadata dataclass

Class to store video metadata.

Source code in src/videopython/base/video.py
@dataclass
class VideoMetadata:
    """Class to store video metadata.

    Holds the frame size, frame rate, frame count and duration of a video.
    Instances are built directly, from a file (``from_path``) or from an
    in-memory ``Video`` (``from_video``); the ``with_*`` methods return
    modified copies.
    """

    height: int
    width: int
    fps: float
    frame_count: int
    total_seconds: float

    def __str__(self) -> str:
        """Return a one-line summary: resolution, fps and duration."""
        return f"{self.width}x{self.height} @ {self.fps}fps, {self.total_seconds} seconds"

    def __repr__(self) -> str:
        """Return the same text as ``__str__``."""
        return self.__str__()

    def get_frame_shape(self) -> np.ndarray:
        """Returns frame shape as an array ``(height, width, 3)``."""
        return np.array((self.height, self.width, 3))

    def get_video_shape(self) -> np.ndarray:
        """Returns video shape as an array ``(frame_count, height, width, 3)``."""
        return np.array((self.frame_count, self.height, self.width, 3))

    @staticmethod
    def _run_ffprobe(video_path: str | Path) -> dict[str, Any]:
        """Run ffprobe and return parsed JSON output.

        Only the first video stream's width, height, frame rate and frame
        count plus the container duration are requested.

        Raises:
            VideoMetadataError: If the probe fails.
        """
        try:
            return _ffmpeg.probe(
                video_path,
                extra_args=[
                    "-select_streams",
                    "v:0",
                    "-show_entries",
                    "stream=width,height,r_frame_rate,nb_frames",
                    "-show_entries",
                    "format=duration",
                ],
            )
        except FFmpegProbeError as e:
            raise VideoMetadataError(str(e)) from e

    @classmethod
    def from_path(cls, video_path: str | Path) -> VideoMetadata:
        """Creates VideoMetadata object from video file using ffprobe.

        The frame count comes from the stream's ``nb_frames`` when it is a
        plain number; otherwise it is estimated as ``duration * fps``.

        Raises:
            FileNotFoundError: If ``video_path`` does not exist.
            VideoMetadataError: If probing fails or the probe output is
                missing fields, malformed, or reports a non-positive frame rate.
        """
        if not Path(video_path).exists():
            raise FileNotFoundError(f"Video file not found: {video_path}")

        probe_data = cls._run_ffprobe(video_path)

        try:
            stream_info = probe_data["streams"][0]

            width = int(stream_info["width"])
            height = int(stream_info["height"])

            raw_rate = stream_info["r_frame_rate"]
            try:
                fps = float(Fraction(raw_rate))
            except (ValueError, ZeroDivisionError) as e:
                raise VideoMetadataError(f"Invalid frame rate: {raw_rate}") from e
            # A rate such as "0/1" parses fine but would make the duration
            # computation below divide by zero.
            if fps <= 0:
                raise VideoMetadataError(f"Invalid frame rate: {raw_rate}")

            # str() lets a numeric nb_frames through as well as a string one.
            nb_frames = str(stream_info.get("nb_frames", ""))
            if nb_frames.isdigit():
                frame_count = int(nb_frames)
            else:
                duration = float(probe_data["format"]["duration"])
                frame_count = int(round(duration * fps))

            total_seconds = round(frame_count / fps, 4)

            return cls(height=height, width=width, fps=fps, frame_count=frame_count, total_seconds=total_seconds)

        except VideoMetadataError:
            # Already the right error type; do not wrap it a second time.
            raise
        except KeyError as e:
            raise VideoMetadataError(f"Missing required metadata field: {e}") from e
        except (TypeError, IndexError, ValueError) as e:
            # ValueError covers non-numeric width/height/duration values.
            raise VideoMetadataError(f"Invalid metadata structure: {e}") from e

    @classmethod
    def from_video(cls, video: Video) -> VideoMetadata:
        """Creates VideoMetadata object from Video instance.

        The frame array shape is read as ``(frame_count, height, width, _)``.
        """
        frame_count, height, width, _ = video.frames.shape
        total_seconds = round(frame_count / video.fps, 4)

        return cls(height=height, width=width, fps=video.fps, frame_count=frame_count, total_seconds=total_seconds)

    def with_duration(self, seconds: float) -> VideoMetadata:
        """Return new metadata with updated duration.

        Args:
            seconds: New duration in seconds.

        Returns:
            New VideoMetadata with updated duration and frame count.
        """
        return VideoMetadata(
            height=self.height,
            width=self.width,
            fps=self.fps,
            frame_count=round(self.fps * seconds),
            total_seconds=round(seconds, 4),
        )

    def with_dimensions(self, width: int, height: int) -> VideoMetadata:
        """Return new metadata with updated dimensions.

        Args:
            width: New width in pixels.
            height: New height in pixels.

        Returns:
            New VideoMetadata with updated dimensions.
        """
        return VideoMetadata(
            height=height,
            width=width,
            fps=self.fps,
            frame_count=self.frame_count,
            total_seconds=self.total_seconds,
        )

    def with_fps(self, fps: float) -> VideoMetadata:
        """Return new metadata with updated fps.

        Args:
            fps: New frames per second.

        Returns:
            New VideoMetadata with updated fps (duration stays same).
        """
        return VideoMetadata(
            height=self.height,
            width=self.width,
            fps=fps,
            frame_count=round(fps * self.total_seconds),
            total_seconds=self.total_seconds,
        )

    def can_be_downsampled_to(self, target_format: VideoMetadata) -> bool:
        """Checks if video can be downsampled to target_format.

        True when height, width, rounded fps and duration are all at least
        as large as those of ``target_format``.
        """
        return (
            self.height >= target_format.height
            and self.width >= target_format.width
            and round(self.fps) >= round(target_format.fps)
            and self.total_seconds >= target_format.total_seconds
        )

get_frame_shape

get_frame_shape() -> np.ndarray

Returns frame shape.

Source code in src/videopython/base/video.py
def get_frame_shape(self) -> np.ndarray:
    """Return the frame shape as an array ``(height, width, 3)``."""
    dimensions = (self.height, self.width, 3)
    return np.array(dimensions)

get_video_shape

get_video_shape() -> np.ndarray

Returns video shape.

Source code in src/videopython/base/video.py
def get_video_shape(self) -> np.ndarray:
    """Return the video shape as an array ``(frame_count, height, width, 3)``."""
    dimensions = (self.frame_count, self.height, self.width, 3)
    return np.array(dimensions)

from_path classmethod

from_path(video_path: str | Path) -> VideoMetadata

Creates VideoMetadata object from video file using ffprobe.

Source code in src/videopython/base/video.py
@classmethod
def from_path(cls, video_path: str | Path) -> VideoMetadata:
    """Creates VideoMetadata object from video file using ffprobe.

    The frame count comes from the stream's ``nb_frames`` when it is a
    plain number; otherwise it is estimated as ``duration * fps``.

    Raises:
        FileNotFoundError: If ``video_path`` does not exist.
        VideoMetadataError: If probing fails or the probe output is
            missing fields, malformed, or reports a non-positive frame rate.
    """
    if not Path(video_path).exists():
        raise FileNotFoundError(f"Video file not found: {video_path}")

    probe_data = cls._run_ffprobe(video_path)

    try:
        stream_info = probe_data["streams"][0]

        width = int(stream_info["width"])
        height = int(stream_info["height"])

        raw_rate = stream_info["r_frame_rate"]
        try:
            fps = float(Fraction(raw_rate))
        except (ValueError, ZeroDivisionError) as e:
            raise VideoMetadataError(f"Invalid frame rate: {raw_rate}") from e
        # A rate such as "0/1" parses fine but would make the duration
        # computation below divide by zero.
        if fps <= 0:
            raise VideoMetadataError(f"Invalid frame rate: {raw_rate}")

        # str() lets a numeric nb_frames through as well as a string one.
        nb_frames = str(stream_info.get("nb_frames", ""))
        if nb_frames.isdigit():
            frame_count = int(nb_frames)
        else:
            duration = float(probe_data["format"]["duration"])
            frame_count = int(round(duration * fps))

        total_seconds = round(frame_count / fps, 4)

        return cls(height=height, width=width, fps=fps, frame_count=frame_count, total_seconds=total_seconds)

    except VideoMetadataError:
        # Already the right error type; do not wrap it a second time.
        raise
    except KeyError as e:
        raise VideoMetadataError(f"Missing required metadata field: {e}") from e
    except (TypeError, IndexError, ValueError) as e:
        # ValueError covers non-numeric width/height/duration values.
        raise VideoMetadataError(f"Invalid metadata structure: {e}") from e

from_video classmethod

from_video(video: Video) -> VideoMetadata

Creates VideoMetadata object from Video instance.

Source code in src/videopython/base/video.py
@classmethod
def from_video(cls, video: Video) -> VideoMetadata:
    """Build metadata from a Video instance.

    The frame array shape is read as ``(frame_count, height, width, _)``;
    the duration is the frame count divided by fps, rounded to 4 decimals.
    """
    n_frames, rows, cols, _ = video.frames.shape
    duration = round(n_frames / video.fps, 4)
    return cls(height=rows, width=cols, fps=video.fps, frame_count=n_frames, total_seconds=duration)

with_duration

with_duration(seconds: float) -> VideoMetadata

Return new metadata with updated duration.

Parameters:

Name Type Description Default
seconds float

New duration in seconds.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated duration and frame count.

Source code in src/videopython/base/video.py
def with_duration(self, seconds: float) -> VideoMetadata:
    """Return a copy of this metadata with a new duration.

    Args:
        seconds: New duration in seconds.

    Returns:
        New VideoMetadata whose frame count is ``round(fps * seconds)`` and
        whose duration is ``seconds`` rounded to 4 decimals.
    """
    new_frame_count = round(self.fps * seconds)
    new_duration = round(seconds, 4)
    return VideoMetadata(
        height=self.height,
        width=self.width,
        fps=self.fps,
        frame_count=new_frame_count,
        total_seconds=new_duration,
    )

with_dimensions

with_dimensions(width: int, height: int) -> VideoMetadata

Return new metadata with updated dimensions.

Parameters:

Name Type Description Default
width int

New width in pixels.

required
height int

New height in pixels.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated dimensions.

Source code in src/videopython/base/video.py
def with_dimensions(self, width: int, height: int) -> VideoMetadata:
    """Return a copy of this metadata with a new frame size.

    Args:
        width: New width in pixels.
        height: New height in pixels.

    Returns:
        New VideoMetadata with the given dimensions; fps, frame count and
        duration are carried over unchanged.
    """
    carried_over = {
        "fps": self.fps,
        "frame_count": self.frame_count,
        "total_seconds": self.total_seconds,
    }
    return VideoMetadata(height=height, width=width, **carried_over)

with_fps

with_fps(fps: float) -> VideoMetadata

Return new metadata with updated fps.

Parameters:

Name Type Description Default
fps float

New frames per second.

required

Returns:

Type Description
VideoMetadata

New VideoMetadata with updated fps (duration stays same).

Source code in src/videopython/base/video.py
def with_fps(self, fps: float) -> VideoMetadata:
    """Return a copy of this metadata with a new frame rate.

    Args:
        fps: New frames per second.

    Returns:
        New VideoMetadata with the given fps; the duration is kept and the
        frame count becomes ``round(fps * total_seconds)``.
    """
    duration = self.total_seconds
    return VideoMetadata(
        height=self.height,
        width=self.width,
        fps=fps,
        frame_count=round(fps * duration),
        total_seconds=duration,
    )

can_be_downsampled_to

can_be_downsampled_to(target_format: VideoMetadata) -> bool

Checks if video can be downsampled to target_format.

Source code in src/videopython/base/video.py
def can_be_downsampled_to(self, target_format: VideoMetadata) -> bool:
    """Check whether this video can be downsampled to ``target_format``.

    True when height, width, rounded fps and duration are all at least as
    large as those of ``target_format``.
    """
    if not (self.height >= target_format.height):
        return False
    if not (self.width >= target_format.width):
        return False
    if not (round(self.fps) >= round(target_format.fps)):
        return False
    return self.total_seconds >= target_format.total_seconds

FrameIterator

Memory-efficient frame iterator for streaming video frames without loading the entire video into memory. Useful for processing very long videos.

from videopython.base import FrameIterator

# Stream frames one at a time - O(1) memory usage
with FrameIterator("long_video.mp4") as frames:
    # Iteration yields (frame_index, frame) tuples.
    for frame_idx, frame in frames:
        # frame is a numpy array (H, W, 3) in RGB format
        process_frame(frame)

# With time bounds
with FrameIterator("video.mp4", start_second=10.0, end_second=60.0) as frames:
    for frame_idx, frame in frames:
        process_frame(frame)

FrameIterator

Memory-efficient frame iterator using ffmpeg streaming.

Yields frames one at a time, keeping memory usage constant regardless of video length. Supports context manager protocol for resource cleanup.

This is useful for operations that only need to process frames sequentially, such as scene detection, without loading the entire video into memory.

Example

>>> with FrameIterator("video.mp4") as frames:
...     for idx, frame in frames:
...         process(frame)

Source code in src/videopython/base/video.py
class FrameIterator:
    """Stream video frames one by one through an ffmpeg pipe.

    Frames are read and yielded individually, so memory use does not grow
    with video length. The object is also a context manager: leaving the
    ``with`` block closes the active frame generator.

    Intended for sequential processing (for example scene detection) where
    holding the whole video in memory is unnecessary.

    Example:
        >>> with FrameIterator("video.mp4") as frames:
        ...     for idx, frame in frames:
        ...         process(frame)
    """

    def __init__(
        self,
        path: str | Path,
        start_second: float | None = None,
        end_second: float | None = None,
        vf_filters: list[str] | None = None,
        output_fps: float | None = None,
        output_width: int | None = None,
        output_height: int | None = None,
    ):
        """Initialize the frame iterator.

        Args:
            path: Path to video file
            start_second: Optional start time in seconds (seek before reading)
            end_second: Optional end time in seconds (stop reading after this)
            vf_filters: Optional list of ffmpeg -vf filter expressions to apply
                during decode (e.g. ``["scale=1280:720", "fps=30"]``).
            output_fps: Override output fps (adds fps filter if not in vf_filters).
            output_width: Override output width for frame size calculation.
            output_height: Override output height for frame size calculation.

        Raises:
            FileNotFoundError: If ``path`` does not exist.
        """
        self.path = Path(path)
        if not self.path.exists():
            raise FileNotFoundError(f"Video file not found: {path}")

        self.metadata = VideoMetadata.from_path(path)
        self.start_second = 0.0 if start_second is None else start_second
        self.end_second = end_second
        self._iter: Generator[tuple[int, np.ndarray], None, None] | None = None

        # Filter chain for -vf; an fps filter is appended only when the caller
        # asked for an output fps and did not already supply one.
        self._vf_filters = list(vf_filters or [])
        if output_fps is not None:
            already_has_fps = any(expr.startswith("fps=") for expr in self._vf_filters)
            if not already_has_fps:
                self._vf_filters.append(f"fps={output_fps}")

        # Output geometry (after filters); overrides fall back to the probed values.
        self.output_width = output_width or self.metadata.width
        self.output_height = output_height or self.metadata.height
        self.output_fps = output_fps or self.metadata.fps
        # Bytes per raw rgb24 frame: 3 bytes for every pixel.
        self._frame_size = self.output_width * self.output_height * 3

    def _build_ffmpeg_command(self) -> list[str]:
        """Build ffmpeg command for frame streaming."""
        # -ss goes before -i, -t and -vf after it.
        seek_args = ["-ss", str(self.start_second)] if self.start_second > 0 else []
        limit_args = [] if self.end_second is None else ["-t", str(self.end_second - self.start_second)]
        filter_args = ["-vf", ",".join(self._vf_filters)] if self._vf_filters else []

        return [
            "ffmpeg",
            "-hide_banner",
            "-loglevel",
            "error",
            *seek_args,
            "-i",
            str(self.path),
            *limit_args,
            *filter_args,
            "-f",
            "rawvideo",
            "-pix_fmt",
            "rgb24",
            "-vcodec",
            "rawvideo",
            "-y",
            "pipe:1",
        ]

    def __iter__(self) -> Generator[tuple[int, np.ndarray], None, None]:
        """Yield (frame_index, frame) tuples.

        Frame indices are absolute indices in the original video,
        accounting for any start_second offset.
        """
        generator = self._iter_frames()
        # Remember the generator so __exit__ can close it.
        self._iter = generator
        return generator

    def _iter_frames(self) -> Generator[tuple[int, np.ndarray], None, None]:
        """Run ffmpeg and yield one ``(index, frame)`` pair per full frame read."""
        command = self._build_ffmpeg_command()
        with _ffmpeg.popen_decode(command, bufsize=self._frame_size * 2) as proc:
            index = int(self.start_second * self.output_fps)
            # A short read (end of stream or a truncated frame) ends iteration.
            while len(chunk := proc.stdout.read(self._frame_size)) == self._frame_size:  # type: ignore[union-attr]
                pixels = np.frombuffer(chunk, dtype=np.uint8).copy()
                yield index, pixels.reshape(self.output_height, self.output_width, 3)
                index += 1

    def __enter__(self) -> "FrameIterator":
        """Return the iterator itself."""
        return self

    def __exit__(self, *args: object) -> None:
        """Close the active frame generator, if any."""
        active = self._iter
        if active is None:
            return
        active.close()
        self._iter = None

__init__

__init__(
    path: str | Path,
    start_second: float | None = None,
    end_second: float | None = None,
    vf_filters: list[str] | None = None,
    output_fps: float | None = None,
    output_width: int | None = None,
    output_height: int | None = None,
)

Initialize the frame iterator.

Parameters:

Name Type Description Default
path str | Path

Path to video file

required
start_second float | None

Optional start time in seconds (seek before reading)

None
end_second float | None

Optional end time in seconds (stop reading after this)

None
vf_filters list[str] | None

Optional list of ffmpeg -vf filter expressions to apply during decode (e.g. ["scale=1280:720", "fps=30"]).

None
output_fps float | None

Override output fps (adds fps filter if not in vf_filters).

None
output_width int | None

Override output width for frame size calculation.

None
output_height int | None

Override output height for frame size calculation.

None
Source code in src/videopython/base/video.py
def __init__(
    self,
    path: str | Path,
    start_second: float | None = None,
    end_second: float | None = None,
    vf_filters: list[str] | None = None,
    output_fps: float | None = None,
    output_width: int | None = None,
    output_height: int | None = None,
):
    """Set up a frame iterator over the video at ``path``.

    Args:
        path: Location of the video file.
        start_second: Time in seconds at which reading starts; ``None`` is
            stored as ``0.0``.
        end_second: Time in seconds at which reading stops, or ``None``.
        vf_filters: ffmpeg ``-vf`` filter expressions to apply while decoding
            (e.g. ``["scale=1280:720", "fps=30"]``). The list is copied, so
            the caller's list is never modified.
        output_fps: Output frame rate. An ``fps=`` filter is appended unless
            one of ``vf_filters`` already starts with ``fps=``.
        output_width: Frame width used for the frame size calculation;
            falls back to the source metadata when falsy.
        output_height: Frame height used for the frame size calculation;
            falls back to the source metadata when falsy.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    self.path = Path(path)
    if not self.path.exists():
        raise FileNotFoundError(f"Video file not found: {path}")

    self.metadata = VideoMetadata.from_path(path)
    self.start_second = 0.0 if start_second is None else start_second
    self.end_second = end_second
    self._iter: Generator[tuple[int, np.ndarray], None, None] | None = None

    # Assemble the -vf chain on a private copy of the caller's filters.
    self._vf_filters = [] if not vf_filters else list(vf_filters)
    if output_fps is not None:
        if all(not entry.startswith("fps=") for entry in self._vf_filters):
            self._vf_filters.append(f"fps={output_fps}")

    # Post-filter output geometry; falsy overrides fall back to the source metadata.
    self.output_width = output_width if output_width else self.metadata.width
    self.output_height = output_height if output_height else self.metadata.height
    self.output_fps = output_fps if output_fps else self.metadata.fps
    # Three bytes per pixel, matching the 3-channel uint8 reshape used when decoding.
    self._frame_size = 3 * self.output_width * self.output_height

__iter__

__iter__() -> Generator[tuple[int, np.ndarray], None, None]

Yield (frame_index, frame) tuples.

Frame indices are absolute indices in the original video, accounting for any start_second offset.

Source code in src/videopython/base/video.py
def __iter__(self) -> Generator[tuple[int, np.ndarray], None, None]:
    """Start a new frame generator and return it.

    The generator is also stored on ``self._iter`` so that ``__exit__`` can
    close it. It yields ``(frame_index, frame)`` tuples whose indices begin
    at ``int(start_second * output_fps)``, i.e. they account for any
    ``start_second`` offset.
    """
    frame_gen = self._iter_frames()
    self._iter = frame_gen
    return frame_gen