ruleState_test.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rule
  15. import (
  16. "errors"
  17. "fmt"
  18. "reflect"
  19. "sync"
  20. "testing"
  21. "time"
  22. "github.com/lf-edge/ekuiper/internal/processor"
  23. "github.com/lf-edge/ekuiper/internal/testx"
  24. "github.com/lf-edge/ekuiper/pkg/api"
  25. )
  26. var defaultOption = &api.RuleOption{
  27. IsEventTime: false,
  28. LateTol: 1000,
  29. Concurrency: 1,
  30. BufferLength: 1024,
  31. SendMetaToSink: false,
  32. SendError: true,
  33. Qos: api.AtMostOnce,
  34. CheckpointInterval: 300000,
  35. Restart: &api.RestartStrategy{
  36. Attempts: 0,
  37. Delay: 1000,
  38. Multiplier: 2,
  39. MaxDelay: 30000,
  40. JitterFactor: 0.1,
  41. },
  42. }
  43. func init() {
  44. testx.InitEnv()
  45. }
  46. func TestCreate(t *testing.T) {
  47. sp := processor.NewStreamProcessor()
  48. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  49. defer sp.ExecStmt(`DROP STREAM demo`)
  50. tests := []struct {
  51. r *api.Rule
  52. e error
  53. }{
  54. {
  55. r: &api.Rule{
  56. Triggered: false,
  57. Id: "test",
  58. Sql: "SELECT ts FROM demo",
  59. Actions: []map[string]interface{}{
  60. {
  61. "log": map[string]interface{}{},
  62. },
  63. },
  64. Options: defaultOption,
  65. },
  66. e: nil,
  67. },
  68. {
  69. r: &api.Rule{
  70. Triggered: false,
  71. Id: "test",
  72. Sql: "SELECT FROM demo",
  73. Actions: []map[string]interface{}{
  74. {
  75. "log": map[string]interface{}{},
  76. },
  77. },
  78. Options: defaultOption,
  79. },
  80. e: errors.New("Parse SQL SELECT FROM demo error: found \"FROM\", expected expression.."),
  81. },
  82. {
  83. r: &api.Rule{
  84. Triggered: false,
  85. Id: "test",
  86. Sql: "SELECT * FROM demo1",
  87. Actions: []map[string]interface{}{
  88. {
  89. "log": map[string]interface{}{},
  90. },
  91. },
  92. Options: defaultOption,
  93. },
  94. e: errors.New("fail to get stream demo1, please check if stream is created"),
  95. },
  96. }
  97. for i, tt := range tests {
  98. _, err := NewRuleState(tt.r)
  99. if !reflect.DeepEqual(err, tt.e) {
  100. t.Errorf("%d.\n\nerror mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.e, err)
  101. }
  102. }
  103. }
  104. func TestUpdate(t *testing.T) {
  105. sp := processor.NewStreamProcessor()
  106. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  107. defer sp.ExecStmt(`DROP STREAM demo`)
  108. rs, err := NewRuleState(&api.Rule{
  109. Triggered: false,
  110. Id: "test",
  111. Sql: "SELECT ts FROM demo",
  112. Actions: []map[string]interface{}{
  113. {
  114. "log": map[string]interface{}{},
  115. },
  116. },
  117. Options: defaultOption,
  118. })
  119. if err != nil {
  120. t.Error(err)
  121. return
  122. }
  123. defer rs.Close()
  124. err = rs.Start()
  125. if err != nil {
  126. t.Error(err)
  127. return
  128. }
  129. tests := []struct {
  130. r *api.Rule
  131. e error
  132. }{
  133. {
  134. r: &api.Rule{
  135. Triggered: false,
  136. Id: "test",
  137. Sql: "SELECT FROM demo",
  138. Actions: []map[string]interface{}{
  139. {
  140. "log": map[string]interface{}{},
  141. },
  142. },
  143. Options: defaultOption,
  144. },
  145. e: errors.New("Parse SQL SELECT FROM demo error: found \"FROM\", expected expression.."),
  146. },
  147. {
  148. r: &api.Rule{
  149. Triggered: false,
  150. Id: "test",
  151. Sql: "SELECT * FROM demo1",
  152. Actions: []map[string]interface{}{
  153. {
  154. "log": map[string]interface{}{},
  155. },
  156. },
  157. Options: defaultOption,
  158. },
  159. e: errors.New("fail to get stream demo1, please check if stream is created"),
  160. },
  161. {
  162. r: &api.Rule{
  163. Triggered: false,
  164. Id: "test",
  165. Sql: "SELECT * FROM demo",
  166. Actions: []map[string]interface{}{
  167. {
  168. "log": map[string]interface{}{},
  169. },
  170. },
  171. Options: defaultOption,
  172. },
  173. e: nil,
  174. },
  175. }
  176. for i, tt := range tests {
  177. err = rs.UpdateTopo(tt.r)
  178. if !reflect.DeepEqual(err, tt.e) {
  179. t.Errorf("%d.\n\nerror mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.e, err)
  180. }
  181. }
  182. }
  183. func TestMultipleAccess(t *testing.T) {
  184. sp := processor.NewStreamProcessor()
  185. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  186. defer sp.ExecStmt(`DROP STREAM demo`)
  187. rs, err := NewRuleState(&api.Rule{
  188. Triggered: false,
  189. Id: "test",
  190. Sql: "SELECT ts FROM demo",
  191. Actions: []map[string]interface{}{
  192. {
  193. "log": map[string]interface{}{},
  194. },
  195. },
  196. Options: defaultOption,
  197. })
  198. if err != nil {
  199. t.Error(err)
  200. return
  201. }
  202. defer rs.Close()
  203. err = rs.Start()
  204. if err != nil {
  205. t.Error(err)
  206. return
  207. }
  208. var wg sync.WaitGroup
  209. wg.Add(10)
  210. for i := 0; i < 10; i++ {
  211. if i%3 == 0 {
  212. go func(i int) {
  213. rs.Stop()
  214. fmt.Printf("%d:%d\n", i, rs.triggered)
  215. wg.Done()
  216. }(i)
  217. } else {
  218. go func(i int) {
  219. rs.Start()
  220. fmt.Printf("%d:%d\n", i, rs.triggered)
  221. wg.Done()
  222. }(i)
  223. }
  224. }
  225. wg.Wait()
  226. rs.Start()
  227. fmt.Printf("%d:%d\n", 10, rs.triggered)
  228. if rs.triggered != 1 {
  229. t.Errorf("triggered mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", 1, rs.triggered)
  230. }
  231. }
  232. // Test rule state message
  233. func TestRuleState_Start(t *testing.T) {
  234. sp := processor.NewStreamProcessor()
  235. sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
  236. defer sp.ExecStmt(`DROP STREAM demo`)
  237. // Test rule not triggered
  238. r := &api.Rule{
  239. Triggered: false,
  240. Id: "test",
  241. Sql: "SELECT ts FROM demo",
  242. Actions: []map[string]interface{}{
  243. {
  244. "log": map[string]interface{}{},
  245. },
  246. },
  247. Options: defaultOption,
  248. }
  249. const ruleStopped = "Stopped: canceled manually."
  250. const ruleStarted = "Running"
  251. t.Run("test rule loaded but not started", func(t *testing.T) {
  252. rs, err := NewRuleState(r)
  253. if err != nil {
  254. t.Error(err)
  255. return
  256. }
  257. state, err := rs.GetState()
  258. if err != nil {
  259. t.Errorf("get rule state error: %v", err)
  260. return
  261. }
  262. if state != ruleStopped {
  263. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  264. return
  265. }
  266. })
  267. t.Run("test rule started", func(t *testing.T) {
  268. rs, err := NewRuleState(r)
  269. if err != nil {
  270. t.Error(err)
  271. return
  272. }
  273. err = rs.Start()
  274. if err != nil {
  275. t.Error(err)
  276. return
  277. }
  278. time.Sleep(100 * time.Millisecond)
  279. state, err := rs.GetState()
  280. if err != nil {
  281. t.Errorf("get rule state error: %v", err)
  282. return
  283. }
  284. if state != ruleStarted {
  285. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  286. return
  287. }
  288. })
  289. t.Run("test rule loaded and stopped", func(t *testing.T) {
  290. rs, err := NewRuleState(r)
  291. if err != nil {
  292. t.Error(err)
  293. return
  294. }
  295. err = rs.Start()
  296. if err != nil {
  297. t.Error(err)
  298. return
  299. }
  300. err = rs.Close()
  301. if err != nil {
  302. t.Error(err)
  303. return
  304. }
  305. state, err := rs.GetState()
  306. if err != nil {
  307. t.Errorf("get rule state error: %v", err)
  308. return
  309. }
  310. if state != ruleStopped {
  311. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  312. return
  313. }
  314. })
  315. }