binder.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package io
  15. import (
  16. "github.com/lf-edge/ekuiper/internal/binder"
  17. "github.com/lf-edge/ekuiper/pkg/api"
  18. "github.com/lf-edge/ekuiper/pkg/errorx"
  19. )
  20. var ( // init once and read only
  21. sourceFactories []binder.SourceFactory
  22. sourceFactoriesNames []string
  23. sinkFactories []binder.SinkFactory
  24. sinkFactoriesNames []string
  25. )
  26. func init() {
  27. f := binder.FactoryEntry{
  28. Name: "built-in",
  29. Factory: GetManager(),
  30. }
  31. applyFactory(f)
  32. }
  33. func Initialize(factories []binder.FactoryEntry) error {
  34. for _, f := range factories {
  35. applyFactory(f)
  36. }
  37. return nil
  38. }
  39. func applyFactory(f binder.FactoryEntry) {
  40. if s, ok := f.Factory.(binder.SourceFactory); ok {
  41. sourceFactories = append(sourceFactories, s)
  42. sourceFactoriesNames = append(sourceFactoriesNames, f.Name)
  43. }
  44. if s, ok := f.Factory.(binder.SinkFactory); ok {
  45. sinkFactories = append(sinkFactories, s)
  46. sinkFactoriesNames = append(sinkFactoriesNames, f.Name)
  47. }
  48. }
  49. func Source(name string) (api.Source, error) {
  50. e := make(errorx.MultiError)
  51. for i, sf := range sourceFactories {
  52. r, err := sf.Source(name)
  53. if err != nil {
  54. e[sourceFactoriesNames[i]] = err
  55. }
  56. if r != nil {
  57. return r, e.GetError()
  58. }
  59. }
  60. return nil, e.GetError()
  61. }
  62. func Sink(name string) (api.Sink, error) {
  63. e := make(errorx.MultiError)
  64. for i, sf := range sinkFactories {
  65. r, err := sf.Sink(name)
  66. if err != nil {
  67. e[sinkFactoriesNames[i]] = err
  68. }
  69. if r != nil {
  70. return r, e.GetError()
  71. }
  72. }
  73. return nil, e.GetError()
  74. }