|
@@ -54,6 +54,8 @@ const (
|
|
|
CROSS_JOIN
|
|
|
)
|
|
|
|
|
|
+const COLUMN_SEPARATOR = "\007"
|
|
|
+
|
|
|
type Join struct {
|
|
|
Name string
|
|
|
Alias string
|
|
@@ -538,26 +540,23 @@ type Message map[string]interface{}
|
|
|
|
|
|
// Value returns the value for a key in the Message.
|
|
|
func (m Message) Value(key string) (interface{}, bool) {
|
|
|
- key1 := strings.ToLower(key)
|
|
|
- if v, ok := m.valueUtil(key1); ok {
|
|
|
+ var colkey string
|
|
|
+ if keys := strings.Split(key, COLUMN_SEPARATOR); len(keys) == 1 {
|
|
|
+ colkey = key
|
|
|
+ } else if len(keys) == 2 {
|
|
|
+ colkey = keys[1]
|
|
|
+ } else {
|
|
|
+ common.Log.Println("Invalid key: " + key + ", expect source.field or field.")
|
|
|
+ return nil, false
|
|
|
+ }
|
|
|
+ key1 := strings.ToLower(colkey)
|
|
|
+ if v, ok := m[key1]; ok {
|
|
|
return v, ok
|
|
|
} else {
|
|
|
//Only when with 'SELECT * FROM ...' and 'schemaless', the key in map is not convert to lower case.
|
|
|
//So all of keys in map should be convert to lowercase and then compare them.
|
|
|
- return m.getIgnoreCase(key)
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func (m Message) valueUtil(key string) (interface{}, bool) {
|
|
|
- if keys := strings.Split(key, "."); len(keys) == 1 {
|
|
|
- v, ok := m[key]
|
|
|
- return v, ok
|
|
|
- } else if len(keys) == 2 {
|
|
|
- v, ok := m[keys[1]]
|
|
|
- return v, ok
|
|
|
+ return m.getIgnoreCase(colkey)
|
|
|
}
|
|
|
- common.Log.Println("Invalid key: " + key + ", expect source.field or field.")
|
|
|
- return nil, false
|
|
|
}
|
|
|
|
|
|
func (m Message) getIgnoreCase(key interface{}) (interface{}, bool) {
|
|
@@ -741,7 +740,7 @@ func getTupleValue(tuple Tuple, t string, key string) (interface{}, bool) {
|
|
|
}
|
|
|
|
|
|
func (jt *JoinTuple) doGetValue(t string, key string) (interface{}, bool) {
|
|
|
- keys := strings.Split(key, ".")
|
|
|
+ keys := strings.Split(key, COLUMN_SEPARATOR)
|
|
|
tuples := jt.Tuples
|
|
|
switch len(keys) {
|
|
|
case 1:
|
|
@@ -1135,7 +1134,7 @@ func (v *ValuerEval) Eval(expr Expr) interface{} {
|
|
|
return val
|
|
|
} else {
|
|
|
//The field specified with stream source
|
|
|
- val, _ := v.Valuer.Value(string(expr.StreamName) + "." + expr.Name)
|
|
|
+ val, _ := v.Valuer.Value(string(expr.StreamName) + COLUMN_SEPARATOR + expr.Name)
|
|
|
return val
|
|
|
}
|
|
|
case *MetaRef:
|
|
@@ -1144,7 +1143,7 @@ func (v *ValuerEval) Eval(expr Expr) interface{} {
|
|
|
return val
|
|
|
} else {
|
|
|
//The field specified with stream source
|
|
|
- val, _ := v.Valuer.Meta(string(expr.StreamName) + "." + expr.Name)
|
|
|
+ val, _ := v.Valuer.Meta(string(expr.StreamName) + COLUMN_SEPARATOR + expr.Name)
|
|
|
return val
|
|
|
}
|
|
|
case *Wildcard:
|