sys_job.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package sys_job
  2. import (
  3. "gfast/app/task"
  4. "gfast/library/service"
  5. "github.com/gogf/gf/errors/gerror"
  6. "github.com/gogf/gf/frame/g"
  7. "github.com/gogf/gf/os/gcron"
  8. "github.com/gogf/gf/os/gtime"
  9. "github.com/gogf/gf/util/gconv"
  10. "strings"
  11. )
  12. // Fill in your ideas below.
  // ReqAdd carries the request parameters for creating a scheduled job.
  // The `p` tags give the request parameter names; the `v` tags hold the
  // validation rule followed by the message reported when the rule fails.
  type ReqAdd struct {
  	// JobName is the job name (required).
  	JobName string `p:"jobName" v:"required#任务名称不能为空"`
  	// JobParams holds the job parameters; JobStart splits the stored value on "|".
  	JobParams string `p:"jobParams"`
  	// JobGroup is the job group name.
  	JobGroup string `p:"jobGroup" `
  	// InvokeTarget names the task function to run (required).
  	InvokeTarget string `p:"invokeTarget" v:"required#执行方法不能为空"`
  	// CronExpression is the cron schedule expression (required).
  	CronExpression string `p:"cronExpression" v:"required#任务表达式不能为空"`
  	// MisfirePolicy selects how JobStart registers the job: 1 goes through
  	// gcron.Add, any other value through gcron.AddOnce.
  	MisfirePolicy int `p:"misfirePolicy"`
  	// Concurrent is accepted from the request.
  	// NOTE(review): Add and Edit in this file do not copy it onto the entity — confirm.
  	Concurrent int `p:"concurrent" `
  	// Status is the job status: 0 = normal, 1 = paused (required).
  	Status int `p:"status" v:"required#状态(0正常 1暂停)不能为空"`
  	// Remark is a free-form note.
  	Remark string `p:"remark" `
  }
  25. //修改操作请求参数
  26. type ReqEdit struct {
  27. JobId int64 `p:"jobId" v:"min:1#任务id不能为空"`
  28. ReqAdd
  29. }
  // SelectPageReq carries the filter and paging parameters for listing jobs.
  type SelectPageReq struct {
  	// JobName filters by job name; JobListByPage matches it with LIKE %name%.
  	JobName string `p:"jobName"`
  	// JobGroup filters by job group name.
  	JobGroup string `p:"jobGroup"`
  	// Status filters by status (0 = normal, 1 = paused); empty means no filter.
  	Status string `p:"status"`
  	// PageNum is the current page number.
  	PageNum int `p:"page"`
  	// PageSize is the number of rows per page.
  	PageSize int `p:"pageSize"`
  }
  38. func GetJobs() (jobs []*Entity, err error) {
  39. jobs, err = Model.Where("status", 0).All()
  40. return
  41. }
  42. //添加计划任务
  43. func Add(req *ReqAdd, userId int) (id int64, err error) {
  44. entity := new(Entity)
  45. entity.JobName = req.JobName
  46. entity.JobGroup = req.JobGroup
  47. entity.InvokeTarget = req.InvokeTarget
  48. entity.JobParams = req.JobParams
  49. entity.CronExpression = req.CronExpression
  50. entity.MisfirePolicy = req.MisfirePolicy
  51. entity.Remark = req.Remark
  52. entity.Status = req.Status
  53. entity.CreateTime = gconv.Uint64(gtime.Timestamp())
  54. entity.CreateBy = gconv.Uint64(userId)
  55. res, err := entity.Save()
  56. if err != nil {
  57. g.Log().Error(err)
  58. err = gerror.New("添加任务失败")
  59. }
  60. id, err = res.LastInsertId()
  61. if err != nil {
  62. g.Log().Error(err)
  63. err = gerror.New("添加任务失败")
  64. }
  65. return
  66. }
  67. //通过id获取任务信息
  68. func GetJobInfoById(id int64) (job *Entity, err error) {
  69. if id == 0 {
  70. err = gerror.New("参数错误")
  71. return
  72. }
  73. job, err = Model.FindOne("job_id", id)
  74. if err != nil {
  75. g.Log().Error(err)
  76. }
  77. if job == nil || err != nil {
  78. err = gerror.New("获取任务信息失败")
  79. return
  80. }
  81. return
  82. }
  83. //修改计划任务
  84. func Edit(req *ReqEdit, userId int) (rows int64, err error) {
  85. entity, err := GetJobInfoById(req.JobId)
  86. if err != nil {
  87. return
  88. }
  89. entity.JobName = req.JobName
  90. entity.JobGroup = req.JobGroup
  91. entity.InvokeTarget = req.InvokeTarget
  92. entity.JobParams = req.JobParams
  93. entity.CronExpression = req.CronExpression
  94. entity.MisfirePolicy = req.MisfirePolicy
  95. entity.Remark = req.Remark
  96. entity.Status = req.Status
  97. entity.UpdateTime = gconv.Uint64(gtime.Timestamp())
  98. entity.UpdateBy = gconv.Uint64(userId)
  99. res, err := entity.Update()
  100. if err != nil {
  101. g.Log().Error(err)
  102. err = gerror.New("修改任务失败")
  103. }
  104. rows, err = res.RowsAffected()
  105. if err != nil {
  106. g.Log().Error(err)
  107. err = gerror.New("修改任务失败")
  108. }
  109. return
  110. }
  111. //任务列表
  112. func JobListByPage(req *SelectPageReq) (total, page int, list []*Entity, err error) {
  113. model := Model
  114. if req != nil {
  115. if req.Status != "" {
  116. model = model.Where("status", gconv.Int(req.Status))
  117. }
  118. if req.JobGroup != "" {
  119. model = model.Where("job_group", req.JobGroup)
  120. }
  121. if req.JobName != "" {
  122. model = model.Where("job_name like ?", "%"+req.JobName+"%")
  123. }
  124. }
  125. total, err = model.Count()
  126. if err != nil {
  127. g.Log().Error(err)
  128. err = gerror.New("获取总行数失败")
  129. return
  130. }
  131. if req.PageNum == 0 {
  132. req.PageNum = 1
  133. }
  134. page = req.PageNum
  135. if req.PageSize == 0 {
  136. req.PageSize = service.AdminPageNum
  137. }
  138. list, err = model.Page(page, req.PageSize).Order("job_id asc").All()
  139. if err != nil {
  140. g.Log().Error(err)
  141. err = gerror.New("获取数据失败")
  142. return
  143. }
  144. return
  145. }
  146. //批量删除计划任务
  147. func DeleteJobByIds(ids []int) (err error) {
  148. if len(ids) == 0 {
  149. err = gerror.New("参数错误")
  150. }
  151. _, err = Model.Delete("job_id in (?)", ids)
  152. if err != nil {
  153. g.Log().Error(err)
  154. err = gerror.New("删除失败")
  155. }
  156. return
  157. }
  158. //启动任务
  159. func JobStart(job *Entity) error {
  160. //可以task目录下是否绑定对应的方法
  161. f := task.GetByName(job.InvokeTarget)
  162. if f == nil {
  163. return gerror.New("当前task目录下没有绑定这个方法")
  164. }
  165. //传参
  166. paramArr := strings.Split(job.JobParams, "|")
  167. task.EditParams(f.FuncName, paramArr)
  168. rs := gcron.Search(job.InvokeTarget)
  169. if rs == nil {
  170. if job.MisfirePolicy == 1 {
  171. task, err := gcron.Add(job.CronExpression, f.Run, job.InvokeTarget)
  172. if err != nil || task == nil {
  173. return err
  174. }
  175. } else {
  176. task, err := gcron.AddOnce(job.CronExpression, f.Run, job.InvokeTarget)
  177. if err != nil || task == nil {
  178. return err
  179. }
  180. }
  181. }
  182. gcron.Start(job.InvokeTarget)
  183. if job.MisfirePolicy == 1 {
  184. job.Status = 0
  185. job.Update()
  186. }
  187. return nil
  188. }
  189. //停止任务
  190. func JobStop(job *Entity) error {
  191. //可以task目录下是否绑定对应的方法
  192. f := task.GetByName(job.InvokeTarget)
  193. if f == nil {
  194. return gerror.New("当前task目录下没有绑定这个方法")
  195. }
  196. rs := gcron.Search(job.InvokeTarget)
  197. if rs != nil {
  198. gcron.Stop(job.InvokeTarget)
  199. }
  200. job.Status = 1
  201. job.Update()
  202. return nil
  203. }