当前位置:   article > 正文

hystrix-go 源码分析

hystrix-go核心代码走读

阅读源码的过程,就像是在像武侠小说里阅读武功秘籍一样,分析高手的一招一式,提炼出精髓,来增强自己的内力。 之前的帖子说了一下微服务的雪崩效应和常见的解决方案,太水,没有上代码怎么叫解决方案。github上有很多开源的库来解决雪崩问题,比较出名的是Netflix的开源库hystrix。集流量控制熔断容错等于一身的java语言的库。今天分析的源码库是 hystrix-go,他是hystrix的的go语言版,应该是说简化版本,用很少的代码量实现了主要功能。很推荐朋友们有时间读一读。

使用简单

hystrix的使用是非常简单的,同步执行,直接调用Do方法。

  1. err := hystrix.Do("my_command", func() error {
  2. // talk to other services
  3. return nil
  4. }, func(err error) error {
  5. // do this when services are down
  6. return nil
  7. })
  8. 复制代码

异步执行Go方法,内部实现是启动了一个gorouting,如果想得到自定义方法的数据,需要你传channel来处理数据,或者输出。返回的error也是一个channel

  1. output := make(chan bool, 1)
  2. errors := hystrix.Go("my_command", func() error {
  3. // talk to other services
  4. output <- true
  5. return nil
  6. }, nil)
  7. select {
  8. case out := <-output:
  9. // success
  10. case err := <-errors:
  11. // failure
  12. 复制代码

大概的执行流程图

其实方法DoGo方法内部都是调用了hystrix.GoC方法,只是Do方法处理了异步的过程

  1. func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
  2. done := make(chan struct{}, 1)
  3. r := func(ctx context.Context) error {
  4. err := run(ctx)
  5. if err != nil {
  6. return err
  7. }
  8. done <- struct{}{}
  9. return nil
  10. }
  11. f := func(ctx context.Context, e error) error {
  12. err := fallback(ctx, e)
  13. if err != nil {
  14. return err
  15. }
  16. done <- struct{}{}
  17. return nil
  18. }
  19. var errChan chan error
  20. if fallback == nil {
  21. errChan = GoC(ctx, name, r, nil)
  22. } else {
  23. errChan = GoC(ctx, name, r, f)
  24. }
  25. select {
  26. case <-done:
  27. return nil
  28. case err := <-errChan:
  29. return err
  30. }
  31. }
  32. 复制代码

自定义Command配置

在调用Do Go等方法之前我们可以先自定义一些配置

  1. hystrix.ConfigureCommand("mycommand", hystrix.CommandConfig{
  2. Timeout: int(time.Second * 3),
  3. MaxConcurrentRequests: 100,
  4. SleepWindow: int(time.Second * 5),
  5. RequestVolumeThreshold: 30,
  6. ErrorPercentThreshold: 50,
  7. })
  8. err := hystrix.DoC(context.Background(), "mycommand", func(ctx context.Context) error {
  9. // ...
  10. return nil
  11. }, func(i context.Context, e error) error {
  12. // ...
  13. return e
  14. })
  15. 复制代码

我大要说了一下CommandConfig第个字段的意义:

  • Timeout: 执行command的超时时间。默认时间是1000毫秒
  • MaxConcurrentRequests:command的最大并发量 默认值是10
  • SleepWindow:当熔断器被打开后,SleepWindow的时间就是控制过多久后去尝试服务是否可用了。默认值是5000毫秒
  • RequestVolumeThreshold: 一个统计窗口10秒内请求数量。达到这个请求数量后才去判断是否要开启熔断。默认值是20
  • ErrorPercentThreshold:错误百分比,请求数量大于等于RequestVolumeThreshold并且错误率到达这个百分比后就会启动熔断 默认值是50

当然如果不配置他们,会使用默认值

讲完了怎么用,接下来就是分析源码了。我是从下层到上层的顺序分析代码和执行流程

统计控制器

每一个Command都会有一个默认统计控制器,当然也可以添加多个自定义的控制器。 默认的统计控制器DefaultMetricCollector保存着熔断器的所有状态,调用次数失败次数被拒绝次数等等

  1. type DefaultMetricCollector struct {
  2. mutex *sync.RWMutex
  3. numRequests *rolling.Number
  4. errors *rolling.Number
  5. successes *rolling.Number
  6. failures *rolling.Number
  7. rejects *rolling.Number
  8. shortCircuits *rolling.Number
  9. timeouts *rolling.Number
  10. contextCanceled *rolling.Number
  11. contextDeadlineExceeded *rolling.Number
  12. fallbackSuccesses *rolling.Number
  13. fallbackFailures *rolling.Number
  14. totalDuration *rolling.Timing
  15. runDuration *rolling.Timing
  16. }
  17. 复制代码

最主要的还是要看一下rolling.Numberrolling.Number才是状态最终保存的地方 Number保存了10秒内的Buckets数据信息,每一个Bucket的统计时长为1秒

  1. type Number struct {
  2. Buckets map[int64]*numberBucket
  3. Mutex *sync.RWMutex
  4. }
  5. type numberBucket struct {
  6. Value float64
  7. }
  8. 复制代码

字典字段Buckets map[int64]*numberBucket 中的Key保存的是当前时间 可能你会好奇Number是如何保证只保存10秒内的数据的。每一次对熔断器的状态进行修改时,Number都要先得到当前的时间(秒级)的Bucket不存在则创建。

  1. func (r *Number) getCurrentBucket() *numberBucket {
  2. now := time.Now().Unix()
  3. var bucket *numberBucket
  4. var ok bool
  5. if bucket, ok = r.Buckets[now]; !ok {
  6. bucket = &numberBucket{}
  7. r.Buckets[now] = bucket
  8. }
  9. return bucket
  10. }
  11. 复制代码

修改完后去掉10秒外的数据

  1. func (r *Number) removeOldBuckets() {
  2. now := time.Now().Unix() - 10
  3. for timestamp := range r.Buckets {
  4. // TODO: configurable rolling window
  5. if timestamp <= now {
  6. delete(r.Buckets, timestamp)
  7. }
  8. }
  9. }
  10. 复制代码

比如Increment方法,先得到Bucket再删除旧的数据

  1. func (r *Number) Increment(i float64) {
  2. if i == 0 {
  3. return
  4. }
  5. r.Mutex.Lock()
  6. defer r.Mutex.Unlock()
  7. b := r.getCurrentBucket()
  8. b.Value += i
  9. r.removeOldBuckets()
  10. }
  11. 复制代码

统计控制器是最基层和最重要的一个实现,上层所有的执行判断都是基于他的数据进行逻辑处理的

上报执行状态信息

  1. 断路器-->执行-->上报执行状态信息-->保存到相应的Buckets
  2. 复制代码

每一次断路器逻辑的执行都会上报执行过程中的状态,

  1. // ReportEvent records command metrics for tracking recent error rates and exposing data to the dashboard.
  2. func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
  3. // ...
  4. circuit.mutex.RLock()
  5. o := circuit.open
  6. circuit.mutex.RUnlock()
  7. if eventTypes[0] == "success" && o {
  8. circuit.setClose()
  9. }
  10. var concurrencyInUse float64
  11. if circuit.executorPool.Max > 0 {
  12. concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
  13. }
  14. select {
  15. case circuit.metrics.Updates <- &commandExecution{
  16. Types: eventTypes,
  17. Start: start,
  18. RunDuration: runDuration,
  19. ConcurrencyInUse: concurrencyInUse,
  20. }:
  21. default:
  22. return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
  23. }
  24. return nil
  25. }
  26. 复制代码

circuit.metrics.Updates 这个信道就是处理上报信息的,上报执行状态自信的结构是metricExchange,结构体很简单只有4个字段。要的就是

  • channel字段Updates 他是一个有bufferchannel默认的数量是2000个,所有的状态信息都在他里面
  • metricCollectors字段,就是保存的具体的这个command执行过程中的各种信息
  1. type metricExchange struct {
  2. Name string
  3. Updates chan *commandExecution
  4. Mutex *sync.RWMutex
  5. metricCollectors []metricCollector.MetricCollector
  6. }
  7. type commandExecution struct {
  8. Types []string `json:"types"`
  9. Start time.Time `json:"start_time"`
  10. RunDuration time.Duration `json:"run_duration"`
  11. ConcurrencyInUse float64 `json:"concurrency_inuse"`
  12. }
  13. func newMetricExchange(name string) *metricExchange {
  14. m := &metricExchange{}
  15. m.Name = name
  16. m.Updates = make(chan *commandExecution, 2000)
  17. m.Mutex = &sync.RWMutex{}
  18. m.metricCollectors = metricCollector.Registry.InitializeMetricCollectors(name)
  19. m.Reset()
  20. go m.Monitor()
  21. return m
  22. }
  23. 复制代码

在执行newMetricExchange的时候会启动一个协程 go m.Monitor()去监控Updates的数据,然后上报给metricCollectors 保存执行的信息数据比如前面提到的调用次数失败次数被拒绝次数等等

  1. func (m *metricExchange) Monitor() {
  2. for update := range m.Updates {
  3. // we only grab a read lock to make sure Reset() isn't changing the numbers.
  4. m.Mutex.RLock()
  5. totalDuration := time.Since(update.Start)
  6. wg := &sync.WaitGroup{}
  7. for _, collector := range m.metricCollectors {
  8. wg.Add(1)
  9. go m.IncrementMetrics(wg, collector, update, totalDuration)
  10. }
  11. wg.Wait()
  12. m.Mutex.RUnlock()
  13. }
  14. }
  15. 复制代码

更新调用的是go m.IncrementMetrics(wg, collector, update, totalDuration),里面判断了他的状态

  1. func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) {
  2. // granular metrics
  3. r := metricCollector.MetricResult{
  4. Attempts: 1,
  5. TotalDuration: totalDuration,
  6. RunDuration: update.RunDuration,
  7. ConcurrencyInUse: update.ConcurrencyInUse,
  8. }
  9. switch update.Types[0] {
  10. case "success":
  11. r.Successes = 1
  12. case "failure":
  13. r.Failures = 1
  14. r.Errors = 1
  15. case "rejected":
  16. r.Rejects = 1
  17. r.Errors = 1
  18. // ...
  19. }
  20. // ...
  21. collector.Update(r)
  22. wg.Done()
  23. }
  24. 复制代码

流量控制

hystrix-go对流量控制的代码是很简单的。用了一个简单的令牌算法,能得到令牌的就可以执行后继的工作,执行完后要返还令牌。得不到令牌就拒绝,拒绝后调用用户设置的callback方法,如果没有设置就不执行。 结构体executorPool就是hystrix-go 流量控制的具体实现。字段Max就是每秒最大的并发值。

  1. type executorPool struct {
  2. Name string
  3. Metrics *poolMetrics
  4. Max int
  5. Tickets chan *struct{}
  6. }
  7. 复制代码

在创建executorPool的时候,会根据Max值来创建令牌。Max值如果没有设置会使用默认值10

  1. func newExecutorPool(name string) *executorPool {
  2. p := &executorPool{}
  3. p.Name = name
  4. p.Metrics = newPoolMetrics(name)
  5. p.Max = getSettings(name).MaxConcurrentRequests
  6. p.Tickets = make(chan *struct{}, p.Max)
  7. for i := 0; i < p.Max; i++ {
  8. p.Tickets <- &struct{}{}
  9. }
  10. return p
  11. }
  12. 复制代码

流量控制上报状态

注意一下字段 Metrics 他用于统计执行数量,比如:执行的总数量,最大的并发数 具体的代码就不贴上来了。这个数量也可以显露出,供可视化程序直观的表现出来。

令牌使用完后是需要返还的,返回的时候才会做上面所说的统计工作。

  1. func (p *executorPool) Return(ticket *struct{}) {
  2. if ticket == nil {
  3. return
  4. }
  5. p.Metrics.Updates <- poolMetricsUpdate{
  6. activeCount: p.ActiveCount(),
  7. }
  8. p.Tickets <- ticket
  9. }
  10. func (p *executorPool) ActiveCount() int {
  11. return p.Max - len(p.Tickets)
  12. }
  13. 复制代码

一次Command的执行的流程

上面把 统计控制器流量控制上报执行状态讲完了,主要的实现也就讲的差不多了。最后就是串一次command的执行都经历了啥:

  1. err := hystrix.Do("my_command", func() error {
  2. // talk to other services
  3. return nil
  4. }, func(err error) error {
  5. // do this when services are down
  6. return nil
  7. })
  8. 复制代码

hystrix在执行一次command的前面也有提到过会调用GoC方法,下面我把代码贴出来来,篇幅问题去掉了一些代码,主要逻辑都在。就是在判断断路器是否已打开得到Ticket得不到就限流,执行我们自己的的方法判断context是否Done或者执行是否超时 当然,每次执行结果都要上报执行状态,最后要返还Ticket

  1. func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
  2. cmd := &command{
  3. run: run,
  4. fallback: fallback,
  5. start: time.Now(),
  6. errChan: make(chan error, 1),
  7. finished: make(chan bool, 1),
  8. }
  9. //得到断路器,不存在则创建
  10. circuit, _, err := GetCircuit(name)
  11. if err != nil {
  12. cmd.errChan <- err
  13. return cmd.errChan
  14. }
  15. //...
  16. // 返还ticket
  17. returnTicket := func() {
  18. // ...
  19. cmd.circuit.executorPool.Return(cmd.ticket)
  20. }
  21. // 上报执行状态
  22. reportAllEvent := func() {
  23. err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
  24. // ...
  25. }
  26. go func() {
  27. defer func() { cmd.finished <- true }()
  28. // 查看断路器是否已打开
  29. if !cmd.circuit.AllowRequest() {
  30. // ...
  31. returnOnce.Do(func() {
  32. returnTicket()
  33. cmd.errorWithFallback(ctx, ErrCircuitOpen)
  34. reportAllEvent()
  35. })
  36. return
  37. }
  38. // ...
  39. // 获取ticket 如果得不到就限流
  40. select {
  41. case cmd.ticket = <-circuit.executorPool.Tickets:
  42. ticketChecked = true
  43. ticketCond.Signal()
  44. cmd.Unlock()
  45. default:
  46. // ...
  47. returnOnce.Do(func() {
  48. returnTicket()
  49. cmd.errorWithFallback(ctx, ErrMaxConcurrency)
  50. reportAllEvent()
  51. })
  52. return
  53. }
  54. // 执行我们自已的方法,并上报执行信息
  55. returnOnce.Do(func() {
  56. defer reportAllEvent()
  57. cmd.runDuration = time.Since(runStart)
  58. returnTicket()
  59. if runErr != nil {
  60. cmd.errorWithFallback(ctx, runErr)
  61. return
  62. }
  63. cmd.reportEvent("success")
  64. })
  65. }()
  66. // 等待context是否被结束,或执行者超时,并上报
  67. go func() {
  68. timer := time.NewTimer(getSettings(name).Timeout)
  69. defer timer.Stop()
  70. select {
  71. case <-cmd.finished:
  72. // returnOnce has been executed in another goroutine
  73. case <-ctx.Done():
  74. // ...
  75. return
  76. case <-timer.C:
  77. // ...
  78. }
  79. }()
  80. return cmd.errChan
  81. }
  82. 复制代码

dashboard 可视化hystrix的上报信息

代码中StreamHandler就是把所有断路器的状态以流的方式不断的推送到dashboard. 这部分代码我就不用说了,很简单。 需要在你的服务端加3行代码,启动我们的流服务

  1. hystrixStreamHandler := hystrix.NewStreamHandler()
  2. hystrixStreamHandler.Start()
  3. go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)
  4. 复制代码

dashboard我使用的是docker版。

  1. docker run -d -p 8888:9002 --name hystrix-dashboard mlabouardy/hystrix-dashboard:latest
  2. 复制代码

在下面输入你服务的地址,我是 http://192.168.1.67:81/hystrix.stream

如果是集群可以使用Turbine进行监控,有时间大家自己来看吧

转载于:https://juejin.im/post/5d09b7b9f265da1b6836c065

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号