I have a new TCP server written in Go with 100+ clients attached to it. Each client streams in data that needs to be examined centrally: the clients capture radio packets over the air from various locations, and those packets are then analysed together. The code works, but I am seeing a lot of contention and increased CPU around the locking, and I was after some thoughts on how to avoid the locking (if possible) or optimise around it.
Because the TCP server spins up a goroutine for each packet received, the addMessage
function needs a level of synchronisation. These packets also get analysed later in another function that takes an RLock()
on the map.
It is the cullMessages()
function, called once per second, that really gets caught up in itself and can slow down badly, sometimes taking 2-3 seconds to run. That compounds the issue, as the next 2-3 operations queue up waiting for the lock and then all run straight away!
Any ideas/thoughts would be appreciated!
var dataMessagesMutex sync.RWMutex
var dataMessages map[string][]*trackingPacket_v1
// Called from each TCP client's goroutine; they all need to share this map
func addMessage(trackingPacket *trackingPacket_v1) {
dataMessagesMutex.Lock()
dataMessages[trackingPacket.packetID] = append(dataMessages[trackingPacket.packetID], trackingPacket)
dataMessagesMutex.Unlock()
}
// Function called on a loop, need to delete based on age here
func cullMessages() {
cullTS := time.Now().Add(-time.Second * MODES_MAX_MESSAGE_AGE)
dataMessagesMutex.Lock()
defer dataMessagesMutex.Unlock()
for avr, data := range dataMessages {
sort.Sort(PacketSorter(data))
// Packets are now sorted oldest first; find the last one to cull
highestIndex := -1
for i, message := range data {
if cullTS.Sub(message.ProcessedTime) > 0 {
highestIndex = i
}
}
// Re-slice past the culled packets and write the result back to the
// map; without the write-back the map keeps the old slice
data = data[highestIndex+1:]
if len(data) == 0 {
// No messages left, delete the entry
delete(dataMessages, avr)
} else {
dataMessages[avr] = data
}
}
}
UPDATE: Added the analysis function:
func processCandidates() {
dataMessagesMutex.RLock()
defer dataMessagesMutex.RUnlock()
for _, data := range dataMessages {
numberOfMessages := len(data)
for a := 0; a < numberOfMessages; a++ {
packetA := data[a]
applicablePackets := []*trackingPacket_v1{packetA}
for b := 0; b < numberOfMessages; b++ {
// Don't compare identical packets
if b == a {
continue
}
packetB := data[b]
// Only consider this packet if it's within an acceptable
// timestamp threshold
tsDelta := math.Abs(packetA.NormalisedTS - packetB.NormalisedTS)
if tsDelta < MAX_MESSAGE_TS_DIFF {
// Finally, we need to make sure that only one message per
// station is included in our batch
stationAlreadyRepresented := false
for i := 0; i < len(applicablePackets); i++ {
if applicablePackets[i].Sharecode == packetB.Sharecode {
stationAlreadyRepresented = true
}
}
if !stationAlreadyRepresented {
applicablePackets = append(applicablePackets, packetB)
}
}
}
// Remove any stations which are deemed too close to one another
if len(applicablePackets) >= MIN_STATIONS_NEEDED {
applicablePackets = cullPackets(applicablePackets)
}
// Provided we still have enough packets....
if len(applicablePackets) >= MIN_STATIONS_NEEDED {
// Generate a hash for this batch...
hash := generateHashForPackets(applicablePackets)
batchIsUnique := true
for _, packet := range applicablePackets {
if packet.containsHash(hash) {
batchIsUnique = false
break
}
}
if batchIsUnique {
for _, packet := range applicablePackets {
packet.addHash(hash)
}
go sendOfDataForWork(applicablePackets)
}
}
}
}
}
Instead of having one big map, have a goroutine for each packetID. A dispatcher goroutine could have a map[string]chan *trackingPacket_v1
, and send the incoming packets on the appropriate channel. Then the goroutine for that packetID would collect the packets into a local slice, and cull them and analyze them at intervals.
Somehow you would need to terminate the goroutines that haven't received a packet in MODES_MAX_MESSAGE_AGE. Probably the dispatcher goroutine would keep track of when each packetID was most recently seen, and periodically go through and check for ones that were too old. Then it would close those channels and remove them from its map. When the analysis goroutine discovered that its channel had been closed, it would exit.