用Reactor模式构建的Kafka Server网络层和API层---架构和设计
1.前言从Yarn RPC Server到Kafka Server,凡是有高并发需求的服务端,无一例外采用了基于了Reactor设计模式。在我的博客《Hadoop RPC Server基于Reactor模式和Java NIO 的架构和原理》中,分析了Yarn的基于Reactor设计模式和Java NIO实现的RPC Server的架构和设计,而Kafka的Server端网络层也同样使用了Reactor设计模式。Reactor模式有以下显著特定: 下文中,我们就从Kafka Server端网络层代码入手,分析和讲解Kafka基于Ractor模式的具体网络层实现以及网络层和具体的业务层的衔接逻辑。在我的另外一篇博客《基于Java NIO的Kafka底层网络层源码和架构》中,已经详细讲解了Kafka使用Java NIO实现的底层网络架构,本文讲解Reactor设计模式,将忽略具体的网络层,而将注意力放在NIO的上层,即网络事件的派发、请求和响应的处理上面。 同样,事先说明,Kafka自己对NIO的Selector进行了封装,放在了 为了便于在代码层理解,我花了整个KafkaServer从网络层消息到业务处理,以及相反,业务处理结果通过网络层返回的基于Reactor模式的处理流如下: 从网络层接收请求到交付给业务处理流程示意图: 从业务处理层返回结果,到将结果通过网络层返回给客户端的流程图: 2. SocketServer初始化和启动Kafka的网络层入口类是SocketServer。 SocketServer的核心任务,是完成9092端口的绑定,监听并处理客户端或者其它Kafka Broker的请求,将请求分派,然后将相应返回给发起请求的对应客户端或者其它的Kafka Broker。核心角色,是同基本Reactor模式一致的负责接收请求的Acceptor角色、负责具体请求管理的Processor角色、负责请求和响应队列的RequestChannel角色以及负责管理、限制整个网络负载的ConnectionQuotas角色。 private val endpoints = config.listeners
private val numProcessorThreads = config.numNetworkThreads
private val maxQueuedRequests = config.queuedMaxRequests
private val totalProcessorThreads = numProcessorThreads * endpoints.size
private val maxConnectionsPerIp = config.maxConnectionsPerIp
private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
//创建RequestChannel,有totalProcessorThreads个responseQueue队列,
val requestChannel = new RequestChannel(totalProcessorThreads,maxQueuedRequests)
//所有的processers,长度为totalProcessorThreads
private val processors = new Array[Processor](totalProcessorThreads)
def startup() {
this.synchronized {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp,maxConnectionsPerIpOverrides)
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
var processorBeginIndex = 0
endpoints.values.foreach { endpoint =>//遍历endPoint集合,对于每一个endpoint,创建一个acceptor和多个processor
val protocol = endpoint.protocolType
val processorEndIndex = processorBeginIndex + numProcessorThreads //按照序号,创建process
for (i <- processorBeginIndex until processorEndIndex)
processors(i) = newProcessor(i,connectionQuotas,protocol)
val acceptor = new Acceptor(endpoint,sendBufferSize,recvBufferSize,brokerId,processors.slice(processorBeginIndex,processorEndIndex),connectionQuotas)
acceptors.put(endpoint,acceptor)//保存endr的point和accepto对应关系
//起一个名字叫做"kafka-socket-acceptor-%s-%d".format(protocol.toString,endpoint.port)的acceptor线程,非deamon
Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString,endpoint.port),acceptor,false).start()
acceptor.awaitStartup()//一直等到acceptor的run()方法的第一条语句开始执行,才证明已经启动
processorBeginIndex = processorEndIndex
}
}
SocketServer的启动方法中,会遍历本机的所有EndPoint(一个EndPoint一般对应一个网卡),为每一个EndPoint创建一个唯一独立的SocketServer.Acceptor()对象,负责处理这个EndPoint上的所有请求。我们从代码
同时,负责创建一个空的SocketServer.Acceptor数组,用来放置所有的Processor线程对象,但是并未实际创建Processor对象。Processor是由Acceptor直接管理,因此也是由SocketServer.Acceptor负责创建的,这符合角色分层的原则。角色分层在Yarn的代码中也体现得非常好,这样做可以让一个复杂的系统的每个功能模块都变得清晰可控。每层角色只会负责自己还有自己直接管理的直接下层角色的初始化和启动,绝对不会去触碰不是自己所直接管理的角色。 根据Acceptor的数量,将这些线程对象平均分配给Acceptor。然后,一直等待Acceptor启动完成,SocketServer.startup()会一直阻塞等待启动完成才会退出代表SocketServer完成启动。阻塞方式我们在SocketServer.Acceptor的讲解中会进行分析。总之,下层多个服务全部启动完成,上层服务才算启动完成,这也是职责分层的原则。 3.SocketServer.Acceptor每一个SocketServer.Acceptor负责唯一一个endpoint上的网络请求和响应的管理。正常情况下,我们会给的Kafka Server配置唯一的一个Endpoint。但是有些服务器具有多个网卡驱动,因此可以配置多个EndPoint。旧版本的Kafka使用host.name来标识集群中的主机身份,到了新版本的kakfa,则改为使用listeners和advertised.listeners 进行配置,这主要用于复杂网络环境下的Kafka配置,比如,我们的Kafka Server安装在亚马逊云上,三台服务器,每台服务器都有一个内网IP和外网IP,Server之间的通信走内网IP以节省网络流量,而外网用户的访问则只能使用公网IP,这时候我们就可以将内网IP的host配置在 private[kafka] class Acceptor(val endPoint: EndPoint,val sendBufferSize: Int,val recvBufferSize: Int,brokerId: Int,processors: Array[Processor],connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
private val nioSelector = NSelector.open()
val serverChannel = openServerSocket(endPoint.host,endPoint.port)//创建一个ServerSocketChannel,监听endPoint.host,endPoint.port套接字
//Acceptor被构造的时候就会启动所有的processor线程
this.synchronized {
//每个processor创建一个单独线程
processors.foreach { processor =>
Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId,endPoint.protocolType.toString,processor.id),processor,false).start()
}
}
Acceptor的构造方法中,首先通过openServerSocket()打开自己负责的EndPoint的Socket,即打开端口并启动监听。 Acceptor使用 当Acceptor()完成启动,会调用 /** * Record that the thread startup is complete */
protected def startupComplete() = {
startupLatch.countDown()
}
方法将startupLatch置位为0,通知上层SocketServer自己已经完成启动。此时上层SocketServer正通过调用startupLatch.await一直阻塞等待当前acceptor完成初始化。当SocketServer管理的所有Acceptor均完成了启动,SocketServer完成启动。 def run() {
//向selector注册channel,可以接收ACCEPT事件,只有非阻塞的serverChannel才可以注册给Selector
serverChannel.register(nioSelector,SelectionKey.OP_ACCEPT)
startupComplete() //启动完成的标记,放在run()方法的第一行,说明当确认线程开始运行,则认为启动成功,启动过以后就是运行了
try {
var currentProcessor = 0
while (isRunning) {///无限循环,持续等待OP_ACCEPT事件发生
try {
val ready = nioSelector.select(500)
if (ready > 0) {//已经有对应的Accept事件发生
val keys = nioSelector.selectedKeys()//取出发了了对应事件的事件的key,即有连接事件发生
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()//NIO的通用做法,取出一个有相关事件发生的channel以后,必须remove掉对应的SelectionKey,防止下次重复取出
if (key.isAcceptable)//
accept(key,processors(currentProcessor))//通过round-robin的方式取出一个acceptor进行处理
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
//round-robin序号增1,下一个连接会取出下一个processor
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error while accepting connection",e)
}
}
}
}
catch { //略 }
}
} finally { //略 }
}
Acceptor线程的run()方法,是不断监听对应ServerChannel上的连接请求,如果有新的连接请求,就选择出一个Processor,用来处理这个请求,将这个新连接交付给Processor是在方法 def accept(key: SelectionKey,processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]//取出channel
val socketChannel = serverSocketChannel.accept()//创建socketChannel,专门负责与这个客户端的连接
try {
//socketChannel参数设置
processor.accept(socketChannel)//将SocketChannel交给process进行处理
} catch {
//异常处理
}
}
//Processor.accept():
/** * Queue up a new connection for reading */
def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
wakeup()
}
Processor将这个新的SocketChannel加入到自己维护的 4.ProcessorProcessor负责不断检查Acceptor是否有交付给自己新的连接,如果有,就负责这个channel上消息的读写操作,即:监听自己维护的所有channel上的读请求,解析并交付给对应的业务处理逻辑;监听新的写操作,把服务端的响应数据通过SocketChannel准确地发送给客户端。 注意,每一个Processor都维护了一个单独的KSelector对象,这个KSelector只负责这个Processor上所有channel的监听。这样最大程度上保证了不同Processor线程之间的完全并行和业务隔离,尽管,在异步IO情况下,一个Selector负责成百上千个socketChannel的状态监控也不会带来效率问题。 override def run() {
startupComplete()//表示初始化流程已经结束,通过这个CountDownLatch代表初始化已经结束,这个Processor已经开始正常运行了
while (isRunning) {
try {
// setup any new connections that have been queued up
configureNewConnections()//为已经接受的请求注册OR_READ事件
// register any new responses for writing
processNewResponses()//处理响应队列,这个响应队列是Handler线程处理以后的结果,会交付给RequestChannel.responseQueue.同时调用unmute,开始接受请求
poll() //调用KSelector.poll(),进行真正的数据读写
processCompletedReceives()//调用mute,停止接受新的请求
processCompletedSends()
processDisconnected()
} catch {
//异常处理 略
}
debug("Closing selector - processor " + id)
swallowError(closeAll())
shutdownComplete()
}
在
我们先来看Processor对于已经收到的请求,是如何交付给业务端进行处理的: * 将completedReceived中的对象进行封装,交付给requestQueue.completRequets
*/
private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>//每一个receive是一个NetworkReceivedui'xiagn
try {
//receive.source代表了这个请求的发送者的身份,KSelector保存了channel另一端的身份和对应的SocketChannel之间的对应关系
val channel = selector.channel(receive.source)
val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE,channel.principal.getName),channel.socketAddress)
val req = RequestChannel.Request(processor = id,connectionId = receive.source,session = session,buffer = receive.payload,startTimeMs = time.milliseconds,securityProtocol = protocol)
requestChannel.sendRequest(req)//将请求通过RequestChannel.requestQueue交付给Handler
selector.mute(receive.source)//不再接受Read请求,发送响应之前,不可以再接收任何请求
} catch {
//异常处理 略
}
}
}
5. RequestChannelRequestChannel负责消息从网络层转接到业务层,以及将业务层的处理结果交付给网络层进而返回给客户端。每一个SocketServer只有一个RequestChannel对象,在SocketServer中构造。 //创建RequestChannel,maxQueuedRequests)
我们一起来看RequestChannel的构造方法: class RequestChannel(val numProcessors: Int,val queueSize: Int) extends KafkaMetricsGroup {
private var responseListeners: List[(Int) => Unit] = Nil
//request存放了所有Processor接收到的远程请求,负责把requestQueue中的请求交付给具体业务逻辑进行处理
private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
//responseQueues存放了所有Processor的带出来的response,即每一个Processor都有一个response queue
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
for(i <- 0 until numProcessors) //初始化responseQueues
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
//一些metrics用来监控request和response的数量,代码略
}
RequestChannel构造方法中初始化了requestQueue,用来存放网络层接收到的请求,这些请求即将交付给业务层进行处理。同时,初始化了responseQueues,为每一个Processor建立了一个response队列,用来存放这个Processor的一个或者多个Response,这些response即将交付给网络层返回给客户端。 6. KafkaRequestHandler线程和KafkaRequestHandlerPool线程池在上面讲到 /* start processing requests */
apis = new KafkaApis(socketServer.requestChannel,replicaManager,groupCoordinator,kafkaController,zkUtils,config.brokerId,config,metadataCache,metrics,authorizer)
//KafkaRequestHandlerPool线程池,用来管理所有KafkaRequestHandler线程
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,socketServer.requestChannel,apis,config.numIoThreads)
这里,KafkaApis是Kafka的API接口层,可以理解为一个工具类,职责就是解析请求然后获取请求类型,根据请求类型将请求交付给对应的业务层,可见,KafkaApis是实现了网络层到业务层的真正映射关系,下文会详解。 class KafkaRequestHandlerPool(val brokerId: Int,val requestChannel: RequestChannel,val apis: KafkaApis,numThreads: Int) extends Logging with KafkaMetricsGroup {
/* a meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent","percent",TimeUnit.NANOSECONDS)
this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "],"
val threads = new Array[Thread](numThreads)
//初始化由KafkaRequestHandler线程构成的线程数组
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
runnables(i) = new KafkaRequestHandler(i,aggregateIdleMeter,numThreads,requestChannel,apis)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i,runnables(i))
threads(i).start()
}
KafkaRequestHandlerPool构造方法中初始化并启动了多个KafkaRequestHandler线程对象,线程池大小通过Kafka配置文件配置项 KafkaRequestHandlerPool线程池中的所有KafkaRequestHandler,通过竞争方式从 def run() {
while(true) {
try {
var req : RequestChannel.Request = null
while (req == null) {
//略
req = requestChannel.receiveRequest(300)//从RequestChannel.requestQueue中取出请求
//略
apis.handle(req)//调用KafkaApi.handle(),将请求交付给业务
} catch {}
}
}
6.KafkaApisKafkaApis类似一个工具类,解析用户请求并将请求交付给业务层,我们可以把它看做Kafka的API层。从上面 def handle(request: RequestChannel.Request) {
try {
ApiKeys.forId(request.requestId) match {
case ApiKeys.PRODUCE => handleProducerRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
//其它ApiKeys,略
}
} catch { //异常处理,略 }
} finally{
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
}
通过Switch-Case代码块,根据请求中的requestId,将请求交付给 结束这样,我们通过对Acceptor、Processor、RequestChannel、KafkaRequestHandler以及KafkaApis多个角色的解析,完成了整个Kafka的消息流通闭环,即从客户端建立连接、发送请求给Kafka Server进行处理、Kafka Server将请求交付给具体业务进行处理、业务将处理结果返回给网络层、网络层将结果通过NIO返回给客户端的整个流程。同时,由于多Processor线程、以及KafkaRequestHandlerPoll线程池的存在,通过交付-获取的方式而不是阻塞等待的方式,让整个消息处理实现完全的异步化,各个角色各司其职,模块之间无耦合,线程之间或者相互竞争任务,或者被上层安排处理部分任务,整个效率非常高,结构也相当清晰。Processor线程的数量、KafkaRequestHandlerPool线程池可配置,因此可以根据CPU以及内存性能,合理调整Kafka Server的并行程度和处理能力。对Kafka基于Reactor模式的网络层的理解,以及消息从网络层到业务层交付逻辑的理解,非常有利于我们对Kafka集群的管理以及对Kafka问题原因的排查诊断,同时,也可以基于各个不同的角色暴露出来的一个java metrics,我们可以对Kafka进行有效的调整优化。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |