base.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package hooks
  2. import (
  3. oContext "context"
  4. "encoding/json"
  5. "fmt"
  6. log "github.com/sirupsen/logrus"
  7. "io"
  8. "net"
  9. "net/http"
  10. "os"
  11. "path/filepath"
  12. "pmail/dto/parsemail"
  13. "pmail/hooks/framework"
  14. "pmail/utils/context"
  15. "strings"
  16. "time"
  17. )
  18. // HookList
  19. var HookList []framework.EmailHook
  // HookSender forwards hook invocations to a single plugin process by sending
  // HTTP requests through that plugin's Unix domain socket.
  type HookSender struct {
  	// httpc is the HTTP client used for every request to the plugin.
  	httpc http.Client
  	// name identifies the plugin in log messages; socket is the path of the
  	// Unix socket the plugin is reached through.
  	name, socket string
  }
  25. func (h *HookSender) SendBefore(ctx *context.Context, email *parsemail.Email) {
  26. dto := framework.HookDTO{
  27. Ctx: ctx,
  28. Email: email,
  29. }
  30. body, _ := json.Marshal(dto)
  31. ret, err := h.httpc.Post("http://plugin/SendBefore", "application/json", strings.NewReader(string(body)))
  32. if err != nil {
  33. log.WithContext(ctx).Errorf("[%s] Error! %v", h.name, err)
  34. return
  35. }
  36. body, _ = io.ReadAll(ret.Body)
  37. json.Unmarshal(body, &dto)
  38. ctx = dto.Ctx
  39. email = dto.Email
  40. }
  41. func (h *HookSender) SendAfter(ctx *context.Context, email *parsemail.Email, err map[string]error) {
  42. dto := framework.HookDTO{
  43. Ctx: ctx,
  44. Email: email,
  45. ErrMap: err,
  46. }
  47. body, _ := json.Marshal(dto)
  48. ret, errL := h.httpc.Post("http://plugin/SendAfter", "application/json", strings.NewReader(string(body)))
  49. if errL != nil {
  50. log.WithContext(ctx).Errorf("[%s] Error! %v", h.name, errL)
  51. return
  52. }
  53. body, _ = io.ReadAll(ret.Body)
  54. json.Unmarshal(body, &dto)
  55. ctx = dto.Ctx
  56. email = dto.Email
  57. err = dto.ErrMap
  58. }
  59. func (h *HookSender) ReceiveParseBefore(ctx *context.Context, email *[]byte) {
  60. dto := framework.HookDTO{
  61. Ctx: ctx,
  62. EmailByte: email,
  63. }
  64. body, _ := json.Marshal(dto)
  65. ret, errL := h.httpc.Post("http://plugin/ReceiveParseBefore", "application/json", strings.NewReader(string(body)))
  66. if errL != nil {
  67. log.WithContext(ctx).Errorf("[%s] Error! %v", h.name, errL)
  68. return
  69. }
  70. body, _ = io.ReadAll(ret.Body)
  71. json.Unmarshal(body, &dto)
  72. ctx = dto.Ctx
  73. email = dto.EmailByte
  74. }
  75. func (h *HookSender) ReceiveParseAfter(ctx *context.Context, email *parsemail.Email) {
  76. dto := framework.HookDTO{
  77. Ctx: ctx,
  78. Email: email,
  79. }
  80. body, _ := json.Marshal(dto)
  81. ret, errL := h.httpc.Post("http://plugin/ReceiveParseAfter", "application/json", strings.NewReader(string(body)))
  82. if errL != nil {
  83. log.WithContext(ctx).Errorf("[%s] Error! %v", h.name, errL)
  84. return
  85. }
  86. body, _ = io.ReadAll(ret.Body)
  87. json.Unmarshal(body, &dto)
  88. ctx = dto.Ctx
  89. email = dto.Email
  90. }
  91. func NewHookSender(socketPath string, name string) *HookSender {
  92. httpc := http.Client{
  93. Timeout: time.Second * 10,
  94. Transport: &http.Transport{
  95. DialContext: func(ctx oContext.Context, network, addr string) (net.Conn, error) {
  96. return net.Dial("unix", socketPath)
  97. },
  98. },
  99. }
  100. return &HookSender{
  101. httpc: httpc,
  102. socket: socketPath,
  103. name: name,
  104. }
  105. }
  106. // Init 注册hook对象
  107. func Init() {
  108. env := os.Environ()
  109. procAttr := &os.ProcAttr{
  110. Env: env,
  111. Files: []*os.File{
  112. os.Stdin,
  113. os.Stdout,
  114. os.Stderr,
  115. },
  116. }
  117. root := "./plugins"
  118. pluginNo := 1
  119. filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
  120. if info != nil && !info.IsDir() && !strings.Contains(info.Name(), ".") {
  121. socketPath := fmt.Sprintf("%s/%d.socket", root, pluginNo)
  122. os.Remove(socketPath)
  123. log.Infof("[%s] Plugin Load", info.Name())
  124. p, err := os.StartProcess(path, []string{
  125. info.Name(),
  126. fmt.Sprintf("%d.socket", pluginNo),
  127. }, procAttr)
  128. if err != nil {
  129. panic(err)
  130. }
  131. fmt.Printf("[%s] Plugin Start! PID:%d", info.Name(), p.Pid)
  132. pluginNo++
  133. HookList = append(HookList, NewHookSender(socketPath, info.Name()))
  134. go func() {
  135. stat, err := p.Wait()
  136. log.Errorf("[%s] Plugin Stop. Error:%v Stat:%v", info.Name(), err, stat.String())
  137. }()
  138. for i := 0; i < 5; i++ {
  139. time.Sleep(1 * time.Second)
  140. if _, err := os.Stat(socketPath); err == nil {
  141. break
  142. }
  143. if i == 4 {
  144. panic(fmt.Sprintf("[%s] Start Fail!", info.Name()))
  145. }
  146. }
  147. }
  148. return nil
  149. })
  150. }