0%

hadoop数据存入es

Hadoop 数据写入 Elasticsearch:基于 ES-Hadoop 的实现详解

ES-Hadoop 是 Elasticsearch 官方推出的工具,用于打通 Hadoop 生态(HDFS、MapReduce、Spark 等)与 Elasticsearch,支持数据双向流动。本文详细讲解如何通过 MapReduce 任务将 Hadoop 中的数据(如 HDFS 上的文件)写入 Elasticsearch,包括依赖配置、代码实现、核心参数及最佳实践。

ES-Hadoop 简介与依赖准备

核心作用

ES-Hadoop 简化了 Hadoop 与 Elasticsearch 的数据交互,提供:

  • 适配 Hadoop 生态的输入 / 输出格式(EsInputFormat/EsOutputFormat)。
  • 自动处理数据序列化(Hadoop 数据 → ES 文档 JSON)。
  • 支持批量写入、节点发现、认证等核心功能。

依赖配置

在 Maven 项目中添加以下依赖(版本需与 Hadoop 和 Elasticsearch 匹配):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<!-- Hadoop 核心依赖 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version> <!-- 与集群 Hadoop 版本一致 -->
<exclusions>
<exclusion> <!-- 排除冲突的日志组件 -->
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>

<!-- ES-Hadoop 核心依赖 -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>6.8.23</version> <!-- 与 ES 版本一致 -->
</dependency>

<!-- 可选:ES 高级客户端(用于额外操作) -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.8.23</version>
</dependency>

MapReduce 任务实现:HDFS 数据写入 ES

核心思路

通过 MapReduce 任务读取 HDFS 上的文件(如 JSON 格式),经 Mapper 处理后,通过 EsOutputFormat 批量写入 Elasticsearch。由于仅需数据透传(无聚合逻辑),无需 Reducer。

代码实现

(1)Job 配置类(H2EJob.java)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class H2EJob {
public static void main(String[] args) throws Exception {
// 1. 初始化配置
Configuration conf = new Configuration();

// 2. ES 连接配置
conf.set("es.nodes", "192.168.1.222:9200"); // ES 节点地址(多个用逗号分隔)
conf.set("es.resource", "myindex/mytype"); // 目标索引/类型(6.x 支持类型,7.x 后仅 _doc)
conf.set("es.index.auto.create", "true"); // 自动创建索引(不存在时)
conf.set("es.input.json", "yes"); // 输入数据为 JSON 格式(直接解析)

// 3. 高级配置(可选)
conf.set("es.net.http.auth.user", "user"); // 认证用户名(如开启 X-Pack)
conf.set("es.net.http.auth.pass", "password"); // 认证密码
conf.set("es.http.timeout", "30m"); // 超时时间(大文件时增大)
conf.set("es.batch.size.entries", "2000"); // 批量写入文档数(默认 1000)

// 4. 配置 MapReduce 任务
Job job = Job.getInstance(conf, "HDFS to ES");
job.setJarByClass(H2EJob.class);

// Mapper 配置
job.setMapperClass(H2EMapper.class);
job.setMapOutputKeyClass(NullWritable.class); // 输出键(无意义,用 NullWritable)
job.setMapOutputValueClass(Text.class); // 输出值(JSON 字符串)

// 输出格式(使用 ES 提供的输出格式)
job.setOutputFormatClass(EsOutputFormat.class);

// 输入路径(HDFS 上的文件路径,从命令行参数传入)
FileInputFormat.addInputPath(job, new Path(args[0]));

// 提交任务并等待完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
(2)Mapper 类(H2EMapper.java)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class H2EMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
/**
* 映射逻辑:读取 HDFS 中的 JSON 行,直接传递给输出(无需处理)
* @param key 输入键(行偏移量,无用)
* @param value 输入值(HDFS 中的一行 JSON 数据)
* @param context 上下文(用于输出数据)
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 直接将 JSON 字符串写入上下文,由 EsOutputFormat 处理
context.write(NullWritable.get(), value);
}
}

代码说明

  • 数据流向:HDFS 文件(每行一个 JSON)→ Mapper 读取 → EsOutputFormat 批量写入 ES。

  • 无 Reducer:因无需聚合,通过 job.setNumReduceTasks(0) 禁用 Reducer(默认无)。

  • JSON 格式要求:HDFS 中的每行数据必须是合法 JSON(与 ES 索引映射匹配),例如:

    1
    {"id": 1, "name": "test", "timestamp": "2024-08-01"}

ES-Hadoop 核心配置参数详解

ES-Hadoop 提供丰富的配置参数,控制连接、读写、映射等行为,核心参数分类如下:

基本连接配置

参数 作用 默认值
es.nodes ES 节点地址(格式:host:port localhost:9200
es.resource 目标索引 / 类型(格式:index/type 无(必须配置)
es.resource.read 读取数据的索引 / 类型(默认同 es.resource -
es.resource.write 写入数据的索引 / 类型(默认同 es.resource -
es.port ES HTTP 端口 9200

写入行为配置

参数 作用 默认值
es.write.operation 写入操作类型: - index(新增 / 更新) - create(仅新增,冲突报错) - update(仅更新,不存在报错) - upsert(存在则更新,否则新增) index
es.batch.size.entries 批量写入的文档数 1000
es.batch.size.bytes 批量写入的最大字节数 1mb
es.batch.write.retry.count 批量写入失败重试次数 3
es.batch.write.retry.wait 重试间隔时间 10s

字段映射配置

用于将 Hadoop 数据字段映射到 ES 文档的元数据(如 _id_routing):

参数 作用 默认值
es.mapping.id 映射为 ES 文档 _id 的字段名 无(自动生成 _id
es.mapping.routing 映射为路由键 _routing 的字段名 无(默认用 _id 路由)
es.mapping.timestamp 映射为 _timestamp 的字段名
es.mapping.include 仅包含的字段(逗号分隔) 所有字段
es.mapping.exclude 排除的字段(逗号分隔)

认证与安全配置

参数 作用 默认值
es.net.http.auth.user HTTP 认证用户名
es.net.http.auth.pass HTTP 认证密码
es.net.ssl 是否启用 SSL 加密 false
es.net.ssl.keystore.location SSL 证书路径

性能与容错配置

参数 作用 默认值
es.http.timeout HTTP 连接超时时间 1m
es.nodes.discovery 是否自动发现集群其他节点 true
es.scroll.keepalive 读取数据时滚动查询的超时时间 10m

运行与验证

打包与提交

  • 打包:使用 Maven 打包为 JAR(需包含依赖,或使用 hadoop jar 时指定依赖)。

  • 提交任务:

    1
    hadoop jar h2es-job.jar com.example.H2EJob /user/data/input  # 输入路径为 HDFS 上的 JSON 文件目录

验证数据

通过 ES API 检查数据是否写入:

1
2
3
4
5
GET myindex/_search  # 查询索引数据
{
"query": { "match_all": {} },
"size": 10
}

最佳实践与注意事项

  1. 数据格式校验:确保 HDFS 中的 JSON 字段与 ES 索引映射匹配(类型、字段名一致),否则会导致写入失败。
  2. 批量参数调优:根据 ES 集群性能调整 es.batch.size.entrieses.batch.size.bytes(如大数据量时设为 2000~5000)。
  3. 避免热点分片:通过 es.mapping.routing 指定路由字段,使数据均匀分布到各分片。
  4. 索引预创建:生产环境建议提前创建索引并定义映射(禁用 es.index.auto.create),避免动态映射导致类型错误。
  5. 监控与重试:开启 MapReduce 任务日志,关注 es.batch.write.retry.count 相关日志,排查写入失败原因(如网络、权限、数据格式)

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10