lookup_node.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/binder/io"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/node/metric"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/ast"
  23. "github.com/lf-edge/ekuiper/pkg/infra"
  24. )
  25. // LookupNode will look up the data from the external source when receiving an event
  26. type LookupNode struct {
  27. *defaultSinkNode
  28. statManager metric.StatManager
  29. sourceType string
  30. joinType ast.JoinType
  31. vals []ast.Expr
  32. srcOptions *ast.Options
  33. Keys []string
  34. }
  35. func NewLookupNode(name string, keys []string, joinType ast.JoinType, vals []ast.Expr, srcOptions *ast.Options, options *api.RuleOption) (*LookupNode, error) {
  36. t := srcOptions.TYPE
  37. if t == "" {
  38. return nil, fmt.Errorf("source type is not specified")
  39. }
  40. n := &LookupNode{
  41. Keys: keys,
  42. srcOptions: srcOptions,
  43. sourceType: t,
  44. joinType: joinType,
  45. vals: vals,
  46. }
  47. n.defaultSinkNode = &defaultSinkNode{
  48. input: make(chan interface{}, options.BufferLength),
  49. defaultNode: &defaultNode{
  50. outputs: make(map[string]chan<- interface{}),
  51. name: name,
  52. sendError: options.SendError,
  53. },
  54. }
  55. return n, nil
  56. }
  57. func (n *LookupNode) Exec(ctx api.StreamContext, errCh chan<- error) {
  58. n.ctx = ctx
  59. log := ctx.GetLogger()
  60. log.Debugf("LookupNode %s is started", n.name)
  61. if len(n.outputs) <= 0 {
  62. infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
  63. return
  64. }
  65. stats, err := metric.NewStatManager(ctx, "op")
  66. if err != nil {
  67. infra.DrainError(ctx, fmt.Errorf("no output channel found"), errCh)
  68. return
  69. }
  70. n.statManager = stats
  71. go func() {
  72. err := infra.SafeRun(func() error {
  73. props := getSourceConf(ctx, n.sourceType, n.srcOptions)
  74. ctx.GetLogger().Infof("open lookup source node with props %v", conf.Printable(props))
  75. // Create the lookup source according to the source options
  76. ns, err := io.LookupSource(n.sourceType)
  77. if err != nil {
  78. return err
  79. }
  80. err = ns.Configure(n.srcOptions.DATASOURCE, props, n.Keys)
  81. if err != nil {
  82. return err
  83. }
  84. err = ns.Open(ctx)
  85. if err != nil {
  86. return err
  87. }
  88. fv, _ := xsql.NewFunctionValuersForOp(ctx)
  89. // Start the lookup source loop
  90. for {
  91. log.Debugf("LookupNode %s is looping", n.name)
  92. select {
  93. // process incoming item from both streams(transformed) and tables
  94. case item, opened := <-n.input:
  95. processed := false
  96. if item, processed = n.preprocess(item); processed {
  97. break
  98. }
  99. n.statManager.IncTotalRecordsIn()
  100. n.statManager.ProcessTimeStart()
  101. if !opened {
  102. n.statManager.IncTotalExceptions("input channel closed")
  103. break
  104. }
  105. switch d := item.(type) {
  106. case error:
  107. n.Broadcast(d)
  108. n.statManager.IncTotalExceptions(d.Error())
  109. case xsql.TupleRow:
  110. log.Debugf("Lookup Node receive tuple input %s", d)
  111. n.statManager.ProcessTimeStart()
  112. sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0)}
  113. err := n.lookup(ctx, d, fv, ns, sets)
  114. if err != nil {
  115. n.Broadcast(err)
  116. n.statManager.IncTotalExceptions(err.Error())
  117. } else {
  118. n.Broadcast(sets)
  119. n.statManager.IncTotalRecordsOut()
  120. }
  121. n.statManager.ProcessTimeEnd()
  122. n.statManager.SetBufferLength(int64(len(n.input)))
  123. case *xsql.WindowTuples:
  124. log.Debugf("Lookup Node receive window input %s", d)
  125. n.statManager.ProcessTimeStart()
  126. sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0)}
  127. err := d.Range(func(i int, r xsql.ReadonlyRow) (bool, error) {
  128. tr, ok := r.(xsql.TupleRow)
  129. if !ok {
  130. return false, fmt.Errorf("Invalid window element, must be a tuple row but got %v", r)
  131. }
  132. err := n.lookup(ctx, tr, fv, ns, sets)
  133. if err != nil {
  134. return false, err
  135. }
  136. return true, nil
  137. })
  138. if err != nil {
  139. n.Broadcast(err)
  140. n.statManager.IncTotalExceptions(err.Error())
  141. } else {
  142. n.Broadcast(sets)
  143. n.statManager.IncTotalRecordsOut()
  144. }
  145. n.statManager.ProcessTimeEnd()
  146. n.statManager.SetBufferLength(int64(len(n.input)))
  147. default:
  148. e := fmt.Errorf("run lookup node error: invalid input type but got %[1]T(%[1]v)", d)
  149. n.Broadcast(e)
  150. n.statManager.IncTotalExceptions(e.Error())
  151. }
  152. case <-ctx.Done():
  153. log.Infoln("Cancelling lookup node....")
  154. return nil
  155. }
  156. }
  157. })
  158. if err != nil {
  159. infra.DrainError(ctx, err, errCh)
  160. }
  161. }()
  162. }
  163. func (n *LookupNode) lookup(ctx api.StreamContext, d xsql.TupleRow, fv *xsql.FunctionValuer, ns api.LookupSource, tuples *xsql.JoinTuples) error {
  164. ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(d, fv)}
  165. cvs := make([]interface{}, len(n.vals))
  166. for i, val := range n.vals {
  167. cvs[i] = ve.Eval(val)
  168. }
  169. r, e := ns.Lookup(ctx, cvs)
  170. if e != nil {
  171. return e
  172. } else {
  173. if len(r) == 0 {
  174. if n.joinType == ast.LEFT_JOIN {
  175. merged := &xsql.JoinTuple{}
  176. merged.AddTuple(d)
  177. tuples.Content = append(tuples.Content, merged)
  178. } else {
  179. ctx.GetLogger().Debugf("Lookup Node %s no result found for tuple %s", n.name, d)
  180. return nil
  181. }
  182. }
  183. for _, v := range r {
  184. merged := &xsql.JoinTuple{}
  185. merged.AddTuple(d)
  186. t := &xsql.Tuple{
  187. Emitter: n.name,
  188. Message: v.Message(),
  189. Metadata: v.Meta(),
  190. Timestamp: conf.GetNowInMilli(),
  191. }
  192. merged.AddTuple(t)
  193. tuples.Content = append(tuples.Content, merged)
  194. }
  195. return nil
  196. }
  197. }
  198. func (n *LookupNode) GetMetrics() [][]interface{} {
  199. if n.statManager != nil {
  200. return [][]interface{}{
  201. n.statManager.GetMetrics(),
  202. }
  203. } else {
  204. return nil
  205. }
  206. }
  207. func (n *LookupNode) merge(ctx api.StreamContext, d xsql.TupleRow, r []map[string]interface{}) {
  208. n.statManager.ProcessTimeStart()
  209. sets := &xsql.JoinTuples{Content: make([]*xsql.JoinTuple, 0)}
  210. if len(r) == 0 {
  211. if n.joinType == ast.LEFT_JOIN {
  212. merged := &xsql.JoinTuple{}
  213. merged.AddTuple(d)
  214. sets.Content = append(sets.Content, merged)
  215. } else {
  216. ctx.GetLogger().Debugf("Lookup Node %s no result found for tuple %s", n.name, d)
  217. return
  218. }
  219. }
  220. for _, v := range r {
  221. merged := &xsql.JoinTuple{}
  222. merged.AddTuple(d)
  223. t := &xsql.Tuple{
  224. Emitter: n.name,
  225. Message: v,
  226. Timestamp: conf.GetNowInMilli(),
  227. }
  228. merged.AddTuple(t)
  229. sets.Content = append(sets.Content, merged)
  230. }
  231. n.Broadcast(sets)
  232. n.statManager.ProcessTimeEnd()
  233. n.statManager.IncTotalRecordsOut()
  234. n.statManager.SetBufferLength(int64(len(n.input)))
  235. }