rule_test.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package processor
  15. import (
  16. "reflect"
  17. "testing"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. )
  20. func TestRuleActionParse_Apply(t *testing.T) {
  21. tests := []struct {
  22. ruleStr string
  23. result *api.Rule
  24. }{
  25. {
  26. ruleStr: `{
  27. "id": "ruleTest",
  28. "sql": "SELECT * from demo",
  29. "actions": [
  30. {
  31. "funcName": "RFC_READ_TABLE",
  32. "ashost": "192.168.1.100",
  33. "sysnr": "02",
  34. "client": "900",
  35. "user": "SPERF",
  36. "passwd": "PASSPASS",
  37. "params": {
  38. "QUERY_TABLE": "VBAP",
  39. "ROWCOUNT": 10,
  40. "FIELDS": [
  41. {"FIELDNAME": "MANDT"},
  42. {"FIELDNAME": "VBELN"},
  43. {"FIELDNAME": "POSNR"}
  44. ]
  45. }
  46. }
  47. ],
  48. "options": {
  49. "restartStrategy": {
  50. "attempts": 20
  51. }
  52. }
  53. }`,
  54. result: &api.Rule{
  55. Triggered: true,
  56. Id: "ruleTest",
  57. Sql: "SELECT * from demo",
  58. Actions: []map[string]interface{}{
  59. {
  60. "funcName": "RFC_READ_TABLE",
  61. "ashost": "192.168.1.100",
  62. "sysnr": "02",
  63. "client": "900",
  64. "user": "SPERF",
  65. "passwd": "PASSPASS",
  66. "params": map[string]interface{}{
  67. "QUERY_TABLE": "VBAP",
  68. "ROWCOUNT": float64(10),
  69. "FIELDS": []interface{}{
  70. map[string]interface{}{"FIELDNAME": "MANDT"},
  71. map[string]interface{}{"FIELDNAME": "VBELN"},
  72. map[string]interface{}{"FIELDNAME": "POSNR"},
  73. },
  74. },
  75. },
  76. },
  77. Options: &api.RuleOption{
  78. IsEventTime: false,
  79. LateTol: 1000,
  80. Concurrency: 1,
  81. BufferLength: 1024,
  82. SendMetaToSink: false,
  83. Qos: api.AtMostOnce,
  84. CheckpointInterval: 300000,
  85. SendError: true,
  86. Restart: &api.RestartStrategy{
  87. Attempts: 20,
  88. Delay: 1000,
  89. Multiplier: 2,
  90. MaxDelay: 30000,
  91. JitterFactor: 0.1,
  92. },
  93. },
  94. },
  95. }, {
  96. ruleStr: `{
  97. "id": "ruleTest2",
  98. "sql": "SELECT * from demo",
  99. "actions": [
  100. {
  101. "log": ""
  102. },
  103. {
  104. "sap": {
  105. "funcName": "RFC_READ_TABLE",
  106. "ashost": "192.168.100.10",
  107. "sysnr": "02",
  108. "client": "900",
  109. "user": "uuu",
  110. "passwd": "ppp."
  111. }
  112. }
  113. ],
  114. "options": {
  115. "isEventTime": true,
  116. "lateTolerance": 1000,
  117. "bufferLength": 10240,
  118. "qos": 2,
  119. "checkpointInterval": 60000
  120. }
  121. }`,
  122. result: &api.Rule{
  123. Triggered: true,
  124. Id: "ruleTest2",
  125. Sql: "SELECT * from demo",
  126. Actions: []map[string]interface{}{
  127. {
  128. "log": "",
  129. }, {
  130. "sap": map[string]interface{}{
  131. "funcName": "RFC_READ_TABLE",
  132. "ashost": "192.168.100.10",
  133. "sysnr": "02",
  134. "client": "900",
  135. "user": "uuu",
  136. "passwd": "ppp.",
  137. },
  138. },
  139. },
  140. Options: &api.RuleOption{
  141. IsEventTime: true,
  142. LateTol: 1000,
  143. Concurrency: 1,
  144. BufferLength: 10240,
  145. SendMetaToSink: false,
  146. Qos: api.ExactlyOnce,
  147. CheckpointInterval: 60000,
  148. SendError: true,
  149. Restart: &api.RestartStrategy{
  150. Attempts: 0,
  151. Delay: 1000,
  152. Multiplier: 2,
  153. MaxDelay: 30000,
  154. JitterFactor: 0.1,
  155. },
  156. },
  157. },
  158. }, {
  159. ruleStr: `{
  160. "id": "ruleTest",
  161. "sql": "SELECT * from demo",
  162. "actions": [
  163. {"log": {}}
  164. ],
  165. "triggered": false
  166. }`,
  167. result: &api.Rule{
  168. Triggered: false,
  169. Id: "ruleTest",
  170. Sql: "SELECT * from demo",
  171. Actions: []map[string]interface{}{
  172. {
  173. "log": map[string]interface{}{},
  174. },
  175. },
  176. Options: &api.RuleOption{
  177. IsEventTime: false,
  178. LateTol: 1000,
  179. Concurrency: 1,
  180. BufferLength: 1024,
  181. SendMetaToSink: false,
  182. Qos: api.AtMostOnce,
  183. CheckpointInterval: 300000,
  184. SendError: true,
  185. Restart: &api.RestartStrategy{
  186. Attempts: 0,
  187. Delay: 1000,
  188. Multiplier: 2,
  189. MaxDelay: 30000,
  190. JitterFactor: 0.1,
  191. },
  192. },
  193. },
  194. },
  195. }
  196. p := NewRuleProcessor()
  197. for i, tt := range tests {
  198. r, err := p.GetRuleByJson(tt.result.Id, tt.ruleStr)
  199. if err != nil {
  200. t.Errorf("get rule error: %s", err)
  201. }
  202. if !reflect.DeepEqual(tt.result, r) {
  203. t.Errorf("%d \tresult mismatch:\n\nexp=%+v\n\ngot=%+v\n\n", i, tt.result, r)
  204. }
  205. }
  206. }
  207. func TestAllRules(t *testing.T) {
  208. expected := map[string]string{
  209. "rule1": "{\"id\": \"rule1\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{ \"log\": {}}]}",
  210. "rule2": "{\"id\": \"rule2\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{ \"log\": {}}]}",
  211. "rule3": "{\"id\": \"rule3\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{ \"log\": {}}]}",
  212. }
  213. sp := NewStreamProcessor()
  214. defer sp.db.Clean()
  215. sp.ExecStmt(`CREATE STREAM demo () WITH (DATASOURCE="users", FORMAT="JSON")`)
  216. p := NewRuleProcessor()
  217. p.db.Clean()
  218. defer p.db.Clean()
  219. for k, v := range expected {
  220. _, err := p.ExecCreateWithValidation(k, v)
  221. if err != nil {
  222. t.Error(err)
  223. return
  224. }
  225. defer p.ExecDrop(k)
  226. }
  227. all, err := p.GetAllRulesJson()
  228. if err != nil {
  229. t.Error(err)
  230. return
  231. }
  232. if !reflect.DeepEqual(all, expected) {
  233. t.Errorf("Expect\t %v\nBut got\t%v", expected, all)
  234. }
  235. }