binder.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package io
  15. import (
  16. "errors"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/binder"
  19. "github.com/lf-edge/ekuiper/internal/plugin"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. )
  22. var ( // init once and read only
  23. sourceFactories []binder.SourceFactory
  24. sourceFactoriesNames []string
  25. sinkFactories []binder.SinkFactory
  26. sinkFactoriesNames []string
  27. )
  28. func init() {
  29. f := binder.FactoryEntry{
  30. Name: "built-in",
  31. Factory: GetManager(),
  32. }
  33. applyFactory(f)
  34. }
  35. func Initialize(factories []binder.FactoryEntry) error {
  36. for _, f := range factories {
  37. applyFactory(f)
  38. }
  39. return nil
  40. }
  41. func applyFactory(f binder.FactoryEntry) {
  42. if s, ok := f.Factory.(binder.SourceFactory); ok {
  43. sourceFactories = append(sourceFactories, s)
  44. sourceFactoriesNames = append(sourceFactoriesNames, f.Name)
  45. }
  46. if s, ok := f.Factory.(binder.SinkFactory); ok {
  47. sinkFactories = append(sinkFactories, s)
  48. sinkFactoriesNames = append(sinkFactoriesNames, f.Name)
  49. }
  50. }
  51. func Source(name string) (api.Source, error) {
  52. var errs error
  53. for i, sf := range sourceFactories {
  54. r, err := sf.Source(name)
  55. if err != nil {
  56. errs = errors.Join(errs, fmt.Errorf("%s:%v", sourceFactoriesNames[i], err))
  57. }
  58. if r != nil {
  59. return r, errs
  60. }
  61. }
  62. return nil, errs
  63. }
  64. func GetSourcePlugin(name string) (plugin.EXTENSION_TYPE, string, string) {
  65. for _, sf := range sourceFactories {
  66. t, s1, s2 := sf.SourcePluginInfo(name)
  67. if t == plugin.NONE_EXTENSION {
  68. continue
  69. }
  70. return t, s1, s2
  71. }
  72. return plugin.NONE_EXTENSION, "", ""
  73. }
  74. func Sink(name string) (api.Sink, error) {
  75. var errs error
  76. for i, sf := range sinkFactories {
  77. r, err := sf.Sink(name)
  78. if err != nil {
  79. errs = errors.Join(errs, fmt.Errorf("%s:%v", sinkFactoriesNames[i], err))
  80. }
  81. if r != nil {
  82. return r, errs
  83. }
  84. }
  85. return nil, errs
  86. }
  87. func GetSinkPlugin(name string) (plugin.EXTENSION_TYPE, string, string) {
  88. for _, sf := range sinkFactories {
  89. t, s1, s2 := sf.SinkPluginInfo(name)
  90. if t == plugin.NONE_EXTENSION {
  91. continue
  92. }
  93. return t, s1, s2
  94. }
  95. return plugin.NONE_EXTENSION, "", ""
  96. }
  97. func LookupSource(name string) (api.LookupSource, error) {
  98. var errs error
  99. for i, sf := range sourceFactories {
  100. r, err := sf.LookupSource(name)
  101. if err != nil {
  102. errs = errors.Join(errs, fmt.Errorf("%s:%v", sourceFactoriesNames[i], err))
  103. }
  104. if r != nil {
  105. return r, errs
  106. }
  107. }
  108. if errs == nil {
  109. errs = fmt.Errorf("lookup source type %s not found", name)
  110. }
  111. return nil, errs
  112. }