planner_alias_test.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "reflect"
  18. "strings"
  19. "testing"
  20. "github.com/gdexlab/go-render/render"
  21. "github.com/lf-edge/ekuiper/internal/pkg/store"
  22. "github.com/lf-edge/ekuiper/internal/xsql"
  23. "github.com/lf-edge/ekuiper/pkg/api"
  24. "github.com/lf-edge/ekuiper/pkg/ast"
  25. )
  26. func TestPlannerAlias(t *testing.T) {
  27. kv, err := store.GetKV("stream")
  28. if err != nil {
  29. t.Error(err)
  30. return
  31. }
  32. streamSqls := map[string]string{
  33. "src1": `CREATE STREAM src1 (
  34. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  35. "src2": `CREATE STREAM src2 (
  36. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  37. "tableInPlanner": `CREATE TABLE tableInPlanner (
  38. id BIGINT,
  39. name STRING,
  40. value STRING,
  41. hum BIGINT
  42. ) WITH (TYPE="file");`,
  43. }
  44. types := map[string]ast.StreamType{
  45. "src1": ast.TypeStream,
  46. "src2": ast.TypeStream,
  47. "tableInPlanner": ast.TypeTable,
  48. }
  49. for name, sql := range streamSqls {
  50. s, err := json.Marshal(&xsql.StreamInfo{
  51. StreamType: types[name],
  52. Statement: sql,
  53. })
  54. if err != nil {
  55. t.Error(err)
  56. t.Fail()
  57. }
  58. err = kv.Set(name, string(s))
  59. if err != nil {
  60. t.Error(err)
  61. t.Fail()
  62. }
  63. }
  64. streams := make(map[string]*ast.StreamStmt)
  65. for n := range streamSqls {
  66. streamStmt, err := xsql.GetDataSource(kv, n)
  67. if err != nil {
  68. t.Errorf("fail to get stream %s, please check if stream is created", n)
  69. return
  70. }
  71. streams[n] = streamStmt
  72. }
  73. aliasRef1 := &ast.AliasRef{
  74. Expression: &ast.BinaryExpr{
  75. OP: ast.ADD,
  76. LHS: &ast.FieldRef{
  77. StreamName: "src1",
  78. Name: "a",
  79. },
  80. RHS: &ast.FieldRef{
  81. StreamName: "src1",
  82. Name: "b",
  83. },
  84. },
  85. }
  86. aliasRef1.SetRefSource([]string{"src1"})
  87. aliasRef2 := &ast.AliasRef{
  88. Expression: &ast.BinaryExpr{
  89. OP: ast.ADD,
  90. LHS: &ast.FieldRef{
  91. StreamName: ast.AliasStream,
  92. Name: "sum",
  93. AliasRef: aliasRef1,
  94. },
  95. RHS: &ast.IntegerLiteral{
  96. Val: 1,
  97. },
  98. },
  99. }
  100. aliasRef2.SetRefSource([]string{"src1"})
  101. testcases := []struct {
  102. sql string
  103. p LogicalPlan
  104. err string
  105. }{
  106. {
  107. sql: "select a + b as sum, sum + 1 from src1",
  108. p: ProjectPlan{
  109. baseLogicalPlan: baseLogicalPlan{
  110. children: []LogicalPlan{
  111. DataSourcePlan{
  112. baseLogicalPlan: baseLogicalPlan{},
  113. name: "src1",
  114. streamFields: map[string]*ast.JsonStreamField{
  115. "a": nil,
  116. "b": nil,
  117. },
  118. streamStmt: streams["src1"],
  119. pruneFields: []string{},
  120. isSchemaless: true,
  121. metaFields: []string{},
  122. }.Init(),
  123. },
  124. },
  125. fields: []ast.Field{
  126. {
  127. AName: "sum",
  128. Expr: &ast.FieldRef{
  129. StreamName: ast.AliasStream,
  130. Name: "sum",
  131. AliasRef: aliasRef1,
  132. },
  133. },
  134. {
  135. Name: "kuiper_field_0",
  136. Expr: &ast.BinaryExpr{
  137. OP: ast.ADD,
  138. LHS: &ast.FieldRef{
  139. Name: "sum",
  140. StreamName: ast.AliasStream,
  141. AliasRef: aliasRef1,
  142. },
  143. RHS: &ast.IntegerLiteral{
  144. Val: 1,
  145. },
  146. },
  147. },
  148. },
  149. }.Init(),
  150. },
  151. {
  152. sql: "select a + b as sum, sum + 1 as sum2 from src1",
  153. p: ProjectPlan{
  154. baseLogicalPlan: baseLogicalPlan{
  155. children: []LogicalPlan{
  156. DataSourcePlan{
  157. baseLogicalPlan: baseLogicalPlan{},
  158. name: "src1",
  159. streamFields: map[string]*ast.JsonStreamField{
  160. "a": nil,
  161. "b": nil,
  162. },
  163. streamStmt: streams["src1"],
  164. pruneFields: []string{},
  165. isSchemaless: true,
  166. metaFields: []string{},
  167. }.Init(),
  168. },
  169. },
  170. fields: []ast.Field{
  171. {
  172. AName: "sum",
  173. Expr: &ast.FieldRef{
  174. StreamName: ast.AliasStream,
  175. Name: "sum",
  176. AliasRef: aliasRef1,
  177. },
  178. },
  179. {
  180. AName: "sum2",
  181. Expr: &ast.FieldRef{
  182. StreamName: ast.AliasStream,
  183. Name: "sum2",
  184. AliasRef: aliasRef2,
  185. },
  186. },
  187. },
  188. }.Init(),
  189. },
  190. }
  191. for i, tt := range testcases {
  192. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  193. if err != nil {
  194. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  195. continue
  196. }
  197. p, _ := createLogicalPlan(stmt, &api.RuleOption{
  198. IsEventTime: false,
  199. LateTol: 0,
  200. Concurrency: 0,
  201. BufferLength: 0,
  202. SendMetaToSink: false,
  203. Qos: 0,
  204. CheckpointInterval: 0,
  205. SendError: true,
  206. }, kv)
  207. if !reflect.DeepEqual(tt.p, p) {
  208. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  209. }
  210. }
  211. }