0%

广告系统中的频控实现:从 Redis 到 HBase 的方案详解

在广告投放中,频控(频率控制) 是核心机制之一,用于限制同一用户对同一广告的曝光次数(如 “1 小时内最多看 3 次”),避免用户反感并优化广告资源利用率。本文将详细介绍如何基于 Redis 和 HBase 实现频控,并分析两种方案的适用场景。

频控的核心需求与设计原则

核心需求

  • 精准计数:准确记录用户对特定广告的曝光 / 点击时间,支持按周期(天 / 小时 / 分钟)统计。
  • 高效判断:快速判断当前请求是否超出频控限制(如 “用户 A 在 1 小时内已看广告 B 5 次,限制为 3 次则拒绝投放”)。
  • 高并发支持:广告系统峰值请求可达百万级 QPS,频控判断需在毫秒级完成。
  • 数据持久化:长期保留用户行为数据(如 90 天),用于数据分析和策略优化。

设计原则

  • key 设计:需唯一标识 “用户 - 广告” 组合,通常采用 uid:campaignId 作为键(uid 为用户唯一标识,campaignId 为广告计划 ID)。
  • 时间排序:存储的行为时间需按顺序排列,便于快速筛选指定周期内的记录。
  • 过期清理:自动删除超出统计周期的数据(如只保留 24 小时内的记录),减少存储压力。

基于 Redis 的频控实现(高并发场景首选)

Redis 凭借内存存储和丰富的数据结构,成为高频场景下频控的首选方案,尤其适合实时性要求高的场景(如信息流广告实时投放)。

数据结构选择

使用 Redis List 存储用户对广告的行为时间戳,原因如下:

阅读全文 »

Kafka 镜像操作详解:跨集群数据同步

Kafka 镜像操作(MirrorMaker)是实现跨集群数据同步的核心工具,通过消费源集群的消息并生产到目标集群,实现两个 Kafka 集群之间的数据镜像。这种机制适用于灾备、数据迁移、多区域部署等场景。本文将详细介绍 MirrorMaker 的工作原理、配置方法及操作步骤。

镜像操作核心原理

MirrorMaker 的工作机制本质是 “消费 - 生产” 模式:

  1. 消费者:从源集群的指定主题拉取消息。
  2. 生产者:将拉取的消息推送到目标集群的同名(或指定)主题。

通过这种方式,目标集群会实时同步源集群的消息,形成 “镜像”。MirrorMaker 支持通过正则表达式(--whitelist)过滤需要同步的主题,灵活控制同步范围。

镜像操作工具:kafka-mirror-maker.sh

Kafka 提供 kafka-mirror-maker.sh(Linux/Mac)或 kafka-mirror-maker.bat(Windows)脚本执行镜像操作,核心参数如下:

参数 作用
--consumer.config 源集群消费者配置文件路径(必传)。
--producer.config 目标集群生产者配置文件路径(必传)。
--whitelist 正则表达式,指定需要同步的源集群主题(如 `test-mirror topic.*`)。
--blacklist 正则表达式,指定无需同步的源集群主题(与 --whitelist 互斥)。
--num.streams 消费线程数(默认 1,增加可提升同步吞吐量)。

操作步骤

环境准备

  • 源集群(待同步数据的集群)和目标集群(接收同步数据的集群)已正常运行。
  • 确保源集群的主题在目标集群中已创建(可手动创建或配置自动创建)。

配置文件

(1)消费者配置文件(源集群)

创建 consumer-mirror.properties,配置源集群连接信息:

阅读全文 »

Java 线程池详解:从原理到实践

线程池是 Java 并发编程中不可或缺的组件,它通过复用线程、统一管理任务,显著提升了多线程程序的性能和稳定性。本文将深入解析线程池的核心原理、常用类型、工作机制及实践技巧。

线程池的核心价值

在没有线程池的场景下,每次任务执行都需要创建和销毁线程,这会带来显著的性能开销(线程创建涉及操作系统内核调用)。线程池通过以下方式解决这一问题:

  • 线程复用:避免频繁创建 / 销毁线程的开销;
  • 资源管控:限制最大线程数,防止线程过多导致的系统资源竞争和阻塞;
  • 任务管理:提供任务排队、定时执行、拒绝策略等功能,简化并发编程。

线程池的核心组件与接口

Java 线程池基于 Executor 框架实现,核心接口和类的关系如下:

线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 最顶层接口:定义任务执行入口
public interface Executor {
void execute(Runnable command);
}

// 扩展 Executor,增加线程池生命周期管理和任务提交能力
public interface ExecutorService extends Executor {
void shutdown(); // 优雅关闭
List<Runnable> shutdownNow(); // 强制关闭
<T> Future<T> submit(Callable<T> task); // 提交有返回值的任务
// 其他方法...
}

// 线程池的具体实现类
public class ThreadPoolExecutor extends AbstractExecutorService {
// 核心实现...
}

Executors 工具类提供了多种预定义线程池的创建方法,但实际开发中更推荐直接使用 ThreadPoolExecutor 自定义线程池(避免资源耗尽风险)。

ThreadPoolExecutor 核心参数与工作原理

核心构造参数

1
2
3
4
5
6
7
8
9
10
11
  public ThreadPoolExecutor(int corePoolSize, // 核心线程数,决定新提交的任务是新开线程去执行还是放到任务队列中,当线程数量小于corePoolSize时才会去创建线程,如果大于corePoolSize会将任务放入workQueue队列中
int maximumPoolSize, // 最大线程数,线程池能创建的最大线程数,达到该数值后将不会创建新的线程,再添加任务则采用拒绝策略
long keepAliveTime, // 线程超过corePoolSize后,多余线程的最大闲置时间,如果超过,则会终止,但是对于核心线程,如果allowCoreThreadTimeOut(boolean value)设置为true的话,核心线程也会被终止
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 线程数如果大于corePoolSize会将任务放入workQueue队列中,存放未处理的任务的阻塞队列,常用的队列有
//1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小
//2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE
//3)SynchronousQueue:这个队列比较特殊,它不会进行存储,而是将直接新建一个线程来执行新来的任务
//4)PriorityBlockingQuene:具有优先级的无界阻塞队列
ThreadFactory threadFactory, // 生产线程的工厂类,可以用来定义线程名称以及线程的优先级
RejectedExecutionHandler handler)//拒绝策略

工作流程

线程池处理任务的逻辑如下:

  1. 当任务提交时,若当前线程数 < corePoolSize,创建核心线程执行任务;
  2. 若线程数 ≥ corePoolSize,将任务加入 workQueue 等待;
  3. workQueue 已满,且当前线程数 < maximumPoolSize,创建临时线程执行任务;
  4. 若线程数 ≥ maximumPoolSize,触发 handler 拒绝策略;
  5. 临时线程空闲时间超过 keepAliveTime 时,被销毁释放资源。

注意:若设置 allowCoreThreadTimeOut(true),核心线程也会因空闲超时被销毁。

核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
// 变量的高3位代表线程池的状态,后29位(从低位往高位数)代表该线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程池数量的位数 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程最大个数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in finalize()
// (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow()
// SHUTDOWN -> TIDYING When both queue and pool are empty
// STOP -> TIDYING When pool is empty
// TIDYING -> TERMINATED When the terminated() hook method has completed

// 线程池状态 高3位
// 高三位111,表示运行中的状态标识,此时可以接受新任务并且执行队列中的任务 runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
// 高三位000,表示关闭中的状态标识,调用了shutdown方法,此时不再接收新任务,但是队列里的任务还得执行
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 高三位001,表示已停止的状态标识,调用了shutdownNow方法不再接受新任务,同时抛弃阻塞队列中的所有任务并中断正在执行的任务
private static final int STOP = 1 << COUNT_BITS;
// 高三位010,表示当前所有任务已经终止,任务数量为0时的状态标识,在调用shutdown方法和shutdownNow方法时都会尝试更新这个状态
private static final int TIDYING = 2 << COUNT_BITS;
// 高三位011,表示线程池已经完全终止(关闭),调用了terminated方法会更新为该状态
private static final int TERMINATED = 3 << COUNT_BITS;

// 获取运行状态 高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程数量 低29位
private static int workerCountOf(int c) { return c & CAPACITY; }
// 计算ctl值 线程状态和线程个数 或操作
private static int ctlOf(int rs, int wc) { return rs | wc; }

// 用来保存等待任务执行的阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 可重入锁,对线程池状态等属性进行修改时需要持有该锁
private final ReentrantLock mainLock = new ReentrantLock();
// 包含了所有在工作的线程
private final HashSet<Worker> workers = new HashSet<Worker>();
// 锁条件队列,主要用于 awaitTermination 终止条件
private final Condition termination = mainLock.newCondition();
// 记录线程池最大工作线程的数量
private int largestPoolSize;
// 完成任务的数量,仅在中止工作任务时更新
private long completedTaskCount;
// 用于创建线程的工厂
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 空闲存活时间,如果线程池中的线程数量比核心线程数量还要多时,并且多出的这些线程都是闲置状态,该变量则是这些闲置状态的线程的存活时间
private volatile long keepAliveTime;
// 默认为 false,如果设为 true那么核心线程也会遵循 keepAliveTime的时间来做闲置处理
private volatile boolean allowCoreThreadTimeOut;
// 线程池核心线程数量
private volatile int corePoolSize;
// 线程池最大线程数量
private volatile int maximumPoolSize;

// submit是将任务进行一次封装,然后执行execute方法
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

public void execute(Runnable command) {
// command为空,则抛出空指针
if (command == null)
throw new NullPointerException();
// 获取线程池状态 和 线程数量的组合值
int c = ctl.get();
// workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务
// 判断是否创建核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 创建核心线程
// 成功则直接返回
return;
// 失败需要再次获取线程池的状态
c = ctl.get();
}
// 经过上面的if语句,走到这里可能会因为两种情况1:当前工作线程数大于等于核心线程数 2:创建线程失败
// 判断线程池是否在运行
// 如果是RUNNING状态,则添加任务到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
// 双重检查,再次获取线程池状态,插入成功再次验证线程池是否运行,防止任务入队的过程中ctl的值发生变化
int recheck = ctl.get();
//再次检查如果不在运行,移除已入队的任务,然后抛出拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果在运行,当前线程数为0,就启用一个非核心线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 加入阻塞队列失败(队列已满),尝试创建非核心线程
//如果创建非核心线程失败,直接拒绝
else if (!addWorker(command, false))
reject(command);
}


// 添加线程 core表示是否为核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) { // 自旋
int c = ctl.get();
// 当前线程池运行状态
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 判断线程池状态,如果线程池的状态值大于或等于SHUTDOWN,则不处理提交任务,直接返回
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 自旋,更新线程数量
for (;;) {
// 当前线程数
int wc = workerCountOf(c);
// 根据core来判断是否为核心线程
// 当前线程数已经超过容量,就直接返回
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS增加当前线程数量,更改成功结束自旋
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS操作失败了,获取最新值
c = ctl.get(); // Re-read ctl
// 检查线程池状态是不是修改了,线程池状态修改则需要重新获取线程池
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// CAS操作成功,创建线程的所有条件都满足了,可以开始创建线程来执行任务
// worker是否启动
boolean workerStarted = false;
// 是否将worker添加到workers集合中
boolean workerAdded = false;
Worker w = null;
try {
// 创建工作线程
w = new Worker(firstTask);
// 取出worker中的线程对象
final Thread t = w.thread;
if (t != null) {
// 获取线程池主锁,为了保证workers同步
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取当前线程池运行状态
int rs = runStateOf(ctl.get());

//小于SHUTTDOWN即RUNNING
//等于SHUTDOWN并且firstTask为null,不接受新的任务,但是会继续执行等待队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 判断线程是否是 alive状态
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加线程到workers中,workers是一个HashSet private final HashSet<Worker> workers = new HashSet<Worker>();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 设置当前工作者加入线程队列的已添加的标识为 true
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 线程添加线程池成功,启动新建的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 线程启动失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}


Worker

Worker实现了Runnable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable

public void run() {
runWorker(this);
}
// worker的执行方法
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取worker的firstTask
Runnable task = w.firstTask;
w.firstTask = null;
// 释放锁,设置aqs的state为0,允许中断
w.unlock(); // allow interrupts
// 用于标识线程是否异常终止
boolean completedAbruptly = true;
try {
// 循环getTask获取任务
while (task != null || (task = getTask()) != null) {
// 获取到可执行的任务,对worker对象加锁,保证线程在执行任务过程中不会被中断
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || // 线程池状态大于等于STOP
(Thread.interrupted() && // 线程被中断 且 是线程池内部的状态变化中断的
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted()) // 当前线程未被中断
wt.interrupt(); // 中断
try {
// 线程执行前
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 开始执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 线程执行后
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成数加一
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

// 获取任务
private Runnable getTask() {
// 标识是否超时
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
// 线程池控制状态
int c = ctl.get();
// 线程池运行状态
int rs = runStateOf(c);

// 如果线程池状态大于等于STOP,或者处于SHUTDOWN状态,并且阻塞队列为空,线程池工作线程数量递减,方法返回null,回收线程
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// 如果allowCoreThreadTimeOut为ture,或当前线程数大于核心池大小,则需要超时回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果worker数量大于maximumPoolSize或者超时了
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) //线程池工作线程数量递减,方法返回null,回收线程
return null;
continue;//线程池工作线程数量递减失败,跳过剩余部分,继续循环
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

其他方法

1
2
3
4
5
6
7
8
// 取得第一个完成任务的结果值,当第一个任务执行完成后,会调用interrupt()将其他任务中断,需要结合if(Thread.currentThread().isInterrupted())来判断
// 会阻塞
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException
// 等全部线程任务执行完毕后,取得所有完成任务的结果值
// 会阻塞
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException

常用线程池类型(Executors 工具类)

Executors 提供了便捷的线程池创建方法,但需注意其潜在风险(如无界队列可能导致 OOM):

线程池类型 核心参数 适用场景 风险提示
newFixedThreadPool(n) 核心线程 = 最大线程 = n,队列无界(LinkedBlockingQueue 执行长期稳定的任务 任务过多时队列可能耗尽内存
newSingleThreadExecutor() 核心线程 = 最大线程 = 1,队列无界 串行执行任务(保证顺序) 同上
newCachedThreadPool() 核心线程 = 0,最大线程 = Integer.MAX_VALUE,队列无界(SynchronousQueue 短期小任务(如 RPC 调用) 任务暴增时可能创建过多线程
newScheduledThreadPool(n) 核心线程 = n,支持定时 / 周期性任务(DelayedWorkQueue 定时任务(如心跳检测) 最大线程数过大可能导致问题

任务提交与线程池关闭

任务提交方式

  • execute(Runnable):无返回值,无法捕获任务异常;
  • submit(Runnable/Callable):返回 Future 对象,可通过 get() 获取结果或异常。
1
2
3
4
5
6
7
// 示例:提交有返回值的任务
ExecutorService pool = new ThreadPoolExecutor(...);
Future<Integer> future = pool.submit(() -> {
Thread.sleep(1000);
return 1 + 1;
});
System.out.println(future.get()); // 阻塞等待结果:2

线程池关闭

  • shutdown():优雅关闭,不再接收新任务,等待已提交任务执行完毕;
  • shutdownNow():强制关闭,尝试中断正在执行的任务,返回未执行的任务列表。
1
2
3
4
5
6
// 优雅关闭示例
pool.shutdown();
if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
// 超时后强制关闭
pool.shutdownNow();
}

拒绝策略(RejectedExecutionHandler)

当任务数超过线程池承载能力(maximumPoolSize + workQueue 容量)时,触发拒绝策略:

策略类型 行为描述 适用场景
AbortPolicy 抛出 RejectedExecutionException(默认策略) 需明确感知任务拒绝的场景
CallerRunsPolicy 由提交任务的线程(如主线程)执行任务 临时流量峰值,减缓提交速度
DiscardPolicy 直接丢弃任务,无任何提示 非核心任务,允许丢失
DiscardOldestPolicy 丢弃队列中最旧的任务,尝试提交新任务 任务有时间优先级(如日志)

自定义拒绝策略:实现 RejectedExecutionHandler 接口,例如记录日志后异步重试。

ForkJoinPool:分治任务的并行计算

ForkJoinPool 是 Java 7 引入的线程池,专为分治任务设计(大任务拆分为小任务,并行计算后合并结果),底层采用工作窃取算法(空闲线程主动获取其他线程的任务)。

核心使用方式

  • RecursiveTask<V>:有返回值的分治任务;
  • RecursiveAction:无返回值的分治任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 示例:计算 1~100 的和(RecursiveTask)
public class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 10; // 拆分阈值
private int start;
private int end;

public SumTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
// 小任务直接计算
int sum = 0;
for (int i = start; i <= end; i++) sum += i;
return sum;
} else {
// 大任务拆分
int mid = (start + end) / 2;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
left.fork(); // 并行执行左任务
right.fork(); // 并行执行右任务
return left.join() + right.join(); // 合并结果
}
}

public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
System.out.println(pool.invoke(new SumTask(1, 100))); // 输出:5050
}
}

线程池实践建议

  1. 避免使用 Executors 工具类:直接通过 ThreadPoolExecutor 自定义参数,明确队列大小和拒绝策略;
  2. 合理设置核心参数
    • 核心线程数:CPU 密集型任务(如计算)设为 CPU核心数 + 1,IO 密集型任务(如网络请求)设为 2 * CPU核心数
    • 队列选择:短期任务用 ArrayBlockingQueue(有界),避免 OOM;
  3. 监控线程池状态:通过 getActiveCount()getQueue().size() 等方法监控负载,动态调整参数;
  4. 自定义线程工厂:为线程命名(如 OrderProcessing-Thread-1),便于问题排查;
  5. 优雅处理任务异常:通过 afterExecute 钩子方法记录任务异常,避免异常被吞噬。

Kafka 安全机制详解:身份认证与权限控制

Kafka 作为分布式消息系统,需保障数据传输的安全性和访问的可控性。其安全机制主要包含身份认证(验证连接方身份)和权限控制(限制操作范围)两部分,可通过配置实现客户端与 Broker、Broker 之间及 Broker 与 ZooKeeper 之间的安全通信。本文将详细介绍这两种机制的实现方式及操作步骤。

身份认证

身份认证用于验证连接发起方的合法性,防止未授权节点接入集群。Kafka 支持多种认证机制,如 SSL、SASL(含 PLAIN、SCRAM 等),其中 SASL/PLAIN 因配置简单被广泛用于内部环境。

SASL/PLAIN 认证原理

SASL(Simple Authentication and Security Layer)是一种通用认证框架,PLAIN 是其最简单的机制,基于用户名 / 密码验证。适用于客户端与 Broker、Broker 之间及 Broker 与 ZooKeeper 的认证。

配置步骤(SASL/PLAIN)

(1)环境准备
  • 确保 Kafka 集群(含 ZooKeeper)已安装并正常运行。
  • 准备 3 类 JAAS(Java Authentication and Authorization Service)配置文件:ZooKeeper 服务端、Kafka 服务端、Kafka 客户端。
(2)配置 ZooKeeper 认证
① 修改 zookeeper.properties

开启 SASL 认证,添加以下配置:

1
2
3
4
5
6
# 启用 SASL 认证插件
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
# 要求客户端必须使用 SASL 认证
requireClientAuthScheme=sasl
# 认证信息刷新间隔(毫秒)
jaasLoginRenew=3600000
② 创建 ZooKeeper JAAS 文件(kafka_zk_jaas.conf

定义 ZooKeeper 服务端的认证用户(如管理员 admin):

阅读全文 »

Kafka 连接器详解:数据集成的桥梁

Kafka 连接器(Kafka Connect)是 Kafka 生态中用于数据集成的核心工具,旨在简化外部系统(如文件、数据库、消息队列等)与 Kafka 之间的数据同步。它支持独立模式(Standalone)和分布式模式(Distributed)两种部署方式,提供标准化的连接器接口和 REST API,降低了数据导入导出的开发成本。本文将详细介绍连接器的工作模式、配置方法及核心功能。

连接器概述

核心作用

Kafka 连接器用于解决 “外部系统 ↔ Kafka” 的数据流转问题,避免重复开发数据同步工具:

  • Source 连接器:从外部系统(如文件、MySQL)读取数据,导入 Kafka 主题。
  • Sink 连接器:从 Kafka 主题读取数据,导出到外部系统(如文件、Elasticsearch)。

核心概念

  • Worker:连接器的运行实例,负责管理连接器和任务(Task)。
  • Task:实际执行数据同步的单元,Source 连接器对应 SourceTask,Sink 对应 SinkTask,可并行处理以提升效率。
  • 转换器(Converter):负责数据格式转换(如 JSON ↔ 字节数组),确保 Kafka 与外部系统的数据格式兼容。
  • 偏移量(Offset):记录数据同步的进度,确保断点续传(类似消费者偏移量)。

独立模式(Standalone)

独立模式是单进程部署,适合测试、开发或简单场景,所有连接器和任务运行在一个 Worker 进程中。通过 connect-standalone.sh 脚本启动。

核心配置

(1)Worker 配置(connect-standalone.properties)

定义 Worker 与 Kafka 的连接、数据转换、偏移量存储等全局配置:

阅读全文 »