source_node.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package nodes
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/plugins"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/emqx/kuiper/xstream/extensions"
  8. "sync"
  9. )
// SourceNode is the stream node that owns one or more api.Source instances
// and forwards whatever they emit to the node's outputs.
type SourceNode struct {
	*defaultNode
	// streamType is an xsql.StreamType value (xsql.TypeStream and
	// xsql.TypeTable are the kinds referenced in this file).
	streamType xsql.StreamType
	// sourceType names the source implementation to instantiate, e.g.
	// "mqtt", "httppull", "file", or a plugin name.
	sourceType string
	// options holds the raw stream options; "TYPE" and "DATASOURCE" are the
	// keys read in this file.
	options map[string]string
	// isMock is true when the node was built around a caller-supplied source,
	// in which case no source is created or configured on Open.
	isMock bool
	// mutex is held while worker goroutines append to sources and to the
	// stat manager list.
	mutex sync.RWMutex
	// sources are the opened source instances, one per worker.
	sources []api.Source
}
  19. func NewSourceNode(name string, st xsql.StreamType, options map[string]string) *SourceNode {
  20. t, ok := options["TYPE"]
  21. if !ok {
  22. if st == xsql.TypeStream {
  23. t = "mqtt"
  24. } else if st == xsql.TypeTable {
  25. t = "file"
  26. }
  27. }
  28. return &SourceNode{
  29. sourceType: t,
  30. defaultNode: &defaultNode{
  31. name: name,
  32. outputs: make(map[string]chan<- interface{}),
  33. concurrency: 1,
  34. },
  35. options: options,
  36. }
  37. }
// OFFSET_KEY is the state key under which a rewindable source's offset is
// stored via the stream context and read back when the source is opened.
const OFFSET_KEY = "$$offset"
  39. //Only for mock source, do not use it in production
  40. func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
  41. return &SourceNode{
  42. sources: []api.Source{source},
  43. defaultNode: &defaultNode{
  44. name: name,
  45. outputs: make(map[string]chan<- interface{}),
  46. concurrency: 1,
  47. },
  48. options: options,
  49. isMock: true,
  50. }
  51. }
  52. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  53. m.ctx = ctx
  54. logger := ctx.GetLogger()
  55. logger.Infof("open source node %s with option %v", m.name, m.options)
  56. go func() {
  57. props := getSourceConf(ctx, m.sourceType, m.options)
  58. if c, ok := props["concurrency"]; ok {
  59. if t, err := common.ToInt(c, common.STRICT); err != nil || t <= 0 {
  60. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  61. } else {
  62. m.concurrency = t
  63. }
  64. }
  65. bl := 102400
  66. if c, ok := props["bufferLength"]; ok {
  67. if t, err := common.ToInt(c, common.STRICT); err != nil || t <= 0 {
  68. logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c)
  69. } else {
  70. bl = t
  71. }
  72. }
  73. m.reset()
  74. logger.Infof("open source node %d instances", m.concurrency)
  75. for i := 0; i < m.concurrency; i++ { // workers
  76. go func(instance int) {
  77. //Do open source instances
  78. var source api.Source
  79. var err error
  80. if !m.isMock {
  81. source, err = getSource(m.sourceType)
  82. if err != nil {
  83. m.drainError(errCh, err, ctx, logger)
  84. return
  85. }
  86. err = source.Configure(m.options["DATASOURCE"], props)
  87. if err != nil {
  88. m.drainError(errCh, err, ctx, logger)
  89. return
  90. }
  91. m.mutex.Lock()
  92. m.sources = append(m.sources, source)
  93. m.mutex.Unlock()
  94. } else {
  95. logger.Debugf("get source instance %d from %d sources", instance, len(m.sources))
  96. source = m.sources[instance]
  97. }
  98. stats, err := NewStatManager("source", ctx)
  99. if err != nil {
  100. m.drainError(errCh, err, ctx, logger)
  101. return
  102. }
  103. m.mutex.Lock()
  104. m.statManagers = append(m.statManagers, stats)
  105. m.mutex.Unlock()
  106. if rw, ok := source.(api.Rewindable); ok {
  107. if offset, err := ctx.GetState(OFFSET_KEY); err != nil {
  108. m.drainError(errCh, err, ctx, logger)
  109. } else if offset != nil {
  110. logger.Infof("Source rewind from %v", offset)
  111. err = rw.Rewind(offset)
  112. if err != nil {
  113. m.drainError(errCh, err, ctx, logger)
  114. }
  115. }
  116. }
  117. buffer := NewDynamicChannelBuffer()
  118. buffer.SetLimit(bl)
  119. sourceErrCh := make(chan error)
  120. go source.Open(ctx.WithInstance(instance), buffer.In, sourceErrCh)
  121. logger.Infof("Start source %s instance %d successfully", m.name, instance)
  122. for {
  123. select {
  124. case <-ctx.Done():
  125. logger.Infof("source %s done", m.name)
  126. m.close(ctx, logger)
  127. buffer.Close()
  128. return
  129. case err := <-sourceErrCh:
  130. m.drainError(errCh, err, ctx, logger)
  131. return
  132. case data := <-buffer.Out:
  133. stats.IncTotalRecordsIn()
  134. stats.ProcessTimeStart()
  135. tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: common.GetNowInMilli(), Metadata: data.Meta()}
  136. stats.ProcessTimeEnd()
  137. logger.Debugf("source node %s is sending tuple %+v of timestamp %d", m.name, tuple, tuple.Timestamp)
  138. //blocking
  139. m.Broadcast(tuple)
  140. stats.IncTotalRecordsOut()
  141. stats.SetBufferLength(int64(buffer.GetLength()))
  142. if rw, ok := source.(api.Rewindable); ok {
  143. if offset, err := rw.GetOffset(); err != nil {
  144. m.drainError(errCh, err, ctx, logger)
  145. } else {
  146. err = ctx.PutState(OFFSET_KEY, offset)
  147. if err != nil {
  148. m.drainError(errCh, err, ctx, logger)
  149. }
  150. logger.Debugf("Source save offset %v", offset)
  151. }
  152. }
  153. logger.Debugf("source node %s has consumed tuple of timestamp %d", m.name, tuple.Timestamp)
  154. }
  155. }
  156. }(i)
  157. }
  158. }()
  159. }
  160. func (m *SourceNode) reset() {
  161. if !m.isMock {
  162. m.sources = nil
  163. }
  164. m.statManagers = nil
  165. }
  166. func doGetSource(t string) (api.Source, error) {
  167. var (
  168. s api.Source
  169. err error
  170. )
  171. switch t {
  172. case "mqtt":
  173. s = &extensions.MQTTSource{}
  174. case "httppull":
  175. s = &extensions.HTTPPullSource{}
  176. case "file":
  177. s = &extensions.FileSource{}
  178. default:
  179. s, err = plugins.GetSource(t)
  180. if err != nil {
  181. return nil, err
  182. }
  183. }
  184. return s, nil
  185. }
  186. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  187. select {
  188. case errCh <- err:
  189. case <-ctx.Done():
  190. m.close(ctx, logger)
  191. }
  192. return
  193. }
  194. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  195. for _, s := range m.sources {
  196. if err := s.Close(ctx); err != nil {
  197. logger.Warnf("close source fails: %v", err)
  198. }
  199. }
  200. }