ruleState_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rule
  15. import (
  16. "errors"
  17. "fmt"
  18. "reflect"
  19. "sync"
  20. "testing"
  21. "time"
  22. "github.com/stretchr/testify/require"
  23. "github.com/lf-edge/ekuiper/internal/conf"
  24. "github.com/lf-edge/ekuiper/internal/processor"
  25. "github.com/lf-edge/ekuiper/internal/testx"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. )
  28. var defaultOption = &api.RuleOption{
  29. IsEventTime: false,
  30. LateTol: 1000,
  31. Concurrency: 1,
  32. BufferLength: 1024,
  33. SendMetaToSink: false,
  34. SendError: true,
  35. Qos: api.AtMostOnce,
  36. CheckpointInterval: 300000,
  37. Restart: &api.RestartStrategy{
  38. Attempts: 0,
  39. Delay: 1000,
  40. Multiplier: 2,
  41. MaxDelay: 30000,
  42. JitterFactor: 0.1,
  43. },
  44. }
  45. func init() {
  46. testx.InitEnv()
  47. }
  48. func TestCreate(t *testing.T) {
  49. sp := processor.NewStreamProcessor()
  50. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  51. defer sp.ExecStmt(`DROP STREAM demo`)
  52. tests := []struct {
  53. r *api.Rule
  54. e error
  55. }{
  56. {
  57. r: &api.Rule{
  58. Triggered: false,
  59. Id: "test",
  60. Sql: "SELECT ts FROM demo",
  61. Actions: []map[string]interface{}{
  62. {
  63. "log": map[string]interface{}{},
  64. },
  65. },
  66. Options: defaultOption,
  67. },
  68. e: nil,
  69. },
  70. {
  71. r: &api.Rule{
  72. Triggered: false,
  73. Id: "test",
  74. Sql: "SELECT FROM demo",
  75. Actions: []map[string]interface{}{
  76. {
  77. "log": map[string]interface{}{},
  78. },
  79. },
  80. Options: defaultOption,
  81. },
  82. e: errors.New("Parse SQL SELECT FROM demo error: found \"FROM\", expected expression.."),
  83. },
  84. {
  85. r: &api.Rule{
  86. Triggered: false,
  87. Id: "test",
  88. Sql: "SELECT * FROM demo1",
  89. Actions: []map[string]interface{}{
  90. {
  91. "log": map[string]interface{}{},
  92. },
  93. },
  94. Options: defaultOption,
  95. },
  96. e: errors.New("fail to get stream demo1, please check if stream is created"),
  97. },
  98. }
  99. for i, tt := range tests {
  100. _, err := NewRuleState(tt.r)
  101. if !reflect.DeepEqual(err, tt.e) {
  102. t.Errorf("%d.\n\nerror mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.e, err)
  103. }
  104. }
  105. }
  106. func TestUpdate(t *testing.T) {
  107. sp := processor.NewStreamProcessor()
  108. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  109. defer sp.ExecStmt(`DROP STREAM demo`)
  110. rs, err := NewRuleState(&api.Rule{
  111. Triggered: false,
  112. Id: "test",
  113. Sql: "SELECT ts FROM demo",
  114. Actions: []map[string]interface{}{
  115. {
  116. "log": map[string]interface{}{},
  117. },
  118. },
  119. Options: defaultOption,
  120. })
  121. if err != nil {
  122. t.Error(err)
  123. return
  124. }
  125. defer rs.Close()
  126. err = rs.Start()
  127. if err != nil {
  128. t.Error(err)
  129. return
  130. }
  131. tests := []struct {
  132. r *api.Rule
  133. e error
  134. }{
  135. {
  136. r: &api.Rule{
  137. Triggered: false,
  138. Id: "test",
  139. Sql: "SELECT FROM demo",
  140. Actions: []map[string]interface{}{
  141. {
  142. "log": map[string]interface{}{},
  143. },
  144. },
  145. Options: defaultOption,
  146. },
  147. e: errors.New("Parse SQL SELECT FROM demo error: found \"FROM\", expected expression.."),
  148. },
  149. {
  150. r: &api.Rule{
  151. Triggered: false,
  152. Id: "test",
  153. Sql: "SELECT * FROM demo1",
  154. Actions: []map[string]interface{}{
  155. {
  156. "log": map[string]interface{}{},
  157. },
  158. },
  159. Options: defaultOption,
  160. },
  161. e: errors.New("fail to get stream demo1, please check if stream is created"),
  162. },
  163. {
  164. r: &api.Rule{
  165. Triggered: false,
  166. Id: "test",
  167. Sql: "SELECT * FROM demo",
  168. Actions: []map[string]interface{}{
  169. {
  170. "log": map[string]interface{}{},
  171. },
  172. },
  173. Options: defaultOption,
  174. },
  175. e: nil,
  176. },
  177. }
  178. for i, tt := range tests {
  179. err = rs.UpdateTopo(tt.r)
  180. if !reflect.DeepEqual(err, tt.e) {
  181. t.Errorf("%d.\n\nerror mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.e, err)
  182. }
  183. }
  184. }
  185. func TestMultipleAccess(t *testing.T) {
  186. sp := processor.NewStreamProcessor()
  187. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  188. defer sp.ExecStmt(`DROP STREAM demo`)
  189. rs, err := NewRuleState(&api.Rule{
  190. Triggered: false,
  191. Id: "test",
  192. Sql: "SELECT ts FROM demo",
  193. Actions: []map[string]interface{}{
  194. {
  195. "log": map[string]interface{}{},
  196. },
  197. },
  198. Options: defaultOption,
  199. })
  200. if err != nil {
  201. t.Error(err)
  202. return
  203. }
  204. defer rs.Close()
  205. err = rs.Start()
  206. if err != nil {
  207. t.Error(err)
  208. return
  209. }
  210. var wg sync.WaitGroup
  211. wg.Add(10)
  212. for i := 0; i < 10; i++ {
  213. if i%3 == 0 {
  214. go func(i int) {
  215. rs.Stop()
  216. fmt.Printf("%d:%d\n", i, rs.triggered)
  217. wg.Done()
  218. }(i)
  219. } else {
  220. go func(i int) {
  221. rs.Start()
  222. fmt.Printf("%d:%d\n", i, rs.triggered)
  223. wg.Done()
  224. }(i)
  225. }
  226. }
  227. wg.Wait()
  228. rs.Start()
  229. fmt.Printf("%d:%d\n", 10, rs.triggered)
  230. if rs.triggered != 1 {
  231. t.Errorf("triggered mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", 1, rs.triggered)
  232. }
  233. }
  234. // Test rule state message
  235. func TestRuleState_Start(t *testing.T) {
  236. sp := processor.NewStreamProcessor()
  237. sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
  238. defer sp.ExecStmt(`DROP STREAM demo`)
  239. // Test rule not triggered
  240. r := &api.Rule{
  241. Triggered: false,
  242. Id: "test",
  243. Sql: "SELECT ts FROM demo",
  244. Actions: []map[string]interface{}{
  245. {
  246. "log": map[string]interface{}{},
  247. },
  248. },
  249. Options: defaultOption,
  250. }
  251. const ruleStopped = "Stopped: canceled manually."
  252. const ruleStarted = "Running"
  253. t.Run("test rule loaded but not started", func(t *testing.T) {
  254. rs, err := NewRuleState(r)
  255. if err != nil {
  256. t.Error(err)
  257. return
  258. }
  259. state, err := rs.GetState()
  260. if err != nil {
  261. t.Errorf("get rule state error: %v", err)
  262. return
  263. }
  264. if state != ruleStopped {
  265. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  266. return
  267. }
  268. })
  269. t.Run("test rule started", func(t *testing.T) {
  270. rs, err := NewRuleState(r)
  271. if err != nil {
  272. t.Error(err)
  273. return
  274. }
  275. err = rs.Start()
  276. if err != nil {
  277. t.Error(err)
  278. return
  279. }
  280. time.Sleep(100 * time.Millisecond)
  281. state, err := rs.GetState()
  282. if err != nil {
  283. t.Errorf("get rule state error: %v", err)
  284. return
  285. }
  286. if state != ruleStarted {
  287. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  288. return
  289. }
  290. })
  291. t.Run("test rule loaded and stopped", func(t *testing.T) {
  292. rs, err := NewRuleState(r)
  293. if err != nil {
  294. t.Error(err)
  295. return
  296. }
  297. err = rs.Start()
  298. if err != nil {
  299. t.Error(err)
  300. return
  301. }
  302. err = rs.Close()
  303. if err != nil {
  304. t.Error(err)
  305. return
  306. }
  307. state, err := rs.GetState()
  308. if err != nil {
  309. t.Errorf("get rule state error: %v", err)
  310. return
  311. }
  312. if state != ruleStopped {
  313. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  314. return
  315. }
  316. })
  317. }
  318. func TestScheduleRule(t *testing.T) {
  319. conf.IsTesting = true
  320. sp := processor.NewStreamProcessor()
  321. sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
  322. defer sp.ExecStmt(`DROP STREAM demo`)
  323. // Test rule not triggered
  324. r := &api.Rule{
  325. Triggered: false,
  326. Id: "test",
  327. Sql: "SELECT ts FROM demo",
  328. Actions: []map[string]interface{}{
  329. {
  330. "log": map[string]interface{}{},
  331. },
  332. },
  333. Options: defaultOption,
  334. }
  335. r.Options.Cron = "mockCron"
  336. r.Options.Duration = "1s"
  337. const ruleStarted = "Running"
  338. const ruleStopped = "Stopped: waiting for next schedule."
  339. func() {
  340. rs, err := NewRuleState(r)
  341. if err != nil {
  342. t.Error(err)
  343. return
  344. }
  345. if err := rs.startScheduleRule(); err != nil {
  346. t.Error(err)
  347. return
  348. }
  349. time.Sleep(500 * time.Millisecond)
  350. state, err := rs.GetState()
  351. if err != nil {
  352. t.Errorf("get rule state error: %v", err)
  353. return
  354. }
  355. if state != ruleStarted {
  356. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStarted, state)
  357. return
  358. }
  359. if !rs.cronState.isInSchedule {
  360. t.Error("cron state should be in schedule")
  361. return
  362. }
  363. }()
  364. func() {
  365. rs, err := NewRuleState(r)
  366. if err != nil {
  367. t.Error(err)
  368. return
  369. }
  370. if err := rs.startScheduleRule(); err != nil {
  371. t.Error(err)
  372. return
  373. }
  374. time.Sleep(1500 * time.Millisecond)
  375. state, err := rs.GetState()
  376. if err != nil {
  377. t.Errorf("get rule state error: %v", err)
  378. return
  379. }
  380. if state != ruleStopped {
  381. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  382. return
  383. }
  384. if !rs.cronState.isInSchedule {
  385. t.Error("cron state should be in schedule")
  386. return
  387. }
  388. }()
  389. func() {
  390. rs, err := NewRuleState(r)
  391. if err != nil {
  392. t.Error(err)
  393. return
  394. }
  395. if err := rs.startScheduleRule(); err != nil {
  396. t.Error(err)
  397. return
  398. }
  399. if err := rs.startScheduleRule(); err == nil {
  400. t.Error("rule can't be register in cron twice")
  401. return
  402. } else {
  403. if err.Error() != "rule test is already in schedule" {
  404. t.Error("error message wrong")
  405. return
  406. }
  407. }
  408. }()
  409. func() {
  410. rs, err := NewRuleState(r)
  411. if err != nil {
  412. t.Error(err)
  413. return
  414. }
  415. if err := rs.startScheduleRule(); err != nil {
  416. t.Error(err)
  417. return
  418. }
  419. if err := rs.Stop(); err != nil {
  420. t.Error(err)
  421. return
  422. }
  423. state, err := rs.GetState()
  424. if err != nil {
  425. t.Errorf("get rule state error: %v", err)
  426. return
  427. }
  428. if state != "Stopped: canceled manually." {
  429. t.Errorf("rule state mismatch: exp=%v, got=%v", "Stopped: canceled manually.", state)
  430. return
  431. }
  432. if rs.cronState.isInSchedule {
  433. t.Error("cron state shouldn't be in schedule")
  434. return
  435. }
  436. }()
  437. func() {
  438. rs, err := NewRuleState(r)
  439. if err != nil {
  440. t.Error(err)
  441. return
  442. }
  443. if err := rs.Stop(); err != nil {
  444. t.Error(err)
  445. return
  446. }
  447. if err := rs.Close(); err != nil {
  448. t.Error(err)
  449. return
  450. }
  451. }()
  452. func() {
  453. rs, err := NewRuleState(r)
  454. if err != nil {
  455. t.Error(err)
  456. return
  457. }
  458. rs.cronState.isInSchedule = true
  459. status, err := rs.GetState()
  460. require.NoError(t, err)
  461. require.Equal(t, "Stopped: waiting for next schedule.", status)
  462. }()
  463. }