class VideoAnalyzer:
    """Coordinates the understanding analyzers and assembles a `VideoAnalysis` report."""

    def __init__(self, config: VideoAnalysisConfig | None = None):
        # A falsy config (None included) is replaced with the defaults,
        # mirroring the `config or VideoAnalysisConfig()` idiom.
        self.config = VideoAnalysisConfig() if not config else config
def analyze_path(self, path: str | Path) -> VideoAnalysis:
"""Analyze a video path with bounded frame memory usage."""
path_obj = Path(path)
metadata = VideoMetadata.from_path(path_obj)
source = self._build_source_from_path(path_obj, metadata)
return self._analyze(video=None, source_path=path_obj, metadata=metadata, source=source)
def analyze(self, video: Video, *, source_path: str | Path | None = None) -> VideoAnalysis:
"""Analyze an in-memory Video object."""
metadata = VideoMetadata.from_video(video)
source = self._build_source_from_video(video=video, source_path=source_path, metadata=metadata)
return self._analyze(
video=video,
source_path=Path(source_path) if source_path else None,
metadata=metadata,
source=source,
)
    def _analyze(
        self,
        *,
        video: Video | None,
        source_path: Path | None,
        metadata: VideoMetadata,
        source: VideoAnalysisSource,
    ) -> VideoAnalysis:
        """Core pipeline shared by `analyze` and `analyze_path`.

        Runs every enabled analyzer group in order (audio, scenes, actions,
        motion, per-frame work), records per-step outcomes in `steps`, and
        assembles the final `VideoAnalysis`. When `source_path` is set the
        pipeline works path-first (streaming/chunked decode with bounded
        memory); otherwise it indexes the in-memory `video` directly.

        Raises:
            ValueError: if neither `source_path` nor `video` is given.
        """
        # Path mode wins whenever a path is available, even if `video` is too.
        mode = "path" if source_path is not None else "video"
        if source_path is None and video is None:
            raise ValueError("Either `source_path` or `video` must be provided")
        started = time.perf_counter()
        steps: dict[str, AnalysisStepStatus] = {}
        # Pre-mark disabled analyzers so the step report covers every analyzer id.
        for analyzer_id in ALL_ANALYZER_IDS:
            if analyzer_id not in self.config.enabled_analyzers:
                steps[analyzer_id] = AnalysisStepStatus(status="skipped", warning="Disabled in config")
        run_info = AnalysisRunInfo(
            created_at=_utc_now_iso(),
            mode=mode,
            library_version=_library_version(),
            elapsed_seconds=None,
        )
        audio_section = AudioAnalysisSection()
        temporal_section = TemporalAnalysisSection()
        motion_section = MotionAnalysisSection()
        # Lazily-decoded audio, shared between transcription and classification.
        audio_cache: Audio | None = None
        def get_path_audio() -> Audio:
            """Decode audio from `source_path` once and reuse it across audio steps."""
            nonlocal audio_cache
            if audio_cache is None:
                if source_path is None:
                    raise RuntimeError("Path audio requested for in-memory analysis without source path")
                audio_cache = Audio.from_path(source_path)
            return audio_cache
        if AUDIO_TO_TEXT in self.config.enabled_analyzers:
            audio_input: Audio | Video
            # NOTE(review): get_path_audio() runs outside _run_step, so an audio
            # decode failure here raises instead of becoming a step status.
            if source_path is not None:
                audio_input = get_path_audio()
            else:
                assert video is not None
                audio_input = video
            transcription = self._run_step(
                steps,
                AUDIO_TO_TEXT,
                lambda: AudioToText(**self._analyzer_kwargs(AUDIO_TO_TEXT)).transcribe(audio_input),
                optional=AUDIO_TO_TEXT in self.config.optional_analyzers,
            )
            if transcription is not None:
                audio_section.transcription = transcription
        if AUDIO_CLASSIFIER in self.config.enabled_analyzers:
            classifier_input: Audio | Video
            if source_path is not None:
                classifier_input = get_path_audio()
            else:
                assert video is not None
                classifier_input = video
            classification = self._run_step(
                steps,
                AUDIO_CLASSIFIER,
                lambda: AudioClassifier(**self._analyzer_kwargs(AUDIO_CLASSIFIER)).classify(classifier_input),
                optional=AUDIO_CLASSIFIER in self.config.optional_analyzers,
            )
            if classification is not None:
                audio_section.classification = classification
        # Scenes feed both frame-sampling and scene-scoped action recognition below.
        scenes: list[SceneBoundary] = []
        if SEMANTIC_SCENE_DETECTOR in self.config.enabled_analyzers:
            scene_detector = SemanticSceneDetector(**self._analyzer_kwargs(SEMANTIC_SCENE_DETECTOR))
            scenes_result = self._run_step(
                steps,
                SEMANTIC_SCENE_DETECTOR,
                lambda: scene_detector.detect_streaming(source_path)
                if source_path is not None
                else scene_detector.detect(_require_video(video)),
                optional=SEMANTIC_SCENE_DETECTOR in self.config.optional_analyzers,
            )
            if scenes_result is not None:
                scenes = scenes_result
                temporal_section.scenes = scenes_result
        if ACTION_RECOGNIZER in self.config.enabled_analyzers:
            action_recognizer = ActionRecognizer(**self._analyzer_kwargs(ACTION_RECOGNIZER))
            actions_result = self._run_step(
                steps,
                ACTION_RECOGNIZER,
                lambda: self._run_action_recognition(
                    action_recognizer=action_recognizer,
                    source_path=source_path,
                    video=video,
                    scenes=scenes,
                ),
                optional=ACTION_RECOGNIZER in self.config.optional_analyzers,
            )
            if actions_result is not None:
                temporal_section.actions = actions_result
        if MOTION_ANALYZER in self.config.enabled_analyzers:
            motion_analyzer = MotionAnalyzer(**self._analyzer_kwargs(MOTION_ANALYZER))
            if source_path is not None:
                # Path mode yields (timestamp, motion) pairs sampled at >= 0.1 fps.
                motion_timeline = self._run_step(
                    steps,
                    MOTION_ANALYZER,
                    lambda: motion_analyzer.analyze_video_path(
                        source_path,
                        frames_per_second=max(float(self.config.frames_per_second), 0.1),
                    ),
                    optional=MOTION_ANALYZER in self.config.optional_analyzers,
                )
                if motion_timeline is not None:
                    motion_section.motion_timeline = [
                        MotionTimelineSample(timestamp=float(ts), motion=motion) for ts, motion in motion_timeline
                    ]
                    motion_section.video_motion = [sample.motion for sample in motion_section.motion_timeline]
            else:
                motion_result = self._run_step(
                    steps,
                    MOTION_ANALYZER,
                    lambda: motion_analyzer.analyze_video(_require_video(video)),
                    optional=MOTION_ANALYZER in self.config.optional_analyzers,
                )
                if motion_result is not None:
                    motion_section.video_motion = motion_result
        # Plan which frames to sample under the explicit and memory-derived caps.
        effective_max_frames = self._effective_max_frames(metadata)
        frame_indices = self._plan_frame_indices(
            metadata=metadata,
            scenes=scenes,
            effective_max_frames=effective_max_frames,
        )
        # NOTE(review): timestamps divide by metadata.fps; a zero fps would raise
        # ZeroDivisionError here — confirm VideoMetadata guarantees fps > 0.
        sampling = FrameSamplingReport(
            mode=self.config.frame_sampling_mode,
            frames_per_second=self.config.frames_per_second,
            max_frames=self.config.max_frames,
            sampled_indices=frame_indices,
            sampled_timestamps=[round(idx / metadata.fps, 6) for idx in frame_indices],
            access_mode=None,
            effective_max_frames=effective_max_frames,
        )
        frame_steps_runtime: dict[str, dict[str, Any]] = {}
        camera_samples: list[CameraMotionSample] = []
        frame_samples: list[FrameAnalysisSample] = []
        frame_work_ids = (
            OBJECT_DETECTOR,
            FACE_DETECTOR,
            TEXT_DETECTOR,
            IMAGE_TO_TEXT,
            CAMERA_MOTION_DETECTOR,
        )
        frame_work_enabled = any(step_id in self.config.enabled_analyzers for step_id in frame_work_ids)
        if frame_indices and frame_work_enabled:
            frame_steps_runtime = self._initialize_frame_steps(steps)
            if frame_steps_runtime:
                access_mode = self._choose_frame_access_mode(len(frame_indices), metadata.frame_count)
                sampling.access_mode = access_mode
                if source_path is not None:
                    frame_samples, camera_samples = self._process_path_samples(
                        path=source_path,
                        metadata=metadata,
                        frame_indices=frame_indices,
                        frame_steps_runtime=frame_steps_runtime,
                        steps=steps,
                    )
                else:
                    frame_samples, camera_samples = self._process_video_samples(
                        video=video,
                        frame_indices=frame_indices,
                        frame_steps_runtime=frame_steps_runtime,
                        steps=steps,
                    )
                self._finalize_frame_steps(frame_steps_runtime=frame_steps_runtime, steps=steps)
        elif frame_work_enabled:
            # Frame analyzers were requested but no frames could be sampled.
            for step_id in frame_work_ids:
                if step_id in self.config.enabled_analyzers and step_id not in steps:
                    steps[step_id] = AnalysisStepStatus(status="skipped", warning="No frames sampled")
        if camera_samples:
            motion_section.camera_motion_samples = camera_samples
        frames_section = FrameAnalysisSection(sampling=sampling, samples=frame_samples)
        run_info.elapsed_seconds = time.perf_counter() - started
        summary = self._build_summary(
            temporal_section=temporal_section,
            audio_section=audio_section,
            motion_section=motion_section,
            frame_samples=frame_samples,
        )
        # Sections with no content collapse to None to keep the payload compact.
        return VideoAnalysis(
            source=source,
            config=self.config,
            run_info=run_info,
            steps=steps,
            audio=audio_section if (audio_section.transcription or audio_section.classification) else None,
            temporal=temporal_section if (temporal_section.scenes or temporal_section.actions) else None,
            motion=motion_section
            if (motion_section.video_motion or motion_section.motion_timeline or motion_section.camera_motion_samples)
            else None,
            frames=frames_section,
            summary=summary,
        )
def _run_step(
self,
steps: dict[str, AnalysisStepStatus],
step_id: str,
func: Any,
*,
optional: bool,
) -> Any:
started = time.perf_counter()
try:
result = func()
except Exception as exc:
duration = time.perf_counter() - started
status = "skipped" if optional else "failed"
steps[step_id] = AnalysisStepStatus(status=status, duration_seconds=duration, error=str(exc))
if self._should_raise(optional=optional):
raise
return None
steps[step_id] = AnalysisStepStatus(status="succeeded", duration_seconds=time.perf_counter() - started)
return result
def _should_raise(self, *, optional: bool) -> bool:
if optional:
return False
return self.config.fail_fast or (not self.config.best_effort)
def _analyzer_kwargs(self, analyzer_id: str) -> dict[str, Any]:
return dict(self.config.analyzer_params.get(analyzer_id, {}))
    def _run_action_recognition(
        self,
        *,
        action_recognizer: ActionRecognizer,
        source_path: Path | None,
        video: Video | None,
        scenes: list[SceneBoundary],
    ) -> list[DetectedAction]:
        """Run action recognition over the whole video or per detected scene.

        Scope selection (`config.action_scope`):
          * "video": always analyze the full video.
          * "scene": analyze per scene whenever any scenes were detected.
          * "adaptive": analyze per scene only when the scene count fits within
            `config.max_action_scenes` (or that cap is unset).
        Falls back to full-video recognition when no usable scenes remain after
        filtering/subsampling.
        """
        def analyze_full_video() -> list[DetectedAction]:
            # Prefer path-based recognition so the recognizer decodes on demand.
            if source_path is not None:
                return action_recognizer.recognize_path(source_path)
            return action_recognizer.recognize(_require_video(video))
        if self.config.action_scope == "video":
            return analyze_full_video()
        use_scene_scope = False
        if self.config.action_scope == "scene":
            use_scene_scope = bool(scenes)
        elif self.config.action_scope == "adaptive":
            use_scene_scope = bool(scenes) and (
                self.config.max_action_scenes is None or len(scenes) <= self.config.max_action_scenes
            )
        if not use_scene_scope:
            return analyze_full_video()
        selected_scenes = self._select_action_scenes(scenes)
        if not selected_scenes:
            return analyze_full_video()
        if source_path is not None:
            # Path mode: decode only each selected scene's time span.
            actions: list[DetectedAction] = []
            for scene in selected_scenes:
                actions.extend(
                    action_recognizer.recognize_path(
                        source_path,
                        start_second=scene.start,
                        end_second=scene.end,
                    )
                )
            return actions
        current_video = _require_video(video)
        return self._recognize_actions_on_video_scenes(
            action_recognizer=action_recognizer,
            video=current_video,
            scenes=selected_scenes,
        )
def _select_action_scenes(self, scenes: list[SceneBoundary]) -> list[SceneBoundary]:
selected = [scene for scene in scenes if scene.end > scene.start and scene.end_frame > scene.start_frame]
max_action_scenes = self.config.max_action_scenes
if max_action_scenes is not None and len(selected) > max_action_scenes:
picks = np.linspace(0, len(selected) - 1, max_action_scenes, dtype=int)
selected = [selected[i] for i in picks]
return selected
    def _recognize_actions_on_video_scenes(
        self,
        *,
        action_recognizer: ActionRecognizer,
        video: Video,
        scenes: list[SceneBoundary],
    ) -> list[DetectedAction]:
        """Recognize actions scene-by-scene on an in-memory video.

        Each scene is sliced into a clip, recognized independently, and the
        resulting clip-local frame indices / timestamps are shifted back into
        whole-video coordinates.
        """
        actions: list[DetectedAction] = []
        frame_count = len(video.frames)
        if frame_count <= 0:
            return actions
        for scene in scenes:
            # Clamp the scene to valid bounds and guarantee a non-empty clip.
            start_frame = max(0, min(frame_count - 1, int(scene.start_frame)))
            end_frame = max(start_frame + 1, min(frame_count, int(scene.end_frame)))
            clip = Video.from_frames(video.frames[start_frame:end_frame], video.fps)
            clip_actions = action_recognizer.recognize(clip)
            for action in clip_actions:
                # Missing clip-local offsets fall back to the scene's own bounds.
                # NOTE(review): clamping to frame_count (not frame_count - 1) can
                # yield an exclusive end index — confirm downstream expectations.
                action.start_frame = (
                    start_frame if action.start_frame is None else min(frame_count, start_frame + action.start_frame)
                )
                action.end_frame = (
                    end_frame if action.end_frame is None else min(frame_count, start_frame + action.end_frame)
                )
                action.start_time = scene.start if action.start_time is None else scene.start + action.start_time
                action.end_time = (
                    scene.end if action.end_time is None else min(video.total_seconds, scene.start + action.end_time)
                )
            actions.extend(clip_actions)
        return actions
def _build_summary(
self,
*,
temporal_section: TemporalAnalysisSection,
audio_section: AudioAnalysisSection,
motion_section: MotionAnalysisSection,
frame_samples: list[FrameAnalysisSample],
) -> dict[str, Any]:
summary: dict[str, Any] = {
"scene_count": len(temporal_section.scenes),
"action_count": len(temporal_section.actions),
"frame_sample_count": len(frame_samples),
"audio_events_count": len(audio_section.classification.events) if audio_section.classification else 0,
}
if temporal_section.actions:
action_counts = Counter(action.label for action in temporal_section.actions if action.label)
action_conf = {
label: max(a.confidence for a in temporal_section.actions if a.label == label)
for label in action_counts
}
summary["top_actions"] = [
{
"label": label,
"count": count,
"max_confidence": round(float(action_conf[label]), 4),
}
for label, count in action_counts.most_common(5)
]
if frame_samples:
face_present_count = sum(1 for sample in frame_samples if sample.faces)
summary["face_presence_ratio"] = round(face_present_count / len(frame_samples), 4)
object_labels: list[tuple[str, float]] = []
for sample in frame_samples:
for obj in sample.objects or []:
object_labels.append((obj.label, float(obj.confidence)))
if object_labels:
object_counts = Counter(label for label, _ in object_labels)
object_conf: dict[str, float] = {}
for label, conf in object_labels:
object_conf[label] = max(object_conf.get(label, 0.0), conf)
summary["top_objects"] = [
{
"label": label,
"count": count,
"max_confidence": round(object_conf[label], 4),
}
for label, count in object_counts.most_common(10)
]
text_tokens: Counter[str] = Counter()
for sample in frame_samples:
for text_item in sample.text or []:
for token in re.findall(r"[A-Za-z0-9]{3,}", text_item.lower()):
text_tokens[token] += 1
if text_tokens:
summary["top_ocr_terms"] = [
{"term": token, "count": count} for token, count in text_tokens.most_common(10)
]
if motion_section.camera_motion_samples:
camera_counts = Counter(item.label for item in motion_section.camera_motion_samples)
summary["camera_motion_distribution"] = dict(camera_counts)
elif motion_section.motion_timeline:
motion_counts = Counter(item.motion.motion_type for item in motion_section.motion_timeline)
summary["motion_type_distribution"] = dict(motion_counts)
return summary
def _initialize_frame_steps(self, steps: dict[str, AnalysisStepStatus]) -> dict[str, dict[str, Any]]:
runtime: dict[str, dict[str, Any]] = {}
for analyzer_id, analyzer_cls in (
(OBJECT_DETECTOR, ObjectDetector),
(TEXT_DETECTOR, TextDetector),
):
if analyzer_id not in self.config.enabled_analyzers:
continue
optional = analyzer_id in self.config.optional_analyzers
analyzer = self._create_analyzer(analyzer_cls, analyzer_id=analyzer_id, steps=steps, optional=optional)
if analyzer is not None:
runtime[analyzer_id] = self._frame_runtime(analyzer, optional=optional)
for analyzer_id, analyzer_cls2 in (
(FACE_DETECTOR, FaceDetector),
(IMAGE_TO_TEXT, ImageToText),
(CAMERA_MOTION_DETECTOR, CameraMotionDetector),
):
if analyzer_id not in self.config.enabled_analyzers:
continue
optional = analyzer_id in self.config.optional_analyzers
analyzer = self._create_analyzer(analyzer_cls2, analyzer_id=analyzer_id, steps=steps, optional=optional)
if analyzer is not None:
runtime[analyzer_id] = self._frame_runtime(analyzer, optional=optional)
return runtime
def _frame_runtime(self, analyzer: Any, *, optional: bool) -> dict[str, Any]:
return {
"analyzer": analyzer,
"optional": optional,
"started": time.perf_counter(),
"processed": 0,
"error": None,
}
def _create_analyzer(
self,
analyzer_cls: Any,
*,
analyzer_id: str,
steps: dict[str, AnalysisStepStatus],
optional: bool,
) -> Any | None:
started = time.perf_counter()
try:
analyzer = analyzer_cls(**self._analyzer_kwargs(analyzer_id))
except Exception as exc:
duration = time.perf_counter() - started
status = "skipped" if optional else "failed"
steps[analyzer_id] = AnalysisStepStatus(status=status, duration_seconds=duration, error=str(exc))
if self._should_raise(optional=optional):
raise
return None
return analyzer
def _finalize_frame_steps(
self,
*,
frame_steps_runtime: dict[str, dict[str, Any]],
steps: dict[str, AnalysisStepStatus],
) -> None:
for analyzer_id, runtime in frame_steps_runtime.items():
duration = time.perf_counter() - runtime["started"]
error = runtime["error"]
optional = bool(runtime["optional"])
processed = int(runtime["processed"])
if error is not None:
status = "skipped" if optional else "failed"
steps[analyzer_id] = AnalysisStepStatus(
status=status,
duration_seconds=duration,
error=error,
details={"processed_samples": processed},
)
else:
steps[analyzer_id] = AnalysisStepStatus(
status="succeeded",
duration_seconds=duration,
details={"processed_samples": processed},
)
def _process_path_samples(
self,
*,
path: Path,
metadata: VideoMetadata,
frame_indices: list[int],
frame_steps_runtime: dict[str, dict[str, Any]],
steps: dict[str, AnalysisStepStatus],
) -> tuple[list[FrameAnalysisSample], list[CameraMotionSample]]:
sample_count = len(frame_indices)
density = sample_count / max(metadata.frame_count, 1)
use_streaming = density >= 0.20
if use_streaming:
return self._process_path_samples_streaming(
path=path,
metadata=metadata,
frame_indices=frame_indices,
frame_steps_runtime=frame_steps_runtime,
steps=steps,
)
return self._process_path_samples_chunked(
path=path,
metadata=metadata,
frame_indices=frame_indices,
frame_steps_runtime=frame_steps_runtime,
steps=steps,
)
    def _process_path_samples_streaming(
        self,
        *,
        path: Path,
        metadata: VideoMetadata,
        frame_indices: list[int],
        frame_steps_runtime: dict[str, dict[str, Any]],
        steps: dict[str, AnalysisStepStatus],
    ) -> tuple[list[FrameAnalysisSample], list[CameraMotionSample]]:
        """Sequentially decode the span covering all sampled frames, analyzing matches.

        Used when sampling is dense: one forward pass between the first and
        last target index is cheaper than repeated seeks. Only frames whose
        index appears in `frame_indices` are analyzed.
        """
        samples: list[FrameAnalysisSample] = []
        camera_samples: list[CameraMotionSample] = []
        if not frame_indices:
            return samples, camera_samples
        target_indices = sorted(frame_indices)
        target_pos = 0
        # Restrict decoding to the time window that actually contains targets.
        start_second = target_indices[0] / metadata.fps
        end_second = min(metadata.total_seconds, (target_indices[-1] + 1) / metadata.fps)
        # Sliding window of recent frames feeding the camera-motion comparison.
        camera_window: deque[tuple[int, float, np.ndarray]] = deque(maxlen=max(self.config.camera_motion_stride, 1) + 1)
        with FrameIterator(path, start_second=start_second, end_second=end_second) as iterator:
            for frame_idx, frame in iterator:
                # Skip any targets the iterator has already moved past.
                while target_pos < len(target_indices) and target_indices[target_pos] < frame_idx:
                    target_pos += 1
                if target_pos >= len(target_indices):
                    break
                if frame_idx != target_indices[target_pos]:
                    continue
                sample, camera_sample = self._analyze_sampled_frame(
                    frame=frame,
                    frame_index=frame_idx,
                    fps=metadata.fps,
                    frame_steps_runtime=frame_steps_runtime,
                    camera_window=camera_window,
                    steps=steps,
                )
                samples.append(sample)
                if camera_sample is not None:
                    camera_samples.append(camera_sample)
                target_pos += 1
                # Stop decoding as soon as the last target has been consumed.
                if target_pos >= len(target_indices):
                    break
        return samples, camera_samples
def _process_path_samples_chunked(
self,
*,
path: Path,
metadata: VideoMetadata,
frame_indices: list[int],
frame_steps_runtime: dict[str, dict[str, Any]],
steps: dict[str, AnalysisStepStatus],
) -> tuple[list[FrameAnalysisSample], list[CameraMotionSample]]:
samples: list[FrameAnalysisSample] = []
camera_samples: list[CameraMotionSample] = []
camera_window: deque[tuple[int, float, np.ndarray]] = deque(maxlen=max(self.config.camera_motion_stride, 1) + 1)
chunk_size = self._effective_frame_chunk_size(metadata)
for chunk_start in range(0, len(frame_indices), chunk_size):
chunk_indices = frame_indices[chunk_start : chunk_start + chunk_size]
chunk_frames = extract_frames_at_indices(path, chunk_indices)
for i, frame in enumerate(chunk_frames):
frame_idx = chunk_indices[i]
sample, camera_sample = self._analyze_sampled_frame(
frame=frame,
frame_index=frame_idx,
fps=metadata.fps,
frame_steps_runtime=frame_steps_runtime,
camera_window=camera_window,
steps=steps,
)
samples.append(sample)
if camera_sample is not None:
camera_samples.append(camera_sample)
return samples, camera_samples
def _process_video_samples(
self,
*,
video: Video | None,
frame_indices: list[int],
frame_steps_runtime: dict[str, dict[str, Any]],
steps: dict[str, AnalysisStepStatus],
) -> tuple[list[FrameAnalysisSample], list[CameraMotionSample]]:
if video is None:
return [], []
samples: list[FrameAnalysisSample] = []
camera_samples: list[CameraMotionSample] = []
camera_window: deque[tuple[int, float, np.ndarray]] = deque(maxlen=max(self.config.camera_motion_stride, 1) + 1)
for frame_idx in frame_indices:
sample, camera_sample = self._analyze_sampled_frame(
frame=video.frames[frame_idx],
frame_index=frame_idx,
fps=video.fps,
frame_steps_runtime=frame_steps_runtime,
camera_window=camera_window,
steps=steps,
)
samples.append(sample)
if camera_sample is not None:
camera_samples.append(camera_sample)
return samples, camera_samples
    def _analyze_sampled_frame(
        self,
        *,
        frame: np.ndarray,
        frame_index: int,
        fps: float,
        frame_steps_runtime: dict[str, dict[str, Any]],
        camera_window: deque[tuple[int, float, np.ndarray]],
        steps: dict[str, AnalysisStepStatus],
    ) -> tuple[FrameAnalysisSample, CameraMotionSample | None]:
        """Run every active per-frame analyzer on one sampled frame.

        An analyzer whose runtime record already carries an error is skipped
        for the rest of the run; the first exception it raises is latched into
        the record (and re-raised when not tolerated). Camera motion compares
        the oldest and newest frames in `camera_window` once the window spans
        `camera_motion_stride` frames.

        Returns the frame sample plus an optional camera-motion sample.
        """
        timestamp = round(frame_index / fps, 6)
        sample = FrameAnalysisSample(
            timestamp=timestamp,
            frame_index=frame_index,
            objects=[],
            faces=[],
            text=[],
            text_regions=[],
        )
        step_results: dict[str, str] = {}
        for analyzer_id in (
            OBJECT_DETECTOR,
            FACE_DETECTOR,
            TEXT_DETECTOR,
            IMAGE_TO_TEXT,
        ):
            runtime = frame_steps_runtime.get(analyzer_id)
            # Skip analyzers that were never built or have already failed.
            if runtime is None or runtime["error"] is not None:
                continue
            analyzer = runtime["analyzer"]
            try:
                if analyzer_id == OBJECT_DETECTOR:
                    sample.objects = analyzer.detect(frame)
                elif analyzer_id == FACE_DETECTOR:
                    sample.faces = analyzer.detect(frame)
                elif analyzer_id == TEXT_DETECTOR:
                    # Prefer region-level OCR output when the detector supports it.
                    if hasattr(analyzer, "detect_detailed"):
                        detailed = analyzer.detect_detailed(frame)
                        sample.text_regions = detailed
                        sample.text = [item.text for item in detailed]
                    else:
                        sample.text = analyzer.detect(frame)
                elif analyzer_id == IMAGE_TO_TEXT:
                    sample.image_caption = analyzer.describe_image(frame)
                runtime["processed"] += 1
                step_results[analyzer_id] = "ok"
            except Exception as exc:
                # Latch the failure so later frames skip this analyzer entirely.
                runtime["error"] = str(exc)
                step_results[analyzer_id] = "error"
                if self._should_raise(optional=bool(runtime["optional"])):
                    raise
        camera_sample: CameraMotionSample | None = None
        camera_runtime = frame_steps_runtime.get(CAMERA_MOTION_DETECTOR)
        if camera_runtime is not None and camera_runtime["error"] is None:
            camera_window.append((frame_index, timestamp, frame))
            stride = max(int(self.config.camera_motion_stride), 1)
            # Compare the window's oldest and newest frames once it is full enough.
            if len(camera_window) >= stride + 1:
                first_idx, first_timestamp, first_frame = camera_window[0]
                _, last_timestamp, last_frame = camera_window[-1]
                try:
                    label = camera_runtime["analyzer"].detect(first_frame, last_frame)
                    camera_runtime["processed"] += 1
                    step_results[CAMERA_MOTION_DETECTOR] = "ok"
                    camera_sample = CameraMotionSample(start=first_timestamp, end=last_timestamp, label=label)
                except Exception as exc:
                    camera_runtime["error"] = str(exc)
                    step_results[CAMERA_MOTION_DETECTOR] = "error"
                    if self._should_raise(optional=bool(camera_runtime["optional"])):
                        raise
        # Guard against detectors returning None so downstream always sees lists.
        sample.objects = sample.objects if sample.objects is not None else []
        sample.faces = sample.faces if sample.faces is not None else []
        sample.text = sample.text if sample.text is not None else []
        sample.text_regions = sample.text_regions if sample.text_regions is not None else []
        # Keep payload compact.
        if step_results:
            sample.step_results = step_results
        return sample, camera_sample
def _choose_frame_access_mode(self, sampled_frames: int, total_frames: int) -> str:
if total_frames <= 0:
return "chunked"
density = sampled_frames / total_frames
return "streaming" if density >= 0.20 else "chunked"
def _effective_max_frames(self, metadata: VideoMetadata) -> int | None:
"""Compute the max sampled frames after applying explicit and memory budget limits."""
limits: list[int] = []
if self.config.max_frames is not None:
limits.append(int(self.config.max_frames))
if self.config.max_memory_mb is not None:
frame_bytes = metadata.width * metadata.height * 3
if frame_bytes > 0:
budget_bytes = int(self.config.max_memory_mb * 1024 * 1024)
# Reserve memory for model tensors and transient buffers.
usable_bytes = max(frame_bytes, int(budget_bytes * 0.5))
limits.append(max(1, usable_bytes // frame_bytes))
if not limits:
return None
return max(1, min(limits))
def _effective_frame_chunk_size(self, metadata: VideoMetadata) -> int:
chunk_size = max(1, int(self.config.frame_chunk_size))
effective_max_frames = self._effective_max_frames(metadata)
if effective_max_frames is None:
return chunk_size
return max(1, min(chunk_size, effective_max_frames))
def _apply_max_frames_limit(self, indices: list[int], max_frames: int | None) -> list[int]:
if max_frames is None or len(indices) <= max_frames:
return indices
picks = np.linspace(0, len(indices) - 1, max_frames, dtype=int)
return [indices[i] for i in picks]
def _plan_frame_indices(
self,
*,
metadata: VideoMetadata,
scenes: list[SceneBoundary],
effective_max_frames: int | None = None,
) -> list[int]:
if metadata.frame_count <= 0:
return []
mode = self.config.frame_sampling_mode
sampled: set[int] = set()
if mode in {"uniform", "hybrid"}:
fps = max(float(self.config.frames_per_second), 0.0)
if fps > 0:
interval = max(int(round(metadata.fps / fps)), 1)
sampled.update(range(0, metadata.frame_count, interval))
if mode in {"scene_boundary", "scene_representative", "hybrid"} and scenes:
if self.config.include_scene_boundaries:
sampled.update(max(0, min(metadata.frame_count - 1, scene.start_frame)) for scene in scenes)
if mode in {"scene_representative", "hybrid"}:
offset = min(max(self.config.scene_representative_offset, 0.0), 1.0)
for scene in scenes:
span = max(scene.end_frame - scene.start_frame, 1)
representative = scene.start_frame + int(round(offset * (span - 1)))
sampled.add(max(0, min(metadata.frame_count - 1, representative)))
if not sampled:
sampled.add(0)
ordered = sorted(sampled)
ordered = self._apply_max_frames_limit(ordered, effective_max_frames)
return ordered
    def _build_source_from_path(self, path: Path, metadata: VideoMetadata) -> VideoAnalysisSource:
        """Build the source descriptor for a file path, including probed container tags.

        Geo metadata is included only when `include_geo` is on and `redact_geo`
        is off; raw tags are sanitized (and geo-redacted) separately.
        """
        tags = self._extract_source_tags(path)
        raw_tags = _sanitize_raw_tags(tags, redact_geo=self.config.redact_geo)
        # First matching tag wins, in _CREATION_TIME_TAG_KEYS priority order.
        creation_time = _normalize_creation_time(next((tags[k] for k in _CREATION_TIME_TAG_KEYS if k in tags), None))
        geo: GeoMetadata | None = None
        if self.config.include_geo and not self.config.redact_geo:
            geo = _parse_geo_metadata(tags)
        # Fall back to the file stem when the container has no title tag.
        title = tags.get("title") or path.stem
        return VideoAnalysisSource(
            title=title,
            path=str(path),
            filename=path.name,
            duration=metadata.total_seconds,
            fps=metadata.fps,
            width=metadata.width,
            height=metadata.height,
            frame_count=metadata.frame_count,
            creation_time=creation_time,
            geo=geo,
            raw_tags=raw_tags or None,  # omit empty tag dicts entirely
        )
    def _build_source_from_video(
        self,
        *,
        video: Video,
        source_path: str | Path | None,
        metadata: VideoMetadata,
    ) -> VideoAnalysisSource:
        """Build the source descriptor for an in-memory video.

        When `source_path` is supplied, container tags are probed from disk as
        in path mode; otherwise tag-derived fields stay empty. Duration comes
        from the `Video` object itself.
        """
        path_obj = Path(source_path) if source_path is not None else None
        tags = self._extract_source_tags(path_obj) if path_obj else {}
        raw_tags = _sanitize_raw_tags(tags, redact_geo=self.config.redact_geo)
        # First matching tag wins, in _CREATION_TIME_TAG_KEYS priority order.
        creation_time = _normalize_creation_time(next((tags[k] for k in _CREATION_TIME_TAG_KEYS if k in tags), None))
        geo: GeoMetadata | None = None
        if self.config.include_geo and not self.config.redact_geo:
            geo = _parse_geo_metadata(tags)
        # Title prefers the container tag, then the path stem, else stays None.
        title = tags.get("title") if tags else None
        if title is None and path_obj is not None:
            title = path_obj.stem
        return VideoAnalysisSource(
            title=title,
            path=str(path_obj) if path_obj else None,
            filename=path_obj.name if path_obj else None,
            duration=video.total_seconds,
            fps=metadata.fps,
            width=metadata.width,
            height=metadata.height,
            frame_count=metadata.frame_count,
            creation_time=creation_time,
            geo=geo,
            raw_tags=raw_tags or None,
        )
def _extract_source_tags(self, path: Path | None) -> dict[str, str]:
if path is None:
return {}
cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format_tags:stream_tags",
"-of",
"json",
str(path),
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
payload = json.loads(result.stdout)
except Exception:
return {}
tags: dict[str, str] = {}
format_tags = payload.get("format", {}).get("tags", {})
if isinstance(format_tags, dict):
tags.update({str(k).lower(): str(v) for k, v in format_tags.items()})
for stream in payload.get("streams", []):
stream_tags = stream.get("tags", {})
if not isinstance(stream_tags, dict):
continue
for key, value in stream_tags.items():
lowered = str(key).lower()
tags.setdefault(lowered, str(value))
return tags