preprocessor_test.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package plans
  2. import (
  3. "engine/common"
  4. "engine/xsql"
  5. "fmt"
  6. "reflect"
  7. "testing"
  8. )
  9. func TestPreprocessor_Apply(t *testing.T) {
  10. var tests = []struct {
  11. stmt *xsql.StreamStmt
  12. data []byte
  13. result interface{}
  14. }{
  15. //Basic type
  16. {
  17. stmt: &xsql.StreamStmt{
  18. Name: xsql.StreamName("demo"),
  19. StreamFields: []xsql.StreamField{
  20. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  21. },
  22. },
  23. data: []byte(`{"a": 6}`),
  24. result: nil,
  25. },
  26. {
  27. stmt: &xsql.StreamStmt{
  28. Name: xsql.StreamName("demo"),
  29. StreamFields: []xsql.StreamField{
  30. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  31. },
  32. },
  33. data: []byte(`{"abc": 6}`),
  34. result: map[string]interface{}{
  35. "abc" : int(6),
  36. },
  37. },
  38. {
  39. stmt: &xsql.StreamStmt{
  40. Name: xsql.StreamName("demo"),
  41. StreamFields: []xsql.StreamField{
  42. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  43. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  44. },
  45. },
  46. data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
  47. result: map[string]interface{}{
  48. "abc" : float64(34),
  49. "def" : "hello",
  50. },
  51. },
  52. {
  53. stmt: &xsql.StreamStmt{
  54. Name: xsql.StreamName("demo"),
  55. StreamFields: []xsql.StreamField{
  56. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  57. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  58. },
  59. },
  60. data: []byte(`{"abc": 77, "def" : "hello"}`),
  61. result: nil,
  62. },
  63. //Array type
  64. {
  65. stmt: &xsql.StreamStmt{
  66. Name: xsql.StreamName("demo"),
  67. StreamFields: []xsql.StreamField{
  68. {Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
  69. {Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
  70. },
  71. },
  72. data: []byte(`{"a": {"b" : "hello"}}`),
  73. result: nil,
  74. },
  75. //Rec type
  76. {
  77. stmt: &xsql.StreamStmt{
  78. Name: xsql.StreamName("demo"),
  79. StreamFields: []xsql.StreamField{
  80. {Name: "a", FieldType: &xsql.RecType{
  81. StreamFields: []xsql.StreamField{
  82. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  83. },
  84. }},
  85. },
  86. },
  87. data: []byte(`{"a": {"b" : "hello"}}`),
  88. result: map[string]interface{}{
  89. "a" : map[string]interface{}{
  90. "b": "hello",
  91. },
  92. },
  93. },
  94. //Array of complex type
  95. {
  96. stmt: &xsql.StreamStmt{
  97. Name: xsql.StreamName("demo"),
  98. StreamFields: []xsql.StreamField{
  99. {Name: "a", FieldType: &xsql.ArrayType{
  100. Type: xsql.STRUCT,
  101. FieldType: &xsql.RecType{
  102. StreamFields: []xsql.StreamField{
  103. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  104. },
  105. },
  106. }},
  107. },
  108. },
  109. data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
  110. result: map[string]interface{}{
  111. "a" : []map[string]interface{}{
  112. {"b": "hello1"},
  113. {"b": "hello2"},
  114. },
  115. },
  116. },
  117. //Rec of complex type
  118. {
  119. stmt: &xsql.StreamStmt{
  120. Name: xsql.StreamName("demo"),
  121. StreamFields: []xsql.StreamField{
  122. {Name: "a", FieldType: &xsql.RecType{
  123. StreamFields: []xsql.StreamField{
  124. {Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
  125. {Name: "c", FieldType: &xsql.RecType{
  126. StreamFields: []xsql.StreamField{
  127. {Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
  128. },
  129. }},
  130. },
  131. }},
  132. },
  133. },
  134. data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
  135. result: map[string]interface{}{
  136. "a" : map[string]interface{}{
  137. "b" : "hello",
  138. "c" : map[string]interface{}{
  139. "d": int(35),
  140. },
  141. },
  142. },
  143. },
  144. }
  145. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  146. defer common.CloseLogger()
  147. for i, tt := range tests {
  148. pp := &Preprocessor{StreamStmt:tt.stmt}
  149. result := pp.Apply(nil, tt.data)
  150. if !reflect.DeepEqual(tt.result, result) {
  151. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.data, tt.result, result)
  152. }
  153. }
  154. }