planner_alias_test.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. // Copyright 2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "reflect"
  18. "strings"
  19. "testing"
  20. "github.com/gdexlab/go-render/render"
  21. "github.com/lf-edge/ekuiper/internal/pkg/store"
  22. "github.com/lf-edge/ekuiper/internal/xsql"
  23. "github.com/lf-edge/ekuiper/pkg/api"
  24. "github.com/lf-edge/ekuiper/pkg/ast"
  25. )
  26. func TestPlannerAlias(t *testing.T) {
  27. kv, err := store.GetKV("stream")
  28. if err != nil {
  29. t.Error(err)
  30. return
  31. }
  32. streamSqls := map[string]string{
  33. "src1": `CREATE STREAM src1 (
  34. ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
  35. "src2": `CREATE STREAM src2 (
  36. ) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
  37. "tableInPlanner": `CREATE TABLE tableInPlanner (
  38. id BIGINT,
  39. name STRING,
  40. value STRING,
  41. hum BIGINT
  42. ) WITH (TYPE="file");`,
  43. }
  44. types := map[string]ast.StreamType{
  45. "src1": ast.TypeStream,
  46. "src2": ast.TypeStream,
  47. "tableInPlanner": ast.TypeTable,
  48. }
  49. for name, sql := range streamSqls {
  50. s, err := json.Marshal(&xsql.StreamInfo{
  51. StreamType: types[name],
  52. Statement: sql,
  53. })
  54. if err != nil {
  55. t.Error(err)
  56. t.Fail()
  57. }
  58. err = kv.Set(name, string(s))
  59. if err != nil {
  60. t.Error(err)
  61. t.Fail()
  62. }
  63. }
  64. streams := make(map[string]*ast.StreamStmt)
  65. for n := range streamSqls {
  66. streamStmt, err := xsql.GetDataSource(kv, n)
  67. if err != nil {
  68. t.Errorf("fail to get stream %s, please check if stream is created", n)
  69. return
  70. }
  71. streams[n] = streamStmt
  72. }
  73. aliasRef1 := &ast.AliasRef{
  74. Expression: &ast.BinaryExpr{
  75. OP: ast.ADD,
  76. LHS: &ast.FieldRef{
  77. StreamName: "src1",
  78. Name: "a",
  79. },
  80. RHS: &ast.FieldRef{
  81. StreamName: "src1",
  82. Name: "b",
  83. },
  84. },
  85. }
  86. aliasRef1.SetRefSource([]string{"src1"})
  87. aliasRef2 := &ast.AliasRef{
  88. Expression: &ast.BinaryExpr{
  89. OP: ast.ADD,
  90. LHS: &ast.FieldRef{
  91. StreamName: ast.AliasStream,
  92. Name: "sum",
  93. AliasRef: aliasRef1,
  94. },
  95. RHS: &ast.IntegerLiteral{
  96. Val: 1,
  97. },
  98. },
  99. }
  100. aliasRef2.SetRefSource([]string{"src1"})
  101. testcases := []struct {
  102. sql string
  103. p LogicalPlan
  104. err string
  105. }{
  106. {
  107. sql: "select a + b as sum, sum + 1 as sum2 from src1",
  108. p: ProjectPlan{
  109. baseLogicalPlan: baseLogicalPlan{
  110. children: []LogicalPlan{
  111. DataSourcePlan{
  112. baseLogicalPlan: baseLogicalPlan{},
  113. name: "src1",
  114. streamFields: map[string]*ast.JsonStreamField{
  115. "a": nil,
  116. "b": nil,
  117. },
  118. streamStmt: streams["src1"],
  119. pruneFields: []string{},
  120. isSchemaless: true,
  121. metaFields: []string{},
  122. }.Init(),
  123. },
  124. },
  125. fields: []ast.Field{
  126. {
  127. AName: "sum",
  128. Expr: &ast.FieldRef{
  129. StreamName: ast.AliasStream,
  130. Name: "sum",
  131. AliasRef: aliasRef1,
  132. },
  133. },
  134. {
  135. AName: "sum2",
  136. Expr: &ast.FieldRef{
  137. StreamName: ast.AliasStream,
  138. Name: "sum2",
  139. AliasRef: aliasRef2,
  140. },
  141. },
  142. },
  143. }.Init(),
  144. },
  145. }
  146. for i, tt := range testcases {
  147. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  148. if err != nil {
  149. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  150. continue
  151. }
  152. p, _ := createLogicalPlan(stmt, &api.RuleOption{
  153. IsEventTime: false,
  154. LateTol: 0,
  155. Concurrency: 0,
  156. BufferLength: 0,
  157. SendMetaToSink: false,
  158. Qos: 0,
  159. CheckpointInterval: 0,
  160. SendError: true,
  161. }, kv)
  162. if !reflect.DeepEqual(tt.p, p) {
  163. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
  164. }
  165. }
  166. }