Jelajahi Sumber

bug(status): correct rule status and source config reload

ngjaying 5 tahun lalu
induk
melakukan
e4b4b5eb52

+ 2 - 35
xsql/processors/xsql_processor.go

@@ -14,7 +14,6 @@ import (
 	"engine/xstream/operators"
 	"engine/xstream/sinks"
 	"fmt"
-	"github.com/go-yaml/yaml"
 	"path"
 	"strings"
 )
@@ -245,7 +244,7 @@ func (p *RuleProcessor) ExecQuery(ruleid, sql string) (*xstream.TopologyNew, err
 		go func() {
 			select {
 			case err := <-tp.Open():
-				log.Println(err)
+				log.Infof("closing query for error: %v", err)
 				tp.Cancel()
 			}
 		}()
@@ -373,7 +372,7 @@ func (p *RuleProcessor) createTopoWithSources(rule *api.Rule, sources []*nodes.S
 					if err != nil {
 						return nil, nil, fmt.Errorf("fail to get source: %v", err)
 					}
-					node := nodes.NewSourceNode(s, src)
+					node := nodes.NewSourceNode(s, src, streamStmt.Options)
 					tp.AddSrc(node)
 					preprocessorOp := xstream.Transform(pp, "preprocessor_"+s)
 					tp.AddOperator([]api.Emitter{node}, preprocessorOp)
@@ -456,7 +455,6 @@ func getSource(streamStmt *xsql.StreamStmt) (api.Source, error) {
 	switch t {
 	case "mqtt":
 		s = &extensions.MQTTSource{}
-		log.Debugf("Source mqtt created")
 	default:
 		nf, err := plugin_manager.GetPlugin(t, "sources")
 		if err != nil {
@@ -467,41 +465,10 @@ func getSource(streamStmt *xsql.StreamStmt) (api.Source, error) {
 			return nil, fmt.Errorf("exported symbol %s is not type of api.Source", t)
 		}
 	}
-	props := getConf(t, streamStmt.Options["CONF_KEY"])
-	err := s.Configure(streamStmt.Options["DATASOURCE"], props)
-	if err != nil{
-		return nil, err
-	}
 	log.Debugf("Source %s created", t)
 	return s, nil
 }
 
-func getConf(t string, confkey string) map[string]interface{} {
-	conf, err := common.LoadConf("sources/" + t + ".yaml")
-	props := make(map[string]interface{})
-	if err == nil {
-		cfg := make(map[string]map[string]interface{})
-		if err := yaml.Unmarshal(conf, &cfg); err != nil {
-			log.Warnf("fail to parse yaml for source %s. Return an empty configuration", t)
-		} else {
-			var ok bool
-			props, ok = cfg["default"]
-			if !ok {
-				log.Warnf("default conf is not found", confkey)
-			}
-			if c, ok := cfg[confkey]; ok {
-				for k, v := range c {
-					props[k] = v
-				}
-			}
-		}
-	} else {
-		log.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", t)
-	}
-	log.Debugf("get conf for %s with conf key %s: %v", t, confkey, props)
-	return props
-}
-
 func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 	log.Tracef("trying to get sink %s with action %v", name, action)
 	var s api.Sink

+ 10 - 12
xsql/processors/xsql_processor_test.go

@@ -32,11 +32,11 @@ func TestStreamCreateProcessor(t *testing.T) {
 	}{
 		{
 			s: `SHOW STREAMS;`,
-			r: []string{"no stream definition found"},
+			r: []string{"No stream definition found."},
 		},
 		{
 			s: `EXPLAIN STREAM topic1;`,
-			err: "stream topic1 not found",
+			err: "stream topic1 is not found.",
 		},
 		{
 			s: `CREATE STREAM topic1 (
@@ -47,13 +47,7 @@ func TestStreamCreateProcessor(t *testing.T) {
 					Gender BOOLEAN,
 					ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
 				) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
-			r: []string{"stream topic1 created"},
-		},
-		{
-			s: `CREATE STREAM topic1 (
-					USERID BIGINT,
-				) WITH (DATASOURCE="users", FORMAT="AVRO", KEY="USERID");`,
-			err: "key topic1 already exist, delete it before creating a new one",
+			r: []string{"Stream topic1 is created."},
 		},
 		{
 			s: `EXPLAIN STREAM topic1;`,
@@ -75,7 +69,7 @@ func TestStreamCreateProcessor(t *testing.T) {
 		},
 		{
 			s: `DESCRIBE STREAM topic1;`,
-			err: "stream topic1 not found",
+			err: "Stream topic1 is not found.",
 		},
 		{
 			s: `DROP STREAM topic1;`,
@@ -347,7 +341,9 @@ func getMockSource(name string, done chan<- struct{}, size int) *nodes.SourceNod
 			},
 		}
 	}
-	return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, false))
+	return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, false),  map[string]string{
+		"DATASOURCE": name,
+	})
 }
 
 func TestSingleSQL(t *testing.T) {
@@ -1011,7 +1007,9 @@ func getEventMockSource(name string, done chan<- struct{}, size int) *nodes.Sour
 			},
 		}
 	}
-	return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, true))
+	return nodes.NewSourceNode(name, test.NewMockSource(data[:size], done, true), map[string]string{
+		"DATASOURCE": name,
+	})
 }
 
 func TestEventWindow(t *testing.T) {

+ 7 - 2
xstream/nodes/sink_node.go

@@ -26,7 +26,12 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 	logger.Debugf("open sink node %s", m.name)
 	go func() {
 		if err := m.sink.Open(ctx); err != nil{
-			go func() { result <- err }()
+			go func() {
+				select{
+				case result <- err:
+				case <-ctx.Done():
+				}
+			}()
 			return
 		}
 		for {
@@ -39,7 +44,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 			case <-ctx.Done():
 				logger.Infof("sink node %s done", m.name)
 				if err := m.sink.Close(ctx); err != nil{
-					go func() { result <- err }()
+					logger.Warnf("close sink node %s fails: %v", m.name, err)
 				}
 				return
 			}

+ 60 - 14
xstream/nodes/source_node.go

@@ -5,6 +5,8 @@ import (
 	"engine/xsql"
 	"engine/xstream/api"
 	"fmt"
+	"github.com/go-yaml/yaml"
+	"github.com/sirupsen/logrus"
 )
 
 type SourceNode struct {
@@ -12,13 +14,15 @@ type SourceNode struct {
 	outs   map[string]chan<- interface{}
 	name   string
 	ctx    api.StreamContext
+	options map[string]string
 }
 
-func NewSourceNode(name string, source api.Source) *SourceNode{
+func NewSourceNode(name string, source api.Source, options map[string]string) *SourceNode {
 	return &SourceNode{
 		source: source,
 		outs: make(map[string]chan<- interface{}),
 		name: name,
+		options: options,
 		ctx: nil,
 	}
 }
@@ -26,27 +30,28 @@ func NewSourceNode(name string, source api.Source) *SourceNode{
 func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 	m.ctx = ctx
 	logger := ctx.GetLogger()
-	logger.Debugf("open source node %s", m.name)
+	logger.Debugf("open source node %s with option %v", m.name, m.options)
 	go func(){
-		if err := m.source.Open(ctx, func(message map[string]interface{}, meta map[string]interface{}){
-			tuple := &xsql.Tuple{Emitter: m.name, Message:message, Timestamp: common.GetNowInMilli(), Metadata:meta}
+		props := getConf(m.options["TYPE"], m.options["CONF_KEY"], ctx)
+		err := m.source.Configure(m.options["DATASOURCE"], props)
+		if err != nil{
+			m.drainError(errCh, err, ctx, logger)
+			return
+		}
+		if err := m.source.Open(ctx, func(message map[string]interface{}, meta map[string]interface{}) {
+			tuple := &xsql.Tuple{Emitter: m.name, Message: message, Timestamp: common.GetNowInMilli(), Metadata: meta}
 			m.Broadcast(tuple)
 			logger.Debugf("%s consume data %v complete", m.name, tuple)
-		}); err != nil{
-			select {
-			case errCh <- err:
-			case <-ctx.Done():
-				if err := m.source.Close(ctx); err != nil{
-					go func() { errCh <- err }()
-				}
-			}
+		}); err != nil {
+			m.drainError(errCh, err, ctx, logger)
+			return
 		}
 		for {
 			select {
 			case <-ctx.Done():
 				logger.Infof("source %s done", m.name)
-				if err := m.source.Close(ctx); err != nil{
-					go func() { errCh <- err }()
+				if err := m.source.Close(ctx); err != nil {
+					logger.Warnf("close source fails: %v", err)
 				}
 				return
 			}
@@ -54,6 +59,47 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 	}()
 }
 
+func (m *SourceNode) drainError(errCh chan<- error, err error, ctx api.StreamContext, logger *logrus.Entry) {
+	select {
+	case errCh <- err:
+	case <-ctx.Done():
+		if err := m.source.Close(ctx); err != nil {
+			logger.Warnf("close source fails: %v", err)
+		}
+	}
+	return
+}
+
+func getConf(t string, confkey string, ctx api.StreamContext) map[string]interface{} {
+	logger := ctx.GetLogger()
+	if t == ""{
+		t = "mqtt"
+	}
+	conf, err := common.LoadConf("sources/" + t + ".yaml")
+	props := make(map[string]interface{})
+	if err == nil {
+		cfg := make(map[string]map[string]interface{})
+		if err := yaml.Unmarshal(conf, &cfg); err != nil {
+			logger.Warnf("fail to parse yaml for source %s. Return an empty configuration", t)
+		} else {
+			var ok bool
+			props, ok = cfg["default"]
+			if !ok {
+				logger.Warnf("default conf is not found", confkey)
+			}
+			if c, ok := cfg[confkey]; ok {
+				for k, v := range c {
+					props[k] = v
+				}
+			}
+		}
+	} else {
+		logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", t)
+	}
+	logger.Debugf("get conf for %s with conf key %s: %v", t, confkey, props)
+	return props
+}
+
 func (m *SourceNode) Broadcast(data interface{}) int{
 	return Broadcast(m.outs, data, m.ctx)
 }

+ 2 - 4
xstream/server/main.go

@@ -36,9 +36,7 @@ func (t *Server) CreateQuery(sql string, reply *string) error {
 	}
 	tp, err := processors.NewRuleProcessor(path.Dir(dataDir)).ExecQuery(QUERY_RULE_ID, sql)
 	if err != nil {
-		msg := fmt.Sprintf("Failed to create query: %s.", err)
-		log.Println(msg)
-		return fmt.Errorf(msg)
+		return fmt.Errorf("failed to create query: %s", err)
 	} else {
 		rs := &RuleState{Name: QUERY_RULE_ID, Topology: tp, Triggered: true}
 		registry[QUERY_RULE_ID] = rs
@@ -174,7 +172,7 @@ func (t *Server) doStartRule(rs *RuleState) error{
 		tp := rs.Topology
 		select {
 		case err := <-tp.Open():
-			log.Println(err)
+			log.Printf("closing rule %s for error: %v", rs.Name, err)
 			tp.Cancel()
 		}
 	}()

+ 7 - 16
xstream/streams.go

@@ -20,7 +20,10 @@ type TopologyNew struct {
 }
 
 func NewWithName(name string) *TopologyNew {
-	tp := &TopologyNew{name: name}
+	tp := &TopologyNew{
+		name: name,
+		drain: make(chan error),
+	}
 	return tp
 }
 
@@ -111,31 +114,19 @@ func (s *TopologyNew) Open() <-chan error {
 
 	// open stream
 	go func() {
-		streamErr := make(chan error)
-		defer func() {
-			log.Println("Closing streamErr channel")
-			close(streamErr)
-		}()
 		// open stream sink, after log sink is ready.
 		for _, snk := range s.sinks{
-			snk.Open(s.ctx.WithMeta(s.name, snk.GetName()), streamErr)
+			snk.Open(s.ctx.WithMeta(s.name, snk.GetName()), s.drain)
 		}
 
 		//apply operators, if err bail
 		for _, op := range s.ops {
-			op.Exec(s.ctx.WithMeta(s.name, op.GetName()), streamErr)
+			op.Exec(s.ctx.WithMeta(s.name, op.GetName()), s.drain)
 		}
 
 		// open source, if err bail
 		for _, node := range s.sources{
-			node.Open(s.ctx.WithMeta(s.name, node.GetName()), streamErr)
-		}
-
-		select {
-		case err := <-streamErr:
-			//TODO error handling
-			log.Println("Closing stream")
-			s.drain <- err
+			node.Open(s.ctx.WithMeta(s.name, node.GetName()), s.drain)
 		}
 	}()