Skip to content

AI Understanding

Analyze videos, transcribe audio, and describe visual content.

For a single aggregate, serializable analysis object across multiple analyzers, see Video Analysis.

Local Model Support

Class Local Model Family
ImageToText BLIP
AudioToText Whisper
AudioClassifier AST
ObjectDetector YOLO
TextDetector EasyOCR
FaceDetector OpenCV / YOLOv8-face
CameraMotionDetector OpenCV
MotionAnalyzer OpenCV
ActionRecognizer VideoMAE
SemanticSceneDetector TransNetV2

AudioToText

AudioToText

Transcription service for audio and video using local Whisper models.

Source code in src/videopython/ai/understanding/audio.py
class AudioToText:
    """Transcription service for audio and video using local Whisper models.

    Two backends are supported:
      * plain ``whisper`` (default), producing segment + word timestamps, and
      * ``whisperx`` (when ``enable_diarization=True``), adding forced
        alignment and speaker diarization.
    The model is loaded lazily on the first transcription call.
    """

    def __init__(
        self,
        model_name: Literal["tiny", "base", "small", "medium", "large", "turbo"] = "small",
        enable_diarization: bool = False,
        device: str | None = None,
        compute_type: str = "float32",
    ):
        # Whisper checkpoint name/size to load.
        self.model_name = model_name
        # Switches the whole pipeline to whisperx (alignment + diarization).
        self.enable_diarization = enable_diarization
        # Fall back to auto-detection when no device is given.
        self.device = device if device is not None else _detect_device()
        # Only forwarded to whisperx.load_model (unused on the plain-whisper path).
        self.compute_type = compute_type
        # Lazily-initialized model handle (see _init_local).
        self._model: Any = None

    def _init_local(self) -> None:
        """Initialize local Whisper model."""
        if self.enable_diarization:
            import whisperx  # type: ignore

            self._model = whisperx.load_model(self.model_name, device=self.device, compute_type=self.compute_type)
        else:
            import whisper

            self._model = whisper.load_model(name=self.model_name)

    def _process_transcription_result(self, transcription_result: dict) -> Transcription:
        """Process raw transcription result into a Transcription object.

        ``transcription_result`` is the dict returned by plain whisper's
        ``transcribe`` (called with ``word_timestamps=True`` below).
        """
        transcription_segments = []
        for segment in transcription_result["segments"]:
            # "words" may be absent for a segment; default to an empty list.
            transcription_words = [
                TranscriptionWord(word=word["word"], start=float(word["start"]), end=float(word["end"]))
                for word in segment.get("words", [])
            ]
            transcription_segment = TranscriptionSegment(
                start=segment["start"],
                end=segment["end"],
                text=segment["text"],
                words=transcription_words,
            )
            transcription_segments.append(transcription_segment)

        return Transcription(segments=transcription_segments)

    def _process_whisperx_result(self, whisperx_result: dict, audio_data) -> Transcription:
        """Process whisperx result with diarization.

        Aligns the raw segments word-by-word, runs the diarization pipeline,
        and returns a word-level Transcription with speaker labels attached.
        """
        import torch.serialization
        import whisperx  # type: ignore
        from omegaconf import DictConfig, ListConfig, OmegaConf

        # Allowlist omegaconf classes so torch.load (used internally when the
        # alignment/diarization checkpoints are restored) accepts them.
        torch.serialization.add_safe_globals([DictConfig, ListConfig, OmegaConf])

        model_a, metadata = whisperx.load_align_model(language_code=whisperx_result["language"], device=self.device)
        aligned_result = whisperx.align(
            whisperx_result["segments"],
            model_a,
            metadata,
            audio_data,
            self.device,
            return_char_alignments=False,
        )

        diarize_model = whisperx.diarize.DiarizationPipeline(device=self.device)
        diarize_segments = diarize_model(audio_data)
        result_with_speakers = whisperx.assign_word_speakers(diarize_segments, aligned_result)

        words = []
        for item in result_with_speakers["word_segments"]:
            words.append(
                TranscriptionWord(
                    word=item["word"],
                    start=item["start"],
                    end=item["end"],
                    # Words that could not be attributed keep speaker=None.
                    speaker=item.get("speaker", None),
                )
            )

        return Transcription(words=words)

    def _transcribe_local(self, audio: Audio) -> Transcription:
        """Transcribe using local Whisper model."""
        import whisper

        if self._model is None:
            self._init_local()

        # Whisper expects mono input at its fixed whisper.audio.SAMPLE_RATE.
        audio_mono = audio.to_mono().resample(whisper.audio.SAMPLE_RATE)

        if self.enable_diarization:
            audio_data = audio_mono.data
            transcription_result = self._model.transcribe(audio_data)
            return self._process_whisperx_result(transcription_result, audio_data)

        transcription_result = self._model.transcribe(audio=audio_mono.data, word_timestamps=True)
        return self._process_transcription_result(transcription_result)

    def transcribe(self, media: Audio | Video) -> Transcription:
        """Transcribe audio or video to text."""
        if isinstance(media, Video):
            # Silent media short-circuits to an empty transcription.
            if media.audio.is_silent:
                return Transcription(segments=[])
            audio = media.audio
        elif isinstance(media, Audio):
            if media.is_silent:
                return Transcription(segments=[])
            audio = media
        else:
            raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

        return self._transcribe_local(audio)

transcribe

transcribe(media: Audio | Video) -> Transcription

Transcribe audio or video to text.

Source code in src/videopython/ai/understanding/audio.py
def transcribe(self, media: Audio | Video) -> Transcription:
    """Convert the audio of an Audio or Video object into a Transcription."""
    if isinstance(media, Video):
        audio = media.audio
    elif isinstance(media, Audio):
        audio = media
    else:
        raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

    # Nothing to transcribe in silent media.
    if audio.is_silent:
        return Transcription(segments=[])
    return self._transcribe_local(audio)

AudioClassifier

Detect and classify sounds, music, and audio events with timestamps using Audio Spectrogram Transformer (AST), a state-of-the-art model achieving 0.485 mAP on AudioSet.

Basic Usage

from videopython.ai import AudioClassifier
from videopython.base import Video

classifier = AudioClassifier(confidence_threshold=0.3)
video = Video.from_path("video.mp4")

result = classifier.classify(video)

# Clip-level predictions (overall audio content)
for label, confidence in result.clip_predictions.items():
    print(f"{label}: {confidence:.2f}")

# Timestamped events
for event in result.events:
    print(f"{event.start:.1f}s - {event.end:.1f}s: {event.label} ({event.confidence:.2f})")

AudioClassifier

Audio event and sound classification using AST.

Source code in src/videopython/ai/understanding/audio.py
class AudioClassifier:
    """Audio event and sound classification using AST.

    Runs the Audio Spectrogram Transformer over sliding windows
    (AST_CHUNK_SECONDS long, AST_HOP_SECONDS apart), thresholds per-class
    sigmoid scores into timestamped events, and averages the window scores
    into clip-level predictions.
    """

    SUPPORTED_MODELS: list[str] = ["MIT/ast-finetuned-audioset-10-10-0.4593"]
    # The model consumes 16 kHz mono audio.
    AST_SAMPLE_RATE: int = 16000
    # Sliding-window size and hop, in seconds (50% overlap).
    AST_CHUNK_SECONDS: float = 10.0
    AST_HOP_SECONDS: float = 5.0

    def __init__(
        self,
        model_name: str = "MIT/ast-finetuned-audioset-10-10-0.4593",
        confidence_threshold: float = 0.3,
        top_k: int = 10,
        device: str | None = None,
    ):
        if model_name not in self.SUPPORTED_MODELS:
            raise ValueError(f"Model '{model_name}' not supported. Supported: {self.SUPPORTED_MODELS}")

        self.model_name = model_name
        # Minimum sigmoid score for a class to become an event / clip prediction.
        self.confidence_threshold = confidence_threshold
        # At most this many classes are considered per window and per clip.
        self.top_k = top_k
        self.device = device if device is not None else _detect_device()

        # Lazily-initialized model state (see _init_local).
        self._model: Any = None
        self._processor: Any = None
        self._labels: list[str] = []

    def _init_local(self) -> None:
        """Initialize local AST model from HuggingFace."""
        from transformers import ASTFeatureExtractor, ASTForAudioClassification

        self._processor = ASTFeatureExtractor.from_pretrained(self.model_name)
        self._model = ASTForAudioClassification.from_pretrained(self.model_name)
        self._model.to(self.device)
        self._model.eval()

        # Label list indexed by class id, taken from the model config.
        self._labels = [self._model.config.id2label[i] for i in range(len(self._model.config.id2label))]

    def _merge_events(self, events: list[AudioEvent], gap_threshold: float = 0.5) -> list[AudioEvent]:
        """Merge consecutive events of the same class.

        Events of the same label that are at most ``gap_threshold`` seconds
        apart are fused into one event keeping the maximum confidence.
        """
        if not events:
            return []

        # Group events by label so only same-class events can be merged.
        events_by_label: dict[str, list[AudioEvent]] = {}
        for event in events:
            if event.label not in events_by_label:
                events_by_label[event.label] = []
            events_by_label[event.label].append(event)

        merged = []
        for label, label_events in events_by_label.items():
            sorted_events = sorted(label_events, key=lambda e: e.start)
            current = sorted_events[0]

            for next_event in sorted_events[1:]:
                if next_event.start - current.end <= gap_threshold:
                    # Extend the current event to cover the neighbor.
                    current = AudioEvent(
                        start=current.start,
                        end=next_event.end,
                        label=label,
                        confidence=max(current.confidence, next_event.confidence),
                    )
                else:
                    merged.append(current)
                    current = next_event

            merged.append(current)

        return sorted(merged, key=lambda e: e.start)

    def _classify_local(self, audio: Audio) -> AudioClassification:
        """Classify audio using local AST model with sliding window."""
        import numpy as np
        import torch

        if self._model is None:
            self._init_local()

        audio_processed = audio.to_mono().resample(self.AST_SAMPLE_RATE)
        audio_data = audio_processed.data.astype(np.float32)

        chunk_samples = int(self.AST_CHUNK_SECONDS * self.AST_SAMPLE_RATE)
        hop_samples = int(self.AST_HOP_SECONDS * self.AST_SAMPLE_RATE)
        total_samples = len(audio_data)

        all_chunk_probs = []
        chunk_times = []

        # Short clips become a single (unpadded) chunk; longer clips are cut
        # into overlapping windows, zero-padding the final partial window.
        if total_samples <= chunk_samples:
            chunks = [(0, audio_data)]
        else:
            chunks = []
            start = 0
            while start < total_samples:
                end = min(start + chunk_samples, total_samples)
                chunk = audio_data[start:end]
                if len(chunk) < chunk_samples:
                    chunk = np.pad(chunk, (0, chunk_samples - len(chunk)))
                chunks.append((start, chunk))
                start += hop_samples

        for start_sample, chunk in chunks:
            start_time = start_sample / self.AST_SAMPLE_RATE

            inputs = self._processor(
                chunk,
                sampling_rate=self.AST_SAMPLE_RATE,
                return_tensors="pt",
            )
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            with torch.no_grad():
                outputs = self._model(**inputs)
                logits = outputs.logits[0]
                # Sigmoid (not softmax): multiple sounds can co-occur.
                probs = torch.sigmoid(logits).cpu().numpy()

            all_chunk_probs.append(probs)
            chunk_times.append(start_time)

        chunk_probs_array = np.array(all_chunk_probs)

        events = []
        for start_time, probs in zip(chunk_times, chunk_probs_array):
            end_time = start_time + self.AST_CHUNK_SECONDS
            # Top-k classes for this window, highest score first.
            top_indices = np.argsort(probs)[-self.top_k :][::-1]

            for class_idx in top_indices:
                confidence = float(probs[class_idx])
                if confidence >= self.confidence_threshold:
                    label = self._labels[class_idx]
                    events.append(
                        AudioEvent(
                            start=start_time,
                            # Clamp the window end to the real audio duration.
                            end=min(end_time, total_samples / self.AST_SAMPLE_RATE),
                            label=label,
                            confidence=confidence,
                        )
                    )

        merged_events = self._merge_events(events)

        # Clip-level scores: mean over all windows, thresholded top-k.
        clip_preds = np.mean(chunk_probs_array, axis=0)
        top_clip_indices = np.argsort(clip_preds)[-self.top_k :][::-1]
        clip_predictions = {
            self._labels[idx]: float(clip_preds[idx])
            for idx in top_clip_indices
            if clip_preds[idx] >= self.confidence_threshold
        }

        return AudioClassification(events=merged_events, clip_predictions=clip_predictions)

    def classify(self, media: Audio | Video) -> AudioClassification:
        """Classify audio events in audio or video."""
        if isinstance(media, Video):
            # Silent media short-circuits to an empty classification.
            if media.audio.is_silent:
                return AudioClassification(events=[], clip_predictions={})
            audio = media.audio
        elif isinstance(media, Audio):
            if media.is_silent:
                return AudioClassification(events=[], clip_predictions={})
            audio = media
        else:
            raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

        return self._classify_local(audio)

classify

classify(media: Audio | Video) -> AudioClassification

Classify audio events in audio or video.

Source code in src/videopython/ai/understanding/audio.py
def classify(self, media: Audio | Video) -> AudioClassification:
    """Run audio-event classification on an Audio or Video object."""
    if isinstance(media, Video):
        audio = media.audio
    elif isinstance(media, Audio):
        audio = media
    else:
        raise TypeError(f"Unsupported media type: {type(media)}. Expected Audio or Video.")

    # Nothing to classify in silent media.
    if audio.is_silent:
        return AudioClassification(events=[], clip_predictions={})
    return self._classify_local(audio)

ImageToText

ImageToText

Generates text descriptions of images using BLIP.

Source code in src/videopython/ai/understanding/image.py
class ImageToText:
    """Generates text descriptions of images using BLIP.

    The BLIP model is downloaded and loaded lazily on the first call to
    describe_image().
    """

    def __init__(self, device: str | None = None):
        # Device request; resolved (and possibly overridden) at load time.
        self.device = device
        self._processor: Any = None
        self._model: Any = None

    def _init_local(self) -> None:
        """Load the BLIP captioning model and its processor."""
        from transformers.models.blip import BlipForConditionalGeneration, BlipProcessor

        # MPS is intentionally disabled here due to worse BLIP performance/compatibility.
        resolved = select_device(self.device, mps_allowed=False)

        checkpoint = "Salesforce/blip-image-captioning-large"
        self._processor = BlipProcessor.from_pretrained(checkpoint, use_fast=True)
        self._model = BlipForConditionalGeneration.from_pretrained(checkpoint)
        self._model.to(resolved)
        self.device = resolved

    def describe_image(
        self,
        image: np.ndarray | Image.Image,
        prompt: str | None = None,
    ) -> str:
        """Generate a text description of an image.

        An optional prompt conditions the caption generation.
        """
        if self._model is None:
            self._init_local()

        if isinstance(image, np.ndarray):
            pil_image = Image.fromarray(image)
        else:
            pil_image = image

        inputs = self._processor(pil_image, prompt, return_tensors="pt").to(self.device)
        generated = self._model.generate(**inputs, max_new_tokens=50)
        return self._processor.decode(generated[0], skip_special_tokens=True)

describe_image

describe_image(
    image: ndarray | Image, prompt: str | None = None
) -> str

Generate a text description of an image.

Source code in src/videopython/ai/understanding/image.py
def describe_image(
    self,
    image: np.ndarray | Image.Image,
    prompt: str | None = None,
) -> str:
    """Produce a caption for the given image, optionally conditioned on a prompt."""
    if self._model is None:
        self._init_local()

    if isinstance(image, np.ndarray):
        pil_image = Image.fromarray(image)
    else:
        pil_image = image

    inputs = self._processor(pil_image, prompt, return_tensors="pt").to(self.device)
    generated = self._model.generate(**inputs, max_new_tokens=50)
    return self._processor.decode(generated[0], skip_special_tokens=True)

Detection Classes

ObjectDetector

ObjectDetector

Detects objects in images using local YOLO models.

Source code in src/videopython/ai/understanding/detection.py
class ObjectDetector:
    """Detects objects in images using local YOLO models.

    The YOLO11 checkpoint is downloaded/loaded lazily on first detection.
    Bounding boxes are returned normalized to [0, 1] image coordinates.
    """

    def __init__(
        self,
        model_size: str = "n",
        confidence_threshold: float = 0.25,
        device: str | None = None,
    ):
        # YOLO11 variant suffix, e.g. "n", "s", "m", ...
        self.model_size = model_size
        self.confidence_threshold = confidence_threshold
        self.device = device
        self._model: Any = None

    def _init_yolo(self) -> None:
        """Load the YOLO11 checkpoint and move it to the selected device."""
        from ultralytics import YOLO

        self._model = YOLO(f"yolo11{self.model_size}.pt")
        # MPS is disallowed for YOLO here; only CUDA triggers a device move.
        chosen = select_device(self.device, mps_allowed=False)
        if chosen != "cpu":
            self._model.to(chosen)
        self.device = chosen

    def detect(self, image: np.ndarray | Image.Image) -> list[DetectedObject]:
        """Detect objects in an image, returning labeled, normalized boxes."""
        if self._model is None:
            self._init_yolo()

        frame = np.array(image) if isinstance(image, Image.Image) else image
        predictions = self._model(frame, conf=self.confidence_threshold, verbose=False)

        detections: list[DetectedObject] = []
        for prediction in predictions:
            boxes = prediction.boxes
            if boxes is None:
                continue

            height, width = prediction.orig_shape
            for coords, score, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
                x1, y1, x2, y2 = coords.tolist()
                detections.append(
                    DetectedObject(
                        label=self._model.names[int(cls)],
                        confidence=float(score),
                        # Normalize pixel coordinates to the [0, 1] range.
                        bounding_box=BoundingBox(
                            x=x1 / width,
                            y=y1 / height,
                            width=(x2 - x1) / width,
                            height=(y2 - y1) / height,
                        ),
                    )
                )

        return detections

detect

detect(image: ndarray | Image) -> list[DetectedObject]

Detect objects in an image.

Source code in src/videopython/ai/understanding/detection.py
def detect(self, image: np.ndarray | Image.Image) -> list[DetectedObject]:
    """Detect objects in an image, returning labeled, normalized boxes."""
    if self._model is None:
        self._init_yolo()

    frame = np.array(image) if isinstance(image, Image.Image) else image
    predictions = self._model(frame, conf=self.confidence_threshold, verbose=False)

    detections: list[DetectedObject] = []
    for prediction in predictions:
        boxes = prediction.boxes
        if boxes is None:
            continue

        height, width = prediction.orig_shape
        for coords, score, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
            x1, y1, x2, y2 = coords.tolist()
            detections.append(
                DetectedObject(
                    label=self._model.names[int(cls)],
                    confidence=float(score),
                    # Normalize pixel coordinates to the [0, 1] range.
                    bounding_box=BoundingBox(
                        x=x1 / width,
                        y=y1 / height,
                        width=(x2 - x1) / width,
                        height=(y2 - y1) / height,
                    ),
                )
            )

    return detections

FaceDetector

FaceDetector

Detects faces in images using OpenCV (CPU) or YOLOv8-face (GPU).

Source code in src/videopython/ai/understanding/detection.py
class FaceDetector:
    """Detects faces in images using OpenCV (CPU) or YOLOv8-face (GPU).

    The backend is chosen via ``backend``: "cpu" uses a Haar cascade,
    "gpu" uses a YOLOv8-face model, and "auto" picks "gpu" when a CUDA/MPS
    device is available. Both paths return normalized bounding boxes sorted
    largest-face-first. Models are loaded lazily.
    """

    def __init__(
        self,
        confidence_threshold: float = 0.5,
        min_face_size: int = 30,
        backend: Literal["cpu", "gpu", "auto"] = "cpu",
        device: str | None = None,
    ):
        # Only used by the YOLO (GPU) path; the Haar cascade has no scores.
        self.confidence_threshold = confidence_threshold
        # Minimum face size in pixels (applied on both backends).
        self.min_face_size = min_face_size
        self.backend: Literal["cpu", "gpu", "auto"] = backend
        self.device = device

        # Lazily-initialized detectors and the resolved backend cache.
        self._cascade: Any = None
        self._yolo_model: Any = None
        self._resolved_backend: Literal["cpu", "gpu"] | None = None

    def _get_device(self) -> str:
        """Get the device to use for GPU inference."""
        return select_device(self.device, mps_allowed=True)

    def _resolve_backend(self) -> Literal["cpu", "gpu"]:
        """Resolve 'auto' backend to an actual backend.

        The result is cached so the device probe runs at most once.
        """
        if self._resolved_backend is not None:
            return self._resolved_backend

        if self.backend == "auto":
            device = self._get_device()
            self._resolved_backend = "gpu" if device in ("cuda", "mps") else "cpu"
        else:
            self._resolved_backend = self.backend

        return self._resolved_backend

    def _init_cascade(self) -> None:
        """Initialize OpenCV Haar cascade for CPU detection."""
        import cv2

        self._cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")

    def _init_yolo_face(self) -> None:
        """Initialize YOLO face detection model for GPU detection.

        The checkpoint is fetched from the HuggingFace Hub (cached locally).
        """
        from huggingface_hub import hf_hub_download
        from ultralytics import YOLO

        model_path = hf_hub_download(
            repo_id="arnabdhar/YOLOv8-Face-Detection",
            filename="model.pt",
        )
        self._yolo_model = YOLO(model_path)

        device = self._get_device()
        if device != "cpu":
            self._yolo_model.to(device)

    def _detect_cpu(self, image: np.ndarray) -> list[DetectedFace]:
        """Detect faces using OpenCV Haar cascade (CPU).

        Assumes 3-channel input is RGB — TODO confirm against callers.
        Cascade hits carry no score, so confidence is fixed at 1.0.
        """
        import cv2

        img_h, img_w = image.shape[:2]

        # Convert RGB input to BGR for OpenCV; pass through anything else
        # (e.g. already-grayscale frames) unchanged.
        if len(image.shape) == 3 and image.shape[2] == 3:
            img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        else:
            img_bgr = image

        if self._cascade is None:
            self._init_cascade()

        gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
        faces = self._cascade.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(self.min_face_size, self.min_face_size),
        )

        detected_faces: list[DetectedFace] = []
        for x, y, w, h in faces:
            # Normalize pixel coordinates to the [0, 1] range.
            bbox = BoundingBox(
                x=x / img_w,
                y=y / img_h,
                width=w / img_w,
                height=h / img_h,
            )
            detected_faces.append(DetectedFace(bounding_box=bbox, confidence=1.0))

        # Largest faces first.
        detected_faces.sort(key=lambda f: f.area or 0, reverse=True)
        return detected_faces

    def _detect_gpu(self, image: np.ndarray) -> list[DetectedFace]:
        """Detect faces using YOLOv8-face model (GPU)."""
        if self._yolo_model is None:
            self._init_yolo_face()

        img_h, img_w = image.shape[:2]
        results = self._yolo_model(image, conf=self.confidence_threshold, verbose=False)

        detected_faces: list[DetectedFace] = []
        for result in results:
            boxes = result.boxes
            if boxes is None:
                continue

            for i in range(len(boxes)):
                x1, y1, x2, y2 = boxes.xyxy[i].tolist()
                conf = float(boxes.conf[i])

                # Skip faces below the pixel-size floor.
                face_w = x2 - x1
                face_h = y2 - y1
                if face_w < self.min_face_size or face_h < self.min_face_size:
                    continue

                bbox = BoundingBox(
                    x=x1 / img_w,
                    y=y1 / img_h,
                    width=face_w / img_w,
                    height=face_h / img_h,
                )
                detected_faces.append(DetectedFace(bounding_box=bbox, confidence=conf))

        # Largest faces first.
        detected_faces.sort(key=lambda f: f.area or 0, reverse=True)
        return detected_faces

    def detect(self, image: np.ndarray | Image.Image) -> list[DetectedFace]:
        """Detect faces in an image."""
        img_array = np.array(image) if isinstance(image, Image.Image) else image

        backend = self._resolve_backend()
        if backend == "gpu":
            return self._detect_gpu(img_array)
        return self._detect_cpu(img_array)

    def detect_batch(self, images: list[np.ndarray] | np.ndarray) -> list[list[DetectedFace]]:
        """Detect faces in a batch of images.

        Accepts either a list of frames or a stacked 4-D array of frames.
        The GPU path runs the whole batch through YOLO in one call; the CPU
        path simply loops over frames.
        """
        backend = self._resolve_backend()

        # Normalize a stacked (N, H, W, C) array — or a single frame — to a list.
        if isinstance(images, np.ndarray):
            if images.ndim == 4:
                images = [images[i] for i in range(images.shape[0])]
            else:
                images = [images]

        if not images:
            return []

        if backend == "cpu":
            return [self._detect_cpu(img) for img in images]

        if self._yolo_model is None:
            self._init_yolo_face()

        results = self._yolo_model(images, conf=self.confidence_threshold, verbose=False)

        batch_results: list[list[DetectedFace]] = []
        for result in results:
            detected_faces: list[DetectedFace] = []
            boxes = result.boxes
            if boxes is not None:
                result_h, result_w = result.orig_shape

                for i in range(len(boxes)):
                    x1, y1, x2, y2 = boxes.xyxy[i].tolist()
                    conf = float(boxes.conf[i])

                    # Skip faces below the pixel-size floor.
                    face_w = x2 - x1
                    face_h = y2 - y1
                    if face_w < self.min_face_size or face_h < self.min_face_size:
                        continue

                    bbox = BoundingBox(
                        x=x1 / result_w,
                        y=y1 / result_h,
                        width=face_w / result_w,
                        height=face_h / result_h,
                    )
                    detected_faces.append(DetectedFace(bounding_box=bbox, confidence=conf))

            # Largest faces first, per frame.
            detected_faces.sort(key=lambda f: f.area or 0, reverse=True)
            batch_results.append(detected_faces)

        return batch_results

detect

detect(image: ndarray | Image) -> list[DetectedFace]

Detect faces in an image.

Source code in src/videopython/ai/understanding/detection.py
def detect(self, image: np.ndarray | Image.Image) -> list[DetectedFace]:
    """Detect faces in an image using the resolved backend."""
    frame = np.array(image) if isinstance(image, Image.Image) else image

    if self._resolve_backend() == "gpu":
        return self._detect_gpu(frame)
    return self._detect_cpu(frame)

detect_batch

detect_batch(
    images: list[ndarray] | ndarray,
) -> list[list[DetectedFace]]

Detect faces in a batch of images.

Source code in src/videopython/ai/understanding/detection.py
def detect_batch(self, images: list[np.ndarray] | np.ndarray) -> list[list[DetectedFace]]:
    """Detect faces in a batch of images."""
    backend = self._resolve_backend()

    # Normalize a stacked array — or a single frame — into a list of frames.
    if isinstance(images, np.ndarray):
        images = list(images) if images.ndim == 4 else [images]

    if not images:
        return []

    if backend == "cpu":
        return [self._detect_cpu(frame) for frame in images]

    if self._yolo_model is None:
        self._init_yolo_face()

    predictions = self._yolo_model(images, conf=self.confidence_threshold, verbose=False)

    batch_results: list[list[DetectedFace]] = []
    for prediction in predictions:
        faces: list[DetectedFace] = []
        boxes = prediction.boxes
        if boxes is not None:
            height, width = prediction.orig_shape

            for coords, score in zip(boxes.xyxy, boxes.conf):
                x1, y1, x2, y2 = coords.tolist()
                face_w = x2 - x1
                face_h = y2 - y1
                # Skip faces below the pixel-size floor.
                if face_w < self.min_face_size or face_h < self.min_face_size:
                    continue

                faces.append(
                    DetectedFace(
                        bounding_box=BoundingBox(
                            x=x1 / width,
                            y=y1 / height,
                            width=face_w / width,
                            height=face_h / height,
                        ),
                        confidence=float(score),
                    )
                )

        # Largest faces first, per frame.
        faces.sort(key=lambda f: f.area or 0, reverse=True)
        batch_results.append(faces)

    return batch_results

TextDetector

TextDetector supports two output modes:

  • detect(image) -> list[str] (backward-compatible plain text)
  • detect_detailed(image) -> list[DetectedText] (text + confidence + bounding box)
from videopython.ai import TextDetector

detector = TextDetector(languages=["en"])
texts = detector.detect(frame)
regions = detector.detect_detailed(frame)

for region in regions:
    print(region.text, region.confidence, region.bounding_box)

TextDetector

Detects text in images using local EasyOCR.

Source code in src/videopython/ai/understanding/detection.py
class TextDetector:
    """Detects text in images using local EasyOCR.

    The EasyOCR reader is loaded lazily on the first detection call.
    ``detect`` returns plain strings; ``detect_detailed`` also returns
    confidences and normalized bounding boxes.
    """

    def __init__(self, languages: list[str] | None = None, device: str | None = None):
        # Languages for the OCR reader; defaults to English.
        self.languages = languages or ["en"]
        self.device = device
        self._reader: Any = None

    def _init_easyocr(self) -> None:
        """Initialize EasyOCR reader."""
        import easyocr

        selected_device = select_device(self.device, mps_allowed=False)
        # EasyOCR's gpu flag only applies to CUDA; everything else runs on CPU.
        self._reader = easyocr.Reader(self.languages, gpu=(selected_device == "cuda"))
        self.device = selected_device

    @staticmethod
    def _polygon_to_bbox(polygon, img_w: int, img_h: int) -> BoundingBox | None:
        """Convert an EasyOCR polygon to a normalized, image-clamped bounding box.

        Returns None when the polygon is missing/empty, the image has a zero
        dimension, or the polygon is malformed (best-effort, never raises).
        """
        try:
            # EasyOCR may return the polygon as a plain list or a numpy array.
            # Bug fix: a bare `if polygon:` raises "truth value of an array is
            # ambiguous" for arrays, which silently dropped the box via the
            # except below — so test for length explicitly instead.
            if polygon is None or len(polygon) == 0:
                return None
            xs = [float(point[0]) for point in polygon]
            ys = [float(point[1]) for point in polygon]
            # Clamp the axis-aligned hull to the image bounds.
            x_min = max(0.0, min(xs))
            x_max = min(float(img_w), max(xs))
            y_min = max(0.0, min(ys))
            y_max = min(float(img_h), max(ys))
            width = max(0.0, x_max - x_min)
            height = max(0.0, y_max - y_min)
            if img_w > 0 and img_h > 0:
                return BoundingBox(
                    x=x_min / img_w,
                    y=y_min / img_h,
                    width=width / img_w,
                    height=height / img_h,
                )
        except Exception:
            # Best-effort: a bad polygon must not abort the whole detection.
            pass
        return None

    def detect(self, image: np.ndarray | Image.Image) -> list[str]:
        """Detect text in an image.

        Returns plain text strings for backward compatibility.
        """
        return [item.text for item in self.detect_detailed(image)]

    def detect_detailed(self, image: np.ndarray | Image.Image) -> list[DetectedText]:
        """Detect text in an image with confidence and region boxes."""
        if self._reader is None:
            self._init_easyocr()

        img_array = np.array(image) if isinstance(image, Image.Image) else image
        results = self._reader.readtext(img_array)

        img_h, img_w = img_array.shape[:2]
        detected_text: list[DetectedText] = []
        for polygon, text, confidence in results:
            # Skip empty/whitespace-only hits.
            text_value = str(text).strip()
            if not text_value:
                continue

            detected_text.append(
                DetectedText(
                    text=text_value,
                    confidence=float(confidence),
                    bounding_box=self._polygon_to_bbox(polygon, img_w, img_h),
                )
            )

        return detected_text

detect

detect(image: ndarray | Image) -> list[str]

Detect text in an image.

Returns plain text strings for backward compatibility.

Source code in src/videopython/ai/understanding/detection.py
def detect(self, image: np.ndarray | Image.Image) -> list[str]:
    """Detect text in an image.

    Returns plain text strings for backward compatibility.
    """
    regions = self.detect_detailed(image)
    return [region.text for region in regions]

detect_detailed

detect_detailed(
    image: ndarray | Image,
) -> list[DetectedText]

Detect text in an image with confidence and region boxes.

Source code in src/videopython/ai/understanding/detection.py
def detect_detailed(self, image: np.ndarray | Image.Image) -> list[DetectedText]:
    """Run OCR on the image, returning text with confidence and region boxes.

    Bounding boxes are normalized to the 0-1 range relative to the image
    size; a box is omitted (``None``) when the OCR polygon is missing or
    cannot be interpreted.
    """
    if self._reader is None:
        self._init_easyocr()

    frame = np.array(image) if isinstance(image, Image.Image) else image
    ocr_hits = self._reader.readtext(frame)

    frame_h, frame_w = frame.shape[:2]
    detections: list[DetectedText] = []
    for polygon, raw_text, score in ocr_hits:
        cleaned = str(raw_text).strip()
        if not cleaned:
            # Skip whitespace-only OCR hits.
            continue

        box: BoundingBox | None = None
        try:
            if polygon:
                x_coords = [float(pt[0]) for pt in polygon]
                y_coords = [float(pt[1]) for pt in polygon]
                # Clamp the polygon extent to the image bounds.
                left = max(0.0, min(x_coords))
                right = min(float(frame_w), max(x_coords))
                top = max(0.0, min(y_coords))
                bottom = min(float(frame_h), max(y_coords))
                if frame_w > 0 and frame_h > 0:
                    box = BoundingBox(
                        x=left / frame_w,
                        y=top / frame_h,
                        width=max(0.0, right - left) / frame_w,
                        height=max(0.0, bottom - top) / frame_h,
                    )
        except Exception:
            # Malformed polygon data: keep the text, drop the box.
            box = None

        detections.append(
            DetectedText(
                text=cleaned,
                confidence=float(score),
                bounding_box=box,
            )
        )

    return detections

CameraMotionDetector

CameraMotionDetector

Detects camera motion between frames using optical flow.

Source code in src/videopython/ai/understanding/detection.py
class CameraMotionDetector:
    """Classifies camera motion between two frames via dense optical flow."""

    MOTION_TYPES: list[str] = ["static", "pan", "tilt", "zoom", "complex"]

    def __init__(
        self,
        motion_threshold: float = 2.0,
        zoom_threshold: float = 0.1,
    ):
        # Average flow magnitude below which motion is considered "static".
        self.motion_threshold = motion_threshold
        # Relative center-vs-edge flow difference above which "zoom" wins.
        self.zoom_threshold = zoom_threshold

    def detect(
        self,
        frame1: np.ndarray | Image.Image,
        frame2: np.ndarray | Image.Image,
    ) -> str:
        """Detect camera motion between two consecutive frames."""
        import cv2

        first = frame1 if not isinstance(frame1, Image.Image) else np.array(frame1)
        second = frame2 if not isinstance(frame2, Image.Image) else np.array(frame2)

        # Flow is computed on single-channel images.
        g1 = first if len(first.shape) != 3 else cv2.cvtColor(first, cv2.COLOR_RGB2GRAY)
        g2 = second if len(second.shape) != 3 else cv2.cvtColor(second, cv2.COLOR_RGB2GRAY)

        flow = cv2.calcOpticalFlowFarneback(
            g1,
            g2,
            None,
            pyr_scale=0.5,
            levels=3,
            winsize=15,
            iterations=3,
            poly_n=5,
            poly_sigma=1.2,
            flags=0,
        )

        dx = flow[..., 0]
        dy = flow[..., 1]
        speed = np.sqrt(dx**2 + dy**2)

        if np.mean(speed) < self.motion_threshold:
            return "static"

        h, w = g1.shape
        cy, cx = h // 2, w // 2

        # Compare center flow against the four frame borders: a zoom makes
        # the borders move noticeably more (or less) than the center.
        center = speed[cy - h // 4 : cy + h // 4, cx - w // 4 : cx + w // 4]
        borders = [
            speed[: h // 4, :],
            speed[-h // 4 :, :],
            speed[:, : w // 4],
            speed[:, -w // 4 :],
        ]
        center_speed = np.mean(center) if center.size > 0 else 0
        border_speed = np.mean([np.mean(region) if region.size > 0 else 0 for region in borders])

        if border_speed > 0 and abs(border_speed - center_speed) / border_speed > self.zoom_threshold:
            return "zoom"

        # Otherwise classify by the dominant translation axis; a 1.5x margin
        # is required before calling it a pure pan or tilt.
        horizontal = abs(np.mean(dx))
        vertical = abs(np.mean(dy))

        if horizontal > vertical * 1.5:
            return "pan"
        if vertical > horizontal * 1.5:
            return "tilt"
        return "complex"

detect

detect(
    frame1: ndarray | Image, frame2: ndarray | Image
) -> str

Detect camera motion between two consecutive frames.

Source code in src/videopython/ai/understanding/detection.py
def detect(
    self,
    frame1: np.ndarray | Image.Image,
    frame2: np.ndarray | Image.Image,
) -> str:
    """Detect camera motion between two consecutive frames.

    Args:
        frame1: First frame as a numpy array or PIL image.
        frame2: Second frame as a numpy array or PIL image.

    Returns:
        One of "static", "pan", "tilt", "zoom", or "complex".
    """
    import cv2

    # Accept either PIL images or numpy arrays.
    img1 = np.array(frame1) if isinstance(frame1, Image.Image) else frame1
    img2 = np.array(frame2) if isinstance(frame2, Image.Image) else frame2

    # Optical flow is computed on grayscale images.
    gray1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY) if len(img1.shape) == 3 else img1
    gray2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY) if len(img2.shape) == 3 else img2

    # Dense per-pixel optical flow (Farneback method).
    flow = cv2.calcOpticalFlowFarneback(
        gray1,
        gray2,
        None,
        pyr_scale=0.5,
        levels=3,
        winsize=15,
        iterations=3,
        poly_n=5,
        poly_sigma=1.2,
        flags=0,
    )

    flow_x = flow[..., 0]
    flow_y = flow[..., 1]

    magnitude = np.sqrt(flow_x**2 + flow_y**2)
    avg_magnitude = np.mean(magnitude)

    # Below the motion threshold the camera is considered stationary.
    if avg_magnitude < self.motion_threshold:
        return "static"

    mean_flow_x = np.mean(flow_x)
    mean_flow_y = np.mean(flow_y)

    h, w = gray1.shape
    cy, cx = h // 2, w // 2

    # Compare flow magnitude in the center region against the four frame
    # edges: during a zoom the edges move more than the center (or vice versa).
    center_region = magnitude[cy - h // 4 : cy + h // 4, cx - w // 4 : cx + w // 4]
    edge_region_top = magnitude[: h // 4, :]
    edge_region_bottom = magnitude[-h // 4 :, :]
    edge_region_left = magnitude[:, : w // 4]
    edge_region_right = magnitude[:, -w // 4 :]

    center_mag = np.mean(center_region) if center_region.size > 0 else 0
    edge_mag = np.mean(
        [
            np.mean(edge_region_top) if edge_region_top.size > 0 else 0,
            np.mean(edge_region_bottom) if edge_region_bottom.size > 0 else 0,
            np.mean(edge_region_left) if edge_region_left.size > 0 else 0,
            np.mean(edge_region_right) if edge_region_right.size > 0 else 0,
        ]
    )

    if edge_mag > 0 and abs(edge_mag - center_mag) / edge_mag > self.zoom_threshold:
        return "zoom"

    # Otherwise classify by the dominant translation direction; a 1.5x
    # dominance margin is required before calling it a pure pan or tilt.
    abs_x = abs(mean_flow_x)
    abs_y = abs(mean_flow_y)

    if abs_x > abs_y * 1.5:
        return "pan"
    elif abs_y > abs_x * 1.5:
        return "tilt"
    else:
        return "complex"

MotionAnalyzer

Analyze motion in video frames using optical flow. Detects camera motion types (pan, tilt, zoom) and measures motion magnitude.

from videopython.ai import MotionAnalyzer
from videopython.base import Video

analyzer = MotionAnalyzer()
video = Video.from_path("video.mp4")

# Analyze motion between two frames
motion = analyzer.analyze_frames(video.frames[0], video.frames[1])
print(f"Motion type: {motion.motion_type}, magnitude: {motion.magnitude:.2f}")

# Analyze entire video (memory-efficient)
results = analyzer.analyze_video_path("video.mp4", frames_per_second=1.0)
for timestamp, motion in results:
    print(f"{timestamp:.1f}s: {motion.motion_type} ({motion.magnitude:.2f})")

MotionAnalyzer

Analyzes motion characteristics in video using optical flow.

Detects both camera motion (pan, tilt, zoom) and overall motion magnitude, which is useful for identifying dynamic vs static scenes.

Example

from videopython.ai import MotionAnalyzer
from videopython.base import Video

analyzer = MotionAnalyzer()
video = Video.from_path("video.mp4")

# Analyze motion between two frames

motion = analyzer.analyze_frames(video.frames[0], video.frames[1])
print(f"Motion type: {motion.motion_type}, magnitude: {motion.magnitude:.2f}")

Analyze motion for a list of frames (returns list of MotionInfo)

motions = analyzer.analyze_frame_sequence(video.frames[:10])

Source code in src/videopython/ai/understanding/motion.py
class MotionAnalyzer:
    """Analyzes motion characteristics in video using optical flow.

    Detects both camera motion (pan, tilt, zoom) and overall motion magnitude,
    which is useful for identifying dynamic vs static scenes.

    Example:
        >>> from videopython.ai import MotionAnalyzer
        >>> from videopython.base import Video
        >>>
        >>> analyzer = MotionAnalyzer()
        >>> video = Video.from_path("video.mp4")
        >>>
        >>> # Analyze motion between two frames
        >>> motion = analyzer.analyze_frames(video.frames[0], video.frames[1])
        >>> print(f"Motion type: {motion.motion_type}, magnitude: {motion.magnitude:.2f}")
        >>>
        >>> # Analyze motion for a list of frames (returns list of MotionInfo)
        >>> motions = analyzer.analyze_frame_sequence(video.frames[:10])
    """

    MOTION_TYPES: list[str] = ["static", "pan", "tilt", "zoom", "complex"]

    def __init__(
        self,
        motion_threshold: float = 2.0,
        zoom_threshold: float = 0.1,
        magnitude_cap: float = 50.0,
    ):
        """Initialize motion analyzer.

        Args:
            motion_threshold: Minimum average flow magnitude to consider as motion.
                Values below this are classified as "static". Default: 2.0 pixels/frame.
            zoom_threshold: Threshold for detecting zoom based on flow pattern.
                Default: 0.1 (10% difference between center and edges).
            magnitude_cap: Cap for normalizing magnitude to 0-1 range.
                Motion above this value maps to 1.0. Default: 50.0 pixels/frame.
        """
        self.motion_threshold = motion_threshold
        self.zoom_threshold = zoom_threshold
        self.magnitude_cap = magnitude_cap

    def analyze_frames(
        self,
        frame1: np.ndarray,
        frame2: np.ndarray,
    ) -> MotionInfo:
        """Analyze motion between two consecutive frames.

        Args:
            frame1: First frame as numpy array (H, W, 3) RGB.
            frame2: Second frame as numpy array (H, W, 3) RGB.

        Returns:
            MotionInfo with motion type and magnitude.
        """
        import cv2

        # Optical flow is computed on grayscale images.
        gray1 = cv2.cvtColor(frame1, cv2.COLOR_RGB2GRAY) if len(frame1.shape) == 3 else frame1
        gray2 = cv2.cvtColor(frame2, cv2.COLOR_RGB2GRAY) if len(frame2.shape) == 3 else frame2

        # Calculate dense optical flow using the Farneback method
        flow = cv2.calcOpticalFlowFarneback(
            gray1,
            gray2,
            None,
            pyr_scale=0.5,
            levels=3,
            winsize=15,
            iterations=3,
            poly_n=5,
            poly_sigma=1.2,
            flags=0,
        )

        # Average per-pixel flow magnitude in pixels/frame.
        flow_x = flow[..., 0]
        flow_y = flow[..., 1]
        magnitude = np.sqrt(flow_x**2 + flow_y**2)
        avg_magnitude = float(np.mean(magnitude))

        # Normalize magnitude to 0-1 range (capped at magnitude_cap).
        normalized_magnitude = min(avg_magnitude / self.magnitude_cap, 1.0)

        # Classify motion type
        if avg_magnitude < self.motion_threshold:
            motion_type = "static"
        else:
            motion_type = self._classify_motion(flow, gray1.shape)

        return MotionInfo(
            motion_type=motion_type,
            magnitude=normalized_magnitude,
            raw_magnitude=avg_magnitude,
        )

    def _classify_motion(
        self,
        flow: np.ndarray,
        shape: tuple[int, int],
    ) -> str:
        """Classify the type of motion based on optical flow pattern.

        Args:
            flow: Optical flow array (H, W, 2) with x and y components.
            shape: Frame shape (H, W).

        Returns:
            Motion type: "pan", "tilt", "zoom", or "complex".
        """
        flow_x = flow[..., 0]
        flow_y = flow[..., 1]
        magnitude = np.sqrt(flow_x**2 + flow_y**2)

        # Calculate mean flow direction
        mean_flow_x = np.mean(flow_x)
        mean_flow_y = np.mean(flow_y)

        # Check for zoom by analyzing flow from center
        h, w = shape
        cy, cx = h // 2, w // 2

        # Sample flow at different distances from center
        center_region = magnitude[cy - h // 4 : cy + h // 4, cx - w // 4 : cx + w // 4]
        edge_region_top = magnitude[: h // 4, :]
        edge_region_bottom = magnitude[-h // 4 :, :]
        edge_region_left = magnitude[:, : w // 4]
        edge_region_right = magnitude[:, -w // 4 :]

        center_mag = np.mean(center_region) if center_region.size > 0 else 0
        edge_mag = np.mean(
            [
                np.mean(edge_region_top) if edge_region_top.size > 0 else 0,
                np.mean(edge_region_bottom) if edge_region_bottom.size > 0 else 0,
                np.mean(edge_region_left) if edge_region_left.size > 0 else 0,
                np.mean(edge_region_right) if edge_region_right.size > 0 else 0,
            ]
        )

        # Zoom detection: edges move more than center (zoom in) or vice versa
        if edge_mag > 0 and abs(edge_mag - center_mag) / edge_mag > self.zoom_threshold:
            return "zoom"

        # Determine dominant motion direction; a 1.5x margin is required
        # before calling it a pure pan or tilt.
        abs_x = abs(mean_flow_x)
        abs_y = abs(mean_flow_y)

        if abs_x > abs_y * 1.5:
            return "pan"  # Horizontal motion
        elif abs_y > abs_x * 1.5:
            return "tilt"  # Vertical motion
        else:
            return "complex"  # Mixed motion

    def analyze_frame_sequence(
        self,
        frames: list[np.ndarray],
    ) -> list[MotionInfo]:
        """Analyze motion for a sequence of frames.

        Returns motion info for each pair of consecutive frames.
        Result list has length len(frames) - 1.

        Args:
            frames: List of frames as numpy arrays.

        Returns:
            List of MotionInfo objects for each frame transition.
        """
        # zip(frames, frames[1:]) yields each consecutive pair; it is empty
        # for fewer than two frames, which covers the edge case naturally.
        return [self.analyze_frames(prev, cur) for prev, cur in zip(frames, frames[1:])]

    def analyze_video(
        self,
        video: Video,
        sample_interval: int = 1,
    ) -> list[MotionInfo]:
        """Analyze motion throughout a video.

        Args:
            video: Video object to analyze.
            sample_interval: Analyze every Nth frame pair. Default: 1 (all frames).

        Returns:
            List of MotionInfo objects for sampled frame transitions.
        """
        frames = video.frames
        if len(frames) < 2:
            return []

        return [
            self.analyze_frames(frames[i], frames[i + 1])
            for i in range(0, len(frames) - 1, sample_interval)
        ]

    def analyze_video_path(
        self,
        path: str | Path,
        frames_per_second: float = 1.0,
    ) -> list[tuple[float, MotionInfo]]:
        """Analyze motion from video file with minimal memory usage.

        Streams frames from the video file instead of loading entire video.
        Returns timestamped motion info.

        Args:
            path: Path to video file.
            frames_per_second: How many frames per second to analyze. Default: 1.0.

        Returns:
            List of (timestamp, MotionInfo) tuples.

        Raises:
            ValueError: If the file cannot be opened or reports no valid FPS.
        """
        import cv2

        path = Path(path)
        cap = cv2.VideoCapture(str(path))

        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {path}")

        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            # Some files report FPS of 0 (broken/missing metadata); the
            # timestamp computation below would divide by zero otherwise.
            if fps <= 0:
                raise ValueError(f"Could not determine FPS for video file: {path}")
            frame_interval = max(1, int(fps / frames_per_second))

            results: list[tuple[float, MotionInfo]] = []
            prev_frame = None
            frame_idx = 0

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_idx % frame_interval == 0:
                    # OpenCV decodes to BGR; convert to RGB for analysis.
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                    if prev_frame is not None:
                        motion = self.analyze_frames(prev_frame, frame_rgb)
                        timestamp = frame_idx / fps
                        results.append((timestamp, motion))

                    prev_frame = frame_rgb

                frame_idx += 1
        finally:
            # Release the capture even if analysis raises mid-stream.
            cap.release()
        return results

    @staticmethod
    def aggregate_motion(motions: list[MotionInfo]) -> tuple[float, str]:
        """Aggregate motion info into scene-level statistics.

        Args:
            motions: List of MotionInfo objects from frames in a scene.

        Returns:
            Tuple of (average_magnitude, dominant_motion_type).
        """
        if not motions:
            return 0.0, "static"

        avg_magnitude = sum(m.magnitude for m in motions) / len(motions)

        # Find dominant motion type (excluding static if there's any motion)
        motion_types = [m.motion_type for m in motions]
        type_counts = Counter(motion_types)

        # If mostly static, return static
        static_ratio = type_counts.get("static", 0) / len(motions)
        if static_ratio > 0.7:
            dominant_type = "static"
        else:
            # Find most common non-static type
            non_static = {k: v for k, v in type_counts.items() if k != "static"}
            if non_static:
                dominant_type = max(non_static, key=lambda k: non_static[k])
            else:
                dominant_type = "static"

        return avg_magnitude, dominant_type

__init__

__init__(
    motion_threshold: float = 2.0,
    zoom_threshold: float = 0.1,
    magnitude_cap: float = 50.0,
)

Initialize motion analyzer.

Parameters:

Name Type Description Default
motion_threshold float

Minimum average flow magnitude to consider as motion. Values below this are classified as "static". Default: 2.0 pixels/frame.

2.0
zoom_threshold float

Threshold for detecting zoom based on flow pattern. Default: 0.1 (10% difference between center and edges).

0.1
magnitude_cap float

Cap for normalizing magnitude to 0-1 range. Motion above this value maps to 1.0. Default: 50.0 pixels/frame.

50.0
Source code in src/videopython/ai/understanding/motion.py
def __init__(
    self,
    motion_threshold: float = 2.0,
    zoom_threshold: float = 0.1,
    magnitude_cap: float = 50.0,
):
    """Configure motion-analysis thresholds.

    Args:
        motion_threshold: Average optical-flow magnitude (pixels/frame)
            below which a frame pair is classified as "static". Default: 2.0.
        zoom_threshold: Relative center-vs-edge flow difference above which
            motion is classified as zoom. Default: 0.1 (10%).
        magnitude_cap: Flow magnitude (pixels/frame) that maps to a
            normalized magnitude of 1.0; larger values are clipped.
            Default: 50.0.
    """
    self.magnitude_cap = magnitude_cap
    self.zoom_threshold = zoom_threshold
    self.motion_threshold = motion_threshold

analyze_frames

analyze_frames(
    frame1: ndarray, frame2: ndarray
) -> MotionInfo

Analyze motion between two consecutive frames.

Parameters:

Name Type Description Default
frame1 ndarray

First frame as numpy array (H, W, 3) RGB.

required
frame2 ndarray

Second frame as numpy array (H, W, 3) RGB.

required

Returns:

Type Description
MotionInfo

MotionInfo with motion type and magnitude.

Source code in src/videopython/ai/understanding/motion.py
def analyze_frames(
    self,
    frame1: np.ndarray,
    frame2: np.ndarray,
) -> MotionInfo:
    """Compute optical-flow motion between two consecutive RGB frames.

    Args:
        frame1: First frame as numpy array (H, W, 3) RGB.
        frame2: Second frame as numpy array (H, W, 3) RGB.

    Returns:
        MotionInfo describing the motion type and its magnitude.
    """
    import cv2

    # Farneback flow operates on single-channel images.
    gray1 = cv2.cvtColor(frame1, cv2.COLOR_RGB2GRAY) if len(frame1.shape) == 3 else frame1
    gray2 = cv2.cvtColor(frame2, cv2.COLOR_RGB2GRAY) if len(frame2.shape) == 3 else frame2

    flow = cv2.calcOpticalFlowFarneback(
        gray1,
        gray2,
        None,
        pyr_scale=0.5,
        levels=3,
        winsize=15,
        iterations=3,
        poly_n=5,
        poly_sigma=1.2,
        flags=0,
    )

    # Average per-pixel flow speed in pixels/frame.
    per_pixel_speed = np.sqrt(flow[..., 0] ** 2 + flow[..., 1] ** 2)
    mean_speed = float(np.mean(per_pixel_speed))

    if mean_speed < self.motion_threshold:
        label = "static"
    else:
        label = self._classify_motion(flow, gray1.shape, mean_speed)

    return MotionInfo(
        motion_type=label,
        # Normalized to 0-1; anything above magnitude_cap clips to 1.0.
        magnitude=min(mean_speed / self.magnitude_cap, 1.0),
        raw_magnitude=mean_speed,
    )

analyze_frame_sequence

analyze_frame_sequence(
    frames: list[ndarray],
) -> list[MotionInfo]

Analyze motion for a sequence of frames.

Returns motion info for each pair of consecutive frames. Result list has length len(frames) - 1.

Parameters:

Name Type Description Default
frames list[ndarray]

List of frames as numpy arrays.

required

Returns:

Type Description
list[MotionInfo]

List of MotionInfo objects for each frame transition.

Source code in src/videopython/ai/understanding/motion.py
def analyze_frame_sequence(
    self,
    frames: list[np.ndarray],
) -> list[MotionInfo]:
    """Compute motion info for every pair of consecutive frames.

    The result has one entry per transition, i.e. ``len(frames) - 1``
    items; fewer than two frames yields an empty list.

    Args:
        frames: Frames as numpy arrays.

    Returns:
        MotionInfo for each consecutive frame pair.
    """
    # zip(frames, frames[1:]) yields consecutive pairs and is empty for
    # fewer than two frames, covering the edge case naturally.
    return [self.analyze_frames(prev, cur) for prev, cur in zip(frames, frames[1:])]

analyze_video

analyze_video(
    video: Video, sample_interval: int = 1
) -> list[MotionInfo]

Analyze motion throughout a video.

Parameters:

Name Type Description Default
video Video

Video object to analyze.

required
sample_interval int

Analyze every Nth frame pair. Default: 1 (all frames).

1

Returns:

Type Description
list[MotionInfo]

List of MotionInfo objects for sampled frame transitions.

Source code in src/videopython/ai/understanding/motion.py
def analyze_video(
    self,
    video: Video,
    sample_interval: int = 1,
) -> list[MotionInfo]:
    """Compute motion info across a whole video.

    Args:
        video: Video object whose frames are analyzed.
        sample_interval: Step between analyzed frame pairs; 1 analyzes
            every consecutive pair.

    Returns:
        MotionInfo for each sampled frame transition.
    """
    clip = video.frames
    if len(clip) < 2:
        return []

    return [
        self.analyze_frames(clip[start], clip[start + 1])
        for start in range(0, len(clip) - 1, sample_interval)
    ]

analyze_video_path

analyze_video_path(
    path: str | Path, frames_per_second: float = 1.0
) -> list[tuple[float, MotionInfo]]

Analyze motion from video file with minimal memory usage.

Streams frames from the video file instead of loading entire video. Returns timestamped motion info.

Parameters:

Name Type Description Default
path str | Path

Path to video file.

required
frames_per_second float

How many frames per second to analyze. Default: 1.0.

1.0

Returns:

Type Description
list[tuple[float, MotionInfo]]

List of (timestamp, MotionInfo) tuples.

Source code in src/videopython/ai/understanding/motion.py
def analyze_video_path(
    self,
    path: str | Path,
    frames_per_second: float = 1.0,
) -> list[tuple[float, MotionInfo]]:
    """Stream a video file and compute timestamped motion info.

    Reads frames one at a time rather than loading the whole video into
    memory.

    Args:
        path: Path to video file.
        frames_per_second: How many frames per second to analyze. Default: 1.0.

    Returns:
        List of (timestamp, MotionInfo) tuples.
    """
    import cv2

    path = Path(path)
    capture = cv2.VideoCapture(str(path))

    if not capture.isOpened():
        raise ValueError(f"Could not open video file: {path}")

    fps = capture.get(cv2.CAP_PROP_FPS)
    step = max(1, int(fps / frames_per_second))

    timeline: list[tuple[float, MotionInfo]] = []
    previous = None
    index = 0

    while True:
        ok, bgr = capture.read()
        if not ok:
            break

        if index % step == 0:
            # OpenCV decodes to BGR; analysis expects RGB.
            rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

            if previous is not None:
                timeline.append((index / fps, self.analyze_frames(previous, rgb)))

            previous = rgb

        index += 1

    capture.release()
    return timeline

aggregate_motion staticmethod

aggregate_motion(
    motions: list[MotionInfo],
) -> tuple[float, str]

Aggregate motion info into scene-level statistics.

Parameters:

Name Type Description Default
motions list[MotionInfo]

List of MotionInfo objects from frames in a scene.

required

Returns:

Type Description
tuple[float, str]

Tuple of (average_magnitude, dominant_motion_type).

Source code in src/videopython/ai/understanding/motion.py
@staticmethod
def aggregate_motion(motions: list[MotionInfo]) -> tuple[float, str]:
    """Summarize per-frame motion into scene-level statistics.

    Args:
        motions: MotionInfo objects for the frames of one scene.

    Returns:
        Tuple of (average_magnitude, dominant_motion_type).
    """
    if not motions:
        return 0.0, "static"

    mean_magnitude = sum(info.magnitude for info in motions) / len(motions)

    counts = Counter(info.motion_type for info in motions)

    # A scene that is static most of the time is reported as static,
    # even if a few transitions show motion.
    if counts.get("static", 0) / len(motions) > 0.7:
        return mean_magnitude, "static"

    # Otherwise the most frequent non-static type dominates.
    moving = {label: n for label, n in counts.items() if label != "static"}
    if not moving:
        return mean_magnitude, "static"
    return mean_magnitude, max(moving, key=moving.get)

ActionRecognizer

Recognize actions and activities in video clips using VideoMAE, a masked autoencoder fine-tuned on Kinetics-400 (400 action classes like "walking", "running", "dancing", "answering questions").

from videopython.ai import ActionRecognizer

recognizer = ActionRecognizer(model_size="base", confidence_threshold=0.1)

# Recognize actions in entire video
actions = recognizer.recognize_path("video.mp4", top_k=5)
for action in actions:
    print(f"{action.label}: {action.confidence:.1%}")

# Output: answering questions: 37.2%
#         using computer: 12.2%

ActionRecognizer

Recognizes actions/activities in video clips using VideoMAE.

VideoMAE is a masked autoencoder pre-trained on video data and fine-tuned for action recognition on Kinetics-400 (400 action classes).

Example

from videopython.base import Video
from videopython.ai.understanding import ActionRecognizer

video = Video.from_path("video.mp4")
recognizer = ActionRecognizer()
actions = recognizer.recognize(video)
for action in actions:
    print(f"{action.label}: {action.confidence:.2f}")

Source code in src/videopython/ai/understanding/temporal.py
class ActionRecognizer:
    """Recognizes actions/activities in video clips using VideoMAE.

    VideoMAE is a masked autoencoder pre-trained on video data and fine-tuned
    for action recognition on Kinetics-400 (400 action classes).

    Example:
        >>> from videopython.base import Video
        >>> from videopython.ai.understanding import ActionRecognizer
        >>> video = Video.from_path("video.mp4")
        >>> recognizer = ActionRecognizer()
        >>> actions = recognizer.recognize(video)
        >>> for action in actions:
        ...     print(f"{action.label}: {action.confidence:.2f}")
    """

    # Model variants available
    MODEL_VARIANTS = Literal["base", "large"]

    def __init__(
        self,
        model_size: MODEL_VARIANTS = "base",
        device: str | None = None,
        confidence_threshold: float = 0.1,
        num_frames: int = 16,
    ):
        """Initialize the action recognizer.

        Args:
            model_size: Model size - "base" (faster) or "large" (more accurate).
            device: Device to run on ('cuda', 'cpu', or None for auto).
            confidence_threshold: Minimum confidence for reported actions.
            num_frames: Number of frames to sample per clip (default 16 for VideoMAE).
        """
        self.model_size = model_size
        self.confidence_threshold = confidence_threshold
        self.num_frames = num_frames

        # Lazy load model: created on first use in _load_model()
        self._model: Any = None
        self._processor: Any = None
        self._device: str | None = device

    def _load_model(self) -> None:
        """Load the VideoMAE model and processor (idempotent)."""
        if self._model is not None:
            return

        from transformers import VideoMAEForVideoClassification, VideoMAEImageProcessor  # type: ignore[attr-defined]

        model_name = (
            "MCG-NJU/videomae-base-finetuned-kinetics"
            if self.model_size == "base"
            else "MCG-NJU/videomae-large-finetuned-kinetics"
        )

        self._processor = VideoMAEImageProcessor.from_pretrained(model_name)
        self._model = VideoMAEForVideoClassification.from_pretrained(model_name)

        self._device = select_device(self._device, mps_allowed=True)

        self._model = self._model.to(self._device)
        self._model.eval()

    def _sample_frames(self, frames: np.ndarray, num_samples: int) -> np.ndarray:
        """Sample frames uniformly from a video clip.

        Args:
            frames: Video frames array (N, H, W, 3)
            num_samples: Number of frames to sample

        Returns:
            Sampled frames array (num_samples, H, W, 3)
        """
        total_frames = len(frames)
        if total_frames <= num_samples:
            # Pad by repeating last frame if needed
            if total_frames < num_samples:
                pad_count = num_samples - total_frames
                padding = np.repeat(frames[-1:], pad_count, axis=0)
                return np.concatenate([frames, padding], axis=0)
            return frames

        # Uniform sampling
        indices = np.linspace(0, total_frames - 1, num_samples, dtype=int)
        return frames[indices]

    def _predict(
        self,
        frames_list: list,
        top_k: int,
        start_frame: int,
        end_frame: int,
        start_time: float,
        end_time: float,
    ) -> list[DetectedAction]:
        """Run VideoMAE inference on sampled frames and build results.

        Shared by recognize() and recognize_path(); assumes _load_model()
        has already been called.

        Args:
            frames_list: Sampled frames as a list of (H, W, 3) arrays.
            top_k: Number of top predictions to return.
            start_frame: Frame index reported on every detection.
            end_frame: End frame index reported on every detection.
            start_time: Start time (seconds) reported on every detection.
            end_time: End time (seconds) reported on every detection.

        Returns:
            DetectedAction objects above the confidence threshold.
        """
        import torch

        # Process frames
        inputs = self._processor(frames_list, return_tensors="pt")
        inputs = {k: v.to(self._device) for k, v in inputs.items()}

        # Run inference
        with torch.no_grad():
            logits = self._model(**inputs).logits

        # Get probabilities
        probs = torch.nn.functional.softmax(logits, dim=-1)[0]
        top_probs, top_indices = torch.topk(probs, min(top_k, len(probs)))

        # Build results, keeping only predictions above the threshold
        labels = _get_kinetics_labels()
        return [
            DetectedAction(
                label=labels[idx],
                confidence=float(prob),
                start_frame=start_frame,
                end_frame=end_frame,
                start_time=start_time,
                end_time=end_time,
            )
            for prob, idx in zip(top_probs.cpu().numpy(), top_indices.cpu().numpy())
            if prob >= self.confidence_threshold
        ]

    def recognize(
        self,
        video: Video,
        top_k: int = 5,
    ) -> list[DetectedAction]:
        """Recognize actions in a video.

        Processes the entire video as a single clip and returns top-k predictions.

        Args:
            video: Video object to analyze.
            top_k: Number of top predictions to return.

        Returns:
            List of DetectedAction objects with recognized activities.
        """
        self._load_model()

        # Sample frames for the model and convert to a list of frames
        # (the processor expects this format).
        sampled_frames = self._sample_frames(video.frames, self.num_frames)
        frames_list = [sampled_frames[i] for i in range(len(sampled_frames))]

        return self._predict(
            frames_list,
            top_k,
            start_frame=0,
            end_frame=len(video.frames),
            start_time=0.0,
            end_time=video.total_seconds,
        )

    def recognize_path(
        self,
        path: str | Path,
        top_k: int = 5,
        start_second: float | None = None,
        end_second: float | None = None,
    ) -> list[DetectedAction]:
        """Recognize actions from a video file with memory-efficient loading.

        Args:
            path: Path to video file.
            top_k: Number of top predictions to return.
            start_second: Optional start time for analysis.
            end_second: Optional end time for analysis.

        Returns:
            List of DetectedAction objects with recognized activities.
        """
        from videopython.base.video import VideoMetadata, extract_frames_at_times

        self._load_model()

        metadata = VideoMetadata.from_path(path)

        # Determine time range
        start = start_second if start_second is not None else 0.0
        end = end_second if end_second is not None else metadata.total_seconds

        # Sample timestamps uniformly; the small epsilon keeps the last
        # timestamp strictly inside the video.
        timestamps = np.linspace(start, end - 0.001, self.num_frames).tolist()
        frames = extract_frames_at_times(path, timestamps)

        if len(frames) < self.num_frames:
            # Pad if needed
            frames = self._sample_frames(frames, self.num_frames)

        # Convert to list for processor
        frames_list = [frames[i] for i in range(len(frames))]

        # Frame bounds are loop-invariant; computed once here.
        return self._predict(
            frames_list,
            top_k,
            start_frame=int(start * metadata.fps),
            end_frame=int(end * metadata.fps),
            start_time=start,
            end_time=end,
        )

__init__

__init__(
    model_size: MODEL_VARIANTS = "base",
    device: str | None = None,
    confidence_threshold: float = 0.1,
    num_frames: int = 16,
)

Initialize the action recognizer.

Parameters:

Name Type Description Default
model_size MODEL_VARIANTS

Model size - "base" (faster) or "large" (more accurate).

'base'
device str | None

Device to run on ('cuda', 'cpu', or None for auto).

None
confidence_threshold float

Minimum confidence for reported actions.

0.1
num_frames int

Number of frames to sample per clip (default 16 for VideoMAE).

16
Source code in src/videopython/ai/understanding/temporal.py
def __init__(
    self,
    model_size: MODEL_VARIANTS = "base",
    device: str | None = None,
    confidence_threshold: float = 0.1,
    num_frames: int = 16,
):
    """Set up the recognizer; the VideoMAE model itself is loaded lazily.

    Args:
        model_size: Model size - "base" (faster) or "large" (more accurate).
        device: Device to run on ('cuda', 'cpu', or None for auto).
        confidence_threshold: Minimum confidence for reported actions.
        num_frames: Number of frames to sample per clip (default 16 for VideoMAE).
    """
    # Public configuration.
    self.model_size = model_size
    self.confidence_threshold = confidence_threshold
    self.num_frames = num_frames

    # Model and processor are created on first use in _load_model().
    self._device: str | None = device
    self._processor: Any = None
    self._model: Any = None

recognize

recognize(
    video: Video, top_k: int = 5
) -> list[DetectedAction]

Recognize actions in a video.

Processes the entire video as a single clip and returns top-k predictions.

Parameters:

Name Type Description Default
video Video

Video object to analyze.

required
top_k int

Number of top predictions to return.

5

Returns:

Type Description
list[DetectedAction]

List of DetectedAction objects with recognized activities.

Source code in src/videopython/ai/understanding/temporal.py
def recognize(
    self,
    video: Video,
    top_k: int = 5,
) -> list[DetectedAction]:
    """Recognize actions in a video.

    Treats the whole video as one clip and returns the top-k predictions
    that clear the confidence threshold.

    Args:
        video: Video object to analyze.
        top_k: Number of top predictions to return.

    Returns:
        List of DetectedAction objects with recognized activities.
    """
    self._load_model()

    import torch

    # Uniformly sample the frames VideoMAE expects, then hand them to the
    # processor as a list of individual frames.
    clip = self._sample_frames(video.frames, self.num_frames)
    model_inputs = self._processor(list(clip), return_tensors="pt")
    model_inputs = {name: tensor.to(self._device) for name, tensor in model_inputs.items()}

    with torch.no_grad():
        logits = self._model(**model_inputs).logits

    # Softmax over the 400 Kinetics classes, then take the top-k scores.
    scores = torch.nn.functional.softmax(logits, dim=-1)[0]
    best_scores, best_indices = torch.topk(scores, min(top_k, len(scores)))

    label_names = _get_kinetics_labels()
    detections: list[DetectedAction] = []
    for score, label_idx in zip(best_scores.cpu().numpy(), best_indices.cpu().numpy()):
        if score < self.confidence_threshold:
            continue
        detections.append(
            DetectedAction(
                label=label_names[label_idx],
                confidence=float(score),
                start_frame=0,
                end_frame=len(video.frames),
                start_time=0.0,
                end_time=video.total_seconds,
            )
        )

    return detections

recognize_path

recognize_path(
    path: str | Path,
    top_k: int = 5,
    start_second: float | None = None,
    end_second: float | None = None,
) -> list[DetectedAction]

Recognize actions from a video file with memory-efficient loading.

Parameters:

Name Type Description Default
path str | Path

Path to video file.

required
top_k int

Number of top predictions to return.

5
start_second float | None

Optional start time for analysis.

None
end_second float | None

Optional end time for analysis.

None

Returns:

Type Description
list[DetectedAction]

List of DetectedAction objects with recognized activities.

Source code in src/videopython/ai/understanding/temporal.py
def recognize_path(
    self,
    path: str | Path,
    top_k: int = 5,
    start_second: float | None = None,
    end_second: float | None = None,
) -> list[DetectedAction]:
    """Recognize actions from a video file with memory-efficient loading.

    Args:
        path: Path to video file.
        top_k: Number of top predictions to return.
        start_second: Optional start time for analysis.
        end_second: Optional end time for analysis.

    Returns:
        List of DetectedAction objects with recognized activities.
    """
    from videopython.base.video import VideoMetadata, extract_frames_at_times

    self._load_model()

    import torch

    metadata = VideoMetadata.from_path(path)

    # Analysis window: explicit bounds or the whole file.
    start = 0.0 if start_second is None else start_second
    end = metadata.total_seconds if end_second is None else end_second

    # Decode only uniformly spaced timestamps; the small epsilon keeps the
    # last timestamp strictly inside the video.
    sample_times = np.linspace(start, end - 0.001, self.num_frames).tolist()
    frames = extract_frames_at_times(path, sample_times)
    if len(frames) < self.num_frames:
        # Pad up to the expected clip length.
        frames = self._sample_frames(frames, self.num_frames)

    # The processor expects a list of individual frames.
    batch = self._processor(list(frames), return_tensors="pt")
    batch = {name: tensor.to(self._device) for name, tensor in batch.items()}

    with torch.no_grad():
        logits = self._model(**batch).logits

    # Softmax over classes, then take the top-k scores.
    scores = torch.nn.functional.softmax(logits, dim=-1)[0]
    best_scores, best_indices = torch.topk(scores, min(top_k, len(scores)))

    label_names = _get_kinetics_labels()
    first_frame = int(start * metadata.fps)
    last_frame = int(end * metadata.fps)

    detections: list[DetectedAction] = []
    for score, label_idx in zip(best_scores.cpu().numpy(), best_indices.cpu().numpy()):
        if score >= self.confidence_threshold:
            detections.append(
                DetectedAction(
                    label=label_names[label_idx],
                    confidence=float(score),
                    start_frame=first_frame,
                    end_frame=last_frame,
                    start_time=start,
                    end_time=end,
                )
            )

    return detections

SemanticSceneDetector

ML-based scene boundary detection using TransNetV2. More accurate than histogram-based detection, especially for gradual transitions like fades and dissolves.

from videopython.ai import SemanticSceneDetector

detector = SemanticSceneDetector(threshold=0.5, min_scene_length=1.0)
scenes = detector.detect_streaming("video.mp4")

for scene in scenes:
    print(f"Scene: {scene.start:.1f}s - {scene.end:.1f}s ({scene.duration:.1f}s)")

SemanticSceneDetector

ML-based scene detection using TransNetV2.

TransNetV2 is a neural network specifically designed for shot boundary detection, providing more accurate scene boundaries than histogram-based methods, especially for gradual transitions.

Uses the transnetv2-pytorch package with pretrained weights.

Example

from videopython.ai.understanding import SemanticSceneDetector

detector = SemanticSceneDetector()
scenes = detector.detect_streaming("video.mp4")
for scene in scenes:
    print(f"Scene: {scene.start:.2f}s - {scene.end:.2f}s")

Source code in src/videopython/ai/understanding/temporal.py
class SemanticSceneDetector:
    """ML-based scene detection using TransNetV2.

    TransNetV2 is a neural network specifically designed for shot boundary
    detection, providing more accurate scene boundaries than histogram-based
    methods, especially for gradual transitions.

    Uses the transnetv2-pytorch package with pretrained weights.

    Example:
        >>> from videopython.ai.understanding import SemanticSceneDetector
        >>> detector = SemanticSceneDetector()
        >>> scenes = detector.detect_streaming("video.mp4")
        >>> for scene in scenes:
        ...     print(f"Scene: {scene.start:.2f}s - {scene.end:.2f}s")
    """

    def __init__(
        self,
        threshold: float = 0.5,
        min_scene_length: float = 0.5,
        device: str | None = None,
    ):
        """Initialize the semantic scene detector.

        Args:
            threshold: Confidence threshold for scene boundaries (0.0-1.0).
                Higher values = fewer, more confident boundaries.
            min_scene_length: Minimum scene duration in seconds.
            device: Device to run on ('cuda', 'mps', 'cpu', or None for auto).
                Note: MPS may have numerical inconsistencies; use 'cpu' for
                reproducible results.

        Raises:
            ValueError: If threshold is outside [0, 1] or min_scene_length
                is negative.
        """
        if not 0.0 <= threshold <= 1.0:
            raise ValueError("threshold must be between 0.0 and 1.0")
        if min_scene_length < 0:
            raise ValueError("min_scene_length must be non-negative")

        self.threshold = threshold
        self.min_scene_length = min_scene_length
        self._device: str | None = device
        self._model: Any = None  # loaded lazily by _load_model()

    def _load_model(self) -> None:
        """Load the TransNetV2 model with pretrained weights (idempotent)."""
        if self._model is not None:
            return

        from transnetv2_pytorch import TransNetV2

        device = select_device(self._device, mps_allowed=True)
        self._model = TransNetV2(device=device)
        self._model.eval()

    def detect(self, video: Video) -> list[SceneBoundary]:
        """Detect scenes in a video using ML-based boundary detection.

        Note: This method requires saving video to a temporary file for
        TransNetV2 processing. For better performance, use detect_streaming()
        with a file path directly.

        Args:
            video: Video object to analyze.

        Returns:
            List of SceneBoundary objects representing detected scenes.
        """
        import os
        import tempfile

        if len(video.frames) == 0:
            return []

        if len(video.frames) == 1:
            return [SceneBoundary(start=0.0, end=video.total_seconds, start_frame=0, end_frame=1)]

        # NamedTemporaryFile(delete=True) keeps the file handle open, which
        # prevents video.save() from re-opening the path on Windows. Create
        # the path with mkstemp, close the descriptor, and clean up manually.
        fd, tmp_path = tempfile.mkstemp(suffix=".mp4")
        os.close(fd)
        try:
            video.save(tmp_path)
            return self.detect_streaming(tmp_path)
        finally:
            os.unlink(tmp_path)

    def detect_streaming(
        self,
        path: str | Path,
        start_second: float | None = None,
        end_second: float | None = None,
    ) -> list[SceneBoundary]:
        """Detect scenes from a video file.

        Uses TransNetV2 with pretrained weights for accurate shot boundary
        detection.

        Args:
            path: Path to video file.
            start_second: Optional start time for analysis (not yet supported).
            end_second: Optional end time for analysis (not yet supported).

        Returns:
            List of SceneBoundary objects representing detected scenes.
        """
        # Time windows are not implemented yet; warn rather than silently ignore.
        if start_second is not None or end_second is not None:
            import warnings

            warnings.warn(
                "start_second and end_second are not yet supported by SemanticSceneDetector. Processing entire video.",
                UserWarning,
                stacklevel=2,
            )

        self._load_model()

        # Use TransNetV2's detect_scenes which handles everything internally
        raw_scenes = self._model.detect_scenes(str(path), threshold=self.threshold)

        # Convert to SceneBoundary objects
        scenes = []
        for scene_data in raw_scenes:
            scenes.append(
                SceneBoundary(
                    start=float(scene_data["start_time"]),
                    end=float(scene_data["end_time"]),
                    start_frame=scene_data["start_frame"],
                    end_frame=scene_data["end_frame"],
                )
            )

        if self.min_scene_length > 0:
            scenes = self._merge_short_scenes(scenes)

        return scenes

    def _merge_short_scenes(self, scenes: list[SceneBoundary]) -> list[SceneBoundary]:
        """Merge scenes that are shorter than min_scene_length.

        Args:
            scenes: List of scenes to process.

        Returns:
            List of scenes with short scenes merged into adjacent ones.
        """
        if not scenes:
            return scenes

        merged = [scenes[0]]

        # Forward pass: a too-short scene absorbs the next one.
        for scene in scenes[1:]:
            last_scene = merged[-1]

            if last_scene.duration < self.min_scene_length:
                merged[-1] = SceneBoundary(
                    start=last_scene.start,
                    end=scene.end,
                    start_frame=last_scene.start_frame,
                    end_frame=scene.end_frame,
                )
            else:
                merged.append(scene)

        # The final scene has no successor to absorb; merge it backwards.
        if len(merged) > 1 and merged[-1].duration < self.min_scene_length:
            second_last = merged[-2]
            last = merged[-1]
            merged[-2] = SceneBoundary(
                start=second_last.start,
                end=last.end,
                start_frame=second_last.start_frame,
                end_frame=last.end_frame,
            )
            merged.pop()

        return merged

    @classmethod
    def detect_from_path(
        cls,
        path: str | Path,
        threshold: float = 0.5,
        min_scene_length: float = 0.5,
        device: str | None = None,
    ) -> list[SceneBoundary]:
        """Convenience method for one-shot scene detection.

        Args:
            path: Path to video file.
            threshold: Scene boundary threshold (0.0-1.0).
            min_scene_length: Minimum scene duration in seconds.
            device: Device to run on ('cuda', 'mps', 'cpu', or None for auto).

        Returns:
            List of SceneBoundary objects representing detected scenes.
        """
        detector = cls(threshold=threshold, min_scene_length=min_scene_length, device=device)
        return detector.detect_streaming(path)

__init__

__init__(
    threshold: float = 0.5,
    min_scene_length: float = 0.5,
    device: str | None = None,
)

Initialize the semantic scene detector.

Parameters:

Name Type Description Default
threshold float

Confidence threshold for scene boundaries (0.0-1.0). Higher values = fewer, more confident boundaries.

0.5
min_scene_length float

Minimum scene duration in seconds.

0.5
device str | None

Device to run on ('cuda', 'mps', 'cpu', or None for auto). Note: MPS may have numerical inconsistencies; use 'cpu' for reproducible results.

None
Source code in src/videopython/ai/understanding/temporal.py
def __init__(
    self,
    threshold: float = 0.5,
    min_scene_length: float = 0.5,
    device: str | None = None,
):
    """Initialize the semantic scene detector.

    Args:
        threshold: Confidence threshold for scene boundaries (0.0-1.0).
            Higher values = fewer, more confident boundaries.
        min_scene_length: Minimum scene duration in seconds.
        device: Device to run on ('cuda', 'mps', 'cpu', or None for auto).
            Note: MPS may have numerical inconsistencies; use 'cpu' for
            reproducible results.

    Raises:
        ValueError: If an argument is out of its valid range.
    """
    # Validate configuration up front so bad values fail at construction.
    if threshold < 0.0 or threshold > 1.0:
        raise ValueError("threshold must be between 0.0 and 1.0")
    if min_scene_length < 0:
        raise ValueError("min_scene_length must be non-negative")

    self.threshold = threshold
    self.min_scene_length = min_scene_length

    # The model itself is loaded lazily by _load_model().
    self._device: str | None = device
    self._model: Any = None

detect

detect(video: Video) -> list[SceneBoundary]

Detect scenes in a video using ML-based boundary detection.

Note: This method requires saving video to a temporary file for TransNetV2 processing. For better performance, use detect_streaming() with a file path directly.

Parameters:

Name Type Description Default
video Video

Video object to analyze.

required

Returns:

Type Description
list[SceneBoundary]

List of SceneBoundary objects representing detected scenes.

Source code in src/videopython/ai/understanding/temporal.py
def detect(self, video: Video) -> list[SceneBoundary]:
    """Detect scenes in a video using ML-based boundary detection.

    Note: This method requires saving video to a temporary file for
    TransNetV2 processing. For better performance, use detect_streaming()
    with a file path directly.

    Args:
        video: Video object to analyze.

    Returns:
        List of SceneBoundary objects representing detected scenes.
    """
    import tempfile

    frame_count = len(video.frames)

    # Degenerate inputs never reach the model.
    if frame_count == 0:
        return []
    if frame_count == 1:
        return [SceneBoundary(start=0.0, end=video.total_seconds, start_frame=0, end_frame=1)]

    # TransNetV2 reads from disk, so round-trip through a temp file.
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=True) as tmp:
        video.save(tmp.name)
        return self.detect_streaming(tmp.name)

detect_streaming

detect_streaming(
    path: str | Path,
    start_second: float | None = None,
    end_second: float | None = None,
) -> list[SceneBoundary]

Detect scenes from a video file.

Uses TransNetV2 with pretrained weights for accurate shot boundary detection.

Parameters:

Name Type Description Default
path str | Path

Path to video file.

required
start_second float | None

Optional start time for analysis (not yet supported).

None
end_second float | None

Optional end time for analysis (not yet supported).

None

Returns:

Type Description
list[SceneBoundary]

List of SceneBoundary objects representing detected scenes.

Source code in src/videopython/ai/understanding/temporal.py
def detect_streaming(
    self,
    path: str | Path,
    start_second: float | None = None,
    end_second: float | None = None,
) -> list[SceneBoundary]:
    """Detect scenes from a video file.

    Uses TransNetV2 with pretrained weights for accurate shot boundary
    detection.

    Args:
        path: Path to video file.
        start_second: Optional start time for analysis (not yet supported).
        end_second: Optional end time for analysis (not yet supported).

    Returns:
        List of SceneBoundary objects representing detected scenes.
    """
    # Time windows are not implemented yet; warn rather than silently ignore.
    if not (start_second is None and end_second is None):
        import warnings

        warnings.warn(
            "start_second and end_second are not yet supported by SemanticSceneDetector. Processing entire video.",
            UserWarning,
            stacklevel=2,
        )

    self._load_model()

    # TransNetV2 handles decoding, batching, and thresholding internally.
    raw_scenes = self._model.detect_scenes(str(path), threshold=self.threshold)

    scenes = [
        SceneBoundary(
            start=float(entry["start_time"]),
            end=float(entry["end_time"]),
            start_frame=entry["start_frame"],
            end_frame=entry["end_frame"],
        )
        for entry in raw_scenes
    ]

    # Optionally merge scenes shorter than the configured minimum.
    if self.min_scene_length > 0:
        return self._merge_short_scenes(scenes)
    return scenes

detect_from_path classmethod

detect_from_path(
    path: str | Path,
    threshold: float = 0.5,
    min_scene_length: float = 0.5,
) -> list[SceneBoundary]

Convenience method for one-shot scene detection.

Parameters:

Name Type Description Default
path str | Path

Path to video file.

required
threshold float

Scene boundary threshold (0.0-1.0).

0.5
min_scene_length float

Minimum scene duration in seconds.

0.5

Returns:

Type Description
list[SceneBoundary]

List of SceneBoundary objects representing detected scenes.

Source code in src/videopython/ai/understanding/temporal.py
@classmethod
def detect_from_path(
    cls,
    path: str | Path,
    threshold: float = 0.5,
    min_scene_length: float = 0.5,
) -> list[SceneBoundary]:
    """Convenience method for one-shot scene detection.

    Args:
        path: Path to video file.
        threshold: Scene boundary threshold (0.0-1.0).
        min_scene_length: Minimum scene duration in seconds.

    Returns:
        List of SceneBoundary objects representing detected scenes.
    """
    # Build a throwaway detector and delegate to the streaming path.
    one_shot = cls(threshold=threshold, min_scene_length=min_scene_length)
    return one_shot.detect_streaming(path)

Scene Data Classes

These classes are used by SceneDetector to represent analysis results:

SceneBoundary

SceneBoundary dataclass

Timing information for a detected scene.

A lightweight structure representing scene boundaries detected by SceneDetector. This is a backbone type - higher-level scene analysis belongs in orchestration packages.

Attributes:

Name Type Description
start float

Scene start time in seconds

end float

Scene end time in seconds

start_frame int

Index of the first frame in this scene

end_frame int

Index of the last frame in this scene (exclusive)

Source code in src/videopython/base/description.py
@dataclass
class SceneBoundary:
    """Timing information for a detected scene.

    A lightweight structure representing scene boundaries detected by SceneDetector.
    This is a backbone type - higher-level scene analysis belongs in orchestration packages.

    Attributes:
        start: Scene start time in seconds
        end: Scene end time in seconds
        start_frame: Index of the first frame in this scene
        end_frame: Index of the last frame in this scene (exclusive)
    """

    start: float
    end: float
    start_frame: int
    end_frame: int

    @property
    def duration(self) -> float:
        """Duration of the scene in seconds."""
        return self.end - self.start

    @property
    def frame_count(self) -> int:
        """Number of frames in this scene."""
        return self.end_frame - self.start_frame

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        return {key: getattr(self, key) for key in ("start", "end", "start_frame", "end_frame")}

    @classmethod
    def from_dict(cls, data: dict) -> "SceneBoundary":
        """Create SceneBoundary from dictionary."""
        return cls(**{key: data[key] for key in ("start", "end", "start_frame", "end_frame")})

duration property

duration: float

Duration of the scene in seconds.

frame_count property

frame_count: int

Number of frames in this scene.

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Convert to dictionary for JSON serialization."""
    keys = ("start", "end", "start_frame", "end_frame")
    return {key: getattr(self, key) for key in keys}

from_dict classmethod

from_dict(data: dict) -> 'SceneBoundary'

Create SceneBoundary from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> "SceneBoundary":
    """Create SceneBoundary from dictionary."""
    kwargs = {key: data[key] for key in ("start", "end", "start_frame", "end_frame")}
    return cls(**kwargs)

BoundingBox

BoundingBox dataclass

A bounding box for detected objects in an image.

Coordinates are normalized to [0, 1] range relative to image dimensions.

Attributes:

Name Type Description
x float

Left edge of the box (0 = left edge of image)

y float

Top edge of the box (0 = top edge of image)

width float

Width of the box

height float

Height of the box

Source code in src/videopython/base/description.py
@dataclass
class BoundingBox:
    """A bounding box for detected objects in an image.

    Coordinates are normalized to [0, 1] range relative to image dimensions.

    Attributes:
        x: Left edge of the box (0 = left edge of image)
        y: Top edge of the box (0 = top edge of image)
        width: Width of the box
        height: Height of the box
    """

    x: float
    y: float
    width: float
    height: float

    @property
    def center(self) -> tuple[float, float]:
        """Center point of the bounding box."""
        half_width = self.width / 2
        half_height = self.height / 2
        return (self.x + half_width, self.y + half_height)

    @property
    def area(self) -> float:
        """Area of the bounding box (normalized)."""
        return self.height * self.width

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        return {name: getattr(self, name) for name in ("x", "y", "width", "height")}

    @classmethod
    def from_dict(cls, data: dict) -> BoundingBox:
        """Create BoundingBox from dictionary."""
        return cls(**{name: data[name] for name in ("x", "y", "width", "height")})

center property

center: tuple[float, float]

Center point of the bounding box.

area property

area: float

Area of the bounding box (normalized).

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Convert to dictionary for JSON serialization."""
    return {"x": self.x, "y": self.y, "width": self.width, "height": self.height}

from_dict classmethod

from_dict(data: dict) -> BoundingBox

Create BoundingBox from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> BoundingBox:
    """Create BoundingBox from dictionary."""
    return cls(x=data["x"], y=data["y"], width=data["width"], height=data["height"])

DetectedObject

DetectedObject dataclass

An object detected in a video frame.

Attributes:

Name Type Description
label str

Name/class of the detected object (e.g., "person", "car", "dog")

confidence float

Detection confidence score between 0 and 1

bounding_box BoundingBox | None

Optional bounding box location of the object

Source code in src/videopython/base/description.py
@dataclass
class DetectedObject:
    """An object detected in a video frame.

    Attributes:
        label: Name/class of the detected object (e.g., "person", "car", "dog")
        confidence: Detection confidence score between 0 and 1
        bounding_box: Optional bounding box location of the object
    """

    label: str
    confidence: float
    bounding_box: BoundingBox | None = None

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        box = self.bounding_box
        return {
            "label": self.label,
            "confidence": self.confidence,
            # Serialize the nested box when present; keep JSON null otherwise.
            "bounding_box": None if box is None else box.to_dict(),
        }

    @classmethod
    def from_dict(cls, data: dict) -> DetectedObject:
        """Create DetectedObject from dictionary (inverse of `to_dict`)."""
        raw_box = data.get("bounding_box")
        box = BoundingBox.from_dict(raw_box) if raw_box else None
        return cls(label=data["label"], confidence=data["confidence"], bounding_box=box)

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Convert to dictionary for JSON serialization."""
    return {
        "label": self.label,
        "confidence": self.confidence,
        "bounding_box": self.bounding_box.to_dict() if self.bounding_box else None,
    }

from_dict classmethod

from_dict(data: dict) -> DetectedObject

Create DetectedObject from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> DetectedObject:
    """Create DetectedObject from dictionary."""
    return cls(
        label=data["label"],
        confidence=data["confidence"],
        bounding_box=BoundingBox.from_dict(data["bounding_box"]) if data.get("bounding_box") else None,
    )

DetectedText

DetectedText dataclass

Text detected in a video frame.

Attributes:

Name Type Description
text str

OCR text content

confidence float

Detection confidence score between 0 and 1

bounding_box BoundingBox | None

Optional normalized bounding box for the text region

Source code in src/videopython/base/description.py
@dataclass
class DetectedText:
    """Text detected in a video frame.

    Attributes:
        text: OCR text content
        confidence: Detection confidence score between 0 and 1
        bounding_box: Optional normalized bounding box for the text region
    """

    text: str
    confidence: float
    bounding_box: BoundingBox | None = None

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        # Nested box serializes to a dict; absent box stays JSON null.
        serialized_box = self.bounding_box.to_dict() if self.bounding_box is not None else None
        return {
            "text": self.text,
            "confidence": self.confidence,
            "bounding_box": serialized_box,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "DetectedText":
        """Create DetectedText from dictionary (inverse of `to_dict`)."""
        box_data = data.get("bounding_box")
        return cls(
            text=data["text"],
            confidence=data["confidence"],
            bounding_box=BoundingBox.from_dict(box_data) if box_data else None,
        )

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Convert to dictionary for JSON serialization."""
    return {
        "text": self.text,
        "confidence": self.confidence,
        "bounding_box": self.bounding_box.to_dict() if self.bounding_box else None,
    }

from_dict classmethod

from_dict(data: dict) -> 'DetectedText'

Create DetectedText from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> "DetectedText":
    """Create DetectedText from dictionary."""
    return cls(
        text=data["text"],
        confidence=data["confidence"],
        bounding_box=BoundingBox.from_dict(data["bounding_box"]) if data.get("bounding_box") else None,
    )

AudioEvent

AudioEvent dataclass

A detected audio event with timestamp.

Attributes:

Name Type Description
start float

Start time in seconds

end float

End time in seconds

label str

Name of the detected sound (e.g., "Music", "Speech", "Dog bark")

confidence float

Detection confidence score between 0 and 1

Source code in src/videopython/base/description.py
@dataclass
class AudioEvent:
    """A detected audio event with timestamp.

    Attributes:
        start: Start time in seconds
        end: End time in seconds
        label: Name of the detected sound (e.g., "Music", "Speech", "Dog bark")
        confidence: Detection confidence score between 0 and 1
    """

    start: float
    end: float
    label: str
    confidence: float

    @property
    def duration(self) -> float:
        """Duration of the audio event in seconds."""
        return self.end - self.start

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        # Key order mirrors the field declaration order.
        return {name: getattr(self, name) for name in ("start", "end", "label", "confidence")}

    @classmethod
    def from_dict(cls, data: dict) -> AudioEvent:
        """Create AudioEvent from dictionary (inverse of `to_dict`)."""
        return cls(
            start=data["start"],
            end=data["end"],
            label=data["label"],
            confidence=data["confidence"],
        )

duration property

duration: float

Duration of the audio event in seconds.

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Convert to dictionary for JSON serialization."""
    return {
        "start": self.start,
        "end": self.end,
        "label": self.label,
        "confidence": self.confidence,
    }

from_dict classmethod

from_dict(data: dict) -> AudioEvent

Create AudioEvent from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> AudioEvent:
    """Create AudioEvent from dictionary."""
    return cls(
        start=data["start"],
        end=data["end"],
        label=data["label"],
        confidence=data["confidence"],
    )

AudioClassification

AudioClassification dataclass

Complete audio classification results.

Attributes:

Name Type Description
events list[AudioEvent]

List of detected audio events with timestamps

clip_predictions dict[str, float]

Overall class probabilities for the entire audio clip

Source code in src/videopython/base/description.py
@dataclass
class AudioClassification:
    """Complete audio classification results.

    Attributes:
        events: List of detected audio events with timestamps
        clip_predictions: Overall class probabilities for the entire audio clip
    """

    events: list[AudioEvent]
    clip_predictions: dict[str, float] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        serialized_events = [event.to_dict() for event in self.events]
        return {"events": serialized_events, "clip_predictions": self.clip_predictions}

    @classmethod
    def from_dict(cls, data: dict) -> "AudioClassification":
        """Create AudioClassification from dictionary (inverse of `to_dict`)."""
        events = [AudioEvent.from_dict(item) for item in data.get("events", [])]
        # Coerce scores to float so round-tripped JSON ints stay numeric floats.
        predictions = {label: float(score) for label, score in data.get("clip_predictions", {}).items()}
        return cls(events=events, clip_predictions=predictions)

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Convert to dictionary for JSON serialization."""
    return {
        "events": [event.to_dict() for event in self.events],
        "clip_predictions": self.clip_predictions,
    }

from_dict classmethod

from_dict(data: dict) -> 'AudioClassification'

Create AudioClassification from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> "AudioClassification":
    """Create AudioClassification from dictionary."""
    return cls(
        events=[AudioEvent.from_dict(event) for event in data.get("events", [])],
        clip_predictions={k: float(v) for k, v in data.get("clip_predictions", {}).items()},
    )

MotionInfo

MotionInfo dataclass

Motion characteristics between consecutive frames.

Attributes:

Name Type Description
motion_type str

Classification of camera/scene motion:

- "static": No significant motion
- "pan": Horizontal camera movement
- "tilt": Vertical camera movement
- "zoom": Camera zoom in/out
- "complex": Mixed or irregular motion

magnitude float

Normalized motion magnitude (0.0 = no motion, 1.0 = high motion)

raw_magnitude float

Raw optical flow magnitude (pixels/frame)

Source code in src/videopython/base/description.py
@dataclass
class MotionInfo:
    """Motion characteristics between consecutive frames.

    Attributes:
        motion_type: Classification of camera/scene motion
            - "static": No significant motion
            - "pan": Horizontal camera movement
            - "tilt": Vertical camera movement
            - "zoom": Camera zoom in/out
            - "complex": Mixed or irregular motion
        magnitude: Normalized motion magnitude (0.0 = no motion, 1.0 = high motion)
        raw_magnitude: Raw optical flow magnitude (pixels/frame)
    """

    motion_type: str
    magnitude: float
    raw_magnitude: float

    @property
    def is_static(self) -> bool:
        """Check if this frame has no significant motion."""
        return self.motion_type == "static"

    @property
    def is_dynamic(self) -> bool:
        """Check if this frame has significant motion."""
        return not self.is_static

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        # Key order mirrors the field declaration order.
        return {name: getattr(self, name) for name in ("motion_type", "magnitude", "raw_magnitude")}

    @classmethod
    def from_dict(cls, data: dict) -> MotionInfo:
        """Create MotionInfo from dictionary (inverse of `to_dict`)."""
        return cls(
            data["motion_type"],
            data["magnitude"],
            data["raw_magnitude"],
        )

is_static property

is_static: bool

Check if this frame has no significant motion.

is_dynamic property

is_dynamic: bool

Check if this frame has significant motion.

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Convert to dictionary for JSON serialization."""
    return {
        "motion_type": self.motion_type,
        "magnitude": self.magnitude,
        "raw_magnitude": self.raw_magnitude,
    }

from_dict classmethod

from_dict(data: dict) -> MotionInfo

Create MotionInfo from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> MotionInfo:
    """Create MotionInfo from dictionary."""
    return cls(
        motion_type=data["motion_type"],
        magnitude=data["magnitude"],
        raw_magnitude=data["raw_magnitude"],
    )

DetectedAction

DetectedAction dataclass

An action/activity detected in a video segment.

Attributes:

Name Type Description
label str

Name of the detected action (e.g., "walking", "running", "dancing")

confidence float

Detection confidence score between 0 and 1

start_frame int | None

Start frame index of the action

end_frame int | None

End frame index of the action (exclusive)

start_time float | None

Start time in seconds

end_time float | None

End time in seconds

Source code in src/videopython/base/description.py
@dataclass
class DetectedAction:
    """An action/activity detected in a video segment.

    Attributes:
        label: Name of the detected action (e.g., "walking", "running", "dancing")
        confidence: Detection confidence score between 0 and 1
        start_frame: Start frame index of the action
        end_frame: End frame index of the action (exclusive)
        start_time: Start time in seconds
        end_time: End time in seconds
    """

    label: str
    confidence: float
    start_frame: int | None = None
    end_frame: int | None = None
    start_time: float | None = None
    end_time: float | None = None

    @property
    def duration(self) -> float | None:
        """Duration of the action in seconds, or None when either bound is unset."""
        if self.start_time is None or self.end_time is None:
            return None
        return self.end_time - self.start_time

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        # Key order mirrors the field declaration order.
        field_names = ("label", "confidence", "start_frame", "end_frame", "start_time", "end_time")
        return {name: getattr(self, name) for name in field_names}

    @classmethod
    def from_dict(cls, data: dict) -> "DetectedAction":
        """Create DetectedAction from dictionary (inverse of `to_dict`)."""
        # Frame/time bounds are optional and default to None when absent.
        optional = {key: data.get(key) for key in ("start_frame", "end_frame", "start_time", "end_time")}
        return cls(label=data["label"], confidence=data["confidence"], **optional)

duration property

duration: float | None

Duration of the action in seconds.

to_dict

to_dict() -> dict

Convert to dictionary for JSON serialization.

Source code in src/videopython/base/description.py
def to_dict(self) -> dict:
    """Convert to dictionary for JSON serialization."""
    return {
        "label": self.label,
        "confidence": self.confidence,
        "start_frame": self.start_frame,
        "end_frame": self.end_frame,
        "start_time": self.start_time,
        "end_time": self.end_time,
    }

from_dict classmethod

from_dict(data: dict) -> 'DetectedAction'

Create DetectedAction from dictionary.

Source code in src/videopython/base/description.py
@classmethod
def from_dict(cls, data: dict) -> "DetectedAction":
    """Create DetectedAction from dictionary."""
    return cls(
        label=data["label"],
        confidence=data["confidence"],
        start_frame=data.get("start_frame"),
        end_frame=data.get("end_frame"),
        start_time=data.get("start_time"),
        end_time=data.get("end_time"),
    )