kafka图解

一、QuickStart

kakfa图解

二、Kafka系统化梳理

Kafka 最早是由美国领英公司(下称 Linkedln )的工程师们研发的

linkedIn开源

分布式数据同步系统Databus

高性能计算引擎Cubert

Java异步处理框架ParSeq

Kafka流数据框架

2011年开源

2.1消息引擎系统

 

1.消息设计

Kafka 的消息是用 二进制 方式。

传统Web使用xml或者json来传输。

2.传输协议设计

目前使用 AMQP Web Service + SOAP 以及微软的 MSMQ为主流协议。

3.消息引擎范型

最常见的两种消息引擎范型是消息队列模型和发布/订阅模型。

消息队列 可以类比火车站买票。

pub/subscribe 可以类比报纸,新闻。

4. JMS

java提供的通用的消息接口API规范。

2.2 Kafka设计在解决什么问题

吞吐量/延时

消息持久化

负载均衡和故障转移

伸缩性

 

**吞吐量( throughput )在知识引擎中都是至关重要的性能指标

 

性能指标

性能指标 kafka rabbitmq
吞吐量    
延时    
     

吞吐量和延时是相互矛盾的组合体,即调优其中一 个指标通常会使另一指标变差 当然它们之间的关系也不是等比例地此消彼长的关系,比如牺牲 20% 的延时就能换20%的吞吐量的提升

Kafka 在设计时采用了追加写入消息的方式,即只能在日志文件末尾追加 写入新的消息,且不允许修改己写入的消息,因此它属于典型的磁盘顺序访问型操作,所以 Kafka 消息发送的吞吐量是很高的。 目的追求吞吐量。

1.kafka性能设计

零拷贝技术+大量使用操作系统页缓存,内存操作速度快且命中率高。

Kafka 不直接参与物理 1/0 操作,而是交由最擅长此事的操作系统来完成。

采用追加写入方式,摒弃了缓慢的磁盘随机读/写操作。

2.Kafka的持久化

消息重演---固定会议的创建。

Kafka 实现持久化的设计也有新颖之处。普通的系统在实现持久化时可能会先尽量 使用内存,当内存资源耗尽时,再 次性地把数据“刷盘”;而 Kafka 则反其道而行之, 有数据都会立即被写入文件系统的持久化日志中,之后 Kafka 服务器才会返回结果给客户端通 知它们消息已被成功写入。这样做既实时保存了数据,又减少了 Kafka 程序对于内存的消耗, 从而将节省出的内存留给页缓存使用,更进一步地提升了整体性能

3.负载均衡+心跳实现故障转移

4.伸缩性,依赖zookpeer

小结:

生产者发送-kafka-消费者消费

Kafka 服务器依托 ZooKeeper 集群进行服务的协调管理

2.3 kafka升级之路

兼并了部分下游处理工作-->流处理框架

kafka stream

元数据格式

• Key:消息键,对消息做 partition 时使用,即决定消息被保存在某 topic 下的哪个 partition • Value :消息体,保存实际的消息数 • Timestamp :消息发送时间戳,用于流式处理及其他依赖时间的处理语义。如果不指定 则取当前时间

消息

大量使用页缓存而非堆内存还有一个好处一一当出现 Kafka broker 进程崩溃时,堆 内存上的数据也一并消失,但页缓存的数据依然存在。下次 Kafka broker 重启后可以继续提供 服务,不需要再单独“热”缓存了。

2.4 术语

2.41 broker

Kafka 服务器有 个官方名字: broker

2.42 topic和partition

 

Kafka partition 实际上并没有太多的业务含义,它的引入就是单纯地为 了提升系统的吞吐量,因此在创建 Kafka topic 的时候可以根据集群实际配置设置具体的 partition 数, 实现整体性能的最大。

2.43 offset

topic partition 下的每条消息都被分配一个位移值。实际上 Kafka 消费者端也 有位移( offset )的概念,但 定要注意这两个 offset 属于不同的概念

2.44 ​唯一组合查询标识--kakfa三元组

kafka中的一条消息就是一个<topic,partition,offset> 三元组(tuple),能够作为唯一的索引查询消息。

2.45 replica

己知 partition 是有序消息日志,那么一定不能只保存这一份日志,否则一旦保partition Kafka 服务器挂掉了,其上保存的消息也就都丢失了。分布式系统必然要实现高可靠性,而目前实现的主要途径还是依靠冗余机制一一简单地说,就是备份多份日志。

副本分为两类 领导者副本( leader replica )和追随者副本( follower replica follower)

leader机制

Kafka 保证同 partition 的多个 replica 一定不会分配在同一台 broker 。毕竟如果同一broker 上有同一个 partition 的多个 replica ,那么将无法实现备份冗余的效果。

isr,am,leo,lag

2.46 ISR

ISR 全称是 in-sync replica

“己提交”状态的消息就不会丢失 ---维护kafka 副本的repla 确认所有副本都同步了已提交状态。

3. kafka的应用场景

主要是 实时流数据、实时流数据管道

为什么呢?官网说明:

 

场景  
消息传输  
网站行为日志追踪  
审计数据收集  
日志收集 日志收集汇总解决方案。每个企业都会产生大量的服务日志,这些日志分散在不同的机器上 我们可以使用 fka 对它们进行全 收集,井集中送往下游的分布式存储中(比如 等)
Event Sourcing Event Sourcing 实际上是领域驱动设计(OOD)思想。
流式处理  

三、Kafak 版本

 

Kafka 0.9 0.0 版本不仅废弃了旧版本 producer ,还提供了新版本的 consumer 同样地,新 版本 consumer 也是使用 Java 编写的,也不再需要依赖 ZooKeeper 的帮助 新版本 consumer 的入口 org.apache.kafka.clients.consumer.KafkaConsumer 由

旧版本 consumer 中,消费位移( offset )的保存与 理都是依托于 ZooKeeper 来完成的 数据 很大且消费很频繁时, ZooKeeper 的读 写性能往往容易成为系统瓶颈

版本选择:

功能场景:

流式处理 0.10.0+

安全 0.9+

消息引擎 无限制

客户端场景:

自行研发客户端和第 方框架提供客户端

第三方框架:

Spark-kafka

以 Storm 为例: Storm 目前提供了两套 Kafka 实际数据传输的插件 stormkafka storm-kafka-client ,

四、Kafka 线上部署

4.1 集群环境

操作系统

谈到 I/O 模型,就不能不说当前主流且耳熟能详的 5种模型:阻塞I/O 、非阻塞I/O 、I/O多路复用、信号驱动 I/O和异步I/O。每一种 模型都有典型的使用场 ,比如 Socket 的阻塞模式和非阻塞模式就对应于前两种模型。

这些和 Kafka 又有什么关系呢?关键就在于 clients 底层网络库的设计。Kafka 新版本 clients 在设计底层网络库时采用了 Java Selector 机制,而后者在 Linux 上的现机制就是巳poll ;但是在 Windows 平台上, Java NIO Selector 层是使用 select 模型而非IOCP 实现的,只有 Java NI02 才是使用 IOCP 实现的 。因此在这一点上,在 Linux 上部Kafka 要比在 Windows 上部署能够得到更高效的 1/0 处理性能。

第二个方面,即数据网络传输效率而言, Linux 也更有优势。具体来说, Kafka 这种 应用必然需要大量地通过网络与磁盘进行数据传输 ,而大部分这样的操作都是通过 Java F ileChannel. transferTo 方法实现的。在 Linux 平台上该方法底层会调用 sendfile 系统调用,即 用了 Linux 提供的零拷贝( Zero Copy )技术。

所以linux 牛B。

磁盘规划

择普通的机械硬盘( HDD )还是固态硬盘 (CSS )

HDD基本就可以了

主要是

JBOD (一大堆普通磁盘)与磁盘阵列(下称 RAID )之争。

磁盘容量

如果每天1 亿条消息,那么每天 生的消息会占lKB I 1000 I 1000 = 200GB 的磁盘空间 我们最好再额外预留 10% 的磁盘空间用于其他数据文件(比如索引文件等)的存储,因此在这种使用场景下每天新发送的消息将占用210GB 左右的磁盘空间因为还要保存 周的数据,所以整体的磁盘容 规划是 210*7=1.5TB,clients 启用了消息压缩,我们可以预估 1个平均的压缩比(比如 0. 5,那么整体的磁盘容量就是 0.75TB )

相关因素: • 新增消息数。 • 消息留存时间。 • 平均消息大小 • 副本数。 • 是否启用压缩

内存

总之对于内存规划的建议如下。 • 尽量分配更多的内存给操作系统的 page cache • 不要为 broker 设置过大的堆内存,最好不超过 6GB • page cache 大小至少要大于一个日志段的大小。

 

CPU

对 CPU 资源规划的建议如下 • 使用多核系统, CPU 核数最好大于 • 如果使用 Kafka 0.10 0.0 之前的版本或 clients 端与 broker 端消息版本不一致(若无显式 配置,这种情况多半由 clients broker 版本不一致造成),则考虑多配置 些资源以 防止消息解压缩操作消耗过多 CPU

带宽

带宽主要也有两种: lGb/s lOGb/s ,即平时所说的千兆位网络和万兆位网络。无论是哪种带宽,对于大多数的 Kafka 集群来说都足矣了。

样例:

• CPU 24 核。 •.内存 32GB • 磁盘 lTB 7200 SAS 盘两块 • 带宽 lGb/s • ulimit -n 1000000 • Socket Buffer 至少 64KB一一适用于跨机房网络传输。

4.2 性能测试

生产者测试

katka-producer-perf-test 脚本是 Kafka 提供的用于测试 producer 性能的脚本

消费者测试

katka-consumer-perf-test 脚本

4.3 参数设置:

Kafka 集群涉及的

• broker 参数。 • topic 级别参数 • GC 配置参数。 • Jvm参数。 • OS 参数。

参数  
broker 1.broker.id标识broker.2.log.dirs持久化消息目录3.zoo keeper.connect4.listeners一-broker 监听器的 csv 列表,5.advertised.listeners一一平日 listeners 类似,该参数也是用于发布给 clients 的监昕器,不过该参数主要用于 IaaS 环境。6.unclean.leader.election.enable-7.delete.topic.enable一一是否允许 Kafka 删除 topic 。8.log.retention.消息留存时间 9.log.retention.bytes--空间留存.10.min.insync.replicas-一该参数其实是与 producer 端的 acks 参数配合使用的11.num.network threads一一一个非常重要的参数 它控制了 broker 在后台用于处理网络请求的线程数,默认是3 。12.num.io.threads-一这个参数就是控制 broker 端实际处理网络请求的线程数.13.message.max. es-Kafka broker 能够接收的最大消息大小,默认是 977KB,还不到lMB ,可见是非常小的。在实际使用场景中,突破 lMB 大小的消息十分常见,最好重配置。
topic 1.delete.retention.ms-一每个 topic 可以设置自己的日志留存时间以覆盖全局默认值。2.max.message.bytes一一覆盖全局的 message.max.bytes ,即为每个 topic 指定不同的最大消息尺寸。3.retention. bytes 一一覆盖全局的 log.retention. bytes ,每个 topic 设置不同的日志留存尺寸关于如何设置 topic 级别的参数。
GC 1. 如果用户机器上的 CPU 资源非常充裕,那么推荐使用 CMS 收集器 这样可以充分利用多 CPU 执行并发垃圾收集 启用方法为-:XX:+UseCurrentMarkSweepGC 2.相反地,则使用吞吐量收集器,即所谓的 throughput collector 这样不会挤占紧张的CPU 资源,使 Kafka broker 达到最大的吞吐量 启用方法为-XX:+UseParallelGC
JVM -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseGlGC -XX:MaxGCPauseMillis=20 -XX: Ini tiatingHeapOccupancyPercent=35 - XX : G1HeapRegionSize=l 6M -XX: MinMetaspaceFreeRatio=SO -XX: MaxMetaspaceFreeRatio=80
OS 1.文件描述符限制:设置方法为 ulimit -n 100000 2.更长的 flush 时间 :我们知道 Kafka 依赖 OS 页缓存的“刷盘”功能实现消息真正写入物理磁盘,默认的刷盘问隔是 秒。通常情况下,这个间隔太短了,适当增加该加时间。

 


版权声明:本文为qq_24935049原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END
< <上一篇
下一篇>>