0%

Elasticsearch Java Api

Elasticsearch Java API 全解析:从 TransportClient 到 RestHighLevelClient

Elasticsearch 提供了多种 Java API 用于与集群交互,其中 TransportClient(已过时)和 RestHighLevelClient(推荐)是最常用的两种。本文详细讲解这两种客户端的使用方法,包括索引管理、文档操作、查询等核心功能。

TransportClient(过时,了解即可)

TransportClient 基于 TCP 协议与 Elasticsearch 集群通信,在 Elasticsearch 7.0 后被标记为过时,8.0 完全移除。但部分旧系统仍在使用,此处简要介绍其核心用法。

依赖配置

1
2
3
4
5
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.4.0</version> <!-- 需与 Elasticsearch 版本匹配 -->
</dependency>

客户端初始化

1
2
3
4
5
6
7
8
9
10
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import java.net.InetAddress;

// 创建客户端(连接到本地 9300 端口,Transport 协议默认端口)
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(
InetAddress.getLocalHost(), 9300
));

核心操作示例

(1)索引管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 获取索引管理客户端
IndicesAdminClient indices = client.admin().indices();

// 1. 创建索引(指定分片和副本)
CreateIndexResponse createResp = indices.prepareCreate("index3")
.setSettings(Settings.builder()
.put("index.number_of_shards", 3) // 3 个主分片
.put("index.number_of_replicas", 1) // 1 个副本
).get();
System.out.println("索引创建结果:" + createResp.isAcknowledged());

// 2. 判断索引是否存在
boolean exists = indices.prepareExists("index3").get().isExists();

// 3. 删除索引
DeleteIndexResponse deleteResp = indices.prepareDelete("index3").get();
(2)文档操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 1. 插入文档(ID 为 1)
Map<String, Object> doc = new HashMap<>();
doc.put("name", "zhangsan");
doc.put("age", 20);
IndexResponse indexResp = client.prepareIndex("index5", "default", "1")
.setSource(doc)
.get();

// 2. 查询文档
GetResponse getResp = client.prepareGet("index5", "default", "1").get();
System.out.println("文档内容:" + getResp.getSourceAsString());

// 3. 更新文档(修改 age 为 18)
UpdateResponse updateResp = client.prepareUpdate("index5", "default", "1")
.setDoc(XContentFactory.jsonBuilder()
.startObject()
.field("age", 18)
.endObject()
).get();

// 4. 删除文档
DeleteResponse deleteResp = client.prepareDelete("index5", "default", "1").get();
(3)查询操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 1. 匹配所有文档
SearchResponse matchAllResp = client.prepareSearch("index6")
.setQuery(QueryBuilders.matchAllQuery())
.setSize(10) // 返回 10 条结果
.get();

// 2. 条件查询(term 查询 name = "zhangsan")
SearchResponse termResp = client.prepareSearch("index6")
.setQuery(QueryBuilders.termQuery("name", "zhangsan"))
.get();

// 3. 组合查询(bool 条件)
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("name", "zhangsan")) // 必须满足
.should(QueryBuilders.matchQuery("age", 20)); // 可选满足
SearchResponse boolResp = client.prepareSearch("index6")
.setQuery(boolQuery)
.get();

RestHighLevelClient(推荐)

RestHighLevelClient 基于 HTTP 协议与 Elasticsearch 交互,是目前推荐的客户端,支持 Elasticsearch 7.x+ 所有功能,且持续更新。

依赖配置

1
2
3
4
5
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.14.0</version> <!-- 需与 Elasticsearch 版本匹配 -->
</dependency>

客户端初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;

// 1. 配置连接信息(IP、端口、协议)
HttpHost httpHost = new HttpHost("127.0.0.1", 9200, "http");

// 2. 配置认证(若集群开启了安全验证)
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "password") // 用户名和密码
);

// 3. 构建客户端
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(httpHost)
.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
)
.setRequestConfigCallback(requestConfigBuilder -> {
// 配置超时时间
requestConfigBuilder.setConnectTimeout(5000); // 连接超时
requestConfigBuilder.setSocketTimeout(10000); // socket 超时
return requestConfigBuilder;
})
);

核心操作示例

(1)索引管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 1. 创建索引
CreateIndexRequest createRequest = new CreateIndexRequest("index7");
// 可选:设置分片和副本
createRequest.settings(Settings.builder()
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 1)
);
// 可选:设置映射(mapping)
createRequest.mapping("{\n" +
" \"properties\": {\n" +
" \"name\": { \"type\": \"keyword\" },\n" +
" \"age\": { \"type\": \"integer\" }\n" +
" }\n" +
"}", XContentType.JSON);
CreateIndexResponse createResp = client.indices().create(createRequest, RequestOptions.DEFAULT);
System.out.println("索引创建结果:" + createResp.isAcknowledged());

// 2. 判断索引是否存在
GetIndexRequest existRequest = new GetIndexRequest("index7");
boolean exists = client.indices().exists(existRequest, RequestOptions.DEFAULT);

// 3. 删除索引
DeleteIndexRequest deleteRequest = new DeleteIndexRequest("index7");
AcknowledgedResponse deleteResp = client.indices().delete(deleteRequest, RequestOptions.DEFAULT);
(2)文档操作
插入单文档
1
2
3
4
5
6
7
8
9
10
11
12
13
// 1. 构建插入请求(索引 index8,类型 _doc,ID 1)
IndexRequest indexRequest = new IndexRequest("index8", "_doc", "1");
// 文档内容(Map 或 JSON 字符串)
Map<String, Object> doc = new HashMap<>();
doc.put("name", "lisi");
doc.put("age", 25);
indexRequest.source(doc);
// 可选:设置刷新策略(NONE:不立即刷新,IMMEDIATE:立即刷新)
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

// 2. 执行插入
IndexResponse indexResp = client.index(indexRequest, RequestOptions.DEFAULT);
System.out.println("插入结果:" + indexResp.getResult()); // 输出 CREATED 或 UPDATED
批量插入文档
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 1. 构建批量请求
BulkRequest bulkRequest = new BulkRequest();

// 2. 添加多个文档
// 文档 1(ID 2)
Map<String, Object> doc1 = new HashMap<>();
doc1.put("name", "wangwu");
doc1.put("age", 30);
bulkRequest.add(new IndexRequest("index8", "_doc", "2").source(doc1));

// 文档 2(ID 3)
Map<String, Object> doc2 = new HashMap<>();
doc2.put("name", "zhaoliu");
doc2.put("age", 35);
bulkRequest.add(new IndexRequest("index8", "_doc", "3").source(doc2));

// 3. 执行批量插入
BulkResponse bulkResp = client.bulk(bulkRequest, RequestOptions.DEFAULT);

// 4. 处理结果
if (bulkResp.hasFailures()) {
for (BulkItemResponse item : bulkResp) {
if (item.isFailed()) {
System.out.println("插入失败:" + item.getFailureMessage());
}
}
}
更新与删除文档
1
2
3
4
5
6
7
8
// 1. 更新文档(ID 1 的 age 改为 26)
UpdateRequest updateRequest = new UpdateRequest("index8", "_doc", "1");
updateRequest.doc("age", 26); // 直接指定字段,或使用 Map/JSON
UpdateResponse updateResp = client.update(updateRequest, RequestOptions.DEFAULT);

// 2. 删除文档(ID 3)
DeleteRequest deleteRequest = new DeleteRequest("index8", "_doc", "3");
DeleteResponse deleteResp = client.delete(deleteRequest, RequestOptions.DEFAULT);
(3)查询操作
基础查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 1. 构建搜索请求
SearchRequest searchRequest = new SearchRequest("index8"); // 目标索引
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

// 2. 设置查询条件(匹配所有文档)
sourceBuilder.query(QueryBuilders.matchAllQuery());

// 3. 执行查询
searchRequest.source(sourceBuilder);
SearchResponse searchResp = client.search(searchRequest, RequestOptions.DEFAULT);

// 4. 解析结果
SearchHits hits = searchResp.getHits();
for (SearchHit hit : hits) {
System.out.println("文档 ID:" + hit.getId() + ",内容:" + hit.getSourceAsString());
}
高级查询(条件组合、分页、排序、聚合)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
SearchRequest searchRequest = new SearchRequest("index8");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

// 1. 组合条件查询(bool 查询)
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.must(QueryBuilders.rangeQuery("age").gte(25)) // 年龄 ≥25
.mustNot(QueryBuilders.termQuery("name", "wangwu")); // 排除 name=wangwu
sourceBuilder.query(boolQuery);

// 2. 分页(从第 0 条开始,返回 10 条)
sourceBuilder.from(0);
sourceBuilder.size(10);

// 3. 排序(按 age 降序)
sourceBuilder.sort("age", SortOrder.DESC);

// 4. 聚合(计算最大年龄)
sourceBuilder.aggregation(AggregationBuilders.max("max_age").field("age"));

// 5. 执行查询
searchRequest.source(sourceBuilder);
SearchResponse searchResp = client.search(searchRequest, RequestOptions.DEFAULT);

// 6. 解析聚合结果
Max maxAge = searchResp.getAggregations().get("max_age");
System.out.println("最大年龄:" + maxAge.getValue());

两种客户端的对比与迁移建议

特性 TransportClient RestHighLevelClient
通信协议 TCP(9300 端口) HTTP(9200 端口)
版本兼容性 严格匹配集群版本 兼容多个版本(需注意 minor 版本)
安全性 需额外配置安全插件 天然支持 HTTPS 和 Basic Auth
功能完整性 支持旧版本特性 支持最新特性(如 ILM、CCS)
未来支持 已废弃(8.0+ 移除) 官方推荐,持续更新

迁移建议

  1. 新项目直接使用 RestHighLevelClient,避免使用过时的 TransportClient。
  2. 旧项目逐步迁移:
    • 替换客户端初始化代码(TCP → HTTP)。
    • 调整 API 调用方式(如 prepareIndexIndexRequest)。
    • 注意索引类型(type)在 7.x 后逐渐移除,统一使用 _doc

欢迎关注我的其它发布渠道