Преглед на файлове

test(benchmark): benchmark test for multiple rules

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang преди 3 години
родител
ревизия
19e4950364

+ 25 - 0
test/benchmark/multiple_rules/main.go

@@ -0,0 +1,25 @@
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import "os"
+
+func main() {
+	if len(os.Args) > 1 && os.Args[1] == "create" {
+		create()
+	} else {
+		pub()
+	}
+}

+ 42 - 0
test/benchmark/multiple_rules/pub.go

@@ -0,0 +1,42 @@
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"time"
+)
+
+const (
+	tps = 500
+)
+
+func pub() {
+	opts := mqtt.NewClientOptions().AddBroker(mqttUrl)
+	client := mqtt.NewClient(opts)
+	defer client.Disconnect(0)
+	if token := client.Connect(); token.Wait() && token.Error() != nil {
+		panic(token.Error())
+	}
+	for i := 0; ; i++ {
+		payload := []byte(fmt.Sprintf(`{"temperature":%d,"humidity":90}`, 12+i%10))
+		if token := client.Publish("rawdata", 0, false, payload); token.Wait() && token.Error() != nil {
+			fmt.Println(token.Error())
+		}
+
+		time.Sleep(1000 / tps * time.Millisecond)
+	}
+}

+ 12 - 0
test/benchmark/multiple_rules/readme.md

@@ -0,0 +1,12 @@
+# Test
+
+In this multiple rules scenario benchmark, we will create a shared stream and multiple rules on that single stream. By default, there will be 300 rules with 500 tps which means 150000 processing happen per second. 
+
+## Run the test
+
+Recommend to set kuiper.yaml ignoreCase to false, then start eKuiper and the mqtt broker.
+   
+0. Setting variables: open `ruleCreator.go`, modify the const variables to set up the eKuiper url, mqtt broker url and how many rules to create. Then open `pub.go` to set up the tps.
+1. Build the test util. In this directory, run `go build -o pub100 .` It will produce an executable `pub100`.
+2. Create rules: `./pub100 create` which will create the rules. In eKuiper `data/sqliteKV.db` can be backed up. So in the future, just restore this file to create these rules.
+3. Run `./pub100`. Monitor the CPU, memory usage and the mqtt sink topic `demoSink` metric to measure the workload.

+ 97 - 0
test/benchmark/multiple_rules/ruleCreator.go

@@ -0,0 +1,97 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"time"
+)
+
+const (
+	count    = 300                      // number of rules to create
+	interval = 10                       // interval between requests
+	url      = "http://127.0.0.1:9081/" // eKuiper url to send rule creation requests to
+	mqttUrl  = "tcp://127.0.0.1:1883"   // mqtt broker url
+)
+
+type rule struct {
+	Id      string                   `json:"id"`
+	Sql     string                   `json:"sql"`
+	Actions []map[string]interface{} `json:"actions"`
+	Options map[string]interface{}   `json:"options"`
+}
+
+func create() {
+	fmt.Println("create stream")
+	createStream()
+	fmt.Println("create rules")
+	createRules()
+}
+
+func createStream() {
+	s := `{"sql":"CREATE STREAM rawdata() WITH (DATASOURCE=\"rawdata\", SHARED=\"TRUE\");"}`
+	resp, err := http.Post(url+"streams", "application/json", bytes.NewReader([]byte(s)))
+	if err != nil {
+		fmt.Println(err)
+	}
+	if resp.StatusCode != http.StatusCreated {
+		fmt.Printf("%v\n", resp)
+	}
+}
+
+func createRules() {
+	i := 0
+	for ; i <= count; i++ {
+		r := &rule{
+			Id:  fmt.Sprintf("rule%d", i),
+			Sql: "SELECT temperature FROM rawdata WHERE temperature > 20",
+			Actions: []map[string]interface{}{
+				{
+					"nop": map[string]interface{}{},
+				},
+			},
+			Options: map[string]interface{}{},
+		}
+		if i%10 == 0 { // Send 1/10 requests to mqtt broker
+			r.Actions = []map[string]interface{}{
+				{
+					"mqtt": map[string]interface{}{
+						"server": mqttUrl,
+						"topic":  "demoSink",
+					},
+				},
+			}
+		}
+		s, err := json.Marshal(r)
+		if err != nil {
+			fmt.Println(err)
+			break
+		}
+		resp, err := http.Post(url+"rules", "application/json", bytes.NewReader(s))
+		if err != nil {
+			fmt.Println(err)
+			break
+		}
+		if resp.StatusCode != http.StatusCreated {
+			fmt.Printf("%v\n", resp)
+			break
+		}
+		time.Sleep(time.Duration(interval) * time.Millisecond)
+	}
+	fmt.Printf("Run %d\n", i-1)
+}