lookup_node.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/binder/io"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/node/metric"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/ast"
  23. "github.com/lf-edge/ekuiper/pkg/infra"
  24. )
  25. // LookupNode will look up the data from the external source when receiving an event
  26. type LookupNode struct {
  27. *defaultSinkNode
  28. statManager metric.StatManager
  29. sourceType string
  30. joinType ast.JoinType
  31. vals []ast.Expr
  32. srcOptions *ast.Options
  33. Keys []string
  34. }
  35. func NewLookupNode(name string, keys []string, joinType ast.JoinType, vals []ast.Expr, srcOptions *ast.Options, options *api.RuleOption) (*LookupNode, error) {
  36. t := srcOptions.TYPE
  37. if t == "" {
  38. return nil, fmt.Errorf("source type is not specified")
  39. }
  40. n := &LookupNode{
  41. Keys: keys,
  42. srcOptions: srcOptions,
  43. sourceType: t,
  44. joinType: joinType,
  45. vals: vals,
  46. }
  47. n.defaultSinkNode = &defaultSinkNode{
  48. input: make(chan interface{}, options.BufferLength),
  49. defaultNode: &defaultNode{
  50. outputs: make(map[string]chan<- interface{}),
  51. name: name,
  52. sendError: options.SendError,
  53. },
  54. }
  55. return n, nil
  56. }
  57. func (n *LookupNode) Exec(ctx api.StreamContext, errCh chan<- error) {
  58. n.ctx = ctx
  59. log := ctx.GetLogger()
  60. log.Debugf("LookupNode %s is started", n.name)
  61. if len(n.outputs) <= 0 {
  62. infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
  63. return
  64. }
  65. stats, err := metric.NewStatManager(ctx, "op")
  66. if err != nil {
  67. infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
  68. return
  69. }
  70. n.statManager = stats
  71. go func() {
  72. err := infra.SafeRun(func() error {
  73. props := getSourceConf(ctx, n.sourceType, n.srcOptions)
  74. ctx.GetLogger().Infof("open lookup source node with props %v", conf.Printable(props))
  75. // Create the lookup source according to the source options
  76. ns, err := io.LookupSource(n.sourceType)
  77. if err != nil {
  78. return err
  79. }
  80. err = ns.Configure(n.srcOptions.DATASOURCE, props, n.Keys)
  81. if err != nil {
  82. return err
  83. }
  84. err = ns.Open(ctx)
  85. if err != nil {
  86. return err
  87. }
  88. fv, _ := xsql.NewFunctionValuersForOp(ctx)
  89. // Start the lookup source loop
  90. for {
  91. log.Debugf("LookupNode %s is looping", n.name)
  92. select {
  93. // process incoming item from both streams(transformed) and tables
  94. case item, opened := <-n.input:
  95. processed := false
  96. if item, processed = n.preprocess(item); processed {
  97. break
  98. }
  99. n.statManager.IncTotalRecordsIn()
  100. n.statManager.ProcessTimeStart()
  101. if !opened {
  102. n.statManager.IncTotalExceptions("input channel closed")
  103. break
  104. }
  105. switch d := item.(type) {
  106. case error:
  107. n.Broadcast(d)
  108. n.statManager.IncTotalExceptions(d.Error())
  109. case xsql.TupleRow:
  110. log.Debugf("Lookup Node receive tuple input %s", d)
  111. n.statManager.ProcessTimeStart()
  112. sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0)}
  113. err := n.lookup(ctx, d, fv, ns, sets)
  114. if err != nil {
  115. n.Broadcast(err)
  116. n.statManager.IncTotalExceptions(err.Error())
  117. } else {
  118. n.Broadcast(sets)
  119. n.statManager.IncTotalRecordsOut()
  120. }
  121. n.statManager.ProcessTimeEnd()
  122. n.statManager.SetBufferLength(int64(len(n.input)))
  123. case *xsql.WindowTuples:
  124. log.Debugf("Lookup Node receive window input %s", d)
  125. n.statManager.ProcessTimeStart()
  126. sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0)}
  127. err := d.Range(func(i int, r xsql.ReadonlyRow) (bool, error) {
  128. tr, ok := r.(xsql.TupleRow)
  129. if !ok {
  130. return false, fmt.Errorf("Invalid window element, must be a tuple row but got %v", r)
  131. }
  132. err := n.lookup(ctx, tr, fv, ns, sets)
  133. if err != nil {
  134. return false, err
  135. }
  136. return true, nil
  137. })
  138. if err != nil {
  139. n.Broadcast(err)
  140. n.statManager.IncTotalExceptions(err.Error())
  141. } else {
  142. n.Broadcast(sets)
  143. n.statManager.IncTotalRecordsOut()
  144. }
  145. n.statManager.ProcessTimeEnd()
  146. n.statManager.SetBufferLength(int64(len(n.input)))
  147. default:
  148. e := fmt.Errorf("run lookup node error: invalid input type but got %[1]T(%[1]v)", d)
  149. n.Broadcast(e)
  150. n.statManager.IncTotalExceptions(e.Error())
  151. }
  152. case <-ctx.Done():
  153. log.Infoln("Cancelling lookup node....")
  154. return nil
  155. }
  156. }
  157. })
  158. if err != nil {
  159. infra.DrainError(ctx, err, errCh)
  160. }
  161. }()
  162. }
  163. func (n *LookupNode) lookup(ctx api.StreamContext, d xsql.TupleRow, fv *xsql.FunctionValuer, ns api.LookupSource, tuples *xsql.JoinTuples) error {
  164. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(d, fv)}
  165. cvs := make([]interface{}, len(n.vals))
  166. for i, val := range n.vals {
  167. cvs[i] = ve.Eval(val)
  168. }
  169. r, e := ns.Lookup(ctx, cvs)
  170. if e != nil {
  171. return e
  172. } else {
  173. if len(r) == 0 {
  174. if n.joinType == ast.LEFT_JOIN {
  175. merged := &xsql.JoinTuple{}
  176. merged.AddTuple(d)
  177. tuples.Content = append(tuples.Content, merged)
  178. } else {
  179. ctx.GetLogger().Debugf("Lookup Node %s no result found for tuple %s", n.name, d)
  180. return nil
  181. }
  182. }
  183. for _, v := range r {
  184. merged := &xsql.JoinTuple{}
  185. merged.AddTuple(d)
  186. t := &xsql.Tuple{
  187. Emitter: n.name,
  188. Message: v,
  189. Timestamp: conf.GetNowInMilli(),
  190. }
  191. merged.AddTuple(t)
  192. tuples.Content = append(tuples.Content, merged)
  193. }
  194. return nil
  195. }
  196. }
  197. func (n *LookupNode) GetMetrics() [][]interface{} {
  198. if n.statManager != nil {
  199. return [][]interface{}{
  200. n.statManager.GetMetrics(),
  201. }
  202. } else {
  203. return nil
  204. }
  205. }
  206. func (n *LookupNode) merge(ctx api.StreamContext, d xsql.TupleRow, r []map[string]interface{}) {
  207. n.statManager.ProcessTimeStart()
  208. sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0)}
  209. if len(r) == 0 {
  210. if n.joinType == ast.LEFT_JOIN {
  211. merged := &xsql.JoinTuple{}
  212. merged.AddTuple(d)
  213. sets.Content = append(sets.Content, merged)
  214. } else {
  215. ctx.GetLogger().Debugf("Lookup Node %s no result found for tuple %s", n.name, d)
  216. return
  217. }
  218. }
  219. for _, v := range r {
  220. merged := &xsql.JoinTuple{}
  221. merged.AddTuple(d)
  222. t := &xsql.Tuple{
  223. Emitter: n.name,
  224. Message: v,
  225. Timestamp: conf.GetNowInMilli(),
  226. }
  227. merged.AddTuple(t)
  228. sets.Content = append(sets.Content, merged)
  229. }
  230. n.Broadcast(sets)
  231. n.statManager.ProcessTimeEnd()
  232. n.statManager.IncTotalRecordsOut()
  233. n.statManager.SetBufferLength(int64(len(n.input)))
  234. }