extension_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package processors
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "os"
  8. "path"
  9. "testing"
  10. "time"
  11. )
  12. //This cannot be run in Windows. And the plugins must be built to so before running this
  13. //For Windows, run it in wsl with go test xsql/processors/extension_test.go xsql/processors/xsql_processor.go
  14. func setup() *RuleProcessor {
  15. log := common.Log
  16. os.Remove(CACHE_FILE)
  17. dbDir, err := common.GetAndCreateDataLoc("test")
  18. if err != nil {
  19. log.Panic(err)
  20. }
  21. log.Infof("db location is %s", dbDir)
  22. demo := `DROP STREAM ext`
  23. NewStreamProcessor(demo, path.Join(dbDir, "stream")).Exec()
  24. demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
  25. _, err = NewStreamProcessor(demo, path.Join(dbDir, "stream")).Exec()
  26. if err != nil {
  27. panic(err)
  28. }
  29. rp := NewRuleProcessor(dbDir)
  30. return rp
  31. }
  32. var CACHE_FILE = "cache"
  33. //Test for source, sink, func and agg func extensions
  34. //The .so files must be in the plugins folder
  35. func TestExtensions(t *testing.T) {
  36. log := common.Log
  37. var tests = []struct {
  38. name string
  39. rj string
  40. r [][]map[string]interface{}
  41. }{
  42. {
  43. name: `$$test1`,
  44. rj: "{\"sql\": \"SELECT echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  45. },
  46. }
  47. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  48. rp := setup()
  49. done := make(chan struct{})
  50. defer close(done)
  51. for i, tt := range tests {
  52. rp.ExecDrop("$$test1")
  53. rs, err := rp.ExecCreate(tt.name, tt.rj)
  54. if err != nil {
  55. t.Errorf("failed to create rule: %s.", err)
  56. continue
  57. }
  58. tp, err := rp.ExecInitRule(rs)
  59. if err != nil {
  60. t.Errorf("fail to init rule: %v", err)
  61. continue
  62. }
  63. go func() {
  64. select {
  65. case err := <-tp.Open():
  66. log.Println(err)
  67. tp.Cancel()
  68. }
  69. }()
  70. time.Sleep(5000 * time.Millisecond)
  71. log.Printf("exit main program after 5 seconds")
  72. results := getResults()
  73. if len(results) == 0 {
  74. t.Errorf("no result found")
  75. continue
  76. }
  77. log.Debugf("get results %v", results)
  78. var maps [][]map[string]interface{}
  79. for _, v := range results {
  80. var mapRes []map[string]interface{}
  81. err := json.Unmarshal([]byte(v), &mapRes)
  82. if err != nil {
  83. t.Errorf("Failed to parse the input into map")
  84. continue
  85. }
  86. maps = append(maps, mapRes)
  87. }
  88. for _, r := range maps {
  89. if len(r) != 1 {
  90. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  91. break
  92. }
  93. r := r[0]
  94. e := int((r["e"]).(float64))
  95. if e != 50 && e != 51 {
  96. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  97. break
  98. }
  99. p := int(r["p"].(float64))
  100. if p != 2 {
  101. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  102. break
  103. }
  104. }
  105. tp.Cancel()
  106. }
  107. }
  108. func getResults() []string {
  109. f, err := os.Open(CACHE_FILE)
  110. if err != nil {
  111. panic(err)
  112. }
  113. defer f.Close()
  114. result := make([]string, 0)
  115. scanner := bufio.NewScanner(f)
  116. for scanner.Scan() {
  117. result = append(result, scanner.Text())
  118. }
  119. if err := scanner.Err(); err != nil {
  120. panic(err)
  121. }
  122. return result
  123. }