binder.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package io
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/binder"
  18. "github.com/lf-edge/ekuiper/pkg/api"
  19. "github.com/lf-edge/ekuiper/pkg/errorx"
  20. )
  21. var ( // init once and read only
  22. sourceFactories []binder.SourceFactory
  23. sourceFactoriesNames []string
  24. sinkFactories []binder.SinkFactory
  25. sinkFactoriesNames []string
  26. )
  27. func init() {
  28. f := binder.FactoryEntry{
  29. Name: "built-in",
  30. Factory: GetManager(),
  31. }
  32. applyFactory(f)
  33. }
  34. func Initialize(factories []binder.FactoryEntry) error {
  35. for _, f := range factories {
  36. applyFactory(f)
  37. }
  38. return nil
  39. }
  40. func applyFactory(f binder.FactoryEntry) {
  41. if s, ok := f.Factory.(binder.SourceFactory); ok {
  42. sourceFactories = append(sourceFactories, s)
  43. sourceFactoriesNames = append(sourceFactoriesNames, f.Name)
  44. }
  45. if s, ok := f.Factory.(binder.SinkFactory); ok {
  46. sinkFactories = append(sinkFactories, s)
  47. sinkFactoriesNames = append(sinkFactoriesNames, f.Name)
  48. }
  49. }
  50. func Source(name string) (api.Source, error) {
  51. e := make(errorx.MultiError)
  52. for i, sf := range sourceFactories {
  53. r, err := sf.Source(name)
  54. if err != nil {
  55. e[sourceFactoriesNames[i]] = err
  56. }
  57. if r != nil {
  58. return r, e.GetError()
  59. }
  60. }
  61. return nil, e.GetError()
  62. }
  63. func Sink(name string) (api.Sink, error) {
  64. e := make(errorx.MultiError)
  65. for i, sf := range sinkFactories {
  66. r, err := sf.Sink(name)
  67. if err != nil {
  68. e[sinkFactoriesNames[i]] = err
  69. }
  70. if r != nil {
  71. return r, e.GetError()
  72. }
  73. }
  74. return nil, e.GetError()
  75. }
  76. func LookupSource(name string) (api.LookupSource, error) {
  77. e := make(errorx.MultiError)
  78. for i, sf := range sourceFactories {
  79. r, err := sf.LookupSource(name)
  80. if err != nil {
  81. e[sourceFactoriesNames[i]] = err
  82. }
  83. if r != nil {
  84. return r, e.GetError()
  85. }
  86. }
  87. err := e.GetError()
  88. if err == nil {
  89. err = fmt.Errorf("lookup source type %s not found", name)
  90. }
  91. return nil, err
  92. }