plugin_rule_test.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package plugin
  15. import (
  16. "bufio"
  17. "encoding/json"
  18. "fmt"
  19. "os"
  20. "testing"
  21. "time"
  22. "github.com/lf-edge/ekuiper/internal/binder"
  23. "github.com/lf-edge/ekuiper/internal/binder/function"
  24. "github.com/lf-edge/ekuiper/internal/binder/io"
  25. "github.com/lf-edge/ekuiper/internal/conf"
  26. "github.com/lf-edge/ekuiper/internal/plugin/native"
  27. "github.com/lf-edge/ekuiper/internal/topo/planner"
  28. "github.com/lf-edge/ekuiper/internal/topo/topotest"
  29. "github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
  30. "github.com/lf-edge/ekuiper/pkg/api"
  31. )
  32. func init() {
  33. nativeManager, err := native.InitManager()
  34. if err != nil {
  35. panic(err)
  36. }
  37. nativeEntry := binder.FactoryEntry{Name: "native plugin", Factory: nativeManager}
  38. err = function.Initialize([]binder.FactoryEntry{nativeEntry})
  39. if err != nil {
  40. panic(err)
  41. }
  42. err = io.Initialize([]binder.FactoryEntry{nativeEntry})
  43. if err != nil {
  44. panic(err)
  45. }
  46. }
// This cannot be run on Windows, and the plugins must be built into .so files before running it.
// For Windows, run it in WSL with: go test -tags test internal/topo/topotest/plugin_rule_test.go internal/topo/topotest/mock_topo.go

// CACHE_FILE is the path of the file that the TestExtensions rules write to
// through their "file" action and that getResults reads back line by line.
var CACHE_FILE = "cache"
  50. // Test for source, sink, func and agg func extensions
  51. // The .so files must be in the plugins folder
  52. func TestExtensions(t *testing.T) {
  53. log := conf.Log
  54. // Reset
  55. streamList := []string{"ext", "ext2"}
  56. topotest.HandleStream(false, streamList, t)
  57. os.Remove(CACHE_FILE)
  58. os.Create(CACHE_FILE)
  59. tests := []struct {
  60. name string
  61. rj string
  62. minLength int
  63. maxLength int
  64. }{
  65. {
  66. name: `TestExtensionsRule1`,
  67. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  68. minLength: 5,
  69. }, {
  70. name: `TestExtensionsRule2`,
  71. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext2\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  72. maxLength: 2,
  73. },
  74. }
  75. topotest.HandleStream(true, streamList, t)
  76. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  77. for i, tt := range tests {
  78. mockclock.ResetClock(1541152486000)
  79. // Create rule
  80. rs, err := topotest.CreateRule(tt.name, tt.rj)
  81. if err != nil {
  82. t.Errorf("failed to create rule: %s.", err)
  83. continue
  84. }
  85. tp, err := planner.Plan(rs)
  86. if err != nil {
  87. t.Errorf("fail to init rule: %v", err)
  88. continue
  89. }
  90. go func() {
  91. select {
  92. case err := <-tp.Open():
  93. log.Println(err)
  94. tp.Cancel()
  95. case <-time.After(900 * time.Millisecond):
  96. tp.Cancel()
  97. }
  98. }()
  99. time.Sleep(1000 * time.Millisecond)
  100. log.Printf("exit main program after a second")
  101. results := getResults()
  102. log.Infof("get results %v", results)
  103. os.Remove(CACHE_FILE)
  104. var maps [][]map[string]interface{}
  105. for _, v := range results {
  106. var mapRes []map[string]interface{}
  107. err := json.Unmarshal([]byte(v), &mapRes)
  108. if err != nil {
  109. t.Errorf("Failed to parse the input into map")
  110. continue
  111. }
  112. maps = append(maps, mapRes)
  113. }
  114. if tt.minLength > 0 {
  115. if len(maps) < tt.minLength {
  116. t.Errorf("%d. %q\n\nresult length is smaller than minlength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  117. break
  118. }
  119. }
  120. if tt.maxLength > 0 {
  121. if len(maps) > tt.maxLength {
  122. t.Errorf("%d. %q\n\nresult length is bigger than maxLength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  123. break
  124. }
  125. }
  126. for _, r := range maps {
  127. if len(r) != 1 {
  128. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  129. break
  130. }
  131. r := r[0]
  132. c := int((r["c"]).(float64))
  133. if c != 1 {
  134. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  135. break
  136. }
  137. e := int((r["e"]).(float64))
  138. if e != 50 && e != 51 {
  139. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  140. break
  141. }
  142. p := int(r["p"].(float64))
  143. if p != 2 {
  144. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  145. break
  146. }
  147. }
  148. }
  149. }
  150. func getResults() []string {
  151. f, err := os.Open(CACHE_FILE)
  152. if err != nil {
  153. panic(err)
  154. }
  155. result := make([]string, 0)
  156. scanner := bufio.NewScanner(f)
  157. for scanner.Scan() {
  158. result = append(result, scanner.Text())
  159. }
  160. if err := scanner.Err(); err != nil {
  161. panic(err)
  162. }
  163. f.Close()
  164. return result
  165. }
  166. func TestFuncState(t *testing.T) {
  167. // Reset
  168. streamList := []string{"text"}
  169. topotest.HandleStream(false, streamList, t)
  170. // Data setup
  171. tests := []topotest.RuleTest{
  172. {
  173. Name: `TestFuncStateRule1`,
  174. Sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  175. R: [][]map[string]interface{}{
  176. {{
  177. "wc": float64(3),
  178. }},
  179. {{
  180. "wc": float64(6),
  181. }},
  182. {{
  183. "wc": float64(8),
  184. }},
  185. {{
  186. "wc": float64(16),
  187. }},
  188. {{
  189. "wc": float64(20),
  190. }},
  191. {{
  192. "wc": float64(25),
  193. }},
  194. {{
  195. "wc": float64(27),
  196. }},
  197. {{
  198. "wc": float64(30),
  199. }},
  200. },
  201. M: map[string]interface{}{
  202. "op_2_project_0_exceptions_total": int64(0),
  203. "op_2_project_0_process_latency_us": int64(0),
  204. "op_2_project_0_records_in_total": int64(8),
  205. "op_2_project_0_records_out_total": int64(8),
  206. "sink_mockSink_0_exceptions_total": int64(0),
  207. "sink_mockSink_0_records_in_total": int64(8),
  208. "sink_mockSink_0_records_out_total": int64(8),
  209. "source_text_0_exceptions_total": int64(0),
  210. "source_text_0_records_in_total": int64(8),
  211. "source_text_0_records_out_total": int64(8),
  212. },
  213. },
  214. }
  215. topotest.HandleStream(true, streamList, t)
  216. topotest.DoRuleTest(t, tests, 0, &api.RuleOption{
  217. BufferLength: 100,
  218. SendError: true,
  219. }, 0)
  220. }
  221. func TestFuncStateCheckpoint(t *testing.T) {
  222. streamList := []string{"text"}
  223. topotest.HandleStream(false, streamList, t)
  224. tests := []topotest.RuleCheckpointTest{
  225. {
  226. RuleTest: topotest.RuleTest{
  227. Name: `TestFuncStateCheckpointRule1`,
  228. Sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  229. R: [][]map[string]interface{}{
  230. {{
  231. "wc": float64(3),
  232. }},
  233. {{
  234. "wc": float64(6),
  235. }},
  236. {{
  237. "wc": float64(8),
  238. }},
  239. {{
  240. "wc": float64(8),
  241. }},
  242. {{
  243. "wc": float64(16),
  244. }},
  245. {{
  246. "wc": float64(20),
  247. }},
  248. {{
  249. "wc": float64(25),
  250. }},
  251. {{
  252. "wc": float64(27),
  253. }},
  254. {{
  255. "wc": float64(30),
  256. }},
  257. },
  258. M: map[string]interface{}{
  259. "op_2_project_0_exceptions_total": int64(0),
  260. "op_2_project_0_process_latency_us": int64(0),
  261. "op_2_project_0_records_in_total": int64(6),
  262. "op_2_project_0_records_out_total": int64(6),
  263. "sink_mockSink_0_exceptions_total": int64(0),
  264. "sink_mockSink_0_records_in_total": int64(6),
  265. "sink_mockSink_0_records_out_total": int64(6),
  266. "source_text_0_exceptions_total": int64(0),
  267. "source_text_0_records_in_total": int64(6),
  268. "source_text_0_records_out_total": int64(6),
  269. },
  270. },
  271. PauseSize: 3,
  272. Cc: 1,
  273. PauseMetric: map[string]interface{}{
  274. "op_2_project_0_exceptions_total": int64(0),
  275. "op_2_project_0_process_latency_us": int64(0),
  276. "op_2_project_0_records_in_total": int64(3),
  277. "op_2_project_0_records_out_total": int64(3),
  278. "sink_mockSink_0_exceptions_total": int64(0),
  279. "sink_mockSink_0_records_in_total": int64(3),
  280. "sink_mockSink_0_records_out_total": int64(3),
  281. "source_text_0_exceptions_total": int64(0),
  282. "source_text_0_records_in_total": int64(3),
  283. "source_text_0_records_out_total": int64(3),
  284. },
  285. },
  286. }
  287. topotest.HandleStream(true, streamList, t)
  288. topotest.DoCheckpointRuleTest(t, tests, 0, &api.RuleOption{
  289. BufferLength: 100,
  290. Qos: api.AtLeastOnce,
  291. CheckpointInterval: 2000,
  292. SendError: true,
  293. })
  294. }