planner_alias_test.go

// Copyright 2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package planner

import (
	"encoding/json"
	"reflect"
	"strings"
	"testing"

	"github.com/gdexlab/go-render/render"

	"github.com/lf-edge/ekuiper/internal/pkg/store"
	"github.com/lf-edge/ekuiper/internal/xsql"
	"github.com/lf-edge/ekuiper/pkg/api"
	"github.com/lf-edge/ekuiper/pkg/ast"
)
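
// TestPlannerAlias verifies that alias fields in a SELECT statement are planned as
// FieldRef nodes on ast.AliasStream carrying the expected AliasRef expressions.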
func TestPlannerAlias(t *testing.T) {
	kv, err := store.GetKV("stream")
	if err != nil {
		t.Error(err)
		return
	}
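	// Stream and table definitions the planner resolves during the test.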
	streamSqls := map[string]string{
		"src1": `CREATE STREAM src1 (
				) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
		"src2": `CREATE STREAM src2 (
				) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
		"tableInPlanner": `CREATE TABLE tableInPlanner (
					id BIGINT,
					name STRING,
					value STRING,
					hum BIGINT
				) WITH (TYPE="file");`,
	}
	types := map[string]ast.StreamType{
		"src1":           ast.TypeStream,
		"src2":           ast.TypeStream,
		"tableInPlanner": ast.TypeTable,
	}
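	// Persist each definition as a StreamInfo so the planner can look it up from the store.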
	for name, sql := range streamSqls {
		s, err := json.Marshal(&xsql.StreamInfo{
			StreamType: types[name],
			Statement:  sql,
		})
		if err != nil {
			t.Error(err)
			t.Fail()
		}
		err = kv.Set(name, string(s))
		if err != nil {
			t.Error(err)
			t.Fail()
		}
	}
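	// Load the parsed stream statements back so the expected plans can reference them.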
	streams := make(map[string]*ast.StreamStmt)
	for n := range streamSqls {
		streamStmt, err := xsql.GetDataSource(kv, n)
		if err != nil {
			t.Errorf("fail to get stream %s, please check if stream is created", n)
			return
		}
		streams[n] = streamStmt
	}
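	// aliasRef1 models the expression `a + b`; aliasRef2 models `sum + 1`, where `sum`
	// refers back to aliasRef1. These are the alias expressions expected in the plans below.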
	aliasRef1 := &ast.AliasRef{
		Expression: &ast.BinaryExpr{
			OP: ast.ADD,
			LHS: &ast.FieldRef{
				StreamName: "src1",
				Name:       "a",
			},
			RHS: &ast.FieldRef{
				StreamName: "src1",
				Name:       "b",
			},
		},
	}
	aliasRef1.SetRefSource([]string{"src1"})
	aliasRef2 := &ast.AliasRef{
		Expression: &ast.BinaryExpr{
			OP: ast.ADD,
			LHS: &ast.FieldRef{
				StreamName: ast.AliasStream,
				Name:       "sum",
				AliasRef:   aliasRef1,
			},
			RHS: &ast.IntegerLiteral{
				Val: 1,
			},
		},
	}
	aliasRef2.SetRefSource([]string{"src1"})
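	// Each case pairs a SQL statement with the logical plan the planner is expected to build.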
	testcases := []struct {
		sql string
		p   LogicalPlan
		err string
	}{
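		// The alias `a` reuses a source column name; the later `a + 1` should resolve to the alias.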
		{
			sql: "select a + b as a, a + 1 from src1",
			p: ProjectPlan{
				baseLogicalPlan: baseLogicalPlan{
					children: []LogicalPlan{
						DataSourcePlan{
							baseLogicalPlan: baseLogicalPlan{},
							name:            "src1",
							streamFields: map[string]*ast.JsonStreamField{
								"a": nil,
								"b": nil,
							},
							streamStmt:   streams["src1"],
							pruneFields:  []string{},
							isSchemaless: true,
							metaFields:   []string{},
						}.Init(),
					},
				},
				fields: []ast.Field{
					{
						AName: "a",
						Expr: &ast.FieldRef{
							StreamName: ast.AliasStream,
							Name:       "a",
							AliasRef:   aliasRef1,
						},
					},
					{
						Name: "kuiper_field_0",
						Expr: &ast.BinaryExpr{
							OP: ast.ADD,
							LHS: &ast.FieldRef{
								Name:       "a",
								StreamName: ast.AliasStream,
								AliasRef:   aliasRef1,
							},
							RHS: &ast.IntegerLiteral{
								Val: 1,
							},
						},
					},
				},
			}.Init(),
		},
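		// The alias `sum` is referenced by the following unaliased expression `sum + 1`.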
		{
			sql: "select a + b as sum, sum + 1 from src1",
			p: ProjectPlan{
				baseLogicalPlan: baseLogicalPlan{
					children: []LogicalPlan{
						DataSourcePlan{
							baseLogicalPlan: baseLogicalPlan{},
							name:            "src1",
							streamFields: map[string]*ast.JsonStreamField{
								"a": nil,
								"b": nil,
							},
							streamStmt:   streams["src1"],
							pruneFields:  []string{},
							isSchemaless: true,
							metaFields:   []string{},
						}.Init(),
					},
				},
				fields: []ast.Field{
					{
						AName: "sum",
						Expr: &ast.FieldRef{
							StreamName: ast.AliasStream,
							Name:       "sum",
							AliasRef:   aliasRef1,
						},
					},
					{
						Name: "kuiper_field_0",
						Expr: &ast.BinaryExpr{
							OP: ast.ADD,
							LHS: &ast.FieldRef{
								Name:       "sum",
								StreamName: ast.AliasStream,
								AliasRef:   aliasRef1,
							},
							RHS: &ast.IntegerLiteral{
								Val: 1,
							},
						},
					},
				},
			}.Init(),
		},
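		// Chained aliases: `sum2` is built on top of `sum`, so its AliasRef (aliasRef2) wraps aliasRef1.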
		{
			sql: "select a + b as sum, sum + 1 as sum2 from src1",
			p: ProjectPlan{
				baseLogicalPlan: baseLogicalPlan{
					children: []LogicalPlan{
						DataSourcePlan{
							baseLogicalPlan: baseLogicalPlan{},
							name:            "src1",
							streamFields: map[string]*ast.JsonStreamField{
								"a": nil,
								"b": nil,
							},
							streamStmt:   streams["src1"],
							pruneFields:  []string{},
							isSchemaless: true,
							metaFields:   []string{},
						}.Init(),
					},
				},
				fields: []ast.Field{
					{
						AName: "sum",
						Expr: &ast.FieldRef{
							StreamName: ast.AliasStream,
							Name:       "sum",
							AliasRef:   aliasRef1,
						},
					},
					{
						AName: "sum2",
						Expr: &ast.FieldRef{
							StreamName: ast.AliasStream,
							Name:       "sum2",
							AliasRef:   aliasRef2,
						},
					},
				},
			}.Init(),
		},
	}
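	// Parse each SQL statement, build its logical plan and compare it with the expected plan.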
	for i, tt := range testcases {
		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
		if err != nil {
			t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
			continue
		}
		p, _ := createLogicalPlan(stmt, &api.RuleOption{
			IsEventTime:        false,
			LateTol:            0,
			Concurrency:        0,
			BufferLength:       0,
			SendMetaToSink:     false,
			Qos:                0,
			CheckpointInterval: 0,
			SendError:          true,
		}, kv)
		if !reflect.DeepEqual(tt.p, p) {
			t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
		}
	}
}