基本概念 AR:分区的所有副本集合,AR=ISR+OSR
ISR:与leader副本一定程度同步的所有副本(包括leader副本)
OSR:与leader副本同步滞后过多的副本集合
HW:高水位,所有副本都同步到的offset,消费者只能拉取HW之前的消息
LEO:Log End Offset,待写入消息的offset,即最后一条消息的offset+1
安装 JDK安装 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 tar zxvf jdk-8u181-linux-x64.tar.gz vim /etc/profileexport JAVA_HOME=/opt/jdk1.8.0_181 export JRE_HOME=$JAVA_HOME /jreexport PATH=$PATH :$JAVA_HOME /bin export CLASSPATH=.:$JAVA_HOME /lib:$JRE_HOME /libsource /etc/profile java -version
ZooKeeper安装 1 2 3 4 5 6 7 8 9 10 11 12 13 14 tar zxvf zookeeper-3.4.12.tar.gz vim /etc/profileexport ZOOKEEPER_HOME=/opt/zookeeper-3.4.12export PATH=$PATH :$ZOOKEEPER_HOME /binsource /etc/profilecd $ZOOKEEPER_HOME /conf cp zoo_sample.cfg zoo.cfg
配置文件$ZOOKEEPER_HOME/conf/zoo.cfg参考
1 2 3 4 5 6 7 8 9 10 11 12 13 tickTime =2000 initLimit =10 syncLimit =5 dataDir =/tmp/zookeeper/data dataLogDir =/tmp/zookeeper/log clientPort =2181
创建数据目录和日志目录
1 2 mkdir -p /tmp/zookeeper/data mkdir -p /tmp/zookeeper/log
创建myid文件
1 echo 1 > /tmp/zookeeper/data/myid
启动服务并查看状态
1 2 zkServer.sh start zkServer.sh status
集群配置,在配置文件中添加如下内容:
1 2 3 server.0=192.168.0.2:2888:3888 server.1=192.168.0.3:2888:3888 server.2=192.168.0.4:2888:3888
server.A=B:C:D
A 服务器的编号,myid中的值
B 服务器的IP地址
C 服务器与集群中的leader服务器交换信息的端口
D 选举时服务器相互通信的端口
Kafka安装 1 2 3 4 5 6 7 tar zxvf kafka_2.11-2.0.0.tgz vim /etc/profileexport KAFKA_HOME=/opt/kafka_2.11-2.0.0source /etc/profile
配置文件$KAFKA_HOME/conf/server.properties参考
1 2 3 4 5 6 7 8 broker.id =0 listeners =PLAINTEXT://localhost:9092 log.dirs =/tmp/kafka-logs zookeeper.connect =localhost:2181/kafka
服务端参数配置说明:
启动服务:
1 2 3 4 5 6 bin/kafka-server-start.sh config/server.properties bin/kafka-server-start.sh -daemon config/server.properties bin/kafka-server-start.sh config/server.properties &
服务端工具 创建主题,主题名为topic-demo,分区数为4,每个分区有3个副本(1个leader,2个follower)
1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-demo --replication-factor 3 --partitions 4
查看主题详细
1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-demo
生产者发布主题消息
1 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo
消费组订阅主题消息
1 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo --from-beginning
生产者 生产者和消费者客户端都需添加kafka官方maven依赖
1 2 3 4 5 <dependency > <groupId > org.apache.kafka</groupId > <artifactId > kafka-clients</artifactId > <version > 2.0.0</version > </dependency >
必要参数配置 1 2 3 4 5 6 7 8 9 10 11 public static final String brokerList = "192.168.0.100:9092" ;public static final String topic = "topic-demo" ; public static Properties initConfig () { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo" ); return properties; }
消息发送 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) { Properties properties = initConfig(); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,Kafka!" ); try { Future<RecordMetadata> future = producer.send(record); RecordMetadata metadata = future.get(); System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset()); } catch (Exception e) { e.printStackTrace(); } producer.close(); }
异步模式
1 2 3 4 5 6 7 8 9 producer.send(record, new Callback() { public void onCompletion (RecordMetadata metadata, Exception e) { if (e != null ) { e.printStackTrace(); } else { System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset()); } } });
序列化 自定义对象:
1 2 3 4 5 6 7 8 @Data @NoArgsConstructor @AllArgsConstructor @Builder public class Company { private String name; private String address; }
自定义序列化的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class CompanySerializer implements Serializer <Company > { public void configure (Map<String, ?> map, boolean b) { } public byte [] serialize(String s, Company data) { if (data == null ) { return null ; } byte [] name, address; try { if (data.getName() != null ) { name = data.getName().getBytes("UTF-8" ); } else { name = new byte [0 ]; } if (data.getAddress() != null ) { address = data.getAddress().getBytes("UTF-8" ); } else { address = new byte [0 ]; } ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length); buffer.putInt(name.length); buffer.put(name); buffer.putInt(address.length); buffer.put(address); return buffer.array(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return new byte [0 ]; } public void close () { } }
设置自定义序列化属性:
1 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());
构造自定义对象消息:
1 2 Company company = Company.builder().name("myCompany" ).address("China" ).build(); ProducerRecord<String, Company> record = new ProducerRecord<String, Company>(topic, company);
分区器 ProducerRecord中没有指定partition字段,则依赖分区器,自定义分区器的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class DemoPartitioner implements Partitioner { private final AtomicInteger counter = new AtomicInteger(0 ); public int partition (String topic, Object key, byte [] keyBytes, Object value, byte [] valueBytes, Cluster cluster) { List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic); int numPartitions = partitionInfos.size(); if (keyBytes == null ) { return counter.getAndIncrement() % numPartitions; } else { return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } public void close () { } public void configure (Map<String, ?> map) { } }
设置分区器属性:
1 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DemoPartitioner.class.getName());
拦截器 自定义拦截器的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class ProducerInterceptorPrefix implements ProducerInterceptor <String , String > { private volatile long sendSuccess = 0 ; private volatile long sendFailure = 0 ; public ProducerRecord<String, String> onSend (ProducerRecord<String, String> record) { String modifiedValue = "prefix1-" + record.value(); return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(), record.key(), modifiedValue, record.headers()); } public void onAcknowledgement (RecordMetadata recordMetadata, Exception e) { if (e == null ) sendSuccess++; else sendFailure++; } public void close () { double successRatio = (double ) sendSuccess / (sendSuccess + sendFailure); System.out.println("发送成功率=" + String.format("%f" , successRatio % 100 ) + "%" ); } public void configure (Map<String, ?> map) { } }
设置拦截器属性,可以指定多个拦截器,多个拦截器类名用英文逗号分隔,按指定的顺序执行拦截器链:
1 properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class.getName());
onSend:在消息序列化和计算分区之前,调用此方法对消息进行定制化操作;
onAcknowlegement:在消息被应答之前或消息放送失败调用此方法,优先于用户的Callback之前执行;
close:在关闭拦截器时执行一些资源的清理工作;
顺序:拦截器->序列化->分区器
重要的生产者参数
参数名
默认值
说明
acks
“1”
分区中必须要有多少副本收到消息,生产者才认为消息成功写入,”1”表示只需leader写入成功
max.request.size
1048576(1MB)
限制生产者客户端能发送的消息的最大值
retries
0
生产者发送消息的重试次数
retry.backoff.ms
100
两次重试之间的时间间隔,避免无效的频繁重试
compression.type
“none”
消息的压缩方式,可选值有”gzip”,”snappy”,”lz4”
connections.max.idle.ms
540000(9分钟)
指定在多久之后关闭限制的连接
linger.ms
0
生产者发送ProducerBatch之前等待更多消息(ProducerRecord)加入ProducerBatch的时间
receive.buffer.bytes
32768(32KB)
Socket接收消息缓冲区(SO_RECBUF)的大小,-1则使用操作系统默认值
send.buffer.bytes
131072(128KB)
Socket发送消息缓冲区(SO_SNDBUF)的大小,-1则使用操作系统默认值
request.timeout.ms
30000
生产者等待请求响应的最长时间
bootstrap.servers
“”
连接Kafka集群所需的broker地址清单
key.serializer
“”
消息中key对应的序列化类,需实现Serializer接口
value.serializer
“”
消息中value对应的序列化类,需实现Serializer接口
buffer.memory
33554432(32MB)
生产者客户端中用于缓存消息的缓冲区大小
batch.size
16384(16KB)
指定ProducerBatch可以复用内存区域的大小
client.id
“”
设定KafkaProducer对应的客户端id
max.block.ms
60000
控制KafkaProducer中send()方法和partitionsFor()方法的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞
partitioner.class
DefaultPartitioner
用来指定分区器,需实现Partitioner接口
enable.idempotence
false
是否开启幂等性功能
interceptor.classes
“”
用来指定生产者拦截器,需实现ProducerInterceptor接口
max.in.flight.requests.per.connection
5
限制每个连接(客户端与Node之前的连接)最多缓存的请求数
metadata.max.age.ms
300000(5分钟)
如果在该时间内元数据没有更新的话会被强制更新
transactional.id
null
设置事务id,必须唯一
消费者 点对点消费:多个消费者指定同一个消费组id,主题的每个分区均衡分配到一个消费者,每条消息只会被一个消费者处理;
广播消费:每个消费者指定不同的消费组id,主题的消息会广播给每个消费者,每条消息会被所有消费者处理;
必要参数配置 1 2 3 4 5 6 7 8 9 10 11 12 13 public static final String brokerList = "192.168.32.128:9092" ;public static final String topic = "topic-demo" ;public static final String groupId = "group.demo" ;public static Properties initConfig () { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo" ); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return properties; }
订阅主题与分区 订阅 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static final AtomicBoolean isRunning = new AtomicBoolean(true );public static void main (String[] args) { Properties props = initConfig(); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topic)); try { while (isRunning.get()) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 )); for (ConsumerRecord<String, String> record : records) { System.out.println("topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset()); System.out.println("key=" + record.key() + ", value=" + record.value()); } } } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } }
1.订阅主题集合
1 consumer.subscribe(Arrays.asList(topic));
2.订阅与正则表达式匹配的主题。如果有新的主题被创建并且名字和正则表达式匹配,那么这个消费者可以消费新添加的主题的消息
1 consumer.subscribe(Pattern.compile("topic.*" ));
3.订阅主题指定分区,只订阅主题中分区编号为0的消息:
1 consumer.assign(Arrays.asList(new TopicPartition(topic, 0 )));
使用该方法需要事先知道主题中有多少个分区,可通过partitionsFor方法查询主题的元数据信息
1 List<PartitionInfo> partitionsFor (String topic) ;
取消订阅 执行取消订阅方法,或者订阅参数传递空集合:
1 2 3 consumer.unsubscribe(); consumer.subscribe(new ArrayList<String>()); consumer.assign(new ArrayList<TopicPartition>());
订阅状态
AUTO_TOPICS:集合订阅方式subscribe(Collection)
AUTO_PATTERN:正则表达式订阅方式subscribe(Pattern)
USER_ASSIGNED:指定分区的订阅方式assign(Collection)
NONE:没有订阅
一个消费者只能使用其中一种订阅方式,否则会报出IllegalStateException异常,三种订阅状态是互斥的。
反序列化 自定义反序列化的实现,对应于生产者中的CompanySerializer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class CompanyDeserializer implements Deserializer <Company > { @Override public void configure (Map<String, ?> configs, boolean isKey) { } @Override public Company deserialize (String topic, byte [] data) { if (data == null ) { return null ; } if (data.length < 8 ) { throw new SerializationException("Size of data received is shorter than expected!" ); } ByteBuffer buffer = ByteBuffer.wrap(data); int nameLen, addressLen; String name, address; nameLen = buffer.getInt(); byte [] nameBytes = new byte [nameLen]; buffer.get(nameBytes); addressLen = buffer.getInt(); byte [] addressBytes = new byte [addressLen]; buffer.get(addressBytes); try { name = new String(nameBytes, "UTF-8" ); address = new String(addressBytes, "UTF-8" ); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error occur when deserialize!" ); } return new Company(name, address); } @Override public void close () { } }
设置自定义序列化属性:
1 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());
更常见的做法是使用通用的工具来实现序列化和反序列化,例如Protostuff工具
引入依赖
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > io.protostuff</groupId > <artifactId > protostuff-core</artifactId > <version > 1.5.4</version > </dependency > <dependency > <groupId > io.protostuff</groupId > <artifactId > protostuff-runtime</artifactId > <version > 1.5.4</version > </dependency >
序列化操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public byte [] serialize(String s, Company data) { if (data == null ) { return null ; } Schema schema = RuntimeSchema.getSchema(data.getClass()); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); byte [] protostuff = null ; try { protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally { buffer.clear(); } return protostuff; }
反序列化操作:
1 2 3 4 5 6 7 8 9 10 @Override public Company deserialize (String topic, byte [] data) { if (data == null ) { return null ; } Schema<Company> schema = RuntimeSchema.getSchema(Company.class); Company company = new Company(); ProtostuffIOUtil.mergeFrom(data, company, schema); return company; }
消息消费 按分区维度消费:
1 2 3 4 5 6 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 )); for (TopicPartition tp : records.partitions()) { for (ConsumerRecord<String, String> record : records.records(tp)) { System.out.println(record.partition() + " : " + record.value()); } }
按主题维度消费:
1 2 3 4 5 for (String topic : Arrays.asList("topic1" , "topic2" )) { for (ConsumerRecord<String, String> record : records.records(topic)) { System.out.println(record.topic() + " : " + record.value()); } }
位移提交 自动提交 kafka默认的消费位移的提交方式是自动提交,消费者每隔5秒会将拉取到的每个分区最大的消息位移进行提交,由消费者客户端参数配置:
1 2 enable.auto.commit =true auto.commit.interval.ms =5000
手动提交 关闭自动提交配置
1 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false );
同步提交,提交poll获取的最大消息位移
1 2 3 4 5 6 7 8 9 10 11 12 final int minBatchSize = 200 ; List<ConsumerRecord> buffer = new ArrayList<>();while (isRunning.get()) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 )); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { consumer.commitSync(); buffer.clear(); } }
同步提交带参数,提交主题分区的指定位移
1 2 3 long offset = record.offset(); TopicPartition partition = new TopicPartition(record.topic(), record.partition()); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1 )));
按分区粒度同步提交消费位移:
1 2 3 4 5 6 7 8 9 10 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 ));for (TopicPartition tp : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(tp); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.partition() + " : " + record.value()); } long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1 ).offset(); consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastConsumedOffset + 1 ))); } }
异步提交,指定回调函数
1 2 3 4 5 6 7 8 9 10 11 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete (Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception==null ){ System.out.println(offsets); } else { exception.printStackTrace(); } } });
控制或关闭消费 暂停和恢复消费
1 2 public void pause (Collections<TopicPartition> partitions) public void resume (Collections<TopicPartition> partitions)
获取被暂停的分区集合
1 public Set<TopicPartition> paused ()
利用wakeup方法从poll中返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 try { while (isRunning.get()) { consumer.poll(); ... } } catch (WakeupException e){ }catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } ... consumer.wakeup();
指定位移消费 当消费者找不到记录的消费位移时(比如新加入的消费者),会根据auto.offset.reset配置决定从何处开始进行消费:
latest(默认值):表示从分区末尾开始消费;
earliest:表示从起始(0)开始消费;
none:查找不到消费位移的时候,抛出NoOffsetForPartitionException异常;
seek方法可以指定从分区的哪个位置开始消费,执行seek()方法之前必须先执行一次poll()方法,因为只能重置消费者分配到的分区的消费位置,而分区的分配是在poll方法中实现的(poll时间过短也不行,有可能还没分配到分区就返回了)。
1 2 3 4 5 6 7 8 9 10 11 12 13 consumer.subscribe(Arrays.asList(topic)); Set<TopicPartition> assignment = new HashSet<>();while (assignment.size() == 0 ) { consumer.poll(Duration.ofMillis(100 )); assignment = consumer.assignment(); }for (TopicPartition tp : assignment) { consumer.seek(tp, 10 ); }while (isRunning.get()) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 )); ... }
还可以指定从分区末尾开始消费,先通过endOffsets方法获取到分区末尾的消息位置
1 2 3 4 Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);for (TopicPartition tp : assignment) { consumer.seek(tp, offsets.get(tp)); }
也可以直接seek到分区消息的开头和结尾
1 2 public void seekToBeginning (Collection<TopicPartition> partitions) public void seekToEnd (Collection<TopicPartition> partitions)
可通过时间戳调用offsetsForTimes方法查询消息位移,该方法会返回时间戳大于等于 待查询时间的第一条消息对应的位置和时间戳
1 2 3 4 5 6 7 8 9 10 11 Map<TopicPartition, Long> timestampToSearch = new HashMap<>();for (TopicPartition tp : assignment) { timestampToSearch.put(tp, System.currentTimeMillis() - 24 * 3600 * 1000 ); } Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);for (TopicPartition tp : assignment) { OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp); if (offsetAndTimestamp != null ) { consumer.seek(tp, offsetAndTimestamp.offset()); } }
再均衡 再均衡是指分区的所属权从一个消费者转移到另一个消费者的行为,消费者订阅主题时可以指定再均衡的自定义行为:
1 2 3 4 5 6 7 8 9 10 consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked (Collection<TopicPartition> partitions) { } @Override public void onPartitionsAssigned (Collection<TopicPartition> partitions) { } });
消费者拦截器 自定义拦截器的实现,如果消息的时间戳与当前时间相差10秒则判定过期,不投递给具体消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class ConsumerInterceptorTTL implements ConsumerInterceptor <String , String > { private static final long EXPIRE_INTERVAL = 10 * 1000 ; @Override public ConsumerRecords<String, String> onConsume (ConsumerRecords<String, String> records) { long now = System.currentTimeMillis(); Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>(); for (TopicPartition tp : records.partitions()) { List<ConsumerRecord<String, String>> tpRecords = records.records(tp); List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>(); for (ConsumerRecord<String, String> record : tpRecords) { if (now - record.timestamp() < EXPIRE_INTERVAL) { newTpRecords.add(record); } } if (!newTpRecords.isEmpty()) { newRecords.put(tp, newTpRecords); } } return new ConsumerRecords<>(newRecords); } @Override public void onCommit (Map<TopicPartition, OffsetAndMetadata> offsets) { offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset())); } @Override public void close () { } @Override public void configure (Map<String, ?> configs) { } }
onConsume:再poll()方法返回之前,调用此方法对消息进行定制化操作;
onCommit:提交完消费位移之后调用该方法,可用来记录跟踪所提交的位移信息;
close:在关闭拦截器时执行一些资源的清理工作;
多线程实现 KafkaProducer是线程安全的,但KafkaConsumer是非线程安全的,如果有多个线程操作同一个KafkaConsumer对象,会抛出异常。
示例一:每个线程对应一个消费者,每个消费者负责拉取自己的消息并处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public static void main (String[] args) { Properties properties = initConfig(); int consumerThreadNum = 4 ; for (int i = 0 ; i < consumerThreadNum; i++) { new KafkaConsumerThread(properties, topic).start(); } }public class KafkaConsumerThread extends Thread { private KafkaConsumer<String, String> kafkaConsumer; public KafkaConsumerThread (Properties properties, String topic) { this .kafkaConsumer = new KafkaConsumer<>(properties); this .kafkaConsumer.subscribe(Arrays.asList(topic)); } @Override public void run () { try { while (true ) { ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100 )); for (ConsumerRecord<String, String> record : records) { } } } catch (Exception e) { e.printStackTrace(); } finally { kafkaConsumer.close(); } } }
示例二:一个消费线程负责拉取所有消息,线程池负责处理消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );public static void main (String[] args) { Properties properties = initConfig(); KafkaConsumerThread consumerThread = new KafkaConsumerThread(properties, topic, Runtime.getRuntime().availableProcessors()); consumerThread.start(); }public class KafkaConsumerThread extends Thread { private KafkaConsumer<String, String> kafkaConsumer; private ExecutorService executorService; private int threadNumber; private Map<TopicPartition, OffsetAndMetadata> offsets; public KafkaConsumerThread (Properties properties, String topic, int threadNumber) { kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Collections.singletonList(topic)); offsets = new HashMap<>(); this .threadNumber = threadNumber; executorService = new ThreadPoolExecutor(threadNumber, threadNumber, 0L , TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000 ), new ThreadPoolExecutor.CallerRunsPolicy()); } @Override public void run () { try { while (true ) { ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100 )); if (!records.isEmpty()) { executorService.submit(new RecordsHandler(records, offsets)); synchronized (offsets) { if (!offsets.isEmpty()) { kafkaConsumer.commitSync(offsets); offsets.clear(); } } } } } catch (Exception e) { e.printStackTrace(); } finally { kafkaConsumer.close(); } } }public class RecordsHandler extends Thread { public final ConsumerRecords<String, String> records; private Map<TopicPartition, OffsetAndMetadata> offsets; public RecordsHandler (ConsumerRecords<String, String> records, Map<TopicPartition, OffsetAndMetadata> offsets) { this .records = records; this .offsets = offsets; } @Override public void run () { for (TopicPartition tp : records.partitions()) { List<ConsumerRecord<String, String>> tpRecords = records.records(tp); for (ConsumerRecord<String, String> record : records) { } long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1 ).offset(); synchronized (offsets) { if (!offsets.containsKey(tp)) { offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1 )); } else { long position = offsets.get(tp).offset(); if (position < lastConsumedOffset + 1 ) { offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1 )); } } } } } }
注意ThreadPoolExecutor线程池设置的CallerRunsPolicy策略,表示当线程池的总体消费能力跟不上poll拉取能力时,由调用线程来进行消费消息。
此方法中位移提交的方式存在数据丢失的风险,如果一个线程正在处理offset为0-99的消息,另一个线程已经处理完offset为100-199的消息并进行了位移提交,而线程1发生了异常,但之后的消费会从200开始。如果保存小的位移,则又存在重复消费的问题。更好的方式是基于滑动窗口实现。
重要的消费者参数
参数名
默认值
说明
fetch.min.bytes
1
消费者一次拉取请求poll方法中能拉取的最小数据量
fetch.max.bytes
52428800(50MB)
消费者一次拉取请求poll方法中能拉取的最大数据量
fetch.max.wait.ms
500
如果没有足够多的消息满足不了fetch.min.bytes的数据量,最多等待时间
max.partition.fetch.bytes
1048576(1MB)
消费者一次拉取中每个分区 返回的最大数据量
max.poll.records
500
消费者一次拉取的最大消息数
connections.max.idle.ms
540000(9分钟)
指定在多久之后关闭限制的连接
exclude.internal.topics
true
内部主题__consumer_offsets和__transaction_state是否向消费者公开 设置为true不能使用正则表达式的方式订阅内部主题
receive.buffer.bytes
65536(64KB)
Socket接收消息缓冲区(SO_RECBUF)的大小,-1则使用操作系统默认值
send.buffer.bytes
131072(128KB)
Socket发送消息缓冲区(SO_SNDBUF)的大小,-1则使用操作系统默认值
request.timeout.ms
30000
消费者等待请求响应的最长时间
metadata.max.age.ms
300000(5分钟)
如果在该时间内元数据没有更新的话会被强制更新
reconnect.backoff.ms
50
消费者尝试重新连接指定主机之前的等待(退避)时间,避免频繁连接主机
retry.backoff.ms
100
消费者尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间
isolation.level
“read_uncommitted”
配置read_committed会忽略事务未提交的消息,只能消费到LSO(LastStableOffset)位置 配置read_uncommitted可以消费到HW(High Watermark)位置
bootstrap.servers
“”
连接Kafka集群所需的broker地址清单
key.deserializer
“”
消息中key对应的反序列化类,需实现Deserializer接口
value.deserializer
“”
消息中value对应的反序列化类,需实现Deserializer接口
client.id
“”
设定KafkaProducer对应的客户端id
group.id
“”
消费者所隶属的消费组唯一标识
heartbeat.interval.ms
3000
当使用Kafka的分组管理功能时,心跳到消费者协调器之间的预计时间
session.timeout.ms
10000
组管理协议中用来检测消费者是否失效的超时时间
max.poll.interval.ms
300000
指定拉取消息线程最长空闲时间,若超过这个时间还没有发起poll操作,则消费组认为该消费者已离开了消费组,将进行再均衡操作
auto.offset.reset
latest
消费者找不到记录的消费位移时从何处开始消费,earliest(开头),latest(结尾),none(报错)
enable.auto.commit
true
是否开启自动提交消费位移的功能
auto.commit.interval.ms
5000
开启自动提交消费位移时自动提交消费位移的时间间隔
partition.assignment.strategy
RangeAssignor
消费者的分区分配策略
interceptor.class
“”
配置客户端拦截器,需实现ConsumerInterceptor接口
主题管理 如果broker端配置参数auto.create.topics.enable设置为true(默认为true),那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。
除此之外,当一个消费者从未知主题读取消息,或者当任意一个客户端向未知主题发送元数据请求时,都会创建一个相应的主题。
很多时候,这种自动创建主题的行为都是非预期的,不建议将auto.create.topics.enable设置为true,会增加主题的管理与维护的难度。
创建主题 创建一个分区数为4,副本因子为2,主题名为topic-create的主题
1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-create --replication-factor 2 --partitions 4
手动指定分区副本的分配方案,replica-assignment参数指定了创建4个分区(以逗号分隔),每个分区有2个副本以及副本分配情况(以冒号分隔broker_id,排在前面的broker_id为leader)
1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-create-same --replica-assignment 2:0,0:1,1:2,2:1
覆盖默认配置,修改了主题端的2个配置参数
1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-config --replication-factor 1 --partitions 1 --config cleanup.policy=compact --config max.message.bytes=10000
主题的命名不推荐(虽然可以)使用__开头,因为以__开头的主题一般为kafka内部主题。主题的名称必须由大小写字母、数字、点号.、连接线-、下划线_组成,不能为空,不能只有.,也不能只有..,且长度不能超过249。
查看主题 1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create,topic-config
describe可以额外指定参数来增加一些附加功能:
topics-with-overrides:会列出包含了覆盖配置的主题;
1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topics-with-overrides
under-replicated-partitions:查找主题中所有包含失效副本的分区,即ISR小于AR的分区;
1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create --under-replicated-partitions
unavailable-partitions:查找主题中没有leader副本的分区,即这些分区已处于离线状态,对外界来说不可用;
1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create --unavailable-partitions
修改主题 修改分区个数,将分区个数调整为3。注意只能增加分区数,不能减少分区数
1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic topic-config --partitions 3
修改主题的配置
1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic topic-config --config segment.bytes=1045877
删除主题的自定义配置,使其恢复为默认值
1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic topic-config --delete-config segment.bytes --delete-config cleanup.policy
删除主题 1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic topic-delete
参数说明 主题管理脚本kafka-topics.sh的更多参数说明:
参数名
说明
alter
用于修改主题,包括分区数及主题的配置
config <键值对>
创建或修改主题时,用于设置主题级别的参数
create
创建主题
delete
删除主题
delete-config <配置名称>
删除主题级别被覆盖的配置
describe
查看主题的详细信息
disable-rack-aware
创建主题时不考虑机架信息
help
打印帮助信息
if-exists
修改或删除主题时使用,只有当主题存在时才会执行动作
if-not-exists
创建主题时使用,只有主题不存在时才会执行动作
list
列出所有可用的主题
partitions <分区数>
创建主题或增加分区时指定分区数
replica-assignment <分配方案>
手工指定分区副本分配方案
replication-factor <副本数>
创建主题时指定副本因子
topic <主题名称>
指定主题名称
topics-with-overrides
使用describe查看主题信息时,只展示包含覆盖配置的主题
unavailable-partitions
使用describe查看主题信息时,只展示包含没有leader副本的分区
under-replicated-partitions
使用describe查看主题信息时,只展示包含失效副本的分区
bootstrap-server
指定连接的broker地址信息
配置管理 kafka-configs.sh脚本专门用来对配置进行操作的。它不仅可支持操作主题相关的配置,还可以支持操作broker、用户和客户端这些类型的配置。
查看配置参数
1 bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name topic-config
添加/修改配置参数
1 bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name topic-config --add-config cleanup.policy=compact,max.message.bytes=10000
删除配置
1 bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name topic-config --delete-config cleanup.policy,max.message.bytes
配置对象映射
配置类型
entity-type取值
entity-name取值
主题
topics
主题名称
broker
brokers
broker中broker.id参数配置的值
客户端
clients
生产者或消费者client.id参数配置的值
用户
users
用户名
主题配置说明 创建主题时,若没有指定配置参数,则会使用broker端对应参数作为其默认值。部分主题配置参数对照关系如下:
主题端参数
对应的broker端参数
说明
cleanup.policy
log.cleanup.policy
日志压缩策略。默认值为delete,还可以配置为compact
delete.retention.ms
log.cleaner.delete.retention.ms
被标识为删除的数据能够保留多久,默认值为86400000(1天)
max.message.bytes
message.max.bytes
消息的最大字节数,默认值为1000012
message.timestamp.type
log.message.timestamp.type
消息的时间戳类型,默认值为CreateTime,还可以配置为LogAppendTime
使用客户端操作主题 除了利用脚本来管理主题,也可以在JAVA代码中使用AdminClient对象来管理主题。
创建一个分区数为4,副本因子为1的主题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Properties properties = new Properties(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000 ); AdminClient client = AdminClient.create(properties); Map<Integer, List<Integer>> replicasAssignments = new HashMap<>(); replicasAssignments.put(0 , Arrays.asList(0 )); replicasAssignments.put(1 , Arrays.asList(0 )); replicasAssignments.put(2 , Arrays.asList(0 )); replicasAssignments.put(3 , Arrays.asList(0 )); NewTopic newTopic = new NewTopic(topic, replicasAssign CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));try { result.all().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } client.close();
覆盖主题默认配置:
1 2 3 4 NewTopic newTopic = new NewTopic(topic, replicasAssignments); Map<String, String> configs = new HashMap<>(); configs.put("cleanup.policy" , "compact" ); newTopic.configs(configs);
查询主题的所有配置信息:
1 2 3 4 5 6 7 8 9 10 ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic); DescribeConfigsResult result = client.describeConfigs(Collections.singleton(resource));try { Config config = result.all().get().get(resource); System.out.println(config); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
修改主题配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic); ConfigEntry entry = new ConfigEntry("cleanup.policy" , "compact" ); Config config = new Config(Collections.singleton(entry)); Map<ConfigResource, Config> configs = new HashMap<>(); configs.put(resource, config); AlterConfigsResult result = client.alterConfigs(configs);try { result.all().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
增加分区数:
1 2 3 4 5 6 7 8 9 10 11 NewPartitions newPartitions = NewPartitions.increaseTo(5 ); Map<String, NewPartitions> newPartitionsMap = new HashMap<>(); newPartitionsMap.put(topic, newPartitions); CreatePartitionsResult result = client.createPartitions(newPartitionsMap);try { result.all().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
主题的合法性 我们一般禁止客户端直接创建主题,不利于维护,但是AdminClient却可以直接创建。kafka broker端有一个参数create.topic.policy.class.name默认为null,它提供了一个入口用来验证主题创建的合法性。可以自定义一个实现CreateTopicPolicy接口的类,然后在broker配置文件config/server.properties中设置这个参数为我们自定义的类。这个类运行在服务端,打个jar包扔到classpath里面。
限制创建主题的分区数和副本因子,如不满足则创建失败:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class PolicyDemo implements CreateTopicPolicy { @Override public void validate (RequestMetadata requestMetadata) throws PolicyViolationException { if (requestMetadata.numPartitions() != null || requestMetadata.replicationFactor() != null ) { if (requestMetadata.numPartitions() < 5 ) { throw new PolicyViolationException("Topic should have at least 5 partitions, received: " + requestMetadata.numPartitions()); } if (requestMetadata.replicationFactor() <= 1 ) { throw new PolicyViolationException("Topic should have at least 2 replication factor, received: " + requestMetadata.replicationFactor()); } } } @Override public void close () throws Exception { } @Override public void configure (Map<String, ?> configs) { } }
分区管理 优先副本的选举 Kafka需要确保所有主题的优先副本在集群中均匀分布,这要就保证了所有分区的leader均衡分布。如果leader分布过于集中,就会造成集群负载不均衡。如果某个broker节点宕机,就会有其他节点的follow副本成为leader,导致分区副本分布不均衡。即使该节点恢复,它原先的leader副本也会成为follow而不再对外提供服务。
所谓优先副本选举是指通过一定方式促使优先副本选举为leader副本,以此来促进集群的负载均衡,这一行为也可以称为“分区平衡”。
Kafka提供了分区自动平衡的功能,与此对应的broker端参数是auto.leader.rebalance.enable,默认值为true,即默认开启。开启此功能,Kafka控制器会启动一个定时任务,轮询时间由参数leader.imbalance.check.interval.seconds控制,默认值300秒。如果不平衡率超过leader.imbalance.per.broker.percetage设置的比值,默认值10%,则触发优先副本选举动作以求分区平衡。
但生产环境一般会关闭该参数,因为可能在不确定的时间发生分区平衡,导致业务阻塞。
手动执行分区平衡:
1 bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --path-to-json-file election.json
其中election.json指定了要平衡的主题分区:
1 2 3 4 5 6 7 8 9 10 11 { "partitions" : [{ "topic" : "foo" , "partition" : 1 }, { "topic" : "foobar" , "partition" : 2 } ] }
分区重分配 有时候,当我们需要下线或新增broker节点,为了保证分区及副本的合理分配,则需要将副本迁移到其他节点上。这就用到了分区重分配的功能,Kafka提供了kafka-reassign-partitions.sh脚本来实现该功能。
假如我们在一个由3个broker节点组成的集群中存在一个分区数为4,副本因子为2的主题topic-reassign,现在要移除节点broker1,让主题分区重分配。
步骤一 ,生成重分配方案:
1 bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --generate --topics-to-move-json-file reassign.json --broker-list 0,2
reassign.json文件指定了要分区重分配的主题名
1 2 3 4 5 6 { "topics" : [{ "topic" : "topic-reassign" }], "version" : 1 }
该脚本会输出2段json内容,分别表示当前分区分配方案和目标分区分配方案。前者可保存起来用于备份恢复,后者用于步骤二执行,将该json保存为project.json文件。
步骤二,执行分区重分配:
1 bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file project.json
再次查看主题,可发现分区已分布在指定节点上了,如果此时leader分区失衡,还可以执行优先副本选举操作进行调整。
将execute替换为vertify,即可查看分区重分配的进度。
1 bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --vertify --reassignment-json-file project.json
分区重分配本质在于数据复制,先增加新的副本,然后进行数据同步,最后删除旧的副本来达到最终的目的。
复制限流 副本间的复制会占用额外资源,可以对复制流量加以限制来保证重分配期间不会对集群服务造成太大影响。
broker级别有2个配置参数:
leader.replication.throttled.rate:设置leader副本传输的速度
follower.replication.throttled.rate:设置follower副本复制的速度
二者一般设置为同样的值,单位都是B/s
1 bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config follower.replication.throttled.rate=1024,leader.replication.throttled.rate=1024
在主题级别可以设置具体的被限速的副本列表,对于一个分区数为3,副本因子为2的主题topic-throttle,它的分区对应的broker_id分别为0:[0,1],1:[1,2],2:[2,0]
1 bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name topic-throttle --alter --add-config leader.replication.throttled.replicas=[0 :0 ,1 :1 ,2 :2 ],follower.replication.throttled.replicas=[0 :1 ,1 :2 ,2 :0 ]
参数的值是一个分区副本列表[分区0:broker_id,分区1:broker_id..]。
可以根据需求事先配置好这4个参数,再进行分区重分配,在数据复制时进行限流。
修改副本因子 利用分区重分配的kafka-reassign-partitions.sh文件,也可以对副本因子进行调整,可以添加也可以减少。手动修改project.json文件,在replicas和log_dirs属性中,分别加入1和any,表示在分区0中新加入一个副本存储在broker1中,它的日志文件不做要求,所以填any。
1 2 3 4 5 6 7 8 9 { "partitions" : [{ "topic" : "topic-throttle" , "partition" : 0 , "replicas" : [2 , 1 , 0 ], "log_dirs" : ["any" , "any" , "any" ] }], "version" : 1 }