1. Overall Structure of Kafka's Communication Mechanism

[Figure: Kafka 0.8 NIO communication mechanism, overall structure]

It is also an instance of the SEDA (Staged Event-Driven Architecture) multithreading model.

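  1. A broker has a limited number of client connections, and new connections are not created frequently, so a single Acceptor thread is more than enough to handle them.
  2. Kafka's high throughput requires the broker to receive and send data quickly. A pool of Processor threads therefore does the network reads and writes and hands the data read from clients off to a buffer queue, which keeps client requests from piling up.
  3. Kafka performs frequent disk operations that can block or wait on I/O, so the number of I/O threads (num.io.threads) is usually set to about twice the number of Processor threads (num.network.threads). Tune it for your environment.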

2. SocketServer Overall Design (Sequence Diagram)

[Figure: SocketServer overall design sequence diagram]

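Kafka's SocketServer is built on Java NIO and follows the Reactor pattern: one Acceptor (per listening endpoint) accepts client connections, N Processor threads read and write data, and M Handler threads run the business logic. Queues buffer the work between the Acceptor and the Processors, and between the Processors and the Handlers.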
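To make the queue hand-offs concrete, here is a minimal sketch of the Processor/Handler queues, modeled loosely on Kafka's RequestChannel. The names MiniRequestChannel, Req and Resp are hypothetical. The sketch assumes one bounded request queue shared by all Handlers (Kafka bounds it with queued.max.requests) and one response queue per Processor, so that each response returns to the thread that owns the connection.

```scala
import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue, TimeUnit}

final case class Req(processorId: Int, payload: String)
final case class Resp(processorId: Int, payload: String)

// Hypothetical, simplified model of the queues between the stages (not Kafka's API)
final class MiniRequestChannel(numProcessors: Int, queueSize: Int) {
  // Shared by all Handlers; bounded, so a full queue pushes back on the Processors
  private val requestQueue = new ArrayBlockingQueue[Req](queueSize)
  // One per Processor, so each response goes back to the thread owning the connection
  private val responseQueues = Array.fill(numProcessors)(new LinkedBlockingQueue[Resp]())

  // Processor -> Handler; blocks while the queue is full
  def sendRequest(r: Req): Unit = requestQueue.put(r)

  // Handler side; returns null if nothing arrives within the timeout
  def receiveRequest(timeoutMs: Long): Req =
    requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)

  // Handler -> the Processor that received the request
  def sendResponse(r: Resp): Unit = responseQueues(r.processorId).put(r)

  // Processor side; non-blocking, returns null if the queue is empty
  def receiveResponse(processorId: Int): Resp = responseQueues(processorId).poll()
}
```

Because each connection belongs to exactly one Processor, routing responses by processorId means only that Processor ever writes to the socket.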

Below we go through the source code of each part of this design in turn.

2.1 Startup and initialization

/**
 * Start the socket server
 */
def startup() {
  this.synchronized {
    connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

    val sendBufferSize = config.socketSendBufferBytes
    val recvBufferSize = config.socketReceiveBufferBytes
    val brokerId = config.brokerId

    var processorBeginIndex = 0
    endpoints.values.foreach { endpoint =>
      val protocol = endpoint.protocolType
      val processorEndIndex = processorBeginIndex + numProcessorThreads

      for (i <- processorBeginIndex until processorEndIndex)
        processors(i) = newProcessor(i, connectionQuotas, protocol)

      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
        processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
      acceptors.put(endpoint, acceptor)
      Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
      acceptor.awaitStartup()

      processorBeginIndex = processorEndIndex
    }
  }

  newGauge("NetworkProcessorAvgIdlePercent",
    new Gauge[Double] {
      def value = allMetricNames.map(metricName =>
        metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
    }
  )

  info("Started " + acceptors.size + " acceptor threads")
}
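  • The ConnectionQuotas object tracks the number of connections per IP address.
  • For each endpoint, startup() creates one Acceptor listener thread and N Processor threads. processors is an array of threads that serves as a thread pool; N is num.network.threads, which defaults to 3.
  • The Acceptor thread and each of the N Processor threads open their own multiplexer with Selector.open().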
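val numProcessorThreads = config.numNetworkThreads // num.network.threads, default 3 (set to 16 in the author's configuration)
val serverChannel = openServerSocket(endPoint.host, endPoint.port) // in Acceptor: opens this endpoint's ServerSocketChannel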
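The index arithmetic in startup() can be checked in isolation. The sketch below is a hypothetical helper (not a Kafka API). It assumes each endpoint is identified by a name string and reproduces how processorBeginIndex and processorEndIndex carve the processors array into one contiguous slice per endpoint.

```scala
// Hypothetical helper mirroring the index arithmetic in startup() (not a Kafka API)
object ProcessorLayout {
  // Returns, per endpoint, the slice [begin, end) of the processors array it owns
  def ranges(endpoints: Seq[String], numProcessorThreads: Int): Seq[(String, Range)] = {
    var processorBeginIndex = 0
    endpoints.toList.map { endpoint =>
      val processorEndIndex = processorBeginIndex + numProcessorThreads
      val slice = processorBeginIndex until processorEndIndex
      processorBeginIndex = processorEndIndex // the next endpoint starts where this one ended
      endpoint -> slice
    }
  }
}
```

With two endpoints and num.network.threads=3, the processors array holds 6 entries, which matches totalProcessorThreads (numProcessorThreads times the number of endpoints), the divisor used by the NetworkProcessorAvgIdlePercent gauge.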

2.2 Acceptor thread

/**
 * Accept loop that checks for new connection attempts
 */
def run() {
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // register interest in OP_ACCEPT
  startupComplete()
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {
        val ready = nioSelector.select(500) // block for at most 500 ms waiting for ready keys
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable)
                accept(key, processors(currentProcessor)) // accept, then hand the channel to this Processor's newConnections queue
              else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")

              // round-robin to the next Processor thread
              currentProcessor = (currentProcessor + 1) % processors.length
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want
        // the broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(nioSelector.close())
    shutdownComplete()
  }
}

2.2.1 Registering the OP_ACCEPT event

serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)

2.2.2 Internal logic

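The loop uses synchronous non-blocking I/O: nioSelector.select(500) blocks for at most 500 ms waiting for ready keys, so the thread wakes up at least every 500 ms to re-check isRunning even when no connections arrive.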

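When a connection request arrives, the Acceptor picks a Processor thread in round-robin order, through currentProcessor = (currentProcessor + 1) % processors.length in the loop above.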

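The Acceptor then accepts the connection and passes the new SocketChannel to the chosen Processor, which adds it to its newConnections queue and returns:

def accept(socketChannel: SocketChannel) {
  newConnections.add(socketChannel)
  wakeup() // wake this Processor's selector so it picks up the new connection promptly
}

// newConnections is a thread-safe queue holding SocketChannels
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()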
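The Acceptor flow in 2.2 can be reproduced with plain Java NIO from Scala. This single-threaded sketch (MiniAcceptor is a hypothetical name, and plain ConcurrentLinkedQueues stand in for the Processors) registers OP_ACCEPT, calls select(500), accepts each pending connection, and hands it to the next queue in round-robin order. Unlike Kafka's Acceptor, it stops after a fixed number of connections so that it can be tested.

```scala
import java.net.InetSocketAddress
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.util.concurrent.ConcurrentLinkedQueue

object MiniAcceptor {
  // Accepts up to `expected` connections (giving up after 10 idle select rounds)
  // and returns how many were handed off to the queues.
  def acceptInto(server: ServerSocketChannel,
                 queues: Array[ConcurrentLinkedQueue[SocketChannel]],
                 expected: Int): Int = {
    server.configureBlocking(false) // a channel must be non-blocking before it is registered
    val selector = Selector.open()
    try {
      server.register(selector, SelectionKey.OP_ACCEPT)
      var currentProcessor = 0
      var accepted = 0
      var idleRounds = 0
      while (accepted < expected && idleRounds < 10) {
        if (selector.select(500) == 0) idleRounds += 1 // waits at most 500 ms
        val iter = selector.selectedKeys().iterator()
        while (iter.hasNext) {
          val key = iter.next()
          iter.remove()
          if (key.isAcceptable) {
            val channel = server.accept() // null if no connection is actually pending
            if (channel != null) {
              channel.configureBlocking(false)
              queues(currentProcessor).add(channel) // hand-off, like Processor.accept
              currentProcessor = (currentProcessor + 1) % queues.length // round robin
              accepted += 1
            }
          }
        }
      }
      accepted
    } finally selector.close()
  }
}
```

With three client connections and two queues, the round robin assigns them to queues 0, 1, 0.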

2.3 kafka.network.Processor

override def run() {
  startupComplete()
  while (isRunning) {
    try {
      // setup any new connections that have been queued up
      configureNewConnections()
      // register any new responses for writing
      processNewResponses()
      poll()
      processCompletedReceives()
      processCompletedSends()
      processDisconnected()
    } catch {
      // We catch all the throwables here to prevent the processor thread from exiting. We do this because
      // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
      // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
      // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
      case e: ControlThrowable => throw e
      case e: Throwable =>
        error("Processor got uncaught exception.", e)
    }
  }

  debug("Closing selector - processor " + id)
  swallowError(closeAll())
  shutdownComplete()
}

First, let's take a closer look at the configureNewConnections method:

/**
 * Register any new connections that have been queued up
 */
private def configureNewConnections() {
  while (!newConnections.isEmpty) {
    val channel = newConnections.poll()
    debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
    // (building connectionId from the channel's local and remote host:port is omitted from this excerpt)
    selector.register(connectionId, channel)
  }
}

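The method loops while newConnections is non-empty, polls each SocketChannel off the queue, and registers it with this Processor's selector for OP_READ events.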
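Here is a minimal sketch of that drain-and-register step. It uses java.nio.channels.Selector directly instead of Kafka's own Selector wrapper, and MiniProcessor is a hypothetical name. It polls until the queue returns null rather than checking isEmpty first. This avoids a check-then-act gap if several threads drained the same queue. In Kafka only the owning Processor drains it, so both forms work there.

```scala
import java.nio.channels.{SelectionKey, Selector, SocketChannel}
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical sketch of the Processor side (not Kafka's Selector class)
object MiniProcessor {
  // Drains the queue filled by the Acceptor and registers each channel for OP_READ
  // with this Processor's own selector; returns how many channels were registered.
  def configureNewConnections(newConnections: ConcurrentLinkedQueue[SocketChannel],
                              selector: Selector): Int = {
    var registered = 0
    var channel = newConnections.poll()
    while (channel != null) {
      channel.configureBlocking(false) // required before register(); the Acceptor normally does this
      channel.register(selector, SelectionKey.OP_READ)
      registered += 1
      channel = newConnections.poll()
    }
    registered
  }
}
```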

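Back in the main loop, consider the read method. (This method appears to have been removed in the 0.10 code; I could not find it there. In that version reading happens inside poll(), and processCompletedReceives() passes complete requests on to requestChannel.) In 0.8 it works as follows: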

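  1. Put the current SelectionKey and the event-loop timestamp into an LRU map, so that idle connections can later be found and their resources reclaimed.
  2. Attach a BoundedByteBufferReceive object to the key if one is not attached yet. Its readFrom method performs the actual read and returns the number of bytes read.
  • If the read is complete (receive.complete is true), the data is wrapped into a Request object and placed on the RequestQueue via requestChannel.sendRequest(req).
  • If the read is not complete yet, the selector keeps listening for OP_READ events on the key.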
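The idea behind BoundedByteBufferReceive, a 4-byte size prefix followed by a payload that may arrive over several reads, can be sketched as below. SizePrefixedReceive and ChunkedChannel are hypothetical names. The sketch assumes a big-endian Int size prefix and a maxSize limit, and it is not the Kafka class.

```scala
import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel

// Hypothetical size-prefixed receive (not Kafka's BoundedByteBufferReceive):
// a 4-byte big-endian length followed by that many payload bytes. readFrom can be
// called repeatedly as data trickles in; complete turns true once the payload is full.
final class SizePrefixedReceive(maxSize: Int) {
  private val sizeBuffer = ByteBuffer.allocate(4)
  private var contentBuffer: ByteBuffer = null
  private var done = false

  def complete: Boolean = done
  def payload: ByteBuffer = contentBuffer

  /** Returns the number of bytes read by this call, or -1 on end-of-stream. */
  def readFrom(channel: ReadableByteChannel): Int = {
    require(!done, "receive already complete")
    var read = 0
    if (sizeBuffer.hasRemaining) {
      val n = channel.read(sizeBuffer)
      if (n < 0) return -1
      read += n
      if (!sizeBuffer.hasRemaining) { // size prefix fully read: allocate the payload buffer
        sizeBuffer.flip()
        val size = sizeBuffer.getInt()
        require(size >= 0 && size <= maxSize, s"invalid request size $size")
        contentBuffer = ByteBuffer.allocate(size)
      }
    }
    if (contentBuffer != null) {
      val n = channel.read(contentBuffer)
      if (n < 0) return -1
      read += n
      if (!contentBuffer.hasRemaining) {
        contentBuffer.flip() // ready for the consumer to read the payload
        done = true
      }
    }
    read
  }
}

// Test helper: hands out at most `chunk` bytes per read(), imitating a
// non-blocking socket on which only part of a request has arrived.
final class ChunkedChannel(data: Array[Byte], chunk: Int) extends ReadableByteChannel {
  private var pos = 0
  private var open = true
  def read(dst: ByteBuffer): Int = {
    if (!dst.hasRemaining) return 0
    if (pos >= data.length) return -1
    val n = math.min(math.min(chunk, dst.remaining), data.length - pos)
    dst.put(data, pos, n)
    pos += n
    n
  }
  def isOpen(): Boolean = open
  def close(): Unit = { open = false }
}
```

Each incomplete call corresponds to the "keep listening for OP_READ" branch above; only when complete is true would the payload be wrapped into a Request.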

2.4 kafka.server.KafkaRequestHandler

/**
 * A thread that answers kafka requests.
 */
def run() {
  while (true) {
    var req: RequestChannel.Request = null
    while (req == null) {
      // We use a single meter for aggregate idle percentage for the thread pool.
      // Since meter is calculated as total_recorded_value / time_window and
      // time_window is independent of the number of threads, each recorded idle
      // time should be discounted by # threads.
      val startSelectTime = SystemTime.nanoseconds
      req = requestChannel.receiveRequest(300)
      val idleTime = SystemTime.nanoseconds - startSelectTime
      aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
    }

    if (req eq RequestChannel.AllDone) {
      debug("Kafka request handler %d on broker %d received shut down command".format(
        id, brokerId))
      return
    }
    req.requestDequeueTimeMs = SystemTime.milliseconds
    trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
    apis.handle(req)
  }
}

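KafkaRequestHandler is also an event-loop thread. It repeatedly takes a Request from the requestQueue via requestChannel.receiveRequest(300), which waits at most 300 ms, and passes it to apis.handle. KafkaApis processes the request and puts the resulting response on the responseQueue of the Processor that received it.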
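Here is a stripped-down version of this loop. It assumes requests are plain strings and that a sentinel string plays the role of RequestChannel.AllDone; MiniHandler and its parameters are hypothetical. Kafka additionally divides the idle time by the number of handler threads before marking its meter, whereas this sketch just accumulates raw nanoseconds.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical sketch of the KafkaRequestHandler loop (not Kafka's types)
object MiniHandler {
  // Shutdown sentinel, playing the role of RequestChannel.AllDone
  val AllDone: String = "__ALL_DONE__"

  def runLoop(requests: LinkedBlockingQueue[String],
              handle: String => Unit,
              idleNanos: AtomicLong): Unit = {
    while (true) {
      var req: String = null
      while (req == null) {
        val start = System.nanoTime()
        req = requests.poll(300, TimeUnit.MILLISECONDS) // null after 300 ms with no request
        idleNanos.addAndGet(System.nanoTime() - start)  // time spent waiting counts as idle
      }
      if (req eq AllDone) return // reference comparison, like `req eq RequestChannel.AllDone`
      handle(req)
    }
  }
}
```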

The request types that apis.handle dispatches on (RequestKeys in Kafka 0.8):

Request key                         Purpose                         Request class
RequestKeys.ProduceKey              producer request                ProducerRequest
RequestKeys.FetchKey                consumer fetch request          FetchRequest
RequestKeys.OffsetsKey              topic offset request            OffsetRequest
RequestKeys.MetadataKey             topic metadata request          TopicMetadataRequest
RequestKeys.LeaderAndIsrKey         leader and ISR update request   LeaderAndIsrRequest
RequestKeys.StopReplicaKey          stop replica request            StopReplicaRequest
RequestKeys.UpdateMetadataKey       update metadata request         UpdateMetadataRequest
RequestKeys.ControlledShutdownKey   controlled shutdown request     ControlledShutdownRequest
RequestKeys.OffsetCommitKey         offset commit request           OffsetCommitRequest
RequestKeys.OffsetFetchKey          consumer offset fetch request   OffsetFetchRequest
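The dispatch in apis.handle is essentially a match on the request key. The sketch below (MiniApis.describe is hypothetical) uses the numeric values of the 0.8 RequestKeys constants, which are the Kafka protocol API keys 0 to 9. Instead of calling a real handler, it returns the request class listed in the table.

```scala
// Hypothetical dispatcher in the spirit of apis.handle (not Kafka's KafkaApis)
object MiniApis {
  // Numeric values of the Kafka 0.8 RequestKeys constants (protocol API keys 0-9)
  val ProduceKey: Short = 0
  val FetchKey: Short = 1
  val OffsetsKey: Short = 2
  val MetadataKey: Short = 3
  val LeaderAndIsrKey: Short = 4
  val StopReplicaKey: Short = 5
  val UpdateMetadataKey: Short = 6
  val ControlledShutdownKey: Short = 7
  val OffsetCommitKey: Short = 8
  val OffsetFetchKey: Short = 9

  // Maps a request key to its request class; uppercase names match as stable identifiers
  def describe(requestId: Short): String = requestId match {
    case ProduceKey            => "ProducerRequest"
    case FetchKey              => "FetchRequest"
    case OffsetsKey            => "OffsetRequest"
    case MetadataKey           => "TopicMetadataRequest"
    case LeaderAndIsrKey       => "LeaderAndIsrRequest"
    case StopReplicaKey        => "StopReplicaRequest"
    case UpdateMetadataKey     => "UpdateMetadataRequest"
    case ControlledShutdownKey => "ControlledShutdownRequest"
    case OffsetCommitKey       => "OffsetCommitRequest"
    case OffsetFetchKey        => "OffsetFetchRequest"
    case other                 => throw new IllegalArgumentException(s"Unknown api code $other")
  }
}
```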

2.5 Processor response handling

def processNewResponses() {
  var curr = requestChannel.receiveResponse(id)
  while (curr != null) {
    try {
      curr.responseAction match {
        case RequestChannel.NoOpAction =>
          // There is no response to send to the client, we need to read more pipelined requests
          // that are sitting in the server's socket buffer
          curr.request.updateRequestMetrics
          trace("Socket server received empty response to send, registering for read: " + curr)
          selector.unmute(curr.request.connectionId)
        case RequestChannel.SendAction =>
          sendResponse(curr)
        case RequestChannel.CloseConnectionAction =>
          curr.request.updateRequestMetrics
          trace("Closing socket connection actively according to the response code.")
          close(selector, curr.request.connectionId)
      }
    } finally {
      curr = requestChannel.receiveResponse(id)
    }
  }
}

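Back in the Processor thread class, processNewResponses() handles the Responses that the Handlers produced for clients. It takes each Response addressed to this Processor out of requestChannel's responseQueue. For a SendAction it calls sendResponse, which hands the data to the selector and registers interest in OP_WRITE, so that a later poll() writes it back to the client. A NoOpAction unmutes the connection so that more requests can be read from it, and a CloseConnectionAction closes the connection.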
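The three response actions can be modeled with a small ADT. In this hypothetical sketch, ResponseAction, FakeSelector and MiniResponses are not Kafka classes. The selector only records what each action would do: unmute for NoOpAction, queue a send (in Kafka, this sets OP_WRITE interest) for SendAction, and close for CloseConnectionAction.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

// Hypothetical model of the three response actions (not Kafka's classes)
sealed trait ResponseAction
case object NoOpAction extends ResponseAction
case object SendAction extends ResponseAction
case object CloseConnectionAction extends ResponseAction

final case class Response(connectionId: String, action: ResponseAction)

// Stand-in for the Processor's selector: it only records what would happen
final class FakeSelector {
  val unmuted = ListBuffer.empty[String] // connections resumed for reading
  val sends = ListBuffer.empty[String]   // in Kafka, queuing a send adds OP_WRITE interest
  val closed = ListBuffer.empty[String]  // connections closed on purpose
}

object MiniResponses {
  // Drains this Processor's response queue without blocking, like processNewResponses()
  def processNewResponses(queue: LinkedBlockingQueue[Response], selector: FakeSelector): Unit = {
    var curr = queue.poll()
    while (curr != null) {
      curr.action match {
        case NoOpAction            => selector.unmuted += curr.connectionId
        case SendAction            => selector.sends += curr.connectionId
        case CloseConnectionAction => selector.closed += curr.connectionId
      }
      curr = queue.poll()
    }
  }
}
```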

05-06 13:07