extension_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. // +build !windows
  2. package processors
  3. import (
  4. "bufio"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/emqx/kuiper/common"
  8. "os"
  9. "path"
  10. "testing"
  11. "time"
  12. )
  13. //This cannot be run in Windows. And the plugins must be built to so before running this
  14. //For Windows, run it in wsl with go test xsql/processors/extension_test.go xsql/processors/xsql_processor.go
  15. func setup() *RuleProcessor {
  16. log := common.Log
  17. os.Remove(CACHE_FILE)
  18. dbDir, err := common.GetAndCreateDataLoc("test")
  19. if err != nil {
  20. log.Panic(err)
  21. }
  22. log.Infof("db location is %s", dbDir)
  23. p := NewStreamProcessor(path.Join(dbDir, "stream"))
  24. demo := `DROP STREAM ext`
  25. p.ExecStmt(demo)
  26. demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
  27. _, err = p.ExecStmt(demo)
  28. if err != nil {
  29. panic(err)
  30. }
  31. rp := NewRuleProcessor(dbDir)
  32. return rp
  33. }
  34. var CACHE_FILE = "cache"
  35. //Test for source, sink, func and agg func extensions
  36. //The .so files must be in the plugins folder
  37. func TestExtensions(t *testing.T) {
  38. log := common.Log
  39. var tests = []struct {
  40. name string
  41. rj string
  42. r [][]map[string]interface{}
  43. }{
  44. {
  45. name: `$$test1`,
  46. rj: "{\"sql\": \"SELECT echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  47. },
  48. }
  49. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  50. rp := setup()
  51. done := make(chan struct{})
  52. defer close(done)
  53. for i, tt := range tests {
  54. rp.ExecDrop("$$test1")
  55. rs, err := rp.ExecCreate(tt.name, tt.rj)
  56. if err != nil {
  57. t.Errorf("failed to create rule: %s.", err)
  58. continue
  59. }
  60. tp, err := rp.ExecInitRule(rs)
  61. if err != nil {
  62. t.Errorf("fail to init rule: %v", err)
  63. continue
  64. }
  65. go func() {
  66. select {
  67. case err := <-tp.Open():
  68. log.Println(err)
  69. tp.Cancel()
  70. }
  71. }()
  72. time.Sleep(5000 * time.Millisecond)
  73. log.Printf("exit main program after 5 seconds")
  74. results := getResults()
  75. if len(results) == 0 {
  76. t.Errorf("no result found")
  77. continue
  78. }
  79. log.Debugf("get results %v", results)
  80. var maps [][]map[string]interface{}
  81. for _, v := range results {
  82. var mapRes []map[string]interface{}
  83. err := json.Unmarshal([]byte(v), &mapRes)
  84. if err != nil {
  85. t.Errorf("Failed to parse the input into map")
  86. continue
  87. }
  88. maps = append(maps, mapRes)
  89. }
  90. for _, r := range maps {
  91. if len(r) != 1 {
  92. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  93. break
  94. }
  95. r := r[0]
  96. e := int((r["e"]).(float64))
  97. if e != 50 && e != 51 {
  98. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  99. break
  100. }
  101. p := int(r["p"].(float64))
  102. if p != 2 {
  103. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  104. break
  105. }
  106. }
  107. tp.Cancel()
  108. }
  109. }
  110. func getResults() []string {
  111. f, err := os.Open(CACHE_FILE)
  112. if err != nil {
  113. panic(err)
  114. }
  115. defer f.Close()
  116. result := make([]string, 0)
  117. scanner := bufio.NewScanner(f)
  118. for scanner.Scan() {
  119. result = append(result, scanner.Text())
  120. }
  121. if err := scanner.Err(); err != nil {
  122. panic(err)
  123. }
  124. return result
  125. }