node.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/binder/io"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/checkpoint"
  20. "github.com/lf-edge/ekuiper/internal/topo/context"
  21. "github.com/lf-edge/ekuiper/internal/topo/node/metric"
  22. "github.com/lf-edge/ekuiper/internal/xsql"
  23. "github.com/lf-edge/ekuiper/pkg/api"
  24. )
  25. type OperatorNode interface {
  26. api.Operator
  27. Broadcast(data interface{}) error
  28. GetStreamContext() api.StreamContext
  29. GetInputCount() int
  30. AddInputCount()
  31. SetQos(api.Qos)
  32. SetBarrierHandler(checkpoint.BarrierHandler)
  33. RemoveMetrics(name string)
  34. }
  35. type DataSourceNode interface {
  36. api.Emitter
  37. Open(ctx api.StreamContext, errCh chan<- error)
  38. GetName() string
  39. GetMetrics() [][]interface{}
  40. RemoveMetrics(ruleId string)
  41. Broadcast(val interface{}) error
  42. GetStreamContext() api.StreamContext
  43. SetQos(api.Qos)
  44. }
  45. type defaultNode struct {
  46. name string
  47. outputs map[string]chan<- interface{}
  48. concurrency int
  49. sendError bool
  50. statManagers []metric.StatManager
  51. ctx api.StreamContext
  52. qos api.Qos
  53. }
  54. func (o *defaultNode) AddOutput(output chan<- interface{}, name string) error {
  55. if _, ok := o.outputs[name]; !ok {
  56. o.outputs[name] = output
  57. } else {
  58. return fmt.Errorf("fail to add output %s, node %s already has an output of the same name", name, o.name)
  59. }
  60. return nil
  61. }
  62. func (o *defaultNode) GetName() string {
  63. return o.name
  64. }
  65. // SetConcurrency sets the concurrency level for the operation
  66. func (o *defaultNode) SetConcurrency(concurr int) {
  67. o.concurrency = concurr
  68. if o.concurrency < 1 {
  69. o.concurrency = 1
  70. }
  71. }
  72. func (o *defaultNode) SetQos(qos api.Qos) {
  73. o.qos = qos
  74. }
  75. func (o *defaultNode) GetMetrics() (result [][]interface{}) {
  76. for _, stats := range o.statManagers {
  77. result = append(result, stats.GetMetrics())
  78. }
  79. return result
  80. }
  81. func (o *defaultNode) RemoveMetrics(ruleId string) {
  82. for _, stats := range o.statManagers {
  83. stats.Clean(ruleId)
  84. }
  85. }
  86. func (o *defaultNode) Broadcast(val interface{}) error {
  87. if _, ok := val.(error); ok && !o.sendError {
  88. return nil
  89. }
  90. if o.qos >= api.AtLeastOnce {
  91. boe := &checkpoint.BufferOrEvent{
  92. Data: val,
  93. Channel: o.name,
  94. }
  95. o.doBroadcast(boe)
  96. return nil
  97. }
  98. o.doBroadcast(val)
  99. return nil
  100. }
  101. func (o *defaultNode) doBroadcast(val interface{}) {
  102. for name, out := range o.outputs {
  103. select {
  104. case out <- val:
  105. // do nothing
  106. case <-o.ctx.Done():
  107. // rule stop so stop waiting
  108. default:
  109. o.statManagers[0].IncTotalExceptions(fmt.Sprintf("buffer full, drop message from to %s", name))
  110. o.ctx.GetLogger().Debugf("drop message from %s to %s", o.name, name)
  111. }
  112. switch vt := val.(type) {
  113. case xsql.Collection:
  114. val = vt.Clone()
  115. break
  116. case xsql.TupleRow:
  117. val = vt.Clone()
  118. }
  119. }
  120. }
  121. func (o *defaultNode) GetStreamContext() api.StreamContext {
  122. return o.ctx
  123. }
  124. type defaultSinkNode struct {
  125. *defaultNode
  126. input chan interface{}
  127. barrierHandler checkpoint.BarrierHandler
  128. inputCount int
  129. }
  130. func (o *defaultSinkNode) GetInput() (chan<- interface{}, string) {
  131. return o.input, o.name
  132. }
  133. func (o *defaultSinkNode) GetInputCount() int {
  134. return o.inputCount
  135. }
  136. func (o *defaultSinkNode) AddInputCount() {
  137. o.inputCount++
  138. }
  139. func (o *defaultSinkNode) SetBarrierHandler(bh checkpoint.BarrierHandler) {
  140. o.barrierHandler = bh
  141. }
  142. // return the data and if processed
  143. func (o *defaultSinkNode) preprocess(data interface{}) (interface{}, bool) {
  144. if o.qos >= api.AtLeastOnce {
  145. logger := o.ctx.GetLogger()
  146. logger.Debugf("%s preprocess receive data %+v", o.name, data)
  147. b, ok := data.(*checkpoint.BufferOrEvent)
  148. if ok {
  149. logger.Debugf("data is BufferOrEvent, start barrier handler")
  150. // if it is barrier return true and ignore the further processing
  151. // if it is blocked(align handler), return true and then write back to the channel later
  152. if o.barrierHandler.Process(b, o.ctx) {
  153. return nil, true
  154. } else {
  155. return b.Data, false
  156. }
  157. }
  158. }
  159. return data, false
  160. }
  161. func SinkOpen(sinkType string, config map[string]interface{}) error {
  162. sink, err := getSink(sinkType, config)
  163. if err != nil {
  164. return err
  165. }
  166. contextLogger := conf.Log.WithField("rule", "TestSinkOpen"+"_"+sinkType)
  167. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  168. defer func() {
  169. _ = sink.Close(ctx)
  170. }()
  171. return sink.Open(ctx)
  172. }
  173. func SourceOpen(sourceType string, config map[string]interface{}) error {
  174. dataSource := "/$$TEST_CONNECTION$$"
  175. if v, ok := config["DATASOURCE"]; ok {
  176. dataSource = v.(string)
  177. }
  178. ns, err := io.Source(sourceType)
  179. if err != nil {
  180. return err
  181. }
  182. if ns == nil {
  183. lns, err := io.LookupSource(sourceType)
  184. if err != nil {
  185. return err
  186. }
  187. if lns == nil {
  188. // should not happen
  189. return fmt.Errorf("source %s not found", sourceType)
  190. }
  191. err = lns.Configure(dataSource, config)
  192. if err != nil {
  193. return err
  194. }
  195. contextLogger := conf.Log.WithField("rule", "TestSourceOpen"+"_"+sourceType)
  196. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  197. lns.Close(ctx)
  198. } else {
  199. err = ns.Configure(dataSource, config)
  200. if err != nil {
  201. return err
  202. }
  203. contextLogger := conf.Log.WithField("rule", "TestSourceOpen"+"_"+sourceType)
  204. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  205. ns.Close(ctx)
  206. }
  207. return nil
  208. }