当前位置:   article > 正文

读猿码系列——5.解析Golang常用定时任务库gron和cron_@daily cron golang

@daily cron golang

在实际开发环境中,我们经常会接触到定时任务的概念,比如每6个月清理一次历史日志,每天0点推送卡片消息或者每天凌晨2点重启服务等多种场景。在Linux系统中用crontab就可以搞定,你只需要简单的语法控制就能实现定时的语义,具体用法可以参考下在线工具:https://crontab.guru/。

更形象一点表示就是:

  1. ┌───────────── minute (0 - 59)
  2. │ ┌───────────── hour (0 - 23)
  3. │ │ ┌───────────── day of the month (1 - 31)
  4. │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
  5. │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT, Sunday=0 or 7)
  6. │ │ │ │ │
  7. │ │ │ │ │
  8. │ │ │ │ │
  9. * * * * *

其中(*)表示所有字段的可能值;(,)表示指定值列表;(-)表示指定值范围;(/)表示指定步长值。

crontab是Linux操作系统级别工具,如果定时任务失败或者压根没有启动,crontab是没办法通知提醒开发者的。在golang开源库中有两个比较常用且方便上手的库,就是今天要和大家介绍的gron和cron。

gron

开源地址:

https://github.com/roylee0704/gron

首先使用go get安装依赖:

go get github.com/roylee0704/gron

我们先来通过官方给出的quick start简单体验下使用方法:

  1. package main
  2. import (
  3.   "fmt"
  4.   "time"
  5.   "github.com/roylee0704/gron"
  6. )
  7. func main() {
  8.   c := gron.New()
  9.   c.AddFunc(gron.Every(1*time.Hour), func() {
  10.     fmt.Println("runs every hour.")
  11.   })
  12.   c.Start()
  13. }

实现的效果是每小时在终端上打印出runs every hour.

我们跟到New()方法中看下它的源码实现如下,可以看到在New()方法之后返回的是一个指向Cron对象的指针,其中为stop和add这两个channel做了初始化。

  1. type Cron struct {
  2.   entries []*Entry // 记录一组定时任务
  3.   running bool // 标识这个cron是否已经启动
  4.   add     chan *Entry //是一个channel,用于在Cron启动后新增定时任务 
  5.   stop    chan struct{} // 是一个channel,是个空结构体,用来控制Cron停止
  6. }
  7. // New instantiates new Cron instant c.
  8. func New() *Cron {
  9.   return &Cron{
  10.     stop: make(chan struct{}),
  11.     add:  make(chan *Entry),
  12.   }
  13. }

再跟到Entry中,我们看到一句:Entry consists of a schedule and the job to be executed on that schedule.

  1. type Entry struct {
  2.   Schedule Schedule
  3.   Job      Job
  4.   // the next time the job will run. This is zero time if Cron has not been
  5.   // started or invalid schedule.
  6.   Next time.Time
  7.   // the last time the job was run. This is zero time if the job has not been
  8.   // run.
  9.   Prev time.Time
  10. }

对应的两个接口类型Schedule和Job:

  1. type Schedule interface {
  2.   Next(t time.Timetime.Time
  3. }
  4. type Job interface {
  5.   Run()
  6. }

Schedule代表具体的定时策略,它包含一个Next()方法,接受一个时间点,业务要返回下一次触发调度的时间点。

Job是对定时任务的抽象,只需要实现Run()方法即可。

接着看回我们的quick start,之后出现的是AddFunc()方法,其中加入gron.Every(2 * time.Second)一个简单的定时任务。我们跟到AddFunc()方法看下:

  1. type JobFunc func()
  2. // Run calls j()
  3. func (j JobFunc) Run() {
  4.   j()
  5. }
  6. // AddFunc registers the Job function for the given Schedule.
  7. func (c *Cron) AddFunc(s Schedule, j func()) {
  8.   c.Add(s, JobFunc(j))
  9. }

我们发现它的核心方法是Add,至此整个流程是用户传入一个func(),它在内部会被转化为JobFunc,即实现了刚刚提到的Job接口。如果Cron示例未启动,就加入到entries定时任务列表中,在启动后被处理;否则放到add这个channel中,进行额外新增的调度流程。

  1. func (c *Cron) Add(s Schedule, j Job) {
  2.   entry := &Entry{
  3.     Schedule: s,
  4.     Job:      j,
  5.   }
  6.   if !c.running {
  7.     c.entries = append(c.entries, entry)
  8.     return
  9.   }
  10.   c.add <- entry
  11. }

最后是去启动Cron,即c.Start()我们也跟到源码中看看:

  1. // Start signals cron instant c to get up and running.
  2. func (c *Cron) Start() {
  3.   c.running = true
  4.   go c.run()
  5. }

Start()方法执行时先将running置为true,用来标识实例已启动,然后启动一个goroutine来实际跑启动的逻辑。

另外在Stop()方法中将running置为false,标识实例停止,然后向stop这个channel中放入一个空结构体。

  1. // Stop halts cron instant c from running.
  2. func (c *Cron) Stop() {
  3.   if !c.running {
  4.     return
  5.   }
  6.   c.running = false
  7.   c.stop <- struct{}{}
  8. }

再来看看c.run()中发生了什么:

  1. func (c *Cron) run() {
  2.   var effective time.Time
  3.   now := time.Now().Local()
  4.   // to figure next trig time for entries, referenced from now
  5.   for _, e := range c.entries {
  6.     e.Next = e.Schedule.Next(now)
  7.   }
  8.   for {
  9.     sort.Sort(byTime(c.entries))
  10.     if len(c.entries) > 0 {
  11.       effective = c.entries[0].Next
  12.     } else {
  13.       effective = now.AddDate(1500// to prevent phantom jobs.
  14.     }
  15.     select {
  16.     case now = <-after(effective.Sub(now)):
  17.       // entries with same time gets run.
  18.       for _, entry := range c.entries {
  19.         if entry.Next != effective {
  20.           break
  21.         }
  22.         entry.Prev = now
  23.         entry.Next = entry.Schedule.Next(now)
  24.         go entry.Job.Run()
  25.       }
  26.     case e := <-c.add:
  27.       e.Next = e.Schedule.Next(time.Now())
  28.       c.entries = append(c.entries, e)
  29.     case <-c.stop:
  30.       return // terminate go-routine.
  31.     }
  32.   }
  • 首先拿到当前时区时间now;

  • 循环entries定时任务列表,根据now计算出下一次定时任务触发时间;

  • 将任务列表根据时间sort排序;

  • 拿到最近要到期的时间点,在select中通过time.After监听;到点了就新启动一个goroutine跑对应entry中的Job,并回到for循环,继续重新根据时间排序,再走同样的流程;

  • 如果add channel中有新的Entry被加进来,就放到entries定时任务列表中,触发新的sort;

  • 如果stop channel中收到信号,直接返回,结束执行。

整个流程还是比较简单的,值得我们学习的是Cron中控制退出的写法。因为停止只需要一个信号,核心逻辑使用for+select格式,并向stop channel中传入空结构体,还能大大节省内存。核心代码如下:

  1. type Cron struct {
  2.  stop    chan struct{}
  3. }
  4. func (c *Cron) Stop() {
  5.  c.stop <- struct{}{}
  6. }
  7. func (c *Cron) run() {
  8.  for {
  9.   select {
  10.   case <-c.stop:
  11.    return // terminate go-routine.
  12.   }
  13.  }
  14. }

好的,到此我们通过官方quick start的示例深入源码了解了gron库的执行流程,还有一些时间格式及自定义定时任务的使用方法我放到了gitlab上,这里就不再赘述了。

https://gitlab.com/893376179/daily-golang-package/-/tree/main/crontab

下面来重点看下cron,由于gron代码很简洁,功能也相对简单,适合用来学习,但作者在6年前已经停止维护,两者也是大同小异。如果有定时任务需求,还是建议使用cron。


robfig/cron

开源地址:

https://github.com/robfig/cron

首先使用go get安装依赖:

go get -u github.com/robfig/cron/v3

我们还是先通过官方给出的quick start简单体验下使用方法:

  1. package main
  2. import (
  3.  "fmt"
  4.  "time"
  5.  "github.com/robfig/cron/v3"
  6. )
  7. func main() {
  8.  c := cron.New()
  9.  c.AddFunc("@every 1s", func() {
  10.   fmt.Println("tick every 1 second")
  11.  })
  12.  c.Start()
  13.  time.Sleep(5 * time.Second)
  14. }
  15. // tick every 1 second
  16. // tick every 1 second
  17. // tick every 1 second
  18. // tick every 1 second
  19. // tick every 1 second

实现的效果就是每秒打印一次 tick every 1 second

cron支持固定时间间隔,像是示例中的@every 1s,意为每隔固定时间触发一次,例如2h30m30s。还支持以下几种时间格式:

  1. package main
  2. import (
  3.  "fmt"
  4.  "time"
  5.  "github.com/robfig/cron/v3"
  6. )
  7. func main() {
  8.  c := cron.New()
  9.  c.AddFunc("30 * * * *"func() {
  10.   fmt.Println("Every hour on the half hour")
  11.  })
  12.  c.AddFunc("30 3-6,20-23 * * *"func() {
  13.   fmt.Println("On the half hour of 3-6am, 8-11pm")
  14.  })
  15.  c.AddFunc("0 0 1 1 *"func() {
  16.   fmt.Println("Jan 1 every year")
  17.  })
  18.  c.AddFunc("@hourly"func() {
  19.   fmt.Println("Every hour")
  20.  })
  21.  c.AddFunc("@daily"func() {
  22.   fmt.Println("Every day")
  23.  })
  24.  c.AddFunc("@weekly"func() {
  25.   fmt.Println("Every week")
  26.  })
  27.  c.Start()
  28.  for {
  29.   time.Sleep(time.Second)
  30.  }
  31. }

可以看到它和Linux中的crontab命令语法相似,用5个空格分割的域来表示时间,其中分别表示Minutes、Hours、Day of month、Month、Day of week。另外还可以预定义时间规则,比如@yearly表示每年第一天的 0 点;@monthly表示每月第一天的 0 点;@hourly表示每小时的开始。

我们也可以指定时区,根据不同时区设置不同定时任务:

  1. package main
  2. import (
  3.  "fmt"
  4.  "time"
  5.  "github.com/robfig/cron/v3"
  6. )
  7. func main() {
  8.  nyc, _ := time.LoadLocation("America/New_York")
  9.  c := cron.New(cron.WithLocation(nyc))
  10.  c.AddFunc("0 6 * * ?"func() {
  11.   fmt.Println("Every 6 o'clock at New York")
  12.  })
  13.  c.AddFunc("CRON_TZ=Asia/Tokyo 0 6 * * ?"func() {
  14.   fmt.Println("Every 6 o'clock at Tokyo")
  15.  })
  16.  c.Start()
  17.  for {
  18.   time.Sleep(time.Second)
  19.  }
  20. }

cron同gron一样,它也支持Job接口:

  1. // cron.go
  2. type Job interface {
  3.   Run()
  4. }

我们需要自定义实现接口Job的结构体,完成它的Run()方法即可:

  1. package main
  2. import (
  3.  "fmt"
  4.  "time"
  5.  "github.com/robfig/cron/v3"
  6. )
  7. type GreetingJob struct {
  8.  Msg string
  9. }
  10. func (g GreetingJob) Run() {
  11.  fmt.Println("Hello " + g.Msg)
  12. }
  13. func main() {
  14.  c := cron.New()
  15.  c.AddJob("@every 1s", GreetingJob{"wolrd"})
  16.  c.Start()
  17.  time.Sleep(3 * time.Second)
  18. }
  19. // Hello world
  20. // Hello world
  21. // Hello world

cron对象的AddJob()方法将GreetingJob对象添加到定时管理器中。在AddFunc()方法中,将传入的回调转为FuncJob类型,然后调用AddJob()方法:

  1. func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
  2.   return c.AddJob(spec, FuncJob(cmd))
  3. }

cron对象创建不仅有上述提到的指定时区,还可以使用自定义解析器,对这部分感兴趣可以到官方库使用文档中看看。除此之外cron还提供了WithLogger和WithChain两种选项。

WithLogger可以设置cron内部使用我们自定义的Logger:

  1. package main
  2. import (
  3.  "fmt"
  4.  "log"
  5.  "os"
  6.  "time"
  7.  "github.com/robfig/cron/v3"
  8. )
  9. func main() {
  10.  c := cron.New(
  11.   cron.WithLogger(
  12.    cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))))
  13.  c.AddFunc("@every 2s", func() {
  14.   fmt.Println("hello world")
  15.  })
  16.  c.Start()
  17.  time.Sleep(5 * time.Second)
  18. }
  19. // cron: 2022/10/11 19:13:05 start
  20. // cron: 2022/10/11 19:13:05 schedule, now=2022-10-11T19:13:05+08:00, entry=1next=2022-10-11T19:13:07+08:00
  21. // cron: 2022/10/11 19:13:07 wake, now=2022-10-11T19:13:07+08:00
  22. // cron: 2022/10/11 19:13:07 run, now=2022-10-11T19:13:07+08:00, entry=1next=2022-10-11T19:13:09+08:00
  23. // hello world
  24. // cron: 2022/10/11 19:13:09 wake, now=2022-10-11T19:13:09+08:00
  25. // hello world
  26. // cron: 2022/10/11 19:13:09 run, now=2022-10-11T19:13:09+08:00, entry=1next=2022-10-11T19:13:11+08:00

WithChain可以在执行实际的Job前后添加一些逻辑:比如捕获panic、如果上次运行还未结束,推迟/跳过本次执行、记录每个Job执行情况。实际上就是在Job的执行逻辑外在封装一层逻辑得到JobWrapper。

  1. // chain.go
  2. type JobWrapper func(JobJob

然后使用一个Chain对象将这些JobWrapper组合到一起,调用Chain对象的Then(job)方法应用这些JobWrapper,返回最终的Job。

  1. type Chain struct {
  2.   wrappers []JobWrapper
  3. }
  4. func NewChain(c ...JobWrapper) Chain {
  5.   return Chain{c}
  6. }
  7. func (c Chain) Then(j Job) Job {
  8.   for i := range c.wrappers {
  9.     j = c.wrappers[len(c.wrappers)-i-1](j)
  10.   }
  11.   return j
  12. }

继续来看刚刚提到的那三种JobWrapper的方法:

Recover:捕获内部Job产生的 panic

  1. package main
  2. import (
  3.  "fmt"
  4.  "time"
  5.  "github.com/robfig/cron/v3"
  6. )
  7. type panicJob struct {
  8.  count int
  9. }
  10. func (job *panicJob) Run() {
  11.  job.count++
  12.  if job.count == 1 {
  13.   panic("oooooooooops!")
  14.  }
  15.  fmt.Println("hello world")
  16. }
  17. func main() {
  18.  c := cron.New()
  19.  c.AddJob("@every 1s", cron.NewChain(cron.Recover(cron.DefaultLogger)).Then(&panicJob{}))
  20.  c.Start()
  21.  time.Sleep(5 * time.Second)
  22. }
  23. // cron: 2022/10/11 21:08:21 panic, error=oooooooooops!, stack=...
  24. // goroutine 7 [running]:
  25. // github.com/robfig/cron/v3.Recover.func1.1.1()
  26. //  /Users/apple/go/pkg/mod/github.com/robfig/cron/v3@v3.0.1/chain.go:45 +0x85
  27. // panic({0x10a69a00x10d8a80})
  28. //  /usr/local/Cellar/go/1.17.5/libexec/src/runtime/panic.go:1038 +0x215
  29. // main.(*panicJob).Run(0xedad761c4)
  30. //  /Users/apple/Desktop/daily-golang-package/crontab/cron/jobWrapper/recover/recover.go:17 +0x85
  31. // github.com/robfig/cron/v3.Recover.func1.1()
  32. //  /Users/apple/go/pkg/mod/github.com/robfig/cron/v3@v3.0.1/chain.go:53 +0x73
  33. // github.com/robfig/cron/v3.FuncJob.Run(0x0)
  34. //  /Users/apple/go/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:136 +0x1a
  35. // github.com/robfig/cron/v3.(*Cron).startJob.func1()
  36. //  /Users/apple/go/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:312 +0x6a
  37. // created by github.com/robfig/cron/v3.(*Cron).startJob
  38. //  /Users/apple/go/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:310 +0xb2
  39. // hello world
  40. // hello world
  41. // hello world
  42. // hello world

DelayIfStillRunning:触发时,如果上一次任务还未执行完成(耗时太长),则等待上一次任务完成之后再执行

  1. package main
  2. import (
  3.  "log"
  4.  "time"
  5.  "github.com/robfig/cron/v3"
  6. )
  7. type delayJob struct {
  8.  count int
  9. }
  10. func (job *delayJob) Run() {
  11.  time.Sleep(2 * time.Second)
  12.  job.count++
  13.  log.Printf("%d: hello world\n", job.count)
  14. }
  15. func main() {
  16.  c := cron.New()
  17.  c.AddJob("@every 1s", cron.NewChain(cron.DelayIfStillRunning(cron.DefaultLogger)).Then(&delayJob{}))
  18.  c.Start()
  19.  time.Sleep(10 * time.Second)
  20. }
  21. // 2022/10/11 21:22:18 1: hello world
  22. // 2022/10/11 21:22:20 2: hello world
  23. // 2022/10/11 21:22:22 3: hello world
  24. // 2022/10/11 21:22:24 4: hello world
  25. package main
  26. import (
  27.  "log"
  28.  "time"
  29.  "github.com/robfig/cron/v3"
  30. )
  31. type delayJob struct {
  32.  count int
  33. }
  34. func (job *delayJob) Run() {
  35.  time.Sleep(2 * time.Second)
  36.  job.count++
  37.  log.Printf("%d: hello world\n", job.count)
  38. }
  39. func main() {
  40.  c := cron.New()
  41.  c.AddJob("@every 1s", cron.NewChain(cron.DelayIfStillRunning(cron.DefaultLogger)).Then(&delayJob{}))
  42.  c.Start()
  43.  time.Sleep(10 * time.Second)
  44. }
  45. // 2022/10/11 21:22:18 1: hello world
  46. // 2022/10/11 21:22:20 2: hello world
  47. // 2022/10/11 21:22:22 3: hello world
  48. // 2022/10/11 21:22:24 4: hello world

  1. func DelayIfStillRunning(logger Logger) JobWrapper {
  2.  return func(j Job) Job {
  3.   var mu sync.Mutex
  4.   return FuncJob(func() {
  5.    start := time.Now()
  6.    mu.Lock()
  7.    defer mu.Unlock()
  8.    if dur := time.Since(start); dur > time.Minute {
  9.     logger.Info("delay""duration", dur)
  10.    }
  11.    j.Run()
  12.   })
  13.  }
  14. }

首先定义一个互斥锁sync.Mutex,记录当前时间并获取锁,如果上一个任务还未结束就一直持有锁,直到上一个执行结束,锁才会被释放,保证了任务被串行执行。

SkipIfStillRunning:触发时,如果上一次任务还未完成,则跳过此次执行

  1. package main
  2. import (
  3.  "log"
  4.  "sync/atomic"
  5.  "time"
  6.  "github.com/robfig/cron/v3"
  7. )
  8. type skipJob struct {
  9.  count int32
  10. }
  11. func (job *skipJob) Run() {
  12.  atomic.AddInt32(&job.count1)
  13.  log.Printf("%d: hello world\n", job.count)
  14.  if atomic.LoadInt32(&job.count== 1 {
  15.   time.Sleep(2 * time.Second)
  16.  }
  17. }
  18. func main() {
  19.  c := cron.New()
  20.  c.AddJob("@every 1s", cron.NewChain(cron.SkipIfStillRunning(cron.DefaultLogger)).Then(&skipJob{}))
  21.  c.Start()
  22.  time.Sleep(10 * time.Second)
  23. }
  24. // 2022/10/11 21:29:41 1: hello world
  25. // 2022/10/11 21:29:44 2: hello world
  26. // 2022/10/11 21:29:45 3: hello world
  27. // 2022/10/11 21:29:46 4: hello world
  28. // 2022/10/11 21:29:47 5: hello world
  29. // 2022/10/11 21:29:48 6: hello world
  30. // 2022/10/11 21:29:49 7: hello world
  31. // 2022/10/11 21:29:50 8: hello world

我们跟到源码里看下这个方法是如何实现的:

  1. func SkipIfStillRunning(logger Logger) JobWrapper {
  2.  return func(j Job) Job {
  3.   var ch = make(chan struct{}, 1)
  4.   ch <- struct{}{}
  5.   return FuncJob(func() {
  6.    select {
  7.    case v := <-ch:
  8.     j.Run()
  9.     ch <- v
  10.    default:
  11.     logger.Info("skip")
  12.    }
  13.   })
  14.  }
  15. }

定义一个缓存大小为1的channel,初始发送空结构体保证第一个任务正常执行。在执行任务时从channel中取值,如果成功,执行任务并向chennel中发送下一个值,否则跳过。

对于gron和cron这两个定时任务相关的常用库,其实现相对简单且优雅,有兴趣的朋友可以去学习下!本文涉及的全部代码我放到了git上。之后再看到有意思的常用库也会放到对应目录下。日拱一卒,感谢你的阅读!

https://gitlab.com/893376179/daily-golang-package/-/tree/main/

参考

https://zhuanlan.zhihu.com/p/343895819

https://juejin.cn/post/7132715360293716004

https://darjun.github.io/2020/06/25/godailylib/cron

我的个人公众号,“才浅coding攻略”,这里可以第一时间收到推送,期待你的关注和催更! 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/42803
推荐阅读
相关标签
  

闽ICP备14008679号