关于消息队列应用场景自行百度吧。另外,ThinkPHP > 5.1版本的可以直接使用think-queue2.0版本,具体教程参考:/wslmf/notes/tree/master/thinkphp-queue
准备
我们使用ThinkPHP5.0框架作为基础框架,因此安装think-queue版本就需要指定到1.6版本了composerrequiretopthink/think-queue1.1.6
然后需要在服务器安装redis和php的redis扩展
配置
上诉think-queue安装后会在生成一个extra的目录,这里面包含queue.php也就是消息队列的配置,文件路径:application/extra/queue.phpreturn[
//'connector'=>'Sync'
'connector'=>'Redis',//Redis驱动
'expire'=>60,//任务的过期时间,默认为60秒;若要禁用,则设置为null
'default'=>'default',//默认的队列名称
'host'=>'127.0.0.1',//redis主机ip
'port'=>6379,//redis端口
'password'=>'',//redis密码
'select'=>1,//使用哪一个db,默认为db0
'timeout'=>0,//redis连接的超时时间
'persistent'=>false,//是否是长连接
];
代码
创建一个控制器,执行队列里的任务,例如创建一个app\message\controller\DoJob.php。namespaceapp\message\controller;
usethink\Db;
usethink\Queue\Job;
classDoJob{
/**
*fire方法是消息队列默认调用的方法
*@paramJob$job当前的任务对象
*@param$data
*@returnint
*/
publicfunctionfire(Job$job,$data){
//这里$data定义格式为(也就是push队列的时候传递过来的参数):$data=['type'=>1,'data_id'=>123,'ts'=>time()]
if(empty($data)){
$job->delete();
return0;
}
//有些消息在到达消费者时,可能已经不再需要执行了
$isJobStillNeedToBeDone=$this->checkDatabaseToSeeIfJobNeedToBeDone($data);
if(!$isJobStillNeedToBeDone){
$job->delete();
return0;
}
if(is_array($data)&&isset($data['type'])){
$type=$data['type'];
if($type==1){
//执行发送邮件业务
$isJobDone=$this->sendEmail($data['data_id']);
}elseif($type==2){
//执行APP推送消息业务
$isJobDone=$this->sendAppMessage($data['data_id']);
}elseif($type==3){
//执行订单业务
$isJobDone=$this->orderService($data['data_id']);
}else{
return0;
}
}else{
return0;
}
if($isJobDone){
//如果任务执行成功,删除任务
$job->delete();
}else{
if($job->attempts()>3){
//通过这个方法可以检查这个任务已经重试了几次了
$job->delete();
//也可以重新发布这个任务
//$job->release(2);//$delay为延迟时间,表示该任务延迟2秒后再执行
}
}
}
//检查是否需要继续执行任务
privatefunctioncheckDatabaseToSeeIfJobNeedToBeDone($data){
returntrue;
}
//发App消息业务
privatefunctionsendAppMessage($id){
}
//处理订单业务
privatefunctionorderService($id){
}
}
发布任务到队列中,创建一个测试代码,例如创建一个app\message\controller\Addjob.php:namespaceapp\message\controller;
usethink\Controller;
usethink\Queue;
classAddjobextendsController{
publicfunctionindex()
{
//定义要传递给队列的数据(类型是数组)
$jobData=['type'=>1,'data_id'=>12,'ts'=>time()];
//将该任务推送到消息队列,等待对应的消费者去执行
$isPushed=Queue::push('app\message\controller\DoJob',$jobData,'JobQueue');
if($isPushed!==false){
return'消息已发出';
}else{
return'消息发送出错';
}
}
}
我们直接访问上方的测试代码控制器,http://youdomain/index.php?s=/message/addjob/index ,会出现消息已发送提示,这时候表示队列已经添加ok,我们使用命令监听下
cd指定到ThinkPHP框架根目录下,使用think进行监听执行#测试监听
phpqueue:listen--queueJobQueue&
#实际生产中使用下方命令进行后台监听
nohupphpthinkqueue:listen--queueJobQueue2>&1&
保活
如果经一段时间发现,无论怎么样退出客户端后linux还是无法继续监听执行queue的线程,那么我们可以使用定时任务进行保活监控
我们保活的话要使用queue:work 而不是使用listen,因为work是单线程的listen是多线程,如果服务器内容太小分分钟“爆炸”。cd/www/wwwroot/&&nohupphpthinkqueue:work--queueJobQueue&
上面就是我们需要定时监控的脚本指令了,也可以使用自己的宝塔定时任务监控,监控频率1分钟!
调试日志
在DoJob文件中使用Log::write('这是调试日志','info'),此方式进行日志打印会发现很容易分辨出来,在runtime/log 目录下会有一个 日期_cli.log的文件,比如:09_cli.log 这里面就全部是消息队列打出来的日志信息了,与其他日志区分了文件还是比较方便看的。
其他
在消息队列的逻辑处理中,一旦处理完毕需要记得自己删除任务,否则可能会一直执行下去来消耗内存。关于listen和work的区别看下:work和listen模式的区别,进行查看。
参考