第一步:创建Maven项目
Maven依赖:
junit
junit
4.12
org.springframework.boot
spring-boot-starter
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
org.junit.vintage
junit-vintage-engine
org.springframework.boot
spring-boot-starter-activemq
2.2.6.RELEASE
org.springframework
spring-tx
5.2.5.RELEASE
org.springframework
spring-jms
5.2.5.RELEASE
org.apache.activemq
activemq-all
5.15.12
org.apache.activemq
activemq-pool
5.15.12
application.yml:
server:
port: 80
servlet:
context-path: /am
spring:
activemq:
broker-url: tcp://hcmaster:61616 #ActiveMQ服务器地址及端口
user: admin
password: admin
close-timeout: 5000
send-timeout: 3000
# 下面五行配置加上程序报错,程序启动不起来
# in-memory: false # true表示使用内置的MQ,false表示连接服务器
# pool:
# enabled: true # true表示使用连接池,false表示每发送一条数据就创建一个连接
# max-connections: 10 #连接池最大连接数
# idle-timeout: 30000 #空闲的连接过期时间,默认为30s
jms:
pub-sub-domain: false # 默认值false表示Queue,true表示Topic
topicName: springboot-activemq-topic
# debug: true #显示Debug信息
第二步:配置主题Bean
@Component
public class ActiveMQTopicConfig {
@Value("${topicName}")
private String topicName;
@Bean //在Spring中注入一个名称为activeMQConfig的Bean
public Topic topic(){
return new ActiveMQTopic(topicName);
}
}
第三步:创建Producer
@Component
public class TopicProducer {
//注入springboot封装的工具类,它是Jmstemplate的封装类
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
public void publishTopic(String msg) {
System.out.println("TopicProvider发送了消息 : " + msg);
//添加消息到消息队列
jmsMessagingTemplate.convertAndSend(topic, msg);
}
@Scheduled(fixedDelay = 3000)
public void publishTopicScheduled(){
String msg ="xixi";
System.out.println("TopicProvider定时发送了消息 : " + msg+" "+System.currentTimeMillis());
//方法一:添加消息到消息队列
jmsMessagingTemplate.convertAndSend(topic, msg+System.currentTimeMillis());
}
}
第三步:创建Consumer
@Component
public class TopicConsumer {
@JmsListener(destination = "${topicName}" , containerFactory="jmsListenerContainerTopic")
public void receiveQueue(String text) {
System.out.println("TopicConsumer消费了消息: "+text);
}
//需要给topic定义独立的JmsListenerContainer
@Bean
public JmsListenerContainerFactory jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
}
3.测试代码
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableScheduling
public class TopicTests {
//注入springboot封装的工具类,它是Jmstemplate的封装类
@Autowired
private TopicProducer topicProducer;
@Test
public void testPublishTopic() {
topicProducer.publishTopic("hehe");
}
@Test
public void testPublishTopicScheduled(){
topicProducer.publishTopicScheduled();
try {
System.in.read(); //让间隔消费发布一直进行下去
} catch (IOException e) {
e.printStackTrace();
}
}
}
结果