Task
EasySwoole 3.3.0+
異步任務放棄了 Swoole
的原生 task
,采用獨立組件實現(xiàn)。
相對于原生 Swoole Task
,easyswoole/task 組件實現(xiàn)了以下功能:
- 可以投遞閉包任務
- 可以在
TaskWorker
等其他自定義進程繼續(xù)投遞任務 - 實現(xiàn)任務限流與狀態(tài)監(jiān)控
安裝
composer require easyswoole/task
框架中使用
同步調用:
\EasySwoole\EasySwoole\Task\TaskManager::getInstance()->sync(function (){
echo 'sync';
});
異步調用:
\EasySwoole\EasySwoole\Task\TaskManager::getInstance()->async(function () {
echo 'async';
}, function ($reply, $taskId, $workerIndex) {
// $reply 返回的執(zhí)行結果
// $taskId 任務id
echo 'async success';
});
由于 php
本身就不能序列化閉包,該閉包投遞是通過反射該閉包函數(shù),獲取 php
代碼直接序列化 php
代碼,然后直接 eval
代碼實現(xiàn)的。
所以投遞閉包無法使用外部的對象引用,以及資源句柄,復雜任務請使用任務模板方法。
任務模版
自定義一個任務模版
<?php
namespace App\Task;
use EasySwoole\Task\AbstractInterface\TaskInterface;
class CustomTask implements TaskInterface
{
protected $data;
public function __construct($data)
{
// 保存投遞過來的數(shù)據(jù)
$this->data = $data;
}
public function run(int $taskId, int $workerIndex)
{
// 執(zhí)行邏輯
}
public function onException(\Throwable $throwable, int $taskId, int $workerIndex)
{
// 異常處理
}
}
如何使用
$task = \EasySwoole\EasySwoole\Task\TaskManager::getInstance();
// 投遞異步任務
$task->async(new CustomTask(['user' => 'custom']));
// 投遞同步任務
$data = $task->sync(new CustomTask(['user' => 'custom']));
投遞返回值
easyswoole/task
組件在 1.0.8
及以前版本支持,如下 4
個投遞返回值:
-
> 0
投遞成功(異步任務專屬,返回taskId
,同步任務直接返回run()
方法運行之后返回的值) -
-1
task
進程繁忙,投遞失敗 (已經(jīng)到達最大運行數(shù)量maxRunningNum
) -
-2
投遞數(shù)據(jù)解包失敗,當投遞數(shù)據(jù)傳輸時數(shù)據(jù)異常時會報錯,此錯誤為組件底層錯誤,一般不會出現(xiàn) -
-3
任務出錯 (該任務執(zhí)行時出現(xiàn)異常錯誤,被組件攔截并輸出錯誤)
在 1.0.9
~ 1.1.1
版本,除了支持上述 4
個投遞返回值,還新增支持了以下 2
個投遞返回值:
-
-4
投遞的任務數(shù)據(jù)不合法,一般是投遞了不能序列化的數(shù)據(jù)才會出現(xiàn)。 -
-5
投遞的任務在運行時出錯
在最新的版本及以后版本中,又新增支持了以下 2
個投遞返回值:
-
-6
投遞的任務數(shù)據(jù)包已過期,一般是Task
進程比較繁忙時才會出現(xiàn)。 -
-7
投遞任務時,任務運行完成后沒有任何數(shù)據(jù)返回。一般是因為執(zhí)行任務時間過長導致UnixSocket
超時,才會出現(xiàn)。
獨立使用
該組件可獨立使用,代碼如下:
<?php
use EasySwoole\Task\Config;
use EasySwoole\Task\Task;
require_once __DIR__ . '/vendor/autoload.php';
/**
* 配置項中可以修改工作進程數(shù)、臨時目錄,進程名,最大并發(fā)執(zhí)行任務數(shù),異常回調等
*/
$config = new Config();
$task = new Task($config);
// 添加 swoole 服務
$http = new \Swoole\Http\Server("0.0.0.0", 9501);
// 注入 swoole 服務,進行創(chuàng)建 task 進程
$task->attachToServer($http);
// 在 onrequest 事件中調用 task (其他地方也可以,這只是示例)
$http->on("request", function (\Swoole\Http\Request $request, \Swoole\Http\Response $response) use ($task) {
if (isset($request->get['sync'])) {
// 同步調用 task
$ret = $task->sync(function ($taskId, $workerIndex) {
return "{$taskId}.{$workerIndex}";
});
$response->end("sync result " . $ret);
} else if (isset($request->get['status'])) {
var_dump($task->status());
} else {
// 異步調用 task
$id = $task->async(function ($taskId, $workerIndex) {
\co::sleep(1);
var_dump("async id {$taskId} task run");
});
$response->end("async id {$id} ");
}
});
// 啟動服務
$http->start();
版本強調
框架低版本升級為 EasySwoole 3.3.0+
,需要手動進行配置修改。
需要刪除 MAIN_SERVER.SETTING.task_worker_num
,MAIN_SERVER.SETTING.task_enable_coroutine
配置項。
請在項目根目錄的 dev.php/produce.php
的 MAIN_SERVER
配置項中,增加 TASK
子配置項:
<?php
// 這里省略
return [
// 這里省略 ...
'MAIN_SERVER' => [
// 這里省略 ...
'TASK' => [
'workerNum' => 4,
'maxRunningNum' => 128,
'timeout' => 15
]
],
// 這里省略 ...
];
Task管理
查看所有Task進程的狀態(tài)
php easyswoole.php task status