赞
踩
这一篇看一下ingress controller的实现
ingress controller通过watch ingresses接口,动态更新ingress的资源
官方的ingress controller实现内容:这里通过定义nginxConf常量,实际内容为Nginx的配置模板
const ( nginxConf = ` events { worker_connections 1024; } http { {{range $ing := .Items}} {{range $rule := $ing.Spec.Rules}} server { listen 80; server_name {{$rule.Host}}; {{ range $path := $rule.HTTP.Paths }} location {{$path.Path}} { proxy_set_header Host $host; proxy_pass http://{{$path.Backend.ServiceName}}.{{$ing.Namespace}}.svc.cluster.local:{{$path.Backend.ServicePort}}; } {{end}} } {{end}} {{end}} }` )
func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg ingress.Configuration) error { ngx.reloadRateLimiter.Accept() ngx.reloadLock.Lock() defer ngx.reloadLock.Unlock() newCfg, err := ngx.template.Write(cfg, ingressCfg, ngx.testTemplate) if err != nil { return fmt.Errorf("failed to write new nginx configuration. Avoiding reload: %v", err) } changed, err := ngx.needsReload(newCfg) if err != nil { return err } if changed { if err := ngx.shellOut("nginx -s reload"); err != nil { return fmt.Errorf("error reloading nginx: %v", err) } glog.Info("change in configuration detected. Reloading...") } return nil }
type Configuration struct { Upstreams> []*Upstream Servers []*Server TCPUpstreams []*Location UDPUpstreams []*Location } type Upstream struct { Name string Backends []UpstreamServer Secure bool } type UpstreamServer struct { Address string Port string MaxFails int FailTimeout int } type Server struct { Name string Locations []*Location SSL bool SSLCertificate string SSLCertificateKey string SSLPemChecksum string } type Location struct { Path string IsDefBackend bool Upstream Upstream Auth auth.Nginx RateLimit ratelimit.RateLimit Redirect rewrite.Redirect SecureUpstream bool Whitelist ipwhitelist.SourceRange EnableCORS bool ExternalAuthURL authreq.Auth }
func (lbc *loadBalancerController) sync(key string) error { if !lbc.controllersInSync() { time.Sleep(podStoreSyncedPollPeriod) return fmt.Errorf("deferring sync till endpoints controller has synced") } // by default no custom configuration configmap cfg := &api.ConfigMap{} if lbc.nxgConfigMap != "" { // Search for custom configmap (defined in main args) var err error ns, name, _ := parseNsName(lbc.nxgConfigMap) cfg, err = lbc.getConfigMap(ns, name) if err != nil { return fmt.Errorf("unexpected error searching configmap %v: %v", lbc.nxgConfigMap, err) } } ngxConfig := lbc.nginx.ReadConfig(cfg) ngxConfig.HealthzURL = lbc.defHealthzURL ings := lbc.ingLister.Store.List() upstreams, servers := lbc.getUpstreamServers(ngxConfig, ings) return lbc.nginx.CheckAndReload(ngxConfig, ingress.Configuration{ Upstreams: upstreams, Servers: servers, TCPUpstreams: lbc.getTCPServices(), UDPUpstreams: lbc.getUDPServices(), }) }
- lbc.ingLister.Store, lbc.ingController = framework.NewInformer(
- &cache.ListWatch{
- ListFunc: ingressListFunc(lbc.client, namespace),
- WatchFunc: ingressWatchFunc(lbc.client, namespace),
- },
- &extensions.Ingress{}, resyncPeriod, ingEventHandler)
通过listwatch机制检测ingress资源
lbc.syncQueue实际上是通过NewTaskQueue 函数转变loadBalancerController.sync而来,每次lbc.syncQueue被调用时loadBalancerController.sync都会被调用
- func (lbc *loadBalancerController) Run() {
- glog.Infof("starting NGINX loadbalancer controller")
- go lbc.nginx.Start()
-
- go lbc.ingController.Run(lbc.stopCh)
- go lbc.endpController.Run(lbc.stopCh)
- go lbc.svcController.Run(lbc.stopCh)
- go lbc.secrController.Run(lbc.stopCh)
- go lbc.mapController.Run(lbc.stopCh)
-
- go lbc.syncQueue.run(time.Second, lbc.stopCh)
- go lbc.ingQueue.run(time.Second, lbc.stopCh)
-
- <-lbc.stopCh
- }
func (t *taskQueue) worker() { for { key, quit := t.queue.Get() if quit { close(t.workerDone) return } glog.V(3).Infof("syncing %v", key) if err := t.sync(key.(string)); err != nil { glog.Warningf("requeuing %v, err %v", key, err) t.requeue(key.(string)) } else { t.queue.Forget(key) } t.queue.Done(key) } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。