sys_job.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. /*
  2. * @desc:定时任务处理
  3. * @company:云南省奇讯科技有限公司
  4. * @Author: yixiaohu
  5. * @Date: 2021/7/15 9:32
  6. */
  7. package service
  8. import (
  9. comModel "gfast/app/common/model"
  10. "gfast/app/system/dao"
  11. "gfast/app/system/model"
  12. "strings"
  13. "github.com/gogf/gf/container/gset"
  14. "github.com/gogf/gf/errors/gerror"
  15. "github.com/gogf/gf/frame/g"
  16. "github.com/gogf/gf/os/gcron"
  17. "github.com/gogf/gf/util/gconv"
  18. )
  19. type sysJob struct {
  20. }
  21. var SysJob = new(sysJob)
  22. // JobList 获取任务列表
  23. func (s *sysJob) JobList(req *dao.SysJobSearchReq) (total, page int, list []*model.SysJob, err error) {
  24. model := dao.SysJob.M
  25. if req != nil {
  26. if req.Status != "" {
  27. model = model.Where("status", gconv.Int(req.Status))
  28. }
  29. if req.JobGroup != "" {
  30. model = model.Where("job_group", req.JobGroup)
  31. }
  32. if req.JobName != "" {
  33. model = model.Where("job_name like ?", "%"+req.JobName+"%")
  34. }
  35. }
  36. total, err = model.Count()
  37. if err != nil {
  38. g.Log().Error(err)
  39. err = gerror.New("获取总行数失败")
  40. return
  41. }
  42. if req.PageNum == 0 {
  43. req.PageNum = 1
  44. }
  45. page = req.PageNum
  46. if req.PageSize == 0 {
  47. req.PageSize = comModel.PageSize
  48. }
  49. err = model.Page(page, req.PageSize).Order("job_id asc").Scan(&list)
  50. if err != nil {
  51. g.Log().Error(err)
  52. err = gerror.New("获取数据失败")
  53. }
  54. return
  55. }
  56. // GetJobs 获取已开启执行的任务
  57. func (s *sysJob) GetJobs() (jobs []*model.SysJob, err error) {
  58. err = dao.SysJob.Where(dao.SysJob.C.Status, 0).Scan(&jobs)
  59. return
  60. }
  61. func (s *sysJob) AddJob(req *dao.SysJobAddReq) (err error) {
  62. _, err = dao.SysJob.Insert(req)
  63. return
  64. }
  65. func (s *sysJob) GetJobInfoById(id int64) (job *model.SysJob, err error) {
  66. if id == 0 {
  67. err = gerror.New("参数错误")
  68. return
  69. }
  70. err = dao.SysJob.Where("job_id", id).Scan(&job)
  71. if err != nil {
  72. g.Log().Error(err)
  73. }
  74. if job == nil || err != nil {
  75. err = gerror.New("获取任务信息失败")
  76. }
  77. return
  78. }
  79. func (s *sysJob) EditJob(req *dao.SysJobEditReq) error {
  80. _, err := dao.SysJob.FieldsEx(dao.SysJob.C.JobId, dao.SysJob.C.CreateBy).Where(dao.SysJob.C.JobId, req.JobId).
  81. Update(req)
  82. return err
  83. }
  84. // JobStart 启动任务
  85. func (s *sysJob) JobStart(job *model.SysJob) error {
  86. //获取task目录下是否绑定对应的方法
  87. f := TimeTaskList.GetByName(job.InvokeTarget)
  88. if f == nil {
  89. return gerror.New("没有绑定对应的方法")
  90. }
  91. //传参
  92. paramArr := strings.Split(job.JobParams, "|")
  93. g.Log().Infof(job.JobParams)
  94. TimeTaskList.EditParams(f.FuncName, paramArr)
  95. //rs := gcron.Search(job.InvokeTarget)
  96. //if rs == nil {
  97. if job.MisfirePolicy == 1 {
  98. t, err := gcron.Add(job.CronExpression, f.Run, job.InvokeTarget)
  99. //t, err := gcron.AddSingleton(job.CronExpression, f.Run, job.InvokeTarget)
  100. if err != nil {
  101. return err
  102. }
  103. if t == nil {
  104. return gerror.New("启动任务失败")
  105. }
  106. } else {
  107. t, err := gcron.AddOnce(job.CronExpression, f.Run, job.InvokeTarget)
  108. if err != nil {
  109. return err
  110. }
  111. if t == nil {
  112. return gerror.New("启动任务失败")
  113. }
  114. }
  115. //}
  116. gcron.Start(job.InvokeTarget)
  117. if job.MisfirePolicy == 1 {
  118. job.Status = 0
  119. _, err := dao.SysJob.Where(dao.SysJob.C.JobId, job.JobId).Unscoped().Update(g.Map{
  120. dao.SysJob.C.Status: job.Status,
  121. })
  122. return err
  123. }
  124. return nil
  125. }
  126. // JobStop 停止任务
  127. func (s *sysJob) JobStop(job *model.SysJob) error {
  128. //获取task目录下是否绑定对应的方法
  129. f := TimeTaskList.GetByName(job.InvokeTarget)
  130. if f == nil {
  131. return gerror.New("没有绑定对应的方法")
  132. }
  133. rs := gcron.Search(job.InvokeTarget)
  134. if rs != nil {
  135. gcron.Remove(job.InvokeTarget)
  136. }
  137. job.Status = 1
  138. _, err := dao.SysJob.Where(dao.SysJob.C.JobId, job.JobId).Unscoped().Update(g.Map{
  139. dao.SysJob.C.Status: job.Status,
  140. })
  141. return err
  142. }
  143. //执行任务
  144. func (s *sysJob) JobRun(job *model.SysJob) error {
  145. //可以task目录下是否绑定对应的方法
  146. f := TimeTaskList.GetByName(job.InvokeTarget)
  147. if f == nil {
  148. return gerror.New("当前task目录下没有绑定这个方法")
  149. }
  150. //传参
  151. paramArr := strings.Split(job.JobParams, "|")
  152. TimeTaskList.EditParams(f.FuncName, paramArr)
  153. task, err := gcron.AddOnce("@every 1s", f.Run)
  154. if err != nil || task == nil {
  155. return gerror.New("启动执行失败")
  156. }
  157. return nil
  158. }
  159. // DeleteJobByIds 删除任务
  160. func (s *sysJob) DeleteJobByIds(ids []int) (err error) {
  161. if len(ids) == 0 {
  162. err = gerror.New("参数错误")
  163. return
  164. }
  165. gst := gset.NewFrom(ids)
  166. var jobs []*model.SysJob
  167. jobs, err = s.GetJobs()
  168. if err != nil {
  169. return
  170. }
  171. for _, job := range jobs {
  172. if gst.Contains(int(job.JobId)) {
  173. err = gerror.New("运行中的任务不能删除")
  174. return
  175. }
  176. }
  177. _, err = dao.SysJob.Delete(dao.SysJob.C.JobId+" in (?)", ids)
  178. if err != nil {
  179. g.Log().Error(err)
  180. err = gerror.New("删除失败")
  181. }
  182. return
  183. }