db.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package store
  15. import (
  16. "context"
  17. "fmt"
  18. "sync"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. )
  22. type tableCount struct {
  23. sync.RWMutex
  24. count int
  25. t *Table
  26. }
  27. func (tc *tableCount) Increase() int {
  28. tc.Lock()
  29. defer tc.Unlock()
  30. tc.count++
  31. return tc.count
  32. }
  33. func (tc *tableCount) Decrease() int {
  34. tc.Lock()
  35. defer tc.Unlock()
  36. tc.count--
  37. if tc.count < 0 {
  38. conf.Log.Errorf("Table count is less than 0: %d", tc.count)
  39. }
  40. return tc.count
  41. }
  42. type database struct {
  43. sync.RWMutex
  44. tables map[string]*tableCount // topic_key: table
  45. }
  46. // getTable return the table of the topic.
  47. func (db *database) getTable(topic string, key string) (*Table, bool) {
  48. db.RLock()
  49. defer db.RUnlock()
  50. tableId := fmt.Sprintf("%s_%s", topic, key)
  51. tc, ok := db.tables[tableId]
  52. if ok {
  53. return tc.t, true
  54. } else {
  55. return nil, false
  56. }
  57. }
  58. // addTable add a table to the database
  59. // If the table already exists, return the existing table;
  60. // otherwise, create a new table and return it.
  61. // The second argument is to indicate if the table is newly created
  62. func (db *database) addTable(topic string, key string) (*Table, bool) {
  63. db.Lock()
  64. defer db.Unlock()
  65. tableId := fmt.Sprintf("%s_%s", topic, key)
  66. tc, ok := db.tables[tableId]
  67. if ok {
  68. tc.Increase()
  69. } else {
  70. t := createTable(topic, key)
  71. tc = &tableCount{
  72. count: 1,
  73. t: t,
  74. }
  75. db.tables[tableId] = tc
  76. }
  77. return tc.t, !ok
  78. }
  79. // dropTable drop the table of the topic/values
  80. // stops to accumulate job
  81. // deletes the cache data
  82. func (db *database) dropTable(topic string, key string) error {
  83. tableId := fmt.Sprintf("%s_%s", topic, key)
  84. db.Lock()
  85. defer db.Unlock()
  86. if tc, ok := db.tables[tableId]; ok {
  87. if tc.Decrease() == 0 {
  88. if tc.t != nil && tc.t.cancel != nil {
  89. tc.t.cancel()
  90. }
  91. delete(db.tables, tableId)
  92. }
  93. return nil
  94. }
  95. return fmt.Errorf("Table %s not found", tableId)
  96. }
  97. // Table has one writer and multiple reader
  98. type Table struct {
  99. sync.RWMutex
  100. topic string
  101. key string
  102. // datamap is the overall data indexed by primary key
  103. datamap map[interface{}]api.SourceTuple
  104. cancel context.CancelFunc
  105. }
  106. func createTable(topic string, key string) *Table {
  107. t := &Table{topic: topic, key: key, datamap: make(map[interface{}]api.SourceTuple)}
  108. return t
  109. }
  110. func (t *Table) add(value api.SourceTuple) {
  111. t.Lock()
  112. defer t.Unlock()
  113. keyval, ok := value.Message()[t.key]
  114. if !ok {
  115. conf.Log.Errorf("add to table %s omitted, value not found for key %s", t.topic, t.key)
  116. }
  117. t.datamap[keyval] = value
  118. }
  119. func (t *Table) delete(key interface{}) {
  120. t.Lock()
  121. defer t.Unlock()
  122. delete(t.datamap, key)
  123. }
  124. func (t *Table) Read(keys []string, values []interface{}) ([]api.SourceTuple, error) {
  125. t.RLock()
  126. defer t.RUnlock()
  127. // Find the primary key
  128. var matched api.SourceTuple
  129. for i, k := range keys {
  130. if k == t.key {
  131. matched = t.datamap[values[i]]
  132. }
  133. }
  134. if matched != nil {
  135. match := true
  136. for i, k := range keys {
  137. if val, ok := matched.Message()[k]; !ok || val != values[i] {
  138. match = false
  139. break
  140. }
  141. }
  142. if match {
  143. return []api.SourceTuple{matched}, nil
  144. } else {
  145. return nil, nil
  146. }
  147. }
  148. var result []api.SourceTuple
  149. for _, v := range t.datamap {
  150. match := true
  151. for i, k := range keys {
  152. if val, ok := v.Message()[k]; !ok || val != values[i] {
  153. match = false
  154. break
  155. }
  156. }
  157. if match {
  158. result = append(result, v)
  159. }
  160. }
  161. return result, nil
  162. }
  163. var db = &database{
  164. tables: make(map[string]*tableCount),
  165. }