sys_job.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. /*
  2. * @desc:定时任务处理
  3. * @company:云南省奇讯科技有限公司
  4. * @Author: yixiaohu
  5. * @Date: 2021/7/15 9:32
  6. */
  7. package service
  8. import (
  9. comModel "gfast/app/common/model"
  10. "gfast/app/system/dao"
  11. "gfast/app/system/model"
  12. "strings"
  13. "github.com/gogf/gf/container/gset"
  14. "github.com/gogf/gf/errors/gerror"
  15. "github.com/gogf/gf/frame/g"
  16. "github.com/gogf/gf/os/gcron"
  17. "github.com/gogf/gf/util/gconv"
  18. )
  19. type sysJob struct {
  20. }
  21. var SysJob = new(sysJob)
  22. // JobList 获取任务列表
  23. func (s *sysJob) JobList(req *dao.SysJobSearchReq) (total, page int, list []*model.SysJob, err error) {
  24. model := dao.SysJob.M
  25. if req != nil {
  26. if req.Status != "" {
  27. model = model.Where("status", gconv.Int(req.Status))
  28. }
  29. if req.JobGroup != "" {
  30. model = model.Where("job_group", req.JobGroup)
  31. }
  32. if req.JobName != "" {
  33. model = model.Where("job_name like ?", "%"+req.JobName+"%")
  34. }
  35. }
  36. total, err = model.Count()
  37. if err != nil {
  38. g.Log().Error(err)
  39. err = gerror.New("获取总行数失败")
  40. return
  41. }
  42. if req.PageNum == 0 {
  43. req.PageNum = 1
  44. }
  45. page = req.PageNum
  46. if req.PageSize == 0 {
  47. req.PageSize = comModel.PageSize
  48. }
  49. err = model.Page(page, req.PageSize).Order("job_id asc").Scan(&list)
  50. if err != nil {
  51. g.Log().Error(err)
  52. err = gerror.New("获取数据失败")
  53. }
  54. return
  55. }
  56. // GetJobs 获取已开启执行的任务
  57. func (s *sysJob) GetJobs() (jobs []*model.SysJob, err error) {
  58. err = dao.SysJob.Where(dao.SysJob.C.Status, 0).Scan(&jobs)
  59. return
  60. }
  61. func (s *sysJob) AddJob(req *dao.SysJobAddReq) (err error) {
  62. _, err = dao.SysJob.Insert(req)
  63. return
  64. }
  65. func (s *sysJob) GetJobInfoById(id int64) (job *model.SysJob, err error) {
  66. if id == 0 {
  67. err = gerror.New("参数错误")
  68. return
  69. }
  70. err = dao.SysJob.Where("job_id", id).Scan(&job)
  71. if err != nil {
  72. g.Log().Error(err)
  73. }
  74. if job == nil || err != nil {
  75. err = gerror.New("获取任务信息失败")
  76. }
  77. return
  78. }
  79. func (s *sysJob) EditJob(req *dao.SysJobEditReq) error {
  80. _, err := dao.SysJob.FieldsEx(dao.SysJob.C.JobId, dao.SysJob.C.CreateBy).Where(dao.SysJob.C.JobId, req.JobId).
  81. Update(req)
  82. return err
  83. }
  84. // JobStart 启动任务
  85. func (s *sysJob) JobStart(job *model.SysJob) error {
  86. //获取task目录下是否绑定对应的方法
  87. f := TimeTaskList.GetByName(job.InvokeTarget)
  88. if f == nil {
  89. return gerror.New("没有绑定对应的方法")
  90. }
  91. //传参
  92. paramArr := strings.Split(job.JobParams, "|")
  93. TimeTaskList.EditParams(f.FuncName, paramArr)
  94. rs := gcron.Search(job.InvokeTarget)
  95. if rs == nil {
  96. if job.MisfirePolicy == 1 {
  97. t, err := gcron.AddSingleton(job.CronExpression, f.Run, job.InvokeTarget)
  98. if err != nil {
  99. return err
  100. }
  101. if t == nil {
  102. return gerror.New("启动任务失败")
  103. }
  104. } else {
  105. t, err := gcron.AddOnce(job.CronExpression, f.Run, job.InvokeTarget)
  106. if err != nil {
  107. return err
  108. }
  109. if t == nil {
  110. return gerror.New("启动任务失败")
  111. }
  112. }
  113. }
  114. gcron.Start(job.InvokeTarget)
  115. if job.MisfirePolicy == 1 {
  116. job.Status = 0
  117. _, err := dao.SysJob.Where(dao.SysJob.C.JobId, job.JobId).Unscoped().Update(g.Map{
  118. dao.SysJob.C.Status: job.Status,
  119. })
  120. return err
  121. }
  122. return nil
  123. }
  124. // JobStop 停止任务
  125. func (s *sysJob) JobStop(job *model.SysJob) error {
  126. //获取task目录下是否绑定对应的方法
  127. f := TimeTaskList.GetByName(job.InvokeTarget)
  128. if f == nil {
  129. return gerror.New("没有绑定对应的方法")
  130. }
  131. rs := gcron.Search(job.InvokeTarget)
  132. if rs != nil {
  133. gcron.Remove(job.InvokeTarget)
  134. }
  135. job.Status = 1
  136. _, err := dao.SysJob.Where(dao.SysJob.C.JobId, job.JobId).Unscoped().Update(g.Map{
  137. dao.SysJob.C.Status: job.Status,
  138. })
  139. return err
  140. }
  141. //执行任务
  142. func (s *sysJob) JobRun(job *model.SysJob) error {
  143. //可以task目录下是否绑定对应的方法
  144. f := TimeTaskList.GetByName(job.InvokeTarget)
  145. if f == nil {
  146. return gerror.New("当前task目录下没有绑定这个方法")
  147. }
  148. //传参
  149. paramArr := strings.Split(job.JobParams, "|")
  150. TimeTaskList.EditParams(f.FuncName, paramArr)
  151. task, err := gcron.AddOnce("@every 1s", f.Run)
  152. if err != nil || task == nil {
  153. return gerror.New("启动执行失败")
  154. }
  155. return nil
  156. }
  157. // DeleteJobByIds 删除任务
  158. func (s *sysJob) DeleteJobByIds(ids []int) (err error) {
  159. if len(ids) == 0 {
  160. err = gerror.New("参数错误")
  161. return
  162. }
  163. gst := gset.NewFrom(ids)
  164. var jobs []*model.SysJob
  165. jobs, err = s.GetJobs()
  166. if err != nil {
  167. return
  168. }
  169. for _, job := range jobs {
  170. if gst.Contains(int(job.JobId)) {
  171. err = gerror.New("运行中的任务不能删除")
  172. return
  173. }
  174. }
  175. _, err = dao.SysJob.Delete(dao.SysJob.C.JobId+" in (?)", ids)
  176. if err != nil {
  177. g.Log().Error(err)
  178. err = gerror.New("删除失败")
  179. }
  180. return
  181. }