ast.go 44 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148
  1. package xsql
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "math"
  7. "reflect"
  8. "sort"
  9. "strings"
  10. "time"
  11. )
  12. type Node interface {
  13. node()
  14. }
  15. type NameNode interface {
  16. Node
  17. GetName() string
  18. }
  19. type Expr interface {
  20. Node
  21. expr()
  22. }
  23. type Field struct {
  24. Name string
  25. AName string
  26. Expr
  27. }
  28. type Source interface {
  29. Node
  30. source()
  31. }
  32. type Sources []Source
  33. func (ss Sources) node() {}
  34. type Table struct {
  35. Name string
  36. Alias string
  37. }
  38. func (t *Table) source() {}
  39. func (ss *Table) node() {}
  40. type JoinType int
  41. const (
  42. LEFT_JOIN JoinType = iota
  43. INNER_JOIN
  44. RIGHT_JOIN
  45. FULL_JOIN
  46. CROSS_JOIN
  47. )
  48. const (
  49. DEFAULT_FIELD_NAME_PREFIX string = "kuiper_field_"
  50. PRIVATE_PREFIX string = "$$"
  51. )
  52. var COLUMN_SEPARATOR = tokens[COLSEP]
  53. type Join struct {
  54. Name string
  55. Alias string
  56. JoinType JoinType
  57. Expr Expr
  58. }
  59. func (j *Join) source() {}
  60. func (ss *Join) node() {}
  61. type Joins []Join
  62. func (ss Joins) node() {}
  63. type Statement interface {
  64. Stmt()
  65. Node
  66. }
  67. type SelectStatement struct {
  68. Fields Fields
  69. Sources Sources
  70. Joins Joins
  71. Condition Expr
  72. Dimensions Dimensions
  73. Having Expr
  74. SortFields SortFields
  75. }
  76. func (ss *SelectStatement) Stmt() {}
  77. func (ss *SelectStatement) node() {}
  78. type Literal interface {
  79. Expr
  80. literal()
  81. }
  82. type ParenExpr struct {
  83. Expr Expr
  84. }
  85. type ArrowExpr struct {
  86. Expr Expr
  87. }
  88. type BracketExpr struct {
  89. Expr Expr
  90. }
  91. type ColonExpr struct {
  92. Start int
  93. End int
  94. }
  95. type IndexExpr struct {
  96. Index int
  97. }
  98. type BooleanLiteral struct {
  99. Val bool
  100. }
  101. type TimeLiteral struct {
  102. Val Token
  103. }
  104. type IntegerLiteral struct {
  105. Val int
  106. }
  107. type StringLiteral struct {
  108. Val string
  109. }
  110. type NumberLiteral struct {
  111. Val float64
  112. }
  113. type Wildcard struct {
  114. Token Token
  115. }
  116. type Dimension struct {
  117. Expr Expr
  118. }
  119. type SortField struct {
  120. Name string
  121. Ascending bool
  122. }
  123. type SortFields []SortField
  124. type Dimensions []Dimension
  125. func (f *Field) expr() {}
  126. func (f *Field) node() {}
  127. func (f *Field) GetName() string {
  128. if f.AName != "" {
  129. return f.AName
  130. } else {
  131. return f.Name
  132. }
  133. }
  134. func (f *Field) IsSelectionField() bool {
  135. if f.AName != "" {
  136. return true
  137. }
  138. _, ok := f.Expr.(*FieldRef)
  139. if ok {
  140. return true
  141. }
  142. return false
  143. }
  144. func (f *Field) IsColumn() bool {
  145. if f.AName != "" {
  146. return false
  147. }
  148. _, ok := f.Expr.(*FieldRef)
  149. if ok {
  150. return true
  151. }
  152. return false
  153. }
  154. func (pe *ParenExpr) expr() {}
  155. func (pe *ParenExpr) node() {}
  156. func (ae *ArrowExpr) expr() {}
  157. func (ae *ArrowExpr) node() {}
  158. func (be *BracketExpr) expr() {}
  159. func (be *BracketExpr) node() {}
  160. func (be *ColonExpr) expr() {}
  161. func (be *ColonExpr) node() {}
  162. func (be *IndexExpr) expr() {}
  163. func (be *IndexExpr) node() {}
  164. func (w *Wildcard) expr() {}
  165. func (w *Wildcard) node() {}
  166. func (bl *BooleanLiteral) expr() {}
  167. func (bl *BooleanLiteral) literal() {}
  168. func (bl *BooleanLiteral) node() {}
  169. func (tl *TimeLiteral) expr() {}
  170. func (tl *TimeLiteral) literal() {}
  171. func (tl *TimeLiteral) node() {}
  172. func (il *IntegerLiteral) expr() {}
  173. func (il *IntegerLiteral) literal() {}
  174. func (il *IntegerLiteral) node() {}
  175. func (nl *NumberLiteral) expr() {}
  176. func (nl *NumberLiteral) literal() {}
  177. func (nl *NumberLiteral) node() {}
  178. func (sl *StringLiteral) expr() {}
  179. func (sl *StringLiteral) literal() {}
  180. func (sl *StringLiteral) node() {}
  181. func (d *Dimension) expr() {}
  182. func (d *Dimension) node() {}
  183. func (d Dimensions) node() {}
  184. func (d *Dimensions) GetWindow() *Window {
  185. for _, child := range *d {
  186. if w, ok := child.Expr.(*Window); ok {
  187. return w
  188. }
  189. }
  190. return nil
  191. }
  192. func (d *Dimensions) GetGroups() Dimensions {
  193. var nd Dimensions
  194. for _, child := range *d {
  195. if _, ok := child.Expr.(*Window); !ok {
  196. nd = append(nd, child)
  197. }
  198. }
  199. return nd
  200. }
  201. func (sf *SortField) expr() {}
  202. func (sf *SortField) node() {}
  203. func (sf SortFields) node() {}
  204. type Call struct {
  205. Name string
  206. Args []Expr
  207. }
  208. func (c *Call) expr() {}
  209. func (c *Call) literal() {}
  210. func (c *Call) node() {}
  211. type WhenClause struct {
  212. // The condition expression
  213. Expr Expr
  214. Result Expr
  215. }
  216. func (w *WhenClause) expr() {}
  217. func (w *WhenClause) literal() {}
  218. func (w *WhenClause) node() {}
  219. type CaseExpr struct {
  220. // The compare value expression. It can be a value expression or nil.
  221. // When it is nil, the WhenClause Expr must be a logical(comparison) expression
  222. Value Expr
  223. WhenClauses []*WhenClause
  224. ElseClause Expr
  225. }
  226. func (c *CaseExpr) expr() {}
  227. func (c *CaseExpr) literal() {}
  228. func (c *CaseExpr) node() {}
  229. type WindowType int
  230. const (
  231. NOT_WINDOW WindowType = iota
  232. TUMBLING_WINDOW
  233. HOPPING_WINDOW
  234. SLIDING_WINDOW
  235. SESSION_WINDOW
  236. COUNT_WINDOW
  237. )
  238. type Window struct {
  239. WindowType WindowType
  240. Length *IntegerLiteral
  241. Interval *IntegerLiteral
  242. Filter Expr
  243. }
  244. func (w *Window) expr() {}
  245. func (w *Window) literal() {}
  246. func (w *Window) node() {}
  247. type SelectStatements []SelectStatement
  248. func (ss *SelectStatements) node() {}
  249. type Fields []Field
  250. func (fs Fields) node() {}
  251. func (fs Fields) Len() int {
  252. return len(fs)
  253. }
  254. func (fs Fields) Swap(i, j int) {
  255. fs[i], fs[j] = fs[j], fs[i]
  256. }
  257. func (fs Fields) Less(i int, j int) bool {
  258. m := fs[i].AName
  259. if m == "" {
  260. m = fs[i].Name
  261. }
  262. n := fs[j].AName
  263. if n == "" {
  264. n = fs[j].Name
  265. }
  266. return m < n
  267. }
  268. type BinaryExpr struct {
  269. OP Token
  270. LHS Expr
  271. RHS Expr
  272. }
  273. func (fe *BinaryExpr) expr() {}
  274. func (be *BinaryExpr) node() {}
  275. const (
  276. DefaultStream = StreamName("$$default")
  277. AliasStream = StreamName("$$alias")
  278. )
  279. // FieldRef could be
  280. // 1. SQL Field
  281. // 1.1 Explicit field "stream.col"
  282. // 1.2 Implicit field "col" -> only exist in schemaless stream. Otherwise, explicit stream name will be bound
  283. // 1.3 Alias field "expr as c" -> refer to an expression or column
  284. // 2. Json expression field like a->b
  285. type FieldRef struct {
  286. // optional, bind in analyzer, empty means alias, default means not set
  287. // MUST have after binding for SQL fields. For 1.2,1.3 and 1.4, use special constant as stream name
  288. StreamName StreamName
  289. // optional, set only once. For selections, empty name will be assigned a default name
  290. // MUST have after binding, assign a name for 1.4
  291. Name string
  292. // Only for alias
  293. *AliasRef
  294. }
  295. func (fr *FieldRef) expr() {}
  296. func (fr *FieldRef) node() {}
  297. func (fr *FieldRef) IsColumn() bool {
  298. return fr.StreamName != AliasStream && fr.StreamName != ""
  299. }
  300. func (fr *FieldRef) IsAlias() bool {
  301. return fr.StreamName == AliasStream
  302. }
  303. func (fr *FieldRef) IsSQLField() bool {
  304. return fr.StreamName != ""
  305. }
  306. func (fr *FieldRef) IsAggregate() bool {
  307. if fr.StreamName != AliasStream {
  308. return false
  309. }
  310. // lazy calculate
  311. if fr.isAggregate == nil {
  312. tr := IsAggregate(fr.expression)
  313. fr.isAggregate = &tr
  314. }
  315. return *fr.isAggregate
  316. }
  317. func (fr *FieldRef) RefSelection(a *AliasRef) {
  318. fr.AliasRef = a
  319. }
  320. // RefSources Must call after binding or will get empty
  321. func (fr *FieldRef) RefSources() []StreamName {
  322. if fr.StreamName == AliasStream {
  323. return fr.refSources
  324. } else if fr.StreamName != "" {
  325. return []StreamName{fr.StreamName}
  326. } else {
  327. return nil
  328. }
  329. }
  330. // SetRefSource Only call this for alias field ref
  331. func (fr *FieldRef) SetRefSource(names []StreamName) {
  332. fr.refSources = names
  333. }
  334. type AliasRef struct {
  335. // MUST have, It is used for evaluation
  336. expression Expr
  337. // MUST have after binding, calculate once in initializer. Could be 0 when alias an expression without col like "1+2"
  338. refSources []StreamName
  339. // optional, lazy set when calculating isAggregate
  340. isAggregate *bool
  341. }
  342. func NewAliasRef(e Expr) (*AliasRef, error) {
  343. r := make(map[StreamName]bool)
  344. var walkErr error
  345. WalkFunc(e, func(n Node) bool {
  346. switch f := n.(type) {
  347. case *FieldRef:
  348. switch f.StreamName {
  349. case AliasStream:
  350. walkErr = fmt.Errorf("cannot use alias %s inside another alias %v", f.Name, e)
  351. return false
  352. default:
  353. r[f.StreamName] = true
  354. }
  355. }
  356. return true
  357. })
  358. if walkErr != nil {
  359. return nil, walkErr
  360. }
  361. rs := make([]StreamName, 0)
  362. for k := range r {
  363. rs = append(rs, k)
  364. }
  365. return &AliasRef{
  366. expression: e,
  367. refSources: rs,
  368. }, nil
  369. }
  370. // for testing only
  371. func MockAliasRef(e Expr, r []StreamName, a *bool) *AliasRef {
  372. return &AliasRef{e, r, a}
  373. }
  374. type MetaRef struct {
  375. StreamName StreamName
  376. Name string
  377. }
  378. func (fr *MetaRef) expr() {}
  379. func (fr *MetaRef) node() {}
  380. // The stream AST tree
  381. type Options struct {
  382. DATASOURCE string
  383. KEY string
  384. FORMAT string
  385. CONF_KEY string
  386. TYPE string
  387. STRICT_VALIDATION bool
  388. TIMESTAMP string
  389. TIMESTAMP_FORMAT string
  390. RETAIN_SIZE int
  391. SHARED bool
  392. }
  393. func (o Options) node() {}
  394. type StreamName string
  395. func (sn *StreamName) node() {}
  396. type StreamType int
  397. const (
  398. TypeStream StreamType = iota
  399. TypeTable
  400. )
  401. var StreamTypeMap = map[StreamType]string{
  402. TypeStream: "stream",
  403. TypeTable: "table",
  404. }
  405. type StreamStmt struct {
  406. Name StreamName
  407. StreamFields StreamFields
  408. Options *Options
  409. StreamType StreamType //default to TypeStream
  410. }
  411. func (ss *StreamStmt) node() {}
  412. func (ss *StreamStmt) Stmt() {}
  413. type FieldType interface {
  414. fieldType()
  415. Node
  416. }
  417. type StreamField struct {
  418. Name string
  419. FieldType
  420. }
  421. func (u *StreamField) MarshalJSON() ([]byte, error) {
  422. return json.Marshal(&struct {
  423. FieldType interface{}
  424. Name string
  425. }{
  426. FieldType: PrintFieldTypeForJson(u.FieldType),
  427. Name: u.Name,
  428. })
  429. }
  430. type StreamFields []StreamField
  431. func (sf StreamFields) node() {}
  432. type BasicType struct {
  433. Type DataType
  434. }
  435. func (bt *BasicType) fieldType() {}
  436. func (bt *BasicType) node() {}
  437. type ArrayType struct {
  438. Type DataType
  439. FieldType
  440. }
  441. func (at *ArrayType) fieldType() {}
  442. func (at *ArrayType) node() {}
  443. type RecType struct {
  444. StreamFields StreamFields
  445. }
  446. func (rt *RecType) fieldType() {}
  447. func (rt *RecType) node() {}
  448. type ShowStreamsStatement struct {
  449. }
  450. type DescribeStreamStatement struct {
  451. Name string
  452. }
  453. type ExplainStreamStatement struct {
  454. Name string
  455. }
  456. type DropStreamStatement struct {
  457. Name string
  458. }
  459. func (ss *ShowStreamsStatement) Stmt() {}
  460. func (ss *ShowStreamsStatement) node() {}
  461. func (dss *DescribeStreamStatement) Stmt() {}
  462. func (dss *DescribeStreamStatement) node() {}
  463. func (dss *DescribeStreamStatement) GetName() string { return dss.Name }
  464. func (ess *ExplainStreamStatement) Stmt() {}
  465. func (ess *ExplainStreamStatement) node() {}
  466. func (ess *ExplainStreamStatement) GetName() string { return ess.Name }
  467. func (dss *DropStreamStatement) Stmt() {}
  468. func (dss *DropStreamStatement) node() {}
  469. func (dss *DropStreamStatement) GetName() string { return dss.Name }
  470. type ShowTablesStatement struct {
  471. }
  472. type DescribeTableStatement struct {
  473. Name string
  474. }
  475. type ExplainTableStatement struct {
  476. Name string
  477. }
  478. type DropTableStatement struct {
  479. Name string
  480. }
  481. func (ss *ShowTablesStatement) Stmt() {}
  482. func (ss *ShowTablesStatement) node() {}
  483. func (dss *DescribeTableStatement) Stmt() {}
  484. func (dss *DescribeTableStatement) node() {}
  485. func (dss *DescribeTableStatement) GetName() string { return dss.Name }
  486. func (ess *ExplainTableStatement) Stmt() {}
  487. func (ess *ExplainTableStatement) node() {}
  488. func (ess *ExplainTableStatement) GetName() string { return ess.Name }
  489. func (dss *DropTableStatement) Stmt() {}
  490. func (dss *DropTableStatement) node() {}
  491. func (dss *DropTableStatement) GetName() string { return dss.Name }
  492. type Visitor interface {
  493. Visit(Node) bool
  494. }
  495. func Walk(v Visitor, node Node) {
  496. if node == nil || reflect.ValueOf(node).IsNil() {
  497. return
  498. }
  499. if !v.Visit(node) {
  500. return
  501. }
  502. switch n := node.(type) {
  503. case *BinaryExpr:
  504. Walk(v, n.LHS)
  505. Walk(v, n.RHS)
  506. case *Call:
  507. for _, expr := range n.Args {
  508. Walk(v, expr)
  509. }
  510. case Dimensions:
  511. Walk(v, n.GetWindow())
  512. for _, dimension := range n.GetGroups() {
  513. Walk(v, dimension.Expr)
  514. }
  515. case *Window:
  516. Walk(v, n.Length)
  517. Walk(v, n.Interval)
  518. Walk(v, n.Filter)
  519. case *Field:
  520. Walk(v, n.Expr)
  521. if fr, ok := n.Expr.(*FieldRef); ok && fr.IsAlias() {
  522. Walk(v, fr.expression)
  523. }
  524. case Fields:
  525. for _, c := range n {
  526. Walk(v, &c)
  527. }
  528. case *ParenExpr:
  529. Walk(v, n.Expr)
  530. case *SelectStatement:
  531. Walk(v, n.Fields)
  532. Walk(v, n.Dimensions)
  533. Walk(v, n.Sources)
  534. Walk(v, n.Joins)
  535. Walk(v, n.Condition)
  536. Walk(v, n.SortFields)
  537. Walk(v, n.Having)
  538. case SortFields:
  539. for _, sf := range n {
  540. Walk(v, &sf)
  541. }
  542. case Sources:
  543. for _, s := range n {
  544. Walk(v, s)
  545. }
  546. case Joins:
  547. for _, s := range n {
  548. Walk(v, &s)
  549. }
  550. case *Join:
  551. Walk(v, n.Expr)
  552. case *CaseExpr:
  553. Walk(v, n.Value)
  554. for _, w := range n.WhenClauses {
  555. Walk(v, w)
  556. }
  557. Walk(v, n.ElseClause)
  558. case *WhenClause:
  559. Walk(v, n.Expr)
  560. Walk(v, n.Result)
  561. case *StreamStmt:
  562. Walk(v, &n.Name)
  563. Walk(v, n.StreamFields)
  564. Walk(v, n.Options)
  565. case *BasicType, *ArrayType, *RecType:
  566. Walk(v, n)
  567. case *ShowStreamsStatement, *DescribeStreamStatement, *ExplainStreamStatement, *DropStreamStatement,
  568. *ShowTablesStatement, *DescribeTableStatement, *ExplainTableStatement, *DropTableStatement:
  569. Walk(v, n)
  570. }
  571. }
  572. // WalkFunc traverses a node hierarchy in depth-first order.
  573. func WalkFunc(node Node, fn func(Node) bool) {
  574. Walk(walkFuncVisitor(fn), node)
  575. }
  576. type walkFuncVisitor func(Node) bool
  577. func (fn walkFuncVisitor) Visit(n Node) bool { return fn(n) }
  578. // Valuer is the interface that wraps the Value() method.
  579. type Valuer interface {
  580. // Value returns the value and existence flag for a given key.
  581. Value(key string) (interface{}, bool)
  582. Meta(key string) (interface{}, bool)
  583. AppendAlias(key string, value interface{}) bool
  584. }
  585. // CallValuer implements the Call method for evaluating function calls.
  586. type CallValuer interface {
  587. Valuer
  588. // Call is invoked to evaluate a function call (if possible).
  589. Call(name string, args []interface{}) (interface{}, bool)
  590. }
  591. type AggregateCallValuer interface {
  592. CallValuer
  593. GetAllTuples() AggregateData
  594. GetSingleCallValuer() CallValuer
  595. }
  596. type Wildcarder interface {
  597. // Value returns the value and existence flag for a given key.
  598. All(stream string) (interface{}, bool)
  599. }
  600. type DataValuer interface {
  601. Valuer
  602. Wildcarder
  603. Clone() DataValuer
  604. }
  605. type WildcardValuer struct {
  606. Data Wildcarder
  607. }
  608. //TODO deal with wildcard of a stream, e.g. SELECT Table.* from Table inner join Table1
  609. func (wv *WildcardValuer) Value(key string) (interface{}, bool) {
  610. if key == "" {
  611. return wv.Data.All(key)
  612. } else {
  613. a := strings.Index(key, COLUMN_SEPARATOR+"*")
  614. if a <= 0 {
  615. return nil, false
  616. } else {
  617. return wv.Data.All(key[:a])
  618. }
  619. }
  620. }
  621. func (wv *WildcardValuer) Meta(string) (interface{}, bool) {
  622. return nil, false
  623. }
  624. func (wv *WildcardValuer) AppendAlias(string, interface{}) bool {
  625. // do nothing
  626. return false
  627. }
  628. /**********************************
  629. ** Various Data Types for SQL transformation
  630. */
  631. type AggregateData interface {
  632. AggregateEval(expr Expr, v CallValuer) []interface{}
  633. GetWindowStart() int64
  634. GetWindowEnd() int64
  635. }
  636. // Message is a valuer that substitutes values for the mapped interface.
  637. type Message map[string]interface{}
  638. func ToMessage(input interface{}) (Message, bool) {
  639. var result Message
  640. switch m := input.(type) {
  641. case Message:
  642. result = m
  643. case Metadata:
  644. result = Message(m)
  645. case map[string]interface{}:
  646. result = m
  647. default:
  648. return nil, false
  649. }
  650. return result, true
  651. }
  652. // Value returns the value for a key in the Message.
  653. func (m Message) Value(key string) (interface{}, bool) {
  654. var colkey string
  655. if keys := strings.Split(key, COLUMN_SEPARATOR); len(keys) == 1 {
  656. colkey = key
  657. } else if len(keys) == 2 {
  658. colkey = keys[1]
  659. } else {
  660. common.Log.Println("Invalid key: " + key + ", expect source.field or field.")
  661. return nil, false
  662. }
  663. key1 := strings.ToLower(colkey)
  664. if v, ok := m[key1]; ok {
  665. return v, ok
  666. } else {
  667. //Only when with 'SELECT * FROM ...' and 'schemaless', the key in map is not convert to lower case.
  668. //So all of keys in map should be convert to lowercase and then compare them.
  669. return m.getIgnoreCase(colkey)
  670. }
  671. }
  672. func (m Message) getIgnoreCase(key interface{}) (interface{}, bool) {
  673. if k, ok := key.(string); ok {
  674. key = strings.ToLower(k)
  675. for k, v := range m {
  676. if strings.ToLower(k) == key {
  677. return v, true
  678. }
  679. }
  680. }
  681. return nil, false
  682. }
  683. func (m Message) Meta(key string) (interface{}, bool) {
  684. if key == "*" {
  685. return map[string]interface{}(m), true
  686. }
  687. return m.Value(key)
  688. }
  689. func (m Message) AppendAlias(k string, v interface{}) bool {
  690. fmt.Printf("append alias %s:%v\n", k, v)
  691. return false
  692. }
  693. type Event interface {
  694. GetTimestamp() int64
  695. IsWatermark() bool
  696. }
  697. type Metadata Message
  698. func (m Metadata) Value(key string) (interface{}, bool) {
  699. msg := Message(m)
  700. return msg.Value(key)
  701. }
  702. func (m Metadata) Meta(key string) (interface{}, bool) {
  703. if key == "*" {
  704. return map[string]interface{}(m), true
  705. }
  706. msg := Message(m)
  707. return msg.Meta(key)
  708. }
  709. type Alias struct {
  710. AliasMap Message
  711. }
  712. func (a *Alias) AppendAlias(key string, value interface{}) bool {
  713. if a.AliasMap == nil {
  714. a.AliasMap = make(map[string]interface{})
  715. }
  716. a.AliasMap[PRIVATE_PREFIX+key] = value
  717. return true
  718. }
  719. func (a *Alias) AliasValue(key string) (interface{}, bool) {
  720. if a.AliasMap == nil {
  721. return nil, false
  722. }
  723. return a.AliasMap.Value(key)
  724. }
  725. type Tuple struct {
  726. Emitter string
  727. Message Message // immutable
  728. Timestamp int64
  729. Metadata Metadata // immutable
  730. Alias
  731. }
  732. func (t *Tuple) Value(key string) (interface{}, bool) {
  733. r, ok := t.AliasValue(key)
  734. if ok {
  735. return r, ok
  736. }
  737. return t.Message.Value(key)
  738. }
  739. func (t *Tuple) Meta(key string) (interface{}, bool) {
  740. if key == "*" {
  741. return map[string]interface{}(t.Metadata), true
  742. }
  743. return t.Metadata.Value(key)
  744. }
  745. func (t *Tuple) All(string) (interface{}, bool) {
  746. return t.Message, true
  747. }
  748. func (t *Tuple) AggregateEval(expr Expr, v CallValuer) []interface{} {
  749. return []interface{}{Eval(expr, MultiValuer(t, v, &WildcardValuer{t}))}
  750. }
  751. func (t *Tuple) GetWindowStart() int64 {
  752. return 0
  753. }
  754. func (t *Tuple) GetWindowEnd() int64 {
  755. return 0
  756. }
  757. func (t *Tuple) GetTimestamp() int64 {
  758. return t.Timestamp
  759. }
  760. func (t *Tuple) GetMetadata() Metadata {
  761. return t.Metadata
  762. }
  763. func (t *Tuple) IsWatermark() bool {
  764. return false
  765. }
  766. func (t *Tuple) Clone() DataValuer {
  767. c := &Tuple{
  768. Emitter: t.Emitter,
  769. Timestamp: t.Timestamp,
  770. }
  771. if t.Message != nil {
  772. m := Message{}
  773. for k, v := range t.Message {
  774. m[k] = v
  775. }
  776. c.Message = m
  777. }
  778. if t.Metadata != nil {
  779. md := Metadata{}
  780. for k, v := range t.Metadata {
  781. md[k] = v
  782. }
  783. c.Metadata = md
  784. }
  785. return c
  786. }
  787. type WindowTuples struct {
  788. Emitter string
  789. Tuples []Tuple
  790. }
  791. type WindowRange struct {
  792. WindowStart int64
  793. WindowEnd int64
  794. }
  795. func (r *WindowRange) GetWindowStart() int64 {
  796. return r.WindowStart
  797. }
  798. func (r *WindowRange) GetWindowEnd() int64 {
  799. return r.WindowEnd
  800. }
  801. type WindowTuplesSet struct {
  802. Content []WindowTuples
  803. *WindowRange
  804. }
  805. func (w WindowTuplesSet) GetBySrc(src string) []Tuple {
  806. for _, me := range w.Content {
  807. if me.Emitter == src {
  808. return me.Tuples
  809. }
  810. }
  811. return nil
  812. }
  813. func (w WindowTuplesSet) Len() int {
  814. if len(w.Content) > 0 {
  815. return len(w.Content[0].Tuples)
  816. }
  817. return 0
  818. }
  819. func (w WindowTuplesSet) Swap(i, j int) {
  820. if len(w.Content) > 0 {
  821. s := w.Content[0].Tuples
  822. s[i], s[j] = s[j], s[i]
  823. }
  824. }
  825. func (w WindowTuplesSet) Index(i int) Valuer {
  826. if len(w.Content) > 0 {
  827. s := w.Content[0].Tuples
  828. return &(s[i])
  829. }
  830. return nil
  831. }
  832. func (w WindowTuplesSet) AddTuple(tuple *Tuple) WindowTuplesSet {
  833. found := false
  834. for i, t := range w.Content {
  835. if t.Emitter == tuple.Emitter {
  836. t.Tuples = append(t.Tuples, *tuple)
  837. found = true
  838. w.Content[i] = t
  839. break
  840. }
  841. }
  842. if !found {
  843. ets := &WindowTuples{Emitter: tuple.Emitter}
  844. ets.Tuples = append(ets.Tuples, *tuple)
  845. w.Content = append(w.Content, *ets)
  846. }
  847. return w
  848. }
  849. //Sort by tuple timestamp
  850. func (w WindowTuplesSet) Sort() {
  851. for _, t := range w.Content {
  852. tuples := t.Tuples
  853. sort.SliceStable(tuples, func(i, j int) bool {
  854. return tuples[i].Timestamp < tuples[j].Timestamp
  855. })
  856. t.Tuples = tuples
  857. }
  858. }
  859. func (w WindowTuplesSet) AggregateEval(expr Expr, v CallValuer) []interface{} {
  860. var result []interface{}
  861. if len(w.Content) != 1 { //should never happen
  862. return nil
  863. }
  864. for _, t := range w.Content[0].Tuples {
  865. result = append(result, Eval(expr, MultiValuer(&t, v, &WildcardValuer{&t})))
  866. }
  867. return result
  868. }
  869. type JoinTuple struct {
  870. Tuples []Tuple
  871. Alias
  872. }
  873. func (jt *JoinTuple) AddTuple(tuple Tuple) {
  874. jt.Tuples = append(jt.Tuples, tuple)
  875. }
  876. func (jt *JoinTuple) AddTuples(tuples []Tuple) {
  877. for _, t := range tuples {
  878. jt.Tuples = append(jt.Tuples, t)
  879. }
  880. }
  881. func getTupleValue(tuple Tuple, t string, key string) (interface{}, bool) {
  882. switch t {
  883. case "value":
  884. return tuple.Value(key)
  885. case "meta":
  886. return tuple.Meta(key)
  887. default:
  888. common.Log.Errorf("cannot get tuple for type %s", t)
  889. return nil, false
  890. }
  891. }
  892. func (jt *JoinTuple) doGetValue(t string, key string) (interface{}, bool) {
  893. keys := strings.Split(key, COLUMN_SEPARATOR)
  894. tuples := jt.Tuples
  895. switch len(keys) {
  896. case 1:
  897. if len(tuples) > 1 {
  898. for _, tuple := range tuples { //TODO support key without modifier?
  899. v, ok := getTupleValue(tuple, t, key)
  900. if ok {
  901. return v, ok
  902. }
  903. }
  904. common.Log.Debugf("Wrong key: %s not found", key)
  905. return nil, false
  906. } else {
  907. return getTupleValue(tuples[0], t, key)
  908. }
  909. case 2:
  910. emitter, key := keys[0], keys[1]
  911. //TODO should use hash here
  912. for _, tuple := range tuples {
  913. if tuple.Emitter == emitter {
  914. return getTupleValue(tuple, t, key)
  915. }
  916. }
  917. return nil, false
  918. default:
  919. common.Log.Infoln("Wrong key: ", key, ", expect dot in the expression.")
  920. return nil, false
  921. }
  922. }
  923. func (jt *JoinTuple) Value(key string) (interface{}, bool) {
  924. r, ok := jt.AliasValue(key)
  925. if ok {
  926. return r, ok
  927. }
  928. return jt.doGetValue("value", key)
  929. }
  930. func (jt *JoinTuple) Meta(key string) (interface{}, bool) {
  931. return jt.doGetValue("meta", key)
  932. }
  933. func (jt *JoinTuple) All(stream string) (interface{}, bool) {
  934. if stream != "" {
  935. for _, t := range jt.Tuples {
  936. if t.Emitter == stream {
  937. return t.Message, true
  938. }
  939. }
  940. } else {
  941. var r Message = make(map[string]interface{})
  942. for _, t := range jt.Tuples {
  943. for k, v := range t.Message {
  944. if _, ok := r[k]; !ok {
  945. r[k] = v
  946. }
  947. }
  948. }
  949. return r, true
  950. }
  951. return nil, false
  952. }
  953. func (jt *JoinTuple) Clone() DataValuer {
  954. ts := make([]Tuple, len(jt.Tuples))
  955. for i, t := range jt.Tuples {
  956. ts[i] = *(t.Clone().(*Tuple))
  957. }
  958. return &JoinTuple{Tuples: ts}
  959. }
  960. type JoinTupleSets struct {
  961. Content []JoinTuple
  962. *WindowRange
  963. }
  964. func (s *JoinTupleSets) Len() int { return len(s.Content) }
  965. func (s *JoinTupleSets) Swap(i, j int) { s.Content[i], s.Content[j] = s.Content[j], s.Content[i] }
  966. func (s *JoinTupleSets) Index(i int) Valuer { return &(s.Content[i]) }
  967. func (s *JoinTupleSets) AggregateEval(expr Expr, v CallValuer) []interface{} {
  968. var result []interface{}
  969. for _, t := range s.Content {
  970. result = append(result, Eval(expr, MultiValuer(&t, v, &WildcardValuer{&t})))
  971. }
  972. return result
  973. }
  974. type GroupedTuples struct {
  975. Content []DataValuer
  976. *WindowRange
  977. }
  978. func (s GroupedTuples) AggregateEval(expr Expr, v CallValuer) []interface{} {
  979. var result []interface{}
  980. for _, t := range s.Content {
  981. result = append(result, Eval(expr, MultiValuer(t, v, &WildcardValuer{t})))
  982. }
  983. return result
  984. }
  985. type GroupedTuplesSet []GroupedTuples
  986. func (s GroupedTuplesSet) Len() int { return len(s) }
  987. func (s GroupedTuplesSet) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  988. func (s GroupedTuplesSet) Index(i int) Valuer { return s[i].Content[0] }
  989. type SortingData interface {
  990. Len() int
  991. Swap(i, j int)
  992. Index(i int) Valuer
  993. }
  994. // multiSorter implements the Sort interface, sorting the changes within.Hi
  995. type MultiSorter struct {
  996. SortingData
  997. fields SortFields
  998. valuer CallValuer
  999. values []map[string]interface{}
  1000. }
  1001. // OrderedBy returns a Sorter that sorts using the less functions, in order.
  1002. // Call its Sort method to sort the data.
  1003. func OrderedBy(fields SortFields, fv *FunctionValuer) *MultiSorter {
  1004. return &MultiSorter{
  1005. fields: fields,
  1006. valuer: fv,
  1007. }
  1008. }
  1009. // Less is part of sort.Interface. It is implemented by looping along the
  1010. // less functions until it finds a comparison that discriminates between
  1011. // the two items (one is less than the other). Note that it can call the
  1012. // less functions twice per call. We could change the functions to return
  1013. // -1, 0, 1 and reduce the number of calls for greater efficiency: an
  1014. // exercise for the reader.
  1015. func (ms *MultiSorter) Less(i, j int) bool {
  1016. p, q := ms.values[i], ms.values[j]
  1017. v := &ValuerEval{Valuer: MultiValuer(ms.valuer)}
  1018. for _, field := range ms.fields {
  1019. n := field.Name
  1020. vp, _ := p[n]
  1021. vq, _ := q[n]
  1022. if vp == nil && vq != nil {
  1023. return false
  1024. } else if vp != nil && vq == nil {
  1025. ms.valueSwap(true, i, j)
  1026. return true
  1027. } else if vp == nil && vq == nil {
  1028. return false
  1029. }
  1030. switch {
  1031. case v.simpleDataEval(vp, vq, LT):
  1032. ms.valueSwap(field.Ascending, i, j)
  1033. return field.Ascending
  1034. case v.simpleDataEval(vq, vp, LT):
  1035. ms.valueSwap(!field.Ascending, i, j)
  1036. return !field.Ascending
  1037. }
  1038. }
  1039. return false
  1040. }
  1041. func (ms *MultiSorter) valueSwap(s bool, i, j int) {
  1042. if s {
  1043. ms.values[i], ms.values[j] = ms.values[j], ms.values[i]
  1044. }
  1045. }
  1046. // Sort sorts the argument slice according to the less functions passed to OrderedBy.
  1047. func (ms *MultiSorter) Sort(data SortingData) error {
  1048. ms.SortingData = data
  1049. types := make([]string, len(ms.fields))
  1050. ms.values = make([]map[string]interface{}, data.Len())
  1051. //load and validate data
  1052. for i := 0; i < data.Len(); i++ {
  1053. ms.values[i] = make(map[string]interface{})
  1054. p := data.Index(i)
  1055. vep := &ValuerEval{Valuer: MultiValuer(p, ms.valuer)}
  1056. for j, field := range ms.fields {
  1057. n := field.Name
  1058. vp, _ := vep.Valuer.Value(n)
  1059. if err, ok := vp.(error); ok {
  1060. return err
  1061. } else {
  1062. if types[j] == "" && vp != nil {
  1063. types[j] = fmt.Sprintf("%T", vp)
  1064. }
  1065. if err := validate(types[j], vp); err != nil {
  1066. return err
  1067. } else {
  1068. ms.values[i][n] = vp
  1069. }
  1070. }
  1071. }
  1072. }
  1073. sort.Sort(ms)
  1074. return nil
  1075. }
  1076. func validate(t string, v interface{}) error {
  1077. if v == nil || t == "" {
  1078. return nil
  1079. }
  1080. vt := fmt.Sprintf("%T", v)
  1081. switch t {
  1082. case "int", "int64", "float64", "uint64":
  1083. if vt == "int" || vt == "int64" || vt == "float64" || vt == "uint64" {
  1084. return nil
  1085. } else {
  1086. return fmt.Errorf("incompatible types for comparison: %s and %s", t, vt)
  1087. }
  1088. case "bool":
  1089. if vt == "bool" {
  1090. return nil
  1091. } else {
  1092. return fmt.Errorf("incompatible types for comparison: %s and %s", t, vt)
  1093. }
  1094. case "string":
  1095. if vt == "string" {
  1096. return nil
  1097. } else {
  1098. return fmt.Errorf("incompatible types for comparison: %s and %s", t, vt)
  1099. }
  1100. case "time.Time":
  1101. _, err := common.InterfaceToTime(v, "")
  1102. if err != nil {
  1103. return fmt.Errorf("incompatible types for comparison: %s and %s", t, vt)
  1104. } else {
  1105. return nil
  1106. }
  1107. default:
  1108. return fmt.Errorf("incompatible types for comparison: %s and %s", t, vt)
  1109. }
  1110. return nil
  1111. }
  1112. type EvalResultMessage struct {
  1113. Emitter string
  1114. Result interface{}
  1115. Message Message
  1116. }
  1117. type ResultsAndMessages []EvalResultMessage
  1118. // Eval evaluates expr against a map.
  1119. func Eval(expr Expr, m Valuer) interface{} {
  1120. eval := ValuerEval{Valuer: m}
  1121. return eval.Eval(expr)
  1122. }
  1123. // ValuerEval will evaluate an expression using the Valuer.
  1124. type ValuerEval struct {
  1125. Valuer Valuer
  1126. // IntegerFloatDivision will set the eval system to treat
  1127. // a division between two integers as a floating point division.
  1128. IntegerFloatDivision bool
  1129. }
  1130. // MultiValuer returns a Valuer that iterates over multiple Valuer instances
  1131. // to find a match.
  1132. func MultiValuer(valuers ...Valuer) Valuer {
  1133. return multiValuer(valuers)
  1134. }
  1135. type multiValuer []Valuer
  1136. func (a multiValuer) Value(key string) (interface{}, bool) {
  1137. for _, valuer := range a {
  1138. if v, ok := valuer.Value(key); ok {
  1139. return v, true
  1140. }
  1141. }
  1142. return nil, false
  1143. }
  1144. func (a multiValuer) Meta(key string) (interface{}, bool) {
  1145. for _, valuer := range a {
  1146. if v, ok := valuer.Meta(key); ok {
  1147. return v, true
  1148. }
  1149. }
  1150. return nil, false
  1151. }
  1152. func (a multiValuer) AppendAlias(key string, value interface{}) bool {
  1153. for _, valuer := range a {
  1154. if ok := valuer.AppendAlias(key, value); ok {
  1155. return true
  1156. }
  1157. }
  1158. return false
  1159. }
  1160. func (a multiValuer) Call(name string, args []interface{}) (interface{}, bool) {
  1161. for _, valuer := range a {
  1162. if valuer, ok := valuer.(CallValuer); ok {
  1163. if v, ok := valuer.Call(name, args); ok {
  1164. return v, true
  1165. } else {
  1166. return fmt.Errorf("call func %s error: %v", name, v), false
  1167. }
  1168. }
  1169. }
  1170. return nil, false
  1171. }
  1172. type multiAggregateValuer struct {
  1173. data AggregateData
  1174. multiValuer
  1175. singleCallValuer CallValuer
  1176. }
  1177. func MultiAggregateValuer(data AggregateData, singleCallValuer CallValuer, valuers ...Valuer) Valuer {
  1178. return &multiAggregateValuer{
  1179. data: data,
  1180. multiValuer: valuers,
  1181. singleCallValuer: singleCallValuer,
  1182. }
  1183. }
  1184. func (a *multiAggregateValuer) Call(name string, args []interface{}) (interface{}, bool) {
  1185. // assume the aggFuncMap already cache the custom agg funcs in IsAggFunc()
  1186. _, isAgg := aggFuncMap[name]
  1187. for _, valuer := range a.multiValuer {
  1188. if a, ok := valuer.(AggregateCallValuer); ok && isAgg {
  1189. if v, ok := a.Call(name, args); ok {
  1190. return v, true
  1191. } else {
  1192. return fmt.Errorf("call func %s error: %v", name, v), false
  1193. }
  1194. } else if c, ok := valuer.(CallValuer); ok && !isAgg {
  1195. if v, ok := c.Call(name, args); ok {
  1196. return v, true
  1197. }
  1198. }
  1199. }
  1200. return nil, false
  1201. }
  1202. func (a *multiAggregateValuer) GetAllTuples() AggregateData {
  1203. return a.data
  1204. }
  1205. func (a *multiAggregateValuer) GetSingleCallValuer() CallValuer {
  1206. return a.singleCallValuer
  1207. }
  1208. type BracketEvalResult struct {
  1209. Start, End int
  1210. }
  1211. func (ber *BracketEvalResult) isIndex() bool {
  1212. return ber.Start == ber.End
  1213. }
  1214. // Eval evaluates an expression and returns a value.
  1215. func (v *ValuerEval) Eval(expr Expr) interface{} {
  1216. if expr == nil {
  1217. return nil
  1218. }
  1219. switch expr := expr.(type) {
  1220. case *BinaryExpr:
  1221. return v.evalBinaryExpr(expr)
  1222. case *IntegerLiteral:
  1223. return expr.Val
  1224. case *NumberLiteral:
  1225. return expr.Val
  1226. case *ParenExpr:
  1227. return v.Eval(expr.Expr)
  1228. case *StringLiteral:
  1229. return expr.Val
  1230. case *BooleanLiteral:
  1231. return expr.Val
  1232. case *ColonExpr:
  1233. return &BracketEvalResult{Start: expr.Start, End: expr.End}
  1234. case *IndexExpr:
  1235. return &BracketEvalResult{Start: expr.Index, End: expr.Index}
  1236. case *Call:
  1237. if valuer, ok := v.Valuer.(CallValuer); ok {
  1238. switch expr.Name {
  1239. case "window_start", "window_end":
  1240. if aggreValuer, ok := valuer.(AggregateCallValuer); ok {
  1241. ad := aggreValuer.GetAllTuples()
  1242. if expr.Name == "window_start" {
  1243. return ad.GetWindowStart()
  1244. } else {
  1245. return ad.GetWindowEnd()
  1246. }
  1247. }
  1248. default:
  1249. var args []interface{}
  1250. if len(expr.Args) > 0 {
  1251. args = make([]interface{}, len(expr.Args))
  1252. for i, arg := range expr.Args {
  1253. if aggreValuer, ok := valuer.(AggregateCallValuer); IsAggFunc(expr) && ok {
  1254. args[i] = aggreValuer.GetAllTuples().AggregateEval(arg, aggreValuer.GetSingleCallValuer())
  1255. } else {
  1256. args[i] = v.Eval(arg)
  1257. if _, ok := args[i].(error); ok {
  1258. return args[i]
  1259. }
  1260. }
  1261. }
  1262. }
  1263. val, _ := valuer.Call(expr.Name, args)
  1264. return val
  1265. }
  1266. }
  1267. return nil
  1268. case *FieldRef:
  1269. var n string
  1270. if expr.IsAlias() { // alias is renamed internally to avoid accidentally evaled as a col with the same name
  1271. n = fmt.Sprintf("%s%s", PRIVATE_PREFIX, expr.Name)
  1272. } else if expr.StreamName == DefaultStream {
  1273. n = expr.Name
  1274. } else {
  1275. n = fmt.Sprintf("%s%s%s", string(expr.StreamName), COLUMN_SEPARATOR, expr.Name)
  1276. }
  1277. if n != "" {
  1278. val, ok := v.Valuer.Value(n)
  1279. if ok {
  1280. return val
  1281. }
  1282. }
  1283. if expr.IsAlias() {
  1284. r := v.Eval(expr.expression)
  1285. v.Valuer.AppendAlias(expr.Name, r)
  1286. return r
  1287. }
  1288. return nil
  1289. case *MetaRef:
  1290. if expr.StreamName == "" || expr.StreamName == DefaultStream {
  1291. val, _ := v.Valuer.Meta(expr.Name)
  1292. return val
  1293. } else {
  1294. //The field specified with stream source
  1295. val, _ := v.Valuer.Meta(string(expr.StreamName) + COLUMN_SEPARATOR + expr.Name)
  1296. return val
  1297. }
  1298. case *Wildcard:
  1299. val, _ := v.Valuer.Value("")
  1300. return val
  1301. case *CaseExpr:
  1302. return v.evalCase(expr)
  1303. default:
  1304. return nil
  1305. }
  1306. }
  1307. func (v *ValuerEval) evalBinaryExpr(expr *BinaryExpr) interface{} {
  1308. lhs := v.Eval(expr.LHS)
  1309. switch val := lhs.(type) {
  1310. case map[string]interface{}:
  1311. return v.evalJsonExpr(val, expr.OP, expr.RHS)
  1312. case Message:
  1313. return v.evalJsonExpr(map[string]interface{}(val), expr.OP, expr.RHS)
  1314. case error:
  1315. return val
  1316. }
  1317. // shortcut for bool
  1318. switch expr.OP {
  1319. case AND:
  1320. if bv, ok := lhs.(bool); ok && !bv {
  1321. return false
  1322. }
  1323. case OR:
  1324. if bv, ok := lhs.(bool); ok && bv {
  1325. return true
  1326. }
  1327. }
  1328. if isSliceOrArray(lhs) {
  1329. return v.evalJsonExpr(lhs, expr.OP, expr.RHS)
  1330. }
  1331. rhs := v.Eval(expr.RHS)
  1332. if _, ok := rhs.(error); ok {
  1333. return rhs
  1334. }
  1335. return v.simpleDataEval(lhs, rhs, expr.OP)
  1336. }
  1337. func (v *ValuerEval) evalCase(expr *CaseExpr) interface{} {
  1338. if expr.Value != nil { // compare value to all when clause
  1339. ev := v.Eval(expr.Value)
  1340. for _, w := range expr.WhenClauses {
  1341. wv := v.Eval(w.Expr)
  1342. switch r := v.simpleDataEval(ev, wv, EQ).(type) {
  1343. case error:
  1344. return fmt.Errorf("evaluate case expression error: %s", r)
  1345. case bool:
  1346. if r {
  1347. return v.Eval(w.Result)
  1348. }
  1349. }
  1350. }
  1351. } else {
  1352. for _, w := range expr.WhenClauses {
  1353. switch r := v.Eval(w.Expr).(type) {
  1354. case error:
  1355. return fmt.Errorf("evaluate case expression error: %s", r)
  1356. case bool:
  1357. if r {
  1358. return v.Eval(w.Result)
  1359. }
  1360. }
  1361. }
  1362. }
  1363. if expr.ElseClause != nil {
  1364. return v.Eval(expr.ElseClause)
  1365. }
  1366. return nil
  1367. }
  1368. func isSliceOrArray(v interface{}) bool {
  1369. kind := reflect.ValueOf(v).Kind()
  1370. return kind == reflect.Array || kind == reflect.Slice
  1371. }
  1372. func (v *ValuerEval) evalJsonExpr(result interface{}, op Token, expr Expr) interface{} {
  1373. switch op {
  1374. case ARROW:
  1375. if val, ok := result.(map[string]interface{}); ok {
  1376. switch e := expr.(type) {
  1377. case *FieldRef, *MetaRef:
  1378. ve := &ValuerEval{Valuer: Message(val)}
  1379. return ve.Eval(e)
  1380. default:
  1381. return fmt.Errorf("the right expression is not a field reference node")
  1382. }
  1383. } else {
  1384. return fmt.Errorf("the result %v is not a type of map[string]interface{}", result)
  1385. }
  1386. case SUBSET:
  1387. if isSliceOrArray(result) {
  1388. return v.subset(result, expr)
  1389. } else {
  1390. return fmt.Errorf("%v is an invalid operation for %T", op, result)
  1391. }
  1392. default:
  1393. return fmt.Errorf("%v is an invalid operation for %T", op, result)
  1394. }
  1395. }
  1396. func (v *ValuerEval) subset(result interface{}, expr Expr) interface{} {
  1397. val := reflect.ValueOf(result)
  1398. ber := v.Eval(expr)
  1399. if berVal, ok1 := ber.(*BracketEvalResult); ok1 {
  1400. if berVal.isIndex() {
  1401. if 0 > berVal.Start {
  1402. if 0 > berVal.Start+val.Len() {
  1403. return fmt.Errorf("out of index: %d of %d", berVal.Start, val.Len())
  1404. }
  1405. berVal.Start += val.Len()
  1406. } else if berVal.Start >= val.Len() {
  1407. return fmt.Errorf("out of index: %d of %d", berVal.Start, val.Len())
  1408. }
  1409. return val.Index(berVal.Start).Interface()
  1410. } else {
  1411. if 0 > berVal.Start {
  1412. if 0 > berVal.Start+val.Len() {
  1413. return fmt.Errorf("out of index: %d of %d", berVal.Start, val.Len())
  1414. }
  1415. berVal.Start += val.Len()
  1416. } else if berVal.Start >= val.Len() {
  1417. return fmt.Errorf("start value is out of index: %d of %d", berVal.Start, val.Len())
  1418. }
  1419. if math.MinInt32 == berVal.End {
  1420. berVal.End = val.Len()
  1421. } else if 0 > berVal.End {
  1422. if 0 > berVal.End+val.Len() {
  1423. return fmt.Errorf("out of index: %d of %d", berVal.End, val.Len())
  1424. }
  1425. berVal.End += val.Len()
  1426. } else if berVal.End > val.Len() {
  1427. return fmt.Errorf("end value is out of index: %d of %d", berVal.End, val.Len())
  1428. } else if berVal.Start >= berVal.End {
  1429. return fmt.Errorf("start cannot be greater than end. start:%d end:%d", berVal.Start, berVal.End)
  1430. }
  1431. return val.Slice(berVal.Start, berVal.End).Interface()
  1432. }
  1433. } else {
  1434. return fmt.Errorf("invalid evaluation result - %v", berVal)
  1435. }
  1436. }
  1437. //lhs and rhs are non-nil
  1438. func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{} {
  1439. if lhs == nil || rhs == nil {
  1440. switch op {
  1441. case EQ, LTE, GTE:
  1442. if lhs == nil && rhs == nil {
  1443. return true
  1444. } else {
  1445. return false
  1446. }
  1447. case NEQ:
  1448. if lhs == nil && rhs == nil {
  1449. return false
  1450. } else {
  1451. return true
  1452. }
  1453. case LT, GT:
  1454. return false
  1455. default:
  1456. return nil
  1457. }
  1458. }
  1459. lhs = convertNum(lhs)
  1460. rhs = convertNum(rhs)
  1461. // Evaluate if both sides are simple types.
  1462. switch lhs := lhs.(type) {
  1463. case bool:
  1464. rhs, ok := rhs.(bool)
  1465. if !ok {
  1466. return invalidOpError(lhs, op, rhs)
  1467. }
  1468. switch op {
  1469. case AND:
  1470. return lhs && rhs
  1471. case OR:
  1472. return lhs || rhs
  1473. case BITWISE_AND:
  1474. return lhs && rhs
  1475. case BITWISE_OR:
  1476. return lhs || rhs
  1477. case BITWISE_XOR:
  1478. return lhs != rhs
  1479. case EQ:
  1480. return lhs == rhs
  1481. case NEQ:
  1482. return lhs != rhs
  1483. default:
  1484. return invalidOpError(lhs, op, rhs)
  1485. }
  1486. case float64:
  1487. // Try the rhs as a float64, int64, or uint64
  1488. rhsf, ok := rhs.(float64)
  1489. if !ok {
  1490. switch val := rhs.(type) {
  1491. case int64:
  1492. rhsf, ok = float64(val), true
  1493. case uint64:
  1494. rhsf, ok = float64(val), true
  1495. }
  1496. }
  1497. if !ok {
  1498. return invalidOpError(lhs, op, rhs)
  1499. }
  1500. rhs := rhsf
  1501. switch op {
  1502. case EQ:
  1503. return lhs == rhs
  1504. case NEQ:
  1505. return lhs != rhs
  1506. case LT:
  1507. return lhs < rhs
  1508. case LTE:
  1509. return lhs <= rhs
  1510. case GT:
  1511. return lhs > rhs
  1512. case GTE:
  1513. return lhs >= rhs
  1514. case ADD:
  1515. return lhs + rhs
  1516. case SUB:
  1517. return lhs - rhs
  1518. case MUL:
  1519. return lhs * rhs
  1520. case DIV:
  1521. if rhs == 0 {
  1522. return fmt.Errorf("divided by zero")
  1523. }
  1524. return lhs / rhs
  1525. case MOD:
  1526. if rhs == 0 {
  1527. return fmt.Errorf("divided by zero")
  1528. }
  1529. return math.Mod(lhs, rhs)
  1530. default:
  1531. return invalidOpError(lhs, op, rhs)
  1532. }
  1533. case int64:
  1534. // Try as a float64 to see if a float cast is required.
  1535. switch rhs := rhs.(type) {
  1536. case float64:
  1537. lhs := float64(lhs)
  1538. switch op {
  1539. case EQ:
  1540. return lhs == rhs
  1541. case NEQ:
  1542. return lhs != rhs
  1543. case LT:
  1544. return lhs < rhs
  1545. case LTE:
  1546. return lhs <= rhs
  1547. case GT:
  1548. return lhs > rhs
  1549. case GTE:
  1550. return lhs >= rhs
  1551. case ADD:
  1552. return lhs + rhs
  1553. case SUB:
  1554. return lhs - rhs
  1555. case MUL:
  1556. return lhs * rhs
  1557. case DIV:
  1558. if rhs == 0 {
  1559. return fmt.Errorf("divided by zero")
  1560. }
  1561. return lhs / rhs
  1562. case MOD:
  1563. if rhs == 0 {
  1564. return fmt.Errorf("divided by zero")
  1565. }
  1566. return math.Mod(lhs, rhs)
  1567. default:
  1568. return invalidOpError(lhs, op, rhs)
  1569. }
  1570. case int64:
  1571. switch op {
  1572. case EQ:
  1573. return lhs == rhs
  1574. case NEQ:
  1575. return lhs != rhs
  1576. case LT:
  1577. return lhs < rhs
  1578. case LTE:
  1579. return lhs <= rhs
  1580. case GT:
  1581. return lhs > rhs
  1582. case GTE:
  1583. return lhs >= rhs
  1584. case ADD:
  1585. return lhs + rhs
  1586. case SUB:
  1587. return lhs - rhs
  1588. case MUL:
  1589. return lhs * rhs
  1590. case DIV:
  1591. if v.IntegerFloatDivision {
  1592. if rhs == 0 {
  1593. return fmt.Errorf("divided by zero")
  1594. }
  1595. return float64(lhs) / float64(rhs)
  1596. }
  1597. if rhs == 0 {
  1598. return fmt.Errorf("divided by zero")
  1599. }
  1600. return lhs / rhs
  1601. case MOD:
  1602. if rhs == 0 {
  1603. return fmt.Errorf("divided by zero")
  1604. }
  1605. return lhs % rhs
  1606. case BITWISE_AND:
  1607. return lhs & rhs
  1608. case BITWISE_OR:
  1609. return lhs | rhs
  1610. case BITWISE_XOR:
  1611. return lhs ^ rhs
  1612. default:
  1613. return invalidOpError(lhs, op, rhs)
  1614. }
  1615. case uint64:
  1616. switch op {
  1617. case EQ:
  1618. return uint64(lhs) == rhs
  1619. case NEQ:
  1620. return uint64(lhs) != rhs
  1621. case LT:
  1622. if lhs < 0 {
  1623. return true
  1624. }
  1625. return uint64(lhs) < rhs
  1626. case LTE:
  1627. if lhs < 0 {
  1628. return true
  1629. }
  1630. return uint64(lhs) <= rhs
  1631. case GT:
  1632. if lhs < 0 {
  1633. return false
  1634. }
  1635. return uint64(lhs) > rhs
  1636. case GTE:
  1637. if lhs < 0 {
  1638. return false
  1639. }
  1640. return uint64(lhs) >= rhs
  1641. case ADD:
  1642. return uint64(lhs) + rhs
  1643. case SUB:
  1644. return uint64(lhs) - rhs
  1645. case MUL:
  1646. return uint64(lhs) * rhs
  1647. case DIV:
  1648. if rhs == 0 {
  1649. return fmt.Errorf("divided by zero")
  1650. }
  1651. return uint64(lhs) / rhs
  1652. case MOD:
  1653. if rhs == 0 {
  1654. return fmt.Errorf("divided by zero")
  1655. }
  1656. return uint64(lhs) % rhs
  1657. case BITWISE_AND:
  1658. return uint64(lhs) & rhs
  1659. case BITWISE_OR:
  1660. return uint64(lhs) | rhs
  1661. case BITWISE_XOR:
  1662. return uint64(lhs) ^ rhs
  1663. default:
  1664. return invalidOpError(lhs, op, rhs)
  1665. }
  1666. default:
  1667. return invalidOpError(lhs, op, rhs)
  1668. }
  1669. case uint64:
  1670. // Try as a float64 to see if a float cast is required.
  1671. switch rhs := rhs.(type) {
  1672. case float64:
  1673. lhs := float64(lhs)
  1674. switch op {
  1675. case EQ:
  1676. return lhs == rhs
  1677. case NEQ:
  1678. return lhs != rhs
  1679. case LT:
  1680. return lhs < rhs
  1681. case LTE:
  1682. return lhs <= rhs
  1683. case GT:
  1684. return lhs > rhs
  1685. case GTE:
  1686. return lhs >= rhs
  1687. case ADD:
  1688. return lhs + rhs
  1689. case SUB:
  1690. return lhs - rhs
  1691. case MUL:
  1692. return lhs * rhs
  1693. case DIV:
  1694. if rhs == 0 {
  1695. return fmt.Errorf("divided by zero")
  1696. }
  1697. return lhs / rhs
  1698. case MOD:
  1699. if rhs == 0 {
  1700. return fmt.Errorf("divided by zero")
  1701. }
  1702. return math.Mod(lhs, rhs)
  1703. default:
  1704. return invalidOpError(lhs, op, rhs)
  1705. }
  1706. case int64:
  1707. switch op {
  1708. case EQ:
  1709. return lhs == uint64(rhs)
  1710. case NEQ:
  1711. return lhs != uint64(rhs)
  1712. case LT:
  1713. if rhs < 0 {
  1714. return false
  1715. }
  1716. return lhs < uint64(rhs)
  1717. case LTE:
  1718. if rhs < 0 {
  1719. return false
  1720. }
  1721. return lhs <= uint64(rhs)
  1722. case GT:
  1723. if rhs < 0 {
  1724. return true
  1725. }
  1726. return lhs > uint64(rhs)
  1727. case GTE:
  1728. if rhs < 0 {
  1729. return true
  1730. }
  1731. return lhs >= uint64(rhs)
  1732. case ADD:
  1733. return lhs + uint64(rhs)
  1734. case SUB:
  1735. return lhs - uint64(rhs)
  1736. case MUL:
  1737. return lhs * uint64(rhs)
  1738. case DIV:
  1739. if rhs == 0 {
  1740. return fmt.Errorf("divided by zero")
  1741. }
  1742. return lhs / uint64(rhs)
  1743. case MOD:
  1744. if rhs == 0 {
  1745. return fmt.Errorf("divided by zero")
  1746. }
  1747. return lhs % uint64(rhs)
  1748. case BITWISE_AND:
  1749. return lhs & uint64(rhs)
  1750. case BITWISE_OR:
  1751. return lhs | uint64(rhs)
  1752. case BITWISE_XOR:
  1753. return lhs ^ uint64(rhs)
  1754. default:
  1755. return invalidOpError(lhs, op, rhs)
  1756. }
  1757. case uint64:
  1758. switch op {
  1759. case EQ:
  1760. return lhs == rhs
  1761. case NEQ:
  1762. return lhs != rhs
  1763. case LT:
  1764. return lhs < rhs
  1765. case LTE:
  1766. return lhs <= rhs
  1767. case GT:
  1768. return lhs > rhs
  1769. case GTE:
  1770. return lhs >= rhs
  1771. case ADD:
  1772. return lhs + rhs
  1773. case SUB:
  1774. return lhs - rhs
  1775. case MUL:
  1776. return lhs * rhs
  1777. case DIV:
  1778. if rhs == 0 {
  1779. return fmt.Errorf("divided by zero")
  1780. }
  1781. return lhs / rhs
  1782. case MOD:
  1783. if rhs == 0 {
  1784. return fmt.Errorf("divided by zero")
  1785. }
  1786. return lhs % rhs
  1787. case BITWISE_AND:
  1788. return lhs & rhs
  1789. case BITWISE_OR:
  1790. return lhs | rhs
  1791. case BITWISE_XOR:
  1792. return lhs ^ rhs
  1793. default:
  1794. return invalidOpError(lhs, op, rhs)
  1795. }
  1796. default:
  1797. return invalidOpError(lhs, op, rhs)
  1798. }
  1799. case string:
  1800. rhss, ok := rhs.(string)
  1801. if !ok {
  1802. return invalidOpError(lhs, op, rhs)
  1803. }
  1804. switch op {
  1805. case EQ:
  1806. return lhs == rhss
  1807. case NEQ:
  1808. return lhs != rhss
  1809. case LT:
  1810. return lhs < rhss
  1811. case LTE:
  1812. return lhs <= rhss
  1813. case GT:
  1814. return lhs > rhss
  1815. case GTE:
  1816. return lhs >= rhss
  1817. default:
  1818. return invalidOpError(lhs, op, rhs)
  1819. }
  1820. case time.Time:
  1821. rt, err := common.InterfaceToTime(rhs, "")
  1822. if err != nil {
  1823. return invalidOpError(lhs, op, rhs)
  1824. }
  1825. switch op {
  1826. case EQ:
  1827. return lhs.Equal(rt)
  1828. case NEQ:
  1829. return !lhs.Equal(rt)
  1830. case LT:
  1831. return lhs.Before(rt)
  1832. case LTE:
  1833. return lhs.Before(rt) || lhs.Equal(rt)
  1834. case GT:
  1835. return lhs.After(rt)
  1836. case GTE:
  1837. return lhs.After(rt) || lhs.Equal(rt)
  1838. default:
  1839. return invalidOpError(lhs, op, rhs)
  1840. }
  1841. default:
  1842. return invalidOpError(lhs, op, rhs)
  1843. }
  1844. return invalidOpError(lhs, op, rhs)
  1845. }
  1846. func invalidOpError(lhs interface{}, op Token, rhs interface{}) error {
  1847. return fmt.Errorf("invalid operation %[1]T(%[1]v) %s %[3]T(%[3]v)", lhs, tokens[op], rhs)
  1848. }
  1849. func convertNum(para interface{}) interface{} {
  1850. if isInt(para) {
  1851. para = toInt64(para)
  1852. } else if isFloat(para) {
  1853. para = toFloat64(para)
  1854. }
  1855. return para
  1856. }
  1857. func isInt(para interface{}) bool {
  1858. switch para.(type) {
  1859. case int:
  1860. return true
  1861. case int8:
  1862. return true
  1863. case int16:
  1864. return true
  1865. case int32:
  1866. return true
  1867. case int64:
  1868. return true
  1869. }
  1870. return false
  1871. }
  1872. func toInt64(para interface{}) int64 {
  1873. if v, ok := para.(int); ok {
  1874. return int64(v)
  1875. } else if v, ok := para.(int8); ok {
  1876. return int64(v)
  1877. } else if v, ok := para.(int16); ok {
  1878. return int64(v)
  1879. } else if v, ok := para.(int32); ok {
  1880. return int64(v)
  1881. } else if v, ok := para.(int64); ok {
  1882. return v
  1883. }
  1884. return 0
  1885. }
  1886. func isFloat(para interface{}) bool {
  1887. switch para.(type) {
  1888. case float32:
  1889. return true
  1890. case float64:
  1891. return true
  1892. }
  1893. return false
  1894. }
  1895. func toFloat64(para interface{}) float64 {
  1896. if v, ok := para.(float32); ok {
  1897. return float64(v)
  1898. } else if v, ok := para.(float64); ok {
  1899. return v
  1900. }
  1901. return 0
  1902. }