source_node.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. package nodes
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/plugins"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/emqx/kuiper/xstream/extensions"
  8. "github.com/go-yaml/yaml"
  9. "strings"
  10. "sync"
  11. )
  12. type SourceNode struct {
  13. *defaultNode
  14. sourceType string
  15. options map[string]string
  16. isMock bool
  17. mutex sync.RWMutex
  18. sources []api.Source
  19. }
  20. func NewSourceNode(name string, options map[string]string) *SourceNode {
  21. t, ok := options["TYPE"]
  22. if !ok {
  23. t = "mqtt"
  24. }
  25. return &SourceNode{
  26. sourceType: t,
  27. defaultNode: &defaultNode{
  28. name: name,
  29. outputs: make(map[string]chan<- interface{}),
  30. concurrency: 1,
  31. },
  32. options: options,
  33. }
  34. }
  // OFFSET_KEY is the state key under which SourceNode.Open stores and
// restores the offset of sources that implement api.Rewindable.
const (
	OFFSET_KEY = "$$offset"
)
  36. //Only for mock source, do not use it in production
  37. func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
  38. return &SourceNode{
  39. sources: []api.Source{source},
  40. defaultNode: &defaultNode{
  41. name: name,
  42. outputs: make(map[string]chan<- interface{}),
  43. concurrency: 1,
  44. },
  45. options: options,
  46. isMock: true,
  47. }
  48. }
  49. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  50. m.ctx = ctx
  51. logger := ctx.GetLogger()
  52. logger.Infof("open source node %s with option %v", m.name, m.options)
  53. go func() {
  54. props := m.getConf(ctx)
  55. if c, ok := props["concurrency"]; ok {
  56. if t, err := common.ToInt(c); err != nil || t <= 0 {
  57. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  58. } else {
  59. m.concurrency = t
  60. }
  61. }
  62. bl := 102400
  63. if c, ok := props["bufferLength"]; ok {
  64. if t, err := common.ToInt(c); err != nil || t <= 0 {
  65. logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c)
  66. } else {
  67. bl = t
  68. }
  69. }
  70. m.reset()
  71. logger.Infof("open source node %d instances", m.concurrency)
  72. for i := 0; i < m.concurrency; i++ { // workers
  73. go func(instance int) {
  74. //Do open source instances
  75. var source api.Source
  76. var err error
  77. if !m.isMock {
  78. source, err = getSource(m.sourceType)
  79. if err != nil {
  80. m.drainError(errCh, err, ctx, logger)
  81. return
  82. }
  83. err = source.Configure(m.options["DATASOURCE"], props)
  84. if err != nil {
  85. m.drainError(errCh, err, ctx, logger)
  86. return
  87. }
  88. m.mutex.Lock()
  89. m.sources = append(m.sources, source)
  90. m.mutex.Unlock()
  91. } else {
  92. logger.Debugf("get source instance %d from %d sources", instance, len(m.sources))
  93. source = m.sources[instance]
  94. }
  95. stats, err := NewStatManager("source", ctx)
  96. if err != nil {
  97. m.drainError(errCh, err, ctx, logger)
  98. return
  99. }
  100. m.mutex.Lock()
  101. m.statManagers = append(m.statManagers, stats)
  102. m.mutex.Unlock()
  103. if rw, ok := source.(api.Rewindable); ok {
  104. if offset, err := ctx.GetState(OFFSET_KEY); err != nil {
  105. m.drainError(errCh, err, ctx, logger)
  106. } else if offset != nil {
  107. logger.Infof("Source rewind from %v", offset)
  108. err = rw.Rewind(offset)
  109. if err != nil {
  110. m.drainError(errCh, err, ctx, logger)
  111. }
  112. }
  113. }
  114. buffer := NewDynamicChannelBuffer()
  115. buffer.SetLimit(bl)
  116. sourceErrCh := make(chan error)
  117. go source.Open(ctx.WithInstance(instance), buffer.In, sourceErrCh)
  118. logger.Infof("Start source %s instance %d successfully", m.name, instance)
  119. for {
  120. select {
  121. case <-ctx.Done():
  122. logger.Infof("source %s done", m.name)
  123. m.close(ctx, logger)
  124. buffer.Close()
  125. return
  126. case err := <-sourceErrCh:
  127. m.drainError(errCh, err, ctx, logger)
  128. return
  129. case data := <-buffer.Out:
  130. stats.IncTotalRecordsIn()
  131. stats.ProcessTimeStart()
  132. tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: common.GetNowInMilli(), Metadata: data.Meta()}
  133. stats.ProcessTimeEnd()
  134. logger.Debugf("source node %s is sending tuple %+v of timestamp %d", m.name, tuple, tuple.Timestamp)
  135. //blocking
  136. m.Broadcast(tuple)
  137. stats.IncTotalRecordsOut()
  138. stats.SetBufferLength(int64(buffer.GetLength()))
  139. if rw, ok := source.(api.Rewindable); ok {
  140. if offset, err := rw.GetOffset(); err != nil {
  141. m.drainError(errCh, err, ctx, logger)
  142. } else {
  143. err = ctx.PutState(OFFSET_KEY, offset)
  144. if err != nil {
  145. m.drainError(errCh, err, ctx, logger)
  146. }
  147. logger.Debugf("Source save offset %v", offset)
  148. }
  149. }
  150. logger.Debugf("source node %s has consumed tuple of timestamp %d", m.name, tuple.Timestamp)
  151. }
  152. }
  153. }(i)
  154. }
  155. }()
  156. }
  157. func (m *SourceNode) reset() {
  158. if !m.isMock {
  159. m.sources = nil
  160. }
  161. m.statManagers = nil
  162. }
  163. func doGetSource(t string) (api.Source, error) {
  164. var (
  165. s api.Source
  166. err error
  167. )
  168. switch t {
  169. case "mqtt":
  170. s = &extensions.MQTTSource{}
  171. case "httppull":
  172. s = &extensions.HTTPPullSource{}
  173. default:
  174. s, err = plugins.GetSource(t)
  175. if err != nil {
  176. return nil, err
  177. }
  178. }
  179. return s, nil
  180. }
  181. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  182. select {
  183. case errCh <- err:
  184. case <-ctx.Done():
  185. m.close(ctx, logger)
  186. }
  187. return
  188. }
  189. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  190. for _, s := range m.sources {
  191. if err := s.Close(ctx); err != nil {
  192. logger.Warnf("close source fails: %v", err)
  193. }
  194. }
  195. }
  196. func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
  197. confkey := m.options["CONF_KEY"]
  198. logger := ctx.GetLogger()
  199. confPath := "sources/" + m.sourceType + ".yaml"
  200. if m.sourceType == "mqtt" {
  201. confPath = "mqtt_source.yaml"
  202. }
  203. conf, err := common.LoadConf(confPath)
  204. props := make(map[string]interface{})
  205. if err == nil {
  206. cfg := make(map[interface{}]interface{})
  207. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  208. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", m.sourceType)
  209. } else {
  210. def, ok := cfg["default"]
  211. if !ok {
  212. logger.Warnf("default conf is not found", confkey)
  213. } else {
  214. if def1, ok1 := def.(map[interface{}]interface{}); ok1 {
  215. props = common.ConvertMap(def1)
  216. }
  217. if c, ok := cfg[confkey]; ok {
  218. if c1, ok := c.(map[interface{}]interface{}); ok {
  219. c2 := common.ConvertMap(c1)
  220. for k, v := range c2 {
  221. props[k] = v
  222. }
  223. }
  224. }
  225. }
  226. }
  227. } else {
  228. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", m.sourceType)
  229. }
  230. f, ok := m.options["FORMAT"]
  231. if !ok || f == "" {
  232. f = "json"
  233. }
  234. props["format"] = strings.ToLower(f)
  235. logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
  236. return props
  237. }