Swoole 进程管理
根据官方文档[1],进程包括进程池和进程管理,及共享内存。
进程池对于进程间通讯有些区别。
进程管理器、进程池都用于管理多进程,进程池更注重结果,进程管理器更注重工程。
比如进程池创建是便设置大小,在回调中处理业务逻辑,进程管理器可增加Server\Process(进程)或者Server\Process\Pool(进程池)。
多进程利于大量并发任务处理。
进程池
Swoole\Process\Pool(进程池)比使用Swoole\Process创建多进程方便。
Pool类的构造中传入进程数量,即创建进程池时设置其大小,也可以设置通讯模式。
通讯模式:
- 消息队列。顺序投递消息。这种方式非常的不灵活,实际项目使用的并不多,官网没有详细的介绍。
- Scoket通讯。客户端和服务端不在同一个服务器上。需要监听、接收数据、写入数据流程进行通讯。
- UnixSocket。官方推荐,通讯更简单仅通过sendMessage()和OnMessage()回调
不使用任何进程间通信特性
$pool = new Swoole\Process\Pool(2, SWOOLE_IPC_NONE, 1, false);$pool->on('workerStart', function(Swoole\Process\Pool $pool, int $workerId){ $sleeptime = $workerId + 1;echo"sleeptime:" . $sleeptime . " workerId:" . $workerId . " workerStart" . PHP_EOL; sleep($sleeptime);});$pool->on("workerStop", function(Swoole\Process\Pool $pool, int $workerId){echo"# workerId:" . $workerId . " workerStop" . PHP_EOL;});//SWOOLE_IPC_NONE 不可使用 报错// $pool->on('Message', function (Swoole\Process\Pool $pool, string $data) {// var_dump($data);// });$pool->start();
例子中第四个参数为false,不使用协程。
执行结果
sleeptime:1 workerId:0 workerStartsleeptime:2 workerId:1 workerStart# workerId:0 workerStopsleeptime:1 workerId:0 workerStart# workerId:1 workerStop# workerId:0 workerStopsleeptime:2 workerId:1 workerStartsleeptime:1 workerId:0 workerStart# workerId:0 workerStopsleeptime:1 workerId:0 workerStart# workerId:1 workerStopsleeptime:2 workerId:1 workerStart# workerId:0 workerStop
两个worker进程启动时运行onWorkerStart(),结束时运行OnWorkerStop(),进程停止后会再调起。
默认通讯方式不支持OnMessage()。
socket通讯
//服务端$host = "127.0.0.1";$port = "9501";$pool = new Swoole\Process\Pool(2, SWOOLE_IPC_SOCKET, 1, false);$pool->on('workerStart', function(Swoole\Process\Pool $pool, int $workerId){ $sleeptime = $workerId + 1; $msg = "sleeptime:" . $sleeptime . " workerId:" . $workerId . " workerStart";echo $msg . PHP_EOL; sleep($sleeptime);});$pool->on("workerStop", function(Swoole\Process\Pool $pool, int $workerId){echo"# workerId:" . $workerId . " workerStop" . PHP_EOL;});$pool->on('Message', function(Swoole\Process\Pool $pool, string $data){echo"Message: {$data}" . PHP_EOL; $pool->write("this "); $pool->write("is "); $pool->write("test!");});$pool->listen($host, $port);$pool->start();
//客户端$fp = stream_socket_client("tcp://127.0.0.1:9501", $errno, $errstr) ordie("error: $errstr\n");$msg = json_encode(['data' => 'hello', 'uid' => 1991]);fwrite($fp, pack('N', strlen($msg)) . $msg);sleep(1);//将显示 hello world\n$data = fread($fp, 8192);var_dump(substr($data, 4, unpack('N', substr($data, 0, 4))[1]));fclose($fp);
通过stream_socket_client()从php调起socket客户端,webSocket不能使用tcp协议。
服务端中OnMessage()对于SWOOLE_IPC_SOCKET通信方式可用,但使用协程时可能不能使用。
必须监听地址和端口。
执行结果
//服务端sleeptime:1 workerId:0 workerStartsleeptime:2 workerId:1 workerStartMessage: {"data":"hello","uid":1991}
//客户端string(13) "this is test!"
UnixSocket
$pool = new Swoole\Process\Pool(2, SWOOLE_IPC_UNIXSOCK, 1, false);$pool->on('workerStart', function(Swoole\Process\Pool $pool, int $workerId){ $sleeptime = $workerId + 1; $msg = "sleeptime:" . $sleeptime . " workerId:" . $workerId . " workerStart";if (0 === $workerId) { $pool->sendMessage($msg, 1); }if (1 == $workerId) { $pool->sendMessage($msg, 0); }echo $msg . PHP_EOL;});$pool->on("workerStop", function(Swoole\Process\Pool $pool, int $workerId){echo"# workerId:" . $workerId . " workerStop" . PHP_EOL;});$pool->on('Message', function(Swoole\Process\Pool $pool, string $data){echo"Message: {$data}" . PHP_EOL;});$pool->start();
运行结果
sleeptime:1 workerId:0 workerStartsleeptime:2 workerId:1 workerStartsleeptime:1 workerId:0 workerStart seleep endMessage: sleeptime:2 workerId:1 workerStartsleeptime:2 workerId:1 workerStart seleep endMessage: sleeptime:1 workerId:0 workerStart
服务端中OnMessage()对于SWOOLE_IPC_UNIXSOCK通信方式可用。
总结
默认不设置通讯方式,SWOOLE_IPC_SOCKET通信方式必须设置监听用于服务端和客户端中通讯,SWOOLE_IPC_UNIXSOCK用于同一个进程池中进程通讯。
进程管理
Porcess\Manager(进程管理),可以通过add()和addBatch()批量加进程,通过setIPCType()设置进程通讯方式,通过setMsgQueueKey()设置消息队列key。
$atomic = new Swoole\Atomic();$pm = new Swoole\Process\Manager();for ($i = 0; $i < 2; $i++) { $pm->add(function(Swoole\Process\Pool $pool, int $workerId)use($atomic){ $items = $atomic->get();echo"pm1 " . date("H:i:s") . " atomic:" . $items . " workerId:" . $workerId . " start" . PHP_EOL; $atomic->add();if (3 <= $items) { sleep($items); $pool->shutdown();echo"pm1 " . date("H:i:s") . " atomic:" . $items . " workerId:" . $workerId . " shutdown" . PHP_EOL; } });}var_dump("master " . date("H:i:s") . " atomic:" . $atomic->get());$pm->start();
运行结果
string(24) "master 07:16:57 atomic:0"pm1 07:16:57 atomic:0 workerId:0 startpm1 07:16:57 atomic:1 workerId:1 startpm1 07:16:57 atomic:2 workerId:0 startpm1 07:16:57 atomic:3 workerId:1 startpm1 07:16:57 atomic:4 workerId:0 startpm1 07:17:00 atomic:3 workerId:1 shutdown
进程管理创建的进程是异步执行,当运行shutdown()时会等待程序执行完后关闭进程。
样例中两个进程持续运行三次,在进程1运行第三次时关闭,进程1执行shutdown()关闭后进程0也被关闭。
[1] 进程池: https://wiki.swoole.com/zh-cn/#/process/process_pool