lookup_node_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "github.com/benbjohnson/clock"
  18. "github.com/lf-edge/ekuiper/internal/binder"
  19. "github.com/lf-edge/ekuiper/internal/binder/io"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/plugin"
  22. "github.com/lf-edge/ekuiper/internal/topo/context"
  23. "github.com/lf-edge/ekuiper/internal/topo/lookup"
  24. "github.com/lf-edge/ekuiper/internal/xsql"
  25. "github.com/lf-edge/ekuiper/pkg/api"
  26. "github.com/lf-edge/ekuiper/pkg/ast"
  27. "reflect"
  28. "testing"
  29. "time"
  30. )
  31. type mockLookupSrc struct {
  32. data []api.SourceTuple // new mock data
  33. }
  34. func (m *mockLookupSrc) Open(_ api.StreamContext) error {
  35. return nil
  36. }
  37. func (m *mockLookupSrc) Configure(_ string, _ map[string]interface{}) error {
  38. return nil
  39. }
  40. // Lookup accept int value as the first array value
  41. func (m *mockLookupSrc) Lookup(_ api.StreamContext, fields []string, _ []string, values []interface{}) ([]api.SourceTuple, error) {
  42. if len(fields) > 0 { // if fields is not empty, the value will be kept
  43. if m.data != nil {
  44. return m.data, nil
  45. } else {
  46. m.data = []api.SourceTuple{api.NewDefaultSourceTuple(map[string]interface{}{
  47. "newA": 1000,
  48. "newB": 1000,
  49. }, nil)}
  50. }
  51. }
  52. a1, ok := values[0].(int)
  53. if ok {
  54. var result []api.SourceTuple
  55. c := a1 % 2
  56. if c != 0 {
  57. result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
  58. "newA": c,
  59. "newB": c * 2,
  60. }, nil))
  61. }
  62. c = a1 % 3
  63. if c != 0 {
  64. result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
  65. "newA": c,
  66. "newB": c * 2,
  67. }, nil))
  68. }
  69. c = a1 % 5
  70. if c != 0 {
  71. result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
  72. "newA": c,
  73. "newB": c * 2,
  74. }, nil))
  75. }
  76. c = a1 % 7
  77. if c != 0 {
  78. result = append(result, api.NewDefaultSourceTuple(map[string]interface{}{
  79. "newA": c,
  80. "newB": c * 2,
  81. }, nil))
  82. }
  83. return result, nil
  84. } else {
  85. return []api.SourceTuple{
  86. api.NewDefaultSourceTuple(map[string]interface{}{
  87. "newA": 0,
  88. "newB": 0,
  89. }, nil),
  90. }, nil
  91. }
  92. }
  93. func (m *mockLookupSrc) Close(_ api.StreamContext) error {
  94. // do nothing
  95. return nil
  96. }
  97. type mockFac struct{}
  98. func (m *mockFac) Source(_ string) (api.Source, error) {
  99. return nil, nil
  100. }
  101. func (m *mockFac) SourcePluginInfo(_ string) (plugin.EXTENSION_TYPE, string, string) {
  102. return plugin.INTERNAL, "", ""
  103. }
  104. func (m *mockFac) LookupSource(name string) (api.LookupSource, error) {
  105. if name == "mock" {
  106. return &mockLookupSrc{}, nil
  107. }
  108. return nil, nil
  109. }
  110. // init mock lookup source factory
  111. func init() {
  112. io.Initialize([]binder.FactoryEntry{
  113. {Name: "native plugin", Factory: &mockFac{}, Weight: 10},
  114. })
  115. }
  116. func TestLookup(t *testing.T) {
  117. var tests = []struct {
  118. input interface{} // a tuple or a window
  119. output *xsql.JoinTuples
  120. }{
  121. {
  122. input: &xsql.Tuple{
  123. Emitter: "demo",
  124. Message: map[string]interface{}{
  125. "a": 6,
  126. "b": "aaaa",
  127. },
  128. },
  129. output: &xsql.JoinTuples{
  130. Content: []*xsql.JoinTuple{
  131. {
  132. Tuples: []xsql.TupleRow{
  133. &xsql.Tuple{
  134. Emitter: "demo",
  135. Message: map[string]interface{}{
  136. "a": 6,
  137. "b": "aaaa",
  138. },
  139. },
  140. &xsql.Tuple{
  141. Emitter: "mock",
  142. Message: map[string]interface{}{
  143. "newA": 1,
  144. "newB": 2,
  145. },
  146. },
  147. },
  148. }, {
  149. Tuples: []xsql.TupleRow{
  150. &xsql.Tuple{
  151. Emitter: "demo",
  152. Message: map[string]interface{}{
  153. "a": 6,
  154. "b": "aaaa",
  155. },
  156. },
  157. &xsql.Tuple{
  158. Emitter: "mock",
  159. Message: map[string]interface{}{
  160. "newA": 6,
  161. "newB": 12,
  162. },
  163. },
  164. },
  165. },
  166. },
  167. },
  168. },
  169. {
  170. input: &xsql.WindowTuples{
  171. Content: []xsql.TupleRow{
  172. &xsql.Tuple{
  173. Emitter: "demo",
  174. Message: map[string]interface{}{
  175. "a": 9,
  176. "b": "aaaa",
  177. },
  178. },
  179. &xsql.Tuple{
  180. Emitter: "demo",
  181. Message: map[string]interface{}{
  182. "a": 4,
  183. "b": "bbaa",
  184. },
  185. },
  186. },
  187. },
  188. output: &xsql.JoinTuples{
  189. Content: []*xsql.JoinTuple{
  190. {
  191. Tuples: []xsql.TupleRow{
  192. &xsql.Tuple{
  193. Emitter: "demo",
  194. Message: map[string]interface{}{
  195. "a": 9,
  196. "b": "aaaa",
  197. },
  198. },
  199. &xsql.Tuple{
  200. Emitter: "mock",
  201. Message: map[string]interface{}{
  202. "newA": 1,
  203. "newB": 2,
  204. },
  205. },
  206. },
  207. }, {
  208. Tuples: []xsql.TupleRow{
  209. &xsql.Tuple{
  210. Emitter: "demo",
  211. Message: map[string]interface{}{
  212. "a": 9,
  213. "b": "aaaa",
  214. },
  215. },
  216. &xsql.Tuple{
  217. Emitter: "mock",
  218. Message: map[string]interface{}{
  219. "newA": 4,
  220. "newB": 8,
  221. },
  222. },
  223. },
  224. }, {
  225. Tuples: []xsql.TupleRow{
  226. &xsql.Tuple{
  227. Emitter: "demo",
  228. Message: map[string]interface{}{
  229. "a": 9,
  230. "b": "aaaa",
  231. },
  232. },
  233. &xsql.Tuple{
  234. Emitter: "mock",
  235. Message: map[string]interface{}{
  236. "newA": 2,
  237. "newB": 4,
  238. },
  239. },
  240. },
  241. }, {
  242. Tuples: []xsql.TupleRow{
  243. &xsql.Tuple{
  244. Emitter: "demo",
  245. Message: map[string]interface{}{
  246. "a": 4,
  247. "b": "bbaa",
  248. },
  249. },
  250. &xsql.Tuple{
  251. Emitter: "mock",
  252. Message: map[string]interface{}{
  253. "newA": 1,
  254. "newB": 2,
  255. },
  256. },
  257. },
  258. }, {
  259. Tuples: []xsql.TupleRow{
  260. &xsql.Tuple{
  261. Emitter: "demo",
  262. Message: map[string]interface{}{
  263. "a": 4,
  264. "b": "bbaa",
  265. },
  266. },
  267. &xsql.Tuple{
  268. Emitter: "mock",
  269. Message: map[string]interface{}{
  270. "newA": 4,
  271. "newB": 8,
  272. },
  273. },
  274. },
  275. }, {
  276. Tuples: []xsql.TupleRow{
  277. &xsql.Tuple{
  278. Emitter: "demo",
  279. Message: map[string]interface{}{
  280. "a": 4,
  281. "b": "bbaa",
  282. },
  283. },
  284. &xsql.Tuple{
  285. Emitter: "mock",
  286. Message: map[string]interface{}{
  287. "newA": 4,
  288. "newB": 8,
  289. },
  290. },
  291. },
  292. },
  293. },
  294. },
  295. },
  296. }
  297. options := &ast.Options{
  298. DATASOURCE: "mock",
  299. TYPE: "mock",
  300. STRICT_VALIDATION: true,
  301. KIND: "lookup",
  302. }
  303. lookup.CreateInstance("mock", "mock", options)
  304. contextLogger := conf.Log.WithField("rule", "TestLookup")
  305. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  306. l, _ := NewLookupNode("mock", []string{}, []string{"a"}, ast.INNER_JOIN, []ast.Expr{&ast.FieldRef{
  307. StreamName: "",
  308. Name: "a",
  309. }}, options, &api.RuleOption{
  310. IsEventTime: false,
  311. LateTol: 0,
  312. Concurrency: 0,
  313. BufferLength: 0,
  314. SendMetaToSink: false,
  315. SendError: false,
  316. Qos: 0,
  317. CheckpointInterval: 0,
  318. })
  319. errCh := make(chan error)
  320. outputCh := make(chan interface{}, 1)
  321. l.outputs["mock"] = outputCh
  322. l.Exec(ctx, errCh)
  323. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  324. for i, tt := range tests {
  325. select {
  326. case err := <-errCh:
  327. t.Error(err)
  328. return
  329. case l.input <- tt.input:
  330. case <-time.After(1 * time.Second):
  331. t.Error("send message timeout")
  332. return
  333. }
  334. select {
  335. case err := <-errCh:
  336. t.Error(err)
  337. return
  338. case output := <-outputCh:
  339. if !reflect.DeepEqual(tt.output, output) {
  340. t.Errorf("\ncase %d: expect %v but got %v", i, tt.output, output)
  341. }
  342. case <-time.After(1 * time.Second):
  343. t.Error("send message timeout")
  344. return
  345. }
  346. }
  347. }
  348. func TestCachedLookup(t *testing.T) {
  349. options := &ast.Options{
  350. DATASOURCE: "mock",
  351. TYPE: "mock",
  352. STRICT_VALIDATION: true,
  353. KIND: "lookup",
  354. }
  355. lookup.CreateInstance("mock", "mock", options)
  356. contextLogger := conf.Log.WithField("rule", "TestLookup")
  357. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  358. l, _ := NewLookupNode("mock", []string{"fixed"}, []string{"a"}, ast.INNER_JOIN, []ast.Expr{&ast.FieldRef{
  359. StreamName: "",
  360. Name: "a",
  361. }}, options, &api.RuleOption{
  362. IsEventTime: false,
  363. LateTol: 0,
  364. Concurrency: 0,
  365. BufferLength: 0,
  366. SendMetaToSink: false,
  367. SendError: false,
  368. Qos: 0,
  369. CheckpointInterval: 0,
  370. })
  371. l.conf = &LookupConf{
  372. Cache: true,
  373. CacheTTL: 20,
  374. CacheMissingKey: false,
  375. }
  376. errCh := make(chan error)
  377. outputCh := make(chan interface{}, 1)
  378. l.outputs["mock"] = outputCh
  379. l.Exec(ctx, errCh)
  380. input := &xsql.Tuple{
  381. Emitter: "demo",
  382. Message: map[string]interface{}{
  383. "a": 6,
  384. "b": "aaaa",
  385. },
  386. }
  387. outputBefore := &xsql.JoinTuples{
  388. Content: []*xsql.JoinTuple{
  389. {
  390. Tuples: []xsql.TupleRow{
  391. &xsql.Tuple{
  392. Emitter: "demo",
  393. Message: map[string]interface{}{
  394. "a": 6,
  395. "b": "aaaa",
  396. },
  397. },
  398. &xsql.Tuple{
  399. Emitter: "mock",
  400. Message: map[string]interface{}{
  401. "newA": 1,
  402. "newB": 2,
  403. },
  404. Timestamp: 11000,
  405. },
  406. },
  407. }, {
  408. Tuples: []xsql.TupleRow{
  409. &xsql.Tuple{
  410. Emitter: "demo",
  411. Message: map[string]interface{}{
  412. "a": 6,
  413. "b": "aaaa",
  414. },
  415. },
  416. &xsql.Tuple{
  417. Emitter: "mock",
  418. Message: map[string]interface{}{
  419. "newA": 6,
  420. "newB": 12,
  421. },
  422. Timestamp: 11000,
  423. },
  424. },
  425. },
  426. },
  427. }
  428. outputAfter := &xsql.JoinTuples{
  429. Content: []*xsql.JoinTuple{
  430. {
  431. Tuples: []xsql.TupleRow{
  432. &xsql.Tuple{
  433. Emitter: "demo",
  434. Message: map[string]interface{}{
  435. "a": 6,
  436. "b": "aaaa",
  437. },
  438. },
  439. &xsql.Tuple{
  440. Emitter: "mock",
  441. Message: map[string]interface{}{
  442. "newA": 1000,
  443. "newB": 1000,
  444. },
  445. Timestamp: 22000,
  446. },
  447. },
  448. },
  449. },
  450. }
  451. // First run and the set mock result
  452. clock := conf.Clock.(*clock.Mock)
  453. select {
  454. case err := <-errCh:
  455. t.Error(err)
  456. return
  457. case l.input <- input:
  458. case <-time.After(1 * time.Second):
  459. t.Error("send message timeout")
  460. return
  461. }
  462. select {
  463. case err := <-errCh:
  464. t.Error(err)
  465. return
  466. case <-outputCh:
  467. //if !reflect.DeepEqual(output, outputBefore) {
  468. // t.Errorf("\nfirst case: expect %v but got %v", output, outputBefore)
  469. //}
  470. case <-time.After(1 * time.Second):
  471. t.Error("send message timeout")
  472. return
  473. }
  474. // Get cache
  475. clock.Add(11 * time.Second)
  476. select {
  477. case err := <-errCh:
  478. t.Error(err)
  479. return
  480. case l.input <- input:
  481. case <-time.After(1 * time.Second):
  482. t.Error("send message timeout")
  483. return
  484. }
  485. select {
  486. case err := <-errCh:
  487. t.Error(err)
  488. return
  489. case output := <-outputCh:
  490. if !reflect.DeepEqual(output, outputBefore) {
  491. t.Errorf("\ncached case: expect %v but got %v", output, outputBefore)
  492. }
  493. case <-time.After(1 * time.Second):
  494. t.Error("send message timeout")
  495. return
  496. }
  497. // Cache expired
  498. clock.Add(11 * time.Second)
  499. select {
  500. case err := <-errCh:
  501. t.Error(err)
  502. return
  503. case l.input <- input:
  504. case <-time.After(1 * time.Second):
  505. t.Error("send message timeout")
  506. return
  507. }
  508. select {
  509. case err := <-errCh:
  510. t.Error(err)
  511. return
  512. case output := <-outputCh:
  513. if !reflect.DeepEqual(output, outputAfter) {
  514. t.Errorf("\nexpired case: expect %v but got %v", output, outputAfter)
  515. }
  516. case <-time.After(1 * time.Second):
  517. t.Error("send message timeout")
  518. return
  519. }
  520. }