本文介绍了多个码头工容器日志的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我试图一次从多个码头集装箱取得日志(订单无关紧要)。如果 types.ContainerLogsOption.Follow
被设置为false,这可以按预期工作。如果 types.ContainerLogsOption.Follow
设置为true,则有时日志输出会在几个日志之后卡住,而且没有跟进日志打印到标准输出。
另外,如果我重新启动一个或全部容器的命令不会像 docker logs -f containerName
那样退出。
func(w * Whatever)Logs(options LogOptions){
readers:= [] io.Reader {}
for _,container:= range options.Containers {
responseBody,err:= w.Docker.Client.ContainerLogs(context.Background(),container,types.ContainerLogsOptions {
ShowStdout:true,
ShowStderr:true,
关注: options.Follow,
})
推迟responseBody.Close()
if err!= nil {
log.Fatal(err)
}
reader = append(reader,responseBody)
}
//连接所有阅读器到一个
multiReader:= io.MultiReader(阅读器...)
_,err:= stdcopy.StdCopy(os.Stdout,os.Stderr ,multiReader)
if err!= nil&& err!= io.EOF {
log.Fatal(err)
}
}
基本上,我的实现与 docker logs
,因此我想知道是什么导致了这个问题。
io.MultiReader
。您需要做的是从每个响应中分别读取每个响应并合并输出。由于你正在处理日志,所以拆分换行符的读法是有意义的。 bufio.Scanner
对单个 io.Reader
执行此操作。因此,一种选择是创建一种可同时扫描多个阅读器的新类型。
您可以像这样使用它:
scanner:= NewConcurrentScanner(阅读器...)
用于scanner.Scan(){
fmt.Println(scanner.Text())
}
if err:= scanner.Err(); err!= nil {
log.Fatalln(err)
}
示例一个并发扫描器的实现:
pre $ // ConcurrentScanner像io.Scanner一样工作,但是有多个io.Readers
类型ConcurrentScanner结构{
扫描chan []字节//读取扫描数据
错误chan错误//读者错误
完成chan struct {} //表示所有读者已完成
取消func()//取消所有阅读器(在第一个错误时停止)
data [] byte //上次扫描值
错误错误
}
// NewConcurrentScanner开始在单独的goroutine
//中扫描每个阅读器并返回一个* ConcurrentScanner。
func NewConcurrentScanner(阅读器... io.Reader)* ConcurrentScanner {
ctx,cancel:= context.WithCancel(context.Background())
s:=& ConcurrentScanner {
扫描:make(chan [] byte),
errors:make(chan error),
done:make(chan struct {}),
cancel:cancel,
}
var wg sync.WaitGroup
wg.Add(len(读者))
for _,reader:=范围读者{
//开始在它自己的goroutine中为每个阅读器提供扫描仪。
go扫描仪:= bufio.NewScanner(阅读器)
for scanner.Scan()($阅读器io.Reader){
defer wg.Done()
扫描仪: {
select {
case s.scans< - scanner.Bytes():
//当有数据时,将它发送到s.scans,
//这将直到调用Scan()为止。
case< -ctx.Done():
//当上下文被取消时触发,
//表示我们现在应该退出。
return
}
}
if err:= scanner.Err(); err!= nil {
select {
case s.errors< - err:
// Reprort we got a error
case< -ctx.Done():
//现在退出,如果上下文被取消,否则发送
//错误并且这个goroutine永远不会退出。
返回
}
}
}(读者)
}
去func(){
//发出所有信号扫描仪已完成
wg.Wait()
关闭(s.done)
}()
返回s
}
func(s * ConcurrentScanner)Scan()bool {
select {
case s.data =< -s.scans:
//从扫描仪获取数据
返回true
case< -s.done:
//所有扫描器都完成了,无所事事。
case s.err =< -s.errors:
//其中一个扫描仪错误,已完成。
}
s.cancel()//取消上下文,无论我们如何退出。
返回false
}
func(s * ConcurrentScanner)Bytes()[] byte {
return s.data
}
func(s * ConcurrentScanner)Text()字符串{
返回字符串(s.data)
}
func(s * ConcurrentScanner)Err()error {
返回s.err
}
下面是它在Go游乐场:
您可以看到并发扫描器输出是交错的。而不是阅读所有的阅读者,然后转向下一个阅读者,如 io.MultiReader
。
所示。
I'm trying to get the logs from multiple docker containers at once (order doesn't matter). This works as expected if types.ContainerLogsOption.Follow
is set to false.
If types.ContainerLogsOption.Follow
is set to true sometimes the log output get stuck after a few logs and no follow up logs are printed to stdout.
If the output doesn't get stuck it works as expected.
Additionally if I restart one or all of the containers the command doesn't exit like docker logs -f containerName
does.
func (w *Whatever) Logs(options LogOptions) {
readers := []io.Reader{}
for _, container := range options.Containers {
responseBody, err := w.Docker.Client.ContainerLogs(context.Background(), container, types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: options.Follow,
})
defer responseBody.Close()
if err != nil {
log.Fatal(err)
}
readers = append(readers, responseBody)
}
// concatenate all readers to one
multiReader := io.MultiReader(readers...)
_, err := stdcopy.StdCopy(os.Stdout, os.Stderr, multiReader)
if err != nil && err != io.EOF {
log.Fatal(err)
}
}
Basically there is no great difference in my implementation from that of docker logs
https://github.com/docker/docker/blob/master/cli/command/container/logs.go, hence I'm wondering what causes this issues.
解决方案
As JimB commented, that method won't work due to the operation of io.MultiReader
. What you need to do is read from each from each response individually and combine the output. Since you're dealing with logs, it would make sense to break up the reads on newlines. bufio.Scanner
does this for a single io.Reader
. So one option would be to create a new type that scans multiple readers concurrently.
You could use it like this:
scanner := NewConcurrentScanner(readers...)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
if err := scanner.Err(); err != nil {
log.Fatalln(err)
}
Example implementation of a concurrent scanner:
// ConcurrentScanner works like io.Scanner, but with multiple io.Readers
type ConcurrentScanner struct {
scans chan []byte // Scanned data from readers
errors chan error // Errors from readers
done chan struct{} // Signal that all readers have completed
cancel func() // Cancel all readers (stop on first error)
data []byte // Last scanned value
err error
}
// NewConcurrentScanner starts scanning each reader in a separate goroutine
// and returns a *ConcurrentScanner.
func NewConcurrentScanner(readers ...io.Reader) *ConcurrentScanner {
ctx, cancel := context.WithCancel(context.Background())
s := &ConcurrentScanner{
scans: make(chan []byte),
errors: make(chan error),
done: make(chan struct{}),
cancel: cancel,
}
var wg sync.WaitGroup
wg.Add(len(readers))
for _, reader := range readers {
// Start a scanner for each reader in it's own goroutine.
go func(reader io.Reader) {
defer wg.Done()
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
select {
case s.scans <- scanner.Bytes():
// While there is data, send it to s.scans,
// this will block until Scan() is called.
case <-ctx.Done():
// This fires when context is cancelled,
// indicating that we should exit now.
return
}
}
if err := scanner.Err(); err != nil {
select {
case s.errors <- err:
// Reprort we got an error
case <-ctx.Done():
// Exit now if context was cancelled, otherwise sending
// the error and this goroutine will never exit.
return
}
}
}(reader)
}
go func() {
// Signal that all scanners have completed
wg.Wait()
close(s.done)
}()
return s
}
func (s *ConcurrentScanner) Scan() bool {
select {
case s.data = <-s.scans:
// Got data from a scanner
return true
case <-s.done:
// All scanners are done, nothing to do.
case s.err = <-s.errors:
// One of the scanners error'd, were done.
}
s.cancel() // Cancel context regardless of how we exited.
return false
}
func (s *ConcurrentScanner) Bytes() []byte {
return s.data
}
func (s *ConcurrentScanner) Text() string {
return string(s.data)
}
func (s *ConcurrentScanner) Err() error {
return s.err
}
Here's an example of it working in the Go Playground: https://play.golang.org/p/EUB0K2V7iT
You can see that the concurrent scanner output is interleaved. Rather than reading all of one reader, then moving on to the next, as is seen with io.MultiReader
.
这篇关于多个码头工容器日志的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!