|
@@ -541,8 +541,7 @@ func (m Message) Value(key string) (interface{}, bool) {
|
|
}
|
|
}
|
|
|
|
|
|
func (m Message) Meta(key string) (interface{}, bool) {
|
|
func (m Message) Meta(key string) (interface{}, bool) {
|
|
- common.Log.Println("Message cannot get meta")
|
|
|
|
- return nil, false
|
|
|
|
|
|
+ return m.Value(key)
|
|
}
|
|
}
|
|
|
|
|
|
type Event interface {
|
|
type Event interface {
|
|
@@ -550,7 +549,17 @@ type Event interface {
|
|
IsWatermark() bool
|
|
IsWatermark() bool
|
|
}
|
|
}
|
|
|
|
|
|
-type Metadata map[string]interface{}
|
|
|
|
|
|
+type Metadata Message
|
|
|
|
+
|
|
|
|
+func (m Metadata) Value(key string) (interface{}, bool) {
|
|
|
|
+ msg := Message(m)
|
|
|
|
+ return msg.Value(key)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (m Metadata) Meta(key string) (interface{}, bool) {
|
|
|
|
+ msg := Message(m)
|
|
|
|
+ return msg.Meta(key)
|
|
|
|
+}
|
|
|
|
|
|
type Tuple struct {
|
|
type Tuple struct {
|
|
Emitter string
|
|
Emitter string
|
|
@@ -559,20 +568,6 @@ type Tuple struct {
|
|
Metadata Metadata
|
|
Metadata Metadata
|
|
}
|
|
}
|
|
|
|
|
|
-// Value returns the value for a key in the Message.
|
|
|
|
-func (m Metadata) Value(key string) (interface{}, bool) {
|
|
|
|
- key = strings.ToLower(key)
|
|
|
|
- if keys := strings.Split(key, "."); len(keys) == 1 {
|
|
|
|
- v, ok := m[key]
|
|
|
|
- return v, ok
|
|
|
|
- } else if len(keys) == 2 {
|
|
|
|
- v, ok := m[keys[1]]
|
|
|
|
- return v, ok
|
|
|
|
- }
|
|
|
|
- common.Log.Println("Invalid key: " + key + ", expect source.field or field.")
|
|
|
|
- return nil, false
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
func (t *Tuple) Value(key string) (interface{}, bool) {
|
|
func (t *Tuple) Value(key string) (interface{}, bool) {
|
|
return t.Message.Value(key)
|
|
return t.Message.Value(key)
|
|
}
|
|
}
|
|
@@ -692,48 +687,26 @@ func (jt *JoinTuple) AddTuples(tuples []Tuple) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func (jt *JoinTuple) Value(key string) (interface{}, bool) {
|
|
|
|
- keys := strings.Split(key, ".")
|
|
|
|
- tuples := jt.Tuples
|
|
|
|
- switch len(keys) {
|
|
|
|
- case 1:
|
|
|
|
- if len(tuples) > 1 {
|
|
|
|
- for _, tuple := range tuples { //TODO support key without modifier?
|
|
|
|
- v, ok := tuple.Message[key]
|
|
|
|
- if ok {
|
|
|
|
- return v, ok
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- common.Log.Infoln("Wrong key: ", key, ", not found")
|
|
|
|
- return nil, false
|
|
|
|
- } else {
|
|
|
|
- v, ok := tuples[0].Message[key]
|
|
|
|
- return v, ok
|
|
|
|
- }
|
|
|
|
- case 2:
|
|
|
|
- emitter, key := keys[0], keys[1]
|
|
|
|
- //TODO should use hash here
|
|
|
|
- for _, tuple := range tuples {
|
|
|
|
- if tuple.Emitter == emitter {
|
|
|
|
- v, ok := tuple.Message[key]
|
|
|
|
- return v, ok
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return nil, false
|
|
|
|
|
|
+func getTupleValue(tuple Tuple, t string, key string) (interface{}, bool) {
|
|
|
|
+ switch t {
|
|
|
|
+ case "value":
|
|
|
|
+ return tuple.Value(key)
|
|
|
|
+ case "meta":
|
|
|
|
+ return tuple.Meta(key)
|
|
default:
|
|
default:
|
|
- common.Log.Infoln("Wrong key: ", key, ", expect dot in the expression.")
|
|
|
|
|
|
+ common.Log.Errorf("cannot get tuple for type %s", t)
|
|
return nil, false
|
|
return nil, false
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func (jt *JoinTuple) Meta(key string) (interface{}, bool) {
|
|
|
|
|
|
+func (jt *JoinTuple) doGetValue(t string, key string) (interface{}, bool) {
|
|
keys := strings.Split(key, ".")
|
|
keys := strings.Split(key, ".")
|
|
tuples := jt.Tuples
|
|
tuples := jt.Tuples
|
|
switch len(keys) {
|
|
switch len(keys) {
|
|
case 1:
|
|
case 1:
|
|
if len(tuples) > 1 {
|
|
if len(tuples) > 1 {
|
|
for _, tuple := range tuples { //TODO support key without modifier?
|
|
for _, tuple := range tuples { //TODO support key without modifier?
|
|
- v, ok := tuple.Metadata[key]
|
|
|
|
|
|
+ v, ok := getTupleValue(tuple, t, key)
|
|
if ok {
|
|
if ok {
|
|
return v, ok
|
|
return v, ok
|
|
}
|
|
}
|
|
@@ -741,16 +714,14 @@ func (jt *JoinTuple) Meta(key string) (interface{}, bool) {
|
|
common.Log.Infoln("Wrong key: ", key, ", not found")
|
|
common.Log.Infoln("Wrong key: ", key, ", not found")
|
|
return nil, false
|
|
return nil, false
|
|
} else {
|
|
} else {
|
|
- v, ok := tuples[0].Metadata[key]
|
|
|
|
- return v, ok
|
|
|
|
|
|
+ return getTupleValue(tuples[0], t, key)
|
|
}
|
|
}
|
|
case 2:
|
|
case 2:
|
|
emitter, key := keys[0], keys[1]
|
|
emitter, key := keys[0], keys[1]
|
|
//TODO should use hash here
|
|
//TODO should use hash here
|
|
for _, tuple := range tuples {
|
|
for _, tuple := range tuples {
|
|
if tuple.Emitter == emitter {
|
|
if tuple.Emitter == emitter {
|
|
- v, ok := tuple.Metadata[key]
|
|
|
|
- return v, ok
|
|
|
|
|
|
+ return getTupleValue(tuple, t, key)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil, false
|
|
return nil, false
|
|
@@ -760,6 +731,14 @@ func (jt *JoinTuple) Meta(key string) (interface{}, bool) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func (jt *JoinTuple) Value(key string) (interface{}, bool) {
|
|
|
|
+ return jt.doGetValue("value", key)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (jt *JoinTuple) Meta(key string) (interface{}, bool) {
|
|
|
|
+ return jt.doGetValue("meta", key)
|
|
|
|
+}
|
|
|
|
+
|
|
func (jt *JoinTuple) All(stream string) (interface{}, bool) {
|
|
func (jt *JoinTuple) All(stream string) (interface{}, bool) {
|
|
if stream != "" {
|
|
if stream != "" {
|
|
for _, t := range jt.Tuples {
|
|
for _, t := range jt.Tuples {
|