ruleState_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rule
  15. import (
  16. "errors"
  17. "fmt"
  18. "reflect"
  19. "sync"
  20. "testing"
  21. "time"
  22. "github.com/stretchr/testify/require"
  23. "github.com/lf-edge/ekuiper/internal/conf"
  24. "github.com/lf-edge/ekuiper/internal/processor"
  25. "github.com/lf-edge/ekuiper/internal/testx"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. )
  28. var defaultOption = &api.RuleOption{
  29. IsEventTime: false,
  30. LateTol: 1000,
  31. Concurrency: 1,
  32. BufferLength: 1024,
  33. SendMetaToSink: false,
  34. SendError: true,
  35. Qos: api.AtMostOnce,
  36. CheckpointInterval: 300000,
  37. Restart: &api.RestartStrategy{
  38. Attempts: 0,
  39. Delay: 1000,
  40. Multiplier: 2,
  41. MaxDelay: 30000,
  42. JitterFactor: 0.1,
  43. },
  44. }
  45. func init() {
  46. testx.InitEnv()
  47. }
  48. func TestCreate(t *testing.T) {
  49. sp := processor.NewStreamProcessor()
  50. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  51. defer sp.ExecStmt(`DROP STREAM demo`)
  52. tests := []struct {
  53. r *api.Rule
  54. e error
  55. }{
  56. {
  57. r: &api.Rule{
  58. Triggered: false,
  59. Id: "test",
  60. Sql: "SELECT ts FROM demo",
  61. Actions: []map[string]interface{}{
  62. {
  63. "log": map[string]interface{}{},
  64. },
  65. },
  66. Options: defaultOption,
  67. },
  68. e: nil,
  69. },
  70. {
  71. r: &api.Rule{
  72. Triggered: false,
  73. Id: "test",
  74. Sql: "SELECT FROM demo",
  75. Actions: []map[string]interface{}{
  76. {
  77. "log": map[string]interface{}{},
  78. },
  79. },
  80. Options: defaultOption,
  81. },
  82. e: errors.New("Parse SQL SELECT FROM demo error: found \"FROM\", expected expression.."),
  83. },
  84. {
  85. r: &api.Rule{
  86. Triggered: false,
  87. Id: "test",
  88. Sql: "SELECT * FROM demo1",
  89. Actions: []map[string]interface{}{
  90. {
  91. "log": map[string]interface{}{},
  92. },
  93. },
  94. Options: defaultOption,
  95. },
  96. e: errors.New("fail to get stream demo1, please check if stream is created"),
  97. },
  98. }
  99. for i, tt := range tests {
  100. _, err := NewRuleState(tt.r)
  101. if !reflect.DeepEqual(err, tt.e) {
  102. t.Errorf("%d.\n\nerror mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.e, err)
  103. }
  104. }
  105. }
  106. func TestUpdate(t *testing.T) {
  107. sp := processor.NewStreamProcessor()
  108. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  109. defer sp.ExecStmt(`DROP STREAM demo`)
  110. rs, err := NewRuleState(&api.Rule{
  111. Triggered: false,
  112. Id: "test",
  113. Sql: "SELECT ts FROM demo",
  114. Actions: []map[string]interface{}{
  115. {
  116. "log": map[string]interface{}{},
  117. },
  118. },
  119. Options: defaultOption,
  120. })
  121. if err != nil {
  122. t.Error(err)
  123. return
  124. }
  125. defer rs.Close()
  126. err = rs.Start()
  127. if err != nil {
  128. t.Error(err)
  129. return
  130. }
  131. tests := []struct {
  132. r *api.Rule
  133. e error
  134. }{
  135. {
  136. r: &api.Rule{
  137. Triggered: false,
  138. Id: "test",
  139. Sql: "SELECT FROM demo",
  140. Actions: []map[string]interface{}{
  141. {
  142. "log": map[string]interface{}{},
  143. },
  144. },
  145. Options: defaultOption,
  146. },
  147. e: errors.New("Parse SQL SELECT FROM demo error: found \"FROM\", expected expression.."),
  148. },
  149. {
  150. r: &api.Rule{
  151. Triggered: false,
  152. Id: "test",
  153. Sql: "SELECT * FROM demo1",
  154. Actions: []map[string]interface{}{
  155. {
  156. "log": map[string]interface{}{},
  157. },
  158. },
  159. Options: defaultOption,
  160. },
  161. e: errors.New("fail to get stream demo1, please check if stream is created"),
  162. },
  163. {
  164. r: &api.Rule{
  165. Triggered: false,
  166. Id: "test",
  167. Sql: "SELECT * FROM demo",
  168. Actions: []map[string]interface{}{
  169. {
  170. "log": map[string]interface{}{},
  171. },
  172. },
  173. Options: defaultOption,
  174. },
  175. e: nil,
  176. },
  177. }
  178. for i, tt := range tests {
  179. err = rs.UpdateTopo(tt.r)
  180. if !reflect.DeepEqual(err, tt.e) {
  181. t.Errorf("%d.\n\nerror mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.e, err)
  182. }
  183. }
  184. }
  185. func TestUpdateScheduleRule(t *testing.T) {
  186. sp := processor.NewStreamProcessor()
  187. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  188. defer sp.ExecStmt(`DROP STREAM demo`)
  189. scheduleOption1 := *defaultOption
  190. scheduleOption1.Cron = "mockCron1"
  191. scheduleOption1.Duration = "1s"
  192. rule1 := &api.Rule{
  193. Triggered: false,
  194. Id: "test",
  195. Sql: "SELECT ts FROM demo",
  196. Actions: []map[string]interface{}{
  197. {
  198. "log": map[string]interface{}{},
  199. },
  200. },
  201. Options: &scheduleOption1,
  202. }
  203. rs, err := NewRuleState(rule1)
  204. require.NoError(t, err)
  205. defer rs.Close()
  206. err = rs.startScheduleRule()
  207. require.NoError(t, err)
  208. require.True(t, rs.cronState.isInSchedule)
  209. require.Equal(t, "mockCron1", rs.cronState.cron)
  210. require.Equal(t, "1s", rs.cronState.duration)
  211. scheduleOption2 := *defaultOption
  212. scheduleOption2.Cron = "mockCron2"
  213. scheduleOption2.Duration = "2s"
  214. rule2 := &api.Rule{
  215. Triggered: false,
  216. Id: "test",
  217. Sql: "SELECT ts FROM demo",
  218. Actions: []map[string]interface{}{
  219. {
  220. "log": map[string]interface{}{},
  221. },
  222. },
  223. Options: &scheduleOption2,
  224. }
  225. err = rs.UpdateTopo(rule2)
  226. require.NoError(t, err)
  227. require.Equal(t, "mockCron2", rs.cronState.cron)
  228. require.Equal(t, "2s", rs.cronState.duration)
  229. }
  230. func TestMultipleAccess(t *testing.T) {
  231. sp := processor.NewStreamProcessor()
  232. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  233. defer sp.ExecStmt(`DROP STREAM demo`)
  234. rs, err := NewRuleState(&api.Rule{
  235. Triggered: false,
  236. Id: "test",
  237. Sql: "SELECT ts FROM demo",
  238. Actions: []map[string]interface{}{
  239. {
  240. "log": map[string]interface{}{},
  241. },
  242. },
  243. Options: defaultOption,
  244. })
  245. if err != nil {
  246. t.Error(err)
  247. return
  248. }
  249. defer rs.Close()
  250. err = rs.Start()
  251. if err != nil {
  252. t.Error(err)
  253. return
  254. }
  255. var wg sync.WaitGroup
  256. wg.Add(10)
  257. for i := 0; i < 10; i++ {
  258. if i%3 == 0 {
  259. go func(i int) {
  260. rs.Stop()
  261. fmt.Printf("%d:%d\n", i, rs.triggered)
  262. wg.Done()
  263. }(i)
  264. } else {
  265. go func(i int) {
  266. rs.Start()
  267. fmt.Printf("%d:%d\n", i, rs.triggered)
  268. wg.Done()
  269. }(i)
  270. }
  271. }
  272. wg.Wait()
  273. rs.Start()
  274. fmt.Printf("%d:%d\n", 10, rs.triggered)
  275. if rs.triggered != 1 {
  276. t.Errorf("triggered mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", 1, rs.triggered)
  277. }
  278. }
  279. // Test rule state message
  280. func TestRuleState_Start(t *testing.T) {
  281. sp := processor.NewStreamProcessor()
  282. sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
  283. defer sp.ExecStmt(`DROP STREAM demo`)
  284. // Test rule not triggered
  285. r := &api.Rule{
  286. Triggered: false,
  287. Id: "test",
  288. Sql: "SELECT ts FROM demo",
  289. Actions: []map[string]interface{}{
  290. {
  291. "log": map[string]interface{}{},
  292. },
  293. },
  294. Options: defaultOption,
  295. }
  296. const ruleStopped = "Stopped: canceled manually."
  297. const ruleStarted = "Running"
  298. t.Run("test rule loaded but not started", func(t *testing.T) {
  299. rs, err := NewRuleState(r)
  300. if err != nil {
  301. t.Error(err)
  302. return
  303. }
  304. state, err := rs.GetState()
  305. if err != nil {
  306. t.Errorf("get rule state error: %v", err)
  307. return
  308. }
  309. if state != ruleStopped {
  310. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  311. return
  312. }
  313. })
  314. t.Run("test rule started", func(t *testing.T) {
  315. rs, err := NewRuleState(r)
  316. if err != nil {
  317. t.Error(err)
  318. return
  319. }
  320. err = rs.Start()
  321. if err != nil {
  322. t.Error(err)
  323. return
  324. }
  325. time.Sleep(100 * time.Millisecond)
  326. state, err := rs.GetState()
  327. if err != nil {
  328. t.Errorf("get rule state error: %v", err)
  329. return
  330. }
  331. if state != ruleStarted {
  332. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  333. return
  334. }
  335. })
  336. t.Run("test rule loaded and stopped", func(t *testing.T) {
  337. rs, err := NewRuleState(r)
  338. if err != nil {
  339. t.Error(err)
  340. return
  341. }
  342. err = rs.Start()
  343. if err != nil {
  344. t.Error(err)
  345. return
  346. }
  347. err = rs.Close()
  348. if err != nil {
  349. t.Error(err)
  350. return
  351. }
  352. state, err := rs.GetState()
  353. if err != nil {
  354. t.Errorf("get rule state error: %v", err)
  355. return
  356. }
  357. if state != ruleStopped {
  358. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  359. return
  360. }
  361. })
  362. }
  363. func TestScheduleRule(t *testing.T) {
  364. conf.IsTesting = true
  365. sp := processor.NewStreamProcessor()
  366. sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
  367. defer sp.ExecStmt(`DROP STREAM demo`)
  368. // Test rule not triggered
  369. r := &api.Rule{
  370. Triggered: false,
  371. Id: "test",
  372. Sql: "SELECT ts FROM demo",
  373. Actions: []map[string]interface{}{
  374. {
  375. "log": map[string]interface{}{},
  376. },
  377. },
  378. Options: defaultOption,
  379. }
  380. r.Options.Cron = "mockCron"
  381. r.Options.Duration = "1s"
  382. const ruleStarted = "Running"
  383. const ruleStopped = "Stopped: waiting for next schedule."
  384. func() {
  385. rs, err := NewRuleState(r)
  386. if err != nil {
  387. t.Error(err)
  388. return
  389. }
  390. if err := rs.startScheduleRule(); err != nil {
  391. t.Error(err)
  392. return
  393. }
  394. time.Sleep(500 * time.Millisecond)
  395. state, err := rs.GetState()
  396. if err != nil {
  397. t.Errorf("get rule state error: %v", err)
  398. return
  399. }
  400. if state != ruleStarted {
  401. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStarted, state)
  402. return
  403. }
  404. if !rs.cronState.isInSchedule {
  405. t.Error("cron state should be in schedule")
  406. return
  407. }
  408. }()
  409. func() {
  410. rs, err := NewRuleState(r)
  411. if err != nil {
  412. t.Error(err)
  413. return
  414. }
  415. if err := rs.startScheduleRule(); err != nil {
  416. t.Error(err)
  417. return
  418. }
  419. time.Sleep(1500 * time.Millisecond)
  420. state, err := rs.GetState()
  421. if err != nil {
  422. t.Errorf("get rule state error: %v", err)
  423. return
  424. }
  425. if state != ruleStopped {
  426. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  427. return
  428. }
  429. if !rs.cronState.isInSchedule {
  430. t.Error("cron state should be in schedule")
  431. return
  432. }
  433. }()
  434. func() {
  435. rs, err := NewRuleState(r)
  436. if err != nil {
  437. t.Error(err)
  438. return
  439. }
  440. if err := rs.startScheduleRule(); err != nil {
  441. t.Error(err)
  442. return
  443. }
  444. if err := rs.startScheduleRule(); err == nil {
  445. t.Error("rule can't be register in cron twice")
  446. return
  447. } else {
  448. if err.Error() != "rule test is already in schedule" {
  449. t.Error("error message wrong")
  450. return
  451. }
  452. }
  453. }()
  454. func() {
  455. rs, err := NewRuleState(r)
  456. if err != nil {
  457. t.Error(err)
  458. return
  459. }
  460. if err := rs.startScheduleRule(); err != nil {
  461. t.Error(err)
  462. return
  463. }
  464. if err := rs.Stop(); err != nil {
  465. t.Error(err)
  466. return
  467. }
  468. state, err := rs.GetState()
  469. if err != nil {
  470. t.Errorf("get rule state error: %v", err)
  471. return
  472. }
  473. if state != "Stopped: canceled manually." {
  474. t.Errorf("rule state mismatch: exp=%v, got=%v", "Stopped: canceled manually.", state)
  475. return
  476. }
  477. if rs.cronState.isInSchedule {
  478. t.Error("cron state shouldn't be in schedule")
  479. return
  480. }
  481. }()
  482. func() {
  483. rs, err := NewRuleState(r)
  484. if err != nil {
  485. t.Error(err)
  486. return
  487. }
  488. if err := rs.Stop(); err != nil {
  489. t.Error(err)
  490. return
  491. }
  492. if err := rs.Close(); err != nil {
  493. t.Error(err)
  494. return
  495. }
  496. }()
  497. func() {
  498. rs, err := NewRuleState(r)
  499. if err != nil {
  500. t.Error(err)
  501. return
  502. }
  503. rs.cronState.isInSchedule = true
  504. status, err := rs.GetState()
  505. require.NoError(t, err)
  506. require.Equal(t, "Stopped: waiting for next schedule.", status)
  507. }()
  508. }