Swoole协程
简介
官方文档
https://wiki.swoole.com/zh-cn/#/coroutine
从4.0版本开始提供完整的协程和通道的特性,开发者不需要使用yield,且提供完善的客户端。
协程容器和主进程代码是同步堵塞,协程容器内异步不堵塞。
使用Coroutine\WaitGroup或Coroutine\Barrier可等待协程容器内部都结束之后,继续操作。
协程间的消息队列channel,使用push和pop处理消息,无法跨进程。
协程容器可以通过Coroutine::create或者go()创建。
Swoole 协程调度器 协程1 ───┐ |→ 遇到I/O操作 → 让出CPU → 调度器选择 协程2 ───┘ I/O 完成 → 唤醒协程 → 继续执行
一个TCP链接仅允许一个协程读、一个协程写,多个协程进行读或写操作会报错[1]
创建
所有的协程必须在协程容器里面创建。
部分协程化的类可使用短命名,php.ini 设置 swoole.use_shortname=On/Off来开启/关闭短名,默认为开启。
协程HTTP服务端
use Swoole\Coroutine;Co\run(function () {$host = "0.0.0.0";$port = "80"; //不使用ssl$server = new Co\Http\Server($host, $port, false); //注册路由及回调$server->handle('/', function ($request, $response) {$response->end("<h1>Index</h1>"); });$server->handle('/stop', function ($request, $response) use ($server) {$response->end("<h1>Stop</h1>");$server->shutdown(); });$server->start();});
TCP和WebSocket客户端参考官网。
文件异步操作
Co\run(function () {$file = "../data/test.txt"; go(function () use ($file) {echo"睡眠 0.2 秒" . PHP_EOL; Co\System::sleep(0.2);$r = Co\System::writeFile($file, "\n this is a test\n", FILE_APPEND); var_dump($r); }); go(function () use ($file) { var_dump(Co\System::statvfs($file));$r = Co\System::readFile($file); var_dump($r); });});
运行结果
睡眠 0.2 秒array(11) { ["bsize"]=> int(4096) ["frsize"]=> int(4096) ["blocks"]=> int(159028479) ["bfree"]=> int(12279418) ["bavail"]=> int(12279418) ["files"]=> int(999) ["ffree"]=> int(1000000) ["favail"]=> int(1000000) ["fsid"]=> int(1) ["flag"]=> int(1024) ["namemax"]=> int(255)}string(19) "11111this is a test"int(17)
通讯
协程之间使用Coroutine\Channel实现通讯,Coroutine\WaitGroup基于Channel实现增加、完整、等待完成。
Coroutine\Barrier协程屏障是Swoole Library提供的便携协程并发管理工具,基于PHP计数和 Coroutine API 实现,比WaitGroup更简单。
使用WaitGroup
Co\run(function () {$wg = new WaitGroup();$channel = new Channel(1);$wg->add(); go(function () use ($channel, $wg) {$cid = Co::getCid();$str = "cid:" . $cid;$msg = "this is test"; var_dump($str . " sleep 3"); Co\System::sleep(3); var_dump($str . " push msg:" . $msg);$channel->push(['msg' => $msg]);$wg->done(); });$wg->add(); go(function () use ($channel, $wg) {$cid = Co::getCid();$str = "cid:" . $cid; //设置超时时间if ($msg = $channel->pop(2)) { var_dump($str . " " . $msg); } else { var_dump($str . " pop msg timeout"); } var_dump($str . " sleep 5"); Co\System::sleep(5);$wg->done(); });$wg->wait(); var_dump("all Coroutine done");});
运行结果
string(13) "cid:2 sleep 3"string(21) "cid:3 pop msg timeout"string(13) "cid:3 sleep 5"string(27) "cid:2 push msg:this is test"string(18) "all Coroutine done"
使用Barrier,与上述执行代码效果相同。
Co\run(function () {$barrier = Barrier::make();$channel = new Channel(1); go(function () use ($channel, $barrier) {$cid = Co::getCid();$str = "cid:" . $cid;$msg = "this is test"; var_dump($str . " sleep 3"); Co\System::sleep(3); var_dump($str . " push msg:" . $msg);$channel->push(['msg' => $msg]); }); go(function () use ($channel, $barrier) {$cid = Co::getCid();$str = "cid:" . $cid; //设置超时时间if ($msg = $channel->pop(2)) { var_dump($str . " " . $msg); } else { var_dump($str . " pop msg timeout"); } var_dump($str . " sleep 5"); Co\System::sleep(5); }); Barrier::wait($barrier); var_dump("all Coroutine done");});
全局变量
禁止使用类静态变量 Class::$array、全局对象属性 $object->array、其他超全局变量 $GLOBALS 等。
多协程使用全局变量,在每个协程中都会对其修改,所以使用全局变量在协程中可能会得到错误数据。
对上下文对象中的对象进行设置和获取,实现全局变量,也可以使用Swoole\Table配合行锁。
Swoole\Table是一个基于共享内存和锁实现的超高性能,并发数据结构。用于解决多进程/多线程数据共享和同步加锁问题。
Table的内存容量不受PHP的memory_limit限制,一定要使用文档中提供的 API 来进行操作。
锁分类:
- 非协程锁
Swoole\Lock多线程使用Swoole\Thread\Lock,协程中不可使用。 - 协程锁
Swoole\Coroutine\Lock,加锁和解锁必须在同一个协程中进行,否则会导致静态条件被破坏。
原子计数器Swoole\Atomic32位无符号整型,64位使用Swoole\Atomic\Long,多线程使用Swoole\Thread\Atomic和Swoole\Thread\Atomic\Long。
Co\run(function () {$atomic = new Swoole\Atomic();$lock = new Swoole\Coroutine\Lock();$wg = new WaitGroup();$table = new Swoole\Table(1024);$table->column('id', Swoole\Table::TYPE_INT);$table->column('name', Swoole\Table::TYPE_STRING, 64);$table->create();$count = 2;for ($i = 0; $i < $count; $i++) {$wg->add(); go(function () use ($i, $wg, $atomic, $lock, $table) { //协程锁$lock->lock(); //计数器$atomic->add();$id = $atomic->get(); //table$item = ['id' => $id,'name' => "name_" . $id, ];$table->set($i, $item); //协程锁$lock->unlock();$wg->done(); }); }for ($i = 0; $i < $count; $i++) {$s = $count + 1 - $i;$wg->add(); go(function () use ($i, $s, $table, $wg) { var_dump("sleep time:" . $s . " get " . $i); Co\System::sleep($s);$info = $table->get($i); var_dump($info);$wg->done(); }); }$wg->wait(); var_dump("all Coroutine done");});
运行结果
string(18) "sleep time:3 get 0"string(18) "sleep time:2 get 1"array(2) { ["id"]=> int(2) ["name"]=> string(6) "name_2"}array(2) { ["id"]=> int(1) ["name"]=> string(6) "name_1"}string(18) "all Coroutine done"
[1] client-has-already-been-bound-to-another-coroutine: https://wiki.swoole.com/zh-cn/#/question/use?id=client-has-already-been-bound-to-another-coroutine