source.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package memory
  15. import (
  16. "fmt"
  17. "regexp"
  18. "strings"
  19. "github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "github.com/lf-edge/ekuiper/pkg/cast"
  22. )
  23. type source struct {
  24. topic string
  25. topicRegex *regexp.Regexp
  26. bufferLength int
  27. }
  28. func (s *source) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, _ chan<- error) {
  29. ch := pubsub.CreateSub(s.topic, s.topicRegex, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), s.bufferLength)
  30. for {
  31. select {
  32. case v, opened := <-ch:
  33. if !opened {
  34. return
  35. }
  36. consumer <- v
  37. case <-ctx.Done():
  38. return
  39. }
  40. }
  41. }
  42. func (s *source) Configure(datasource string, props map[string]interface{}) error {
  43. s.topic = datasource
  44. s.bufferLength = 1024
  45. if c, ok := props["bufferLength"]; ok {
  46. if bl, err := cast.ToInt(c, cast.STRICT); err != nil || bl > 0 {
  47. s.bufferLength = bl
  48. }
  49. }
  50. if strings.ContainsAny(datasource, "+#") {
  51. r, err := getRegexp(datasource)
  52. if err != nil {
  53. return err
  54. }
  55. s.topicRegex = r
  56. }
  57. return nil
  58. }
  59. func getRegexp(topic string) (*regexp.Regexp, error) {
  60. if len(topic) == 0 {
  61. return nil, fmt.Errorf("invalid empty topic")
  62. }
  63. levels := strings.Split(topic, "/")
  64. for i, level := range levels {
  65. if level == "#" && i != len(levels)-1 {
  66. return nil, fmt.Errorf("invalid topic %s: # must at the last level", topic)
  67. }
  68. }
  69. regstr := strings.Replace(strings.ReplaceAll(topic, "+", "([^/]+)"), "#", ".", 1)
  70. return regexp.Compile(regstr)
  71. }
  72. func (s *source) Close(ctx api.StreamContext) error {
  73. ctx.GetLogger().Debugf("closing memory source")
  74. pubsub.CloseSourceConsumerChannel(s.topic, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()))
  75. return nil
  76. }