Editing Plans

VideoEdit is a multi-segment editing plan modeled as a Pydantic BaseModel. Each segment selects a time range from a source video and carries an ordered list of Operation instances to run against it.

At a Glance

  • One operations list per segment — transforms and effects are sequenced together.
  • post_operations runs against the concatenated result.
  • validate() is a dry-run via metadata; no frames are loaded.
  • run() returns a Video in memory; run_to_file() streams directly to disk when every operation is streamable.

Quick Start

from videopython.editing import VideoEdit

plan = {
    "segments": [
        {
            "source": "input.mp4",
            "start": 5.0,
            "end": 12.0,
            "operations": [
                {"op": "crop", "width": 0.5, "height": 1.0, "mode": "center"},
                {"op": "resize", "width": 1080, "height": 1920},
                {
                    "op": "blur_effect",
                    "mode": "constant",
                    "iterations": 1,
                    "window": {"start": 0.0, "stop": 1.0},
                },
            ],
        },
        {"source": "input.mp4", "start": 20.0, "end": 28.0},
    ],
    "post_operations": [
        {"op": "color_adjust", "brightness": 0.05},
    ],
}

edit = VideoEdit.from_dict(plan)
predicted = edit.validate()        # dry-run via VideoMetadata
video = edit.run()                  # in-memory
video.save("output.mp4")

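# Or write directly to a file. Fully streamable plans run in constant memory;
# this plan has multiple segments plus post_operations, so it falls back to run() + save():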
edit.run_to_file("output.mp4", crf=20, preset="medium")

JSON Plan Format

{
  "segments": [
    {
      "source": "path/to/video.mp4",
      "start": 5.0,
      "end": 15.0,
      "operations": [
        {"op": "resize", "width": 1080, "height": 1920},
        {"op": "blur_effect", "mode": "constant", "iterations": 2,
         "window": {"start": 0.0, "stop": 3.0}}
      ]
    }
  ],
  "post_operations": [
    {"op": "color_adjust", "brightness": 0.05}
  ],
  "match_to_lowest_fps": true,
  "match_to_lowest_resolution": true
}

Rules:

  • segments is required and must be non-empty.
  • Each op object has an op discriminator field; remaining fields belong to that op's Pydantic schema. Unknown fields are rejected.
  • Effect time windows go in the op's window field ({"start": s, "stop": e}). Either endpoint may be omitted.
  • Top-level and segment-level keys are strict (extra="forbid").
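
The strictness rules above can be pictured with a small stdlib-only sketch. It uses neither videopython nor Pydantic: check_plan_keys and the two key sets are hypothetical names that mirror the documented fields, and real validation is what VideoEdit.from_dict performs.

```python
# Hypothetical stand-in for the strict key checks; not videopython's validator.
TOP_LEVEL_KEYS = {
    "segments",
    "post_operations",
    "match_to_lowest_fps",
    "match_to_lowest_resolution",
}
SEGMENT_KEYS = {"source", "start", "end", "operations"}


def check_plan_keys(plan: dict) -> list:
    """Return human-readable problems found in top-level and segment keys."""
    problems = []
    for key in sorted(set(plan) - TOP_LEVEL_KEYS):
        problems.append(f"unknown top-level key: {key}")
    segments = plan.get("segments") or []
    if not segments:
        problems.append("segments is required and must be non-empty")
    for i, segment in enumerate(segments):
        for key in sorted(set(segment) - SEGMENT_KEYS):
            problems.append(f"segment {i}: unknown key: {key}")
        for op in segment.get("operations", []):
            if "op" not in op:
                problems.append(f"segment {i}: operation is missing the 'op' discriminator")
    return problems


plan = {
    "segments": [{"source": "input.mp4", "start": 0.0, "end": 1.0, "speed": 2}],
    "title": "demo",
}
for problem in check_plan_keys(plan):
    print(problem)
```

The sketch collects every problem in one pass, which is convenient for feedback to a plan author; it does not check the per-op fields, which depend on each op's own schema.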

Pipeline Order

VideoEdit runs each segment's operations in order, concatenates the results, then applies post_operations to the assembled output.
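
The ordering can be illustrated with a toy model in which a "video" is just a list of frame labels. This is a sketch under that assumption, not the library's runner; run_plan and the three toy operations are hypothetical names.

```python
from functools import reduce
from typing import Callable, List, Tuple

Frames = List[str]  # toy stand-in for a Video: one label per frame
Op = Callable[[Frames], Frames]


def apply_all(frames: Frames, ops: List[Op]) -> Frames:
    """Apply operations left to right, feeding each result into the next."""
    return reduce(lambda acc, op: op(acc), ops, frames)


def run_plan(segments: List[Tuple[Frames, List[Op]]], post_operations: List[Op]) -> Frames:
    """Per-segment ops first, then concatenation, then post operations."""
    processed = [apply_all(clip, ops) for clip, ops in segments]
    assembled = [frame for clip in processed for frame in clip]
    return apply_all(assembled, post_operations)


def upper(frames: Frames) -> Frames:
    return [f.upper() for f in frames]


def tag(frames: Frames) -> Frames:
    return [f + "!" for f in frames]


def reverse(frames: Frames) -> Frames:
    return frames[::-1]


result = run_plan(
    segments=[(["a", "b"], [upper, tag]), (["c"], [])],
    post_operations=[reverse],
)
print(result)  # ['c', 'B!', 'A!']
```

Only the first segment is upper-cased and tagged, while reverse sees all three frames, which is the distinction between operations and post_operations.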

Streaming Mode (run_to_file)

run_to_file() pipes ffmpeg decode → per-frame effect chain → ffmpeg encode, keeping memory constant (~250 MB) regardless of video length.

Each operation contributes either an ffmpeg -vf filter (op.to_ffmpeg_filter(ctx)) or a streaming Effect (op.streamable == True plus process_frame). If any operation is not streamable or declares requires, or if a multi-segment plan has post_operations, run_to_file falls back to eager (run() + save()).

Streamable transforms: resize, crop, resample_fps. Streamable effects: every Effect except add_subtitles.
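
The fallback decision can be summarised as a predicate over the plan. The sketch below restates the conditions visible in the run_to_file source listing; OpInfo, op_can_stream and plan_can_stream are hypothetical names, and the real code inspects Operation instances rather than a summary record.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OpInfo:
    """Hypothetical summary of what the streaming path inspects on an operation."""

    name: str
    is_effect: bool
    streamable: bool = True  # effects: op.streamable
    has_ffmpeg_filter: bool = True  # transforms: to_ffmpeg_filter(ctx) is not None
    requires: tuple = ()  # context keys the op declares


def op_can_stream(op: OpInfo) -> bool:
    """An op streams if it needs no context and has a streaming form."""
    if op.requires:
        return False
    return op.streamable if op.is_effect else op.has_ffmpeg_filter


def plan_can_stream(segments: list, post_operations: list) -> bool:
    """Return False whenever run_to_file would take the eager fallback."""
    if not all(op_can_stream(op) for ops in segments for op in ops):
        return False
    if post_operations and len(segments) != 1:
        return False  # post-ops only fold into a single-segment plan
    return all(op.is_effect and op_can_stream(op) for op in post_operations)


resize = OpInfo("resize", is_effect=False)
blur = OpInfo("blur_effect", is_effect=True)
subtitles = OpInfo("add_subtitles", is_effect=True, streamable=False, requires=("transcription",))

print(plan_can_stream([[resize, blur]], []))        # True
print(plan_can_stream([[resize], [blur]], [blur]))  # False: multi-segment plus post-ops
print(plan_can_stream([[resize, subtitles]], []))   # False: add_subtitles cannot stream
```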

Context Data

Operations that need side-channel data (e.g. silence_removal and add_subtitles need a transcription) declare it via requires: ClassVar[tuple[str, ...]]. The runner picks matching keys out of the context dict and threads them into apply / predict_metadata:

edit = VideoEdit.from_dict(plan)
video = edit.run(context={"transcription": my_transcription})
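
How a runner might select context keys can be sketched without the library. Everything here is an assumption for illustration: the two op classes, the keyword-argument calling convention, and the error raised for a missing key are not taken from videopython's internals.

```python
from typing import Any, ClassVar, Optional


class SubtitlesLikeOp:
    """Hypothetical op that declares the context keys it needs."""

    requires: ClassVar[tuple] = ("transcription",)

    def apply(self, video: str, *, transcription: str) -> str:
        return f"{video} + subtitles({transcription})"


class ResizeLikeOp:
    """Hypothetical op that needs no context."""

    requires: ClassVar[tuple] = ()

    def apply(self, video: str) -> str:
        return f"{video} + resize"


def apply_with_context(op: Any, video: str, context: Optional[dict] = None) -> str:
    """Pass only the keys named in op.requires; ignore everything else."""
    context = context or {}
    missing = [key for key in op.requires if key not in context]
    if missing:
        # Assumed behaviour: fail loudly rather than run without the data.
        raise ValueError(f"{type(op).__name__} requires context keys {missing}")
    kwargs = {key: context[key] for key in op.requires}
    return op.apply(video, **kwargs)


context = {"transcription": "hello", "unused": 123}
print(apply_with_context(ResizeLikeOp(), "clip", context))     # clip + resize
print(apply_with_context(SubtitlesLikeOp(), "clip", context))  # clip + subtitles(hello)
```

Declaring requires on the class lets one context dict be shared across the whole plan, since each op sees only the keys it asked for.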

Validation

VideoEdit.validate() chains Operation.predict_metadata across the plan and checks:

  • segment end is within source duration
  • each operation's metadata prediction succeeds
  • effect window is within the predicted segment duration
  • concatenation compatibility (exact fps + dimensions)

Returns the predicted final VideoMetadata. Raises ValueError on failure.
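
Two of these checks are simple enough to restate as a standalone sketch. Meta is a hypothetical stand-in for the VideoMetadata fields involved; the 1e-3 second tolerance and the error wording follow the source listing further down this page.

```python
from typing import NamedTuple


class Meta(NamedTuple):
    """Stand-in for the VideoMetadata fields these checks read."""

    width: int
    height: int
    fps: float
    total_seconds: float


def check_segment_end(index: int, end: float, source: Meta) -> None:
    # 1e-3 s of slack absorbs rounding in reported durations.
    if end > source.total_seconds + 1e-3:
        raise ValueError(f"Segment {index}: end ({end}) exceeds source duration ({source.total_seconds}s)")


def check_concat_compatible(metas: list) -> None:
    """Every segment must match segment 0 exactly in fps and dimensions."""
    if len(metas) <= 1:
        return
    first = metas[0]
    for j, other in enumerate(metas[1:], start=1):
        if first.fps != other.fps:
            raise ValueError(f"Segment 0 fps ({first.fps}) != segment {j} fps ({other.fps})")
        if (first.width, first.height) != (other.width, other.height):
            raise ValueError(f"Segment 0 dimensions differ from segment {j}")


source = Meta(width=1920, height=1080, fps=30.0, total_seconds=10.0)
check_segment_end(0, 10.0005, source)  # passes: inside the tolerance
try:
    check_segment_end(1, 12.0, source)
except ValueError as error:
    print(error)
```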

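For dry-run validation without disk access, call validate_with_metadata(meta_or_dict, context=...) with pre-built metadata: either a single VideoMetadata, which is used for every segment, or a dict mapping each segment's source path to its VideoMetadata. A source missing from the dict raises ValueError.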

Matching Sources

When multiple segments draw from sources with different fps/resolution, VideoEdit auto-matches:

  • match_to_lowest_fps (default true) — resample all segments to the lowest source fps.
  • match_to_lowest_resolution (default true) — resize all segments to the lowest source resolution.

Setting a flag to false requires sources to match natively on that property; if they do not, validate() / run() raises.
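
The target computation can be sketched as follows, mirroring the _matching_targets_from_disk helper in the source listing but operating on plain tuples. SourceMeta and matching_targets are hypothetical names. Note that width and height are minimised independently, so the target resolution need not equal any single source's resolution.

```python
from typing import NamedTuple, Optional, Tuple


class SourceMeta(NamedTuple):
    """Stand-in for the VideoMetadata fields used for matching."""

    fps: float
    width: int
    height: int


def matching_targets(
    metas: list,
    match_to_lowest_fps: bool = True,
    match_to_lowest_resolution: bool = True,
) -> Tuple[Optional[float], Optional[int], Optional[int]]:
    """Return (fps, width, height) targets; None means leave that property alone."""
    if len(metas) <= 1 or not (match_to_lowest_fps or match_to_lowest_resolution):
        return None, None, None
    fps = min(m.fps for m in metas) if match_to_lowest_fps else None
    width = min(m.width for m in metas) if match_to_lowest_resolution else None
    height = min(m.height for m in metas) if match_to_lowest_resolution else None
    return fps, width, height


landscape = SourceMeta(fps=30.0, width=1920, height=1080)
portrait = SourceMeta(fps=24.0, width=1080, height=1920)
print(matching_targets([landscape, portrait]))  # (24.0, 1080, 1080)
```

Mixing a landscape and a portrait source therefore yields a square target, which may be worth an explicit resize or crop in each segment instead.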

JSON Schema (json_schema)

VideoEdit.json_schema() returns a JSON Schema for the wire format, including the discriminated union over every registered Operation. Pass it to any LLM API as a tool/function schema or structured-output format. AI operations appear in the union only after import videopython.ai has run.

schema = VideoEdit.json_schema()
# tools=[{"input_schema": schema}]            # Anthropic
# tools=[{"type": "function", "function": {"parameters": schema}}]  # OpenAI
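
The two commented shapes above can be wrapped in small helpers. This is a sketch: the tool name edit_video, the description text, and the stand-in schema are hypothetical, and the surrounding field names follow the vendors' published tool formats, which should be checked against their current documentation.

```python
from typing import Any, Dict

DESCRIPTION = "Produce a multi-segment video editing plan."  # hypothetical wording


def anthropic_tool(schema: Dict[str, Any], name: str = "edit_video") -> Dict[str, Any]:
    """Wrap a JSON Schema as an Anthropic-style tool definition."""
    return {"name": name, "description": DESCRIPTION, "input_schema": schema}


def openai_tool(schema: Dict[str, Any], name: str = "edit_video") -> Dict[str, Any]:
    """Wrap a JSON Schema as an OpenAI-style function tool definition."""
    return {
        "type": "function",
        "function": {"name": name, "description": DESCRIPTION, "parameters": schema},
    }


# Stand-in for VideoEdit.json_schema(); the real schema is much larger.
schema = {
    "type": "object",
    "properties": {"segments": {"type": "array", "minItems": 1}},
    "required": ["segments"],
    "additionalProperties": False,
}

tools_anthropic = [anthropic_tool(schema)]
tools_openai = [openai_tool(schema)]
```

The model's tool-call arguments can then be passed to VideoEdit.from_dict, which rejects anything that does not fit the plan format.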

API Reference

VideoEdit

Bases: BaseModel

A multi-segment editing plan.

Source code in src/videopython/editing/video_edit.py
class VideoEdit(BaseModel):
    """A multi-segment editing plan."""

    model_config = ConfigDict(extra="forbid")

    segments: list[SegmentConfig] = Field(
        min_length=1,
        description=(
            "Ordered list of segments. Each segment selects a time range from a "
            "source video and applies its `operations` to it; results are "
            "concatenated in order."
        ),
    )
    post_operations: list[OperationInput] = Field(
        default_factory=list,
        description="Operations applied to the concatenated output after all segments are joined.",
    )
    match_to_lowest_fps: bool = Field(
        True,
        description=(
            "When concatenating multiple segments with different fps, resample "
            "all of them to the lowest source fps. If false, mismatched fps "
            "raises during validation."
        ),
    )
    match_to_lowest_resolution: bool = Field(
        True,
        description=(
            "When concatenating multiple segments with different resolutions, "
            "resize all of them to the lowest source resolution. If false, "
            "mismatched dimensions raise during validation."
        ),
    )

    # ------------------------------------------------------------------ I/O

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> VideoEdit:
        return cls.model_validate(data)

    @classmethod
    def from_json(cls, text: str) -> VideoEdit:
        try:
            data = json.loads(text)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid VideoEdit JSON: {e.msg} at line {e.lineno} column {e.colno}") from e
        return cls.model_validate(data)

    def to_dict(self) -> dict[str, Any]:
        return self.model_dump(mode="json", exclude_none=False)

    @classmethod
    def json_schema(cls) -> dict[str, Any]:
        """LLM-facing schema: a discriminated union of operations per slot.

        Field descriptions are pulled from the corresponding Pydantic
        ``Field(description=...)`` declarations on ``VideoEdit`` and
        ``SegmentConfig`` so the hand-rolled schema stays in sync with the
        models without needing to repeat the docstrings here.
        """
        op_schema = Operation.json_schema()

        def _desc(model: type[BaseModel], field_name: str) -> str:
            return model.model_fields[field_name].description or ""

        segment_schema: dict[str, Any] = {
            "type": "object",
            "description": SegmentConfig.__doc__,
            "properties": {
                "source": {"type": "string", "description": _desc(SegmentConfig, "source")},
                "start": {"type": "number", "minimum": 0, "description": _desc(SegmentConfig, "start")},
                "end": {"type": "number", "minimum": 0, "description": _desc(SegmentConfig, "end")},
                "operations": {
                    "type": "array",
                    "items": op_schema,
                    "default": [],
                    "description": _desc(SegmentConfig, "operations"),
                },
            },
            "required": ["source", "start", "end"],
            "additionalProperties": False,
        }
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "description": cls.__doc__,
            "properties": {
                "segments": {
                    "type": "array",
                    "items": segment_schema,
                    "minItems": 1,
                    "description": _desc(cls, "segments"),
                },
                "post_operations": {
                    "type": "array",
                    "items": op_schema,
                    "default": [],
                    "description": _desc(cls, "post_operations"),
                },
                "match_to_lowest_fps": {
                    "type": "boolean",
                    "default": True,
                    "description": _desc(cls, "match_to_lowest_fps"),
                },
                "match_to_lowest_resolution": {
                    "type": "boolean",
                    "default": True,
                    "description": _desc(cls, "match_to_lowest_resolution"),
                },
            },
            "required": ["segments"],
            "additionalProperties": False,
        }

    # --------------------------------------------------------------- validate

    def validate(self, context: dict[str, Any] | None = None) -> VideoMetadata:  # type: ignore[override]
        """Dry-run the plan via metadata. Requires source files on disk.

        Shadows Pydantic v1's deprecated ``BaseModel.validate`` classmethod;
        use ``VideoEdit.from_dict``/``model_validate`` for plan parsing.
        """
        source_metas = [VideoMetadata.from_path(str(seg.source)) for seg in self.segments]
        return self._validate(source_metas, context)

    def validate_with_metadata(
        self,
        source_metadata: VideoMetadata | dict[str, VideoMetadata],
        context: dict[str, Any] | None = None,
    ) -> VideoMetadata:
        """Dry-run with pre-built metadata, avoiding disk access."""
        if isinstance(source_metadata, VideoMetadata):
            metas = [source_metadata for _ in self.segments]
        else:
            metas = []
            for i, seg in enumerate(self.segments):
                key = str(seg.source)
                if key not in source_metadata:
                    available = sorted(source_metadata)
                    raise ValueError(f"Segment {i}: no metadata for '{key}'. Available: {available}")
                metas.append(source_metadata[key])
        return self._validate(metas, context)

    def _assert_post_ops_supported(self, context: dict[str, Any] | None) -> None:
        """Reject post_operations needing time-based context on a multi-segment plan.

        ``post_operations`` run on the assembled, concatenated timeline. A
        source-absolute context value (e.g. a ``Transcription``) cannot be
        re-based across a multi-segment concat, and passing the raw value would
        silently mis-time the op (subtitles/silence-removal against the wrong
        timeline). Fail fast with an actionable message instead of producing a
        wrong render. Single-segment plans are unaffected -- their concatenated
        timeline is just the one segment's, handled by ``_segment_context``.
        """
        if len(self.segments) <= 1 or not self.post_operations:
            return
        rebaseable = _rebaseable_keys(context)
        if not rebaseable:
            return
        for op in self.post_operations:
            clash = sorted(set(op.requires) & rebaseable)
            if clash:
                raise ValueError(
                    f"post_operation '{op.op}' requires time-based context {clash}, but the plan "
                    f"has {len(self.segments)} segments. post_operations run on the concatenated "
                    "timeline and time-based context is not re-based across a multi-segment concat. "
                    f"Move '{op.op}' into a segment, or use a single-segment plan."
                )

    def _validate(
        self,
        source_metas: list[VideoMetadata],
        context: dict[str, Any] | None,
    ) -> VideoMetadata:
        self._assert_post_ops_supported(context)
        cut_metas: list[VideoMetadata] = []
        for i, (seg, meta) in enumerate(zip(self.segments, source_metas)):
            if seg.end > meta.total_seconds + 1e-3:
                raise ValueError(f"Segment {i}: end ({seg.end}) exceeds source duration ({meta.total_seconds}s)")
            cut_metas.append(CutSeconds(start=seg.start, end=seg.end).predict_metadata(meta))

        matched = self._apply_matching(cut_metas)
        segment_outputs = [
            self._predict_segment(i, seg, meta, context) for i, (seg, meta) in enumerate(zip(self.segments, matched))
        ]
        self._assert_concat_compatible(segment_outputs)

        first = segment_outputs[0]
        assembled = VideoMetadata(
            height=first.height,
            width=first.width,
            fps=first.fps,
            frame_count=sum(m.frame_count for m in segment_outputs),
            total_seconds=round(sum(m.total_seconds for m in segment_outputs), 4),
        )
        for op in self.post_operations:
            _validate_effect_window(op, assembled.total_seconds)
            assembled = _predict_with_context(op, assembled, context)
        return assembled

    def _predict_segment(
        self,
        index: int,
        segment: SegmentConfig,
        meta: VideoMetadata,
        context: dict[str, Any] | None,
    ) -> VideoMetadata:
        seg_context = _segment_context(context, segment.start, segment.end)
        for op in segment.operations:
            _validate_effect_window(op, meta.total_seconds)
            try:
                meta = _predict_with_context(op, meta, seg_context)
            except (ValueError, TypeError) as e:
                raise ValueError(f"Segment {index}: metadata prediction failed for '{op.op}': {e}") from e
        return meta

    def _apply_matching(self, metas: list[VideoMetadata]) -> list[VideoMetadata]:
        if len(metas) <= 1:
            return metas
        result = metas
        if self.match_to_lowest_fps:
            min_fps = min(m.fps for m in result)
            result = [m.with_fps(min_fps) if m.fps != min_fps else m for m in result]
        if self.match_to_lowest_resolution:
            min_w = min(m.width for m in result)
            min_h = min(m.height for m in result)
            result = [m.with_dimensions(min_w, min_h) if (m.width, m.height) != (min_w, min_h) else m for m in result]
        return result

    @staticmethod
    def _assert_concat_compatible(metas: list[VideoMetadata]) -> None:
        if len(metas) <= 1:
            return
        first = metas[0]
        for j, other in enumerate(metas[1:], start=1):
            if first.fps != other.fps:
                raise ValueError(
                    f"Segment 0 fps ({first.fps}) != segment {j} fps ({other.fps}); "
                    "all segments must share fps for concatenation."
                )
            if (first.width, first.height) != (other.width, other.height):
                raise ValueError(
                    f"Segment 0 dimensions ({first.width}x{first.height}) != "
                    f"segment {j} ({other.width}x{other.height}); all segments must share dimensions."
                )

    # -------------------------------------------------------------------- run

    def run(self, context: dict[str, Any] | None = None) -> Video:
        """Execute the plan in memory and return the final ``Video``."""
        self._assert_post_ops_supported(context)
        target_fps, target_w, target_h = self._matching_targets_from_disk()
        videos = [
            segment.process(segment.load(fps=target_fps, width=target_w, height=target_h), context)
            for segment in self.segments
        ]
        result = videos[0]
        for video in videos[1:]:
            result = result + video
        for op in self.post_operations:
            result = _apply_with_context(op, result, context)
        return result

    def run_to_file(
        self,
        output_path: str | Path,
        format: ALLOWED_VIDEO_FORMATS = "mp4",
        preset: ALLOWED_VIDEO_PRESETS = "medium",
        crf: int = 23,
        context: dict[str, Any] | None = None,
    ) -> Path:
        """Execute the plan, streaming directly to a file when possible.

        Falls back to eager (``self.run().save(...)``) for any operation that
        isn't streamable. Memory usage is O(1) w.r.t. video length for fully
        streamable pipelines.
        """
        self._assert_post_ops_supported(context)
        output_path = Path(output_path).with_suffix(f".{format}")
        output_path.parent.mkdir(parents=True, exist_ok=True)

        target_fps, target_w, target_h = self._matching_targets_from_disk()
        plans: list[StreamingSegmentPlan] = []
        for segment in self.segments:
            plan = self._build_streaming_plan(segment, target_fps, target_w, target_h)
            if plan is None:
                return self._run_to_file_eager(output_path, format, preset, crf, context)
            plans.append(plan)

        # Post-ops only fold cleanly into a single segment plan; multi-segment
        # post-ops would need a second pass we don't bother with.
        if self.post_operations and len(plans) != 1:
            return self._run_to_file_eager(output_path, format, preset, crf, context)
        if self.post_operations:
            plan = plans[0]
            total_frames = round((plan.end_second - plan.start_second) * plan.output_fps)
            for op in self.post_operations:
                if op.requires:
                    # Same reason as the per-segment guard: no runtime context
                    # in the streaming path. (Multi-segment + requires already
                    # raised by _assert_post_ops_supported.)
                    return self._run_to_file_eager(output_path, format, preset, crf, context)
                if not isinstance(op, Effect) or not op.streamable:
                    return self._run_to_file_eager(output_path, format, preset, crf, context)
                start_f, end_f = _effect_frame_range(op, plan.output_fps, total_frames)
                plan.effect_schedule.append(EffectScheduleEntry(op, start_f, end_f))

        if len(plans) == 1:
            plan = plans[0]
            audio = self._load_segment_audio(self.segments[0], plan)
            return stream_segment(plan, output_path, audio=audio, format=format, preset=preset, crf=crf)

        temp_files: list[Path] = []
        try:
            for segment, plan in zip(self.segments, plans):
                tmp = tempfile.NamedTemporaryFile(suffix=f".{format}", delete=False)
                tmp.close()
                audio = self._load_segment_audio(segment, plan)
                stream_segment(plan, Path(tmp.name), audio=audio, format=format, preset=preset, crf=crf)
                temp_files.append(Path(tmp.name))
            return concat_files(temp_files, output_path)
        finally:
            for f in temp_files:
                f.unlink(missing_ok=True)

    def _run_to_file_eager(
        self,
        output_path: Path,
        format: ALLOWED_VIDEO_FORMATS,
        preset: ALLOWED_VIDEO_PRESETS,
        crf: int,
        context: dict[str, Any] | None,
    ) -> Path:
        video = self.run(context=context)
        return video.save(output_path, format=format, preset=preset, crf=crf)

    # ----------------------------------------------------------------- helpers

    def _matching_targets_from_disk(self) -> tuple[float | None, int | None, int | None]:
        if len(self.segments) <= 1 or (not self.match_to_lowest_fps and not self.match_to_lowest_resolution):
            return None, None, None
        metas = [VideoMetadata.from_path(str(seg.source)) for seg in self.segments]
        fps = min(m.fps for m in metas) if self.match_to_lowest_fps else None
        w = min(m.width for m in metas) if self.match_to_lowest_resolution else None
        h = min(m.height for m in metas) if self.match_to_lowest_resolution else None
        return fps, w, h

    def _build_streaming_plan(
        self,
        segment: SegmentConfig,
        target_fps: float | None,
        target_w: int | None,
        target_h: int | None,
    ) -> StreamingSegmentPlan | None:
        source_meta = VideoMetadata.from_path(str(segment.source))
        out_fps = target_fps or source_meta.fps
        out_w = target_w or source_meta.width
        out_h = target_h or source_meta.height

        vf_filters: list[str] = []
        if target_w and target_h and (target_w != source_meta.width or target_h != source_meta.height):
            vf_filters.append(f"scale={target_w}:{target_h}")
        if target_fps and target_fps != source_meta.fps:
            vf_filters.append(f"fps={target_fps}")

        effect_schedule: list[EffectScheduleEntry] = []
        for op in segment.operations:
            if op.requires:
                # Streaming schedules effects by frame range with no runtime
                # context, so it can't supply -- let alone re-base onto the
                # segment's local timeline -- anything an op `requires`. Defer
                # to the eager path, where _segment_context handles re-basing.
                return None
            if isinstance(op, Effect):
                if not op.streamable:
                    return None
                total_frames = round(segment.duration * out_fps)
                start_f, end_f = _effect_frame_range(op, out_fps, total_frames)
                effect_schedule.append(EffectScheduleEntry(op, start_f, end_f))
                continue
            # Non-effect transform: compile to ffmpeg filter if streamable.
            ctx = FilterCtx(width=out_w, height=out_h, fps=out_fps)
            filter_expr = op.to_ffmpeg_filter(ctx)
            if filter_expr is None:
                return None
            vf_filters.append(filter_expr)
            new_meta = op.predict_metadata(
                VideoMetadata(height=out_h, width=out_w, fps=out_fps, frame_count=1, total_seconds=1.0)
            )
            out_w, out_h, out_fps = new_meta.width, new_meta.height, new_meta.fps

        return StreamingSegmentPlan(
            source_path=segment.source,
            start_second=segment.start,
            end_second=segment.end,
            output_fps=out_fps,
            output_width=out_w,
            output_height=out_h,
            vf_filters=vf_filters,
            effect_schedule=effect_schedule,
        )

    def _load_segment_audio(
        self,
        segment: SegmentConfig,
        plan: StreamingSegmentPlan,
    ) -> Audio | None:
        try:
            audio = Audio.from_path(str(segment.source))
            audio = audio.slice(segment.start, segment.end)
        except (AudioLoadError, FileNotFoundError, subprocess.CalledProcessError):
            warnings.warn(f"No audio found for `{segment.source}`, using silent track.")
            audio = Audio.create_silent(duration_seconds=round(segment.duration, 2), stereo=True, sample_rate=44100)

        for entry in plan.effect_schedule:
            effect = entry.effect
            if isinstance(effect, (Fade, VolumeAdjust)) and not audio.is_silent:
                start_s = entry.start_frame / plan.output_fps
                stop_s = entry.end_frame / plan.output_fps
                effect._apply_audio(audio, start_s, stop_s)

        return audio

json_schema classmethod

json_schema() -> dict[str, Any]

LLM-facing schema: a discriminated union of operations per slot.

Field descriptions are pulled from the corresponding Pydantic Field(description=...) declarations on VideoEdit and SegmentConfig so the hand-rolled schema stays in sync with the models without needing to repeat the docstrings here.

Source code in src/videopython/editing/video_edit.py
@classmethod
def json_schema(cls) -> dict[str, Any]:
    """LLM-facing schema: a discriminated union of operations per slot.

    Field descriptions are pulled from the corresponding Pydantic
    ``Field(description=...)`` declarations on ``VideoEdit`` and
    ``SegmentConfig`` so the hand-rolled schema stays in sync with the
    models without needing to repeat the docstrings here.
    """
    op_schema = Operation.json_schema()

    def _desc(model: type[BaseModel], field_name: str) -> str:
        return model.model_fields[field_name].description or ""

    segment_schema: dict[str, Any] = {
        "type": "object",
        "description": SegmentConfig.__doc__,
        "properties": {
            "source": {"type": "string", "description": _desc(SegmentConfig, "source")},
            "start": {"type": "number", "minimum": 0, "description": _desc(SegmentConfig, "start")},
            "end": {"type": "number", "minimum": 0, "description": _desc(SegmentConfig, "end")},
            "operations": {
                "type": "array",
                "items": op_schema,
                "default": [],
                "description": _desc(SegmentConfig, "operations"),
            },
        },
        "required": ["source", "start", "end"],
        "additionalProperties": False,
    }
    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "description": cls.__doc__,
        "properties": {
            "segments": {
                "type": "array",
                "items": segment_schema,
                "minItems": 1,
                "description": _desc(cls, "segments"),
            },
            "post_operations": {
                "type": "array",
                "items": op_schema,
                "default": [],
                "description": _desc(cls, "post_operations"),
            },
            "match_to_lowest_fps": {
                "type": "boolean",
                "default": True,
                "description": _desc(cls, "match_to_lowest_fps"),
            },
            "match_to_lowest_resolution": {
                "type": "boolean",
                "default": True,
                "description": _desc(cls, "match_to_lowest_resolution"),
            },
        },
        "required": ["segments"],
        "additionalProperties": False,
    }

validate

validate(
    context: dict[str, Any] | None = None,
) -> VideoMetadata

Dry-run the plan via metadata. Requires source files on disk.

Shadows Pydantic v1's deprecated BaseModel.validate classmethod; use VideoEdit.from_dict/model_validate for plan parsing.

Source code in src/videopython/editing/video_edit.py
def validate(self, context: dict[str, Any] | None = None) -> VideoMetadata:  # type: ignore[override]
    """Dry-run the plan via metadata. Requires source files on disk.

    Shadows Pydantic v1's deprecated ``BaseModel.validate`` classmethod;
    use ``VideoEdit.from_dict``/``model_validate`` for plan parsing.
    """
    source_metas = [VideoMetadata.from_path(str(seg.source)) for seg in self.segments]
    return self._validate(source_metas, context)

validate_with_metadata

validate_with_metadata(
    source_metadata: VideoMetadata
    | dict[str, VideoMetadata],
    context: dict[str, Any] | None = None,
) -> VideoMetadata

Dry-run with pre-built metadata, avoiding disk access.

Source code in src/videopython/editing/video_edit.py
def validate_with_metadata(
    self,
    source_metadata: VideoMetadata | dict[str, VideoMetadata],
    context: dict[str, Any] | None = None,
) -> VideoMetadata:
    """Dry-run with pre-built metadata, avoiding disk access."""
    if isinstance(source_metadata, VideoMetadata):
        metas = [source_metadata for _ in self.segments]
    else:
        metas = []
        for i, seg in enumerate(self.segments):
            key = str(seg.source)
            if key not in source_metadata:
                available = sorted(source_metadata)
                raise ValueError(f"Segment {i}: no metadata for '{key}'. Available: {available}")
            metas.append(source_metadata[key])
    return self._validate(metas, context)

run

run(context: dict[str, Any] | None = None) -> Video

Execute the plan in memory and return the final Video.

Source code in src/videopython/editing/video_edit.py
def run(self, context: dict[str, Any] | None = None) -> Video:
    """Execute the plan in memory and return the final ``Video``."""
    self._assert_post_ops_supported(context)
    target_fps, target_w, target_h = self._matching_targets_from_disk()
    videos = [
        segment.process(segment.load(fps=target_fps, width=target_w, height=target_h), context)
        for segment in self.segments
    ]
    result = videos[0]
    for video in videos[1:]:
        result = result + video
    for op in self.post_operations:
        result = _apply_with_context(op, result, context)
    return result

run_to_file

run_to_file(
    output_path: str | Path,
    format: ALLOWED_VIDEO_FORMATS = "mp4",
    preset: ALLOWED_VIDEO_PRESETS = "medium",
    crf: int = 23,
    context: dict[str, Any] | None = None,
) -> Path

Execute the plan, streaming directly to a file when possible.

Falls back to eager (self.run().save(...)) for any operation that isn't streamable. Memory usage is O(1) w.r.t. video length for fully streamable pipelines.
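
One consequence of the path handling in the listing below is that the output extension always follows format, because the path is passed through with_suffix. The sketch isolates that step; resolve_output_path is a hypothetical helper, and PurePosixPath is used only to keep the printed separators platform-independent.

```python
from pathlib import PurePosixPath


def resolve_output_path(output_path: str, format: str = "mp4") -> PurePosixPath:
    """Mirror Path(output_path).with_suffix(f".{format}") from run_to_file."""
    return PurePosixPath(output_path).with_suffix(f".{format}")


print(resolve_output_path("renders/final.mov"))  # renders/final.mp4
print(resolve_output_path("renders/final"))      # renders/final.mp4
print(resolve_output_path("renders/take.v2"))    # renders/take.mp4
```

The last case shows that a dotted name without a real extension loses its final component, so the returned Path is the reliable record of where the file was written.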

Source code in src/videopython/editing/video_edit.py
def run_to_file(
    self,
    output_path: str | Path,
    format: ALLOWED_VIDEO_FORMATS = "mp4",
    preset: ALLOWED_VIDEO_PRESETS = "medium",
    crf: int = 23,
    context: dict[str, Any] | None = None,
) -> Path:
    """Execute the plan, streaming directly to a file when possible.

    Falls back to eager (``self.run().save(...)``) for any operation that
    isn't streamable. Memory usage is O(1) w.r.t. video length for fully
    streamable pipelines.
    """
    self._assert_post_ops_supported(context)
    output_path = Path(output_path).with_suffix(f".{format}")
    output_path.parent.mkdir(parents=True, exist_ok=True)

    target_fps, target_w, target_h = self._matching_targets_from_disk()
    plans: list[StreamingSegmentPlan] = []
    for segment in self.segments:
        plan = self._build_streaming_plan(segment, target_fps, target_w, target_h)
        if plan is None:
            return self._run_to_file_eager(output_path, format, preset, crf, context)
        plans.append(plan)

    # Post-ops only fold cleanly into a single segment plan; multi-segment
    # post-ops would need a second pass we don't bother with.
    if self.post_operations and len(plans) != 1:
        return self._run_to_file_eager(output_path, format, preset, crf, context)
    if self.post_operations:
        plan = plans[0]
        total_frames = round((plan.end_second - plan.start_second) * plan.output_fps)
        for op in self.post_operations:
            if op.requires:
                # Same reason as the per-segment guard: no runtime context
                # in the streaming path. (Multi-segment + requires already
                # raised by _assert_post_ops_supported.)
                return self._run_to_file_eager(output_path, format, preset, crf, context)
            if not isinstance(op, Effect) or not op.streamable:
                return self._run_to_file_eager(output_path, format, preset, crf, context)
            start_f, end_f = _effect_frame_range(op, plan.output_fps, total_frames)
            plan.effect_schedule.append(EffectScheduleEntry(op, start_f, end_f))

    if len(plans) == 1:
        plan = plans[0]
        audio = self._load_segment_audio(self.segments[0], plan)
        return stream_segment(plan, output_path, audio=audio, format=format, preset=preset, crf=crf)

    temp_files: list[Path] = []
    try:
        for segment, plan in zip(self.segments, plans):
            tmp = tempfile.NamedTemporaryFile(suffix=f".{format}", delete=False)
            tmp.close()
            audio = self._load_segment_audio(segment, plan)
            stream_segment(plan, Path(tmp.name), audio=audio, format=format, preset=preset, crf=crf)
            temp_files.append(Path(tmp.name))
        return concat_files(temp_files, output_path)
    finally:
        for f in temp_files:
            f.unlink(missing_ok=True)
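
The branching in the listing above can be condensed into a small decision function. This is a reading aid under stated assumptions, not library code: PostOp and choose_path are hypothetical names, and segment_streamable stands for "did _build_streaming_plan return a plan for this segment".

```python
from dataclasses import dataclass


@dataclass
class PostOp:
    # Hypothetical stand-in for an Operation; field names are illustrative.
    is_effect: bool = True
    streamable: bool = True
    requires: tuple[str, ...] = ()


def choose_path(segment_streamable: list[bool], post_ops: list[PostOp]) -> str:
    """Return which execution path the run_to_file listing would take."""
    if not all(segment_streamable):
        return "eager"  # some segment produced no streaming plan
    if post_ops and len(segment_streamable) != 1:
        return "eager"  # post-ops only fold into a single-segment plan
    for op in post_ops:
        if op.requires or not op.is_effect or not op.streamable:
            return "eager"  # needs runtime context or cannot be streamed
    # One segment streams straight to the output; several stream to temp files, then concat.
    return "stream" if len(segment_streamable) == 1 else "stream+concat"


print(choose_path([True], [PostOp()]))        # stream
print(choose_path([True, True], []))          # stream+concat
print(choose_path([True, True], [PostOp()]))  # eager
```

In practice this means a multi-segment plan keeps constant memory only when post_operations is empty.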

SegmentConfig

SegmentConfig is exported, but most users should build plans with VideoEdit.from_dict(...) or VideoEdit.from_json(...) rather than constructing segments directly.

SegmentConfig

Bases: BaseModel

A single source segment with its operation chain.

Source code in src/videopython/editing/video_edit.py
class SegmentConfig(BaseModel):
    """A single source segment with its operation chain."""

    model_config = ConfigDict(extra="forbid")

    source: Path = Field(description="Path to the source video file.")
    start: float = Field(ge=0, description="Segment start time in seconds.")
    end: float = Field(ge=0, description="Segment end time in seconds.")
    operations: list[OperationInput] = Field(
        default_factory=list,
        description=(
            "Ordered list of operations to run against this segment. "
            "Each item is an Operation discriminated by its `op` field."
        ),
    )

    @model_validator(mode="after")
    def _validate_range(self) -> SegmentConfig:
        if self.end <= self.start:
            raise ValueError(f"end ({self.end}) must be greater than start ({self.start})")
        return self

    @property
    def duration(self) -> float:
        return self.end - self.start

    def load(
        self,
        fps: float | None = None,
        width: int | None = None,
        height: int | None = None,
    ) -> Video:
        """Load the raw segment from disk with optional decode-time matching."""
        return Video.from_path(
            str(self.source),
            start_second=self.start,
            end_second=self.end,
            fps=fps,
            width=width,
            height=height,
        )

    def process(self, video: Video, context: dict[str, Any] | None = None) -> Video:
        """Apply every operation in this segment to ``video`` in order.

        Time-based context (e.g. ``transcription``) is re-based onto this
        segment's 0-based local timeline before any operation sees it.
        """
        seg_context = _segment_context(context, self.start, self.end)
        for op in self.operations:
            video = _apply_with_context(op, video, seg_context)
        return video
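
The range rules enforced by the model (start and end non-negative, end strictly greater than start) can be tried out with a standard-library stand-in. SegmentRange is a hypothetical dataclass written for illustration; the real class is a Pydantic model, where the same failure surfaces as a Pydantic ValidationError.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SegmentRange:
    """Stdlib stand-in for SegmentConfig's start/end rules (not the real class)."""

    start: float
    end: float

    def __post_init__(self) -> None:
        # Mirrors Field(ge=0) on both fields.
        if self.start < 0 or self.end < 0:
            raise ValueError("start and end must be >= 0")
        # Mirrors _validate_range: zero-length and reversed ranges are rejected.
        if self.end <= self.start:
            raise ValueError(f"end ({self.end}) must be greater than start ({self.start})")

    @property
    def duration(self) -> float:
        return self.end - self.start


print(SegmentRange(5.0, 12.0).duration)  # 7.0

try:
    SegmentRange(12.0, 5.0)
except ValueError as exc:
    print(exc)  # end (5.0) must be greater than start (12.0)
```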

load

load(
    fps: float | None = None,
    width: int | None = None,
    height: int | None = None,
) -> Video

Load the raw segment from disk with optional decode-time matching.

Source code in src/videopython/editing/video_edit.py
def load(
    self,
    fps: float | None = None,
    width: int | None = None,
    height: int | None = None,
) -> Video:
    """Load the raw segment from disk with optional decode-time matching."""
    return Video.from_path(
        str(self.source),
        start_second=self.start,
        end_second=self.end,
        fps=fps,
        width=width,
        height=height,
    )

process

process(
    video: Video, context: dict[str, Any] | None = None
) -> Video

Apply every operation in this segment to video in order.

Time-based context (e.g. transcription) is re-based onto this segment's 0-based local timeline before any operation sees it.

Source code in src/videopython/editing/video_edit.py
def process(self, video: Video, context: dict[str, Any] | None = None) -> Video:
    """Apply every operation in this segment to ``video`` in order.

    Time-based context (e.g. ``transcription``) is re-based onto this
    segment's 0-based local timeline before any operation sees it.
    """
    seg_context = _segment_context(context, self.start, self.end)
    for op in self.operations:
        video = _apply_with_context(op, video, seg_context)
    return video
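
The implementation of _segment_context is not shown on this page, so the following is only a guess at what "re-based onto this segment's 0-based local timeline" could look like for word-level timings. Word, rebase_words, and the clamping behaviour are all assumptions made for illustration; the real context schema may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Word:
    # Hypothetical transcription item; the real context schema is not shown above.
    text: str
    start: float
    end: float


def rebase_words(words: list[Word], seg_start: float, seg_end: float) -> list[Word]:
    """Keep words overlapping [seg_start, seg_end) and shift them to a 0-based timeline."""
    duration = seg_end - seg_start
    rebased = []
    for w in words:
        if w.end <= seg_start or w.start >= seg_end:
            continue  # entirely outside the segment
        # Shift by the segment start and clamp to the segment's local bounds.
        local_start = max(w.start - seg_start, 0.0)
        local_end = min(w.end - seg_start, duration)
        rebased.append(Word(w.text, local_start, local_end))
    return rebased


words = [Word("intro", 1.0, 2.0), Word("hello", 5.5, 6.0), Word("world", 11.5, 12.5)]
print(rebase_words(words, 5.0, 12.0))
```

For a segment covering 5.0 to 12.0 seconds, "intro" is dropped, "hello" moves to 0.5 through 1.0, and "world" is clamped to end at the segment's 7.0-second duration.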