浏览代码

Merge pull request #150 from emqx/meta

Refactor Mqtt func
jinfahua 5 年之前
父节点
当前提交
e9fd056e93

+ 1 - 0
docs/en_US/sqls/built-in_functions.md

@@ -93,3 +93,4 @@ Aggregate functions perform a calculation on a set of values and return a single
 | newuuid   | newuuid()    | Returns a random 16-byte UUID.                               |
 | timestamp | timestamp()  | Returns the current timestamp in milliseconds from 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970 |
 | mqtt      | mqtt(topic)  | Returns the MQTT meta-data of specified key. The current supported keys<br />- topic: return the topic of message.  If there are multiple stream source, then specify the source name in parameter. Such as ``mqtt(src1.topic)``<br />- messageid: return the message id of message. If there are multiple stream source, then specify the source name in parameter. Such as ``mqtt(src2.messageid)`` |
+| meta      | meta(topic)  | Returns the meta-data of specified key. The key could be:<br/> - a standalone key if there is only one source in the from clause, such as ``meta(device)``<br />- A qualified key to specify the stream, such as ``meta(src1.device)`` <br />- A key with arrow for multi level meta data, such as ``meta(src1.reading->device->name)`` This assumes reading is a map structure meta data.|

+ 4 - 3
xsql/funcs_ast_validator.go

@@ -294,10 +294,11 @@ func validateOtherFunc(name string, args []Expr) error {
 			return err
 		}
 		if isIntegerArg(args[0]) || isTimeArg(args[0]) || isBooleanArg(args[0]) || isStringArg(args[0]) || isFloatArg(args[0]) {
-			return produceErrInfo(name, 0, "field reference")
+			return produceErrInfo(name, 0, "meta reference")
 		}
-		if p, ok := args[0].(*FieldRef); ok {
-			if _, ok := SpecialKeyMapper[p.Name]; !ok {
+		if p, ok := args[0].(*MetaRef); ok {
+			name := strings.ToLower(p.Name)
+			if name != "topic" && name != "messageid" {
 				return fmt.Errorf("Parameter of mqtt function can be only topic or messageid.")
 			}
 		}

+ 1 - 1
xsql/funcs_ast_validator_test.go

@@ -389,7 +389,7 @@ func TestFuncValidator(t *testing.T) {
 		{
 			s:    `SELECT mqtt("topic") FROM tbl`,
 			stmt: nil,
-			err:  "Expect field reference type for 1 parameter of function mqtt.",
+			err:  "Expect meta reference type for 1 parameter of function mqtt.",
 		},
 
 		{

+ 0 - 31
xsql/metadata_util.go

@@ -1,31 +0,0 @@
-package xsql
-
-import "strings"
-
-const INTERNAL_MQTT_TOPIC_KEY string = "internal_mqtt_topic_key_$$"
-const INTERNAL_MQTT_MSG_ID_KEY string = "internal_mqtt_msg_id_key_$$"
-
-//For functions such as mqtt(topic). If the field definitions also has a field named "topic", then it need to
-//have an internal key for "topic" to avoid key conflicts.
-var SpecialKeyMapper = map[string]string{"topic": INTERNAL_MQTT_TOPIC_KEY, "messageid": INTERNAL_MQTT_MSG_ID_KEY}
-
-func AddSpecialKeyMap(left, right string) {
-	SpecialKeyMapper[left] = right
-}
-
-/**
-The function is used for re-write the parameter names.
-For example, for mqtt function, the arguments could be 'topic' or 'messageid'.
-If the field name defined in stream happens to be 'topic' or 'messageid', it will have conflicts.
-*/
-func (c Call) rewrite_func() *Call {
-	if strings.ToLower(c.Name) == "mqtt" {
-		if f, ok := c.Args[0].(*FieldRef); ok {
-			if n, ok1 := SpecialKeyMapper[f.Name]; ok1 {
-				f.Name = n
-				c.Args[0] = f
-			}
-		}
-	}
-	return &c
-}

+ 4 - 4
xsql/parser.go

@@ -595,7 +595,7 @@ func (p *Parser) parseAs(f *Field) (*Field, error) {
 }
 
 func (p *Parser) parseCall(name string) (Expr, error) {
-	if strings.ToLower(name) == "meta" {
+	if strings.ToLower(name) == "meta" || strings.ToLower(name) == "mqtt" {
 		p.inmeta = true
 		defer func() {
 			p.inmeta = false
@@ -604,13 +604,13 @@ func (p *Parser) parseCall(name string) (Expr, error) {
 	var args []Expr
 	for {
 		if tok, _ := p.scanIgnoreWhitespace(); tok == RPAREN {
-			return Call{Name: name, Args: args}.rewrite_func(), nil
+			return &Call{Name: name, Args: args}, nil
 		} else if tok == ASTERISK {
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != RPAREN {
 				return nil, fmt.Errorf("found %q, expected right paren.", lit2)
 			} else {
 				args = append(args, &StringLiteral{Val: "*"})
-				return Call{Name: name, Args: args}.rewrite_func(), nil
+				return &Call{Name: name, Args: args}, nil
 			}
 		} else {
 			p.unscan()
@@ -635,7 +635,7 @@ func (p *Parser) parseCall(name string) (Expr, error) {
 		if valErr := validateFuncs(name, args); valErr != nil {
 			return nil, valErr
 		}
-		return Call{Name: name, Args: args}.rewrite_func(), nil
+		return &Call{Name: name, Args: args}, nil
 	} else {
 		if error != nil {
 			return nil, error

+ 8 - 6
xsql/plans/join_test.go

@@ -620,8 +620,9 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 					Emitter: "src1",
 					Tuples: []xsql.Tuple{
 						{
-							Emitter: "src1",
-							Message: xsql.Message{"id1": 1, "f1": "v1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type1/device001"},
+							Emitter:  "src1",
+							Message:  xsql.Message{"id1": 1, "f1": "v1"},
+							Metadata: xsql.Metadata{"topic": "devices/type1/device001"},
 						},
 					},
 				},
@@ -630,8 +631,9 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 					Emitter: "src2",
 					Tuples: []xsql.Tuple{
 						{
-							Emitter: "src2",
-							Message: xsql.Message{"id2": 1, "f2": "w1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type2/device001"},
+							Emitter:  "src2",
+							Message:  xsql.Message{"id2": 1, "f2": "w1"},
+							Metadata: xsql.Metadata{"topic": "devices/type2/device001"},
 						},
 					},
 				},
@@ -639,8 +641,8 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 			result: xsql.JoinTupleSets{
 				xsql.JoinTuple{
 					Tuples: []xsql.Tuple{
-						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type1/device001"}},
-						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type2/device001"}},
+						{Emitter: "src1", Message: xsql.Message{"id1": 1, "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": 1, "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
 					},
 				},
 			},

+ 12 - 8
xsql/plans/misc_func_test.go

@@ -92,8 +92,9 @@ func TestHashFunc_Apply1(t *testing.T) {
 			sql: "SELECT mqtt(topic) AS a FROM test",
 			data: &xsql.Tuple{
 				Emitter: "test",
-				Message: xsql.Message{
-					xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/device_001/message",
+				Message: xsql.Message{},
+				Metadata: xsql.Metadata{
+					"topic": "devices/device_001/message",
 				},
 			},
 			result: []map[string]interface{}{{
@@ -105,8 +106,9 @@ func TestHashFunc_Apply1(t *testing.T) {
 			sql: "SELECT mqtt(topic) AS a FROM test",
 			data: &xsql.Tuple{
 				Emitter: "test",
-				Message: xsql.Message{
-					xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/device_001/message",
+				Message: xsql.Message{},
+				Metadata: xsql.Metadata{
+					"topic": "devices/device_001/message",
 				},
 			},
 			result: []map[string]interface{}{{
@@ -119,8 +121,10 @@ func TestHashFunc_Apply1(t *testing.T) {
 			data: &xsql.Tuple{
 				Emitter: "test",
 				Message: xsql.Message{
-					"topic":                      "fff",
-					xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/device_001/message",
+					"topic": "fff",
+				},
+				Metadata: xsql.Metadata{
+					"topic": "devices/device_001/message",
 				},
 			},
 			result: []map[string]interface{}{{
@@ -169,8 +173,8 @@ func TestMqttFunc_Apply2(t *testing.T) {
 			data: xsql.JoinTupleSets{
 				xsql.JoinTuple{
 					Tuples: []xsql.Tuple{
-						{Emitter: "src1", Message: xsql.Message{"id1": "1", "f1": "v1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type1/device001"}},
-						{Emitter: "src2", Message: xsql.Message{"id2": "1", "f2": "w1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type2/device001"}},
+						{Emitter: "src1", Message: xsql.Message{"id1": "1", "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
+						{Emitter: "src2", Message: xsql.Message{"id2": "1", "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
 					},
 				},
 			},

+ 2 - 2
xstream/extensions/mqtt_source.go

@@ -154,8 +154,8 @@ func subscribe(topic string, client MQTT.Client, ctx api.StreamContext, consumer
 		result = xsql.LowercaseKeyMap(result)
 
 		meta := make(map[string]interface{})
-		meta[xsql.INTERNAL_MQTT_TOPIC_KEY] = msg.Topic()
-		meta[xsql.INTERNAL_MQTT_MSG_ID_KEY] = strconv.Itoa(int(msg.MessageID()))
+		meta["topic"] = msg.Topic()
+		meta["messageid"] = strconv.Itoa(int(msg.MessageID()))
 		select {
 		case consumer <- api.NewDefaultSourceTuple(result, meta):
 			log.Debugf("send data to source node")