async.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package async
  2. import (
  3. "errors"
  4. log "github.com/sirupsen/logrus"
  5. "github.com/spf13/cast"
  6. "pmail/dto"
  7. "runtime/debug"
  8. "sync"
  9. )
  10. type Callback func()
  11. type Async struct {
  12. wg *sync.WaitGroup
  13. lastError error
  14. ctx *dto.Context
  15. }
  16. func New(ctx *dto.Context) *Async {
  17. return &Async{
  18. ctx: ctx,
  19. }
  20. }
  21. func (as *Async) LastError() error {
  22. return as.lastError
  23. }
  24. func (as *Async) WaitProcess(callback Callback) {
  25. if as.wg == nil {
  26. as.wg = &sync.WaitGroup{}
  27. }
  28. as.wg.Add(1)
  29. as.Process(func() {
  30. defer as.wg.Done()
  31. callback()
  32. })
  33. }
  34. func (as *Async) Process(callback Callback) {
  35. go func() {
  36. defer func() {
  37. if err := recover(); err != nil {
  38. as.lastError = as.HandleErrRecover(err)
  39. }
  40. }()
  41. callback()
  42. }()
  43. }
  44. func (as *Async) Wait() {
  45. if as.wg == nil {
  46. return
  47. }
  48. as.wg.Wait()
  49. }
  50. // HandleErrRecover panic恢复处理
  51. func (as *Async) HandleErrRecover(err interface{}) (returnErr error) {
  52. switch err.(type) {
  53. case error:
  54. returnErr = err.(error)
  55. default:
  56. returnErr = errors.New(cast.ToString(err))
  57. }
  58. log.WithContext(as.ctx).Errorf("goroutine panic:%s \n %s", err, string(debug.Stack()))
  59. return
  60. }