source_node.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package nodes
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/plugins"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/emqx/kuiper/xstream/extensions"
  8. "sync"
  9. )
  10. type SourceNode struct {
  11. *defaultNode
  12. streamType xsql.StreamType
  13. sourceType string
  14. options *xsql.Options
  15. bufferLength int
  16. props map[string]interface{}
  17. mutex sync.RWMutex
  18. sources []api.Source
  19. }
  20. func NewSourceNode(name string, st xsql.StreamType, options *xsql.Options) *SourceNode {
  21. t := options.TYPE
  22. if t == "" {
  23. if st == xsql.TypeStream {
  24. t = "mqtt"
  25. } else if st == xsql.TypeTable {
  26. t = "file"
  27. }
  28. }
  29. return &SourceNode{
  30. streamType: st,
  31. sourceType: t,
  32. defaultNode: &defaultNode{
  33. name: name,
  34. outputs: make(map[string]chan<- interface{}),
  35. concurrency: 1,
  36. },
  37. options: options,
  38. }
  39. }
  40. const OffsetKey = "$$offset"
  41. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  42. m.ctx = ctx
  43. logger := ctx.GetLogger()
  44. logger.Infof("open source node %s with option %v", m.name, m.options)
  45. go func() {
  46. props := getSourceConf(ctx, m.sourceType, m.options)
  47. m.props = props
  48. if c, ok := props["concurrency"]; ok {
  49. if t, err := common.ToInt(c, common.STRICT); err != nil || t <= 0 {
  50. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  51. } else {
  52. m.concurrency = t
  53. }
  54. }
  55. bl := 102400
  56. if c, ok := props["bufferLength"]; ok {
  57. if t, err := common.ToInt(c, common.STRICT); err != nil || t <= 0 {
  58. logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c)
  59. } else {
  60. bl = t
  61. }
  62. }
  63. m.bufferLength = bl
  64. // Set retain size for table type
  65. if m.options.RETAIN_SIZE > 0 && m.streamType == xsql.TypeTable {
  66. props["$retainSize"] = m.options.RETAIN_SIZE
  67. }
  68. m.reset()
  69. logger.Infof("open source node %d instances", m.concurrency)
  70. for i := 0; i < m.concurrency; i++ { // workers
  71. go func(instance int) {
  72. //Do open source instances
  73. var (
  74. si *sourceInstance
  75. buffer *DynamicChannelBuffer
  76. err error
  77. )
  78. si, err = getSourceInstance(m, instance)
  79. if err != nil {
  80. m.drainError(errCh, err, ctx, logger)
  81. return
  82. }
  83. m.mutex.Lock()
  84. m.sources = append(m.sources, si.source)
  85. m.mutex.Unlock()
  86. buffer = si.dataCh
  87. stats, err := NewStatManager("source", ctx)
  88. if err != nil {
  89. m.drainError(errCh, err, ctx, logger)
  90. return
  91. }
  92. m.mutex.Lock()
  93. m.statManagers = append(m.statManagers, stats)
  94. m.mutex.Unlock()
  95. logger.Infof("Start source %s instance %d successfully", m.name, instance)
  96. for {
  97. select {
  98. case <-ctx.Done():
  99. logger.Infof("source %s done", m.name)
  100. m.close(ctx, logger)
  101. buffer.Close()
  102. return
  103. case err := <-si.errorCh:
  104. m.drainError(errCh, err, ctx, logger)
  105. return
  106. case data := <-buffer.Out:
  107. stats.IncTotalRecordsIn()
  108. stats.ProcessTimeStart()
  109. tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: common.GetNowInMilli(), Metadata: data.Meta()}
  110. stats.ProcessTimeEnd()
  111. logger.Debugf("source node %s is sending tuple %+v of timestamp %d", m.name, tuple, tuple.Timestamp)
  112. //blocking
  113. m.Broadcast(tuple)
  114. stats.IncTotalRecordsOut()
  115. stats.SetBufferLength(int64(buffer.GetLength()))
  116. if rw, ok := si.source.(api.Rewindable); ok {
  117. if offset, err := rw.GetOffset(); err != nil {
  118. m.drainError(errCh, err, ctx, logger)
  119. } else {
  120. err = ctx.PutState(OffsetKey, offset)
  121. if err != nil {
  122. m.drainError(errCh, err, ctx, logger)
  123. }
  124. logger.Debugf("Source save offset %v", offset)
  125. }
  126. }
  127. logger.Debugf("source node %s has consumed tuple of timestamp %d", m.name, tuple.Timestamp)
  128. }
  129. }
  130. }(i)
  131. }
  132. }()
  133. }
  134. func (m *SourceNode) reset() {
  135. m.statManagers = nil
  136. }
  137. func doGetSource(t string) (api.Source, error) {
  138. var (
  139. s api.Source
  140. err error
  141. )
  142. switch t {
  143. case "mqtt":
  144. s = &extensions.MQTTSource{}
  145. case "httppull":
  146. s = &extensions.HTTPPullSource{}
  147. case "file":
  148. s = &extensions.FileSource{}
  149. default:
  150. s, err = plugins.GetSource(t)
  151. if err != nil {
  152. return nil, err
  153. }
  154. }
  155. return s, nil
  156. }
  157. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  158. select {
  159. case errCh <- err:
  160. case <-ctx.Done():
  161. m.close(ctx, logger)
  162. }
  163. return
  164. }
  165. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  166. if !m.options.SHARED {
  167. for _, s := range m.sources {
  168. if err := s.Close(ctx); err != nil {
  169. logger.Warnf("close source fails: %v", err)
  170. }
  171. }
  172. } else {
  173. removeSourceInstance(m)
  174. }
  175. }