portable_rule_test.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package test
  15. import (
  16. "bufio"
  17. "context"
  18. "encoding/json"
  19. "fmt"
  20. "github.com/lf-edge/ekuiper/internal/binder"
  21. "github.com/lf-edge/ekuiper/internal/binder/function"
  22. "github.com/lf-edge/ekuiper/internal/binder/io"
  23. "github.com/lf-edge/ekuiper/internal/plugin/portable"
  24. "github.com/lf-edge/ekuiper/internal/plugin/portable/runtime"
  25. "github.com/lf-edge/ekuiper/internal/processor"
  26. "github.com/lf-edge/ekuiper/internal/topo"
  27. "github.com/lf-edge/ekuiper/internal/topo/planner"
  28. "github.com/lf-edge/ekuiper/internal/topo/topotest"
  29. "github.com/lf-edge/ekuiper/pkg/api"
  30. "log"
  31. "os"
  32. "reflect"
  33. "testing"
  34. "time"
  35. )
  36. func init() {
  37. m, err := portable.InitManager()
  38. if err != nil {
  39. panic(err)
  40. }
  41. entry := binder.FactoryEntry{Name: "portable plugin", Factory: m}
  42. err = function.Initialize([]binder.FactoryEntry{entry})
  43. if err != nil {
  44. panic(err)
  45. }
  46. err = io.Initialize([]binder.FactoryEntry{entry})
  47. if err != nil {
  48. panic(err)
  49. }
  50. }
  51. func TestSourceAndFunc(t *testing.T) {
  52. streamList := []string{"ext", "extpy"}
  53. topotest.HandleStream(false, streamList, t)
  54. var tests = []struct {
  55. Name string
  56. Rule string
  57. R [][]map[string]interface{}
  58. M map[string]interface{}
  59. }{
  60. {
  61. Name: `TestPortableRule1`,
  62. Rule: `{"sql":"SELECT echo(count) as ee FROM ext","actions":[{"file":{"path":"cache1"}}]}`,
  63. R: [][]map[string]interface{}{
  64. {{
  65. "ee": float64(50),
  66. }},
  67. {{
  68. "ee": float64(50),
  69. }},
  70. {{
  71. "ee": float64(50),
  72. }},
  73. },
  74. M: map[string]interface{}{
  75. "source_ext_0_exceptions_total": int64(0),
  76. "source_ext_0_records_in_total": int64(3),
  77. "source_ext_0_records_out_total": int64(3),
  78. "sink_file_0_0_records_out_total": int64(3),
  79. },
  80. }, {
  81. Name: `TestPythonRule`,
  82. Rule: `{"sql":"SELECT revert(name) as ee FROM extpy","actions":[{"file":{"path":"cache2"}},{"print":{}}]}`,
  83. R: [][]map[string]interface{}{
  84. {{
  85. "ee": "nosjyp",
  86. }},
  87. {{
  88. "ee": "nosjyp",
  89. }},
  90. {{
  91. "ee": "nosjyp",
  92. }},
  93. },
  94. M: map[string]interface{}{
  95. "source_extpy_0_exceptions_total": int64(0),
  96. "source_extpy_0_records_in_total": int64(3),
  97. "source_extpy_0_records_out_total": int64(3),
  98. "sink_file_0_0_records_out_total": int64(3),
  99. "sink_print_1_0_records_out_total": int64(3),
  100. },
  101. },
  102. }
  103. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  104. defer runtime.GetPluginInsManager().KillAll()
  105. for i, tt := range tests {
  106. _ = os.Remove(fmt.Sprintf("cache%d", i+1))
  107. topotest.HandleStream(true, streamList[i:i+1], t)
  108. rs, err := CreateRule(tt.Name, tt.Rule)
  109. if err != nil {
  110. t.Errorf("failed to create rule: %s.", err)
  111. continue
  112. }
  113. tp, err := planner.Plan(rs)
  114. if err != nil {
  115. t.Errorf("fail to init rule: %v", err)
  116. continue
  117. }
  118. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  119. go func(ctx context.Context) {
  120. select {
  121. case err := <-tp.Open():
  122. log.Println(err)
  123. tp.Cancel()
  124. case <-ctx.Done():
  125. log.Printf("ctx done %v\n", ctx.Err())
  126. tp.Cancel()
  127. }
  128. fmt.Println("all exit")
  129. }(ctx)
  130. topotest.HandleStream(false, streamList[i:i+1], t)
  131. for {
  132. if ctx.Err() != nil {
  133. t.Errorf("Exiting with error %v", ctx.Err())
  134. break
  135. }
  136. time.Sleep(10 * time.Millisecond)
  137. if compareMetrics(tp, tt.M) {
  138. cancel()
  139. // need to wait for file results
  140. time.Sleep(500 * time.Millisecond)
  141. results := getResults(fmt.Sprintf("cache%d", i+1))
  142. fmt.Printf("get results %v\n", results)
  143. time.Sleep(10 * time.Millisecond)
  144. var mm [][]map[string]interface{}
  145. for i, v := range results {
  146. if i >= 3 {
  147. break
  148. }
  149. var mapRes []map[string]interface{}
  150. err := json.Unmarshal([]byte(v), &mapRes)
  151. if err != nil {
  152. t.Errorf("Failed to parse the input into map")
  153. continue
  154. }
  155. mm = append(mm, mapRes)
  156. }
  157. if !reflect.DeepEqual(tt.R, mm) {
  158. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.Rule, tt.R, mm)
  159. }
  160. break
  161. }
  162. }
  163. }
  164. // wait for rule clean up
  165. time.Sleep(1 * time.Second)
  166. }
  167. func compareMetrics(tp *topo.Topo, m map[string]interface{}) bool {
  168. keys, values := tp.GetMetrics()
  169. for k, v := range m {
  170. var (
  171. index int
  172. key string
  173. matched bool
  174. )
  175. for index, key = range keys {
  176. if k == key {
  177. va, ok := values[index].(int64)
  178. if !ok {
  179. continue
  180. }
  181. ve := v.(int64)
  182. if va < ve {
  183. return false
  184. }
  185. matched = true
  186. }
  187. }
  188. if !matched {
  189. return false
  190. }
  191. }
  192. return true
  193. }
  194. func getResults(fileName string) []string {
  195. f, err := os.Open(fileName)
  196. if err != nil {
  197. panic(err)
  198. }
  199. result := make([]string, 0)
  200. scanner := bufio.NewScanner(f)
  201. for scanner.Scan() {
  202. result = append(result, scanner.Text())
  203. }
  204. if err := scanner.Err(); err != nil {
  205. panic(err)
  206. }
  207. f.Close()
  208. return result
  209. }
  210. func CreateRule(name, sql string) (*api.Rule, error) {
  211. p := processor.NewRuleProcessor()
  212. p.ExecDrop(name)
  213. return p.ExecCreate(name, sql)
  214. }