simple_processor_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package processors
  2. import (
  3. "github.com/emqx/kuiper/xstream/api"
  4. "reflect"
  5. "testing"
  6. )
  7. func TestRuleActionParse_Apply(t *testing.T) {
  8. var tests = []struct {
  9. ruleStr string
  10. result *api.Rule
  11. }{
  12. {
  13. ruleStr: `{
  14. "id": "ruleTest",
  15. "sql": "SELECT * from demo",
  16. "actions": [
  17. {
  18. "funcName": "RFC_READ_TABLE",
  19. "ashost": "192.168.1.100",
  20. "sysnr": "02",
  21. "client": "900",
  22. "user": "SPERF",
  23. "passwd": "PASSPASS",
  24. "params": {
  25. "QUERY_TABLE": "VBAP",
  26. "ROWCOUNT": 10,
  27. "FIELDS": [
  28. {"FIELDNAME": "MANDT"},
  29. {"FIELDNAME": "VBELN"},
  30. {"FIELDNAME": "POSNR"}
  31. ]
  32. }
  33. }
  34. ]
  35. }`,
  36. result: &api.Rule{
  37. Triggered: false,
  38. Id: "ruleTest",
  39. Sql: "SELECT * from demo",
  40. Actions: []map[string]interface{}{
  41. {
  42. "funcName": "RFC_READ_TABLE",
  43. "ashost": "192.168.1.100",
  44. "sysnr": "02",
  45. "client": "900",
  46. "user": "SPERF",
  47. "passwd": "PASSPASS",
  48. "params": map[string]interface{}{
  49. "QUERY_TABLE": "VBAP",
  50. "ROWCOUNT": float64(10),
  51. "FIELDS": []interface{}{
  52. map[string]interface{}{"FIELDNAME": "MANDT"},
  53. map[string]interface{}{"FIELDNAME": "VBELN"},
  54. map[string]interface{}{"FIELDNAME": "POSNR"},
  55. },
  56. },
  57. },
  58. },
  59. Options: &api.RuleOption{
  60. IsEventTime: false,
  61. LateTol: 1000,
  62. Concurrency: 1,
  63. BufferLength: 1024,
  64. SendMetaToSink: false,
  65. Qos: api.AtMostOnce,
  66. CheckpointInterval: 300000,
  67. },
  68. },
  69. }, {
  70. ruleStr: `{
  71. "id": "ruleTest2",
  72. "sql": "SELECT * from demo",
  73. "actions": [
  74. {
  75. "log": ""
  76. },
  77. {
  78. "sap": {
  79. "funcName": "RFC_READ_TABLE",
  80. "ashost": "192.168.100.10",
  81. "sysnr": "02",
  82. "client": "900",
  83. "user": "uuu",
  84. "passwd": "ppp."
  85. }
  86. }
  87. ],
  88. "options": {
  89. "isEventTime": true,
  90. "lateTolerance": 1000,
  91. "bufferLength": 10240,
  92. "qos": 2,
  93. "checkpointInterval": 60000
  94. }
  95. }`,
  96. result: &api.Rule{
  97. Triggered: false,
  98. Id: "ruleTest2",
  99. Sql: "SELECT * from demo",
  100. Actions: []map[string]interface{}{
  101. {
  102. "log": "",
  103. }, {
  104. "sap": map[string]interface{}{
  105. "funcName": "RFC_READ_TABLE",
  106. "ashost": "192.168.100.10",
  107. "sysnr": "02",
  108. "client": "900",
  109. "user": "uuu",
  110. "passwd": "ppp.",
  111. },
  112. },
  113. },
  114. Options: &api.RuleOption{
  115. IsEventTime: true,
  116. LateTol: 1000,
  117. Concurrency: 1,
  118. BufferLength: 10240,
  119. SendMetaToSink: false,
  120. Qos: api.ExactlyOnce,
  121. CheckpointInterval: 60000,
  122. },
  123. },
  124. },
  125. }
  126. p := NewRuleProcessor(DbDir)
  127. for i, tt := range tests {
  128. r, err := p.getRuleByJson(tt.result.Id, tt.ruleStr)
  129. if err != nil {
  130. t.Errorf("get rule error: %s", err)
  131. }
  132. if !reflect.DeepEqual(tt.result, r) {
  133. t.Errorf("%d \tresult mismatch:\n\nexp=%+v\n\ngot=%+v\n\n", i, tt.result, r)
  134. }
  135. }
  136. }