// sink_test.go (2.2 KB)
// Copyright 2022 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
  14. package neuron
  15. import (
  16. "github.com/lf-edge/ekuiper/internal/topo/mock"
  17. "reflect"
  18. "testing"
  19. "time"
  20. )
  21. func sinkTest(t *testing.T) {
  22. server, ch := mockNeuron(false, true)
  23. defer server.Close()
  24. s := GetSink()
  25. s.Configure(map[string]interface{}{
  26. "nodeName": "test1",
  27. "groupName": "grp",
  28. "tags": []string{"temperature", "status"},
  29. "raw": false,
  30. })
  31. data := []interface{}{
  32. map[string]interface{}{
  33. "temperature": 22,
  34. "humidity": 50,
  35. "status": "green",
  36. },
  37. map[string]interface{}{
  38. "temperature": 25,
  39. "humidity": 82,
  40. "status": "wet",
  41. },
  42. map[string]interface{}{
  43. "temperature": 33,
  44. "humidity": 60,
  45. "status": "hot",
  46. },
  47. }
  48. err := mock.RunSinkCollect(s, data)
  49. if err != nil {
  50. t.Errorf(err.Error())
  51. return
  52. }
  53. exp := []string{
  54. `{"group_name":"grp","node_name":"test1","tag_name":"temperature","value":22}`,
  55. `{"group_name":"grp","node_name":"test1","tag_name":"status","value":"green"}`,
  56. `{"group_name":"grp","node_name":"test1","tag_name":"temperature","value":25}`,
  57. `{"group_name":"grp","node_name":"test1","tag_name":"status","value":"wet"}`,
  58. `{"group_name":"grp","node_name":"test1","tag_name":"temperature","value":33}`,
  59. `{"group_name":"grp","node_name":"test1","tag_name":"status","value":"hot"}`,
  60. }
  61. var actual []string
  62. ticker := time.After(10 * time.Second)
  63. for i := 0; i < len(exp); i++ {
  64. select {
  65. case <-ticker:
  66. t.Errorf("timeout")
  67. return
  68. case d := <-ch:
  69. actual = append(actual, string(d))
  70. }
  71. }
  72. if !reflect.DeepEqual(actual, exp) {
  73. t.Errorf("result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", exp, actual)
  74. }
  75. time.Sleep(100 * time.Millisecond)
  76. }