浏览代码

add cert & key for source

RockyJin 5 年之前
父节点
当前提交
fcddcf6ea7
共有 3 个文件被更改,包括 33 次插入33 次删除
  1. 24 24
      xsql/processors/xsql_processor.go
  2. 4 4
      xstream/extensions/mqtt_source.go
  3. 5 5
      xstream/sinks/mqtt_sink.go

+ 24 - 24
xsql/processors/xsql_processor.go

@@ -78,14 +78,14 @@ func (p *StreamProcessor) execCreateStream(stmt *xsql.StreamStmt, db *badger.DB)
 	if err != nil {
 		return "", err
 	}else{
-		return fmt.Sprintf("Stream %s is created.\n", stmt.Name), nil
+		return fmt.Sprintf("Stream %s is created.", stmt.Name), nil
 	}
 }
 
 func (p *StreamProcessor) execShowStream(stmt *xsql.ShowStreamsStatement, db *badger.DB) ([]string,error) {
 	keys, err := common.DbKeys(db)
 	if len(keys) == 0 {
-		keys = append(keys, "No stream definitions are found.\n")
+		keys = append(keys, "No stream definitions are found.")
 	}
 	return keys, err
 }
@@ -93,14 +93,14 @@ func (p *StreamProcessor) execShowStream(stmt *xsql.ShowStreamsStatement, db *ba
 func (p *StreamProcessor) execDescribeStream(stmt *xsql.DescribeStreamStatement, db *badger.DB) (string,error) {
 	s, err := common.DbGet(db, string(stmt.Name))
 	if err != nil {
-		return "", fmt.Errorf("Stream %s is not found.\n", stmt.Name)
+		return "", fmt.Errorf("Stream %s is not found.", stmt.Name)
 	}
 
 	parser := xsql.NewParser(strings.NewReader(s))
 	stream, err := xsql.Language.Parse(parser)
 	streamStmt, ok := stream.(*xsql.StreamStmt)
 	if !ok{
-		return "", fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.\n", stmt.Name)
+		return "", fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.", stmt.Name)
 	}
 	var buff bytes.Buffer
 	buff.WriteString("Fields\n--------------------------------------------------------------------------------\n")
@@ -117,7 +117,7 @@ func (p *StreamProcessor) execDescribeStream(stmt *xsql.DescribeStreamStatement,
 func (p *StreamProcessor) execExplainStream(stmt *xsql.ExplainStreamStatement, db *badger.DB) (string,error) {
 	_, err := common.DbGet(db, string(stmt.Name))
 	if err != nil{
-		return "", fmt.Errorf("Stream %s is not found.\n", stmt.Name)
+		return "", fmt.Errorf("Stream %s is not found.", stmt.Name)
 	}
 	return "TO BE SUPPORTED", nil
 }
@@ -127,7 +127,7 @@ func (p *StreamProcessor) execDropStream(stmt *xsql.DropStreamStatement, db *bad
 	if err != nil {
 		return "", err
 	}else{
-		return fmt.Sprintf("Stream %s is dropped.\n", stmt.Name), nil
+		return fmt.Sprintf("Stream %s is dropped.", stmt.Name), nil
 	}
 }
 
@@ -141,7 +141,7 @@ func GetStream(db *badger.DB, name string) (stmt *xsql.StreamStmt, err error){
 	stream, err := xsql.Language.Parse(parser)
 	stmt, ok := stream.(*xsql.StreamStmt)
 	if !ok{
-		err = fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.\n", name)
+		err = fmt.Errorf("Error resolving the stream %s, the data in db may be corrupted.", name)
 	}
 	return
 }
@@ -172,7 +172,7 @@ func (p *RuleProcessor) ExecCreate(name, ruleJson string) (*api.Rule, error) {
 		common.DbClose(db)
 		return nil, err
 	}else{
-		log.Infof("Rule %s is created.\n", name)
+		log.Infof("Rule %s is created.", name)
 		common.DbClose(db)
 	}
 	return rule, nil
@@ -186,7 +186,7 @@ func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
 	defer common.DbClose(db)
 	s, err := common.DbGet(db, string(name))
 	if err != nil {
-		return nil, fmt.Errorf("Rule %s is not found.\n", name)
+		return nil, fmt.Errorf("Rule %s is not found.", name)
 	}
 	return p.getRuleByJson(name, s)
 }
@@ -194,18 +194,18 @@ func (p *RuleProcessor) GetRuleByName(name string) (*api.Rule, error) {
 func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error) {
 	var rule api.Rule
 	if err := json.Unmarshal([]byte(ruleJson), &rule); err != nil {
-		return nil, fmt.Errorf("Parse rule %s error : %s.\n", ruleJson, err)
+		return nil, fmt.Errorf("Parse rule %s error : %s.", ruleJson, err)
 	}
 	rule.Id = name
 	//validation
 	if name == ""{
-		return nil, fmt.Errorf("Missing rule id.\n")
+		return nil, fmt.Errorf("Missing rule id.")
 	}
 	if rule.Sql == ""{
-		return nil, fmt.Errorf("Missing rule SQL.\n")
+		return nil, fmt.Errorf("Missing rule SQL.")
 	}
 	if rule.Actions == nil || len(rule.Actions) == 0{
-		return nil, fmt.Errorf("Missing rule actions.\n")
+		return nil, fmt.Errorf("Missing rule actions.")
 	}
 	return &rule, nil
 }
@@ -218,17 +218,17 @@ func (p *RuleProcessor) ExecInitRule(rule *api.Rule) (*xstream.TopologyNew, erro
 			for name, action := range m {
 				switch name {
 				case "log":
-					log.Printf("Create log sink with %s.\n", action)
+					log.Printf("Create log sink with %s.", action)
 					tp.AddSink(inputs, nodes.NewSinkNode("sink_log", sinks.NewLogSink()))
 				case "mqtt":
-					log.Printf("Create mqtt sink with %s.\n", action)
+					log.Printf("Create mqtt sink with %s.", action)
 					if ms, err := sinks.NewMqttSink(action); err != nil{
 						return nil, err
 					}else{
 						tp.AddSink(inputs, nodes.NewSinkNode("sink_mqtt", ms))
 					}
 				default:
-					return nil, fmt.Errorf("unsupported action: %s.\n", name)
+					return nil, fmt.Errorf("unsupported action: %s.", name)
 				}
 			}
 		}
@@ -260,7 +260,7 @@ func (p *RuleProcessor) ExecDesc(name string) (string, error) {
 	defer common.DbClose(db)
 	s, err := common.DbGet(db, string(name))
 	if err != nil {
-		return "", fmt.Errorf("Rule %s is not found.\n", name)
+		return "", fmt.Errorf("Rule %s is not found.", name)
 	}
 	dst := &bytes.Buffer{}
 	if err := json.Indent(dst, []byte(s), "", "  "); err != nil {
@@ -276,7 +276,7 @@ func (p *RuleProcessor) ExecShow() (string, error) {
 		return "", err
 	}
 	if len(keys) == 0 {
-		keys = append(keys, "No rule definitions are found.\n")
+		keys = append(keys, "No rule definitions are found.")
 	}
 	var result string
 	for _, c := range keys{
@@ -304,7 +304,7 @@ func (p *RuleProcessor) ExecDrop(name string) (string, error) {
 	if err != nil {
 		return "", err
 	}else{
-		return fmt.Sprintf("Rule %s is dropped.\n", name), nil
+		return fmt.Sprintf("Rule %s is dropped.", name), nil
 	}
 }
 
@@ -321,7 +321,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 	if iet, ok := rule.Options["isEventTime"]; ok{
 		isEventTime, ok = iet.(bool)
 		if !ok{
-			return nil, nil, fmt.Errorf("Invalid rule option isEventTime %v, bool type is required.\n", iet)
+			return nil, nil, fmt.Errorf("Invalid rule option isEventTime %v, bool type is required.", iet)
 		}
 	}
 	if isEventTime {
@@ -329,23 +329,23 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 			if fl, ok := l.(float64); ok{
 				lateTol = int64(fl)
 			}else{
-				return nil, nil, fmt.Errorf("Invalid rule option lateTolerance %v, int type is required.\n", l)
+				return nil, nil, fmt.Errorf("Invalid rule option lateTolerance %v, int type is required.", l)
 			}
 		}
 	}
 	shouldCreateSource := sources == nil
 	parser := xsql.NewParser(strings.NewReader(sql))
 	if stmt, err := xsql.Language.Parse(parser); err != nil{
-		return nil, nil, fmt.Errorf("Parse SQL %s error: %s.\n", sql , err)
+		return nil, nil, fmt.Errorf("Parse SQL %s error: %s.", sql , err)
 	}else {
 		if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
-			return nil, nil, fmt.Errorf("SQL %s is not a select statement.\n", sql)
+			return nil, nil, fmt.Errorf("SQL %s is not a select statement.", sql)
 		} else {
 			tp := xstream.NewWithName(name)
 			var inputs []api.Emitter
 			streamsFromStmt := xsql.GetStreams(selectStmt)
 			if !shouldCreateSource && len(streamsFromStmt) != len(sources){
-				return nil, nil, fmt.Errorf("Invalid parameter sources or streams, the length cannot match the statement, expect %d sources.\n", len(streamsFromStmt))
+				return nil, nil, fmt.Errorf("Invalid parameter sources or streams, the length cannot match the statement, expect %d sources.", len(streamsFromStmt))
 			}
 			db, err := common.DbOpen(path.Join(p.badgerDir, "stream"))
 			if err != nil {

+ 4 - 4
xstream/extensions/mqtt_source.go

@@ -131,11 +131,11 @@ func (ms *MQTTSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) error
 	}
 
 	if ms.certPath != "" || ms.pkeyPath != "" {
-		log.Printf("Connect MQTT broker with certification and keys.")
+		log.Infof("Connect MQTT broker with certification and keys.")
 		if cp, err := common.ProcessPath(ms.certPath); err == nil {
-			log.Printf("The certification file is %s.", cp)
+			log.Infof("The certification file is %s.", cp)
 			if kp, err1 := common.ProcessPath(ms.pkeyPath); err1 == nil {
-				log.Printf("The private key file is %s.", kp)
+				log.Infof("The private key file is %s.", kp)
 				if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
 					return err2
 				} else {
@@ -148,7 +148,7 @@ func (ms *MQTTSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) error
 			return err
 		}
 	} else {
-		log.Printf("Connect MQTT broker with username and password.")
+		log.Infof("Connect MQTT broker with username and password.")
 		if ms.uName != "" {
 			opts = opts.SetUsername(ms.uName)
 		}

+ 5 - 5
xstream/sinks/mqtt_sink.go

@@ -7,7 +7,6 @@ import (
 	"fmt"
 	MQTT "github.com/eclipse/paho.mqtt.golang"
 	"github.com/google/uuid"
-	"log"
 	"strings"
 )
 
@@ -97,11 +96,12 @@ func NewMqttSink(properties interface{}) (*MQTTSink, error) {
 
 
 func (ms *MQTTSink) Open(ctx api.StreamContext) error {
-	log.Printf("Opening mqtt sink for rule %s.", ctx.GetRuleId())
+	log := ctx.GetLogger()
+	log.Infof("Opening mqtt sink for rule %s.", ctx.GetRuleId())
 	opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
 
 	if ms.certPath != "" || ms.pkeyPath != "" {
-		log.Printf("Connect MQTT broker with certification and keys.")
+		log.Infof("Connect MQTT broker with certification and keys.")
 		if cp, err := common.ProcessPath(ms.certPath); err == nil {
 			if kp, err1 := common.ProcessPath(ms.pkeyPath); err1 == nil {
 				if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
@@ -116,7 +116,7 @@ func (ms *MQTTSink) Open(ctx api.StreamContext) error {
 			return err
 		}
 	} else {
-		log.Printf("Connect MQTT broker with username and password.")
+		log.Infof("Connect MQTT broker with username and password.")
 		if ms.uName != "" {
 			opts = opts.SetUsername(ms.uName)
 		}
@@ -130,7 +130,7 @@ func (ms *MQTTSink) Open(ctx api.StreamContext) error {
 	if token := c.Connect(); token.Wait() && token.Error() != nil {
 		return fmt.Errorf("Found error: %s", token.Error())
 	}
-	log.Printf("The connection to server %s was established successfully", ms.srv)
+	log.Infof("The connection to server %s was established successfully", ms.srv)
 	ms.conn = c
 	return nil
 }