extension_test.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. // +build !windows
  2. package processors
  3. import (
  4. "bufio"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/emqx/kuiper/common"
  8. "github.com/emqx/kuiper/xstream/api"
  9. "github.com/emqx/kuiper/xstream/test"
  10. "os"
  11. "testing"
  12. "time"
  13. )
  14. // This cannot be run on Windows, and the plugins must be built into .so files before running it.
  15. // On Windows, run it in WSL with: go test xsql/processors/extension_test.go xsql/processors/xsql_processor.go
  16. var CACHE_FILE = "cache" // Path used as the "file" action target in the TestExtensions rules and read back by getResults.
  17. //Test for source, sink, func and agg func extensions
  18. //The .so files must be in the plugins folder
  19. func TestExtensions(t *testing.T) {
  20. log := common.Log
  21. //Reset
  22. streamList := []string{"ext", "ext2"}
  23. handleStream(false, streamList, t)
  24. os.Remove(CACHE_FILE)
  25. os.Create(CACHE_FILE)
  26. var tests = []struct {
  27. name string
  28. rj string
  29. minLength int
  30. maxLength int
  31. }{
  32. {
  33. name: `TestExtensionsRule1`,
  34. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  35. minLength: 5,
  36. }, {
  37. name: `TestExtensionsRule2`,
  38. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext2\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  39. maxLength: 2,
  40. },
  41. }
  42. handleStream(true, streamList, t)
  43. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  44. for i, tt := range tests {
  45. // Rest for each test
  46. cleanStateData()
  47. test.ResetClock(1541152485800)
  48. // Create stream
  49. p := NewRuleProcessor(DbDir)
  50. p.ExecDrop(tt.name)
  51. rs, err := p.ExecCreate(tt.name, tt.rj)
  52. if err != nil {
  53. t.Errorf("failed to create rule: %s.", err)
  54. continue
  55. }
  56. tp, err := p.ExecInitRule(rs)
  57. if err != nil {
  58. t.Errorf("fail to init rule: %v", err)
  59. continue
  60. }
  61. go func() {
  62. select {
  63. case err := <-tp.Open():
  64. log.Println(err)
  65. tp.Cancel()
  66. case <-time.After(900 * time.Millisecond):
  67. tp.Cancel()
  68. }
  69. }()
  70. time.Sleep(1000 * time.Millisecond)
  71. log.Printf("exit main program after a second")
  72. results := getResults()
  73. log.Infof("get results %v", results)
  74. os.Remove(CACHE_FILE)
  75. var maps [][]map[string]interface{}
  76. for _, v := range results {
  77. var mapRes []map[string]interface{}
  78. err := json.Unmarshal([]byte(v), &mapRes)
  79. if err != nil {
  80. t.Errorf("Failed to parse the input into map")
  81. continue
  82. }
  83. maps = append(maps, mapRes)
  84. }
  85. if tt.minLength > 0 {
  86. if len(maps) < tt.minLength {
  87. t.Errorf("%d. %q\n\nresult length is smaller than minlength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  88. break
  89. }
  90. }
  91. if tt.maxLength > 0 {
  92. if len(maps) > tt.maxLength {
  93. t.Errorf("%d. %q\n\nresult length is bigger than maxLength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  94. break
  95. }
  96. }
  97. for _, r := range maps {
  98. if len(r) != 1 {
  99. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  100. break
  101. }
  102. r := r[0]
  103. c := int((r["c"]).(float64))
  104. if c != 1 {
  105. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  106. break
  107. }
  108. e := int((r["e"]).(float64))
  109. if e != 50 && e != 51 {
  110. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  111. break
  112. }
  113. p := int(r["p"].(float64))
  114. if p != 2 {
  115. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  116. break
  117. }
  118. }
  119. }
  120. }
  121. func getResults() []string {
  122. f, err := os.Open(CACHE_FILE)
  123. if err != nil {
  124. panic(err)
  125. }
  126. result := make([]string, 0)
  127. scanner := bufio.NewScanner(f)
  128. for scanner.Scan() {
  129. result = append(result, scanner.Text())
  130. }
  131. if err := scanner.Err(); err != nil {
  132. panic(err)
  133. }
  134. f.Close()
  135. return result
  136. }
  137. func TestFuncState(t *testing.T) {
  138. //Reset
  139. streamList := []string{"text"}
  140. handleStream(false, streamList, t)
  141. //Data setup
  142. var tests = []ruleTest{
  143. {
  144. name: `TestFuncStateRule1`,
  145. sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  146. r: [][]map[string]interface{}{
  147. {{
  148. "wc": float64(3),
  149. }},
  150. {{
  151. "wc": float64(6),
  152. }},
  153. {{
  154. "wc": float64(8),
  155. }},
  156. {{
  157. "wc": float64(16),
  158. }},
  159. {{
  160. "wc": float64(20),
  161. }},
  162. {{
  163. "wc": float64(25),
  164. }},
  165. {{
  166. "wc": float64(27),
  167. }},
  168. {{
  169. "wc": float64(30),
  170. }},
  171. },
  172. m: map[string]interface{}{
  173. "op_preprocessor_text_0_exceptions_total": int64(0),
  174. "op_preprocessor_text_0_process_latency_ms": int64(0),
  175. "op_preprocessor_text_0_records_in_total": int64(8),
  176. "op_preprocessor_text_0_records_out_total": int64(8),
  177. "op_project_0_exceptions_total": int64(0),
  178. "op_project_0_process_latency_ms": int64(0),
  179. "op_project_0_records_in_total": int64(8),
  180. "op_project_0_records_out_total": int64(8),
  181. "sink_mockSink_0_exceptions_total": int64(0),
  182. "sink_mockSink_0_records_in_total": int64(8),
  183. "sink_mockSink_0_records_out_total": int64(8),
  184. "source_text_0_exceptions_total": int64(0),
  185. "source_text_0_records_in_total": int64(8),
  186. "source_text_0_records_out_total": int64(8),
  187. },
  188. },
  189. }
  190. handleStream(true, streamList, t)
  191. doRuleTest(t, tests, 0, &api.RuleOption{
  192. BufferLength: 100,
  193. })
  194. }
  195. func TestFuncStateCheckpoint(t *testing.T) {
  196. common.IsTesting = true
  197. streamList := []string{"text"}
  198. handleStream(false, streamList, t)
  199. var tests = []ruleCheckpointTest{
  200. {
  201. ruleTest: ruleTest{
  202. name: `TestFuncStateCheckpointRule1`,
  203. sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  204. r: [][]map[string]interface{}{
  205. {{
  206. "wc": float64(3),
  207. }},
  208. {{
  209. "wc": float64(6),
  210. }},
  211. {{
  212. "wc": float64(8),
  213. }},
  214. {{
  215. "wc": float64(8),
  216. }},
  217. {{
  218. "wc": float64(16),
  219. }},
  220. {{
  221. "wc": float64(20),
  222. }},
  223. {{
  224. "wc": float64(25),
  225. }},
  226. {{
  227. "wc": float64(27),
  228. }},
  229. {{
  230. "wc": float64(30),
  231. }},
  232. },
  233. m: map[string]interface{}{
  234. "op_preprocessor_text_0_exceptions_total": int64(0),
  235. "op_preprocessor_text_0_process_latency_ms": int64(0),
  236. "op_preprocessor_text_0_records_in_total": int64(6),
  237. "op_preprocessor_text_0_records_out_total": int64(6),
  238. "op_project_0_exceptions_total": int64(0),
  239. "op_project_0_process_latency_ms": int64(0),
  240. "op_project_0_records_in_total": int64(6),
  241. "op_project_0_records_out_total": int64(6),
  242. "sink_mockSink_0_exceptions_total": int64(0),
  243. "sink_mockSink_0_records_in_total": int64(6),
  244. "sink_mockSink_0_records_out_total": int64(6),
  245. "source_text_0_exceptions_total": int64(0),
  246. "source_text_0_records_in_total": int64(6),
  247. "source_text_0_records_out_total": int64(6),
  248. },
  249. },
  250. pauseSize: 3,
  251. cc: 1,
  252. pauseMetric: map[string]interface{}{
  253. "op_preprocessor_text_0_exceptions_total": int64(0),
  254. "op_preprocessor_text_0_process_latency_ms": int64(0),
  255. "op_preprocessor_text_0_records_in_total": int64(3),
  256. "op_preprocessor_text_0_records_out_total": int64(3),
  257. "op_project_0_exceptions_total": int64(0),
  258. "op_project_0_process_latency_ms": int64(0),
  259. "op_project_0_records_in_total": int64(3),
  260. "op_project_0_records_out_total": int64(3),
  261. "sink_mockSink_0_exceptions_total": int64(0),
  262. "sink_mockSink_0_records_in_total": int64(3),
  263. "sink_mockSink_0_records_out_total": int64(3),
  264. "source_text_0_exceptions_total": int64(0),
  265. "source_text_0_records_in_total": int64(3),
  266. "source_text_0_records_out_total": int64(3),
  267. },
  268. },
  269. }
  270. handleStream(true, streamList, t)
  271. doCheckpointRuleTest(t, tests, 0, &api.RuleOption{
  272. BufferLength: 100,
  273. Qos: api.AtLeastOnce,
  274. CheckpointInterval: 2000,
  275. })
  276. }