rule_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package processor
  2. import (
  3. "github.com/emqx/kuiper/pkg/api"
  4. "reflect"
  5. "testing"
  6. )
  7. func TestRuleActionParse_Apply(t *testing.T) {
  8. var tests = []struct {
  9. ruleStr string
  10. result *api.Rule
  11. }{
  12. {
  13. ruleStr: `{
  14. "id": "ruleTest",
  15. "sql": "SELECT * from demo",
  16. "actions": [
  17. {
  18. "funcName": "RFC_READ_TABLE",
  19. "ashost": "192.168.1.100",
  20. "sysnr": "02",
  21. "client": "900",
  22. "user": "SPERF",
  23. "passwd": "PASSPASS",
  24. "params": {
  25. "QUERY_TABLE": "VBAP",
  26. "ROWCOUNT": 10,
  27. "FIELDS": [
  28. {"FIELDNAME": "MANDT"},
  29. {"FIELDNAME": "VBELN"},
  30. {"FIELDNAME": "POSNR"}
  31. ]
  32. }
  33. }
  34. ]
  35. }`,
  36. result: &api.Rule{
  37. Triggered: false,
  38. Id: "ruleTest",
  39. Sql: "SELECT * from demo",
  40. Actions: []map[string]interface{}{
  41. {
  42. "funcName": "RFC_READ_TABLE",
  43. "ashost": "192.168.1.100",
  44. "sysnr": "02",
  45. "client": "900",
  46. "user": "SPERF",
  47. "passwd": "PASSPASS",
  48. "params": map[string]interface{}{
  49. "QUERY_TABLE": "VBAP",
  50. "ROWCOUNT": float64(10),
  51. "FIELDS": []interface{}{
  52. map[string]interface{}{"FIELDNAME": "MANDT"},
  53. map[string]interface{}{"FIELDNAME": "VBELN"},
  54. map[string]interface{}{"FIELDNAME": "POSNR"},
  55. },
  56. },
  57. },
  58. },
  59. Options: &api.RuleOption{
  60. IsEventTime: false,
  61. LateTol: 1000,
  62. Concurrency: 1,
  63. BufferLength: 1024,
  64. SendMetaToSink: false,
  65. Qos: api.AtMostOnce,
  66. CheckpointInterval: 300000,
  67. SendError: true,
  68. },
  69. },
  70. }, {
  71. ruleStr: `{
  72. "id": "ruleTest2",
  73. "sql": "SELECT * from demo",
  74. "actions": [
  75. {
  76. "log": ""
  77. },
  78. {
  79. "sap": {
  80. "funcName": "RFC_READ_TABLE",
  81. "ashost": "192.168.100.10",
  82. "sysnr": "02",
  83. "client": "900",
  84. "user": "uuu",
  85. "passwd": "ppp."
  86. }
  87. }
  88. ],
  89. "options": {
  90. "isEventTime": true,
  91. "lateTolerance": 1000,
  92. "bufferLength": 10240,
  93. "qos": 2,
  94. "checkpointInterval": 60000
  95. }
  96. }`,
  97. result: &api.Rule{
  98. Triggered: false,
  99. Id: "ruleTest2",
  100. Sql: "SELECT * from demo",
  101. Actions: []map[string]interface{}{
  102. {
  103. "log": "",
  104. }, {
  105. "sap": map[string]interface{}{
  106. "funcName": "RFC_READ_TABLE",
  107. "ashost": "192.168.100.10",
  108. "sysnr": "02",
  109. "client": "900",
  110. "user": "uuu",
  111. "passwd": "ppp.",
  112. },
  113. },
  114. },
  115. Options: &api.RuleOption{
  116. IsEventTime: true,
  117. LateTol: 1000,
  118. Concurrency: 1,
  119. BufferLength: 10240,
  120. SendMetaToSink: false,
  121. Qos: api.ExactlyOnce,
  122. CheckpointInterval: 60000,
  123. SendError: true,
  124. },
  125. },
  126. },
  127. }
  128. p := NewRuleProcessor(DbDir)
  129. for i, tt := range tests {
  130. r, err := p.getRuleByJson(tt.result.Id, tt.ruleStr)
  131. if err != nil {
  132. t.Errorf("get rule error: %s", err)
  133. }
  134. if !reflect.DeepEqual(tt.result, r) {
  135. t.Errorf("%d \tresult mismatch:\n\nexp=%+v\n\ngot=%+v\n\n", i, tt.result, r)
  136. }
  137. }
  138. }