class VideoAnalyzer:
"""Orchestrates scene-first analyzers and builds `VideoAnalysis` output."""
def __init__(self, config: VideoAnalysisConfig | None = None):
self.config = config or VideoAnalysisConfig()
def analyze_path(self, path: str | Path) -> VideoAnalysis:
"""Analyze a video path in scene-first mode."""
path_obj = Path(path)
metadata = VideoMetadata.from_path(path_obj)
source = self._build_source(
metadata=metadata,
path_obj=path_obj,
duration_seconds=metadata.total_seconds,
title_fallback=path_obj.stem,
)
return self._analyze(video=None, source_path=path_obj, metadata=metadata, source=source)
def analyze(self, video: Video, *, source_path: str | Path | None = None) -> VideoAnalysis:
"""Analyze an in-memory `Video` object."""
path_obj = Path(source_path) if source_path else None
metadata = VideoMetadata.from_video(video)
source = self._build_source(
metadata=metadata,
path_obj=path_obj,
duration_seconds=video.total_seconds,
title_fallback=path_obj.stem if path_obj is not None else None,
)
return self._analyze(
video=video,
source_path=path_obj,
metadata=metadata,
source=source,
)
    def _analyze(
        self,
        *,
        video: Video | None,
        source_path: Path | None,
        metadata: VideoMetadata,
        source: VideoAnalysisSource,
    ) -> VideoAnalysis:
        """Core pipeline shared by `analyze` and `analyze_path`.

        Runs the enabled analyzers (Whisper transcription, scene detection,
        then per-scene analysis), frees GPU resources between stages, and
        assembles the final `VideoAnalysis`.

        Raises:
            ValueError: if both `video` and `source_path` are None.
        """
        # Recorded in run info: "path" = streaming from disk, "video" = in-memory.
        mode = "path" if source_path is not None else "video"
        if source_path is None and video is None:
            raise ValueError("Either `source_path` or `video` must be provided")
        enabled = self.config.enabled_analyzers
        run_info = AnalysisRunInfo(
            created_at=_utc_now_iso(),
            mode=mode,
            library_version=_library_version(),
        )
        t_analysis_start = time.perf_counter()
        run_whisper = AUDIO_TO_TEXT in enabled
        run_scene_det = SEMANTIC_SCENE_DETECTOR in enabled
        transcription: Transcription | None = None
        detected: list[SceneBoundary] | None = None
        # Whisper and TransNetV2 operate on independent data (audio vs video
        # frames) and both fit comfortably in GPU memory together. Run them
        # concurrently via threads -- the GIL is released during GPU compute
        # and ffmpeg I/O so real parallelism is achieved.
        #
        # SceneVLM is loaded *after* Whisper/TransNetV2 finish (not concurrently)
        # because transformers' from_pretrained(torch_dtype="auto") mutates the
        # process-global torch.get_default_dtype() during model construction,
        # which corrupts Whisper's model weights if they're initialized at the
        # same time.
        if run_whisper and run_scene_det:
            transcription, detected = self._run_whisper_and_scene_detection(source_path=source_path, video=video)
        else:
            if run_whisper:
                t0 = time.perf_counter()
                transcription = self._run_whisper(source_path=source_path, video=video)
                logger.info("Whisper transcription completed in %.2fs", time.perf_counter() - t0)
            if run_scene_det:
                t0 = time.perf_counter()
                detected = self._run_scene_detection(source_path=source_path, video=video)
                logger.info("Scene detection completed in %.2fs", time.perf_counter() - t0)
        if run_scene_det:
            # TransNetV2 flips process-global torch determinism settings
            # during init; undo that before any later model runs.
            self._reset_transnetv2_torch_state()
        # Whisper and TransNetV2 are done -- free their GPU memory before
        # loading SceneVLM (~9GB). Python GC doesn't guarantee immediate
        # cleanup, so force it and release the CUDA cache.
        if run_whisper or run_scene_det:
            gc.collect()
            self._release_gpu_cache()
        # Fall back to a single whole-video scene when detection is disabled,
        # failed (detected is None), or produced nothing after normalization.
        scenes = self._default_scene_boundaries(metadata)
        if detected is not None:
            scenes = self._normalize_scene_boundaries(detected, metadata)
            if not scenes:
                scenes = self._default_scene_boundaries(metadata)
        t0 = time.perf_counter()
        scene_section = self._analyze_scenes(
            source_path=source_path,
            video=video,
            metadata=metadata,
            scenes=scenes,
            preloaded_scene_vlm=None,
        )
        logger.info("Scene analysis completed in %.2fs", time.perf_counter() - t0)
        audio_section = AudioAnalysisSection(transcription=transcription) if transcription is not None else None
        logger.info("Total analysis completed in %.2fs", time.perf_counter() - t_analysis_start)
        return VideoAnalysis(
            source=source,
            config=self.config,
            run_info=run_info,
            audio=audio_section,
            # An empty scene section is collapsed to None in the output.
            scenes=scene_section if scene_section.samples else None,
        )
def _run_whisper(self, *, source_path: Path | None, video: Video | None) -> Transcription | None:
try:
return AudioToText(**self.config.get_params(AUDIO_TO_TEXT)).transcribe(
Audio.from_path(source_path) if source_path is not None else _require_video(video)
)
except Exception:
logger.warning("AudioToText failed, skipping transcription", exc_info=True)
return None
def _run_scene_detection(self, *, source_path: Path | None, video: Video | None) -> list[SceneBoundary] | None:
try:
scene_detector = SemanticSceneDetector(**self.config.get_params(SEMANTIC_SCENE_DETECTOR))
return (
scene_detector.detect_streaming(source_path)
if source_path is not None
else scene_detector.detect(_require_video(video))
)
except Exception:
logger.warning("SemanticSceneDetector failed, using default scene boundaries", exc_info=True)
return None
def _run_whisper_and_scene_detection(
self, *, source_path: Path | None, video: Video | None
) -> tuple[Transcription | None, list[SceneBoundary] | None]:
with ThreadPoolExecutor(max_workers=2) as pool:
t0 = time.perf_counter()
whisper_future = pool.submit(self._run_whisper, source_path=source_path, video=video)
scene_future = pool.submit(self._run_scene_detection, source_path=source_path, video=video)
transcription = whisper_future.result()
detected = scene_future.result()
elapsed = time.perf_counter() - t0
logger.info("Whisper + scene detection (parallel) completed in %.2fs", elapsed)
return transcription, detected
@staticmethod
def _reset_transnetv2_torch_state() -> None:
"""Reset global torch state that TransNetV2 sets during init.
TransNetV2 sets torch.use_deterministic_algorithms(True) and
cudnn.benchmark=False globally. Reset to defaults so subsequent
models (especially SceneVLM's convolution-heavy ViT) can use cuDNN
autotuner and non-deterministic kernels for better throughput.
"""
try:
import torch
torch.use_deterministic_algorithms(False)
if torch.backends.cudnn.is_available():
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = True
except ImportError:
pass
@staticmethod
def _release_gpu_cache() -> None:
"""Force-release unused GPU memory back to the CUDA allocator."""
try:
import torch
if torch.cuda.is_available():
torch.cuda.empty_cache()
except ImportError:
pass
    def _analyze_scenes(
        self,
        *,
        source_path: Path | None,
        video: Video | None,
        metadata: VideoMetadata,
        scenes: list[SceneBoundary],
        preloaded_scene_vlm: SceneVLM | None = None,
    ) -> SceneAnalysisSection:
        """Build one `SceneAnalysisSample` per scene boundary.

        Visual captions come from a single batched SceneVLM pass; audio
        classification runs per scene. Every stage is best-effort: each
        failure is logged and the corresponding field is simply left unset.

        Args:
            source_path: On-disk source, preferred for frame/audio extraction.
            video: In-memory fallback when no path is available.
            metadata: Source metadata (fps etc.) used for frame mapping.
            scenes: Scene boundaries to sample.
            preloaded_scene_vlm: Already-initialized SceneVLM to reuse
                instead of constructing one here.
        """
        enabled = self.config.enabled_analyzers
        scene_vlm: SceneVLM | None
        if preloaded_scene_vlm is not None:
            scene_vlm = preloaded_scene_vlm
        else:
            try:
                scene_vlm = SceneVLM(**self.config.get_params(SCENE_VLM)) if SCENE_VLM in enabled else None
            except Exception:
                logger.warning("Failed to initialize SceneVLM, skipping visual understanding", exc_info=True)
                scene_vlm = None
        try:
            audio_classifier = (
                AudioClassifier(**self.config.get_params(AUDIO_CLASSIFIER)) if AUDIO_CLASSIFIER in enabled else None
            )
        except Exception:
            logger.warning("Failed to initialize AudioClassifier, skipping audio classification", exc_info=True)
            audio_classifier = None
        # Load the whole source audio once up front; per-scene classification
        # then slices this instead of decoding a clip per scene.
        path_audio: Audio | None = None
        if audio_classifier is not None and source_path is not None:
            try:
                path_audio = Audio.from_path(source_path)
            except Exception:
                logger.warning(
                    "Failed to load audio from path, audio classification will use clip fallback", exc_info=True
                )
                path_audio = None
        # -- Batched SceneVLM: collect all timestamps, extract frames once, run one forward pass --
        captions: list[str | None] = [None] * len(scenes)
        if scene_vlm is not None:
            try:
                captions = self._run_scene_vlm_batched(
                    scene_vlm=scene_vlm,
                    source_path=source_path,
                    video=video,
                    metadata=metadata,
                    scenes=scenes,
                )
            except Exception:
                # Fall through with the all-None captions list.
                logger.warning("Batched SceneVLM failed, skipping visual understanding", exc_info=True)
        samples: list[SceneAnalysisSample] = []
        t_audio_total = 0.0
        for index, scene in enumerate(scenes):
            sample = SceneAnalysisSample(
                scene_index=index,
                start_second=float(scene.start),
                end_second=float(scene.end),
                start_frame=int(scene.start_frame),
                end_frame=int(scene.end_frame),
                caption=captions[index],
            )
            if audio_classifier is not None:
                t0 = time.perf_counter()
                try:
                    scene_clip: Video | None = None
                    if path_audio is None:
                        # No whole-file audio available: decode just this
                        # scene as a fallback input for the classifier.
                        try:
                            scene_clip = self._load_scene_video_clip(
                                source_path=source_path,
                                video=video,
                                start_second=scene.start,
                                end_second=scene.end,
                            )
                        except Exception:
                            scene_clip = None
                    sample.audio_classification = self._run_scene_audio_classification(
                        audio_classifier=audio_classifier,
                        path_audio=path_audio,
                        scene_clip=scene_clip,
                        scene_start=scene.start,
                        scene_end=scene.end,
                    )
                except Exception:
                    # Leave audio_classification unset for this scene only.
                    logger.warning(
                        "AudioClassifier failed for scene %d (%.1f-%.1fs)", index, scene.start, scene.end, exc_info=True
                    )
                t_audio_total += time.perf_counter() - t0
            samples.append(sample)
        if audio_classifier is not None:
            logger.info("AudioClassifier inference total: %.2fs across %d scenes", t_audio_total, len(scenes))
        return SceneAnalysisSection(samples=samples)
    def _run_scene_vlm_batched(
        self,
        *,
        scene_vlm: SceneVLM,
        source_path: Path | None,
        video: Video | None,
        metadata: VideoMetadata,
        scenes: list[SceneBoundary],
    ) -> list[str | None]:
        """Extract frames for all scenes in one ffmpeg call, then caption each group.

        Adjacent short scenes (< _SCENE_VLM_GROUP_THRESHOLD seconds) are merged
        into a single VLM call to reduce per-call overhead.

        Returns:
            One caption per scene (None where captioning failed or no frames
            were sampled); all scenes in a group share the same caption.
        """
        # Group adjacent short scenes to reduce VLM call count.
        # Each group is a list of scene indices that share one VLM call.
        groups: list[list[int]] = []
        current_group: list[int] = []
        current_group_duration = 0.0
        for i, scene in enumerate(scenes):
            dur = max(0.0, scene.end - scene.start)
            # Close the running group once adding this scene would push it
            # past the merge threshold; this scene then starts a new group.
            if current_group and current_group_duration + dur > _SCENE_VLM_GROUP_THRESHOLD:
                groups.append(current_group)
                current_group = [i]
                current_group_duration = dur
            else:
                current_group.append(i)
                current_group_duration += dur
        if current_group:
            groups.append(current_group)
        # Compute timestamps for each group (treating merged scenes as one span)
        group_timestamps: list[list[float]] = []
        all_timestamps: list[float] = []
        for group in groups:
            span_start = scenes[group[0]].start
            span_end = scenes[group[-1]].end
            duration = max(0.0, span_end - span_start)
            # Frame budget grows logarithmically with span duration, never
            # below 1 and capped at _SCENE_VLM_MAX_FRAMES.
            frame_count = min(
                _SCENE_VLM_MAX_FRAMES,
                max(1, math.ceil(_SCENE_VLM_FRAME_SCALE * math.log(duration / _SCENE_VLM_FRAME_BASE + 1))),
            )
            timestamps = self._sample_timestamps(start_second=span_start, end_second=span_end, frame_count=frame_count)
            group_timestamps.append(timestamps)
            all_timestamps.extend(timestamps)
        if not all_timestamps:
            return [None] * len(scenes)
        # Extract all frames in a single ffmpeg call
        if source_path is not None:
            # NOTE(review): the offset bookkeeping below assumes
            # extract_frames_at_times returns exactly one frame per requested
            # timestamp, in request order -- confirm against that helper.
            all_frames_array = extract_frames_at_times(source_path, all_timestamps)
            all_frames: list[np.ndarray | Image.Image] = list(all_frames_array)
        else:
            # In-memory fallback: map each timestamp to a frame index via
            # fps, clamped to the available frame range.
            current_video = _require_video(video)
            max_frame = max(len(current_video.frames) - 1, 0)
            indices = [max(0, min(max_frame, int(ts * metadata.fps))) for ts in all_timestamps]
            all_frames = [current_video.frames[idx] for idx in indices]
        # Caption each group and assign to all scenes in that group
        captions: list[str | None] = [None] * len(scenes)
        offset = 0
        for group, timestamps in zip(groups, group_timestamps):
            # Slice this group's frames out of the flat extraction result.
            frame_count = len(timestamps)
            group_frames = all_frames[offset : offset + frame_count]
            offset += frame_count
            if not group_frames:
                continue
            caption: str | None = None
            try:
                caption = scene_vlm.analyze_scene(group_frames) or None
            except Exception:
                logger.warning(
                    "SceneVLM failed for scenes %d-%d (%.1f-%.1fs)",
                    group[0],
                    group[-1],
                    scenes[group[0]].start,
                    scenes[group[-1]].end,
                    exc_info=True,
                )
                caption = None
            for i in group:
                captions[i] = caption
        logger.info("SceneVLM: %d groups from %d scenes", len(groups), len(scenes))
        return captions
def _run_scene_audio_classification(
self,
*,
audio_classifier: AudioClassifier,
path_audio: Audio | None,
scene_clip: Video | None,
scene_start: float,
scene_end: float,
) -> AudioClassification | None:
if scene_end <= scene_start:
return None
if path_audio is not None:
scene_media: Audio | Video = path_audio.slice(start_seconds=scene_start, end_seconds=scene_end)
elif scene_clip is not None:
scene_media = scene_clip
else:
return None
classification = audio_classifier.classify(scene_media)
offset_events = [
AudioEvent(
start=scene_start + event.start,
end=scene_start + event.end,
label=event.label,
confidence=event.confidence,
)
for event in classification.events
]
return AudioClassification(events=offset_events, clip_predictions=classification.clip_predictions)
@staticmethod
def _sample_timestamps(*, start_second: float, end_second: float, frame_count: int) -> list[float]:
duration = max(0.0, end_second - start_second)
if duration <= 0.0:
return []
count = max(1, frame_count)
if count == 1:
return [start_second + (duration * 0.5)]
# Center samples inside the interval so we avoid exact boundaries.
step = duration / float(count + 1)
timestamps = [start_second + (step * (idx + 1)) for idx in range(count)]
epsilon = 1e-3
return [min(end_second - epsilon, max(start_second + epsilon, ts)) for ts in timestamps]
def _load_scene_video_clip(
self,
*,
source_path: Path | None,
video: Video | None,
start_second: float,
end_second: float,
) -> Video | None:
if end_second <= start_second:
return None
if source_path is not None:
return Video.from_path(str(source_path), start_second=start_second, end_second=end_second)
return _require_video(video).cut(start_second, end_second)
def _default_scene_boundaries(self, metadata: VideoMetadata) -> list[SceneBoundary]:
if metadata.total_seconds <= 0 or metadata.frame_count <= 0:
return []
return [
SceneBoundary(
start=0.0,
end=float(metadata.total_seconds),
start_frame=0,
end_frame=int(metadata.frame_count),
)
]
def _normalize_scene_boundaries(self, scenes: list[SceneBoundary], metadata: VideoMetadata) -> list[SceneBoundary]:
normalized: list[SceneBoundary] = []
max_time = float(metadata.total_seconds)
max_frame = int(metadata.frame_count)
for item in scenes:
start = max(0.0, min(max_time, float(item.start)))
end = max(0.0, min(max_time, float(item.end)))
if end <= start:
continue
start_frame = int(item.start_frame)
end_frame = int(item.end_frame)
start_frame = max(0, min(max_frame, start_frame))
end_frame = max(0, min(max_frame, end_frame))
if end_frame <= start_frame:
start_frame = int(round(start * metadata.fps))
end_frame = max(start_frame + 1, int(round(end * metadata.fps)))
start_frame = max(0, min(max_frame, start_frame))
end_frame = max(0, min(max_frame, end_frame))
if end_frame <= start_frame:
continue
normalized.append(
SceneBoundary(
start=round(start, 6),
end=round(end, 6),
start_frame=start_frame,
end_frame=end_frame,
)
)
normalized.sort(key=lambda scene: (scene.start, scene.end))
return normalized
def _build_source(
self,
*,
metadata: VideoMetadata,
path_obj: Path | None,
duration_seconds: float,
title_fallback: str | None,
) -> VideoAnalysisSource:
tags = self._extract_source_tags(path_obj) if path_obj else {}
creation_time = _normalize_creation_time(
next((tags[key] for key in _CREATION_TIME_TAG_KEYS if key in tags), None)
)
geo = _parse_geo_metadata(tags)
title = tags.get("title") or title_fallback
return VideoAnalysisSource(
title=title,
path=str(path_obj) if path_obj else None,
filename=path_obj.name if path_obj else None,
duration=duration_seconds,
fps=metadata.fps,
width=metadata.width,
height=metadata.height,
frame_count=metadata.frame_count,
creation_time=creation_time,
geo=geo,
raw_tags=tags or None,
)
def _extract_source_tags(self, path: Path | None) -> dict[str, str]:
if path is None:
return {}
cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format_tags:stream_tags",
"-of",
"json",
str(path),
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
payload = json.loads(result.stdout)
except Exception:
return {}
tags: dict[str, str] = {}
format_tags = payload.get("format", {}).get("tags", {})
if isinstance(format_tags, dict):
tags.update({str(k).lower(): str(v) for k, v in format_tags.items()})
for stream in payload.get("streams", []):
stream_tags = stream.get("tags", {})
if not isinstance(stream_tags, dict):
continue
for key, value in stream_tags.items():
lowered = str(key).lower()
tags.setdefault(lowered, str(value))
return tags