sink_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package neuron
  15. import (
  16. "reflect"
  17. "testing"
  18. "time"
  19. "github.com/lf-edge/ekuiper/internal/io/mock"
  20. )
  21. func sinkTest(t *testing.T) {
  22. server, ch := mockNeuron(false, true, DefaultNeuronUrl)
  23. defer server.Close()
  24. s := GetSink()
  25. s.Configure(map[string]interface{}{
  26. "nodeName": "test1",
  27. "groupName": "grp",
  28. "tags": []string{"temperature", "status"},
  29. "raw": false,
  30. })
  31. data := []interface{}{
  32. map[string]interface{}{
  33. "temperature": 22,
  34. "humidity": 50,
  35. "status": "green",
  36. },
  37. map[string]interface{}{
  38. "temperature": 25,
  39. "humidity": 82,
  40. "status": "wet",
  41. },
  42. map[string]interface{}{
  43. "temperature": 33,
  44. "humidity": 60,
  45. "status": "hot",
  46. },
  47. }
  48. err := mock.RunSinkCollect(s, data)
  49. if err != nil {
  50. t.Errorf(err.Error())
  51. return
  52. }
  53. exp := []string{
  54. `{"group_name":"grp","node_name":"test1","tag_name":"temperature","value":22}`,
  55. `{"group_name":"grp","node_name":"test1","tag_name":"status","value":"green"}`,
  56. `{"group_name":"grp","node_name":"test1","tag_name":"temperature","value":25}`,
  57. `{"group_name":"grp","node_name":"test1","tag_name":"status","value":"wet"}`,
  58. `{"group_name":"grp","node_name":"test1","tag_name":"temperature","value":33}`,
  59. `{"group_name":"grp","node_name":"test1","tag_name":"status","value":"hot"}`,
  60. }
  61. var actual []string
  62. ticker := time.After(10 * time.Second)
  63. for i := 0; i < len(exp); i++ {
  64. select {
  65. case <-ticker:
  66. t.Errorf("timeout")
  67. return
  68. case d := <-ch:
  69. actual = append(actual, string(d))
  70. }
  71. }
  72. if !reflect.DeepEqual(actual, exp) {
  73. t.Errorf("result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", exp, actual)
  74. }
  75. time.Sleep(100 * time.Millisecond)
  76. }
  77. func sinkConnExpTest(t *testing.T) {
  78. s := GetSink()
  79. s.Configure(map[string]interface{}{
  80. "nodeName": "test1",
  81. "groupName": "grp",
  82. "tags": []string{"temperature", "status"},
  83. "raw": false,
  84. })
  85. data := []interface{}{
  86. map[string]interface{}{
  87. "temperature": 22,
  88. "humidity": 50,
  89. "status": "green",
  90. },
  91. map[string]interface{}{
  92. "temperature": 25,
  93. "humidity": 82,
  94. "status": "wet",
  95. },
  96. map[string]interface{}{
  97. "temperature": 33,
  98. "humidity": 60,
  99. "status": "hot",
  100. },
  101. }
  102. expErrStr := "io error: Error publish the tag payload temperature: io error: neuron connection is not established"
  103. err := mock.RunSinkCollect(s, data)
  104. if err == nil {
  105. t.Errorf("should have error")
  106. return
  107. } else if err.Error() != expErrStr {
  108. t.Errorf("error mismatch:\n\nexp=%s\n\ngot=%s\n\n", expErrStr, err.Error())
  109. }
  110. }