ruleCreator.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "fmt"
  19. "net/http"
  20. "time"
  21. )
  22. const (
  23. count = 300 // number of rules to create
  24. interval = 10 // interval between requests
  25. url = "http://127.0.0.1:9081/" // eKuiper url to send rule creation requests to
  26. mqttUrl = "tcp://127.0.0.1:1883" // mqtt broker url
  27. )
  28. type rule struct {
  29. Id string `json:"id"`
  30. Sql string `json:"sql"`
  31. Actions []map[string]interface{} `json:"actions"`
  32. Options map[string]interface{} `json:"options"`
  33. }
  34. func create() {
  35. fmt.Println("create stream")
  36. createStream()
  37. fmt.Println("create rules")
  38. createRules()
  39. }
  40. func createStream() {
  41. s := `{"sql":"CREATE STREAM rawdata() WITH (DATASOURCE=\"rawdata\", SHARED=\"TRUE\");"}`
  42. resp, err := http.Post(url+"streams", "application/json", bytes.NewReader([]byte(s)))
  43. if err != nil {
  44. fmt.Println(err)
  45. }
  46. if resp.StatusCode != http.StatusCreated {
  47. fmt.Printf("%v\n", resp)
  48. }
  49. }
  50. func createRules() {
  51. i := 0
  52. for ; i <= count; i++ {
  53. r := &rule{
  54. Id: fmt.Sprintf("rule%d", i),
  55. Sql: "SELECT temperature FROM rawdata WHERE temperature > 20",
  56. Actions: []map[string]interface{}{
  57. {
  58. "nop": map[string]interface{}{},
  59. },
  60. },
  61. Options: map[string]interface{}{},
  62. }
  63. if i%10 == 0 { // Send 1/10 requests to mqtt broker
  64. r.Actions = []map[string]interface{}{
  65. {
  66. "mqtt": map[string]interface{}{
  67. "server": mqttUrl,
  68. "topic": "demoSink",
  69. },
  70. },
  71. }
  72. }
  73. s, err := json.Marshal(r)
  74. if err != nil {
  75. fmt.Println(err)
  76. break
  77. }
  78. resp, err := http.Post(url+"rules", "application/json", bytes.NewReader(s))
  79. if err != nil {
  80. fmt.Println(err)
  81. break
  82. }
  83. if resp.StatusCode != http.StatusCreated {
  84. fmt.Printf("%v\n", resp)
  85. break
  86. }
  87. time.Sleep(time.Duration(interval) * time.Millisecond)
  88. }
  89. fmt.Printf("Run %d\n", i-1)
  90. }