Hadoop 数据写入 Elasticsearch:基于 ES-Hadoop 的实现详解
ES-Hadoop 是 Elasticsearch 官方推出的工具,用于打通 Hadoop 生态(HDFS、MapReduce、Spark 等)与 Elasticsearch,支持数据双向流动。本文详细讲解如何通过 MapReduce 任务将 Hadoop 中的数据(如 HDFS 上的文件)写入 Elasticsearch,包括依赖配置、代码实现、核心参数及最佳实践。
ES-Hadoop 简介与依赖准备
核心作用
ES-Hadoop 简化了 Hadoop 与 Elasticsearch 的数据交互,提供:
- 适配 Hadoop 生态的输入 / 输出格式(
EsInputFormat/EsOutputFormat)。 - 自动处理数据序列化(Hadoop 数据 → ES 文档 JSON)。
- 支持批量写入、节点发现、认证等核心功能。
依赖配置
在 Maven 项目中添加以下依赖(版本需与 Hadoop 和 Elasticsearch 匹配):
1 | <!-- Hadoop 核心依赖 --> |
MapReduce 任务实现:HDFS 数据写入 ES
核心思路
通过 MapReduce 任务读取 HDFS 上的文件(如 JSON 格式),经 Mapper 处理后,通过 EsOutputFormat 批量写入 Elasticsearch。由于仅需数据透传(无聚合逻辑),无需 Reducer。
代码实现
(1)Job 配置类(H2EJob.java)
1 | import org.apache.hadoop.conf.Configuration; |
(2)Mapper 类(H2EMapper.java)
1 | import org.apache.hadoop.io.LongWritable; |
代码说明
数据流向:HDFS 文件(每行一个 JSON)→ Mapper 读取 →
EsOutputFormat批量写入 ES。无 Reducer:因无需聚合,通过
job.setNumReduceTasks(0)禁用 Reducer(默认无)。JSON 格式要求:HDFS 中的每行数据必须是合法 JSON(与 ES 索引映射匹配),例如:
1
{"id": 1, "name": "test", "timestamp": "2024-08-01"}
ES-Hadoop 核心配置参数详解
ES-Hadoop 提供丰富的配置参数,控制连接、读写、映射等行为,核心参数分类如下:
基本连接配置
| 参数 | 作用 | 默认值 |
|---|---|---|
es.nodes |
ES 节点地址(格式:host:port) |
localhost:9200 |
es.resource |
目标索引 / 类型(格式:index/type) |
无(必须配置) |
es.resource.read |
读取数据的索引 / 类型(默认同 es.resource) |
- |
es.resource.write |
写入数据的索引 / 类型(默认同 es.resource) |
- |
es.port |
ES HTTP 端口 | 9200 |
写入行为配置
| 参数 | 作用 | 默认值 |
|---|---|---|
es.write.operation |
写入操作类型: - index(新增 / 更新) - create(仅新增,冲突报错) - update(仅更新,不存在报错) - upsert(存在则更新,否则新增) |
index |
es.batch.size.entries |
批量写入的文档数 | 1000 |
es.batch.size.bytes |
批量写入的最大字节数 | 1mb |
es.batch.write.retry.count |
批量写入失败重试次数 | 3 |
es.batch.write.retry.wait |
重试间隔时间 | 10s |
字段映射配置
用于将 Hadoop 数据字段映射到 ES 文档的元数据(如 _id、_routing):
| 参数 | 作用 | 默认值 |
|---|---|---|
es.mapping.id |
映射为 ES 文档 _id 的字段名 |
无(自动生成 _id) |
es.mapping.routing |
映射为路由键 _routing 的字段名 |
无(默认用 _id 路由) |
es.mapping.timestamp |
映射为 _timestamp 的字段名 |
无 |
es.mapping.include |
仅包含的字段(逗号分隔) | 所有字段 |
es.mapping.exclude |
排除的字段(逗号分隔) | 无 |
认证与安全配置
| 参数 | 作用 | 默认值 |
|---|---|---|
es.net.http.auth.user |
HTTP 认证用户名 | 无 |
es.net.http.auth.pass |
HTTP 认证密码 | 无 |
es.net.ssl |
是否启用 SSL 加密 | false |
es.net.ssl.keystore.location |
SSL 证书路径 | 无 |
性能与容错配置
| 参数 | 作用 | 默认值 |
|---|---|---|
es.http.timeout |
HTTP 连接超时时间 | 1m |
es.nodes.discovery |
是否自动发现集群其他节点 | true |
es.scroll.keepalive |
读取数据时滚动查询的超时时间 | 10m |
运行与验证
打包与提交
打包:使用 Maven 打包为 JAR(需包含依赖,或使用
hadoop jar时指定依赖)。提交任务:
1
hadoop jar h2es-job.jar com.example.H2EJob /user/data/input # 输入路径为 HDFS 上的 JSON 文件目录
验证数据
通过 ES API 检查数据是否写入:
1 | GET myindex/_search # 查询索引数据 |
最佳实践与注意事项
- 数据格式校验:确保 HDFS 中的 JSON 字段与 ES 索引映射匹配(类型、字段名一致),否则会导致写入失败。
- 批量参数调优:根据 ES 集群性能调整
es.batch.size.entries和es.batch.size.bytes(如大数据量时设为 2000~5000)。 - 避免热点分片:通过
es.mapping.routing指定路由字段,使数据均匀分布到各分片。 - 索引预创建:生产环境建议提前创建索引并定义映射(禁用
es.index.auto.create),避免动态映射导致类型错误。 - 监控与重试:开启 MapReduce 任务日志,关注
es.batch.write.retry.count相关日志,排查写入失败原因(如网络、权限、数据格式)
v1.3.10