Jelajahi Sumber

style: sort go imports (#1844)

* style: sort go imports

Made by command: gci write --skip-generated -s standard -s default -s "prefix(github.com/lf-edge/ekuiper)" .

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>

* style: format with gofumpt

Made by command: gofumpt -w .

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>

---------

Signed-off-by: xjasonlyu <xjasonlyu@gmail.com>
Jason Lyu 1 tahun lalu
induk
melakukan
c6ad75afac
100 mengubah file dengan 472 tambahan dan 348 penghapusan
  1. 21 18
      cmd/kuiper/main.go
  2. 3 3
      extensions/functions/accumulateWordCount/accumulateWordCount.go
  3. 2 2
      extensions/functions/countPlusOne/countPlusOne.go
  4. 2 2
      extensions/functions/echo/echo.go
  5. 39 22
      extensions/functions/geohash/geohash.go
  6. 5 4
      extensions/functions/image/resize.go
  7. 1 1
      extensions/functions/image/resize_test.go
  8. 5 4
      extensions/functions/image/thumbnail.go
  9. 6 4
      extensions/functions/labelImage/labelImage.go
  10. 4 2
      extensions/functions/tfLite/interpreters.go
  11. 4 3
      extensions/functions/tfLite/tfLite.go
  12. 3 2
      extensions/functions/tfLite/tfLite_test.go
  13. 3 1
      extensions/sinks/influx/influx.go
  14. 3 1
      extensions/sinks/influx2/influx2.go
  15. 6 5
      extensions/sinks/influx2/influx2_test.go
  16. 7 5
      extensions/sinks/kafka/kafka.go
  17. 1 1
      extensions/sinks/sql/sql.go
  18. 3 3
      extensions/sinks/sql/sql_test.go
  19. 5 3
      extensions/sinks/tdengine/tdengine.go
  20. 10 9
      extensions/sinks/tdengine/tdengine_test.go
  21. 3 1
      extensions/sinks/zmq/zmq.go
  22. 1 1
      extensions/sources/random/random.go
  23. 1 1
      extensions/sources/sql/sql.go
  24. 6 4
      extensions/sources/video/video.go
  25. 3 2
      extensions/sources/zmq/zmq.go
  26. 1 0
      extensions/sqldatabase/sqlgen/commonSqlDialect.go
  27. 1 0
      extensions/sqldatabase/sqlgen/sqlServerDialect.go
  28. 0 1
      extensions/sqldatabase/sqlgen/sqlServerDialect_test.go
  29. 1 0
      extensions/sqldatabase/sqlgen/sqlgen.go
  30. 4 2
      extensions/sqldatabase/sqlgen/templateSqlDialect.go
  31. 2 2
      extensions/sqldatabase/sqlgen/templateSqlDialect_test.go
  32. 2 1
      extensions/util/pool.go
  33. 1 0
      internal/binder/function/binder.go
  34. 3 2
      internal/binder/function/binder_test.go
  35. 6 1
      internal/binder/function/funcs_agg.go
  36. 7 5
      internal/binder/function/funcs_agg_test.go
  37. 3 2
      internal/binder/function/funcs_analytic.go
  38. 21 20
      internal/binder/function/funcs_analytic_test.go
  39. 2 1
      internal/binder/function/funcs_cols.go
  40. 6 5
      internal/binder/function/funcs_cols_test.go
  41. 3 2
      internal/binder/function/funcs_math.go
  42. 5 4
      internal/binder/function/funcs_math_test.go
  43. 7 6
      internal/binder/function/funcs_misc.go
  44. 11 9
      internal/binder/function/funcs_misc_test.go
  45. 1 1
      internal/binder/function/funcs_srf_test.go
  46. 1 0
      internal/binder/function/funcs_stateful.go
  47. 5 4
      internal/binder/function/funcs_stateful_test.go
  48. 4 3
      internal/binder/function/funcs_str.go
  49. 6 3
      internal/binder/function/function.go
  50. 1 1
      internal/binder/function/function_test.go
  51. 2 3
      internal/binder/function/static_executor.go
  52. 1 0
      internal/binder/function/validator_funcs.go
  53. 1 0
      internal/binder/io/binder.go
  54. 3 2
      internal/binder/io/binder_test.go
  55. 5 3
      internal/binder/io/builtin.go
  56. 1 0
      internal/compressor/compressor.go
  57. 1 0
      internal/compressor/decompressor.go
  58. 3 1
      internal/compressor/flate.go
  59. 3 1
      internal/compressor/gzip.go
  60. 3 1
      internal/compressor/zlib.go
  61. 1 0
      internal/compressor/zstd.go
  62. 6 4
      internal/conf/conf.go
  63. 12 11
      internal/conf/conf_test.go
  64. 0 1
      internal/conf/connect_selector.go
  65. 3 1
      internal/conf/jsonpath_eval.go
  66. 0 1
      internal/conf/load_test.go
  67. 3 2
      internal/conf/logger.go
  68. 12 10
      internal/conf/path.go
  69. 1 1
      internal/conf/path_test.go
  70. 2 1
      internal/conf/redis_store_config.go
  71. 0 1
      internal/conf/redis_store_config_test.go
  72. 2 1
      internal/conf/syslog.go
  73. 3 5
      internal/conf/time.go
  74. 4 3
      internal/conf/yaml_config_ops.go
  75. 4 9
      internal/conf/yaml_config_ops_test.go
  76. 14 14
      internal/converter/converter.go
  77. 2 2
      internal/converter/custom/converter.go
  78. 8 6
      internal/converter/custom/converter_test.go
  79. 1 0
      internal/converter/custom/test/myformat.go
  80. 2 1
      internal/converter/delimited/converter.go
  81. 2 1
      internal/converter/delimited/converter_test.go
  82. 2 0
      internal/converter/protobuf/converter.go
  83. 6 5
      internal/converter/protobuf/converter_test.go
  84. 1 1
      internal/converter/protobuf/fieldConverterSingleton.go
  85. 1 0
      internal/converter/protobuf/test/helloworld_wrapper.go
  86. 2 1
      internal/converter/static/load.go
  87. 5 4
      internal/io/edgex/edgex_sink.go
  88. 12 8
      internal/io/edgex/edgex_sink_test.go
  89. 10 8
      internal/io/edgex/edgex_source.go
  90. 10 8
      internal/io/edgex/edgex_source_test.go
  91. 5 4
      internal/io/file/file_sink.go
  92. 29 18
      internal/io/file/file_sink_test.go
  93. 8 6
      internal/io/file/file_source.go
  94. 7 5
      internal/io/file/file_source_test.go
  95. 6 5
      internal/io/file/file_stream_test.go
  96. 5 4
      internal/io/http/client.go
  97. 2 1
      internal/io/http/client_test.go
  98. 2 3
      internal/io/http/httppull_source.go
  99. 3 2
      internal/io/http/httppush_source.go
  100. 0 0
      internal/io/http/httpserver/data_server.go

+ 21 - 18
cmd/kuiper/main.go

@@ -61,7 +61,7 @@ func main() {
 	app := cli.NewApp()
 	app.Version = Version
 
-	//nflag := []cli.Flag { cli.StringFlag{
+	// nflag := []cli.Flag { cli.StringFlag{
 	//		Name: "name, n",
 	//		Usage: "the name of stream",
 	//	}}
@@ -367,7 +367,7 @@ func main() {
 				{
 					Name:  "stream",
 					Usage: "describe stream $stream_name",
-					//Flags: nflag,
+					// Flags: nflag,
 					Action: func(c *cli.Context) error {
 						streamProcess(client, "")
 						return nil
@@ -376,7 +376,7 @@ func main() {
 				{
 					Name:  "table",
 					Usage: "describe table $table_name",
-					//Flags: nflag,
+					// Flags: nflag,
 					Action: func(c *cli.Context) error {
 						streamProcess(client, "")
 						return nil
@@ -404,7 +404,7 @@ func main() {
 				{
 					Name:  "plugin",
 					Usage: "describe plugin $plugin_type $plugin_name",
-					//Flags: nflag,
+					// Flags: nflag,
 					Action: func(c *cli.Context) error {
 						ptype, err := getPluginType(c.Args()[0])
 						if err != nil {
@@ -436,7 +436,7 @@ func main() {
 				{
 					Name:  "udf",
 					Usage: "describe udf $udf_name",
-					//Flags: nflag,
+					// Flags: nflag,
 					Action: func(c *cli.Context) error {
 						if len(c.Args()) != 1 {
 							fmt.Printf("Expect udf name.\n")
@@ -524,7 +524,7 @@ func main() {
 				{
 					Name:  "stream",
 					Usage: "drop stream $stream_name",
-					//Flags: nflag,
+					// Flags: nflag,
 					Action: func(c *cli.Context) error {
 						streamProcess(client, "")
 						return nil
@@ -533,7 +533,7 @@ func main() {
 				{
 					Name:  "table",
 					Usage: "drop table $table_name",
-					//Flags: nflag,
+					// Flags: nflag,
 					Action: func(c *cli.Context) error {
 						streamProcess(client, "")
 						return nil
@@ -542,7 +542,7 @@ func main() {
 				{
 					Name:  "rule",
 					Usage: "drop rule $rule_name",
-					//Flags: nflag,
+					// Flags: nflag,
 					Action: func(c *cli.Context) error {
 						if len(c.Args()) != 1 {
 							fmt.Printf("Expect rule name.\n")
@@ -718,7 +718,8 @@ func main() {
 						}
 						return nil
 					},
-				}, {
+				},
+				{
 					Name:  "services",
 					Usage: "show services",
 					Action: func(c *cli.Context) error {
@@ -731,7 +732,8 @@ func main() {
 						}
 						return nil
 					},
-				}, {
+				},
+				{
 					Name:  "service_funcs",
 					Usage: "show service_funcs",
 					Action: func(c *cli.Context) error {
@@ -744,7 +746,8 @@ func main() {
 						}
 						return nil
 					},
-				}, {
+				},
+				{
 					Name:  "schemas",
 					Usage: "show schemas $schema_type",
 					Action: func(c *cli.Context) error {
@@ -773,7 +776,7 @@ func main() {
 				{
 					Name:  "rule",
 					Usage: "getstatus rule $rule_name",
-					//Flags: nflag,
+					// Flags: nflag,
 					Action: func(c *cli.Context) error {
 						if len(c.Args()) != 1 {
 							fmt.Printf("Expect rule name.\n")
@@ -793,7 +796,7 @@ func main() {
 				{
 					Name:  "import",
 					Usage: "getstatus import",
-					//Flags: nflag,
+					// Flags: nflag,
 					Action: func(c *cli.Context) error {
 						var reply string
 						err = client.Call("Server.GetStatusImport", 0, &reply)
@@ -815,7 +818,7 @@ func main() {
 				{
 					Name:  "rule",
 					Usage: "getstopo rule $rule_name",
-					//Flags: nflag,
+					// Flags: nflag,
 					Action: func(c *cli.Context) error {
 						if len(c.Args()) != 1 {
 							fmt.Printf("Expect rule name.\n")
@@ -842,7 +845,7 @@ func main() {
 				{
 					Name:  "rule",
 					Usage: "start rule $rule_name",
-					//Flags: nflag,
+					// Flags: nflag,
 					Action: func(c *cli.Context) error {
 						if len(c.Args()) != 1 {
 							fmt.Printf("Expect rule name.\n")
@@ -869,7 +872,7 @@ func main() {
 				{
 					Name:  "rule",
 					Usage: "stop rule $rule_name",
-					//Flags: nflag,
+					// Flags: nflag,
 					Action: func(c *cli.Context) error {
 						if len(c.Args()) != 1 {
 							fmt.Printf("Expect rule name.\n")
@@ -896,7 +899,7 @@ func main() {
 				{
 					Name:  "rule",
 					Usage: "restart rule $rule_name",
-					//Flags: nflag,
+					// Flags: nflag,
 					Action: func(c *cli.Context) error {
 						if len(c.Args()) != 1 {
 							fmt.Printf("Expect rule name.\n")
@@ -1149,7 +1152,7 @@ func main() {
 
 	app.Action = func(c *cli.Context) error {
 		cli.ShowSubcommandHelp(c)
-		//cli.ShowVersion(c)
+		// cli.ShowVersion(c)
 
 		return nil
 	}

+ 3 - 3
extensions/functions/accumulateWordCount/accumulateWordCount.go

@@ -16,9 +16,10 @@ package main
 
 import (
 	"fmt"
+	"strings"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
-	"strings"
 )
 
 /**
@@ -29,8 +30,7 @@ import (
  **  1: separator, a string literal for word separator
  **/
 
-type accumulateWordCountFunc struct {
-}
+type accumulateWordCountFunc struct{}
 
 func (f *accumulateWordCountFunc) Validate(args []interface{}) error {
 	if len(args) != 2 {

+ 2 - 2
extensions/functions/countPlusOne/countPlusOne.go

@@ -16,11 +16,11 @@ package main
 
 import (
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
-type countPlusOneFunc struct {
-}
+type countPlusOneFunc struct{}
 
 func (f *countPlusOneFunc) Validate(args []interface{}) error {
 	if len(args) != 1 {

+ 2 - 2
extensions/functions/echo/echo.go

@@ -16,11 +16,11 @@ package main
 
 import (
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
-type echo struct {
-}
+type echo struct{}
 
 func (f *echo) Validate(args []interface{}) error {
 	if len(args) != 1 {

+ 39 - 22
extensions/functions/geohash/geohash.go

@@ -16,30 +16,22 @@ package main
 
 import (
 	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/api"
+
 	"github.com/mmcloughlin/geohash"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
-type geohashEncode struct {
-}
-type geohashEncodeInt struct {
-}
-type geohashDecode struct {
-}
-type geohashDecodeInt struct {
-}
-type geohashBoundingBox struct {
-}
-type geohashBoundingBoxInt struct {
-}
-type geohashNeighbor struct {
-}
-type geohashNeighborInt struct {
-}
-type geohashNeighbors struct {
-}
-type geohashNeighborsInt struct {
-}
+type geohashEncode struct{}
+type geohashEncodeInt struct{}
+type geohashDecode struct{}
+type geohashDecodeInt struct{}
+type geohashBoundingBox struct{}
+type geohashBoundingBoxInt struct{}
+type geohashNeighbor struct{}
+type geohashNeighborInt struct{}
+type geohashNeighbors struct{}
+type geohashNeighborsInt struct{}
 type position struct {
 	Longitude float64
 	Latitude  float64
@@ -64,36 +56,46 @@ var (
 		"South":     geohash.South,
 		"SouthWest": geohash.SouthWest,
 		"West":      geohash.West,
-		"NorthWest": geohash.NorthWest}
+		"NorthWest": geohash.NorthWest,
+	}
 )
 
 func (r *geohashEncode) IsAggregate() bool {
 	return false
 }
+
 func (r *geohashEncodeInt) IsAggregate() bool {
 	return false
 }
+
 func (r *geohashDecode) IsAggregate() bool {
 	return false
 }
+
 func (r *geohashDecodeInt) IsAggregate() bool {
 	return false
 }
+
 func (r *geohashBoundingBox) IsAggregate() bool {
 	return false
 }
+
 func (r *geohashBoundingBoxInt) IsAggregate() bool {
 	return false
 }
+
 func (r *geohashNeighbor) IsAggregate() bool {
 	return false
 }
+
 func (r *geohashNeighborInt) IsAggregate() bool {
 	return false
 }
+
 func (r *geohashNeighbors) IsAggregate() bool {
 	return false
 }
+
 func (r *geohashNeighborsInt) IsAggregate() bool {
 	return false
 }
@@ -104,54 +106,63 @@ func (r *geohashEncode) Validate(args []interface{}) error {
 	}
 	return nil
 }
+
 func (r *geohashEncodeInt) Validate(args []interface{}) error {
 	if len(args) != 2 {
 		return fmt.Errorf("The geohashEncodeInt function supports 2 parameters, but got %d", len(args))
 	}
 	return nil
 }
+
 func (r *geohashDecode) Validate(args []interface{}) error {
 	if len(args) != 1 {
 		return fmt.Errorf("The geohashDecode function supports 1 parameters, but got %d", len(args))
 	}
 	return nil
 }
+
 func (r *geohashDecodeInt) Validate(args []interface{}) error {
 	if len(args) != 1 {
 		return fmt.Errorf("The geohashDecodeInt function supports 1 parameters, but got %d", len(args))
 	}
 	return nil
 }
+
 func (r *geohashBoundingBox) Validate(args []interface{}) error {
 	if len(args) != 1 {
 		return fmt.Errorf("The geohashBoundingBox function supports 1 parameters, but got %d", len(args))
 	}
 	return nil
 }
+
 func (r *geohashBoundingBoxInt) Validate(args []interface{}) error {
 	if len(args) != 1 {
 		return fmt.Errorf("The geohashBoundingBoxInt function supports 1 parameters, but got %d", len(args))
 	}
 	return nil
 }
+
 func (r *geohashNeighbor) Validate(args []interface{}) error {
 	if len(args) != 2 {
 		return fmt.Errorf("The geohashNeighbor function supports 2 parameters, but got %d", len(args))
 	}
 	return nil
 }
+
 func (r *geohashNeighborInt) Validate(args []interface{}) error {
 	if len(args) != 2 {
 		return fmt.Errorf("The geohashNeighborInt function supports 2 parameters, but got %d", len(args))
 	}
 	return nil
 }
+
 func (r *geohashNeighbors) Validate(args []interface{}) error {
 	if len(args) != 1 {
 		return fmt.Errorf("The geohashNeighbors function supports 1 parameters, but got %d", len(args))
 	}
 	return nil
 }
+
 func (r *geohashNeighborsInt) Validate(args []interface{}) error {
 	if len(args) != 1 {
 		return fmt.Errorf("The geohashNeighborsInt function supports 1 parameters, but got %d", len(args))
@@ -170,6 +181,7 @@ func (r *geohashEncode) Exec(args []interface{}, _ api.FunctionContext) (interfa
 	}
 	return geohash.Encode(la, lo), true
 }
+
 func (r *geohashEncodeInt) Exec(args []interface{}, _ api.FunctionContext) (interface{}, bool) {
 	la, ok := args[0].(float64)
 	if !ok {
@@ -193,6 +205,7 @@ func (r *geohashDecode) Exec(args []interface{}, _ api.FunctionContext) (interfa
 	la, lo := geohash.Decode(hash)
 	return position{Longitude: lo, Latitude: la}, true
 }
+
 func (r *geohashDecodeInt) Exec(args []interface{}, _ api.FunctionContext) (interface{}, bool) {
 	hash, ok := args[0].(uint64)
 	if !ok || 0 > hash {
@@ -201,6 +214,7 @@ func (r *geohashDecodeInt) Exec(args []interface{}, _ api.FunctionContext) (inte
 	la, lo := geohash.DecodeInt(hash)
 	return position{Longitude: lo, Latitude: la}, true
 }
+
 func (r *geohashBoundingBox) Exec(args []interface{}, _ api.FunctionContext) (interface{}, bool) {
 	hash, ok := args[0].(string)
 	if !ok || 0 == len(hash) {
@@ -211,6 +225,7 @@ func (r *geohashBoundingBox) Exec(args []interface{}, _ api.FunctionContext) (in
 	}
 	return geohash.BoundingBox(hash), true
 }
+
 func (r *geohashBoundingBoxInt) Exec(args []interface{}, _ api.FunctionContext) (interface{}, bool) {
 	hash, ok := args[0].(uint64)
 	if !ok || 0 > hash {
@@ -218,6 +233,7 @@ func (r *geohashBoundingBoxInt) Exec(args []interface{}, _ api.FunctionContext)
 	}
 	return geohash.BoundingBoxInt(hash), true
 }
+
 func (r *geohashNeighbor) Exec(args []interface{}, _ api.FunctionContext) (interface{}, bool) {
 	hash, ok := args[0].(string)
 	if !ok || 0 == len(hash) {
@@ -239,6 +255,7 @@ func (r *geohashNeighbor) Exec(args []interface{}, _ api.FunctionContext) (inter
 	}
 	return geohash.Neighbor(hash, directionCode), true
 }
+
 func (r *geohashNeighborInt) Exec(args []interface{}, _ api.FunctionContext) (interface{}, bool) {
 	hash, ok := args[0].(uint64)
 	if !ok || 0 > hash {

+ 5 - 4
extensions/functions/image/resize.go

@@ -17,16 +17,17 @@ package main
 import (
 	"bytes"
 	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/nfnt/resize"
 	"image"
 	"image/gif"
 	"image/jpeg"
 	"image/png"
+
+	"github.com/nfnt/resize"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
-type imageResize struct {
-}
+type imageResize struct{}
 
 func (f *imageResize) Validate(args []interface{}) error {
 	if len(args) < 3 {

+ 1 - 1
extensions/functions/image/resize_test.go

@@ -30,7 +30,7 @@ func TestResize(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		image  string
 		result string
 	}{

+ 5 - 4
extensions/functions/image/thumbnail.go

@@ -17,15 +17,16 @@ package main
 import (
 	"bytes"
 	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/nfnt/resize"
 	"image"
 	"image/jpeg"
 	"image/png"
+
+	"github.com/nfnt/resize"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
-type thumbnail struct {
-}
+type thumbnail struct{}
 
 func (f *thumbnail) Validate(args []interface{}) error {
 	if len(args) != 3 {

+ 6 - 4
extensions/functions/labelImage/labelImage.go

@@ -18,9 +18,6 @@ import (
 	"bufio"
 	"bytes"
 	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/api"
-	tflite "github.com/mattn/go-tflite"
-	"github.com/nfnt/resize"
 	"image"
 	_ "image/jpeg"
 	_ "image/png"
@@ -28,6 +25,11 @@ import (
 	"path"
 	"sort"
 	"sync"
+
+	tflite "github.com/mattn/go-tflite"
+	"github.com/nfnt/resize"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
 type labelImage struct {
@@ -91,7 +93,7 @@ func (f *labelImage) Exec(args []interface{}, ctx api.FunctionContext) (interfac
 
 		f.interpreter = interpreter
 		// TODO If created, the interpreter will be kept through the whole life of kuiper. Refactor this later.
-		//defer interpreter.Delete()
+		// defer interpreter.Delete()
 	})
 
 	if f.interpreter == nil {

+ 4 - 2
extensions/functions/tfLite/interpreters.go

@@ -16,10 +16,12 @@ package main
 
 import (
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/mattn/go-tflite"
 	"path/filepath"
 	"sync"
+
+	"github.com/mattn/go-tflite"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
 )
 
 var ipManager *interpreterManager

+ 4 - 3
extensions/functions/tfLite/tfLite.go

@@ -16,13 +16,14 @@ package main
 
 import (
 	"fmt"
+
+	"github.com/mattn/go-tflite"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
-	"github.com/mattn/go-tflite"
 )
 
-type Tffunc struct {
-}
+type Tffunc struct{}
 
 // Validate the arguments.
 // args[0]: string, model name which maps to a path

+ 3 - 2
extensions/functions/tfLite/tfLite_test.go

@@ -15,12 +15,13 @@
 package main
 
 import (
+	"reflect"
+	"testing"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"reflect"
-	"testing"
 )
 
 func TestTffunc_Exec(t *testing.T) {

+ 3 - 1
extensions/sinks/influx/influx.go

@@ -32,11 +32,13 @@ package main
 import (
 	"encoding/json"
 	"fmt"
+	"time"
+
 	_ "github.com/influxdata/influxdb1-client/v2"
 	client "github.com/influxdata/influxdb1-client/v2"
+
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"time"
 )
 
 type influxSink struct {

+ 3 - 1
extensions/sinks/influx2/influx2.go

@@ -29,11 +29,13 @@ package main
 import (
 	"encoding/json"
 	"fmt"
+	"time"
+
 	_ "github.com/influxdata/influxdb-client-go/v2"
 	client "github.com/influxdata/influxdb-client-go/v2"
+
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"time"
 )
 
 type influxSink2 struct {

+ 6 - 5
extensions/sinks/influx2/influx2_test.go

@@ -16,18 +16,19 @@ package main
 
 import (
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/testx"
 	"reflect"
 	"testing"
+
+	"github.com/lf-edge/ekuiper/internal/testx"
 )
 
 func TestConfig(t *testing.T) {
-	var tests = []struct {
+	tests := []struct {
 		conf     map[string]interface{}
 		expected *influxSink2
 		error    string
 	}{
-		{ //0
+		{ // 0
 			conf: map[string]interface{}{
 				"addr":        "http://192.168.0.3:8086",
 				"token":       "Token_test",
@@ -52,7 +53,7 @@ func TestConfig(t *testing.T) {
 				hasTransform: false,
 			},
 		},
-		{ //1
+		{ // 1
 			conf: map[string]interface{}{
 				"addr":         "http://192.168.0.3:8086",
 				"token":        "Token_test",
@@ -78,7 +79,7 @@ func TestConfig(t *testing.T) {
 				hasTransform: false,
 			},
 		},
-		{ //2
+		{ // 2
 			conf: map[string]interface{}{
 				"addr":         "http://192.168.0.3:8086",
 				"token":        "Token_test",

+ 7 - 5
extensions/sinks/kafka/kafka.go

@@ -16,14 +16,16 @@ package main
 
 import (
 	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/lf-edge/ekuiper/pkg/cast"
-	"github.com/lf-edge/ekuiper/pkg/errorx"
+	"strings"
+
 	kafkago "github.com/segmentio/kafka-go"
 	"github.com/segmentio/kafka-go/sasl"
 	"github.com/segmentio/kafka-go/sasl/plain"
 	"github.com/segmentio/kafka-go/sasl/scram"
-	"strings"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
 )
 
 type kafkaSink struct {
@@ -77,7 +79,7 @@ func (m *kafkaSink) Open(ctx api.StreamContext) error {
 	var err error
 	var mechanism sasl.Mechanism
 
-	//sasl authentication type
+	// sasl authentication type
 	switch m.c.SaslAuthType {
 	case SASL_PLAIN:
 		mechanism = plain.Mechanism{

+ 1 - 1
extensions/sinks/sql/sql.go

@@ -89,7 +89,7 @@ func (t *sqlConfig) getKeyValues(ctx api.StreamContext, mapData map[string]inter
 
 type sqlSink struct {
 	conf *sqlConfig
-	//The db connection instance
+	// The db connection instance
 	db *sql.DB
 }
 

+ 3 - 3
extensions/sinks/sql/sql_test.go

@@ -61,7 +61,7 @@ func TestSingle(t *testing.T) {
 		t.Error(err)
 		return
 	}
-	var data = []map[string]interface{}{
+	data := []map[string]interface{}{
 		{"id": 1, "name": "John", "address": "343", "mobile": "334433"},
 		{"id": 2, "name": "Susan", "address": "34", "mobile": "334433"},
 		{"id": 3, "name": "Susan", "address": "34", "mobile": "334433"},
@@ -126,7 +126,7 @@ func TestBatch(t *testing.T) {
 		t.Error(err)
 		return
 	}
-	var data = []map[string]interface{}{
+	data := []map[string]interface{}{
 		{"id": 1, "name": "John", "address": "343", "mobile": "334433"},
 		{"id": 2, "name": "Susan", "address": "34", "mobile": "334433"},
 		{"id": 3, "name": "Susan", "address": "34", "mobile": "334433"},
@@ -190,7 +190,7 @@ func TestUpdate(t *testing.T) {
 		t.Error(err)
 		return
 	}
-	var test = []struct {
+	test := []struct {
 		d []map[string]interface{}
 		b bool
 		r []map[string]interface{}

+ 5 - 3
extensions/sinks/tdengine/tdengine.go

@@ -18,13 +18,15 @@ import (
 	"database/sql"
 	"encoding/json"
 	"fmt"
+	"reflect"
+	"strings"
+
+	_ "github.com/taosdata/driver-go/v2/taosSql"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
-	_ "github.com/taosdata/driver-go/v2/taosSql"
-	"reflect"
-	"strings"
 )
 
 type (

+ 10 - 9
extensions/sinks/tdengine/tdengine_test.go

@@ -16,21 +16,22 @@ package main
 
 import (
 	"fmt"
+	"reflect"
+	"testing"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
-	"reflect"
-	"testing"
 )
 
 func TestConfig(t *testing.T) {
-	var tests = []struct {
+	tests := []struct {
 		conf     map[string]interface{}
 		expected *taosConfig
 		error    string
 	}{
-		{ //0
+		{ // 0
 			conf: map[string]interface{}{
 				"host":        "e0d9d8089bef",
 				"port":        6030,
@@ -52,7 +53,7 @@ func TestConfig(t *testing.T) {
 				Fields:      nil,
 			},
 		},
-		{ //1
+		{ // 1
 			conf: map[string]interface{}{
 				"ip":          "e0d9d8089bef",
 				"port":        6030,
@@ -76,7 +77,7 @@ func TestConfig(t *testing.T) {
 				Fields:      nil,
 			},
 		},
-		{ //2
+		{ // 2
 			conf: map[string]interface{}{
 				"port":        6030,
 				"database":    "dab",
@@ -101,7 +102,7 @@ func TestConfig(t *testing.T) {
 				TagFields:   []string{"a", "b"},
 			},
 		},
-		{ //3
+		{ // 3
 			conf: map[string]interface{}{
 				"port":     6030,
 				"database": "dab",
@@ -110,7 +111,7 @@ func TestConfig(t *testing.T) {
 			},
 			error: "property TsFieldName is required",
 		},
-		{ //4
+		{ // 4
 			conf: map[string]interface{}{
 				"port":        6030,
 				"database":    "dab",
@@ -136,7 +137,7 @@ func TestConfig(t *testing.T) {
 }
 
 func TestBuildSql(t *testing.T) {
-	var tests = []struct {
+	tests := []struct {
 		conf     *taosConfig
 		data     map[string]interface{}
 		expected string

+ 3 - 1
extensions/sinks/zmq/zmq.go

@@ -16,9 +16,11 @@ package main
 
 import (
 	"fmt"
+
+	zmq "github.com/pebbe/zmq4"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
-	zmq "github.com/pebbe/zmq4"
 )
 
 type zmqSink struct {

+ 1 - 1
extensions/sources/random/random.go

@@ -112,7 +112,7 @@ func (s *randomSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 func randomize(p map[string]interface{}, seed int) map[string]interface{} {
 	r := make(map[string]interface{})
 	for k, v := range p {
-		//TODO other data types
+		// TODO other data types
 		vi, err := cast.ToInt(v, cast.STRICT)
 		if err != nil {
 			break

+ 1 - 1
extensions/sources/sql/sql.go

@@ -35,7 +35,7 @@ type sqlConConfig struct {
 type sqlsource struct {
 	conf  *sqlConConfig
 	Query sqlgen.SqlQueryGenerator
-	//The db connection instance
+	// The db connection instance
 	db *sql.DB
 }
 

+ 6 - 4
extensions/sources/video/video.go

@@ -20,14 +20,17 @@ import (
 	"os"
 	"time"
 
+	ffmpeg "github.com/u2takey/ffmpeg-go"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
-	ffmpeg "github.com/u2takey/ffmpeg-go"
 )
 
-const RTSP_DEFAULT_INTERVAL = 10000
-const FRAMENUMBER = 5
+const (
+	RTSP_DEFAULT_INTERVAL = 10000
+	FRAMENUMBER           = 5
+)
 
 type VideoPullSource struct {
 	url      string
@@ -35,7 +38,6 @@ type VideoPullSource struct {
 }
 
 func (rps *VideoPullSource) Configure(_ string, props map[string]interface{}) error {
-
 	if u, ok := props["url"]; ok {
 		if p, ok := u.(string); ok {
 			rps.url = p

+ 3 - 2
extensions/sources/zmq/zmq.go

@@ -18,9 +18,10 @@ import (
 	"context"
 	"fmt"
 
+	zmq "github.com/pebbe/zmq4"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	zmq "github.com/pebbe/zmq4"
 )
 
 type zmqSource struct {
@@ -94,7 +95,7 @@ func (s *zmqSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple,
 			}
 			return
 		default:
-			//do nothing
+			// do nothing
 		}
 	}
 }

+ 1 - 0
extensions/sqldatabase/sqlgen/commonSqlDialect.go

@@ -16,6 +16,7 @@ package sqlgen
 
 import (
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 

+ 1 - 0
extensions/sqldatabase/sqlgen/sqlServerDialect.go

@@ -16,6 +16,7 @@ package sqlgen
 
 import (
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 

+ 0 - 1
extensions/sqldatabase/sqlgen/sqlServerDialect_test.go

@@ -81,7 +81,6 @@ func TestQueryGenerator_SqlQueryStatement(t *testing.T) {
 }
 
 func TestInternalQuery(t *testing.T) {
-
 	s := NewSqlServerQuery(&InternalSqlQueryCfg{
 		Table:      "table",
 		Limit:      10,

+ 1 - 0
extensions/sqldatabase/sqlgen/sqlgen.go

@@ -16,6 +16,7 @@ package sqlgen
 
 import (
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 

+ 4 - 2
extensions/sqldatabase/sqlgen/templateSqlDialect.go

@@ -17,9 +17,11 @@ package sqlgen
 import (
 	"bytes"
 	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/cast"
-	"github.com/posener/order"
 	"text/template"
+
+	"github.com/posener/order"
+
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 type templateSqlQuery struct {

+ 2 - 2
extensions/sqldatabase/sqlgen/templateSqlDialect_test.go

@@ -141,7 +141,7 @@ func TestTemplateQuery_DateTime(t *testing.T) {
 
 	s, _ := NewTemplateSqlQuery(sqlcfg.TemplateSqlQueryCfg)
 
-	//query
+	// query
 	firstSqlStr, _ := s.SqlQueryStatement()
 
 	want := "select * from table where responseTime > `2008-10-25 14:56:59.123`"
@@ -150,7 +150,7 @@ func TestTemplateQuery_DateTime(t *testing.T) {
 		t.Errorf("SqlQueryStatement() = %v, want %v", firstSqlStr, want)
 	}
 
-	//query result
+	// query result
 	s.UpdateMaxIndexValue(map[string]interface{}{
 		"responseTime": getDatetimeFromstring("2008-10-29 14:56:59"),
 	})

+ 2 - 1
extensions/util/pool.go

@@ -19,8 +19,9 @@ import (
 	"strings"
 	"sync"
 
-	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/xo/dburl"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
 )
 
 var GlobalPool *dbPool

+ 1 - 0
internal/binder/function/binder.go

@@ -17,6 +17,7 @@ package function
 import (
 	"errors"
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/internal/binder"
 	"github.com/lf-edge/ekuiper/internal/plugin"
 	"github.com/lf-edge/ekuiper/pkg/api"

+ 3 - 2
internal/binder/function/binder_test.go

@@ -17,10 +17,11 @@ package function
 import (
 	"errors"
 	"fmt"
+	"testing"
+
 	"github.com/lf-edge/ekuiper/internal/binder"
 	"github.com/lf-edge/ekuiper/internal/binder/mock"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
-	"testing"
 )
 
 func TestBinding(t *testing.T) {
@@ -35,7 +36,7 @@ func TestBinding(t *testing.T) {
 		t.Error(err)
 		return
 	}
-	var tests = []struct {
+	tests := []struct {
 		name      string
 		isFunc    bool
 		isFuncset bool

+ 6 - 1
internal/binder/function/funcs_agg.go

@@ -17,10 +17,11 @@ package function
 import (
 	"fmt"
 
+	"github.com/montanaflynn/stats"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
-	"github.com/montanaflynn/stats"
 )
 
 func registerAggFunc() {
@@ -416,6 +417,7 @@ func sliceFloatTotal(s []interface{}) (float64, error) {
 	}
 	return total, nil
 }
+
 func sliceIntMax(s []interface{}, max int64) (int64, error) {
 	for _, v := range s {
 		vi, err := cast.ToInt64(v, cast.CONVERT_SAMEKIND)
@@ -429,6 +431,7 @@ func sliceIntMax(s []interface{}, max int64) (int64, error) {
 	}
 	return max, nil
 }
+
 func sliceFloatMax(s []interface{}, max float64) (float64, error) {
 	for _, v := range s {
 		if vf, ok := v.(float64); ok {
@@ -454,6 +457,7 @@ func sliceStringMax(s []interface{}, max string) (string, error) {
 	}
 	return max, nil
 }
+
 func sliceIntMin(s []interface{}, min int64) (int64, error) {
 	for _, v := range s {
 		vi, err := cast.ToInt64(v, cast.CONVERT_SAMEKIND)
@@ -467,6 +471,7 @@ func sliceIntMin(s []interface{}, min int64) (int64, error) {
 	}
 	return min, nil
 }
+
 func sliceFloatMin(s []interface{}, min float64) (float64, error) {
 	for _, v := range s {
 		if vf, ok := v.(float64); ok {

+ 7 - 5
internal/binder/function/funcs_agg_test.go

@@ -58,7 +58,7 @@ func TestAggExec(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args    []interface{}
 		avg     interface{}
 		max     interface{}
@@ -184,7 +184,7 @@ func TestPercentileExec(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args  []interface{}
 		pCont interface{}
 		pDisc interface{}
@@ -241,9 +241,11 @@ func TestPercentileExec(t *testing.T) {
 			},
 			pCont: float64(125),
 			pDisc: float64(150),
-		}, { //5
-			args: []interface{}{[]interface{}{},
-				[]interface{}{}},
+		}, { // 5
+			args: []interface{}{
+				[]interface{}{},
+				[]interface{}{},
+			},
 			pCont: nil,
 			pDisc: nil,
 		},

+ 3 - 2
internal/binder/function/funcs_analytic.go

@@ -16,11 +16,12 @@ package function
 
 import (
 	"fmt"
+	"reflect"
+	"strconv"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
-	"reflect"
-	"strconv"
 )
 
 // registerAnalyticFunc registers the analytic functions

+ 21 - 20
internal/binder/function/funcs_analytic_test.go

@@ -16,13 +16,14 @@ package function
 
 import (
 	"fmt"
+	"reflect"
+	"testing"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
-	"reflect"
-	"testing"
 )
 
 func TestChangedColValidation(t *testing.T) {
@@ -30,7 +31,7 @@ func TestChangedColValidation(t *testing.T) {
 	if !ok {
 		t.Fatal("builtin not found")
 	}
-	var tests = []struct {
+	tests := []struct {
 		args []ast.Expr
 		err  error
 	}{
@@ -76,7 +77,7 @@ func TestChangedColExec(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -147,7 +148,7 @@ func TestChangedColPartition(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -218,7 +219,7 @@ func TestChangedColPartitionWithWhen(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -301,7 +302,7 @@ func TestHadChangedValidation(t *testing.T) {
 	if !ok {
 		t.Fatal("builtin not found")
 	}
-	var tests = []struct {
+	tests := []struct {
 		args []ast.Expr
 		err  error
 	}{
@@ -354,7 +355,7 @@ func TestHadChangedExec(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -457,7 +458,7 @@ func TestHadChangedExecAllowNull(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -569,7 +570,7 @@ func TestHadChangedPartition(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -672,7 +673,7 @@ func TestHadChangedPartitionWithWhen(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -784,7 +785,7 @@ func TestLagExec(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -846,7 +847,7 @@ func TestLagPartition(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -908,7 +909,7 @@ func TestLagExecWithWhen(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -970,7 +971,7 @@ func TestLagPartitionWithWhen(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -1032,7 +1033,7 @@ func TestLagExecIndexWithDefaultValue(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -1104,7 +1105,7 @@ func TestLagExecIndex(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -1171,7 +1172,7 @@ func TestLatestExec(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -1233,7 +1234,7 @@ func TestLatestExecWithWhen(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -1295,7 +1296,7 @@ func TestLatestPartition(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{

+ 2 - 1
internal/binder/function/funcs_cols.go

@@ -16,9 +16,10 @@ package function
 
 import (
 	"fmt"
+	"reflect"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
-	"reflect"
 )
 
 type ResultCols map[string]interface{}

+ 6 - 5
internal/binder/function/funcs_cols_test.go

@@ -16,13 +16,14 @@ package function
 
 import (
 	"fmt"
+	"reflect"
+	"testing"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
-	"reflect"
-	"testing"
 )
 
 func TestValidation(t *testing.T) {
@@ -30,7 +31,7 @@ func TestValidation(t *testing.T) {
 	if !ok {
 		t.Fatal("builtin not found")
 	}
-	var tests = []struct {
+	tests := []struct {
 		args []ast.Expr
 		err  error
 	}{
@@ -90,7 +91,7 @@ func TestExec(t *testing.T) {
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
 	var nilResult ResultCols
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -201,7 +202,7 @@ func TestExecIgnoreNull(t *testing.T) {
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
 	var nilResult ResultCols
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{

+ 3 - 2
internal/binder/function/funcs_math.go

@@ -16,11 +16,12 @@ package function
 
 import (
 	"fmt"
+	"math"
+	"math/rand"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
-	"math"
-	"math/rand"
 )
 
 func registerMathFunc() {

+ 5 - 4
internal/binder/function/funcs_math_test.go

@@ -16,13 +16,14 @@ package function
 
 import (
 	"fmt"
+	"math"
+	"reflect"
+	"testing"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"math"
-	"reflect"
-	"testing"
 )
 
 func TestFuncMath(t *testing.T) {
@@ -126,7 +127,7 @@ func TestFuncMath(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args []interface{}
 		res  []interface{}
 	}{

+ 7 - 6
internal/binder/function/funcs_misc.go

@@ -22,17 +22,19 @@ import (
 	b64 "encoding/base64"
 	"encoding/json"
 	"fmt"
+	"io"
+	"math"
+	"reflect"
+	"strings"
+	"time"
+
 	"github.com/google/uuid"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/keyedstate"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
-	"io"
-	"math"
-	"reflect"
-	"strings"
-	"time"
 )
 
 func registerMiscFunc() {
@@ -578,7 +580,6 @@ func registerMiscFunc() {
 			return nil
 		},
 	}
-
 }
 
 func round(num float64) int {

+ 11 - 9
internal/binder/function/funcs_misc_test.go

@@ -16,6 +16,9 @@ package function
 
 import (
 	"fmt"
+	"reflect"
+	"testing"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/keyedstate"
 	"github.com/lf-edge/ekuiper/internal/testx"
@@ -23,8 +26,6 @@ import (
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
-	"reflect"
-	"testing"
 )
 
 func init() {
@@ -40,7 +41,7 @@ func TestToMap(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -78,6 +79,7 @@ func TestToMap(t *testing.T) {
 		}
 	}
 }
+
 func TestCoalesceExec(t *testing.T) {
 	f, ok := builtins["coalesce"]
 	if !ok {
@@ -87,7 +89,7 @@ func TestCoalesceExec(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -149,7 +151,7 @@ func TestToJson(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -190,7 +192,7 @@ func TestFromJson(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -260,7 +262,7 @@ func TestDelay(t *testing.T) {
 		t.Fatal("expect no error")
 	}
 
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -291,7 +293,7 @@ func TestKeyedStateValidation(t *testing.T) {
 	if !ok {
 		t.Fatal("builtin not found")
 	}
-	var tests = []struct {
+	tests := []struct {
 		args []ast.Expr
 		err  error
 	}{
@@ -341,7 +343,7 @@ func TestKeyedStateExec(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 1)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{

+ 1 - 1
internal/binder/function/funcs_srf_test.go

@@ -33,7 +33,7 @@ func TestUnnestFunctions(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{

+ 1 - 0
internal/binder/function/funcs_stateful.go

@@ -16,6 +16,7 @@ package function
 
 import (
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/internal/compressor"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"

+ 5 - 4
internal/binder/function/funcs_stateful_test.go

@@ -16,12 +16,13 @@ package function
 
 import (
 	"fmt"
+	"reflect"
+	"testing"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/state"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"reflect"
-	"testing"
 )
 
 func TestCompressExec(t *testing.T) {
@@ -34,7 +35,7 @@ func TestCompressExec(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{
@@ -88,7 +89,7 @@ func TestDecompressExec(t *testing.T) {
 	ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
 	tempStore, _ := state.CreateStore("mockRule0", api.AtMostOnce)
 	fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), 2)
-	var tests = []struct {
+	tests := []struct {
 		args   []interface{}
 		result interface{}
 	}{

+ 4 - 3
internal/binder/function/funcs_str.go

@@ -17,13 +17,14 @@ package function
 import (
 	"bytes"
 	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/lf-edge/ekuiper/pkg/ast"
-	"github.com/lf-edge/ekuiper/pkg/cast"
 	"regexp"
 	"strings"
 	"unicode"
 	"unicode/utf8"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 func registerStrFunc() {

+ 6 - 3
internal/binder/function/function.go

@@ -15,14 +15,17 @@
 package function
 
 import (
+	"strings"
+
 	"github.com/lf-edge/ekuiper/internal/plugin"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
-	"strings"
 )
 
-type funcExe func(ctx api.FunctionContext, args []interface{}) (interface{}, bool)
-type funcVal func(ctx api.FunctionContext, args []ast.Expr) error
+type (
+	funcExe func(ctx api.FunctionContext, args []interface{}) (interface{}, bool)
+	funcVal func(ctx api.FunctionContext, args []ast.Expr) error
+)
 
 type builtinFunc struct {
 	fType ast.FuncType

+ 1 - 1
internal/binder/function/function_test.go

@@ -20,7 +20,7 @@ import (
 )
 
 func TestManager(t *testing.T) {
-	var tests = []struct {
+	tests := []struct {
 		name  string
 		found bool
 	}{

+ 2 - 3
internal/binder/function/static_executor.go

@@ -16,6 +16,7 @@ package function
 
 import (
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 )
@@ -69,6 +70,4 @@ func (f *funcExecutor) GetFuncType(name string) ast.FuncType {
 	return fs.fType
 }
 
-var (
-	staticFuncExecutor = &funcExecutor{}
-)
+var staticFuncExecutor = &funcExecutor{}

+ 1 - 0
internal/binder/function/validator_funcs.go

@@ -16,6 +16,7 @@ package function
 
 import (
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 )

+ 1 - 0
internal/binder/io/binder.go

@@ -17,6 +17,7 @@ package io
 import (
 	"errors"
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/internal/binder"
 	"github.com/lf-edge/ekuiper/internal/plugin"
 	"github.com/lf-edge/ekuiper/pkg/api"

+ 3 - 2
internal/binder/io/binder_test.go

@@ -17,10 +17,11 @@ package io
 import (
 	"errors"
 	"fmt"
+	"testing"
+
 	"github.com/lf-edge/ekuiper/internal/binder"
 	"github.com/lf-edge/ekuiper/internal/binder/mock"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
-	"testing"
 )
 
 func TestBindings(t *testing.T) {
@@ -34,7 +35,7 @@ func TestBindings(t *testing.T) {
 		t.Error(err)
 		return
 	}
-	var tests = []struct {
+	tests := []struct {
 		name           string
 		isSource       bool
 		isLookupSource bool

+ 5 - 3
internal/binder/io/builtin.go

@@ -25,9 +25,11 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
-type NewSourceFunc func() api.Source
-type NewLookupSourceFunc func() api.LookupSource
-type NewSinkFunc func() api.Sink
+type (
+	NewSourceFunc       func() api.Source
+	NewLookupSourceFunc func() api.LookupSource
+	NewSinkFunc         func() api.Sink
+)
 
 var (
 	sources = map[string]NewSourceFunc{

+ 1 - 0
internal/compressor/compressor.go

@@ -16,6 +16,7 @@ package compressor
 
 import (
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/pkg/message"
 )
 

+ 1 - 0
internal/compressor/decompressor.go

@@ -16,6 +16,7 @@ package compressor
 
 import (
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/pkg/message"
 )
 

+ 3 - 1
internal/compressor/flate.go

@@ -17,9 +17,11 @@ package compressor
 import (
 	"bytes"
 	"fmt"
+	"io"
+
 	"github.com/klauspost/compress/flate"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"io"
 )
 
 func newFlateCompressor() (*flateCompressor, error) {

+ 3 - 1
internal/compressor/gzip.go

@@ -17,9 +17,11 @@ package compressor
 import (
 	"bytes"
 	"fmt"
+	"io"
+
 	"github.com/klauspost/compress/gzip"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"io"
 )
 
 func newGzipCompressor() (*gzipCompressor, error) {

+ 3 - 1
internal/compressor/zlib.go

@@ -17,9 +17,11 @@ package compressor
 import (
 	"bytes"
 	"fmt"
+	"io"
+
 	"github.com/klauspost/compress/zlib"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"io"
 )
 
 func newZlibCompressor() (*zlibCompressor, error) {

+ 1 - 0
internal/compressor/zstd.go

@@ -16,6 +16,7 @@ package compressor
 
 import (
 	"bytes"
+
 	"github.com/klauspost/compress/zstd"
 )
 

+ 6 - 4
internal/conf/conf.go

@@ -17,13 +17,15 @@ package conf
 import (
 	"errors"
 	"fmt"
-	"github.com/lestrrat-go/file-rotatelogs"
-	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/sirupsen/logrus"
 	"io"
 	"os"
 	"path"
 	"time"
+
+	"github.com/lestrrat-go/file-rotatelogs"
+	"github.com/sirupsen/logrus"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
 const ConfFileName = "kuiper.yaml"
@@ -167,7 +169,7 @@ func InitConf() {
 			LateTol:            1000,
 			Concurrency:        1,
 			BufferLength:       1024,
-			CheckpointInterval: 300000, //5 minutes
+			CheckpointInterval: 300000, // 5 minutes
 			SendError:          true,
 			Restart: &api.RestartStrategy{
 				Attempts:     0,

+ 12 - 11
internal/conf/conf_test.go

@@ -15,13 +15,14 @@ package conf
 
 import (
 	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/api"
 	"reflect"
 	"testing"
+
+	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
 func TestSourceConfValidate(t *testing.T) {
-	var tests = []struct {
+	tests := []struct {
 		s   *SourceConf
 		e   *SourceConf
 		err string
@@ -82,7 +83,7 @@ func TestSourceConfValidate(t *testing.T) {
 }
 
 func TestRuleOptionValidate(t *testing.T) {
-	var tests = []struct {
+	tests := []struct {
 		s   *api.RuleOption
 		e   *api.RuleOption
 		err string
@@ -96,7 +97,7 @@ func TestRuleOptionValidate(t *testing.T) {
 				LateTol:            1000,
 				Concurrency:        1,
 				BufferLength:       1024,
-				CheckpointInterval: 300000, //5 minutes
+				CheckpointInterval: 300000, // 5 minutes
 				SendError:          true,
 				Restart: &api.RestartStrategy{
 					Attempts:     0,
@@ -110,7 +111,7 @@ func TestRuleOptionValidate(t *testing.T) {
 				LateTol:            1000,
 				Concurrency:        1,
 				BufferLength:       1024,
-				CheckpointInterval: 300000, //5 minutes
+				CheckpointInterval: 300000, // 5 minutes
 				SendError:          true,
 				Restart: &api.RestartStrategy{
 					Attempts:     0,
@@ -126,7 +127,7 @@ func TestRuleOptionValidate(t *testing.T) {
 				LateTol:            1000,
 				Concurrency:        1,
 				BufferLength:       1024,
-				CheckpointInterval: 300000, //5 minutes
+				CheckpointInterval: 300000, // 5 minutes
 				SendError:          true,
 				Restart: &api.RestartStrategy{
 					Attempts:     3,
@@ -140,7 +141,7 @@ func TestRuleOptionValidate(t *testing.T) {
 				LateTol:            1000,
 				Concurrency:        1,
 				BufferLength:       1024,
-				CheckpointInterval: 300000, //5 minutes
+				CheckpointInterval: 300000, // 5 minutes
 				SendError:          true,
 				Restart: &api.RestartStrategy{
 					Attempts:     3,
@@ -156,7 +157,7 @@ func TestRuleOptionValidate(t *testing.T) {
 				LateTol:            1000,
 				Concurrency:        1,
 				BufferLength:       1024,
-				CheckpointInterval: 300000, //5 minutes
+				CheckpointInterval: 300000, // 5 minutes
 				SendError:          true,
 				Restart: &api.RestartStrategy{
 					Attempts:     3,
@@ -170,7 +171,7 @@ func TestRuleOptionValidate(t *testing.T) {
 				LateTol:            1000,
 				Concurrency:        1,
 				BufferLength:       1024,
-				CheckpointInterval: 300000, //5 minutes
+				CheckpointInterval: 300000, // 5 minutes
 				SendError:          true,
 				Restart: &api.RestartStrategy{
 					Attempts:     3,
@@ -186,7 +187,7 @@ func TestRuleOptionValidate(t *testing.T) {
 				LateTol:            1000,
 				Concurrency:        1,
 				BufferLength:       1024,
-				CheckpointInterval: 300000, //5 minutes
+				CheckpointInterval: 300000, // 5 minutes
 				SendError:          true,
 				Restart: &api.RestartStrategy{
 					Attempts:     -2,
@@ -200,7 +201,7 @@ func TestRuleOptionValidate(t *testing.T) {
 				LateTol:            1000,
 				Concurrency:        1,
 				BufferLength:       1024,
-				CheckpointInterval: 300000, //5 minutes
+				CheckpointInterval: 300000, // 5 minutes
 				SendError:          true,
 				Restart: &api.RestartStrategy{
 					Attempts:     0,

+ 0 - 1
internal/conf/connect_selector.go

@@ -22,7 +22,6 @@ func (c *ConSelector) Init() error {
 }
 
 func (c *ConSelector) ReadCfgFromYaml() (props map[string]interface{}, err error) {
-
 	yamlOps, err := NewConfigOperatorFromConnectionYaml(c.Type)
 	if err != nil {
 		return nil, err

+ 3 - 1
internal/conf/jsonpath_eval.go

@@ -18,10 +18,12 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"reflect"
+
 	"github.com/PaesslerAG/gval"
 	"github.com/PaesslerAG/jsonpath"
+
 	"github.com/lf-edge/ekuiper/pkg/cast"
-	"reflect"
 )
 
 var builder = gval.Full(jsonpath.PlaceholderExtension())

+ 0 - 1
internal/conf/load_test.go

@@ -22,7 +22,6 @@ import (
 )
 
 func TestEnv(t *testing.T) {
-
 	key := "KUIPER__BASIC__CONSOLELOG"
 	value := "true"
 

+ 3 - 2
internal/conf/logger.go

@@ -15,10 +15,11 @@
 package conf
 
 import (
-	filename "github.com/keepeye/logrus-filename"
-	"github.com/sirupsen/logrus"
 	"os"
 	"strings"
+
+	filename "github.com/keepeye/logrus-filename"
+	"github.com/sirupsen/logrus"
 )
 
 const (

+ 12 - 10
internal/conf/path.go

@@ -31,13 +31,15 @@ const (
 	KuiperSyslogKey = "KuiperSyslogKey"
 )
 
-var LoadFileType = "relative"
-var AbsoluteMapping = map[string]string{
-	etcDir:     "/etc/kuiper",
-	dataDir:    "/var/lib/kuiper/data",
-	logDir:     "/var/log/kuiper",
-	pluginsDir: "/var/lib/kuiper/plugins",
-}
+var (
+	LoadFileType    = "relative"
+	AbsoluteMapping = map[string]string{
+		etcDir:     "/etc/kuiper",
+		dataDir:    "/var/lib/kuiper/data",
+		logDir:     "/var/log/kuiper",
+		pluginsDir: "/var/lib/kuiper/plugins",
+	}
+)
 
 func GetConfLoc() (string, error) {
 	return GetLoc(etcDir)
@@ -51,7 +53,7 @@ func GetDataLoc() (string, error) {
 		}
 		d := path.Join(dataDir, "test")
 		if _, err := os.Stat(d); os.IsNotExist(err) {
-			err = os.MkdirAll(d, 0755)
+			err = os.MkdirAll(d, 0o755)
 			if err != nil {
 				return "", err
 			}
@@ -113,11 +115,11 @@ func relativePath(subdir string) (dir string, err error) {
 				lastdir = dir
 				continue
 			}
-			//Log.Printf("Trying to load file from %s", confDir)
+			// Log.Printf("Trying to load file from %s", confDir)
 			return confDir, nil
 		}
 	} else {
-		//Log.Printf("Trying to load file from %s", confDir)
+		// Log.Printf("Trying to load file from %s", confDir)
 		return confDir, nil
 	}
 

+ 1 - 1
internal/conf/path_test.go

@@ -20,7 +20,7 @@ import (
 )
 
 func TestAbsolutePath(t *testing.T) {
-	var tests = []struct {
+	tests := []struct {
 		r string
 		a string
 	}{

+ 2 - 1
internal/conf/redis_store_config.go

@@ -2,6 +2,7 @@ package conf
 
 import (
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
@@ -28,7 +29,7 @@ func RedisStorageConSelectorApply(connectionSelector string, conf *KuiperConf) e
 		return err
 	}
 
-	//this should be edgeX redis config
+	// this should be edgeX redis config
 	kvs, err := sel.ReadCfgFromYaml()
 	if err != nil {
 		return err

+ 0 - 1
internal/conf/redis_store_config_test.go

@@ -50,7 +50,6 @@ func TestRedisStorageConSelectorApply(t *testing.T) {
 }
 
 func TestRedisStorageConSelector(t *testing.T) {
-
 	envs := map[string]string{
 		"KUIPER__STORE__TYPE":                                "redis",
 		"KUIPER__STORE__REDIS__CONNECTIONSELECTOR":           "edgex.redisMsgBus",

+ 2 - 1
internal/conf/syslog.go

@@ -18,9 +18,10 @@
 package conf
 
 import (
-	logrus_syslog "github.com/sirupsen/logrus/hooks/syslog"
 	"log/syslog"
 	"os"
+
+	logrus_syslog "github.com/sirupsen/logrus/hooks/syslog"
 )
 
 func initSyslog() {

+ 3 - 5
internal/conf/time.go

@@ -15,14 +15,13 @@
 package conf
 
 import (
-	"github.com/benbjohnson/clock"
 	"time"
-)
 
-var (
-	Clock clock.Clock
+	"github.com/benbjohnson/clock"
 )
 
+var Clock clock.Clock
+
 func InitClock() {
 	if IsTesting {
 		Log.Debugf("running in testing mode")
@@ -39,7 +38,6 @@ func GetLocalZone() int {
 		_, offset := time.Now().Local().Zone()
 		return offset
 	}
-
 }
 
 // Time related. For Mock

+ 4 - 3
internal/conf/yaml_config_ops.go

@@ -17,11 +17,12 @@ package conf
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/pkg/filex"
-	"github.com/lf-edge/ekuiper/pkg/cast"
 	"path"
 	"reflect"
 	"sync"
+
+	"github.com/lf-edge/ekuiper/internal/pkg/filex"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 // ConfKeysOperator define interface to query/add/update/delete the configs in memory
@@ -104,7 +105,7 @@ func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{} {
 		cf[key] = aux
 	}
 
-	//note: config keys in data directory will overwrite those in etc directory with same name
+	// note: config keys in data directory will overwrite those in etc directory with same name
 	for key, kvs := range c.dataCfg {
 		aux := make(map[string]interface{})
 		for k, v := range kvs {

+ 4 - 9
internal/conf/yaml_config_ops_test.go

@@ -17,11 +17,12 @@ package conf
 import (
 	"encoding/json"
 	"fmt"
-	"gopkg.in/yaml.v3"
 	"os"
 	"reflect"
 	"sort"
 	"testing"
+
+	"gopkg.in/yaml.v3"
 )
 
 func TestConfigKeys_LoadSourceFile(t *testing.T) {
@@ -43,11 +44,9 @@ func TestConfigKeys_LoadConnectionEdgex(t *testing.T) {
 	if err != nil {
 		t.Error(err)
 	}
-
 }
 
 func TestConfigKeys_Ops(t *testing.T) {
-
 	httpCfg, err := NewConfigOperatorFromSourceYaml("httppull")
 	if err != nil {
 		t.Error(err)
@@ -79,7 +78,6 @@ func TestConfigKeys_Ops(t *testing.T) {
 	if err := isDelData(delData, httpCfg.CopyConfContent()[`new`]); nil != err {
 		t.Error(err)
 	}
-
 }
 
 func TestConfigKeys_GetPluginName(t *testing.T) {
@@ -130,12 +128,11 @@ func TestConfigKeys_CopyReadOnlyConfContent(t *testing.T) {
 
 func TestConfigKeys_GetConfKeys(t *testing.T) {
 	mqttCfg, err := NewConfigOperatorFromSourceYaml("mqtt")
-
 	if err != nil {
 		t.Error(err)
 	}
 	keys := mqttCfg.GetConfKeys()
-	//currently only etcCfg, no dataCfg
+	// currently only etcCfg, no dataCfg
 	source := []string{"default", "demo_conf"}
 	if keys == nil {
 		t.Errorf("Not Equal")
@@ -150,7 +147,6 @@ func TestConfigKeys_GetConfKeys(t *testing.T) {
 			t.Errorf("Not equal, got %v, want %v", key, source[i])
 		}
 	}
-
 }
 
 func TestConfigKeys_GetReadOnlyConfKeys(t *testing.T) {
@@ -173,7 +169,6 @@ func TestConfigKeys_GetReadOnlyConfKeys(t *testing.T) {
 			t.Errorf("Not equal, got %v, want %v", key, source[i])
 		}
 	}
-
 }
 
 func TestConfigKeys_GetUpdatableConfKeys(t *testing.T) {
@@ -239,7 +234,6 @@ func TestConfigKeys_AddConfKeyField(t *testing.T) {
 	if err != nil {
 		t.Error(err)
 	}
-
 }
 
 func TestSourceConfigKeysOps_SaveCfgToFile(t *testing.T) {
@@ -339,6 +333,7 @@ func isDelData(js string, cf map[string]interface{}) error {
 	}
 	return nil
 }
+
 func isAddData(js string, cf map[string]interface{}) error {
 	var addNode map[string]interface{}
 	if err := json.Unmarshal([]byte(js), &addNode); nil != err {

+ 14 - 14
internal/converter/converter.go

@@ -16,31 +16,31 @@ package converter
 
 import (
 	"fmt"
+	"strings"
+
 	"github.com/lf-edge/ekuiper/internal/converter/binary"
 	"github.com/lf-edge/ekuiper/internal/converter/delimited"
 	"github.com/lf-edge/ekuiper/internal/converter/json"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/message"
-	"strings"
 )
 
 // Instantiator The format, schema information are passed in by stream options
 // The columns information is defined in the source side, like file source
 type Instantiator func(schemaFileName string, SchemaMessageName string, delimiter string) (message.Converter, error)
 
-var ( // init once and read only
-	converters = map[string]Instantiator{
-		message.FormatJson: func(_ string, _ string, _ string) (message.Converter, error) {
-			return json.GetConverter()
-		},
-		message.FormatBinary: func(_ string, _ string, _ string) (message.Converter, error) {
-			return binary.GetConverter()
-		},
-		message.FormatDelimited: func(_ string, _ string, delimiter string) (message.Converter, error) {
-			return delimited.NewConverter(delimiter)
-		},
-	}
-)
+// init once and read only
+var converters = map[string]Instantiator{
+	message.FormatJson: func(_ string, _ string, _ string) (message.Converter, error) {
+		return json.GetConverter()
+	},
+	message.FormatBinary: func(_ string, _ string, _ string) (message.Converter, error) {
+		return binary.GetConverter()
+	},
+	message.FormatDelimited: func(_ string, _ string, delimiter string) (message.Converter, error) {
+		return delimited.NewConverter(delimiter)
+	},
+}
 
 func GetOrCreateConverter(options *ast.Options) (message.Converter, error) {
 	t := strings.ToLower(options.FORMAT)

+ 2 - 2
internal/converter/custom/converter.go

@@ -16,6 +16,7 @@ package custom
 
 import (
 	"fmt"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/converter/static"
 	"github.com/lf-edge/ekuiper/internal/pkg/def"
@@ -23,8 +24,7 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/message"
 )
 
-type Converter struct {
-}
+type Converter struct{}
 
 var converter = &Converter{}
 

+ 8 - 6
internal/converter/custom/converter_test.go

@@ -16,14 +16,16 @@ package custom
 
 import (
 	"fmt"
-	"github.com/gdexlab/go-render/render"
-	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/schema"
-	"github.com/lf-edge/ekuiper/internal/testx"
 	"os"
 	"path/filepath"
 	"reflect"
 	"testing"
+
+	"github.com/gdexlab/go-render/render"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/schema"
+	"github.com/lf-edge/ekuiper/internal/testx"
 )
 
 func init() {
@@ -47,12 +49,12 @@ func TestCustomConverter(t *testing.T) {
 		}
 	}()
 	// build the so file into data/test prior to running the test
-	//Copy the helloworld.so
+	// Copy the helloworld.so
 	bytesRead, err := os.ReadFile(filepath.Join(dataDir, "myFormat.so"))
 	if err != nil {
 		t.Fatal(err)
 	}
-	err = os.WriteFile(filepath.Join(etcDir, "myFormat.so"), bytesRead, 0755)
+	err = os.WriteFile(filepath.Join(etcDir, "myFormat.so"), bytesRead, 0o755)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 1 - 0
internal/converter/custom/test/myformat.go

@@ -17,6 +17,7 @@ package main
 import (
 	"encoding/json"
 	"fmt"
+
 	"github.com/mitchellh/mapstructure"
 )
 

+ 2 - 1
internal/converter/delimited/converter.go

@@ -16,10 +16,11 @@ package delimited
 
 import (
 	"fmt"
-	"github.com/lf-edge/ekuiper/pkg/message"
 	"sort"
 	"strconv"
 	"strings"
+
+	"github.com/lf-edge/ekuiper/pkg/message"
 )
 
 type Converter struct {

+ 2 - 1
internal/converter/delimited/converter_test.go

@@ -16,9 +16,10 @@ package delimited
 
 import (
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/testx"
 	"reflect"
 	"testing"
+
+	"github.com/lf-edge/ekuiper/internal/testx"
 )
 
 func TestEncode(t *testing.T) {

+ 2 - 0
internal/converter/protobuf/converter.go

@@ -16,8 +16,10 @@ package protobuf
 
 import (
 	"fmt"
+
 	"github.com/jhump/protoreflect/desc"
 	"github.com/jhump/protoreflect/desc/protoparse"
+
 	"github.com/lf-edge/ekuiper/internal/converter/static"
 	"github.com/lf-edge/ekuiper/pkg/message"
 )

+ 6 - 5
internal/converter/protobuf/converter_test.go

@@ -16,13 +16,14 @@ package protobuf
 
 import (
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/schema"
-	"github.com/lf-edge/ekuiper/internal/testx"
 	"os"
 	"path/filepath"
 	"reflect"
 	"testing"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/schema"
+	"github.com/lf-edge/ekuiper/internal/testx"
 )
 
 func TestEncode(t *testing.T) {
@@ -110,12 +111,12 @@ func TestStatic(t *testing.T) {
 		}
 	}()
 	// build the so file into data/test prior to running the test
-	//Copy the helloworld.so
+	// Copy the helloworld.so
 	bytesRead, err := os.ReadFile(filepath.Join(dataDir, "helloworld.so"))
 	if err != nil {
 		t.Fatal(err)
 	}
-	err = os.WriteFile(filepath.Join(etcDir, "helloworld.so"), bytesRead, 0755)
+	err = os.WriteFile(filepath.Join(etcDir, "helloworld.so"), bytesRead, 0o755)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 1 - 1
internal/converter/protobuf/fieldConverterSingleton.go

@@ -286,7 +286,7 @@ func (fc *FieldConverter) DecodeField(src interface{}, field *desc.FieldDescript
 }
 
 func (fc *FieldConverter) decodeSubMessage(input interface{}, ft *desc.MessageDescriptor, sn cast.Strictness) (interface{}, error) {
-	var m = map[string]interface{}{}
+	m := map[string]interface{}{}
 	switch v := input.(type) {
 	case map[interface{}]interface{}:
 		for k, val := range v {

+ 1 - 0
internal/converter/protobuf/test/helloworld_wrapper.go

@@ -16,6 +16,7 @@ package main
 
 import (
 	"fmt"
+
 	"github.com/golang/protobuf/proto"
 )
 

+ 2 - 1
internal/converter/static/load.go

@@ -16,9 +16,10 @@ package static
 
 import (
 	"fmt"
+	"plugin"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/message"
-	"plugin"
 )
 
 func LoadStaticConverter(soFile string, messageName string) (message.Converter, error) {

+ 5 - 4
internal/io/edgex/edgex_sink.go

@@ -21,16 +21,18 @@ import (
 	"encoding/base64"
 	"encoding/json"
 	"fmt"
+	"reflect"
+
 	v3 "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
 	"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
 	"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/requests"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/clients"
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
-	"reflect"
 )
 
 type SinkConf struct {
@@ -56,7 +58,6 @@ type EdgexMsgBusSink struct {
 }
 
 func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
-
 	c := &SinkConf{
 		MessageType: MessageTypeEvent,
 		ContentType: "application/json",
@@ -144,7 +145,7 @@ func (ems *EdgexMsgBusSink) produceEvents(ctx api.StreamContext, item interface{
 	}
 	m1 := ems.getMeta(m)
 	event := m1.createEvent()
-	//Override the devicename if user specified the value
+	// Override the devicename if user specified the value
 	if event.DeviceName == "" {
 		event.DeviceName = ems.c.DeviceName
 	}
@@ -466,7 +467,7 @@ func (ems *EdgexMsgBusSink) getMeta(result []map[string]interface{}) *meta {
 	if ems.c.Metadata == "" {
 		return newMetaFromMap(nil)
 	}
-	//Try to get the meta field
+	// Try to get the meta field
 	for _, v := range result {
 		if m, ok := v[ems.c.Metadata]; ok {
 			if m1, ok1 := m.(map[string]interface{}); ok1 {

+ 12 - 8
internal/io/edgex/edgex_sink_test.go

@@ -20,15 +20,17 @@ package edgex
 import (
 	"encoding/json"
 	"fmt"
+	"reflect"
+	"testing"
+
 	v3 "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
 	"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/cast"
-	"reflect"
-	"testing"
 )
 
 var (
@@ -70,7 +72,7 @@ func compareReading(expected, actual dtos.BaseReading) bool {
 }
 
 func TestConfigure(t *testing.T) {
-	var tests = []struct {
+	tests := []struct {
 		conf     map[string]interface{}
 		expected *SinkConf
 		error    string
@@ -187,7 +189,7 @@ func TestConfigure(t *testing.T) {
 }
 
 func TestProduceEvents(t1 *testing.T) {
-	var tests = []struct {
+	tests := []struct {
 		input    string
 		conf     map[string]interface{}
 		expected *dtos.Event
@@ -364,7 +366,7 @@ func TestProduceEvents(t1 *testing.T) {
 			error: "",
 		},
 		{ // 5
-			input: `[{"sa":["1","2",3,"4"]}]`, //invalid array, return nil
+			input: `[{"sa":["1","2",3,"4"]}]`, // invalid array, return nil
 			expected: &dtos.Event{
 				DeviceName:  "ekuiper",
 				ProfileName: "ekuiperProfile",
@@ -413,7 +415,8 @@ func TestProduceEvents(t1 *testing.T) {
 				},
 			},
 			error: "",
-		}, { // 7
+		},
+		{ // 7
 			input: `[
 						{"meta":{
 							"correlationid":"","deviceName":"demo","id":"","origin":3,
@@ -447,7 +450,8 @@ func TestProduceEvents(t1 *testing.T) {
 				},
 			},
 			error: "",
-		}, { // 8
+		},
+		{ // 8
 			input: `[
 						{"obj":{"a":1,"b":"sttt"}}
 					]`,
@@ -500,7 +504,7 @@ func TestProduceEvents(t1 *testing.T) {
 }
 
 func TestEdgeXTemplate_Apply(t1 *testing.T) {
-	var tests = []struct {
+	tests := []struct {
 		input    string
 		conf     map[string]interface{}
 		expected *dtos.Event

+ 10 - 8
internal/io/edgex/edgex_source.go

@@ -20,17 +20,19 @@ package edgex
 import (
 	"encoding/json"
 	"fmt"
+	"strconv"
+	"strings"
+
 	v3 "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
 	"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
 	"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/requests"
 	"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
 	"github.com/fxamacker/cbor/v2"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/clients"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
-	"strconv"
-	"strings"
 )
 
 type EdgexSource struct {
@@ -164,10 +166,10 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 						}
 						r_meta := map[string]interface{}{}
 						r_meta["id"] = r.Id
-						//r_meta["created"] = r.Created
-						//r_meta["modified"] = r.Modified
+						// r_meta["created"] = r.Created
+						// r_meta["modified"] = r.Modified
 						r_meta["origin"] = r.Origin
-						//r_meta["pushed"] = r.Pushed
+						// r_meta["pushed"] = r.Pushed
 						r_meta["deviceName"] = r.DeviceName
 						r_meta["profileName"] = r.ProfileName
 						r_meta["valueType"] = r.ValueType
@@ -181,12 +183,12 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 				}
 				if len(result) > 0 {
 					meta["id"] = e.Id
-					//meta["pushed"] = e.Pushed
+					// meta["pushed"] = e.Pushed
 					meta["deviceName"] = e.DeviceName
 					meta["profileName"] = e.ProfileName
 					meta["sourceName"] = e.SourceName
-					//meta["created"] = e.Created
-					//meta["modified"] = e.Modified
+					// meta["created"] = e.Created
+					// meta["modified"] = e.Modified
 					meta["origin"] = e.Origin
 					meta["tags"] = e.Tags
 					meta["correlationid"] = env.CorrelationID

+ 10 - 8
internal/io/edgex/edgex_source_test.go

@@ -20,14 +20,16 @@ package edgex
 import (
 	"encoding/json"
 	"fmt"
+	"math"
+	"reflect"
+	"testing"
+
 	v3 "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
 	"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
 	"github.com/edgexfoundry/go-mod-core-contracts/v3/models"
 	"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
-	"math"
-	"reflect"
-	"testing"
 )
 
 var (
@@ -60,7 +62,7 @@ var (
 )
 
 func TestGetValue_IntFloat(t *testing.T) {
-	var testEvent = models.Event{DeviceName: "test"}
+	testEvent := models.Event{DeviceName: "test"}
 	for i := 1; i < 8; i++ {
 		name := fmt.Sprintf("i%d", i)
 		r1 := models.SimpleReading{
@@ -110,7 +112,7 @@ func almostEqual(a, b float64) bool {
 }
 
 func TestGetValue_IntFloatArr(t *testing.T) {
-	var testEvent = models.Event{DeviceName: "test"}
+	testEvent := models.Event{DeviceName: "test"}
 	for i := 1; i < 8; i++ {
 		ia := []int{i, i * 2}
 		jsonValue, _ := json.Marshal(ia)
@@ -183,7 +185,7 @@ func expectOne(t *testing.T, expected interface{}) {
 }
 
 func TestGetValue_Float(t *testing.T) {
-	var testEvent = models.Event{DeviceName: "test"}
+	testEvent := models.Event{DeviceName: "test"}
 	for i := 1; i < 3; i++ {
 		name := fmt.Sprintf("f%d", i)
 		r1 := models.SimpleReading{
@@ -278,8 +280,8 @@ func TestWrongType(t *testing.T) {
 }
 
 func TestWrongValue(t *testing.T) {
-	var testEvent = models.Event{DeviceName: "test"}
-	//100 cannot be converted to a boolean value
+	testEvent := models.Event{DeviceName: "test"}
+	// 100 cannot be converted to a boolean value
 	r1 := models.SimpleReading{
 		BaseReading: models.BaseReading{
 			ResourceName: "b1",

+ 5 - 4
internal/io/file/file_sink.go

@@ -17,15 +17,16 @@ package file
 import (
 	"errors"
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/lf-edge/ekuiper/pkg/cast"
-	"github.com/lf-edge/ekuiper/pkg/message"
 	"path/filepath"
 	"sort"
 	"strings"
 	"sync"
 	"time"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/message"
 )
 
 type sinkConf struct {

+ 29 - 18
internal/io/file/file_sink_test.go

@@ -16,18 +16,19 @@ package file
 
 import (
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/compressor"
-	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/topo/context"
-	"github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
-	"github.com/lf-edge/ekuiper/internal/topo/transform"
-	"github.com/lf-edge/ekuiper/pkg/message"
 	"os"
 	"path/filepath"
 	"reflect"
 	"strconv"
 	"testing"
 	"time"
+
+	"github.com/lf-edge/ekuiper/internal/compressor"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
+	"github.com/lf-edge/ekuiper/pkg/message"
 )
 
 // Unit test for Configure function
@@ -59,9 +60,11 @@ func TestConfigure(t *testing.T) {
 	if err == nil {
 		t.Errorf("Configure() error = %v, wantErr not nil", err)
 	}
-	err = m.Configure(map[string]interface{}{"interval": 500,
+	err = m.Configure(map[string]interface{}{
+		"interval": 500,
 		"path":     "test",
-		"fileType": "csv"})
+		"fileType": "csv",
+	})
 	if err == nil {
 		t.Errorf("Configure() error = %v, wantErr not nil", err)
 	}
@@ -243,12 +246,14 @@ func TestFileSink_Collect(t *testing.T) {
 			ft:      LINES_TYPE,
 			fname:   "test_lines",
 			content: []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
-		}, {
+		},
+		{
 			name:    "json",
 			ft:      JSON_TYPE,
 			fname:   "test_json",
 			content: []byte(`[{"key":"value1"},{"key":"value2"}]`),
-		}, {
+		},
+		{
 			name:    "csv",
 			ft:      CSV_TYPE,
 			fname:   "test_csv",
@@ -260,13 +265,15 @@ func TestFileSink_Collect(t *testing.T) {
 			fname:    "test_lines",
 			content:  []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
 			compress: GZIP,
-		}, {
+		},
+		{
 			name:     "json",
 			ft:       JSON_TYPE,
 			fname:    "test_json",
 			content:  []byte(`[{"key":"value1"},{"key":"value2"}]`),
 			compress: GZIP,
-		}, {
+		},
+		{
 			name:     "csv",
 			ft:       CSV_TYPE,
 			fname:    "test_csv",
@@ -280,13 +287,15 @@ func TestFileSink_Collect(t *testing.T) {
 			fname:    "test_lines",
 			content:  []byte("{\"key\":\"value1\"}\n{\"key\":\"value2\"}"),
 			compress: ZSTD,
-		}, {
+		},
+		{
 			name:     "json",
 			ft:       JSON_TYPE,
 			fname:    "test_json",
 			content:  []byte(`[{"key":"value1"},{"key":"value2"}]`),
 			compress: ZSTD,
-		}, {
+		},
+		{
 			name:     "csv",
 			ft:       CSV_TYPE,
 			fname:    "test_csv",
@@ -367,7 +376,6 @@ func TestFileSink_Collect(t *testing.T) {
 					t.Errorf("\nexpected\t %q \nbut got\t\t %q", tt.content, string(contents))
 				}
 			}
-
 		})
 	}
 }
@@ -484,7 +492,8 @@ func TestFileSinkRolling_Collect(t *testing.T) {
 				[]byte("{\"key\":\"value0\",\"ts\":460}\n{\"key\":\"value1\",\"ts\":910}\n{\"key\":\"value2\",\"ts\":1360}"),
 				[]byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
 			},
-		}, {
+		},
+		{
 			name:  "json",
 			ft:    JSON_TYPE,
 			fname: "test_json.log",
@@ -503,7 +512,8 @@ func TestFileSinkRolling_Collect(t *testing.T) {
 				[]byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
 			},
 			compress: GZIP,
-		}, {
+		},
+		{
 			name:  "json",
 			ft:    JSON_TYPE,
 			fname: "test_json_gzip.log",
@@ -523,7 +533,8 @@ func TestFileSinkRolling_Collect(t *testing.T) {
 				[]byte("{\"key\":\"value3\",\"ts\":1810}\n{\"key\":\"value4\",\"ts\":2260}"),
 			},
 			compress: ZSTD,
-		}, {
+		},
+		{
 			name:  "json",
 			ft:    JSON_TYPE,
 			fname: "test_json_zstd.log",

+ 8 - 6
internal/io/file/file_source.go

@@ -20,18 +20,20 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/klauspost/compress/gzip"
-	"github.com/klauspost/compress/zstd"
-	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/xsql"
-	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/lf-edge/ekuiper/pkg/cast"
 	"io"
 	"os"
 	"path/filepath"
 	"strconv"
 	"strings"
 	"time"
+
+	"github.com/klauspost/compress/gzip"
+	"github.com/klauspost/compress/zstd"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 )
 
 type FileSourceConfig struct {

+ 7 - 5
internal/io/file/file_source_test.go

@@ -16,15 +16,17 @@ package file
 
 import (
 	"fmt"
-	"github.com/benbjohnson/clock"
-	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/io/mock"
-	"github.com/lf-edge/ekuiper/pkg/api"
 	"io"
 	"os"
 	"path/filepath"
 	"testing"
 	"time"
+
+	"github.com/benbjohnson/clock"
+
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/io/mock"
+	"github.com/lf-edge/ekuiper/pkg/api"
 )
 
 func TestJsonFile(t *testing.T) {
@@ -101,7 +103,7 @@ func TestCSVFolder(t *testing.T) {
 		t.Fatal(err)
 	}
 	testFolder := filepath.Join(path, "test", "csvTemp")
-	err = os.MkdirAll(testFolder, 0755)
+	err = os.MkdirAll(testFolder, 0o755)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 6 - 5
internal/io/file/file_stream_test.go

@@ -15,7 +15,13 @@
 package file
 
 import (
+	"os"
+	"path/filepath"
+	"reflect"
+	"testing"
+
 	"github.com/benbjohnson/clock"
+
 	"github.com/lf-edge/ekuiper/internal/compressor"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
@@ -23,10 +29,6 @@ import (
 	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/message"
-	"os"
-	"path/filepath"
-	"reflect"
-	"testing"
 )
 
 func TestFileSinkCompress_Collect(t *testing.T) {
@@ -178,7 +180,6 @@ func TestFileSinkCompress_Collect(t *testing.T) {
 				api.NewDefaultSourceTupleWithTime(map[string]interface{}{"key": "value2"}, meta, mc.Now()),
 			}
 			mock.TestSourceOpen(r, exp, t)
-
 		})
 	}
 }

+ 5 - 4
internal/io/http/client.go

@@ -19,16 +19,17 @@ import (
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"time"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/mock"
 	"github.com/lf-edge/ekuiper/internal/pkg/cert"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
-	"io"
-	"net/http"
-	"strings"
-	"time"
 )
 
 // ClientConf is the configuration for http client

+ 2 - 1
internal/io/http/client_test.go

@@ -16,9 +16,10 @@ package http
 
 import (
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/io/mock"
 	"reflect"
 	"testing"
+
+	"github.com/lf-edge/ekuiper/internal/io/mock"
 )
 
 func TestHeaderConf(t *testing.T) {

+ 2 - 3
internal/io/http/httppull_source.go

@@ -17,11 +17,10 @@ package http
 import (
 	"time"
 
-	"github.com/lf-edge/ekuiper/pkg/infra"
-
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/httpx"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/infra"
 )
 
 type PullSource struct {
@@ -67,7 +66,7 @@ func (hps *PullSource) initTimerPull(ctx api.StreamContext, consumer chan<- api.
 	logger.Infof("Starting HTTP pull source with interval %d", hps.config.Interval)
 	ticker := time.NewTicker(time.Millisecond * time.Duration(hps.config.Interval))
 	defer ticker.Stop()
-	var omd5 = ""
+	omd5 := ""
 	for {
 		select {
 		case <-ticker.C:

+ 3 - 2
internal/io/http/httppush_source.go

@@ -16,14 +16,15 @@ package http
 
 import (
 	"fmt"
+	"net/http"
+	"strings"
+
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/io/http/httpserver"
 	"github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/infra"
-	"net/http"
-	"strings"
 )
 
 type PushConf struct {

+ 0 - 0
internal/io/http/httpserver/data_server.go


Beberapa file tidak ditampilkan karena terlalu banyak file yang berubah dalam diff ini