extension_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560
  1. // +build !windows
  2. package processors
  3. import (
  4. "bufio"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/emqx/kuiper/common"
  8. "github.com/emqx/kuiper/xsql"
  9. "github.com/emqx/kuiper/xstream/api"
  10. "github.com/emqx/kuiper/xstream/nodes"
  11. "github.com/emqx/kuiper/xstream/test"
  12. "os"
  13. "path"
  14. "reflect"
  15. "strings"
  16. "testing"
  17. "time"
  18. )
  19. //This cannot be run in Windows. And the plugins must be built to so before running this
  20. //For Windows, run it in wsl with go test xsql/processors/extension_test.go xsql/processors/xsql_processor.go
  21. func setup() *RuleProcessor {
  22. log := common.Log
  23. os.Remove(CACHE_FILE)
  24. dbDir, err := common.GetDataLoc()
  25. if err != nil {
  26. log.Panic(err)
  27. }
  28. log.Infof("db location is %s", dbDir)
  29. p := NewStreamProcessor(path.Join(dbDir, "stream"))
  30. demo := `DROP STREAM ext`
  31. p.ExecStmt(demo)
  32. demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
  33. _, err = p.ExecStmt(demo)
  34. if err != nil {
  35. panic(err)
  36. }
  37. demo = `DROP STREAM ext2`
  38. p.ExecStmt(demo)
  39. demo = "CREATE STREAM ext2 (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"dedup\")"
  40. _, err = p.ExecStmt(demo)
  41. if err != nil {
  42. panic(err)
  43. }
  44. rp := NewRuleProcessor(dbDir)
  45. return rp
  46. }
  47. var CACHE_FILE = "cache"
  48. //Test for source, sink, func and agg func extensions
  49. //The .so files must be in the plugins folder
  50. func TestExtensions(t *testing.T) {
  51. log := common.Log
  52. var tests = []struct {
  53. name string
  54. rj string
  55. minLength int
  56. maxLength int
  57. }{
  58. {
  59. name: `$$test1`,
  60. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  61. minLength: 5,
  62. }, {
  63. name: `$$test2`,
  64. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext2\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  65. maxLength: 2,
  66. },
  67. }
  68. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  69. rp := setup()
  70. done := make(chan struct{})
  71. defer close(done)
  72. for i, tt := range tests {
  73. rp.ExecDrop(tt.name)
  74. rs, err := rp.ExecCreate(tt.name, tt.rj)
  75. if err != nil {
  76. t.Errorf("failed to create rule: %s.", err)
  77. continue
  78. }
  79. os.Create(CACHE_FILE)
  80. tp, err := rp.ExecInitRule(rs)
  81. if err != nil {
  82. t.Errorf("fail to init rule: %v", err)
  83. continue
  84. }
  85. go func() {
  86. select {
  87. case err := <-tp.Open():
  88. log.Println(err)
  89. tp.Cancel()
  90. case <-time.After(900 * time.Millisecond):
  91. tp.Cancel()
  92. }
  93. }()
  94. time.Sleep(1000 * time.Millisecond)
  95. log.Printf("exit main program after a second")
  96. results := getResults()
  97. log.Infof("get results %v", results)
  98. os.Remove(CACHE_FILE)
  99. var maps [][]map[string]interface{}
  100. for _, v := range results {
  101. var mapRes []map[string]interface{}
  102. err := json.Unmarshal([]byte(v), &mapRes)
  103. if err != nil {
  104. t.Errorf("Failed to parse the input into map")
  105. continue
  106. }
  107. maps = append(maps, mapRes)
  108. }
  109. if tt.minLength > 0 {
  110. if len(maps) < tt.minLength {
  111. t.Errorf("%d. %q\n\nresult length is smaller than minlength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  112. break
  113. }
  114. }
  115. if tt.maxLength > 0 {
  116. if len(maps) > tt.maxLength {
  117. t.Errorf("%d. %q\n\nresult length is bigger than maxLength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  118. break
  119. }
  120. }
  121. for _, r := range maps {
  122. if len(r) != 1 {
  123. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  124. break
  125. }
  126. r := r[0]
  127. c := int((r["c"]).(float64))
  128. if c != 1 {
  129. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  130. break
  131. }
  132. e := int((r["e"]).(float64))
  133. if e != 50 && e != 51 {
  134. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  135. break
  136. }
  137. p := int(r["p"].(float64))
  138. if p != 2 {
  139. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  140. break
  141. }
  142. }
  143. }
  144. }
  145. func getResults() []string {
  146. f, err := os.Open(CACHE_FILE)
  147. if err != nil {
  148. panic(err)
  149. }
  150. result := make([]string, 0)
  151. scanner := bufio.NewScanner(f)
  152. for scanner.Scan() {
  153. result = append(result, scanner.Text())
  154. }
  155. if err := scanner.Err(); err != nil {
  156. panic(err)
  157. }
  158. f.Close()
  159. return result
  160. }
  161. func getExtMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
  162. var data []*xsql.Tuple
  163. switch name {
  164. case "text":
  165. data = []*xsql.Tuple{
  166. {
  167. Emitter: name,
  168. Message: map[string]interface{}{
  169. "slogan": "Impossible is nothing",
  170. "brand": "Adidas",
  171. },
  172. Timestamp: 1541152486500,
  173. },
  174. {
  175. Emitter: name,
  176. Message: map[string]interface{}{
  177. "slogan": "Stronger than dirt",
  178. "brand": "Ajax",
  179. },
  180. Timestamp: 1541152487400,
  181. },
  182. {
  183. Emitter: name,
  184. Message: map[string]interface{}{
  185. "slogan": "Belong anywhere",
  186. "brand": "Airbnb",
  187. },
  188. Timestamp: 1541152488300,
  189. },
  190. {
  191. Emitter: name,
  192. Message: map[string]interface{}{
  193. "slogan": "I can't believe I ate the whole thing",
  194. "brand": "Alka Seltzer",
  195. },
  196. Timestamp: 1541152489200,
  197. },
  198. {
  199. Emitter: name,
  200. Message: map[string]interface{}{
  201. "slogan": "You're in good hands",
  202. "brand": "Allstate",
  203. },
  204. Timestamp: 1541152490100,
  205. },
  206. {
  207. Emitter: name,
  208. Message: map[string]interface{}{
  209. "slogan": "Don't leave home without it",
  210. "brand": "American Express",
  211. },
  212. Timestamp: 1541152491200,
  213. },
  214. {
  215. Emitter: name,
  216. Message: map[string]interface{}{
  217. "slogan": "Think different",
  218. "brand": "Apple",
  219. },
  220. Timestamp: 1541152492300,
  221. },
  222. {
  223. Emitter: name,
  224. Message: map[string]interface{}{
  225. "slogan": "We try harder",
  226. "brand": "Avis",
  227. },
  228. Timestamp: 1541152493400,
  229. },
  230. }
  231. }
  232. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
  233. "DATASOURCE": name,
  234. })
  235. }
  236. func setup2() *RuleProcessor {
  237. log := common.Log
  238. dbDir, err := common.GetDataLoc()
  239. if err != nil {
  240. log.Panic(err)
  241. }
  242. log.Infof("db location is %s", dbDir)
  243. p := NewStreamProcessor(path.Join(dbDir, "stream"))
  244. demo := `DROP STREAM text`
  245. p.ExecStmt(demo)
  246. demo = "CREATE STREAM text (slogan string, brand string) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
  247. _, err = p.ExecStmt(demo)
  248. if err != nil {
  249. panic(err)
  250. }
  251. rp := NewRuleProcessor(dbDir)
  252. return rp
  253. }
  254. func TestFuncState(t *testing.T) {
  255. var tests = []struct {
  256. name string
  257. sql string
  258. r [][]map[string]interface{}
  259. s string
  260. m map[string]interface{}
  261. }{
  262. {
  263. name: `rule1`,
  264. sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  265. r: [][]map[string]interface{}{
  266. {{
  267. "wc": float64(3),
  268. }},
  269. {{
  270. "wc": float64(6),
  271. }},
  272. {{
  273. "wc": float64(8),
  274. }},
  275. {{
  276. "wc": float64(16),
  277. }},
  278. {{
  279. "wc": float64(20),
  280. }},
  281. {{
  282. "wc": float64(25),
  283. }},
  284. {{
  285. "wc": float64(27),
  286. }},
  287. {{
  288. "wc": float64(30),
  289. }},
  290. },
  291. m: map[string]interface{}{
  292. "op_preprocessor_text_0_exceptions_total": int64(0),
  293. "op_preprocessor_text_0_process_latency_ms": int64(0),
  294. "op_preprocessor_text_0_records_in_total": int64(8),
  295. "op_preprocessor_text_0_records_out_total": int64(8),
  296. "op_project_0_exceptions_total": int64(0),
  297. "op_project_0_process_latency_ms": int64(0),
  298. "op_project_0_records_in_total": int64(8),
  299. "op_project_0_records_out_total": int64(8),
  300. "sink_mockSink_0_exceptions_total": int64(0),
  301. "sink_mockSink_0_records_in_total": int64(8),
  302. "sink_mockSink_0_records_out_total": int64(8),
  303. "source_text_0_exceptions_total": int64(0),
  304. "source_text_0_records_in_total": int64(8),
  305. "source_text_0_records_out_total": int64(8),
  306. },
  307. s: "sink_mockSink_0_records_out_total",
  308. },
  309. }
  310. p := setup2()
  311. for i, tt := range tests {
  312. p.ExecDrop(tt.name)
  313. parser := xsql.NewParser(strings.NewReader(tt.sql))
  314. var (
  315. sources []*nodes.SourceNode
  316. syncs []chan int
  317. )
  318. if stmt, err := xsql.Language.Parse(parser); err != nil {
  319. t.Errorf("parse sql %s error: %s", tt.sql, err)
  320. } else {
  321. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  322. t.Errorf("sql %s is not a select statement", tt.sql)
  323. } else {
  324. streams := xsql.GetStreams(selectStmt)
  325. for _, stream := range streams {
  326. next := make(chan int)
  327. syncs = append(syncs, next)
  328. source := getExtMockSource(stream, next, 8)
  329. sources = append(sources, source)
  330. }
  331. }
  332. }
  333. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: &api.RuleOption{
  334. BufferLength: 100,
  335. }}, sources)
  336. if err != nil {
  337. t.Error(err)
  338. }
  339. mockSink := test.NewMockSink()
  340. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  341. tp.AddSink(inputs, sink)
  342. errCh := tp.Open()
  343. func() {
  344. for i := 0; i < 8; i++ {
  345. syncs[i%len(syncs)] <- i
  346. select {
  347. case err = <-errCh:
  348. t.Log(err)
  349. tp.Cancel()
  350. return
  351. default:
  352. }
  353. }
  354. for retry := 100; retry > 0; retry-- {
  355. if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
  356. break
  357. }
  358. time.Sleep(time.Duration(retry) * time.Millisecond)
  359. }
  360. }()
  361. results := mockSink.GetResults()
  362. var maps [][]map[string]interface{}
  363. for _, v := range results {
  364. var mapRes []map[string]interface{}
  365. err := json.Unmarshal(v, &mapRes)
  366. if err != nil {
  367. t.Errorf("Failed to parse the input into map")
  368. continue
  369. }
  370. maps = append(maps, mapRes)
  371. }
  372. if !reflect.DeepEqual(tt.r, maps) {
  373. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  374. continue
  375. }
  376. if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
  377. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  378. }
  379. tp.Cancel()
  380. }
  381. }
  382. func TestFuncStateCheckpoint(t *testing.T) {
  383. var tests = []struct {
  384. name string
  385. sql string
  386. r [][]map[string]interface{}
  387. size int
  388. cc int
  389. breakSize int
  390. }{
  391. {
  392. name: `rule1`,
  393. sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  394. size: 8,
  395. breakSize: 3,
  396. cc: 1,
  397. r: [][]map[string]interface{}{
  398. {{
  399. "wc": float64(3),
  400. }},
  401. {{
  402. "wc": float64(6),
  403. }},
  404. {{
  405. "wc": float64(6),
  406. }},
  407. {{
  408. "wc": float64(8),
  409. }},
  410. {{
  411. "wc": float64(16),
  412. }},
  413. {{
  414. "wc": float64(20),
  415. }},
  416. {{
  417. "wc": float64(25),
  418. }},
  419. {{
  420. "wc": float64(27),
  421. }},
  422. {{
  423. "wc": float64(30),
  424. }},
  425. },
  426. },
  427. }
  428. p := setup2()
  429. for i, tt := range tests {
  430. p.ExecDrop(tt.name)
  431. cleanStateData()
  432. test.ResetClock(1541152485000)
  433. mockClock := test.GetMockClock()
  434. parser := xsql.NewParser(strings.NewReader(tt.sql))
  435. var (
  436. sources []*nodes.SourceNode
  437. syncs []chan int
  438. )
  439. if stmt, err := xsql.Language.Parse(parser); err != nil {
  440. t.Errorf("parse sql %s error: %s", tt.sql, err)
  441. } else {
  442. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  443. t.Errorf("sql %s is not a select statement", tt.sql)
  444. } else {
  445. streams := xsql.GetStreams(selectStmt)
  446. for _, stream := range streams {
  447. next := make(chan int)
  448. syncs = append(syncs, next)
  449. source := getExtMockSource(stream, next, 8)
  450. sources = append(sources, source)
  451. }
  452. }
  453. }
  454. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: &api.RuleOption{
  455. BufferLength: 100,
  456. Qos: api.AtLeastOnce,
  457. CheckpointInterval: 2000,
  458. }}, sources)
  459. if err != nil {
  460. t.Error(err)
  461. }
  462. mockSink := test.NewMockSink()
  463. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  464. tp.AddSink(inputs, sink)
  465. errCh := tp.Open()
  466. func() {
  467. for i := 0; i < tt.breakSize; i++ {
  468. mockClock.Add(1000)
  469. log.Debugf("before sent %d at %d", i, common.TimeToUnixMilli(mockClock.Now()))
  470. syncs[i%len(syncs)] <- i
  471. common.Log.Debugf("send out %d", i)
  472. select {
  473. case err = <-errCh:
  474. t.Log(err)
  475. tp.Cancel()
  476. return
  477. default:
  478. }
  479. }
  480. log.Debugf("first send done at %d", common.TimeToUnixMilli(mockClock.Now()))
  481. actual := tp.GetCoordinator().GetCompleteCount()
  482. if !reflect.DeepEqual(tt.cc, actual) {
  483. t.Errorf("%d. checkpoint count\n\nresult mismatch:\n\nexp=%#v\n\ngot=%d\n\n", i, tt.cc, actual)
  484. }
  485. time.Sleep(1000)
  486. tp.Cancel()
  487. common.Log.Debugf("cancel and resume data %d", i)
  488. errCh := tp.Open()
  489. close(syncs[i%len(syncs)])
  490. for i := 0; i < tt.size*len(syncs); i++ {
  491. common.Log.Debugf("resending data %d", i)
  492. retry := 100
  493. for ; retry > 0; retry-- {
  494. common.Log.Debugf("retry %d", retry)
  495. if getMetric(tp, "source_text_0_records_in_total") == i {
  496. break
  497. }
  498. time.Sleep(time.Duration(100 * retry))
  499. }
  500. select {
  501. case err = <-errCh:
  502. t.Log(err)
  503. tp.Cancel()
  504. return
  505. default:
  506. }
  507. }
  508. }()
  509. common.Log.Debugf("done sending data")
  510. results := mockSink.GetResults()
  511. var maps [][]map[string]interface{}
  512. for _, v := range results {
  513. var mapRes []map[string]interface{}
  514. err := json.Unmarshal(v, &mapRes)
  515. if err != nil {
  516. t.Errorf("Failed to parse the input into map")
  517. continue
  518. }
  519. maps = append(maps, mapRes)
  520. }
  521. if len(tt.r) != len(maps) {
  522. tt.r = tt.r[:len(maps)]
  523. }
  524. if !reflect.DeepEqual(tt.r, maps) {
  525. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  526. continue
  527. }
  528. tp.Cancel()
  529. }
  530. }