manager_test.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. // Copyright 2021 INTECH Process Automation Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package shared
  15. import (
  16. "encoding/json"
  17. "github.com/lf-edge/ekuiper/internal/topo/context"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "reflect"
  20. "testing"
  21. )
  22. func TestSharedInmemoryNode(t *testing.T) {
  23. id := "test_id"
  24. sinkProps := make(map[string]interface{})
  25. sinkProps[IdProperty] = id
  26. src := GetSource()
  27. snk, err := GetSink(sinkProps)
  28. if err != nil {
  29. t.Error(err)
  30. }
  31. ctx := context.Background()
  32. consumer := make(chan api.SourceTuple)
  33. errorChannel := make(chan error)
  34. srcProps := make(map[string]interface{})
  35. srcProps["option"] = "value"
  36. go func() {
  37. src.Open(ctx, consumer, errorChannel)
  38. }()
  39. err = src.Configure(id, srcProps)
  40. if err != nil {
  41. t.Error(err)
  42. }
  43. srcProps[IdProperty] = id
  44. if _, contains := sinkChannels[id]; !contains {
  45. t.Errorf("there should be memory node for topic")
  46. }
  47. data := make(map[string]interface{})
  48. data["temperature"] = 33.0
  49. list := make([]map[string]interface{}, 0)
  50. list = append(list, data)
  51. go func() {
  52. var buf []byte
  53. buf, err = asJsonBytes(list)
  54. if err != nil {
  55. t.Error(err)
  56. }
  57. err = snk.Collect(ctx, buf)
  58. if err != nil {
  59. t.Error(err)
  60. }
  61. }()
  62. for {
  63. select {
  64. case res := <-consumer:
  65. expected := api.NewDefaultSourceTuple(data, make(map[string]interface{}))
  66. if !reflect.DeepEqual(expected, res) {
  67. t.Errorf("result %s should be equal to %s", res, expected)
  68. }
  69. return
  70. default:
  71. }
  72. }
  73. }
  74. func asJsonBytes(m []map[string]interface{}) ([]byte, error) {
  75. return json.Marshal(m)
  76. }