实现物联网设备的数据获取,有些设备的数据需要定时获取
1、做这个功能期间,请教了开发组的成员,最后通过阿正提供的思路来完成
2、首先任务有间隔执行(秒级)和 定时执行(分级),所以就考虑到每种类型的任务开两个进程来执行(也就是正哥所说的管理进程和执行进程),最终间隔执行是开启自定义进程开启一秒的Timer定时器进行从Db查询(定时任务不多的情况下),符合条件的进行投递到Task进程执行并标记为已投递,Task进程根据投递的内容,开启Timer定时器执行相关逻辑(如间隔20秒执行),然后Task进程执行相关逻辑并将定时器Id保存到数据库(用于清除定时器,切记只能清除当前进程的)。定时任务是启动一个Corntab定时任务来查询符合条件的任务,并投递到Task进程执行相关逻辑
EasySwooleEvent.php
public static function mainServerCreate(EventRegister $register)
{
// TODO: Implement mainServerCreate() method.
//定时任务检测
$processConfig= new \EasySwoole\Component\Process\Config();
$processConfig->setProcessName('checkTaskProcess');//设置进程名称
$processConfig->setProcessGroup('Task');//设置进程组
$processConfig->setEnableCoroutine(true);//是否自动开启协程
$processConfig->setMaxExitWaitTime(3);//最大退出等待时间
Manager::getInstance()->addProcess(new CheckTaskProcess($processConfig));
}
自定义进程代码
<?php
namespace App\Process;
use App\Model\Admin\Corntab\CorntabModel;
use App\Task\TimerTask;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\Task\TaskManager;
class CheckTaskProcess extends AbstractProcess
{
protected function run($arg)
{
// TODO: Implement run() method.
//当进程启动后,会执行的回调
Timer::getInstance()->loop(1*1000,function (){
$timers = CorntabModel::create()->where('corntab_type',2)->where('corntab_state',0)->all();
if(!empty($timers)){
foreach ($timers as $timer)
{
$timer->update(['corntab_state' => 1]);
TaskManager::getInstance()->async(new TimerTask($timer->toArray()),null,0);
}
}
});
}
}
Task进程代码
<?php
namespace App\Task;
use App\Model\Admin\Corntab\CorntabModel;
use EasySwoole\Component\Timer;
use EasySwoole\Task\AbstractInterface\TaskInterface;
class TimerTask implements TaskInterface
{
protected $data;
public function __construct($data)
{
$this->data = $data;
}
function run(int $taskId, int $workerIndex)
{
$data = $this->data;
if(is_array($data))
{
$timerId = Timer::getInstance()->loop($data['corntab_intval']*1000,function () use ($data){
//相关定时业务逻辑
var_dump($data['corntab_name'].' Task---'.date('Y-m-d H:i:s'));
});
$corntab = CorntabModel::create()->get($data['corntab_id']);
$corntab->update(['corntab_timer_id' => $timerId]);
}else{
//清除定时器
Timer::getInstance()->clear($this->data);
}
// TODO: Implement run() method.
}
function onException(\Throwable $throwable, int $taskId, int $workerIndex)
{
// TODO: Implement onException() method.
}
}
EasySwooleEvent.php
public static function mainServerCreate(EventRegister $register)
{
//================================== Ctontab 定时任务 ==============================
Crontab::getInstance()->addTask(Min::class);
}
Corntab定时任务代码
<?php
namespace App\Crontab;
use App\Model\Admin\Corntab\CorntabModel;
use App\Task\CorntabTask;
use EasySwoole\EasySwoole\Crontab\AbstractCronTask;
use EasySwoole\EasySwoole\Task\TaskManager;
class Min extends AbstractCronTask
{
public static function getRule(): string
{
// TODO: Implement getRule() method.
//一分钟执行一次
return '*/1 * * * *';
}
public static function getTaskName(): string
{
// TODO: Implement getTaskName() method.
return 'CrontabOne';
}
function run(int $taskId, int $workerIndex)
{
$corntabs = CorntabModel::create()->where('corntab_type',1)->where('corntab_state',1)->all();
if(!empty($corntabs)){
foreach ($corntabs as $corntab){
TaskManager::getInstance()->async(new CorntabTask($corntab->toArray()),null,1);
}
}
// TODO: Implement run() method.
}
function onException(\Throwable $throwable, int $taskId, int $workerIndex)
{
// TODO: Implement onException() method.
}
}
Task进程代码
<?php
namespace App\Task;
use App\Utility\Logs;
use EasySwoole\Task\AbstractInterface\TaskInterface;
class CorntabTask implements TaskInterface
{
protected $data;
public function __construct($data)
{
$this->data = $data;
}
function run(int $taskId, int $workerIndex)
{
$data = $this->data;
$weeks = explode(',',$data['corntab_day']);
$day = date('w');
if(in_array($day,$weeks) && (date('H:i:s') == $data['corntab_time']))
{
Logs::getInstance()->log($data['corntab_name'].date('Y-m-d H:i:s')."到时间了,我执行了");
}
// TODO: Implement run() method.
}
function onException(\Throwable $throwable, int $taskId, int $workerIndex)
{
echo $throwable->getMessage();
// TODO: Implement onException() method.
}
}
本文为北溟有鱼QAQ原创文章,转载无需和我联系,但请注明来自北溟有鱼QAQ https://www.amdzz.cn
最新评论