namespace React\Tests\EventLoop;
abstract class AbstractLoopTest extends TestCase
// Loop instance exercised by every test; assigned in setUp() from createLoop().
* @var \React\EventLoop\LoopInterface
protected $loop;
// Base time budget in seconds used by the timing assertions (tests pass multiples of it); assigned in setUp().
private $tickTimeout;
// Test fixture setup: initialises the timing budget and obtains the loop under test.
public function setUp()
// It's a timeout, don't set it too low. Travis and other CI systems are slow.
$this->tickTimeout = 0.02;
// The concrete loop implementation is supplied by the subclass through createLoop().
$this->loop = $this->createLoop();
// Factory hook implemented by concrete test classes; setUp() stores its return value in $this->loop.
// Presumably returns a \React\EventLoop\LoopInterface (per the @var on $loop) — confirm in subclasses.
abstract public function createLoop();
/**
 * Creates a pair of connected stream sockets for use as test fixtures.
 *
 * Read buffering is switched off on both ends when stream_set_read_buffer()
 * is available. The test fails immediately if the pair cannot be created.
 *
 * @return resource[] the two connected stream resources
 */
public function createSocketPair()
{
    // Unix domain sockets are not available on Windows, so fall back to an INET pair there.
    $domain = (DIRECTORY_SEPARATOR === '\\') ? STREAM_PF_INET : STREAM_PF_UNIX;
    $sockets = stream_socket_pair($domain, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

    // stream_socket_pair() returns false on failure; report that instead of iterating over false.
    if ($sockets === false) {
        $this->fail('Unable to create socket pair');
    }

    foreach ($sockets as $socket) {
        if (function_exists('stream_set_read_buffer')) {
            stream_set_read_buffer($socket, 0);
        }
    }

    return $sockets;
}
// Registers a read listener on $input that is expected to be invoked exactly twice
// (expectCallableExactly() is a helper defined outside this view) and writes two lines to $output.
// NOTE(review): no call that runs the loop is visible between or after the writes — confirm how the listener is triggered.
public function testAddReadStream()
list ($input, $output) = $this->createSocketPair();
$this->loop->addReadStream($input, $this->expectCallableExactly(2));
fwrite($output, "foo\n");
fwrite($output, "bar\n");
// Registers two read listeners for the same stream. Per the expectations, the first one is still
// invoked exactly twice while the second one is never invoked.
// NOTE(review): no call that runs the loop is visible here — confirm how the listeners are triggered.
public function testAddReadStreamIgnoresSecondCallable()
list ($input, $output) = $this->createSocketPair();
$this->loop->addReadStream($input, $this->expectCallableExactly(2));
// Second registration for the same stream: must never be called.
$this->loop->addReadStream($input, $this->expectCallableNever());
fwrite($output, "foo\n");
fwrite($output, "bar\n");
// Asserts that $this->received goes from '' to '[hello]X' across assertRunFasterThan(): one data chunk
// followed by one empty read, as appended by the listener in subAddReadStreamReceivesDataFromStreamReference().
// NOTE(review): the call to that helper is not visible in this method — confirm it is invoked before the first assertion.
public function testAddReadStreamReceivesDataFromStreamReference()
$this->received = '';
$this->assertEquals('', $this->received);
$this->assertRunFasterThan($this->tickTimeout * 2);
$this->assertEquals('[hello]X', $this->received);
// Writes 'hello' to one end of a socket pair and registers a read listener on the other end.
// The listener appends '[chunk]' to $this->received for a non-empty read and 'X' for an empty one.
* Helper for above test. This happens in another helper method to verify
* the loop keeps track of assigned stream resources (refcount).
private function subAddReadStreamReceivesDataFromStreamReference()
list ($input, $output) = $this->createSocketPair();
fwrite($input, 'hello');
$loop = $this->loop;
// Bound by reference so that appends made inside the closure land in $this->received.
$received =& $this->received;
$loop->addReadStream($output, function ($output) use ($loop, &$received) {
$chunk = fread($output, 1024);
// An empty read is recorded as 'X'; presumably this marks the peer having closed — confirm.
if ($chunk === '') {
$received .= 'X';
} else {
$received .= '[' . $chunk . ']';
// Registers a write listener on one end of a socket pair that is expected to be invoked exactly twice.
// NOTE(review): no call that runs the loop is visible here — confirm how the listener is triggered.
public function testAddWriteStream()
list ($input) = $this->createSocketPair();
$this->loop->addWriteStream($input, $this->expectCallableExactly(2));
// Registers two write listeners for the same stream. Per the expectations, the first one is invoked
// exactly twice while the second one is never invoked.
public function testAddWriteStreamIgnoresSecondCallable()
list ($input) = $this->createSocketPair();
$this->loop->addWriteStream($input, $this->expectCallableExactly(2));
// Second registration for the same stream: must never be called.
$this->loop->addWriteStream($input, $this->expectCallableNever());
// Registers a read listener that must never fire, then writes to the peer end.
// NOTE(review): going by the test name the listener should be removed right after registration,
// but no removeReadStream() call is visible here — confirm.
public function testRemoveReadStreamInstantly()
list ($input, $output) = $this->createSocketPair();
$this->loop->addReadStream($input, $this->expectCallableNever());
fwrite($output, "bar\n");
// Registers a read listener expected to fire exactly once although two lines are written to the peer.
// NOTE(review): going by the test name the listener is removed between the two writes,
// but neither the removal nor any loop tick is visible here — confirm.
public function testRemoveReadStreamAfterReading()
list ($input, $output) = $this->createSocketPair();
$this->loop->addReadStream($input, $this->expectCallableOnce());
fwrite($output, "foo\n");
fwrite($output, "bar\n");
// Registers a write listener that must never fire.
// NOTE(review): going by the test name the listener should be removed right after registration,
// but no removeWriteStream() call is visible here — confirm.
public function testRemoveWriteStreamInstantly()
list ($input) = $this->createSocketPair();
$this->loop->addWriteStream($input, $this->expectCallableNever());
// Registers a write listener that is expected to fire exactly once.
// NOTE(review): going by the test name the listener is removed after its first invocation,
// but the removal is not visible here — confirm.
public function testRemoveWriteStreamAfterWriting()
list ($input) = $this->createSocketPair();
$this->loop->addWriteStream($input, $this->expectCallableOnce());
// Registers a read listener on $input that must never fire and a write listener on $output that must
// fire once, then writes a line to $output.
// NOTE(review): going by the test name only the read side is removed; the removal call is not visible here — confirm.
public function testRemoveStreamForReadOnly()
list ($input, $output) = $this->createSocketPair();
$this->loop->addReadStream($input, $this->expectCallableNever());
$this->loop->addWriteStream($output, $this->expectCallableOnce());
fwrite($output, "foo\n");
// Writes a line to $output first, then registers a read listener on $input that must fire once and a
// write listener on $output that must never fire.
// NOTE(review): going by the test name only the write side is removed; the removal call is not visible here — confirm.
public function testRemoveStreamForWriteOnly()
list ($input, $output) = $this->createSocketPair();
// Data is written before the listeners exist, so $input already has pending data when it is registered.
fwrite($output, "foo\n");
$this->loop->addReadStream($input, $this->expectCallableOnce());
$this->loop->addWriteStream($output, $this->expectCallableNever());
// Puts both ends of a socket pair into non-blocking mode and attaches a no-op write listener and a
// read listener to $stream.
// NOTE(review): per the inline comments the read listener removes the stream once the other side is
// closed; the listener body and the closing call are not visible here — confirm.
public function testRemoveReadAndWriteStreamFromLoopOnceResourceClosesEndsLoop()
list($stream, $other) = $this->createSocketPair();
stream_set_blocking($stream, false);
stream_set_blocking($other, false);
// dummy writable handler
$this->loop->addWriteStream($stream, function () { });
// remove stream when the stream is readable (closes)
$loop = $this->loop;
$loop->addReadStream($stream, function ($stream) use ($loop) {
// close other side
// Variant of the close test in which the peer first sends PHP_DEFAULT_CHUNK_SIZE bytes of data.
// The read listener reads up to 1024 bytes per call and branches on whether data was returned;
// a 0.01s timer captures $other.
// NOTE(review): the bodies of the non-empty-read branch and of the timer callback are not visible here;
// per the inline comment the timer presumably closes $other — confirm.
public function testRemoveReadAndWriteStreamFromLoopOnceResourceClosesOnEndOfFileEndsLoop()
list($stream, $other) = $this->createSocketPair();
stream_set_blocking($stream, false);
stream_set_blocking($other, false);
// dummy writable handler
$this->loop->addWriteStream($stream, function () { });
// remove stream when the stream is readable (closes)
$loop = $this->loop;
$loop->addReadStream($stream, function ($stream) use ($loop) {
$data = fread($stream, 1024);
if ($data !== '') {
// send data and close stream
// PHP_DEFAULT_CHUNK_SIZE is resolved through late static binding; its definition is outside this view.
fwrite($other, str_repeat('.', static::PHP_DEFAULT_CHUNK_SIZE));
$this->loop->addTimer(0.01, function () use ($other) {
// Keeps only one end of a socket pair, sets it non-blocking, and attaches a no-op write listener and a
// read listener to it.
// NOTE(review): the read listener body is not visible here; per the inline comment it removes the stream
// once it becomes readable — confirm.
public function testRemoveReadAndWriteStreamFromLoopWithClosingResourceEndsLoop()
// get only one part of the pair to ensure the other side will close immediately
list($stream) = $this->createSocketPair();
stream_set_blocking($stream, false);
// dummy writable handler
$this->loop->addWriteStream($stream, function () { });
// remove stream when the stream is readable (closes)
$loop = $this->loop;
$loop->addReadStream($stream, function ($stream) use ($loop) {
// Obtains a valid stream that is never registered with the loop.
// NOTE(review): per the inline comment the stream is then removed from the loop; the removal calls are
// not visible here — confirm.
public function testRemoveInvalid()
list ($stream) = $this->createSocketPair();
// remove a valid stream from the event loop that was never added in the first place
// Named for the expectation that running a loop with nothing registered returns.
// NOTE(review): the method body is not visible here — confirm what is asserted.
/** @test */
public function emptyRunShouldSimplyReturn()
// Registers a read listener on $input, makes it readable by writing to $output, and asserts that the
// run finishes in under two tick timeouts.
// NOTE(review): the listener body is not visible; going by the test name it presumably removes the
// stream so that no descriptors remain — confirm.
/** @test */
public function runShouldReturnWhenNoMoreFds()
list ($input, $output) = $this->createSocketPair();
$loop = $this->loop;
$this->loop->addReadStream($input, function ($stream) use ($loop) {
fwrite($output, "foo\n");
$this->assertRunFasterThan($this->tickTimeout * 2);
// Registers a read listener on $input, makes it readable by writing to $output, and asserts that the
// run finishes in under two tick timeouts.
// NOTE(review): the listener body is not visible; going by the test name it presumably stops the loop — confirm.
/** @test */
public function stopShouldStopRunningLoop()
list ($input, $output) = $this->createSocketPair();
$loop = $this->loop;
$this->loop->addReadStream($input, function ($stream) use ($loop) {
fwrite($output, "foo\n");
$this->assertRunFasterThan($this->tickTimeout * 2);
// Defines one callback that fails the test if it is ever executed and a second one that captures the
// loop, then asserts that the run finishes in under two tick timeouts.
// NOTE(review): the calls that schedule these closures and the second closure's body are not visible
// here; going by the test name the second one presumably stops the loop — confirm.
public function testStopShouldPreventRunFromBlocking()
// $this is copied into a local so that the closure can capture it through use().
$that = $this;
function () use ($that) {
$that->fail('Timer was executed.');
$loop = $this->loop;
function () use ($loop) {
$this->assertRunFasterThan($this->tickTimeout * 2);
// Registers read listeners on two independent socket pairs and makes both readable.
// The first listener sets $called; the second fails the test if it runs after $called has been set.
// NOTE(review): per the inline comments the first listener also removes the streams; those removal
// calls and the loop run are not visible here — confirm.
public function testIgnoreRemovedCallback()
// two independent streams, both should be readable right away
list ($input1, $output1) = $this->createSocketPair();
list ($input2, $output2) = $this->createSocketPair();
$called = false;
$loop = $this->loop;
$loop->addReadStream($input1, function ($stream) use (& $called, $loop, $input2) {
// stream1 is readable, remove stream2 as well => this will invalidate its callback
$called = true;
// this callback would have to be called as well, but the first stream already removed us
$that = $this;
$loop->addReadStream($input2, function () use (& $called, $that) {
if ($called) {
$that->fail('Callback 2 must not be called after callback 1 was called');
fwrite($output1, "foo\n");
fwrite($output2, "foo\n");
// Expects the output 'future-tick' followed by PHP_EOL, which is printed by the inner closure.
// NOTE(review): the calls that schedule the two closures are not visible here; going by the test name
// the inner one is queued from inside a future-tick callback — confirm.
public function testFutureTickEventGeneratedByFutureTick()
$loop = $this->loop;
function () use ($loop) {
function () {
echo 'future-tick' . PHP_EOL;
$this->expectOutputString('future-tick' . PHP_EOL);
// Defines a callback that flips $called to true.
// NOTE(review): the scheduling of the callback and the assertions on $called are not visible here — confirm.
public function testFutureTick()
$called = false;
// Captured by reference so that the closure's write is visible in this scope.
$callback = function () use (&$called) {
$called = true;
// Expects 'future-tick' to be printed before 'stream', i.e. the future-tick closure runs before the
// stream closure.
// NOTE(review): the calls that register the two closures are not visible here — confirm which loop
// methods they are attached to.
public function testFutureTickFiresBeforeIO()
list ($stream) = $this->createSocketPair();
function () {
echo 'stream' . PHP_EOL;
function () {
echo 'future-tick' . PHP_EOL;
$this->expectOutputString('future-tick' . PHP_EOL . 'stream' . PHP_EOL);
// Expects the output order 'future-tick-1', 'stream', 'future-tick-2'.
// NOTE(review): the registration calls are not visible here; going by the test name the second
// future-tick closure is queued from within the first — confirm.
public function testRecursiveFutureTick()
list ($stream) = $this->createSocketPair();
$loop = $this->loop;
function () use ($stream, $loop) {
echo 'stream' . PHP_EOL;
function () use ($loop) {
echo 'future-tick-1' . PHP_EOL;
function () {
echo 'future-tick-2' . PHP_EOL;
$this->expectOutputString('future-tick-1' . PHP_EOL . 'stream' . PHP_EOL . 'future-tick-2' . PHP_EOL);
// Expects the output 'future-tick' followed by PHP_EOL, printed by the inner closure.
// NOTE(review): the registration calls and the outer closure's remaining statements are not visible
// here; going by the test name the run must not return before the queued future-tick callback ran — confirm.
public function testRunWaitsForFutureTickEvents()
list ($stream) = $this->createSocketPair();
$loop = $this->loop;
function () use ($stream, $loop) {
function () {
echo 'future-tick' . PHP_EOL;
$this->expectOutputString('future-tick' . PHP_EOL);
// Expects the output 'future-tick' followed by PHP_EOL, printed by the inner closure.
// NOTE(review): the registration calls are not visible here; going by the test name the inner closure
// is queued as a future tick from within a timer callback — confirm.
public function testFutureTickEventGeneratedByTimer()
$loop = $this->loop;
function () use ($loop) {
function () {
echo 'future-tick' . PHP_EOL;
$this->expectOutputString('future-tick' . PHP_EOL);
// Calls removeSignal() for signal number 2 with a freshly created closure that was never added.
// No assertion is visible here; the test name expects the call to be a no-op.
public function testRemoveSignalNotRegisteredIsNoOp()
$this->loop->removeSignal(2, function () { });
// Registers listeners for SIGUSR2 and SIGUSR1, then sends SIGUSR1 to the current process from a
// future-tick callback. The SIGUSR1 listener sets $called and removes both signal listeners; the
// SIGUSR2 listener clears $calledShouldNot, which starts out true.
// NOTE(review): $timer is captured by the SIGUSR1 listener but no use of it is visible, and the final
// assertions on $called / $calledShouldNot are not visible either — confirm.
* @requires extension pcntl
public function testSignal()
// Skip when the POSIX functions needed to send a signal to this process are unavailable.
if (!function_exists('posix_kill') || !function_exists('posix_getpid')) {
$this->markTestSkipped('Signal test skipped because functions "posix_kill" and "posix_getpid" are missing.');
$called = false;
$calledShouldNot = true;
$timer = $this->loop->addPeriodicTimer(1, function () {});
$this->loop->addSignal(SIGUSR2, $func2 = function () use (&$calledShouldNot) {
$calledShouldNot = false;
$loop = $this->loop;
// $func1 and $func2 are captured by reference; $func1 is only assigned once this expression completes.
$this->loop->addSignal(SIGUSR1, $func1 = function () use (&$func1, &$func2, &$called, $timer, $loop) {
$called = true;
$loop->removeSignal(SIGUSR1, $func1);
$loop->removeSignal(SIGUSR2, $func2);
$this->loop->futureTick(function () {
posix_kill(posix_getpid(), SIGUSR1);
// Adds the same listener twice for SIGUSR1, sends the signal once after 0.4s, removes the listener
// after 0.9s, and asserts that it ran exactly once.
// NOTE(review): unlike testSignal(), there is no guard for missing posix_kill()/posix_getpid() here.
// The statement that increments $funcCallCount is not visible in this view either — confirm.
* @requires extension pcntl
public function testSignalMultipleUsagesForTheSameListener()
$funcCallCount = 0;
$func = function () use (&$funcCallCount) {
// 1s no-op timer registered alongside the signal listener.
$this->loop->addTimer(1, function () {});
$this->loop->addSignal(SIGUSR1, $func);
$this->loop->addSignal(SIGUSR1, $func);
$this->loop->addTimer(0.4, function () {
posix_kill(posix_getpid(), SIGUSR1);
$loop = $this->loop;
$this->loop->addTimer(0.9, function () use (&$func, $loop) {
$loop->removeSignal(SIGUSR1, $func);
$this->assertSame(1, $funcCallCount);
// Registers a no-op SIGUSR1 listener and a 1.5s timer whose callback removes that listener again.
// NOTE(review): the assertion is not visible here; going by the test name it presumably checks that
// the run lasts at least until the timer fires — confirm.
* @requires extension pcntl
public function testSignalsKeepTheLoopRunning()
$loop = $this->loop;
$function = function () {};
$this->loop->addSignal(SIGUSR1, $function);
$this->loop->addTimer(1.5, function () use ($function, $loop) {
$loop->removeSignal(SIGUSR1, $function);
// Registers a no-op SIGUSR1 listener and a 1.5s timer whose callback removes that listener again.
// NOTE(review): the visible statements are identical to testSignalsKeepTheLoopRunning(); the assertion
// that distinguishes this test is not visible here — confirm.
* @requires extension pcntl
public function testSignalsKeepTheLoopRunningAndRemovingItStopsTheLoop()
$loop = $this->loop;
$function = function () {};
$this->loop->addSignal(SIGUSR1, $function);
$this->loop->addTimer(1.5, function () use ($function, $loop) {
$loop->removeSignal(SIGUSR1, $function);
// Schedules a one-off timer PHP_INT_MAX seconds ahead with a no-op callback and queues a future-tick
// callback that captures both the timer and the loop.
// NOTE(review): the future-tick body is not visible here; presumably it cancels the timer so that the
// loop can finish — confirm.
public function testTimerIntervalCanBeFarInFuture()
$loop = $this->loop;
// start a timer very far in the future
$timer = $this->loop->addTimer(PHP_INT_MAX, function () { });
$this->loop->futureTick(function () use ($timer, $loop) {
/**
 * Runs the loop until it returns and asserts that this took longer than the given interval.
 *
 * @param float $minInterval lower bound (exclusive) for the run duration, in seconds
 */
private function assertRunSlowerThan($minInterval)
{
    $start = microtime(true);

    // The loop has to run between the two samples; otherwise the measured interval is always ~0.
    $this->loop->run();

    $end = microtime(true);
    $interval = $end - $start;

    // assertLessThan($expected, $actual) passes when $actual < $expected, i.e. $minInterval < $interval.
    $this->assertLessThan($interval, $minInterval);
}
/**
 * Runs the loop until it returns and asserts that this took less than the given interval.
 *
 * @param float $maxInterval upper bound (exclusive) for the run duration, in seconds
 */
private function assertRunFasterThan($maxInterval)
{
    $start = microtime(true);

    // Callers rely on registered listeners firing during this call, so the loop must run here;
    // without it the assertion would pass vacuously.
    $this->loop->run();

    $end = microtime(true);
    $interval = $end - $start;

    // assertLessThan($expected, $actual) passes when $actual < $expected, i.e. $interval < $maxInterval.
    $this->assertLessThan($maxInterval, $interval);
}