Swoft源码之Swoole和Swoft的分析
这篇文章给大家分享的内容是关于Swoft 源码剖析之Swoole和Swoft的一些介绍(Task投递/定时任务篇),有一定的参考价值,有需要的朋友可以参考一下。 前言
我的官方群点击此处。 任务投递//SwoftTaskTask.php class Task { /** * Deliver coroutine or async task * * @param string $taskName * @param string $methodName * @param array $params * @param string $type * @param int $timeout * * @return bool|array * @throws TaskException */ public static function deliver(string $taskName,string $methodName,array $params = [],string $type = self::TYPE_CO,$timeout = 3) { $data = TaskHelper::pack($taskName,$methodName,$params,$type); if(!App::isWorkerStatus() && !App::isCoContext()){ return self::deliverByQueue($data);//见下文Command章节 } if(!App::isWorkerStatus() && App::isCoContext()){ throw new TaskException('Please deliver task by http!'); } $server = App::$server->getServer(); // Delier coroutine task if ($type == self::TYPE_CO) { $tasks[0] = $data; $prifleKey = 'task' . '.' . $taskName . '.' . $methodName; App::profileStart($prifleKey); $result = $server->taskCo($tasks,$timeout); App::profileEnd($prifleKey); return $result; } // Deliver async task return $server->task($data); } } ? 任务投递 任务执行//SwoftTaskBootstrapListenersTaskEventListener /** * The listener of swoole task * @SwooleListener({ * SwooleEvent::ON_TASK,* SwooleEvent::ON_FINISH,* }) */ class TaskEventListener implements TaskInterface,FinishInterface { /** * @param SwooleServer $server * @param int $taskId * @param int $workerId * @param mixed $data * @return mixed * @throws InvalidArgumentException */ public function onTask(Server $server,int $taskId,int $workerId,$data) { try { /* @var TaskExecutor $taskExecutor*/ $taskExecutor = App::getBean(TaskExecutor::class); $result = $taskExecutor->run($data); } catch (Throwable $throwable) { App::error(sprintf('TaskExecutor->run %s file=%s line=%d ',$throwable->getMessage(),$throwable->getFile(),$throwable->getLine())); $result = false; // Release system resources App::trigger(AppEvent::RESOURCE_RELEASE); App::trigger(TaskEvent::AFTER_TASK); } return $result; } } ? 此处是
//SwoftTaskTaskExecutor /** * The task executor * * @Bean() */ class TaskExecutor { /** * @param string $data * @return mixed */ public function run(string $data) { $data = TaskHelper::unpack($data); $name = $data['name']; $type = $data['type']; $method = $data['method']; $params = $data['params']; $logid = $data['logid'] ?? uniqid('',true); $spanid = $data['spanid'] ?? 0; $collector = TaskCollector::getCollector(); if (!isset($collector['task'][$name])) { return false; } list(,$coroutine) = $collector['task'][$name]; $task = App::getBean($name); if ($coroutine) { $result = $this->runCoTask($task,$method,$logid,$spanid,$name,$type); } else { $result = $this->runSyncTask($task,$type); } return $result; } } ? 任务执行思路很简单,将 值得一提的一点是, 从Process中投递任务前面我们提到:
换句话说, 这个限制大大的限制了使用场景。 如何能够为了能够在 //SwoftTaskTask.php /** * Deliver task by process * * @param string $taskName * @param string $methodName * @param array $params * @param string $type * @param int $timeout * @param int $workId * * @return bool */ public static function deliverByProcess(string $taskName,int $timeout = 3,int $workId = 0,string $type = self::TYPE_ASYNC): bool { /* @var PipeMessageInterface $pipeMessage */ $server = App::$server->getServer(); $pipeMessage = App::getBean(PipeMessage::class); $data = [ 'name' => $taskName,'method' => $methodName,'params' => $params,'timeout' => $timeout,'type' => $type,]; $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK,$data); return $server->sendMessage($message,$workId); } ? 数据打包后使用 //SwoftBootstrapServerServerTrait.php /** * onPipeMessage event callback * * @param SwooleServer $server * @param int $srcWorkerId * @param string $message * @return void * @throws InvalidArgumentException */ public function onPipeMessage(Server $server,int $srcWorkerId,string $message) { /* @var PipeMessageInterface $pipeMessage */ $pipeMessage = App::getBean(PipeMessage::class); list($type,$data) = $pipeMessage->unpack($message); App::trigger(AppEvent::PIPE_MESSAGE,null,$type,$data,$srcWorkerId); } ?
//SwoftTaskEventListenersPipeMessageListener.php /** * The pipe message listener * * @Listener(event=AppEvent::PIPE_MESSAGE) */ class PipeMessageListener implements EventHandlerInterface { /** * @param SwoftEventEventInterface $event */ public function handle(EventInterface $event) { $params = $event->getParams(); if (count($params) < 3) { return; } list($type,$srcWorkerId) = $params; if ($type != PipeMessage::MESSAGE_TYPE_TASK) { return; } $type = $data['type']; $taskName = $data['name']; $params = $data['params']; $timeout = $data['timeout']; $methodName = $data['method']; // delever task Task::deliver($taskName,$params,$timeout); } } ?
一道简单的回顾练习:从 从Command进程或其子进程中投递任务//SwoftTaskQueueTask.php /** * @param string $data * @param int $taskWorkerId * @param int $srcWorkerId * * @return bool */ public function deliver(string $data,int $taskWorkerId = null,$srcWorkerId = null) { if ($taskWorkerId === null) { $taskWorkerId = mt_rand($this->workerNum + 1,$this->workerNum + $this->taskNum); } if ($srcWorkerId === null) { $srcWorkerId = mt_rand(0,$this->workerNum - 1); } $this->check(); $data = $this->pack($data,$srcWorkerId); $result = msg_send($this->queueId,$taskWorkerId,false); if (!$result) { return false; } return true; } ? 对于 但在 ?
? 同一个项目中 定时任务除了手动执行的普通任务,
SwoftTaskCrontabTableCrontab.php /** * 任务表,记录用户配置的任务信息 * 表每行记录包含的字段如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一确定一条记录 * @var array $originStruct */ private $originStruct = [ 'rule' => [SwooleTable::TYPE_STRING,100],//定时任务执行规则,对应@Scheduled注解的cron属性 'taskClass' => [SwooleTable::TYPE_STRING,255],//任务名 对应@Task的name属性(默认为类名) 'taskMethod' => [SwooleTable::TYPE_STRING,//Task方法,对应@Scheduled注解所在方法 'add_time' => [SwooleTable::TYPE_STRING,11],//初始化该表内容时的10位时间戳 ]; /** * 执行表,记录短时间内要执行的任务列表及其执行状态 * 表每行记录包含的字段如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一确定一条记录 * @var array $runTimeStruct */ private $runTimeStruct = [ 'taskClass' => [SwooleTable::TYPE_STRING,//同上 'taskMethod' => [SwooleTable::TYPE_STRING,//同上 'minute' => [SwooleTable::TYPE_STRING,20],//需要执行任务的时间,精确到分钟 格式date('YmdHi') 'sec' => [SwooleTable::TYPE_STRING,//需要执行任务的时间,精确到分钟 10位时间戳 'runStatus' => [SwooleTABLE::TYPE_INT,4],//任务状态,有 0(未执行) 1(已执行) 2(执行中) 三种。 //注意:这里的执行是一个容易误解的地方,此处的执行并不是指任务本身的执行,而是值`任务投递`这一操作的执行,从宏观上看换成 _未投递_,_已投递_,_投递中_描述会更准确。 ]; ? 此处为何要使用Swoole的内存Table?
为了 背景介绍完了,我们来看看这两个定时任务进程的行为 //SwoftTaskBootstrapProcessCronTimerProcess.php /** * Crontab timer process * * @Process(name="cronTimer",boot=true) */ class CronTimerProcess implements ProcessInterface { /** * @param SwoftProcessProcess $process */ public function run(SwoftProcess $process) { //code.... /* @var SwoftTaskCrontabCrontab $cron*/ $cron = App::getBean('crontab'); // Swoole/HttpServer $server = App::$server->getServer(); $time = (60 - date('s')) * 1000; $server->after($time,function () use ($server,$cron) { // Every minute check all tasks,and prepare the tasks that next execution point needs $cron->checkTask(); $server->tick(60 * 1000,function () use ($cron) { $cron->checkTask(); }); }); } } //SwoftTaskCrontabCrontab.php /** * 初始化runTimeTable数据 * * @param array $task 任务 * @param array $parseResult 解析crontab命令规则结果,即Task需要在当前分钟内的哪些秒执行 * @return bool */ private function initRunTimeTableData(array $task,array $parseResult): bool { $runTimeTableTasks = $this->getRunTimeTable()->table; $min = date('YmdHi'); $sec = strtotime(date('Y-m-d H:i')); foreach ($parseResult as $time) { $this->checkTaskQueue(false); $key = $this->getKey($task['rule'],$task['taskClass'],$task['taskMethod'],$min,$time + $sec); $runTimeTableTasks->set($key,[ 'taskClass' => $task['taskClass'],'taskMethod' => $task['taskMethod'],'minute' => $min,'sec' => $time + $sec,'runStatus' => self::NORMAL ]); } return true; } ?
该进程使用了 //SwoftTaskBootstrapProcess /** * Crontab process * * @Process(name="cronExec",boot=true) */ class CronExecProcess implements ProcessInterface { /** * @param SwoftProcessProcess $process */ public function run(SwoftProcess $process) { $pname = App::$server->getPname(); $process->name(sprintf('%s cronexec process',$pname)); /** @var SwoftTaskCrontabCrontab $cron */ $cron = App::getBean('crontab'); // Swoole/HttpServer $server = App::$server->getServer(); $server->tick(0.5 * 1000,function () use ($cron) { $tasks = $cron->getExecTasks(); if (!empty($tasks)) { foreach ($tasks as $task) { // Diliver task Task::deliverByProcess($task['taskClass'],$task['taskMethod']); $cron->finishTask($task['key']); } } }); } } ?
定时任务的宏观执行情况如下:
以上内容希望帮助到大家,很多PHPer在进阶的时候总会遇到一些问题和瓶颈,业务代码写多了没有方向感,不知道该从那里入手去提升,对此我整理了一些资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、服务器性能调优、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql优化、shell脚本、Docker、微服务、Nginx等多个知识点高级进阶干货需要的可以免费分享给大家,需要的可以加入我的官方群点击此处。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |