source_node.go

package node

import (
    "sync"

    "github.com/lf-edge/ekuiper/internal/conf"
    "github.com/lf-edge/ekuiper/internal/plugin"
    "github.com/lf-edge/ekuiper/internal/topo/source"
    "github.com/lf-edge/ekuiper/internal/xsql"
    "github.com/lf-edge/ekuiper/pkg/api"
    "github.com/lf-edge/ekuiper/pkg/ast"
    "github.com/lf-edge/ekuiper/pkg/cast"
)
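
// SourceNode reads data for one stream or table source. It embeds defaultNode
// (defined elsewhere in this package) for the shared name/outputs/concurrency
// fields and records the api.Source instances it opens so they can be closed later.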
type SourceNode struct {
    *defaultNode
    streamType   ast.StreamType
    sourceType   string
    options      *ast.Options
    bufferLength int
    props        map[string]interface{}
    mutex        sync.RWMutex
    sources      []api.Source
}
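
// NewSourceNode builds a source node from the stream or table definition. If no
// TYPE is given in the options, the type defaults to "mqtt" for streams and
// "file" for tables.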
func NewSourceNode(name string, st ast.StreamType, options *ast.Options) *SourceNode {
    t := options.TYPE
    if t == "" {
        if st == ast.TypeStream {
            t = "mqtt"
        } else if st == ast.TypeTable {
            t = "file"
        }
    }
    return &SourceNode{
        streamType: st,
        sourceType: t,
        defaultNode: &defaultNode{
            name:        name,
            outputs:     make(map[string]chan<- interface{}),
            concurrency: 1,
        },
        options: options,
    }
}
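
// OffsetKey is the state key under which a rewindable source's latest offset is saved.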
const OffsetKey = "$$offset"
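
// Open starts the source asynchronously. It resolves the source configuration,
// spawns one worker goroutine per configured concurrency, and each worker pumps
// data from its source buffer into the downstream nodes via Broadcast until the
// context is cancelled. Any unrecoverable error is reported through errCh.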
func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
    m.ctx = ctx
    logger := ctx.GetLogger()
    logger.Infof("open source node %s with option %v", m.name, m.options)
    go func() {
        props := getSourceConf(ctx, m.sourceType, m.options)
        m.props = props
        if c, ok := props["concurrency"]; ok {
            if t, err := cast.ToInt(c, cast.STRICT); err != nil || t <= 0 {
                logger.Warnf("invalid type for concurrency property, should be positive integer but found %v", c)
            } else {
                m.concurrency = t
            }
        }
        bl := 102400
        if c, ok := props["bufferLength"]; ok {
            if t, err := cast.ToInt(c, cast.STRICT); err != nil || t <= 0 {
                logger.Warnf("invalid type for bufferLength property, should be positive integer but found %v", c)
            } else {
                bl = t
            }
        }
        m.bufferLength = bl
        // Set retain size for table type
        if m.options.RETAIN_SIZE > 0 && m.streamType == ast.TypeTable {
            props["$retainSize"] = m.options.RETAIN_SIZE
        }
        m.reset()
        logger.Infof("open source node %d instances", m.concurrency)
        for i := 0; i < m.concurrency; i++ { // workers
            go func(instance int) {
                // Do open source instances
                var (
                    si     *sourceInstance
                    buffer *DynamicChannelBuffer
                    err    error
                )
                si, err = getSourceInstance(m, instance)
                if err != nil {
                    m.drainError(errCh, err, ctx, logger)
                    return
                }
                m.mutex.Lock()
                m.sources = append(m.sources, si.source)
                m.mutex.Unlock()
                buffer = si.dataCh
                stats, err := NewStatManager("source", ctx)
                if err != nil {
                    m.drainError(errCh, err, ctx, logger)
                    return
                }
                m.mutex.Lock()
                m.statManagers = append(m.statManagers, stats)
                m.mutex.Unlock()
                logger.Infof("Start source %s instance %d successfully", m.name, instance)
                for {
                    select {
                    case <-ctx.Done():
                        logger.Infof("source %s done", m.name)
                        m.close(ctx, logger)
                        buffer.Close()
                        return
                    case err := <-si.errorCh:
                        m.drainError(errCh, err, ctx, logger)
                        return
                    case data := <-buffer.Out:
                        stats.IncTotalRecordsIn()
                        stats.ProcessTimeStart()
                        tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: conf.GetNowInMilli(), Metadata: data.Meta()}
                        stats.ProcessTimeEnd()
                        logger.Debugf("source node %s is sending tuple %+v of timestamp %d", m.name, tuple, tuple.Timestamp)
                        // blocking
                        m.Broadcast(tuple)
                        stats.IncTotalRecordsOut()
                        stats.SetBufferLength(int64(buffer.GetLength()))
                        if rw, ok := si.source.(api.Rewindable); ok {
                            if offset, err := rw.GetOffset(); err != nil {
                                m.drainError(errCh, err, ctx, logger)
                            } else {
                                err = ctx.PutState(OffsetKey, offset)
                                if err != nil {
                                    m.drainError(errCh, err, ctx, logger)
                                }
                                logger.Debugf("Source save offset %v", offset)
                            }
                        }
                        logger.Debugf("source node %s has consumed tuple of timestamp %d", m.name, tuple.Timestamp)
                    }
                }
            }(i)
        }
    }()
}
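
// reset clears the stat managers collected by previous runs before new instances are started.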
func (m *SourceNode) reset() {
    m.statManagers = nil
}
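
// doGetSource resolves a source implementation by type name: the built-in
// mqtt, httppull and file sources are constructed directly, anything else is
// looked up as a plugin.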
func doGetSource(t string) (api.Source, error) {
    var (
        s   api.Source
        err error
    )
    switch t {
    case "mqtt":
        s = &source.MQTTSource{}
    case "httppull":
        s = &source.HTTPPullSource{}
    case "file":
        s = &source.FileSource{}
    default:
        s, err = plugin.GetSource(t)
        if err != nil {
            return nil, err
        }
    }
    return s, nil
}
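
// drainError reports err on errCh; if the context is cancelled before the error
// can be delivered, the node is closed instead.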
func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
    select {
    case errCh <- err:
    case <-ctx.Done():
        m.close(ctx, logger)
    }
}
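
// close releases the underlying sources. Non-shared sources are closed directly;
// a shared source is only detached from this node via removeSourceInstance
// (presumably handled by the shared source management code, which is not shown here).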
func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
    if !m.options.SHARED {
        for _, s := range m.sources {
            if err := s.Close(ctx); err != nil {
                logger.Warnf("close source fails: %v", err)
            }
        }
    } else {
        removeSourceInstance(m)
    }
}