首页 云服务器Linux编程正文

linux编程_ActiveMQ持久化机制和JMS可靠消息

admin Linux编程 2020-05-26 11:15:24 77 0 Linux编程

1、ActiveMQ持久化机制

1.1 JDBC将数据持久化到数据库

1.2 AMQ天生日志文件

1.3 KahaDB:本次磁盘天生数据文件(默认)

1.4 LevelDB:谷歌K/V数据库

1.5 默认不持久化,正常的开发模式当中,是需要持久化的  

  DeliveryMode.NON_PERSISTENT = 1 不开启持久化
  DeliveryMode.PERSISTENT = 2 开启持久化
  

1.6 使用JDBC持久化  

步骤一:建立一个数据库

    

步骤二:设置activemq.xml设置文件 

1.在persistenceAdAPTer加入如下设置
    <!--createTablesOnStartup 启动是否建立表 第一次为true 后续为false-->
    <jdbcPersistenceAdapter dataSource="#activemq-db" createTablesOnStartup="true" />
  

  2.设置数据源

  

  3.将数据库毗邻Jar放到activemq解压的lib文件夹下

  

步骤三:重新启动ActiveMQ

  

  数据表注释:

  activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保留。

    主要的数据库字段如下:

      CONTAINER:新闻的DestiNATion

      SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息

      CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用于区分

      SUB_NAME:订阅者名称

      SELECTOR:选择器,可以选择只消费知足条件的新闻。条件可以用自定义属性实现,可支持多属性AND和OR操作

      LAST_ACKED_ID:纪录最后消费过的新闻的ID

  activemq_lock:在集群环境下才有用,只有一个Broker可以获得新闻,称为Master Broker

  activemq_msgs:用于存储新闻,Queue和Topic都存储在这个表中;

    主要的数据库字段如下:

      CONTAINER:新闻的Destination

      MSGID_PROD:新闻发送者客户端的主键

      MSG_SEQ:是发送新闻的顺序,MSGID+PROD+MSG_SEQ可以组成JMS的MessageID

      EXPIRATION:新闻的过时时间

      MSG:新闻本体的Java序列化工具的二进制数据

      PRIORITY:优先级,从0-9,数值越大优先级越高

步骤四:启动提供者后数据表activemq_msgs便增添一条数据

  

   

步骤五:消费者启动后数据表新闻被删除

  由于消费者已经接受新闻,行列中数据删除了,以是数据库中的数据也被删除了

  

2、JMS可靠新闻

2.1 带事务的Session

  ①生产者必须在生产完数据后手动提交session

  

package com.zn.p2p;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 点对点提供者
 */
public class P2P_Provider {
    public static void main(String[] args) throws JMSException {
        //步骤一:建立毗邻工厂
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory();
        //步骤二:建立毗邻
        Connection connection = activeMQConnectionFactory.createConnection();
        //步骤三:启动毗邻
        connection.start();
        //步骤四:获取会话工厂
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //步骤五:建立行列
        Queue quque = session.createQueue("MyQueue");
        //建立新闻生产者
        MessageProducer producer = session.createProducer(quque);
        //新闻持久化  参数1为不做持久化  2为持久化
        producer.setDeliveryMode(2);
        //模拟新闻
        TextMessage textMessage = session.createTextMessage("Hello ActiveMQ!");
        //发送新闻
        producer.send(textMessage);
        //手动提交新闻
        session.commit();
        System.out.println("生产者天生新闻完毕!");
        //接纳资源
        session.close();
        connection.close();
    }
}

②消费者在消费完数据之后也必须手动提交Session

  

package com.zn.p2p;

import org.Apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * P2P消费者
 */
public class P2P_Consumer {
    public static void main(String[] args) throws JMSException {
        //步骤一:建立毗邻工厂
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //步骤二:建立毗邻
        Connection connection = activeMQConnectionFactory.createConnection();
        //步骤三:启动毗邻
        connection.start();
        //步骤四:获取会话工厂
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //步骤五:建立行列
        Queue quque = session.createQueue("MyQueue");
        //建立消费者
        MessageConsumer consumer = session.createConsumer(quque);
        //循环获取新闻
        while (true){
            TextMessage message = (TextMessage)consumer.receive();
            session.commit();
            if (message!=null){
                System.out.println("消费者获取信息:"+message.getText());
            }else {
                break;
            }
        }
        //接纳资源
        session.close();
        connection.close();
    }
}

2.2 不带事务的Session:

  ①自动签收(不靠谱)

  

  ②手动签收

  

【版权声明】

温馨提示:文章内容系作者个人观点,不代表seo教程网对观点赞同或支持。

版权声明:本文为转载文章,来源于金邦科技 ,版权归原作者所有,欢迎分享本文,转载请保留出处!

网址:www.15544.cn


本文链接:http://15544.cn/Linuxbiancheng/65394.html

最新留言