source_node.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. package nodes
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/plugins"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/emqx/kuiper/xstream/extensions"
  8. "github.com/go-yaml/yaml"
  9. "sync"
  10. )
  11. type SourceNode struct {
  12. *defaultNode
  13. sourceType string
  14. options map[string]string
  15. isMock bool
  16. mutex sync.RWMutex
  17. sources []api.Source
  18. }
  19. func NewSourceNode(name string, options map[string]string) *SourceNode {
  20. t, ok := options["TYPE"]
  21. if !ok {
  22. t = "mqtt"
  23. }
  24. return &SourceNode{
  25. sourceType: t,
  26. defaultNode: &defaultNode{
  27. name: name,
  28. outputs: make(map[string]chan<- interface{}),
  29. concurrency: 1,
  30. },
  31. options: options,
  32. }
  33. }
  // OFFSET_KEY is the state key under which the offset of a rewindable source
  // is saved after each message and read back when the source is reopened.
  const (
  	OFFSET_KEY = "$$offset"
  )
  35. //Only for mock source, do not use it in production
  36. func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
  37. return &SourceNode{
  38. sources: []api.Source{source},
  39. defaultNode: &defaultNode{
  40. name: name,
  41. outputs: make(map[string]chan<- interface{}),
  42. concurrency: 1,
  43. },
  44. options: options,
  45. isMock: true,
  46. }
  47. }
  48. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  49. m.ctx = ctx
  50. logger := ctx.GetLogger()
  51. logger.Infof("open source node %s with option %v", m.name, m.options)
  52. go func() {
  53. props := m.getConf(ctx)
  54. if c, ok := props["concurrency"]; ok {
  55. if t, err := common.ToInt(c); err != nil || t <= 0 {
  56. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  57. } else {
  58. m.concurrency = t
  59. }
  60. }
  61. bl := 102400
  62. if c, ok := props["bufferLength"]; ok {
  63. if t, err := common.ToInt(c); err != nil || t <= 0 {
  64. logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c)
  65. } else {
  66. bl = t
  67. }
  68. }
  69. m.reset()
  70. logger.Infof("open source node %d instances", m.concurrency)
  71. for i := 0; i < m.concurrency; i++ { // workers
  72. go func(instance int) {
  73. //Do open source instances
  74. var source api.Source
  75. var err error
  76. if !m.isMock {
  77. source, err = getSource(m.sourceType)
  78. if err != nil {
  79. m.drainError(errCh, err, ctx, logger)
  80. return
  81. }
  82. err = source.Configure(m.options["DATASOURCE"], props)
  83. if err != nil {
  84. m.drainError(errCh, err, ctx, logger)
  85. return
  86. }
  87. m.mutex.Lock()
  88. m.sources = append(m.sources, source)
  89. m.mutex.Unlock()
  90. } else {
  91. logger.Debugf("get source instance %d from %d sources", instance, len(m.sources))
  92. source = m.sources[instance]
  93. }
  94. stats, err := NewStatManager("source", ctx)
  95. if err != nil {
  96. m.drainError(errCh, err, ctx, logger)
  97. return
  98. }
  99. m.mutex.Lock()
  100. m.statManagers = append(m.statManagers, stats)
  101. m.mutex.Unlock()
  102. if rw, ok := source.(api.Rewindable); ok {
  103. if offset, err := ctx.GetState(OFFSET_KEY); err != nil {
  104. m.drainError(errCh, err, ctx, logger)
  105. } else if offset != nil {
  106. logger.Infof("Source rewind from %v", offset)
  107. err = rw.Rewind(offset)
  108. if err != nil {
  109. m.drainError(errCh, err, ctx, logger)
  110. }
  111. }
  112. }
  113. buffer := NewDynamicChannelBuffer()
  114. buffer.SetLimit(bl)
  115. sourceErrCh := make(chan error)
  116. go source.Open(ctx.WithInstance(instance), buffer.In, sourceErrCh)
  117. logger.Infof("Start source %s instance %d successfully", m.name, instance)
  118. for {
  119. select {
  120. case <-ctx.Done():
  121. logger.Infof("source %s done", m.name)
  122. m.close(ctx, logger)
  123. buffer.Close()
  124. return
  125. case err := <-sourceErrCh:
  126. m.drainError(errCh, err, ctx, logger)
  127. return
  128. case data := <-buffer.Out:
  129. stats.IncTotalRecordsIn()
  130. stats.ProcessTimeStart()
  131. tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: common.GetNowInMilli(), Metadata: data.Meta()}
  132. stats.ProcessTimeEnd()
  133. logger.Debugf("source node %s is sending tuple %+v of timestamp %d", m.name, tuple, tuple.Timestamp)
  134. //blocking
  135. m.Broadcast(tuple)
  136. stats.IncTotalRecordsOut()
  137. stats.SetBufferLength(int64(buffer.GetLength()))
  138. if rw, ok := source.(api.Rewindable); ok {
  139. if offset, err := rw.GetOffset(); err != nil {
  140. m.drainError(errCh, err, ctx, logger)
  141. } else {
  142. err = ctx.PutState(OFFSET_KEY, offset)
  143. if err != nil {
  144. m.drainError(errCh, err, ctx, logger)
  145. }
  146. logger.Debugf("Source save offset %v", offset)
  147. }
  148. }
  149. logger.Debugf("source node %s has consumed tuple of timestamp %d", m.name, tuple.Timestamp)
  150. }
  151. }
  152. }(i)
  153. }
  154. }()
  155. }
  156. func (m *SourceNode) reset() {
  157. if !m.isMock {
  158. m.sources = nil
  159. }
  160. m.statManagers = nil
  161. }
  162. func doGetSource(t string) (api.Source, error) {
  163. var (
  164. s api.Source
  165. err error
  166. )
  167. switch t {
  168. case "mqtt":
  169. s = &extensions.MQTTSource{}
  170. case "httppull":
  171. s = &extensions.HTTPPullSource{}
  172. default:
  173. s, err = plugins.GetSource(t)
  174. if err != nil {
  175. return nil, err
  176. }
  177. }
  178. return s, nil
  179. }
  180. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  181. select {
  182. case errCh <- err:
  183. case <-ctx.Done():
  184. m.close(ctx, logger)
  185. }
  186. return
  187. }
  188. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  189. for _, s := range m.sources {
  190. if err := s.Close(ctx); err != nil {
  191. logger.Warnf("close source fails: %v", err)
  192. }
  193. }
  194. }
  195. func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
  196. confkey := m.options["CONF_KEY"]
  197. logger := ctx.GetLogger()
  198. confPath := "sources/" + m.sourceType + ".yaml"
  199. if m.sourceType == "mqtt" {
  200. confPath = "mqtt_source.yaml"
  201. }
  202. conf, err := common.LoadConf(confPath)
  203. props := make(map[string]interface{})
  204. if err == nil {
  205. cfg := make(map[interface{}]interface{})
  206. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  207. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", m.sourceType)
  208. } else {
  209. def, ok := cfg["default"]
  210. if !ok {
  211. logger.Warnf("default conf is not found", confkey)
  212. } else {
  213. if def1, ok1 := def.(map[interface{}]interface{}); ok1 {
  214. props = common.ConvertMap(def1)
  215. }
  216. if c, ok := cfg[confkey]; ok {
  217. if c1, ok := c.(map[interface{}]interface{}); ok {
  218. c2 := common.ConvertMap(c1)
  219. for k, v := range c2 {
  220. props[k] = v
  221. }
  222. }
  223. }
  224. }
  225. }
  226. } else {
  227. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", m.sourceType)
  228. }
  229. logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
  230. return props
  231. }