planner_test.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package planner
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/xsql"
  5. "github.com/emqx/kuiper/xstream/api"
  6. "reflect"
  7. "strings"
  8. "testing"
  9. )
  10. func Test_createLogicalPlan(t *testing.T) {
  11. var tests = []struct {
  12. sql string
  13. p LogicalPlan
  14. err string
  15. }{
  16. { // 0
  17. sql: `SELECT name FROM tbl`,
  18. p: ProjectPlan{
  19. baseLogicalPlan: baseLogicalPlan{
  20. children: []LogicalPlan{
  21. DataSourcePlan{
  22. baseLogicalPlan: baseLogicalPlan{},
  23. name: "tbl",
  24. isWildCard: true,
  25. needMeta: false,
  26. fields: nil,
  27. metaFields: nil,
  28. }.Init(),
  29. },
  30. },
  31. fields: []xsql.Field{
  32. {
  33. Expr: &xsql.FieldRef{Name: "name"},
  34. Name: "name",
  35. AName: ""},
  36. },
  37. isAggregate: false,
  38. sendMeta: false,
  39. }.Init(),
  40. }, { // 1 optimize where to data source
  41. sql: `SELECT abc FROM src1 WHERE f1 = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`,
  42. p: ProjectPlan{
  43. baseLogicalPlan: baseLogicalPlan{
  44. children: []LogicalPlan{
  45. WindowPlan{
  46. baseLogicalPlan: baseLogicalPlan{
  47. children: []LogicalPlan{
  48. FilterPlan{
  49. baseLogicalPlan: baseLogicalPlan{
  50. children: []LogicalPlan{
  51. DataSourcePlan{
  52. name: "src1",
  53. isWildCard: true,
  54. needMeta: false,
  55. fields: nil,
  56. metaFields: nil,
  57. }.Init(),
  58. },
  59. },
  60. condition: &xsql.BinaryExpr{
  61. LHS: &xsql.FieldRef{Name: "f1"},
  62. OP: xsql.EQ,
  63. RHS: &xsql.StringLiteral{Val: "v1"},
  64. },
  65. }.Init(),
  66. },
  67. },
  68. condition: nil,
  69. wtype: xsql.TUMBLING_WINDOW,
  70. length: 10000,
  71. interval: 0,
  72. limit: 0,
  73. }.Init(),
  74. },
  75. },
  76. fields: []xsql.Field{
  77. {
  78. Expr: &xsql.FieldRef{Name: "abc"},
  79. Name: "abc",
  80. AName: ""},
  81. },
  82. isAggregate: false,
  83. sendMeta: false,
  84. }.Init(),
  85. }, { // 2 condition that cannot be optimized
  86. sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 WHERE src1.temp > 20 OR src2.hum > 60 GROUP BY TUMBLINGWINDOW(ss, 10)`,
  87. p: ProjectPlan{
  88. baseLogicalPlan: baseLogicalPlan{
  89. children: []LogicalPlan{
  90. FilterPlan{
  91. baseLogicalPlan: baseLogicalPlan{
  92. children: []LogicalPlan{
  93. JoinPlan{
  94. baseLogicalPlan: baseLogicalPlan{
  95. children: []LogicalPlan{
  96. WindowPlan{
  97. baseLogicalPlan: baseLogicalPlan{
  98. children: []LogicalPlan{
  99. DataSourcePlan{
  100. name: "src1",
  101. isWildCard: true,
  102. needMeta: false,
  103. fields: nil,
  104. metaFields: nil,
  105. }.Init(),
  106. DataSourcePlan{
  107. name: "src2",
  108. isWildCard: true,
  109. needMeta: false,
  110. fields: nil,
  111. metaFields: nil,
  112. }.Init(),
  113. },
  114. },
  115. condition: nil,
  116. wtype: xsql.TUMBLING_WINDOW,
  117. length: 10000,
  118. interval: 0,
  119. limit: 0,
  120. }.Init(),
  121. },
  122. },
  123. from: &xsql.Table{Name: "src1"},
  124. joins: xsql.Joins{xsql.Join{
  125. Name: "src2",
  126. JoinType: xsql.INNER_JOIN,
  127. Expr: &xsql.BinaryExpr{
  128. OP: xsql.EQ,
  129. LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"},
  130. RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"},
  131. },
  132. }},
  133. }.Init(),
  134. },
  135. },
  136. condition: &xsql.BinaryExpr{
  137. LHS: &xsql.BinaryExpr{
  138. OP: xsql.GT,
  139. LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"},
  140. RHS: &xsql.IntegerLiteral{Val: 20},
  141. },
  142. OP: xsql.OR,
  143. RHS: &xsql.BinaryExpr{
  144. OP: xsql.GT,
  145. LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"},
  146. RHS: &xsql.IntegerLiteral{Val: 60},
  147. },
  148. },
  149. }.Init(),
  150. },
  151. },
  152. fields: []xsql.Field{
  153. {
  154. Expr: &xsql.FieldRef{Name: "id1"},
  155. Name: "id1",
  156. AName: ""},
  157. },
  158. isAggregate: false,
  159. sendMeta: false,
  160. }.Init(),
  161. }, { // 3 optimize window filter
  162. sql: `SELECT abc FROM src1 WHERE f1 = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE size > 2)`,
  163. p: ProjectPlan{
  164. baseLogicalPlan: baseLogicalPlan{
  165. children: []LogicalPlan{
  166. WindowPlan{
  167. baseLogicalPlan: baseLogicalPlan{
  168. children: []LogicalPlan{
  169. FilterPlan{
  170. baseLogicalPlan: baseLogicalPlan{
  171. children: []LogicalPlan{
  172. DataSourcePlan{
  173. name: "src1",
  174. isWildCard: true,
  175. needMeta: false,
  176. fields: nil,
  177. metaFields: nil,
  178. }.Init(),
  179. },
  180. },
  181. condition: &xsql.BinaryExpr{
  182. OP: xsql.AND,
  183. LHS: &xsql.BinaryExpr{
  184. LHS: &xsql.FieldRef{Name: "f1"},
  185. OP: xsql.EQ,
  186. RHS: &xsql.StringLiteral{Val: "v1"},
  187. },
  188. RHS: &xsql.BinaryExpr{
  189. LHS: &xsql.FieldRef{Name: "size"},
  190. OP: xsql.GT,
  191. RHS: &xsql.IntegerLiteral{Val: 2},
  192. },
  193. },
  194. }.Init(),
  195. },
  196. },
  197. condition: nil,
  198. wtype: xsql.TUMBLING_WINDOW,
  199. length: 10000,
  200. interval: 0,
  201. limit: 0,
  202. }.Init(),
  203. },
  204. },
  205. fields: []xsql.Field{
  206. {
  207. Expr: &xsql.FieldRef{Name: "abc"},
  208. Name: "abc",
  209. AName: ""},
  210. },
  211. isAggregate: false,
  212. sendMeta: false,
  213. }.Init(),
  214. }, { // 4. do not optimize count window
  215. sql: `SELECT * FROM demo WHERE temperature > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`,
  216. p: ProjectPlan{
  217. baseLogicalPlan: baseLogicalPlan{
  218. children: []LogicalPlan{
  219. HavingPlan{
  220. baseLogicalPlan: baseLogicalPlan{
  221. children: []LogicalPlan{
  222. FilterPlan{
  223. baseLogicalPlan: baseLogicalPlan{
  224. children: []LogicalPlan{
  225. WindowPlan{
  226. baseLogicalPlan: baseLogicalPlan{
  227. children: []LogicalPlan{
  228. DataSourcePlan{
  229. name: "demo",
  230. isWildCard: true,
  231. needMeta: false,
  232. fields: nil,
  233. metaFields: nil,
  234. }.Init(),
  235. },
  236. },
  237. condition: nil,
  238. wtype: xsql.COUNT_WINDOW,
  239. length: 5,
  240. interval: 1,
  241. limit: 0,
  242. }.Init(),
  243. },
  244. },
  245. condition: &xsql.BinaryExpr{
  246. LHS: &xsql.FieldRef{Name: "temperature"},
  247. OP: xsql.GT,
  248. RHS: &xsql.IntegerLiteral{Val: 20},
  249. },
  250. }.Init(),
  251. },
  252. },
  253. condition: &xsql.BinaryExpr{
  254. LHS: &xsql.Call{Name: "COUNT", Args: []xsql.Expr{&xsql.StringLiteral{
  255. Val: "*",
  256. }}},
  257. OP: xsql.GT,
  258. RHS: &xsql.IntegerLiteral{Val: 2},
  259. },
  260. }.Init(),
  261. },
  262. },
  263. fields: []xsql.Field{
  264. {
  265. Expr: &xsql.Wildcard{Token: xsql.ASTERISK},
  266. Name: "",
  267. AName: ""},
  268. },
  269. isAggregate: false,
  270. sendMeta: false,
  271. }.Init(),
  272. },
  273. }
  274. //TODO optimize having, optimize on
  275. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  276. for i, tt := range tests {
  277. //fmt.Printf("Parsing SQL %q.\n", tt.s)
  278. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  279. if err != nil {
  280. t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
  281. }
  282. p, err := createLogicalPlan(stmt, &api.RuleOption{
  283. IsEventTime: false,
  284. LateTol: 0,
  285. Concurrency: 0,
  286. BufferLength: 0,
  287. SendMetaToSink: false,
  288. Qos: 0,
  289. CheckpointInterval: 0,
  290. })
  291. if err != nil {
  292. t.Errorf("%d. %q\n\nerror:%v\n\n", i, tt.sql, err)
  293. }
  294. if !reflect.DeepEqual(tt.p, p) {
  295. t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.p, p)
  296. }
  297. }
  298. }