Spring Boot + RocketMQ + MyBatis integration demo

Date: 2021-01-08 05:54:52

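What the demo does

The project starts two services, a producer and a consumer. The producer runs a user query, reads the user record from the database, and sends it to the RocketMQ message broker. The consumer listens on the topic and receives the user data as soon as the message arrives.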
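The flow described above can be sketched without a broker. This is only an illustration under stated assumptions: an in-memory queue stands in for the RocketMQ topic, and the names `FlowSketch`, `findUser`, `produce` and `consume` are hypothetical and do not exist in the demo project.

```java
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative stand-in only: an in-memory queue plays the role of the RocketMQ topic.
final class FlowSketch {

    // Hypothetical "database lookup": returns a fixed JSON string for the given id.
    static String findUser(long id) {
        return "{\"id\":" + id + ",\"username\":\"demo\"}";
    }

    // Producer side: query the user, then publish the data as UTF-8 bytes.
    static void produce(Queue<byte[]> topic, long userId) {
        topic.offer(findUser(userId).getBytes(StandardCharsets.UTF_8));
    }

    // Consumer side: take the next message, if any, and decode it back to text.
    static String consume(Queue<byte[]> topic) {
        byte[] body = topic.poll();
        return body == null ? null : new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Queue<byte[]> topic = new ConcurrentLinkedQueue<>();
        produce(topic, 1L);
        System.out.println("consumer received: " + consume(topic));
    }
}
```

In the real demo the two sides run in separate processes and RocketMQ carries the bytes between them; the message body is still just a byte array, which is why both sides have to agree on the encoding.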

GitHub source

/mikewuhao/soringBoot-mq-demo/

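Detailed setup steps

1. Preparation

Set up and start a RocketMQ environment in advance, and install and start the RocketMQ console (the visual management tool).

(I deployed RocketMQ on macOS, following /p/a759e8ea6ac1)

(For deploying RocketMQ on Windows, see /darendu/p/12036380.html)

Note: starting RocketMQ with the official default scripts may fail with an insufficient-memory error. If so, lower the JVM memory settings; see /u014803081/article/details/90705792?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-6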

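2. Project structure

In IDEA, first create a plain empty project named springBoot-mq-demo, then add two Maven modules to it: consumer (the consumer service) and provider (the producer service).

3. Creating the provider (producer) service

3.1 provider service directory structure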

3.2 provider service pom file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wuhao.demo</groupId>
    <artifactId>provider</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Spring Boot + MyBatis integration -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>
        <!-- RocketMQ -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <!-- fastjson: MqController imports com.alibaba.fastjson.JSONObject, but the original
             listing declares no dependency for it; the version below is an assumption -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

3.3 provider service application.properties

# The port value was lost in the source text; 8082 matches the URL used in the demo section
server.port=8082
logging.level.org.springframework=DEBUG

# Database
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/boot?useUnicode=true&characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=root

# MyBatis
mybatis.type-aliases-package=com.wuhao.domain
mybatis.mapper-locations=classpath:mapper/*.xml

# RocketMQ producer
rocketmq.producer.groupName=ProducerGroup
rocketmq.producer.namesrvAddr=127.0.0.1:9876
rocketmq.producer.instanceName=ProducerGroup
rocketmq.producer.topic=topic
rocketmq.producer.tag=test
rocketmq.producer.maxMessageSize=131072
rocketmq.producer.sendMsgTimeout=10000
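A properties file needs one key per line, and the two numeric producer settings carry units that are easy to miss: `maxMessageSize` is in bytes and `sendMsgTimeout` is in milliseconds. The sketch below reads three of the producer keys with plain `java.util.Properties` to make that concrete. The class `ProducerSettings` is hypothetical; in the demo itself Spring injects these values through `@Value`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder for a few of the rocketmq.producer.* settings.
final class ProducerSettings {
    final String namesrvAddr;
    final int maxMessageSize;        // bytes
    final int sendMsgTimeoutMillis;  // milliseconds

    ProducerSettings(String namesrvAddr, int maxMessageSize, int sendMsgTimeoutMillis) {
        this.namesrvAddr = namesrvAddr;
        this.maxMessageSize = maxMessageSize;
        this.sendMsgTimeoutMillis = sendMsgTimeoutMillis;
    }

    // Parses properties text (one key=value pair per line) into a settings object.
    static ProducerSettings parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new ProducerSettings(
                p.getProperty("rocketmq.producer.namesrvAddr"),
                Integer.parseInt(p.getProperty("rocketmq.producer.maxMessageSize")),
                Integer.parseInt(p.getProperty("rocketmq.producer.sendMsgTimeout")));
    }

    public static void main(String[] args) {
        ProducerSettings s = parse(
                "rocketmq.producer.namesrvAddr=127.0.0.1:9876\n"
              + "rocketmq.producer.maxMessageSize=131072\n"
              + "rocketmq.producer.sendMsgTimeout=10000\n");
        System.out.println("name server: " + s.namesrvAddr);
        System.out.println("max message size: " + (s.maxMessageSize / 1024) + " KiB");
        System.out.println("send timeout: " + (s.sendMsgTimeoutMillis / 1000) + " s");
    }
}
```

With the values from this demo, the producer accepts message bodies of up to 128 KiB and waits up to 10 seconds for a send to complete.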

3.4 provider service startup class, ProviderApplication

package com.wuhao;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProviderApplication.class, args);
    }
}

3.5 provider service controller, MqController

package com.wuhao.controller;

import com.alibaba.fastjson.JSONObject;
import com.wuhao.domain.User;
import com.wuhao.mq.RocketMQProducer;
import com.wuhao.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @author wuhao
 * @Title: MqController
 * @Description: MQ test controller
 * @date /6/9 17:38
 */
@RestController
@Slf4j
public class MqController {

    @Autowired
    @Qualifier("rocketMQProducer")
    RocketMQProducer rocketMQProducer;

    @Autowired
    private UserService userService;

    @GetMapping("/testSend")
    public void testSend() {
        // The @Configuration class is proxied by Spring, so this call returns the singleton producer bean
        DefaultMQProducer producer = rocketMQProducer.getRocketMQProducer();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Load the user with id 1 from the database
        User user = userService.queryUserById(1L);
        String body = "hi RocketMQ, now is " + sdf.format(new Date()) + "---" + JSONObject.toJSONString(user);
        // Topic and tag must match the consumer's subscription; encode explicitly as UTF-8
        Message message = new Message("topic", "test", body.getBytes(StandardCharsets.UTF_8));
        try {
            producer.send(message);
        } catch (Exception e) {
            log.error("Failed to send message", e);
        }
    }
}
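The controller's message body is a greeting, a timestamp, the separator `---`, and the user serialized as JSON, all turned into bytes before sending. Below is a minimal, dependency-free sketch of that step. The class `BodySketch` and its methods are hypothetical, and it uses `DateTimeFormatter` in place of the controller's `SimpleDateFormat`.

```java
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper mirroring how the controller assembles the message body.
final class BodySketch {
    // Same pattern as the controller; DateTimeFormatter is immutable and safe to share.
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Greeting + timestamp + "---" + JSON payload.
    static String buildBody(LocalDateTime now, String userJson) {
        return "hi RocketMQ, now is " + FMT.format(now) + "---" + userJson;
    }

    // Encode with an explicit charset so producer and consumer agree on the bytes.
    static byte[] encode(String body) {
        return body.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String body = buildBody(LocalDateTime.now(), "{\"id\":1,\"username\":\"demo\"}");
        System.out.println(body + " (" + encode(body).length + " bytes)");
    }
}
```

Passing the charset explicitly matters once the user data contains non-ASCII text such as Chinese names or addresses: relying on the platform default on both machines can silently corrupt the payload.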

3.6 provider service MQ configuration class, RocketMQProducer

package com.wuhao.mq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author wuhao
 * @Title: RocketMQProducer
 * @Description: message producer
 * @date /6/9 17:31
 */
@Configuration
@Slf4j
public class RocketMQProducer {

    @Value("${rocketmq.producer.groupName}")
    private String groupName;

    @Value("${rocketmq.producer.namesrvAddr}")
    private String namesrvAddr;

    @Value("${rocketmq.producer.instanceName}")
    private String instanceName;

    @Value("${rocketmq.producer.maxMessageSize}")
    private int maxMessageSize;

    @Value("${rocketmq.producer.sendMsgTimeout}")
    private int sendMsgTimeout;

    // Spring calls start() after creating the bean and shutdown() when the context closes
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer getRocketMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(namesrvAddr);
        producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(maxMessageSize);
        producer.setSendMsgTimeout(sendMsgTimeout);
        producer.setVipChannelEnabled(false);
        log.info("================>Producer created, ProducerGroupName {}<================", groupName);
        return producer;
    }
}

3.7 provider service entity class, User

package com.wuhao.domain;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

import javax.persistence.*;

/**
 * @Description: User entity
 * @CreateDate: -02-04 19:25
 * @Author: wuhao
 */
@Entity
@NoArgsConstructor
@AllArgsConstructor
public class User {

    @Id
    @GeneratedValue
    private Long id;

    @Column(name = "username")
    private String username;

    @Column(name = "birthday")
    private String birthday;

    @Column(name = "sex")
    private String sex;

    @Column(name = "address")
    private String address;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getBirthday() {
        return birthday;
    }

    public void setBirthday(String birthday) {
        this.birthday = birthday;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", username='" + username + '\'' +
                ", birthday='" + birthday + '\'' +
                ", sex='" + sex + '\'' +
                ", address='" + address + '\'' +
                '}';
    }
}

3.8 provider service interface

package com.wuhao.service;

import com.wuhao.domain.User;

/**
 * @Description: service interface for User
 * @CreateDate: -06-09 09:35
 * @Author: wuhao
 */
public interface UserService {

    User queryUserById(Long id);

    int addUser(User user);

    int modifyUser(User user);

    int deleteUserById(Long id);
}

3.8 provider service interface implementation

package com.wuhao.service.impl;

import com.wuhao.dao.UserMapper;
import com.wuhao.domain.User;
import com.wuhao.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Description: service implementation for User; every method delegates to the mapper
 * @CreateDate: -02-05 09:36
 * @Author: wuhao
 */
@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserMapper userMapper;

    @Override
    public User queryUserById(Long id) {
        return userMapper.queryUserById(id);
    }

    @Override
    public int addUser(User user) {
        return userMapper.addUser(user);
    }

    @Override
    public int modifyUser(User user) {
        return userMapper.modifyUser(user);
    }

    @Override
    public int deleteUserById(Long id) {
        return userMapper.deleteUserById(id);
    }
}

3.9 provider service DAO layer, UserMapper

package com.wuhao.dao;

import com.wuhao.domain.User;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Repository;

/**
 * @Description: mapper for user; the SQL lives in userMapper.xml
 * @CreateDate: -06-04 19:38
 * @Author: wuhao
 */
@Mapper
@Repository
public interface UserMapper {

    User queryUserById(Long id);

    int addUser(User user);

    int modifyUser(User user);

    int deleteUserById(Long id);
}

3.10 provider service SQL mapping file, userMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.wuhao.dao.UserMapper">

    <!-- Query a user by id -->
    <select id="queryUserById" resultType="com.wuhao.domain.User">
        select * from `user` where id = #{id}
    </select>

    <!-- Update a user -->
    <update id="modifyUser" parameterType="com.wuhao.domain.User">
        update `user`
        set username=#{username}, birthday=#{birthday}, sex=#{sex}, address=#{address}
        where id=#{id}
    </update>

    <!-- Delete a user -->
    <delete id="deleteUserById" parameterType="long">
        delete from `user` where id=#{id}
    </delete>

    <!-- Add a user -->
    <insert id="addUser" parameterType="com.wuhao.domain.User">
        insert into `user` (username, birthday, sex, address)
        values (#{username}, #{birthday}, #{sex}, #{address})
    </insert>
</mapper>

4. Creating the consumer service

4.1 consumer service directory structure

4.2 consumer service pom file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wuhao.demo</groupId>
    <artifactId>consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- RocketMQ -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

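4.3 consumer service application.properties

# The port value is missing from the source text. Set server.port to any free port
# other than the provider's 8082; if left unset, Spring Boot uses its default port.
# server.port=
logging.level.org.springframework=DEBUG

# RocketMQ consumer
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
rocketmq.consumer.groupName=ConsumerGroup
rocketmq.consumer.topic=topic
rocketmq.consumer.tag=test
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64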

4.4 consumer service startup class

package com.wuhao;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

// The consumer has no database, so data source auto-configuration is excluded
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

4.5 consumer service entity class, User

package com.wuhao.domain;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;

/**
 * @Description: User entity
 * @CreateDate: -02-04 19:25
 * @Author: wuhao
 */
@Entity
@NoArgsConstructor
@AllArgsConstructor
public class User {

    @Id
    @GeneratedValue
    private Long id;

    @Column(name = "username")
    private String username;

    @Column(name = "birthday")
    private String birthday;

    @Column(name = "sex")
    private String sex;

    @Column(name = "address")
    private String address;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getBirthday() {
        return birthday;
    }

    public void setBirthday(String birthday) {
        this.birthday = birthday;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", username='" + username + '\'' +
                ", birthday='" + birthday + '\'' +
                ", sex='" + sex + '\'' +
                ", address='" + address + '\'' +
                '}';
    }
}

4.6 consumer service listener, MessageListen

package com.wuhao.mq;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author wuhao
 * @Title: MessageListen
 * @Description: message listener
 * @date /4/17 17:28
 */
@Component
public class MessageListen implements MessageListenerConcurrently {

    @Autowired
    private MessageProcessor messageProcessor;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        // Only the first message is handled; this is enough while the consumer's batch size stays at its default of 1
        MessageExt ext = list.get(0);
        boolean result = messageProcessor.handle(ext);
        if (!result) {
            // Ask the broker to redeliver the message later
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
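The listener looks only at `list.get(0)`, which is fine as long as each callback delivers a single message. If the consumer were ever configured to deliver larger batches, every message in the list would need handling. The sketch below shows one possible way to map a batch to a single status. It is not the author's code: the `Status` enum is a local stand-in for `ConsumeConcurrentlyStatus` so the example compiles without the RocketMQ jar, and `BatchStatusSketch` and `consumeAll` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper: turns the per-message results of a batch into one status.
final class BatchStatusSketch {

    // Local stand-in for RocketMQ's ConsumeConcurrentlyStatus.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Runs the handler on every message; the first failure makes the whole batch retry.
    static <T> Status consumeAll(List<T> batch, Predicate<T> handler) {
        for (T message : batch) {
            if (!handler.test(message)) {
                return Status.RECONSUME_LATER;
            }
        }
        return Status.CONSUME_SUCCESS;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("m1", "m2", "m3");
        System.out.println(consumeAll(batch, m -> true));
        System.out.println(consumeAll(batch, m -> !m.equals("m2")));
    }
}
```

Because a failed batch is retried as a whole in this sketch, messages that were already handled may be seen again, so the handler should tolerate duplicates.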

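4.7 consumer service MQ message processing interface

package com.wuhao.mq;

import org.apache.rocketmq.common.message.MessageExt;

/**
 * @author wuhao
 * @Title: MessageProcessor
 * @Description: MQ message processing interface
 * @date /4/17 17:24
 */
public interface MessageProcessor {

    // Returns true if the message was handled successfully, false to request redelivery
    boolean handle(MessageExt messageExt);
}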

4.8 consumer service message processing class

package com.wuhao.mq;

import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

/**
 * @author wuhao
 * @Title: MessageProcessorImpl
 * @Description: message processing class
 * @date /4/17 17:27
 */
@Service
public class MessageProcessorImpl implements MessageProcessor {

    @Override
    public boolean handle(MessageExt messageExt) {
        // The body arrives as bytes and has to be decoded to a String
        String result = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        System.out.println("Message received: " + result);
        return true;
    }
}
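The demo only prints the decoded body. Since the producer builds the body as a greeting, the separator `---`, and the user JSON, a consumer that wants the user data back has to split on that separator. A minimal sketch, assuming the body format stays exactly as the producer writes it; `BodyDecodeSketch`, `decode` and `payloadOf` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for pulling the JSON payload out of the demo's message body.
final class BodyDecodeSketch {
    static final String SEPARATOR = "---";

    // Bytes to text, with an explicit charset.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    // Returns everything after the first "---", or the whole text if there is no separator.
    static String payloadOf(String text) {
        int i = text.indexOf(SEPARATOR);
        return i < 0 ? text : text.substring(i + SEPARATOR.length());
    }

    public static void main(String[] args) {
        byte[] body = "hi RocketMQ, now is 2021-01-08 05:54:52---{\"id\":1}"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(payloadOf(decode(body)));
    }
}
```

The timestamp contains single hyphens but never three in a row, so the first `---` reliably marks the start of the JSON part. Sending the JSON alone as the body would remove the need for this parsing step.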

4.9 consumer service MQ consumer configuration class

package com.wuhao.mq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author wuhao
 * @Title: RocketMQConsumer
 * @Description: MQ consumer
 * @date /4/17 17:36
 */
@Configuration
@Slf4j
public class RocketMQConsumer {

    @Autowired
    private MessageListen messageListen;

    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;

    @Value("${rocketmq.consumer.groupName}")
    private String groupName;

    @Value("${rocketmq.consumer.topic}")
    private String topic;

    @Value("${rocketmq.consumer.tag}")
    private String tag;

    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;

    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;

    // Spring calls start() after creating the bean and shutdown() when the context closes
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer getRocketMQConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.setVipChannelEnabled(false);
        // Register our own listener implementation
        consumer.registerMessageListener(messageListen);
        try {
            // Subscribe to the topic, filtered by tag
            consumer.subscribe(topic, tag);
            log.info("================>Consumer created, ConsumerGroupName {}<================", groupName);
            log.info("============>Consumer listening, groupName:{},topic:{}<============", groupName, topic);
        } catch (MQClientException e) {
            log.error("Failed to subscribe the consumer", e);
        }
        return consumer;
    }
}
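The second argument of `consumer.subscribe(topic, tag)` is a tag expression: a single tag such as `test`, several tags joined with `||`, or `*` for every tag on the topic. The sketch below is a simplified model of how such an expression selects messages, written to show why the producer's tag has to match the consumer's. It is not RocketMQ's implementation, and `TagFilterSketch` and `matches` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified model of tag-expression matching; not RocketMQ's actual filter code.
final class TagFilterSketch {

    // True if a message carrying messageTag would be selected by subExpression.
    static boolean matches(String subExpression, String messageTag) {
        // null, empty, or "*" means "all tags"
        if (subExpression == null || subExpression.isEmpty() || "*".equals(subExpression)) {
            return true;
        }
        // "tagA || tagB" selects messages tagged with either tagA or tagB
        Set<String> wanted = Arrays.stream(subExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("test", "test"));      // the demo's producer and consumer tags
        System.out.println(matches("test", "other"));
        System.out.println(matches("a || test", "test"));
    }
}
```

In this demo both sides use topic `topic` and tag `test`, so every message sent by the controller is selected by the consumer's subscription.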

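Demo

Once the code is written, start the provider service first, then the consumer service.

1 Open http://localhost:8082/testSend in a browser to send a message.

2 Check the RocketMQ console: the message is now in the queue.

3 Check the consumer service's console log: the message has been printed.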

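Problems encountered

When installing and starting RocketMQ on Windows, it kept reporting insufficient memory for the runtime environment, and changing the memory settings did not help. I am not sure whether this was a problem with my own machine. After switching to macOS, RocketMQ installed and started successfully.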

Reference blog

/p/38a3596beea6
