ruleState_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rule
  15. import (
  16. "errors"
  17. "fmt"
  18. "reflect"
  19. "sync"
  20. "testing"
  21. "time"
  22. "github.com/benbjohnson/clock"
  23. "github.com/stretchr/testify/require"
  24. "github.com/lf-edge/ekuiper/internal/conf"
  25. "github.com/lf-edge/ekuiper/internal/processor"
  26. "github.com/lf-edge/ekuiper/internal/testx"
  27. "github.com/lf-edge/ekuiper/pkg/api"
  28. )
  29. var defaultOption = &api.RuleOption{
  30. IsEventTime: false,
  31. LateTol: 1000,
  32. Concurrency: 1,
  33. BufferLength: 1024,
  34. SendMetaToSink: false,
  35. SendError: true,
  36. Qos: api.AtMostOnce,
  37. CheckpointInterval: 300000,
  38. Restart: &api.RestartStrategy{
  39. Attempts: 0,
  40. Delay: 1000,
  41. Multiplier: 2,
  42. MaxDelay: 30000,
  43. JitterFactor: 0.1,
  44. },
  45. }
  46. func init() {
  47. testx.InitEnv()
  48. }
  49. func TestCreate(t *testing.T) {
  50. sp := processor.NewStreamProcessor()
  51. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  52. defer sp.ExecStmt(`DROP STREAM demo`)
  53. tests := []struct {
  54. r *api.Rule
  55. e error
  56. }{
  57. {
  58. r: &api.Rule{
  59. Triggered: false,
  60. Id: "test",
  61. Sql: "SELECT ts FROM demo",
  62. Actions: []map[string]interface{}{
  63. {
  64. "log": map[string]interface{}{},
  65. },
  66. },
  67. Options: defaultOption,
  68. },
  69. e: nil,
  70. },
  71. {
  72. r: &api.Rule{
  73. Triggered: false,
  74. Id: "test",
  75. Sql: "SELECT FROM demo",
  76. Actions: []map[string]interface{}{
  77. {
  78. "log": map[string]interface{}{},
  79. },
  80. },
  81. Options: defaultOption,
  82. },
  83. e: errors.New("Parse SQL SELECT FROM demo error: found \"FROM\", expected expression.."),
  84. },
  85. {
  86. r: &api.Rule{
  87. Triggered: false,
  88. Id: "test",
  89. Sql: "SELECT * FROM demo1",
  90. Actions: []map[string]interface{}{
  91. {
  92. "log": map[string]interface{}{},
  93. },
  94. },
  95. Options: defaultOption,
  96. },
  97. e: errors.New("fail to get stream demo1, please check if stream is created"),
  98. },
  99. }
  100. for i, tt := range tests {
  101. _, err := NewRuleState(tt.r)
  102. if !reflect.DeepEqual(err, tt.e) {
  103. t.Errorf("%d.\n\nerror mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.e, err)
  104. }
  105. }
  106. }
  107. func TestUpdate(t *testing.T) {
  108. ignoreSignal = true
  109. defer func() {
  110. ignoreSignal = false
  111. }()
  112. sp := processor.NewStreamProcessor()
  113. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  114. defer sp.ExecStmt(`DROP STREAM demo`)
  115. tests := []struct {
  116. r *api.Rule
  117. e error
  118. triggered int
  119. }{
  120. {
  121. r: &api.Rule{
  122. Triggered: false,
  123. Id: "test",
  124. Sql: "SELECT FROM demo",
  125. Actions: []map[string]interface{}{
  126. {
  127. "log": map[string]interface{}{},
  128. },
  129. },
  130. Options: defaultOption,
  131. },
  132. e: errors.New("Parse SQL SELECT FROM demo error: found \"FROM\", expected expression.."),
  133. triggered: 1,
  134. },
  135. {
  136. r: &api.Rule{
  137. Triggered: false,
  138. Id: "test",
  139. Sql: "SELECT * FROM demo1",
  140. Actions: []map[string]interface{}{
  141. {
  142. "log": map[string]interface{}{},
  143. },
  144. },
  145. Options: defaultOption,
  146. },
  147. e: errors.New("fail to get stream demo1, please check if stream is created"),
  148. triggered: 1,
  149. },
  150. {
  151. r: &api.Rule{
  152. Triggered: true,
  153. Id: "test",
  154. Sql: "SELECT * FROM demo",
  155. Actions: []map[string]interface{}{
  156. {
  157. "log": map[string]interface{}{},
  158. },
  159. },
  160. Options: defaultOption,
  161. },
  162. e: nil,
  163. triggered: 1,
  164. },
  165. {
  166. r: &api.Rule{
  167. Triggered: false,
  168. Id: "test",
  169. Sql: "SELECT * FROM demo",
  170. Actions: []map[string]interface{}{
  171. {
  172. "log": map[string]interface{}{},
  173. },
  174. },
  175. Options: defaultOption,
  176. },
  177. e: nil,
  178. triggered: 0,
  179. },
  180. }
  181. for i, tt := range tests {
  182. rs, err := NewRuleState(&api.Rule{
  183. Triggered: true,
  184. Id: "test",
  185. Sql: "SELECT ts FROM demo",
  186. Actions: []map[string]interface{}{
  187. {
  188. "log": map[string]interface{}{},
  189. },
  190. },
  191. Options: defaultOption,
  192. })
  193. require.NoError(t, err)
  194. err = rs.Start()
  195. require.NoError(t, err)
  196. time.Sleep(5 * time.Millisecond)
  197. require.Equal(t, 1, rs.triggered, fmt.Sprintf("case %v failed", i))
  198. err = rs.UpdateTopo(tt.r)
  199. time.Sleep(5 * time.Millisecond)
  200. require.Equal(t, tt.e, err, fmt.Sprintf("case %v failed", i))
  201. require.Equal(t, tt.triggered, rs.triggered, fmt.Sprintf("case %v failed", i))
  202. rs.Close()
  203. }
  204. }
  205. func TestUpdateScheduleRule(t *testing.T) {
  206. sp := processor.NewStreamProcessor()
  207. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  208. defer sp.ExecStmt(`DROP STREAM demo`)
  209. scheduleOption1 := *defaultOption
  210. scheduleOption1.Cron = "mockCron"
  211. scheduleOption1.Duration = "1s"
  212. rule1 := &api.Rule{
  213. Triggered: true,
  214. Id: "test",
  215. Sql: "SELECT ts FROM demo",
  216. Actions: []map[string]interface{}{
  217. {
  218. "log": map[string]interface{}{},
  219. },
  220. },
  221. Options: &scheduleOption1,
  222. }
  223. rs, err := NewRuleState(rule1)
  224. require.NoError(t, err)
  225. defer rs.Close()
  226. err = rs.startScheduleRule()
  227. require.NoError(t, err)
  228. require.True(t, rs.cronState.isInSchedule)
  229. require.Equal(t, "mockCron", rs.cronState.cron)
  230. require.Equal(t, "1s", rs.cronState.duration)
  231. scheduleOption2 := *defaultOption
  232. scheduleOption2.Cron = "mockCron2"
  233. scheduleOption2.Duration = "2s"
  234. rule2 := &api.Rule{
  235. Triggered: true,
  236. Id: "test",
  237. Sql: "SELECT ts FROM demo",
  238. Actions: []map[string]interface{}{
  239. {
  240. "log": map[string]interface{}{},
  241. },
  242. },
  243. Options: &scheduleOption2,
  244. }
  245. err = rs.UpdateTopo(rule2)
  246. require.NoError(t, err)
  247. require.Equal(t, "mockCron2", rs.cronState.cron)
  248. require.Equal(t, "2s", rs.cronState.duration)
  249. }
  250. func TestMultipleAccess(t *testing.T) {
  251. sp := processor.NewStreamProcessor()
  252. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  253. defer sp.ExecStmt(`DROP STREAM demo`)
  254. rs, err := NewRuleState(&api.Rule{
  255. Triggered: false,
  256. Id: "test",
  257. Sql: "SELECT ts FROM demo",
  258. Actions: []map[string]interface{}{
  259. {
  260. "log": map[string]interface{}{},
  261. },
  262. },
  263. Options: defaultOption,
  264. })
  265. if err != nil {
  266. t.Error(err)
  267. return
  268. }
  269. defer rs.Close()
  270. err = rs.Start()
  271. if err != nil {
  272. t.Error(err)
  273. return
  274. }
  275. var wg sync.WaitGroup
  276. wg.Add(10)
  277. for i := 0; i < 10; i++ {
  278. if i%3 == 0 {
  279. go func(i int) {
  280. rs.Stop()
  281. fmt.Printf("%d:%d\n", i, rs.triggered)
  282. wg.Done()
  283. }(i)
  284. } else {
  285. go func(i int) {
  286. rs.Start()
  287. fmt.Printf("%d:%d\n", i, rs.triggered)
  288. wg.Done()
  289. }(i)
  290. }
  291. }
  292. wg.Wait()
  293. rs.Start()
  294. fmt.Printf("%d:%d\n", 10, rs.triggered)
  295. if rs.triggered != 1 {
  296. t.Errorf("triggered mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", 1, rs.triggered)
  297. }
  298. }
  299. // Test rule state message
  300. func TestRuleState_Start(t *testing.T) {
  301. sp := processor.NewStreamProcessor()
  302. sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
  303. defer sp.ExecStmt(`DROP STREAM demo`)
  304. // Test rule not triggered
  305. r := &api.Rule{
  306. Triggered: false,
  307. Id: "test",
  308. Sql: "SELECT ts FROM demo",
  309. Actions: []map[string]interface{}{
  310. {
  311. "log": map[string]interface{}{},
  312. },
  313. },
  314. Options: defaultOption,
  315. }
  316. const ruleStopped = "Stopped: canceled manually."
  317. const ruleStarted = "Running"
  318. t.Run("test rule loaded but not started", func(t *testing.T) {
  319. rs, err := NewRuleState(r)
  320. if err != nil {
  321. t.Error(err)
  322. return
  323. }
  324. state, err := rs.GetState()
  325. if err != nil {
  326. t.Errorf("get rule state error: %v", err)
  327. return
  328. }
  329. if state != ruleStopped {
  330. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  331. return
  332. }
  333. })
  334. t.Run("test rule started", func(t *testing.T) {
  335. rs, err := NewRuleState(r)
  336. if err != nil {
  337. t.Error(err)
  338. return
  339. }
  340. err = rs.Start()
  341. if err != nil {
  342. t.Error(err)
  343. return
  344. }
  345. time.Sleep(100 * time.Millisecond)
  346. state, err := rs.GetState()
  347. if err != nil {
  348. t.Errorf("get rule state error: %v", err)
  349. return
  350. }
  351. if state != ruleStarted {
  352. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  353. return
  354. }
  355. })
  356. t.Run("test rule loaded and stopped", func(t *testing.T) {
  357. rs, err := NewRuleState(r)
  358. if err != nil {
  359. t.Error(err)
  360. return
  361. }
  362. err = rs.Start()
  363. if err != nil {
  364. t.Error(err)
  365. return
  366. }
  367. err = rs.Close()
  368. if err != nil {
  369. t.Error(err)
  370. return
  371. }
  372. state, err := rs.GetState()
  373. if err != nil {
  374. t.Errorf("get rule state error: %v", err)
  375. return
  376. }
  377. if state != ruleStopped {
  378. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  379. return
  380. }
  381. })
  382. }
  383. func TestScheduleRule(t *testing.T) {
  384. conf.IsTesting = true
  385. sp := processor.NewStreamProcessor()
  386. sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
  387. defer sp.ExecStmt(`DROP STREAM demo`)
  388. // Test rule not triggered
  389. r := &api.Rule{
  390. Triggered: false,
  391. Id: "test",
  392. Sql: "SELECT ts FROM demo",
  393. Actions: []map[string]interface{}{
  394. {
  395. "log": map[string]interface{}{},
  396. },
  397. },
  398. Options: defaultOption,
  399. }
  400. r.Options.Cron = "mockCron"
  401. r.Options.Duration = "1s"
  402. const ruleStarted = "Running"
  403. const ruleStopped = "Stopped: waiting for next schedule."
  404. func() {
  405. rs, err := NewRuleState(r)
  406. if err != nil {
  407. t.Error(err)
  408. return
  409. }
  410. if err := rs.startScheduleRule(); err != nil {
  411. t.Error(err)
  412. return
  413. }
  414. time.Sleep(500 * time.Millisecond)
  415. state, err := rs.GetState()
  416. if err != nil {
  417. t.Errorf("get rule state error: %v", err)
  418. return
  419. }
  420. if state != ruleStarted {
  421. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStarted, state)
  422. return
  423. }
  424. if !rs.cronState.isInSchedule {
  425. t.Error("cron state should be in schedule")
  426. return
  427. }
  428. }()
  429. func() {
  430. rs, err := NewRuleState(r)
  431. if err != nil {
  432. t.Error(err)
  433. return
  434. }
  435. if err := rs.startScheduleRule(); err != nil {
  436. t.Error(err)
  437. return
  438. }
  439. time.Sleep(1500 * time.Millisecond)
  440. state, err := rs.GetState()
  441. if err != nil {
  442. t.Errorf("get rule state error: %v", err)
  443. return
  444. }
  445. if state != ruleStopped {
  446. t.Errorf("rule state mismatch: exp=%v, got=%v", ruleStopped, state)
  447. return
  448. }
  449. if !rs.cronState.isInSchedule {
  450. t.Error("cron state should be in schedule")
  451. return
  452. }
  453. }()
  454. func() {
  455. rs, err := NewRuleState(r)
  456. if err != nil {
  457. t.Error(err)
  458. return
  459. }
  460. if err := rs.startScheduleRule(); err != nil {
  461. t.Error(err)
  462. return
  463. }
  464. if err := rs.startScheduleRule(); err == nil {
  465. t.Error("rule can't be register in cron twice")
  466. return
  467. } else {
  468. if err.Error() != "rule test is already in schedule" {
  469. t.Error("error message wrong")
  470. return
  471. }
  472. }
  473. }()
  474. func() {
  475. rs, err := NewRuleState(r)
  476. if err != nil {
  477. t.Error(err)
  478. return
  479. }
  480. if err := rs.startScheduleRule(); err != nil {
  481. t.Error(err)
  482. return
  483. }
  484. if err := rs.Stop(); err != nil {
  485. t.Error(err)
  486. return
  487. }
  488. state, err := rs.GetState()
  489. if err != nil {
  490. t.Errorf("get rule state error: %v", err)
  491. return
  492. }
  493. if state != "Stopped: canceled manually." {
  494. t.Errorf("rule state mismatch: exp=%v, got=%v", "Stopped: canceled manually.", state)
  495. return
  496. }
  497. if rs.cronState.isInSchedule {
  498. t.Error("cron state shouldn't be in schedule")
  499. return
  500. }
  501. }()
  502. func() {
  503. rs, err := NewRuleState(r)
  504. if err != nil {
  505. t.Error(err)
  506. return
  507. }
  508. if err := rs.Stop(); err != nil {
  509. t.Error(err)
  510. return
  511. }
  512. if err := rs.Close(); err != nil {
  513. t.Error(err)
  514. return
  515. }
  516. }()
  517. func() {
  518. rs, err := NewRuleState(r)
  519. if err != nil {
  520. t.Error(err)
  521. return
  522. }
  523. rs.cronState.isInSchedule = true
  524. status, err := rs.GetState()
  525. require.NoError(t, err)
  526. require.Equal(t, "Stopped: waiting for next schedule.", status)
  527. }()
  528. }
  529. func TestScheduleRuleInRange(t *testing.T) {
  530. now := time.Now()
  531. m := conf.Clock.(*clock.Mock)
  532. m.Set(now)
  533. before := now.AddDate(-10, -10, -10)
  534. after := now.Add(10 * time.Second)
  535. conf.IsTesting = true
  536. sp := processor.NewStreamProcessor()
  537. sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
  538. defer sp.ExecStmt(`DROP STREAM demo`)
  539. // Test rule not triggered
  540. r := &api.Rule{
  541. Triggered: false,
  542. Id: "test",
  543. Sql: "SELECT ts FROM demo",
  544. Actions: []map[string]interface{}{
  545. {
  546. "log": map[string]interface{}{},
  547. },
  548. },
  549. Options: defaultOption,
  550. }
  551. r.Options.Cron = "mockCron"
  552. r.Options.Duration = "1s"
  553. r.Options.CronDatetimeRange = []api.DatetimeRange{
  554. {
  555. Begin: before.Format(layout),
  556. End: after.Format(layout),
  557. },
  558. }
  559. func() {
  560. rs, err := NewRuleState(r)
  561. require.NoError(t, err)
  562. err = rs.startScheduleRule()
  563. require.NoError(t, err)
  564. time.Sleep(500 * time.Millisecond)
  565. state, err := rs.GetState()
  566. require.NoError(t, err)
  567. require.Equal(t, RuleStarted, state)
  568. require.True(t, rs.cronState.isInSchedule)
  569. }()
  570. r.Options.CronDatetimeRange = []api.DatetimeRange{
  571. {
  572. Begin: after.Format(layout),
  573. End: after.Format(layout),
  574. },
  575. }
  576. func() {
  577. rs, err := NewRuleState(r)
  578. require.NoError(t, err)
  579. err = rs.startScheduleRule()
  580. require.NoError(t, err)
  581. time.Sleep(500 * time.Millisecond)
  582. state, err := rs.GetState()
  583. require.NoError(t, err)
  584. require.Equal(t, RuleWait, state)
  585. require.True(t, rs.cronState.isInSchedule)
  586. }()
  587. r.Options.CronDatetimeRange = []api.DatetimeRange{
  588. {
  589. Begin: before.Format(layout),
  590. End: before.Format(layout),
  591. },
  592. }
  593. func() {
  594. rs, err := NewRuleState(r)
  595. require.NoError(t, err)
  596. err = rs.startScheduleRule()
  597. require.NoError(t, err)
  598. time.Sleep(500 * time.Millisecond)
  599. state, err := rs.GetState()
  600. require.NoError(t, err)
  601. require.Equal(t, RuleTerminated, state)
  602. require.True(t, rs.cronState.isInSchedule)
  603. }()
  604. now2, err := time.Parse(layout, "2006-01-02 15:04:01")
  605. require.NoError(t, err)
  606. r.Options.Cron = "4 15 * * *"
  607. r.Options.CronDatetimeRange = nil
  608. r.Options.Duration = "2s"
  609. m.Set(now2)
  610. func() {
  611. rs, err := NewRuleState(r)
  612. require.NoError(t, err)
  613. require.NoError(t, rs.startScheduleRule())
  614. time.Sleep(500 * time.Millisecond)
  615. state, err := rs.GetState()
  616. require.NoError(t, err)
  617. require.Equal(t, state, RuleStarted)
  618. time.Sleep(3 * time.Second)
  619. state, err = rs.GetState()
  620. require.NoError(t, err)
  621. require.Equal(t, state, RuleWait)
  622. }()
  623. }
  624. const layout = "2006-01-02 15:04:05"
  625. func TestStartLongRunningScheduleRule(t *testing.T) {
  626. conf.IsTesting = true
  627. sp := processor.NewStreamProcessor()
  628. sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
  629. defer sp.ExecStmt(`DROP STREAM demo`)
  630. now := time.Now()
  631. m := conf.Clock.(*clock.Mock)
  632. m.Set(now)
  633. before := now.AddDate(-10, -10, -10)
  634. after := now.Add(10 * time.Second)
  635. r := &api.Rule{
  636. Triggered: false,
  637. Id: "test",
  638. Sql: "SELECT ts FROM demo",
  639. Actions: []map[string]interface{}{
  640. {
  641. "log": map[string]interface{}{},
  642. },
  643. },
  644. Options: defaultOption,
  645. }
  646. r.Options.Cron = ""
  647. r.Options.Duration = ""
  648. r.Options.CronDatetimeRange = []api.DatetimeRange{
  649. {
  650. Begin: before.Format(layout),
  651. End: after.Format(layout),
  652. },
  653. }
  654. func() {
  655. rs, err := NewRuleState(r)
  656. require.NoError(t, err)
  657. require.NoError(t, rs.Start())
  658. time.Sleep(500 * time.Millisecond)
  659. state, err := rs.GetState()
  660. require.NoError(t, err)
  661. require.Equal(t, state, RuleStarted)
  662. }()
  663. r.Options.CronDatetimeRange = []api.DatetimeRange{
  664. {
  665. Begin: before.Format(layout),
  666. End: before.Format(layout),
  667. },
  668. }
  669. func() {
  670. rs, err := NewRuleState(r)
  671. require.NoError(t, err)
  672. require.NoError(t, rs.Start())
  673. time.Sleep(500 * time.Millisecond)
  674. state, err := rs.GetState()
  675. require.NoError(t, err)
  676. require.Equal(t, state, RuleTerminated)
  677. }()
  678. r.Options.CronDatetimeRange = []api.DatetimeRange{
  679. {
  680. Begin: after.Format(layout),
  681. End: after.Format(layout),
  682. },
  683. }
  684. func() {
  685. rs, err := NewRuleState(r)
  686. require.NoError(t, err)
  687. require.NoError(t, rs.Start())
  688. time.Sleep(500 * time.Millisecond)
  689. state, err := rs.GetState()
  690. require.NoError(t, err)
  691. require.Equal(t, state, RuleWait)
  692. }()
  693. r.Options.CronDatetimeRange = []api.DatetimeRange{
  694. {
  695. Begin: before.Format(layout),
  696. End: after.Format(layout),
  697. },
  698. }
  699. func() {
  700. rs, err := NewRuleState(r)
  701. require.NoError(t, err)
  702. require.NoError(t, rs.Start())
  703. time.Sleep(500 * time.Millisecond)
  704. state, err := rs.GetState()
  705. require.NoError(t, err)
  706. require.Equal(t, state, RuleStarted)
  707. require.NoError(t, rs.Stop())
  708. time.Sleep(500 * time.Millisecond)
  709. state, err = rs.GetState()
  710. require.NoError(t, err)
  711. require.Equal(t, state, RuleStopped)
  712. }()
  713. }
  714. func TestRuleStateInternalStop(t *testing.T) {
  715. conf.IsTesting = true
  716. sp := processor.NewStreamProcessor()
  717. sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="neuron", FORMAT="JSON")`)
  718. defer sp.ExecStmt(`DROP STREAM demo`)
  719. r := &api.Rule{
  720. Triggered: false,
  721. Id: "test",
  722. Sql: "SELECT ts FROM demo",
  723. Actions: []map[string]interface{}{
  724. {
  725. "log": map[string]interface{}{},
  726. },
  727. },
  728. Options: defaultOption,
  729. }
  730. r.Options.Cron = "123"
  731. rs, err := NewRuleState(r)
  732. require.NoError(t, err)
  733. err = rs.InternalStop()
  734. require.Error(t, err)
  735. r.Options.Cron = ""
  736. r.Options.Duration = ""
  737. r.Options.CronDatetimeRange = []api.DatetimeRange{
  738. {
  739. Begin: layout,
  740. End: layout,
  741. },
  742. }
  743. rs, err = NewRuleState(r)
  744. require.NoError(t, err)
  745. err = rs.InternalStop()
  746. require.NoError(t, err)
  747. require.Equal(t, rs.triggered, 2)
  748. }