node.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package nodes
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "github.com/emqx/kuiper/xstream/checkpoints"
  7. "github.com/go-yaml/yaml"
  8. "strings"
  9. "sync"
  10. )
  11. type OperatorNode interface {
  12. api.Operator
  13. Broadcast(data interface{}) error
  14. GetStreamContext() api.StreamContext
  15. GetInputCount() int
  16. AddInputCount()
  17. SetQos(api.Qos)
  18. SetBarrierHandler(checkpoints.BarrierHandler)
  19. }
  20. type DataSourceNode interface {
  21. api.Emitter
  22. Open(ctx api.StreamContext, errCh chan<- error)
  23. GetName() string
  24. GetMetrics() [][]interface{}
  25. Broadcast(val interface{}) error
  26. GetStreamContext() api.StreamContext
  27. SetQos(api.Qos)
  28. }
  29. type defaultNode struct {
  30. name string
  31. outputs map[string]chan<- interface{}
  32. concurrency int
  33. sendError bool
  34. statManagers []StatManager
  35. ctx api.StreamContext
  36. qos api.Qos
  37. }
  38. func (o *defaultNode) AddOutput(output chan<- interface{}, name string) error {
  39. if _, ok := o.outputs[name]; !ok {
  40. o.outputs[name] = output
  41. } else {
  42. return fmt.Errorf("fail to add output %s, node %s already has an output of the same name", name, o.name)
  43. }
  44. return nil
  45. }
  46. func (o *defaultNode) GetName() string {
  47. return o.name
  48. }
  49. // SetConcurrency sets the concurrency level for the operation
  50. func (o *defaultNode) SetConcurrency(concurr int) {
  51. o.concurrency = concurr
  52. if o.concurrency < 1 {
  53. o.concurrency = 1
  54. }
  55. }
  56. func (o *defaultNode) SetQos(qos api.Qos) {
  57. o.qos = qos
  58. }
  59. func (o *defaultNode) GetMetrics() (result [][]interface{}) {
  60. for _, stats := range o.statManagers {
  61. result = append(result, stats.GetMetrics())
  62. }
  63. return result
  64. }
  65. func (o *defaultNode) Broadcast(val interface{}) error {
  66. if !o.sendError {
  67. if _, ok := val.(error); ok {
  68. return nil
  69. }
  70. }
  71. if o.qos >= api.AtLeastOnce {
  72. boe := &checkpoints.BufferOrEvent{
  73. Data: val,
  74. Channel: o.name,
  75. }
  76. return o.doBroadcast(boe)
  77. }
  78. return o.doBroadcast(val)
  79. }
  80. func (o *defaultNode) doBroadcast(val interface{}) error {
  81. logger := o.ctx.GetLogger()
  82. var wg sync.WaitGroup
  83. wg.Add(len(o.outputs))
  84. for n, out := range o.outputs {
  85. go func(name string, output chan<- interface{}) {
  86. select {
  87. case output <- val:
  88. logger.Debugf("broadcast from %s to %s done", o.ctx.GetOpId(), name)
  89. case <-o.ctx.Done():
  90. // rule stop so stop waiting
  91. }
  92. wg.Done()
  93. }(n, out)
  94. }
  95. logger.Debugf("broadcasting from %s", o.ctx.GetOpId())
  96. wg.Wait()
  97. return nil
  98. }
  99. func (o *defaultNode) GetStreamContext() api.StreamContext {
  100. return o.ctx
  101. }
  102. type defaultSinkNode struct {
  103. *defaultNode
  104. input chan interface{}
  105. barrierHandler checkpoints.BarrierHandler
  106. inputCount int
  107. }
  108. func (o *defaultSinkNode) GetInput() (chan<- interface{}, string) {
  109. return o.input, o.name
  110. }
  111. func (o *defaultSinkNode) GetInputCount() int {
  112. return o.inputCount
  113. }
  114. func (o *defaultSinkNode) AddInputCount() {
  115. o.inputCount++
  116. }
  117. func (o *defaultSinkNode) SetBarrierHandler(bh checkpoints.BarrierHandler) {
  118. o.barrierHandler = bh
  119. }
  120. // return the data and if processed
  121. func (o *defaultSinkNode) preprocess(data interface{}) (interface{}, bool) {
  122. if o.qos >= api.AtLeastOnce {
  123. logger := o.ctx.GetLogger()
  124. logger.Debugf("%s preprocess receive data %+v", o.name, data)
  125. b, ok := data.(*checkpoints.BufferOrEvent)
  126. if ok {
  127. logger.Debugf("data is BufferOrEvent, start barrier handler")
  128. //if it is barrier return true and ignore the further processing
  129. //if it is blocked(align handler), return true and then write back to the channel later
  130. if o.barrierHandler.Process(b, o.ctx) {
  131. return nil, true
  132. } else {
  133. return b.Data, false
  134. }
  135. }
  136. }
  137. return data, false
  138. }
  139. func getSourceConf(ctx api.StreamContext, sourceType string, options map[string]string) map[string]interface{} {
  140. confkey := options["CONF_KEY"]
  141. logger := ctx.GetLogger()
  142. confPath := "sources/" + sourceType + ".yaml"
  143. if sourceType == "mqtt" {
  144. confPath = "mqtt_source.yaml"
  145. }
  146. conf, err := common.LoadConf(confPath)
  147. props := make(map[string]interface{})
  148. if err == nil {
  149. cfg := make(map[interface{}]interface{})
  150. if err := yaml.Unmarshal(conf, &cfg); err != nil {
  151. logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", sourceType)
  152. } else {
  153. def, ok := cfg["default"]
  154. if !ok {
  155. logger.Warnf("default conf is not found", confkey)
  156. } else {
  157. if def1, ok1 := def.(map[interface{}]interface{}); ok1 {
  158. props = common.ConvertMap(def1)
  159. }
  160. if c, ok := cfg[confkey]; ok {
  161. if c1, ok := c.(map[interface{}]interface{}); ok {
  162. c2 := common.ConvertMap(c1)
  163. for k, v := range c2 {
  164. props[k] = v
  165. }
  166. }
  167. }
  168. }
  169. }
  170. } else {
  171. logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", sourceType)
  172. }
  173. f, ok := options["FORMAT"]
  174. if !ok || f == "" {
  175. f = "json"
  176. }
  177. props["format"] = strings.ToLower(f)
  178. logger.Debugf("get conf for %s with conf key %s: %v", sourceType, confkey, props)
  179. return props
  180. }