table.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package lookup
  15. import (
  16. "fmt"
  17. "sync"
  18. "sync/atomic"
  19. "github.com/lf-edge/ekuiper/internal/binder/io"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. kctx "github.com/lf-edge/ekuiper/internal/topo/context"
  22. nodeConf "github.com/lf-edge/ekuiper/internal/topo/node/conf"
  23. "github.com/lf-edge/ekuiper/pkg/api"
  24. "github.com/lf-edge/ekuiper/pkg/ast"
  25. )
  26. // Table is a lookup table runtime instance. It will run once the table is created.
  27. // It will only stop once the table is dropped.
  28. type info struct {
  29. ls api.LookupSource
  30. count int32
  31. }
  32. var (
  33. instances = make(map[string]*info)
  34. lock = &sync.Mutex{}
  35. )
  36. // Attach called by lookup nodes. Add a count to the info
  37. func Attach(name string) (api.LookupSource, error) {
  38. lock.Lock()
  39. defer lock.Unlock()
  40. if i, ok := instances[name]; ok {
  41. atomic.AddInt32(&i.count, 1)
  42. return i.ls, nil
  43. }
  44. return nil, fmt.Errorf("lookup table %s is not found", name)
  45. }
  46. // Detach called by lookup nodes when it is closed
  47. func Detach(name string) error {
  48. lock.Lock()
  49. defer lock.Unlock()
  50. if i, ok := instances[name]; ok {
  51. atomic.AddInt32(&i.count, -1)
  52. return nil
  53. }
  54. return fmt.Errorf("lookup table %s is not found", name)
  55. }
  56. // CreateInstance called when create a lookup table
  57. func CreateInstance(name string, sourceType string, options *ast.Options) error {
  58. lock.Lock()
  59. defer lock.Unlock()
  60. contextLogger := conf.Log.WithField("table", name)
  61. ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
  62. props := nodeConf.GetSourceConf(sourceType, options)
  63. ctx.GetLogger().Infof("open lookup table with props %v", conf.Printable(props))
  64. // Create the lookup source according to the source options
  65. ns, err := io.LookupSource(sourceType)
  66. if err != nil {
  67. ctx.GetLogger().Error(err)
  68. return err
  69. }
  70. ctx.GetLogger().Debugf("lookup source %s is created", sourceType)
  71. err = ns.Configure(options.DATASOURCE, props)
  72. if err != nil {
  73. return err
  74. }
  75. ctx.GetLogger().Debugf("lookup source %s is configured", sourceType)
  76. err = ns.Open(ctx)
  77. if err != nil {
  78. return err
  79. }
  80. ctx.GetLogger().Debugf("lookup source %s is opened", sourceType)
  81. instances[name] = &info{ls: ns, count: 0}
  82. return nil
  83. }
  84. // DropInstance called when drop a lookup table
  85. func DropInstance(name string) error {
  86. lock.Lock()
  87. defer lock.Unlock()
  88. if i, ok := instances[name]; ok {
  89. if atomic.LoadInt32(&i.count) > 0 {
  90. return fmt.Errorf("lookup table %s is still in use, stop all using rules before dropping it", name)
  91. }
  92. delete(instances, name)
  93. return nil
  94. } else {
  95. return nil
  96. }
  97. }