|
@@ -3,9 +3,10 @@ package xsql
|
|
|
import (
|
|
|
"engine/common"
|
|
|
"fmt"
|
|
|
- "log"
|
|
|
"math"
|
|
|
+ "sort"
|
|
|
"strings"
|
|
|
+ "time"
|
|
|
)
|
|
|
|
|
|
|
|
@@ -47,6 +48,9 @@ type JoinType int
|
|
|
const (
|
|
|
LEFT_JOIN JoinType = iota
|
|
|
INNER_JOIN
|
|
|
+ RIGHT_JOIN
|
|
|
+ FULL_JOIN
|
|
|
+ CROSS_JOIN
|
|
|
)
|
|
|
|
|
|
type Join struct {
|
|
@@ -68,11 +72,12 @@ type Statement interface{
|
|
|
}
|
|
|
|
|
|
type SelectStatement struct {
|
|
|
- Fields Fields
|
|
|
- Sources Sources
|
|
|
- Joins Joins
|
|
|
- Condition Expr
|
|
|
+ Fields Fields
|
|
|
+ Sources Sources
|
|
|
+ Joins Joins
|
|
|
+ Condition Expr
|
|
|
Dimensions Dimensions
|
|
|
+ Having Expr
|
|
|
SortFields SortFields
|
|
|
}
|
|
|
|
|
@@ -187,6 +192,23 @@ func (d *Dimension) expr() {}
|
|
|
func (d *Dimension) node(){}
|
|
|
|
|
|
func (d Dimensions) node(){}
|
|
|
+func (d *Dimensions) GetWindow() *Window{
|
|
|
+ for _, child := range *d {
|
|
|
+ if w, ok := child.Expr.(*Window); ok{
|
|
|
+ return w
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+func (d *Dimensions) GetGroups() Dimensions{
|
|
|
+ var nd Dimensions
|
|
|
+ for _, child := range *d {
|
|
|
+ if _, ok := child.Expr.(*Window); !ok{
|
|
|
+ nd = append(nd, child)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nd
|
|
|
+}
|
|
|
|
|
|
func (sf *SortField) expr() {}
|
|
|
func (sf *SortField) node(){}
|
|
@@ -212,14 +234,15 @@ const (
|
|
|
SESSION_WINDOW
|
|
|
)
|
|
|
|
|
|
-type Windows struct {
|
|
|
- Args []Expr
|
|
|
+type Window struct {
|
|
|
WindowType WindowType
|
|
|
+ Length *IntegerLiteral
|
|
|
+ Interval *IntegerLiteral
|
|
|
}
|
|
|
|
|
|
-func (w *Windows) expr() {}
|
|
|
-func (w *Windows) literal() {}
|
|
|
-func (w *Windows) node(){}
|
|
|
+func (w *Window) expr() {}
|
|
|
+func (w *Window) literal() {}
|
|
|
+func (w *Window) node() {}
|
|
|
|
|
|
type SelectStatements []SelectStatement
|
|
|
|
|
@@ -349,10 +372,9 @@ func Walk(v Visitor, node Node) {
|
|
|
Walk(v, expr)
|
|
|
}
|
|
|
|
|
|
- case *Windows:
|
|
|
- for _, expr := range n.Args {
|
|
|
- Walk(v, expr)
|
|
|
- }
|
|
|
+ case *Window:
|
|
|
+ Walk(v, n.Length)
|
|
|
+ Walk(v, n.Interval)
|
|
|
|
|
|
case *Field:
|
|
|
Walk(v, n.Expr)
|
|
@@ -387,6 +409,8 @@ func Walk(v Visitor, node Node) {
|
|
|
for _, s := range n {
|
|
|
Walk(v, &s)
|
|
|
}
|
|
|
+ case *Join:
|
|
|
+ Walk(v, n.Expr)
|
|
|
|
|
|
case *StreamStmt:
|
|
|
Walk(v, &n.Name)
|
|
@@ -417,12 +441,23 @@ func Walk(v Visitor, node Node) {
|
|
|
}
|
|
|
|
|
|
|
|
|
+// WalkFunc traverses a node hierarchy in depth-first order.
|
|
|
+func WalkFunc(node Node, fn func(Node)) {
|
|
|
+ Walk(walkFuncVisitor(fn), node)
|
|
|
+}
|
|
|
+
|
|
|
+type walkFuncVisitor func(Node)
|
|
|
+
|
|
|
+func (fn walkFuncVisitor) Visit(n Node) Visitor { fn(n); return fn }
|
|
|
+
|
|
|
+
|
|
|
// Valuer is the interface that wraps the Value() method.
|
|
|
type Valuer interface {
|
|
|
// Value returns the value and existence flag for a given key.
|
|
|
Value(key string) (interface{}, bool)
|
|
|
}
|
|
|
|
|
|
+
|
|
|
// CallValuer implements the Call method for evaluating function calls.
|
|
|
type CallValuer interface {
|
|
|
Valuer
|
|
@@ -431,11 +466,53 @@ type CallValuer interface {
|
|
|
Call(name string, args []interface{}) (interface{}, bool)
|
|
|
}
|
|
|
|
|
|
-// MapValuer is a valuer that substitutes values for the mapped interface.
|
|
|
-type MapValuer map[string]interface{}
|
|
|
+type AggregateCallValuer interface {
|
|
|
+ CallValuer
|
|
|
+ GetAllTuples() AggregateData
|
|
|
+}
|
|
|
+
|
|
|
+type Wildcarder interface {
|
|
|
+ // Value returns the value and existence flag for a given key.
|
|
|
+ All(stream string) (interface{}, bool)
|
|
|
+}
|
|
|
+
|
|
|
+type DataValuer interface {
|
|
|
+ Valuer
|
|
|
+ Wildcarder
|
|
|
+}
|
|
|
+
|
|
|
+type WildcardValuer struct {
|
|
|
+ Data Wildcarder
|
|
|
+}
|
|
|
+
|
|
|
+//TODO deal with wildcard of a stream, e.g. SELECT Table.* from Table inner join Table1
|
|
|
+func (wv *WildcardValuer) Value(key string) (interface{}, bool) {
|
|
|
+ if key == ""{
|
|
|
+ return wv.Data.All(key)
|
|
|
+ }else{
|
|
|
+ a := strings.Index(key, ".*")
|
|
|
+ if a <= 0{
|
|
|
+ return nil, false
|
|
|
+ }else{
|
|
|
+ return wv.Data.All(key[:a])
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
-// Value returns the value for a key in the MapValuer.
|
|
|
-func (m MapValuer) Value(key string) (interface{}, bool) {
|
|
|
+/**********************************
|
|
|
+** Various Data Types for SQL transformation
|
|
|
+ */
|
|
|
+
|
|
|
+type AggregateData interface {
|
|
|
+ AggregateEval(expr Expr) []interface{}
|
|
|
+}
|
|
|
+
|
|
|
+// Message is a valuer that substitutes values for the mapped interface.
|
|
|
+type Message map[string]interface{}
|
|
|
+
|
|
|
+// Value returns the value for a key in the Message.
|
|
|
+func (m Message) Value(key string) (interface{}, bool) {
|
|
|
+ key = strings.ToLower(key)
|
|
|
if keys := strings.Split(key, "."); len(keys) == 1 {
|
|
|
v, ok := m[key]
|
|
|
return v, ok
|
|
@@ -447,138 +524,275 @@ func (m MapValuer) Value(key string) (interface{}, bool) {
|
|
|
return nil, false
|
|
|
}
|
|
|
|
|
|
-type WildcardValuer struct {
|
|
|
- Data map[string]interface{}
|
|
|
+type Event interface {
|
|
|
+ GetTimestamp() int64
|
|
|
+ IsWatermark() bool
|
|
|
}
|
|
|
|
|
|
-func (wv *WildcardValuer) Value(key string) (interface{}, bool) {
|
|
|
- //TODO Need to read the schema from stream, and fill into the map
|
|
|
- return wv.Data, true
|
|
|
+type Tuple struct {
|
|
|
+ Emitter string
|
|
|
+ Message Message
|
|
|
+ Timestamp int64
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-type EmitterTuple struct {
|
|
|
- Emitter string
|
|
|
- Message map[string]interface{}
|
|
|
+func (t *Tuple) Value(key string) (interface{}, bool) {
|
|
|
+ return t.Message.Value(key)
|
|
|
}
|
|
|
|
|
|
-type MergedEmitterTuple struct {
|
|
|
- MergedMessage []EmitterTuple
|
|
|
+func (t *Tuple) All(stream string) (interface{}, bool) {
|
|
|
+ return t.Message, true
|
|
|
}
|
|
|
|
|
|
-func (me *MergedEmitterTuple) AddMergedMessage(message EmitterTuple) {
|
|
|
- me.MergedMessage = append(me.MergedMessage, message)
|
|
|
+func (t *Tuple) AggregateEval(expr Expr) []interface{} {
|
|
|
+ return []interface{}{Eval(expr, t)}
|
|
|
}
|
|
|
|
|
|
-type MergedEmitterTupleSets []MergedEmitterTuple
|
|
|
-
|
|
|
-
|
|
|
-type Messages []map[string]interface{}
|
|
|
-
|
|
|
-type EmitterTuples struct {
|
|
|
- Emitter string
|
|
|
- Messages Messages
|
|
|
+func (t *Tuple) GetTimestamp() int64 {
|
|
|
+ return t.Timestamp
|
|
|
}
|
|
|
|
|
|
-type EvalResultAndMessage struct {
|
|
|
- Stream string
|
|
|
- Result interface{}
|
|
|
- Message map[string]interface{}
|
|
|
+func (t *Tuple) IsWatermark() bool {
|
|
|
+ return false
|
|
|
}
|
|
|
|
|
|
-type ResultsAndMessages []EvalResultAndMessage
|
|
|
+type WindowTuples struct {
|
|
|
+ Emitter string
|
|
|
+ Tuples []Tuple
|
|
|
+}
|
|
|
|
|
|
-type MultiEmitterTuples []EmitterTuples
|
|
|
+type WindowTuplesSet []WindowTuples
|
|
|
|
|
|
-func (met *MultiEmitterTuples) GetBySrc(src string) Messages {
|
|
|
- for _, me := range *met {
|
|
|
+func (w WindowTuplesSet) GetBySrc(src string) []Tuple {
|
|
|
+ for _, me := range w {
|
|
|
if me.Emitter == src {
|
|
|
- return me.Messages
|
|
|
+ return me.Tuples
|
|
|
}
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-type Tuple struct {
|
|
|
- EmitterName string
|
|
|
- Message interface{}
|
|
|
- Timestamp int64
|
|
|
+func (w WindowTuplesSet) Len() int {
|
|
|
+ if len(w) > 0{
|
|
|
+ return len(w[0].Tuples)
|
|
|
+ }
|
|
|
+ return 0
|
|
|
}
|
|
|
-
|
|
|
-func (met *MultiEmitterTuples) addTuple(tuple *Tuple) {
|
|
|
- found := false
|
|
|
- m, ok := tuple.Message.(map[string]interface{})
|
|
|
- if !ok {
|
|
|
- log.Printf("Expect map[string]interface{} for the message type.")
|
|
|
- return
|
|
|
+func (w WindowTuplesSet) Swap(i, j int) {
|
|
|
+ if len(w) > 0{
|
|
|
+ s := w[0].Tuples
|
|
|
+ s[i], s[j] = s[j], s[i]
|
|
|
+ }
|
|
|
+}
|
|
|
+func (w WindowTuplesSet) Index(i int) Valuer {
|
|
|
+ if len(w) > 0{
|
|
|
+ s := w[0].Tuples
|
|
|
+ return &(s[i])
|
|
|
}
|
|
|
+ return nil
|
|
|
+}
|
|
|
|
|
|
- for _, t := range *met {
|
|
|
- if t.Emitter == tuple.EmitterName {
|
|
|
- t.Messages = append(t.Messages, m)
|
|
|
+func (w WindowTuplesSet) AddTuple(tuple *Tuple) WindowTuplesSet{
|
|
|
+ found := false
|
|
|
+ for i, t := range w {
|
|
|
+ if t.Emitter == tuple.Emitter {
|
|
|
+ t.Tuples = append(t.Tuples, *tuple)
|
|
|
+ found = true
|
|
|
+ w[i] = t
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if !found {
|
|
|
- ets := &EmitterTuples{Emitter:tuple.EmitterName}
|
|
|
- ets.Messages = append(ets.Messages, m)
|
|
|
- *met = append(*met, *ets)
|
|
|
+ ets := &WindowTuples{Emitter: tuple.Emitter}
|
|
|
+ ets.Tuples = append(ets.Tuples, *tuple)
|
|
|
+ w = append(w, *ets)
|
|
|
+ }
|
|
|
+ return w
|
|
|
+}
|
|
|
+
|
|
|
+//Sort by tuple timestamp
|
|
|
+func (w WindowTuplesSet) Sort() {
|
|
|
+ for _, t := range w {
|
|
|
+ tuples := t.Tuples
|
|
|
+ sort.SliceStable(tuples, func(i, j int) bool {
|
|
|
+ return tuples[i].Timestamp < tuples[j].Timestamp
|
|
|
+ })
|
|
|
+ t.Tuples = tuples
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (met MultiEmitterTuples) Value(key string) (interface{}, bool) {
|
|
|
- var ret ResultsAndMessages
|
|
|
- if keys := strings.Split(key, "."); len(keys) != 2 {
|
|
|
+func (w WindowTuplesSet) AggregateEval(expr Expr) []interface{} {
|
|
|
+ var result []interface{}
|
|
|
+ if len(w) != 1 { //should never happen
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ for _, t := range w[0].Tuples {
|
|
|
+ result = append(result, Eval(expr, &t))
|
|
|
+ }
|
|
|
+ return result
|
|
|
+}
|
|
|
+
|
|
|
+type JoinTuple struct {
|
|
|
+ Tuples []Tuple
|
|
|
+}
|
|
|
+
|
|
|
+func (jt *JoinTuple) AddTuple(tuple Tuple) {
|
|
|
+ jt.Tuples = append(jt.Tuples, tuple)
|
|
|
+}
|
|
|
+
|
|
|
+func (jt *JoinTuple) AddTuples(tuples []Tuple) {
|
|
|
+ for _, t := range tuples {
|
|
|
+ jt.Tuples = append(jt.Tuples, t)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (jt *JoinTuple) Value(key string) (interface{}, bool) {
|
|
|
+ keys := strings.Split(key, ".")
|
|
|
+ tuples := jt.Tuples
|
|
|
+ switch len(keys) {
|
|
|
+ case 1:
|
|
|
+ if len(tuples) > 1 {
|
|
|
+ for _, tuple := range tuples { //TODO support key without modifier?
|
|
|
+ v, ok := tuple.Message[key]
|
|
|
+ if ok{
|
|
|
+ return v, ok
|
|
|
+ }
|
|
|
+ }
|
|
|
+ common.Log.Infoln("Wrong key: ", key, ", not found")
|
|
|
+ return nil, false
|
|
|
+ } else{
|
|
|
+ v, ok := tuples[0].Message[key]
|
|
|
+ return v, ok
|
|
|
+ }
|
|
|
+ case 2:
|
|
|
+ emitter, key := keys[0], keys[1]
|
|
|
+ //TODO should use hash here
|
|
|
+ for _, tuple := range tuples {
|
|
|
+ if tuple.Emitter == emitter {
|
|
|
+ v, ok := tuple.Message[key]
|
|
|
+ return v, ok
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil, false
|
|
|
+ default:
|
|
|
common.Log.Infoln("Wrong key: ", key, ", expect dot in the expression.")
|
|
|
return nil, false
|
|
|
- } else {
|
|
|
- emitter, key := keys[0], keys[1]
|
|
|
- for _, me := range met {
|
|
|
- if me.Emitter == emitter {
|
|
|
- for _, m := range me.Messages {
|
|
|
- if r, ok := m[key]; ok {
|
|
|
- rm := &EvalResultAndMessage{Stream: keys[0], Result:r, Message:m}
|
|
|
- ret = append(ret, *rm)
|
|
|
- }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (jt *JoinTuple) All(stream string) (interface{}, bool) {
|
|
|
+ if stream != ""{
|
|
|
+ for _, t := range jt.Tuples{
|
|
|
+ if t.Emitter == stream{
|
|
|
+ return t.Message, true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }else{
|
|
|
+ var r Message = make(map[string]interface{})
|
|
|
+ for _, t := range jt.Tuples{
|
|
|
+ for k, v := range t.Message{
|
|
|
+ if _, ok := r[k]; !ok{
|
|
|
+ r[k] = v
|
|
|
}
|
|
|
- break
|
|
|
}
|
|
|
}
|
|
|
+ return r, true
|
|
|
}
|
|
|
+ return nil, false
|
|
|
+}
|
|
|
|
|
|
- if len(ret) > 0 {
|
|
|
- return ret, true
|
|
|
- } else {
|
|
|
- return nil, false
|
|
|
+type JoinTupleSets []JoinTuple
|
|
|
+func (s JoinTupleSets) Len() int { return len(s) }
|
|
|
+func (s JoinTupleSets) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
|
|
+func (s JoinTupleSets) Index(i int) Valuer { return &(s[i]) }
|
|
|
+
|
|
|
+func (s JoinTupleSets) AggregateEval(expr Expr) []interface{} {
|
|
|
+ var result []interface{}
|
|
|
+ for _, t := range s {
|
|
|
+ result = append(result, Eval(expr, &t))
|
|
|
}
|
|
|
+ return result
|
|
|
}
|
|
|
|
|
|
-func (met *MultiEmitterTuples) AddTuple(tuple *Tuple) {
|
|
|
- found := false
|
|
|
- m, ok := tuple.Message.(map[string]interface{})
|
|
|
- if !ok {
|
|
|
- common.Log.Printf("Expect map[string]interface{} for the message type.")
|
|
|
- return
|
|
|
+type GroupedTuples []DataValuer
|
|
|
+func (s GroupedTuples) AggregateEval(expr Expr) []interface{} {
|
|
|
+ var result []interface{}
|
|
|
+ for _, t := range s {
|
|
|
+ result = append(result, Eval(expr, t))
|
|
|
}
|
|
|
+ return result
|
|
|
+}
|
|
|
|
|
|
- for _, t := range *met {
|
|
|
- if t.Emitter == tuple.EmitterName {
|
|
|
- t.Messages = append(t.Messages, m)
|
|
|
- break
|
|
|
- }
|
|
|
+type GroupedTuplesSet []GroupedTuples
|
|
|
+func (s GroupedTuplesSet) Len() int { return len(s) }
|
|
|
+func (s GroupedTuplesSet) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
|
|
+func (s GroupedTuplesSet) Index(i int) Valuer { return s[i][0] }
|
|
|
+
|
|
|
+type SortingData interface {
|
|
|
+ Len() int
|
|
|
+ Swap(i, j int)
|
|
|
+ Index(i int) Valuer
|
|
|
+}
|
|
|
+
|
|
|
+// multiSorter implements the Sort interface, sorting the changes within.Hi
|
|
|
+type MultiSorter struct {
|
|
|
+ SortingData
|
|
|
+ fields SortFields
|
|
|
+}
|
|
|
+
|
|
|
+// OrderedBy returns a Sorter that sorts using the less functions, in order.
|
|
|
+// Call its Sort method to sort the data.
|
|
|
+func OrderedBy(fields SortFields) *MultiSorter {
|
|
|
+ return &MultiSorter{
|
|
|
+ fields: fields,
|
|
|
}
|
|
|
+}
|
|
|
|
|
|
- if !found {
|
|
|
- ets := &EmitterTuples{Emitter:tuple.EmitterName}
|
|
|
- ets.Messages = append(ets.Messages, m)
|
|
|
- *met = append(*met, *ets)
|
|
|
+// Less is part of sort.Interface. It is implemented by looping along the
|
|
|
+// less functions until it finds a comparison that discriminates between
|
|
|
+// the two items (one is less than the other). Note that it can call the
|
|
|
+// less functions twice per call. We could change the functions to return
|
|
|
+// -1, 0, 1 and reduce the number of calls for greater efficiency: an
|
|
|
+// exercise for the reader.
|
|
|
+func (ms *MultiSorter) Less(i, j int) bool {
|
|
|
+ p, q := ms.SortingData.Index(i), ms.SortingData.Index(j)
|
|
|
+ vep, veq := &ValuerEval{Valuer: MultiValuer(p, &FunctionValuer{})}, &ValuerEval{Valuer: MultiValuer(q, &FunctionValuer{})}
|
|
|
+ for _, field := range ms.fields {
|
|
|
+ vp, ok := vep.Valuer.Value(field.Name)
|
|
|
+ if !ok {
|
|
|
+ return !field.Ascending
|
|
|
+ }
|
|
|
+ vq, ok := veq.Valuer.Value(field.Name)
|
|
|
+ if !ok {
|
|
|
+ return !field.Ascending
|
|
|
+ }
|
|
|
+ switch {
|
|
|
+ case vep.simpleDataEval(vp, vq, LT):
|
|
|
+ return field.Ascending
|
|
|
+ case veq.simpleDataEval(vq, vp, LT):
|
|
|
+ return !field.Ascending
|
|
|
+ }
|
|
|
}
|
|
|
+ return false
|
|
|
+}
|
|
|
+
|
|
|
+// Sort sorts the argument slice according to the less functions passed to OrderedBy.
|
|
|
+func (ms *MultiSorter) Sort(data SortingData) {
|
|
|
+ ms.SortingData = data
|
|
|
+ sort.Sort(ms)
|
|
|
+}
|
|
|
+
|
|
|
+type EvalResultMessage struct {
|
|
|
+ Emitter string
|
|
|
+ Result interface{}
|
|
|
+ Message Message
|
|
|
}
|
|
|
|
|
|
+type ResultsAndMessages []EvalResultMessage
|
|
|
+
|
|
|
// Eval evaluates expr against a map.
|
|
|
-func Eval(expr Expr, m map[string]interface{}) interface{} {
|
|
|
- eval := ValuerEval{Valuer: MapValuer(m)}
|
|
|
+func Eval(expr Expr, m Valuer) interface{} {
|
|
|
+ eval := ValuerEval{Valuer: MultiValuer(m, &FunctionValuer{})}
|
|
|
return eval.Eval(expr)
|
|
|
}
|
|
|
|
|
@@ -589,7 +803,6 @@ type ValuerEval struct {
|
|
|
// IntegerFloatDivision will set the eval system to treat
|
|
|
// a division between two integers as a floating point division.
|
|
|
IntegerFloatDivision bool
|
|
|
- JoinType JoinType
|
|
|
}
|
|
|
|
|
|
// MultiValuer returns a Valuer that iterates over multiple Valuer instances
|
|
@@ -614,12 +827,66 @@ func (a multiValuer) Call(name string, args []interface{}) (interface{}, bool) {
|
|
|
if valuer, ok := valuer.(CallValuer); ok {
|
|
|
if v, ok := valuer.Call(name, args); ok {
|
|
|
return v, true
|
|
|
+ } else {
|
|
|
+ common.Log.Println(fmt.Sprintf("Found error \"%s\" when call func %s.\n", v, name))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil, false
|
|
|
+}
|
|
|
+
|
|
|
+type multiAggregateValuer struct{
|
|
|
+ data AggregateData
|
|
|
+ valuers []Valuer
|
|
|
+}
|
|
|
+
|
|
|
+func MultiAggregateValuer(data AggregateData, valuers ...Valuer) Valuer {
|
|
|
+ return &multiAggregateValuer{
|
|
|
+ data: data,
|
|
|
+ valuers: valuers,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (a *multiAggregateValuer) Value(key string) (interface{}, bool) {
|
|
|
+ for _, valuer := range a.valuers {
|
|
|
+ if v, ok := valuer.Value(key); ok {
|
|
|
+ return v, true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil, false
|
|
|
+}
|
|
|
+
|
|
|
+//The args is [][] for aggregation
|
|
|
+func (a *multiAggregateValuer) Call(name string, args []interface{}) (interface{}, bool) {
|
|
|
+ var singleArgs []interface{} = nil
|
|
|
+ for _, valuer := range a.valuers {
|
|
|
+ if a, ok := valuer.(AggregateCallValuer); ok {
|
|
|
+ if v, ok := a.Call(name, args); ok {
|
|
|
+ return v, true
|
|
|
+ }
|
|
|
+ }else if c, ok := valuer.(CallValuer); ok{
|
|
|
+ if singleArgs == nil{
|
|
|
+ for _, arg := range args{
|
|
|
+ if arg, ok := arg.([]interface{}); ok{
|
|
|
+ singleArgs = append(singleArgs, arg[0])
|
|
|
+ }else{
|
|
|
+ common.Log.Infof("multiAggregateValuer does not get [][] args but get %v", args)
|
|
|
+ return nil, false
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if v, ok := c.Call(name, singleArgs); ok {
|
|
|
+ return v, true
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
return nil, false
|
|
|
}
|
|
|
|
|
|
+func (a *multiAggregateValuer) GetAllTuples() AggregateData {
|
|
|
+ return a.data
|
|
|
+}
|
|
|
+
|
|
|
type BracketEvalResult struct {
|
|
|
Start, End int
|
|
|
}
|
|
@@ -646,6 +913,8 @@ func (v *ValuerEval) Eval(expr Expr) interface{} {
|
|
|
return v.Eval(expr.Expr)
|
|
|
case *StringLiteral:
|
|
|
return expr.Val
|
|
|
+ case *BooleanLiteral:
|
|
|
+ return expr.Val
|
|
|
case *ColonExpr:
|
|
|
return &BracketEvalResult{Start:expr.Start, End:expr.End}
|
|
|
case *IndexExpr:
|
|
@@ -653,10 +922,17 @@ func (v *ValuerEval) Eval(expr Expr) interface{} {
|
|
|
case *Call:
|
|
|
if valuer, ok := v.Valuer.(CallValuer); ok {
|
|
|
var args []interface{}
|
|
|
+
|
|
|
if len(expr.Args) > 0 {
|
|
|
args = make([]interface{}, len(expr.Args))
|
|
|
- for i := range expr.Args {
|
|
|
- args[i] = v.Eval(expr.Args[i])
|
|
|
+ if aggreValuer, ok := valuer.(AggregateCallValuer); ok{
|
|
|
+ for i := range expr.Args {
|
|
|
+ args[i] = aggreValuer.GetAllTuples().AggregateEval(expr.Args[i])
|
|
|
+ }
|
|
|
+ }else{
|
|
|
+ for i := range expr.Args {
|
|
|
+ args[i] = v.Eval(expr.Args[i])
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
val, _ := valuer.Call(expr.Name, args)
|
|
@@ -685,8 +961,6 @@ func (v *ValuerEval) Eval(expr Expr) interface{} {
|
|
|
func (v *ValuerEval) evalBinaryExpr(expr *BinaryExpr) interface{} {
|
|
|
lhs := v.Eval(expr.LHS)
|
|
|
switch val := lhs.(type) {
|
|
|
- case ResultsAndMessages:
|
|
|
- return v.evalSet(val, expr)
|
|
|
case map[string]interface{}:
|
|
|
return v.evalJsonExpr(val, expr.OP, expr.RHS)
|
|
|
case []interface{}:
|
|
@@ -706,8 +980,6 @@ func (v *ValuerEval) evalBinaryExpr(expr *BinaryExpr) interface{} {
|
|
|
rhs = false
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
return v.simpleDataEval(lhs, rhs, expr.OP)
|
|
|
}
|
|
|
|
|
@@ -717,7 +989,7 @@ func (v *ValuerEval) evalJsonExpr(result interface{}, op Token, expr Expr) inte
|
|
|
switch op {
|
|
|
case ARROW:
|
|
|
if exp, ok := expr.(*FieldRef); ok {
|
|
|
- ve := &ValuerEval{Valuer: MapValuer(val)}
|
|
|
+ ve := &ValuerEval{Valuer: Message(val)}
|
|
|
return ve.Eval(exp)
|
|
|
} else {
|
|
|
fmt.Printf("The right expression is not a field reference node.\n")
|
|
@@ -764,59 +1036,6 @@ func (v *ValuerEval) evalJsonExpr(result interface{}, op Token, expr Expr) inte
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (v *ValuerEval) evalSet(lefts ResultsAndMessages, expr *BinaryExpr) interface{} {
|
|
|
- //For the JSON expressions
|
|
|
- if expr.OP == ARROW || expr.OP == SUBSET {
|
|
|
- for i, left := range lefts {
|
|
|
- lefts[i].Result = v.evalJsonExpr(left.Result, expr.OP, expr.RHS)
|
|
|
- }
|
|
|
- return lefts
|
|
|
- }
|
|
|
-
|
|
|
- //For the simple type expressions
|
|
|
- rhs := v.Eval(expr.RHS)
|
|
|
- rights, ok := rhs.(ResultsAndMessages)
|
|
|
- if rhs != nil && !ok {
|
|
|
- for i, left := range lefts {
|
|
|
- r := v.simpleDataEval(left.Result, rhs, expr.OP)
|
|
|
- lefts[i].Result = r
|
|
|
- }
|
|
|
- return lefts
|
|
|
- }
|
|
|
-
|
|
|
- sets := MergedEmitterTupleSets{}
|
|
|
- for _, left := range lefts {
|
|
|
- merged := &MergedEmitterTuple{}
|
|
|
- lm := &EmitterTuple{string(left.Stream), left.Message}
|
|
|
- if v.JoinType == LEFT_JOIN {
|
|
|
- merged.AddMergedMessage(*lm)
|
|
|
- }
|
|
|
-
|
|
|
- innerAppend := false
|
|
|
- for _, right := range rights {
|
|
|
- r := v.simpleDataEval(left.Result, right.Result, expr.OP)
|
|
|
- if v1, ok := r.(bool); ok {
|
|
|
- if v1 {
|
|
|
- if v.JoinType == INNER_JOIN && !innerAppend{
|
|
|
- merged.AddMergedMessage(*lm)
|
|
|
- innerAppend = true
|
|
|
- }
|
|
|
- rm := &EmitterTuple{string(right.Stream), right.Message}
|
|
|
- merged.AddMergedMessage(*rm)
|
|
|
- }
|
|
|
- } else {
|
|
|
- common.Log.Infoln("Evaluation error for set.")
|
|
|
- }
|
|
|
- }
|
|
|
- if len(merged.MergedMessage) > 0 {
|
|
|
- sets = append(sets, *merged)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return sets
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{} {
|
|
|
lhs = convertNum(lhs)
|
|
|
rhs = convertNum(rhs)
|
|
@@ -1153,6 +1372,49 @@ func (v *ValuerEval) simpleDataEval(lhs, rhs interface{}, op Token) interface{}
|
|
|
return false
|
|
|
}
|
|
|
return lhs != rhs
|
|
|
+ case LT:
|
|
|
+ rhs, ok := rhs.(string)
|
|
|
+ if !ok {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return lhs < rhs
|
|
|
+ case LTE:
|
|
|
+ rhs, ok := rhs.(string)
|
|
|
+ if !ok {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return lhs <= rhs
|
|
|
+ case GT:
|
|
|
+ rhs, ok := rhs.(string)
|
|
|
+ if !ok {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return lhs > rhs
|
|
|
+ case GTE:
|
|
|
+ rhs, ok := rhs.(string)
|
|
|
+ if !ok {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return lhs >= rhs
|
|
|
+ }
|
|
|
+ case time.Time:
|
|
|
+ rt, err := common.InterfaceToTime(rhs, "")
|
|
|
+ if err != nil{
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ switch op {
|
|
|
+ case EQ:
|
|
|
+ return lhs.Equal(rt)
|
|
|
+ case NEQ:
|
|
|
+ return !lhs.Equal(rt)
|
|
|
+ case LT:
|
|
|
+ return lhs.Before(rt)
|
|
|
+ case LTE:
|
|
|
+ return lhs.Before(rt) || lhs.Equal(rt)
|
|
|
+ case GT:
|
|
|
+ return lhs.After(rt)
|
|
|
+ case GTE:
|
|
|
+ return lhs.After(rt) || lhs.Equal(rt)
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1222,4 +1484,24 @@ func toFloat64(para interface{}) float64 {
|
|
|
return v
|
|
|
}
|
|
|
return 0
|
|
|
+}
|
|
|
+
|
|
|
+func IsAggStatement(node Node) (bool) {
|
|
|
+ var r bool = false
|
|
|
+ WalkFunc(node, func(n Node) {
|
|
|
+ if f, ok := n.(*Call); ok {
|
|
|
+ fn := strings.ToLower(f.Name)
|
|
|
+ if _, ok1 := aggFuncMap[fn]; ok1 {
|
|
|
+ r = true
|
|
|
+ return
|
|
|
+ }
|
|
|
+ } else if d, ok := n.(Dimensions); ok {
|
|
|
+ ds := d.GetGroups()
|
|
|
+ if ds != nil && len(ds) > 0 {
|
|
|
+ r = true
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return r
|
|
|
}
|