sink_node.go 6.5 KB


  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/common/plugin_manager"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/emqx/kuiper/xstream/sinks"
  8. "sync"
  9. "time"
  10. )
  11. type SinkNode struct {
  12. //static
  13. input chan interface{}
  14. name string
  15. sinkType string
  16. mutex sync.RWMutex
  17. //configs (also static for sinks)
  18. concurrency int
  19. options map[string]interface{}
  20. isMock bool
  21. //states varies after restart
  22. ctx api.StreamContext
  23. statManagers []StatManager
  24. sinks []api.Sink
  25. }
  26. func NewSinkNode(name string, sinkType string, props map[string]interface{}) *SinkNode {
  27. bufferLength := 1024
  28. if c, ok := props["bufferLength"]; ok {
  29. if t, err := common.ToInt(c); err != nil || t <= 0 {
  30. //invalid property bufferLength
  31. } else {
  32. bufferLength = t
  33. }
  34. }
  35. return &SinkNode{
  36. input: make(chan interface{}, bufferLength),
  37. name: name,
  38. sinkType: sinkType,
  39. options: props,
  40. concurrency: 1,
  41. ctx: nil,
  42. }
  43. }
  44. //Only for mock source, do not use it in production
  45. func NewSinkNodeWithSink(name string, sink api.Sink) *SinkNode {
  46. return &SinkNode{
  47. input: make(chan interface{}, 1024),
  48. name: name,
  49. sinks: []api.Sink{sink},
  50. options: nil,
  51. concurrency: 1,
  52. ctx: nil,
  53. isMock: true,
  54. }
  55. }
  56. func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
  57. m.ctx = ctx
  58. logger := ctx.GetLogger()
  59. logger.Debugf("open sink node %s", m.name)
  60. go func() {
  61. if c, ok := m.options["concurrency"]; ok {
  62. if t, err := common.ToInt(c); err != nil || t <= 0 {
  63. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  64. } else {
  65. m.concurrency = t
  66. }
  67. }
  68. runAsync := false
  69. if c, ok := m.options["runAsync"]; ok {
  70. if t, ok := c.(bool); !ok {
  71. logger.Warnf("invalid type for runAsync property, should be bool but found %t", c)
  72. } else {
  73. runAsync = t
  74. }
  75. }
  76. retryInterval := 1000
  77. if c, ok := m.options["retryInterval"]; ok {
  78. if t, err := common.ToInt(c); err != nil || t < 0 {
  79. logger.Warnf("invalid type for retryInterval property, should be positive integer but found %t", c)
  80. } else {
  81. retryInterval = t
  82. }
  83. }
  84. cacheLength := 1024
  85. if c, ok := m.options["cacheLength"]; ok {
  86. if t, err := common.ToInt(c); err != nil || t < 0 {
  87. logger.Warnf("invalid type for cacheLength property, should be positive integer but found %t", c)
  88. } else {
  89. cacheLength = t
  90. }
  91. }
  92. cacheSaveInterval := 1000
  93. if c, ok := m.options["cacheSaveInterval"]; ok {
  94. if t, err := common.ToInt(c); err != nil || t < 0 {
  95. logger.Warnf("invalid type for cacheSaveInterval property, should be positive integer but found %t", c)
  96. } else {
  97. cacheSaveInterval = t
  98. }
  99. }
  100. m.reset()
  101. logger.Infof("open sink node %d instances", m.concurrency)
  102. for i := 0; i < m.concurrency; i++ { // workers
  103. go func(instance int) {
  104. var sink api.Sink
  105. var err error
  106. if !m.isMock {
  107. sink, err = getSink(m.sinkType, m.options)
  108. if err != nil {
  109. m.drainError(result, err, ctx, logger)
  110. return
  111. }
  112. m.mutex.Lock()
  113. m.sinks = append(m.sinks, sink)
  114. m.mutex.Unlock()
  115. if err := sink.Open(ctx); err != nil {
  116. m.drainError(result, err, ctx, logger)
  117. return
  118. }
  119. } else {
  120. sink = m.sinks[instance]
  121. }
  122. stats, err := NewStatManager("sink", ctx)
  123. if err != nil {
  124. m.drainError(result, err, ctx, logger)
  125. return
  126. }
  127. m.mutex.Lock()
  128. m.statManagers = append(m.statManagers, stats)
  129. m.mutex.Unlock()
  130. cache := NewCache(m.input, cacheLength, cacheSaveInterval, result, ctx)
  131. for {
  132. select {
  133. case data := <-cache.Out:
  134. stats.SetBufferLength(int64(cache.Length()))
  135. if runAsync {
  136. go doCollect(sink, data, stats, retryInterval, cache.Complete, ctx)
  137. } else {
  138. doCollect(sink, data, stats, retryInterval, cache.Complete, ctx)
  139. }
  140. case <-ctx.Done():
  141. logger.Infof("sink node %s instance %d done", m.name, instance)
  142. if err := sink.Close(ctx); err != nil {
  143. logger.Warnf("close sink node %s instance %d fails: %v", m.name, instance, err)
  144. }
  145. return
  146. }
  147. }
  148. }(i)
  149. }
  150. }()
  151. }
  152. func (m *SinkNode) reset() {
  153. if !m.isMock {
  154. m.sinks = nil
  155. }
  156. m.statManagers = nil
  157. }
  158. func doCollect(sink api.Sink, item *CacheTuple, stats StatManager, retryInterval int, signalCh chan<- int, ctx api.StreamContext) {
  159. stats.IncTotalRecordsIn()
  160. stats.ProcessTimeStart()
  161. logger := ctx.GetLogger()
  162. var outdata []byte
  163. switch val := item.data.(type) {
  164. case []byte:
  165. outdata = val
  166. case error:
  167. outdata = []byte(fmt.Sprintf(`[{"error":"%s"}]`, val.Error()))
  168. default:
  169. outdata = []byte(fmt.Sprintf(`[{"error":"result is not a string but found %#v"}]`, val))
  170. }
  171. for {
  172. if err := sink.Collect(ctx, outdata); err != nil {
  173. stats.IncTotalExceptions()
  174. logger.Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outdata, err)
  175. if retryInterval > 0 {
  176. time.Sleep(time.Duration(retryInterval) * time.Millisecond)
  177. logger.Debugf("try again")
  178. } else {
  179. break
  180. }
  181. } else {
  182. logger.Debugf("success")
  183. stats.IncTotalRecordsOut()
  184. signalCh <- item.index
  185. break
  186. }
  187. }
  188. stats.ProcessTimeEnd()
  189. }
  190. func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
  191. var s api.Sink
  192. switch name {
  193. case "log":
  194. s = sinks.NewLogSink()
  195. case "logToMemory":
  196. s = sinks.NewLogSinkToMemory()
  197. case "mqtt":
  198. s = &sinks.MQTTSink{}
  199. case "rest":
  200. s = &sinks.RestSink{}
  201. default:
  202. nf, err := plugin_manager.GetPlugin(name, "sinks")
  203. if err != nil {
  204. return nil, err
  205. }
  206. var ok bool
  207. s, ok = nf.(api.Sink)
  208. if !ok {
  209. return nil, fmt.Errorf("exported symbol %s is not type of api.Sink", name)
  210. }
  211. }
  212. err := s.Configure(action)
  213. if err != nil {
  214. return nil, err
  215. }
  216. return s, nil
  217. }
  218. func (m *SinkNode) GetName() string {
  219. return m.name
  220. }
  221. func (m *SinkNode) GetInput() (chan<- interface{}, string) {
  222. return m.input, m.name
  223. }
  224. func (m *SinkNode) GetMetrics() (result [][]interface{}) {
  225. for _, stats := range m.statManagers {
  226. result = append(result, stats.GetMetrics())
  227. }
  228. return result
  229. }
  230. func (m *SinkNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  231. go func() {
  232. select {
  233. case errCh <- err:
  234. case <-ctx.Done():
  235. m.close(ctx, logger)
  236. }
  237. }()
  238. }
  239. func (m *SinkNode) close(ctx api.StreamContext, logger api.Logger) {
  240. for _, s := range m.sinks {
  241. if err := s.Close(ctx); err != nil {
  242. logger.Warnf("close sink fails: %v", err)
  243. }
  244. }
  245. }