我有一个存储接收到的数据的 channel ,我想在满足以下条件之一时进行处理:
1, channel 达到其容量。
2,自上次处理以来,计时器被触发。
我看到了帖子
Golang - How to know a buffered channel is full
更新:
我从该帖子和OneOfOne的建议中得到启发,这是play:
package main
import (
"fmt"
"math/rand"
"time"
)
var c chan int
var timer *time.Timer
const (
capacity = 5
timerDration = 3
)
func main() {
c = make(chan int, capacity)
timer = time.NewTimer(time.Second * timerDration)
go checkTimer()
go sendRecords("A")
go sendRecords("B")
go sendRecords("C")
time.Sleep(time.Second * 20)
}
func sendRecords(name string) {
for i := 0; i < 20; i++ {
fmt.Println(name+" sending record....", i)
sendOneRecord(i)
interval := time.Duration(rand.Intn(500))
time.Sleep(time.Millisecond * interval)
}
}
func sendOneRecord(record int) {
select {
case c <- record:
default:
fmt.Println("channel is full !!!")
process()
c <- record
timer.Reset(time.Second * timerDration)
}
}
func checkTimer() {
for {
select {
case <-timer.C:
fmt.Println("3s timer ----------")
process()
timer.Reset(time.Second * timerDration)
}
}
}
func process() {
for i := 0; i < capacity; i++ {
fmt.Println("process......", <-c)
}
}
这似乎可以正常工作,但是我有一个问题,我想在调用process()时阻止其他goroutine写入 channel ,上面的代码可以做到吗?还是应该在处理方法的开头添加互斥锁?
有什么优雅的解决方案吗?
最佳答案
如@OneOfOne所述,select实际上是检查 channel 是否已满的唯一方法。
如果使用 channel 进行批处理,则始终可以创建一个无缓冲 channel ,并具有goroutine提取项并附加到 slice 。
当 slice 达到特定大小时,处理项目。
这是play的示例
package main
import (
"fmt"
"sync"
"time"
)
const BATCH_SIZE = 10
func batchProcessor(ch <-chan int) {
batch := make([]int, 0, BATCH_SIZE)
for i := range ch {
batch = append(batch, i)
if len(batch) == BATCH_SIZE {
fmt.Println("Process batch:", batch)
time.Sleep(time.Second)
batch = batch[:0] // trim back to zero size
}
}
fmt.Println("Process last batch:", batch)
}
func main() {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
go func() {
batchProcessor(ch)
wg.Done()
}()
fmt.Println("Submitting tasks")
for i := 0; i < 55; i++ {
ch <- i
}
close(ch)
wg.Wait()
}
关于go - 如何在Golang中正确处理缓冲 channel ?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38417297/