|
@@ -48,22 +48,13 @@ func ToMessage(input interface{}) (Message, bool) {
|
|
|
}
|
|
|
|
|
|
// Value returns the value for a key in the Message.
|
|
|
-func (m Message) Value(key string) (interface{}, bool) {
|
|
|
- var colkey string
|
|
|
- if keys := strings.Split(key, ast.COLUMN_SEPARATOR); len(keys) == 1 {
|
|
|
- colkey = key
|
|
|
- } else if len(keys) == 2 {
|
|
|
- colkey = keys[1]
|
|
|
- } else {
|
|
|
- conf.Log.Println("Invalid key: " + key + ", expect source.field or field.")
|
|
|
- return nil, false
|
|
|
- }
|
|
|
- if v, ok := m[colkey]; ok {
|
|
|
+func (m Message) Value(key, table string) (interface{}, bool) {
|
|
|
+ if v, ok := m[key]; ok {
|
|
|
return v, ok
|
|
|
} else {
|
|
|
//Only when with 'SELECT * FROM ...' and 'schemaless', the key in map is not convert to lower case.
|
|
|
//So all of keys in map should be convert to lowercase and then compare them.
|
|
|
- return m.getIgnoreCase(colkey)
|
|
|
+ return m.getIgnoreCase(key)
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -78,11 +69,11 @@ func (m Message) getIgnoreCase(key interface{}) (interface{}, bool) {
|
|
|
return nil, false
|
|
|
}
|
|
|
|
|
|
-func (m Message) Meta(key string) (interface{}, bool) {
|
|
|
+func (m Message) Meta(key, table string) (interface{}, bool) {
|
|
|
if key == "*" {
|
|
|
return map[string]interface{}(m), true
|
|
|
}
|
|
|
- return m.Value(key)
|
|
|
+ return m.Value(key, table)
|
|
|
}
|
|
|
|
|
|
func (m Message) AppendAlias(k string, v interface{}) bool {
|
|
@@ -97,17 +88,17 @@ type Event interface {
|
|
|
|
|
|
type Metadata Message
|
|
|
|
|
|
-func (m Metadata) Value(key string) (interface{}, bool) {
|
|
|
+func (m Metadata) Value(key, table string) (interface{}, bool) {
|
|
|
msg := Message(m)
|
|
|
- return msg.Value(key)
|
|
|
+ return msg.Value(key, table)
|
|
|
}
|
|
|
|
|
|
-func (m Metadata) Meta(key string) (interface{}, bool) {
|
|
|
+func (m Metadata) Meta(key, table string) (interface{}, bool) {
|
|
|
if key == "*" {
|
|
|
return map[string]interface{}(m), true
|
|
|
}
|
|
|
msg := Message(m)
|
|
|
- return msg.Meta(key)
|
|
|
+ return msg.Meta(key, table)
|
|
|
}
|
|
|
|
|
|
type Alias struct {
|
|
@@ -126,7 +117,7 @@ func (a *Alias) AliasValue(key string) (interface{}, bool) {
|
|
|
if a.AliasMap == nil {
|
|
|
return nil, false
|
|
|
}
|
|
|
- return a.AliasMap.Value(key)
|
|
|
+ return a.AliasMap.Value(key, "")
|
|
|
}
|
|
|
|
|
|
type Tuple struct {
|
|
@@ -137,19 +128,19 @@ type Tuple struct {
|
|
|
Alias
|
|
|
}
|
|
|
|
|
|
-func (t *Tuple) Value(key string) (interface{}, bool) {
|
|
|
+func (t *Tuple) Value(key, table string) (interface{}, bool) {
|
|
|
r, ok := t.AliasValue(key)
|
|
|
if ok {
|
|
|
return r, ok
|
|
|
}
|
|
|
- return t.Message.Value(key)
|
|
|
+ return t.Message.Value(key, table)
|
|
|
}
|
|
|
|
|
|
-func (t *Tuple) Meta(key string) (interface{}, bool) {
|
|
|
+func (t *Tuple) Meta(key, table string) (interface{}, bool) {
|
|
|
if key == "*" {
|
|
|
return map[string]interface{}(t.Metadata), true
|
|
|
}
|
|
|
- return t.Metadata.Value(key)
|
|
|
+ return t.Metadata.Value(key, table)
|
|
|
}
|
|
|
|
|
|
func (t *Tuple) All(string) (interface{}, bool) {
|
|
@@ -203,11 +194,11 @@ type WindowRangeValuer struct {
|
|
|
*WindowRange
|
|
|
}
|
|
|
|
|
|
-func (r *WindowRangeValuer) Value(_ string) (interface{}, bool) {
|
|
|
+func (r *WindowRangeValuer) Value(_, _ string) (interface{}, bool) {
|
|
|
return nil, false
|
|
|
}
|
|
|
|
|
|
-func (r *WindowRangeValuer) Meta(_ string) (interface{}, bool) {
|
|
|
+func (r *WindowRangeValuer) Meta(_, _ string) (interface{}, bool) {
|
|
|
return nil, false
|
|
|
}
|
|
|
|
|
@@ -321,26 +312,20 @@ func (jt *JoinTuple) AddTuples(tuples []Tuple) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func getTupleValue(tuple Tuple, t string, key string) (interface{}, bool) {
|
|
|
- switch t {
|
|
|
- case "value":
|
|
|
- return tuple.Value(key)
|
|
|
- case "meta":
|
|
|
- return tuple.Meta(key)
|
|
|
- default:
|
|
|
- conf.Log.Errorf("cannot get tuple for type %s", t)
|
|
|
- return nil, false
|
|
|
+func getTupleValue(tuple Tuple, key string, isVal bool) (interface{}, bool) {
|
|
|
+ if isVal {
|
|
|
+ return tuple.Value(key, "")
|
|
|
+ } else {
|
|
|
+ return tuple.Meta(key, "")
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (jt *JoinTuple) doGetValue(t string, key string) (interface{}, bool) {
|
|
|
- keys := strings.Split(key, ast.COLUMN_SEPARATOR)
|
|
|
+func (jt *JoinTuple) doGetValue(key, table string, isVal bool) (interface{}, bool) {
|
|
|
tuples := jt.Tuples
|
|
|
- switch len(keys) {
|
|
|
- case 1:
|
|
|
+ if table == "" {
|
|
|
if len(tuples) > 1 {
|
|
|
for _, tuple := range tuples { //TODO support key without modifier?
|
|
|
- v, ok := getTupleValue(tuple, t, key)
|
|
|
+ v, ok := getTupleValue(tuple, key, isVal)
|
|
|
if ok {
|
|
|
return v, ok
|
|
|
}
|
|
@@ -348,33 +333,29 @@ func (jt *JoinTuple) doGetValue(t string, key string) (interface{}, bool) {
|
|
|
conf.Log.Debugf("Wrong key: %s not found", key)
|
|
|
return nil, false
|
|
|
} else {
|
|
|
- return getTupleValue(tuples[0], t, key)
|
|
|
+ return getTupleValue(tuples[0], key, isVal)
|
|
|
}
|
|
|
- case 2:
|
|
|
- emitter, key := keys[0], keys[1]
|
|
|
+ } else {
|
|
|
//TODO should use hash here
|
|
|
for _, tuple := range tuples {
|
|
|
- if tuple.Emitter == emitter {
|
|
|
- return getTupleValue(tuple, t, key)
|
|
|
+ if tuple.Emitter == table {
|
|
|
+ return getTupleValue(tuple, key, isVal)
|
|
|
}
|
|
|
}
|
|
|
return nil, false
|
|
|
- default:
|
|
|
- conf.Log.Infoln("Wrong key: ", key, ", expect dot in the expression.")
|
|
|
- return nil, false
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (jt *JoinTuple) Value(key string) (interface{}, bool) {
|
|
|
+func (jt *JoinTuple) Value(key, table string) (interface{}, bool) {
|
|
|
r, ok := jt.AliasValue(key)
|
|
|
if ok {
|
|
|
return r, ok
|
|
|
}
|
|
|
- return jt.doGetValue("value", key)
|
|
|
+ return jt.doGetValue(key, table, true)
|
|
|
}
|
|
|
|
|
|
-func (jt *JoinTuple) Meta(key string) (interface{}, bool) {
|
|
|
- return jt.doGetValue("meta", key)
|
|
|
+func (jt *JoinTuple) Meta(key, table string) (interface{}, bool) {
|
|
|
+ return jt.doGetValue(key, table, false)
|
|
|
}
|
|
|
|
|
|
func (jt *JoinTuple) All(stream string) (interface{}, bool) {
|