lookup_node_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "github.com/benbjohnson/clock"
  18. "github.com/lf-edge/ekuiper/internal/binder"
  19. "github.com/lf-edge/ekuiper/internal/binder/io"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/topo/context"
  22. "github.com/lf-edge/ekuiper/internal/topo/lookup"
  23. "github.com/lf-edge/ekuiper/internal/xsql"
  24. "github.com/lf-edge/ekuiper/pkg/api"
  25. "github.com/lf-edge/ekuiper/pkg/ast"
  26. "reflect"
  27. "testing"
  28. "time"
  29. )
  30. type mockLookupSrc struct {
  31. data []api.SourceTuple // new mock data
  32. }
  33. func (m *mockLookupSrc) Open(_ api.StreamContext) error {
  34. return nil
  35. }
  36. func (m *mockLookupSrc) Configure(_ string, _ map[string]interface{}) error {
  37. return nil
  38. }
  39. // Lookup accept int value as the first array value
  40. func (m *mockLookupSrc) Lookup(_ api.StreamContext, fields []string, _ []string, values []interface{}) ([]api.SourceTuple, error) {
  41. if len(fields) > 0 { // if fields is not empty, the value will be kept
  42. if m.data != nil {
  43. return m.data, nil
  44. } else {
  45. m.data = []api.SourceTuple{api.NewDefaultSourceTuple(map[string]interface{}{
  46. "newA": 1000,
  47. "newB": 1000,
  48. }, nil)}
  49. }
  50. }
  51. a1, ok := values[0].(int)
  52. if ok {
  53. var result []api.SourceTuple
  54. c := a1 % 2
  55. if c != 0 {
  56. result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
  57. "newA": c,
  58. "newB": c * 2,
  59. }, nil))
  60. }
  61. c = a1 % 3
  62. if c != 0 {
  63. result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
  64. "newA": c,
  65. "newB": c * 2,
  66. }, nil))
  67. }
  68. c = a1 % 5
  69. if c != 0 {
  70. result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
  71. "newA": c,
  72. "newB": c * 2,
  73. }, nil))
  74. }
  75. c = a1 % 7
  76. if c != 0 {
  77. result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
  78. "newA": c,
  79. "newB": c * 2,
  80. }, nil))
  81. }
  82. return result, nil
  83. } else {
  84. return []api.SourceTuple{
  85. api.NewDefaultSourceTuple(map[string]interface{}{
  86. "newA": 0,
  87. "newB": 0,
  88. }, nil),
  89. }, nil
  90. }
  91. }
  92. func (m *mockLookupSrc) Close(_ api.StreamContext) error {
  93. // do nothing
  94. return nil
  95. }
  96. type mockFac struct{}
  97. func (m *mockFac) Source(_ string) (api.Source, error) {
  98. return nil, nil
  99. }
  100. func (m *mockFac) LookupSource(name string) (api.LookupSource, error) {
  101. if name == "mock" {
  102. return &mockLookupSrc{}, nil
  103. }
  104. return nil, nil
  105. }
  106. // init mock lookup source factory
  107. func init() {
  108. io.Initialize([]binder.FactoryEntry{
  109. {Name: "native plugin", Factory: &mockFac{}, Weight: 10},
  110. })
  111. }
  112. func TestLookup(t *testing.T) {
  113. var tests = []struct {
  114. input interface{} // a tuple or a window
  115. output *xsql.JoinTuples
  116. }{
  117. {
  118. input: &xsql.Tuple{
  119. Emitter: "demo",
  120. Message: map[string]interface{}{
  121. "a": 6,
  122. "b": "aaaa",
  123. },
  124. },
  125. output: &xsql.JoinTuples{
  126. Content: []*xsql.JoinTuple{
  127. {
  128. Tuples: []xsql.TupleRow{
  129. &xsql.Tuple{
  130. Emitter: "demo",
  131. Message: map[string]interface{}{
  132. "a": 6,
  133. "b": "aaaa",
  134. },
  135. },
  136. &xsql.Tuple{
  137. Emitter: "mock",
  138. Message: map[string]interface{}{
  139. "newA": 1,
  140. "newB": 2,
  141. },
  142. },
  143. },
  144. }, {
  145. Tuples: []xsql.TupleRow{
  146. &xsql.Tuple{
  147. Emitter: "demo",
  148. Message: map[string]interface{}{
  149. "a": 6,
  150. "b": "aaaa",
  151. },
  152. },
  153. &xsql.Tuple{
  154. Emitter: "mock",
  155. Message: map[string]interface{}{
  156. "newA": 6,
  157. "newB": 12,
  158. },
  159. },
  160. },
  161. },
  162. },
  163. },
  164. },
  165. {
  166. input: &xsql.WindowTuples{
  167. Content: []xsql.TupleRow{
  168. &xsql.Tuple{
  169. Emitter: "demo",
  170. Message: map[string]interface{}{
  171. "a": 9,
  172. "b": "aaaa",
  173. },
  174. },
  175. &xsql.Tuple{
  176. Emitter: "demo",
  177. Message: map[string]interface{}{
  178. "a": 4,
  179. "b": "bbaa",
  180. },
  181. },
  182. },
  183. },
  184. output: &xsql.JoinTuples{
  185. Content: []*xsql.JoinTuple{
  186. {
  187. Tuples: []xsql.TupleRow{
  188. &xsql.Tuple{
  189. Emitter: "demo",
  190. Message: map[string]interface{}{
  191. "a": 9,
  192. "b": "aaaa",
  193. },
  194. },
  195. &xsql.Tuple{
  196. Emitter: "mock",
  197. Message: map[string]interface{}{
  198. "newA": 1,
  199. "newB": 2,
  200. },
  201. },
  202. },
  203. }, {
  204. Tuples: []xsql.TupleRow{
  205. &xsql.Tuple{
  206. Emitter: "demo",
  207. Message: map[string]interface{}{
  208. "a": 9,
  209. "b": "aaaa",
  210. },
  211. },
  212. &xsql.Tuple{
  213. Emitter: "mock",
  214. Message: map[string]interface{}{
  215. "newA": 4,
  216. "newB": 8,
  217. },
  218. },
  219. },
  220. }, {
  221. Tuples: []xsql.TupleRow{
  222. &xsql.Tuple{
  223. Emitter: "demo",
  224. Message: map[string]interface{}{
  225. "a": 9,
  226. "b": "aaaa",
  227. },
  228. },
  229. &xsql.Tuple{
  230. Emitter: "mock",
  231. Message: map[string]interface{}{
  232. "newA": 2,
  233. "newB": 4,
  234. },
  235. },
  236. },
  237. }, {
  238. Tuples: []xsql.TupleRow{
  239. &xsql.Tuple{
  240. Emitter: "demo",
  241. Message: map[string]interface{}{
  242. "a": 4,
  243. "b": "bbaa",
  244. },
  245. },
  246. &xsql.Tuple{
  247. Emitter: "mock",
  248. Message: map[string]interface{}{
  249. "newA": 1,
  250. "newB": 2,
  251. },
  252. },
  253. },
  254. }, {
  255. Tuples: []xsql.TupleRow{
  256. &xsql.Tuple{
  257. Emitter: "demo",
  258. Message: map[string]interface{}{
  259. "a": 4,
  260. "b": "bbaa",
  261. },
  262. },
  263. &xsql.Tuple{
  264. Emitter: "mock",
  265. Message: map[string]interface{}{
  266. "newA": 4,
  267. "newB": 8,
  268. },
  269. },
  270. },
  271. }, {
  272. Tuples: []xsql.TupleRow{
  273. &xsql.Tuple{
  274. Emitter: "demo",
  275. Message: map[string]interface{}{
  276. "a": 4,
  277. "b": "bbaa",
  278. },
  279. },
  280. &xsql.Tuple{
  281. Emitter: "mock",
  282. Message: map[string]interface{}{
  283. "newA": 4,
  284. "newB": 8,
  285. },
  286. },
  287. },
  288. },
  289. },
  290. },
  291. },
  292. }
  293. options := &ast.Options{
  294. DATASOURCE: "mock",
  295. TYPE: "mock",
  296. STRICT_VALIDATION: true,
  297. KIND: "lookup",
  298. }
  299. lookup.CreateInstance("mock", "mock", options)
  300. contextLogger := conf.Log.WithField("rule", "TestLookup")
  301. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  302. l, _ := NewLookupNode("mock", []string{}, []string{"a"}, ast.INNER_JOIN, []ast.Expr{&ast.FieldRef{
  303. StreamName: "",
  304. Name: "a",
  305. }}, options, &api.RuleOption{
  306. IsEventTime: false,
  307. LateTol: 0,
  308. Concurrency: 0,
  309. BufferLength: 0,
  310. SendMetaToSink: false,
  311. SendError: false,
  312. Qos: 0,
  313. CheckpointInterval: 0,
  314. })
  315. errCh := make(chan error)
  316. outputCh := make(chan interface{}, 1)
  317. l.outputs["mock"] = outputCh
  318. l.Exec(ctx, errCh)
  319. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  320. for i, tt := range tests {
  321. select {
  322. case err := <-errCh:
  323. t.Error(err)
  324. return
  325. case l.input <- tt.input:
  326. case <-time.After(1 * time.Second):
  327. t.Error("send message timeout")
  328. return
  329. }
  330. select {
  331. case err := <-errCh:
  332. t.Error(err)
  333. return
  334. case output := <-outputCh:
  335. if !reflect.DeepEqual(tt.output, output) {
  336. t.Errorf("\ncase %d: expect %v but got %v", i, tt.output, output)
  337. }
  338. case <-time.After(1 * time.Second):
  339. t.Error("send message timeout")
  340. return
  341. }
  342. }
  343. }
  344. func TestCachedLookup(t *testing.T) {
  345. options := &ast.Options{
  346. DATASOURCE: "mock",
  347. TYPE: "mock",
  348. STRICT_VALIDATION: true,
  349. KIND: "lookup",
  350. }
  351. lookup.CreateInstance("mock", "mock", options)
  352. contextLogger := conf.Log.WithField("rule", "TestLookup")
  353. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  354. l, _ := NewLookupNode("mock", []string{"fixed"}, []string{"a"}, ast.INNER_JOIN, []ast.Expr{&ast.FieldRef{
  355. StreamName: "",
  356. Name: "a",
  357. }}, options, &api.RuleOption{
  358. IsEventTime: false,
  359. LateTol: 0,
  360. Concurrency: 0,
  361. BufferLength: 0,
  362. SendMetaToSink: false,
  363. SendError: false,
  364. Qos: 0,
  365. CheckpointInterval: 0,
  366. })
  367. l.conf = &LookupConf{
  368. Cache: true,
  369. CacheTTL: 20,
  370. CacheMissingKey: false,
  371. }
  372. errCh := make(chan error)
  373. outputCh := make(chan interface{}, 1)
  374. l.outputs["mock"] = outputCh
  375. l.Exec(ctx, errCh)
  376. input := &xsql.Tuple{
  377. Emitter: "demo",
  378. Message: map[string]interface{}{
  379. "a": 6,
  380. "b": "aaaa",
  381. },
  382. }
  383. outputBefore := &xsql.JoinTuples{
  384. Content: []*xsql.JoinTuple{
  385. {
  386. Tuples: []xsql.TupleRow{
  387. &xsql.Tuple{
  388. Emitter: "demo",
  389. Message: map[string]interface{}{
  390. "a": 6,
  391. "b": "aaaa",
  392. },
  393. },
  394. &xsql.Tuple{
  395. Emitter: "mock",
  396. Message: map[string]interface{}{
  397. "newA": 1,
  398. "newB": 2,
  399. },
  400. Timestamp: 11000,
  401. },
  402. },
  403. }, {
  404. Tuples: []xsql.TupleRow{
  405. &xsql.Tuple{
  406. Emitter: "demo",
  407. Message: map[string]interface{}{
  408. "a": 6,
  409. "b": "aaaa",
  410. },
  411. },
  412. &xsql.Tuple{
  413. Emitter: "mock",
  414. Message: map[string]interface{}{
  415. "newA": 6,
  416. "newB": 12,
  417. },
  418. Timestamp: 11000,
  419. },
  420. },
  421. },
  422. },
  423. }
  424. outputAfter := &xsql.JoinTuples{
  425. Content: []*xsql.JoinTuple{
  426. {
  427. Tuples: []xsql.TupleRow{
  428. &xsql.Tuple{
  429. Emitter: "demo",
  430. Message: map[string]interface{}{
  431. "a": 6,
  432. "b": "aaaa",
  433. },
  434. },
  435. &xsql.Tuple{
  436. Emitter: "mock",
  437. Message: map[string]interface{}{
  438. "newA": 1000,
  439. "newB": 1000,
  440. },
  441. Timestamp: 22000,
  442. },
  443. },
  444. },
  445. },
  446. }
  447. // First run and the set mock result
  448. clock := conf.Clock.(*clock.Mock)
  449. select {
  450. case err := <-errCh:
  451. t.Error(err)
  452. return
  453. case l.input <- input:
  454. case <-time.After(1 * time.Second):
  455. t.Error("send message timeout")
  456. return
  457. }
  458. select {
  459. case err := <-errCh:
  460. t.Error(err)
  461. return
  462. case <-outputCh:
  463. //if !reflect.DeepEqual(output, outputBefore) {
  464. // t.Errorf("\nfirst case: expect %v but got %v", output, outputBefore)
  465. //}
  466. case <-time.After(1 * time.Second):
  467. t.Error("send message timeout")
  468. return
  469. }
  470. // Get cache
  471. clock.Add(11 * time.Second)
  472. select {
  473. case err := <-errCh:
  474. t.Error(err)
  475. return
  476. case l.input <- input:
  477. case <-time.After(1 * time.Second):
  478. t.Error("send message timeout")
  479. return
  480. }
  481. select {
  482. case err := <-errCh:
  483. t.Error(err)
  484. return
  485. case output := <-outputCh:
  486. if !reflect.DeepEqual(output, outputBefore) {
  487. t.Errorf("\ncached case: expect %v but got %v", output, outputBefore)
  488. }
  489. case <-time.After(1 * time.Second):
  490. t.Error("send message timeout")
  491. return
  492. }
  493. // Cache expired
  494. clock.Add(11 * time.Second)
  495. select {
  496. case err := <-errCh:
  497. t.Error(err)
  498. return
  499. case l.input <- input:
  500. case <-time.After(1 * time.Second):
  501. t.Error("send message timeout")
  502. return
  503. }
  504. select {
  505. case err := <-errCh:
  506. t.Error(err)
  507. return
  508. case output := <-outputCh:
  509. if !reflect.DeepEqual(output, outputAfter) {
  510. t.Errorf("\nexpired case: expect %v but got %v", output, outputAfter)
  511. }
  512. case <-time.After(1 * time.Second):
  513. t.Error("send message timeout")
  514. return
  515. }
  516. }