sink_node.go 6.8 KB


  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/plugins"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/emqx/kuiper/xstream/sinks"
  8. "sync"
  9. "time"
  10. )
  11. type SinkNode struct {
  12. //static
  13. input chan interface{}
  14. name string
  15. sinkType string
  16. mutex sync.RWMutex
  17. //configs (also static for sinks)
  18. concurrency int
  19. options map[string]interface{}
  20. isMock bool
  21. //states varies after restart
  22. ctx api.StreamContext
  23. statManagers []StatManager
  24. sinks []api.Sink
  25. }
  26. func NewSinkNode(name string, sinkType string, props map[string]interface{}) *SinkNode {
  27. bufferLength := 1024
  28. if c, ok := props["bufferLength"]; ok {
  29. if t, err := common.ToInt(c); err != nil || t <= 0 {
  30. //invalid property bufferLength
  31. } else {
  32. bufferLength = t
  33. }
  34. }
  35. return &SinkNode{
  36. input: make(chan interface{}, bufferLength),
  37. name: name,
  38. sinkType: sinkType,
  39. options: props,
  40. concurrency: 1,
  41. ctx: nil,
  42. }
  43. }
  44. //Only for mock source, do not use it in production
  45. func NewSinkNodeWithSink(name string, sink api.Sink) *SinkNode {
  46. return &SinkNode{
  47. input: make(chan interface{}, 1024),
  48. name: name,
  49. sinks: []api.Sink{sink},
  50. options: nil,
  51. concurrency: 1,
  52. ctx: nil,
  53. isMock: true,
  54. }
  55. }
  56. func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
  57. m.ctx = ctx
  58. logger := ctx.GetLogger()
  59. logger.Debugf("open sink node %s", m.name)
  60. go func() {
  61. if c, ok := m.options["concurrency"]; ok {
  62. if t, err := common.ToInt(c); err != nil || t <= 0 {
  63. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  64. } else {
  65. m.concurrency = t
  66. }
  67. }
  68. runAsync := false
  69. if c, ok := m.options["runAsync"]; ok {
  70. if t, ok := c.(bool); !ok {
  71. logger.Warnf("invalid type for runAsync property, should be bool but found %t", c)
  72. } else {
  73. runAsync = t
  74. }
  75. }
  76. retryInterval := 1000
  77. if c, ok := m.options["retryInterval"]; ok {
  78. if t, err := common.ToInt(c); err != nil || t < 0 {
  79. logger.Warnf("invalid type for retryInterval property, should be positive integer but found %t", c)
  80. } else {
  81. retryInterval = t
  82. }
  83. }
  84. cacheLength := 1024
  85. if c, ok := m.options["cacheLength"]; ok {
  86. if t, err := common.ToInt(c); err != nil || t < 0 {
  87. logger.Warnf("invalid type for cacheLength property, should be positive integer but found %t", c)
  88. } else {
  89. cacheLength = t
  90. }
  91. }
  92. cacheSaveInterval := 1000
  93. if c, ok := m.options["cacheSaveInterval"]; ok {
  94. if t, err := common.ToInt(c); err != nil || t < 0 {
  95. logger.Warnf("invalid type for cacheSaveInterval property, should be positive integer but found %t", c)
  96. } else {
  97. cacheSaveInterval = t
  98. }
  99. }
  100. omitIfEmpty := false
  101. if c, ok := m.options["omitIfEmpty"]; ok {
  102. if t, ok := c.(bool); !ok {
  103. logger.Warnf("invalid type for omitIfEmpty property, should be a bool value 'true/false'.", c)
  104. } else {
  105. omitIfEmpty = t
  106. }
  107. }
  108. m.reset()
  109. logger.Infof("open sink node %d instances", m.concurrency)
  110. for i := 0; i < m.concurrency; i++ { // workers
  111. go func(instance int) {
  112. var sink api.Sink
  113. var err error
  114. if !m.isMock {
  115. sink, err = getSink(m.sinkType, m.options)
  116. if err != nil {
  117. m.drainError(result, err, ctx, logger)
  118. return
  119. }
  120. m.mutex.Lock()
  121. m.sinks = append(m.sinks, sink)
  122. m.mutex.Unlock()
  123. if err := sink.Open(ctx); err != nil {
  124. m.drainError(result, err, ctx, logger)
  125. return
  126. }
  127. } else {
  128. sink = m.sinks[instance]
  129. }
  130. stats, err := NewStatManager("sink", ctx)
  131. if err != nil {
  132. m.drainError(result, err, ctx, logger)
  133. return
  134. }
  135. m.mutex.Lock()
  136. m.statManagers = append(m.statManagers, stats)
  137. m.mutex.Unlock()
  138. cache := NewCache(m.input, cacheLength, cacheSaveInterval, result, ctx)
  139. for {
  140. select {
  141. case data := <-cache.Out:
  142. stats.SetBufferLength(int64(cache.Length()))
  143. if runAsync {
  144. go doCollect(sink, data, stats, retryInterval, omitIfEmpty, cache.Complete, ctx)
  145. } else {
  146. doCollect(sink, data, stats, retryInterval, omitIfEmpty,cache.Complete, ctx)
  147. }
  148. case <-ctx.Done():
  149. logger.Infof("sink node %s instance %d done", m.name, instance)
  150. if err := sink.Close(ctx); err != nil {
  151. logger.Warnf("close sink node %s instance %d fails: %v", m.name, instance, err)
  152. }
  153. return
  154. }
  155. }
  156. }(i)
  157. }
  158. }()
  159. }
  160. func (m *SinkNode) reset() {
  161. if !m.isMock {
  162. m.sinks = nil
  163. }
  164. m.statManagers = nil
  165. }
  166. func doCollect(sink api.Sink, item *CacheTuple, stats StatManager, retryInterval int, omitIfEmpty bool, signalCh chan<- int, ctx api.StreamContext) {
  167. stats.IncTotalRecordsIn()
  168. stats.ProcessTimeStart()
  169. logger := ctx.GetLogger()
  170. var outdata []byte
  171. switch val := item.data.(type) {
  172. case []byte:
  173. outdata = val
  174. case error:
  175. outdata = []byte(fmt.Sprintf(`[{"error":"%s"}]`, val.Error()))
  176. default:
  177. outdata = []byte(fmt.Sprintf(`[{"error":"result is not a string but found %#v"}]`, val))
  178. }
  179. for {
  180. if omitIfEmpty && string(outdata) == "[{}]" {
  181. break
  182. }
  183. if err := sink.Collect(ctx, outdata); err != nil {
  184. stats.IncTotalExceptions()
  185. logger.Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outdata, err)
  186. if retryInterval > 0 {
  187. time.Sleep(time.Duration(retryInterval) * time.Millisecond)
  188. logger.Debugf("try again")
  189. } else {
  190. break
  191. }
  192. } else {
  193. logger.Debugf("success")
  194. stats.IncTotalRecordsOut()
  195. signalCh <- item.index
  196. break
  197. }
  198. }
  199. stats.ProcessTimeEnd()
  200. }
  201. func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
  202. var s api.Sink
  203. switch name {
  204. case "log":
  205. s = sinks.NewLogSink()
  206. case "logToMemory":
  207. s = sinks.NewLogSinkToMemory()
  208. case "mqtt":
  209. s = &sinks.MQTTSink{}
  210. case "rest":
  211. s = &sinks.RestSink{}
  212. case "nop":
  213. s = &sinks.NopSink{}
  214. default:
  215. nf, err := plugins.GetPlugin(name, plugins.SINK)
  216. if err != nil {
  217. return nil, err
  218. }
  219. var ok bool
  220. s, ok = nf.(api.Sink)
  221. if !ok {
  222. return nil, fmt.Errorf("exported symbol %s is not type of api.Sink", name)
  223. }
  224. }
  225. err := s.Configure(action)
  226. if err != nil {
  227. return nil, err
  228. }
  229. return s, nil
  230. }
  231. func (m *SinkNode) GetName() string {
  232. return m.name
  233. }
  234. func (m *SinkNode) GetInput() (chan<- interface{}, string) {
  235. return m.input, m.name
  236. }
  237. func (m *SinkNode) GetMetrics() (result [][]interface{}) {
  238. for _, stats := range m.statManagers {
  239. result = append(result, stats.GetMetrics())
  240. }
  241. return result
  242. }
  243. func (m *SinkNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  244. go func() {
  245. select {
  246. case errCh <- err:
  247. case <-ctx.Done():
  248. m.close(ctx, logger)
  249. }
  250. }()
  251. }
  252. func (m *SinkNode) close(ctx api.StreamContext, logger api.Logger) {
  253. for _, s := range m.sinks {
  254. if err := s.Close(ctx); err != nil {
  255. logger.Warnf("close sink fails: %v", err)
  256. }
  257. }
  258. }