0%

JMS

JMS(Java 消息服务)详解:规范、模型与实践

JMS(Java Message Service,Java 消息服务)是 Sun 公司定义的一套面向消息中间件的 Java 规范,旨在为 Java 应用程序提供统一的消息通信接口。它类似于 JDBC(数据库访问规范),屏蔽了不同消息中间件(如 ActiveMQ、RabbitMQ)的实现差异,使开发者能通过统一的 API 进行跨平台、跨组件的异步通信。

JMS 核心概念与作用

核心目标

JMS 的核心是实现应用程序之间的异步、可靠通信,通过消息中间件作为中介,解除发送方与接收方的直接耦合:

  • 发送方只需将消息发送到消息中间件,无需关心接收方是否在线或如何处理消息。
  • 接收方可在合适时机从中间件获取消息,实现 “发送即忘” 的异步通信模式。

消息系统的优势

  • 解耦:服务间无需直接依赖,通过消息间接通信,降低系统耦合度。
  • 异步:发送方无需等待接收方处理完成,提高系统响应速度。
  • 可靠:消息中间件确保消息不丢失、不重复,支持事务和持久化。
  • 削峰填谷:应对突发流量(如秒杀场景),通过消息队列缓冲请求,避免系统过载。

JMS 编程模型:六大核心组件

JMS 定义了一套标准化的组件,所有 JMS 应用程序都基于这些组件构建,确保跨平台兼容性。

系统管理对象(Administered Objects)

由消息中间件管理员配置,客户端通过 JNDI(Java 命名和目录接口)获取,包括连接工厂目标对象

(1)连接工厂(ConnectionFactory)
  • 作用:客户端创建连接(Connection)的工厂类,是连接消息中间件的入口。

  • 类型

    • QueueConnectionFactory:用于点对点(PTP)模式。
    • TopicConnectionFactory:用于发布 / 订阅(Pub/Sub)模式。
  • 获取方式:通常通过 JNDI lookup 从配置中获取(如 ActiveMQ 的默认连接工厂为 ConnectionFactory)。

    1
    2
    3
    // 从 JNDI 中获取连接工厂(示例)
    Context ctx = new InitialContext();
    QueueConnectionFactory queueCF = (QueueConnectionFactory) ctx.lookup("QueueCF");
(2)目标对象(Destination)
  • 作用:定义消息的发送目的地(生产者)和接收源头(消费者)。

  • 类型

    • Queue:点对点模式的目标对象,消息存储在队列中。
    • Topic:发布 / 订阅模式的目标对象,消息通过主题广播。
  • 获取方式:同样通过 JNDI 获取,由管理员预先配置。

    1
    2
    3
    // 获取队列(PTP)和主题(Pub/Sub)
    Queue queue = (Queue) ctx.lookup("MyQueue");
    Topic topic = (Topic) ctx.lookup("MyTopic");

连接对象(Connection)

  • 作用:封装客户端与消息中间件之间的 TCP 连接,是所有通信的基础。

  • 创建:通过连接工厂的 createConnection() 方法创建。

  • 特性:连接默认处于关闭状态,需调用 start() 启动才能接收消息;支持设置用户名 / 密码验证。

1
2
3
// 创建队列连接(PTP)
QueueConnection queueConn = queueCF.createConnection();
queueConn.start(); // 启动连接(开始接收消息)

会话对象(Session)

  • 作用:单线程的上下文环境,用于创建消息生产者、消费者、消息,支持事务管理和消息确认。

  • 创建:通过连接的 createQueueSession()createTopicSession() 方法创建,需指定两个参数:

    • 事务支持boolean transactedtrue 表示支持事务,需手动提交;false 不支持)。
    • 消息确认模式:非事务模式下生效,定义消息何时被视为 “已消费”:
      • Session.AUTO_ACKNOWLEDGE:自动确认(消息接收后自动标记为已消费)。
      • Session.CLIENT_ACKNOWLEDGE:客户端手动确认(需调用 message.acknowledge())。
      • Session.DUPS_OK_ACKNOWLEDGE:延迟确认(允许重复消息,适用于低可靠性场景)。
    1
    2
    // 创建非事务会话,自动确认消息
    QueueSession queueSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

消息生产者(MessageProducer)

  • 作用:由会话创建,负责向目标对象(QueueTopic)发送消息。

  • 创建:通过会话的 createProducer(Destination) 方法创建。

  • 发送消息:调用 send(Message) 方法,支持设置消息优先级、过期时间等。

    1
    2
    3
    4
    5
    6
    // 创建队列生产者,向 MyQueue 发送消息
    MessageProducer producer = queueSession.createProducer(queue);
    // 创建文本消息
    TextMessage message = queueSession.createTextMessage("Hello, JMS!");
    // 发送消息
    producer.send(message);

消息消费者(MessageConsumer)

  • 作用:由会话创建,负责从目标对象(QueueTopic)接收消息。

  • 创建:通过会话的 createConsumer(Destination) 方法创建,支持通过消息选择器(Selector)过滤消息。

  • 接收方式

    • 同步:调用 receive()receive(long timeout) 主动拉取消息(阻塞或超时等待)。
    • 异步:注册 MessageListener,消息到达时自动触发回调(推荐,非阻塞)。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // 创建队列消费者(同步接收)
    MessageConsumer consumer = queueSession.createConsumer(queue);
    // 同步接收消息(最多等待 10 秒)
    Message msg = consumer.receive(10000);

    // 异步接收(通过监听器)
    consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
    // 处理消息
    TextMessage textMsg = (TextMessage) message;
    try {
    System.out.println("收到消息:" + textMsg.getText());
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    });

消息对象(Message)

  • 作用:封装通信数据,JMS 定义了多种消息类型,满足不同场景:
    • TextMessage:文本消息(最常用,如 JSON、XML)。
    • ObjectMessage:序列化的 Java 对象(需注意序列化版本兼容)。
    • BytesMessage:字节数组(如文件、二进制数据)。
    • MapMessage:键值对集合。
    • StreamMessage:基本数据类型的流。
  • 结构:
    • 消息头:固定字段(如消息 ID、时间戳、目标对象、优先级等)。
    • 消息属性:自定义键值对(用于消息过滤,如 message.setStringProperty("type", "order"))。
    • 消息体:实际数据(如文本内容、对象等)。

JMS 消息模式:PTP 与 Pub/Sub

JMS 定义了两种消息传递模式,适用于不同的业务场景。

点对点模式(PTP,Point-to-Point)

  • 核心组件:基于 Queue(队列),消息生产者向队列发送消息,消费者从队列接收消息。

  • 特点

    • 一对一:每条消息只能被一个消费者接收(即使多个消费者监听同一队列,也只会有一个处理)。
    • 消息持久:消息在队列中持久化,消费者离线后再次上线仍可接收未处理的消息。
    • 拉取模式:消费者可主动拉取消息,或通过监听器异步接收。
  • 适用场景:订单通知、任务分配(确保消息被唯一处理)。

    1
    生产者 → 队列(Queue) → 唯一消费者

发布 / 订阅模式(Pub/Sub,Publish/Subscribe)

  • 核心组件:基于 Topic(主题),消息生产者(发布者)向主题发布消息,消费者(订阅者)订阅主题接收消息。

  • 特点

    • 一对多:每条消息可被所有订阅者接收(类似广播)。
    • 订阅类型:
      • 临时订阅:订阅者在线时接收消息,离线后消息丢失(生命周期与连接绑定)。
      • 持久订阅:订阅者注册持久化标识,即使离线,消息中间件也会保存消息,上线后补发。
  • 适用场景:实时通知(如股票行情、系统告警)、多系统同步。

    1
    2
    3
    发布者 → 主题(Topic) → 订阅者1
    → 订阅者2
    → ...(所有订阅者)

JMS 事务与消息确认

事务管理

  • 当会话创建时 transactedtrue 时,支持事务:

    • 多条消息发送 / 接收可作为一个事务单元,要么全部成功,要么全部回滚。
    • 需手动调用 session.commit() 提交事务,或 session.rollback() 回滚。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 创建事务会话
    QueueSession txSession = queueConn.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
    MessageProducer txProducer = txSession.createProducer(queue);

    try {
    txProducer.send(txSession.createTextMessage("消息1"));
    txProducer.send(txSession.createTextMessage("消息2"));
    txSession.commit(); // 提交事务(消息实际发送)
    } catch (Exception e) {
    txSession.rollback(); // 回滚(消息不发送)
    }

消息确认

  • 非事务模式下,消息确认模式决定消息何时被标记为 “已消费”:
    • AUTO_ACKNOWLEDGE:消息被接收后自动确认(最简单,适用于可靠性要求不高的场景)。
    • CLIENT_ACKNOWLEDGE:客户端调用 message.acknowledge() 手动确认(确保消息处理完成后再确认,避免丢失)。
    • DUPS_OK_ACKNOWLEDGE:延迟批量确认(可能重复接收消息,适用于容忍重复的场景)。

JMS 应用场景与优势

  • 分布式系统通信:服务间通过消息异步协作(如订单系统通知库存系统)。
  • 异步任务处理:耗时操作(如邮件发送、报表生成)通过消息队列异步执行,不阻塞主流程。
  • 流量削峰:高并发场景(如秒杀)中,消息队列缓冲请求,按系统处理能力消费。
  • 跨平台集成:不同语言、不同系统通过消息中间件互通(JMS 客户端可与非 Java 客户端通信)

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10