plugin_rule_test.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. // +build !windows
  2. package topotest
  3. import (
  4. "bufio"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/emqx/kuiper/internal/conf"
  8. "github.com/emqx/kuiper/internal/plugin"
  9. "github.com/emqx/kuiper/internal/topo/planner"
  10. "github.com/emqx/kuiper/internal/topo/topotest/mockclock"
  11. "github.com/emqx/kuiper/internal/xsql"
  12. "github.com/emqx/kuiper/pkg/api"
  13. "os"
  14. "testing"
  15. "time"
  16. )
  17. var manager *plugin.Manager
  18. func init() {
  19. var err error
  20. manager, err = plugin.NewPluginManager()
  21. if err != nil {
  22. panic(err)
  23. }
  24. xsql.InitFuncRegisters(manager)
  25. }
  26. //This cannot be run in Windows. And the plugins must be built to so before running this
  27. //For Windows, run it in wsl with go test xsql/processors/extension_test.go xsql/processors/xsql_processor.go
  28. var CACHE_FILE = "cache"
  29. //Test for source, sink, func and agg func extensions
  30. //The .so files must be in the plugins folder
  31. func TestExtensions(t *testing.T) {
  32. log := conf.Log
  33. //Reset
  34. streamList := []string{"ext", "ext2"}
  35. HandleStream(false, streamList, t)
  36. os.Remove(CACHE_FILE)
  37. os.Create(CACHE_FILE)
  38. var tests = []struct {
  39. name string
  40. rj string
  41. minLength int
  42. maxLength int
  43. }{
  44. {
  45. name: `TestExtensionsRule1`,
  46. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  47. minLength: 5,
  48. }, {
  49. name: `TestExtensionsRule2`,
  50. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext2\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  51. maxLength: 2,
  52. },
  53. }
  54. HandleStream(true, streamList, t)
  55. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  56. for i, tt := range tests {
  57. mockclock.ResetClock(1541152486000)
  58. // Create rule
  59. rs, err := CreateRule(tt.name, tt.rj)
  60. if err != nil {
  61. t.Errorf("failed to create rule: %s.", err)
  62. continue
  63. }
  64. tp, err := planner.Plan(rs, DbDir)
  65. if err != nil {
  66. t.Errorf("fail to init rule: %v", err)
  67. continue
  68. }
  69. go func() {
  70. select {
  71. case err := <-tp.Open():
  72. log.Println(err)
  73. tp.Cancel()
  74. case <-time.After(900 * time.Millisecond):
  75. tp.Cancel()
  76. }
  77. }()
  78. time.Sleep(1000 * time.Millisecond)
  79. log.Printf("exit main program after a second")
  80. results := getResults()
  81. log.Infof("get results %v", results)
  82. os.Remove(CACHE_FILE)
  83. var maps [][]map[string]interface{}
  84. for _, v := range results {
  85. var mapRes []map[string]interface{}
  86. err := json.Unmarshal([]byte(v), &mapRes)
  87. if err != nil {
  88. t.Errorf("Failed to parse the input into map")
  89. continue
  90. }
  91. maps = append(maps, mapRes)
  92. }
  93. if tt.minLength > 0 {
  94. if len(maps) < tt.minLength {
  95. t.Errorf("%d. %q\n\nresult length is smaller than minlength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  96. break
  97. }
  98. }
  99. if tt.maxLength > 0 {
  100. if len(maps) > tt.maxLength {
  101. t.Errorf("%d. %q\n\nresult length is bigger than maxLength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  102. break
  103. }
  104. }
  105. for _, r := range maps {
  106. if len(r) != 1 {
  107. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  108. break
  109. }
  110. r := r[0]
  111. c := int((r["c"]).(float64))
  112. if c != 1 {
  113. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  114. break
  115. }
  116. e := int((r["e"]).(float64))
  117. if e != 50 && e != 51 {
  118. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  119. break
  120. }
  121. p := int(r["p"].(float64))
  122. if p != 2 {
  123. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  124. break
  125. }
  126. }
  127. }
  128. }
  129. func getResults() []string {
  130. f, err := os.Open(CACHE_FILE)
  131. if err != nil {
  132. panic(err)
  133. }
  134. result := make([]string, 0)
  135. scanner := bufio.NewScanner(f)
  136. for scanner.Scan() {
  137. result = append(result, scanner.Text())
  138. }
  139. if err := scanner.Err(); err != nil {
  140. panic(err)
  141. }
  142. f.Close()
  143. return result
  144. }
  145. func TestFuncState(t *testing.T) {
  146. //Reset
  147. streamList := []string{"text"}
  148. HandleStream(false, streamList, t)
  149. //Data setup
  150. var tests = []RuleTest{
  151. {
  152. Name: `TestFuncStateRule1`,
  153. Sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  154. R: [][]map[string]interface{}{
  155. {{
  156. "wc": float64(3),
  157. }},
  158. {{
  159. "wc": float64(6),
  160. }},
  161. {{
  162. "wc": float64(8),
  163. }},
  164. {{
  165. "wc": float64(16),
  166. }},
  167. {{
  168. "wc": float64(20),
  169. }},
  170. {{
  171. "wc": float64(25),
  172. }},
  173. {{
  174. "wc": float64(27),
  175. }},
  176. {{
  177. "wc": float64(30),
  178. }},
  179. },
  180. M: map[string]interface{}{
  181. "op_1_preprocessor_text_0_exceptions_total": int64(0),
  182. "op_1_preprocessor_text_0_process_latency_us": int64(0),
  183. "op_1_preprocessor_text_0_records_in_total": int64(8),
  184. "op_1_preprocessor_text_0_records_out_total": int64(8),
  185. "op_2_project_0_exceptions_total": int64(0),
  186. "op_2_project_0_process_latency_us": int64(0),
  187. "op_2_project_0_records_in_total": int64(8),
  188. "op_2_project_0_records_out_total": int64(8),
  189. "sink_mockSink_0_exceptions_total": int64(0),
  190. "sink_mockSink_0_records_in_total": int64(8),
  191. "sink_mockSink_0_records_out_total": int64(8),
  192. "source_text_0_exceptions_total": int64(0),
  193. "source_text_0_records_in_total": int64(8),
  194. "source_text_0_records_out_total": int64(8),
  195. },
  196. },
  197. }
  198. HandleStream(true, streamList, t)
  199. DoRuleTest(t, tests, 0, &api.RuleOption{
  200. BufferLength: 100,
  201. SendError: true,
  202. }, 0)
  203. }
  204. func TestFuncStateCheckpoint(t *testing.T) {
  205. streamList := []string{"text"}
  206. HandleStream(false, streamList, t)
  207. var tests = []RuleCheckpointTest{
  208. {
  209. RuleTest: RuleTest{
  210. Name: `TestFuncStateCheckpointRule1`,
  211. Sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  212. R: [][]map[string]interface{}{
  213. {{
  214. "wc": float64(3),
  215. }},
  216. {{
  217. "wc": float64(6),
  218. }},
  219. {{
  220. "wc": float64(8),
  221. }},
  222. {{
  223. "wc": float64(8),
  224. }},
  225. {{
  226. "wc": float64(16),
  227. }},
  228. {{
  229. "wc": float64(20),
  230. }},
  231. {{
  232. "wc": float64(25),
  233. }},
  234. {{
  235. "wc": float64(27),
  236. }},
  237. {{
  238. "wc": float64(30),
  239. }},
  240. },
  241. M: map[string]interface{}{
  242. "op_1_preprocessor_text_0_exceptions_total": int64(0),
  243. "op_1_preprocessor_text_0_process_latency_us": int64(0),
  244. "op_1_preprocessor_text_0_records_in_total": int64(6),
  245. "op_1_preprocessor_text_0_records_out_total": int64(6),
  246. "op_2_project_0_exceptions_total": int64(0),
  247. "op_2_project_0_process_latency_us": int64(0),
  248. "op_2_project_0_records_in_total": int64(6),
  249. "op_2_project_0_records_out_total": int64(6),
  250. "sink_mockSink_0_exceptions_total": int64(0),
  251. "sink_mockSink_0_records_in_total": int64(6),
  252. "sink_mockSink_0_records_out_total": int64(6),
  253. "source_text_0_exceptions_total": int64(0),
  254. "source_text_0_records_in_total": int64(6),
  255. "source_text_0_records_out_total": int64(6),
  256. },
  257. },
  258. PauseSize: 3,
  259. Cc: 1,
  260. PauseMetric: map[string]interface{}{
  261. "op_1_preprocessor_text_0_exceptions_total": int64(0),
  262. "op_1_preprocessor_text_0_process_latency_us": int64(0),
  263. "op_1_preprocessor_text_0_records_in_total": int64(3),
  264. "op_1_preprocessor_text_0_records_out_total": int64(3),
  265. "op_2_project_0_exceptions_total": int64(0),
  266. "op_2_project_0_process_latency_us": int64(0),
  267. "op_2_project_0_records_in_total": int64(3),
  268. "op_2_project_0_records_out_total": int64(3),
  269. "sink_mockSink_0_exceptions_total": int64(0),
  270. "sink_mockSink_0_records_in_total": int64(3),
  271. "sink_mockSink_0_records_out_total": int64(3),
  272. "source_text_0_exceptions_total": int64(0),
  273. "source_text_0_records_in_total": int64(3),
  274. "source_text_0_records_out_total": int64(3),
  275. },
  276. },
  277. }
  278. HandleStream(true, streamList, t)
  279. DoCheckpointRuleTest(t, tests, 0, &api.RuleOption{
  280. BufferLength: 100,
  281. Qos: api.AtLeastOnce,
  282. CheckpointInterval: 2000,
  283. SendError: true,
  284. })
  285. }