source.go 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package runtime
  15. import (
  16. context2 "context"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/sdk/go/api"
  19. "github.com/lf-edge/ekuiper/sdk/go/connection"
  20. )
  21. // lifecycle controlled by plugin
  22. // if stop by error, inform plugin
  23. type sourceRuntime struct {
  24. s api.Source
  25. ch connection.DataOutChannel
  26. ctx api.StreamContext
  27. cancel context2.CancelFunc
  28. key string
  29. }
  30. func setupSourceRuntime(con *Control, s api.Source) (*sourceRuntime, error) {
  31. // init context with args
  32. ctx, err := parseContext(con)
  33. // TODO check cmd error handling or using health check
  34. if err != nil {
  35. return nil, err
  36. }
  37. // init config with args and call source config
  38. err = s.Configure(con.DataSource, con.Config)
  39. if err != nil {
  40. return nil, err
  41. }
  42. // connect to mq server
  43. ch, err := connection.CreateSourceChannel(ctx)
  44. if err != nil {
  45. return nil, err
  46. }
  47. ctx.GetLogger().Info("Setup message pipeline, start sending")
  48. ctx, cancel := ctx.WithCancel()
  49. return &sourceRuntime{
  50. s: s,
  51. ch: ch,
  52. ctx: ctx,
  53. cancel: cancel,
  54. key: fmt.Sprintf("%s_%s_%d_%s", con.Meta.RuleId, con.Meta.OpId, con.Meta.InstanceId, con.SymbolName),
  55. }, nil
  56. }
  57. func (s *sourceRuntime) run() {
  58. errCh := make(chan error)
  59. consumer := make(chan api.SourceTuple)
  60. go s.s.Open(s.ctx, consumer, errCh)
  61. for {
  62. select {
  63. case err := <-errCh:
  64. s.ctx.GetLogger().Errorf("%v", err)
  65. broadcast(s.ctx, s.ch, err)
  66. s.stop()
  67. case data := <-consumer:
  68. s.ctx.GetLogger().Debugf("broadcast data %v", data)
  69. broadcast(s.ctx, s.ch, data)
  70. case <-s.ctx.Done():
  71. s.s.Close(s.ctx)
  72. return
  73. }
  74. }
  75. }
  76. func (s *sourceRuntime) stop() error {
  77. s.cancel()
  78. err := s.ch.Close()
  79. if err != nil {
  80. s.ctx.GetLogger().Info(err)
  81. }
  82. s.ctx.GetLogger().Info("closed source data channel")
  83. reg.Delete(s.key)
  84. return nil
  85. }
  86. func (s *sourceRuntime) isRunning() bool {
  87. return s.ctx.Err() == nil
  88. }