当前位置:   article > 正文

k8s ingress详解(2)_ingresslist

ingresslist

这一篇看一下ingress controller的实现

ingress controller是一个守护进程,部署为一个Kubernetes Pod

ingress controller通过watch ingresses接口,动态更新ingress的资源

官方的ingress controller实现内容:
1.从APIserver拉取配置信息
2.基于golang text/template 模块书写Nginx配置模板
3.重新加载nginx

官方Nginx ingress controller 定义
  // nginxConf is the template source for the generated nginx configuration.
  // For every rule of every ingress in .Items it emits one server block that
  // listens on port 80 and uses the rule's host as server_name. Each HTTP path
  // of the rule becomes a location that forwards the Host header and proxies
  // to the backend service, addressed by service name, ingress namespace and
  // service port under the svc.cluster.local suffix.
  // NOTE(review): the template actions look like Go text/template syntax;
  // confirm against the code that parses this constant.
  1. const (
  2. nginxConf = `
  3. events {
  4. worker_connections 1024;
  5. }
  6. http {
  7. {{range $ing := .Items}}
  8. {{range $rule := $ing.Spec.Rules}}
  9. server {
  10. listen 80;
  11. server_name {{$rule.Host}};
  12. {{ range $path := $rule.HTTP.Paths }}
  13. location {{$path.Path}} {
  14. proxy_set_header Host $host;
  15. proxy_pass http://{{$path.Backend.ServiceName}}.{{$ing.Namespace}}.svc.cluster.local:{{$path.Backend.ServicePort}};
  16. }
  17. {{end}}
  18. }
  19. {{end}}
  20. {{end}}
  21. }`
  22. )
这里通过定义nginxConf常量,实际内容为Nginx的配置模板

#配置加载
  // CheckAndReload renders a new nginx configuration from cfg and ingressCfg
  // and runs "nginx -s reload" only when needsReload reports a change.
  //
  // It first waits on ngx.reloadRateLimiter and then holds ngx.reloadLock
  // until it returns. It returns an error when rendering fails, when
  // needsReload fails, or when the reload command fails; otherwise nil.
  1. func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg ingress.Configuration) error {
  2. ngx.reloadRateLimiter.Accept()
  3. ngx.reloadLock.Lock()
  4. defer ngx.reloadLock.Unlock()
  // ngx.testTemplate is passed to Write as its third argument.
  // NOTE(review): presumably used to validate the rendered output - confirm
  // in template.Write.
  5. newCfg, err := ngx.template.Write(cfg, ingressCfg, ngx.testTemplate)
  6. if err != nil {
  7. return fmt.Errorf("failed to write new nginx configuration. Avoiding reload: %v", err)
  8. }
  9. changed, err := ngx.needsReload(newCfg)
  10. if err != nil {
  11. return err
  12. }
  13. if changed {
  14. if err := ngx.shellOut("nginx -s reload"); err != nil {
  15. return fmt.Errorf("error reloading nginx: %v", err)
  16. }
  // This message is emitted after the reload command has already returned
  // without error, not before it starts.
  17. glog.Info("change in configuration detected. Reloading...")
  18. }
  19. return nil
  20. }

CheckAndReload的第一个参数是Nginx的全局配置参数,一个ingress controller启动后这些参数一般是不变的,我们关注的是ingress的变化
ngx.template.Write(cfg, ingressCfg, ngx.testTemplate)实现了将ingress渲染到Nginx配置文件的过程
ngx.testTemplate用于测试配置文件是否正确,实际上就是执行nginx -t
ngx.template.Write就是调用的k8s.io\contrib\ingress\controllers\nginx\nginx\template\template.go Template.Write(cfg  config.Configuration,ingressCfg ingress.Configuration,isValidTemplate func([]byte) error) 函数

最终的配置conf 是通过 make(map[string]interface{}) 创建的map,nginx一般用到的结构有:
  // Configuration groups the data rendered into the nginx configuration:
  // the HTTP upstreams and servers, plus the TCP and UDP upstreams, which are
  // both expressed as Location values.
  1. type Configuration struct {
  2. Upstreams []*Upstream
  3. Servers []*Server
  4. TCPUpstreams []*Location
  5. UDPUpstreams []*Location
  6. }
  // Upstream is a named set of backend servers.
  // NOTE(review): Secure presumably marks backends that must be reached over
  // TLS - confirm against the template that consumes it.
  7. type Upstream struct {
  8. Name string
  9. Backends []UpstreamServer
  10. Secure bool
  11. }
  // UpstreamServer is a single backend entry of an Upstream, identified by
  // Address and Port (both stored as strings).
  // NOTE(review): MaxFails and FailTimeout presumably map to the nginx
  // max_fails / fail_timeout server parameters; the unit of FailTimeout is
  // not visible here - confirm.
  12. type UpstreamServer struct {
  13. Address string
  14. Port string
  15. MaxFails int
  16. FailTimeout int
  17. }
  // Server describes one named server entry together with its locations and
  // its SSL settings (enabled flag, certificate, key and PEM checksum, the
  // latter three stored as strings).
  // NOTE(review): whether the certificate fields hold file paths or PEM
  // content cannot be determined from this listing - confirm.
  18. type Server struct {
  19. Name string
  20. Locations []*Location
  21. SSL bool
  22. SSLCertificate string
  23. SSLCertificateKey string
  24. SSLPemChecksum string
  25. }
  // Location describes one path together with the upstream it is bound to
  // and the per-location settings carried alongside it: auth, rate limit,
  // redirect, source-range whitelist, CORS and external auth.
  // Upstream is stored by value, not as a pointer.
  // NOTE(review): the settings types come from packages not shown here
  // (auth, ratelimit, rewrite, ipwhitelist, authreq); their semantics should
  // be checked there.
  26. type Location struct {
  27. Path string
  28. IsDefBackend bool
  29. Upstream Upstream
  30. Auth auth.Nginx
  31. RateLimit ratelimit.RateLimit
  32. Redirect rewrite.Redirect
  33. SecureUpstream bool
  34. Whitelist ipwhitelist.SourceRange
  35. EnableCORS bool
  36. ExternalAuthURL authreq.Auth
  37. }

那么ingress.Configuration是怎么产生的呢
  // sync builds the nginx and ingress configuration from the controller's
  // current state and passes both to lbc.nginx.CheckAndReload, returning
  // whatever error that call returns.
  //
  // The key argument is not read anywhere in the body.
  1. func (lbc *loadBalancerController) sync(key string) error {
  // While controllersInSync reports false, sleep for
  // podStoreSyncedPollPeriod and return an error instead of syncing.
  2. if !lbc.controllersInSync() {
  3. time.Sleep(podStoreSyncedPollPeriod)
  4. return fmt.Errorf("deferring sync till endpoints controller has synced")
  5. }
  6. // by default no custom configuration configmap
  7. cfg := &api.ConfigMap{}
  8. if lbc.nxgConfigMap != "" {
  9. // Search for custom configmap (defined in main args)
  10. var err error
  // NOTE(review): the third return value of parseNsName is discarded; if it
  // is an error, a malformed nxgConfigMap value is not reported here -
  // confirm.
  11. ns, name, _ := parseNsName(lbc.nxgConfigMap)
  12. cfg, err = lbc.getConfigMap(ns, name)
  13. if err != nil {
  14. return fmt.Errorf("unexpected error searching configmap %v: %v", lbc.nxgConfigMap, err)
  15. }
  16. }
  // ngxConfig is obtained from cfg via ReadConfig; its HealthzURL is then set
  // from lbc.defHealthzURL.
  17. ngxConfig := lbc.nginx.ReadConfig(cfg)
  18. ngxConfig.HealthzURL = lbc.defHealthzURL
  // Everything currently held in the ingress lister's store is handed to
  // getUpstreamServers, which yields the upstreams and servers.
  19. ings := lbc.ingLister.Store.List()
  20. upstreams, servers := lbc.getUpstreamServers(ngxConfig, ings)
  21. return lbc.nginx.CheckAndReload(ngxConfig, ingress.Configuration{
  22. Upstreams: upstreams,
  23. Servers: servers,
  24. TCPUpstreams: lbc.getTCPServices(),
  25. UDPUpstreams: lbc.getUDPServices(),
  26. })
  27. }

lbc.ingLister.Store.List()获取到了最新ingress配置

那这个函数又是怎么被调用到的
  // Create the ingress store and its controller with framework.NewInformer.
  // The ListWatch uses ingressListFunc / ingressWatchFunc built from
  // lbc.client and namespace; the remaining arguments are an empty
  // extensions.Ingress, resyncPeriod and ingEventHandler.
  // NOTE(review): presumably ingEventHandler receives the add/update/delete
  // notifications and resyncPeriod is the periodic re-list interval -
  // confirm against the framework package.
  1. lbc.ingLister.Store, lbc.ingController = framework.NewInformer(
  2. &cache.ListWatch{
  3. ListFunc: ingressListFunc(lbc.client, namespace),
  4. WatchFunc: ingressWatchFunc(lbc.client, namespace),
  5. },
  6. &extensions.Ingress{}, resyncPeriod, ingEventHandler)
通过listwatch机制检测ingress资源
当有增删改等动作时都会调用lbc.syncQueue.enqueue(obj)

lbc.syncQueue实际上是通过NewTaskQueue函数包装loadBalancerController.sync而来,队列中每取出一个任务,loadBalancerController.sync都会被调用一次


  // Run starts nginx, the five controllers and the two task queues, each in
  // its own goroutine, and then blocks until a receive on lbc.stopCh
  // succeeds.
  1. func (lbc *loadBalancerController) Run() {
  2. glog.Infof("starting NGINX loadbalancer controller")
  3. go lbc.nginx.Start()
  // Every controller is given lbc.stopCh.
  // NOTE(review): the field names suggest ingress, endpoints, service, secret
  // and configmap controllers - confirm at their construction sites.
  4. go lbc.ingController.Run(lbc.stopCh)
  5. go lbc.endpController.Run(lbc.stopCh)
  6. go lbc.svcController.Run(lbc.stopCh)
  7. go lbc.secrController.Run(lbc.stopCh)
  8. go lbc.mapController.Run(lbc.stopCh)
  // Both queues are run with a one-second period argument and the same stop
  // channel.
  9. go lbc.syncQueue.run(time.Second, lbc.stopCh)
  10. go lbc.ingQueue.run(time.Second, lbc.stopCh)
  11. <-lbc.stopCh
  12. }

go lbc.syncQueue.run(time.Second, lbc.stopCh)实际运行函数为worker,调用了sync
  // worker loops forever taking keys from t.queue and running t.sync on each
  // one. When Get reports quit it closes t.workerDone and returns.
  1. func (t *taskQueue) worker() {
  2. for {
  3. key, quit := t.queue.Get()
  4. if quit {
  5. close(t.workerDone)
  6. return
  7. }
  8. glog.V(3).Infof("syncing %v", key)
  // key.(string) is an unchecked type assertion: a non-string key panics.
  // A failed sync is logged and requeued; a successful one is forgotten.
  9. if err := t.sync(key.(string)); err != nil {
  10. glog.Warningf("requeuing %v, err %v", key, err)
  11. t.requeue(key.(string))
  12. } else {
  13. t.queue.Forget(key)
  14. }
  // Done is called for every key, whether sync succeeded or failed.
  15. t.queue.Done(key)
  16. }
  17. }

其他关于secret,configmap,service,endpoint的controller不再赘述

按照官方的nginx ingress controller实现我们可以发现,实际上就是通过api与apiserver进行交互,监控configmap、secret、service、endpoint等信息的变化,重新加载配置然后reload的过程。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/40814?site
推荐阅读
相关标签
  

闽ICP备14008679号