PHP Fibers: The Event Loop

PHP's new Fibers feature, introduced in PHP 8.1, makes it easier to build asynchronous applications, and a key component of such an application is the event loop. The event loop is responsible for monitoring external resources for interesting events, such as new input data or state changes. Once an event is detected, the event loop resumes the fibers that are interested in that event so they can process it. There are many ways to implement such a loop, ranging from a trivial sleep-and-resume cycle to a complex, multifeatured event monitoring system. In this article, we'll look at a few examples and build up the complexity as we go.

Starting out simple

The simplest form of an event loop is one that doesn't monitor events at all and just waits a fixed amount of time between iterations. In my previous article, PHP Fibers: A Practical Example, I used this type of loop to manage the video creation fibers. Here is an excerpt of the relevant loop components; please refer to that article for the complete example code.

while (count($fiberList) && count($completedFibers) < $completionCount){
    usleep(1000);
    foreach ($fiberList as $idx => $fiber){
        if ($fiber->isSuspended()){
            $fiber->resume();
        } else if ($fiber->isTerminated()){
            $completedFibers[] = $fiber;
            unset($fiberList[$idx]);
        }
    }
}

This event loop does not monitor any sort of external progress; it simply sleeps for 1 millisecond, then resumes all the fibers. The individual fibers are then responsible for determining whether any progress has been made and, if not, suspending again. This style of loop is easy to write and understand, but it is not very efficient, since every fiber is woken on every iteration whether or not it has anything to do. We could improve it by moving the progress check from the fiber to the loop.
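Here is a minimal, self-contained sketch of that division of labor. The deadline each fiber checks with hrtime() is a hypothetical stand-in for a real progress test such as "has my process exited?"; the loop itself knows nothing about it and only sleeps and resumes.

```php
<?php
$log = [];
$fiberList = [];
foreach (['slow' => 50, 'fast' => 5] as $name => $delayMs){
    $fiberList[] = new Fiber(function () use ($name, $delayMs, &$log) : void{
        $deadline = hrtime(true) + $delayMs * 1_000_000;
        // The fiber tests for its own progress and suspends again if there is none.
        while (hrtime(true) < $deadline){
            Fiber::suspend();
        }
        $log[] = $name;
    });
}

foreach ($fiberList as $fiber){
    $fiber->start();
}

// The loop never asks why a fiber is waiting: it just sleeps, then resumes everyone.
while ($fiberList){
    usleep(1000);
    foreach ($fiberList as $idx => $fiber){
        if ($fiber->isSuspended()){
            $fiber->resume();
        }
        if ($fiber->isTerminated()){
            unset($fiberList[$idx]);
        }
    }
}

echo implode(',', $log), PHP_EOL;
```

Every fiber is resumed every millisecond even though only one of them can make progress at a time, which is exactly the inefficiency described above.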

Stepping things up

One of the benefits of PHP's Fiber implementation is the ability to pass a value back to the previous context when calling Fiber::suspend. Using this feature, we can provide the event loop with the data it will need to determine when the fiber is ready to be resumed. We can take advantage of this in our example to move the "is the process running?" check from the individual fibers to the event loop by passing back the process resource.
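A minimal, self-contained sketch of that round trip (the job name and strings are made up for illustration):

```php
<?php
// The value given to Fiber::suspend() is returned by start() (or resume()),
// and the value later given to resume() becomes suspend()'s return value.
$fiber = new Fiber(function (string $job) : string{
    $reply = Fiber::suspend("waiting on $job");
    return "$job finished with $reply";
});

$fromSuspend = $fiber->start('clip-1'); // runs the fiber up to its first Fiber::suspend()
echo $fromSuspend, PHP_EOL;             // waiting on clip-1

$fiber->resume('exit code 0');          // Fiber::suspend() returns 'exit code 0' inside the fiber
echo $fiber->getReturn(), PHP_EOL;      // clip-1 finished with exit code 0
```

The event loop below relies on both directions: the fiber suspends with a process resource, and the loop resumes it with that process's status.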

First, we update the createVideoClip function, removing the loop that checks the status of the process and replacing it with a single call to Fiber::suspend. That call passes the process resource out to the event loop and, once the process exits, returns the status array that the loop passes to resume().

$status = Fiber::suspend($proc);

Now that the fiber is passing out a value, we need a way to track that value and the fiber it is associated with. Currently, the fibers are stored in a simple array list. By using the SplObjectStorage class instead, we can create a map from the fiber objects to their process resources. We'll update the fiber creation code to store each fiber and its process resource in a new SplObjectStorage instance.

$fiberList = new SplObjectStorage;
foreach (new DirectoryIterator('.') as $item){
    $fiber = new Fiber(createVideoClip(...));
    $fiberList[$fiber] = $fiber->start($item->getPathname(), getTempDestination());
    //...
}

When the fiber is started, the createVideoClip function runs up to the Fiber::suspend call. The value provided to Fiber::suspend (our process resource) is returned from the start method, and that value is then associated with the fiber in the SplObjectStorage map. Next, we need to update our loop to accept this new fiber list and check the process status before resuming our fibers.

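/**
 * @param SplObjectStorage $fiberList Map of Fiber => process resource
 * @param int|null $completionCount
 *
 * @return Fiber[]
 */
function waitForFibers(SplObjectStorage $fiberList, ?int $completionCount = null) : array{
    $completedFibers = [];
    $completionCount ??= count($fiberList);
    while (count($fiberList) && count($completedFibers) < $completionCount){
        $finished = [];
        foreach ($fiberList as $fiber){
            $process = $fiberList[$fiber];
            $status = proc_get_status($process);
            if (!$status['running']){
                // The status array becomes the return value of Fiber::suspend() in the fiber.
                $fiber->resume($status);
                $finished[] = $fiber;
            }
        }

        // Detach finished fibers after iterating; detaching the current entry of an
        // SplObjectStorage inside foreach can cause the next entry to be skipped.
        foreach ($finished as $fiber){
            $completedFibers[] = $fiber;
            unset($fiberList[$fiber]);
        }

        // Sleep only when no process exited on this pass (testing $completedFibers
        // here would stop the loop from ever sleeping after the first completion).
        if (!$finished){
            usleep(1000);
        }
    }

    return $completedFibers;
}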

Now the event loop pulls the process resource associated with each fiber and checks whether the process is still running. If a process has terminated, we resume the fiber, passing the process status array back to it. The fiber receives the status array as the return value of the Fiber::suspend call and can use it to determine the exit code. If none of the active processes have terminated, we wait for 1 millisecond before checking again.
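Here is a minimal, self-contained sketch of that handshake, reduced to a single process. runChild() is a hypothetical stand-in for createVideoClip(), and the child is a PHP one-liner launched through PHP_BINARY rather than a real video tool (this assumes the PHP CLI):

```php
<?php
// Hypothetical stand-in for createVideoClip(): start a child process, hand
// its resource to the loop, and read the exit code from the status it gets back.
function runChild(int $exitCode) : int{
    // The child is a PHP one-liner that sleeps 50 ms, then exits with $exitCode.
    $proc = proc_open([PHP_BINARY, '-r', "usleep(50000); exit($exitCode);"], [], $pipes);
    $status = Fiber::suspend($proc); // the loop resumes us with proc_get_status() output
    proc_close($proc);
    return $status['exitcode'];
}

$fiber = new Fiber(runChild(...));
$proc = $fiber->start(3);

// Loop side, reduced to one process: poll until the child is no longer running.
while (($status = proc_get_status($proc))['running']){
    usleep(1000);
}
$fiber->resume($status);

echo $fiber->getReturn(), PHP_EOL; // 3
```

Note that the status array is captured on the same proc_get_status() call that first reports the process as stopped; on PHP versions before 8.3, later calls no longer return a valid exit code.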

Creating a framework

The event loop above is tailored very specifically to that situation: it only monitors for a process exiting. This is all we need for the video clip creation, but other applications will need to watch for other events, such as stream data or a simple delay. This means we need a more generic event loop framework that can monitor these (and possibly more) types of events.

As we've seen, monitoring for a process exit is simple: call proc_get_status and check the running flag. Monitoring for a fixed delay is also simple: determine the shortest delay of all the active fibers and usleep for that amount of time. Monitoring for stream data is a little more complicated, as there are two types of stream events: the stream is readable, and the stream is writable. For these events, we use stream_select to monitor the streams, but we need to know which array parameter (the read array or the write array) to add each stream to.
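To see how stream_select sorts streams into the read and write arrays, and how a delay in milliseconds maps onto its seconds/microseconds timeout, here is a small self-contained sketch. It assumes a Unix-like system (STREAM_PF_UNIX is not available on Windows), and the 250 ms delay is arbitrary:

```php
<?php
// A connected pair of local sockets: whatever is written to $b can be read from $a.
[$a, $b] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
fwrite($b, "ping\n");

// Streams we want to read from go in the first array, streams we want to write
// to go in the second; stream_select() trims each array to the streams that are ready.
$read = [$a];
$write = [$b];
$except = null;

// Shortest pending delay, split into whole seconds plus leftover microseconds.
$delayMs = 250;
$ready = stream_select($read, $write, $except, intdiv($delayMs, 1000), ($delayMs % 1000) * 1000);

echo $ready, PHP_EOL; // 2
echo in_array($a, $read, true) ? 'a readable' : 'a not readable', PHP_EOL; // a readable
```

If neither stream had been ready, stream_select would have returned 0 after 250 ms, which is how a fixed delay and stream monitoring can share a single wait.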

Adding the ability to monitor different event types involves introducing a few new classes that are responsible for setting up and monitoring the different events. Below we'll take a look at these classes and talk briefly about their role in the framework. Finally, we'll look at a couple of example applications built on this framework.

The Fiber event loop class

The first class we need is one that implements our fiber management loop. Since this class needs access to every fiber we create in order to manage them, the best approach is to let it handle the creation and startup of fibers. We'll add a startFiber method that accepts the fiber's callback and any arguments the callback needs. The new fiber is created, started, and added to the list of fibers to manage. Any time we need a new fiber, we'll use this method rather than calling new Fiber directly.

class FiberEventLoop {
    private SplObjectStorage $fiberTriggerMap;
    private TriggerMonitor $monitor;

    public function __construct(){
        $this->fiberTriggerMap = new SplObjectStorage();
        $this->monitor = new TriggerMonitor();
    }

    public function startFiber(callable $code, ...$args) : void{
        $this->runFiber(new Fiber($code), $args);
    }

    public function loop() : void{
        do {
            $this->monitor->reset();
            $this->prepareTriggers();
            $this->monitor->monitor();
            $this->resumeTriggered();
        } while ($this->fiberTriggerMap->count());
    }

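    private function resumeTriggered() : void{
        // Snapshot the map first: runFiber() detaches finished fibers and startFiber()
        // may attach new ones, and changing an SplObjectStorage mid-foreach can skip entries.
        $pending = [];
        foreach ($this->fiberTriggerMap as $fiber){
            $pending[] = [$fiber, $this->fiberTriggerMap[$fiber]];
        }

        foreach ($pending as [$fiber, $trigger]){
            /** @var Trigger $trigger */
            if ($trigger->isTriggered($this->monitor)){
                $this->runFiber($fiber, [$trigger->getResumeData()]);
            }
        }
    }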

    private function prepareTriggers() : void{
        foreach ($this->fiberTriggerMap as $fiber){
            /** @var Trigger $trigger */
            $trigger = $this->fiberTriggerMap[$fiber];
            $trigger->prepare($this->monitor);
        }
    }

    private function runFiber(Fiber $fiber, array $args = []) : void{
        if ($fiber->isTerminated()){
            unset($this->fiberTriggerMap[$fiber]);
        } else {
            if ($fiber->isStarted()){
                $resume = $fiber->resume(...$args);
            } else {
                $resume = $fiber->start(...$args);
            }
            if ($fiber->isTerminated()){
                unset($this->fiberTriggerMap[$fiber]);
            } else {
                $this->fiberTriggerMap[$fiber] = $resume ?? new NextTick();
            }
        }
    }
}

The trigger monitor class

While the loop class handles starting and resuming fibers, the TriggerMonitor class determines whether a fiber should resume by collecting all the trigger information and waiting for the appropriate events. For this generic event loop implementation, we are concerned only with fixed delays, process termination, and stream events. We'll create a class with methods for registering the resources to watch and for querying the results once the wait is over.

class TriggerMonitor {
    private const STREAM_READABLE = 1;
    private const STREAM_WRITABLE = 2;
    private array $streamList = [];
    private array $processList = [];
    private array $readableStreams = [];
    private array $writableStreams = [];
    private ?int $delay = null;

    public function addProcessExited($process) : void{
        $this->processList[] = [$process, null];
    }

    public function getProcessStatus($process) : array{
        foreach ($this->processList as [$handle, $status]){
            if ($handle === $process){
                return $status;
            }
        }
        throw new \RuntimeException('Process handle not found');
    }

    public function addStreamReadable($stream) : void{
        $this->streamList[] = [$stream, self::STREAM_READABLE];
    }

    public function isReadable($stream) : bool{
        return in_array($stream, $this->readableStreams, true);
    }

    public function addStreamWritable($stream) : void{
        $this->streamList[] = [$stream, self::STREAM_WRITABLE];
    }

    public function isWritable($stream) : bool{
        return in_array($stream, $this->writableStreams, true);
    }

    public function addDelay(int $milliseconds) : void{
        if ($this->delay === null || $milliseconds < $this->delay){
            $this->delay = max($milliseconds, 0);
        }
    }

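    public function monitor() : void{
        if ($this->checkForProcessTerminated()){
            return;
        }
        if ($this->processList){
            // Process exits can only be polled, so never let stream_select() block
            // for more than 1 ms while a watched process is still running.
            $this->addDelay(1);
        }
        if ($this->checkForStreamEvent()){
            return;
        }

        usleep(max($this->delay, 1) * 1000);
    }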

    private function checkForStreamEvent() : bool{
        $this->readableStreams = $this->writableStreams = [];
        foreach ($this->streamList as [$stream, $mode]){
            if (is_resource($stream) && get_resource_type($stream) === 'stream'){
                if ($mode & self::STREAM_READABLE){
                    $this->readableStreams[] = $stream;
                }
                if ($mode & self::STREAM_WRITABLE){
                    $this->writableStreams[] = $stream;
                }
            }
        }

        if (!$this->readableStreams && !$this->writableStreams){
            return false;
        }

        $timeoutSeconds = $timeoutMicroseconds = null;
        if ($this->delay !== null){
            $timeoutSeconds = floor($this->delay / 1000);
            $timeoutMicroseconds = ($this->delay % 1000) * 1000;
        }

        stream_select($this->readableStreams, $this->writableStreams, $e, $timeoutSeconds, $timeoutMicroseconds);

        return true;
    }

    public function reset() : void{
        $this->streamList = $this->readableStreams = $this->writableStreams = $this->processList = [];
        $this->delay = null;
    }

    private function checkForProcessTerminated() : bool{
        $hasExit = false;
        foreach ($this->processList as [$process, &$status]){
            $status = proc_get_status($process);
            $hasExit = $hasExit || !$status['running'];
        }

        return $hasExit;
    }
}

There is no single call that can wait on all of these event types at once, so we need to prioritize the monitoring a bit. First, we check whether any processes have terminated. This is a simple test that completes immediately and doesn't block the script.

If there are no processes to check, or none have exited, we move on to checking for stream events. We set up the necessary stream arrays and then call stream_select to wait for an event. If any fiber is waiting on a fixed delay, the shortest delay becomes the stream_select timeout, so that fiber is not kept waiting longer than it asked; otherwise, we can wait indefinitely.

Finally, if there are no streams to watch and no process has exited, we sleep for the shortest requested delay, with a minimum of 1 millisecond.

The trigger interface

In order for the event loop to know when to resume a fiber, the fiber passes back an object that can provide that information. We'll call these objects triggers, with a different type for each kind of event. In prepare(), each trigger is given an instance of the monitor class, with which it registers the appropriate resources. After all the resources have been added, the monitor checks them for progress and waits if necessary. Once progress has been detected, isTriggered() is called on each trigger with the same monitor instance, which it can query for the progress information. The trigger uses that information to determine whether the fiber should be resumed and, through getResumeData(), optionally provides data to send back to the fiber.

interface Trigger {
    public function prepare(TriggerMonitor $monitor) : void;

    public function isTriggered(TriggerMonitor $monitor) : bool;

    public function getResumeData() : mixed;
}

The delay trigger

Here is an example of a trigger that waits for a fixed delay. When the fiber is resumed, it receives the number of milliseconds that actually elapsed.

class Delay implements Trigger {
    private readonly int $start;

    public function __construct(public readonly int $milliseconds){
        $this->start = $this->nowInMilliseconds();
    }

    public function prepare(TriggerMonitor $monitor) : void{
        $elapsed = $this->getElapsed();
        $monitor->addDelay($this->milliseconds - $elapsed);
    }

    public function isTriggered(TriggerMonitor $monitor) : bool{
        $elapsed = $this->getElapsed();

        return $this->milliseconds - $elapsed <= 0;
    }

    public function getResumeData() : int{
        return $this->getElapsed();
    }

    private function nowInMilliseconds() : int{
        return floor(microtime(true) * 1000);
    }

    private function getElapsed() : int{
        return $this->nowInMilliseconds() - $this->start;
    }
}

There are several trigger classes, each handling a specific event type (the code in this article also uses NextTick, StreamReadable, and StreamWritable). For brevity, only the delay trigger's source is shown here; the rest can be viewed using the links.
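The Delay trigger measures elapsed time with microtime(true), which follows the wall clock and can jump if the system time is adjusted. If that matters for your application, one alternative is PHP's monotonic hrtime() clock. The class below is a hypothetical helper sketching that swap, not part of the framework:

```php
<?php
// Hypothetical helper: elapsed milliseconds measured with hrtime(), which is
// monotonic, so changes to the system clock cannot shorten or stretch a delay.
final class MonotonicStopwatch {
    private readonly int $startNs;

    public function __construct(){
        $this->startNs = hrtime(true); // nanoseconds from an arbitrary fixed point
    }

    public function elapsedMilliseconds() : int{
        return intdiv(hrtime(true) - $this->startNs, 1_000_000);
    }
}

$watch = new MonotonicStopwatch();
usleep(20000);
$elapsed = $watch->elapsedMilliseconds();
echo $elapsed, PHP_EOL; // prints a value of at least 20
```

A Delay variant could store such a stopwatch instead of $start and call elapsedMilliseconds() wherever it currently calls getElapsed().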

Putting it all together

Now that we have a basic event loop framework, how do we use it to create an application? Let's look at a couple of example applications.

A simple countdown

This application is very simple. It uses the fixed delay trigger to run three countdown sequences concurrently, each with its own interval, displaying the countdown as it goes.

$seconds = (int)($argv[1] ?? 5); // length of the first countdown; 5 is an arbitrary default
$loop = new FiberEventLoop();
$loop->startFiber(countdown(...), $seconds);
$loop->startFiber(countdown(...), 10, 2000);
$loop->startFiber(countdown(...), 60, 500);
$loop->loop();

function countdown(int $seconds, int $delay = 1000) : void{
    do {
        echo (new DateTimeImmutable())->format('H:i:s');
        echo ': T-', $seconds, PHP_EOL;
        Fiber::suspend(new Delay($delay));
    } while (--$seconds > 0);
    echo 'Blast off!', PHP_EOL;
}

A simple server

Stepping things up a notch, here is a simple echo server. It listens on a socket for connections, and when a client connects, any data the client sends is repeated back to it. Sending bye closes that client's connection, and sending shutdown additionally closes the server socket.

$port = 9007;
$loop = new FiberEventLoop();
$loop->startFiber(serverLoop(...), $port, $loop);
$loop->loop();

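function serverLoop(int $port, FiberEventLoop $loop) : void{
    $server = stream_socket_server('tcp://0.0.0.0:' . $port, $errno, $errstr);
    if ($server === false){
        throw new RuntimeException("Unable to listen on port $port: $errstr");
    }
    stream_set_blocking($server, false);

    while (is_resource($server) && !feof($server)){
        // Suspend on every pass, not only after a successful accept; otherwise a
        // failed accept would spin here without ever yielding to the event loop.
        Fiber::suspend(new StreamReadable($server));
        if (!is_resource($server)){
            break; // closed by a client's "shutdown" command while we were suspended
        }
        $client = stream_socket_accept($server, 0);
        if ($client){
            // Each client gets its own fiber so the server can keep accepting.
            $loop->startFiber(clientLoop(...), $client, $server);
        }
    }
}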

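function clientLoop(mixed $client, mixed $server) : void{
    stream_set_blocking($client, false);
    while (is_resource($client) && !feof($client)){
        $data = fread($client, 8192);
        if ($data === '' || $data === false){
            // Nothing to read yet: wait until the client sends more data.
            Fiber::suspend(new StreamReadable($client));
        } else {
            $isBye = trim($data) === 'bye';
            $isShutdown = trim($data) === 'shutdown';
            // Compare against '' instead of relying on truthiness, so a chunk of "0" is still echoed.
            while ($data !== ''){
                $written = fwrite($client, $data);
                if ($written === false){
                    break; // write failed; drop the rest of this chunk
                }
                $data = substr($data, $written);
                if ($data !== ''){
                    // The send buffer is full; wait until the socket can accept more.
                    Fiber::suspend(new StreamWritable($client));
                }
            }
            if ($isBye || $isShutdown){
                fclose($client);
                $client = null;
                // Another client may already have shut the server down.
                if ($isShutdown && is_resource($server)){
                    fclose($server);
                }
            }
        }
    }
}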

The initial fiber handles the server socket and accepts client connections. When a client connects, a new fiber is started to handle the communication with that client, so several clients can be served concurrently while the server fiber keeps waiting for new connections.

In conclusion

Hopefully this helps demonstrate how an asynchronous application works and how PHP's new fiber feature can be used to write these types of applications. In an asynchronous application, code execution needs to jump between different areas based on when external resources become available. Fibers provide a simple and clean way to accomplish this by letting you jump around in your code with simple suspend and resume calls, rather than having to set up a complex chain of callback functions as one traditionally would.
