plugin_rule_test.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build !windows
  15. // +build !windows
  16. package topotest
  17. import (
  18. "bufio"
  19. "encoding/json"
  20. "fmt"
  21. "github.com/lf-edge/ekuiper/internal/binder"
  22. "github.com/lf-edge/ekuiper/internal/binder/function"
  23. "github.com/lf-edge/ekuiper/internal/binder/io"
  24. "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/plugin/native"
  26. "github.com/lf-edge/ekuiper/internal/topo/planner"
  27. "github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
  28. "github.com/lf-edge/ekuiper/pkg/api"
  29. "os"
  30. "testing"
  31. "time"
  32. )
  33. func init() {
  34. nativeManager, err := native.InitManager()
  35. if err != nil {
  36. panic(err)
  37. }
  38. nativeEntry := binder.FactoryEntry{Name: "native plugin", Factory: nativeManager}
  39. err = function.Initialize([]binder.FactoryEntry{nativeEntry})
  40. if err != nil {
  41. panic(err)
  42. }
  43. err = io.Initialize([]binder.FactoryEntry{nativeEntry})
  44. if err != nil {
  45. panic(err)
  46. }
  47. }
// These tests cannot run on Windows, and the plugins must be built into .so files before running them.
// On Windows, run them in WSL with: go test -tags test internal/topo/topotest/plugin_rule_test.go internal/topo/topotest/mock_topo.go
  50. var CACHE_FILE = "cache"
  51. //Test for source, sink, func and agg func extensions
  52. //The .so files must be in the plugins folder
  53. func TestExtensions(t *testing.T) {
  54. log := conf.Log
  55. //Reset
  56. streamList := []string{"ext", "ext2"}
  57. HandleStream(false, streamList, t)
  58. os.Remove(CACHE_FILE)
  59. os.Create(CACHE_FILE)
  60. var tests = []struct {
  61. name string
  62. rj string
  63. minLength int
  64. maxLength int
  65. }{
  66. {
  67. name: `TestExtensionsRule1`,
  68. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  69. minLength: 5,
  70. }, {
  71. name: `TestExtensionsRule2`,
  72. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext2\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  73. maxLength: 2,
  74. },
  75. }
  76. HandleStream(true, streamList, t)
  77. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  78. for i, tt := range tests {
  79. mockclock.ResetClock(1541152486000)
  80. // Create rule
  81. rs, err := CreateRule(tt.name, tt.rj)
  82. if err != nil {
  83. t.Errorf("failed to create rule: %s.", err)
  84. continue
  85. }
  86. tp, err := planner.Plan(rs)
  87. if err != nil {
  88. t.Errorf("fail to init rule: %v", err)
  89. continue
  90. }
  91. go func() {
  92. select {
  93. case err := <-tp.Open():
  94. log.Println(err)
  95. tp.Cancel()
  96. case <-time.After(900 * time.Millisecond):
  97. tp.Cancel()
  98. }
  99. }()
  100. time.Sleep(1000 * time.Millisecond)
  101. log.Printf("exit main program after a second")
  102. results := getResults()
  103. log.Infof("get results %v", results)
  104. os.Remove(CACHE_FILE)
  105. var maps [][]map[string]interface{}
  106. for _, v := range results {
  107. var mapRes []map[string]interface{}
  108. err := json.Unmarshal([]byte(v), &mapRes)
  109. if err != nil {
  110. t.Errorf("Failed to parse the input into map")
  111. continue
  112. }
  113. maps = append(maps, mapRes)
  114. }
  115. if tt.minLength > 0 {
  116. if len(maps) < tt.minLength {
  117. t.Errorf("%d. %q\n\nresult length is smaller than minlength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  118. break
  119. }
  120. }
  121. if tt.maxLength > 0 {
  122. if len(maps) > tt.maxLength {
  123. t.Errorf("%d. %q\n\nresult length is bigger than maxLength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  124. break
  125. }
  126. }
  127. for _, r := range maps {
  128. if len(r) != 1 {
  129. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  130. break
  131. }
  132. r := r[0]
  133. c := int((r["c"]).(float64))
  134. if c != 1 {
  135. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  136. break
  137. }
  138. e := int((r["e"]).(float64))
  139. if e != 50 && e != 51 {
  140. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  141. break
  142. }
  143. p := int(r["p"].(float64))
  144. if p != 2 {
  145. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  146. break
  147. }
  148. }
  149. }
  150. }
  151. func getResults() []string {
  152. f, err := os.Open(CACHE_FILE)
  153. if err != nil {
  154. panic(err)
  155. }
  156. result := make([]string, 0)
  157. scanner := bufio.NewScanner(f)
  158. for scanner.Scan() {
  159. result = append(result, scanner.Text())
  160. }
  161. if err := scanner.Err(); err != nil {
  162. panic(err)
  163. }
  164. f.Close()
  165. return result
  166. }
  167. func TestFuncState(t *testing.T) {
  168. //Reset
  169. streamList := []string{"text"}
  170. HandleStream(false, streamList, t)
  171. //Data setup
  172. var tests = []RuleTest{
  173. {
  174. Name: `TestFuncStateRule1`,
  175. Sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  176. R: [][]map[string]interface{}{
  177. {{
  178. "wc": float64(3),
  179. }},
  180. {{
  181. "wc": float64(6),
  182. }},
  183. {{
  184. "wc": float64(8),
  185. }},
  186. {{
  187. "wc": float64(16),
  188. }},
  189. {{
  190. "wc": float64(20),
  191. }},
  192. {{
  193. "wc": float64(25),
  194. }},
  195. {{
  196. "wc": float64(27),
  197. }},
  198. {{
  199. "wc": float64(30),
  200. }},
  201. },
  202. M: map[string]interface{}{
  203. "op_1_preprocessor_text_0_exceptions_total": int64(0),
  204. "op_1_preprocessor_text_0_process_latency_us": int64(0),
  205. "op_1_preprocessor_text_0_records_in_total": int64(8),
  206. "op_1_preprocessor_text_0_records_out_total": int64(8),
  207. "op_2_project_0_exceptions_total": int64(0),
  208. "op_2_project_0_process_latency_us": int64(0),
  209. "op_2_project_0_records_in_total": int64(8),
  210. "op_2_project_0_records_out_total": int64(8),
  211. "sink_mockSink_0_exceptions_total": int64(0),
  212. "sink_mockSink_0_records_in_total": int64(8),
  213. "sink_mockSink_0_records_out_total": int64(8),
  214. "source_text_0_exceptions_total": int64(0),
  215. "source_text_0_records_in_total": int64(8),
  216. "source_text_0_records_out_total": int64(8),
  217. },
  218. },
  219. }
  220. HandleStream(true, streamList, t)
  221. DoRuleTest(t, tests, 0, &api.RuleOption{
  222. BufferLength: 100,
  223. SendError: true,
  224. }, 0)
  225. }
  226. func TestFuncStateCheckpoint(t *testing.T) {
  227. streamList := []string{"text"}
  228. HandleStream(false, streamList, t)
  229. var tests = []RuleCheckpointTest{
  230. {
  231. RuleTest: RuleTest{
  232. Name: `TestFuncStateCheckpointRule1`,
  233. Sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  234. R: [][]map[string]interface{}{
  235. {{
  236. "wc": float64(3),
  237. }},
  238. {{
  239. "wc": float64(6),
  240. }},
  241. {{
  242. "wc": float64(8),
  243. }},
  244. {{
  245. "wc": float64(8),
  246. }},
  247. {{
  248. "wc": float64(16),
  249. }},
  250. {{
  251. "wc": float64(20),
  252. }},
  253. {{
  254. "wc": float64(25),
  255. }},
  256. {{
  257. "wc": float64(27),
  258. }},
  259. {{
  260. "wc": float64(30),
  261. }},
  262. },
  263. M: map[string]interface{}{
  264. "op_1_preprocessor_text_0_exceptions_total": int64(0),
  265. "op_1_preprocessor_text_0_process_latency_us": int64(0),
  266. "op_1_preprocessor_text_0_records_in_total": int64(6),
  267. "op_1_preprocessor_text_0_records_out_total": int64(6),
  268. "op_2_project_0_exceptions_total": int64(0),
  269. "op_2_project_0_process_latency_us": int64(0),
  270. "op_2_project_0_records_in_total": int64(6),
  271. "op_2_project_0_records_out_total": int64(6),
  272. "sink_mockSink_0_exceptions_total": int64(0),
  273. "sink_mockSink_0_records_in_total": int64(6),
  274. "sink_mockSink_0_records_out_total": int64(6),
  275. "source_text_0_exceptions_total": int64(0),
  276. "source_text_0_records_in_total": int64(6),
  277. "source_text_0_records_out_total": int64(6),
  278. },
  279. },
  280. PauseSize: 3,
  281. Cc: 1,
  282. PauseMetric: map[string]interface{}{
  283. "op_1_preprocessor_text_0_exceptions_total": int64(0),
  284. "op_1_preprocessor_text_0_process_latency_us": int64(0),
  285. "op_1_preprocessor_text_0_records_in_total": int64(3),
  286. "op_1_preprocessor_text_0_records_out_total": int64(3),
  287. "op_2_project_0_exceptions_total": int64(0),
  288. "op_2_project_0_process_latency_us": int64(0),
  289. "op_2_project_0_records_in_total": int64(3),
  290. "op_2_project_0_records_out_total": int64(3),
  291. "sink_mockSink_0_exceptions_total": int64(0),
  292. "sink_mockSink_0_records_in_total": int64(3),
  293. "sink_mockSink_0_records_out_total": int64(3),
  294. "source_text_0_exceptions_total": int64(0),
  295. "source_text_0_records_in_total": int64(3),
  296. "source_text_0_records_out_total": int64(3),
  297. },
  298. },
  299. }
  300. HandleStream(true, streamList, t)
  301. DoCheckpointRuleTest(t, tests, 0, &api.RuleOption{
  302. BufferLength: 100,
  303. Qos: api.AtLeastOnce,
  304. CheckpointInterval: 2000,
  305. SendError: true,
  306. })
  307. }