plugin_rule_test.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // +build !windows
  15. package topotest
  16. import (
  17. "bufio"
  18. "encoding/json"
  19. "fmt"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/plugin"
  22. "github.com/lf-edge/ekuiper/internal/topo/planner"
  23. "github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
  24. "github.com/lf-edge/ekuiper/internal/xsql"
  25. "github.com/lf-edge/ekuiper/pkg/api"
  26. "os"
  27. "testing"
  28. "time"
  29. )
  30. var manager *plugin.Manager
  31. func init() {
  32. var err error
  33. manager, err = plugin.NewPluginManager()
  34. if err != nil {
  35. panic(err)
  36. }
  37. xsql.InitFuncRegisters(manager)
  38. }
  // The tests in this file cannot be run on Windows, and the plugins must be
  // built into .so files before running them.
  // On Windows, run them in WSL with:
  //   go test xsql/processors/extension_test.go xsql/processors/xsql_processor.go
  // NOTE(review): the paths in that command look older than this package's
  // current location - confirm before relying on them.

  // CACHE_FILE is the path used as the file action target of the TestExtensions
  // rules; getResults reads the same path back.
  var CACHE_FILE = "cache"
  42. //Test for source, sink, func and agg func extensions
  43. //The .so files must be in the plugins folder
  44. func TestExtensions(t *testing.T) {
  45. log := conf.Log
  46. //Reset
  47. streamList := []string{"ext", "ext2"}
  48. HandleStream(false, streamList, t)
  49. os.Remove(CACHE_FILE)
  50. os.Create(CACHE_FILE)
  51. var tests = []struct {
  52. name string
  53. rj string
  54. minLength int
  55. maxLength int
  56. }{
  57. {
  58. name: `TestExtensionsRule1`,
  59. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  60. minLength: 5,
  61. }, {
  62. name: `TestExtensionsRule2`,
  63. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext2\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  64. maxLength: 2,
  65. },
  66. }
  67. HandleStream(true, streamList, t)
  68. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  69. for i, tt := range tests {
  70. mockclock.ResetClock(1541152486000)
  71. // Create rule
  72. rs, err := CreateRule(tt.name, tt.rj)
  73. if err != nil {
  74. t.Errorf("failed to create rule: %s.", err)
  75. continue
  76. }
  77. tp, err := planner.Plan(rs)
  78. if err != nil {
  79. t.Errorf("fail to init rule: %v", err)
  80. continue
  81. }
  82. go func() {
  83. select {
  84. case err := <-tp.Open():
  85. log.Println(err)
  86. tp.Cancel()
  87. case <-time.After(900 * time.Millisecond):
  88. tp.Cancel()
  89. }
  90. }()
  91. time.Sleep(1000 * time.Millisecond)
  92. log.Printf("exit main program after a second")
  93. results := getResults()
  94. log.Infof("get results %v", results)
  95. os.Remove(CACHE_FILE)
  96. var maps [][]map[string]interface{}
  97. for _, v := range results {
  98. var mapRes []map[string]interface{}
  99. err := json.Unmarshal([]byte(v), &mapRes)
  100. if err != nil {
  101. t.Errorf("Failed to parse the input into map")
  102. continue
  103. }
  104. maps = append(maps, mapRes)
  105. }
  106. if tt.minLength > 0 {
  107. if len(maps) < tt.minLength {
  108. t.Errorf("%d. %q\n\nresult length is smaller than minlength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  109. break
  110. }
  111. }
  112. if tt.maxLength > 0 {
  113. if len(maps) > tt.maxLength {
  114. t.Errorf("%d. %q\n\nresult length is bigger than maxLength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  115. break
  116. }
  117. }
  118. for _, r := range maps {
  119. if len(r) != 1 {
  120. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  121. break
  122. }
  123. r := r[0]
  124. c := int((r["c"]).(float64))
  125. if c != 1 {
  126. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  127. break
  128. }
  129. e := int((r["e"]).(float64))
  130. if e != 50 && e != 51 {
  131. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  132. break
  133. }
  134. p := int(r["p"].(float64))
  135. if p != 2 {
  136. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  137. break
  138. }
  139. }
  140. }
  141. }
  142. func getResults() []string {
  143. f, err := os.Open(CACHE_FILE)
  144. if err != nil {
  145. panic(err)
  146. }
  147. result := make([]string, 0)
  148. scanner := bufio.NewScanner(f)
  149. for scanner.Scan() {
  150. result = append(result, scanner.Text())
  151. }
  152. if err := scanner.Err(); err != nil {
  153. panic(err)
  154. }
  155. f.Close()
  156. return result
  157. }
  158. func TestFuncState(t *testing.T) {
  159. //Reset
  160. streamList := []string{"text"}
  161. HandleStream(false, streamList, t)
  162. //Data setup
  163. var tests = []RuleTest{
  164. {
  165. Name: `TestFuncStateRule1`,
  166. Sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  167. R: [][]map[string]interface{}{
  168. {{
  169. "wc": float64(3),
  170. }},
  171. {{
  172. "wc": float64(6),
  173. }},
  174. {{
  175. "wc": float64(8),
  176. }},
  177. {{
  178. "wc": float64(16),
  179. }},
  180. {{
  181. "wc": float64(20),
  182. }},
  183. {{
  184. "wc": float64(25),
  185. }},
  186. {{
  187. "wc": float64(27),
  188. }},
  189. {{
  190. "wc": float64(30),
  191. }},
  192. },
  193. M: map[string]interface{}{
  194. "op_1_preprocessor_text_0_exceptions_total": int64(0),
  195. "op_1_preprocessor_text_0_process_latency_us": int64(0),
  196. "op_1_preprocessor_text_0_records_in_total": int64(8),
  197. "op_1_preprocessor_text_0_records_out_total": int64(8),
  198. "op_2_project_0_exceptions_total": int64(0),
  199. "op_2_project_0_process_latency_us": int64(0),
  200. "op_2_project_0_records_in_total": int64(8),
  201. "op_2_project_0_records_out_total": int64(8),
  202. "sink_mockSink_0_exceptions_total": int64(0),
  203. "sink_mockSink_0_records_in_total": int64(8),
  204. "sink_mockSink_0_records_out_total": int64(8),
  205. "source_text_0_exceptions_total": int64(0),
  206. "source_text_0_records_in_total": int64(8),
  207. "source_text_0_records_out_total": int64(8),
  208. },
  209. },
  210. }
  211. HandleStream(true, streamList, t)
  212. DoRuleTest(t, tests, 0, &api.RuleOption{
  213. BufferLength: 100,
  214. SendError: true,
  215. }, 0)
  216. }
  217. func TestFuncStateCheckpoint(t *testing.T) {
  218. streamList := []string{"text"}
  219. HandleStream(false, streamList, t)
  220. var tests = []RuleCheckpointTest{
  221. {
  222. RuleTest: RuleTest{
  223. Name: `TestFuncStateCheckpointRule1`,
  224. Sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  225. R: [][]map[string]interface{}{
  226. {{
  227. "wc": float64(3),
  228. }},
  229. {{
  230. "wc": float64(6),
  231. }},
  232. {{
  233. "wc": float64(8),
  234. }},
  235. {{
  236. "wc": float64(8),
  237. }},
  238. {{
  239. "wc": float64(16),
  240. }},
  241. {{
  242. "wc": float64(20),
  243. }},
  244. {{
  245. "wc": float64(25),
  246. }},
  247. {{
  248. "wc": float64(27),
  249. }},
  250. {{
  251. "wc": float64(30),
  252. }},
  253. },
  254. M: map[string]interface{}{
  255. "op_1_preprocessor_text_0_exceptions_total": int64(0),
  256. "op_1_preprocessor_text_0_process_latency_us": int64(0),
  257. "op_1_preprocessor_text_0_records_in_total": int64(6),
  258. "op_1_preprocessor_text_0_records_out_total": int64(6),
  259. "op_2_project_0_exceptions_total": int64(0),
  260. "op_2_project_0_process_latency_us": int64(0),
  261. "op_2_project_0_records_in_total": int64(6),
  262. "op_2_project_0_records_out_total": int64(6),
  263. "sink_mockSink_0_exceptions_total": int64(0),
  264. "sink_mockSink_0_records_in_total": int64(6),
  265. "sink_mockSink_0_records_out_total": int64(6),
  266. "source_text_0_exceptions_total": int64(0),
  267. "source_text_0_records_in_total": int64(6),
  268. "source_text_0_records_out_total": int64(6),
  269. },
  270. },
  271. PauseSize: 3,
  272. Cc: 1,
  273. PauseMetric: map[string]interface{}{
  274. "op_1_preprocessor_text_0_exceptions_total": int64(0),
  275. "op_1_preprocessor_text_0_process_latency_us": int64(0),
  276. "op_1_preprocessor_text_0_records_in_total": int64(3),
  277. "op_1_preprocessor_text_0_records_out_total": int64(3),
  278. "op_2_project_0_exceptions_total": int64(0),
  279. "op_2_project_0_process_latency_us": int64(0),
  280. "op_2_project_0_records_in_total": int64(3),
  281. "op_2_project_0_records_out_total": int64(3),
  282. "sink_mockSink_0_exceptions_total": int64(0),
  283. "sink_mockSink_0_records_in_total": int64(3),
  284. "sink_mockSink_0_records_out_total": int64(3),
  285. "source_text_0_exceptions_total": int64(0),
  286. "source_text_0_records_in_total": int64(3),
  287. "source_text_0_records_out_total": int64(3),
  288. },
  289. },
  290. }
  291. HandleStream(true, streamList, t)
  292. DoCheckpointRuleTest(t, tests, 0, &api.RuleOption{
  293. BufferLength: 100,
  294. Qos: api.AtLeastOnce,
  295. CheckpointInterval: 2000,
  296. SendError: true,
  297. })
  298. }