store.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package store
  15. import (
  16. "context"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
  20. "github.com/lf-edge/ekuiper/pkg/ast"
  21. "regexp"
  22. )
  23. // Reg registers a topic to save it to memory store
  24. // Create a new go routine to listen to the topic and save the data to memory
  25. func Reg(topic string, topicRegex *regexp.Regexp, key string) (*Table, error) {
  26. t, isNew := db.addTable(topic, key)
  27. if isNew {
  28. go runTable(topic, topicRegex, t)
  29. }
  30. return t, nil
  31. }
  32. // runTable should only run in a single instance.
  33. // This go routine is used to accumulate data in memory
  34. // If the go routine close, the go routine exits but the data will be kept until table dropped
  35. func runTable(topic string, topicRegex *regexp.Regexp, t *Table) {
  36. conf.Log.Infof("runTable %s", topic)
  37. ch := pubsub.CreateSub(topic, topicRegex, fmt.Sprintf("store_%s", topic), 1024)
  38. ctx, cancel := context.WithCancel(context.Background())
  39. t.cancel = cancel
  40. for {
  41. select {
  42. case v, opened := <-ch:
  43. if !opened { // exit go routine is not sync with drop table
  44. return
  45. }
  46. switch vv := v.(type) {
  47. case *pubsub.UpdatableTuple:
  48. switch vv.Rowkind {
  49. case ast.RowkindInsert, ast.RowkindUpdate, ast.RowkindUpsert:
  50. t.add(vv.DefaultSourceTuple)
  51. case ast.RowkindDelete:
  52. t.delete(vv.Keyval)
  53. }
  54. default:
  55. t.add(v)
  56. }
  57. conf.Log.Debugf("receive data %v for %s", v, topic)
  58. case <-ctx.Done():
  59. return
  60. }
  61. }
  62. }
  63. // Unreg unregisters a topic to remove it from memory store
  64. func Unreg(topic string, key string) error {
  65. // Must be an atomic operation
  66. return db.dropTable(topic, key)
  67. }