Browse Source

fix: process unhandled errors (#1861)

* fix: process unhandled errors

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>

* refactor: ignore jsonOut error param

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>

* refactor: delete error param for jsonOut

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>

* refactor: use Errorf for error log

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>

---------

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>
Jason Lyu 1 year ago
parent
commit
6899cc5c32

+ 3 - 0
internal/compressor/compressor_test.go

@@ -32,6 +32,9 @@ func BenchmarkCompressor(b *testing.B) {
 	for _, c := range compressors {
 		b.Run(c, func(b *testing.B) {
 			wc, err := GetCompressor(c)
+			if err != nil {
+				b.Fatal(err)
+			}
 			firstCompressedData, err := wc.Compress(data)
 			if err != nil {
 				b.Fatal(err)

+ 8 - 4
internal/io/file/file_writer.go

@@ -17,13 +17,15 @@ package file
 import (
 	"bufio"
 	"fmt"
+	"io"
+	"os"
+	"time"
+
 	"github.com/klauspost/compress/gzip"
 	"github.com/klauspost/compress/zstd"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"io"
-	"os"
-	"time"
 )
 
 type fileWriter struct {
@@ -46,7 +48,9 @@ func createFileWriter(ctx api.StreamContext, fn string, ft FileType, headers str
 		err error
 	)
 	if _, err = os.Stat(fn); os.IsNotExist(err) {
-		_, err = os.Create(fn)
+		if _, err := os.Create(fn); err != nil {
+			return nil, fmt.Errorf("fail to create file %s: %v", fn, err)
+		}
 	}
 	f, err = os.OpenFile(fn, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, os.ModeAppend)
 	if err != nil {

+ 14 - 12
internal/io/http/httppull_source_test.go

@@ -18,11 +18,6 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/benbjohnson/clock"
-	"github.com/gorilla/mux"
-	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/io/mock"
-	"github.com/lf-edge/ekuiper/pkg/api"
 	"net"
 	"net/http"
 	"net/http/httptest"
@@ -30,12 +25,19 @@ import (
 	"reflect"
 	"strconv"
 	"testing"
+
+	"github.com/benbjohnson/clock"
+	"github.com/gorilla/mux"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/io/mock"
+	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
-func jsonOut(w http.ResponseWriter, err error, out interface{}) {
+func jsonOut(w http.ResponseWriter, out interface{}) {
 	w.Header().Add("Content-Type", "application/json")
 	enc := json.NewEncoder(w)
-	err = enc.Encode(out)
+	err := enc.Encode(out)
 	// Problems encoding
 	if err != nil {
 		http.Error(w, err.Error(), http.StatusBadRequest)
@@ -75,7 +77,7 @@ func mockAuthServer() *httptest.Server {
 			ClientId:     "test",
 			Expires:      36000,
 		}
-		jsonOut(w, err, out)
+		jsonOut(w, out)
 	}).Methods(http.MethodPost)
 	router.HandleFunc("/refresh", func(w http.ResponseWriter, r *http.Request) {
 		token := r.Header.Get("Authorization")
@@ -97,7 +99,7 @@ func mockAuthServer() *httptest.Server {
 			ClientId:     "test",
 			Expires:      36000,
 		}
-		jsonOut(w, nil, out)
+		jsonOut(w, out)
 	}).Methods(http.MethodPost)
 	router.HandleFunc("/data", func(w http.ResponseWriter, r *http.Request) {
 		token := r.Header.Get("Authorization")
@@ -113,7 +115,7 @@ func mockAuthServer() *httptest.Server {
 			Temperature: 25.5,
 			Humidity:    60.0,
 		}
-		jsonOut(w, nil, out)
+		jsonOut(w, out)
 	}).Methods(http.MethodGet)
 	// Return same data for 3 times
 	router.HandleFunc("/data2", func(w http.ResponseWriter, r *http.Request) {
@@ -137,7 +139,7 @@ func mockAuthServer() *httptest.Server {
 			},
 		}
 		i++
-		jsonOut(w, nil, out)
+		jsonOut(w, out)
 	}).Methods(http.MethodGet)
 
 	router.HandleFunc("/data3", func(w http.ResponseWriter, r *http.Request) {
@@ -174,7 +176,7 @@ func mockAuthServer() *httptest.Server {
 				},
 			},
 		}
-		jsonOut(w, nil, out)
+		jsonOut(w, out)
 	}).Methods(http.MethodGet)
 
 	server := httptest.NewUnstartedServer(router)

+ 10 - 4
internal/io/memory/lookupsource_test.go

@@ -16,14 +16,16 @@ package memory
 
 import (
 	gocontext "context"
+	"reflect"
+	"testing"
+	"time"
+
 	"github.com/benbjohnson/clock"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"reflect"
-	"testing"
-	"time"
 )
 
 func TestUpdateLookup(t *testing.T) {
@@ -68,6 +70,10 @@ func TestUpdateLookup(t *testing.T) {
 		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test"}, mc.Now()),
 	}
 	result, err := ls.Lookup(ctx, []string{}, []string{"ff"}, []interface{}{"value1"})
+	if err != nil {
+		t.Error(err)
+		return
+	}
 	if !reflect.DeepEqual(result, expected) {
 		t.Errorf("expect %v but got %v", expected, result)
 	}
@@ -116,7 +122,7 @@ func TestLookup(t *testing.T) {
 		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"ff": "value1", "gg": "value2"}, map[string]interface{}{"topic": "test2"}, mc.Now()),
 		api.NewDefaultSourceTupleWithTime(map[string]interface{}{"ff": "value1", "gg": "value4"}, map[string]interface{}{"topic": "test2"}, mc.Now()),
 	}
-	result, err := ls.Lookup(ctx, []string{}, []string{"ff"}, []interface{}{"value1"})
+	result, _ := ls.Lookup(ctx, []string{}, []string{"ff"}, []interface{}{"value1"})
 	if len(result) != 2 {
 		t.Errorf("expect %v but got %v", expected, result)
 	} else {

+ 9 - 7
internal/pkg/store/redis/redisTs.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -22,9 +22,11 @@ import (
 	"context"
 	"encoding/gob"
 	"fmt"
-	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
-	"github.com/redis/go-redis/v9"
 	"strconv"
+
+	"github.com/redis/go-redis/v9"
+
+	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
 )
 
 const (
@@ -82,12 +84,12 @@ func (t *ts) Set(key int64, value interface{}) (bool, error) {
 }
 
 func (t *ts) Get(key int64, value interface{}) (bool, error) {
-	reply, err := t.db.ZRevRangeByScore(context.Background(), t.key, &redis.ZRangeBy{Min: strconv.FormatInt(key, 10), Max: strconv.FormatInt(key, 10)}).Result()
+	reply, _ := t.db.ZRevRangeByScore(context.Background(), t.key, &redis.ZRangeBy{Min: strconv.FormatInt(key, 10), Max: strconv.FormatInt(key, 10)}).Result()
 	if len(reply) == 0 {
 		return false, fmt.Errorf("record under %s key and %d score not found", t.key, key)
 	}
 	dec := gob.NewDecoder(bytes.NewBuffer([]byte(reply[0])))
-	err = dec.Decode(value)
+	err := dec.Decode(value)
 	if err != nil {
 		return false, err
 	}
@@ -116,12 +118,12 @@ func (t *ts) Drop() error {
 
 func getLast(db *redis.Client, key string, value interface{}) (int64, error) {
 	var last int64 = 0
-	reply, err := db.ZRevRangeWithScores(context.Background(), key, 0, 0).Result()
+	reply, _ := db.ZRevRangeWithScores(context.Background(), key, 0, 0).Result()
 	if len(reply) > 0 {
 		if value != nil {
 			v := reply[0].Member.(string)
 			dec := gob.NewDecoder(bytes.NewBuffer([]byte(v)))
-			if err = dec.Decode(value); err != nil {
+			if err := dec.Decode(value); err != nil {
 				return 0, err
 			}
 		}

+ 7 - 3
internal/pkg/store/sql/sqlKv.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -19,9 +19,10 @@ import (
 	"database/sql"
 	"encoding/gob"
 	"fmt"
+	"strings"
+
 	kvEncoding "github.com/lf-edge/ekuiper/internal/pkg/store/encoding"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
-	"strings"
 )
 
 type sqlKvStore struct {
@@ -48,11 +49,14 @@ func createSqlKvStore(database Database, table string) (*sqlKvStore, error) {
 func (kv *sqlKvStore) Setnx(key string, value interface{}) error {
 	return kv.database.Apply(func(db *sql.DB) error {
 		b, err := kvEncoding.Encode(value)
-		if nil != err {
+		if err != nil {
 			return err
 		}
 		query := fmt.Sprintf("INSERT INTO '%s'(key,val) values(?,?);", kv.table)
 		stmt, err := db.Prepare(query)
+		if err != nil {
+			return err
+		}
 		_, err = stmt.Exec(key, b)
 		stmt.Close()
 		if err != nil {

+ 4 - 4
internal/processor/stream.go

@@ -132,15 +132,15 @@ func (p *StreamProcessor) RecoverLookupTable() error {
 				parser := xsql.NewParser(strings.NewReader(vs.Statement))
 				stmt, e := xsql.Language.Parse(parser)
 				if e != nil {
-					log.Error(err)
+					log.Error(e)
 				}
 				switch s := stmt.(type) {
 				case *ast.StreamStmt:
 					log.Infof("Starting lookup table %s", s.Name)
 					e = lookup.CreateInstance(string(s.Name), s.Options.TYPE, s.Options)
-					if err != nil {
-						log.Errorf("%s", err.Error())
-						return err
+					if e != nil {
+						log.Errorf("%s", e.Error())
+						return e
 					}
 				default:
 					log.Errorf("Invalid lookup table statement: %s", vs.Statement)

+ 3 - 1
internal/server/rpc.go

@@ -360,7 +360,9 @@ func (t *Server) ExportConfiguration(arg *model.ExportDataDesc, reply *string) e
 	} else {
 		jsonBytes, err = ruleMigrationProcessor.ConfigurationPartialExport(rules)
 	}
-	jsonBytes, err = configurationExport()
+	if err != nil {
+		return err
+	}
 	_, err = io.Copy(f, bytes.NewReader(jsonBytes))
 	if err != nil {
 		return fmt.Errorf("fail to save to file %s:%v", file, err)

+ 5 - 1
internal/server/server.go

@@ -200,11 +200,15 @@ func initRuleset() error {
 		defer os.Create(signalFile)
 		content, err := os.ReadFile(filepath.Join(loc, "init.json"))
 		if err != nil {
-			conf.Log.Infof("fail to read init file: %v", err)
+			conf.Log.Errorf("fail to read init file: %v", err)
 			return nil
 		}
 		conf.Log.Infof("start to initialize ruleset")
 		_, counts, err := rulesetProcessor.Import(content)
+		if err != nil {
+			conf.Log.Errorf("fail to import ruleset: %v", err)
+			return nil
+		}
 		conf.Log.Infof("initialzie %d streams, %d tables and %d rules", counts[0], counts[1], counts[2])
 	}
 	return nil

+ 19 - 17
internal/service/external_service_rule_test.go

@@ -18,14 +18,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"github.com/golang/protobuf/ptypes/empty"
-	"github.com/golang/protobuf/ptypes/wrappers"
-	"github.com/gorilla/mux"
-	kconf "github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/topo/topotest"
-	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/msgpack-rpc/msgpack-rpc-go/rpc"
-	"google.golang.org/grpc"
 	"io"
 	"net"
 	"net/http"
@@ -33,6 +25,16 @@ import (
 	"reflect"
 	"strconv"
 	"testing"
+
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/golang/protobuf/ptypes/wrappers"
+	"github.com/gorilla/mux"
+	"github.com/msgpack-rpc/msgpack-rpc-go/rpc"
+	"google.golang.org/grpc"
+
+	kconf "github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/topotest"
+	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
 type RestHelloRequest struct {
@@ -109,7 +111,7 @@ func TestRestService(t *testing.T) {
 			http.Error(w, err.Error(), http.StatusBadRequest)
 		}
 		out := &RestHelloReply{Message: body.Name}
-		jsonOut(w, err, out)
+		jsonOut(w, out)
 	}).Methods(http.MethodPost)
 	router.HandleFunc("/object_detection", func(w http.ResponseWriter, r *http.Request) {
 		req := &ObjectDetectRequest{}
@@ -127,7 +129,7 @@ func TestRestService(t *testing.T) {
 			Result: req.Command + " success",
 			Type:   "S",
 		}
-		jsonOut(w, err, out)
+		jsonOut(w, out)
 	}).Methods(http.MethodPost)
 	router.HandleFunc("/getStatus", func(w http.ResponseWriter, r *http.Request) {
 		result := count%2 == 0
@@ -153,7 +155,7 @@ func TestRestService(t *testing.T) {
 		}
 		idint, _ := strconv.Atoi(req.Id)
 		out := ShelfMessageOut{Id: int64(idint), Theme: req.Theme}
-		jsonOut(w, err, out)
+		jsonOut(w, out)
 	}).Methods(http.MethodPost)
 	router.HandleFunc("/bookshelf/v1/shelves/{shelf}/books/{book}", func(w http.ResponseWriter, r *http.Request) {
 		defer r.Body.Close()
@@ -164,7 +166,7 @@ func TestRestService(t *testing.T) {
 		}
 		idint, _ := strconv.Atoi(book)
 		out := BookMessage{Id: int64(idint), Author: "NA", Title: "title_" + book}
-		jsonOut(w, err, out)
+		jsonOut(w, out)
 	}).Methods(http.MethodGet)
 	router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
 		defer r.Body.Close()
@@ -174,7 +176,7 @@ func TestRestService(t *testing.T) {
 			http.Error(w, "empty request", http.StatusBadRequest)
 		}
 		out := MessageMessage{Text: name + " content"}
-		jsonOut(w, err, out)
+		jsonOut(w, out)
 	}).Methods(http.MethodGet)
 	router.HandleFunc("/messaging/v1/messages/filter/{name}", func(w http.ResponseWriter, r *http.Request) {
 		defer r.Body.Close()
@@ -186,7 +188,7 @@ func TestRestService(t *testing.T) {
 			http.Error(w, "empty request", http.StatusBadRequest)
 		}
 		out := MessageMessage{Text: name + rev + sub}
-		jsonOut(w, err, out)
+		jsonOut(w, out)
 	}).Methods(http.MethodGet)
 	router.HandleFunc("/messaging/v1/messages/{name}", func(w http.ResponseWriter, r *http.Request) {
 		defer r.Body.Close()
@@ -201,7 +203,7 @@ func TestRestService(t *testing.T) {
 			http.Error(w, err.Error(), http.StatusBadRequest)
 		}
 		out := MessageMessage{Text: body.Text}
-		jsonOut(w, err, out)
+		jsonOut(w, out)
 	}).Methods(http.MethodPut, http.MethodPatch)
 	server := httptest.NewUnstartedServer(router)
 	server.Listener.Close()
@@ -517,10 +519,10 @@ func TestRestService(t *testing.T) {
 	}, 0)
 }
 
-func jsonOut(w http.ResponseWriter, err error, out interface{}) {
+func jsonOut(w http.ResponseWriter, out interface{}) {
 	w.Header().Add("Content-Type", "application/json")
 	enc := json.NewEncoder(w)
-	err = enc.Encode(out)
+	err := enc.Encode(out)
 	// Problems encoding
 	if err != nil {
 		http.Error(w, err.Error(), http.StatusBadRequest)

File diff suppressed because it is too large
+ 8 - 3
internal/service/manager_test.go


+ 10 - 5
internal/service/schemaHttp.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -16,14 +16,16 @@ package service
 
 import (
 	"fmt"
+	"net/http"
+	"regexp"
+	"strings"
+
 	dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
 	"github.com/jhump/protoreflect/desc"
 	"github.com/jhump/protoreflect/dynamic"
-	"github.com/lf-edge/ekuiper/pkg/cast"
 	"google.golang.org/protobuf/reflect/protoreflect"
-	"net/http"
-	"regexp"
-	"strings"
+
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 type httpConnMeta struct {
@@ -181,6 +183,9 @@ func (d *wrappedProtoDescriptor) ConvertHttpMapping(method string, params []inte
 				return nil, fmt.Errorf("invalid body field %s, must be a message", ho.BodyField)
 			}
 		}
+		if err != nil {
+			return nil, err
+		}
 	} else { // If options are not set, use the default setting
 		hcm.Method = "POST"
 		hcm.Uri = "/" + method

+ 5 - 1
internal/topo/planner/planner.go

@@ -17,6 +17,7 @@ package planner
 import (
 	"errors"
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	store2 "github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/topo"
@@ -198,7 +199,10 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 	case *ProjectSetPlan:
 		op = Transform(&operator.ProjectSetOperator{SrfMapping: t.SrfMapping}, fmt.Sprintf("%d_projectset", newIndex), options)
 	default:
-		return nil, 0, fmt.Errorf("unknown logical plan %v", t)
+		err = fmt.Errorf("unknown logical plan %v", t)
+	}
+	if err != nil {
+		return nil, 0, err
 	}
 	if uop, ok := op.(*node.UnaryOperator); ok {
 		uop.SetConcurrency(options.Concurrency)