AI Generation

Generate videos, images, speech, and music from text prompts, and animate static images into video.

Backend Support

| Class | local | openai | gemini | elevenlabs | luma | runway |
|---|---|---|---|---|---|---|
| TextToVideo | CogVideoX1.5-5B | - | - | - | Dream Machine | - |
| ImageToVideo | CogVideoX1.5-5B-I2V | - | - | - | Dream Machine | Gen-4 Turbo |
| VideoUpscaler | RealBasicVSR | - | - | - | - | - |
| TextToSpeech | Bark | TTS | - | Multilingual v2 | - | - |
| TextToMusic | MusicGen | - | - | - | - | - |
| TextToImage | SDXL | DALL-E 3 | - | - | - | - |
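
Every class follows the same pattern: choose a backend in the constructor (or fall back to the configured default), then await a generate method. A minimal construction sketch; the import paths are assumptions based on the "Source code in ..." locations shown below, so adjust them to your install:

```python
# Assumed import paths; adjust to your installation.
from videopython.ai.generation.video import TextToVideo
from videopython.ai.generation.image import TextToImage

# Explicit backend selection.
local_t2v = TextToVideo(backend="local")                # runs CogVideoX1.5-5B on your machine
cloud_t2v = TextToVideo(backend="luma", api_key="...")  # Dream Machine via the Luma API

# Omitting `backend` falls back to the configured default (or "local");
# omitting `api_key` makes cloud backends read the key from the environment.
default_t2i = TextToImage()
```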

TextToVideo

TextToVideo

Generates videos from text descriptions using diffusion models.

Source code in src/videopython/ai/generation/video.py
class TextToVideo:
    """Generates videos from text descriptions using diffusion models."""

    SUPPORTED_BACKENDS: list[str] = ["local", "luma"]

    def __init__(
        self,
        backend: TextToVideoBackend | None = None,
        api_key: str | None = None,
    ):
        """Initialize text-to-video generator.

        Args:
            backend: Backend to use. If None, uses config default or 'local'.
            api_key: API key for cloud backends. If None, uses environment variable.
        """
        resolved_backend: str = backend if backend is not None else get_default_backend("text_to_video")
        if resolved_backend not in self.SUPPORTED_BACKENDS:
            raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

        self.backend: TextToVideoBackend = resolved_backend  # type: ignore[assignment]
        self._api_key = api_key
        self._pipeline: Any = None
        self._device: str | None = None

    def _init_local(self) -> None:
        """Initialize local diffusion pipeline."""
        from diffusers import CogVideoXPipeline

        self._device, dtype = _get_torch_device_and_dtype()

        model_name = "THUDM/CogVideoX1.5-5B"
        self._pipeline = CogVideoXPipeline.from_pretrained(model_name, torch_dtype=dtype)
        self._pipeline.to(self._device)

    async def _generate_local(
        self,
        prompt: str,
        num_steps: int,
        num_frames: int,
        guidance_scale: float,
    ) -> Video:
        """Generate video using local CogVideoX diffusion model."""
        if self._pipeline is None:
            await asyncio.to_thread(self._init_local)

        def _run_pipeline() -> Video:
            import torch

            video_frames = self._pipeline(
                prompt=prompt,
                num_inference_steps=num_steps,
                num_frames=num_frames,
                guidance_scale=guidance_scale,
                generator=torch.Generator(device=self._device).manual_seed(42),
            ).frames[0]
            video_frames = np.asarray(video_frames, dtype=np.uint8)
            return Video.from_frames(video_frames, fps=16.0)

        return await asyncio.to_thread(_run_pipeline)

    async def _generate_luma(self, prompt: str) -> Video:
        """Generate video using Luma AI Dream Machine API."""
        import tempfile
        from pathlib import Path

        from lumaai import LumaAI

        client = LumaAI(auth_token=get_api_key("luma", self._api_key))

        # Create generation request
        generation = client.generations.create(prompt=prompt, model="ray-2")

        # Poll for completion
        while generation.state not in ["completed", "failed"]:
            await asyncio.sleep(3)
            assert generation.id is not None
            generation = client.generations.get(generation.id)

        if generation.state == "failed":
            raise RuntimeError(f"Luma generation failed: {generation.failure_reason}")

        # Download the video
        if generation.assets is None:
            raise RuntimeError("Luma generation completed but no assets returned")
        video_url = generation.assets.video
        if not video_url:
            raise RuntimeError("Luma generation completed but no video URL returned")

        import httpx

        async with httpx.AsyncClient() as http_client:
            response = await http_client.get(video_url)
            response.raise_for_status()

            with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
                f.write(response.content)
                temp_path = Path(f.name)

        video = Video.from_path(str(temp_path))
        temp_path.unlink()
        return video

    async def generate_video(
        self,
        prompt: str,
        num_steps: int = 50,
        num_frames: int = 81,
        guidance_scale: float = 6.0,
    ) -> Video:
        """Generate video from text prompt.

        Args:
            prompt: Text description of desired video content.
            num_steps: Number of diffusion steps (local backend only). Default 50.
            num_frames: Number of frames to generate (local backend only). Default 81.
            guidance_scale: Prompt guidance strength (local backend only). Default 6.0.

        Returns:
            Generated video.
        """
        if self.backend == "local":
            return await self._generate_local(prompt, num_steps, num_frames, guidance_scale)
        elif self.backend == "luma":
            return await self._generate_luma(prompt)
        else:
            raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
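
A minimal end-to-end sketch for the local backend, assuming the class is importable from the module path shown above:

```python
import asyncio

from videopython.ai.generation.video import TextToVideo  # assumed import path

async def main() -> None:
    generator = TextToVideo(backend="local")
    video = await generator.generate_video(
        "a lighthouse on a cliff at sunset, waves crashing",
        num_steps=50,
        num_frames=81,  # roughly 5 seconds at the 16 fps used by the local backend
    )
    video.save("lighthouse.mp4")  # Video.save is also used internally by VideoUpscaler

asyncio.run(main())
```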

__init__

__init__(
    backend: TextToVideoBackend | None = None,
    api_key: str | None = None,
)

Initialize text-to-video generator.

Parameters:

backend (TextToVideoBackend | None, default None): Backend to use. If None, uses config default or 'local'.
api_key (str | None, default None): API key for cloud backends. If None, uses environment variable.

Source code in src/videopython/ai/generation/video.py
def __init__(
    self,
    backend: TextToVideoBackend | None = None,
    api_key: str | None = None,
):
    """Initialize text-to-video generator.

    Args:
        backend: Backend to use. If None, uses config default or 'local'.
        api_key: API key for cloud backends. If None, uses environment variable.
    """
    resolved_backend: str = backend if backend is not None else get_default_backend("text_to_video")
    if resolved_backend not in self.SUPPORTED_BACKENDS:
        raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

    self.backend: TextToVideoBackend = resolved_backend  # type: ignore[assignment]
    self._api_key = api_key
    self._pipeline: Any = None
    self._device: str | None = None

generate_video async

generate_video(
    prompt: str,
    num_steps: int = 50,
    num_frames: int = 81,
    guidance_scale: float = 6.0,
) -> Video

Generate video from text prompt.

Parameters:

prompt (str, required): Text description of desired video content.
num_steps (int, default 50): Number of diffusion steps (local backend only).
num_frames (int, default 81): Number of frames to generate (local backend only).
guidance_scale (float, default 6.0): Prompt guidance strength (local backend only).

Returns:

Video: Generated video.

Source code in src/videopython/ai/generation/video.py
async def generate_video(
    self,
    prompt: str,
    num_steps: int = 50,
    num_frames: int = 81,
    guidance_scale: float = 6.0,
) -> Video:
    """Generate video from text prompt.

    Args:
        prompt: Text description of desired video content.
        num_steps: Number of diffusion steps (local backend only). Default 50.
        num_frames: Number of frames to generate (local backend only). Default 81.
        guidance_scale: Prompt guidance strength (local backend only). Default 6.0.

    Returns:
        Generated video.
    """
    if self.backend == "local":
        return await self._generate_local(prompt, num_steps, num_frames, guidance_scale)
    elif self.backend == "luma":
        return await self._generate_luma(prompt)
    else:
        raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
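
With the luma backend only the prompt is used; num_steps, num_frames, and guidance_scale apply to the local pipeline. A sketch (the environment variable consulted by get_api_key for Luma is not documented on this page, so the key is passed explicitly as a placeholder):

```python
import asyncio

from videopython.ai.generation.video import TextToVideo  # assumed import path

async def main() -> None:
    generator = TextToVideo(backend="luma", api_key="luma-key-...")  # placeholder key
    # Cloud backends ignore num_steps / num_frames / guidance_scale.
    video = await generator.generate_video("drone flyover of a misty pine forest")
    video.save("forest.mp4")

asyncio.run(main())
```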

ImageToVideo

ImageToVideo

Generates videos from static images using video diffusion.

Source code in src/videopython/ai/generation/video.py
class ImageToVideo:
    """Generates videos from static images using video diffusion."""

    SUPPORTED_BACKENDS: list[str] = ["local", "luma", "runway"]

    def __init__(
        self,
        backend: ImageToVideoBackend | None = None,
        api_key: str | None = None,
    ):
        """Initialize image-to-video generator.

        Args:
            backend: Backend to use. If None, uses config default or 'local'.
            api_key: API key for cloud backends. If None, uses environment variable.
        """
        resolved_backend: str = backend if backend is not None else get_default_backend("image_to_video")
        if resolved_backend not in self.SUPPORTED_BACKENDS:
            raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

        self.backend: ImageToVideoBackend = resolved_backend  # type: ignore[assignment]
        self._api_key = api_key
        self._pipeline: Any = None
        self._device: str | None = None

    def _init_local(self) -> None:
        """Initialize local diffusion pipeline."""
        from diffusers import CogVideoXImageToVideoPipeline

        self._device, dtype = _get_torch_device_and_dtype()

        model_name = "THUDM/CogVideoX1.5-5B-I2V"
        self._pipeline = CogVideoXImageToVideoPipeline.from_pretrained(model_name, torch_dtype=dtype)
        self._pipeline.to(self._device)

    async def _generate_local(
        self,
        image: Image,
        prompt: str,
        num_steps: int,
        num_frames: int,
        guidance_scale: float,
    ) -> Video:
        """Generate video using local CogVideoX I2V diffusion model."""
        if self._pipeline is None:
            await asyncio.to_thread(self._init_local)

        def _run_pipeline() -> Video:
            import torch

            video_frames = self._pipeline(
                prompt=prompt,
                image=image,
                num_inference_steps=num_steps,
                num_frames=num_frames,
                guidance_scale=guidance_scale,
                generator=torch.Generator(device=self._device).manual_seed(42),
            ).frames[0]
            video_frames = np.asarray(video_frames, dtype=np.uint8)
            return Video.from_frames(video_frames, fps=16.0)

        return await asyncio.to_thread(_run_pipeline)

    async def _generate_runway(self, image: Image, prompt: str) -> Video:
        """Generate video using Runway Gen-4 Turbo API."""
        import base64
        import io
        import tempfile
        from pathlib import Path

        from runwayml import RunwayML

        client = RunwayML(api_key=get_api_key("runway", self._api_key))

        # Convert PIL image to base64 data URI
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        image_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
        image_uri = f"data:image/png;base64,{image_base64}"

        # Create image-to-video task
        task_response = client.image_to_video.create(
            model="gen4_turbo",
            prompt_image=image_uri,
            prompt_text=prompt if prompt else "",
            ratio="1280:720",
        )

        # Poll for completion
        task = client.tasks.retrieve(task_response.id)
        while task.status not in ["SUCCEEDED", "FAILED"]:
            await asyncio.sleep(5)
            task = client.tasks.retrieve(task_response.id)

        if task.status == "FAILED":
            failure_msg = getattr(task, "failure", "Unknown error")
            raise RuntimeError(f"Runway generation failed: {failure_msg}")

        # Download the video - task.status is "SUCCEEDED" at this point
        output = getattr(task, "output", None)
        if not output:
            raise RuntimeError("Runway generation completed but no video URL returned")
        video_url: str = output[0]

        import httpx

        async with httpx.AsyncClient() as http_client:
            response = await http_client.get(video_url)
            response.raise_for_status()

            with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
                f.write(response.content)
                temp_path = Path(f.name)

        video = Video.from_path(str(temp_path))
        temp_path.unlink()
        return video

    async def _generate_luma(self, image: Image, prompt: str) -> Video:
        """Generate video using Luma AI Dream Machine API."""
        import base64
        import io
        import tempfile
        from pathlib import Path

        from lumaai import LumaAI

        client = LumaAI(auth_token=get_api_key("luma", self._api_key))

        # Convert PIL image to base64 data URI
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        image_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
        image_uri = f"data:image/png;base64,{image_base64}"

        # Create generation request with image
        generation = client.generations.create(
            prompt=prompt if prompt else "Animate this image",
            model="ray-2",
            keyframes={"frame0": {"type": "image", "url": image_uri}},
        )

        # Poll for completion
        while generation.state not in ["completed", "failed"]:
            await asyncio.sleep(3)
            assert generation.id is not None
            generation = client.generations.get(generation.id)

        if generation.state == "failed":
            raise RuntimeError(f"Luma generation failed: {generation.failure_reason}")

        # Download the video
        if generation.assets is None:
            raise RuntimeError("Luma generation completed but no assets returned")
        video_url = generation.assets.video
        if not video_url:
            raise RuntimeError("Luma generation completed but no video URL returned")

        import httpx

        async with httpx.AsyncClient() as http_client:
            response = await http_client.get(video_url)
            response.raise_for_status()

            with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
                f.write(response.content)
                temp_path = Path(f.name)

        video = Video.from_path(str(temp_path))
        temp_path.unlink()
        return video

    async def generate_video(
        self,
        image: Image,
        prompt: str = "",
        num_steps: int = 50,
        num_frames: int = 81,
        guidance_scale: float = 6.0,
    ) -> Video:
        """Generate video animation from a static image.

        Args:
            image: Input PIL image to animate.
            prompt: Text description to guide the animation.
            num_steps: Number of diffusion steps (local backend only). Default 50.
            num_frames: Number of frames to generate (local backend only). Default 81.
            guidance_scale: Prompt guidance strength (local backend only). Default 6.0.

        Returns:
            Generated animated video.
        """
        if self.backend == "local":
            return await self._generate_local(image, prompt, num_steps, num_frames, guidance_scale)
        elif self.backend == "runway":
            return await self._generate_runway(image, prompt)
        elif self.backend == "luma":
            return await self._generate_luma(image, prompt)
        else:
            raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
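
A minimal sketch for animating a still image; the import paths are assumptions:

```python
import asyncio

from PIL import Image
from videopython.ai.generation.video import ImageToVideo  # assumed import path

async def main() -> None:
    still = Image.open("portrait.png")
    animator = ImageToVideo(backend="runway")  # or "local" / "luma"
    video = await animator.generate_video(still, prompt="subtle wind in the hair, slow push-in")
    video.save("portrait_animated.mp4")

asyncio.run(main())
```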

__init__

__init__(
    backend: ImageToVideoBackend | None = None,
    api_key: str | None = None,
)

Initialize image-to-video generator.

Parameters:

backend (ImageToVideoBackend | None, default None): Backend to use. If None, uses config default or 'local'.
api_key (str | None, default None): API key for cloud backends. If None, uses environment variable.

Source code in src/videopython/ai/generation/video.py
def __init__(
    self,
    backend: ImageToVideoBackend | None = None,
    api_key: str | None = None,
):
    """Initialize image-to-video generator.

    Args:
        backend: Backend to use. If None, uses config default or 'local'.
        api_key: API key for cloud backends. If None, uses environment variable.
    """
    resolved_backend: str = backend if backend is not None else get_default_backend("image_to_video")
    if resolved_backend not in self.SUPPORTED_BACKENDS:
        raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

    self.backend: ImageToVideoBackend = resolved_backend  # type: ignore[assignment]
    self._api_key = api_key
    self._pipeline: Any = None
    self._device: str | None = None

generate_video async

generate_video(
    image: Image,
    prompt: str = "",
    num_steps: int = 50,
    num_frames: int = 81,
    guidance_scale: float = 6.0,
) -> Video

Generate video animation from a static image.

Parameters:

image (Image, required): Input PIL image to animate.
prompt (str, default ''): Text description to guide the animation.
num_steps (int, default 50): Number of diffusion steps (local backend only).
num_frames (int, default 81): Number of frames to generate (local backend only).
guidance_scale (float, default 6.0): Prompt guidance strength (local backend only).

Returns:

Video: Generated animated video.

Source code in src/videopython/ai/generation/video.py
async def generate_video(
    self,
    image: Image,
    prompt: str = "",
    num_steps: int = 50,
    num_frames: int = 81,
    guidance_scale: float = 6.0,
) -> Video:
    """Generate video animation from a static image.

    Args:
        image: Input PIL image to animate.
        prompt: Text description to guide the animation.
        num_steps: Number of diffusion steps (local backend only). Default 50.
        num_frames: Number of frames to generate (local backend only). Default 81.
        guidance_scale: Prompt guidance strength (local backend only). Default 6.0.

    Returns:
        Generated animated video.
    """
    if self.backend == "local":
        return await self._generate_local(image, prompt, num_steps, num_frames, guidance_scale)
    elif self.backend == "runway":
        return await self._generate_runway(image, prompt)
    elif self.backend == "luma":
        return await self._generate_luma(image, prompt)
    else:
        raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)

VideoUpscaler

VideoUpscaler

Upscales video resolution using AI super-resolution models.

Uses RealBasicVSR for 4x upscaling with temporal consistency.

Source code in src/videopython/ai/generation/video.py
class VideoUpscaler:
    """Upscales video resolution using AI super-resolution models.

    Uses RealBasicVSR for 4x upscaling with temporal consistency.
    """

    SUPPORTED_BACKENDS: list[str] = ["local"]

    def __init__(
        self,
        backend: VideoUpscalerBackend | None = None,
    ):
        """Initialize video upscaler.

        Args:
            backend: Backend to use. If None, uses config default or 'local'.
        """
        resolved_backend: str = backend if backend is not None else get_default_backend("video_upscaler")
        if resolved_backend not in self.SUPPORTED_BACKENDS:
            raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

        self.backend: VideoUpscalerBackend = resolved_backend  # type: ignore[assignment]
        self._inferencer: Any = None

    def _init_local(self) -> None:
        """Initialize local RealBasicVSR model via MMagic."""
        import torch

        if not torch.cuda.is_available():
            raise ValueError("CUDA is not available, but local VideoUpscaler requires CUDA.")

        from mmagic.apis import MMagicInferencer

        self._inferencer = MMagicInferencer(model_name="realbasicvsr")

    async def _upscale_local(self, video: Video) -> Video:
        """Upscale video using local RealBasicVSR model."""
        import tempfile
        from pathlib import Path

        if self._inferencer is None:
            await asyncio.to_thread(self._init_local)

        def _run_upscale() -> Video:
            with tempfile.TemporaryDirectory() as tmpdir:
                input_path = Path(tmpdir) / "input.mp4"
                output_path = Path(tmpdir) / "output.mp4"

                video.save(str(input_path))

                self._inferencer.infer(video=str(input_path), result_out_dir=str(output_path))

                return Video.from_path(str(output_path))

        return await asyncio.to_thread(_run_upscale)

    async def upscale(self, video: Video) -> Video:
        """Upscale video resolution by 4x.

        Args:
            video: Input video to upscale.

        Returns:
            Upscaled video with 4x resolution.
        """
        if self.backend == "local":
            return await self._upscale_local(video)
        else:
            raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
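
A sketch of upscaling an existing clip; the import path for Video is an assumption (only the upscaler's source location is shown on this page):

```python
import asyncio

from videopython.ai.generation.video import VideoUpscaler  # assumed import path
from videopython.base.video import Video                   # assumed location of Video

async def main() -> None:
    video = Video.from_path("draft_720p.mp4")
    upscaler = VideoUpscaler(backend="local")  # the local backend requires CUDA
    upscaled = await upscaler.upscale(video)   # 4x upscaling via RealBasicVSR
    upscaled.save("draft_4x.mp4")

asyncio.run(main())
```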

__init__

__init__(backend: VideoUpscalerBackend | None = None)

Initialize video upscaler.

Parameters:

backend (VideoUpscalerBackend | None, default None): Backend to use. If None, uses config default or 'local'.

Source code in src/videopython/ai/generation/video.py
def __init__(
    self,
    backend: VideoUpscalerBackend | None = None,
):
    """Initialize video upscaler.

    Args:
        backend: Backend to use. If None, uses config default or 'local'.
    """
    resolved_backend: str = backend if backend is not None else get_default_backend("video_upscaler")
    if resolved_backend not in self.SUPPORTED_BACKENDS:
        raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

    self.backend: VideoUpscalerBackend = resolved_backend  # type: ignore[assignment]
    self._inferencer: Any = None

upscale async

upscale(video: Video) -> Video

Upscale video resolution by 4x.

Parameters:

video (Video, required): Input video to upscale.

Returns:

Video: Upscaled video with 4x resolution.

Source code in src/videopython/ai/generation/video.py
async def upscale(self, video: Video) -> Video:
    """Upscale video resolution by 4x.

    Args:
        video: Input video to upscale.

    Returns:
        Upscaled video with 4x resolution.
    """
    if self.backend == "local":
        return await self._upscale_local(video)
    else:
        raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)

TextToImage

TextToImage

Generates images from text descriptions.

Source code in src/videopython/ai/generation/image.py
class TextToImage:
    """Generates images from text descriptions."""

    SUPPORTED_BACKENDS: list[str] = ["local", "openai"]

    def __init__(
        self,
        backend: TextToImageBackend | None = None,
        api_key: str | None = None,
    ):
        """Initialize text-to-image generator.

        Args:
            backend: Backend to use. If None, uses config default or 'local'.
            api_key: API key for cloud backends. If None, reads from environment.
        """
        resolved_backend: str = backend if backend is not None else get_default_backend("text_to_image")
        if resolved_backend not in self.SUPPORTED_BACKENDS:
            raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

        self.backend: TextToImageBackend = resolved_backend  # type: ignore[assignment]
        self.api_key = api_key
        self._pipeline: Any = None

    def _init_local(self) -> None:
        """Initialize local diffusion pipeline."""
        import torch
        from diffusers import DiffusionPipeline

        if not torch.cuda.is_available():
            raise ValueError("CUDA is not available, but local TextToImage requires CUDA.")

        model_name = "stabilityai/stable-diffusion-xl-base-1.0"
        self._pipeline = DiffusionPipeline.from_pretrained(
            model_name, torch_dtype=torch.float16, variant="fp16", use_safetensors=True
        )
        self._pipeline.to("cuda")

    async def _generate_local(self, prompt: str) -> Image.Image:
        """Generate image using local diffusion model."""
        if self._pipeline is None:
            await asyncio.to_thread(self._init_local)

        def _run_pipeline() -> Image.Image:
            return self._pipeline(prompt=prompt).images[0]

        return await asyncio.to_thread(_run_pipeline)

    async def _generate_openai(self, prompt: str, size: str) -> Image.Image:
        """Generate image using OpenAI DALL-E."""
        import httpx
        from openai import AsyncOpenAI

        api_key = get_api_key("openai", self.api_key)
        client = AsyncOpenAI(api_key=api_key)

        response = await client.images.generate(
            model="dall-e-3",
            prompt=prompt,
            size=size,  # type: ignore
            quality="hd",
            n=1,
        )

        image_url = response.data[0].url
        if image_url is None:
            raise RuntimeError("OpenAI returned no image URL")

        # Download the image
        async with httpx.AsyncClient() as http_client:
            image_response = await http_client.get(image_url, timeout=60.0)
            image_response.raise_for_status()

            return Image.open(io.BytesIO(image_response.content))

    async def generate_image(
        self,
        prompt: str,
        size: str = "1024x1024",
    ) -> Image.Image:
        """Generate image from text prompt.

        Args:
            prompt: Text description of desired image.
            size: Image size (OpenAI backend only). Options: "1024x1024", "1792x1024", "1024x1792".

        Returns:
            Generated PIL Image.
        """
        if self.backend == "local":
            return await self._generate_local(prompt)
        elif self.backend == "openai":
            return await self._generate_openai(prompt, size)
        else:
            raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
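
A minimal sketch using the openai backend (DALL-E 3); the import path is an assumption:

```python
import asyncio

from videopython.ai.generation.image import TextToImage  # assumed import path

async def main() -> None:
    generator = TextToImage(backend="openai")  # DALL-E 3; "local" runs SDXL and needs CUDA
    image = await generator.generate_image(
        "isometric illustration of a tiny robot workshop",
        size="1792x1024",
    )
    image.save("workshop.png")  # standard PIL Image.save

asyncio.run(main())
```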

__init__

__init__(
    backend: TextToImageBackend | None = None,
    api_key: str | None = None,
)

Initialize text-to-image generator.

Parameters:

backend (TextToImageBackend | None, default None): Backend to use. If None, uses config default or 'local'.
api_key (str | None, default None): API key for cloud backends. If None, reads from environment.

Source code in src/videopython/ai/generation/image.py
def __init__(
    self,
    backend: TextToImageBackend | None = None,
    api_key: str | None = None,
):
    """Initialize text-to-image generator.

    Args:
        backend: Backend to use. If None, uses config default or 'local'.
        api_key: API key for cloud backends. If None, reads from environment.
    """
    resolved_backend: str = backend if backend is not None else get_default_backend("text_to_image")
    if resolved_backend not in self.SUPPORTED_BACKENDS:
        raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

    self.backend: TextToImageBackend = resolved_backend  # type: ignore[assignment]
    self.api_key = api_key
    self._pipeline: Any = None

generate_image async

generate_image(
    prompt: str, size: str = "1024x1024"
) -> Image.Image

Generate image from text prompt.

Parameters:

prompt (str, required): Text description of desired image.
size (str, default '1024x1024'): Image size (OpenAI backend only). Options: "1024x1024", "1792x1024", "1024x1792".

Returns:

Image.Image: Generated PIL Image.

Source code in src/videopython/ai/generation/image.py
async def generate_image(
    self,
    prompt: str,
    size: str = "1024x1024",
) -> Image.Image:
    """Generate image from text prompt.

    Args:
        prompt: Text description of desired image.
        size: Image size (OpenAI backend only). Options: "1024x1024", "1792x1024", "1024x1792".

    Returns:
        Generated PIL Image.
    """
    if self.backend == "local":
        return await self._generate_local(prompt)
    elif self.backend == "openai":
        return await self._generate_openai(prompt, size)
    else:
        raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
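
Because generate_image returns a PIL Image and ImageToVideo.generate_video accepts one, the two generators compose directly. A hedged pipeline sketch (import paths assumed):

```python
import asyncio

from videopython.ai.generation.image import TextToImage   # assumed import paths
from videopython.ai.generation.video import ImageToVideo

async def main() -> None:
    still = await TextToImage(backend="openai").generate_image("a foggy harbor at dawn")
    clip = await ImageToVideo(backend="luma").generate_video(still, prompt="gentle drifting fog")
    clip.save("harbor.mp4")

asyncio.run(main())
```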

TextToSpeech

TextToSpeech

Generates speech audio from text.

Source code in src/videopython/ai/generation/audio.py
class TextToSpeech:
    """Generates speech audio from text."""

    SUPPORTED_BACKENDS: list[str] = ["local", "openai", "elevenlabs"]

    def __init__(
        self,
        backend: TextToSpeechBackend | None = None,
        model_size: str = "base",
        voice: str | None = None,
        api_key: str | None = None,
        device: str | None = None,
    ):
        """Initialize text-to-speech generator.

        Args:
            backend: Backend to use. If None, uses config default or 'local'.
            model_size: Model size for local backend ('base' or 'small').
            voice: Voice to use (backend-specific).
            api_key: API key for cloud backends. If None, reads from environment.
            device: Device for local backend ('cuda' or 'cpu').
        """
        resolved_backend: str = backend if backend is not None else get_default_backend("text_to_speech")
        if resolved_backend not in self.SUPPORTED_BACKENDS:
            raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

        self.backend: TextToSpeechBackend = resolved_backend  # type: ignore[assignment]
        self.model_size = model_size
        self.voice = voice
        self.api_key = api_key
        self.device = device
        self._model: Any = None
        self._processor: Any = None

    def _init_local(self) -> None:
        """Initialize local Bark model."""
        import torch
        from transformers import AutoModel, AutoProcessor

        if self.model_size not in ["base", "small"]:
            raise ValueError(f"model_size must be 'base' or 'small', got '{self.model_size}'")

        device = self.device
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"

        model_name = "suno/bark" if self.model_size == "base" else "suno/bark-small"
        self._processor = AutoProcessor.from_pretrained(model_name)
        self._model = AutoModel.from_pretrained(model_name).to(device)
        self.device = device

    async def _generate_local(
        self,
        text: str,
        voice_preset: str | None,
    ) -> Audio:
        """Generate speech using local Bark model."""
        import torch

        if self._model is None:
            await asyncio.to_thread(self._init_local)

        def _run_model() -> Audio:
            inputs = self._processor(text=[text], return_tensors="pt", voice_preset=voice_preset)
            inputs = {k: v.to(self.device) if isinstance(v, torch.Tensor) else v for k, v in inputs.items()}

            with torch.no_grad():
                speech_values = self._model.generate(**inputs, do_sample=True)

            audio_data = speech_values.cpu().float().numpy().squeeze()
            sample_rate = self._model.generation_config.sample_rate

            metadata = AudioMetadata(
                sample_rate=sample_rate,
                channels=1,
                sample_width=2,
                duration_seconds=len(audio_data) / sample_rate,
                frame_count=len(audio_data),
            )
            return Audio(audio_data, metadata)

        return await asyncio.to_thread(_run_model)

    async def _generate_openai(self, text: str) -> Audio:
        """Generate speech using OpenAI TTS."""

        import numpy as np
        from openai import AsyncOpenAI

        api_key = get_api_key("openai", self.api_key)
        client = AsyncOpenAI(api_key=api_key)

        voice = self.voice or "alloy"
        response = await client.audio.speech.create(
            model="tts-1-hd",
            voice=voice,  # type: ignore
            input=text,
            response_format="pcm",
        )

        # OpenAI returns raw PCM at 24kHz, 16-bit, mono
        audio_bytes = await response.aread()
        audio_data = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32) / 32768.0
        sample_rate = 24000

        metadata = AudioMetadata(
            sample_rate=sample_rate,
            channels=1,
            sample_width=2,
            duration_seconds=len(audio_data) / sample_rate,
            frame_count=len(audio_data),
        )
        return Audio(audio_data, metadata)

    async def _generate_elevenlabs(self, text: str) -> Audio:
        """Generate speech using ElevenLabs."""
        import numpy as np
        from elevenlabs import AsyncElevenLabs

        api_key = get_api_key("elevenlabs", self.api_key)
        client = AsyncElevenLabs(api_key=api_key)

        voice = self.voice or "Sarah"

        # Resolve voice name to ID if needed (voice IDs are 20+ chars)
        if len(voice) < 20:
            voices = await client.voices.get_all()
            voice_id = None
            for v in voices.voices:
                if v.name and voice.lower() in v.name.lower():
                    voice_id = v.voice_id
                    break
            if voice_id is None:
                raise ValueError(f"Voice '{voice}' not found. Use a voice ID or valid name.")
            voice = voice_id

        # Generate audio - returns async generator directly (no await)
        audio_chunks = []
        async for chunk in client.text_to_speech.convert(
            voice_id=voice,
            text=text,
            model_id="eleven_multilingual_v2",
            output_format="pcm_24000",
        ):
            audio_chunks.append(chunk)

        audio_bytes = b"".join(audio_chunks)
        audio_data = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32) / 32768.0
        sample_rate = 24000

        metadata = AudioMetadata(
            sample_rate=sample_rate,
            channels=1,
            sample_width=2,
            duration_seconds=len(audio_data) / sample_rate,
            frame_count=len(audio_data),
        )
        return Audio(audio_data, metadata)

    async def generate_audio(
        self,
        text: str,
        voice_preset: str | None = None,
    ) -> Audio:
        """Generate speech audio from text.

        Args:
            text: Text to synthesize. For local backend, can include emotion markers
                  like [laughs], [sighs].
            voice_preset: Voice preset (backend-specific). For local backend, use
                          IDs like "v2/en_speaker_0".

        Returns:
            Generated speech audio.
        """
        effective_voice = voice_preset or self.voice

        if self.backend == "local":
            return await self._generate_local(text, effective_voice)
        elif self.backend == "openai":
            return await self._generate_openai(text)
        elif self.backend == "elevenlabs":
            return await self._generate_elevenlabs(text)
        else:
            raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
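
A minimal sketch with the local Bark backend; the import path is assumed, and saving or mixing the returned Audio depends on the Audio API, which is not documented on this page:

```python
import asyncio

from videopython.ai.generation.audio import TextToSpeech  # assumed import path

async def main() -> None:
    tts = TextToSpeech(backend="local", model_size="small")
    narration = await tts.generate_audio(
        "Welcome back! [laughs] Let's pick up where we left off.",
        voice_preset="v2/en_speaker_0",
    )
    # `narration` is an Audio object; how it is saved or mixed is outside this page.

asyncio.run(main())
```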

__init__

__init__(
    backend: TextToSpeechBackend | None = None,
    model_size: str = "base",
    voice: str | None = None,
    api_key: str | None = None,
    device: str | None = None,
)

Initialize text-to-speech generator.

Parameters:

backend (TextToSpeechBackend | None, default None): Backend to use. If None, uses config default or 'local'.
model_size (str, default 'base'): Model size for local backend ('base' or 'small').
voice (str | None, default None): Voice to use (backend-specific).
api_key (str | None, default None): API key for cloud backends. If None, reads from environment.
device (str | None, default None): Device for local backend ('cuda' or 'cpu').

Source code in src/videopython/ai/generation/audio.py
def __init__(
    self,
    backend: TextToSpeechBackend | None = None,
    model_size: str = "base",
    voice: str | None = None,
    api_key: str | None = None,
    device: str | None = None,
):
    """Initialize text-to-speech generator.

    Args:
        backend: Backend to use. If None, uses config default or 'local'.
        model_size: Model size for local backend ('base' or 'small').
        voice: Voice to use (backend-specific).
        api_key: API key for cloud backends. If None, reads from environment.
        device: Device for local backend ('cuda' or 'cpu').
    """
    resolved_backend: str = backend if backend is not None else get_default_backend("text_to_speech")
    if resolved_backend not in self.SUPPORTED_BACKENDS:
        raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

    self.backend: TextToSpeechBackend = resolved_backend  # type: ignore[assignment]
    self.model_size = model_size
    self.voice = voice
    self.api_key = api_key
    self.device = device
    self._model: Any = None
    self._processor: Any = None

generate_audio async

generate_audio(
    text: str, voice_preset: str | None = None
) -> Audio

Generate speech audio from text.

Parameters:

text (str, required): Text to synthesize. For local backend, can include emotion markers like [laughs], [sighs].
voice_preset (str | None, default None): Voice preset (backend-specific). For local backend, use IDs like "v2/en_speaker_0".

Returns:

Audio: Generated speech audio.

Source code in src/videopython/ai/generation/audio.py
async def generate_audio(
    self,
    text: str,
    voice_preset: str | None = None,
) -> Audio:
    """Generate speech audio from text.

    Args:
        text: Text to synthesize. For local backend, can include emotion markers
              like [laughs], [sighs].
        voice_preset: Voice preset (backend-specific). For local backend, use
                      IDs like "v2/en_speaker_0".

    Returns:
        Generated speech audio.
    """
    effective_voice = voice_preset or self.voice

    if self.backend == "local":
        return await self._generate_local(text, effective_voice)
    elif self.backend == "openai":
        return await self._generate_openai(text)
    elif self.backend == "elevenlabs":
        return await self._generate_elevenlabs(text)
    else:
        raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)

TextToMusic

TextToMusic

Generates music from text descriptions.

Source code in src/videopython/ai/generation/audio.py
class TextToMusic:
    """Generates music from text descriptions."""

    SUPPORTED_BACKENDS: list[str] = ["local"]

    def __init__(
        self,
        backend: TextToMusicBackend | None = None,
    ):
        """Initialize text-to-music generator.

        Args:
            backend: Backend to use. If None, uses config default or 'local'.
        """
        resolved_backend: str = backend if backend is not None else get_default_backend("text_to_music")
        if resolved_backend not in self.SUPPORTED_BACKENDS:
            raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

        self.backend: TextToMusicBackend = resolved_backend  # type: ignore[assignment]
        self._processor: Any = None
        self._model: Any = None

    def _init_local(self) -> None:
        """Initialize local MusicGen model."""
        from transformers import AutoProcessor, MusicgenForConditionalGeneration

        model_name = "facebook/musicgen-small"
        self._processor = AutoProcessor.from_pretrained(model_name)
        self._model = MusicgenForConditionalGeneration.from_pretrained(model_name)

    async def _generate_local(self, text: str, max_new_tokens: int) -> Audio:
        """Generate music using local MusicGen model."""
        if self._model is None:
            await asyncio.to_thread(self._init_local)

        def _run_model() -> Audio:
            inputs = self._processor(text=[text], padding=True, return_tensors="pt")
            audio_values = self._model.generate(**inputs, max_new_tokens=max_new_tokens)
            sampling_rate = self._model.config.audio_encoder.sampling_rate

            audio_data = audio_values[0, 0].float().numpy()

            metadata = AudioMetadata(
                sample_rate=sampling_rate,
                channels=1,
                sample_width=2,
                duration_seconds=len(audio_data) / sampling_rate,
                frame_count=len(audio_data),
            )
            return Audio(audio_data, metadata)

        return await asyncio.to_thread(_run_model)

    async def generate_audio(self, text: str, max_new_tokens: int = 256) -> Audio:
        """Generate music audio from text description.

        Args:
            text: Text description of desired music.
            max_new_tokens: Maximum length of generated audio in tokens.

        Returns:
            Generated music audio.
        """
        if self.backend == "local":
            return await self._generate_local(text, max_new_tokens)
        else:
            raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
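
A minimal sketch; the import path is an assumption:

```python
import asyncio

from videopython.ai.generation.audio import TextToMusic  # assumed import path

async def main() -> None:
    generator = TextToMusic()  # only the local MusicGen backend is supported
    track = await generator.generate_audio(
        "lo-fi hip hop beat with warm vinyl crackle",
        max_new_tokens=512,  # a larger token budget yields a longer clip
    )
    # `track` is an Audio object; downstream handling depends on the Audio API.

asyncio.run(main())
```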

__init__

__init__(backend: TextToMusicBackend | None = None)

Initialize text-to-music generator.

Parameters:

backend (TextToMusicBackend | None, default None): Backend to use. If None, uses config default or 'local'.

Source code in src/videopython/ai/generation/audio.py
def __init__(
    self,
    backend: TextToMusicBackend | None = None,
):
    """Initialize text-to-music generator.

    Args:
        backend: Backend to use. If None, uses config default or 'local'.
    """
    resolved_backend: str = backend if backend is not None else get_default_backend("text_to_music")
    if resolved_backend not in self.SUPPORTED_BACKENDS:
        raise UnsupportedBackendError(resolved_backend, self.SUPPORTED_BACKENDS)

    self.backend: TextToMusicBackend = resolved_backend  # type: ignore[assignment]
    self._processor: Any = None
    self._model: Any = None

generate_audio async

generate_audio(
    text: str, max_new_tokens: int = 256
) -> Audio

Generate music audio from text description.

Parameters:

text (str, required): Text description of desired music.
max_new_tokens (int, default 256): Maximum length of generated audio in tokens.

Returns:

Audio: Generated music audio.

Source code in src/videopython/ai/generation/audio.py
async def generate_audio(self, text: str, max_new_tokens: int = 256) -> Audio:
    """Generate music audio from text description.

    Args:
        text: Text description of desired music.
        max_new_tokens: Maximum length of generated audio in tokens.

    Returns:
        Generated music audio.
    """
    if self.backend == "local":
        return await self._generate_local(text, max_new_tokens)
    else:
        raise UnsupportedBackendError(self.backend, self.SUPPORTED_BACKENDS)
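
Because every generate method is a coroutine, independent assets can be produced concurrently with asyncio.gather. A sketch using the same assumed import paths as above:

```python
import asyncio

from videopython.ai.generation.audio import TextToMusic   # assumed import paths
from videopython.ai.generation.video import TextToVideo

async def main() -> None:
    video_coro = TextToVideo(backend="luma").generate_video("drone shot over a coral reef")
    music_coro = TextToMusic().generate_audio("calm ambient pads", max_new_tokens=512)

    # The cloud video job and the local music job run concurrently.
    video, music = await asyncio.gather(video_coro, music_coro)
    video.save("reef.mp4")

asyncio.run(main())
```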