source_node.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package nodes
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/plugins"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/emqx/kuiper/xstream/extensions"
  8. "sync"
  9. )
// SourceNode is the topology node that owns one or more api.Source instances
// and forwards what they emit to the node's outputs as tuples.
type SourceNode struct {
	// Embedded base node; this file uses its name, outputs, concurrency, ctx
	// and statManagers fields as well as Broadcast.
	*defaultNode
	// streamType is the stream kind given to NewSourceNode; it selects the
	// default source type and gates the retain-size property for tables.
	streamType xsql.StreamType
	// sourceType is the name used to look up the source implementation.
	sourceType string
	// options are the stream options; DATASOURCE and RETAIN_SIZE are read in Open.
	options *xsql.Options
	// isMock is set by NewSourceNodeWithSource; when true the pre-supplied
	// sources are used instead of creating new ones, and reset keeps them.
	isMock bool
	// mutex guards appends to sources and statManagers made by worker goroutines.
	mutex sync.RWMutex
	// sources holds the source instances in use; Open appends one per worker
	// unless the node is a mock.
	sources []api.Source
}
  19. func NewSourceNode(name string, st xsql.StreamType, options *xsql.Options) *SourceNode {
  20. t := options.TYPE
  21. if t == "" {
  22. if st == xsql.TypeStream {
  23. t = "mqtt"
  24. } else if st == xsql.TypeTable {
  25. t = "file"
  26. }
  27. }
  28. return &SourceNode{
  29. streamType: st,
  30. sourceType: t,
  31. defaultNode: &defaultNode{
  32. name: name,
  33. outputs: make(map[string]chan<- interface{}),
  34. concurrency: 1,
  35. },
  36. options: options,
  37. }
  38. }
// OFFSET_KEY is the state key under which Open stores the latest offset of a
// rewindable source (via PutState) and from which it reads the offset to
// rewind to on start (via GetState).
const OFFSET_KEY = "$$offset"
  40. //Only for mock source, do not use it in production
  41. func NewSourceNodeWithSource(name string, source api.Source, options *xsql.Options) *SourceNode {
  42. return &SourceNode{
  43. sources: []api.Source{source},
  44. defaultNode: &defaultNode{
  45. name: name,
  46. outputs: make(map[string]chan<- interface{}),
  47. concurrency: 1,
  48. },
  49. options: options,
  50. isMock: true,
  51. }
  52. }
  53. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  54. m.ctx = ctx
  55. logger := ctx.GetLogger()
  56. logger.Infof("open source node %s with option %v", m.name, m.options)
  57. go func() {
  58. props := getSourceConf(ctx, m.sourceType, m.options)
  59. if c, ok := props["concurrency"]; ok {
  60. if t, err := common.ToInt(c, common.STRICT); err != nil || t <= 0 {
  61. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  62. } else {
  63. m.concurrency = t
  64. }
  65. }
  66. bl := 102400
  67. if c, ok := props["bufferLength"]; ok {
  68. if t, err := common.ToInt(c, common.STRICT); err != nil || t <= 0 {
  69. logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c)
  70. } else {
  71. bl = t
  72. }
  73. }
  74. // Set retain size for table type
  75. if m.options.RETAIN_SIZE > 0 && m.streamType == xsql.TypeTable {
  76. props["$retainSize"] = m.options.RETAIN_SIZE
  77. }
  78. m.reset()
  79. logger.Infof("open source node %d instances", m.concurrency)
  80. for i := 0; i < m.concurrency; i++ { // workers
  81. go func(instance int) {
  82. //Do open source instances
  83. var source api.Source
  84. var err error
  85. if !m.isMock {
  86. source, err = getSource(m.sourceType)
  87. if err != nil {
  88. m.drainError(errCh, err, ctx, logger)
  89. return
  90. }
  91. err = source.Configure(m.options.DATASOURCE, props)
  92. if err != nil {
  93. m.drainError(errCh, err, ctx, logger)
  94. return
  95. }
  96. m.mutex.Lock()
  97. m.sources = append(m.sources, source)
  98. m.mutex.Unlock()
  99. } else {
  100. logger.Debugf("get source instance %d from %d sources", instance, len(m.sources))
  101. source = m.sources[instance]
  102. }
  103. stats, err := NewStatManager("source", ctx)
  104. if err != nil {
  105. m.drainError(errCh, err, ctx, logger)
  106. return
  107. }
  108. m.mutex.Lock()
  109. m.statManagers = append(m.statManagers, stats)
  110. m.mutex.Unlock()
  111. if rw, ok := source.(api.Rewindable); ok {
  112. if offset, err := ctx.GetState(OFFSET_KEY); err != nil {
  113. m.drainError(errCh, err, ctx, logger)
  114. } else if offset != nil {
  115. logger.Infof("Source rewind from %v", offset)
  116. err = rw.Rewind(offset)
  117. if err != nil {
  118. m.drainError(errCh, err, ctx, logger)
  119. }
  120. }
  121. }
  122. buffer := NewDynamicChannelBuffer()
  123. buffer.SetLimit(bl)
  124. sourceErrCh := make(chan error)
  125. go source.Open(ctx.WithInstance(instance), buffer.In, sourceErrCh)
  126. logger.Infof("Start source %s instance %d successfully", m.name, instance)
  127. for {
  128. select {
  129. case <-ctx.Done():
  130. logger.Infof("source %s done", m.name)
  131. m.close(ctx, logger)
  132. buffer.Close()
  133. return
  134. case err := <-sourceErrCh:
  135. m.drainError(errCh, err, ctx, logger)
  136. return
  137. case data := <-buffer.Out:
  138. stats.IncTotalRecordsIn()
  139. stats.ProcessTimeStart()
  140. tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: common.GetNowInMilli(), Metadata: data.Meta()}
  141. stats.ProcessTimeEnd()
  142. logger.Debugf("source node %s is sending tuple %+v of timestamp %d", m.name, tuple, tuple.Timestamp)
  143. //blocking
  144. m.Broadcast(tuple)
  145. stats.IncTotalRecordsOut()
  146. stats.SetBufferLength(int64(buffer.GetLength()))
  147. if rw, ok := source.(api.Rewindable); ok {
  148. if offset, err := rw.GetOffset(); err != nil {
  149. m.drainError(errCh, err, ctx, logger)
  150. } else {
  151. err = ctx.PutState(OFFSET_KEY, offset)
  152. if err != nil {
  153. m.drainError(errCh, err, ctx, logger)
  154. }
  155. logger.Debugf("Source save offset %v", offset)
  156. }
  157. }
  158. logger.Debugf("source node %s has consumed tuple of timestamp %d", m.name, tuple.Timestamp)
  159. }
  160. }
  161. }(i)
  162. }
  163. }()
  164. }
  165. func (m *SourceNode) reset() {
  166. if !m.isMock {
  167. m.sources = nil
  168. }
  169. m.statManagers = nil
  170. }
  171. func doGetSource(t string) (api.Source, error) {
  172. var (
  173. s api.Source
  174. err error
  175. )
  176. switch t {
  177. case "mqtt":
  178. s = &extensions.MQTTSource{}
  179. case "httppull":
  180. s = &extensions.HTTPPullSource{}
  181. case "file":
  182. s = &extensions.FileSource{}
  183. default:
  184. s, err = plugins.GetSource(t)
  185. if err != nil {
  186. return nil, err
  187. }
  188. }
  189. return s, nil
  190. }
  191. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  192. select {
  193. case errCh <- err:
  194. case <-ctx.Done():
  195. m.close(ctx, logger)
  196. }
  197. return
  198. }
  199. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  200. for _, s := range m.sources {
  201. if err := s.Close(ctx); err != nil {
  202. logger.Warnf("close source fails: %v", err)
  203. }
  204. }
  205. }