lookup_node.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/topo/lookup"
  19. "github.com/lf-edge/ekuiper/internal/topo/node/metric"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/ast"
  23. "github.com/lf-edge/ekuiper/pkg/infra"
  24. )
  25. // LookupNode will look up the data from the external source when receiving an event
  26. type LookupNode struct {
  27. *defaultSinkNode
  28. statManager metric.StatManager
  29. sourceType string
  30. joinType ast.JoinType
  31. vals []ast.Expr
  32. srcOptions *ast.Options
  33. fields []string
  34. keys []string
  35. }
  36. func NewLookupNode(name string, fields []string, keys []string, joinType ast.JoinType, vals []ast.Expr, srcOptions *ast.Options, options *api.RuleOption) (*LookupNode, error) {
  37. t := srcOptions.TYPE
  38. if t == "" {
  39. return nil, fmt.Errorf("source type is not specified")
  40. }
  41. n := &LookupNode{
  42. fields: fields,
  43. keys: keys,
  44. srcOptions: srcOptions,
  45. sourceType: t,
  46. joinType: joinType,
  47. vals: vals,
  48. }
  49. n.defaultSinkNode = &defaultSinkNode{
  50. input: make(chan interface{}, options.BufferLength),
  51. defaultNode: &defaultNode{
  52. outputs: make(map[string]chan<- interface{}),
  53. name: name,
  54. sendError: options.SendError,
  55. },
  56. }
  57. return n, nil
  58. }
  59. func (n *LookupNode) Exec(ctx api.StreamContext, errCh chan<- error) {
  60. n.ctx = ctx
  61. log := ctx.GetLogger()
  62. log.Debugf("LookupNode %s is started", n.name)
  63. if len(n.outputs) <= 0 {
  64. infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
  65. return
  66. }
  67. stats, err := metric.NewStatManager(ctx, "op")
  68. if err != nil {
  69. infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
  70. return
  71. }
  72. n.statManager = stats
  73. go func() {
  74. err := infra.SafeRun(func() error {
  75. ns, err := lookup.Attach(n.name)
  76. if err != nil {
  77. return err
  78. }
  79. defer lookup.Detach(n.name)
  80. fv, _ := xsql.NewFunctionValuersForOp(ctx)
  81. // Start the lookup source loop
  82. for {
  83. log.Debugf("LookupNode %s is looping", n.name)
  84. select {
  85. // process incoming item from both streams(transformed) and tables
  86. case item, opened := <-n.input:
  87. processed := false
  88. if item, processed = n.preprocess(item); processed {
  89. break
  90. }
  91. n.statManager.IncTotalRecordsIn()
  92. n.statManager.ProcessTimeStart()
  93. if !opened {
  94. n.statManager.IncTotalExceptions("input channel closed")
  95. break
  96. }
  97. switch d := item.(type) {
  98. case error:
  99. n.Broadcast(d)
  100. n.statManager.IncTotalExceptions(d.Error())
  101. case xsql.TupleRow:
  102. log.Debugf("Lookup Node receive tuple input %s", d)
  103. n.statManager.ProcessTimeStart()
  104. sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0)}
  105. err := n.lookup(ctx, d, fv, ns, sets)
  106. if err != nil {
  107. n.Broadcast(err)
  108. n.statManager.IncTotalExceptions(err.Error())
  109. } else {
  110. n.Broadcast(sets)
  111. n.statManager.IncTotalRecordsOut()
  112. }
  113. n.statManager.ProcessTimeEnd()
  114. n.statManager.SetBufferLength(int64(len(n.input)))
  115. case *xsql.WindowTuples:
  116. log.Debugf("Lookup Node receive window input %s", d)
  117. n.statManager.ProcessTimeStart()
  118. sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0)}
  119. err := d.Range(func(i int, r xsql.ReadonlyRow) (bool, error) {
  120. tr, ok := r.(xsql.TupleRow)
  121. if !ok {
  122. return false, fmt.Errorf("Invalid window element, must be a tuple row but got %v", r)
  123. }
  124. err := n.lookup(ctx, tr, fv, ns, sets)
  125. if err != nil {
  126. return false, err
  127. }
  128. return true, nil
  129. })
  130. if err != nil {
  131. n.Broadcast(err)
  132. n.statManager.IncTotalExceptions(err.Error())
  133. } else {
  134. n.Broadcast(sets)
  135. n.statManager.IncTotalRecordsOut()
  136. }
  137. n.statManager.ProcessTimeEnd()
  138. n.statManager.SetBufferLength(int64(len(n.input)))
  139. default:
  140. e := fmt.Errorf("run lookup node error: invalid input type but got %[1]T(%[1]v)", d)
  141. n.Broadcast(e)
  142. n.statManager.IncTotalExceptions(e.Error())
  143. }
  144. case <-ctx.Done():
  145. log.Infoln("Cancelling lookup node....")
  146. return nil
  147. }
  148. }
  149. })
  150. if err != nil {
  151. infra.DrainError(ctx, err, errCh)
  152. }
  153. }()
  154. }
  155. func (n *LookupNode) lookup(ctx api.StreamContext, d xsql.TupleRow, fv *xsql.FunctionValuer, ns api.LookupSource, tuples *xsql.JoinTuples) error {
  156. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(d, fv)}
  157. cvs := make([]interface{}, len(n.vals))
  158. hasNil := false
  159. for i, val := range n.vals {
  160. cvs[i] = ve.Eval(val)
  161. if cvs[i] == nil {
  162. hasNil = true
  163. }
  164. }
  165. var (
  166. r []api.SourceTuple
  167. e error
  168. )
  169. if !hasNil { // if any of the value is nil, the lookup will always return empty result
  170. r, e = ns.Lookup(ctx, n.fields, n.keys, cvs)
  171. }
  172. if e != nil {
  173. return e
  174. } else {
  175. if len(r) == 0 {
  176. if n.joinType == ast.LEFT_JOIN {
  177. merged := &xsql.JoinTuple{}
  178. merged.AddTuple(d)
  179. tuples.Content = append(tuples.Content, merged)
  180. } else {
  181. ctx.GetLogger().Debugf("Lookup Node %s no result found for tuple %s", n.name, d)
  182. return nil
  183. }
  184. }
  185. for _, v := range r {
  186. merged := &xsql.JoinTuple{}
  187. merged.AddTuple(d)
  188. t := &xsql.Tuple{
  189. Emitter: n.name,
  190. Message: v.Message(),
  191. Metadata: v.Meta(),
  192. Timestamp: conf.GetNowInMilli(),
  193. }
  194. merged.AddTuple(t)
  195. tuples.Content = append(tuples.Content, merged)
  196. }
  197. return nil
  198. }
  199. }
  200. func (n *LookupNode) GetMetrics() [][]interface{} {
  201. if n.statManager != nil {
  202. return [][]interface{}{
  203. n.statManager.GetMetrics(),
  204. }
  205. } else {
  206. return nil
  207. }
  208. }
  209. func (n *LookupNode) merge(ctx api.StreamContext, d xsql.TupleRow, r []map[string]interface{}) {
  210. n.statManager.ProcessTimeStart()
  211. sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0)}
  212. if len(r) == 0 {
  213. if n.joinType == ast.LEFT_JOIN {
  214. merged := &xsql.JoinTuple{}
  215. merged.AddTuple(d)
  216. sets.Content = append(sets.Content, merged)
  217. } else {
  218. ctx.GetLogger().Debugf("Lookup Node %s no result found for tuple %s", n.name, d)
  219. return
  220. }
  221. }
  222. for _, v := range r {
  223. merged := &xsql.JoinTuple{}
  224. merged.AddTuple(d)
  225. t := &xsql.Tuple{
  226. Emitter: n.name,
  227. Message: v,
  228. Timestamp: conf.GetNowInMilli(),
  229. }
  230. merged.AddTuple(t)
  231. sets.Content = append(sets.Content, merged)
  232. }
  233. n.Broadcast(sets)
  234. n.statManager.ProcessTimeEnd()
  235. n.statManager.IncTotalRecordsOut()
  236. n.statManager.SetBufferLength(int64(len(n.input)))
  237. }