# 说明
消息任务: 调用一次消息中心的消息发送接口为一个消息任务
消息记录: 一个消息任务可发送给多个人,对应到个人的记录为一个消息记录
topic:主题,主题下可有多个分区
partition:分区;每个分区都会划分到一个消费者,一个分区只可被同一消费组下其中一个消费者订阅,其他消费者再订阅将不会接收到消息。
Group:消费组,消费组下可有多个消费者;一个消费组对应一个业务逻辑
Consumer:消费者,可以订阅多个partition
# 消息中心
# kafka topic&partition
目前topic分为以下几种;
分区数 :5种topic partition 比例为1:1:1:1:1,每个topic partition 数量设置区间为 5 ~ 30
# 高可用部署
kafka集群节点数:3、5、7……单数
Partition副本数 : 3、5、7……单数;副本数不得超过Kafka集群节点数
# 生产者
生产者配置(kafka)
- acks 本设计默认设置为all, acks为all时,kafka集群某个节点挂了,新产生的partition leader也能保证数据不丢失,无重复消费;acks 0、all 的效率比是 10:1;关于acks参数的说明如下:
- acks=0,生产者不需要等待服务器的响应,以网络能支持的最大速度发送消息,吞吐量高,但是如果broker没有收到消息,生产者是不知道的
- acks=1,leader partition 副本收到消息,生产者就会收到一个来自服务器的成功响应
- acks=all,所有的partition 副本都收到消息,生产者才会收到一个服务器的成功响应
- retires & retry.backoff.ms
- retires 重发次数 1 ~ 2,主要因为网络波动或者新partition leader产生的空窗期导致的发送失败;其他异常另行处理
- retry.backoff.ms 重发间隔时间,设置 300 ,减少无效的重试
- max.request.size 请求的最大字节数,使用默认值 1048576;最大值取决于 Broker message.max.bytes 设置
- buffer.memory Producer用来缓冲等待被发送到服务器的记录的总字节数,值区间 33554432 ~ 67108864。 缓冲超过字节,Producer 就会阻塞,如果阻塞的时间超过 max.block.ms 配置的时长,则会抛出一个异常。
- max.block.ms 该配置控制 KafkaProducer.send()和KafkaProducer.partitionsFor() 允许被阻塞的时长。这些方法可能因为缓冲区满了或者元数据不可用而被阻塞。用户提供的序列化程序或分区程序的阻塞将不会被计算到这个超时。
- batch.size 多个请求整合为一个批处理,设置请求的字节数,值区间 16384 ~ 163840
- linger.ms 一个时间间隔内的多个请求,合并为一个批处理,值区间 1 ~ 10;设置10ms 意味着有些消息会增加10ms延迟,但请求减少且负载降低
消息推送到Kafka
一个消息请求过来,根据接收对象数量对应转换成多条记录,由生产者推送到消息队列中;除了成功推送到消息队列之外,还可能存在以下几种失败情况:
kafka临时性的错误
- 由于网络波动或者新的partition leader 产生等原因导致的推送失败;这类失败数据通过生产者配置retires去重发,如果依旧失败则记录日志,等待定时服务刷失败记录去重发;当超过策略设置的失败重发次数后,就不会再重发。
kafka非临时性错误
- 与生产者、Broker 配置冲突的错误,此类信息直接记录消息记录,不再重发;例如:消息内容过大
消息中心业务异常
- 记录消息记录;例如:张三没有手机号,不会发送短信给他,失败记录也设置不必再重发,手机号码信息补充完之后,可以手动触发重发。
# 消费者
消费者配置(kafka)
- bootstrap.servers 启动consumer时配置的broker地址的
- group.id 设置消费者所属消费组
- heartbeat.interval.ms 心跳间隔,确定 consumer 的存活、加入和退出 ;设置值区间 3000 ~ 6000
- fetch.max.bytes 一次拉取请求中从Kafka中拉取的最大数据量;设置区间值 52428800 ~ 104857600
- fetch.min.bytes 一次拉取请求中从Kafka中拉取的最小数据量;使用默认
- fetch.max.wait.ms consumer最多等待response时间;使用默认
- max.partition.fetch.bytes partition返回给 Consumer 的最大数据量 ;使用默认 1M
- max.poll.records Consumer每次调用poll()时取到的records的最大数;设置值区间 100 ~ 1000
- max.poll.interval.ms 两次poll()调用之间的最大延迟;设置值区间 300000 ~ 1800000
- auto.offset.reset earliest,自动重置到最早的offset
- enable.auto.commit 是否开启自动提交offset的功能;手动提交offset,调用commitSync()commit,最大程度上避免重复消费和消息遗漏
- auto.commit.interval.ms 自动提交offset的时间间隔;默认5000
- request.timeout.ms Consumer 等待请求响应的最长时间; 设置值区间 36000 ~ 305000
- session.timeout.ms 用于发现消费者故障的超时时间。消费者周期性的发送心跳到broker,表示其还活着。如果会话超时期满之前没有收到心跳,那么broker将从分组中移除消费者,并启动重新平衡。请注意,该值必须在broker配置的group.min.session.timeout.ms和group.max.session.timeout.ms允许的范围内。
线程池配置
- worknum 工作线程数,消费者对应线程池设定的线程数,设置值区间 10 ~ 50(一个实例总线程数建议不超过900)
offset提交
手动提交 offset 是poll()一次,提交一次offset。本设计采用手动提交的方式。
选择手动提交而非自动提交的原因:
自动提交 offset 则需要达到设定的时间间隔,才会提交一次offset,而在这个时间间隔内,Consumer 可能poll()了很多次。如果消息中心的节点挂掉了,那么在上一次自动提交到当前时间的范围内的所有消息记录都会重复消费;手动提交则是能控制重复消费的消息数量(这里我们采用的是每100条消息提交一次)。
关于已提交而未处理的消息的保证:
在上图可以看见,消息中心是先提交而后处理消息的,若是此时消息中心的节点节点挂了,就会出现遗漏处理消息的情况,在本设计中这些遗漏数据通过消息任务审计功能去发现并补发遗漏消息记录。
流程图
消费者
消费者启一个线程池,线程池中的工作线程处理业务逻辑,处理流程如下:
- Consumer poll 队列中records,条数根据 max.poll.records 设定
- records分发到线程池中工作线程(收到的消息数 / 线程池尺寸)
- 每个工作线程顺序处理得到的消息
- 记录消息处理结果
响应结果
成功 消息接收者响应发送成功
失败 消息接收者响应失败
无响应 消息接收者无响应
系统消息已读未读
发送给系统的消息,使用消息详情ID调用已读未读接口,修改消息已读未读状态; 根据消息任务ID查询该任务下所有消息记录的已读未读情况。
# 关于消息送达的保证
本设计中,对于消息送达的保证为至少一次。
消息的状态:
- 【成功】
- 【失败】
消息状态【失败】定义以下异常:
- 【消息中心业务异常】
- 【kafka临时错误】
- 【kafka非临时性错误】
- 【Consumer发送响应异常】
- 【未知】
消息发送过程的情况按顺序如下:
- Producer业务逻辑判断消息不应该发送,消息状态记录为【失败】,异常记录为【消息中心业务异常】
- Producer发送消息到kafka遇到的临时性错误,这种错误在Producer内部根据retries的配置重试。如果重试超过次数,则消息状态记录为【失败】,异常记录为【kafka临时错误】
- Producer发送消息到Kafka遇到的非临时性错误,消息状态记录为【失败】,异常记录为【kafka非临时性错误】
- Consumer 提交了offset但消息未处理,消息中心节点却挂了,那么消息无记录,需要消息任务审计功能补发
- Consumer的消息发送模块响应消息发送成功,消息状态记录为【成功】
- Consumer的消息发送模块响应失败,消息状态记录为【失败】,异常记录为【Consumer发送响应异常】
- 其他无法得到消息是否发送成功的情况,消息状态记录为【失败】,异常记录为【未知】
# 消息失败重发和遗漏补发
消息重发
举例说明:SMS消息发送失败,后续定时筛选出失败数据再由Producer重新推送到SMS topic,排到队列最后
消息重发策略
- 自动重发次数:默认5次,设置消息记录可重发次数;超过重发次数,不可再自动重发
- 自动重发初次时间间隔: 默认2分钟;自定义时间,分
- 自动重发时间间隔增长: 倍数增长,默认2倍(2、4、8、16、32);可自定义倍数
- 自动重发(根据失败异常)
- kafka临时错误
- Consumer发送响应异常
- 未知
消息任务审计
消息任务定时审计,通过消息任务记录,对比消息记录;筛选出遗漏消息记录,再由Producer补发遗漏消息。
消息任务审计策略
- 任务审计时间间隔: 默认30分钟;自定义时间,分,时,天
- 审计时间范围: 默认7天; 自定义时间,天,月,年
# 横向扩展
pritition数决定了横向扩展的实例数