plugin_rule_test.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package plugin
  15. import (
  16. "bufio"
  17. "encoding/json"
  18. "os"
  19. "testing"
  20. "time"
  21. "github.com/lf-edge/ekuiper/internal/binder"
  22. "github.com/lf-edge/ekuiper/internal/binder/function"
  23. "github.com/lf-edge/ekuiper/internal/binder/io"
  24. "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/plugin/native"
  26. "github.com/lf-edge/ekuiper/internal/topo/planner"
  27. "github.com/lf-edge/ekuiper/internal/topo/topotest"
  28. "github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
  29. "github.com/lf-edge/ekuiper/pkg/api"
  30. )
  31. func init() {
  32. nativeManager, err := native.InitManager()
  33. if err != nil {
  34. panic(err)
  35. }
  36. nativeEntry := binder.FactoryEntry{Name: "native plugin", Factory: nativeManager}
  37. err = function.Initialize([]binder.FactoryEntry{nativeEntry})
  38. if err != nil {
  39. panic(err)
  40. }
  41. err = io.Initialize([]binder.FactoryEntry{nativeEntry})
  42. if err != nil {
  43. panic(err)
  44. }
  45. }
  // The tests in this file cannot be run on Windows, and the plugins must be
  // built into .so files before running them.
  // On Windows, run them inside WSL with:
  //   go test -tags test internal/topo/topotest/plugin/plugin_rule_test.go internal/topo/topotest/mock_topo.go

  // CACHE_FILE is the path of the file that the rules in TestExtensions use as
  // their file sink target and that getResults reads back.
  var (
  	CACHE_FILE = "cache"
  )
  49. // Test for source, sink, func and agg func extensions
  50. // The .so files must be in the plugins folder
  51. func TestExtensions(t *testing.T) {
  52. log := conf.Log
  53. // Reset
  54. streamList := []string{"ext", "ext2"}
  55. topotest.HandleStream(false, streamList, t)
  56. os.Remove(CACHE_FILE)
  57. os.Create(CACHE_FILE)
  58. tests := []struct {
  59. name string
  60. rj string
  61. minLength int
  62. maxLength int
  63. }{
  64. {
  65. name: `TestExtensionsRule1`,
  66. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  67. minLength: 5,
  68. }, {
  69. name: `TestExtensionsRule2`,
  70. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext2\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  71. maxLength: 2,
  72. },
  73. }
  74. topotest.HandleStream(true, streamList, t)
  75. t.Logf("The test bucket size is %d.\n\n", len(tests))
  76. for i, tt := range tests {
  77. mockclock.ResetClock(1541152486000)
  78. // Create rule
  79. rs, err := topotest.CreateRule(tt.name, tt.rj)
  80. if err != nil {
  81. t.Errorf("failed to create rule: %s.", err)
  82. continue
  83. }
  84. tp, err := planner.Plan(rs)
  85. if err != nil {
  86. t.Errorf("fail to init rule: %v", err)
  87. continue
  88. }
  89. go func() {
  90. select {
  91. case err := <-tp.Open():
  92. log.Println(err)
  93. tp.Cancel()
  94. case <-time.After(900 * time.Millisecond):
  95. tp.Cancel()
  96. }
  97. }()
  98. time.Sleep(1000 * time.Millisecond)
  99. log.Printf("exit main program after a second")
  100. results := getResults()
  101. log.Infof("get results %v", results)
  102. os.Remove(CACHE_FILE)
  103. var maps [][]map[string]interface{}
  104. for _, v := range results {
  105. var mapRes []map[string]interface{}
  106. err := json.Unmarshal([]byte(v), &mapRes)
  107. if err != nil {
  108. t.Errorf("Failed to parse the input into map")
  109. continue
  110. }
  111. maps = append(maps, mapRes)
  112. }
  113. if tt.minLength > 0 {
  114. if len(maps) < tt.minLength {
  115. t.Errorf("%d. %q\n\nresult length is smaller than minlength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  116. break
  117. }
  118. }
  119. if tt.maxLength > 0 {
  120. if len(maps) > tt.maxLength {
  121. t.Errorf("%d. %q\n\nresult length is bigger than maxLength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  122. break
  123. }
  124. }
  125. for _, r := range maps {
  126. if len(r) != 1 {
  127. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  128. break
  129. }
  130. r := r[0]
  131. c := int((r["c"]).(float64))
  132. if c != 1 {
  133. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  134. break
  135. }
  136. e := int((r["e"]).(float64))
  137. if e != 50 && e != 51 {
  138. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  139. break
  140. }
  141. p := int(r["p"].(float64))
  142. if p != 2 {
  143. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  144. break
  145. }
  146. }
  147. }
  148. }
  149. func getResults() []string {
  150. f, err := os.Open(CACHE_FILE)
  151. if err != nil {
  152. panic(err)
  153. }
  154. result := make([]string, 0)
  155. scanner := bufio.NewScanner(f)
  156. for scanner.Scan() {
  157. result = append(result, scanner.Text())
  158. }
  159. if err := scanner.Err(); err != nil {
  160. panic(err)
  161. }
  162. f.Close()
  163. return result
  164. }
  165. func TestFuncState(t *testing.T) {
  166. // Reset
  167. streamList := []string{"text"}
  168. topotest.HandleStream(false, streamList, t)
  169. // Data setup
  170. tests := []topotest.RuleTest{
  171. {
  172. Name: `TestFuncStateRule1`,
  173. Sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  174. R: [][]map[string]interface{}{
  175. {{
  176. "wc": float64(3),
  177. }},
  178. {{
  179. "wc": float64(6),
  180. }},
  181. {{
  182. "wc": float64(8),
  183. }},
  184. {{
  185. "wc": float64(16),
  186. }},
  187. {{
  188. "wc": float64(20),
  189. }},
  190. {{
  191. "wc": float64(25),
  192. }},
  193. {{
  194. "wc": float64(27),
  195. }},
  196. {{
  197. "wc": float64(30),
  198. }},
  199. },
  200. M: map[string]interface{}{
  201. "op_2_project_0_exceptions_total": int64(0),
  202. "op_2_project_0_process_latency_us": int64(0),
  203. "op_2_project_0_records_in_total": int64(8),
  204. "op_2_project_0_records_out_total": int64(8),
  205. "sink_mockSink_0_exceptions_total": int64(0),
  206. "sink_mockSink_0_records_in_total": int64(8),
  207. "sink_mockSink_0_records_out_total": int64(8),
  208. "source_text_0_exceptions_total": int64(0),
  209. "source_text_0_records_in_total": int64(8),
  210. "source_text_0_records_out_total": int64(8),
  211. },
  212. },
  213. }
  214. topotest.HandleStream(true, streamList, t)
  215. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  216. BufferLength: 100,
  217. SendError: true,
  218. }, 0)
  219. }
  220. func TestFuncStateCheckpoint(t *testing.T) {
  221. streamList := []string{"text"}
  222. topotest.HandleStream(false, streamList, t)
  223. tests := []topotest.RuleCheckpointTest{
  224. {
  225. RuleTest: topotest.RuleTest{
  226. Name: `TestFuncStateCheckpointRule1`,
  227. Sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  228. R: [][]map[string]interface{}{
  229. {{
  230. "wc": float64(3),
  231. }},
  232. {{
  233. "wc": float64(6),
  234. }},
  235. {{
  236. "wc": float64(8),
  237. }},
  238. {{
  239. "wc": float64(8),
  240. }},
  241. {{
  242. "wc": float64(16),
  243. }},
  244. {{
  245. "wc": float64(20),
  246. }},
  247. {{
  248. "wc": float64(25),
  249. }},
  250. {{
  251. "wc": float64(27),
  252. }},
  253. {{
  254. "wc": float64(30),
  255. }},
  256. },
  257. M: map[string]interface{}{
  258. "op_2_project_0_exceptions_total": int64(0),
  259. "op_2_project_0_process_latency_us": int64(0),
  260. "op_2_project_0_records_in_total": int64(6),
  261. "op_2_project_0_records_out_total": int64(6),
  262. "sink_mockSink_0_exceptions_total": int64(0),
  263. "sink_mockSink_0_records_in_total": int64(6),
  264. "sink_mockSink_0_records_out_total": int64(6),
  265. "source_text_0_exceptions_total": int64(0),
  266. "source_text_0_records_in_total": int64(6),
  267. "source_text_0_records_out_total": int64(6),
  268. },
  269. },
  270. PauseSize: 3,
  271. Cc: 1,
  272. PauseMetric: map[string]interface{}{
  273. "op_2_project_0_exceptions_total": int64(0),
  274. "op_2_project_0_process_latency_us": int64(0),
  275. "op_2_project_0_records_in_total": int64(3),
  276. "op_2_project_0_records_out_total": int64(3),
  277. "sink_mockSink_0_exceptions_total": int64(0),
  278. "sink_mockSink_0_records_in_total": int64(3),
  279. "sink_mockSink_0_records_out_total": int64(3),
  280. "source_text_0_exceptions_total": int64(0),
  281. "source_text_0_records_in_total": int64(3),
  282. "source_text_0_records_out_total": int64(3),
  283. },
  284. },
  285. }
  286. topotest.HandleStream(true, streamList, t)
  287. topotest.DoCheckpointRuleTest(t, tests, 0, &api.RuleOption{
  288. BufferLength: 100,
  289. Qos: api.AtLeastOnce,
  290. CheckpointInterval: 2000,
  291. SendError: true,
  292. })
  293. }