kv_store_test.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package state
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/pkg/cast"
  19. "log"
  20. "os"
  21. "path"
  22. "reflect"
  23. "sync"
  24. "testing"
  25. )
  26. func TestLifecycle(t *testing.T) {
  27. var (
  28. i = 0
  29. ruleId = "test1"
  30. checkpointIds = []int64{1, 2, 3}
  31. opIds = []string{"op1", "op2", "op3"}
  32. r = map[string]interface{}{
  33. "1": map[string]interface{}{
  34. "op1": map[string]interface{}{
  35. "op": "op1",
  36. "oi": 0,
  37. "ci": 0,
  38. },
  39. "op2": map[string]interface{}{
  40. "op": "op2",
  41. "oi": 1,
  42. "ci": 0,
  43. },
  44. "op3": map[string]interface{}{
  45. "op": "op3",
  46. "oi": 2,
  47. "ci": 0,
  48. },
  49. },
  50. "2": map[string]interface{}{
  51. "op1": map[string]interface{}{
  52. "op": "op1",
  53. "oi": 0,
  54. "ci": 1,
  55. },
  56. "op2": map[string]interface{}{
  57. "op": "op2",
  58. "oi": 1,
  59. "ci": 1,
  60. },
  61. "op3": map[string]interface{}{
  62. "op": "op3",
  63. "oi": 2,
  64. "ci": 1,
  65. },
  66. },
  67. "3": map[string]interface{}{
  68. "op1": map[string]interface{}{
  69. "op": "op1",
  70. "oi": 0,
  71. "ci": 2,
  72. },
  73. "op2": map[string]interface{}{
  74. "op": "op2",
  75. "oi": 1,
  76. "ci": 2,
  77. },
  78. "op3": map[string]interface{}{
  79. "op": "op3",
  80. "oi": 2,
  81. "ci": 2,
  82. },
  83. },
  84. }
  85. rm = map[string]interface{}{
  86. "1": map[string]interface{}{
  87. "op1": map[string]interface{}{
  88. "op": "op1",
  89. "oi": 0,
  90. "ci": 0,
  91. },
  92. "op2": map[string]interface{}{
  93. "op": "op2",
  94. "oi": 1,
  95. "ci": 0,
  96. },
  97. "op3": map[string]interface{}{
  98. "op": "op3",
  99. "oi": 2,
  100. "ci": 0,
  101. },
  102. },
  103. "2": map[string]interface{}{
  104. "op1": map[string]interface{}{
  105. "op": "op1",
  106. "oi": 0,
  107. "ci": 1,
  108. },
  109. "op2": map[string]interface{}{
  110. "op": "op2",
  111. "oi": 1,
  112. "ci": 1,
  113. },
  114. "op3": map[string]interface{}{
  115. "op": "op3",
  116. "oi": 2,
  117. "ci": 1,
  118. },
  119. },
  120. "3": map[string]interface{}{
  121. "op1": map[string]interface{}{
  122. "op": "op1",
  123. "oi": 0,
  124. "ci": 2,
  125. },
  126. "op2": map[string]interface{}{
  127. "op": "op2",
  128. "oi": 1,
  129. "ci": 2,
  130. },
  131. "op3": map[string]interface{}{
  132. "op": "op3",
  133. "oi": 2,
  134. "ci": 2,
  135. },
  136. },
  137. "10000": map[string]interface{}{
  138. "op2": map[string]interface{}{
  139. "op": "op2",
  140. "oi": 1,
  141. "ci": 10000,
  142. },
  143. "op3": map[string]interface{}{
  144. "op": "op3",
  145. "oi": 2,
  146. "ci": 10000,
  147. },
  148. },
  149. }
  150. )
  151. func() {
  152. cleanStateData()
  153. store, err := getKVStore(ruleId)
  154. if err != nil {
  155. t.Errorf("Get store for rule %s error: %s", ruleId, err)
  156. return
  157. }
  158. //Save for all checkpoints
  159. for i, cid := range checkpointIds {
  160. for j, opId := range opIds {
  161. err := store.SaveState(cid, opId, map[string]interface{}{
  162. "op": opId,
  163. "oi": j,
  164. "ci": i,
  165. })
  166. if err != nil {
  167. t.Errorf("Save state for rule %s op %s error: %s", ruleId, opId, err)
  168. return
  169. }
  170. }
  171. err := store.SaveCheckpoint(cid)
  172. if err != nil {
  173. t.Errorf("Save checkpoint %d for rule %s error: %s", cid, ruleId, err)
  174. return
  175. }
  176. }
  177. // compare checkpoints
  178. if !reflect.DeepEqual(checkpointIds, store.checkpoints) {
  179. t.Errorf("%d.Save checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, checkpointIds, store.checkpoints)
  180. }
  181. // compare contents
  182. result := mapStoreToMap(store.mapStore)
  183. if !reflect.DeepEqual(r, result) {
  184. t.Errorf("%d.Save checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, r, result)
  185. }
  186. //Save additional state but not serialized in checkpoint
  187. err = store.SaveState(10000, opIds[1], map[string]interface{}{
  188. "op": opIds[1],
  189. "oi": 1,
  190. "ci": 10000,
  191. })
  192. if err != nil {
  193. t.Errorf("Save state for rule %s op %s error: %s", ruleId, opIds[1], err)
  194. return
  195. }
  196. err = store.SaveState(10000, opIds[2], map[string]interface{}{
  197. "op": opIds[2],
  198. "oi": 2,
  199. "ci": 10000,
  200. })
  201. if err != nil {
  202. t.Errorf("Save state for rule %s op %s error: %s", ruleId, opIds[2], err)
  203. return
  204. }
  205. // compare checkpoints
  206. if !reflect.DeepEqual(checkpointIds, store.checkpoints) {
  207. t.Errorf("%d.Save state\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, checkpointIds, store.checkpoints)
  208. }
  209. // compare contents
  210. result = mapStoreToMap(store.mapStore)
  211. if !reflect.DeepEqual(rm, result) {
  212. t.Errorf("%d.Save state\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, r, result)
  213. }
  214. //simulate restore
  215. store = nil
  216. store, err = getKVStore(ruleId)
  217. if err != nil {
  218. t.Errorf("Restore store for rule %s error: %s", ruleId, err)
  219. return
  220. }
  221. // compare checkpoints
  222. if !reflect.DeepEqual(checkpointIds[2:], store.checkpoints) {
  223. t.Errorf("%d.Restore checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, checkpointIds, store.checkpoints)
  224. return
  225. }
  226. // compare contents
  227. result = mapStoreToMap(store.mapStore)
  228. last := map[string]interface{}{
  229. "3": r["3"],
  230. }
  231. if !reflect.DeepEqual(last, result) {
  232. t.Errorf("%d.Restore checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, last, result)
  233. return
  234. }
  235. ns, err := store.GetOpState(opIds[1])
  236. if err != nil {
  237. t.Errorf("Get op %s state for rule %s error: %s", opIds[1], ruleId, err)
  238. return
  239. }
  240. sm := r[fmt.Sprintf("%v", checkpointIds[len(checkpointIds)-1])].(map[string]interface{})[opIds[1]]
  241. nsm := cast.SyncMapToMap(ns)
  242. if !reflect.DeepEqual(sm, nsm) {
  243. t.Errorf("%d.Restore op state\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, sm, nsm)
  244. return
  245. }
  246. }()
  247. }
  248. func mapStoreToMap(sm *sync.Map) map[string]interface{} {
  249. m := make(map[string]interface{})
  250. sm.Range(func(k interface{}, v interface{}) bool {
  251. switch t := v.(type) {
  252. case *sync.Map:
  253. m[fmt.Sprintf("%v", k)] = mapStoreToMap(t)
  254. default:
  255. m[fmt.Sprintf("%v", k)] = t
  256. }
  257. return true
  258. })
  259. return m
  260. }
  261. func cleanStateData() {
  262. dbDir, err := conf.GetDataLoc()
  263. if err != nil {
  264. log.Panic(err)
  265. }
  266. c := path.Join(dbDir)
  267. err = os.RemoveAll(c)
  268. if err != nil {
  269. conf.Log.Error(err)
  270. }
  271. }