Browse Source

feat(sinks): fix several problems of the REST sink
1. refactor the kuiper.yaml loading
2. fixed rest header settings
3. fixed rest cache problems - cache entries could not be deleted even after they were sent successfully

RockyJin 4 years ago
parent
commit
a00ae7031a

+ 1 - 1
common/http_util.go

@@ -70,7 +70,7 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string
 			req.Header.Set(k, v)
 		}
 	}
-	logger.Debugf("do request: %s %s with %s", method, u, req.Body)
+	logger.Debugf("do request: %#v", req)
 	return client.Do(req)
 }
 

+ 66 - 0
common/templates/funcs_test.go

@@ -0,0 +1,66 @@
+package templates
+
+import (
+	"encoding/base64"
+	"fmt"
+	"reflect"
+	"testing"
+)
+
+func TestBase64Encode(t *testing.T) {
+	var tests = []struct {
+		para   interface{}
+		expect string
+		err    string
+	}{
+		{
+			para:   1,
+			expect: "1",
+		},
+
+		{
+			para:   float32(3.14),
+			expect: "3.14",
+		},
+
+		{
+			para:   float64(3.1415),
+			expect: "3.1415",
+		},
+		{
+			para:   "hello",
+			expect: "hello",
+		},
+		{
+			para:   "{\"hello\" : 3}",
+			expect: "{\"hello\" : 3}",
+		},
+		{
+			para: map[string]interface{}{
+				"temperature": 30,
+				"humidity":    20,
+			},
+			expect: `{"humidity":20,"temperature":30}`,
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	for i, tt := range tests {
+		result, err := Base64Encode(tt.para)
+		r, _ := base64.StdEncoding.DecodeString(result)
+		if !reflect.DeepEqual(tt.err, errstring(err)) {
+			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.para, tt.err, err)
+
+		} else if tt.err == "" && !reflect.DeepEqual(tt.expect, string(r)) {
+			t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.para, tt.expect, string(r))
+		}
+	}
+}
+
+// errstring returns the string representation of an error.
+func errstring(err error) string {
+	if err != nil {
+		return err.Error()
+	}
+	return ""
+}

+ 24 - 21
common/util.go

@@ -29,7 +29,7 @@ const (
 
 var (
 	Log       *logrus.Logger
-	Config    *XStreamConf
+	Config    *KuiperConf
 	IsTesting bool
 	Clock     clock.Clock
 	logFile   *os.File
@@ -54,15 +54,21 @@ type tlsConf struct {
 	Keyfile  string `yaml:"keyfile"`
 }
 
-type XStreamConf struct {
-	Debug          bool     `yaml:"debug"`
-	ConsoleLog     bool     `yaml:"consoleLog"`
-	FileLog        bool     `yaml:"fileLog"`
-	Port           int      `yaml:"port"`
-	RestPort       int      `yaml:"restPort"`
-	RestTls        *tlsConf `yaml:"restTls"`
-	Prometheus     bool     `yaml:"prometheus"`
-	PrometheusPort int      `yaml:"prometheusPort"`
+type KuiperConf struct {
+	Basic struct {
+		Debug          bool     `yaml:"debug"`
+		ConsoleLog     bool     `yaml:"consoleLog"`
+		FileLog        bool     `yaml:"fileLog"`
+		Port           int      `yaml:"port"`
+		RestPort       int      `yaml:"restPort"`
+		RestTls        *tlsConf `yaml:"restTls"`
+		Prometheus     bool     `yaml:"prometheus"`
+		PrometheusPort int      `yaml:"prometheusPort"`
+	}
+	Sink struct {
+		CacheThreshold    int `yaml:"cacheThreshold"`
+		CacheTriggerCount int `yaml:"cacheTriggerCount"`
+	}
 }
 
 func init() {
@@ -96,18 +102,15 @@ func InitConf() {
 	if err != nil {
 		Log.Fatal(err)
 	}
-	var cfg map[string]XStreamConf
-	if err := yaml.Unmarshal(b, &cfg); err != nil {
-		Log.Fatal(err)
-	}
 
-	if c, ok := cfg["basic"]; !ok {
-		Log.Fatal("No basic config in kuiper.yaml")
+	kc := KuiperConf{}
+	if err := yaml.Unmarshal(b, &kc); err != nil {
+		Log.Fatal(err)
 	} else {
-		Config = &c
+		Config = &kc
 	}
 
-	if Config.Debug {
+	if Config.Basic.Debug {
 		Log.SetLevel(logrus.DebugLevel)
 	}
 
@@ -118,13 +121,13 @@ func InitConf() {
 	file := logDir + logFileName
 	logFile, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
 	if err == nil {
-		if Config.ConsoleLog {
-			if Config.FileLog {
+		if Config.Basic.ConsoleLog {
+			if Config.Basic.FileLog {
 				mw := io.MultiWriter(os.Stdout, logFile)
 				Log.SetOutput(mw)
 			}
 		} else {
-			if Config.FileLog {
+			if Config.Basic.FileLog {
 				Log.SetOutput(logFile)
 			}
 		}

+ 11 - 1
etc/kuiper.yaml

@@ -14,4 +14,14 @@ basic:
   #    keyfile: /var/https-server.key
   # Prometheus settings
   prometheus: false
-  prometheusPort: 20499
+  prometheusPort: 20499
+
+sink:
+  # The cache persistence threshold size. If the number of messages in the sink cache is larger than 10, persistence is triggered. If you find
+  # that the remote system is slow to respond, or the sink throughput is small, then it's recommended to increase the two configurations below.
+  # More memory is required as the two configurations below are increased.
+  # If the message count reaches the value below, then persistence is triggered.
+  cacheThreshold: 10
+  # The message persistence is triggered by a ticker, and cacheTriggerCount configures the number of ticks after which persistence is triggered
+  # regardless of whether the message count reaches cacheThreshold or not. This prevents data from never being saved because the cache never passes the threshold.
+  cacheTriggerCount: 15

+ 1 - 0
xsql/processors/xsql_processor_test.go

@@ -19,6 +19,7 @@ import (
 var DbDir = getDbDir()
 
 func getDbDir() string {
+	common.InitConf()
 	dbDir, err := common.GetAndCreateDataLoc("test")
 	if err != nil {
 		log.Panic(err)

+ 21 - 16
xstream/nodes/sink_cache.go

@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xstream/api"
-	"github.com/prometheus/common/log"
 	"io"
 	"path"
 	"sort"
@@ -50,6 +49,10 @@ func (l *LinkedQueue) clone() *LinkedQueue {
 	return result
 }
 
+func (l *LinkedQueue) String() string {
+	return fmt.Sprintf("tail: %d, data: %v", l.Tail, l.Data)
+}
+
 type Cache struct {
 	//Data and control channels
 	in       <-chan interface{}
@@ -62,14 +65,11 @@ type Cache struct {
 	key     string //the key for current cache
 	store   common.KeyValue
 	changed bool
-	saved   int
 	//configs
 	limit        int
 	saveInterval int
 }
 
-const THRESHOLD int = 10
-
 func NewCache(in <-chan interface{}, limit int, saveInterval int, errCh chan<- error, ctx api.StreamContext) *Cache {
 	c := &Cache{
 		in:       in,
@@ -100,6 +100,7 @@ func (c *Cache) run(ctx api.StreamContext) {
 	}
 
 	ticker := common.GetTicker(c.saveInterval)
+	var tcount = 0
 	for {
 		select {
 		case item := <-c.in:
@@ -115,6 +116,7 @@ func (c *Cache) run(ctx api.StreamContext) {
 			c.pending.delete(index)
 			c.changed = true
 		case <-ticker.C:
+			tcount++
 			l := c.pending.length()
 			if l == 0 {
 				c.pending.reset()
@@ -122,23 +124,25 @@ func (c *Cache) run(ctx api.StreamContext) {
 			//If the data is still changing, only save when the cache holds more than the threshold, to prevent too much file IO
 			//If the data is not changing in the time slot and has not been saved before, save it. This prevents the
 			//data from never being saved because the cache never passes the threshold
-			if (c.changed && l > THRESHOLD) || (!c.changed && c.saved != l) {
-				logger.Infof("save cache for rule %s", ctx.GetRuleId())
+			//logger.Infof("ticker %t, l=%d\n", c.changed, l)
+			if (c.changed && l > common.Config.Sink.CacheThreshold) || (tcount == common.Config.Sink.CacheTriggerCount && c.changed) {
+				logger.Infof("save cache for rule %s, %s", ctx.GetRuleId(), c.pending.String())
 				clone := c.pending.clone()
+				c.changed = false
 				go func() {
-					if err := c.saveCache(clone); err != nil {
+					if err := c.saveCache(logger, clone); err != nil {
 						logger.Debugf("%v", err)
 						c.drainError(err)
 					}
 				}()
-				c.saved = l
-			} else if c.changed {
-				c.saved = 0
 			}
-			c.changed = false
+			if tcount >= common.Config.Sink.CacheThreshold {
+				tcount = 0
+			}
 		case <-ctx.Done():
-			if c.changed {
-				c.saveCache(c.pending)
+			err := c.saveCache(logger, c.pending)
+			if err != nil {
+				logger.Warnf("Error found during saving cache: %s \n ", err)
 			}
 			logger.Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
 			return
@@ -161,6 +165,7 @@ func (c *Cache) loadCache() error {
 		if t, f := c.store.Get(c.key); f {
 			if mt, ok := t.(*LinkedQueue); ok {
 				c.pending = mt
+				c.changed = true
 				// To store the keys in slice in sorted order
 				var keys []int
 				for k := range mt.Data {
@@ -168,11 +173,11 @@ func (c *Cache) loadCache() error {
 				}
 				sort.Ints(keys)
 				for _, k := range keys {
-					log.Debugf("send by cache %d", k)
-					c.Out <- &CacheTuple{
+					t := &CacheTuple{
 						index: k,
 						data:  mt.Data[k],
 					}
+					c.Out <- t
 				}
 				return nil
 			} else {
@@ -183,7 +188,7 @@ func (c *Cache) loadCache() error {
 	return nil
 }
 
-func (c *Cache) saveCache(p *LinkedQueue) error {
+func (c *Cache) saveCache(logger api.Logger, p *LinkedQueue) error {
 	err := c.store.Open()
 	if err != nil {
 		return err

+ 6 - 2
xstream/nodes/sink_node.go

@@ -128,11 +128,15 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 				logger.Warnf("invalid type for dateTemplate property, should be a string value.", c)
 			} else {
 				funcMap := template.FuncMap{
-					"json": templates.JsonMarshal,
+					"json":   templates.JsonMarshal,
+					"base64": templates.Base64Encode,
 				}
 				temp, err := template.New("sink").Funcs(funcMap).Parse(t)
 				if err != nil {
-					logger.Warnf("property dataTemplate %v is invalid: %v", t, err)
+					msg := fmt.Sprintf("property dataTemplate %v is invalid: %v", t, err)
+					result <- fmt.Errorf(msg)
+					logger.Warnf(msg)
+					return
 				} else {
 					tp = temp
 				}

+ 1 - 1
xstream/nodes/stats_manager.go

@@ -60,7 +60,7 @@ func NewStatManager(opType string, ctx api.StreamContext) (StatManager, error) {
 	}
 
 	var sm StatManager
-	if common.Config != nil && common.Config.Prometheus {
+	if common.Config != nil && common.Config.Basic.Prometheus {
 		ctx.GetLogger().Debugf("Create prometheus stat manager")
 		psm := &PrometheusStatManager{
 			DefaultStatManager: DefaultStatManager{

+ 8 - 8
xstream/server/server/server.go

@@ -65,16 +65,16 @@ func StartUp(Version string) {
 	// Register a HTTP handler
 	rpc.HandleHTTP()
 	// Listen to TCP connections on the configured port
-	listener, e := net.Listen("tcp", fmt.Sprintf(":%d", common.Config.Port))
+	listener, e := net.Listen("tcp", fmt.Sprintf(":%d", common.Config.Basic.Port))
 	if e != nil {
 		m := fmt.Sprintf("Listen error: %s", e)
 		fmt.Printf(m)
 		logger.Fatal(m)
 	}
 
-	if common.Config.Prometheus {
+	if common.Config.Basic.Prometheus {
 		go func() {
-			port := common.Config.PrometheusPort
+			port := common.Config.Basic.PrometheusPort
 			if port <= 0 {
 				logger.Fatal("Miss configuration prometheusPort")
 			}
@@ -89,24 +89,24 @@ func StartUp(Version string) {
 	}
 
 	//Start rest service
-	srv := createRestServer(common.Config.RestPort)
+	srv := createRestServer(common.Config.Basic.RestPort)
 
 	go func() {
 		var err error
-		if common.Config.RestTls == nil {
+		if common.Config.Basic.RestTls == nil {
 			err = srv.ListenAndServe()
 		} else {
-			err = srv.ListenAndServeTLS(common.Config.RestTls.Certfile, common.Config.RestTls.Keyfile)
+			err = srv.ListenAndServeTLS(common.Config.Basic.RestTls.Certfile, common.Config.Basic.RestTls.Keyfile)
 		}
 		if err != nil {
 			logger.Fatal("Error serving rest service: ", err)
 		}
 	}()
 	t := "http"
-	if common.Config.RestTls != nil {
+	if common.Config.Basic.RestTls != nil {
 		t = "https"
 	}
-	msg := fmt.Sprintf("Serving kuiper (version - %s) on port %d, and restful api on %s://0.0.0.0:%d. \n", Version, common.Config.Port, t, common.Config.RestPort)
+	msg := fmt.Sprintf("Serving kuiper (version - %s) on port %d, and restful api on %s://0.0.0.0:%d. \n", Version, common.Config.Basic.Port, t, common.Config.Basic.RestPort)
 	logger.Info(msg)
 	fmt.Printf(msg)
 

+ 15 - 6
xstream/sinks/rest_sink.go

@@ -58,9 +58,17 @@ func (ms *RestSink) Configure(ps map[string]interface{}) error {
 
 	temp, ok = ps["headers"]
 	if ok {
-		ms.headers, ok = temp.(map[string]string)
-		if !ok {
-			return fmt.Errorf("rest sink property headers %v is not a map[string]string", temp)
+		ms.headers = make(map[string]string)
+		if m, ok := temp.(map[string]interface{}); ok {
+			for k, v := range m {
+				if v1, ok1 := v.(string); ok1 {
+					ms.headers[k] = v1
+				} else {
+					return fmt.Errorf("header value %s for header %s is not a string", v, k)
+				}
+			}
+		} else {
+			return fmt.Errorf("rest sink property headers %v is not a map[string]interface", temp)
 		}
 	}
 
@@ -144,11 +152,13 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger.Debugf("rest sink receive %s", item)
 	resp, err := ms.Send(v, logger)
 	if err != nil {
-		return fmt.Errorf("rest sink fails to send out the data")
+		return fmt.Errorf("rest sink fails to send out the data: %s", err)
 	} else {
 		logger.Debugf("rest sink got response %v", resp)
 		if resp.StatusCode < 200 || resp.StatusCode > 299 {
-			return fmt.Errorf("rest sink fails to err http return code: %d.", resp.StatusCode)
+			buf, _ := ioutil.ReadAll(resp.Body)
+			logger.Errorf("%s\n", string(buf))
+			return fmt.Errorf("rest sink fails to err http return code: %d and error message %s.", resp.StatusCode, string(buf))
 		} else {
 			if ms.debugResp {
 				if buf, bodyErr := ioutil.ReadAll(resp.Body); bodyErr != nil {
@@ -157,7 +167,6 @@ func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 					logger.Infof("Response content: %s\n", string(buf))
 				}
 			}
-
 		}
 	}
 	return nil