JMS(Java 消息服务)详解:规范、模型与实践
JMS(Java Message Service,Java 消息服务)是 Sun 公司定义的一套面向消息中间件的 Java 规范,旨在为 Java 应用程序提供统一的消息通信接口。它类似于 JDBC(数据库访问规范),屏蔽了不同消息中间件(如 ActiveMQ、RabbitMQ)的实现差异,使开发者能通过统一的 API 进行跨平台、跨组件的异步通信。
JMS 核心概念与作用
核心目标
JMS 的核心是实现应用程序之间的异步、可靠通信,通过消息中间件作为中介,解除发送方与接收方的直接耦合:
- 发送方只需将消息发送到消息中间件,无需关心接收方是否在线或如何处理消息。
- 接收方可在合适时机从中间件获取消息,实现 “发送即忘” 的异步通信模式。
消息系统的优势
- 解耦:服务间无需直接依赖,通过消息间接通信,降低系统耦合度。
- 异步:发送方无需等待接收方处理完成,提高系统响应速度。
- 可靠:消息中间件确保消息不丢失、不重复,支持事务和持久化。
- 削峰填谷:应对突发流量(如秒杀场景),通过消息队列缓冲请求,避免系统过载。
JMS 编程模型:六大核心组件
JMS 定义了一套标准化的组件,所有 JMS 应用程序都基于这些组件构建,确保跨平台兼容性。
系统管理对象(Administered Objects)
由消息中间件管理员配置,客户端通过 JNDI(Java 命名和目录接口)获取,包括连接工厂和目标对象。
(1)连接工厂(ConnectionFactory)
作用:客户端创建连接(
Connection)的工厂类,是连接消息中间件的入口。类型:
QueueConnectionFactory:用于点对点(PTP)模式。TopicConnectionFactory:用于发布 / 订阅(Pub/Sub)模式。
获取方式:通常通过 JNDI lookup 从配置中获取(如 ActiveMQ 的默认连接工厂为
ConnectionFactory)。1
2
3// 从 JNDI 中获取连接工厂(示例)
Context ctx = new InitialContext();
QueueConnectionFactory queueCF = (QueueConnectionFactory) ctx.lookup("QueueCF");
(2)目标对象(Destination)
作用:定义消息的发送目的地(生产者)和接收源头(消费者)。
类型:
Queue:点对点模式的目标对象,消息存储在队列中。Topic:发布 / 订阅模式的目标对象,消息通过主题广播。
获取方式:同样通过 JNDI 获取,由管理员预先配置。
1
2
3// 获取队列(PTP)和主题(Pub/Sub)
Queue queue = (Queue) ctx.lookup("MyQueue");
Topic topic = (Topic) ctx.lookup("MyTopic");
连接对象(Connection)
作用:封装客户端与消息中间件之间的 TCP 连接,是所有通信的基础。
创建:通过连接工厂的
createConnection()方法创建。特性:连接默认处于关闭状态,需调用
start()启动才能接收消息;支持设置用户名 / 密码验证。
1 | // 创建队列连接(PTP) |
会话对象(Session)
作用:单线程的上下文环境,用于创建消息生产者、消费者、消息,支持事务管理和消息确认。
创建:通过连接的
createQueueSession()或createTopicSession()方法创建,需指定两个参数:- 事务支持:
boolean transacted(true表示支持事务,需手动提交;false不支持)。 - 消息确认模式:非事务模式下生效,定义消息何时被视为 “已消费”:
Session.AUTO_ACKNOWLEDGE:自动确认(消息接收后自动标记为已消费)。Session.CLIENT_ACKNOWLEDGE:客户端手动确认(需调用message.acknowledge())。Session.DUPS_OK_ACKNOWLEDGE:延迟确认(允许重复消息,适用于低可靠性场景)。
1
2// 创建非事务会话,自动确认消息
QueueSession queueSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);- 事务支持:
消息生产者(MessageProducer)
作用:由会话创建,负责向目标对象(
Queue或Topic)发送消息。创建:通过会话的
createProducer(Destination)方法创建。发送消息:调用
send(Message)方法,支持设置消息优先级、过期时间等。1
2
3
4
5
6// 创建队列生产者,向 MyQueue 发送消息
MessageProducer producer = queueSession.createProducer(queue);
// 创建文本消息
TextMessage message = queueSession.createTextMessage("Hello, JMS!");
// 发送消息
producer.send(message);
消息消费者(MessageConsumer)
作用:由会话创建,负责从目标对象(
Queue或Topic)接收消息。创建:通过会话的
createConsumer(Destination)方法创建,支持通过消息选择器(Selector)过滤消息。接收方式:
- 同步:调用
receive()或receive(long timeout)主动拉取消息(阻塞或超时等待)。 - 异步:注册
MessageListener,消息到达时自动触发回调(推荐,非阻塞)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// 创建队列消费者(同步接收)
MessageConsumer consumer = queueSession.createConsumer(queue);
// 同步接收消息(最多等待 10 秒)
Message msg = consumer.receive(10000);
// 异步接收(通过监听器)
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
// 处理消息
TextMessage textMsg = (TextMessage) message;
try {
System.out.println("收到消息:" + textMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});- 同步:调用
消息对象(Message)
- 作用:封装通信数据,JMS 定义了多种消息类型,满足不同场景:
TextMessage:文本消息(最常用,如 JSON、XML)。ObjectMessage:序列化的 Java 对象(需注意序列化版本兼容)。BytesMessage:字节数组(如文件、二进制数据)。MapMessage:键值对集合。StreamMessage:基本数据类型的流。
- 结构:
- 消息头:固定字段(如消息 ID、时间戳、目标对象、优先级等)。
- 消息属性:自定义键值对(用于消息过滤,如
message.setStringProperty("type", "order"))。 - 消息体:实际数据(如文本内容、对象等)。
JMS 消息模式:PTP 与 Pub/Sub
JMS 定义了两种消息传递模式,适用于不同的业务场景。
点对点模式(PTP,Point-to-Point)
核心组件:基于
Queue(队列),消息生产者向队列发送消息,消费者从队列接收消息。特点:
- 一对一:每条消息只能被一个消费者接收(即使多个消费者监听同一队列,也只会有一个处理)。
- 消息持久:消息在队列中持久化,消费者离线后再次上线仍可接收未处理的消息。
- 拉取模式:消费者可主动拉取消息,或通过监听器异步接收。
适用场景:订单通知、任务分配(确保消息被唯一处理)。
1
生产者 → 队列(Queue) → 唯一消费者
发布 / 订阅模式(Pub/Sub,Publish/Subscribe)
核心组件:基于
Topic(主题),消息生产者(发布者)向主题发布消息,消费者(订阅者)订阅主题接收消息。特点:
- 一对多:每条消息可被所有订阅者接收(类似广播)。
- 订阅类型:
- 临时订阅:订阅者在线时接收消息,离线后消息丢失(生命周期与连接绑定)。
- 持久订阅:订阅者注册持久化标识,即使离线,消息中间件也会保存消息,上线后补发。
适用场景:实时通知(如股票行情、系统告警)、多系统同步。
1
2
3发布者 → 主题(Topic) → 订阅者1
→ 订阅者2
→ ...(所有订阅者)
JMS 事务与消息确认
事务管理
当会话创建时
transacted为true时,支持事务:- 多条消息发送 / 接收可作为一个事务单元,要么全部成功,要么全部回滚。
- 需手动调用
session.commit()提交事务,或session.rollback()回滚。
1
2
3
4
5
6
7
8
9
10
11// 创建事务会话
QueueSession txSession = queueConn.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer txProducer = txSession.createProducer(queue);
try {
txProducer.send(txSession.createTextMessage("消息1"));
txProducer.send(txSession.createTextMessage("消息2"));
txSession.commit(); // 提交事务(消息实际发送)
} catch (Exception e) {
txSession.rollback(); // 回滚(消息不发送)
}
消息确认
- 非事务模式下,消息确认模式决定消息何时被标记为 “已消费”:
AUTO_ACKNOWLEDGE:消息被接收后自动确认(最简单,适用于可靠性要求不高的场景)。CLIENT_ACKNOWLEDGE:客户端调用message.acknowledge()手动确认(确保消息处理完成后再确认,避免丢失)。DUPS_OK_ACKNOWLEDGE:延迟批量确认(可能重复接收消息,适用于容忍重复的场景)。
JMS 应用场景与优势
- 分布式系统通信:服务间通过消息异步协作(如订单系统通知库存系统)。
- 异步任务处理:耗时操作(如邮件发送、报表生成)通过消息队列异步执行,不阻塞主流程。
- 流量削峰:高并发场景(如秒杀)中,消息队列缓冲请求,按系统处理能力消费。
- 跨平台集成:不同语言、不同系统通过消息中间件互通(JMS 客户端可与非 Java 客户端通信)

v1.3.10