一、Kafka通信机制的整体结构
同时,这也是SEDA多线程模型。
- 对于broker来说,客户端连接数量有限,不会频繁新建大量连接。因此一个Acceptor thread线程处理新建连接绰绰有余。
- Kafka高吐吞量,则要求broker接收和发送数据必须快速,因此用proccssor thread线程池处理,并把读取客户端数据转交给缓冲区,不会导致客户端请求大量堆积。
- Kafka磁盘操作比较频繁会且有io阻塞或等待,IO Thread线程数量一般设置为proccssor thread num两倍,可以根据运行环境需要进行调节。
二、SocketServer整体设计时序图
Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor线程负责读写数据,M个Handler来处理业务逻辑。在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求。
下面我们就针对以上整体设计思路分开讲解各个不同部分的源代码。
2.1 启动初始化工作
/**
* Start the socket server
*/
def startup() {
this.synchronized {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
var processorBeginIndex = 0
endpoints.values.foreach { endpoint =>
val protocol = endpoint.protocolType
val processorEndIndex = processorBeginIndex + numProcessorThreads
for (i <- processorBeginIndex until processorEndIndex)
processors(i) = newProcessor(i, connectionQuotas, protocol)
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
acceptors.put(endpoint, acceptor)
Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
acceptor.awaitStartup()
processorBeginIndex = processorEndIndex
}
}
newGauge("NetworkProcessorAvgIdlePercent",
new Gauge[Double] {
def value = allMetricNames.map( metricName =>
metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
}
)
info("Started " + acceptors.size + " acceptor threads")
}
- ConnectionQuotas对象负责管理连接数/IP,
- 创建一个Acceptor侦听者线程,初始化N个Processor线程,processors是一个线程数组,可以作为线程池使用,默认是三个.
- Acceptor线程和N个Processor线程中每个线程都独立创建Selector.open()多路复用器.
val numProcessorThreads = config.numNetworkThreads //num.network.threads=3 配置的是16
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
2.2 Acceptor线程
/**
* Accept loop that checks for new connection attempts
*/
def run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 2.1.1 注册OP_ACCEPT事件
startupComplete()
try {
var currentProcessor = 0
while (isRunning) {
try {
val ready = nioSelector.select(500) // 2.1.2 采用的是同步非阻塞逻辑,每隔500MS轮询一次
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable)
accept(key, processors(currentProcessor)) // 2.1.2 将代码添加到newConnections队列之后返回
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
// 2.1.2 当有请求到来的时候采用轮询的方式获取一个Processor线程处理请求
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
catch {
// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
// to a select operation on a specific channel or a bad request. We don't want the
// the broker to stop responding to requests from other clients in these scenarios.
case e: ControlThrowable => throw e
case e: Throwable => error("Error occurred", e)
}
}
} finally {
debug("Closing server socket and selector.")
swallowError(serverChannel.close())
swallowError(nioSelector.close())
shutdownComplete()
}
}
2.1.1 注册OP_ACCEPT事件
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
2.1.2 内部逻辑
此处采用的是同步非阻塞逻辑,每隔500MS轮询一次.
当有请求到来的时候采用轮询的方式获取一个Processor线程处理请求,代码如下:
之后将代码添加到newConnections队列之后返回,代码如下:
def accept(socketChannel: SocketChannel) { newConnections.add(socketChannel) wakeup()}
//newConnections是一个线程安全的队列,存放SocketChannel通道
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
2.3 kafka.net.Processor
override def run() {
startupComplete()
while (isRunning) {
try {
// setup any new connections that have been queued up
configureNewConnections()
// register any new responses for writing
processNewResponses()
poll()
processCompletedReceives()
processCompletedSends()
processDisconnected()
} catch {
// We catch all the throwables here to prevent the processor thread from exiting. We do this because
// letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
// be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
// or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
case e: ControlThrowable => throw e
case e: Throwable =>
error("Processor got uncaught exception.", e)
}
}
debug("Closing selector - processor " + id)
swallowError(closeAll())
shutdownComplete()
}
先来重点看一下configureNewConnections这个方法:
/**
* Register any new connections that have been queued up
*/
private def configureNewConnections() {
while (!newConnections.isEmpty) {
val channel = newConnections.poll()
debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
selector.register(connectionId, channel)
}
}
循环判断NewConnections的大小,如果有值则弹出,并且注册为OP_READ读事件。
再回到主逻辑看一下read方法。(这个方法估计在0.10的代码中被废弃了,没有找到。)
- 把当前SelectionKey和事件循环时间放入LRU映射表中,将来检查时回收连接资源。
- 建立BoundedByteBufferReceive对象,具体读取操作由这个对象的readFrom方法负责进行,返回读取的字节大小。
- 如果读取完成,则修改状态为receive.complete,并通过requestChannel.sendRequest(req)将封装好的Request对象放到RequestQueue队列中。
- 如果没有读取完成,则让selector继续侦听OP_READ事件。
2.4 kafka.server.KafkaRequestHandler
/**
* A thread that answers kafka requests.
*/
def run() {
while(true) {
var req : RequestChannel.Request = null
while (req == null) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
val startSelectTime = SystemTime.nanoseconds
req = requestChannel.receiveRequest(300)
val idleTime = SystemTime.nanoseconds - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
}
if(req eq RequestChannel.AllDone) {
debug("Kafka request handler %d on broker %d received shut down command".format(
id, brokerId))
return
}
req.requestDequeueTimeMs = SystemTime.milliseconds
trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
apis.handle(req)
}
}
KafkaRequestHandler也是一个事件处理线程,不断的循环读取requestQueue队列中的Request请求数据,其中超时时间设置为300MS,并将请求发送到 apis.handle
方法中处理,并将请求响应结果放到responseQueue队列中去。
RequestKeys.ProduceKey | producer请求 | ProducerRequest |
RequestKeys.FetchKey | consumer请求 | FetchRequest |
RequestKeys.OffsetsKey | topic的offset请求 | OffsetRequest |
RequestKeys.MetadataKey | topic元数据请求 | TopicMetadataRequest |
RequestKeys.LeaderAndIsrKey | leader和isr信息更新请求 | LeaderAndIsrRequest |
RequestKeys.StopReplicaKey | 停止replica请求 | StopReplicaRequest |
RequestKeys.UpdateMetadataKey | 更新元数据请求 | UpdateMetadataRequest |
RequestKeys.ControlledShutdownKey | controlledShutdown请求 | ControlledShutdownRequest |
RequestKeys.OffsetCommitKey | commitOffset请求 | OffsetCommitRequest |
RequestKeys.OffsetFetchKey | consumer的offset请求 | OffsetFetchRequest |
2.5 Processor响应数据处理
def processNewResponses() {
var curr = requestChannel.receiveResponse(id)
while (curr != null) {
try {
curr.responseAction match {
case RequestChannel.NoOpAction =>
// There is no response to send to the client, we need to read more pipelined requests
// that are sitting in the server's socket buffer
curr.request.updateRequestMetrics
trace("Socket server received empty response to send, registering for read: " + curr)
selector.unmute(curr.request.connectionId)
case RequestChannel.SendAction =>
sendResponse(curr)
case RequestChannel.CloseConnectionAction =>
curr.request.updateRequestMetrics
trace("Closing socket connection actively according to the response code.")
close(selector, curr.request.connectionId)
}
} finally {
curr = requestChannel.receiveResponse(id)
}
}
}
我们回到Processor线程类中,processNewRequest()
方法是发送请求,那么会调用processNewResponses()
来处理Handler提供给客户端的Response,把requestChannel
中responseQueue的Response取出来,注册OP_WRITE
事件,将数据返回给客户端。