这篇文章来看下docker中的apiserver的实现。

我们从api := apiserver.New(serverConfig)这个代码开始看。来看下New的实现是什么,这里serverConfig结构体的内容为:

1
2
3
4
5
6
7
serverConfig := &apiserver.ServerConfig{
    Logging:     true,
    EnableCors:  daemonCfg.EnableCors,
    CorsHeaders: daemonCfg.CorsHeaders,
    Version:     dockerversion.VERSION,
}
serverConfig = setPlatformServerConfig(serverConfig, daemonCfg)

New的代码为:

1
2
3
4
5
6
7
8
9
func New(cfg *ServerConfig) *Server {
    srv := &Server{
        cfg:   cfg,
        start: make(chan struct{}),
    }   
    r := createRouter(srv)
    srv.router = r
    returnsrv
}

这里建立了一个server结构体:

1
2
3
4
5
6
7
type Server struct{
    daemon  *daemon.Daemon
    cfg     *ServerConfig
    router  *mux.Router
    start   chan struct{}
    servers []serverCloser
}

同时这个server结构体的router被赋值了一个router结构体,这里的router是什么呢?

熟悉OpenStack中使用频率特别高的routes组件的同学应该能猜到了,这里的router就是决定了一个URL最终映射到什么处理函数的一个路由。我们可以看到createRoute的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// we keep enableCors just for legacy usage, need to be removed in the future
func createRouter(s *Server) *mux.Router {
    r := mux.NewRouter()
    ifos.Getenv("DEBUG") != ""{
        ProfilerSetup(r, "/debug/")
    }
    m := map[string]map[string]HttpApiFunc{
        "GET": {
            "/_ping":                          s.ping,
            ......
        },
        "POST": {
            "/auth":                         s.postAuth,
            ......
        },
        "DELETE": {
            "/containers/{name:.*}": s.deleteContainers,
            ......
        },
        "OPTIONS": {
            "": s.optionsHandler,
        },
    }
 
    // If "api-cors-header" is not given, but "api-enable-cors" is true, we set cors to "*"
    // otherwise, all head values will be passed to HTTP handler
    corsHeaders := s.cfg.CorsHeaders
    ifcorsHeaders == ""&& s.cfg.EnableCors {
        corsHeaders = "*"
    }
 
    formethod, routes := range m {
        forroute, fct := range routes {
            logrus.Debugf("Registering %s, %s", method, route)
            // NOTE: scope issue, make sure the variables are local and won't be changed
            localRoute := route
            localFct := fct
            localMethod := method
 
            // build the handler function
            f := makeHttpHandler(s.cfg.Logging, localMethod, localRoute, localFct, corsHeaders, version.Version(s.cfg.Version))
 
            // add the new route
            iflocalRoute == ""{
                r.Methods(localMethod).HandlerFunc(f)
            } else{
                r.Path("/v{version:[0-9.]+}"+ localRoute).Methods(localMethod).HandlerFunc(f)
                r.Path(localRoute).Methods(localMethod).HandlerFunc(f)
            }
        }
    }
 
    returnr
}

上面的代码首先建立了一个mux.NewRouter()结构体r,接着我们可以看到一个m的map,其中存放了HTTP请求类型、URL地址以及实际的处理函数的映射关系。然后会将这个map注入到r中,来看下面的这段代码:

1
2
3
4
5
6
7
8
9
10
// build the handler function
f := makeHttpHandler(s.cfg.Logging, localMethod, localRoute, localFct, corsHeaders, version.Version(s.cfg.Version))
 
// add the new route
iflocalRoute == ""{
    r.Methods(localMethod).HandlerFunc(f)
} else{
    r.Path("/v{version:[0-9.]+}"+ localRoute).Methods(localMethod).HandlerFunc(f)
    r.Path(localRoute).Methods(localMethod).HandlerFunc(f)
}

makeHttpHandler实质上是对localFct做了一个wrap,也就是说当apiserver收到一个HTTP请求的时候,makeHttpHandler生成的func会先对这个HTTP请求做一些检查,比如client和server的版本号是否ok等等,然后才会调用localFct:

1
2
3
4
iferr := handlerFunc(version, w, r, mux.Vars(r)); err != nil {
    logrus.Errorf("Handler for %s %s returned error: %s", localMethod, localRoute, err)
    httpError(w, err)
}

接着代码会判断localRoute是否为空字符串,如果是的话就不绑定带版本号的URL了。至于r.Path(localRoute).Methods(localMethod).HandlerFunc(f)做的就是在r中存放这个信息。熟悉Python中routes或者是熟悉tornado的同学应该很容易理解这个过程。之后在看某个方法的具体实现的时候可以来这里找到URL的mapping,然后查看对应函数的实现。

现在我们有了一个server,看下这个server启动时发生了什么。在代码中可以看到:

1
2
3
4
5
6
7
8
go func() {
    iferr := api.ServeApi(flHosts); err != nil {
        logrus.Errorf("ServeAPI error: %v", err)
        serveAPIWait <- err
        return
    }
    serveAPIWait <- nil
}()

这里应该是启动我们的server了,看下其实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// ServeApi loops through all of the protocols sent in to docker and spawns
// off a go routine to setup a serving http.Server for each.
func (s *Server) ServeApi(protoAddrs []string) error {
    var chErrors = make(chan error, len(protoAddrs))
 
    for_, protoAddr := range protoAddrs {
        protoAddrParts := strings.SplitN(protoAddr, "://", 2)
        iflen(protoAddrParts) != 2 {
            returnfmt.Errorf("bad format, expected PROTO://ADDR")
        }
        srv, err := s.newServer(protoAddrParts[0], protoAddrParts[1])
        iferr != nil {
            returnerr
        }
        s.servers = append(s.servers, srv...)
 
        for_, s := range srv {
            logrus.Infof("Listening for HTTP on %s (%s)", protoAddrParts[0], protoAddrParts[1])
            go func(s serverCloser) {
                iferr := s.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") {
                    err = nil
                }
                chErrors <- err
            }(s)
        }
    }
 
    fori := 0; i < len(protoAddrs); i++ {
        err := <-chErrors
        iferr != nil {
            returnerr
        }
    }
 
    returnnil
}

首先根据监听的地址分别调用s.newServer(protoAddrParts[0], protoAddrParts[1])创建一个干活的server,然后后者调用Serve开始服务。s.newServer会根据监听的协议类型建立套接字,然后启动一个HttpServer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
......
    switchproto {
......
    case"tcp":
        l, err := s.initTcpSocket(addr)
        iferr != nil {
            returnnil, err
        }
        ls = append(ls, l)
    default:
        returnnil, fmt.Errorf("Invalid protocol format: %q", proto)
    }
    var res []serverCloser
    for_, l := range ls {
        res = append(res, &HttpServer{
            &http.Server{
                Addr:    addr,
                Handler: s.router,
            },
            l,
        })
    }
    returnres, nil
}

这里HttpServer的结构体为:

1
2
3
4
type HttpServer struct{
    srv *http.Server
    l   net.Listener
}

而http.Server则是go的原生server。

稍微来理一下,我们现在有下面的结构:
* 一个api,这个api是docker实现的server,代表了提供HTTP服务的一个逻辑上的server
* api下面有一个servers数组,每个数组都是实际提供服务的server,之所以有这么多server是因为可能存在多种监听方式或者是监听的地址
* api.servers中的server的最终实现是通过go的http.Server实现的

再来接着看主线代码:

1
2
3
4
5
6
7
8
9
10
11
12
registryService := registry.NewService(registryCfg)
d, err := daemon.NewDaemon(daemonCfg, registryService)
iferr != nil {
    ifpfile != nil {
        iferr := pfile.Remove(); err != nil {
            logrus.Error(err)
        }
    }
    logrus.Fatalf("Error starting daemon: %v", err)
}
 
logrus.Info("Daemon has completed initialization")

这里我们看到了一个registryService以及一个daemon.NewDaemon的函数,registryService目前只是一个保存了配置信息的结构体:

1
2
3
4
5
6
7
8
9
10
11
// NewService returns a new instance of Service ready to be
// installed no an engine.
func NewService(options *Options) *Service {
    return&Service{
        Config: NewServiceConfig(options),
    }
}
......
type Service struct{
    Config *ServiceConfig
}

daemon.NewDaemon比较复杂,我们最后再看。先来继续看api的剩余代码:

1
2
3
// after the daemon is done setting up we can tell the api to start
// accepting connections with specified daemon
api.AcceptConnections(d)
1
2
3
4
5
6
7
8
9
10
11
12
func (s *Server) AcceptConnections(d *daemon.Daemon) {
    // Tell the init daemon we are accepting requests
    s.daemon = d
    s.registerSubRouter()
    go systemd.SdNotify("READY=1")
    // close the lock so the listeners start accepting connections
    select {
    case<-s.start:
    default:
        close(s.start)
    }
}

这里通过channel释放了start的锁。start的锁是在等待呢?上面提到的套接字建立的代码中可以看到类似如下代码:

1
2
3
4
5
case"tcp":
    l, err := s.initTcpSocket(addr)
......
case"unix":
    l, err := sockets.NewUnixSocket(addr, s.cfg.SocketGroup, s.start)

initTcpSocket中也会传递s.start。

apiserver基本上就是这么多东西,其实就是一个带了route的http server。接下来有两大块地方要分析,一个是我们的daemon,还有一个就是请求的真正实现。我们先通过一个简单的请求的实现代码来看下daemon的作用:

1
"/containers/ps":                  s.getContainersJSON,

s.getContainersJSON的实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (s *Server) getContainersJSON(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
    iferr := parseForm(r); err != nil {
        returnerr
    }  
         
    config := &daemon.ContainersConfig{
        All:     boolValue(r, "all"),
        Size:    boolValue(r, "size"),
        Since:   r.Form.Get("since"),
        Before:  r.Form.Get("before"),
        Filters: r.Form.Get("filters"),     
    }      
             
    iftmpLimit := r.Form.Get("limit"); tmpLimit != ""{
        limit, err := strconv.Atoi(tmpLimit)
        iferr != nil {
            returnerr
        }
        config.Limit = limit
    }
 
    containers, err := s.daemon.Containers(config)
    iferr != nil {
        returnerr
    }
 
    returnwriteJSON(w, http.StatusOK, containers)
}

可以看到实际上这个请求是由s.daemon完成的。我们上面看到了api其实就是一个路由请求的框架,会把URL路由的s的相关函数上。但是这些函数其实还是空架子,真正干活的是s.daemon。现在对daemon的地位及功能有了大概的认识后,我们接下来的文章会分析daemon的实现以及一些重要请求处理函数的实现。

11-10 07:05