extension_test.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. // +build !windows
  2. package processors
  3. import (
  4. "bufio"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/emqx/kuiper/common"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/emqx/kuiper/xstream/planner"
  10. "github.com/emqx/kuiper/xstream/test"
  11. "os"
  12. "testing"
  13. "time"
  14. )
  15. //This cannot be run in Windows. And the plugins must be built to so before running this
  16. //For Windows, run it in wsl with go test xsql/processors/extension_test.go xsql/processors/xsql_processor.go
  17. var CACHE_FILE = "cache"
  18. //Test for source, sink, func and agg func extensions
  19. //The .so files must be in the plugins folder
  20. func TestExtensions(t *testing.T) {
  21. log := common.Log
  22. //Reset
  23. streamList := []string{"ext", "ext2"}
  24. handleStream(false, streamList, t)
  25. os.Remove(CACHE_FILE)
  26. os.Create(CACHE_FILE)
  27. var tests = []struct {
  28. name string
  29. rj string
  30. minLength int
  31. maxLength int
  32. }{
  33. {
  34. name: `TestExtensionsRule1`,
  35. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  36. minLength: 5,
  37. }, {
  38. name: `TestExtensionsRule2`,
  39. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext2\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  40. maxLength: 2,
  41. },
  42. }
  43. handleStream(true, streamList, t)
  44. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  45. for i, tt := range tests {
  46. // Rest for each test
  47. cleanStateData()
  48. test.ResetClock(1541152486000)
  49. // Create stream
  50. p := NewRuleProcessor(DbDir)
  51. p.ExecDrop(tt.name)
  52. rs, err := p.ExecCreate(tt.name, tt.rj)
  53. if err != nil {
  54. t.Errorf("failed to create rule: %s.", err)
  55. continue
  56. }
  57. tp, err := planner.Plan(rs, DbDir)
  58. if err != nil {
  59. t.Errorf("fail to init rule: %v", err)
  60. continue
  61. }
  62. go func() {
  63. select {
  64. case err := <-tp.Open():
  65. log.Println(err)
  66. tp.Cancel()
  67. case <-time.After(900 * time.Millisecond):
  68. tp.Cancel()
  69. }
  70. }()
  71. time.Sleep(1000 * time.Millisecond)
  72. log.Printf("exit main program after a second")
  73. results := getResults()
  74. log.Infof("get results %v", results)
  75. os.Remove(CACHE_FILE)
  76. var maps [][]map[string]interface{}
  77. for _, v := range results {
  78. var mapRes []map[string]interface{}
  79. err := json.Unmarshal([]byte(v), &mapRes)
  80. if err != nil {
  81. t.Errorf("Failed to parse the input into map")
  82. continue
  83. }
  84. maps = append(maps, mapRes)
  85. }
  86. if tt.minLength > 0 {
  87. if len(maps) < tt.minLength {
  88. t.Errorf("%d. %q\n\nresult length is smaller than minlength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  89. break
  90. }
  91. }
  92. if tt.maxLength > 0 {
  93. if len(maps) > tt.maxLength {
  94. t.Errorf("%d. %q\n\nresult length is bigger than maxLength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  95. break
  96. }
  97. }
  98. for _, r := range maps {
  99. if len(r) != 1 {
  100. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  101. break
  102. }
  103. r := r[0]
  104. c := int((r["c"]).(float64))
  105. if c != 1 {
  106. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  107. break
  108. }
  109. e := int((r["e"]).(float64))
  110. if e != 50 && e != 51 {
  111. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  112. break
  113. }
  114. p := int(r["p"].(float64))
  115. if p != 2 {
  116. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  117. break
  118. }
  119. }
  120. }
  121. }
  122. func getResults() []string {
  123. f, err := os.Open(CACHE_FILE)
  124. if err != nil {
  125. panic(err)
  126. }
  127. result := make([]string, 0)
  128. scanner := bufio.NewScanner(f)
  129. for scanner.Scan() {
  130. result = append(result, scanner.Text())
  131. }
  132. if err := scanner.Err(); err != nil {
  133. panic(err)
  134. }
  135. f.Close()
  136. return result
  137. }
  138. func TestFuncState(t *testing.T) {
  139. //Reset
  140. streamList := []string{"text"}
  141. handleStream(false, streamList, t)
  142. //Data setup
  143. var tests = []ruleTest{
  144. {
  145. name: `TestFuncStateRule1`,
  146. sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  147. r: [][]map[string]interface{}{
  148. {{
  149. "wc": float64(3),
  150. }},
  151. {{
  152. "wc": float64(6),
  153. }},
  154. {{
  155. "wc": float64(8),
  156. }},
  157. {{
  158. "wc": float64(16),
  159. }},
  160. {{
  161. "wc": float64(20),
  162. }},
  163. {{
  164. "wc": float64(25),
  165. }},
  166. {{
  167. "wc": float64(27),
  168. }},
  169. {{
  170. "wc": float64(30),
  171. }},
  172. },
  173. m: map[string]interface{}{
  174. "op_1_preprocessor_text_0_exceptions_total": int64(0),
  175. "op_1_preprocessor_text_0_process_latency_us": int64(0),
  176. "op_1_preprocessor_text_0_records_in_total": int64(8),
  177. "op_1_preprocessor_text_0_records_out_total": int64(8),
  178. "op_2_project_0_exceptions_total": int64(0),
  179. "op_2_project_0_process_latency_us": int64(0),
  180. "op_2_project_0_records_in_total": int64(8),
  181. "op_2_project_0_records_out_total": int64(8),
  182. "sink_mockSink_0_exceptions_total": int64(0),
  183. "sink_mockSink_0_records_in_total": int64(8),
  184. "sink_mockSink_0_records_out_total": int64(8),
  185. "source_text_0_exceptions_total": int64(0),
  186. "source_text_0_records_in_total": int64(8),
  187. "source_text_0_records_out_total": int64(8),
  188. },
  189. },
  190. }
  191. handleStream(true, streamList, t)
  192. doRuleTest(t, tests, 0, &api.RuleOption{
  193. BufferLength: 100,
  194. SendError: true,
  195. })
  196. }
  197. func TestFuncStateCheckpoint(t *testing.T) {
  198. common.IsTesting = true
  199. streamList := []string{"text"}
  200. handleStream(false, streamList, t)
  201. var tests = []ruleCheckpointTest{
  202. {
  203. ruleTest: ruleTest{
  204. name: `TestFuncStateCheckpointRule1`,
  205. sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  206. r: [][]map[string]interface{}{
  207. {{
  208. "wc": float64(3),
  209. }},
  210. {{
  211. "wc": float64(6),
  212. }},
  213. {{
  214. "wc": float64(8),
  215. }},
  216. {{
  217. "wc": float64(8),
  218. }},
  219. {{
  220. "wc": float64(16),
  221. }},
  222. {{
  223. "wc": float64(20),
  224. }},
  225. {{
  226. "wc": float64(25),
  227. }},
  228. {{
  229. "wc": float64(27),
  230. }},
  231. {{
  232. "wc": float64(30),
  233. }},
  234. },
  235. m: map[string]interface{}{
  236. "op_1_preprocessor_text_0_exceptions_total": int64(0),
  237. "op_1_preprocessor_text_0_process_latency_us": int64(0),
  238. "op_1_preprocessor_text_0_records_in_total": int64(6),
  239. "op_1_preprocessor_text_0_records_out_total": int64(6),
  240. "op_2_project_0_exceptions_total": int64(0),
  241. "op_2_project_0_process_latency_us": int64(0),
  242. "op_2_project_0_records_in_total": int64(6),
  243. "op_2_project_0_records_out_total": int64(6),
  244. "sink_mockSink_0_exceptions_total": int64(0),
  245. "sink_mockSink_0_records_in_total": int64(6),
  246. "sink_mockSink_0_records_out_total": int64(6),
  247. "source_text_0_exceptions_total": int64(0),
  248. "source_text_0_records_in_total": int64(6),
  249. "source_text_0_records_out_total": int64(6),
  250. },
  251. },
  252. pauseSize: 3,
  253. cc: 1,
  254. pauseMetric: map[string]interface{}{
  255. "op_1_preprocessor_text_0_exceptions_total": int64(0),
  256. "op_1_preprocessor_text_0_process_latency_us": int64(0),
  257. "op_1_preprocessor_text_0_records_in_total": int64(3),
  258. "op_1_preprocessor_text_0_records_out_total": int64(3),
  259. "op_2_project_0_exceptions_total": int64(0),
  260. "op_2_project_0_process_latency_us": int64(0),
  261. "op_2_project_0_records_in_total": int64(3),
  262. "op_2_project_0_records_out_total": int64(3),
  263. "sink_mockSink_0_exceptions_total": int64(0),
  264. "sink_mockSink_0_records_in_total": int64(3),
  265. "sink_mockSink_0_records_out_total": int64(3),
  266. "source_text_0_exceptions_total": int64(0),
  267. "source_text_0_records_in_total": int64(3),
  268. "source_text_0_records_out_total": int64(3),
  269. },
  270. },
  271. }
  272. handleStream(true, streamList, t)
  273. doCheckpointRuleTest(t, tests, 0, &api.RuleOption{
  274. BufferLength: 100,
  275. Qos: api.AtLeastOnce,
  276. CheckpointInterval: 2000,
  277. SendError: true,
  278. })
  279. }