source_node.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package nodes
  2. import (
  3. "github.com/emqx/kuiper/common"
  4. "github.com/emqx/kuiper/plugins"
  5. "github.com/emqx/kuiper/xsql"
  6. "github.com/emqx/kuiper/xstream/api"
  7. "github.com/emqx/kuiper/xstream/extensions"
  8. "github.com/go-yaml/yaml"
  9. "sync"
  10. )
  11. type SourceNode struct {
  12. *defaultNode
  13. sourceType string
  14. options map[string]string
  15. isMock bool
  16. mutex sync.RWMutex
  17. sources []api.Source
  18. }
  19. func NewSourceNode(name string, options map[string]string) *SourceNode {
  20. t, ok := options["TYPE"]
  21. if !ok {
  22. t = "mqtt"
  23. }
  24. return &SourceNode{
  25. sourceType: t,
  26. defaultNode: &defaultNode{
  27. name: name,
  28. outputs: make(map[string]chan<- interface{}),
  29. concurrency: 1,
  30. },
  31. options: options,
  32. }
  33. }
  // OFFSET_KEY is the state key under which Open saves the offset of a
  // rewindable source and from which it reads the offset to rewind to.
  const (
  	OFFSET_KEY = "$$offset"
  )
  35. //Only for mock source, do not use it in production
  36. func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
  37. return &SourceNode{
  38. sources: []api.Source{source},
  39. defaultNode: &defaultNode{
  40. name: name,
  41. outputs: make(map[string]chan<- interface{}),
  42. concurrency: 1,
  43. },
  44. options: options,
  45. isMock: true,
  46. }
  47. }
  48. func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
  49. m.ctx = ctx
  50. logger := ctx.GetLogger()
  51. logger.Infof("open source node %s with option %v", m.name, m.options)
  52. go func() {
  53. props := m.getConf(ctx)
  54. if c, ok := props["concurrency"]; ok {
  55. if t, err := common.ToInt(c); err != nil || t <= 0 {
  56. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
  57. } else {
  58. m.concurrency = t
  59. }
  60. }
  61. bl := 102400
  62. if c, ok := props["bufferLength"]; ok {
  63. if t, err := common.ToInt(c); err != nil || t <= 0 {
  64. logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c)
  65. } else {
  66. bl = t
  67. }
  68. }
  69. m.reset()
  70. logger.Infof("open source node %d instances", m.concurrency)
  71. for i := 0; i < m.concurrency; i++ { // workers
  72. go func(instance int) {
  73. //Do open source instances
  74. var source api.Source
  75. var err error
  76. if !m.isMock {
  77. source, err = getSource(m.sourceType)
  78. if err != nil {
  79. m.drainError(errCh, err, ctx, logger)
  80. return
  81. }
  82. err = source.Configure(m.options["DATASOURCE"], props)
  83. if err != nil {
  84. m.drainError(errCh, err, ctx, logger)
  85. return
  86. }
  87. m.mutex.Lock()
  88. m.sources = append(m.sources, source)
  89. m.mutex.Unlock()
  90. } else {
  91. logger.Debugf("get source instance %d from %d sources", instance, len(m.sources))
  92. source = m.sources[instance]
  93. }
  94. stats, err := NewStatManager("source", ctx)
  95. if err != nil {
  96. m.drainError(errCh, err, ctx, logger)
  97. return
  98. }
  99. m.mutex.Lock()
  100. m.statManagers = append(m.statManagers, stats)
  101. m.mutex.Unlock()
  102. if rw, ok := source.(api.Rewindable); ok {
  103. if offset, err := ctx.GetState(OFFSET_KEY); err != nil {
  104. m.drainError(errCh, err, ctx, logger)
  105. } else if offset != nil {
  106. logger.Infof("Source rewind from %v", offset)
  107. err = rw.Rewind(offset)
  108. if err != nil {
  109. m.drainError(errCh, err, ctx, logger)
  110. }
  111. }
  112. }
  113. buffer := NewDynamicChannelBuffer()
  114. buffer.SetLimit(bl)
  115. sourceErrCh := make(chan error)
  116. go source.Open(ctx.WithInstance(instance), buffer.In, sourceErrCh)
  117. logger.Infof("Start source %s instance %d successfully", m.name, instance)
  118. for {
  119. select {
  120. case <-ctx.Done():
  121. logger.Infof("source %s done", m.name)
  122. m.close(ctx, logger)
  123. return
  124. case err := <-sourceErrCh:
  125. m.drainError(errCh, err, ctx, logger)
  126. return
  127. case data := <-buffer.Out:
  128. stats.IncTotalRecordsIn()
  129. stats.ProcessTimeStart()
  130. tuple := &xsql.Tuple{Emitter: m.name, Message: data.Message(), Timestamp: common.GetNowInMilli(), Metadata: data.Meta()}
  131. stats.ProcessTimeEnd()
  132. logger.Debugf("source node %s is sending tuple %+v of timestamp %d", m.name, tuple, tuple.Timestamp)
  133. //blocking
  134. m.Broadcast(tuple)
  135. stats.IncTotalRecordsOut()
  136. stats.SetBufferLength(int64(buffer.GetLength()))
  137. if rw, ok := source.(api.Rewindable); ok {
  138. if offset, err := rw.GetOffset(); err != nil {
  139. m.drainError(errCh, err, ctx, logger)
  140. } else {
  141. err = ctx.PutState(OFFSET_KEY, offset)
  142. if err != nil {
  143. m.drainError(errCh, err, ctx, logger)
  144. }
  145. logger.Debugf("Source save offset %v", offset)
  146. }
  147. }
  148. logger.Debugf("source node %s has consumed tuple of timestamp %d", m.name, tuple.Timestamp)
  149. }
  150. }
  151. }(i)
  152. }
  153. }()
  154. }
  155. func (m *SourceNode) reset() {
  156. if !m.isMock {
  157. m.sources = nil
  158. }
  159. m.statManagers = nil
  160. }
  161. func doGetSource(t string) (api.Source, error) {
  162. var (
  163. s api.Source
  164. err error
  165. )
  166. switch t {
  167. case "mqtt":
  168. s = &extensions.MQTTSource{}
  169. case "httppull":
  170. s = &extensions.HTTPPullSource{}
  171. default:
  172. s, err = plugins.GetSource(t)
  173. if err != nil {
  174. return nil, err
  175. }
  176. }
  177. return s, nil
  178. }
  179. func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger api.Logger) {
  180. select {
  181. case errCh <- err:
  182. case <-ctx.Done():
  183. m.close(ctx, logger)
  184. }
  185. return
  186. }
  187. func (m *SourceNode) close(ctx api.StreamContext, logger api.Logger) {
  188. for _, s := range m.sources {
  189. if err := s.Close(ctx); err != nil {
  190. logger.Warnf("close source fails: %v", err)
  191. }
  192. }
  193. }
  194. func (m *SourceNode) getConf(ctx api.StreamContext) map[string]interface{} {
  195. confkey := m.options["CONF_KEY"]
  196. logger := ctx.GetLogger()
  197. confPath := "sources/" + m.sourceType + ".yaml"
  198. if m.sourceType == "mqtt" {
  199. confPath = "mqtt_source.yaml"
  200. }
  201. conf, err := common.LoadConf(confPath)
  202. props := make(map[string]interface{})
  203. if err == nil {
  204. cfg := make(map[interface{}]interface{})
  205. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  206. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", m.sourceType)
  207. } else {
  208. def, ok := cfg["default"]
  209. if !ok {
  210. logger.Warnf("default conf is not found", confkey)
  211. } else {
  212. if def1, ok1 := def.(map[interface{}]interface{}); ok1 {
  213. props = common.ConvertMap(def1)
  214. }
  215. if c, ok := cfg[confkey]; ok {
  216. if c1, ok := c.(map[interface{}]interface{}); ok {
  217. c2 := common.ConvertMap(c1)
  218. for k, v := range c2 {
  219. props[k] = v
  220. }
  221. }
  222. }
  223. }
  224. }
  225. } else {
  226. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", m.sourceType)
  227. }
  228. logger.Debugf("get conf for %s with conf key %s: %v", m.sourceType, confkey, props)
  229. return props
  230. }