Editing Plans

VideoEdit represents a complete multi-segment editing plan:

  1. Extract one or more segments from source videos
  2. Apply per-segment transforms, then effects
  3. Concatenate processed segments
  4. Apply post-assembly transforms, then effects

This is the recommended API for JSON/LLM-generated editing plans.

At a Glance

  • Use segments[*].transforms for per-segment transforms and segments[*].effects for per-segment effects
  • Use post_transforms for transforms applied after concatenation
  • Use post_effects for effects applied after concatenation (effects are not accepted in post_transforms)
  • When plans are generated dynamically, call edit.validate() before edit.run()

Quick Start

from videopython.editing import VideoEdit

plan = {
    "segments": [
        {
            "source": "input.mp4",
            "start": 5.0,
            "end": 12.0,
            "transforms": [
                {"op": "crop", "args": {"width": 0.5, "height": 1.0, "mode": "center"}},
                {"op": "resize", "args": {"width": 1080, "height": 1920}},
            ],
            "effects": [
                {"op": "blur", "args": {"mode": "constant", "iterations": 1}, "apply": {"start": 0.0, "stop": 1.0}}
            ],
        },
        {
            "source": "input.mp4",
            "start": 20.0,
            "end": 28.0,
        },
    ],
    "post_effects": [
        {"op": "color_adjust", "args": {"brightness": 0.05}}
    ],
}

edit = VideoEdit.from_dict(plan)

# Dry-run validation using VideoMetadata (no frame loading)
predicted = edit.validate()
print(predicted)

video = edit.run()
video.save("output.mp4")

# Or stream directly to file (constant memory, any video length):
edit.run_to_file("output.mp4", crf=20, preset="medium")

Streaming Mode (run_to_file)

run_to_file() streams frames one at a time from ffmpeg decode through per-frame effect processing to ffmpeg encode. Memory usage is constant (~250 MB) regardless of video length.

edit = VideoEdit.from_dict(plan)
edit.run_to_file("output.mp4", format="mp4", preset="medium", crf=20)

When every operation in the plan is streamable, frames are never fully loaded into memory. If any operation is not streamable, run_to_file() automatically falls back to the eager path (run() + save()).

Streamable operations (check x-streamable in json_schema() output):

  • Transforms: resize, crop, resample_fps
  • Effects: color_adjust, blur_effect, zoom_effect, vignette, ken_burns, fade, full_image_overlay, text_overlay, volume_adjust

Non-streamable (triggers eager fallback):

  • Transforms: cut, cut_frames, speed_change, reverse, freeze_frame, picture_in_picture, silence_removal
  • Plans with post_transforms (regardless of segment count)
  • Plans with post_effects spanning multiple segments

Use run() + save() when you need to inspect or modify the result in Python. Use run_to_file() for production pipelines processing long videos.
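To branch on streamability programmatically, you can scan the schema for the x-streamable marker. A minimal sketch, assuming each step schema exposes its operation ID as a const under properties.op (the exact shape may differ; inspect json_schema() output for your registry state):

```python
def streamable_ops(schema: dict) -> set[str]:
    # Recursively collect op IDs of step schemas marked "x-streamable": true.
    found: set[str] = set()

    def walk(node):
        if isinstance(node, dict):
            if node.get("x-streamable") is True:
                op = node.get("properties", {}).get("op", {}).get("const")
                if op:
                    found.add(op)
            for value in node.values():
                walk(value)
        elif isinstance(node, list):
            for item in node:
                walk(item)

    walk(schema)
    return found

# Toy fragment mimicking the assumed shape (not real json_schema() output):
toy = {"oneOf": [
    {"properties": {"op": {"const": "resize"}}, "x-streamable": True},
    {"properties": {"op": {"const": "reverse"}}},
]}
print(streamable_ops(toy))  # {'resize'}
```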

JSON Plan Format

Top-level shape:

{
  "segments": [
    {
      "source": "path/to/video.mp4",
      "start": 5.0,
      "end": 15.0,
      "transforms": [
        {"op": "crop", "args": {"width": 1080, "height": 1920}}
      ],
      "effects": [
        {"op": "blur_effect", "args": {"mode": "constant", "iterations": 2}, "apply": {"start": 0.0, "stop": 3.0}}
      ]
    }
  ],
  "post_transforms": [
    {"op": "resize", "args": {"width": 1080, "height": 1920}}
  ],
  "post_effects": [
    {"op": "color_adjust", "args": {"brightness": 0.05}}
  ]
}

Notes:

  • segments is required and must be non-empty.
  • post_transforms and post_effects are optional.
  • post_transforms accepts only transform operations.
  • post_effects accepts only effect operations.
  • Segment keys are strict (source, start, end, transforms, effects).
  • Step keys are strict:
      • transform step: op, optional args
      • effect step: op, optional args, optional apply
  • Unknown top-level keys are ignored for forward compatibility.

Context Data

Some operations need side-channel data that shouldn't be part of the JSON plan (e.g. transcription for silence_removal). Pass it via the context parameter:

from videopython.editing import VideoEdit

edit = VideoEdit.from_dict(plan)
video = edit.run(context={"transcription": my_transcription})

Operations whose registry spec has the requires_transcript tag automatically receive context["transcription"] as a keyword argument. Other operations are unaffected.

Pipeline Order (Enforced)

VideoEdit always runs operations in this order:

  • Per segment:
      • transforms (in order)
      • effects (in order)
  • After concatenation:
      • post transforms (in order)
      • post effects (in order)

Callers do not control transform/effect interleaving. The model enforces this discipline.
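The enforced order can be made concrete with a toy trace in which each stand-in operation logs its name (extract/concat are illustrative labels here, not videopython APIs):

```python
def run_order(segments, post_transforms, post_effects):
    log = []
    for i, seg in enumerate(segments):
        log.append(f"extract[{i}]")                                # 1. cut segment
        log += [f"transform[{i}]:{t}" for t in seg["transforms"]]  # 2. transforms
        log += [f"effect[{i}]:{e}" for e in seg["effects"]]        # 3. effects
    log.append("concat")                                           # 4. concatenate
    log += [f"post_transform:{t}" for t in post_transforms]        # 5. post transforms
    log += [f"post_effect:{e}" for e in post_effects]              # 6. post effects
    return log

order = run_order(
    [{"transforms": ["crop"], "effects": ["blur"]},
     {"transforms": [], "effects": []}],
    ["resize"],
    ["color_adjust"],
)
print(order)
```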

Effect Time Semantics

  • Segment effect apply.start / apply.stop are relative to the segment timeline (segment starts at 0).
  • Post effect apply.start / apply.stop are relative to the assembled output timeline.
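Mapping segment-relative effect times back to the source timeline is a simple offset. A hypothetical helper, shown only to illustrate the semantics:

```python
def segment_to_source_time(segment_start: float, apply_start: float, apply_stop: float) -> tuple[float, float]:
    # Segment effects count time from the segment's own 0.0, so a segment
    # cut at source second 20.0 shifts everything by 20.0.
    return segment_start + apply_start, segment_start + apply_stop

# A blur with apply.start=0.0, apply.stop=1.0 on a segment cut from 20.0s
# touches source seconds 20.0 through 21.0:
print(segment_to_source_time(20.0, 0.0, 1.0))  # (20.0, 21.0)
```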

Validation and Compatibility Checks

VideoEdit.validate() performs a dry run using VideoMetadata:

  • segment time bounds (start, end)
  • transform metadata prediction (for transforms with registered metadata_method)
  • effect time bounds
  • concatenation compatibility (exact fps, exact dimensions)

Validation returns the predicted final VideoMetadata on success and raises ValueError on invalid plans.

Validation behavior notes:

  • cut metadata prediction mirrors runtime rounded frame slicing semantics (fractional seconds are rounded to frames).
  • crop metadata prediction mirrors runtime crop slicing behavior, including odd-size center crops and edge clipping.
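As a sketch of the rounding rule (not the library's actual implementation), frame-based slice prediction looks like this:

```python
def predicted_cut_frames(start_s: float, end_s: float, fps: float) -> int:
    # Fractional seconds round to frame indices before slicing, so the
    # predicted frame count matches runtime rounded-frame semantics.
    return round(end_s * fps) - round(start_s * fps)

print(predicted_cut_frames(5.0, 12.0, 30))  # 210
print(predicted_cut_frames(5.1, 12.0, 24))  # 166 (122.4 rounds to frame 122)
```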

JSON Parsing Behavior

Alias normalization

Input aliases are accepted (for example blur), but:

  • VideoEdit.to_dict() emits canonical operation IDs (for example blur_effect)
  • VideoEdit.json_schema() lists canonical operation IDs only
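Conceptually, normalization is a lookup from alias to canonical ID. An illustrative subset only; the full table lives in the operation registry:

```python
# "blur" -> "blur_effect" is documented above; other registry entries may differ.
ALIASES = {"blur": "blur_effect"}

def canonical_op(op_id: str) -> str:
    # Unknown IDs pass through unchanged.
    return ALIASES.get(op_id, op_id)

print(canonical_op("blur"))          # blur_effect
print(canonical_op("color_adjust"))  # color_adjust
```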

Common parser constraints

  • resize requires at least one non-null dimension (width or height)
      • valid: {"op": "resize", "args": {"width": 320}}
      • valid: {"op": "resize", "args": {"height": 180}}
      • invalid: {"op": "resize"}
      • invalid: {"op": "resize", "args": {"width": null, "height": null}}

Unsupported operations in JSON plans

The parser rejects operations that are not supported in VideoEdit JSON plans, including:

  • transitions (fade_transition, blur_transition, ...)
  • multi-source operations (picture_in_picture, split_screen, ...)
  • registered operations that are not JSON-instantiable because required constructor args are excluded from registry specs (for example ken_burns, full_image_overlay)

AI operations and lazy registration

AI operation specs are registered only after importing videopython.ai.

If a plan references AI ops (for example face_crop), import AI first:

import videopython.ai  # registers AI ops
from videopython.editing import VideoEdit

edit = VideoEdit.from_dict(plan)

videopython.base does not auto-import AI modules.

Schema Generation (json_schema)

Use VideoEdit.json_schema() to get a parser-aligned JSON Schema for the current registry state. The schema is designed to be passed directly to LLM APIs as a tool definition or structured-output format.

from videopython.editing import VideoEdit

schema = VideoEdit.json_schema()
print(schema["properties"]["segments"]["minItems"])  # 1

Using the schema with LLMs

The schema encodes all structural rules (valid operation IDs, required fields, parameter types, and value constraints), so the LLM does not need to learn them from examples:

from videopython.editing import VideoEdit

schema = VideoEdit.json_schema()

# Pass as a tool/function schema to any LLM API:
# - OpenAI: tools=[{"type": "function", "function": {"parameters": schema}}]
# - Anthropic: tools=[{"input_schema": schema}]
# - Any structured-output API that accepts JSON Schema

For complete examples with OpenAI and Anthropic APIs, see the LLM Integration Guide.

Schema properties

  • Built dynamically from the operation registry
  • Canonical op IDs only (aliases omitted)
  • Excludes unsupported categories/tags/non-JSON-instantiable ops
  • Reflects current registration state (AI ops appear only if videopython.ai was imported)
  • Encodes parser-aligned constraints (for example resize requires at least one non-null dimension)
  • Includes rich value constraints (minimum, maximum, exclusive_minimum, enum) for all parameters
  • Operations compatible with run_to_file() streaming are marked with "x-streamable": true

Serialization (to_dict)

VideoEdit.to_dict() returns a canonical JSON-ready dict:

  • canonical op IDs
  • deep-copied step args / apply args
  • stable output even if live operation instances are mutated after parsing

Multicam Editing (MultiCamEdit)

MultiCamEdit is for podcast-style multicam recordings: switch between synchronized camera angles at specified cut points with transitions, and replace audio with an external track.

Quick Start

from videopython.editing import MultiCamEdit, CutPoint
from videopython.base import FadeTransition

edit = MultiCamEdit(
    sources={
        "wide": "cam1.mp4",
        "closeup1": "cam2.mp4",
        "closeup2": "cam3.mp4",
    },
    audio_source="podcast_audio.aac",
    cuts=[
        CutPoint(time=0.0, camera="wide"),
        CutPoint(time=15.0, camera="closeup1", transition=FadeTransition(0.5)),
        CutPoint(time=45.0, camera="wide", transition=FadeTransition(0.5)),
        CutPoint(time=60.0, camera="closeup2"),
    ],
)

video = edit.run()
video.save("podcast.mp4")

Data Model

  • sources: Named camera angles as dict[str, Path].
  • cuts: Ordered list of CutPoints. First cut must start at time=0.0. Each segment runs from its time until the next cut's time (last segment runs to end of source).
  • audio_source: Optional external audio file. If None, output is silent. Camera mic audio is always discarded.
  • default_transition: Transition used between cuts when a CutPoint has no explicit transition. Defaults to InstantTransition (hard cut).

Requirements

  • All sources must have identical fps and resolution.
  • All sources must be synchronized (same start time and duration).
  • Cuts must be in strictly ascending order.
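The cut-ordering rules can be checked in a few lines. A sketch mirroring the documented constraints, not MultiCamEdit's actual validator:

```python
def check_cut_times(cut_times: list[float]) -> None:
    # First cut must start at 0.0; subsequent cuts must be strictly ascending.
    if not cut_times or cut_times[0] != 0.0:
        raise ValueError("first cut must start at time=0.0")
    if any(b <= a for a, b in zip(cut_times, cut_times[1:])):
        raise ValueError("cuts must be in strictly ascending order")

check_cut_times([0.0, 15.0, 45.0, 60.0])  # passes silently
```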

Validation

Validate the plan and predict output metadata without loading video frames:

predicted = edit.validate()
print(predicted)  # VideoMetadata(width=1280, height=720, fps=25, ...)

Validation accounts for duration consumed by fade/blur transitions.
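A back-of-the-envelope duration prediction, assuming each transition consumes its full duration from the assembled output (the exact accounting is what validate() implements; source length and cut times here are hypothetical):

```python
def predicted_duration(source_end: float, transition_durations: list[float]) -> float:
    # Segments tile the source from the first cut (0.0) to end of source, so the
    # base duration is source_end; each overlapping transition consumes its
    # duration (assumption; validate() is authoritative).
    return source_end - sum(transition_durations)

# E.g. cuts at 0/15/45/60 over a 90 s source with two 0.5 s fades:
print(predicted_duration(90.0, [0.5, 0.5]))  # 89.0
```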

JSON Schema

Use MultiCamEdit.json_schema() to get a JSON Schema describing valid plans. Pass it to an LLM API as a tool definition or structured-output format:

schema = MultiCamEdit.json_schema()
# schema includes sources, cuts, transitions, audio_source

JSON Serialization

# Serialize
data = edit.to_dict()

# Deserialize
edit = MultiCamEdit.from_dict(data)
edit = MultiCamEdit.from_json('{"sources": {...}, "cuts": [...]}')

Premiere XML Export

Export a MultiCamEdit plan to FCP7 XML (xmeml) for direct import into Adobe Premiere Pro:

from pathlib import Path

from videopython.editing import MultiCamEdit, CutPoint, to_premiere_xml

edit = MultiCamEdit(
    sources={"wide": "cam1.mp4", "closeup": "cam2.mp4"},
    audio_source="podcast_audio.aac",
    cuts=[
        CutPoint(time=0.0, camera="wide"),
        CutPoint(time=15.0, camera="closeup"),
        CutPoint(time=45.0, camera="wide"),
    ],
)

xml = to_premiere_xml(edit)
Path("project.xml").write_text(xml)

Import in Premiere via File > Import and select the .xml file.

What gets exported

  • Each cut becomes a <clipitem> on the video track, directly referencing its source file. Source offsets are baked into the in/out points.
  • External audio becomes a single continuous clip on stereo audio tracks.
  • FadeTransition becomes a Cross Dissolve <transitionitem> on the video track.
  • InstantTransition is a hard cut (no transition element).

Known limitations

  • BlurTransition has no xmeml equivalent and is exported as a hard cut.
  • File paths are absolute file://localhost/ URLs. Not portable across machines without relinking media in Premiere.
  • Audio tracks assume stereo (2 channels).

API Reference

VideoEdit

Represents a complete multi-segment video editing plan.

Source code in src/videopython/editing/video_edit.py
class VideoEdit:
    """Represents a complete multi-segment video editing plan."""

    def __init__(
        self,
        segments: Sequence[SegmentConfig],
        post_transform_records: Sequence[_StepRecord] | None = None,
        post_effect_records: Sequence[_StepRecord] | None = None,
        match_to_lowest_fps: bool = True,
        match_to_lowest_resolution: bool = True,
    ):
        if not segments:
            raise ValueError("VideoEdit requires at least one segment")
        self.segments: tuple[SegmentConfig, ...] = tuple(segments)
        self.post_transform_records: tuple[_StepRecord, ...] = tuple(post_transform_records or ())
        self.post_effect_records: tuple[_StepRecord, ...] = tuple(post_effect_records or ())
        self.match_to_lowest_fps: bool = match_to_lowest_fps
        self.match_to_lowest_resolution: bool = match_to_lowest_resolution

        for record in self.post_transform_records:
            if not isinstance(record.operation, Transformation):
                raise TypeError(
                    "VideoEdit.post_transform_records must contain "
                    f"Transformation operations, got {type(record.operation)}"
                )
        for record in self.post_effect_records:
            if not isinstance(record.operation, Effect):
                raise TypeError(
                    f"VideoEdit.post_effect_records must contain Effect operations, got {type(record.operation)}"
                )

    @classmethod
    def from_json(cls, text: str) -> VideoEdit:
        try:
            data = json.loads(text)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid VideoEdit JSON: {e.msg} at line {e.lineno} column {e.colno}") from e
        return cls.from_dict(data)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> VideoEdit:
        if not isinstance(data, dict):
            raise ValueError("VideoEdit plan must be a JSON object")

        segments_data = data.get("segments")
        if segments_data is None:
            raise ValueError("VideoEdit plan is missing required key 'segments'")
        if not isinstance(segments_data, list):
            raise ValueError("VideoEdit plan 'segments' must be a list")
        if not segments_data:
            raise ValueError("VideoEdit plan 'segments' must not be empty")

        post_transforms_data = data.get("post_transforms", [])
        post_effects_data = data.get("post_effects", [])
        if not isinstance(post_transforms_data, list):
            raise ValueError("VideoEdit plan 'post_transforms' must be a list")
        if not isinstance(post_effects_data, list):
            raise ValueError("VideoEdit plan 'post_effects' must be a list")

        segments: list[SegmentConfig] = []
        for i, segment_data in enumerate(segments_data):
            location = f"segments[{i}]"
            segments.append(_parse_segment(segment_data, location))

        post_transform_records = [
            _parse_transform_step(step, f"post_transforms[{i}]") for i, step in enumerate(post_transforms_data)
        ]
        post_effect_records = [
            _parse_effect_step(step, f"post_effects[{i}]") for i, step in enumerate(post_effects_data)
        ]

        return cls(
            segments=segments,
            post_transform_records=post_transform_records,
            post_effect_records=post_effect_records,
            match_to_lowest_fps=data.get("match_to_lowest_fps", True),
            match_to_lowest_resolution=data.get("match_to_lowest_resolution", True),
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize to canonical JSON-compatible dict.

        Serialization uses `_StepRecord` snapshots as the source of truth. Mutating
        live operation objects after parsing/construction does not affect output.
        """
        result: dict[str, Any] = {
            "segments": [self._segment_to_dict(segment) for segment in self.segments],
            "post_transforms": [_step_to_dict(record, include_apply=False) for record in self.post_transform_records],
            "post_effects": [_step_to_dict(record, include_apply=True) for record in self.post_effect_records],
        }
        if not self.match_to_lowest_fps:
            result["match_to_lowest_fps"] = False
        if not self.match_to_lowest_resolution:
            result["match_to_lowest_resolution"] = False
        return result

    @classmethod
    def json_schema(cls) -> dict[str, Any]:
        """Return a JSON Schema for `VideoEdit` plans."""
        transform_specs = _videoedit_supported_specs_for_category(OperationCategory.TRANSFORMATION)
        effect_specs = _videoedit_supported_specs_for_category(OperationCategory.EFFECT)

        transform_step_schemas = [
            _videoedit_step_schema_from_spec(spec, include_apply=False) for spec in transform_specs
        ]
        effect_step_schemas = [_videoedit_step_schema_from_spec(spec, include_apply=True) for spec in effect_specs]

        segment_schema: dict[str, Any] = {
            "type": "object",
            "properties": {
                "source": {"type": "string", "description": "Source video path."},
                "start": {"type": "number", "description": "Segment start time in seconds."},
                "end": {"type": "number", "description": "Segment end time in seconds."},
                "transforms": {
                    "type": "array",
                    "items": {"oneOf": transform_step_schemas},
                },
                "effects": {
                    "type": "array",
                    "items": {"oneOf": effect_step_schemas},
                },
            },
            "required": ["source", "start", "end"],
            "additionalProperties": False,
        }

        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {
                "segments": {
                    "type": "array",
                    "items": segment_schema,
                    "minItems": 1,
                },
                "post_transforms": {
                    "type": "array",
                    "items": {"oneOf": transform_step_schemas},
                },
                "post_effects": {
                    "type": "array",
                    "items": {"oneOf": effect_step_schemas},
                },
            },
            "required": ["segments"],
        }

    def run(self, context: dict[str, Any] | None = None) -> Video:
        """Execute the editing plan and return the final video.

        Args:
            context: Optional side-channel data for context-dependent operations.
                Operations whose registry spec has a ``requires_transcript`` tag
                receive ``context["transcription"]`` as a keyword argument.
        """
        video = self._assemble_segments(context)
        for record in self.post_transform_records:
            video = _apply_transform_with_context(record, video, context)
        for record in self.post_effect_records:
            if not isinstance(record.operation, Effect):
                raise TypeError(
                    f"VideoEdit.post_effect_records must contain Effect operations, got {type(record.operation)}"
                )
            video = record.operation.apply(
                video,
                start=_coerce_optional_number(record.apply_args.get("start"), "start"),
                stop=_coerce_optional_number(record.apply_args.get("stop"), "stop"),
            )
        return video

    def run_to_file(
        self,
        output_path: str | Path,
        format: ALLOWED_VIDEO_FORMATS = "mp4",
        preset: ALLOWED_VIDEO_PRESETS = "medium",
        crf: int = 23,
        context: dict[str, Any] | None = None,
    ) -> Path:
        """Execute the editing plan, streaming directly to a file.

        Memory usage is O(1) w.r.t. video length for fully streamable pipelines.
        Falls back to eager mode (run + save) for non-streamable operations.

        Args:
            output_path: Destination file path.
            format: Output container format.
            preset: x264 encoding preset.
            crf: Constant rate factor (quality).
            context: Optional side-channel data for context-dependent operations.

        Returns:
            Path to the output file.
        """
        output_path = Path(output_path).with_suffix(f".{format}")
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Fall back to eager if post-transforms or non-streamable post-effects exist
        if self.post_transform_records:
            return self._run_to_file_eager(output_path, format, preset, crf, context)

        for record in self.post_effect_records:
            if not isinstance(record.operation, Effect) or not record.operation.supports_streaming:
                return self._run_to_file_eager(output_path, format, preset, crf, context)

        # Compute matching targets
        target_fps, target_w, target_h = self._compute_matching_targets()

        # Analyze each segment
        plans: list[StreamingSegmentPlan | None] = []
        for segment in self.segments:
            plan = self._build_streaming_plan(segment, target_fps, target_w, target_h, context)
            plans.append(plan)

        # If any segment can't stream, fall back entirely
        if any(p is None for p in plans):
            return self._run_to_file_eager(output_path, format, preset, crf, context)

        streaming_plans: list[StreamingSegmentPlan] = plans  # type: ignore[assignment]

        # Fold post-effects into plans (they apply to the full assembled video)
        # For simplicity, fold into single-segment plans; multi-segment post-effects
        # require a second pass which we skip for now
        if self.post_effect_records and len(streaming_plans) > 1:
            return self._run_to_file_eager(output_path, format, preset, crf, context)

        if self.post_effect_records and len(streaming_plans) == 1:
            plan = streaming_plans[0]
            total_frames = round((plan.end_second - plan.start_second) * plan.output_fps)
            for record in self.post_effect_records:
                start_s = _coerce_optional_number(record.apply_args.get("start"), "start")
                stop_s = _coerce_optional_number(record.apply_args.get("stop"), "stop")
                start_f = round(start_s * plan.output_fps) if start_s is not None else 0
                end_f = round(stop_s * plan.output_fps) if stop_s is not None else total_frames
                assert isinstance(record.operation, Effect)
                plan.effect_schedule.append(EffectScheduleEntry(record.operation, start_f, end_f))

        import tempfile

        if len(streaming_plans) == 1:
            plan = streaming_plans[0]
            audio = self._load_segment_audio(self.segments[0], plan, context)
            return stream_segment(plan, output_path, audio=audio, format=format, preset=preset, crf=crf)
        else:
            # Multi-segment: stream each to temp, then concat
            temp_files: list[Path] = []
            try:
                for segment, plan in zip(self.segments, streaming_plans):
                    temp = tempfile.NamedTemporaryFile(suffix=f".{format}", delete=False)
                    temp.close()
                    audio = self._load_segment_audio(segment, plan, context)
                    stream_segment(plan, Path(temp.name), audio=audio, format=format, preset=preset, crf=crf)
                    temp_files.append(Path(temp.name))
                return concat_files(temp_files, output_path)
            finally:
                for f in temp_files:
                    f.unlink(missing_ok=True)

    def _run_to_file_eager(
        self,
        output_path: Path,
        format: ALLOWED_VIDEO_FORMATS,
        preset: ALLOWED_VIDEO_PRESETS,
        crf: int,
        context: dict[str, Any] | None,
    ) -> Path:
        """Fallback: run eagerly and save."""
        video = self.run(context=context)
        return video.save(output_path, format=format, preset=preset, crf=crf)

    def _compute_matching_targets(self) -> tuple[float | None, int | None, int | None]:
        """Compute fps/width/height matching targets across segments."""
        target_fps, target_w, target_h = None, None, None
        if len(self.segments) > 1 and (self.match_to_lowest_fps or self.match_to_lowest_resolution):
            source_metas = [VideoMetadata.from_path(str(seg.source_video)) for seg in self.segments]
            if self.match_to_lowest_fps:
                target_fps = min(m.fps for m in source_metas)
            if self.match_to_lowest_resolution:
                target_w = min(m.width for m in source_metas)
                target_h = min(m.height for m in source_metas)
        return target_fps, target_w, target_h

    def _build_streaming_plan(
        self,
        segment: SegmentConfig,
        target_fps: float | None,
        target_w: int | None,
        target_h: int | None,
        context: dict[str, Any] | None,
    ) -> StreamingSegmentPlan | None:
        """Try to build a streaming plan for a segment. Returns None if not streamable."""
        source_meta = VideoMetadata.from_path(str(segment.source_video))
        vf_filters: list[str] = []

        # Start with matching targets (applied as decode filters)
        out_fps = target_fps or source_meta.fps
        out_w = target_w or source_meta.width
        out_h = target_h or source_meta.height

        if target_w and target_h and (target_w != source_meta.width or target_h != source_meta.height):
            vf_filters.append(f"scale={target_w}:{target_h}")
        if target_fps and target_fps != source_meta.fps:
            vf_filters.append(f"fps={target_fps}")

        # Compile transforms to ffmpeg filters
        for record in segment.transform_records:
            vf = _compile_transform_to_vf(record, out_w, out_h, out_fps)
            if vf is None:
                return None  # Non-streamable transform
            if vf.filter_expr:
                vf_filters.append(vf.filter_expr)
            out_w = vf.out_width
            out_h = vf.out_height
            out_fps = vf.out_fps

        # Check effects are streamable
        effect_schedule: list[EffectScheduleEntry] = []
        duration = segment.end_second - segment.start_second
        total_frames = round(duration * out_fps)

        for record in segment.effect_records:
            if not isinstance(record.operation, Effect):
                return None
            if not record.operation.supports_streaming:
                return None
            # Compute frame range
            start_s = _coerce_optional_number(record.apply_args.get("start"), "start")
            stop_s = _coerce_optional_number(record.apply_args.get("stop"), "stop")
            start_f = round(start_s * out_fps) if start_s is not None else 0
            end_f = round(stop_s * out_fps) if stop_s is not None else total_frames
            effect_schedule.append(EffectScheduleEntry(record.operation, start_f, end_f))

        return StreamingSegmentPlan(
            source_path=segment.source_video,
            start_second=segment.start_second,
            end_second=segment.end_second,
            output_fps=out_fps,
            output_width=out_w,
            output_height=out_h,
            vf_filters=vf_filters,
            effect_schedule=effect_schedule,
        )

    def _load_segment_audio(
        self,
        segment: SegmentConfig,
        plan: StreamingSegmentPlan,
        context: dict[str, Any] | None,
    ) -> Audio | None:
        """Load and process audio for a segment."""
        import warnings

        from videopython.base.audio import AudioLoadError

        try:
            audio = Audio.from_path(str(segment.source_video))
            audio = audio.slice(segment.start_second, segment.end_second)
        except (AudioLoadError, FileNotFoundError, subprocess.CalledProcessError):
            duration = segment.end_second - segment.start_second
            warnings.warn(f"No audio found for `{segment.source_video}`, using silent track.")
            audio = Audio.create_silent(duration_seconds=round(duration, 2), stereo=True, sample_rate=44100)

        # Apply audio effects (AudioEffect subclasses + Fade audio component)
        for entry in plan.effect_schedule:
            effect = entry.effect
            start_s = entry.start_frame / plan.output_fps
            stop_s = entry.end_frame / plan.output_fps
            if isinstance(effect, AudioEffect):
                effect._apply_audio(audio, start_s, stop_s, plan.output_fps)
            elif isinstance(effect, Fade) and audio is not None and not audio.is_silent:
                effect.apply_audio(audio, start_s, stop_s)

        return audio

    def validate(self, context: dict[str, Any] | None = None) -> VideoMetadata:
        """Validate the editing plan without loading video data.

        Requires source video files to be present on disk (uses ``VideoMetadata.from_path``).
        For validation without file access, use :meth:`validate_with_metadata`.

        Args:
            context: Optional side-channel data for context-dependent operations.
                Operations whose registry spec has a ``requires_transcript`` tag
                use ``context["transcription"]`` for metadata prediction.
        """
        source_metas = [self._validate_source_meta(i, seg) for i, seg in enumerate(self.segments)]
        source_metas = self._match_metas(source_metas)
        segment_metas = [
            self._apply_segment_meta_ops(i, seg, meta, context)
            for i, (seg, meta) in enumerate(zip(self.segments, source_metas))
        ]
        return self._validate_assembled(segment_metas, context)

    def validate_with_metadata(
        self,
        source_metadata: VideoMetadata | dict[str, VideoMetadata],
        context: dict[str, Any] | None = None,
    ) -> VideoMetadata:
        """Validate the editing plan using pre-built metadata instead of loading from file.

        Same validation as validate() but accepts a VideoMetadata directly,
        avoiding the need for the source video file to be on disk.

        Args:
            source_metadata: VideoMetadata for the source video (duration, dimensions, fps).
                For multi-source plans, pass a dict mapping source paths to their metadata.
            context: Optional side-channel data for context-dependent operations.
                Operations whose registry spec has a ``requires_transcript`` tag
                use ``context["transcription"]`` for metadata prediction.

        Returns:
            Predicted output VideoMetadata after all operations.

        Raises:
            ValueError: If any validation check fails.
        """
        if isinstance(source_metadata, VideoMetadata):
            meta_map: dict[str, VideoMetadata] = {str(seg.source_video): source_metadata for seg in self.segments}
        else:
            meta_map = source_metadata

        source_metas: list[VideoMetadata] = []
        for i, segment in enumerate(self.segments):
            source_key = str(segment.source_video)
            if source_key not in meta_map:
                raise ValueError(
                    f"Segment {i}: no metadata provided for source '{source_key}'. Available keys: {sorted(meta_map)}"
                )
            source_metas.append(self._validate_source_meta(i, segment, meta_map[source_key]))
        source_metas = self._match_metas(source_metas)
        segment_metas = [
            self._apply_segment_meta_ops(i, seg, meta, context)
            for i, (seg, meta) in enumerate(zip(self.segments, source_metas))
        ]
        return self._validate_assembled(segment_metas, context)

    def _validate_assembled(
        self, segment_metas: list[VideoMetadata], runtime_context: dict[str, Any] | None = None
    ) -> VideoMetadata:
        if len(segment_metas) > 1:
            first = segment_metas[0]
            for j, other in enumerate(segment_metas[1:], start=1):
                if first.fps != other.fps:
                    raise ValueError(
                        f"Segment 0 output fps ({first.fps}) != segment {j} output fps ({other.fps}). "
                        f"All segments must have identical fps for concatenation."
                    )
                if (first.width, first.height) != (other.width, other.height):
                    raise ValueError(
                        f"Segment 0 output dimensions ({first.width}x{first.height}) != "
                        f"segment {j} output dimensions ({other.width}x{other.height}). "
                        f"All segments must have identical dimensions for concatenation."
                    )

        meta = VideoMetadata(
            height=segment_metas[0].height,
            width=segment_metas[0].width,
            fps=segment_metas[0].fps,
            frame_count=sum(m.frame_count for m in segment_metas),
            total_seconds=round(sum(m.total_seconds for m in segment_metas), 4),
        )

        for record in self.post_transform_records:
            meta = _predict_transform_metadata(
                meta,
                record.op_id,
                record.args,
                context=f"post-assembly ({record.op_id})",
                runtime_context=runtime_context,
            )
        for record in self.post_effect_records:
            _validate_effect_bounds(record, meta.total_seconds, context="post-assembly")

        return meta

    def _segment_to_dict(self, segment: SegmentConfig) -> dict[str, Any]:
        return {
            "source": str(segment.source_video),
            "start": segment.start_second,
            "end": segment.end_second,
            "transforms": [_step_to_dict(record, include_apply=False) for record in segment.transform_records],
            "effects": [_step_to_dict(record, include_apply=True) for record in segment.effect_records],
        }

    def _validate_source_meta(
        self, index: int, segment: SegmentConfig, source_meta: VideoMetadata | None = None
    ) -> VideoMetadata:
        """Validate segment bounds and return cut source metadata (no transforms/effects)."""
        ctx = f"Segment {index}"
        if segment.start_second < 0:
            raise ValueError(f"{ctx}: start_second ({segment.start_second}) must be >= 0")
        if segment.end_second <= segment.start_second:
            raise ValueError(
                f"{ctx}: end_second ({segment.end_second}) must be > start_second ({segment.start_second})"
            )
        meta = source_meta if source_meta is not None else VideoMetadata.from_path(str(segment.source_video))
        if segment.end_second > meta.total_seconds:
            raise ValueError(
                f"{ctx}: end_second ({segment.end_second}) exceeds source duration ({meta.total_seconds}s)"
            )
        return meta.cut(segment.start_second, segment.end_second)

    def _apply_segment_meta_ops(
        self,
        index: int,
        segment: SegmentConfig,
        meta: VideoMetadata,
        runtime_context: dict[str, Any] | None = None,
    ) -> VideoMetadata:
        """Apply per-segment transform/effect metadata predictions."""
        ctx = f"Segment {index}"
        for record in segment.transform_records:
            meta = _predict_transform_metadata(
                meta, record.op_id, record.args, context=f"{ctx} ({record.op_id})", runtime_context=runtime_context
            )
        for record in segment.effect_records:
            _validate_effect_bounds(record, meta.total_seconds, context=ctx)
        return meta

    def _match_metas(self, metas: list[VideoMetadata]) -> list[VideoMetadata]:
        """Apply matching to source metadata list."""
        if len(metas) <= 1:
            return metas
        if self.match_to_lowest_fps:
            min_fps = min(m.fps for m in metas)
            metas = [m.resample_fps(min_fps) if m.fps != min_fps else m for m in metas]
        if self.match_to_lowest_resolution:
            min_w = min(m.width for m in metas)
            min_h = min(m.height for m in metas)
            metas = [m.resize(width=min_w, height=min_h) if (m.width, m.height) != (min_w, min_h) else m for m in metas]
        return metas

    def _assemble_segments(self, context: dict[str, Any] | None = None) -> Video:
        # Compute matching targets from source metadata before loading.
        target_fps, target_w, target_h = None, None, None
        if len(self.segments) > 1 and (self.match_to_lowest_fps or self.match_to_lowest_resolution):
            source_metas = [VideoMetadata.from_path(str(seg.source_video)) for seg in self.segments]
            if self.match_to_lowest_fps:
                target_fps = min(m.fps for m in source_metas)
            if self.match_to_lowest_resolution:
                target_w = min(m.width for m in source_metas)
                target_h = min(m.height for m in source_metas)

        # Load segments with matching applied via ffmpeg, then apply per-segment ops.
        videos = [
            segment.apply_operations(
                segment.load_segment(fps=target_fps, width=target_w, height=target_h),
                context,
            )
            for segment in self.segments
        ]
        result = videos[0]
        for video in videos[1:]:
            result = result + video
        return result

to_dict

to_dict() -> dict[str, Any]

Serialize to canonical JSON-compatible dict.

Serialization uses _StepRecord snapshots as the source of truth. Mutating live operation objects after parsing/construction does not affect output.

Source code in src/videopython/editing/video_edit.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to canonical JSON-compatible dict.

    Serialization uses `_StepRecord` snapshots as the source of truth. Mutating
    live operation objects after parsing/construction does not affect output.
    """
    result: dict[str, Any] = {
        "segments": [self._segment_to_dict(segment) for segment in self.segments],
        "post_transforms": [_step_to_dict(record, include_apply=False) for record in self.post_transform_records],
        "post_effects": [_step_to_dict(record, include_apply=True) for record in self.post_effect_records],
    }
    if not self.match_to_lowest_fps:
        result["match_to_lowest_fps"] = False
    if not self.match_to_lowest_resolution:
        result["match_to_lowest_resolution"] = False
    return result
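
to_dict() emits plain JSON types, so a plan should survive a JSON round trip unchanged. A minimal sketch using a hand-built plan dict in the canonical shape shown above (no library needed):

```python
import json

# Hand-built plan in the canonical shape produced by to_dict().
plan = {
    "segments": [
        {"source": "input.mp4", "start": 0.0, "end": 5.0, "transforms": [], "effects": []}
    ],
    "post_transforms": [],
    "post_effects": [],
}

# JSON round trip: serialize, parse, and compare.
restored = json.loads(json.dumps(plan))
assert restored == plan
```

Note that the match flags are omitted from the dict when they hold their default value of True, which keeps serialized plans minimal.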

json_schema classmethod

json_schema() -> dict[str, Any]

Return a JSON Schema for VideoEdit plans.

Source code in src/videopython/editing/video_edit.py
@classmethod
def json_schema(cls) -> dict[str, Any]:
    """Return a JSON Schema for `VideoEdit` plans."""
    transform_specs = _videoedit_supported_specs_for_category(OperationCategory.TRANSFORMATION)
    effect_specs = _videoedit_supported_specs_for_category(OperationCategory.EFFECT)

    transform_step_schemas = [
        _videoedit_step_schema_from_spec(spec, include_apply=False) for spec in transform_specs
    ]
    effect_step_schemas = [_videoedit_step_schema_from_spec(spec, include_apply=True) for spec in effect_specs]

    segment_schema: dict[str, Any] = {
        "type": "object",
        "properties": {
            "source": {"type": "string", "description": "Source video path."},
            "start": {"type": "number", "description": "Segment start time in seconds."},
            "end": {"type": "number", "description": "Segment end time in seconds."},
            "transforms": {
                "type": "array",
                "items": {"oneOf": transform_step_schemas},
            },
            "effects": {
                "type": "array",
                "items": {"oneOf": effect_step_schemas},
            },
        },
        "required": ["source", "start", "end"],
        "additionalProperties": False,
    }

    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
            "segments": {
                "type": "array",
                "items": segment_schema,
                "minItems": 1,
            },
            "post_transforms": {
                "type": "array",
                "items": {"oneOf": transform_step_schemas},
            },
            "post_effects": {
                "type": "array",
                "items": {"oneOf": effect_step_schemas},
            },
        },
        "required": ["segments"],
    }
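
The schema requires a non-empty segments array, with source, start, and end required on each segment. A minimal structural check mirroring just those required fields (check_plan is a hypothetical helper, not the library's validator; the real schema also constrains transforms and effects via oneOf step schemas):

```python
def check_plan(plan: dict) -> None:
    # Mirror the schema's top-level requirement: non-empty "segments" array.
    segments = plan.get("segments")
    if not isinstance(segments, list) or not segments:
        raise ValueError("'segments' must be a non-empty array")
    # Mirror each segment's required keys: source, start, end.
    for i, seg in enumerate(segments):
        for key in ("source", "start", "end"):
            if key not in seg:
                raise ValueError(f"segment {i} is missing required key '{key}'")

check_plan({"segments": [{"source": "a.mp4", "start": 0, "end": 3}]})  # passes
```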

run

run(context: dict[str, Any] | None = None) -> Video

Execute the editing plan and return the final video.

Parameters:

    context (dict[str, Any] | None, default None):
        Optional side-channel data for context-dependent operations. Operations
        whose registry spec has a requires_transcript tag receive
        context["transcription"] as a keyword argument.
Source code in src/videopython/editing/video_edit.py
def run(self, context: dict[str, Any] | None = None) -> Video:
    """Execute the editing plan and return the final video.

    Args:
        context: Optional side-channel data for context-dependent operations.
            Operations whose registry spec has a ``requires_transcript`` tag
            receive ``context["transcription"]`` as a keyword argument.
    """
    video = self._assemble_segments(context)
    for record in self.post_transform_records:
        video = _apply_transform_with_context(record, video, context)
    for record in self.post_effect_records:
        if not isinstance(record.operation, Effect):
            raise TypeError(
                f"VideoEdit.post_effect_records must contain Effect operations, got {type(record.operation)}"
            )
        video = record.operation.apply(
            video,
            start=_coerce_optional_number(record.apply_args.get("start"), "start"),
            stop=_coerce_optional_number(record.apply_args.get("stop"), "stop"),
        )
    return video
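
Post-effects carry an optional apply window (start/stop, in seconds) in their apply args. A hypothetical resolve_window helper sketching a plausible fallback convention, where None falls back to the clip boundary (an assumption for illustration, not the library's own code, which passes None through to Effect.apply):

```python
def resolve_window(start, stop, total_seconds):
    # Assumed convention: None falls back to the clip boundary.
    s = 0.0 if start is None else start
    e = total_seconds if stop is None else stop
    if not (0.0 <= s < e <= total_seconds):
        raise ValueError(f"apply window [{s}, {e}] out of bounds for {total_seconds}s clip")
    return s, e
```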

run_to_file

run_to_file(
    output_path: str | Path,
    format: ALLOWED_VIDEO_FORMATS = "mp4",
    preset: ALLOWED_VIDEO_PRESETS = "medium",
    crf: int = 23,
    context: dict[str, Any] | None = None,
) -> Path

Execute the editing plan, streaming directly to a file.

Memory usage is O(1) w.r.t. video length for fully streamable pipelines. Falls back to eager mode (run + save) for non-streamable operations.

Parameters:

    output_path (str | Path, required):
        Destination file path.
    format (ALLOWED_VIDEO_FORMATS, default 'mp4'):
        Output container format.
    preset (ALLOWED_VIDEO_PRESETS, default 'medium'):
        x264 encoding preset.
    crf (int, default 23):
        Constant rate factor (quality).
    context (dict[str, Any] | None, default None):
        Optional side-channel data for context-dependent operations.

Returns:

    Path: Path to the output file.

Source code in src/videopython/editing/video_edit.py
def run_to_file(
    self,
    output_path: str | Path,
    format: ALLOWED_VIDEO_FORMATS = "mp4",
    preset: ALLOWED_VIDEO_PRESETS = "medium",
    crf: int = 23,
    context: dict[str, Any] | None = None,
) -> Path:
    """Execute the editing plan, streaming directly to a file.

    Memory usage is O(1) w.r.t. video length for fully streamable pipelines.
    Falls back to eager mode (run + save) for non-streamable operations.

    Args:
        output_path: Destination file path.
        format: Output container format.
        preset: x264 encoding preset.
        crf: Constant rate factor (quality).
        context: Optional side-channel data for context-dependent operations.

    Returns:
        Path to the output file.
    """
    output_path = Path(output_path).with_suffix(f".{format}")
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Fall back to eager if post-transforms or non-streamable post-effects exist
    if self.post_transform_records:
        return self._run_to_file_eager(output_path, format, preset, crf, context)

    for record in self.post_effect_records:
        if not isinstance(record.operation, Effect) or not record.operation.supports_streaming:
            return self._run_to_file_eager(output_path, format, preset, crf, context)

    # Compute matching targets
    target_fps, target_w, target_h = self._compute_matching_targets()

    # Analyze each segment
    plans: list[StreamingSegmentPlan | None] = []
    for segment in self.segments:
        plan = self._build_streaming_plan(segment, target_fps, target_w, target_h, context)
        plans.append(plan)

    # If any segment can't stream, fall back entirely
    if any(p is None for p in plans):
        return self._run_to_file_eager(output_path, format, preset, crf, context)

    streaming_plans: list[StreamingSegmentPlan] = plans  # type: ignore[assignment]

    # Fold post-effects into plans (they apply to the full assembled video)
    # For simplicity, fold into single-segment plans; multi-segment post-effects
    # require a second pass which we skip for now
    if self.post_effect_records and len(streaming_plans) > 1:
        return self._run_to_file_eager(output_path, format, preset, crf, context)

    if self.post_effect_records and len(streaming_plans) == 1:
        plan = streaming_plans[0]
        total_frames = round((plan.end_second - plan.start_second) * plan.output_fps)
        for record in self.post_effect_records:
            start_s = _coerce_optional_number(record.apply_args.get("start"), "start")
            stop_s = _coerce_optional_number(record.apply_args.get("stop"), "stop")
            start_f = round(start_s * plan.output_fps) if start_s is not None else 0
            end_f = round(stop_s * plan.output_fps) if stop_s is not None else total_frames
            assert isinstance(record.operation, Effect)
            plan.effect_schedule.append(EffectScheduleEntry(record.operation, start_f, end_f))

    import tempfile

    if len(streaming_plans) == 1:
        plan = streaming_plans[0]
        audio = self._load_segment_audio(self.segments[0], plan, context)
        return stream_segment(plan, output_path, audio=audio, format=format, preset=preset, crf=crf)
    else:
        # Multi-segment: stream each to temp, then concat
        temp_files: list[Path] = []
        try:
            for segment, plan in zip(self.segments, streaming_plans):
                temp = tempfile.NamedTemporaryFile(suffix=f".{format}", delete=False)
                temp.close()
                audio = self._load_segment_audio(segment, plan, context)
                stream_segment(plan, Path(temp.name), audio=audio, format=format, preset=preset, crf=crf)
                temp_files.append(Path(temp.name))
            return concat_files(temp_files, output_path)
        finally:
            for f in temp_files:
                f.unlink(missing_ok=True)
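
When a post-effect is folded into a single streaming plan, its apply window in seconds is converted to frame indices at the plan's output fps (the folding loop above). That conversion, extracted as a standalone sketch:

```python
def effect_frame_window(start_s, stop_s, fps, total_frames):
    # None start -> frame 0; None stop -> last frame; otherwise round(seconds * fps).
    start_f = round(start_s * fps) if start_s is not None else 0
    end_f = round(stop_s * fps) if stop_s is not None else total_frames
    return start_f, end_f

effect_frame_window(0.5, 2.0, 30.0, 240)  # -> (15, 60)
```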

validate

validate(
    context: dict[str, Any] | None = None,
) -> VideoMetadata

Validate the editing plan without loading video data.

Requires source video files to be present on disk (uses VideoMetadata.from_path). For validation without file access, use validate_with_metadata().

Parameters:

    context (dict[str, Any] | None, default None):
        Optional side-channel data for context-dependent operations. Operations
        whose registry spec has a requires_transcript tag use
        context["transcription"] for metadata prediction.
Source code in src/videopython/editing/video_edit.py
def validate(self, context: dict[str, Any] | None = None) -> VideoMetadata:
    """Validate the editing plan without loading video data.

    Requires source video files to be present on disk (uses ``VideoMetadata.from_path``).
    For validation without file access, use :meth:`validate_with_metadata`.

    Args:
        context: Optional side-channel data for context-dependent operations.
            Operations whose registry spec has a ``requires_transcript`` tag
            use ``context["transcription"]`` for metadata prediction.
    """
    source_metas = [self._validate_source_meta(i, seg) for i, seg in enumerate(self.segments)]
    source_metas = self._match_metas(source_metas)
    segment_metas = [
        self._apply_segment_meta_ops(i, seg, meta, context)
        for i, (seg, meta) in enumerate(zip(self.segments, source_metas))
    ]
    return self._validate_assembled(segment_metas, context)
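
The per-segment checks behind validate() are three simple bound checks: start >= 0, end > start, and end within the source duration (see _validate_source_meta in the class source above). As a standalone sketch:

```python
def check_segment_bounds(start, end, source_duration):
    # Same three checks validate() applies per segment.
    if start < 0:
        raise ValueError(f"start_second ({start}) must be >= 0")
    if end <= start:
        raise ValueError(f"end_second ({end}) must be > start_second ({start})")
    if end > source_duration:
        raise ValueError(f"end_second ({end}) exceeds source duration ({source_duration}s)")
    return end - start  # duration of the cut
```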

validate_with_metadata

validate_with_metadata(
    source_metadata: VideoMetadata
    | dict[str, VideoMetadata],
    context: dict[str, Any] | None = None,
) -> VideoMetadata

Validate the editing plan using pre-built metadata instead of loading from file.

Same validation as validate() but accepts a VideoMetadata directly, avoiding the need for the source video file to be on disk.

Parameters:

    source_metadata (VideoMetadata | dict[str, VideoMetadata], required):
        VideoMetadata for the source video (duration, dimensions, fps). For
        multi-source plans, pass a dict mapping source paths to their metadata.
    context (dict[str, Any] | None, default None):
        Optional side-channel data for context-dependent operations. Operations
        whose registry spec has a requires_transcript tag use
        context["transcription"] for metadata prediction.

Returns:

    VideoMetadata: Predicted output VideoMetadata after all operations.

Raises:

    ValueError: If any validation check fails.

Source code in src/videopython/editing/video_edit.py
def validate_with_metadata(
    self,
    source_metadata: VideoMetadata | dict[str, VideoMetadata],
    context: dict[str, Any] | None = None,
) -> VideoMetadata:
    """Validate the editing plan using pre-built metadata instead of loading from file.

    Same validation as validate() but accepts a VideoMetadata directly,
    avoiding the need for the source video file to be on disk.

    Args:
        source_metadata: VideoMetadata for the source video (duration, dimensions, fps).
            For multi-source plans, pass a dict mapping source paths to their metadata.
        context: Optional side-channel data for context-dependent operations.
            Operations whose registry spec has a ``requires_transcript`` tag
            use ``context["transcription"]`` for metadata prediction.

    Returns:
        Predicted output VideoMetadata after all operations.

    Raises:
        ValueError: If any validation check fails.
    """
    if isinstance(source_metadata, VideoMetadata):
        meta_map: dict[str, VideoMetadata] = {str(seg.source_video): source_metadata for seg in self.segments}
    else:
        meta_map = source_metadata

    source_metas: list[VideoMetadata] = []
    for i, segment in enumerate(self.segments):
        source_key = str(segment.source_video)
        if source_key not in meta_map:
            raise ValueError(
                f"Segment {i}: no metadata provided for source '{source_key}'. Available keys: {sorted(meta_map)}"
            )
        source_metas.append(self._validate_source_meta(i, segment, meta_map[source_key]))
    source_metas = self._match_metas(source_metas)
    segment_metas = [
        self._apply_segment_meta_ops(i, seg, meta, context)
        for i, (seg, meta) in enumerate(zip(self.segments, source_metas))
    ]
    return self._validate_assembled(segment_metas, context)
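
The single-metadata vs. dict dispatch above can be sketched on plain values (build_meta_map is a hypothetical helper; any value stands in for VideoMetadata):

```python
def build_meta_map(source_metadata, sources):
    # A single metadata object is shared across all segment sources;
    # a dict must cover every source path.
    if not isinstance(source_metadata, dict):
        return {src: source_metadata for src in sources}
    missing = sorted(s for s in sources if s not in source_metadata)
    if missing:
        raise ValueError(f"no metadata provided for sources: {missing}")
    return source_metadata
```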

SegmentConfig

SegmentConfig is still exported, but most users should construct plans via VideoEdit.from_dict(...) or VideoEdit.from_json(...).

SegmentConfig dataclass

Configuration for a single video segment in an editing plan.

Source code in src/videopython/editing/video_edit.py
@dataclass
class SegmentConfig:
    """Configuration for a single video segment in an editing plan."""

    source_video: Path
    start_second: float
    end_second: float
    transform_records: tuple[_StepRecord, ...] = field(default_factory=tuple)
    effect_records: tuple[_StepRecord, ...] = field(default_factory=tuple)

    def __post_init__(self) -> None:
        self.transform_records = tuple(self.transform_records)
        self.effect_records = tuple(self.effect_records)
        for record in self.transform_records:
            if not isinstance(record.operation, Transformation):
                raise TypeError(
                    "SegmentConfig.transform_records must contain "
                    f"Transformation operations, got {type(record.operation)}"
                )
        for record in self.effect_records:
            if not isinstance(record.operation, Effect):
                raise TypeError(
                    f"SegmentConfig.effect_records must contain Effect operations, got {type(record.operation)}"
                )

    def load_segment(
        self,
        fps: float | None = None,
        width: int | None = None,
        height: int | None = None,
    ) -> Video:
        """Load the raw segment from disk (cut only, no transforms or effects).

        Optional fps/width/height are applied during decoding via ffmpeg filters.
        """
        return Video.from_path(
            str(self.source_video),
            start_second=self.start_second,
            end_second=self.end_second,
            fps=fps,
            width=width,
            height=height,
        )

    def apply_operations(self, video: Video, context: dict[str, Any] | None = None) -> Video:
        """Apply per-segment transforms and effects to a loaded video."""
        for record in self.transform_records:
            video = _apply_transform_with_context(record, video, context)
        for record in self.effect_records:
            if not isinstance(record.operation, Effect):
                raise TypeError(
                    f"SegmentConfig.effect_records must contain Effect operations, got {type(record.operation)}"
                )
            video = record.operation.apply(
                video,
                start=_coerce_optional_number(record.apply_args.get("start"), "start"),
                stop=_coerce_optional_number(record.apply_args.get("stop"), "stop"),
            )
        return video

    def process_segment(self, context: dict[str, Any] | None = None) -> Video:
        """Load the segment and apply transforms then effects."""
        return self.apply_operations(self.load_segment(), context)

load_segment

load_segment(
    fps: float | None = None,
    width: int | None = None,
    height: int | None = None,
) -> Video

Load the raw segment from disk (cut only, no transforms or effects).

Optional fps/width/height are applied during decoding via ffmpeg filters.

Source code in src/videopython/editing/video_edit.py
def load_segment(
    self,
    fps: float | None = None,
    width: int | None = None,
    height: int | None = None,
) -> Video:
    """Load the raw segment from disk (cut only, no transforms or effects).

    Optional fps/width/height are applied during decoding via ffmpeg filters.
    """
    return Video.from_path(
        str(self.source_video),
        start_second=self.start_second,
        end_second=self.end_second,
        fps=fps,
        width=width,
        height=height,
    )

apply_operations

apply_operations(
    video: Video, context: dict[str, Any] | None = None
) -> Video

Apply per-segment transforms and effects to a loaded video.

Source code in src/videopython/editing/video_edit.py
def apply_operations(self, video: Video, context: dict[str, Any] | None = None) -> Video:
    """Apply per-segment transforms and effects to a loaded video."""
    for record in self.transform_records:
        video = _apply_transform_with_context(record, video, context)
    for record in self.effect_records:
        if not isinstance(record.operation, Effect):
            raise TypeError(
                f"SegmentConfig.effect_records must contain Effect operations, got {type(record.operation)}"
            )
        video = record.operation.apply(
            video,
            start=_coerce_optional_number(record.apply_args.get("start"), "start"),
            stop=_coerce_optional_number(record.apply_args.get("stop"), "stop"),
        )
    return video

process_segment

process_segment(
    context: dict[str, Any] | None = None,
) -> Video

Load the segment and apply transforms then effects.

Source code in src/videopython/editing/video_edit.py
def process_segment(self, context: dict[str, Any] | None = None) -> Video:
    """Load the segment and apply transforms then effects."""
    return self.apply_operations(self.load_segment(), context)

MultiCamEdit

MultiCamEdit

Multicam timeline editor for podcast-style recordings.

Switches between synchronized camera angles at specified cut points, joining segments with transitions and replacing audio with an external track (or silence).

Source code in src/videopython/editing/multicam.py
class MultiCamEdit:
    """Multicam timeline editor for podcast-style recordings.

    Switches between synchronized camera angles at specified cut points,
    joining segments with transitions and replacing audio with an external
    track (or silence).
    """

    def __init__(
        self,
        sources: dict[str, str | Path],
        cuts: Sequence[CutPoint],
        audio_source: str | Path | None = None,
        default_transition: Transition | None = None,
        source_offsets: dict[str, float] | None = None,
    ):
        if not sources:
            raise ValueError("MultiCamEdit requires at least one source")
        if not cuts:
            raise ValueError("MultiCamEdit requires at least one cut point")

        self.sources: dict[str, Path] = {k: Path(v) for k, v in sources.items()}
        self.cuts: tuple[CutPoint, ...] = tuple(cuts)
        self.audio_source: Path | None = Path(audio_source) if audio_source else None
        self.default_transition: Transition = default_transition or InstantTransition()
        self.source_offsets: dict[str, float] = source_offsets or {}

        self._validate()

    def _validate(self) -> None:
        # Sources must exist
        for name, path in self.sources.items():
            if not path.exists():
                raise FileNotFoundError(f"Source '{name}' not found: {path}")

        # Audio source must exist if provided
        if self.audio_source and not self.audio_source.exists():
            raise FileNotFoundError(f"Audio source not found: {self.audio_source}")

        # First cut must start at time 0
        if self.cuts[0].time != 0.0:
            raise ValueError(f"First cut must start at time 0.0, got {self.cuts[0].time}")

        # Cuts must be in ascending order
        for i in range(1, len(self.cuts)):
            if self.cuts[i].time <= self.cuts[i - 1].time:
                raise ValueError(
                    f"Cuts must be in strictly ascending order: "
                    f"cut {i} time ({self.cuts[i].time}) <= cut {i - 1} time ({self.cuts[i - 1].time})"
                )

        # All camera references must be valid
        for i, cut in enumerate(self.cuts):
            if cut.camera not in self.sources:
                raise ValueError(
                    f"Cut {i} references unknown camera '{cut.camera}'. Available: {sorted(self.sources.keys())}"
                )

        # All offset keys must reference valid sources
        for name in self.source_offsets:
            if name not in self.sources:
                raise ValueError(
                    f"source_offsets references unknown source '{name}'. Available: {sorted(self.sources.keys())}"
                )

        # All sources must have compatible fps and resolution
        metas: dict[str, VideoMetadata] = {}
        for name, path in self.sources.items():
            metas[name] = VideoMetadata.from_path(str(path))

        meta_list = list(metas.values())
        first = meta_list[0]
        for name, meta in metas.items():
            if meta.fps != first.fps:
                raise ValueError(
                    f"Source '{name}' has fps {meta.fps}, expected {first.fps}. All sources must have the same fps."
                )
            if (meta.width, meta.height) != (first.width, first.height):
                raise ValueError(
                    f"Source '{name}' has resolution {meta.width}x{meta.height}, "
                    f"expected {first.width}x{first.height}. "
                    f"All sources must have the same resolution."
                )

        # Cache source metadata for validate() and run()
        self._source_meta = first
        self._source_duration = min(m.total_seconds for m in meta_list)
        self._source_metas = metas

        # Build per-camera time ranges (cut start, cut end) from the timeline
        camera_ranges: dict[str, list[tuple[float, float]]] = {}
        for cut, start, end in self._cut_ranges():
            camera_ranges.setdefault(cut.camera, []).append((start, end))

        # Validate adjusted seek positions per source
        for camera, ranges in camera_ranges.items():
            offset = self.source_offsets.get(camera, 0.0)
            source_dur = metas[camera].total_seconds
            for start, end in ranges:
                adj_start = start - offset
                adj_end = end - offset
                if adj_start < 0:
                    raise ValueError(
                        f"Cut at timeline {start}s for '{camera}' (offset {offset}s) "
                        f"results in negative seek position ({adj_start}s)"
                    )
                if adj_end > source_dur:
                    raise ValueError(
                        f"Cut ending at timeline {end}s for '{camera}' (offset {offset}s) "
                        f"exceeds source duration ({source_dur}s)"
                    )

    def _cut_ranges(self) -> list[tuple[CutPoint, float, float]]:
        """Build (cut, start_time, end_time) for each segment in the timeline."""
        ranges: list[tuple[CutPoint, float, float]] = []
        for i, cut in enumerate(self.cuts):
            start = cut.time
            end = self.cuts[i + 1].time if i + 1 < len(self.cuts) else self._source_duration
            ranges.append((cut, start, end))
        return ranges

    def run(self) -> Video:
        """Execute the multicam edit and return the final video."""
        # Load and join segments
        result: Video | None = None
        for cut, start, end in self._cut_ranges():
            source_path = self.sources[cut.camera]
            offset = self.source_offsets.get(cut.camera, 0.0)
            segment = Video.from_path(str(source_path), start_second=start - offset, end_second=end - offset)

            if result is None:
                result = segment
            else:
                transition = cut.transition or self.default_transition
                result = transition.apply((result, segment))

        assert result is not None

        # Replace audio
        if self.audio_source:
            audio = Audio.from_path(self.audio_source)
            audio = audio.fit_to_duration(result.total_seconds)
        else:
            audio = Audio.create_silent(
                duration_seconds=result.total_seconds,
                sample_rate=result.audio.metadata.sample_rate,
            )
        result.audio = audio

        return result

    @property
    def source_meta(self) -> VideoMetadata:
        """Metadata of the reference source (first listed)."""
        return self._source_meta

    @property
    def source_duration(self) -> float:
        """Timeline duration in seconds (minimum across all sources)."""
        return self._source_duration

    @property
    def source_metas(self) -> dict[str, VideoMetadata]:
        """Per-camera metadata keyed by source name."""
        return dict(self._source_metas)

    def validate(self) -> VideoMetadata:
        """Validate the plan and predict output metadata without loading frames."""
        total_seconds = self._source_duration
        fps = self._source_meta.fps

        # Subtract overlap consumed by transitions
        for i in range(1, len(self.cuts)):
            transition = self.cuts[i].transition or self.default_transition
            effect_time = getattr(transition, "effect_time_seconds", 0.0)
            if effect_time > 0:
                total_seconds -= effect_time

        total_seconds = round(total_seconds, 4)
        frame_count = math.floor(total_seconds * fps)

        return VideoMetadata(
            width=self._source_meta.width,
            height=self._source_meta.height,
            fps=fps,
            frame_count=frame_count,
            total_seconds=total_seconds,
        )

    @classmethod
    def json_schema(cls) -> dict[str, Any]:
        """Return a JSON Schema for MultiCamEdit plans."""
        transition_schemas = [
            {
                "type": "object",
                "properties": {"type": {"const": "instant"}},
                "required": ["type"],
                "additionalProperties": False,
            },
            {
                "type": "object",
                "properties": {
                    "type": {"const": "fade"},
                    "effect_time_seconds": {
                        "type": "number",
                        "exclusiveMinimum": 0,
                        "description": "Duration of the crossfade in seconds.",
                    },
                },
                "required": ["type", "effect_time_seconds"],
                "additionalProperties": False,
            },
            {
                "type": "object",
                "properties": {
                    "type": {"const": "blur"},
                    "effect_time_seconds": {
                        "type": "number",
                        "exclusiveMinimum": 0,
                        "description": "Duration of the blur transition in seconds.",
                    },
                    "blur_iterations": {
                        "type": "integer",
                        "minimum": 1,
                        "description": "Blur strength at peak.",
                    },
                    "blur_kernel_size": {
                        "type": "array",
                        "items": {"type": "integer"},
                        "minItems": 2,
                        "maxItems": 2,
                        "description": "Gaussian kernel [width, height] in pixels.",
                    },
                },
                "required": ["type"],
                "additionalProperties": False,
            },
        ]

        cut_schema: dict[str, Any] = {
            "type": "object",
            "properties": {
                "time": {
                    "type": "number",
                    "minimum": 0,
                    "description": "Seconds into the timeline where this cut happens.",
                },
                "camera": {
                    "type": "string",
                    "description": "Camera name (key into sources).",
                },
                "transition": {
                    "oneOf": transition_schemas,
                    "description": "Transition to use at this cut. Omit to use default_transition.",
                },
            },
            "required": ["time", "camera"],
            "additionalProperties": False,
        }

        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {
                "sources": {
                    "type": "object",
                    "description": "Named camera sources. Keys are camera names, values are file paths.",
                    "additionalProperties": {"type": "string"},
                    "minProperties": 1,
                },
                "source_offsets": {
                    "type": "object",
                    "additionalProperties": {"type": "number"},
                    "description": "Per-source time offsets in seconds. "
                    "Positive means the source starts later than the timeline origin.",
                },
                "audio_source": {
                    "type": "string",
                    "description": "Path to external audio track. Omit for silent output.",
                },
                "cuts": {
                    "type": "array",
                    "items": cut_schema,
                    "minItems": 1,
                    "description": "Ordered list of camera switches. First cut must have time=0.",
                },
                "default_transition": {
                    "oneOf": transition_schemas,
                    "description": "Transition used between cuts when not specified per-cut.",
                },
            },
            "required": ["sources", "cuts"],
            "additionalProperties": False,
        }

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-compatible dict."""
        result: dict[str, Any] = {
            "sources": {k: str(v) for k, v in self.sources.items()},
            "cuts": [],
            "default_transition": self.default_transition.to_dict(),
        }
        if self.source_offsets:
            result["source_offsets"] = dict(self.source_offsets)
        if self.audio_source:
            result["audio_source"] = str(self.audio_source)

        for cut in self.cuts:
            cut_dict: dict[str, Any] = {"time": cut.time, "camera": cut.camera}
            if cut.transition is not None:
                cut_dict["transition"] = cut.transition.to_dict()
            result["cuts"].append(cut_dict)

        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> MultiCamEdit:
        """Deserialize from a dict."""
        if not isinstance(data, dict):
            raise ValueError("MultiCamEdit plan must be a JSON object")

        sources = data.get("sources")
        if not isinstance(sources, dict) or not sources:
            raise ValueError("MultiCamEdit plan must have a non-empty 'sources' dict")

        cuts_data = data.get("cuts")
        if not isinstance(cuts_data, list) or not cuts_data:
            raise ValueError("MultiCamEdit plan must have a non-empty 'cuts' list")

        cuts: list[CutPoint] = []
        for i, cut_data in enumerate(cuts_data):
            if not isinstance(cut_data, dict):
                raise ValueError(f"cuts[{i}] must be an object")
            transition = None
            if "transition" in cut_data:
                transition = Transition.from_dict(cut_data["transition"])
            cuts.append(
                CutPoint(
                    time=cut_data["time"],
                    camera=cut_data["camera"],
                    transition=transition,
                )
            )

        default_transition = None
        if "default_transition" in data:
            default_transition = Transition.from_dict(data["default_transition"])

        return cls(
            sources=sources,
            cuts=cuts,
            audio_source=data.get("audio_source"),
            default_transition=default_transition,
            source_offsets=data.get("source_offsets"),
        )

    @classmethod
    def from_json(cls, text: str) -> MultiCamEdit:
        """Deserialize from a JSON string."""
        try:
            data = json.loads(text)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid MultiCamEdit JSON: {e.msg} at line {e.lineno} column {e.colno}") from e
        return cls.from_dict(data)

source_meta property

source_meta: VideoMetadata

Metadata of the reference source (first listed).

source_duration property

source_duration: float

Timeline duration in seconds (minimum across all sources).

source_metas property

source_metas: dict[str, VideoMetadata]

Per-camera metadata keyed by source name.

run

run() -> Video

Execute the multicam edit and return the final video.

Source code in src/videopython/editing/multicam.py
def run(self) -> Video:
    """Execute the multicam edit and return the final video."""
    # Load and join segments
    result: Video | None = None
    for cut, start, end in self._cut_ranges():
        source_path = self.sources[cut.camera]
        offset = self.source_offsets.get(cut.camera, 0.0)
        segment = Video.from_path(str(source_path), start_second=start - offset, end_second=end - offset)

        if result is None:
            result = segment
        else:
            transition = cut.transition or self.default_transition
            result = transition.apply((result, segment))

    assert result is not None

    # Replace audio
    if self.audio_source:
        audio = Audio.from_path(self.audio_source)
        audio = audio.fit_to_duration(result.total_seconds)
    else:
        audio = Audio.create_silent(
            duration_seconds=result.total_seconds,
            sample_rate=result.audio.metadata.sample_rate,
        )
    result.audio = audio

    return result
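
The cut-and-join loop above can be illustrated with a standalone sketch in which plain lists stand in for decoded video segments. Everything here (`Cut`, the toy `sources`, the per-second frame labels) is a hypothetical stand-in for illustration, not part of the videopython API; only instant cuts are modeled, so joining is plain concatenation.

```python
from dataclasses import dataclass

@dataclass
class Cut:
    time: float   # timeline second where this cut starts
    camera: str   # which source the segment comes from

# Toy "sources": each camera is just a list of per-second frame labels.
sources = {
    "wide":  [f"wide-{s}" for s in range(10)],
    "close": [f"close-{s}" for s in range(10)],
}

def cut_ranges(cuts, timeline_duration):
    """(cut, start, end) per segment -- mirrors MultiCamEdit._cut_ranges."""
    return [
        (cut, cut.time, cuts[i + 1].time if i + 1 < len(cuts) else timeline_duration)
        for i, cut in enumerate(cuts)
    ]

def run(cuts, timeline_duration):
    result = []
    for cut, start, end in cut_ranges(cuts, timeline_duration):
        segment = sources[cut.camera][int(start):int(end)]
        result.extend(segment)   # "instant" transition: plain concatenation
    return result

frames = run([Cut(0, "wide"), Cut(4, "close"), Cut(7, "wide")], 10)
```

Each segment runs from its cut's `time` to the next cut's `time` (or the end of the timeline), which is exactly the shape `_cut_ranges()` produces.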

validate

validate() -> VideoMetadata

Validate the plan and predict output metadata without loading frames.

Source code in src/videopython/editing/multicam.py
def validate(self) -> VideoMetadata:
    """Validate the plan and predict output metadata without loading frames."""
    total_seconds = self._source_duration
    fps = self._source_meta.fps

    # Subtract overlap consumed by transitions
    for i in range(1, len(self.cuts)):
        transition = self.cuts[i].transition or self.default_transition
        effect_time = getattr(transition, "effect_time_seconds", 0.0)
        if effect_time > 0:
            total_seconds -= effect_time

    total_seconds = round(total_seconds, 4)
    frame_count = math.floor(total_seconds * fps)

    return VideoMetadata(
        width=self._source_meta.width,
        height=self._source_meta.height,
        fps=fps,
        frame_count=frame_count,
        total_seconds=total_seconds,
    )

json_schema classmethod

json_schema() -> dict[str, Any]

Return a JSON Schema for MultiCamEdit plans.

Source code in src/videopython/editing/multicam.py
@classmethod
def json_schema(cls) -> dict[str, Any]:
    """Return a JSON Schema for MultiCamEdit plans."""
    transition_schemas = [
        {
            "type": "object",
            "properties": {"type": {"const": "instant"}},
            "required": ["type"],
            "additionalProperties": False,
        },
        {
            "type": "object",
            "properties": {
                "type": {"const": "fade"},
                "effect_time_seconds": {
                    "type": "number",
                    "exclusiveMinimum": 0,
                    "description": "Duration of the crossfade in seconds.",
                },
            },
            "required": ["type", "effect_time_seconds"],
            "additionalProperties": False,
        },
        {
            "type": "object",
            "properties": {
                "type": {"const": "blur"},
                "effect_time_seconds": {
                    "type": "number",
                    "exclusiveMinimum": 0,
                    "description": "Duration of the blur transition in seconds.",
                },
                "blur_iterations": {
                    "type": "integer",
                    "minimum": 1,
                    "description": "Blur strength at peak.",
                },
                "blur_kernel_size": {
                    "type": "array",
                    "items": {"type": "integer"},
                    "minItems": 2,
                    "maxItems": 2,
                    "description": "Gaussian kernel [width, height] in pixels.",
                },
            },
            "required": ["type"],
            "additionalProperties": False,
        },
    ]

    cut_schema: dict[str, Any] = {
        "type": "object",
        "properties": {
            "time": {
                "type": "number",
                "minimum": 0,
                "description": "Seconds into the timeline where this cut happens.",
            },
            "camera": {
                "type": "string",
                "description": "Camera name (key into sources).",
            },
            "transition": {
                "oneOf": transition_schemas,
                "description": "Transition to use at this cut. Omit to use default_transition.",
            },
        },
        "required": ["time", "camera"],
        "additionalProperties": False,
    }

    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
            "sources": {
                "type": "object",
                "description": "Named camera sources. Keys are camera names, values are file paths.",
                "additionalProperties": {"type": "string"},
                "minProperties": 1,
            },
            "source_offsets": {
                "type": "object",
                "additionalProperties": {"type": "number"},
                "description": "Per-source time offsets in seconds. "
                "Positive means the source starts later than the timeline origin.",
            },
            "audio_source": {
                "type": "string",
                "description": "Path to external audio track. Omit for silent output.",
            },
            "cuts": {
                "type": "array",
                "items": cut_schema,
                "minItems": 1,
                "description": "Ordered list of camera switches. First cut must have time=0.",
            },
            "default_transition": {
                "oneOf": transition_schemas,
                "description": "Transition used between cuts when not specified per-cut.",
            },
        },
        "required": ["sources", "cuts"],
        "additionalProperties": False,
    }

to_dict

to_dict() -> dict[str, Any]

Serialize to a JSON-compatible dict.

Source code in src/videopython/editing/multicam.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a JSON-compatible dict."""
    result: dict[str, Any] = {
        "sources": {k: str(v) for k, v in self.sources.items()},
        "cuts": [],
        "default_transition": self.default_transition.to_dict(),
    }
    if self.source_offsets:
        result["source_offsets"] = dict(self.source_offsets)
    if self.audio_source:
        result["audio_source"] = str(self.audio_source)

    for cut in self.cuts:
        cut_dict: dict[str, Any] = {"time": cut.time, "camera": cut.camera}
        if cut.transition is not None:
            cut_dict["transition"] = cut.transition.to_dict()
        result["cuts"].append(cut_dict)

    return result

from_dict classmethod

from_dict(data: dict[str, Any]) -> MultiCamEdit

Deserialize from a dict.

Source code in src/videopython/editing/multicam.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> MultiCamEdit:
    """Deserialize from a dict."""
    if not isinstance(data, dict):
        raise ValueError("MultiCamEdit plan must be a JSON object")

    sources = data.get("sources")
    if not isinstance(sources, dict) or not sources:
        raise ValueError("MultiCamEdit plan must have a non-empty 'sources' dict")

    cuts_data = data.get("cuts")
    if not isinstance(cuts_data, list) or not cuts_data:
        raise ValueError("MultiCamEdit plan must have a non-empty 'cuts' list")

    cuts: list[CutPoint] = []
    for i, cut_data in enumerate(cuts_data):
        if not isinstance(cut_data, dict):
            raise ValueError(f"cuts[{i}] must be an object")
        transition = None
        if "transition" in cut_data:
            transition = Transition.from_dict(cut_data["transition"])
        cuts.append(
            CutPoint(
                time=cut_data["time"],
                camera=cut_data["camera"],
                transition=transition,
            )
        )

    default_transition = None
    if "default_transition" in data:
        default_transition = Transition.from_dict(data["default_transition"])

    return cls(
        sources=sources,
        cuts=cuts,
        audio_source=data.get("audio_source"),
        default_transition=default_transition,
        source_offsets=data.get("source_offsets"),
    )

from_json classmethod

from_json(text: str) -> MultiCamEdit

Deserialize from a JSON string.

Source code in src/videopython/editing/multicam.py
@classmethod
def from_json(cls, text: str) -> MultiCamEdit:
    """Deserialize from a JSON string."""
    try:
        data = json.loads(text)
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid MultiCamEdit JSON: {e.msg} at line {e.lineno} column {e.colno}") from e
    return cls.from_dict(data)
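
The error handling in `from_json()` turns a raw `json.JSONDecodeError` into a `ValueError` that carries the line and column, which is useful when surfacing parse failures from LLM output back to a caller. A standalone stdlib sketch of the same pattern (the `parse_plan` name is illustrative):

```python
import json

def parse_plan(text: str) -> dict:
    """Parse JSON, re-raising decode errors as ValueError with a location."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as e:
        raise ValueError(
            f"Invalid plan JSON: {e.msg} at line {e.lineno} column {e.colno}"
        ) from e

plan = parse_plan('{"sources": {"wide": "a.mp4"}, "cuts": []}')
```

Chaining with `from e` preserves the original exception for debugging while callers only need to catch `ValueError`.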

CutPoint

CutPoint dataclass

A camera switch point in a multicam timeline.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `time` | `float` | Seconds into the timeline where this cut happens. |
| `camera` | `str` | Key into the `MultiCamEdit.sources` dict. |
| `transition` | `Transition \| None` | Transition to use when switching to this camera. `None` means use the `MultiCamEdit.default_transition`. |

Source code in src/videopython/editing/multicam.py
@dataclass(frozen=True)
class CutPoint:
    """A camera switch point in a multicam timeline.

    Attributes:
        time: Seconds into the timeline where this cut happens.
        camera: Key into the MultiCamEdit.sources dict.
        transition: Transition to use when switching to this camera.
            None means use the MultiCamEdit.default_transition.
    """

    time: float
    camera: str
    transition: Transition | None = None