The full startup flow of Kafka's modules: KafkaServer

【Code Entry Point】

Since Kafka introduced the Raft protocol, BrokerServer and ControllerServer are started from separate classes and ultimately bring up the network layer and request-handling layer; when starting in ZooKeeper mode, KafkaServer brings up the network layer and request-handling layer itself.
The main difference:

private def buildServer(props: Properties): Server = {
  val config = KafkaConfig.fromProps(props, false)
  // Directly starts the scheduled tasks, network layer, and request-handling layer
  if (config.requiresZookeeper) {
    new KafkaServer(
      config,
      Time.SYSTEM,
      threadNamePrefix = None,
      enableForwarding = false
    )
  }
  // Delegates to BrokerServer etc. to start the network layer and request-handling layer
  else {
    new KafkaRaftServer(
      config,
      Time.SYSTEM,
      threadNamePrefix = None
    )
  }
}

Although the intermediate steps differ, both paths ultimately invoke the same request handlers.
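
Which branch buildServer takes is purely configuration-driven: in this version KafkaConfig.requiresZookeeper is true exactly when no process.roles are configured. Illustrative server.properties fragments (host names and ports are placeholders):

# ZooKeeper mode: process.roles is absent, so requiresZookeeper is true
zookeeper.connect=localhost:2181

# KRaft mode: process.roles is set, so a KafkaRaftServer is built instead
#process.roles=broker,controller
#controller.quorum.voters=1@localhost:9093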

【Startup Flow】

Below is the startup flow in ZooKeeper mode.

System entry point:

Kafka.getPropsFromArgs() reads the startup properties (kafka.Kafka$)
Kafka.buildServer() builds the server instance (kafka.Kafka$)
KafkaServer.startup() starts the server (kafka.server.KafkaServer)

Several scheduled tasks are created; KafkaScheduler.schedule() takes a function and runs it periodically (a minimal sketch of this scheduling pattern follows the list below). Roughly:

KafkaScheduler.schedule() schedules task name=transactionalId-expiration (kafka.utils.KafkaScheduler)
KafkaScheduler.schedule() schedules task name=isr-expiration (kafka.utils.KafkaScheduler)
KafkaScheduler.schedule() schedules task name=__consumer_offsets-3 (kafka.utils.KafkaScheduler)
KafkaScheduler.schedule() schedules task name=delete-expired-group-metadata (kafka.utils.KafkaScheduler)
KafkaScheduler.schedule() schedules task name=highwatermark-checkpoint (kafka.utils.KafkaScheduler)
KafkaScheduler.schedule() schedules task name=PeriodicProducerExpirationCheck (kafka.utils.KafkaScheduler)
KafkaScheduler.schedule() schedules task name=kafka-log-start-offset-checkpoint (kafka.utils.KafkaScheduler)
KafkaScheduler.schedule() schedules task name=kafka-log-retention (kafka.utils.KafkaScheduler)
KafkaScheduler.schedule() schedules task name=auto-leader-rebalance-task (kafka.utils.KafkaScheduler)
KafkaScheduler.schedule() schedules task name=kafka-delete-logs (kafka.utils.KafkaScheduler)
KafkaScheduler.schedule() schedules task name=shutdown-idle-replica-alter-log-dirs-thread (kafka.utils.KafkaScheduler)
KafkaScheduler.schedule() schedules task name=kafka-log-flusher (kafka.utils.KafkaScheduler)
KafkaScheduler.schedule() schedules task name=kafka-recovery-point-checkpoint (kafka.utils.KafkaScheduler)
KafkaScheduler.schedule() schedules task name=transaction-abort (kafka.utils.KafkaScheduler)
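
The pattern behind these entries is straightforward. Below is a minimal sketch, not Kafka's actual KafkaScheduler (which wraps a ScheduledThreadPoolExecutor with thread naming and shutdown handling), of scheduling a named periodic function:

import java.util.concurrent.{Executors, TimeUnit}

object SchedulerSketch {
  private val executor = Executors.newScheduledThreadPool(1)

  // Same shape as KafkaScheduler.schedule(name, fun, delay, period, unit)
  def schedule(name: String, fun: () => Unit, delayMs: Long, periodMs: Long): Unit = {
    executor.scheduleAtFixedRate(() => {
      println(s"executing scheduled task name=$name")
      fun()
    }, delayMs, periodMs, TimeUnit.MILLISECONDS)
  }

  def main(args: Array[String]): Unit = {
    schedule("kafka-log-flusher", () => { /* flush dirty logs here */ }, 0L, 1000L)
    Thread.sleep(3000)
    executor.shutdown()
  }
}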

Next the network module is initialized and started. By default each endpoint gets one Acceptor, three Processors, and a shared RequestChannel. The Acceptor handles new connections, the Processors handle business requests, and the RequestChannel buffers requests and responses in its queues (a hypothetical sketch of this structure follows the trace below):

RequestChannel.new() initializes the request channel (kafka.network.RequestChannel)
SocketServer.startup() starts the network layer (kafka.network.SocketServer)
SocketServer.createDataPlaneAcceptorsAndProcessors() creates the Acceptors/Processors (kafka.network.SocketServer)
SocketServer.createAcceptor() creates a client-connection Acceptor for each EndPoint, endPoint=EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)
SocketServer.openServerSocket() (kafka.network.Acceptor)
SocketServer.addDataPlaneProcessors() newProcessorsPerListener=3 (kafka.network.SocketServer)
SocketServer.newProcessor() id=0,requestChannel=kafka.network.RequestChannel@4d0402b (kafka.network.SocketServer)
RequestChannel.addProcessor() caches the current processor (kafka.network.RequestChannel)
SocketServer.newProcessor() id=1,requestChannel=kafka.network.RequestChannel@4d0402b (kafka.network.SocketServer)
RequestChannel.addProcessor() caches the current processor (kafka.network.RequestChannel)
SocketServer.newProcessor() id=2,requestChannel=kafka.network.RequestChannel@4d0402b (kafka.network.SocketServer)
RequestChannel.addProcessor() caches the current processor (kafka.network.RequestChannel)
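
A hypothetical sketch (names simplified) of the structure these traces describe: one RequestChannel shared by all Processors, holding a bounded request queue, plus a registry of per-Processor response queues so each response can be routed back to the Processor that owns the connection:

import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}

final case class SketchRequest(connectionId: String, processorId: Int, payload: Array[Byte])
final case class SketchResponse(request: SketchRequest, payload: Array[Byte])

final class RequestChannelSketch(queueSize: Int) {
  private val requestQueue = new ArrayBlockingQueue[SketchRequest](queueSize)
  private val processorQueues = new ConcurrentHashMap[Int, ArrayBlockingQueue[SketchResponse]]()

  // RequestChannel.addProcessor(): remember each Processor's response queue
  def addProcessor(id: Int): Unit =
    processorQueues.put(id, new ArrayBlockingQueue[SketchResponse](queueSize))

  def sendRequest(r: SketchRequest): Unit = requestQueue.put(r) // Processors enqueue
  def receiveRequest(): SketchRequest = requestQueue.take()     // handler threads dequeue
  def sendResponse(resp: SketchResponse): Unit =                // KafkaApis replies; assumes addProcessor ran
    processorQueues.get(resp.request.processorId).put(resp)
}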

At the same time, eight KafkaRequestHandler business handlers are created inside a KafkaRequestHandlerPool (an eight-thread pool). Each handler takes requests off the request queue described above and hands them to the Api object for processing (see the sketch after this trace).

KafkaRequestHandlerPool.new() creates the request handler pool brokerId=1,numThreads=8 (kafka.server.KafkaRequestHandlerPool)
KafkaRequestHandler.createHandler() id=0 (kafka.server.KafkaRequestHandlerPool)
KafkaRequestHandler.createHandler() id=1 (kafka.server.KafkaRequestHandlerPool)
KafkaRequestHandler.createHandler() id=2 (kafka.server.KafkaRequestHandlerPool)
KafkaRequestHandler.createHandler() id=3 (kafka.server.KafkaRequestHandlerPool)
KafkaRequestHandler.createHandler() id=4 (kafka.server.KafkaRequestHandlerPool)
KafkaRequestHandler.createHandler() id=5 (kafka.server.KafkaRequestHandlerPool)
KafkaRequestHandler.createHandler() id=6 (kafka.server.KafkaRequestHandlerPool)
KafkaRequestHandler.createHandler() id=7 (kafka.server.KafkaRequestHandlerPool)
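
A hedged sketch of the pool these traces create, reusing RequestChannelSketch from the sketch above: numThreads daemon threads block on the shared request queue and hand every request to a handle function (KafkaApis.handle in the real broker):

final class HandlerPoolSketch(channel: RequestChannelSketch,
                              numThreads: Int,
                              handle: SketchRequest => Unit) {
  (0 until numThreads).foreach { id =>
    val t = new Thread(() => {
      // KafkaRequestHandler.run(): poll the request queue forever
      while (true) handle(channel.receiveRequest())
    }, s"kafka-request-handler-$id")
    t.setDaemon(true)
    t.start()
  }
}

// Wiring: three processors share one channel; eight threads drain it.
// val channel = new RequestChannelSketch(500)
// (0 until 3).foreach(channel.addProcessor)
// new HandlerPoolSketch(channel, 8, req => println(s"handling ${req.connectionId}"))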

Each of these eight handler threads runs run(), polling the RequestChannel's request queue and handing every request it pulls to KafkaApis:

KafkaRequestHandler.run() polls client requests from [requestQueue] via [requestChannel] (kafka.server.KafkaRequestHandler)

The object that ultimately handles a request is KafkaApis.handle():

KafkaApis.handle() handles the client request (kafka.server.KafkaApis)

At this point the business request handlers are already running and continuously polling the request queue, waiting only for clients to connect and send requests. The next step is therefore starting the one Acceptor and the three Processors.
The Acceptor runs a while loop listening for OP_ACCEPT events and adds each newly established connection to the newConnections cache; each Processor drains those connections and polls every connection for readable events (a minimal acceptor sketch follows the trace below).

SocketServer.startDataPlaneProcessorsAndAcceptors() (kafka.network.SocketServer)
SocketServer.startAcceptorAndProcessors()  (kafka.network.SocketServer)
SocketServer.run() starts the server main task (kafka.network.Processor)
SocketServer.run() starts the server main task (kafka.network.Processor)
SocketServer.run() starts the server main task (kafka.network.Processor)
SocketServer.run() (kafka.network.Acceptor)
SocketServer.acceptNewConnections() endless while loop listening for client connections, registering the SelectionKey.OP_ACCEPT event (kafka.network.Acceptor)
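
A minimal sketch of the accept loop those traces describe, not Kafka's actual Acceptor: register OP_ACCEPT on a selector, loop, and hand accepted channels to a newConnections queue (the real broker round-robins connections across its Processors):

import java.net.InetSocketAddress
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.util.concurrent.ConcurrentLinkedQueue

object AcceptorSketch {
  def main(args: Array[String]): Unit = {
    // In the real broker a Processor thread drains this queue
    val newConnections = new ConcurrentLinkedQueue[SocketChannel]()

    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress(9092))
    server.configureBlocking(false)
    val selector = Selector.open()
    server.register(selector, SelectionKey.OP_ACCEPT)

    while (true) { // the endless accept loop from the trace above
      if (selector.select(500) > 0) {
        val keys = selector.selectedKeys().iterator()
        while (keys.hasNext) {
          val key = keys.next(); keys.remove()
          if (key.isAcceptable) {
            val socket = server.accept()
            socket.configureBlocking(false)
            newConnections.add(socket)
          }
        }
      }
    }
  }
}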

Next the server waits for client connections, polls their requests into the cached queue, and leaves them for the KafkaRequestHandler threads and KafkaApis above to process:

SocketServer.configureNewConnections() takes a new connection off the newConnections queue (kafka.network.Processor)
Selector.register() binds the SocketChannel, id=127.0.0.1:9092-127.0.0.1:40264-0 (org.apache.kafka.common.network.Selector)
SocketServer.processCompletedReceives() processes the client requests polled by the selector (kafka.network.Processor)
RequestChannel.Request() adds the request to the requestQueue, waiting for KafkaApis (kafka.network.RequestChannel)

【Data-Plane vs. Control-Plane Requests】

During startup two planes are brought up, the DataPlane and the ControlPlane. They serve different purposes; the network module entry points are:

SocketServer.startup()     =>    start the network module
    SocketServer.createControlPlaneAcceptorAndProcessor()    =>    create the control-plane request handling module
            SocketServer.startControlPlaneProcessorAndAcceptor()    =>    start its Acceptor and Processor
    SocketServer.createDataPlaneAcceptorsAndProcessors()    =>    create the data-plane request handling module
            SocketServer.startDataPlaneProcessorsAndAcceptors()    =>    start its Acceptors and Processors

Both follow the same pattern: create the Acceptor and Processors, then start them.
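
Note that the control plane is optional: the control-plane Acceptor/Processor pair is created only when control.plane.listener.name (KIP-291) is configured; otherwise all traffic, including controller-to-broker requests, goes through the data plane. An illustrative configuration (listener names and ports are placeholders):

listeners=CONTROLLER://broker1:9091,INTERNAL://broker1:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT
control.plane.listener.name=CONTROLLER
inter.broker.listener.name=INTERNAL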

【Key Steps】

1. The startup entry points for the Acceptor and Processors, and the entry points for handling client connections, requests, and responses.

SocketServer.startAcceptorAndProcessors() iterates over the Processors and starts each one; every Processor then executes its run() loop:

override def run(): Unit = {
  startupComplete()
  try {
    while (isRunning) {
      try {
        // Drain new connections from the newConnections queue and register them with the Selector
        configureNewConnections()
        // Queue pending responses on their connections and register for the SelectionKey.OP_WRITE (writable) event
        processNewResponses()
        // Poll the Selector: readable events on each registered connection are drained and the incoming network data is staged in receive buffers
        poll()
        // Parse the staged network data into Kafka requests and drop them onto the request queue for the handler threads to poll
        processCompletedReceives()
        // Handle responses whose sends have completed
        processCompletedSends()
        processDisconnected()
        closeExcessConnections()
      } catch {
        case e: Throwable => processException("Processor got uncaught exception.", e)
      }
    }
  } finally {
    debug(s"Closing selector - processor $id")
    CoreUtils.swallow(closeAll(), this, Level.ERROR)
    shutdownComplete()
  }
}

2. The request dispatch flow. The base type for request handling is ApiRequestHandler, with two implementations, KafkaApis and ControllerApis, which handle data requests and controller/metadata requests respectively.

KafkaApis.handle() routes each request to a handler based on its API key (note the maybeForwardToController(...) cases; a sketch of that decision follows the code below):

override def handle(request: RequestChannel.Request): Unit = {
  try {
    if (!apiVersionManager.isApiEnabled(request.header.apiKey)) {
      throw new IllegalStateException(s"API ${request.header.apiKey} is not enabled")
    }
    request.header.apiKey match {
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
      case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
      case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
      case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
      case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
      case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
      case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
      case ApiKeys.END_TXN => handleEndTxnRequest(request)
      case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
      case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
      case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
      case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
      case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
      case ApiKeys.ALTER_CONFIGS => maybeForwardToController(request, handleAlterConfigsRequest)
      case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
      case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
      case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
      case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
      case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
      case ApiKeys.CREATE_DELEGATION_TOKEN => maybeForwardToController(request, handleCreateTokenRequest)
      case ApiKeys.RENEW_DELEGATION_TOKEN => maybeForwardToController(request, handleRenewTokenRequest)
      case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest)
      case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
      case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
      case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
      case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
      case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
      case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
      case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request)
      case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
      case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
      case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
      case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
      case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
      case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
      case ApiKeys.ENVELOPE => handleEnvelope(request)
      case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
      case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
      case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest)
      case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
    }
  } catch {
    case e: FatalExitError => throw e
    case e: Throwable => requestHelper.handleError(request, e)
  } finally {
    // try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
    // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
    // expiration thread for certain delayed operations (e.g. DelayedJoin)
    replicaManager.tryCompleteActions()
    // The local completion time may be set while processing the request. Only record it if it's unset.
    if (request.apiLocalCompleteTimeNanos < 0)
      request.apiLocalCompleteTimeNanos = time.nanoseconds
  }
}
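
The maybeForwardToController(...) cases above are worth a note: when redirection (KIP-590) is enabled, the broker wraps such requests in an ENVELOPE request and forwards them to the active controller instead of handling them locally. A hypothetical sketch of that decision, with all names invented:

// Hypothetical sketch, not Kafka's actual code: forward to the controller
// when redirection is enabled, otherwise run the local handler.
def maybeForwardToControllerSketch[R](request: R,
                                      forwardingEnabled: Boolean,
                                      localHandler: R => Unit,
                                      forward: R => Unit): Unit =
  if (forwardingEnabled) forward(request) // wrapped in an ENVELOPE request
  else localHandler(request)              // handled on this broker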

3. The response handling flow. The response is first sent back to the client, and the Response object is cached so that, after the send completes, its onComplete() callback logic can run.

First look at SocketServer.processNewResponses(), which loops, polling the response queue for Response objects that need to be sent. The code:

private def processNewResponses(): Unit = {
  var currentResponse: RequestChannel.Response = null
  // dequeueResponse() calls responseQueue.poll() in a loop to pull the next Response to process
  while ({currentResponse = dequeueResponse(); currentResponse != null}) {
    val channelId = currentResponse.request.context.connectionId
    try {
      currentResponse match {
        case response: NoOpResponse =>
          updateRequestMetrics(response)
          handleChannelMuteEvent(channelId, ChannelMuteEvent.RESPONSE_SENT)
          tryUnmuteChannel(channelId)
        // If the response must be returned to the client, call sendResponse() to send it
        case response: SendResponse =>
          sendResponse(response, response.responseSend)
        case response: CloseConnectionResponse =>
          updateRequestMetrics(response)
          trace("Closing socket connection actively according to the response code.")
          close(channelId)
        case _: StartThrottlingResponse =>
          handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
        case _: EndThrottlingResponse =>
          handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
          tryUnmuteChannel(channelId)
        case _ =>
          throw new IllegalArgumentException(s"Unknown response type: ${currentResponse.getClass}")
      }
    } catch {
      case e: Throwable =>
        processChannelException(channelId, s"Exception while processing response for $channelId", e)
    }
  }
}

Next, sendResponse() sends the response and caches the already-sent Response object in inflightResponses; this map is scanned later to run callbacks once the sends complete.
SocketServer.sendResponse() code:

protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
  val connectionId = response.request.context.connectionId
  if (channel(connectionId).isEmpty) {
    response.request.updateRequestMetrics(0L, response)
  }
  if (openOrClosingChannel(connectionId).isDefined) {
    // First hand the response to the Selector to send it to the client
    selector.send(new NetworkSend(connectionId, responseSend))
    // Then cache the response in inflightResponses for the callbacks that run after the send completes
    inflightResponses += (connectionId -> response)
  }
}

Meanwhile the Processor thread, i.e. the run() loop shown above, handles the callback logic for these responses by calling SocketServer.processCompletedSends().
SocketServer.processCompletedSends() code:

private def processCompletedSends(): Unit = {
  // Iterate over all completed sends (List<NetworkSend>); these drive the callbacks that run after a response reaches the client
  selector.completedSends.forEach { send =>
    try {
      // Look the Response object up in the cache by destinationId
      val response = inflightResponses.remove(send.destinationId).getOrElse {
        throw new IllegalStateException(s"Send for ${send.destinationId} completed, but not in `inflightResponses`")
      }
      updateRequestMetrics(response)
      // Then invoke Response.onComplete() to run the post-response callback logic
      response.onComplete.foreach(onComplete => onComplete(send))
      handleChannelMuteEvent(send.destinationId, ChannelMuteEvent.RESPONSE_SENT)
      tryUnmuteChannel(send.destinationId)
    } catch {
      case e: Throwable => processChannelException(send.destinationId,
        s"Exception while processing completed send to ${send.destinationId}", e)
    }
  }
  selector.clearCompletedSends()
}

4. What the scheduled tasks do. Broken down by KafkaScheduler task name (the configuration settings driving several of them are listed after this breakdown):

Scheduled task name=transaction-abort (kafka.utils.KafkaScheduler)    periodically aborts transactions that have exceeded their timeout (default 1 min).
Scheduled task name=transactionalId-expiration (kafka.utils.KafkaScheduler)    periodically removes transactional ids whose transactions have expired.
Scheduled task name=isr-expiration (kafka.utils.KafkaScheduler)    removes replicas whose last caught-up time exceeds replicaLagTimeMaxMs from the ISR.
Scheduled task name=__consumer_offsets-3 (kafka.utils.KafkaScheduler)    presumably the one-shot GroupMetadataManager task that loads group and offset metadata for partition __consumer_offsets-3 once this broker becomes its coordinator.
Scheduled task name=delete-expired-group-metadata (kafka.utils.KafkaScheduler)    cleans up expired group offsets, by default every 60s.
Scheduled task name=highwatermark-checkpoint (kafka.utils.KafkaScheduler)    periodically persists each partition's High Watermark.
Scheduled task name=PeriodicProducerExpirationCheck (kafka.utils.KafkaScheduler)    presumably the log task that evicts producer ids whose state has expired.
Scheduled task name=kafka-log-start-offset-checkpoint (kafka.utils.KafkaScheduler)    writes the current log start offset of every log to a text file in each log directory, so data already deleted by a DeleteRecordsRequest is never exposed.
Scheduled task name=kafka-log-retention (kafka.utils.KafkaScheduler)    periodically detects logs that no longer satisfy the retention rules.
Scheduled task name=kafka-delete-logs (kafka.utils.KafkaScheduler)    periodically deletes the logs that the task above marked for deletion.
Scheduled task name=auto-leader-rebalance-task (kafka.utils.KafkaScheduler)    maintains preferred-replica leader balance across partitions.
Scheduled task name=shutdown-idle-replica-alter-log-dirs-thread (kafka.utils.KafkaScheduler)    presumably shuts down ReplicaAlterLogDirs threads once they go idle.
Scheduled task name=kafka-log-flusher (kafka.utils.KafkaScheduler)    periodically flushes logs to disk, i.e. asynchronous flushing.
Scheduled task name=kafka-recovery-point-checkpoint (kafka.utils.KafkaScheduler)    periodically updates the recovery-point-offset-checkpoint file in each log directory, recording the checkpoint below which all log data has been flushed to disk.
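
Several of these intervals are driven by broker configuration. A few of the relevant settings, with defaults as found in recent 2.x/3.x releases (worth re-checking against your exact version):

log.retention.check.interval.ms=300000                              # kafka-log-retention
log.flush.scheduler.interval.ms=9223372036854775807                 # kafka-log-flusher (effectively disabled by default)
leader.imbalance.check.interval.seconds=300                         # auto-leader-rebalance-task
transaction.abort.timed.out.transaction.cleanup.interval.ms=10000   # transaction-abort
transaction.remove.expired.transaction.cleanup.interval.ms=3600000  # transactionalId-expiration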

 

