Преглед на файлове

benchmark test for edgex source

RockyJin преди 5 години
родител
ревизия
cc09772e7d

+ 124 - 0
fvt_scripts/edgex/benchmark/README.md

@@ -0,0 +1,124 @@
+
+
+- Build the ``pub.go``, this is used for simulating the data.
+    ```shell
+    # go build -o test/pub test/pub.go
+    ```
+
+- A mockup EdgeX value descriptor service should be compiled and run before test.
+
+    ```shell
+    # go build -o fvt_scripts/edgex/valuedesc/vdmocker fvt_scripts/edgex/valuedesc/vd_server.go
+    
+    # fvt_scripts/edgex/valuedesc/vdmocker > vdmocker.out 2>&1 &
+    ```
+
+- Update edgex configuration. Update ``Server`` configuration to the address where you run ``pub`` in the 1st step, update ``serviceServer`` to the address where you run ``vdmocker`` of last step.  
+  ```yaml
+  default:
+    protocol: tcp
+    server: 172.31.1.144
+    port: 5563
+    topic: events
+    serviceServer: http://localhost:48080 
+  ```
+
+- The rule is listed as following, save the rule as ``rule.txt``.
+
+   ```json
+    {
+      "sql": "SELECT * from demo where temperature>50",
+      "actions": [
+        {
+          "nop": {
+            "log": false
+          }
+        }
+      ]
+    }
+   ```
+  
+- Create stream ``bin/cli create stream demo'() WITH (FORMAT="JSON", TYPE="edgex")'``
+
+- Create rule ``bin/cli create rule rule1 -f rule.txt``. To add another rule, just change the rule name of command, then deploy another rule, e,g, ``bin/cli create rule rule2 -f rule.txt``
+
+- Run ``pub`` application ``./pub 1000000``, below is an example.
+  ```shell script
+  ubuntu@ip-172-31-1-144:~$ ./pub 1000000
+  elapsed 174.924363s
+  ```
+  
+  Check the status of rule,
+  ```shell script
+  ubuntu@ip-172-31-5-85:/tmp/kuiper-master/_build/kuiper--linux-x86_64$ bin/cli getstatus rule rule1
+    Connecting to 127.0.0.1:20498...
+    {
+      "source_demo_0_records_in_total": 1000000,
+      "source_demo_0_records_out_total": 1000000,
+      "source_demo_0_exceptions_total": 0,
+      "source_demo_0_process_latency_ms": 0,
+      "source_demo_0_buffer_length": 0,
+      "source_demo_0_last_invocation": "2020-04-10T04:26:15.51329",
+      "op_preprocessor_demo_0_records_in_total": 1000000,
+      "op_preprocessor_demo_0_records_out_total": 1000000,
+      "op_preprocessor_demo_0_exceptions_total": 0,
+      "op_preprocessor_demo_0_process_latency_ms": 0,
+      "op_preprocessor_demo_0_buffer_length": 0,
+      "op_preprocessor_demo_0_last_invocation": "2020-04-10T04:26:15.513371",
+      "op_filter_0_records_in_total": 1000000,
+      "op_filter_0_records_out_total": 100000,
+      "op_filter_0_exceptions_total": 0,
+      "op_filter_0_process_latency_ms": 0,
+      "op_filter_0_buffer_length": 0,
+      "op_filter_0_last_invocation": "2020-04-10T04:26:15.513391",
+      "op_project_0_records_in_total": 100000,
+      "op_project_0_records_out_total": 100000,
+      "op_project_0_exceptions_total": 0,
+      "op_project_0_process_latency_ms": 0,
+      "op_project_0_buffer_length": 0,
+      "op_project_0_last_invocation": "2020-04-10T04:26:15.513468",
+      "sink_sink_nop_0_records_in_total": 100000,
+      "sink_sink_nop_0_records_out_total": 100000,
+      "sink_sink_nop_0_exceptions_total": 0,
+      "sink_sink_nop_0_process_latency_ms": 0,
+      "sink_sink_nop_0_buffer_length": 1,
+      "sink_sink_nop_0_last_invocation": "2020-04-10T04:26:15.513501"
+    }
+  ubuntu@ip-172-31-5-85:/tmp/kuiper-master/_build/kuiper--linux-x86_64$ bin/cli getstatus rule rule2
+    Connecting to 127.0.0.1:20498...
+    {
+      "source_demo_0_records_in_total": 1000000,
+      "source_demo_0_records_out_total": 1000000,
+      "source_demo_0_exceptions_total": 0,
+      "source_demo_0_process_latency_ms": 0,
+      "source_demo_0_buffer_length": 0,
+      "source_demo_0_last_invocation": "2020-04-10T04:26:15.514621",
+      "op_preprocessor_demo_0_records_in_total": 1000000,
+      "op_preprocessor_demo_0_records_out_total": 1000000,
+      "op_preprocessor_demo_0_exceptions_total": 0,
+      "op_preprocessor_demo_0_process_latency_ms": 0,
+      "op_preprocessor_demo_0_buffer_length": 0,
+      "op_preprocessor_demo_0_last_invocation": "2020-04-10T04:26:15.514631",
+      "op_filter_0_records_in_total": 1000000,
+      "op_filter_0_records_out_total": 100000,
+      "op_filter_0_exceptions_total": 0,
+      "op_filter_0_process_latency_ms": 0,
+      "op_filter_0_buffer_length": 0,
+      "op_filter_0_last_invocation": "2020-04-10T04:26:15.514635",
+      "op_project_0_records_in_total": 100000,
+      "op_project_0_records_out_total": 100000,
+      "op_project_0_exceptions_total": 0,
+      "op_project_0_process_latency_ms": 0,
+      "op_project_0_buffer_length": 0,
+      "op_project_0_last_invocation": "2020-04-10T04:26:15.514639",
+      "sink_sink_nop_0_records_in_total": 100000,
+      "sink_sink_nop_0_records_out_total": 100000,
+      "sink_sink_nop_0_exceptions_total": 0,
+      "sink_sink_nop_0_process_latency_ms": 0,
+      "sink_sink_nop_0_buffer_length": 1,
+      "sink_sink_nop_0_last_invocation": "2020-04-10T04:26:15.514652"
+    }
+  ```
+Below is the system usage screenshot,
+
+  ![](system_usage.png)

+ 111 - 0
fvt_scripts/edgex/benchmark/pub.go

@@ -0,0 +1,111 @@
+// +build benchmark
+
+//Not necessary to build the file, until for the edgex benchmark test
+package main
+
+import (
+	"context"
+	"fmt"
+	"github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
+	"github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
+	"github.com/edgexfoundry/go-mod-core-contracts/models"
+	"github.com/edgexfoundry/go-mod-messaging/messaging"
+	"github.com/edgexfoundry/go-mod-messaging/pkg/types"
+	"log"
+	"os"
+	"strconv"
+	"sync"
+	"time"
+)
+
+var msgConfig1 = types.MessageBusConfig{
+	PublishHost: types.HostInfo{
+		Host:     "172.31.1.144",
+		Port:     5563,
+		Protocol: "tcp",
+	},
+	Type:messaging.ZeroMQ,
+}
+
+type data struct {
+	temperature int
+	humidity    int
+}
+
+var mockup = []data{
+	{temperature: 10, humidity: 15},
+	{temperature: 15, humidity: 20},
+	{temperature: 20, humidity: 25},
+	{temperature: 25, humidity: 30},
+	{temperature: 30, humidity: 35},
+	{temperature: 35, humidity: 40},
+	{temperature: 40, humidity: 45},
+	{temperature: 45, humidity: 50},
+	{temperature: 50, humidity: 55},
+	{temperature: 55, humidity: 60},
+}
+
+func pubEventClientZeroMq(count int, wg *sync.WaitGroup) {
+	defer wg.Done()
+	if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
+		log.Fatal(err)
+	} else {
+		if ec := msgClient.Connect(); ec != nil {
+			log.Fatal(ec)
+		} else {
+			client := coredata.NewEventClient(local.New("test"))
+			index := 0;
+			for i := 0; i < count; i++ {
+				if i % 10 == 0 {
+					index = 0
+				}
+
+				var testEvent = models.Event{Device: "demo"}
+				var r1 = models.Reading{Device: "Temperature device", Name: "Temperature", Value: fmt.Sprintf("%d", mockup[index].temperature)}
+				var r2 = models.Reading{Device: "Humidity device", Name: "Humidity", Value: fmt.Sprintf("%d",  mockup[index].humidity)}
+				index++
+
+				testEvent.Readings = append(testEvent.Readings, r1, r2)
+
+				data, err := client.MarshalEvent(testEvent)
+				if err != nil {
+					fmt.Errorf("unexpected error MarshalEvent %v", err)
+				}
+
+				env := types.NewMessageEnvelope([]byte(data), context.Background())
+				env.ContentType = "application/json"
+
+				if e := msgClient.Publish(env, "events"); e != nil {
+					log.Fatal(e)
+				} else {
+					//fmt.Printf("%d - %s\n", index, string(data))
+				}
+				time.Sleep(100 * time.Nanosecond)
+			}
+		}
+	}
+}
+
+func main() {
+	start := time.Now()
+	count := 1000
+	if len(os.Args) == 2 {
+		v := os.Args[1]
+		if c, err := strconv.Atoi(v); err != nil {
+			fmt.Errorf("%s\n", err)
+		} else {
+			count = c
+		}
+	}
+
+	var wg sync.WaitGroup
+	for i := 0; i < 1; i++ {
+		wg.Add(1)
+		go pubEventClientZeroMq(count ,&wg)
+	}
+	wg.Wait()
+	t := time.Now()
+	elapsed := t.Sub(start)
+
+	fmt.Printf("elapsed %2fs\n", elapsed.Seconds())
+}

BIN
fvt_scripts/edgex/benchmark/system_usage.png


+ 2 - 0
xstream/nodes/sink_node.go

@@ -220,6 +220,8 @@ func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
 		s = &sinks.MQTTSink{}
 	case "rest":
 		s = &sinks.RestSink{}
+	case "nop":
+		s = &sinks.NopSink{}
 	default:
 		nf, err := plugins.GetPlugin(name, plugins.SINK)
 		if err != nil {

+ 3 - 1
xstream/server/server/server.go

@@ -67,7 +67,9 @@ func StartUp(Version string) {
 	// Listen to TPC connections on port 1234
 	listener, e := net.Listen("tcp", fmt.Sprintf(":%d", common.Config.Port))
 	if e != nil {
-		logger.Fatal("Listen error: ", e)
+		m := fmt.Sprintf("Listen error: %s", e)
+		fmt.Printf(m)
+		logger.Fatal(m)
 	}
 
 	if common.Config.Prometheus {

+ 35 - 0
xstream/sinks/nop_sink.go

@@ -0,0 +1,35 @@
+package sinks
+
+import (
+	"github.com/emqx/kuiper/xstream/api"
+)
+
+type NopSink struct {
+	log  bool
+}
+
+func (ns *NopSink) Configure(ps map[string]interface{}) error {
+	var log = false
+	l, ok := ps["log"]
+	if ok {
+		log = l.(bool)
+	}
+	ns.log = log
+	return nil
+}
+
+func (ns *NopSink) Open(ctx api.StreamContext) error {
+	return nil
+}
+
+func (ns *NopSink) Collect(ctx api.StreamContext, item interface{}) error {
+	logger := ctx.GetLogger()
+	if ns.log {
+		logger.Infof("%s", item)
+	}
+	return nil
+}
+
+func (ns *NopSink) Close(ctx api.StreamContext) error {
+	return nil
+}