table_processor_test.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package operators
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "testing"
  10. )
  11. func TestTableProcessor_Apply(t *testing.T) {
  12. var tests = []struct {
  13. stmt *xsql.StreamStmt
  14. data []byte
  15. result interface{}
  16. }{
  17. {
  18. stmt: &xsql.StreamStmt{
  19. Name: xsql.StreamName("demo"),
  20. StreamFields: []xsql.StreamField{
  21. {Name: "a", FieldType: &xsql.ArrayType{
  22. Type: xsql.STRUCT,
  23. FieldType: &xsql.RecType{
  24. StreamFields: []xsql.StreamField{
  25. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  26. },
  27. },
  28. }},
  29. },
  30. },
  31. data: []byte(`[{"a": [{"b" : "hello1"}, {"b" : "hello2"}]},{"a": [{"b" : "hello2"}, {"b" : "hello3"}]},{"a": [{"b" : "hello3"}, {"b" : "hello4"}]}]`),
  32. result: xsql.WindowTuples{
  33. Emitter: "demo",
  34. Tuples: []xsql.Tuple{
  35. {
  36. Message: xsql.Message{
  37. "a": []map[string]interface{}{
  38. {"b": "hello1"},
  39. {"b": "hello2"},
  40. },
  41. },
  42. Emitter: "demo",
  43. },
  44. {
  45. Message: xsql.Message{
  46. "a": []map[string]interface{}{
  47. {"b": "hello2"},
  48. {"b": "hello3"},
  49. },
  50. },
  51. Emitter: "demo",
  52. },
  53. {
  54. Message: xsql.Message{
  55. "a": []map[string]interface{}{
  56. {"b": "hello3"},
  57. {"b": "hello4"},
  58. },
  59. },
  60. Emitter: "demo",
  61. },
  62. },
  63. },
  64. }, {
  65. stmt: &xsql.StreamStmt{
  66. Name: xsql.StreamName("demo"),
  67. StreamFields: nil,
  68. },
  69. data: []byte(`[{"a": {"b" : "hello", "c": {"d": 35.2}}},{"a": {"b" : "world", "c": {"d": 65.2}}}]`),
  70. result: xsql.WindowTuples{
  71. Emitter: "demo",
  72. Tuples: []xsql.Tuple{
  73. {
  74. Message: xsql.Message{
  75. "a": map[string]interface{}{
  76. "b": "hello",
  77. "c": map[string]interface{}{
  78. "d": 35.2,
  79. },
  80. },
  81. },
  82. Emitter: "demo",
  83. },
  84. {
  85. Message: xsql.Message{
  86. "a": map[string]interface{}{
  87. "b": "world",
  88. "c": map[string]interface{}{
  89. "d": 65.2,
  90. },
  91. },
  92. },
  93. Emitter: "demo",
  94. },
  95. },
  96. },
  97. },
  98. }
  99. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  100. defer common.CloseLogger()
  101. contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
  102. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  103. for i, tt := range tests {
  104. pp := &TableProcessor{}
  105. pp.streamFields = convertFields(tt.stmt.StreamFields)
  106. var dm []map[string]interface{}
  107. if e := json.Unmarshal(tt.data, &dm); e != nil {
  108. t.Log(e)
  109. t.Fail()
  110. } else {
  111. tuples := make([]*xsql.Tuple, len(dm))
  112. for i, m := range dm {
  113. tuples[i] = &xsql.Tuple{
  114. Emitter: "demo",
  115. Message: m,
  116. }
  117. }
  118. fv, afv := xsql.NewFunctionValuersForOp(nil)
  119. result := pp.Apply(ctx, tuples, fv, afv)
  120. if !reflect.DeepEqual(tt.result, result) {
  121. t.Errorf("%d. result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, result)
  122. }
  123. }
  124. }
  125. }