PHP Fibers: A practical example

PHP 8.1 introduced Fibers as a new feature of the language. I often see people refer to fibers as threads, which is not accurate and, I think, gives some people the idea that PHP is now somehow multi-threaded. It seems to me that people misunderstand what fibers are (and possibly what asynchronous coding is), and I'm hoping I can help rectify that by explaining what fibers are and walking through a practical example of how one might use them.

So what are fibers?

While fibers are not threads, they can help make your code more efficient at doing multiple things at once, which on the surface might seem like threading (hence the confusion, I think). What fibers do is let your code spend less time waiting around for external resources by allowing you to initiate multiple requests in parallel, then wait for all of them to complete (in whatever order they finish).

To take advantage of fibers, your situation must meet a few conditions:

  1. You must be dealing with something external to PHP.

    Your resource must be external to PHP so that it can be processed in parallel to your PHP code. PHP is still single-threaded so while a particular fiber is executing, nothing else in your script will be executing. This means the actual work needs to be offloaded to a separate process. Common scenarios that meet this requirement are network requests or sub-processes.

  2. You need to be able to request the external resource in a way that does not block your script execution.

    A complex database query, for example, does not meet this requirement, as there is no way (using the standard PHP extensions) to continue your script while the query is running and gather the results later.
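To make that second condition more concrete, here's a small standalone sketch (hypothetical, using `sleep` as a stand-in for real work) of requesting an external resource without blocking: the script launches two processes and keeps control the whole time, merely polling for completion.

```php
<?php
// Start two one-second `sleep` processes with proc_open, then poll until
// both exit. Because the processes run in parallel while PHP only polls,
// the total wall time is ~1 second rather than ~2 seconds.
// Assumes a POSIX `sleep` binary is available on the system.
$start = microtime(true);
$procs = [];
foreach ([1, 1] as $seconds) {
    $proc = proc_open('sleep ' . $seconds, [], $pipes);
    if (!$proc) {
        throw new \RuntimeException('Unable to launch sleep process');
    }
    $procs[] = $proc;
}

do {
    usleep(1000); // wait 1ms between polls
    $running = false;
    foreach ($procs as $proc) {
        if (proc_get_status($proc)['running']) {
            $running = true;
        }
    }
} while ($running);

foreach ($procs as $proc) {
    proc_close($proc);
}
$elapsed = microtime(true) - $start;
echo 'Both processes finished in ' . round($elapsed, 1) . ' seconds' . PHP_EOL;
```

For network requests, the curl extension's curl_multi_* functions offer a similar non-blocking mode.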

How about an example

For this example, we'll look at a script that iterates over a directory of video files and creates a 30-second clip of each file using FFMpeg. First, we'll look at how typical synchronous code would accomplish the task, then slowly migrate that code to an asynchronous format using fibers.

The synchronous way

The code here is pretty simple: iterate over the directory and, for each file, exec the ffmpeg process to run the conversion.

$start = microtime(true);
foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $source = $item->getPathname();
        $destination = getTempDestination();
        $cmd = sprintf('%s -threads 1 -i %s -t 30 -crf 26 -c:v h264 -c:a ac3 %s', $ffmpeg, escapeshellarg($source), escapeshellarg($destination));
        exec($cmd, $output, $ret);
        if ($ret !== 0){
            throw new \RuntimeException('Failed to create clip.');
        }

        echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
    }
}
$end = microtime(true);
echo 'Directory processed in ' . round($end - $start, 1) . ' seconds' . PHP_EOL;

Running this on my folder of 19 Scrubs episodes, I can get all the videos processed in 243.1 seconds. It takes a while because I only end up using about 50% of my overall CPU power (thanks to -threads 1, added for demonstration purposes).

I could certainly speed this process up if I could run two or three instances in parallel. Getting the code to do that involves first changing to a non-blocking way of running ffmpeg, and then implementing Fiber using this new non-blocking exec as the fiber's callable.

Creating a non-blocking exec

In order to use fibers properly, we need to start with non-blocking exec code. That is, code which will start our ffmpeg process but keep running while ffmpeg does its work, rather than waiting for the result. We can accomplish this by replacing the simple exec function call with proc_open. This function and its friends allow the process to start while our code keeps running. Our code will then sit in a loop polling the status of the other process to see whether it has completed, only returning when ffmpeg has finished its task.

$start = microtime(true);
foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $source = $item->getPathname();
        $destination = getTempDestination();
        createVideoClip($ffmpeg, $source, $destination);

        echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
    }
}
$end = microtime(true);
echo 'Directory processed in ' . round($end - $start, 1) . ' seconds' . PHP_EOL;

function createVideoClip(string $ffmpeg, string $source, string $destination) : array{
    $cmd = sprintf('%s -threads 1 -i %s -t 30 -crf 26 -c:v h264 -c:a ac3 %s', $ffmpeg, escapeshellarg($source), escapeshellarg($destination));

    $stdout = fopen('php://temp', 'w+');
    $stderr = fopen('php://temp', 'w+');
    $streams = [
        0 => ['pipe', 'r']
        , 1 => $stdout
        , 2 => $stderr
    ];

    $proc = proc_open($cmd, $streams, $pipes);
    if (!$proc){
        throw new \RuntimeException('Unable to launch ffmpeg process');
    }

    do {
        usleep(1000); //Wait 1ms before checking
        $status = proc_get_status($proc);
    } while ($status['running']);

    proc_close($proc);
    fclose($stdout);
    fclose($stderr);
    $success = $status['exitcode'] === 0;
    if ($success){
        return [$source, $destination];
    } else {
        throw new \RuntimeException('Unable to perform conversion');
    }
}

From the perspective of the directory iteration code, this function is blocking just like exec was. The difference is that we are now doing the blocking ourselves using the polling loop, which we can change later when implementing fibers.

Introducing Fiber

Now that we have our non-blocking clip creation function, we can modify our directory iteration to create a new fiber for each file that we want to convert, using this function as the callable for the fiber.

$fiberList = [];
$start = microtime(true);

foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $fiber = new Fiber(createVideoClip(...));
        $fiber->start($ffmpeg, $item->getPathname(), getTempDestination());
        [$source, $destination] = $fiber->getReturn();
        echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
    }
}

So now we are using Fibers right? The code should be faster, right? Nope, not yet. Sure, we are "using" Fiber to run our video clip creation function, but this code is still 100% synchronous and will only process one video at a time, just as slow as the old exec code.
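If the mechanics here are unclear, this tiny standalone sketch (separate from the clip script) shows the life cycle we'll rely on next: start() runs the callable until it either returns or hits Fiber::suspend(), resume() continues it from the suspension point, and getReturn() is only valid once the fiber has terminated.

```php
<?php
// Minimal Fiber life cycle demonstration.
$fiber = new Fiber(function (int $x): int {
    Fiber::suspend();             // pause here; control returns to the caller
    return $x * 2;
});

$fiber->start(21);                // runs the callable up to the suspend() call
var_dump($fiber->isSuspended());  // true: waiting to be resumed
$fiber->resume();                 // continues after suspend() until the return
var_dump($fiber->isTerminated()); // true: the callable has returned
var_dump($fiber->getReturn());    // int(42)
```

A fiber that never suspends, like the one above before we add Fiber::suspend(), simply runs to completion inside start().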

Making it asynchronous

In order to truly take advantage of the fibers, we need to make the code asynchronous and introduce Fiber::suspend in our createVideoClip function. This is the secret sauce that lets PHP continue and get the ball rolling on creating clips for the other files while ffmpeg processes the current file. In our function, the proper place to put this suspend call is in the polling loop, replacing the existing usleep call.

    do {
        Fiber::suspend();
        $status = proc_get_status($proc);
    } while ($status['running']);

Now that the fiber will suspend, we also need to resume it at some point to actually get the result of the video clip creation, otherwise it'll simply never finish. To do this, we need to gather all the fibers we create while iterating over the directory

$fiberList = [];
foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $fiber = new Fiber(createVideoClip(...));
        $fiber->start($ffmpeg, $item->getPathname(), getTempDestination());
        $fiberList[] = $fiber;
    }
}

then loop over the fibers, resuming each one in turn until they all terminate and we can get the result.

while ($fiberList){
    foreach ($fiberList as $idx => $fiber){
        if ($fiber->isTerminated()){
            [$source, $destination] = $fiber->getReturn();
            echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
            unset($fiberList[$idx]);
        } else {
            $fiber->resume();
        }
    }
}

Now our code is finally asynchronous and all the video clip creation jobs will run in parallel. If you monitor your system processes after running the script now, you'll find a separate ffmpeg process gets spawned for each video file immediately and they all start working at once. For my Scrubs folder, that's 19 ffmpeg processes all running at once, and the total execution time has been reduced to 173 seconds.

That's too much parallelism!

Having 19 concurrent ffmpeg processes converting video is a bit much for my simple 8-core processor. Sure, it's faster, but now we are wasting time by trying to run too much at once. The CPU ends up spending a lot of its time context switching between the processes rather than creating our video clips. Things would probably be even better if we ran only a few conversions at once instead of all of them.

Adding a concurrency limit is possible, but before that we need to generalize the code a bit to make it more understandable and reusable. The loop we have that processes the fibers until they terminate needs to be generalized into a small function we can call which will wait for the fibers to complete.

/**
 * @param Fiber[] $fiberList
 * @param int|null $completionCount
 *
 * @return Fiber[]
 */
function waitForFibers(array &$fiberList, ?int $completionCount = null) : array{
    $completedFibers = [];
    $completionCount ??= count($fiberList);
    while (count($fiberList) && count($completedFibers) < $completionCount){
        usleep(1000);
        foreach ($fiberList as $idx => $fiber){
            if ($fiber->isSuspended()){
                $fiber->resume();
            } else if ($fiber->isTerminated()){
                $completedFibers[] = $fiber;
                unset($fiberList[$idx]);
            }
        }
    }

    return $completedFibers;
}

Here is a simple function that takes a list of fibers by reference, and optionally a count of how many fibers to wait for. Any of the fibers which have terminated will be removed from the list and returned from the function for processing by the calling code. By default, the function will wait for all fibers in the list to complete, however adding an optional count will allow it to return after at least that many fibers have completed (could be more than requested).
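As a quick sanity check of those semantics, here's a standalone sketch (the function is repeated so the snippet runs on its own) using dummy fibers that suspend a set number of times before returning their count:

```php
<?php
// Repeated from the article so this snippet is self-contained.
function waitForFibers(array &$fiberList, ?int $completionCount = null) : array{
    $completedFibers = [];
    $completionCount ??= count($fiberList);
    while (count($fiberList) && count($completedFibers) < $completionCount){
        usleep(1000);
        foreach ($fiberList as $idx => $fiber){
            if ($fiber->isSuspended()){
                $fiber->resume();
            } else if ($fiber->isTerminated()){
                $completedFibers[] = $fiber;
                unset($fiberList[$idx]);
            }
        }
    }

    return $completedFibers;
}

// Three dummy fibers that suspend 2, 4, and 6 times respectively.
$fiberList = [];
foreach ([2, 4, 6] as $suspends){
    $fiber = new Fiber(function (int $n) : int{
        for ($i = 0; $i < $n; $i++){
            Fiber::suspend();
        }
        return $n;
    });
    $fiber->start($suspends);
    $fiberList[] = $fiber;
}

$first = waitForFibers($fiberList, 1); // returns once at least one is done
echo count($first) . ' fiber(s) finished first' . PHP_EOL;
$rest = waitForFibers($fiberList);     // waits for everything remaining
echo count($rest) . ' fiber(s) finished after' . PHP_EOL;
```

The fiber that suspends only twice finishes first and is returned alone, and the second call drains the remaining two from the list.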

With that wait function created, we can update our code to add a concurrency limit. This way we can spend less time context switching and more time on clip creation, hopefully speeding things up even further.

$concurrency = 3;
$fiberList = [];
foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $fiber = new Fiber(createVideoClip(...));
        $fiber->start($ffmpeg, $item->getPathname(), getTempDestination());
        $fiberList[] = $fiber;
        if (count($fiberList) >= $concurrency){
            foreach (waitForFibers($fiberList, 1) as $fiber){
                [$source, $destination] = $fiber->getReturn();
                echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
            }
        }
    }
}

foreach (waitForFibers($fiberList) as $fiber){
    [$source, $destination] = $fiber->getReturn();
    echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
}

Great, now with the concurrency limit of 3, my CPU isn't so bogged down with context switches and is able to run through all 19 episodes of Scrubs in just 143.7 seconds. Much better than the original 243.1 seconds it took doing this the synchronous way.

The complete example

<?php

$ffmpeg = getenv('FFMPEG_BIN') ?: 'ffmpeg';
$concurrency = (int)($argv[1] ?? 3);
$fiberList = [];
$start = microtime(true);

foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $fiber = new Fiber(createVideoClip(...));
        $fiber->start($ffmpeg, $item->getPathname(), getTempDestination());
        $fiberList[] = $fiber;
        if (count($fiberList) >= $concurrency){
            foreach (waitForFibers($fiberList, 1) as $fiber){
                [$source, $destination] = $fiber->getReturn();
                echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
            }
        }
    }
}

foreach (waitForFibers($fiberList) as $fiber){
    [$source, $destination] = $fiber->getReturn();
    echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
}

$end = microtime(true);
echo 'Directory processed in ' . round($end - $start, 1) . ' seconds' . PHP_EOL;

/**
 * @param Fiber[] $fiberList
 * @param int|null $completionCount
 *
 * @return Fiber[]
 */
function waitForFibers(array &$fiberList, ?int $completionCount = null) : array{
    $completedFibers = [];
    $completionCount ??= count($fiberList);
    while (count($fiberList) && count($completedFibers) < $completionCount){
        usleep(1000);
        foreach ($fiberList as $idx => $fiber){
            if ($fiber->isSuspended()){
                $fiber->resume();
            } else if ($fiber->isTerminated()){
                $completedFibers[] = $fiber;
                unset($fiberList[$idx]);
            }
        }
    }

    return $completedFibers;
}

function getTempDestination() : string{
    $destination = tempnam(sys_get_temp_dir(), 'video');
    unlink($destination);
    $dir = dirname($destination);
    $file = basename($destination, '.tmp');

    return $dir . DIRECTORY_SEPARATOR . $file . '.mp4';
}

function createVideoClip(string $ffmpeg, string $source, string $destination) : array{
    $cmd = sprintf('%s -threads 1 -i %s -t 30 -crf 26 -c:v h264 -c:a ac3 %s', $ffmpeg, escapeshellarg($source), escapeshellarg($destination));

    $stdout = fopen('php://temp', 'w+');
    $stderr = fopen('php://temp', 'w+');
    $streams = [
        0 => ['pipe', 'r']
        , 1 => $stdout
        , 2 => $stderr
    ];

    $proc = proc_open($cmd, $streams, $pipes);
    if (!$proc){
        throw new \RuntimeException('Unable to launch ffmpeg process');
    }

    do {
        Fiber::suspend();
        $status = proc_get_status($proc);
    } while ($status['running']);

    proc_close($proc);
    fclose($stdout);
    fclose($stderr);
    $success = $status['exitcode'] === 0;
    if ($success){
        return [$source, $destination];
    } else {
        throw new \RuntimeException('Unable to perform conversion');
    }
}
