Kaynağa Gözat

refactor(project): fix all source code format. (#317)

jinfahua 4 yıl önce
ebeveyn
işleme
5bfd1473f5

+ 3 - 2
common/http_util.go

@@ -12,7 +12,8 @@ import (
 )
 
 var BodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""}
-func Send(logger api.Logger, client *http.Client, bodyType string, method string, u string, headers map[string]string, sendSingle bool, v interface{}) (*http.Response, error){
+
+func Send(logger api.Logger, client *http.Client, bodyType string, method string, u string, headers map[string]string, sendSingle bool, v interface{}) (*http.Response, error) {
 	var req *http.Request
 	var err error
 	switch bodyType {
@@ -89,4 +90,4 @@ func convertToMap(v interface{}, sendSingle bool) (map[string]interface{}, error
 		return nil, fmt.Errorf("invalid content: %v", v)
 	}
 	return nil, fmt.Errorf("invalid content: %v", v)
-}
+}

+ 1 - 1
common/time_util.go

@@ -354,4 +354,4 @@ func GetTimer(duration int) *clock.Timer {
 
 func GetNowInMilli() int64 {
 	return TimeToUnixMilli(Clock.Now())
-}
+}

+ 4 - 4
common/time_util_test.go

@@ -15,13 +15,13 @@ func TestDateToAndFromMilli(t *testing.T) {
 		{int64(2579140864913), time.Date(2051, time.September, 24, 4, 1, 4, 913000000, time.UTC)},
 		{int64(-1579140864913), time.Date(1919, time.December, 17, 21, 45, 35, 87000000, time.UTC)},
 	}
-	for i, tt := range tests{
+	for i, tt := range tests {
 		time := TimeFromUnixMilli(tt.m)
-		if !time.Equal(tt.t){
+		if !time.Equal(tt.t) {
 			t.Errorf("%d time from milli result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.t, time)
 		}
 		milli := TimeToUnixMilli(tt.t)
-		if tt.m != milli{
+		if tt.m != milli {
 			t.Errorf("%d time to milli result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.m, milli)
 		}
 	}
@@ -29,7 +29,7 @@ func TestDateToAndFromMilli(t *testing.T) {
 
 func TestMockClock(t *testing.T) {
 	n := GetNowInMilli()
-	if n != 0{
+	if n != 0 {
 		t.Errorf("mock clock now mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", 0, n)
 	}
 }

+ 1 - 1
deploy/docker/conf_util.go

@@ -159,7 +159,7 @@ func Handle(file string, conf map[interface{}]interface{}, skeys []string, val s
 	}
 }
 
-func getKey(file string, key string) string{
+func getKey(file string, key string) string {
 	if m, ok := file_keys_map[file][key]; ok {
 		return m
 	} else {

+ 56 - 56
deploy/docker/conf_util_test.go

@@ -14,21 +14,21 @@ func TestHandle(t *testing.T) {
 	}{
 		{
 			config: map[interface{}]interface{}{
-				"default": map[interface{}]interface{} {
+				"default": map[interface{}]interface{}{
 					"protocol": "tcp",
-					"port": 5563,
-					"optional": map[interface{}] interface{} {
+					"port":     5563,
+					"optional": map[interface{}]interface{}{
 						"ClientId": "client1",
 					},
 				},
 			},
-			skeys:[]string{"default", "protocol"},
-			val: "ssl",
+			skeys: []string{"default", "protocol"},
+			val:   "ssl",
 			exp: map[interface{}]interface{}{
-				"default": map[interface{}]interface{} {
+				"default": map[interface{}]interface{}{
 					"protocol": "ssl",
-					"port": 5563,
-					"optional": map[interface{}] interface{} {
+					"port":     5563,
+					"optional": map[interface{}]interface{}{
 						"ClientId": "client1",
 					},
 				},
@@ -37,21 +37,21 @@ func TestHandle(t *testing.T) {
 
 		{
 			config: map[interface{}]interface{}{
-				"default": map[interface{}]interface{} {
+				"default": map[interface{}]interface{}{
 					"protocol": "tcp",
-					"port": 5563,
-					"optional": map[interface{}] interface{} {
+					"port":     5563,
+					"optional": map[interface{}]interface{}{
 						"ClientId": "client1",
 					},
 				},
 			},
-			skeys:[]string{"default", "optional", "CLIENTID"},
-			val: "client2",
+			skeys: []string{"default", "optional", "CLIENTID"},
+			val:   "client2",
 			exp: map[interface{}]interface{}{
-				"default": map[interface{}]interface{} {
+				"default": map[interface{}]interface{}{
 					"protocol": "tcp",
-					"port": 5563,
-					"optional": map[interface{}] interface{} {
+					"port":     5563,
+					"optional": map[interface{}]interface{}{
 						"ClientId": "client2",
 					},
 				},
@@ -60,22 +60,22 @@ func TestHandle(t *testing.T) {
 
 		{
 			config: map[interface{}]interface{}{
-				"default": map[interface{}]interface{} {
+				"default": map[interface{}]interface{}{
 					"protocol": "tcp",
-					"port": 5563,
-					"optional": map[interface{}] interface{} {
+					"port":     5563,
+					"optional": map[interface{}]interface{}{
 						"ClientId": "client1",
 					},
 				},
 			},
-			skeys:[]string{"default", "optional", "KEEPALIVE"},
-			val: "6000",
+			skeys: []string{"default", "optional", "KEEPALIVE"},
+			val:   "6000",
 			exp: map[interface{}]interface{}{
-				"default": map[interface{}]interface{} {
+				"default": map[interface{}]interface{}{
 					"protocol": "tcp",
-					"port": 5563,
-					"optional": map[interface{}] interface{} {
-						"ClientId": "client1",
+					"port":     5563,
+					"optional": map[interface{}]interface{}{
+						"ClientId":  "client1",
 						"KeepAlive": int64(6000),
 					},
 				},
@@ -84,21 +84,21 @@ func TestHandle(t *testing.T) {
 
 		{
 			config: map[interface{}]interface{}{
-				"default": map[interface{}]interface{} {
+				"default": map[interface{}]interface{}{
 					"protocol": "tcp",
-					"port": 5563,
-					"optional": map[interface{}] interface{} {
+					"port":     5563,
+					"optional": map[interface{}]interface{}{
 						"ClientId": "client1",
 					},
 				},
 			},
-			skeys:[]string{"default", "optional", "RETAINED"},
-			val: "true",
+			skeys: []string{"default", "optional", "RETAINED"},
+			val:   "true",
 			exp: map[interface{}]interface{}{
-				"default": map[interface{}]interface{} {
+				"default": map[interface{}]interface{}{
 					"protocol": "tcp",
-					"port": 5563,
-					"optional": map[interface{}] interface{} {
+					"port":     5563,
+					"optional": map[interface{}]interface{}{
 						"ClientId": "client1",
 						"Retained": true,
 					},
@@ -108,23 +108,23 @@ func TestHandle(t *testing.T) {
 
 		{
 			config: map[interface{}]interface{}{
-				"default": map[interface{}]interface{} {
+				"default": map[interface{}]interface{}{
 					"protocol": "tcp",
-					"port": 5563,
-					"optional": map[interface{}] interface{} {
+					"port":     5563,
+					"optional": map[interface{}]interface{}{
 						"ClientId": "client1",
 					},
 				},
 			},
-			skeys:[]string{"default", "optional", "test"},
-			val: "3.14",
+			skeys: []string{"default", "optional", "test"},
+			val:   "3.14",
 			exp: map[interface{}]interface{}{
-				"default": map[interface{}]interface{} {
+				"default": map[interface{}]interface{}{
 					"protocol": "tcp",
-					"port": 5563,
-					"optional": map[interface{}] interface{} {
+					"port":     5563,
+					"optional": map[interface{}]interface{}{
 						"ClientId": "client1",
-						"test": 3.14,
+						"test":     3.14,
 					},
 				},
 			},
@@ -132,25 +132,25 @@ func TestHandle(t *testing.T) {
 
 		{
 			config: map[interface{}]interface{}{
-				"default": map[interface{}]interface{} {
+				"default": map[interface{}]interface{}{
 					"protocol": "tcp",
-					"port": 5563,
-					"optional": map[interface{}] interface{} {
+					"port":     5563,
+					"optional": map[interface{}]interface{}{
 						"ClientId": "client1",
 					},
 				},
 			},
-			skeys:[]string{"application_conf", "test"},
-			val: "ssl",
+			skeys: []string{"application_conf", "test"},
+			val:   "ssl",
 			exp: map[interface{}]interface{}{
-				"default": map[interface{}]interface{} {
+				"default": map[interface{}]interface{}{
 					"protocol": "tcp",
-					"port": 5563,
-					"optional": map[interface{}] interface{} {
+					"port":     5563,
+					"optional": map[interface{}]interface{}{
 						"ClientId": "client1",
 					},
 				},
-				"application_conf": map[interface{}]interface{} {
+				"application_conf": map[interface{}]interface{}{
 					"test": "ssl",
 				},
 			},
@@ -180,15 +180,15 @@ func TestProcessEnv(t *testing.T) {
 			},
 			file: "edgex",
 			expt: map[interface{}]interface{}{
-				"default": map[interface {}]interface{} {
+				"default": map[interface{}]interface{}{
 					"protocol": "tcp",
-					"type": "zmq",
-					"optional": map[interface{}] interface{} {
+					"type":     "zmq",
+					"optional": map[interface{}]interface{}{
 						"ClientId": "clientid_0000",
 					},
 				},
-				"application_conf": map[interface{}]interface{} {
-					"protocol":"ssl",
+				"application_conf": map[interface{}]interface{}{
+					"protocol": "ssl",
 				},
 			},
 		},

+ 5 - 5
fvt_scripts/edgex/benchmark/pub.go

@@ -24,7 +24,7 @@ var msgConfig1 = types.MessageBusConfig{
 		Port:     5563,
 		Protocol: "tcp",
 	},
-	Type:messaging.ZeroMQ,
+	Type: messaging.ZeroMQ,
 }
 
 type data struct {
@@ -54,15 +54,15 @@ func pubEventClientZeroMq(count int, wg *sync.WaitGroup) {
 			log.Fatal(ec)
 		} else {
 			client := coredata.NewEventClient(local.New("test"))
-			index := 0;
+			index := 0
 			for i := 0; i < count; i++ {
-				if i % 10 == 0 {
+				if i%10 == 0 {
 					index = 0
 				}
 
 				var testEvent = models.Event{Device: "demo"}
 				var r1 = models.Reading{Device: "Temperature device", Name: "Temperature", Value: fmt.Sprintf("%d", mockup[index].temperature)}
-				var r2 = models.Reading{Device: "Humidity device", Name: "Humidity", Value: fmt.Sprintf("%d",  mockup[index].humidity)}
+				var r2 = models.Reading{Device: "Humidity device", Name: "Humidity", Value: fmt.Sprintf("%d", mockup[index].humidity)}
 				index++
 
 				testEvent.Readings = append(testEvent.Readings, r1, r2)
@@ -101,7 +101,7 @@ func main() {
 	var wg sync.WaitGroup
 	for i := 0; i < 1; i++ {
 		wg.Add(1)
-		go pubEventClientZeroMq(count ,&wg)
+		go pubEventClientZeroMq(count, &wg)
 	}
 	wg.Wait()
 	t := time.Now()

+ 12 - 13
fvt_scripts/edgex/pub.go

@@ -19,7 +19,7 @@ var msgConfig1 = types.MessageBusConfig{
 		Port:     5563,
 		Protocol: "tcp",
 	},
-	Type:messaging.ZeroMQ,
+	Type: messaging.ZeroMQ,
 }
 
 func pubEventClientZeroMq() {
@@ -39,16 +39,16 @@ func pubEventClientZeroMq() {
 				var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: fmt.Sprintf("%d", i*8)}
 				var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: fmt.Sprintf("%d", i*9)}
 
-				var r3 = models.Reading{Name:"b1"}
-				if i % 2 == 0 {
+				var r3 = models.Reading{Name: "b1"}
+				if i%2 == 0 {
 					r3.Value = "true"
 				} else {
 					r3.Value = "false"
 				}
 
-				r4 := models.Reading{Name:"i1", Value:fmt.Sprintf("%d", i)}
-				r5 := models.Reading{Name:"f1", Value:fmt.Sprintf("%.2f", float64(i)/2.0)}
-				r6 := models.Reading{Name:"ui64", Value:"10796529505058023104"}
+				r4 := models.Reading{Name: "i1", Value: fmt.Sprintf("%d", i)}
+				r5 := models.Reading{Name: "f1", Value: fmt.Sprintf("%.2f", float64(i)/2.0)}
+				r6 := models.Reading{Name: "ui64", Value: "10796529505058023104"}
 
 				testEvent.Readings = append(testEvent.Readings, r1, r2, r3, r4, r5, r6)
 
@@ -80,7 +80,7 @@ func pubToAnother() {
 			Port:     5571,
 			Protocol: "tcp",
 		},
-		Type:messaging.ZeroMQ,
+		Type: messaging.ZeroMQ,
 	}
 	if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
 		log.Fatal(err)
@@ -121,10 +121,10 @@ func pubToMQTT(host string) {
 			Port:     1883,
 			Protocol: "tcp",
 		},
-		Optional:map[string]string{
+		Optional: map[string]string{
 			"ClientId": "0001_client_id",
 		},
-		Type:messaging.MQTT,
+		Type: messaging.MQTT,
 	}
 	if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
 		log.Fatal(err)
@@ -170,9 +170,9 @@ func pubMetaSource() {
 			evtDevice := []string{"demo1", "demo2"}
 			for i, device := range evtDevice {
 				j := int64(i) + 1
-				testEvent := models.Event{Device: device, Created: 11*j, Modified: 12*j, Origin: 13*j}
-				r1 := models.Reading{Pushed: 22*j, Created: 23*j, Origin: 24*j, Modified: 25*j, Device: "Temperature sensor", Name: "Temperature", Value: fmt.Sprintf("%d", j*8)}
-				r2 := models.Reading{Pushed: 32*j, Created: 33*j, Origin: 34*j, Modified: 35*j, Device: "Humidity sensor", Name: "Humidity", Value: fmt.Sprintf("%d", j*8)}
+				testEvent := models.Event{Device: device, Created: 11 * j, Modified: 12 * j, Origin: 13 * j}
+				r1 := models.Reading{Pushed: 22 * j, Created: 23 * j, Origin: 24 * j, Modified: 25 * j, Device: "Temperature sensor", Name: "Temperature", Value: fmt.Sprintf("%d", j*8)}
+				r2 := models.Reading{Pushed: 32 * j, Created: 33 * j, Origin: 34 * j, Modified: 35 * j, Device: "Humidity sensor", Name: "Humidity", Value: fmt.Sprintf("%d", j*8)}
 
 				testEvent.Readings = append(testEvent.Readings, r1, r2)
 				data, err := client.MarshalEvent(testEvent)
@@ -213,4 +213,3 @@ func main() {
 		}
 	}
 }
-

+ 2 - 2
fvt_scripts/edgex/sub/sub.go

@@ -15,7 +15,7 @@ func subEventsFromZMQ() {
 			Port:     5571,
 			Protocol: "tcp",
 		},
-		Type:messaging.ZeroMQ,
+		Type: messaging.ZeroMQ,
 	}
 
 	if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
@@ -39,7 +39,7 @@ func subEventsFromZMQ() {
 						common.Log.Errorf("%s\n", e1)
 						return
 					case env := <-messages:
-						count ++
+						count++
 						fmt.Printf("%s\n", env.Payload)
 						if count == 1 {
 							return

+ 7 - 9
fvt_scripts/edgex/valuedesc/vd_server.go

@@ -20,18 +20,18 @@ const (
 )
 
 var vd1 = models.ValueDescriptor{Id: "Temperature", Created: 123, Modified: 123, Origin: 123, Name: "Temperature",
-	Description: "test description", Min: -70, Max: 140, DefaultValue: 32, Formatting: "%f", Type:"Float32",
+	Description: "test description", Min: -70, Max: 140, DefaultValue: 32, Formatting: "%f", Type: "Float32",
 	Labels: []string{"temp", "room temp"}, UomLabel: "F", MediaType: clients.ContentTypeJSON, FloatEncoding: "eNotation"}
 
 var vd2 = models.ValueDescriptor{Id: "Humidity", Created: 123, Modified: 123, Origin: 123, Name: "Humidity",
-	Description: "test description", Min: -70, Max: 140, DefaultValue: 32, Formatting: "%d", Type:"Uint64",
+	Description: "test description", Min: -70, Max: 140, DefaultValue: 32, Formatting: "%d", Type: "Uint64",
 	Labels: []string{"humi", "room humidity"}, UomLabel: "F", MediaType: clients.ContentTypeJSON, FloatEncoding: "eNotation"}
 
-var vd3 = models.ValueDescriptor{Id: "b1", Name: "b1", Formatting: "%t", Type:"Bool", MediaType: clients.ContentTypeJSON}
-var vd4 = models.ValueDescriptor{Id: "i1", Name: "i1", Formatting: "%d", Type:"UINT8", MediaType: clients.ContentTypeJSON}
-var vd5 = models.ValueDescriptor{Id: "f1", Name: "f1", Formatting: "%f", Type:"FLOAT64", MediaType: clients.ContentTypeJSON}
-var vd6 = models.ValueDescriptor{Id: "s1", Name: "s1", Formatting: "%s", Type:"String", MediaType: clients.ContentTypeJSON}
-var vd7 = models.ValueDescriptor{Id: "ui64", Name: "ui64", Formatting: "%d", Type:"UINT64", MediaType: clients.ContentTypeJSON}
+var vd3 = models.ValueDescriptor{Id: "b1", Name: "b1", Formatting: "%t", Type: "Bool", MediaType: clients.ContentTypeJSON}
+var vd4 = models.ValueDescriptor{Id: "i1", Name: "i1", Formatting: "%d", Type: "UINT8", MediaType: clients.ContentTypeJSON}
+var vd5 = models.ValueDescriptor{Id: "f1", Name: "f1", Formatting: "%f", Type: "FLOAT64", MediaType: clients.ContentTypeJSON}
+var vd6 = models.ValueDescriptor{Id: "s1", Name: "s1", Formatting: "%s", Type: "String", MediaType: clients.ContentTypeJSON}
+var vd7 = models.ValueDescriptor{Id: "ui64", Name: "ui64", Formatting: "%d", Type: "UINT64", MediaType: clients.ContentTypeJSON}
 
 func main() {
 	http.HandleFunc(clients.ApiValueDescriptorRoute, Hello)
@@ -73,5 +73,3 @@ func Hello(w http.ResponseWriter, req *http.Request) {
 	}
 	//_, _ = w.Write(data)
 }
-
-

+ 0 - 1
fvt_scripts/plugins/pub/zmq_pub.go

@@ -96,4 +96,3 @@ func main() {
 		}
 	}
 }
-

+ 2 - 1
fvt_scripts/plugins/service/server.go

@@ -31,6 +31,7 @@ type Sensor struct {
 }
 
 var s = &Sensor{}
+
 func pullSrv(w http.ResponseWriter, req *http.Request) {
 	buf, bodyErr := ioutil.ReadAll(req.Body)
 	if bodyErr != nil {
@@ -41,7 +42,7 @@ func pullSrv(w http.ResponseWriter, req *http.Request) {
 		fmt.Println(string(buf))
 	}
 
-	if count % 2 == 0 {
+	if count%2 == 0 {
 		rand.Seed(time.Now().UnixNano())
 		s.Temperature = rand.Intn(100)
 		s.Humidity = rand.Intn(100)

+ 25 - 25
xsql/ast_test.go

@@ -15,84 +15,84 @@ func Test_MessageValTest(t *testing.T) {
 	}{
 		{
 			key: "key1",
-			message:Message{
+			message: Message{
 				"key1": "val1",
 				"key2": "val2",
 			},
-			exptV: "val1",
+			exptV:  "val1",
 			exptOk: true,
 		},
 
 		{
 			key: "key0",
-			message:Message{
+			message: Message{
 				"key1": "val1",
 				"key2": "val2",
 			},
-			exptV: nil,
+			exptV:  nil,
 			exptOk: false,
 		},
 
 		{
 			key: "key1",
-			message:Message{
+			message: Message{
 				"Key1": "val1",
 				"key2": "val2",
 			},
-			exptV: "val1",
+			exptV:  "val1",
 			exptOk: true,
 		},
 
 		{
 			key: "key1" + COLUMN_SEPARATOR + "subkey",
-			message:Message{
-				"Key1": "val1",
+			message: Message{
+				"Key1":   "val1",
 				"subkey": "subval",
 			},
-			exptV: "subval",
+			exptV:  "subval",
 			exptOk: true,
 		},
 
 		{
 			key: "192.168.0.1",
-			message:Message{
-				"Key1": "val1",
+			message: Message{
+				"Key1":        "val1",
 				"192.168.0.1": "000",
 			},
-			exptV: "000",
+			exptV:  "000",
 			exptOk: true,
 		},
 
 		{
 			key: "parent" + COLUMN_SEPARATOR + "child",
-			message:Message{
-				"key1": "val1",
-				"child": "child_val",
+			message: Message{
+				"key1":         "val1",
+				"child":        "child_val",
 				"parent.child": "demo",
 			},
-			exptV: "child_val",
+			exptV:  "child_val",
 			exptOk: true,
 		},
 
 		{
 			key: "parent.child",
-			message:Message{
-				"key1": "val1",
-				"child": "child_val",
+			message: Message{
+				"key1":         "val1",
+				"child":        "child_val",
 				"parent.child": "demo",
 			},
-			exptV: "demo",
+			exptV:  "demo",
 			exptOk: true,
 		},
 
 		{
 			key: "parent.Child",
-			message:Message{
-				"key1": "val1",
-				"child": "child_val",
+			message: Message{
+				"key1":         "val1",
+				"child":        "child_val",
 				"parent.child": "demo",
 			},
-			exptV: "demo",
+			exptV:  "demo",
 			exptOk: true,
 		},
 	}
@@ -107,4 +107,4 @@ func Test_MessageValTest(t *testing.T) {
 			t.Errorf("%d. \n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.exptV, v)
 		}
 	}
-}
+}

+ 2 - 2
xsql/parser_test.go

@@ -1229,7 +1229,7 @@ func TestParser_ParseStatement(t *testing.T) {
 		},
 
 		{
-			s:    `select tstamp() as tp from demo`,
+			s: `select tstamp() as tp from demo`,
 			stmt: &SelectStatement{
 				Fields: []Field{
 					{
@@ -1242,7 +1242,7 @@ func TestParser_ParseStatement(t *testing.T) {
 				},
 				Sources: []Source{&Table{Name: "demo"}},
 			},
-			err:  "",
+			err: "",
 		},
 
 		{

+ 2 - 2
xsql/xsql_stream_test.go

@@ -154,7 +154,7 @@ func TestParser_ParseCreateStream(t *testing.T) {
 		
 				) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
 			stmt: &StreamStmt{
-				Name: StreamName("demo"),
+				Name:         StreamName("demo"),
 				StreamFields: nil,
 				Options: map[string]string{
 					"DATASOURCE": "users",
@@ -167,7 +167,7 @@ func TestParser_ParseCreateStream(t *testing.T) {
 		{
 			s: `CREATE STREAM demo() WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
 			stmt: &StreamStmt{
-				Name: StreamName("demo"),
+				Name:         StreamName("demo"),
 				StreamFields: nil,
 				Options: map[string]string{
 					"DATASOURCE": "users",

+ 4 - 5
xstream/extensions/edgex_source.go

@@ -29,11 +29,11 @@ type EdgexSource struct {
 }
 
 func (es *EdgexSource) Configure(device string, props map[string]interface{}) error {
-	var protocol = "tcp";
+	var protocol = "tcp"
 	if p, ok := props["protocol"]; ok {
 		protocol = p.(string)
 	}
-	var server = "localhost";
+	var server = "localhost"
 	if s, ok := props["server"]; ok {
 		server = s.(string)
 	}
@@ -135,7 +135,7 @@ func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTup
 						if len > 200 {
 							len = 200
 						}
-						log.Warnf("payload %s unmarshal fail: %v", env.Payload[0:(len - 1)], err)
+						log.Warnf("payload %s unmarshal fail: %v", env.Payload[0:(len-1)], err)
 					} else {
 						result := make(map[string]interface{})
 						meta := make(map[string]interface{})
@@ -298,7 +298,6 @@ func (es *EdgexSource) getFloatValue(r models.Reading, logger api.Logger) (inter
 	}
 }
 
-
 func (es *EdgexSource) fetchAllDataDescriptors() error {
 	if vdArr, err := es.vdc.ValueDescriptors(context.Background()); err != nil {
 		return err
@@ -340,4 +339,4 @@ func (es *EdgexSource) Close(ctx api.StreamContext) error {
 		}
 	}
 	return nil
-}
+}

+ 20 - 21
xstream/extensions/edgex_source_test.go

@@ -10,24 +10,24 @@ import (
 )
 
 var es = EdgexSource{valueDescs: map[string]string{
-	"b1" : "bool",
-	"i1" : "int8",
-	"i2" : "INT16",
-	"i3" : "INT32",
-	"i4" : "INT64",
-	"i5" : "UINT8",
-	"i6" : "UINT16",
-	"i7" : "UINT32",
-	"s1" : "String",
-	"f1" : "Float32", //FLOAT32 will be handled by special case
-	"f2" : "Float64", //FLOAT64 will be handled by special case
-	"i8" : "UINT64",  //UINT64 will be handled by special case
-	},
+	"b1": "bool",
+	"i1": "int8",
+	"i2": "INT16",
+	"i3": "INT32",
+	"i4": "INT64",
+	"i5": "UINT8",
+	"i6": "UINT16",
+	"i7": "UINT32",
+	"s1": "String",
+	"f1": "Float32", //FLOAT32 will be handled by special case
+	"f2": "Float64", //FLOAT64 will be handled by special case
+	"i8": "UINT64",  //UINT64 will be handled by special case
+},
 }
 
 func TestGetValue_Int(t *testing.T) {
 	var testEvent = models.Event{Device: "test"}
-	for i := 1; i < 8; i++{
+	for i := 1; i < 8; i++ {
 		r1 := models.Reading{Name: fmt.Sprintf("i%d", i), Value: "1"}
 		testEvent.Readings = append(testEvent.Readings, r1)
 	}
@@ -40,7 +40,7 @@ func TestGetValue_Int(t *testing.T) {
 		}
 	}
 
-	rf_01 := models.Reading{Name:"f1", Value:"fwtOaw=="}
+	rf_01 := models.Reading{Name: "f1", Value: "fwtOaw=="}
 	if v, e := es.getValue(rf_01, common.Log); e != nil {
 		t.Errorf("%s", e)
 	} else {
@@ -75,7 +75,7 @@ func expectOne(t *testing.T, expected interface{}) {
 
 func TestGetValue_Float(t *testing.T) {
 	var testEvent = models.Event{Device: "test"}
-	for i := 1; i < 3; i++{
+	for i := 1; i < 3; i++ {
 		r1 := models.Reading{Name: fmt.Sprintf("f%d", i), Value: "3.14"}
 		testEvent.Readings = append(testEvent.Readings, r1)
 	}
@@ -99,7 +99,6 @@ func expectPi(t *testing.T, expected interface{}) {
 	}
 }
 
-
 func TestGetValue_Bool(t *testing.T) {
 	///////////True
 	trues := []string{"1", "t", "T", "true", "TRUE", "True"}
@@ -157,7 +156,7 @@ func expectFalse(t *testing.T, expected interface{}) {
 func TestWrongType(t *testing.T) {
 	es1 := EdgexSource{valueDescs: map[string]string{
 		"f": "FLOAT", //A not exsited type
-		},
+	},
 	}
 	r1 := models.Reading{Name: "f", Value: "100"}
 	if v, _ := es1.getValue(r1, common.Log); v != "100" {
@@ -167,8 +166,8 @@ func TestWrongType(t *testing.T) {
 
 func TestWrongValue(t *testing.T) {
 	var testEvent = models.Event{Device: "test"}
-	r1 := models.Reading{Name: "b1", Value: "100"} //100 cannot be converted to a boolean value
-	r2 := models.Reading{Name: "i1", Value: "int"} //'int' string cannot be converted to int value
+	r1 := models.Reading{Name: "b1", Value: "100"}   //100 cannot be converted to a boolean value
+	r2 := models.Reading{Name: "i1", Value: "int"}   //'int' string cannot be converted to int value
 	r3 := models.Reading{Name: "f1", Value: "float"} //'float' string cannot be converted to int value
 	testEvent.Readings = append(testEvent.Readings, r1, r2, r3)
 
@@ -192,4 +191,4 @@ func TestCastToString(t *testing.T) {
 	if v, ok := CastToString(12.3); v != "12.30" || !ok {
 		t.Errorf("Failed to cast float.")
 	}
-}
+}

+ 3 - 3
xstream/extensions/httppull_source.go

@@ -33,7 +33,7 @@ type HTTPPullSource struct {
 var bodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""}
 
 func (hps *HTTPPullSource) Configure(device string, props map[string]interface{}) error {
-	hps.url = "http://localhost";
+	hps.url = "http://localhost"
 	if u, ok := props["url"]; ok {
 		if p, ok := u.(string); ok {
 			hps.url = p
@@ -138,7 +138,7 @@ func (hps *HTTPPullSource) initTimerPull(ctx api.StreamContext, consumer chan<-
 	ticker := time.NewTicker(time.Millisecond * time.Duration(hps.interval))
 	logger := ctx.GetLogger()
 	defer ticker.Stop()
-	var omd5 string = ""
+	var omd5 = ""
 	for {
 		select {
 		case <-ticker.C:
@@ -186,4 +186,4 @@ func (hps *HTTPPullSource) initTimerPull(ctx api.StreamContext, consumer chan<-
 func getMD5Hash(text []byte) string {
 	hash := md5.Sum(text)
 	return hex.EncodeToString(hash[:])
-}
+}

+ 1 - 1
xstream/extensions/source_util.go

@@ -18,4 +18,4 @@ func CastToString(v interface{}) (result string, ok bool) {
 	default:
 		return "", false
 	}
-}
+}

+ 1 - 2
xstream/nodes/with_edgex.go

@@ -15,7 +15,6 @@ func getSource(t string) (api.Source, error) {
 	return doGetSource(t)
 }
 
-
 func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 	if name == "edgex" {
 		s := &sinks.EdgexMsgBusSink{}
@@ -26,4 +25,4 @@ func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 		}
 	}
 	return doGetSink(name, action)
-}
+}

+ 1 - 1
xstream/nodes/without_edgex.go

@@ -10,4 +10,4 @@ func getSource(t string) (api.Source, error) {
 
 func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 	return doGetSink(name, action)
-}
+}

+ 8 - 8
xstream/operators/window_op.go

@@ -40,8 +40,8 @@ func NewWindowOp(name string, w *xsql.Window, isEventTime bool, lateTolerance in
 	o.isEventTime = isEventTime
 	if w != nil {
 		o.window = &WindowConfig{
-			Type:     w.WindowType,
-			Length:   w.Length.Val,
+			Type:   w.WindowType,
+			Length: w.Length.Val,
 		}
 		if w.Interval != nil {
 			o.window.Interval = w.Interval.Val
@@ -172,7 +172,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 				case xsql.COUNT_WINDOW:
 					o.msgCount++
 					log.Debugf(fmt.Sprintf("msgCount: %d", o.msgCount))
-					if o.msgCount% o.window.Interval != 0 {
+					if o.msgCount%o.window.Interval != 0 {
 						continue
 					} else {
 						o.msgCount = 0
@@ -183,7 +183,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 						errCh <- er
 					} else {
 						log.Debugf(fmt.Sprintf("It has %d of count window.", tl.count()))
-						for ; tl.hasMoreCountWindow(); {
+						for tl.hasMoreCountWindow() {
 							tsets := tl.nextCountWindow()
 							log.Debugf("Sent: %v", tsets)
 							//blocking if one of the channel is full
@@ -238,9 +238,9 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, errCh chan<
 }
 
 type TupleList struct {
-	tuples     []*xsql.Tuple
-	index      int //Current index
-	size       int //The size for count window
+	tuples []*xsql.Tuple
+	index  int //Current index
+	size   int //The size for count window
 }
 
 func NewTupleList(tuples []*xsql.Tuple, windowSize int) (TupleList, error) {
@@ -271,7 +271,7 @@ func (tl *TupleList) count() int {
 func (tl *TupleList) nextCountWindow() xsql.WindowTuplesSet {
 	var results xsql.WindowTuplesSet = make([]xsql.WindowTuples, 0)
 	var subT []*xsql.Tuple
-	subT = tl.tuples[len(tl.tuples) -tl.size : len(tl.tuples)]
+	subT = tl.tuples[len(tl.tuples)-tl.size : len(tl.tuples)]
 	for _, tuple := range subT {
 		results = results.AddTuple(tuple)
 	}

+ 4 - 4
xstream/sinks/edgex_sink.go

@@ -117,7 +117,7 @@ func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
 			Port:     ems.port,
 			Protocol: ems.protocol,
 		},
-		Type: ems.ptype,
+		Type:     ems.ptype,
 		Optional: ems.optional,
 	}
 	log.Infof("Using configuration for EdgeX message bus sink: %+v", conf)
@@ -198,7 +198,7 @@ func (ems *EdgexMsgBusSink) getMeta(result []map[string]interface{}) (meta, bool
 	return nil, false
 }
 
-func (m meta) getIntVal(k string) (int64) {
+func (m meta) getIntVal(k string) int64 {
 	if v, ok := m[k]; ok {
 		if v1, ok1 := v.(float64); ok1 {
 			return int64(v1)
@@ -207,7 +207,7 @@ func (m meta) getIntVal(k string) (int64) {
 	return 0
 }
 
-func (m meta) getStrVal(k string) (string) {
+func (m meta) getStrVal(k string) string {
 	if v, ok := m[k]; ok {
 		if v1, ok1 := v.(string); ok1 {
 			return v1
@@ -216,7 +216,7 @@ func (m meta) getStrVal(k string) (string) {
 	return ""
 }
 
-func (ems *EdgexMsgBusSink) getMetaValueAsMap(m meta, k string) (map[string]interface{}) {
+func (ems *EdgexMsgBusSink) getMetaValueAsMap(m meta, k string) map[string]interface{} {
 	if v, ok := m[k]; ok {
 		if v1, ok1 := v.(map[string]interface{}); ok1 {
 			return v1

+ 48 - 48
xstream/sinks/edgex_sink_test.go

@@ -36,28 +36,28 @@ func TestProduceEvents(t1 *testing.T) {
 				Origin:   3,
 				Readings: []models.Reading{
 					{
-						Name: "humidity",
-						Value: "100",
-						Created: 11,
-						Device: "test device name1",
-						Id: "12",
+						Name:     "humidity",
+						Value:    "100",
+						Created:  11,
+						Device:   "test device name1",
+						Id:       "12",
 						Modified: 13,
-						Origin: 14,
-						Pushed: 15,
+						Origin:   14,
+						Pushed:   15,
 					},
 					{
-						Name: "temperature",
-						Value: "50",
-						Created: 21,
-						Device: "test device name2",
-						Id: "22",
+						Name:     "temperature",
+						Value:    "50",
+						Created:  21,
+						Device:   "test device name2",
+						Id:       "22",
 						Modified: 23,
-						Origin: 24,
-						Pushed: 25,
+						Origin:   24,
+						Pushed:   25,
 					},
 				},
 			},
-			error:     "",
+			error: "",
 		},
 
 		{
@@ -79,18 +79,18 @@ func TestProduceEvents(t1 *testing.T) {
 				Origin:   3,
 				Readings: []models.Reading{
 					{
-						Name: "h1",
-						Value: "100",
-						Created: 0,
-						Device: "",
-						Id: "",
+						Name:     "h1",
+						Value:    "100",
+						Created:  0,
+						Device:   "",
+						Id:       "",
 						Modified: 0,
-						Origin: 0,
-						Pushed: 0,
+						Origin:   0,
+						Pushed:   0,
 					},
 				},
 			},
-			error:     "",
+			error: "",
 		},
 
 		{
@@ -107,18 +107,18 @@ func TestProduceEvents(t1 *testing.T) {
 				Origin:   0,
 				Readings: []models.Reading{
 					{
-						Name: "h1",
-						Value: "100",
-						Created: 0,
-						Device: "",
-						Id: "",
+						Name:     "h1",
+						Value:    "100",
+						Created:  0,
+						Device:   "",
+						Id:       "",
 						Modified: 0,
-						Origin: 0,
-						Pushed: 0,
+						Origin:   0,
+						Pushed:   0,
 					},
 				},
 			},
-			error:     "",
+			error: "",
 		},
 
 		{
@@ -135,32 +135,32 @@ func TestProduceEvents(t1 *testing.T) {
 				Origin:   0,
 				Readings: []models.Reading{
 					{
-						Name: "meta1",
-						Value: "50",
-						Created: 0,
-						Device: "",
-						Id: "",
+						Name:     "meta1",
+						Value:    "50",
+						Created:  0,
+						Device:   "",
+						Id:       "",
 						Modified: 0,
-						Origin: 0,
-						Pushed: 0,
+						Origin:   0,
+						Pushed:   0,
 					},
 					{
-						Name: "h1",
-						Value: "100",
-						Created: 0,
-						Device: "",
-						Id: "",
+						Name:     "h1",
+						Value:    "100",
+						Created:  0,
+						Device:   "",
+						Id:       "",
 						Modified: 0,
-						Origin: 0,
-						Pushed: 0,
+						Origin:   0,
+						Pushed:   0,
 					},
 				},
 			},
-			error:     "",
+			error: "",
 		},
 
 		{
-			input: `[]`,
+			input:      `[]`,
 			deviceName: "kuiper",
 			expected: &models.Event{
 				ID:       "",
@@ -171,7 +171,7 @@ func TestProduceEvents(t1 *testing.T) {
 				Origin:   0,
 				Readings: nil,
 			},
-			error:     "",
+			error: "",
 		},
 	}
 

+ 2 - 2
xstream/sinks/nop_sink.go

@@ -5,7 +5,7 @@ import (
 )
 
 type NopSink struct {
-	log  bool
+	log bool
 }
 
 func (ns *NopSink) Configure(ps map[string]interface{}) error {
@@ -32,4 +32,4 @@ func (ns *NopSink) Collect(ctx api.StreamContext, item interface{}) error {
 
 func (ns *NopSink) Close(ctx api.StreamContext) error {
 	return nil
-}
+}

+ 2 - 2
xstream/test/clock.go

@@ -5,12 +5,12 @@ import (
 	"github.com/emqx/kuiper/common"
 )
 
-func ResetClock(t int64){
+func ResetClock(t int64) {
 	mock := clock.NewMock()
 	mock.Set(common.TimeFromUnixMilli(t))
 	common.Clock = mock
 }
 
-func GetMockClock() *clock.Mock{
+func GetMockClock() *clock.Mock {
 	return common.Clock.(*clock.Mock)
 }