analyzer.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. package planner
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common/kv"
  5. "github.com/emqx/kuiper/xsql"
  6. "strconv"
  7. "strings"
  8. )
  9. // Analyze the select statement by decorating the info from stream statement.
  10. // Typically, set the correct stream name for fieldRefs
  11. func decorateStmt(s *xsql.SelectStatement, store kv.KeyValue) ([]*xsql.StreamStmt, error) {
  12. streamsFromStmt := xsql.GetStreams(s)
  13. streamStmts := make([]*xsql.StreamStmt, len(streamsFromStmt))
  14. isSchemaless := false
  15. for i, s := range streamsFromStmt {
  16. streamStmt, err := xsql.GetDataSource(store, s)
  17. if err != nil {
  18. return nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
  19. }
  20. streamStmts[i] = streamStmt
  21. // TODO fine grain control of schemaless
  22. if streamStmt.StreamFields == nil {
  23. isSchemaless = true
  24. }
  25. }
  26. dsn := xsql.DefaultStream
  27. if len(streamsFromStmt) == 1 {
  28. dsn = streamStmts[0].Name
  29. }
  30. // [fieldName][streamsName][*aliasRef] if alias, with special key alias/default. Each key has exactly one value
  31. fieldsMap := newFieldsMap(isSchemaless, dsn)
  32. if !isSchemaless {
  33. for _, streamStmt := range streamStmts {
  34. for _, field := range streamStmt.StreamFields {
  35. fieldsMap.reserve(field.Name, streamStmt.Name)
  36. }
  37. }
  38. }
  39. var (
  40. walkErr error
  41. aliasFields []*xsql.Field
  42. )
  43. // Scan columns fields: bind all field refs, collect alias
  44. for i, f := range s.Fields {
  45. xsql.WalkFunc(f.Expr, func(n xsql.Node) bool {
  46. switch f := n.(type) {
  47. case *xsql.FieldRef:
  48. if f.IsSQLField() {
  49. walkErr = fieldsMap.bind(f)
  50. }
  51. }
  52. return true
  53. })
  54. if walkErr != nil {
  55. return nil, walkErr
  56. }
  57. // assign name for anonymous select expression
  58. if f.Name == "" && f.AName == "" {
  59. s.Fields[i].Name = fieldsMap.getDefaultName()
  60. }
  61. if f.AName != "" {
  62. aliasFields = append(aliasFields, &s.Fields[i])
  63. }
  64. }
  65. // bind alias field expressions
  66. for _, f := range aliasFields {
  67. ar, err := xsql.NewAliasRef(f.Expr)
  68. if err != nil {
  69. walkErr = err
  70. } else {
  71. f.Expr = &xsql.FieldRef{
  72. StreamName: xsql.AliasStream,
  73. Name: f.AName,
  74. AliasRef: ar,
  75. }
  76. walkErr = fieldsMap.save(f.AName, xsql.AliasStream, ar)
  77. }
  78. }
  79. // bind field ref for alias AND set StreamName for all field ref
  80. xsql.WalkFunc(s, func(n xsql.Node) bool {
  81. switch f := n.(type) {
  82. case xsql.Fields: // do not bind selection fields, should have done above
  83. return false
  84. case *xsql.FieldRef:
  85. walkErr = fieldsMap.bind(f)
  86. }
  87. return true
  88. })
  89. if walkErr != nil {
  90. return nil, walkErr
  91. }
  92. walkErr = validate(s)
  93. return streamStmts, walkErr
  94. }
  95. func validate(s *xsql.SelectStatement) (err error) {
  96. if xsql.IsAggregate(s.Condition) {
  97. return fmt.Errorf("Not allowed to call aggregate functions in WHERE clause.")
  98. }
  99. if !allAggregate(s.Having) {
  100. return fmt.Errorf("Not allowed to call non-aggregate functions in HAVING clause.")
  101. }
  102. for _, d := range s.Dimensions {
  103. if xsql.IsAggregate(d.Expr) {
  104. return fmt.Errorf("Not allowed to call aggregate functions in GROUP BY clause.")
  105. }
  106. }
  107. xsql.WalkFunc(s, func(n xsql.Node) bool {
  108. switch f := n.(type) {
  109. case *xsql.Call:
  110. // aggregate call should not have any aggregate arg
  111. if xsql.IsAggFunc(f) {
  112. for _, arg := range f.Args {
  113. tr := xsql.IsAggregate(arg)
  114. if tr {
  115. err = fmt.Errorf("invalid argument for func %s: aggregate argument is not allowed", f.Name)
  116. return false
  117. }
  118. }
  119. }
  120. }
  121. return true
  122. })
  123. return
  124. }
  125. // file-private functions below
  126. // allAggregate checks if all expressions of binary expression are aggregate
  127. func allAggregate(expr xsql.Expr) (r bool) {
  128. r = true
  129. xsql.WalkFunc(expr, func(n xsql.Node) bool {
  130. switch f := expr.(type) {
  131. case *xsql.BinaryExpr:
  132. switch f.OP {
  133. case xsql.SUBSET, xsql.ARROW:
  134. // do nothing
  135. default:
  136. r = allAggregate(f.LHS) && allAggregate(f.RHS)
  137. return false
  138. }
  139. case *xsql.Call, *xsql.FieldRef:
  140. if !xsql.IsAggregate(f) {
  141. r = false
  142. return false
  143. }
  144. }
  145. return true
  146. })
  147. return
  148. }
  149. type fieldsMap struct {
  150. content map[string]streamFieldStore
  151. isSchemaless bool
  152. defaultStream xsql.StreamName
  153. }
  154. func newFieldsMap(isSchemaless bool, defaultStream xsql.StreamName) *fieldsMap {
  155. return &fieldsMap{content: make(map[string]streamFieldStore), isSchemaless: isSchemaless, defaultStream: defaultStream}
  156. }
  157. func (f *fieldsMap) reserve(fieldName string, streamName xsql.StreamName) {
  158. if fm, ok := f.content[strings.ToLower(fieldName)]; ok {
  159. fm.add(streamName)
  160. } else {
  161. fm := newStreamFieldStore(f.isSchemaless, f.defaultStream)
  162. fm.add(streamName)
  163. f.content[strings.ToLower(fieldName)] = fm
  164. }
  165. }
  166. func (f *fieldsMap) save(fieldName string, streamName xsql.StreamName, field *xsql.AliasRef) error {
  167. fm, ok := f.content[strings.ToLower(fieldName)]
  168. if !ok {
  169. if streamName == xsql.AliasStream || f.isSchemaless {
  170. fm = newStreamFieldStore(f.isSchemaless, f.defaultStream)
  171. f.content[strings.ToLower(fieldName)] = fm
  172. } else {
  173. return fmt.Errorf("unknown field %s", fieldName)
  174. }
  175. }
  176. err := fm.ref(streamName, field)
  177. if err != nil {
  178. return fmt.Errorf("%s%s", err, fieldName)
  179. }
  180. return nil
  181. }
  182. func (f *fieldsMap) bind(fr *xsql.FieldRef) error {
  183. fm, ok := f.content[strings.ToLower(fr.Name)]
  184. if !ok {
  185. if f.isSchemaless && fr.Name != "" {
  186. fm = newStreamFieldStore(f.isSchemaless, f.defaultStream)
  187. f.content[strings.ToLower(fr.Name)] = fm
  188. } else {
  189. return fmt.Errorf("unknown field %s", fr.Name)
  190. }
  191. }
  192. err := fm.bindRef(fr)
  193. if err != nil {
  194. return fmt.Errorf("%s%s", err, fr.Name)
  195. }
  196. return nil
  197. }
  198. func (f *fieldsMap) getDefaultName() string {
  199. for i := 0; i < 2048; i++ {
  200. key := xsql.DEFAULT_FIELD_NAME_PREFIX + strconv.Itoa(i)
  201. if _, ok := f.content[key]; !ok {
  202. return key
  203. }
  204. }
  205. return ""
  206. }
  207. type streamFieldStore interface {
  208. add(k xsql.StreamName)
  209. ref(k xsql.StreamName, v *xsql.AliasRef) error
  210. bindRef(f *xsql.FieldRef) error
  211. }
  212. func newStreamFieldStore(isSchemaless bool, defaultStream xsql.StreamName) streamFieldStore {
  213. if !isSchemaless {
  214. return &streamFieldMap{content: make(map[xsql.StreamName]*xsql.AliasRef)}
  215. } else {
  216. return &streamFieldMapSchemaless{content: make(map[xsql.StreamName]*xsql.AliasRef), defaultStream: defaultStream}
  217. }
  218. }
  219. type streamFieldMap struct {
  220. content map[xsql.StreamName]*xsql.AliasRef
  221. }
  222. // add the stream name must not be default.
  223. // This is used when traversing stream schema
  224. func (s *streamFieldMap) add(k xsql.StreamName) {
  225. s.content[k] = nil
  226. }
  227. //bind for schema field, all keys must be created before running bind
  228. // can bind alias & col. For alias, the stream name must be empty; For col, the field must be a col
  229. func (s *streamFieldMap) ref(k xsql.StreamName, v *xsql.AliasRef) error {
  230. if k == xsql.AliasStream { // must not exist, save alias ref for alias
  231. _, ok := s.content[k]
  232. if ok {
  233. return fmt.Errorf("duplicate alias ")
  234. }
  235. s.content[k] = v
  236. } else { // the key must exist after the schema travers, do validation
  237. if k == xsql.DefaultStream { // In schema mode, default stream won't be a key
  238. l := len(s.content)
  239. if l == 0 {
  240. return fmt.Errorf("unknow field ")
  241. } else if l == 1 {
  242. // valid, do nothing
  243. } else {
  244. return fmt.Errorf("ambiguous field ")
  245. }
  246. } else {
  247. _, ok := s.content[k]
  248. if !ok {
  249. return fmt.Errorf("unknow field %s.", k)
  250. }
  251. }
  252. }
  253. return nil
  254. }
  255. func (s *streamFieldMap) bindRef(fr *xsql.FieldRef) error {
  256. l := len(s.content)
  257. if fr.StreamName == "" {
  258. fr.StreamName = xsql.DefaultStream
  259. }
  260. k := fr.StreamName
  261. if k == xsql.DefaultStream {
  262. switch l {
  263. case 0:
  264. return fmt.Errorf("unknown field ")
  265. case 1: // if alias, return this
  266. for sk, sv := range s.content {
  267. fr.RefSelection(sv)
  268. fr.StreamName = sk
  269. }
  270. return nil
  271. default:
  272. r, ok := s.content[xsql.AliasStream] // if alias exists
  273. if ok {
  274. fr.RefSelection(r)
  275. fr.StreamName = xsql.AliasStream
  276. return nil
  277. } else {
  278. return fmt.Errorf("ambiguous field ")
  279. }
  280. }
  281. } else {
  282. r, ok := s.content[k]
  283. if ok {
  284. fr.RefSelection(r)
  285. return nil
  286. } else {
  287. return fmt.Errorf("unknown field %s.", k)
  288. }
  289. }
  290. }
  291. type streamFieldMapSchemaless struct {
  292. content map[xsql.StreamName]*xsql.AliasRef
  293. defaultStream xsql.StreamName
  294. }
  295. // add this should not be called for schemaless
  296. func (s *streamFieldMapSchemaless) add(k xsql.StreamName) {
  297. s.content[k] = nil
  298. }
  299. //bind for schemaless field, create column if not exist
  300. // can bind alias & col. For alias, the stream name must be empty; For col, the field must be a col
  301. func (s *streamFieldMapSchemaless) ref(k xsql.StreamName, v *xsql.AliasRef) error {
  302. if k == xsql.AliasStream { // must not exist
  303. _, ok := s.content[k]
  304. if ok {
  305. return fmt.Errorf("duplicate alias ")
  306. }
  307. s.content[k] = v
  308. } else { // the key may or may not exist. But always have only one default stream field.
  309. // Replace with stream name if another stream found. The key can be duplicate
  310. l := len(s.content)
  311. if k == xsql.DefaultStream { // In schemaless mode, default stream can only exist when length is 1
  312. if l < 1 {
  313. // valid, do nothing
  314. } else {
  315. return fmt.Errorf("ambiguous field ")
  316. }
  317. } else {
  318. if l == 1 {
  319. for sk := range s.content {
  320. if sk == xsql.DefaultStream {
  321. delete(s.content, k)
  322. }
  323. }
  324. }
  325. }
  326. }
  327. return nil
  328. }
  329. func (s *streamFieldMapSchemaless) bindRef(fr *xsql.FieldRef) error {
  330. l := len(s.content)
  331. if fr.StreamName == "" || fr.StreamName == xsql.DefaultStream {
  332. if l == 1 {
  333. for sk := range s.content {
  334. fr.StreamName = sk
  335. }
  336. } else {
  337. fr.StreamName = s.defaultStream
  338. }
  339. }
  340. k := fr.StreamName
  341. if k == xsql.DefaultStream {
  342. switch l {
  343. case 0: // must be a column because alias are fields and have been traversed
  344. // reserve a hole and do nothing
  345. s.content[k] = nil
  346. return nil
  347. case 1: // if alias or single col, return this
  348. for sk, sv := range s.content {
  349. fr.RefSelection(sv)
  350. fr.StreamName = sk
  351. }
  352. return nil
  353. default:
  354. r, ok := s.content[xsql.AliasStream] // if alias exists
  355. if ok {
  356. fr.RefSelection(r)
  357. fr.StreamName = xsql.AliasStream
  358. return nil
  359. } else {
  360. return fmt.Errorf("ambiguous field ")
  361. }
  362. }
  363. } else {
  364. r, ok := s.content[k]
  365. if !ok { // reserver a hole
  366. s.content[k] = nil
  367. } else {
  368. fr.RefSelection(r)
  369. }
  370. return nil
  371. }
  372. }