深入了解ActiveMQ!

作者阿里云代理 文章分类 分类:linux图文教程 阅读次数 已被围观 939

知道MQ(Message Queue)

什么是音讯行列


aHR0cHM6Ly9tbWJpei5xcGljLmNuL21tYml6X3BuZy9QeE16VDBPaWJmNGpONDBYQThKdDNaTVlyRE54b2NQSUZrUnRsbHhEYUlCdTZSbXdPaWNDMWQ1SW1tS0tZSm9sWVBlZXJvcEF1WW9UcnBEUWdZU1pKQXZ3LzY0MA.png


音讯行列


首要咱们先从以下几个维度来知道一下音讯行列:


  • 音讯行列:一般咱们会简称它为MQ(MessageQueue)
  • 音讯(Message):传输的数据。
  • 行列(Queue):行列是一种先进先出的数据结构。
  • 音讯行列从字面的意义来看就是一个寄存音讯的容器。
  • 音讯行列能够简单理解为:把要传输的数据放在行列中。
  • 把数据放到音讯行列叫做出产者。
  • 从音讯行列里面取数据叫做顾客。


为什么需求音讯行列


运用音讯行列首要是根据以下三个首要场景:


  • 解耦


  • 异步


  • 削峰/限流


下面咱们分场景来描述下运用音讯行列带来的优点


解耦


假定咱们有一个用户体系A,用户体系A能够产生一个userId。


然后,现在有体系B和体系C都需求这个userId去做相关的操作。


aHR0cHM6Ly9tbWJpei5xcGljLmNuL21tYml6X3BuZy9QeE16VDBPaWJmNGpONDBYQThKdDNaTVlyRE54b2NQSUZVaWNNRzZDVVJXTkx6Z1hHazVyU0dobEdZeXBhZ3ZWR3pIZkxrMlhqakwwSlUyd0w5WUNqU0JBLzY0MA.png


解耦前架构


伪码大致如下:


java public class SystemA {  // 体系B和体系C的依靠  SystemB systemB = new SystemB();  SystemC systemC = new SystemC();  // 体系A独有的数据userId  private String userId = "activeMq-1234567890";  public void doSomething() {  // 体系B和体系C都需求拿着体系A的userId去操作其他的事  systemB.SystemBNeed2do(userId);  systemC.SystemCNeed2do(userId);  } }


「这样相似的业务场景咱们是不是很熟悉,咱们是不是这样写很入情入理,也很简单。」


某一天,体系B的负责人告知体系A的负责人,现在体系B的SystemBNeed2do(String userId)这个接口不再运用了,让体系A别去调它了。


所以,体系A的负责人说"好的,那我就不调用你了。",所以就把调用体系B接口的代码给删掉了。代码变成这样了:


java public void doSomething() {  // 体系A不再调用体系B的接口了  //systemB.SystemBNeed2do(userId);  systemC.SystemCNeed2do(userId); }  


由于业务需求,体系D说也需求用到体系A的userId,所以代码改成了这样:


java public void doSomething() {  // 现已不再需求体系B的依靠了  //systemB.SystemBNeed2do(userId);  // 体系C和体系D都需求拿着体系A的userId去操作其他的事  systemC.SystemCNeed2do(userId);  systemD.SystemDNeed2do(userId);  }


当前体系A、B、C、D体系的交互是这姿态的。


aHR0cHM6Ly9tbWJpei5xcGljLmNuL21tYml6X3BuZy9QeE16VDBPaWJmNGpONDBYQThKdDNaTVlyRE54b2NQSUY4Njh2bFpNdExZczlWd3U4U2VaT0VQZUNyV1ZpYkhqM0xnZlZ2amtIZzN4NFA2NnV6amM2VmV3LzY0MA.png


体系交互


随着业务需求的改变,代码也要一遍一遍的修正。


还会存在别的一个问题,调用体系C的时分,假如体系C挂了,体系A还要想办法处理。假如调用体系D时,由于网络推迟,恳求超时了,那体系A是反馈fail还是重试?


那么怎样去处理这样的现状呢,如何从频繁的修正代码中摆脱呢?


这时分咱们就引进一层音讯行列中间件,交互图如下:


1.png


解耦


将体系A产生的userId写到音讯行列中,体系C和体系D从音讯行列中拿数据。


这样有什么优点?


  • 体系A只负责把数据写到行列中,谁想要或不想要这个数据(音讯),体系A一点都不关心。


  • 即使现在体系D不想要userId这个数据了,体系B又突然想要userId这个数据了,都跟体系A无关,体系A一点代码都不必改。


  • 体系D拿userId不再经过体系A,而是从音讯行列里面拿。体系D即使挂了或许恳求超时,都跟体系A无关,


只跟音讯行列有关。这样一来,体系A与体系B、C、D都解耦了。


异步


体系A做的是首要的业务,而体系B、C、D对错首要的业务。比方体系A处理的是订单下单,而体系B是订单下单成功了,那发送一条短信告知具体的用户此订单已成功,而体系C和体系D也是处理一些小事而已。


那么此时,为了进步用户体验和吞吐量,其实能够异步地调用体系B、C、D的接口。


aHR0cHM6Ly9tbWJpei5xcGljLmNuL21tYml6X3BuZy9QeE16VDBPaWJmNGpONDBYQThKdDNaTVlyRE54b2NQSUZGS0NiWmFmZjBHdDZGTUtMZVV0cGlhZmZQUnA2ZjFIZW95NDBPNWljNmxyZjZKZjdjSktLVk94QS82NDA.png


异步


削峰/限流


咱们再来一个场景,现在咱们每个月要搞一次大促,大促期间的并发或许会很高的,比方每秒3000个恳求。假定咱们现在有两台机器处理恳求,而且每台机器只能每次处理1000个恳求。


3.png


削峰前


体系B和体系C根据自己的能够处理的恳求数去音讯行列中拿数据,这样即使有每秒有8000个恳求,那仅仅把恳求放在音讯行列中,去拿音讯行列的音讯由体系自己去控制,这样就不会把整个体系给搞崩。


4.png


削峰/限流


什么是JMS MQ


全称:Java MessageService 中文:Java 音讯服务。


JMS 是 Java 的一套 API 标准,开端的意图是为了使应用程序能够拜访现有的MOM 系 统(MOM 是 MessageOriented Middleware 的英文缩写,指的是运用高效牢靠的音讯传递机制进行渠道无关的数据沟通,并根据数据通信来进行分布式体系的集成。) 后来被许多现有的 MOM 供应商选用,并完结为MOM 体系。


常见 MOM 体系包含 Apache的 ActiveMQ、阿里巴巴的 RocketMQ、IBM 的 MQSeries、Microsoft 的 MSMQ、BEA 的 RabbitMQ 等。(并非全部的 MOM 体系都遵从JMS 标准)】


根据 JMS 完结的 MOM,又被称为JMSProvider。


JMS中的一些概念


「Broker」


音讯服务器,作为server供给音讯核心服务


「Provider 出产者」


音讯出产者是由会话创立的一个方针,用于把音讯发动到一个意图地


「Consumer 顾客」


音讯顾客是由会话创立的一个方针,它用于接纳发送到意图地的音讯。音讯的消费能够选用以下两种办法:

同步消费。经过调用顾客的receive办法从意图地中显式提取音讯。receive办法能够一向阻塞到音讯到达。

异步消费。客户能够为顾客注册一个音讯监听器,以界说在音讯到达时所采取的动作。


「P2P 点对点音讯模型」


音讯出产者出产音讯发送到queue 中,然后音讯顾客从queue 中取出而且消费音讯。音讯被消费以后,queue 中不再有存储,所以音讯顾客不或许消费到现已被消费的音讯。Queue支撑存在多个顾客,可是对一个音讯而言,只会有一个顾客能够消费、其它的则不能消费此音讯了。当顾客不存在时,音讯会一向保存,直到有消费消费。


「Pub/Sub 发布订阅音讯模型」


音讯出产者(发布)将音讯发布到topic 中,一起有多个音讯顾客(订阅)消费该音讯。和点对点办法不同,发布到 topic 的音讯会被一切订阅者消费。当出产者发布音讯,不论是否有顾客。都不会保存音讯一定要先有音讯的顾客,后有音讯的出产者。


「P2P vs Pub/Sub」


5.png


P2P vs Pub/Sub


「Queue」


行列存储,常用于点对点音讯模型


默许只能由仅有的一个顾客处理。一旦处理音讯删去。


「Topic」


主题存储,用于订阅/发布音讯模型


主题中的音讯,会发送给一切的顾客一起处理。只要在音讯能够重复处理的业务场景中可运用。


「ConnectionFactory」


衔接工厂,jms中用它创立衔接


衔接工厂是客户用来创立衔接的方针,例如ActiveMQ供给的ActiveMQConnectionFactory。


「Connection」


JMS Connection封装了客户与JMS供给者之间的一个虚拟的衔接。


「Destination 音讯的意图地」


意图地是客户用来指定它出产的音讯的方针和它消费的音讯的来历的方针。


订阅一个主题的顾客只能消费自它订阅之后发布的音讯。JMS标准答应客户创立耐久订阅,这在一定程度上放松了时刻上的相关性要求。耐久订阅答应顾客消费它在未处于激活状态时发送的音讯。在点对点音讯传递域中,意图地被成为行列(queue);在发布/订阅音讯传递域中,意图地被成为主题(topic)。


「Session」


JMS Session是出产和消费音讯的一个单线程上下文。会话用于创立音讯出产者(producer)、音讯顾客(consumer)和音讯(message)等。会话供给了一个业务性的上下文,在这个上下文中,一组发送和接纳被组合到了一个原子操作中。


音讯牢靠性机制

「承认 JMS音讯」


只要在被承认之后,才以为现已被成功地消费了。音讯的成功消费一般包含三个阶段:客户接纳音讯、客户处理音讯和音讯被承认。


在业务性会话中,当一个业务被提交的时分,承认主动产生。


在非业务性会话中,音讯何时被承认取决于创立会话时的应答方式(acknowledgement mode)。该参数有以下三个可选值:


「Session.AUTO_ACKNOWLEDGE」。当客户成功的从receive办法返回的时分,或许从MessageListener.onMessage办法成功返回的时分,会话主动承认客户收到的音讯。


「Session.CLIENT_ACKNOWLEDGE」。客户经过音讯的acknowledge办法承认音讯。需求留意的是,在这种方式中,承认是在会话层上进行:承认一个被消费的音讯将主动承认一切已被会话消费的音讯。例如,假如一个音讯顾客消费了10个音讯,然后承认第5个音讯,那么一切10个音讯都被承认。


「Session.DUPS_ACKNOWLEDGE」。该挑选仅仅会话迟钝的承认音讯的提交。假如JMS Provider失败,那么或许会导致一些重复的音讯。假如是重复的音讯,那么JMS Provider有必要把音讯头的JMSRedelivered字段设置为true。

「耐久性」


JMS 支撑以下两种音讯提交方式:


「PERSISTENT」。指示JMSProvider耐久保存音讯,以保证音讯不会由于JMS Provider的失败而丢掉。


「NON_PERSISTENT」。不要求JMS Provider耐久保存音讯。


「优先级」


能够运用音讯优先级来指示JMS Provider首要提交紧迫的音讯。优先级分10个级别,从0(最低)到9(最高)。假如不指定优先级,默许级别是4。「需求留意的是,JMSProvider并不一定保证依照优先级的次序提交音讯。」


「音讯过期」


能够设置音讯在一定时刻后过期,默许是永不过期


「暂时意图地」


能够经过会话上的createTemporaryQueue办法和createTemporaryTopic办法来创立暂时意图地。它们的存在时刻只限于创立它们的衔接所坚持的时刻。只要创立该暂时意图地的衔接上的音讯顾客才能够从暂时意图地中提取音讯。


「耐久订阅」


首要音讯出产者有必要运用PERSISTENT提交音讯。客户能够经过会话上的createDurableSubscriber办法来创立一个耐久订阅,该办法的第一个参数有必要是一个topic,第二个参数是订阅的名称。


JMS Provider会存储发布到耐久订阅对应的topic上的音讯。假如开端创立耐久订阅的客户或许任何其它客户运用相同的衔接工厂和衔接的客户ID、相同的主题和相同的订阅名再次调用会话上的createDurableSubscriber办法,那么该耐久订阅就会被激活。


JMS Provider会向客户发送客户处于非激活状态时所发布的音讯。


耐久订阅在某个时刻只能有一个激活的订阅者。耐久订阅在创立之后会一向保留,直到应用程序调用会话上的unsubscribe办法。


「本地业务」

在一个JMS客户端,能够运用本地业务来组合音讯的发送和接纳。JMS Session接口供给了commit和rollback办法。业务提交意味着出产的一切音讯被发送,消费的一切音讯被承认;业务回滚意味着出产的一切音讯被销毁,消费的一切音讯被康复并从头提交,除非它们现已过期。


业务性的会话总是牵涉到业务处理中,commit或rollback办法一旦被调用,一个业务就结束了,而另一个业务被开端。关闭业务性会话将回滚其间的业务。


需求留意的是,假如运用恳求/回复机制,即发送一个音讯,一起希望在同一个业务中等候接纳该音讯的回复,那么程序将被挂起,由于知道业务提交,发送操作才会真正执行。需求留意的还有一个,音讯的出产和消费不能包含在同一个业务中。


ActiveMQ

存储


ActiveMQ支撑很多种存储办法,常见的有 KahaDB存储,AMQ存储,JDBC存储,LevelDB存储,Memory 音讯存储。咱们重点介绍一下KahaDB和JDBC存储办法。


KahaDB存储


KahaDB是默许的耐久化策略,一切音讯次序添加到一个日志文件中,一起别的有一个索引文件记载指向这些日志的存储地址,还有一个业务日志用于音讯回复操作。是一个专门针对音讯耐久化的处理方案,它对典型的音讯运用方式进行了优化。

在data/kahadb这个目录下,会生成四个文件,来完结音讯耐久化 db.data 它是音讯的索引文件,本质上是B-Tree(B树),运用B-Tree作为索引指向db-*.log里面存储的音讯 db.redo 用来进行音讯康复 *db-.log 存储音讯内容。


6.png


kahadb文件结构


新的数据以APPEND的办法追加到日志文件结尾。属于次序写入,因此音讯存储是比较 快的。默许

是32M,到达阀值会主动递增 lock文件 锁,写入当前取得kahadb读写权限的broker ,用于在集群环境下的竞赛处理。

KahaDB有如下几个特性:


  • 日志方式存储音讯;


  • 音讯索引以 B-Tree 结构存储,能够快速更新;


  • 完全支撑 JMS 业务;


  • 支撑多种康复机制kahadb 能够约束每个数据文件的巨细。不代表总计数据容量。


装备办法如下:


<persistenceAdapter>  <kahaDB directory="${activemq.data}/kahadb"/> persistenceAdapter>



JDBC 存储

支撑经过 JDBC 将音讯存储到联系数据库,性能上不如文件存储,能经过联系型数据库查询到音讯的信息。

MQ 支撑的数据库:Apache Derby、MySQL、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB。运用JDBC存储需求用到下面三张数据表。


「activemq_acks」:用于存储订阅联系。假如是耐久化Topic,订阅者和服务器的订阅联系在这个表保存。首要的数据库字段如下:


  • container:音讯的destination


  • sub_dest:假如是运用static集群,这个字段会有集群其他体系的信息


  • client_id:每个订阅者都有必要有一个仅有的客户端id用以区别


  • sub_name:订阅者名称


  • selector:挑选器,能够挑选只消费满足条件的音讯。条件能够用自界说特点完结,可支撑多特点and和or操作


  • last_acked_id:记载消费过的音讯的id。


「activemq_lock」:在集群环境中才有用,只要一个Broker能够取得音讯,称为Master Broker,其他的只能作为备份等候Master Broker不可用,才或许成为下一个Master Broker。这个表用于记载哪个Broker是当前的Master Broker。

「activemq_msgs」:用于存储音讯,Queue和Topic都存储在这个表中。首要的数据库字段如下


  • id:自增的数据库主键


  • container:音讯的destination


  • msgid_prod:音讯发送者客户端的主键


  • msg_seq:是发送音讯的次序,msgid_prod+msg_seq能够组成jms的messageid


  • expiration:音讯的过期时刻,存储的是从1970-01-01到现在的毫秒数


  • msg:音讯本体的java序列化方针的二进制数据


  • priority:优先级,从0-9,数值越大优先级越高


  • xid:topic


装备办法如下:


  1. 装备数据源 conf/acticvemq.xml 文件:


<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>  <property name="username" value="root"/>  <property name="password" value="111111"/>  <property name="maxActive" value="200"/>  <property name="poolPreparedStatements" value="true"/> bean> 


  1. 装备 broke 中的 persistenceAdapter dataSource 指定耐久化数据库的 bean,createTablesOnStartup 是否在发动的时分创立数据表,默许值是 true,这样每次发动都会去创立数据表了,一般是第一次发动的时分设置为 true,之后改成 false。


<persistenceAdapter>  <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/> persistenceAdapter> 


协议


ActiveMQ支撑的client-broker通讯协议有:TCP、NIO、UDP、SSL、Http(s)、VM。


Transmission Control Protocol (TCP)


这是默许的Broker装备,TCP的Client监听端口是61616。


在网络传输数据前,有必要要序列化数据,音讯是经过一个叫wire protocol的来序列化成字节省。默许情况下,ActiveMQ把wire protocol叫做OpenWire,它的意图是促使网络上的效率和数据快速交互。


TCP衔接的URI方式:tcp://hostname:port?key=value&key=value


TCP传输的优点:(1)TCP协议传输牢靠性高,安稳性强 (2)高效性:字节省办法传递,效率很高 (3)有效性、可用性:应用广泛,支撑任何渠道


New I/O API Protocol(NIO)


NIO协议和TCP协议相似,但NIO更侧重于底层的拜访操作。它答应开发人员对同一资源可有更多的client调用和服务端有更多的负载。


合适运用NIO协议的场景:(1)或许有大量的Client去链接到Broker上一般情况下,大量的Client去链接Broker是被操作体系的线程数所约束的。因此,NIO的完结比TCP需求更少的线程去运转,所以建议运用NIO协议 (2)或许对于Broker有一个很迟钝的网络传输NIO比TCP供给更好的性能


NIO衔接的URI方式:nio://hostname:port?key=value


Transport Connector装备示例:


<transportConnectors>  <transportConnector  name="nio"  uri="nio://localhost:61618?trace=true" /> transportConnectors>


User Datagram Protocol(UDP)


UDP和TCP的区别 (1)TCP是一个原始流的传递协议,意味着数据包是有保证的,换句话说,数据包是不会被仿制和丢掉的。


UDP,另一方面,它是不会保证数据包的传递的 (2)TCP也是一个安稳牢靠的数据包传递协议,意味着数据在传递的过程中不会被丢掉。这样保证了在发送和接纳之间能够牢靠的传递。相反,UDP仅仅是一个链接协议,所以它没有牢靠性之说

从上面能够得出:TCP是被用在安稳牢靠的场景中运用的;UDP一般用在快速数据传递和不怕数据丢掉的场景中,还有ActiveMQ经过防火墙时,只能用UDP


UDP衔接的URI方式:udp://hostname:port?key=value


Transport Connector装备示例:


<transportConnectors>  <transportConnector  name="udp"  uri="udp://localhost:61618?trace=true" /> transportConnectors>


Active MQ的安全机制


「web控制台安全」


修正jetty-realm.properties

# username: password [,rolename ...](用户名:暗码 角色)

留意:装备需重启ActiveMQ才会生效


「音讯安全机制」


修正activemq.xml 在中添加如下代码:


<plugins>  <simpleAuthenticationPlugin>  <users>  <authenticationUser username="admin" password="admin" groups="admins,publishers,consumers"/>  <authenticationUser username="publisher" password="publisher" groups="publishers,consumers"/>  <authenticationUser username="consumer" password="consumer" groups="consumers"/>  <authenticationUser username="guest" password="guest" groups="guests"/>  users>  simpleAuthenticationPlugin>  plugins>


ActiveMQ运用


在java中运用ActiveMQ只需求引进相关依靠


<dependency>  <groupId>org.apache.activemqgroupId>  <artifactId>activemq-allartifactId>  <version>5.15.11version> dependency>


编写出产者


public class Sender {  public static void main(String[] args) throws JMSException {  // 1. 树立工厂方针,  ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61618");  //2 从工厂里拿一个衔接  Connection connection = acf.createConnection();  connection.start();  //3 从衔接中获取Session(会话)  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  //4 从会话中获取意图地(Destination)顾客会从这个意图地取音讯  Queue queue = session.createQueue("mq.test");  //5 从会话中创立音讯供给者  MessageProducer producer = session.createProducer(queue);  //6 从会话中创立文本音讯(也能够创立其它类型的音讯体)  TextMessage message = session.createTextMessage("msg: hello world");  //7 经过音讯供给者发送音讯到ActiveMQ  producer.send(message);  //8 关闭衔接  connection.close();  } }


编写顾客


public class Receiver {  public static void main(String[] args) throws JMSException {  // 1. 树立工厂方针,  ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61618");  //2 从工厂里拿一个衔接  Connection connection = acf.createConnection();  connection.start();  //3 从衔接中获取Session(会话)  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  //4 从会话中获取意图地(Destination)顾客会从这个意图地取音讯  Queue queue = session.createQueue("mq.test");  //5 从会话中创立音讯顾客  MessageConsumer consumer = session.createConsumer(queue);  while (true){  //6 顾客接纳音讯  Message msg = consumer.receive();  TextMessage textMessage = (TextMessage) msg;  System.out.println("text:"+textMessage.getText());  }  } }


常用API及特性


  • 业务音讯
    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    提交业务:session.commit();
    回滚业务:session.rollback();
    敞开业务后,只要业务commit成功,音讯才会发送到MQ中


  • 耐久化
    默许耐久化是敞开的;
    敞开非耐久化示例代码:
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)


  • 设置音讯优先级
    producer.setPriority();


  • 设置音讯超时/过期时刻
    producer.setTimeToLive
    设置了音讯超时的音讯,消费端在超时后无法在消费到此音讯。


  • 死信
    此类音讯会进入到ActiveMQ.DLQ行列且不会主动清除,称为死信,有音讯堆积的危险。


  • 签收方式
    签收代表接纳端的session已收到音讯的一次承认,反馈给broker
    假如session带有业务,而且业务成功提交,则音讯被主动签收。假如业务回滚,则音讯会被再次传送。
    音讯业务是在出产者producer到broker或broker到consumer过程中同一个session中产生的,保证几条音讯在发送过程中的原子性。在支撑业务的session中,producer发送message时在message中带有transactionID。broker收到message后判别是否有transactionID,假如有就把message保存在transaction store中,等候commit或许rollback音讯。
    ActiveMQ支撑主动签收与手动签收
    「Session.AUTO_ACKNOWLEDGE」
    当客户端从receiver或onMessage成功返回时,Session主动签收客户端的这条音讯的收条。
    「Session.CLIENT_ACKNOWLEDGE」
    客户端经过调用音讯(Message)的acknowledge办法签收音讯。在这种情况下,签收产生在Session层面:签收一个现已消费的音讯会主动地签收这个Session一切已消费的收条。
    「Session.DUPS_OK_ACKNOWLEDGE」
    Session不必保证对传送音讯的签收,这个方式或许会引起音讯的重复,可是降低了Session的开销,所以只要客户端能容忍重复的音讯,才可运用。

  • 独占顾客
    Queue queue = session.createQueue("xxoo?consumer.exclusive=true");

  • 发送异步音讯


ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(  "admin",  "admin",  "tcp://localhost:61616"  ); // 2.获取一个向ActiveMQ的衔接 connectionFactory.setUseAsyncSend(true); ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection(); connection.setUseAsyncSend(true);


  • 音讯堆积
    producer每发送一个音讯,计算一下发送的字节数,当字节数到达ProducerWindowSize值时,需求等候broker的承认,才干持续发送。
    brokerUrl中设置:tcp://localhost:61616?jms.producerWindowSize=1048576
    destinationUri中设置:myQueue?producer.windowSize=1048576

  • 推迟音讯投递
    首要在装备文件中敞开推迟和调度


<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">


推迟发送示例代码:

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,10*1000);


  • 创立监听器


ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,  ActiveMQConnectionFactory.DEFAULT_PASSWORD,  "tcp://localhost:61618"); //2 从工厂里拿一个衔接 Connection connection = acf.createConnection(); connection.start(); //3 从衔接中获取Session(会话) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4 从会话中获取意图地(Destination)顾客会从这个意图地取音讯 Queue queue = session.createQueue("mq.test"); //5 从会话中创立音讯顾客 MessageConsumer consumer = session.createConsumer(queue); MyListener myListener = new MyListener(); MessageListener listener = myListener::receiveMessage; consumer.setMessageListener(listener);


SpringBoot整合ActiveMQ


  • 添加依靠


 <dependency>  <groupId>org.springframework.bootgroupId>  <artifactId>spring-boot-starter-activemqartifactId> dependency>


  • 装备文件


server:  port: 80 spring:  activemq:  broker-url: tcp://localhost:61618  user: admin  password: admin  pool:  enabled: true  #衔接池最大衔接数  max-connections: 5  #空闲的衔接过期时刻,默许为30秒  idle-timeout: 0  packages:  trust-all: true  jms:  pub-sub-domain: true


  • 装备类


 @Configuration @EnableJms public class ActiveMqConfig { // topic方式的ListenerContainer  @Bean public JmsListenerContainerFactory jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {  DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();  bean.setPubSubDomain(true);  bean.setConnectionFactory(activeMQConnectionFactory);  return bean;  } // queue方式的ListenerContainer @Bean public JmsListenerContainerFactory jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {  DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();  bean.setConnectionFactory(activeMQConnectionFactory);  return bean;  } }  


  • 编写出产者


@Service public class MqProducerService {  @Autowired  private JmsMessagingTemplate jmsMessagingTemplate;   public void sendStringQueue(String destination, String msg) {  System.out.println("send...");  ActiveMQQueue queue = new ActiveMQQueue(destination);  jmsMessagingTemplate.afterPropertiesSet();  ConnectionFactory factory = jmsMessagingTemplate.getConnectionFactory();  try {  Connection connection = factory.createConnection();  connection.start();   Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  Queue queue2 = session.createQueue(destination);   MessageProducer producer = session.createProducer(queue2);   TextMessage message = session.createTextMessage("hahaha");    producer.send(message);  } catch (JMSException e) {  // TODO Auto-generated catch block  e.printStackTrace();  }   jmsMessagingTemplate.convertAndSend(queue, msg);  }  public void sendStringQueueList(String destination, String msg) {  System.out.println("xxooq");  ArrayList<String> list = new ArrayList<>();  list.add("1");  list.add("2");  jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(destination), list);  } }


  • 编写顾客


@JmsListener(destination = "user",containerFactory = "jmsListenerContainerQueue") public void receiveStringQueue(String msg) {  System.out.println("接纳到音讯...." + msg);  }  @JmsListener(destination = "ooo",containerFactory = "jmsListenerContainerTopic") public void receiveStringTopic(String msg) {  System.out.println("接纳到音讯...." + msg);  }


小结


本文具体介绍了为什么需求引进音讯行列,JMS、ActiveMQ的基础概念以及常用API,与原生JAVA整合及SpringBoot整合等知识点,能够让咱们更好的了解ActiveMQ的运用场景及运用办法。

本公司销售:阿里云、腾讯云、百度云、天翼云、金山大米云、金山企业云盘!可签订合同,开具发票。

我有话说: