基本概念

AR:分区的所有副本集合,AR=ISR+OSR

ISR:与leader副本一定程度同步的所有副本(包括leader副本)

OSR:与leader副本同步滞后过多的副本集合

HW:高水位,所有副本都同步到的offset,消费者只能拉取HW之前的消息

LEO:Log End Offset,待写入消息的offset,即最后一条消息的offset+1

安装

JDK安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 解压jdk压缩包至/opt
tar zxvf jdk-8u181-linux-x64.tar.gz

# 配置jdk环境变量
vim /etc/profile
# 添加如下配置
export JAVA_HOME=/opt/jdk1.8.0_181
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib

# 使配置生效
source /etc/profile

java -version

ZooKeeper安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 解压安装包至/opt
tar zxvf zookeeper-3.4.12.tar.gz

# 配置环境变量
vim /etc/profile
# 添加如下配置
export ZOOKEEPER_HOME=/opt/zookeeper-3.4.12
export PATH=$PATH:$ZOOKEEPER_HOME/bin
# 使配置生效
source /etc/profile

# 修改zk配置文件
cd $ZOOKEEPER_HOME/conf
cp zoo_sample.cfg zoo.cfg

配置文件$ZOOKEEPER_HOME/conf/zoo.cfg参考

1
2
3
4
5
6
7
8
9
10
11
12
13
# ZooKeeper 服务器心跳时间, 单位为 ms
tickTime=2000
# 投票选举新 leader 的初始化时间
initLimit=10
# leader 与 follower 心跳检测最大容忍时间,响应超过 syncLimit*tickTime,leader 认为
# follower “死掉 ,从服务器列表中删除 follower
syncLimit=5
# 数据目录
dataDir=/tmp/zookeeper/data
# 日志目录
dataLogDir=/tmp/zookeeper/log
# ZooKeeper 对外服务端口
clientPort=2181

创建数据目录和日志目录

1
2
mkdir -p /tmp/zookeeper/data
mkdir -p /tmp/zookeeper/log

创建myid文件

1
echo 1 > /tmp/zookeeper/data/myid

启动服务并查看状态

1
2
zkServer.sh start
zkServer.sh status

集群配置,在配置文件中添加如下内容:

1
2
3
server.0=192.168.0.2:2888:3888
server.1=192.168.0.3:2888:3888
server.2=192.168.0.4:2888:3888

server.A=B:C:D

  • A 服务器的编号,myid中的值
  • B 服务器的IP地址
  • C 服务器与集群中的leader服务器交换信息的端口
  • D 选举时服务器相互通信的端口

Kafka安装

1
2
3
4
5
6
7
tar zxvf kafka_2.11-2.0.0.tgz
# 配置环境变量
vim /etc/profile
# 添加如下配置
export KAFKA_HOME=/opt/kafka_2.11-2.0.0
# 使配置生效
source /etc/profile

配置文件$KAFKA_HOME/conf/server.properties参考

1
2
3
4
5
6
7
8
# broker 的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同
broker.id=0
# broker对外提供的服务入口地址
listeners=PLAINTEXT://localhost:9092
# 存放消息日志文件的地址
log.dirs=/tmp/kafka-logs
# Kafka所需的ZooKeeper集群地址
zookeeper.connect=localhost:2181/kafka

服务端参数配置说明:

  • zookeeper.connect:broker要连接的ZooKeeper集群的服务地址,多个用英文逗号分隔;

  • listeners:broker监听客户端连接的地址列表,即为客户端连接broker的入口地址列表,多个用英文逗号分隔,支持协议类型有:

    • PLAINTEXT
    • SSL
    • SASL_SSL
  • broker.id:broker唯一标识,默认为-1,如果没有设置则自动生成;
  • log.dirs:日志文件存放的根目录,可以配置多个根目录,用英文逗号分隔;
  • message.max.bytes:broker所能接收消息的最大值,单位Byte。如果Producer发送的消息大于这个参数,会抛出RecordTooLargeException异常,默认值1000012(约976.6KB);

启动服务:

1
2
3
4
5
6
# 直接启动
bin/kafka-server-start.sh config/server.properties
# 后台启动
bin/kafka-server-start.sh -daemon config/server.properties
# 或者
bin/kafka-server-start.sh config/server.properties &

服务端工具

创建主题,主题名为topic-demo,分区数为4,每个分区有3个副本(1个leader,2个follower)

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-demo --replication-factor 3 --partitions 4

查看主题详细

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-demo

生产者发布主题消息

1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo

消费组订阅主题消息

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo --from-beginning

生产者

生产者和消费者客户端都需添加kafka官方maven依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>

必要参数配置

1
2
3
4
5
6
7
8
9
10
11
public static final String brokerList = "192.168.0.100:9092";
public static final String topic = "topic-demo";

public static Properties initConfig() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
return properties;
}

消息发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
Properties properties = initConfig();
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,Kafka!");
try {
Future<RecordMetadata> future = producer.send(record);
//同步模式,这里会阻塞
RecordMetadata metadata = future.get();
System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}

异步模式

1
2
3
4
5
6
7
8
9
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
}
}
});

序列化

自定义对象:

1
2
3
4
5
6
7
8
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Company {
private String name;
private String address;
}

自定义序列化的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CompanySerializer implements Serializer<Company> {
public void configure(Map<String, ?> map, boolean b) {
}

public byte[] serialize(String s, Company data) {
if (data == null) {
return null;
}
byte[] name, address;
try {
if (data.getName() != null) {
name = data.getName().getBytes("UTF-8");
} else {
name = new byte[0];
}
if (data.getAddress() != null) {
address = data.getAddress().getBytes("UTF-8");
} else {
address = new byte[0];
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
buffer.putInt(name.length);
buffer.put(name);
buffer.putInt(address.length);
buffer.put(address);
return buffer.array();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new byte[0];
}

public void close() {
}
}

设置自定义序列化属性:

1
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());

构造自定义对象消息:

1
2
Company company = Company.builder().name("myCompany").address("China").build();
ProducerRecord<String, Company> record = new ProducerRecord<String, Company>(topic, company);

分区器

ProducerRecord中没有指定partition字段,则依赖分区器,自定义分区器的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class DemoPartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(0);

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int numPartitions = partitionInfos.size();
if (keyBytes == null) {
return counter.getAndIncrement() % numPartitions;
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

public void close() {
}

public void configure(Map<String, ?> map) {
}
}

设置分区器属性:

1
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DemoPartitioner.class.getName());

拦截器

自定义拦截器的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;

public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
String modifiedValue = "prefix1-" + record.value();
return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(), record.key(), modifiedValue, record.headers());
}

public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e == null)
sendSuccess++;
else
sendFailure++;
}

public void close() {
double successRatio = (double) sendSuccess / (sendSuccess + sendFailure);
System.out.println("发送成功率=" + String.format("%f", successRatio % 100) + "%");
}

public void configure(Map<String, ?> map) {
}
}

设置拦截器属性,可以指定多个拦截器,多个拦截器类名用英文逗号分隔,按指定的顺序执行拦截器链:

1
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class.getName());
  • onSend:在消息序列化和计算分区之前,调用此方法对消息进行定制化操作;
  • onAcknowlegement:在消息被应答之前或消息放送失败调用此方法,优先于用户的Callback之前执行;
  • close:在关闭拦截器时执行一些资源的清理工作;

顺序:拦截器->序列化->分区器

重要的生产者参数

参数名 默认值 说明
acks “1” 分区中必须要有多少副本收到消息,生产者才认为消息成功写入,”1”表示只需leader写入成功
max.request.size 1048576(1MB) 限制生产者客户端能发送的消息的最大值
retries 0 生产者发送消息的重试次数
retry.backoff.ms 100 两次重试之间的时间间隔,避免无效的频繁重试
compression.type “none” 消息的压缩方式,可选值有”gzip”,”snappy”,”lz4”
connections.max.idle.ms 540000(9分钟) 指定在多久之后关闭限制的连接
linger.ms 0 生产者发送ProducerBatch之前等待更多消息(ProducerRecord)加入ProducerBatch的时间
receive.buffer.bytes 32768(32KB) Socket接收消息缓冲区(SO_RECBUF)的大小,-1则使用操作系统默认值
send.buffer.bytes 131072(128KB) Socket发送消息缓冲区(SO_SNDBUF)的大小,-1则使用操作系统默认值
request.timeout.ms 30000 生产者等待请求响应的最长时间
bootstrap.servers “” 连接Kafka集群所需的broker地址清单
key.serializer “” 消息中key对应的序列化类,需实现Serializer接口
value.serializer “” 消息中value对应的序列化类,需实现Serializer接口
buffer.memory 33554432(32MB) 生产者客户端中用于缓存消息的缓冲区大小
batch.size 16384(16KB) 指定ProducerBatch可以复用内存区域的大小
client.id “” 设定KafkaProducer对应的客户端id
max.block.ms 60000 控制KafkaProducersend()方法和partitionsFor()方法的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞
partitioner.class DefaultPartitioner 用来指定分区器,需实现Partitioner接口
enable.idempotence false 是否开启幂等性功能
interceptor.classes “” 用来指定生产者拦截器,需实现ProducerInterceptor接口
max.in.flight.requests.per.connection 5 限制每个连接(客户端与Node之前的连接)最多缓存的请求数
metadata.max.age.ms 300000(5分钟) 如果在该时间内元数据没有更新的话会被强制更新
transactional.id null 设置事务id,必须唯一

消费者

点对点消费:多个消费者指定同一个消费组id,主题的每个分区均衡分配到一个消费者,每条消息只会被一个消费者处理;

广播消费:每个消费者指定不同的消费组id,主题的消息会广播给每个消费者,每条消息会被所有消费者处理;

必要参数配置

1
2
3
4
5
6
7
8
9
10
11
12
13
public static final String brokerList = "192.168.32.128:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";

public static Properties initConfig() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return properties;
}

订阅主题与分区

订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static final AtomicBoolean isRunning = new AtomicBoolean(true);

public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset());
System.out.println("key=" + record.key() + ", value=" + record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}

1.订阅主题集合

1
consumer.subscribe(Arrays.asList(topic));

2.订阅与正则表达式匹配的主题。如果有新的主题被创建并且名字和正则表达式匹配,那么这个消费者可以消费新添加的主题的消息

1
consumer.subscribe(Pattern.compile("topic.*"));

3.订阅主题指定分区,只订阅主题中分区编号为0的消息:

1
consumer.assign(Arrays.asList(new TopicPartition(topic, 0)));

使用该方法需要事先知道主题中有多少个分区,可通过partitionsFor方法查询主题的元数据信息

1
List<PartitionInfo> partitionsFor(String topic);

取消订阅

执行取消订阅方法,或者订阅参数传递空集合:

1
2
3
consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>());
consumer.assign(new ArrayList<TopicPartition>());

订阅状态

  • AUTO_TOPICS:集合订阅方式subscribe(Collection)
  • AUTO_PATTERN:正则表达式订阅方式subscribe(Pattern)
  • USER_ASSIGNED:指定分区的订阅方式assign(Collection)
  • NONE:没有订阅

一个消费者只能使用其中一种订阅方式,否则会报出IllegalStateException异常,三种订阅状态是互斥的。

反序列化

自定义反序列化的实现,对应于生产者中的CompanySerializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CompanyDeserializer implements Deserializer<Company> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public Company deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
if (data.length < 8) {
throw new SerializationException("Size of data received is shorter than expected!");
}
ByteBuffer buffer = ByteBuffer.wrap(data);
int nameLen, addressLen;
String name, address;
nameLen = buffer.getInt();
byte[] nameBytes = new byte[nameLen];
buffer.get(nameBytes);
addressLen = buffer.getInt();
byte[] addressBytes = new byte[addressLen];
buffer.get(addressBytes);
try {
name = new String(nameBytes, "UTF-8");
address = new String(addressBytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error occur when deserialize!");
}
return new Company(name, address);
}

@Override
public void close() {
}
}

设置自定义序列化属性:

1
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());

更常见的做法是使用通用的工具来实现序列化和反序列化,例如Protostuff工具

引入依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.5.4</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.5.4</version>
</dependency>

序列化操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public byte[] serialize(String s, Company data) {
if (data == null) {
return null;
}
Schema schema = RuntimeSchema.getSchema(data.getClass());
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
byte[] protostuff = null;
try {
protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
return protostuff;
}

反序列化操作:

1
2
3
4
5
6
7
8
9
10
@Override
public Company deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
Schema<Company> schema = RuntimeSchema.getSchema(Company.class);
Company company = new Company();
ProtostuffIOUtil.mergeFrom(data, company, schema);
return company;
}

消息消费

按分区维度消费:

1
2
3
4
5
6
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));            
for (TopicPartition tp : records.partitions()) {
for (ConsumerRecord<String, String> record : records.records(tp)) {
System.out.println(record.partition() + " : " + record.value());
}
}

按主题维度消费:

1
2
3
4
5
for (String topic : Arrays.asList("topic1", "topic2")) {
for (ConsumerRecord<String, String> record : records.records(topic)) {
System.out.println(record.topic() + " : " + record.value());
}
}

位移提交

自动提交

kafka默认的消费位移的提交方式是自动提交,消费者每隔5秒会将拉取到的每个分区最大的消息位移进行提交,由消费者客户端参数配置:

1
2
enable.auto.commit=true
auto.commit.interval.ms=5000

手动提交

关闭自动提交配置

1
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

同步提交,提交poll获取的最大消息位移

1
2
3
4
5
6
7
8
9
10
11
12
final int minBatchSize = 200;
List<ConsumerRecord> buffer = new ArrayList<>();
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
consumer.commitSync();
buffer.clear();
}
}

同步提交带参数,提交主题分区的指定位移

1
2
3
long offset = record.offset();
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1)));

按分区粒度同步提交消费位移:

1
2
3
4
5
6
7
8
9
10
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.partition() + " : " + record.value());
}
long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastConsumedOffset + 1)));
}
}

异步提交,指定回调函数

1
2
3
4
5
6
7
8
9
10
11
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if(exception==null){
System.out.println(offsets);
}
else{
exception.printStackTrace();
}
}
});

控制或关闭消费

暂停和恢复消费

1
2
public void pause(Collections<TopicPartition> partitions)
public void resume(Collections<TopicPartition> partitions)

获取被暂停的分区集合

1
public Set<TopicPartition> paused()

利用wakeup方法从poll中返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
try {
while (isRunning.get()) {
consumer.poll();
...
}
}
catch (WakeupException e){
//忽略错误,退出循环
}
catch (Exception e) {
e.printStackTrace();
} finally {
//这里可提交消费位移
consumer.close();
}

...
consumer.wakeup();

指定位移消费

当消费者找不到记录的消费位移时(比如新加入的消费者),会根据auto.offset.reset配置决定从何处开始进行消费:

  • latest(默认值):表示从分区末尾开始消费;
  • earliest:表示从起始(0)开始消费;
  • none:查找不到消费位移的时候,抛出NoOffsetForPartitionException异常;

seek方法可以指定从分区的哪个位置开始消费,执行seek()方法之前必须先执行一次poll()方法,因为只能重置消费者分配到的分区的消费位置,而分区的分配是在poll方法中实现的(poll时间过短也不行,有可能还没分配到分区就返回了)。

1
2
3
4
5
6
7
8
9
10
11
12
13
consumer.subscribe(Arrays.asList(topic));        
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) { //如果不为0,说明成功分配到了分区
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment(); //获取分配到的分区
}
for (TopicPartition tp : assignment) {
consumer.seek(tp, 10); //指定拉取位移
}
while (isRunning.get()) { //再次拉取数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
...
}

还可以指定从分区末尾开始消费,先通过endOffsets方法获取到分区末尾的消息位置

1
2
3
4
Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);
for (TopicPartition tp : assignment) {
consumer.seek(tp, offsets.get(tp)); //这里返回的offset是将要写入最新消息的位置,不需要再加1了
}

也可以直接seek到分区消息的开头和结尾

1
2
public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)

可通过时间戳调用offsetsForTimes方法查询消息位移,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳

1
2
3
4
5
6
7
8
9
10
11
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
timestampToSearch.put(tp, System.currentTimeMillis() - 24 * 3600 * 1000); //查询一天前的消息
}
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
for (TopicPartition tp : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
if (offsetAndTimestamp != null) {
consumer.seek(tp, offsetAndTimestamp.offset());
}
}

再均衡

再均衡是指分区的所属权从一个消费者转移到另一个消费者的行为,消费者订阅主题时可以指定再均衡的自定义行为:

  • onPartitionsRevoked:该方法会再再均衡开始之前和消费者停止读取消息之后被调用,参数partitions表示再均衡前所分配到的分区;

  • onPartitionsAssigned:该方法会再重新分配分区之后和消费者开始读取消费之前被调用,参数partitions表示再均衡后分配到的分区。

1
2
3
4
5
6
7
8
9
10
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions){
//将位移存入数据库
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//从数据库中读取位移
}
});

消费者拦截器

自定义拦截器的实现,如果消息的时间戳与当前时间相差10秒则判定过期,不投递给具体消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {
private static final long EXPIRE_INTERVAL = 10 * 1000;

@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long now = System.currentTimeMillis();
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
for (ConsumerRecord<String, String> record : tpRecords) {
if (now - record.timestamp() < EXPIRE_INTERVAL) {
newTpRecords.add(record);
}
}
if (!newTpRecords.isEmpty()) {
newRecords.put(tp, newTpRecords);
}
}
return new ConsumerRecords<>(newRecords);
}

@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));
}

@Override
public void close() {
}

@Override
public void configure(Map<String, ?> configs) {
}
}
  • onConsume:再poll()方法返回之前,调用此方法对消息进行定制化操作;
  • onCommit:提交完消费位移之后调用该方法,可用来记录跟踪所提交的位移信息;
  • close:在关闭拦截器时执行一些资源的清理工作;

多线程实现

KafkaProducer是线程安全的,但KafkaConsumer是非线程安全的,如果有多个线程操作同一个KafkaConsumer对象,会抛出异常。

示例一:每个线程对应一个消费者,每个消费者负责拉取自己的消息并处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void main(String[] args) {
Properties properties = initConfig();
int consumerThreadNum = 4;
for (int i = 0; i < consumerThreadNum; i++) {
new KafkaConsumerThread(properties, topic).start();
}
}

public class KafkaConsumerThread extends Thread {
private KafkaConsumer<String, String> kafkaConsumer;

public KafkaConsumerThread(Properties properties, String topic) {
this.kafkaConsumer = new KafkaConsumer<>(properties);
this.kafkaConsumer.subscribe(Arrays.asList(topic));
}

@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
//处理消息
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
}
}

示例二:一个消费线程负责拉取所有消息,线程池负责处理消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

public static void main(String[] args) {
Properties properties = initConfig();
KafkaConsumerThread consumerThread = new KafkaConsumerThread(properties, topic, Runtime.getRuntime().availableProcessors());
consumerThread.start();
}

public class KafkaConsumerThread extends Thread {
private KafkaConsumer<String, String> kafkaConsumer;
private ExecutorService executorService;
private int threadNumber;
private Map<TopicPartition, OffsetAndMetadata> offsets;

public KafkaConsumerThread(Properties properties, String topic, int threadNumber) {
kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Collections.singletonList(topic));
offsets = new HashMap<>();
this.threadNumber = threadNumber;
executorService = new ThreadPoolExecutor(threadNumber, threadNumber, 0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
}

@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
executorService.submit(new RecordsHandler(records, offsets));
//手动提交位移
synchronized (offsets) {
if (!offsets.isEmpty()) {
kafkaConsumer.commitSync(offsets);
offsets.clear();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
}
}

public class RecordsHandler extends Thread {
public final ConsumerRecords<String, String> records;
private Map<TopicPartition, OffsetAndMetadata> offsets;

public RecordsHandler(ConsumerRecords<String, String> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
this.records = records;
this.offsets = offsets;
}

@Override
public void run() {
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
for (ConsumerRecord<String, String> record : records) {
//处理消息
}
long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();
//每个线程记录自己的提交位移
synchronized (offsets) {
if (!offsets.containsKey(tp)) {
offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
} else {
long position = offsets.get(tp).offset();
if (position < lastConsumedOffset + 1) {
offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
}
}
}
}
}
}

注意ThreadPoolExecutor线程池设置的CallerRunsPolicy策略,表示当线程池的总体消费能力跟不上poll拉取能力时,由调用线程来进行消费消息。

此方法中位移提交的方式存在数据丢失的风险,如果一个线程正在处理offset为0-99的消息,另一个线程已经处理完offset为100-199的消息并进行了位移提交,而线程1发生了异常,但之后的消费会从200开始。如果保存小的位移,则又存在重复消费的问题。更好的方式是基于滑动窗口实现。

重要的消费者参数

参数名 默认值 说明
fetch.min.bytes 1 消费者一次拉取请求poll方法中能拉取的最小数据量
fetch.max.bytes 52428800(50MB) 消费者一次拉取请求poll方法中能拉取的最大数据量
fetch.max.wait.ms 500 如果没有足够多的消息满足不了fetch.min.bytes的数据量,最多等待时间
max.partition.fetch.bytes 1048576(1MB) 消费者一次拉取中每个分区返回的最大数据量
max.poll.records 500 消费者一次拉取的最大消息数
connections.max.idle.ms 540000(9分钟) 指定在多久之后关闭限制的连接
exclude.internal.topics true 内部主题__consumer_offsets__transaction_state是否向消费者公开
设置为true不能使用正则表达式的方式订阅内部主题
receive.buffer.bytes 65536(64KB) Socket接收消息缓冲区(SO_RECBUF)的大小,-1则使用操作系统默认值
send.buffer.bytes 131072(128KB) Socket发送消息缓冲区(SO_SNDBUF)的大小,-1则使用操作系统默认值
request.timeout.ms 30000 消费者等待请求响应的最长时间
metadata.max.age.ms 300000(5分钟) 如果在该时间内元数据没有更新的话会被强制更新
reconnect.backoff.ms 50 消费者尝试重新连接指定主机之前的等待(退避)时间,避免频繁连接主机
retry.backoff.ms 100 消费者尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间
isolation.level “read_uncommitted” 配置read_committed会忽略事务未提交的消息,只能消费到LSO(LastStableOffset)位置
配置read_uncommitted可以消费到HW(High Watermark)位置
bootstrap.servers “” 连接Kafka集群所需的broker地址清单
key.deserializer “” 消息中key对应的反序列化类,需实现Deserializer接口
value.deserializer “” 消息中value对应的反序列化类,需实现Deserializer接口
client.id “” 设定KafkaProducer对应的客户端id
group.id “” 消费者所隶属的消费组唯一标识
heartbeat.interval.ms 3000 当使用Kafka的分组管理功能时,心跳到消费者协调器之间的预计时间
session.timeout.ms 10000 组管理协议中用来检测消费者是否失效的超时时间
max.poll.interval.ms 300000 指定拉取消息线程最长空闲时间,若超过这个时间还没有发起poll操作,则消费组认为该消费者已离开了消费组,将进行再均衡操作
auto.offset.reset latest 消费者找不到记录的消费位移时从何处开始消费,earliest(开头),latest(结尾),none(报错)
enable.auto.commit true 是否开启自动提交消费位移的功能
auto.commit.interval.ms 5000 开启自动提交消费位移时自动提交消费位移的时间间隔
partition.assignment.strategy RangeAssignor 消费者的分区分配策略
interceptor.class “” 配置客户端拦截器,需实现ConsumerInterceptor接口

主题管理

如果broker端配置参数auto.create.topics.enable设置为true(默认为true),那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。

除此之外,当一个消费者从未知主题读取消息,或者当任意一个客户端向未知主题发送元数据请求时,都会创建一个相应的主题。

很多时候,这种自动创建主题的行为都是非预期的,不建议将auto.create.topics.enable设置为true,会增加主题的管理与维护的难度。

创建主题

创建一个分区数为4,副本因子为2,主题名为topic-create的主题

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-create --replication-factor 2 --partitions 4

手动指定分区副本的分配方案,replica-assignment参数指定了创建4个分区(以逗号分隔),每个分区有2个副本以及副本分配情况(以冒号分隔broker_id,排在前面的broker_id为leader)

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-create-same --replica-assignment 2:0,0:1,1:2,2:1

覆盖默认配置,修改了主题端的2个配置参数

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-config --replication-factor 1 --partitions 1 --config cleanup.policy=compact --config max.message.bytes=10000

主题的命名不推荐(虽然可以)使用__开头,因为以__开头的主题一般为kafka内部主题。主题的名称必须由大小写字母、数字、点号.、连接线-、下划线_组成,不能为空,不能只有.,也不能只有..,且长度不能超过249。

查看主题

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create,topic-config

describe可以额外指定参数来增加一些附加功能:

  • topics-with-overrides:会列出包含了覆盖配置的主题;

    1
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topics-with-overrides
  • under-replicated-partitions:查找主题中所有包含失效副本的分区,即ISR小于AR的分区;

    1
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create --under-replicated-partitions
  • unavailable-partitions:查找主题中没有leader副本的分区,即这些分区已处于离线状态,对外界来说不可用;

    1
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create --unavailable-partitions

修改主题

修改分区个数,将分区个数调整为3。注意只能增加分区数,不能减少分区数

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic topic-config --partitions 3

修改主题的配置

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic topic-config --config segment.bytes=1045877

删除主题的自定义配置,使其恢复为默认值

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic topic-config --delete-config segment.bytes --delete-config cleanup.policy

删除主题

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic topic-delete

参数说明

主题管理脚本kafka-topics.sh的更多参数说明:

参数名 说明
alter 用于修改主题,包括分区数及主题的配置
config <键值对> 创建或修改主题时,用于设置主题级别的参数
create 创建主题
delete 删除主题
delete-config <配置名称> 删除主题级别被覆盖的配置
describe 查看主题的详细信息
disable-rack-aware 创建主题时不考虑机架信息
help 打印帮助信息
if-exists 修改或删除主题时使用,只有当主题存在时才会执行动作
if-not-exists 创建主题时使用,只有主题不存在时才会执行动作
list 列出所有可用的主题
partitions <分区数> 创建主题或增加分区时指定分区数
replica-assignment <分配方案> 手工指定分区副本分配方案
replication-factor <副本数> 创建主题时指定副本因子
topic <主题名称> 指定主题名称
topics-with-overrides 使用describe查看主题信息时,只展示包含覆盖配置的主题
unavailable-partitions 使用describe查看主题信息时,只展示包含没有leader副本的分区
under-replicated-partitions 使用describe查看主题信息时,只展示包含失效副本的分区
bootstrap-server 指定连接的broker地址信息

配置管理

kafka-configs.sh脚本专门用来对配置进行操作的。它不仅可支持操作主题相关的配置,还可以支持操作broker、用户和客户端这些类型的配置。

查看配置参数

1
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name topic-config

添加/修改配置参数

1
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name topic-config --add-config cleanup.policy=compact,max.message.bytes=10000

删除配置

1
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name topic-config --delete-config cleanup.policy,max.message.bytes

配置对象映射

配置类型 entity-type取值 entity-name取值
主题 topics 主题名称
broker brokers broker中broker.id参数配置的值
客户端 clients 生产者或消费者client.id参数配置的值
用户 users 用户名

主题配置说明

创建主题时,若没有指定配置参数,则会使用broker端对应参数作为其默认值。部分主题配置参数对照关系如下:

主题端参数 对应的broker端参数 说明
cleanup.policy log.cleanup.policy 日志压缩策略。默认值为delete,还可以配置为compact
delete.retention.ms log.cleaner.delete.retention.ms 被标识为删除的数据能够保留多久,默认值为86400000(1天)
max.message.bytes message.max.bytes 消息的最大字节数,默认值为1000012
message.timestamp.type log.message.timestamp.type 消息的时间戳类型,默认值为CreateTime,还可以配置为LogAppendTime

使用客户端操作主题

除了利用脚本来管理主题,也可以在JAVA代码中使用AdminClient对象来管理主题。

创建一个分区数为4,副本因子为1的主题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
AdminClient client = AdminClient.create(properties);
//自动指定分区副本分配方案
//NewTopic newTopic = new NewTopic(topic, 4, (short) 1);
//或者手动指定分区副本分配方案
Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
replicasAssignments.put(0, Arrays.asList(0));
replicasAssignments.put(1, Arrays.asList(0));
replicasAssignments.put(2, Arrays.asList(0));
replicasAssignments.put(3, Arrays.asList(0));
NewTopic newTopic = new NewTopic(topic, replicasAssign
CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));
try {
result.all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
client.close();

覆盖主题默认配置:

1
2
3
4
NewTopic newTopic = new NewTopic(topic, replicasAssignments);
Map<String, String> configs = new HashMap<>();
configs.put("cleanup.policy", "compact");
newTopic.configs(configs);

查询主题的所有配置信息:

1
2
3
4
5
6
7
8
9
10
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
DescribeConfigsResult result = client.describeConfigs(Collections.singleton(resource));
try {
Config config = result.all().get().get(resource);
System.out.println(config);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

修改主题配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
ConfigEntry entry = new ConfigEntry("cleanup.policy", "compact");
Config config = new Config(Collections.singleton(entry));
Map<ConfigResource, Config> configs = new HashMap<>();
configs.put(resource, config);
AlterConfigsResult result = client.alterConfigs(configs);
try {
result.all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

增加分区数:

1
2
3
4
5
6
7
8
9
10
11
NewPartitions newPartitions = NewPartitions.increaseTo(5);
Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
newPartitionsMap.put(topic, newPartitions);
CreatePartitionsResult result = client.createPartitions(newPartitionsMap);
try {
result.all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

主题的合法性

我们一般禁止客户端直接创建主题,不利于维护,但是AdminClient却可以直接创建。kafka broker端有一个参数create.topic.policy.class.name默认为null,它提供了一个入口用来验证主题创建的合法性。可以自定义一个实现CreateTopicPolicy接口的类,然后在broker配置文件config/server.properties中设置这个参数为我们自定义的类。这个类运行在服务端,打个jar包扔到classpath里面。

限制创建主题的分区数和副本因子,如不满足则创建失败:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class PolicyDemo implements CreateTopicPolicy {
@Override
public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
if (requestMetadata.numPartitions() != null || requestMetadata.replicationFactor() != null) {
if (requestMetadata.numPartitions() < 5) {
throw new PolicyViolationException("Topic should have at least 5 partitions, received: " + requestMetadata.numPartitions());
}
if (requestMetadata.replicationFactor() <= 1) {
throw new PolicyViolationException("Topic should have at least 2 replication factor, received: " + requestMetadata.replicationFactor());
}
}
}

@Override
public void close() throws Exception {
}

@Override
public void configure(Map<String, ?> configs) {
}
}

分区管理

优先副本的选举

Kafka需要确保所有主题的优先副本在集群中均匀分布,这要就保证了所有分区的leader均衡分布。如果leader分布过于集中,就会造成集群负载不均衡。如果某个broker节点宕机,就会有其他节点的follow副本成为leader,导致分区副本分布不均衡。即使该节点恢复,它原先的leader副本也会成为follow而不再对外提供服务。

所谓优先副本选举是指通过一定方式促使优先副本选举为leader副本,以此来促进集群的负载均衡,这一行为也可以称为“分区平衡”。

Kafka提供了分区自动平衡的功能,与此对应的broker端参数是auto.leader.rebalance.enable,默认值为true,即默认开启。开启此功能,Kafka控制器会启动一个定时任务,轮询时间由参数leader.imbalance.check.interval.seconds控制,默认值300秒。如果不平衡率超过leader.imbalance.per.broker.percetage设置的比值,默认值10%,则触发优先副本选举动作以求分区平衡。

但生产环境一般会关闭该参数,因为可能在不确定的时间发生分区平衡,导致业务阻塞。

手动执行分区平衡:

1
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --path-to-json-file election.json

其中election.json指定了要平衡的主题分区:

1
2
3
4
5
6
7
8
9
10
11
{
"partitions": [{
"topic": "foo",
"partition": 1
},
{
"topic": "foobar",
"partition": 2
}
]
}

分区重分配

有时候,当我们需要下线或新增broker节点,为了保证分区及副本的合理分配,则需要将副本迁移到其他节点上。这就用到了分区重分配的功能,Kafka提供了kafka-reassign-partitions.sh脚本来实现该功能。

假如我们在一个由3个broker节点组成的集群中存在一个分区数为4,副本因子为2的主题topic-reassign,现在要移除节点broker1,让主题分区重分配。

步骤一,生成重分配方案:

1
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --generate --topics-to-move-json-file reassign.json --broker-list 0,2

reassign.json文件指定了要分区重分配的主题名

1
2
3
4
5
6
{
"topics": [{
"topic": "topic-reassign"
}],
"version": 1
}

该脚本会输出2段json内容,分别表示当前分区分配方案和目标分区分配方案。前者可保存起来用于备份恢复,后者用于步骤二执行,将该json保存为project.json文件。

步骤二,执行分区重分配:

1
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file project.json

再次查看主题,可发现分区已分布在指定节点上了,如果此时leader分区失衡,还可以执行优先副本选举操作进行调整。

execute替换为vertify,即可查看分区重分配的进度。

1
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --vertify --reassignment-json-file project.json

分区重分配本质在于数据复制,先增加新的副本,然后进行数据同步,最后删除旧的副本来达到最终的目的。

复制限流

副本间的复制会占用额外资源,可以对复制流量加以限制来保证重分配期间不会对集群服务造成太大影响。

broker级别有2个配置参数:

  • leader.replication.throttled.rate:设置leader副本传输的速度
  • follower.replication.throttled.rate:设置follower副本复制的速度

二者一般设置为同样的值,单位都是B/s

1
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config follower.replication.throttled.rate=1024,leader.replication.throttled.rate=1024

在主题级别可以设置具体的被限速的副本列表,对于一个分区数为3,副本因子为2的主题topic-throttle,它的分区对应的broker_id分别为0:[0,1]1:[1,2]2:[2,0]

1
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name topic-throttle --alter --add-config leader.replication.throttled.replicas=[0:0,1:1,2:2],follower.replication.throttled.replicas=[0:1,1:2,2:0]

参数的值是一个分区副本列表[分区0:broker_id,分区1:broker_id..]

可以根据需求事先配置好这4个参数,再进行分区重分配,在数据复制时进行限流。

修改副本因子

利用分区重分配的kafka-reassign-partitions.sh文件,也可以对副本因子进行调整,可以添加也可以减少。手动修改project.json文件,在replicaslog_dirs属性中,分别加入1any,表示在分区0中新加入一个副本存储在broker1中,它的日志文件不做要求,所以填any

1
2
3
4
5
6
7
8
9
{
"partitions": [{
"topic": "topic-throttle",
"partition": 0,
"replicas": [2, 1, 0],
"log_dirs": ["any", "any", "any"]
}],
"version": 1
}