Bläddra i källkod

move metadata into metadata map

RockyJin 5 år sedan
förälder
incheckning
80cf503d48
5 ändrade filer med 45 tillägg och 42 borttagningar
  1. 27 1
      xsql/ast.go
  2. 0 35
      xsql/funcs_rewriter.go
  3. 10 0
      xsql/metadata_util.go
  4. 3 3
      xsql/parser.go
  5. 5 3
      xstream/extensions/mqtt_source.go

+ 27 - 1
xsql/ast.go

@@ -529,14 +529,35 @@ type Event interface {
 	IsWatermark() bool
 }
 
+type Metadata map[string]interface{}
+
 type Tuple struct {
 	Emitter   string
 	Message   Message
 	Timestamp int64
+	Metadata  Metadata
+}
+
+// Value returns the value for a key in the Message.
+func (m Metadata) Value(key string) (interface{}, bool) {
+	key = strings.ToLower(key)
+	if keys := strings.Split(key, "."); len(keys) == 1 {
+		v, ok := m[key]
+		return v, ok
+	} else if len(keys) == 2 {
+		v, ok := m[keys[1]]
+		return v, ok
+	}
+	common.Log.Println("Invalid key: " + key + ", expect source.field or field.")
+	return nil, false
 }
 
 func (t *Tuple) Value(key string) (interface{}, bool) {
-	return t.Message.Value(key)
+	if v, ok := t.Message.Value(key); ok {
+		return v, ok
+	} else {
+		return t.Metadata.Value(key)
+	}
 }
 
 func (t *Tuple) All(stream string) (interface{}, bool) {
@@ -551,6 +572,11 @@ func (t *Tuple) GetTimestamp() int64 {
 	return t.Timestamp
 }
 
+func (t *Tuple) GetMetadata() Metadata {
+	return t.Metadata
+}
+
+
 func (t *Tuple) IsWatermark() bool {
 	return false
 }

+ 0 - 35
xsql/funcs_rewriter.go

@@ -1,35 +0,0 @@
-package xsql
-
-import (
-	"strings"
-)
-
-const INTERNAL_MQTT_TOPIC_KEY string = "internal_mqtt_topic_key_$$"
-const INTERNAL_MQTT_MSG_ID_KEY string = "internal_mqtt_msg_id_key_$$"
-
-
-//For functions such as mqtt(topic). If the field definitions also has a field named "topic", then it need to
-//have an internal key for "topic" to avoid key conflicts.
-var SpecialKeyMapper = map[string]string{"topic" : INTERNAL_MQTT_TOPIC_KEY, "messageid" : INTERNAL_MQTT_MSG_ID_KEY}
-func AddSpecialKeyMap(left, right string) {
-	SpecialKeyMapper[left] = right
-}
-
-/**
-The function is used for re-write the parameter names.
-For example, for mqtt function, the arguments could be 'topic' or 'messageid'.
-If the field name defined in stream happens to be 'topic' or 'messageid', it will have conflicts.
- */
-func (c Call) rewrite_func() *Call {
-	if strings.ToLower(c.Name) == "mqtt" {
-		if f, ok := c.Args[0].(*FieldRef); ok {
-			if n, ok1 := SpecialKeyMapper[f.Name]; ok1 {
-				f.Name = n
-				c.Args[0] = f
-			}
-		}
-	}
-	return &c
-}
-
-

+ 10 - 0
xsql/metadata_util.go

@@ -0,0 +1,10 @@
+package xsql
+
+const INTERNAL_MQTT_TOPIC_KEY string = "topic"
+const INTERNAL_MQTT_MSG_ID_KEY string = "id"
+
+var SpecialKeyMapper = map[string]string{INTERNAL_MQTT_TOPIC_KEY : "", INTERNAL_MQTT_MSG_ID_KEY : ""}
+
+func AddSpecialKeyMap(key string) {
+	SpecialKeyMapper[key] = ""
+}

+ 3 - 3
xsql/parser.go

@@ -592,13 +592,13 @@ func (p *Parser) parseCall(name string) (Expr, error) {
 	var args []Expr
 	for {
 		if tok, _ := p.scanIgnoreWhitespace(); tok == RPAREN {
-			return Call{Name: name, Args: args}.rewrite_func(), nil
+			return &Call{Name: name, Args: args}, nil
 		} else if tok == ASTERISK {
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != RPAREN {
 				return nil, fmt.Errorf("found %q, expected right paren.", lit2)
 			} else {
 				args = append(args, &StringLiteral{Val:"*"})
-				return Call{Name: name, Args: args}.rewrite_func(), nil
+				return &Call{Name: name, Args: args}, nil
 			}
 		} else {
 			p.unscan()
@@ -623,7 +623,7 @@ func (p *Parser) parseCall(name string) (Expr, error) {
 		if valErr := validateFuncs(name, args); valErr != nil {
 			return nil, valErr
 		}
-		return Call{Name: name, Args: args}.rewrite_func(), nil
+		return &Call{Name: name, Args: args}, nil
 	} else {
 		if error != nil {
 			return nil, error

+ 5 - 3
xstream/extensions/mqtt_source.go

@@ -153,10 +153,12 @@ func (ms *MQTTSource) Open(ctx context.Context) error {
 			}
 			//Convert the keys to lowercase
 			result = xsql.LowercaseKeyMap(result)
-			result[xsql.INTERNAL_MQTT_TOPIC_KEY] = msg.Topic()
-			result[xsql.INTERNAL_MQTT_MSG_ID_KEY] = strconv.Itoa(int(msg.MessageID()))
 
-			tuple := &xsql.Tuple{Emitter: ms.tpc, Message:result, Timestamp: common.TimeToUnixMilli(time.Now())}
+			meta := make(map[string]interface{})
+			meta[xsql.INTERNAL_MQTT_TOPIC_KEY] = msg.Topic()
+			meta[xsql.INTERNAL_MQTT_MSG_ID_KEY] = strconv.Itoa(int(msg.MessageID()))
+
+			tuple := &xsql.Tuple{Emitter: ms.tpc, Message:result, Timestamp: common.TimeToUnixMilli(time.Now()), Metadata:meta}
 			for _, out := range ms.outs{
 				out <- tuple
 			}