framework.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package framework
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. log "github.com/sirupsen/logrus"
  7. "io"
  8. "net"
  9. "net/http"
  10. "os"
  11. "path/filepath"
  12. "pmail/dto/parsemail"
  13. "pmail/models"
  14. "pmail/utils/context"
  15. "time"
  16. )
  17. type EmailHook interface {
  18. // SendBefore 邮件发送前的数据 同步执行
  19. SendBefore(ctx *context.Context, email *parsemail.Email)
  20. // SendAfter 邮件发送后的数据,err是每个收信服务器的错误信息 异步执行
  21. SendAfter(ctx *context.Context, email *parsemail.Email, err map[string]error)
  22. // ReceiveParseBefore 接收到邮件,解析之前的原始数据 同步执行
  23. ReceiveParseBefore(ctx *context.Context, email *[]byte)
  24. // ReceiveParseAfter 接收到邮件,解析之后的结构化数据 (收信规则前,写数据库前执行) 同步执行
  25. ReceiveParseAfter(ctx *context.Context, email *parsemail.Email)
  26. // ReceiveSaveAfter 邮件落库以后执行(收信规则后执行) 异步执行
  27. ReceiveSaveAfter(ctx *context.Context, email *parsemail.Email, ue []*models.UserEmail)
  28. }
  29. // HookDTO PMail 主程序和插件通信的结构体
  30. type HookDTO struct {
  31. ServerVersion string // 服务端程序版本
  32. Ctx *context.Context // 上下文
  33. Email *parsemail.Email // 邮件内容
  34. EmailByte *[]byte // 未解析前的文件内容
  35. ErrMap map[string]error // 错误信息
  36. UserEmail []*models.UserEmail
  37. }
  38. type Plugin struct {
  39. name string
  40. hook EmailHook
  41. }
  42. func CreatePlugin(name string, hook EmailHook) *Plugin {
  43. return &Plugin{
  44. name: name,
  45. hook: hook,
  46. }
  47. }
  // logFormatter is the stateless log formatter installed by Plugin.Run; its
  // Format method defines the line layout.
  type logFormatter struct{}
  50. // Format 定义日志输出格式
  51. func (l *logFormatter) Format(entry *log.Entry) ([]byte, error) {
  52. b := bytes.Buffer{}
  53. b.WriteString(fmt.Sprintf("[%s]", entry.Level.String()))
  54. b.WriteString(fmt.Sprintf("[%s]", entry.Time.Format("2006-01-02 15:04:05")))
  55. if entry.Context != nil {
  56. ctx := entry.Context.(*context.Context)
  57. if ctx != nil {
  58. b.WriteString(fmt.Sprintf("[%s]", ctx.GetValue(context.LogID)))
  59. }
  60. }
  61. b.WriteString(fmt.Sprintf("[%s:%d]", entry.Caller.File, entry.Caller.Line))
  62. b.WriteString(entry.Message)
  63. b.WriteString("\n")
  64. return b.Bytes(), nil
  65. }
  66. func (p *Plugin) Run() {
  67. // 设置日志格式为json格式
  68. log.SetFormatter(&logFormatter{})
  69. log.SetReportCaller(true)
  70. // 设置将日志输出到标准输出(默认的输出为stderr,标准错误)
  71. // 日志消息输出可以是任意的io.writer类型
  72. log.SetOutput(os.Stdout)
  73. if len(os.Args) < 2 {
  74. panic("Command Params Error!")
  75. }
  76. mux := http.NewServeMux()
  77. mux.HandleFunc("/SendBefore", func(writer http.ResponseWriter, request *http.Request) {
  78. log.Debugf("[%s] SendBefore Start", p.name)
  79. var hookDTO HookDTO
  80. body, _ := io.ReadAll(request.Body)
  81. err := json.Unmarshal(body, &hookDTO)
  82. if err != nil {
  83. log.Errorf("params error %+v", err)
  84. return
  85. }
  86. p.hook.SendBefore(hookDTO.Ctx, hookDTO.Email)
  87. body, _ = json.Marshal(hookDTO)
  88. writer.Write(body)
  89. log.Debugf("[%s] SendBefore End", p.name)
  90. })
  91. mux.HandleFunc("/SendAfter", func(writer http.ResponseWriter, request *http.Request) {
  92. log.Debugf("[%s] SendAfter Start", p.name)
  93. var hookDTO HookDTO
  94. body, _ := io.ReadAll(request.Body)
  95. err := json.Unmarshal(body, &hookDTO)
  96. if err != nil {
  97. log.Errorf("params error %+v", err)
  98. return
  99. }
  100. p.hook.SendAfter(hookDTO.Ctx, hookDTO.Email, hookDTO.ErrMap)
  101. body, _ = json.Marshal(hookDTO)
  102. writer.Write(body)
  103. log.Debugf("[%s] SendAfter End", p.name)
  104. })
  105. mux.HandleFunc("/ReceiveParseBefore", func(writer http.ResponseWriter, request *http.Request) {
  106. log.Debugf("[%s] ReceiveParseBefore Start", p.name)
  107. var hookDTO HookDTO
  108. body, _ := io.ReadAll(request.Body)
  109. err := json.Unmarshal(body, &hookDTO)
  110. if err != nil {
  111. log.Errorf("params error %+v", err)
  112. return
  113. }
  114. p.hook.ReceiveParseBefore(hookDTO.Ctx, hookDTO.EmailByte)
  115. body, _ = json.Marshal(hookDTO)
  116. writer.Write(body)
  117. log.Debugf("[%s] ReceiveParseBefore End", p.name)
  118. })
  119. mux.HandleFunc("/ReceiveParseAfter", func(writer http.ResponseWriter, request *http.Request) {
  120. log.Debugf("[%s] ReceiveParseAfter Start", p.name)
  121. var hookDTO HookDTO
  122. body, _ := io.ReadAll(request.Body)
  123. err := json.Unmarshal(body, &hookDTO)
  124. if err != nil {
  125. log.Errorf("params error %+v", err)
  126. return
  127. }
  128. p.hook.ReceiveParseAfter(hookDTO.Ctx, hookDTO.Email)
  129. body, _ = json.Marshal(hookDTO)
  130. writer.Write(body)
  131. log.Debugf("[%s] ReceiveParseAfter End", p.name)
  132. })
  133. mux.HandleFunc("/ReceiveSaveAfter", func(writer http.ResponseWriter, request *http.Request) {
  134. log.Debugf("[%s] ReceiveSaveAfter Start", p.name)
  135. var hookDTO HookDTO
  136. body, _ := io.ReadAll(request.Body)
  137. err := json.Unmarshal(body, &hookDTO)
  138. if err != nil {
  139. log.Errorf("params error %+v", err)
  140. return
  141. }
  142. p.hook.ReceiveSaveAfter(hookDTO.Ctx, hookDTO.Email, hookDTO.UserEmail)
  143. body, _ = json.Marshal(hookDTO)
  144. writer.Write(body)
  145. log.Debugf("[%s] ReceiveSaveAfter End", p.name)
  146. })
  147. server := http.Server{
  148. ReadTimeout: 5 * time.Second,
  149. WriteTimeout: 5 * time.Second,
  150. Handler: mux,
  151. }
  152. unixListener, err := net.Listen("unix", getExePath()+"/"+os.Args[1])
  153. if err != nil {
  154. panic(err)
  155. }
  156. err = server.Serve(unixListener)
  157. if err != nil {
  158. panic(err)
  159. }
  160. }
  // getExePath returns the directory that contains the running executable.
  // It panics when the executable path cannot be determined.
  func getExePath() string {
  	exe, err := os.Executable()
  	if err != nil {
  		panic(err)
  	}
  	return filepath.Dir(exe)
  }
  168. }