ruleCreator.go 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "fmt"
  19. "net/http"
  20. "strings"
  21. "time"
  22. )
  23. const (
  24. count = 300 // number of rules to create
  25. interval = 10 // interval between requests
  26. url = "http://127.0.0.1:9081/" // eKuiper url to send rule creation requests to
  27. mqttUrl = "tcp://127.0.0.1:1883" // mqtt broker url
  28. )
  29. type rule struct {
  30. Id string `json:"id"`
  31. Sql string `json:"sql"`
  32. Actions []map[string]interface{} `json:"actions"`
  33. Options map[string]interface{} `json:"options"`
  34. }
  35. func create() {
  36. fmt.Println("create stream")
  37. createStream()
  38. fmt.Println("create rules")
  39. createRules()
  40. }
  41. func createStream() {
  42. s := `{"sql":"CREATE STREAM rawdata() WITH (DATASOURCE=\"rawdata\", SHARED=\"TRUE\");"}`
  43. resp, err := http.Post(url+"streams", "application/json", strings.NewReader(s))
  44. if err != nil {
  45. fmt.Println(err)
  46. }
  47. if resp.StatusCode != http.StatusCreated {
  48. fmt.Printf("%v\n", resp)
  49. }
  50. }
  51. func createRules() {
  52. i := 0
  53. for ; i <= count; i++ {
  54. r := &rule{
  55. Id: fmt.Sprintf("rule%d", i),
  56. Sql: "SELECT temperature FROM rawdata WHERE temperature > 20",
  57. Actions: []map[string]interface{}{
  58. {
  59. "nop": map[string]interface{}{},
  60. },
  61. },
  62. Options: map[string]interface{}{},
  63. }
  64. if i%10 == 0 { // Send 1/10 requests to mqtt broker
  65. r.Actions = []map[string]interface{}{
  66. {
  67. "mqtt": map[string]interface{}{
  68. "server": mqttUrl,
  69. "topic": "demoSink",
  70. },
  71. },
  72. }
  73. }
  74. s, err := json.Marshal(r)
  75. if err != nil {
  76. fmt.Println(err)
  77. break
  78. }
  79. resp, err := http.Post(url+"rules", "application/json", bytes.NewReader(s))
  80. if err != nil {
  81. fmt.Println(err)
  82. break
  83. }
  84. if resp.StatusCode != http.StatusCreated {
  85. fmt.Printf("%v\n", resp)
  86. break
  87. }
  88. time.Sleep(time.Duration(interval) * time.Millisecond)
  89. }
  90. fmt.Printf("Run %d\n", i-1)
  91. }