| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- package sys_job
- import (
- "gfast/app/task"
- "gfast/library/service"
- "github.com/gogf/gf/errors/gerror"
- "github.com/gogf/gf/frame/g"
- "github.com/gogf/gf/os/gcron"
- "github.com/gogf/gf/os/gtime"
- "github.com/gogf/gf/util/gconv"
- "strings"
- )
- // Fill with you ideas below.
- //添加操作请求参数
- type ReqAdd struct {
- JobName string `p:"jobName" v:"required#任务名称不能为空"`
- JobParams string `p:"jobParams"` // 任务参数
- JobGroup string `p:"jobGroup" `
- InvokeTarget string `p:"invokeTarget" v:"required#执行方法不能为空"`
- CronExpression string `p:"cronExpression" v:"required#任务表达式不能为空"`
- MisfirePolicy int `p:"misfirePolicy"`
- Concurrent int `p:"concurrent" `
- Status int `p:"status" v:"required#状态(0正常 1暂停)不能为空"`
- Remark string `p:"remark" `
- }
- //修改操作请求参数
- type ReqEdit struct {
- JobId int64 `p:"jobId" v:"min:1#任务id不能为空"`
- ReqAdd
- }
- //分页请求参数
- type SelectPageReq struct {
- JobName string `p:"jobName"` //任务名称
- JobGroup string `p:"jobGroup"` //任务组名
- Status string `p:"status"` //状态(0正常 1暂停)
- PageNum int `p:"page"` //当前页码
- PageSize int `p:"pageSize"` //每页数
- }
- func GetJobs() (jobs []*Entity, err error) {
- jobs, err = Model.Where("status", 0).All()
- return
- }
- //添加计划任务
- func Add(req *ReqAdd, userId int) (id int64, err error) {
- entity := new(Entity)
- entity.JobName = req.JobName
- entity.JobGroup = req.JobGroup
- entity.InvokeTarget = req.InvokeTarget
- entity.JobParams = req.JobParams
- entity.CronExpression = req.CronExpression
- entity.MisfirePolicy = req.MisfirePolicy
- entity.Remark = req.Remark
- entity.Status = req.Status
- entity.CreateTime = gconv.Uint64(gtime.Timestamp())
- entity.CreateBy = gconv.Uint64(userId)
- res, err := entity.Save()
- if err != nil {
- g.Log().Error(err)
- err = gerror.New("添加任务失败")
- }
- id, err = res.LastInsertId()
- if err != nil {
- g.Log().Error(err)
- err = gerror.New("添加任务失败")
- }
- return
- }
- //通过id获取任务信息
- func GetJobInfoById(id int64) (job *Entity, err error) {
- if id == 0 {
- err = gerror.New("参数错误")
- return
- }
- job, err = Model.FindOne("job_id", id)
- if err != nil {
- g.Log().Error(err)
- }
- if job == nil || err != nil {
- err = gerror.New("获取任务信息失败")
- return
- }
- return
- }
- //修改计划任务
- func Edit(req *ReqEdit, userId int) (rows int64, err error) {
- entity, err := GetJobInfoById(req.JobId)
- if err != nil {
- return
- }
- entity.JobName = req.JobName
- entity.JobGroup = req.JobGroup
- entity.InvokeTarget = req.InvokeTarget
- entity.JobParams = req.JobParams
- entity.CronExpression = req.CronExpression
- entity.MisfirePolicy = req.MisfirePolicy
- entity.Remark = req.Remark
- entity.Status = req.Status
- entity.UpdateTime = gconv.Uint64(gtime.Timestamp())
- entity.UpdateBy = gconv.Uint64(userId)
- res, err := entity.Update()
- if err != nil {
- g.Log().Error(err)
- err = gerror.New("修改任务失败")
- }
- rows, err = res.RowsAffected()
- if err != nil {
- g.Log().Error(err)
- err = gerror.New("修改任务失败")
- }
- return
- }
- //任务列表
- func JobListByPage(req *SelectPageReq) (total, page int, list []*Entity, err error) {
- model := Model
- if req != nil {
- if req.Status != "" {
- model = model.Where("status", gconv.Int(req.Status))
- }
- if req.JobGroup != "" {
- model = model.Where("job_group", req.JobGroup)
- }
- if req.JobName != "" {
- model = model.Where("job_name like ?", "%"+req.JobName+"%")
- }
- }
- total, err = model.Count()
- if err != nil {
- g.Log().Error(err)
- err = gerror.New("获取总行数失败")
- return
- }
- if req.PageNum == 0 {
- req.PageNum = 1
- }
- page = req.PageNum
- if req.PageSize == 0 {
- req.PageSize = service.AdminPageNum
- }
- list, err = model.Page(page, req.PageSize).Order("job_id asc").All()
- if err != nil {
- g.Log().Error(err)
- err = gerror.New("获取数据失败")
- return
- }
- return
- }
- //批量删除计划任务
- func DeleteJobByIds(ids []int) (err error) {
- if len(ids) == 0 {
- err = gerror.New("参数错误")
- }
- _, err = Model.Delete("job_id in (?)", ids)
- if err != nil {
- g.Log().Error(err)
- err = gerror.New("删除失败")
- }
- return
- }
- //启动任务
- func JobStart(job *Entity) error {
- //可以task目录下是否绑定对应的方法
- f := task.GetByName(job.InvokeTarget)
- if f == nil {
- return gerror.New("当前task目录下没有绑定这个方法")
- }
- //传参
- paramArr := strings.Split(job.JobParams, "|")
- task.EditParams(f.FuncName, paramArr)
- rs := gcron.Search(job.InvokeTarget)
- if rs == nil {
- if job.MisfirePolicy == 1 {
- task, err := gcron.Add(job.CronExpression, f.Run, job.InvokeTarget)
- if err != nil || task == nil {
- return err
- }
- } else {
- task, err := gcron.AddOnce(job.CronExpression, f.Run, job.InvokeTarget)
- if err != nil || task == nil {
- return err
- }
- }
- }
- gcron.Start(job.InvokeTarget)
- if job.MisfirePolicy == 1 {
- job.Status = 0
- job.Update()
- }
- return nil
- }
- //停止任务
- func JobStop(job *Entity) error {
- //可以task目录下是否绑定对应的方法
- f := task.GetByName(job.InvokeTarget)
- if f == nil {
- return gerror.New("当前task目录下没有绑定这个方法")
- }
- rs := gcron.Search(job.InvokeTarget)
- if rs != nil {
- gcron.Stop(job.InvokeTarget)
- }
- job.Status = 1
- job.Update()
- return nil
- }
|