Learn by example: Making it easier to understand GStreamer

We like step-by-step learning.

I have been using and learning GStreamer multimedia framework for the last +5 years, during this time I have also sent back some contributions. My first contribution was the implementation of the Rust version of the HLS sink element in 2021 (hlssink3). But I cannot say that I had completely grasped the framework concepts, it is a very large project and full of conventions. You can only learn those conventions by reading the GStreamer code, writing GStreamer applications, and participating in the community. Still I constantly find myself thinking about pads, elements, pipeline, etc, and learning something new.

Recently, I thought about yet another way to deepen my knowledge of GStreamer: write GStreamer myself! Of course I am joking here, but why not a greatly simplified version of the framework focused on learning the basic concepts. Things like pads, elements, pipeline and buffers and some time/clock handling will get you a long way. I get asked frequently about "how to learn GStreamer" and my answer is always: write code with GStreamer. Today, we will write "GStreamer code" from scratch, so you won't feel overwhelmed by all the information. We will call it "GStreamer Mini" (or gst_mini) and it is written in Python.

The Core Concepts and Objects

GStreamer is an object oriented framework, we have classes inheritance, properties, all the usual suspects (OOP abstractions). On top of those, like all code frameworks, we have core classes that we use to build upon. We will learn about Buffers, Pads, Elements, Pipeline, Clocks.

Buffers

Buffers hold the data we want to send through the pipeline. Buffers can hold any kind of data, GStreamer does not impose any limits on that. Ideally, buffers hold data that is temporal and produced over time in a streaming way (thus the name "streamer"). You can add anything to buffers, people has been using it to hold coded multimedia data, but you can use it to hold readings from your thermostat (that could be a cool GStreamer element actually). Trimming buffers to their bare bones fields, we are left with presentation time (PTS), duration, flags, and data.

from enum import Flag, auto
from typing import Any


class BufferFlags(Flag):
    """Buffer flags similar to GstBufferFlags."""
    NONE = 0
    DISCONT = auto()  # Discontinuity in the stream
    EOS = auto()      # End of stream marker
    DELTA_UNIT = auto()  # Indicates the buffer is not decodable by itself


class GstBuffer:
    """
    Simplified GstBuffer carrying timestamped data.

    Corresponds to GstBuffer in gstbuffer.h:80-133
    """

    def __init__(self, pts: float, duration: float, data: Any, flags: BufferFlags = BufferFlags.NONE):
        """
        Create a buffer.

        Args:
            pts: Presentation timestamp in seconds (like GST_BUFFER_PTS)
            duration: Buffer duration in seconds (like GST_BUFFER_DURATION)
            data: Payload data (dict for simulated frames, or segment info)
            flags: Buffer flags (DISCONT, EOS, etc.)
        """
        self.pts = pts
        self.duration = duration
        self.data = data
        self.flags = flags

    def has_flag(self, flag: BufferFlags) -> bool:
        """Check if buffer has a specific flag."""
        return bool(self.flags & flag)

    def set_flag(self, flag: BufferFlags):
        """Set a flag on the buffer."""
        self.flags |= flag

    def unset_flag(self, flag: BufferFlags):
        """Unset a flag on the buffer."""
        self.flags &= ~flag

We also define here some minimal flags we can set in our buffers. GStreamer has way more flags than those for buffers, but the EOS, DELTA_UNIT, and DISCONT will get us a long way. Those flags are direct takes from the real GStreamer codebase.

Clocks

The next concept we gonna implement are Clocks. As a streaming over time framework clocks are a very important concept for timing and syncronization. GStreamer implements multiple types of clocks, the basic definition of a clock in GStreamer is that it is always increasing through time (never goes backwards) and produces an absolute time. We will use the Python's time module monotonic function and write a wrapper class around it with some helpers.

import time
from typing import Tuple


class GstClockReturn:
    """Return values from clock operations."""
    OK = "ok"
    EARLY = "early"
    UNSCHEDULED = "unscheduled"
    BADTIME = "badtime"


class GstClock:
    """
    Simplified GstClock for timing and synchronization.

    Corresponds to gst_clock_id_wait in gstbasesink.c:2381
    """

    def __init__(self):
        """Create a clock."""
        self._start_time = None

    def start(self):
        """Start the clock (called when pipeline goes to PLAYING)."""
        self._start_time = time.monotonic()

    def get_time(self) -> float:
        """
        Get current clock time in seconds.

        Returns:
            Time in seconds since clock started, or 0 if not started
        """
        if self._start_time is None:
            return 0.0
        return time.monotonic() - self._start_time

    def wait_until(self, target_time: float) -> Tuple[str, float]:
        """
        Wait until the clock reaches the target time.

        This is similar to gst_clock_id_wait in gstbasesink.c:2381

        Args:
            target_time: Target time to wait for (in seconds)

        Returns:
            Tuple of (GstClockReturn, jitter in seconds)
            - jitter > 0: woke up late
            - jitter < 0: woke up early
            - jitter = 0: perfect timing
        """
        if self._start_time is None:
            return (GstClockReturn.BADTIME, 0.0)

        current = self.get_time()
        wait_duration = target_time - current

        if wait_duration <= 0:
            # Already past target time - return immediately with negative jitter
            jitter = -wait_duration
            return (GstClockReturn.OK, jitter)

        # Sleep until target time
        time.sleep(wait_duration)

        # Calculate jitter (how far off we were)
        actual_time = self.get_time()
        jitter = actual_time - target_time

        return (GstClockReturn.OK, jitter)

The Clock class will help us to track progress of time after a start instant. It also has a wait_until method which will be essential later for us to handle synchronisation in our elements. Again, those methods have direct relation to real GStreamer methods.

Elements

Elements are the steps of a GStreamer pipeline. Elements may produce, process, and receive buffers over time. Elements are specialised and focused on one function they process the buffers. I have talked about the different types of elements in GStreamer in my previous blog post "A brief introduction to GStreamer". Here we will represent the most basic functionality of elements in GStreamer plus some helper methods.

from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import Optional

from .pad import GstPad
from .segment import GstSegment


class GstState(Enum):
    """Element states (simplified version of GstState)."""
    NULL = auto()
    READY = auto()
    PAUSED = auto()
    PLAYING = auto()


class GstElement(ABC):
    """
    Base class for all pipeline elements.

    Elements are the processing units in a pipeline.
    """

    def __init__(self, name: str):
        """
        Create an element.

        Args:
            name: Element name
        """
        self.name = name
        self.state = GstState.NULL
        self.src_pad: Optional[GstPad] = None
        self.sink_pad: Optional[GstPad] = None
        self.segment = GstSegment()
        self.pipeline: Optional['GstElement'] = None  # Will be set by pipeline

    def log(self, message: str):
        """
        Log a message with element context.

        Args:
            message: Message to log
        """
        print(f"[{self.pipeline.clock.get_time():.3f}s] {self.name}: {message}")

    def create_src_pad(self, name: str = "src") -> GstPad:
        """
        Create a source pad.

        Args:
            name: Pad name

        Returns:
            Created source pad
        """
        self.src_pad = GstPad(name, self)
        return self.src_pad

    def create_sink_pad(self, name: str = "sink") -> GstPad:
        """
        Create a sink pad with chain function.

        Args:
            name: Pad name

        Returns:
            Created sink pad
        """
        self.sink_pad = GstPad(name, self)
        return self.sink_pad

    def link(self, downstream: 'GstElement'):
        """
        Link this element to a downstream element.

        Args:
            downstream: Element to link to
        """
        if self.src_pad is None:
            raise ValueError(f"{self.name} has no source pad")
        if downstream.sink_pad is None:
            raise ValueError(f"{downstream.name} has no sink pad")

        self.src_pad.link(downstream.sink_pad)

    def set_state(self, state: GstState):
        """
        Change element state.

        Args:
            state: New state
        """
        if self.state == state:
            return

        old_state = self.state

        # Handle direct jumps (e.g., NULL -> PLAYING)
        # Go through intermediate states
        if old_state == GstState.NULL and state == GstState.PLAYING:
            self.set_state(GstState.READY)
            self.set_state(GstState.PAUSED)
            self.set_state(GstState.PLAYING)
            return
        elif old_state == GstState.NULL and state == GstState.PAUSED:
            self.set_state(GstState.READY)
            self.set_state(GstState.PAUSED)
            return

        self.state = state

        # Call subclass handlers
        if state == GstState.READY and old_state == GstState.NULL:
            self.on_ready()
        elif state == GstState.PAUSED and old_state == GstState.READY:
            self.on_paused()
        elif state == GstState.PLAYING and old_state == GstState.PAUSED:
            self.on_playing()
        elif state == GstState.PAUSED and old_state == GstState.PLAYING:
            self.on_paused_from_playing()
        elif state == GstState.NULL:
            self.on_null()

    # State change hooks (can be overridden by subclasses)
    def on_ready(self):
        """Called when transitioning to READY state."""
        pass

    def on_paused(self):
        """Called when transitioning to PAUSED state."""
        pass

    def on_playing(self):
        """Called when transitioning to PLAYING state."""
        pass

    def on_paused_from_playing(self):
        """Called when transitioning from PLAYING to PAUSED."""
        pass

    def on_null(self):
        """Called when transitioning to NULL state."""
        pass

I tried to keep this as simple as possible. One important part of this code is the handling of transition of states of elements.

Elements always transition between all the following states in order:

You can read in much more detail about each of those states in the official documentation.

Lastly, we add a few methods to help us create pads on our elements. Pads are the next concept we will dive into.

Pads

This might be a new concept for you, but you can think of pads like pipes where you push buffers between elements. You need to connect pads from a source element to a destination or sink element. Pads are another critical concept in GStreamer. Elements may contain one or many pads which can be dynamically created or static to the element (always present).

Our Pad implementation in gst_mini is greatly simplified. But I have tried to keep it relatable to the actual Pad implementation in the GStreamer codebase. Pads have a peer pad, which is populated when the pad is "connected". A pad connection is a complex process in GStreamer, but the essence is that pads need to be compatible to connect. Pads have a "type", which is called "caps" or capabilities, defining what type of data the pad is capable of handling. We are not representing capabilities in our codebase. We will be responsible and only connect compatible pads in our examples, promise.

import threading
from typing import Callable, Optional, TYPE_CHECKING
from enum import Enum, auto

from .buffer import GstBuffer

if TYPE_CHECKING:
    from .element import GstElement


class GstFlowReturn(Enum):
    """Return values for pad operations (similar to GstFlowReturn)."""
    OK = auto()
    EOS = auto()
    FLUSHING = auto()
    ERROR = auto()


class GstPad:
    """
    Simplified GstPad for connecting elements.

    Corresponds to gst_pad_push / gst_pad_chain_data_unchecked in gstpad.c:4497-4586
    """

    def __init__(self, name: str, element: 'GstElement'):
        """
        Create a pad.

        Args:
            name: Pad name (e.g., "src", "sink")
            element: Parent element
        """
        self.name = name
        self.element = element
        self.peer: Optional['GstPad'] = None
        self.chain_function: Optional[Callable[[GstBuffer], GstFlowReturn]] = None

        # Thread safety - similar to GST_PAD_STREAM_LOCK
        self._lock = threading.Lock()
        self._flushing = False

    def link(self, peer_pad: 'GstPad'):
        """
        Link this pad to a peer pad.

        Args:
            peer_pad: The pad to link to
        """
        self.peer = peer_pad
        peer_pad.peer = self

    def set_chain_function(self, func: Callable[[GstBuffer], GstFlowReturn]):
        """
        Set the chain function for this pad.

        Args:
            func: Function to call when receiving buffers
        """
        self.chain_function = func

    def push(self, buffer: GstBuffer) -> GstFlowReturn:
        """
        Push a buffer to the peer pad.

        This is similar to gst_pad_push() in gstpad.c:4795

        Args:
            buffer: Buffer to push

        Returns:
            GstFlowReturn indicating success/failure
        """
        if self.peer is None:
            return GstFlowReturn.ERROR

        # Call peer's chain function - this is the synchronous call
        # that makes chain calls blocking (gstpad.c:4560)
        return self.peer._chain(buffer)

    def _chain(self, buffer: GstBuffer) -> GstFlowReturn:
        """
        Internal chain function handler.

        Similar to gst_pad_chain_data_unchecked in gstpad.c:4497

        Args:
            buffer: Buffer received

        Returns:
            GstFlowReturn from the chain function
        """
        # Acquire stream lock (GST_PAD_STREAM_LOCK at gstpad.c:4509)
        with self._lock:
            if self._flushing:
                return GstFlowReturn.FLUSHING

            if self.chain_function is None:
                return GstFlowReturn.ERROR

            # Call the element's chain function (gstpad.c:4560)
            return self.chain_function(buffer)

One aspect of Pads that we must learn is the "chain function". The chain function of a pad is the main way the buffer is handled by the pad and consequently by the element that the pad belongs to. When a source element pushes buffers to its src pad, the sink chain function of the peer pad is called. We implement this in our code and it is similar to what the GStreamer codebase does as well.

The call stack of the chain function would look like something like this:

datasource - PUSHING buffer 0
  encoder.chain() - RECEIVED buffer
  encoder.chain() - PUSHING encoded buffer downstream
    segmenter.chain() - RECEIVED buffer
    segmenter.chain() - PUSHING segmented buffer downstream
        publisher.chain() - RECEIVED buffer
        publisher.chain() - RETURNING OK to caller
    segmenter.chain() - RETURNING OK to caller
  encoder.chain() - RETURNING OK to caller
datasource - PUSH RETURNED: OK

We will dive more on this process in the future.

Pipeline

Another central concept of GStreamer are pipelines. As you might have guessed, pipelines hold a collection of elements that must work in synchrony. Pipelines trigger operations on elements and hold the shared concept of time among its elements. Elements must be added to a pipeline to work properly in GStreamer. In most cases, your GStreamer based application will be built around a pipeline. Pipelines can be hold very complex graphs with many elements.

import time
from typing import List

from .element import GstElement, GstState
from .clock import GstClock


class GstPipeline:
    """
    Pipeline manages a collection of linked elements.

    Similar to GstPipeline in GStreamer.
    """

    def __init__(self, name: str):
        """
        Create a pipeline.

        Args:
            name: Pipeline name
        """
        self.name = name
        self.elements: List[GstElement] = []
        self.clock = GstClock()
        self.base_time = 0.0
        self.state = GstState.NULL

    def add(self, *elements: GstElement):
        """
        Add elements to the pipeline.

        Args:
            *elements: Elements to add
        """
        for element in elements:
            self.elements.append(element)
            element.pipeline = self

    def link(self, src: GstElement, dst: GstElement):
        """
        Link two elements together.

        Args:
            src: Source element
            dst: Destination element
        """
        src.link(dst)

    def set_state(self, state: GstState):
        """
        Set the state of all elements in the pipeline.

        Args:
            state: Target state
        """
        print(f"[{self.clock.get_time():.3f}s] Pipeline: Setting state to {state.name}")

        # Special handling for PLAYING state
        if state == GstState.PLAYING and self.state != GstState.PLAYING:
            # Start the clock
            if self.clock._start_time is None:
                self.clock.start()

            # Set base_time (corresponds to base_time in gstbasesink.c:2350)
            # In a real pipeline, this synchronizes multiple sinks
            self.base_time = self.clock.get_time()
            print(f"[{self.clock.get_time():.3f}s] Pipeline: base_time set to {self.base_time:.3f}s")

        self.state = state

        # Propagate state change to all elements
        for element in self.elements:
            element.set_state(state)

    def run(self, duration: float = 30.0):
        """
        Run the pipeline for a specified duration.

        Args:
            duration: How long to run in seconds (default: 30s)
        """
        print(f"[{self.clock.get_time():.3f}s] Pipeline: Running for {duration}s...")

        try:
            # Set to PLAYING if not already
            if self.state != GstState.PLAYING:
                self.set_state(GstState.PLAYING)

            # Run for specified duration
            start = time.monotonic()
            while time.monotonic() - start < duration:
                time.sleep(0.1)

        except KeyboardInterrupt:
            print(f"\n[{self.clock.get_time():.3f}s] Pipeline: Interrupted by user")

        finally:
            # Clean shutdown
            print(f"[{self.clock.get_time():.3f}s] Pipeline: Stopping...")
            self.set_state(GstState.NULL)

Our simplified pipeline provides two main functionalities. It allows us to add elements and to change the state of the pipeline which is then reflected to all its elements in order. The pipeline also hold the clock which is shared among all elements of that pipeline, this is essential to provide synchronous execution. When a pipeline starts the time is captured, and that becomes the base time of all processing.

Segment

The last core concept we will represent is the concept of a GstSegment. Understand segments is important to decipher the time synchronisation in GStreamer. Segments allows us to convert the PTS of a buffer to its processing time based on the rate we want to process data. No matter if a pipeline process live content or pre-recorded content we want the processing to happen based on time. The realtime rate makes sense when processing live content or playing back the content, but if we are processing pre-recorded information we may want to process all the data of a 2 hours movie in an instant. GstSegment is an event in GStreamer, but we won't be talking about events in gst_mini for now. We introduce the concept of segment here to keep our code minimally relatable to the GStreamer codebase.

class GstSegment:
    """
    Simplified GstSegment for stream time to running time conversion.

    Corresponds to gst_segment_to_running_time in gstsegment.c:822-867
    """

    def __init__(self, start: float = 0.0, stop: float = None, rate: float = 1.0, base: float = 0.0):
        """
        Create a segment.

        Args:
            start: Start time of the segment (in stream time)
            stop: Stop time of the segment (None = no limit)
            rate: Playback rate (1.0 = normal speed)
            base: Accumulated running time from previous segments
        """
        self.start = start
        self.stop = stop
        self.rate = rate
        self.base = base

    def to_running_time(self, position: float) -> float:
        """
        Convert stream time position to running time.

        Formula from gstsegment.c:822:
            running_time = (position - start) / rate + base

        Args:
            position: Position in stream time (e.g., buffer PTS)

        Returns:
            Running time, or -1 if position is outside segment boundaries
        """
        # Check if position is within segment boundaries
        if position < self.start:
            return -1.0

        if self.stop is not None and position > self.stop:
            return -1.0

        # Convert: (position - start) / rate + base
        running_time = (position - self.start) / self.rate + self.base

        return running_time

The following formula is how we transform the buffer time in the relative running time of the pipeline:

$$ running_time = \frac{position - start}{rate} + base $$

The $position$ (or PTS) of the buffer is relative to the $start$ of the pipeline. Then this difference needs to be adjusted by the $rate$ of playback (which may be 1 in live or realtime pipelines). The $base$ is the base time of the segment, which can be 0 or more. A pipeline might produce many segments, which have a monotonically increasing base time which is the end of the previous segment.

Why it is important to learn how to convert the buffer PTS to the running time? Because you will, at some point, find yourself in need to synchronise your element processing of a buffer to the global clock.

Creating Elements in gst_mini

We have a bare bones GStreamer mini implementation, but how to create elements? In this section we will create a few elements and connect them together to see processing happening using our gst_mini framework implementation.

The first element we will create is a "fake sink" element. This element does nothing, it just receive buffers and log information about those buffers.

from ..core.element import GstElement
from ..core.pad import GstFlowReturn
from ..core.buffer import GstBuffer, BufferFlags


class FakeSink(GstElement):
    def __init__(self, name, sync: bool = False):
        super().__init__(name)
        # properties
        self.sync = sync

        # state
        self.buffer_count = 0

        self.sink_pad = self.create_sink_pad("sink")
        self.sink_pad.set_chain_function(self._chain)

    def on_ready(self):
        self.buffer_count = 0

    def _chain(self, buffer: GstBuffer) -> GstFlowReturn:
        if buffer.has_flag(BufferFlags.EOS):
            self.log("Received EOS")
            return GstFlowReturn.EOS

        # Extract timestamp
        pts = buffer.pts

        # Convert stream time to running time (gst_segment_to_running_time)
        # Corresponds to gstbasesink.c:2207
        running_time = self.segment.to_running_time(pts)

        if running_time < 0:
            self.log("Buffer outside segment, dropping")
            return GstFlowReturn.OK

        # Synchronization (gst_base_sink_do_sync)
        if self.sync and self.state.name == 'PLAYING':
            # Calculate clock time: running_time + base_time
            # Corresponds to gstbasesink.c:2356
            clock_time = running_time + self.pipeline.base_time

            # Wait on clock (gst_base_sink_wait_clock at gstbasesink.c:2381)
            self.log(f"Waiting for clock_time={clock_time:.3f}s...")

            _, jitter = self.pipeline.clock.wait_until(clock_time)
            jitter_ms = jitter * 1000

            if jitter >= 0:
                self.log(f"Clock wait complete (jitter: {jitter_ms:+.1f}ms)")
            else:
                self.log(f"Buffer late (jitter: {jitter_ms:+.1f}ms)")

        self.buffer_count += 1
        self.log(f"Processed {repr(buffer)}")
        return GstFlowReturn.OK

We implement the basic functionality of a sink element, but without any other logic. The most interesting part of this code is that we implement the clock synchronisation similar to what the real GStreamer code does. We use the segment to calculate the running time and then use the pipeline clock to wait for that butter PTS in the pipeline clock before proceeding with processing. This is how sink elements, like displays can show the video frames at the exact time they need to be displayed to users.

We can now start creating a pipeline and using our new fakesink. This is already a good example of how the concepts and classes we introduced previously work together.

#!/usr/bin/env python3
"""
Simple example showing buffer synchronization.

Manually creates buffers and pushes them through a sink,
demonstrating the clock wait mechanism.

Pipeline:
    Manual buffers → FakeSink (with sync)
"""

from gst_mini import GstPipeline, GstState, GstBuffer, GstSegment, FakeSink


def main():
    print("=" * 80)
    print("Simple Synchronization Example")
    print("=" * 80)
    print()
    print("Manually pushing 5 buffers with increasing timestamps")
    print("Watch for clock waits demonstrating synchronization")
    print()
    print("=" * 80)
    print()

    # Create pipeline
    pipeline = GstPipeline("simple-pipeline")

    # Create sink
    sink = FakeSink('fakesink', sync=True)

    # Add to pipeline
    pipeline.add(sink)

    # Set up segment
    sink.segment = GstSegment(start=0.0, rate=1.0, base=0.0)

    # Set to PLAYING
    pipeline.set_state(GstState.PLAYING)

    print(f"[{pipeline.clock.get_time():.3f}s] Pushing buffers manually...\n")

    # Create and push buffers with increasing timestamps
    for i in range(5):
        pts = i * 2.0  # 2 seconds apart
        buffer = GstBuffer(
            pts=pts,
            duration=2.0,
            data={'segment_num': i, 'buffers': range(60), 'duration': 2.0}
        )

        print(f"[{pipeline.clock.get_time():.3f}s] Pushing buffer {i} with PTS={pts}s")
        sink.sink_pad._chain(buffer)
        print()

    # Cleanup
    pipeline.set_state(GstState.NULL)

    print()
    print("=" * 80)
    print("Notice:")
    print("  • Each buffer waited until its timestamp matched clock time")
    print("  • running_time = PTS (since segment.start=0, segment.base=0)")
    print("  • clock_time = running_time + base_time")
    print("  • Synchronization formula: wait until clock >= clock_time")
    print("=" * 80)


if __name__ == '__main__':
    main()

I highly encourage you to modify this code yourself. An especially interesting part I liked to adjust was the segment $rate$, which directly represents the rate of processing or playback of the buffers we are producing (where $rate = 1.0$ is normal speed, $rate > 1.0$ is faster, and $rate < 1.0$ is slower). We have a lot of logging and you will notice the changes in the time of processing. I find it super cool to play around.

Now we can implement a camera like element, producing buffers live.

"""LiveSource - Simulates live camera generating frames."""

import threading
import time
from ..core.element import GstElement, GstState
from ..core.buffer import GstBuffer, BufferFlags
from ..core.pad import GstFlowReturn


class LiveSource(GstElement):
    """
    Source element that generates frames at a fixed rate (like a camera).

    Demonstrates live sources that cannot be paused - they keep generating data.
    """

    def __init__(self, name: str, fps: int = 30):
        """
        Create a live source.

        Args:
            name: Element name
            fps: Frames per second to generate
        """
        super().__init__(name)
        self.fps = fps
        self.frame_interval = 1.0 / fps  # Time between frames
        self.frame_count = 0
        self.dropped_frames = 0

        self._thread = None
        self._running = False
        self._start_time = None

        # Create source pad
        self.src_pad = self.create_src_pad("src")

    def on_playing(self):
        """Start generating frames when entering PLAYING state."""
        self.log(f"Starting frame generation at {self.fps} fps")

        self._running = True
        self._start_time = time.monotonic()
        self._thread = threading.Thread(target=self._generate_frames, daemon=True)
        self._thread.start()

    def on_null(self):
        """Stop generating frames when entering NULL state."""
        self.log("Stopping frame generation")
        self.log(f"Generated {self.frame_count} frames, dropped {self.dropped_frames} frames")

        self._running = False
        if self._thread is not None:
            self._thread.join(timeout=1.0)

    def _generate_frames(self):
        """
        Frame generation loop (runs in separate thread).

        This simulates a live camera that captures frames at regular intervals.
        """
        next_frame_time = time.monotonic()

        while self._running:
            current_time = time.monotonic()

            # Check if it's time for the next frame
            if current_time >= next_frame_time:
                # Calculate PTS (presentation timestamp)
                # This is the time since we started, in seconds
                pts = current_time - self._start_time
                duration = self.frame_interval

                # Create buffer with frame data
                buffer = GstBuffer(
                    pts=pts,
                    duration=duration,
                    data={
                        'frame': self.frame_count,
                        'timestamp': pts,
                        'content': f'frame_{self.frame_count:06d}'
                    }
                )

                # Try to push buffer downstream
                # This call may block if downstream is slow or queue is full
                result = self.src_pad.push(buffer)

                if result == GstFlowReturn.OK:
                    if self.frame_count % self.fps == 0:  # Log every second
                        self.log(f"Generated frame {self.frame_count}, PTS={pts:.3f}s")
                    self.frame_count += 1
                else:
                    # Push failed (queue full, flushing, etc.)
                    self.dropped_frames += 1
                    if self.dropped_frames % 10 == 1:
                        self.log(f"Dropped frame {self.frame_count} (total dropped: {self.dropped_frames})")

                # Schedule next frame
                next_frame_time += self.frame_interval

            else:
                # Sleep until next frame time
                sleep_time = next_frame_time - current_time
                if sleep_time > 0:
                    time.sleep(min(sleep_time, 0.001))  # Sleep in small increments

        # Send EOS when stopping
        eos_buffer = GstBuffer(pts=0.0, duration=0.0, data={}, flags=BufferFlags.EOS)
        self.src_pad.push(eos_buffer)

    def get_stats(self) -> dict:
        """Get statistics about frame generation."""
        return {
            'frames_generated': self.frame_count,
            'frames_dropped': self.dropped_frames,
            'fps': self.fps,
        }

This is a slightly more complex element implementation, but all the complexity is just to simulate a device producing content as specific time. We still leverage all the gst_mini framework and publish buffers to pads just like in real GStreamer code.

Now we have everything necessary to create our first full gst_mini pipeline and play it out.

from gst_mini import GstPipeline, GstState, FakeSink, LiveSource


def main():
    print("=" * 80)
    print("Simple Synchronization Example")
    print("=" * 80)
    print()
    print("Using LiveSource to generate buffers at 1fps")
    print("Watch for clock waits demonstrating synchronization")
    print()
    print("=" * 80)
    print()

    # Create pipeline
    pipeline = GstPipeline("simple-pipeline")

    # Create elements
    source = LiveSource('videotestsrc', fps=1)
    sink = FakeSink('fakesink', sync=True)

    # Add to pipeline
    pipeline.add(source)
    pipeline.add(sink)

    # Link elements
    source.link(sink)

    # Set to PLAYING
    pipeline.set_state(GstState.PLAYING)

    # Runs for 5 seconds
    pipeline.run(5)

    # Cleanup
    pipeline.set_state(GstState.NULL)

    print()
    print("=" * 80)
    print(f"Completed example, total processed frames by sink: {sink.buffer_count}")
    print("=" * 80)


if __name__ == '__main__':
    main()

Look at the logs and you will notice all the processing was right on time. I have also created a few other elements we can play around.

The fun continues

We have also S3Sink, HLSSegmenter, VideoEnc, and Queue. Those extra elements can be combined to create more interesting pipelines for education. This is not dealing with real multimedia content, this is just for demostration. Still I tried to follow some of the GStreamer concepts with them and may use them in future blog posts. For now, here we can see a more complex example combining some of those extra elements:

from gst_mini import GstPipeline, GstState, LiveSource, Queue, HLSSegmenter, S3Sink


def main():
    print("=" * 80)
    print("Full HLS Pipeline Example")
    print("=" * 80)
    print()
    print("Pipeline: LiveSource → Queue → HLSSegmenter → S3Sink")
    print("- LiveSource generates 30 frames/sec")
    print("- Queue decouples threads (max 10 buffers, leaky upstream)")
    print("- HLSSegmenter creates 6-second segments")
    print("- S3Sink uploads with synchronization")
    print()
    print("Watch for:")
    print("  • Frame generation at steady rate (Thread A)")
    print("  • Queue filling/emptying")
    print("  • Segment creation every 6 seconds")
    print("  • Clock waits before upload (Thread B)")
    print("  • No blocking of frame generation!")
    print()
    print("=" * 80)
    print()

    # Create pipeline
    pipeline = GstPipeline("hls-pipeline")

    # Create elements
    source = LiveSource("camera", fps=30)
    queue = Queue("queue", max_size=10, leaky="upstream")
    segmenter = HLSSegmenter("segmenter", target_duration=6.0)
    sink = S3Sink("s3sink", bucket="live-streams", sync=True)

    # Add to pipeline
    pipeline.add(source, queue, segmenter, sink)

    # Link elements
    pipeline.link(source, queue)
    pipeline.link(queue, segmenter)
    pipeline.link(segmenter, sink)

    pipeline.set_state(GstState.PLAYING)

    # Run for 20 seconds
    pipeline.run(duration=20.0)

    pipeline.set_state(GstState.NULL)

    # Print statistics
    print()
    print("=" * 80)
    print("Statistics:")
    print("-" * 80)

    source_stats = source.get_stats()
    print(f"LiveSource: {source_stats['frames_generated']} frames generated, "
          f"{source_stats['frames_dropped']} dropped")

    queue_stats = queue.get_stats()
    print(f"Queue: {queue_stats['buffers_in']} in, {queue_stats['buffers_out']} out, "
          f"{queue_stats['buffers_dropped']} dropped")

    print(f"S3Sink: {sink.segment_count} segments uploaded")
    print("=" * 80)


if __name__ == '__main__':
    main()

I highly encourage you to study the logs and add more logs in critical parts like state changes, pad handling buffer processing. You will create a very useful mental model that will help you read real GStreamer element code and also help you create your own elements. I like to browse through the elements in the gst-plugins-rs repository a lot. I really enjoy Rust and find the Rust bindings to be really good and readable, also using Rust for writing elements will prevent some common mistakes as the Rust type system is leveraged greatly to guide you.

Can GstMini do multimedia? Why not?

While playing around with this project I decided to implement a small multimedia example. For that I implemented a TkSink for displaying frame bytes to a window canvas. I used Tcl/Tk GUI toolkit just because it was the lowest effort as it comes together with the Python installation (usually). Also a very simplified FileSrc element but that is essentially similar to the real filesrc element. You can look up the example here:

This example is almost a 1-1 usage of GStreamer, and you can even run this real GStreamer pipeline to replicate it:

gst-launch-1.0 filesrc location=$HOME/Downloads/simple.png ! pngdec ! videoconvert ! imagefreeze ! osxvideosink

Or using the actual GStreamer bindings for Python:

#!/usr/bin/env python3
import gi
import sys
import signal

gi.require_version('Gst', '1.0')
gi.require_version('Gtk', '3.0')
from gi.repository import Gst, GLib, Gtk


def main():
    Gst.init(None)
    Gtk.init(None)

    pipeline = Gst.Pipeline.new("test-pipeline")

    filesrc = Gst.ElementFactory.make("filesrc")
    pngdec = Gst.ElementFactory.make("pngdec")
    videoconvert = Gst.ElementFactory.make("videoconvert")
    imagefreeze = Gst.ElementFactory.make("imagefreeze")
    gtksink = Gst.ElementFactory.make("autovideosink")

    filesrc.set_property("location", "/media/simple.png")

    pipeline.add(filesrc)
    pipeline.add(pngdec)
    pipeline.add(videoconvert)
    pipeline.add(imagefreeze)
    pipeline.add(gtksink)

    filesrc.link(pngdec)
    pngdec.link(videoconvert)
    videoconvert.link(imagefreeze)
    imagefreeze.link(gtksink)

    # Create GLib MainLoop
    loop = GLib.MainLoop()

    # Handle bus messages
    bus = pipeline.get_bus()
    bus.add_signal_watch()

    def on_message(bus, message):
        t = message.type
        if t == Gst.MessageType.EOS:
            print("End of stream")
            loop.quit()
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"ERROR: {err.message}")
            if debug:
                print(f"Debug info: {debug}")
            loop.quit()
        elif t == Gst.MessageType.STATE_CHANGED:
            if message.src == pipeline:
                old_state, new_state, pending_state = message.parse_state_changed()
                print(f"Pipeline state changed from {old_state.value_nick} to {new_state.value_nick}")
        return True

    bus.connect("message", on_message)

    # Handle SIGINT (Ctrl+C) gracefully
    def signal_handler(sig, frame):
        print("\nInterrupted by user")
        loop.quit()

    signal.signal(signal.SIGINT, signal_handler)

    # Set pipeline to PLAYING state
    ret = pipeline.set_state(Gst.State.PLAYING)
    if ret == Gst.StateChangeReturn.FAILURE:
        print("ERROR: Unable to set the pipeline to the playing state.")
        sys.exit(1)

    # Run the main loop
    try:
        print("Pipeline is playing - press Ctrl+C to stop")
        loop.run()
    except KeyboardInterrupt:
        print("\nInterrupted by user")
    finally:
        # Clean shutdown
        pipeline.set_state(Gst.State.NULL)
        print("Test completed.")


if __name__ == '__main__':
    main()

Wait! Is that GStreamer?

GStreamer is a project that has been built over 20 years by very experienced engineers dealing with multimedia processing. The simplification in gst_mini does not reflect in any way the amount of details and pitfals GStreamer prevents or takes care for you. A non-exhaustive list of features we did not cover in GstMini:

Many of the media services you use today may rely on GStreamer one way or another. Do not be fooled by the ugly GStreamer website! I hope this post was helpful to you and that it was inspiring for you to continue learning more about GStreamer.