Output RGB Bytes From Any Source

True Story Follows

So I’m on the grind, straight hustling on an interesting side project. I’m exploring the wacky world of microcontrollers and C blogs that have no CSS on them. And by the time I’d put something together, I found out that it was dramatically easier than I thought it would be. But the work I did is just a small component of what could otherwise be a gateway to tons of hardware and IoT projects.

The Problem

You have a video file, and you want to work with the raw RGB bytes to do something: maybe integrate with a microcontroller, maybe do some wacky IoT stuff…who knows. Here’s how you can easily get the raw RGB bytes from any video source and manipulate them in real time:

The Code

from cStringIO import StringIO
from io import BytesIO
import sys

# requires Pillow==2.7.0
from PIL import Image
# requires numpy
import numpy as np


# Number of bytes one pixel occupies in a frame's data section; a frame's
# size in bytes is computed as width * height * BYTES_PER_PIXEL.
# Presumably one byte each for R, G and B (max color value 255) -- confirm
# before feeding in streams with a larger max color value.
BYTES_PER_PIXEL = 3


class PPMState(object):
    """Parser states for a stream of PPM (P6) frames, plus shared frame size.

    The integer constants name which part of a frame the stream parser
    expects to see next.  ``expected_length`` is class-level (shared) state
    holding the byte count of the current frame's pixel data.
    """

    # Expecting the "P6" magic-number line that opens a frame.
    READ_P6 = 0
    # Expecting the "<width> <height>" line.
    READ_WIDTH_HEIGHT = 1
    # Expecting the maximum-color-value line.
    READ_MAX_COLOR = 2
    # Expecting (more of) the frame's pixel data.
    READ_DATA = 3

    # Placeholder until a frame's dimensions have been parsed; it is meant
    # to be overwritten before it is ever compared against.
    # sys.maxsize is used instead of sys.maxint because the latter does not
    # exist on Python 3, while sys.maxsize is available on 2.6+ and 3.x.
    expected_length = sys.maxsize


def get_np_array_from_frame(read_buffer, expected_length, dimensions):
    """Decode one frame's raw pixel bytes into a numpy array.

    The bytes are wrapped in an in-memory PPM file, opened with PIL and
    converted with ``np.array``.

    Args:
        read_buffer: Seekable file-like object holding the frame's pixel
            bytes starting at position 0.  Bytes beyond ``expected_length``
            are ignored.
        expected_length: Number of pixel bytes to take from ``read_buffer``.
        dimensions: ``(width, height)`` pair, in pixels.

    Returns:
        The numpy array produced by ``np.array`` from the decoded PIL image.
        Presumably shaped (height, width, 3) with dtype uint8 for 8-bit RGB
        input -- confirm against the Pillow/numpy versions in use.
    """
    # Tuple parameters in the signature were removed by PEP 3113 (they are a
    # SyntaxError on Python 3), so the pair is unpacked here instead.
    # Callers still pass a single (width, height) tuple positionally.
    width, height = dimensions

    read_buffer.seek(0)
    image_data = read_buffer.read(expected_length)
    ppm_file = _image_data_bytes_to_ppm_file(image_data, (width, height))
    return np.array(Image.open(ppm_file))


def _image_data_bytes_to_ppm_file(image_data, (width, height)):
    ppm_image = StringIO()
    ppm_image.write(
        "P6\n{width} {height}\n255\n".format(
            width=width,
            height=height
        )
    )
    ppm_image.write(image_data)
    ppm_image.seek(0)
    return ppm_image


def generate_np_array_frames_from_std():
    """Yield one numpy array per PPM (P6) frame read from standard input.

    Each frame is expected to be a three-line header -- the ``P6`` magic
    number, ``<width> <height>``, and the maximum color value -- followed
    immediately by ``width * height * BYTES_PER_PIXEL`` bytes of pixel data,
    with frames concatenated back to back.

    The header is read line by line, but the pixel data is read by exact
    byte count, so frame boundaries do not depend on where newline bytes
    happen to fall inside the pixel data.

    Yields:
        The value returned by ``get_np_array_from_frame`` for each complete
        frame.

    Raises:
        ValueError: If a frame does not start with ``P6`` or a header line
            cannot be parsed as integers.

    A stream that ends inside a header, or before a frame's pixel data is
    complete, ends the generator without yielding that partial frame.
    """
    # Pixel data is binary: use the underlying byte stream where stdin is a
    # text wrapper (Python 3 exposes it as ``.buffer``); otherwise read from
    # sys.stdin directly (Python 2).
    stream = getattr(sys.stdin, "buffer", sys.stdin)

    while True:
        magic = stream.readline()
        if not magic:
            return  # clean end of stream between frames
        if not magic.startswith(b"P6"):
            raise ValueError(
                "expected PPM magic number 'P6', got %r" % (magic[:16],)
            )

        size_line = stream.readline()
        max_color_line = stream.readline()
        if not size_line or not max_color_line:
            return  # stream ended in the middle of a header

        width, height = [int(val) for val in size_line.split()]
        expected_length = width * height * BYTES_PER_PIXEL
        # Kept in sync for any code that reads the shared class attribute.
        PPMState.expected_length = expected_length

        # unused, but this could specify colors > 255
        int(max_color_line.strip())  # max color

        image_data = stream.read(expected_length)
        if len(image_data) < expected_length:
            return  # truncated final frame: drop it

        yield get_np_array_from_frame(
            BytesIO(image_data),
            expected_length,
            (width, height)
        )


def do_something_with_np_array(np_array):
    """Placeholder frame handler: print the frame to standard output.

    Args:
        np_array: The decoded frame to display; replace this function's body
            with whatever per-frame processing is needed.
    """
    # With a single parenthesised argument this is valid both as a Python 2
    # print statement and as a Python 3 print() call, with identical output.
    print(np_array)


if __name__ == "__main__":
    # Script entry point: decode frames lazily from standard input and pass
    # each one to the handler as soon as it is available.
    frames = generate_np_array_frames_from_std()
    for np_array in frames:
        do_something_with_np_array(np_array)

To Run

ffmpeg -i <my_video_file.avi> -c:v ppm -f rawvideo - < /dev/null | python file_i_just_created.py