job.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package monitor_service
  2. import (
  3. "gfast/app/model/admin/sys_job"
  4. "gfast/app/task"
  5. "gfast/library/utils"
  6. "github.com/gogf/gf/errors/gerror"
  7. "github.com/gogf/gf/frame/g"
  8. "github.com/gogf/gf/os/gcron"
  9. "github.com/gogf/gf/os/gtime"
  10. "github.com/gogf/gf/util/gconv"
  11. "strings"
  12. )
  13. func init() {
  14. //自动执行已开启的任务
  15. jobs, err := sys_job.Model.Where("status", 0).All()
  16. if err != nil {
  17. g.Log().Error(err)
  18. }
  19. for _, job := range jobs {
  20. JobStart(job)
  21. }
  22. }
  23. //添加计划任务
  24. func AddJob(req *sys_job.ReqAdd, userId int) (id int64, err error) {
  25. entity := new(sys_job.Entity)
  26. entity.JobName = req.JobName
  27. entity.JobGroup = req.JobGroup
  28. entity.InvokeTarget = req.InvokeTarget
  29. entity.JobParams = req.JobParams
  30. entity.CronExpression = req.CronExpression
  31. entity.MisfirePolicy = req.MisfirePolicy
  32. entity.Remark = req.Remark
  33. entity.Status = req.Status
  34. entity.CreateTime = gconv.Uint64(gtime.Timestamp())
  35. entity.CreateBy = gconv.Uint64(userId)
  36. res, err := entity.Save()
  37. if err != nil {
  38. g.Log().Error(err)
  39. err = gerror.New("添加任务失败")
  40. }
  41. id, err = res.LastInsertId()
  42. if err != nil {
  43. g.Log().Error(err)
  44. err = gerror.New("添加任务失败")
  45. }
  46. return
  47. }
  48. //修改计划任务
  49. func EditJob(req *sys_job.ReqEdit, userId int) (rows int64, err error) {
  50. entity, err := GetJobInfoById(req.JobId)
  51. if err != nil {
  52. return
  53. }
  54. entity.JobName = req.JobName
  55. entity.JobGroup = req.JobGroup
  56. entity.InvokeTarget = req.InvokeTarget
  57. entity.JobParams = req.JobParams
  58. entity.CronExpression = req.CronExpression
  59. entity.MisfirePolicy = req.MisfirePolicy
  60. entity.Remark = req.Remark
  61. entity.Status = req.Status
  62. entity.UpdateTime = gconv.Uint64(gtime.Timestamp())
  63. entity.UpdateBy = gconv.Uint64(userId)
  64. res, err := entity.Update()
  65. if err != nil {
  66. g.Log().Error(err)
  67. err = gerror.New("修改任务失败")
  68. }
  69. rows, err = res.RowsAffected()
  70. if err != nil {
  71. g.Log().Error(err)
  72. err = gerror.New("修改任务失败")
  73. }
  74. return
  75. }
  76. //任务列表
  77. func JobListByPage(req *sys_job.SelectPageReq) (total, page int, list []*sys_job.Entity, err error) {
  78. model := sys_job.Model
  79. if req != nil {
  80. if req.Status != "" {
  81. model = model.Where("status", gconv.Int(req.Status))
  82. }
  83. if req.JobGroup != "" {
  84. model = model.Where("job_group", req.JobGroup)
  85. }
  86. if req.JobName != "" {
  87. model = model.Where("job_name like ?", "%"+req.JobName+"%")
  88. }
  89. }
  90. total, err = model.Count()
  91. if err != nil {
  92. g.Log().Error(err)
  93. err = gerror.New("获取总行数失败")
  94. return
  95. }
  96. if req.PageNum == 0 {
  97. req.PageNum = 1
  98. }
  99. page = req.PageNum
  100. if req.PageSize == 0 {
  101. req.PageSize = utils.AdminPageNum
  102. }
  103. list, err = model.Page(page, req.PageSize).Order("job_id asc").All()
  104. if err != nil {
  105. g.Log().Error(err)
  106. err = gerror.New("获取数据失败")
  107. return
  108. }
  109. return
  110. }
  111. //通过id获取任务信息
  112. func GetJobInfoById(id int64) (job *sys_job.Entity, err error) {
  113. if id == 0 {
  114. err = gerror.New("参数错误")
  115. return
  116. }
  117. job, err = sys_job.Model.FindOne("job_id", id)
  118. if err != nil {
  119. g.Log().Error(err)
  120. }
  121. if job == nil || err != nil {
  122. err = gerror.New("获取任务信息失败")
  123. return
  124. }
  125. return
  126. }
  127. //批量删除计划任务
  128. func DeleteJobByIds(ids []int) (err error) {
  129. if len(ids) == 0 {
  130. err = gerror.New("参数错误")
  131. }
  132. _, err = sys_job.Model.Delete("job_id in (?)", ids)
  133. if err != nil {
  134. g.Log().Error(err)
  135. err = gerror.New("删除失败")
  136. }
  137. return
  138. }
  139. //启动任务
  140. func JobStart(job *sys_job.Entity) error {
  141. //可以task目录下是否绑定对应的方法
  142. f := task.GetByName(job.InvokeTarget)
  143. if f == nil {
  144. return gerror.New("当前task目录下没有绑定这个方法")
  145. }
  146. //传参
  147. paramArr := strings.Split(job.JobParams, "|")
  148. g.Log().Debug(paramArr)
  149. task.EditParams(f.FuncName, paramArr)
  150. rs := gcron.Search(job.InvokeTarget)
  151. if rs == nil {
  152. if job.MisfirePolicy == 1 {
  153. task, err := gcron.Add(job.CronExpression, f.Run, job.InvokeTarget)
  154. if err != nil || task == nil {
  155. return err
  156. }
  157. } else {
  158. task, err := gcron.AddOnce(job.CronExpression, f.Run, job.InvokeTarget)
  159. if err != nil || task == nil {
  160. return err
  161. }
  162. }
  163. }
  164. gcron.Start(job.InvokeTarget)
  165. if job.MisfirePolicy == 1 {
  166. job.Status = 0
  167. job.Update()
  168. }
  169. return nil
  170. }
  171. //停止任务
  172. func JobStop(job *sys_job.Entity) error {
  173. //可以task目录下是否绑定对应的方法
  174. f := task.GetByName(job.InvokeTarget)
  175. if f == nil {
  176. return gerror.New("当前task目录下没有绑定这个方法")
  177. }
  178. rs := gcron.Search(job.InvokeTarget)
  179. if rs != nil {
  180. gcron.Stop(job.InvokeTarget)
  181. }
  182. job.Status = 1
  183. job.Update()
  184. return nil
  185. }