
大家好,我是专注 PHP 实战干货的博主。
做业务开发时,「接口阻塞」是高频痛点:用户注册后发送短信/邮件要等3-5秒、导出大Excel要卡页面、生成报表要等加载… 这些场景用同步执行体验极差,但搭建 Redis/MQ 队列又太复杂(配置多、学习成本高)。
今天给大家带来 PHP7+ 异步任务极简版:无第三方依赖、无需队列、一行调用、安全加固,原生 PHP 就能跑,彻底解决接口阻塞问题,小项目/快速迭代直接上线,生产级可用。
温馨提示:本文代码量有点长~但都是干货
一、核心痛点(为什么要做异步)
<?php// 同步执行:用户注册后发送短信,接口要等短信接口响应完才返回functionregister($mobile) { // 1. 写入数据库 saveUser($mobile); // 2. 发送短信(阻塞3-5秒) sendSms($mobile); // 3. 返回结果(用户要等3-5秒) return ['code' => 200, 'msg' => '注册成功'];}
用户体验差、接口超时、服务器资源被占用… 而异步执行能让「发送短信」在后台跑,接口瞬间返回,核心就是:主线程快速响应,子进程处理耗时任务。
二、核心原理(极简科普)
- 基于 PHP exec 函数开启独立子进程执行耗时任务,主线程不等待、直接返回;
- 安全封装
- 无依赖不用 Redis、不用 MQ、不用框架,原生 PHP 就能跑;
- 轻量高效
关键:exec 执行时加 > /dev/null 2>&1 & 让子进程后台运行,彻底脱离主线程。三、异步任务完整封装类
单文件 AsyncTask.php,几十行核心代码,包含「异步执行、日志记录、失败重试、安全过滤」,复制就能用:<?php/** * PHP 异步任务封装类(PHP 7.4+) * * 特性: * - 无 Redis / MQ 依赖,原生 PHP 实现 * - 并发超限时自动入队,不丢任务 * - 队列由独立 Worker 消费,支持手动/自动触发 * - 大参数走临时文件,规避 ARG_MAX 限制 * - 递归参数过滤,防命令注入 * - exec + PID 双重确认子进程启动 * - 日志文件自动轮转(50MB 归档) * - 重试指数退避(由 handler 执行) */classAsyncTask{ // ----------------------------------------------------------------------- // 可配置项(通过 setter 修改,不要直接改这里) // ----------------------------------------------------------------------- /** 日志根目录(建议放 web 不可访问的路径) */ private static string $logDir = ''; /** 队列文件路径(JSONL 格式,每行一个任务) */ private static string $queueFile = ''; /** 任务白名单 */ private static array $taskWhitelist = [ 'sendSms', 'sendEmail', 'generateExcel', 'backupDb', ]; /** 同一任务名允许同时运行的最大子进程数,超限自动入队 */ private static int $maxConcurrent = 10; /** 单次 processQueue() 最多消费的条目数(防止一次跑太久) */ private static int $queueBatchSize = 20; /** 日志文件轮转阈值(字节),默认 50MB */ private static int $logRotateSize = 52428800; /** boot 标志 */ private static bool $booted = false; // ----------------------------------------------------------------------- // 公开配置接口 // ----------------------------------------------------------------------- public static functionsetWhitelist(array$tasks): void { self::$taskWhitelist = array_values( array_unique(array_merge(self::$taskWhitelist, $tasks)) ); } public static functionsetLogDir(string$dir): void { self::$logDir = rtrim($dir, '/\\'); } public static functionsetQueueFile(string$path): void { self::$queueFile = $path; } public static functionsetMaxConcurrent(int$max): void { self::$maxConcurrent = max(1, $max); } public static functionsetQueueBatchSize(int$size): void { self::$queueBatchSize = max(1, $size); } public static functionsetLogRotateSizeMB(int$mb): void { self::$logRotateSize = $mb * 1024 * 1024; } // ----------------------------------------------------------------------- // 核心入口 // ----------------------------------------------------------------------- /** * 提交异步任务 * * 流程: * 白名单校验 → 并发检测 → [未超限] 直接启动子进程 * → [超 限] 写入文件队列,等待 processQueue() 消费 * * @param string $taskName 任务名(必须在白名单内) * @param array $params 任务参数(任意深度嵌套均可) * @param int $retry 最多执行次数(含首次) * * @return bool true = 已启动 或 已入队;false = 被拒绝(白名单不通过) */ public static functionrun(string$taskName, array$params = [], int$retry = 3): bool { self::boot(); // 1. 白名单校验 if(!in_array($taskName, self::$taskWhitelist, true)) { self::writeLog( self::masterLogFile(), 'ERROR', "非白名单任务被拒绝", ['task' => $taskName] ); return false; } // 2. 递归过滤参数 $params = self::filterParams($params); // 3. 并发检测(仅 Unix) $overLimit = false; $running = 0; if(self::isUnix()) { exec('pgrep -c -f ' . escapeshellarg($taskName), $cntOut); $running = (int)trim($cntOut[0] ?? '0'); $overLimit = $running >= self::$maxConcurrent; } if($overLimit) { // 并发已满 → 入队,不丢任务 $queued = self::enqueue($taskName, $params, $retry); self::writeLog( self::masterLogFile(), $queued ? 'INFO': 'ERROR', $queued ? "并发已满,任务已入队等待消费" : "并发已满,且入队失败!任务已丢失!", ['task' => $taskName, 'running' => $running, 'limit' => self::$maxConcurrent] ); return $queued; } // 4. 并发未满 → 直接启动子进程 return self::dispatch($taskName, $params, $retry); } /** * 消费队列(将积压的任务取出并依次启动子进程) * * 调用时机(三选一,按场景选择): * A. 低流量:在每次 run() 之后顺带调用 * AsyncTask::run('sendSms', $p); * AsyncTask::processQueue(); * * B. 推荐:crontab 每分钟执行一次消费脚本 * * * * * * php /path/to/queue_worker.php >> /path/async_logs/worker.log 2>&1 * * C. 高流量:Supervisor 持续运行消费脚本 * * @return int 本次实际消费(成功启动)的任务数 */ public static functionprocessQueue(): int { self::boot(); $queueFile = self::$queueFile; if(!file_exists($queueFile) || filesize($queueFile) === 0) { return 0; } // 独占锁:读取全部内容 + 截断(原子操作,防并发重复消费) $fp = fopen($queueFile, 'c+'); if(!$fp || !flock($fp, LOCK_EX)) { self::writeLog(self::masterLogFile(), 'WARN', "队列文件加锁失败,跳过本次消费"); return 0; } $content = stream_get_contents($fp); $lines = array_values(array_filter(array_map('trim', explode("\n", $content)))); if(empty($lines)) { flock($fp, LOCK_UN); fclose($fp); return 0; } // 切分:本批处理 + 剩余回写 $batch = array_splice($lines, 0, self::$queueBatchSize); $remaining = $lines; ftruncate($fp, 0); rewind($fp); if(!empty($remaining)) { fwrite($fp, implode("\n", $remaining) . "\n"); } flock($fp, LOCK_UN); fclose($fp); // 逐条启动子进程 $dispatched = 0; foreach($batch as $line) { $item = json_decode($line, true); if( !is_array($item) || empty($item['task']) || !isset($item['params'], $item['retry']) ) { self::writeLog( self::masterLogFile(), 'WARN', "队列条目格式错误,已跳过", ['raw' => mb_substr($line, 0, 200)] // 避免超长日志 ); continue; } $ok = self::dispatch($item['task'], $item['params'], (int)$item['retry']); self::writeLog( self::masterLogFile(), $ok ? 'INFO': 'ERROR', $ok ? "队列任务消费成功": "队列任务消费失败", ['task' => $item['task'], 'queued_at' => $item['queued_at'] ?? '-'] ); if($ok) { $dispatched++; } } self::writeLog( self::masterLogFile(), 'INFO', "队列消费批次结束", ['dispatched' => $dispatched, 'remaining' => count($remaining)] ); return $dispatched; } /** * 查询当前队列积压数量(不加锁,仅供监控参考) */ public static functionqueueSize(): int { self::boot(); if(!file_exists(self::$queueFile)) { return 0; } $content = file_get_contents(self::$queueFile); return count(array_filter(array_map('trim', explode("\n", $content)))); } // ----------------------------------------------------------------------- // 内部:入队 // ----------------------------------------------------------------------- private static functionenqueue(string$taskName, array$params, int$retry): bool { $item = json_encode([ 'task' => $taskName, 'params' => $params, 'retry' => $retry, 'queued_at' => date('Y-m-d H:i:s'), ], JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR); $written = file_put_contents(self::$queueFile, $item . "\n", FILE_APPEND | LOCK_EX); return $written !== false; } // ----------------------------------------------------------------------- // 内部:启动子进程 // ----------------------------------------------------------------------- private static functiondispatch(string$taskName, array$params, int$retry): bool { // 参数写临时文件(规避 ARG_MAX ~2MB 限制) $tmpFile = self::writeTmpParams($params); if($tmpFile === null) { self::writeLog( self::masterLogFile(), 'ERROR', "临时参数文件写入失败", ['task' => $taskName] ); return false; } $phpBin = escapeshellcmd(PHP_BINARY); $handler = escapeshellarg(self::handlerPath()); $tName = escapeshellarg($taskName); $tFile = escapeshellarg($tmpFile); $tRetry = (int)$retry; $logFile = self::taskLogFile($taskName); if(self::isUnix()) { $cmd = self::buildUnixCmd($phpBin, $handler, $tName, $tFile, $tRetry, $logFile); } else { $cmd = "start /B {$phpBin}{$handler}{$tName}{$tFile}{$tRetry} >> {$logFile} 2>&1"; } exec($cmd); // PID 确认(Unix 专属) $launched = true; if(self::isUnix()) { usleep(500000); exec('pgrep -n -f ' . escapeshellarg($taskName), $pidOut); $launched = !empty(trim($pidOut[0] ?? '')); } if(!$launched) { @unlink($tmpFile); self::writeLog( self::masterLogFile(), 'ERROR', "子进程未检测到,启动失败", ['task' => $taskName, 'cmd' => $cmd] ); return false; } self::writeLog( self::masterLogFile(), 'INFO', "子进程已确认启动", ['task' => $taskName, 'tmp' => $tmpFile] ); return true; } // ----------------------------------------------------------------------- // 内部辅助 // ----------------------------------------------------------------------- private static functionboot(): void { if(self::$booted) { return; } if(self::$logDir === '') { self::$logDir = __DIR__ . '/async_logs'; } if(self::$queueFile === '') { self::$queueFile = self::$logDir . '/task_queue.jsonl'; } if(!is_dir(self::$logDir)) { mkdir(self::$logDir, 0755, true); } self::$booted = true; } private static functionfilterParams(array$params): array { array_walk_recursive($params, static function (&$value): void { if(is_string($value)) { $value = preg_replace('/[&;|`$><\'"\\n\\r]/', '', $value); } }); return $params; } private static functionwriteTmpParams(array$params): ?string { $tmpFile = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'async_' . bin2hex(random_bytes(8)) . '_' . time() . '.json'; $json = json_encode($params, JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR); $written = file_put_contents($tmpFile, $json, LOCK_EX); return $written !== false ? $tmpFile : null; } private static functionwriteLog( string $logFile, string $level, string $msg, array $data = [] ): void { if(file_exists($logFile) && filesize($logFile) >= self::$logRotateSize) { rename($logFile, $logFile . '.' . date('His') . '.bak'); } $line = sprintf( "[%s] [%-5s] %s%s\n", date('Y-m-d H:i:s'), strtoupper($level), $msg, empty($data) ? '' : ' ' . json_encode($data, JSON_UNESCAPED_UNICODE) ); file_put_contents($logFile, $line, FILE_APPEND | LOCK_EX); } private static functionmasterLogFile(): string { return self::$logDir . '/task_master_' . date('Ymd') . '.log'; } private static functiontaskLogFile(string$taskName): string { $safe = preg_replace('/[^a-zA-Z0-9_\-]/', '', $taskName); return self::$logDir . "/{$safe}_" . date('Ymd') . '.log'; } private static functionhandlerPath(): string { return __DIR__ . '/async_task_handler.php'; } private static functionisUnix(): bool { return DIRECTORY_SEPARATOR === '/'; } /** * 构建 Unix 下彻底脱离父进程的启动命令(避免僵尸进程) * * 僵尸进程产生的根本原因: * 父进程 fork 出子进程,子进程退出后内核保留退出状态(变为僵尸), * 直到父进程调用 wait() 读取状态后才真正销毁。 * PHP-FPM/Apache 作为父进程,并不会主动 wait() 它 exec() 出来的孙进程。 * * 解决思路(三层保险): * * 1. setsid:创建新的 session + 进程组,彻底脱离 PHP-FPM 的进程组。 * 即使 FPM reload/重启,SIGHUP 也不会传播到子进程。 * * 2. 双重 fork(sh -c '...' &): * sh 本身是第一层 fork,& 让 sh 再 fork 一个孙进程后立刻退出。 * 孙进程成为孤儿,由 init/systemd(PID 1)收养。 * init 始终调用 wait(),孤儿退出后立即被回收,永远不会变成僵尸。 * * 3. nohup:防止子进程因 SIGHUP 被意外终止(双重保险,和 setsid 互补)。 * * 最终进程树(执行后): * PID 1 (init/systemd) * └── php async_task_handler.php ← 孤儿进程,由 init 收养并回收 * * PHP-FPM Worker 进程树中不再有任何残留子进程。 */ private static functionbuildUnixCmd( string $phpBin, string $handler, string $tName, string $tFile, int $tRetry, string $logFile ): string { // 检测 setsid 是否可用(绝大多数 Linux 发行版均有) $hasSetsid = !empty(trim(shell_exec('command -v setsid 2>/dev/null') ?? '')); $innerCmd = "{$phpBin}{$handler}{$tName}{$tFile}{$tRetry} >> {$logFile} 2>&1"; if($hasSetsid) { // 最优方案:setsid 创建新 session + nohup 防 SIGHUP + & 后台运行 // setsid 本身就是 double-fork 的封装,子进程完全独立于父进程组/session return "setsid nohup {$innerCmd} &"; } // 降级方案:手动双重 fork(适用于没有 setsid 的极少数环境) // 外层 sh -c '...' & :第一层 fork,sh 进入后台 // 内层 nohup ... & :sh 内部再 fork 一次,真正的子进程成为孤儿 // sh 随即退出,孤儿被 init 收养 return "sh -c 'nohup {$innerCmd} &' &"; }}
配套任务处理文件 async_task_handler.php
(和 AsyncTask.php 同目录,子进程执行的核心文件)<?php/** * 异步任务处理脚本(子进程执行) * * 安全限制:只允许 CLI 模式调用,禁止 web 直接访问。 * 由 AsyncTask::dispatch() 通过 exec() 调用,请勿手动执行。 * * 用法:php async_task_handler.php <taskName> <tmpParamsFile> <maxAttempts> */// -----------------------------------------------------------------------// 0. 安全门:只允许 CLI 执行// -----------------------------------------------------------------------PHP_SAPI === 'cli' or exit('Access denied.' . PHP_EOL);// -----------------------------------------------------------------------// 1. 运行时设置// -----------------------------------------------------------------------set_time_limit(0);ini_set('memory_limit', '256M');// -----------------------------------------------------------------------// 2. 解析命令行参数// -----------------------------------------------------------------------$argv = $GLOBALS['argv'] ?? [];if(count($argv) < 4) { fwrite(STDERR, "[ERROR] 参数不足。用法: php async_task_handler.php <taskName> <tmpParamsFile> <maxAttempts>\n"); exit(1);}$taskName = (string)$argv[1];$tmpFile = (string)$argv[2];$maxAttempts = max(1, (int)$argv[3]);// -----------------------------------------------------------------------// 3. 读取 & 立即删除临时参数文件// -----------------------------------------------------------------------if(!file_exists($tmpFile) || !is_readable($tmpFile)) { fwrite(STDERR, "[ERROR] 临时参数文件不存在或不可读: {$tmpFile}\n"); exit(1);}$rawJson = file_get_contents($tmpFile);@unlink($tmpFile); // 读完立即删除,无论后续是否报错$params = json_decode($rawJson, true);unset($rawJson);if(json_last_error() !== JSON_ERROR_NONE) { fwrite(STDERR, "[ERROR] 参数 JSON 解析失败: " . json_last_error_msg() . "\n"); exit(1);}// -----------------------------------------------------------------------// 4. 日志工具(带文件大小轮转,50MB 归档)// -----------------------------------------------------------------------$logDir = __DIR__ . '/async_logs';$logRotateB = 50 * 1024 * 1024;if(!is_dir($logDir)) { mkdir($logDir, 0755, true);}$safeTaskName = preg_replace('/[^a-zA-Z0-9_\-]/', '', $taskName);$logFile = $logDir . "/{$safeTaskName}_" . date('Ymd') . '.log';$log = static function (string $level, string $msg) use ($logFile, $logRotateB): void { if (file_exists($logFile) && filesize($logFile) >= $logRotateB) { rename($logFile, $logFile . '.' . date('His') . '.bak'); } $line = sprintf("[%s] [%-5s] %s\n", date('Y-m-d H:i:s'), strtoupper($level), $msg); file_put_contents($logFile, $line, FILE_APPEND | LOCK_EX);};// -----------------------------------------------------------------------// 5. 任务注册表// 新增任务:在此处添加一个 key => Closure 即可,同时在 AsyncTask::$taskWhitelist 里加上任务名// -----------------------------------------------------------------------$taskHandlers = [ // ------------------------------------------------------------------ // 发送短信 // ------------------------------------------------------------------ 'sendSms' => static function (array $params): string { $mobile = $params['mobile'] ?? ''; $code = $params['code'] ?? ''; if($mobile === '' || $code === '') { throw new RuntimeException("短信参数缺失:mobile={$mobile}, code={$code}"); } // 真实业务(取消注释): // $res = curlPost('https://sms.example.com/send', ['mobile' => $mobile, 'code' => $code]); // if (empty($res['success'])) { // throw new RuntimeException("短信接口返回失败:" . json_encode($res)); // } sleep(2); // 模拟耗时,上线删除 return "短信发送成功:{$mobile},验证码:{$code}"; }, // ------------------------------------------------------------------ // 发送邮件 // ------------------------------------------------------------------ 'sendEmail' => static function (array $params): string { $email = $params['email'] ?? ''; $title = $params['title'] ?? ''; $content = $params['content'] ?? ''; if($email === '' || $title === '') { throw new RuntimeException("邮件参数缺失:email={$email}, title={$title}"); } // 真实业务(PHPMailer): // $mailer = new PHPMailer(true); // $mailer->addAddress($email); // $mailer->Subject = $title; // $mailer->Body = $content; // $mailer->send(); // 失败会抛 PHPMailer\PHPMailer\Exception sleep(3); return "邮件发送成功:{$email},标题:{$title}"; }, // ------------------------------------------------------------------ // 生成 Excel // ------------------------------------------------------------------ 'generateExcel' => static function (array $params): string { $fileName = $params['fileName'] ?? 'export_' . date('YmdHis'); $data = $params['data'] ?? []; if(empty($data)) { throw new RuntimeException("Excel 数据为空"); } $outDir = __DIR__ . '/uploads'; if(!is_dir($outDir)) { mkdir($outDir, 0755, true); } $filePath = $outDir . "/{$fileName}.xlsx"; // 真实业务(PhpSpreadsheet): // $spreadsheet = new \PhpOffice\PhpSpreadsheet\Spreadsheet(); // $sheet = $spreadsheet->getActiveSheet(); // foreach ($data as $rowIdx => $row) { // $sheet->fromArray($row, null, 'A' . ($rowIdx + 1)); // } // (new \PhpOffice\PhpSpreadsheet\Writer\Xlsx($spreadsheet))->save($filePath); sleep(5); unset($data); // 大数组用完立即释放 return "Excel 生成成功:{$filePath}"; }, // ------------------------------------------------------------------ // 数据库备份 // ------------------------------------------------------------------ 'backupDb' => static function (array $params): string { $dbName = $params['dbName'] ?? ''; $savePath = $params['savePath'] ?? __DIR__ . '/backups'; if($dbName === '') { throw new RuntimeException("数据库名缺失"); } if(!is_dir($savePath)) { mkdir($savePath, 0755, true); } $backupFile = $savePath . "/{$dbName}_" . date('YmdHis') . '.sql'; // 真实业务(mysqldump): // $cmd = sprintf( // 'mysqldump -u%s -p%s %s > %s', // escapeshellarg(DB_USER), // escapeshellarg(DB_PASS), // escapeshellarg($dbName), // escapeshellarg($backupFile) // ); // exec($cmd, $out, $exitCode); // if ($exitCode !== 0) { // throw new RuntimeException("mysqldump 退出码:{$exitCode}"); // } sleep(4); return "数据库备份成功:{$backupFile}"; }, // ------------------------------------------------------------------ // 扩展示例:推送消息 // 同时需要在 AsyncTask::setWhitelist(['pushMsg']) 里注册 // ------------------------------------------------------------------ // 'pushMsg' => static function (array $params): string { // $userId = $params['userId'] ?? ''; // $content = $params['content'] ?? ''; // if ($userId === '' || $content === '') { // throw new RuntimeException("推送参数缺失"); // } // // 调用推送 SDK ... // return "推送成功:用户 {$userId}"; // },];// -----------------------------------------------------------------------// 6. 校验任务是否在注册表中// -----------------------------------------------------------------------if(!isset($taskHandlers[$taskName])) { $log('ERROR', "任务未注册:{$taskName}"); exit(1);}$handler = $taskHandlers[$taskName];// -----------------------------------------------------------------------// 7. 执行任务(指数退避重试)// 第 1 次失败 → 等 1s → 第 2 次失败 → 等 2s → 第 3 次失败 → 等 4s → …// 最大等待上限 30s,防止进程长期驻留// -----------------------------------------------------------------------$success = false;$lastError = '';for($attempt = 1; $attempt <= $maxAttempts; $attempt++) { try { $result = $handler($params); $log('INFO', "任务成功(第 {$attempt}/{$maxAttempts} 次):{$result}"); $success = true; break; } catch(Throwable $e) { // 捕获 Exception + Error $lastError = $e->getMessage(); $log('WARN', "任务失败(第 {$attempt}/{$maxAttempts} 次):{$lastError}"); if($attempt < $maxAttempts) { $delay = min((int)pow(2, $attempt - 1), 30); // 1,2,4,8,…,30 $log('INFO', "指数退避,等待 {$delay}s 后重试…"); sleep($delay); } }}// -----------------------------------------------------------------------// 8. 最终结果// -----------------------------------------------------------------------if(!$success) { $log('ERROR', "任务彻底失败(共 {$maxAttempts} 次),最后错误:{$lastError}"); exit(1);}exit(0);
<?php/** * 队列消费脚本 * * 推荐用法(crontab,每分钟执行一次): * * * * * * php /path/to/queue_worker.php >> /path/async_logs/worker.log 2>&1 * * 或通过 Supervisor 持续运行(适合高流量场景): * [program:async_queue_worker] * command=php /path/to/queue_worker.php --loop * autostart=true * autorestart=true */PHP_SAPI === 'cli' or exit('Access denied.' . PHP_EOL);require_once __DIR__ . '/AsyncTask.php';// 可选:覆盖默认配置// AsyncTask::setLogDir('/var/log/async');// AsyncTask::setQueueFile('/var/log/async/task_queue.jsonl');// AsyncTask::setQueueBatchSize(50);$loop = in_array('--loop', $argv ?? [], true);do { $count = AsyncTask::processQueue(); echo sprintf("[%s] 本次消费任务数:%d,队列剩余:%d\n", date('Y-m-d H:i:s'), $count, AsyncTask::queueSize()); if($loop) { sleep(5); // 持续模式每 5 秒轮询一次 }} while($loop);
# 方式 A:crontab,每分钟消费一次(低流量推荐)* * * * * php /path/to/queue_worker.php >> /path/async_logs/worker.log 2>&1# 方式 B:持续轮询(高流量,配合 Supervisor)php queue_worker.php --loop
四、完整使用示例
重点说一下 Demo 3 导出的三个场景
场景 A:小数据量(< 1000 行)
数据直接随参数传入,最简单,直接用。
场景 B:10 万行大数据
数据照样塞进 $params,AsyncTask 内部自动走临时文件绕过命令行长度限制,调用方无感知。但有一个坑要注意:unset($data); // ← 必须加!主线程把 10 万行数组组装完交给子进程后,自己要立刻释放,否则主线程内存同样会爆。
场景 C:分片导出(生产最推荐)
主线程完全不查数据,只负责「算出分片参数 → 提交 N 个任务」,每个 handler 子进程自己去数据库分页查询并生成一个 Excel 文件。好处:
- N 个子进程并发跑,总耗时约等于单片耗时而不是累加
<?php/** * AsyncTask 使用示例合集 * * 目录: * Demo 1 - 用户注册 → 异步发送短信 * Demo 2 - 异步发送邮件(含 HTML 内容) * Demo 3 - 大数据导出 Excel(含进度查询接口) * Demo 4 - 数据库备份(定时任务场景) * Demo 5 - 自定义任务:推送消息 * Demo 6 - 队列状态监控 * * 所有 Demo 均模拟「接口调用」场景,run() 调用后立即返回,不阻塞主线程。 */require_once __DIR__ . '/AsyncTask.php';// ============================================================// 全局配置(可选,不调用则使用默认值)// ============================================================AsyncTask::setLogDir(__DIR__ . '/async_logs'); // 日志目录AsyncTask::setQueueFile(__DIR__ . '/async_logs/task_queue.jsonl'); // 队列文件AsyncTask::setMaxConcurrent(10); // 同类任务最大并发数AsyncTask::setQueueBatchSize(20); // 每次消费队列的批次大小AsyncTask::setLogRotateSizeMB(50); // 日志文件超 50MB 自动归档// 运行指定 Demo(命令行传参:php demos.php 1)$demoId = (int)($argv[1] ?? 0);switch($demoId) { case 1: demo1_register(); break; case 2: demo2_sendEmail(); break; case 3: demo3_exportExcel(); break; case 4: demo4_backupDb(); break; case 5: demo5_customTask(); break; case 6: demo6_queueMonitor(); break; default: echo "用法:php demos.php <编号>\n"; echo " 1 - 注册发短信\n"; echo " 2 - 发送邮件\n"; echo " 3 - 导出大 Excel\n"; echo " 4 - 数据库备份\n"; echo " 5 - 自定义推送任务\n"; echo " 6 - 队列状态监控\n";}// ============================================================// Demo 1:用户注册 → 异步发送短信// ============================================================functiondemo1_register(): void{ echo "=== Demo 1:用户注册 ===\n"; $startTime = microtime(true); // 模拟注册接口 $result = userRegister('13800138000'); $elapsed = round((microtime(true) - $startTime) * 1000, 2); echo json_encode($result, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT) . "\n"; echo "接口耗时:{$elapsed}ms(短信在后台发送,主线程不阻塞)\n";}functionuserRegister(string$mobile): array{ $code = rand(1000, 9999); // --- 同步操作 --- // saveUserToDb($mobile, $code); // 写数据库 // saveVerifyCode($mobile, $code, 300); // 写验证码缓存(5分钟有效) // --- 异步操作:提交后立即返回,不等短信发出 --- $submitted = AsyncTask::run('sendSms', [ 'mobile' => $mobile, 'code' => (string)$code, ], retry: 3); return [ 'code' => 200, 'msg' => '注册成功,验证码已发送', 'mobile' => $mobile, 'async_sms' => $submitted ? '已提交' : '提交失败,请稍后重试', ];}// ============================================================// Demo 2:发送邮件(含 HTML 正文)// ============================================================functiondemo2_sendEmail(): void{ echo "=== Demo 2:异步发送邮件 ===\n"; $startTime = microtime(true); // 模拟订单完成后发送确认邮件 $result = sendOrderConfirmEmail( orderId: 'ORD20240315001', email: 'user@example.com', name: '张三', amount: '¥299.00' ); $elapsed = round((microtime(true) - $startTime) * 1000, 2); echo json_encode($result, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT) . "\n"; echo "接口耗时:{$elapsed}ms\n";}functionsendOrderConfirmEmail( string $orderId, string $email, string $name, string $amount): array { // HTML 邮件正文(真实场景可渲染模板文件) $htmlContent = <<<HTML <h2>亲爱的 {$name},您的订单已完成!</h2> <p>订单号:<strong>{$orderId}</strong></p> <p>实付金额:<strong>{$amount}</strong></p> <p>感谢您的购买,如有疑问请联系客服。</p> HTML; $submitted = AsyncTask::run('sendEmail', [ 'email' => $email, 'title' => "订单确认 - {$orderId}", 'content' => $htmlContent, ], retry: 2); return [ 'code' => 200, 'msg' => '订单处理完成', 'order_id' => $orderId, 'async_email' => $submitted ? '确认邮件已提交发送' : '邮件提交失败', ];}// ============================================================// Demo 3:大数据导出 Excel(核心 Demo)// ============================================================functiondemo3_exportExcel(): void{ echo "=== Demo 3:大数据导出 Excel ===\n\n"; // --- 场景 A:小数据量(< 1000 行),直接传参数 --- echo "--- 场景 A:小数据量直接导出 ---\n"; $startTime = microtime(true); $result = exportSmallData(); $elapsed = round((microtime(true) - $startTime) * 1000, 2); echo json_encode($result, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT) . "\n"; echo "接口耗时:{$elapsed}ms\n\n"; // --- 场景 B:大数据量(10 万行),参数走临时文件(自动处理,无需手动干预) --- echo "--- 场景 B:10 万行大数据导出 ---\n"; $startTime = microtime(true); $result = exportBigData(); $elapsed = round((microtime(true) - $startTime) * 1000, 2); echo json_encode($result, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT) . "\n"; echo "接口耗时:{$elapsed}ms(10 万行数据后台生成,接口瞬间返回)\n\n"; // --- 场景 C:超大数据量分片导出(推荐:避免单次内存爆炸)--- echo "--- 场景 C:分片导出(每片 1 万行,共 10 片)---\n"; $startTime = microtime(true); $result = exportByChunks(totalRows: 100000, chunkSize: 10000); $elapsed = round((microtime(true) - $startTime) * 1000, 2); echo json_encode($result, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT) . "\n"; echo "接口耗时:{$elapsed}ms(10 个子任务并发后台跑,主线程零等待)\n";}/** 场景 A:小数据量,数据直接随参数传入 */functionexportSmallData(): array{ // 模拟从数据库查出 200 行 $data = []; for($i = 1; $i <= 200; $i++) { $data[] = [ 'id' => $i, 'name' => "商品{$i}", 'price' => rand(10, 500), 'stock' => rand(0, 1000), 'category' => ['电子', '服装', '食品', '家居'][rand(0, 3)], ]; } $fileName = 'goods_small_' . date('YmdHis'); $submitted = AsyncTask::run('generateExcel', [ 'fileName' => $fileName, 'data' => $data, // 可选:告诉 handler 生成完后通知谁 'notify' => ['type' => 'callback', 'url' => 'https://example.com/excel/done'], ]); return [ 'code' => 200, 'msg' => 'Excel 正在后台生成', 'file_name' => $fileName . '.xlsx', 'async' => $submitted ? '已提交' : '提交失败', 'tip' => '生成完成后文件将保存至 uploads/ 目录', ];}/** 场景 B:10 万行大数据,数据直接传入(AsyncTask 内部自动走临时文件,规避 ARG_MAX) */functionexportBigData(): array{ // 实际项目中应从数据库分批查询,此处用循环模拟 $data = []; for($i = 1; $i <= 100000; $i++) { $data[] = [ 'order_id' => 'ORD' . str_pad((string)$i, 8, '0', STR_PAD_LEFT), 'user_id' => rand(1000, 9999), 'product' => "商品{$i}", 'qty' => rand(1, 10), 'unit_price' => rand(10, 1000), 'total' => rand(10, 10000), 'status' => ['待支付', '已支付', '已发货', '已完成'][rand(0, 3)], 'created_at' => date('Y-m-d H:i:s', time() - rand(0, 86400 * 30)), ]; } $fileName = 'orders_full_' . date('YmdHis'); $submitted = AsyncTask::run('generateExcel', [ 'fileName' => $fileName, 'data' => $data, // 10 万行数据由 AsyncTask 自动写临时文件传递,不会撑爆命令行 ]); unset($data); // 主进程用完立即释放,不要等 GC return [ 'code' => 200, 'msg' => 'Excel 正在后台生成,请稍候下载', 'file_name' => $fileName . '.xlsx', 'async' => $submitted ? '已提交' : '提交失败', 'tip' => '数据量较大,预计 30~60 秒完成', ];}/** * 场景 C:超大数据分片导出 * * 思路:主线程只负责「分片 + 提交任务」,每片单独生成一个 Excel 文件, * 最终可由另一个异步任务将多个文件合并,或直接打包成 zip 提供下载。 * * 优点: * - 主线程不持有大数组,内存极低 * - 各片并发执行,总耗时约等于单片耗时 * - 某片失败只需重试该片,不影响其他 */functionexportByChunks(int$totalRows, int$chunkSize): array{ $totalChunks = (int)ceil($totalRows / $chunkSize); $batchId = 'batch_' . date('YmdHis') . '_' . rand(1000, 9999); $submitted = 0; $failed = 0; $fileNames = []; for($chunk = 0; $chunk < $totalChunks; $chunk++) { $offset = $chunk * $chunkSize; $limit = $chunkSize; $fileName = "{$batchId}_part" . str_pad((string)($chunk + 1), 3, '0', STR_PAD_LEFT); // 真实项目:这里只传查询参数,让 handler 自己去数据库分页查 // 避免主线程查出全量数据再传参(主线程内存爆炸的根源) $ok = AsyncTask::run('generateExcel', [ 'fileName' => $fileName, 'source' => [ // 告诉 handler 去哪查数据 'type' => 'db_query', 'sql' => "SELECT * FROM orders ORDER BY id LIMIT {$limit} OFFSET {$offset}", // 真实场景建议传结构化参数,handler 内拼 SQL,避免 SQL 注入 // 'table' => 'orders', // 'offset' => $offset, // 'limit' => $limit, ], ]); if($ok) { $submitted++; $fileNames[] = $fileName . '.xlsx'; } else { $failed++; } } return [ 'code' => 200, 'msg' => "分片任务已提交", 'batch_id' => $batchId, 'total_chunks' => $totalChunks, 'submitted' => $submitted, 'failed' => $failed, 'files' => $fileNames, 'tip' => '所有分片完成后,可调用合并接口将文件打包成 zip', ];}// ============================================================// Demo 4:数据库备份(定时任务场景)// ============================================================functiondemo4_backupDb(): void{ echo "=== Demo 4:异步数据库备份 ===\n"; // 同时备份多个库(每个库独立子进程,互不影响) $databases = [ ['name' => 'shop_db', 'path' => '/www/backups/shop'], ['name' => 'user_db', 'path' => '/www/backups/user'], ['name' => 'finance_db', 'path' => '/www/backups/finance'], ]; $results = []; foreach($databases as $db) { $ok = AsyncTask::run('backupDb', [ 'dbName' => $db['name'], 'savePath' => $db['path'], ], retry: 2); $results[] = [ 'db' => $db['name'], 'submitted' => $ok, 'status' => $ok ? '备份任务已提交' : '提交失败', ]; } echo json_encode([ 'code' => 200, 'msg' => '数据库备份任务已全部提交', 'details' => $results, ], JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT) . "\n"; echo "\n建议在 crontab 中配置(每天凌晨 2 点执行):\n"; echo "0 2 * * * php " . __FILE__ . " 4 >> /var/log/backup_cron.log 2>&1\n";}// ============================================================// Demo 5:自定义任务(以「推送消息」为例)// ============================================================functiondemo5_customTask(): void{ echo "=== Demo 5:自定义任务 - 推送消息 ===\n\n"; // Step 1:将自定义任务加入白名单 AsyncTask::setWhitelist(['pushMsg']); // Step 2:在 async_task_handler.php 的 $taskHandlers 数组中添加对应处理函数: // // 'pushMsg' => static function (array $params): string { // $userId = $params['userId'] ?? ''; // $title = $params['title'] ?? ''; // $content = $params['content'] ?? ''; // $channel = $params['channel'] ?? 'app'; // app / sms / email // // if ($userId === '' || $content === '') { // throw new RuntimeException("推送参数缺失"); // } // // // 调用推送 SDK(极光/个推/自建) // // $push = new JiGuangPush(); // // $push->toUser($userId)->send($title, $content); // // sleep(1); // return "推送成功:用户 {$userId},渠道:{$channel}"; // }, // Step 3:调用 $userIds = [1001, 1002, 1003, 1004, 1005]; $success = 0; foreach($userIds as $uid) { $ok = AsyncTask::run('pushMsg', [ 'userId' => $uid, 'title' => '您有新订单待处理', 'content' => '您的订单 ORD20240315001 已发货,请注意查收!', 'channel' => 'app', ]); if($ok) $success++; } echo "批量推送结果:{$success}/" . count($userIds) . " 个任务提交成功\n"; echo "提示:pushMsg 已加入白名单,但 handler 中对应函数需手动添加(见注释)\n";}// ============================================================// Demo 6:队列状态监控// ============================================================functiondemo6_queueMonitor(): void{ echo "=== Demo 6:队列状态监控 ===\n\n"; // 模拟高并发:快速提交 30 个任务,触发部分入队 echo "模拟提交 30 个短信任务(maxConcurrent=3,其余自动入队)...\n"; AsyncTask::setMaxConcurrent(3); // 调低并发限制,方便演示入队效果 $submitted = 0; $queued = 0; for($i = 1; $i <= 30; $i++) { $ok = AsyncTask::run('sendSms', [ 'mobile' => '138' . str_pad((string)$i, 8, '0', STR_PAD_LEFT), 'code' => (string)rand(1000, 9999), ]); $ok ? $submitted++ : $queued++; } // 实际上 run() 返回 true 包含「直接启动」和「成功入队」两种情况 // 这里只是演示计数,真实监控看下面的 queueSize() echo "\n当前队列积压数:" . AsyncTask::queueSize() . " 个任务\n\n"; // 手动触发消费(正常由 queue_worker.php 负责) echo "手动触发队列消费...\n"; $consumed = AsyncTask::processQueue(); echo "本次消费:{$consumed} 个任务\n"; echo "消费后队列剩余:" . AsyncTask::queueSize() . " 个任务\n\n"; echo "提示:生产环境建议配置 crontab 自动消费:\n"; echo "* * * * * php " . __DIR__ . "/queue_worker.php >> " . __DIR__ . "/async_logs/worker.log 2>&1\n";}
五、生产级配置(必看)
1. 环境准备
- 开启 PHP
exec 函数:修改 php.ini,确保 disable_functions 中没有 exec; - 权限配置:
async_logs、uploads、backups 目录赋予 0755 权限; - 绝对路径:所有文件路径必须用绝对路径(子进程环境无相对路径)。
2. 安全加固
- 白名单严格控制:只允许执行业务需要的任务,禁止新增未知任务;
- 参数过滤:封装类已过滤危险字符,禁止传递
rm、cd、mv 等命令参数; - 日志权限:日志目录禁止 web 访问(可放
public 目录外); - 禁止直接访问:
async_task_handler.php 可增加访问限制(比如只允许 cli 执行)。
3. 高并发优化
- 控制子进程数量:避免短时间提交大量任务,导致服务器过载;
- 资源释放:任务执行完成后,及时 unset 大数组、关闭文件句柄。
4. 部署方式
- Linux 服务器直接使用,建议配合 Supervisor 监控日志;
- 虚拟主机确认主机允许 exec 函数(部分虚拟主机禁用,可联系服务商开启);
- Windows 服务器将
exec 命令改为 start /B php ...(适配 Windows 后台执行)。
六、常见问题解决
1. 任务提交成功但不执行
- 检查
async_task_handler.php 路径是否为绝对路径; - 检查 PHP 执行权限:
which php 确认路径,替换封装类中的 PHP_BINARY; - 查看日志文件:
async_logs 目录下的 error 日志,排查具体错误。
2. 子进程卡死
- 设置任务超时:在
async_task_handler.php 中为每个任务加超时逻辑; - 限制内存:
ini_set('memory_limit', '256M'),避免内存溢出; - 定期清理:定时清理日志文件、临时文件,避免磁盘占满。
3. 虚拟主机禁用 exec
- 替代方案:使用
file_put_contents 写入任务队列,单独写一个脚本定时读取执行; - 联系服务商:申请开启 exec 函数(仅允许执行 php 命令)。
七、核心优化总结
- 无依赖不用 Redis/MQ,原生 PHP 就能跑,小项目首选;
- 安全白名单+参数过滤,防止 exec 注入,生产级加固;
- 高效
- 易扩展
- 可监控
关注我,后续我会继续分享:PHP 开发规范、框架实战、架构思路、运维技巧、效率工具。关注我,每天 3 分钟,提升开发效率。
欢迎在留言区告诉我:你项目中哪些场景需要异步执行?有没有遇到过子进程执行失败的问题?我们一起交流解决方案!