1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > 简述thinkphp自带队列think-queue的使用以及通过supervisor实现常驻进程

简述thinkphp自带队列think-queue的使用以及通过supervisor实现常驻进程

时间:2023-09-12 13:28:11

相关推荐

简述thinkphp自带队列think-queue的使用以及通过supervisor实现常驻进程

think-queue是thinkphp官方提供的一个消息队列服务,适用于大并发、返回结果时间较长、需要批量操作等专门支持队列服务的扩展包。例如短信发送、模板消息邮件等推送。可以进行发布、获取、执行、删除、重发、失败处理、延迟执行、操作控制等操作。

1、安装think-queue

使用composer进行安装,在项目根目录,执行: composer require topthink/think-queue

2、配置消息队列的存储环境

执行完composer安装命令后,会自动生成一个queue.php的配置文件。

这里我们用redis驱动。当然还可以使用别的驱动,例如:database驱动(与redis相比不推荐)、sync等。

<?php /** * 消息队列配置 * 内置驱动:redis、database、topthink、sync*/ use think\facade\Env; return ['default'=> 'redis','prefix'=> 'prefix_','connections' => ['sync'=> ['type' => 'sync',],'database' => ['type' => 'database','queue'=> 'default','table'=> 'jobs',],'redis' => ['type' => 'redis','queue'=> Env::get('queue.queue_name', ''),'host' => Env::get('redis.redis_hostname', '127.0.0.1'),'port' => Env::get('redis.port', 6379),'password' => Env::get('redis.redis_password', ''),'select'=> Env::get('redis.select', 0),'timeout' => 0,'persistent' => false,],],'failed'=> ['type' => 'none','table' => 'failed_jobs',], ];

3、队列生产者

修改app/api/controller/Test.php里使用Queue::later或者Queue::push发布任务。

queue::push(); //创建同步任务

Queue::later(); //创建异步任务,延时执行

<?phpnamespace app\api\controller;use think\Queue;class Test extends Controller;{public function queue(){//获取参数$data = file_get_contents("php://input");if(empty($data)){$this->error("post is null");}$params = json_decode($data, true);/*创建新消息并推送到消息队列*/// 当前任务由哪个类负责处理$job_handler_classname = "app\jobs\Send";// 当前队列归属的队列名称$job_queue_name = "send_job_queue";// 当前任务所需的业务数据$job_data = ["ts"=>time(),"uniqid"=>uniqid(), "params"=>$params];// 将任务推送到消息队列等待对应的消费者去执行$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);if($is_pushed == false){$this->error("send job queue went wrong");}//操作成功$this->success('success');}}

4、消息的消费和删除。

在app目录下创建jobs文件夹,并创建Send.php类。

<?phpnamespace app\jobs;use think\queue\Job;/*** 消费者类* 用于处理 send_job_queue 队列中的任务*/class Send{/*** fire是消息队列默认调用的方法* @param Job $job 当前的任务对象* @param array|mixed $data 发布任务时自定义的数据*/public function fire(Job $job, $data){//有效消息到达消费者时可能已经不再需要执行了if(!$this->checkJob($data)){$job->delete();return;}//执行业务处理if($this->doJob($data)){$job->delete();//任务执行成功后删除}else{//检查任务重试次数if($job->attempts() > 3){$job->delete();}}}/*** 消息在到达消费者时可能已经不需要执行了* @param array|mixed $data 发布任务时自定义的数据* @return boolean 任务执行的结果*/private function checkJob($data){$ts = $data["ts"];$uniqid = $data["uniqid"];$params = $data["params"];return true;}/*** 根据消息中的数据进行实际的业务处理*/private function doJob($data){// 实际业务流程处理return true;}}

5、监听任务并执行

work和listen两种,具体的用法和可选参数可以输入命令加 --help 查看

php think queue:listenphp think queue:work

以上queue配置、发布、删除、执行、监听等一个简单的队列完成。实际生产中,我们需要用常驻进程的方式运行,防止因为服务器出现问题导致监听退出手动再次启动,这就得用的我们重点提到的Supervisor。Supervisord 是用 Python 实现的一款的进程管理工具,supervisord 要求管理的程序是非 daemon 程序,supervisord 会帮你把它转成 daemon 程序,因此如果用 supervisord 来管理进程,进程需要以非daemon的方式启动。

6、安装Supervisor

yum install -y supervisor

7、查看Supervisor安装位置

supervisor安装完成后,会在/usr/bin下生成三个执行程序:supervisortd、supervisorctl、echo_supervisord_conf,分别是supervisor的守护进程服务(用于接收进程管理命令)、客户端(用于和守护进程通信,发送管理进程的指令)、生成初始配置文件程序。

[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# [root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# [root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis supervisordsupervisord: /usr/bin/supervisord /etc/supervisord.d /etc/supervisord.conf[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis echo_supervisord_confecho_supervisord_conf: /usr/bin/echo_supervisord_conf[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# [root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis supervisorctl supervisorctl: /usr/bin/supervisorctl[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]#

8、修改配置文件

vi /etc/supervisord.conf

[include]files = supervisord.d/*.ini

9、自定义待守护进程配置文件

在 /etc/supervisord.d 下创建以.ini 后缀的文件。编辑如下配置:[program:shop-queue] ;程序名称,在 supervisorctl 中通过这个值来对程序进行一系列的操作command=php /www/wwwroot/shop/think queue:listenautostart=true ;在 supervisord 启动的时候也自动启动autorestart=true ; 程序异常退出后自动重启user=root ;用哪个用户启动redirect_stderr=true ;把 stderr 重定向到 stdout,默认 falsestdout_logfile_maxbytes=20MB ;stdout 日志文件大小,默认 50MBstdout_logfile_backups=20 ;stdout 日志文件备份数stderr_logfile=/www/wwwroot/shop/public/queue/worker_err.log ; 错误日志文件stdout_logfile=/www/wwwroot/shop/public/queue/worker.log ;输出日志文件

10、Surpervisor的启动

# supervisord二进制启动supervisord -c /etc/supervisord.conf# 检查进程ps aux | grep supervisord# 更新Supervisorsupervisorctl update

欢迎留言讨论。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。