lookup_node_test.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/binder"
  18. "github.com/lf-edge/ekuiper/internal/binder/io"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/context"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "github.com/lf-edge/ekuiper/pkg/ast"
  24. "reflect"
  25. "testing"
  26. "time"
  27. )
  28. type mockLookupSrc struct {
  29. }
  30. func (m *mockLookupSrc) Open(_ api.StreamContext) error {
  31. return nil
  32. }
  33. func (m *mockLookupSrc) Configure(_ string, _ map[string]interface{}, _ []string) error {
  34. return nil
  35. }
  36. // Lookup accept int value as the first array value
  37. func (m *mockLookupSrc) Lookup(_ api.StreamContext, values []interface{}) ([]map[string]interface{}, error) {
  38. a1, ok := values[0].(int)
  39. if ok {
  40. var result []map[string]interface{}
  41. c := a1 % 2
  42. if c != 0 {
  43. result = append(result, map[string]interface{}{
  44. "newA": c,
  45. "newB": c * 2,
  46. })
  47. }
  48. c = a1 % 3
  49. if c != 0 {
  50. result = append(result, map[string]interface{}{
  51. "newA": c,
  52. "newB": c * 2,
  53. })
  54. }
  55. c = a1 % 5
  56. if c != 0 {
  57. result = append(result, map[string]interface{}{
  58. "newA": c,
  59. "newB": c * 2,
  60. })
  61. }
  62. c = a1 % 7
  63. if c != 0 {
  64. result = append(result, map[string]interface{}{
  65. "newA": c,
  66. "newB": c * 2,
  67. })
  68. }
  69. return result, nil
  70. } else {
  71. return []map[string]interface{}{
  72. {
  73. "newA": 0,
  74. "newB": 0,
  75. },
  76. }, nil
  77. }
  78. }
  79. func (m *mockLookupSrc) Close(ctx api.StreamContext) error {
  80. // do nothing
  81. return nil
  82. }
  83. type mockFac struct{}
  84. func (m *mockFac) Source(_ string) (api.Source, error) {
  85. return nil, nil
  86. }
  87. func (m *mockFac) LookupSource(name string) (api.LookupSource, error) {
  88. if name == "mock" {
  89. return &mockLookupSrc{}, nil
  90. }
  91. return nil, nil
  92. }
  93. // init mock lookup source factory
  94. func init() {
  95. io.Initialize([]binder.FactoryEntry{
  96. {Name: "native plugin", Factory: &mockFac{}, Weight: 10},
  97. })
  98. }
  99. func TestLookup(t *testing.T) {
  100. var tests = []struct {
  101. input interface{} // a tuple or a window
  102. output *xsql.JoinTuples
  103. }{
  104. {
  105. input: &xsql.Tuple{
  106. Emitter: "demo",
  107. Message: map[string]interface{}{
  108. "a": 6,
  109. "b": "aaaa",
  110. },
  111. },
  112. output: &xsql.JoinTuples{
  113. Content: []*xsql.JoinTuple{
  114. {
  115. Tuples: []xsql.TupleRow{
  116. &xsql.Tuple{
  117. Emitter: "demo",
  118. Message: map[string]interface{}{
  119. "a": 6,
  120. "b": "aaaa",
  121. },
  122. },
  123. &xsql.Tuple{
  124. Emitter: "mock",
  125. Message: map[string]interface{}{
  126. "newA": 1,
  127. "newB": 2,
  128. },
  129. },
  130. },
  131. }, {
  132. Tuples: []xsql.TupleRow{
  133. &xsql.Tuple{
  134. Emitter: "demo",
  135. Message: map[string]interface{}{
  136. "a": 6,
  137. "b": "aaaa",
  138. },
  139. },
  140. &xsql.Tuple{
  141. Emitter: "mock",
  142. Message: map[string]interface{}{
  143. "newA": 6,
  144. "newB": 12,
  145. },
  146. },
  147. },
  148. },
  149. },
  150. },
  151. },
  152. {
  153. input: &xsql.WindowTuples{
  154. Content: []xsql.TupleRow{
  155. &xsql.Tuple{
  156. Emitter: "demo",
  157. Message: map[string]interface{}{
  158. "a": 9,
  159. "b": "aaaa",
  160. },
  161. },
  162. &xsql.Tuple{
  163. Emitter: "demo",
  164. Message: map[string]interface{}{
  165. "a": 4,
  166. "b": "bbaa",
  167. },
  168. },
  169. },
  170. },
  171. output: &xsql.JoinTuples{
  172. Content: []*xsql.JoinTuple{
  173. {
  174. Tuples: []xsql.TupleRow{
  175. &xsql.Tuple{
  176. Emitter: "demo",
  177. Message: map[string]interface{}{
  178. "a": 9,
  179. "b": "aaaa",
  180. },
  181. },
  182. &xsql.Tuple{
  183. Emitter: "mock",
  184. Message: map[string]interface{}{
  185. "newA": 1,
  186. "newB": 2,
  187. },
  188. },
  189. },
  190. }, {
  191. Tuples: []xsql.TupleRow{
  192. &xsql.Tuple{
  193. Emitter: "demo",
  194. Message: map[string]interface{}{
  195. "a": 9,
  196. "b": "aaaa",
  197. },
  198. },
  199. &xsql.Tuple{
  200. Emitter: "mock",
  201. Message: map[string]interface{}{
  202. "newA": 4,
  203. "newB": 8,
  204. },
  205. },
  206. },
  207. }, {
  208. Tuples: []xsql.TupleRow{
  209. &xsql.Tuple{
  210. Emitter: "demo",
  211. Message: map[string]interface{}{
  212. "a": 9,
  213. "b": "aaaa",
  214. },
  215. },
  216. &xsql.Tuple{
  217. Emitter: "mock",
  218. Message: map[string]interface{}{
  219. "newA": 2,
  220. "newB": 4,
  221. },
  222. },
  223. },
  224. }, {
  225. Tuples: []xsql.TupleRow{
  226. &xsql.Tuple{
  227. Emitter: "demo",
  228. Message: map[string]interface{}{
  229. "a": 4,
  230. "b": "bbaa",
  231. },
  232. },
  233. &xsql.Tuple{
  234. Emitter: "mock",
  235. Message: map[string]interface{}{
  236. "newA": 1,
  237. "newB": 2,
  238. },
  239. },
  240. },
  241. }, {
  242. Tuples: []xsql.TupleRow{
  243. &xsql.Tuple{
  244. Emitter: "demo",
  245. Message: map[string]interface{}{
  246. "a": 4,
  247. "b": "bbaa",
  248. },
  249. },
  250. &xsql.Tuple{
  251. Emitter: "mock",
  252. Message: map[string]interface{}{
  253. "newA": 4,
  254. "newB": 8,
  255. },
  256. },
  257. },
  258. }, {
  259. Tuples: []xsql.TupleRow{
  260. &xsql.Tuple{
  261. Emitter: "demo",
  262. Message: map[string]interface{}{
  263. "a": 4,
  264. "b": "bbaa",
  265. },
  266. },
  267. &xsql.Tuple{
  268. Emitter: "mock",
  269. Message: map[string]interface{}{
  270. "newA": 4,
  271. "newB": 8,
  272. },
  273. },
  274. },
  275. },
  276. },
  277. },
  278. },
  279. }
  280. contextLogger := conf.Log.WithField("rule", "TestLookup")
  281. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  282. l, _ := NewLookupNode("mock", []string{"a"}, ast.INNER_JOIN, []ast.Expr{&ast.FieldRef{
  283. StreamName: "",
  284. Name: "a",
  285. }}, &ast.Options{
  286. DATASOURCE: "mock",
  287. TYPE: "mock",
  288. STRICT_VALIDATION: true,
  289. KIND: "lookup",
  290. }, &api.RuleOption{
  291. IsEventTime: false,
  292. LateTol: 0,
  293. Concurrency: 0,
  294. BufferLength: 0,
  295. SendMetaToSink: false,
  296. SendError: false,
  297. Qos: 0,
  298. CheckpointInterval: 0,
  299. })
  300. errCh := make(chan error)
  301. outputCh := make(chan interface{}, 1)
  302. l.outputs["mock"] = outputCh
  303. l.Exec(ctx, errCh)
  304. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  305. for i, tt := range tests {
  306. select {
  307. case err := <-errCh:
  308. t.Error(err)
  309. return
  310. case l.input <- tt.input:
  311. case <-time.After(1 * time.Second):
  312. t.Error("send message timeout")
  313. return
  314. }
  315. select {
  316. case err := <-errCh:
  317. t.Error(err)
  318. return
  319. case output := <-outputCh:
  320. if !reflect.DeepEqual(tt.output, output) {
  321. t.Errorf("\ncase %d: expect %v but got %v", i, tt.output, output)
  322. }
  323. case <-time.After(1 * time.Second):
  324. t.Error("send message timeout")
  325. return
  326. }
  327. }
  328. }