extension_test.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package processors
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "os"
  8. "path"
  9. "testing"
  10. "time"
  11. )
  12. //This test cannot be run on Windows, and the plugins must be built into .so files before running it.
  13. //On Windows, run it in WSL with go test xsql/processors/extension_test.go xsql/processors/xsql_processor.go
  14. func setup() *RuleProcessor {
  15. log := common.Log
  16. os.Remove(CACHE_FILE)
  17. dbDir, err := common.GetAndCreateDataLoc("test")
  18. if err != nil {
  19. log.Panic(err)
  20. }
  21. log.Infof("db location is %s", dbDir)
  22. p := NewStreamProcessor(path.Join(dbDir, "stream"))
  23. demo := `DROP STREAM ext`
  24. p.ExecStmt(demo)
  25. demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
  26. _, err = p.ExecStmt(demo)
  27. if err != nil {
  28. panic(err)
  29. }
  30. rp := NewRuleProcessor(dbDir)
  31. return rp
  32. }
  // CACHE_FILE is the path of the file used as the rule's file sink in
  // TestExtensions; setup removes it before a run and getResults reads it back.
  33. var CACHE_FILE = "cache"
  34. //Test for source, sink, func and agg func extensions
  35. //The .so files must be in the plugins folder
  36. func TestExtensions(t *testing.T) {
  37. log := common.Log
  38. var tests = []struct {
  39. name string
  40. rj string
  41. r [][]map[string]interface{}
  42. }{
  43. {
  44. name: `$$test1`,
  45. rj: "{\"sql\": \"SELECT echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  46. },
  47. }
  48. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  49. rp := setup()
  50. done := make(chan struct{})
  51. defer close(done)
  52. for i, tt := range tests {
  53. rp.ExecDrop("$$test1")
  54. rs, err := rp.ExecCreate(tt.name, tt.rj)
  55. if err != nil {
  56. t.Errorf("failed to create rule: %s.", err)
  57. continue
  58. }
  59. tp, err := rp.ExecInitRule(rs)
  60. if err != nil {
  61. t.Errorf("fail to init rule: %v", err)
  62. continue
  63. }
  64. go func() {
  65. select {
  66. case err := <-tp.Open():
  67. log.Println(err)
  68. tp.Cancel()
  69. }
  70. }()
  71. time.Sleep(5000 * time.Millisecond)
  72. log.Printf("exit main program after 5 seconds")
  73. results := getResults()
  74. if len(results) == 0 {
  75. t.Errorf("no result found")
  76. continue
  77. }
  78. log.Debugf("get results %v", results)
  79. var maps [][]map[string]interface{}
  80. for _, v := range results {
  81. var mapRes []map[string]interface{}
  82. err := json.Unmarshal([]byte(v), &mapRes)
  83. if err != nil {
  84. t.Errorf("Failed to parse the input into map")
  85. continue
  86. }
  87. maps = append(maps, mapRes)
  88. }
  89. for _, r := range maps {
  90. if len(r) != 1 {
  91. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  92. break
  93. }
  94. r := r[0]
  95. e := int((r["e"]).(float64))
  96. if e != 50 && e != 51 {
  97. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  98. break
  99. }
  100. p := int(r["p"].(float64))
  101. if p != 2 {
  102. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  103. break
  104. }
  105. }
  106. tp.Cancel()
  107. }
  108. }
  109. func getResults() []string {
  110. f, err := os.Open(CACHE_FILE)
  111. if err != nil {
  112. panic(err)
  113. }
  114. defer f.Close()
  115. result := make([]string, 0)
  116. scanner := bufio.NewScanner(f)
  117. for scanner.Scan() {
  118. result = append(result, scanner.Text())
  119. }
  120. if err := scanner.Err(); err != nil {
  121. panic(err)
  122. }
  123. return result
  124. }