Преглед изворни кода

add mqtt & spli_value func

RockyJin пре 5 година
родитељ
комит
a50ff015ab

+ 20 - 5
docs/en_US/rules/sinks/mqtt.md

@@ -2,9 +2,24 @@
 
 The action is used for publish output message into a MQTT server. 
 
-| Property name | Optional | Description                                                  |
-| ------------- | -------- | ------------------------------------------------------------ |
-| server        | false    | The broker address of the mqtt server, such as ``tcp://127.0.0.1:1883`` |
-| topic         | false    | The mqtt topic, such as ``analysis/result``                  |
-| clientId      | true     | The client id for mqtt connection. If not specified, an uuid will be used |
+| Property name    | Optional | Description                                                  |
+| ---------------- | -------- | ------------------------------------------------------------ |
+| server           | false    | The broker address of the mqtt server, such as ``tcp://127.0.0.1:1883`` |
+| topic            | false    | The mqtt topic, such as ``analysis/result``                  |
+| clientId         | true     | The client id for mqtt connection. If not specified, an uuid will be used |
+| protocol_version | true     | 3.1 (also refer as MQTT 3) or 3.1.1 (also refer as MQTT 4).  If not specified, the default value is 3.1. |
+| username         | true     | The user name for the connection.                        |
+| password         | true     | The password for the connection.                             |
+
+Below is one of the sample configuration.
+```json
+{
+  "mqtt": {
+  	"server": "tcp://sink_server:1883",
+  	"topic": "demoSink",
+  	"clientId": "client_id_1",
+    "protocol_version": "3.1.1"
+  }
+}
+```
 

+ 8 - 6
docs/en_US/sqls/built-in_functions.md

@@ -56,7 +56,7 @@ Aggregate functions perform a calculation on a set of values and return a single
 | lower    | lower(col1) | Returns the lowercase version of the given String.                                                                         |
 | lpad     | lpad(col1, 2) | Returns the String argument, padded on the left side with the number of spaces specified by the second argument.         |
 | ltrim    | ltrim(col1) | Removes all leading whitespace (tabs and spaces) from the provided String.                                                |
-| numbytes | numbytes(col1) | Returns the number of bytes in the UTF-8 encoding of the provided string.                                         | 
+| numbytes | numbytes(col1) | Returns the number of bytes in the UTF-8 encoding of the provided string.                                         |
 | regexp_matches| regexp_matches(col1, regex) | Returns true if the string (first argument) contains a match for the regular expression.            |
 | regexp_replace| regexp_matches(col1, regex, str) | Replaces all occurrences of the second argument (regular expression) in the first argument with the third argument.                                                          |
 | regexp_substr| regexp_substr(col1, regex) | Finds the first match of the 2nd parameter (regex) in the first parameter.                            |
@@ -64,6 +64,7 @@ Aggregate functions perform a calculation on a set of values and return a single
 | rtrim    | rtrim(col1) | Removes all trailing whitespace (tabs and spaces) from the provided String.                                                |
 | substring| substring(col1, start, end) |  returns the substring of the provided String from the provided Int index (0-based, inclusive) to the end of the String.                                                           |
 | startswith| startswith(col1, str) | Returns Boolean, whether the first string argument starts with the second string argument.                  |
+| split_value | split_value(col1, str_splitter, index) | Split the value of the 1st parameter with the 2nd parameter, and return the value of split array that indexed with the 3rd parameter.<br />``split_value("/test/device001/message","/",0) AS a``, the returned value of function is empty; <br />``split_value("/test/device001/message","/",3) AS a``, the returned value of function is ``message``; |
 | trim      | trim(col1) | Removes all leading and trailing whitespace (tabs and spaces) from the provided String.                                    |
 | upper     | upper(col1)| Returns the uppercase version of the given String.|
 
@@ -86,8 +87,9 @@ Aggregate functions perform a calculation on a set of values and return a single
 | sha512   | sha512(col1)| Hashed value of the argument                   |
 
 ## Other Functions
-| Function | Example     | Description                                    |
-| -------- | ----------- | ---------------------------------------------- |
-| isNull   | isNull(col1)| Returns true if the argument is the Null value.|
-| newuuid  | newuuid()   | Returns a random 16-byte UUID.                 |
-| timestamp| timestamp() | Returns the current timestamp in milliseconds from 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970 |
+| Function  | Example      | Description                                                  |
+| --------- | ------------ | ------------------------------------------------------------ |
+| isNull    | isNull(col1) | Returns true if the argument is the Null value.              |
+| newuuid   | newuuid()    | Returns a random 16-byte UUID.                               |
+| timestamp | timestamp()  | Returns the current timestamp in milliseconds from 00:00:00 Coordinated Universal Time (UTC), Thursday, 1 January 1970 |
+| mqtt      | mqtt(topic)  | Returns the MQTT meta-data of specified key. The current supported keys<br />- topic: return the topic of message.  If there are multiple stream source, then specify the source name in parameter. Such as ``mqtt(src1.topic)``<br />- messageid: return the message id of message. If there are multiple stream source, then specify the source name in parameter. Such as ``mqtt(src2.messageid)`` |

+ 30 - 0
xsql/funcs_ast_validator.go

@@ -160,6 +160,24 @@ func validateStrFunc(name string, args []Expr) (error) {
 				}
 			}
 		}
+	case "split_value":
+		if len != 3 {
+			return fmt.Errorf("the arguments for split_value should be 3")
+		}
+		if isNumericArg(args[0]) || isTimeArg(args[0]) || isBooleanArg(args[0]) {
+			return produceErrInfo(name, 0, "string")
+		}
+		if isNumericArg(args[1]) || isTimeArg(args[1]) || isBooleanArg(args[1]) {
+			return produceErrInfo(name, 1, "string")
+		}
+		if isFloatArg(args[2]) || isTimeArg(args[2]) || isBooleanArg(args[2]) || isStringArg(args[2]) {
+			return produceErrInfo(name, 2, "int")
+		}
+		if s, ok := args[2].(*IntegerLiteral); ok {
+			if s.Val < 0 {
+				return fmt.Errorf("The index should not be a nagtive integer.")
+			}
+		}
 	}
 	return nil
 }
@@ -254,6 +272,18 @@ func validateOtherFunc(name string, args []Expr) (error) {
 		if err := validateLen(name, 0, len); err != nil {
 			return  err
 		}
+	case "mqtt":
+		if err := validateLen(name, 1, len); err != nil {
+			return err
+		}
+		if isIntegerArg(args[0]) || isTimeArg(args[0]) || isBooleanArg(args[0]) || isStringArg(args[0]) || isFloatArg(args[0]) {
+			return produceErrInfo(name, 0, "field reference")
+		}
+		if p, ok := args[0].(*FieldRef); ok {
+			if _, ok := SpecialKeyMapper[p.Name]; !ok {
+				return fmt.Errorf("Parameter of mqtt function can be only topic or messageid.")
+			}
+		}
 	}
 	return nil
 }

+ 30 - 0
xsql/funcs_ast_validator_test.go

@@ -387,6 +387,36 @@ func TestFuncValidator(t *testing.T) {
 			err: "Expect string type for 1 parameter of function sha512.",
 		},
 
+		{
+			s: `SELECT mqtt("topic") FROM tbl`,
+			stmt: nil,
+			err: "Expect field reference type for 1 parameter of function mqtt.",
+		},
+
+		{
+			s: `SELECT mqtt(topic1) FROM tbl`,
+			stmt: nil,
+			err: "Parameter of mqtt function can be only topic or messageid.",
+		},
+
+		{
+			s: `SELECT split_value(topic1) FROM tbl`,
+			stmt: nil,
+			err: "the arguments for split_value should be 3",
+		},
+
+		{
+			s: `SELECT split_value(topic1, 3, 1) FROM tbl`,
+			stmt: nil,
+			err: "Expect string type for 2 parameter of function split_value.",
+		},
+
+		{
+			s: `SELECT split_value(topic1, "hello", -1) FROM tbl`,
+			stmt: nil,
+			err: "The index should not be a nagtive integer.",
+		},
+
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 7 - 0
xsql/funcs_misc.go

@@ -201,7 +201,14 @@ func otherCall(name string, args []interface{}) (interface{}, bool) {
 		}
 	case "timestamp":
 		return common.TimeToUnixMilli(time.Now()), true
+	case "mqtt":
+		if v, ok := args[0].(string); ok {
+			return v, true
+		}
+		return nil, false
 	default:
 		return fmt.Errorf("unknown function name %s", name), false
 	}
 }
+
+

+ 35 - 0
xsql/funcs_rewriter.go

@@ -0,0 +1,35 @@
+package xsql
+
+import (
+	"strings"
+)
+
+const INTERNAL_MQTT_TOPIC_KEY string = "internal_mqtt_topic_key_$$"
+const INTERNAL_MQTT_MSG_ID_KEY string = "internal_mqtt_msg_id_key_$$"
+
+
+//For functions such as mqtt(topic). If the field definitions also has a field named "topic", then it need to
+//have an internal key for "topic" to avoid key conflicts.
+var SpecialKeyMapper = map[string]string{"topic" : INTERNAL_MQTT_TOPIC_KEY, "messageid" : INTERNAL_MQTT_MSG_ID_KEY}
+func AddSpecialKeyMap(left, right string) {
+	SpecialKeyMapper[left] = right
+}
+
+/**
+The function is used for re-write the parameter names.
+For example, for mqtt function, the arguments could be 'topic' or 'messageid'.
+If the field name defined in stream happens to be 'topic' or 'messageid', it will have conflicts.
+ */
+func (c Call) rewrite_func() *Call {
+	if strings.ToLower(c.Name) == "mqtt" {
+		if f, ok := c.Args[0].(*FieldRef); ok {
+			if n, ok1 := SpecialKeyMapper[f.Name]; ok1 {
+				f.Name = n
+				c.Args[0] = f
+			}
+		}
+	}
+	return &c
+}
+
+

+ 9 - 0
xsql/funcs_str.go

@@ -102,6 +102,15 @@ func strCall(name string, args []interface{}) (interface{}, bool) {
 	case "startswith":
 		arg0, arg1 := common.ToString(args[0]), common.ToString(args[1])
 		return strings.HasPrefix(arg0, arg1), true
+	case "split_value":
+		arg0, arg1 := common.ToString(args[0]), common.ToString(args[1])
+		ss := strings.Split(arg0, arg1)
+		v, _ := common.ToInt(args[2])
+		if v > (len(ss) - 1) {
+			return fmt.Errorf("%d out of index array (size = %d)", v, ss), false
+		} else {
+			return ss[v], true
+		}
 	case "trim":
 		arg0 := common.ToString(args[0])
 		return strings.TrimSpace(arg0), true

+ 2 - 2
xsql/functions.go

@@ -35,7 +35,7 @@ var strFuncMap = map[string]string{"concat": "",
 	"length":   "", "lower": "", "lpad": "", "ltrim": "",
 	"numbytes":       "",
 	"regexp_matches": "", "regexp_replace": "", "regexp_substr": "", "rpad": "", "rtrim": "",
-	"substring": "", "startswith": "",
+	"substring": "", "startswith": "", "split_value": "",
 	"trim":  "",
 	"upper": "",
 }
@@ -50,7 +50,7 @@ var hashFuncMap = map[string]string{ "md5": "",
 }
 
 var otherFuncMap = map[string]string{"isNull": "",
-	"newuuid": "", "timestamp": "",
+	"newuuid": "", "timestamp": "", "mqtt": "",
 }
 
 func (*FunctionValuer) Call(name string, args []interface{}) (interface{}, bool) {

+ 3 - 3
xsql/parser.go

@@ -592,13 +592,13 @@ func (p *Parser) parseCall(name string) (Expr, error) {
 	var args []Expr
 	for {
 		if tok, _ := p.scanIgnoreWhitespace(); tok == RPAREN {
-			return &Call{Name: name, Args: args}, nil
+			return Call{Name: name, Args: args}.rewrite_func(), nil
 		} else if tok == ASTERISK {
 			if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 != RPAREN {
 				return nil, fmt.Errorf("found %q, expected right paren.", lit2)
 			} else {
 				args = append(args, &StringLiteral{Val:"*"})
-				return &Call{Name: name, Args: args}, nil
+				return Call{Name: name, Args: args}.rewrite_func(), nil
 			}
 		} else {
 			p.unscan()
@@ -623,7 +623,7 @@ func (p *Parser) parseCall(name string) (Expr, error) {
 		if valErr := validateFuncs(name, args); valErr != nil {
 			return nil, valErr
 		}
-		return &Call{Name: name, Args: args}, nil
+		return Call{Name: name, Args: args}.rewrite_func(), nil
 	} else {
 		if error != nil {
 			return nil, error

+ 33 - 0
xsql/plans/join_test.go

@@ -561,6 +561,39 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 			},
 		},
 
+		{
+			sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 left join src2 on src1.id1 = src2.id2",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter:"src1",
+					Tuples:[]xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{ "id1" : 1, "f1" : "v1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type1/device001"},
+						},
+					},
+				},
+
+				xsql.WindowTuples{
+					Emitter:"src2",
+					Tuples:[]xsql.Tuple{
+						{
+							Emitter: "src2",
+							Message: xsql.Message{ "id2" : 1, "f2" : "w1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type2/device001" },
+						},
+					},
+				},
+			},
+			result: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{ "id1" : 1, "f1" : "v1" , xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type1/device001"},},
+						{Emitter: "src2", Message: xsql.Message{ "id2" : 1, "f2" : "w1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type2/device001" },},
+					},
+				},
+			},
+		},
+
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 78 - 1
xsql/plans/misc_func_test.go

@@ -85,6 +85,83 @@ func TestHashFunc_Apply1(t *testing.T) {
 				"a": strings.ToLower("07E547D9586F6A73F73FBAC0435ED76951218FB7D0C8D788A309D785436BBB642E93A252A954F23912547D1E8A3B5ED6E1BFD7097821233FA0538F3DB854FEE6"),
 			}},
 		},
+
+		{
+			sql: "SELECT mqtt(topic) AS a FROM test",
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					xsql.INTERNAL_MQTT_TOPIC_KEY : "devices/device_001/message",
+				},
+			},
+			result: []map[string]interface{}{{
+				"a": "devices/device_001/message",
+			}},
+		},
+
+		{
+			sql: "SELECT mqtt(topic) AS a FROM test",
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					xsql.INTERNAL_MQTT_TOPIC_KEY : "devices/device_001/message",
+				},
+			},
+			result: []map[string]interface{}{{
+				"a": "devices/device_001/message",
+			}},
+		},
+
+
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
+		if err != nil || stmt == nil {
+			t.Errorf("parse sql %s error %v", tt.sql, err)
+		}
+		pp := &ProjectPlan{Fields:stmt.Fields}
+		result := pp.Apply(nil, tt.data)
+		var mapRes []map[string]interface{}
+		if v, ok := result.([]byte); ok {
+			err := json.Unmarshal(v, &mapRes)
+			if err != nil {
+				t.Errorf("Failed to parse the input into map.\n")
+				continue
+			}
+			//fmt.Printf("%t\n", mapRes["rengine_field_0"])
+
+			if !reflect.DeepEqual(tt.result, mapRes) {
+				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
+			}
+		} else {
+			t.Errorf("The returned result is not type of []byte\n")
+		}
+	}
+}
+func TestMqttFunc_Apply2(t *testing.T) {
+	var tests = []struct {
+		sql  string
+		data xsql.JoinTupleSets
+		result []map[string]interface{}
+	}{
+		{
+			sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 LEFT JOIN src2 ON src1.id1 = src2.id1",
+			data: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "src1", Message: xsql.Message{ "id1" : "1", "f1" : "v1" , xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type1/device001"},},
+						{Emitter: "src2", Message: xsql.Message{ "id2" : "1", "f2" : "w1", xsql.INTERNAL_MQTT_TOPIC_KEY: "devices/type2/device001" },},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"id1": "1",
+				"a": "devices/type1/device001",
+				"b": "devices/type2/device001",
+			}},
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -111,4 +188,4 @@ func TestHashFunc_Apply1(t *testing.T) {
 			t.Errorf("The returned result is not type of []byte\n")
 		}
 	}
-}
+}

+ 8 - 0
xsql/plans/preprocessor.go

@@ -55,6 +55,14 @@ func (p *Preprocessor) Apply(ctx context.Context, data interface{}) interface{}
 			return nil
 		}
 	}
+
+	//Add the value of special keys
+	for _, v := range xsql.SpecialKeyMapper {
+		if v1, ok := tuple.Message[v]; ok {
+			result[v] = v1
+		}
+	}
+
 	tuple.Message = result
 	if p.isEventTime{
 		if t, ok := result[p.timestampField]; ok{

+ 64 - 0
xsql/plans/str_func_test.go

@@ -353,6 +353,70 @@ func TestStrFunc_Apply1(t *testing.T) {
 				"a": "NYCNICKS",
 			}},
 		},
+
+		{
+			sql: `SELECT split_value(a,"/",0) AS a FROM test1`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a" : "test/device001/message",
+				},
+			},
+			result: []map[string]interface{}{{
+				"a": "test",
+			}},
+		},
+
+		{
+			sql: `SELECT split_value(a,"/",1) AS a FROM test1`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a" : "test/device001/message",
+				},
+			},
+			result: []map[string]interface{}{{
+				"a": "device001",
+			}},
+		},
+
+		{
+			sql: `SELECT split_value(a,"/",2) AS a FROM test1`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a" : "test/device001/message",
+				},
+			},
+			result: []map[string]interface{}{{
+				"a": "message",
+			}},
+		},
+
+		{
+			sql: `SELECT split_value(a,"/",0) AS a, split_value(a,"/",3) AS b FROM test1`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a" : "/test/device001/message",
+				},
+			},
+			result: []map[string]interface{}{{
+				"a": "",
+				"b": "message",
+			}},
+		},
+
+		{
+			sql: `SELECT split_value(a,"/",3) AS a FROM test1`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a" : "test/device001/message",
+				},
+			},
+			result: []map[string]interface{}{map[string]interface {}{}},
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 51 - 18
xstream/extensions/mqtt_source.go

@@ -10,6 +10,8 @@ import (
 	"github.com/go-yaml/yaml"
 	"github.com/google/uuid"
 	"os"
+	"strconv"
+	"strings"
 	"time"
 )
 
@@ -17,6 +19,10 @@ type MQTTSource struct {
 	srv      string
 	tpc      string
 	clientid string
+	pVersion uint
+	uName 	 string
+	password string
+
 	schema   map[string]interface{}
 
 	outs  map[string]chan<- interface{}
@@ -31,9 +37,11 @@ type MQTTConfig struct {
 	Sharedsubscription string `yaml:"sharedsubscription"`
 	Servers []string `yaml:"servers"`
 	Clientid string `yaml:"clientid"`
+	PVersion string `yaml:"protocolVersion"`
+	Uname string `yaml:"username"`
+	Password string `yaml:"password"`
 }
 
-
 const confName string = "mqtt_source.yaml"
 
 func NewWithName(name string, topic string, confKey string) (*MQTTSource, error) {
@@ -63,6 +71,23 @@ func NewWithName(name string, topic string, confKey string) (*MQTTSource, error)
 	} else {
 		ms.clientid = cfg["default"].Clientid
 	}
+
+	var pversion uint = 3
+	if pv := cfg[confKey].PVersion; pv != "" {
+		if pv == "3.1.1" {
+			pversion = 4
+		}
+	}
+	ms.pVersion = pversion
+
+	if uname := cfg[confKey].Uname; uname != "" {
+		ms.uName = strings.Trim(uname, " ")
+	}
+
+	if password := cfg[confKey].Password; password != "" {
+		ms.password = strings.Trim(password, " ")
+	}
+
 	return ms, nil
 }
 
@@ -94,7 +119,7 @@ func (ms *MQTTSource) Open(ctx context.Context) error {
 	log := common.GetLogger(ctx)
 	go func() {
 		exeCtx, cancel := context.WithCancel(ctx)
-		opts := MQTT.NewClientOptions().AddBroker(ms.srv)
+		opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
 
 		if ms.clientid == "" {
 			if uuid, err := uuid.NewUUID(); err != nil {
@@ -108,24 +133,32 @@ func (ms *MQTTSource) Open(ctx context.Context) error {
 			opts.SetClientID(ms.clientid)
 		}
 
+		if ms.uName != "" {
+			opts.SetUsername(ms.uName)
+		}
+
+		if ms.password != "" {
+			opts.SetPassword(ms.password)
+		}
+
+
 		h := func(client MQTT.Client, msg MQTT.Message) {
-			if ms.tpc != msg.Topic() {
+			log.Infof("received %s", msg.Payload())
+
+			result := make(map[string]interface{})
+			//The unmarshal type can only be bool, float64, string, []interface{}, map[string]interface{}, nil
+			if e := json.Unmarshal(msg.Payload(), &result); e != nil {
+				log.Errorf("Invalid data format, cannot convert %s into JSON with error %s", string(msg.Payload()), e)
 				return
-			} else {
-				log.Infof("received %s", msg.Payload())
-
-				result := make(map[string]interface{})
-				//The unmarshal type can only be bool, float64, string, []interface{}, map[string]interface{}, nil
-				if e := json.Unmarshal(msg.Payload(), &result); e != nil {
-					log.Errorf("Invalid data format, cannot convert %s into JSON with error %s", string(msg.Payload()), e)
-					return
-				}
-				//Convert the keys to lowercase
-				result = xsql.LowercaseKeyMap(result)
-				tuple := &xsql.Tuple{Emitter: ms.tpc, Message:result, Timestamp: common.TimeToUnixMilli(time.Now())}
-				for _, out := range ms.outs{
-					out <- tuple
-				}
+			}
+			//Convert the keys to lowercase
+			result = xsql.LowercaseKeyMap(result)
+			result[xsql.INTERNAL_MQTT_TOPIC_KEY] = msg.Topic()
+			result[xsql.INTERNAL_MQTT_MSG_ID_KEY] = strconv.Itoa(int(msg.MessageID()))
+
+			tuple := &xsql.Tuple{Emitter: ms.tpc, Message:result, Timestamp: common.TimeToUnixMilli(time.Now())}
+			for _, out := range ms.outs{
+				out <- tuple
 			}
 		}
 

+ 44 - 2
xstream/sinks/mqtt_sink.go

@@ -6,12 +6,16 @@ import (
 	"fmt"
 	MQTT "github.com/eclipse/paho.mqtt.golang"
 	"github.com/google/uuid"
+	"strings"
 )
 
 type MQTTSink struct {
 	srv      string
 	tpc      string
 	clientid string
+	pVersion uint
+	uName 	string
+	password string
 
 	input chan interface{}
 	conn MQTT.Client
@@ -41,7 +45,38 @@ func NewMqttSink(name string, ruleId string, properties interface{}) (*MQTTSink,
 			clientid = uuid.String()
 		}
 	}
-	ms := &MQTTSink{name:name, ruleId: ruleId, input: make(chan interface{}), srv: srv.(string), tpc: tpc.(string), clientid: clientid.(string)}
+	var pVersion uint = 3
+	pVersionStr, ok := ps["protocol_version"];
+	if ok {
+		v, _ := pVersionStr.(string)
+		if v == "3.1" {
+			pVersion = 3
+		} else if v == "3.1.1" {
+			pVersion = 4
+		} else {
+			return nil, fmt.Errorf("Unknown protocol version {0}, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4).", pVersionStr)
+		}
+	}
+
+	uName := ""
+	un, ok := ps["username"];
+	if ok {
+		v, _ := un.(string)
+		if strings.Trim(v, " ") != "" {
+			uName = v
+		}
+	}
+
+	password := ""
+	pwd, ok := ps["password"];
+	if ok {
+		v, _ := pwd.(string)
+		if strings.Trim(v, " ") != "" {
+			password = v
+		}
+	}
+
+	ms := &MQTTSink{name:name, ruleId: ruleId, input: make(chan interface{}), srv: srv.(string), tpc: tpc.(string), clientid: clientid.(string), pVersion:pVersion, uName:uName, password:password}
 	return ms, nil
 }
 
@@ -59,7 +94,14 @@ func (ms *MQTTSink) Open(ctx context.Context, result chan<- error) {
 
 	go func() {
 		exeCtx, cancel := context.WithCancel(ctx)
-		opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
+		opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid).SetProtocolVersion(ms.pVersion)
+		if ms.uName != "" {
+			opts = opts.SetUsername(ms.uName)
+		}
+
+		if ms.password != "" {
+			opts = opts.SetPassword(ms.password)
+		}
 
 		c := MQTT.NewClient(opts)
 		if token := c.Connect(); token.Wait() && token.Error() != nil {