kv_store_test.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package state
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/internal/conf"
  5. "github.com/emqx/kuiper/pkg/cast"
  6. "log"
  7. "os"
  8. "path"
  9. "reflect"
  10. "sync"
  11. "testing"
  12. )
  13. func TestLifecycle(t *testing.T) {
  14. var (
  15. i = 0
  16. ruleId = "test1"
  17. checkpointIds = []int64{1, 2, 3}
  18. opIds = []string{"op1", "op2", "op3"}
  19. r = map[string]interface{}{
  20. "1": map[string]interface{}{
  21. "op1": map[string]interface{}{
  22. "op": "op1",
  23. "oi": 0,
  24. "ci": 0,
  25. },
  26. "op2": map[string]interface{}{
  27. "op": "op2",
  28. "oi": 1,
  29. "ci": 0,
  30. },
  31. "op3": map[string]interface{}{
  32. "op": "op3",
  33. "oi": 2,
  34. "ci": 0,
  35. },
  36. },
  37. "2": map[string]interface{}{
  38. "op1": map[string]interface{}{
  39. "op": "op1",
  40. "oi": 0,
  41. "ci": 1,
  42. },
  43. "op2": map[string]interface{}{
  44. "op": "op2",
  45. "oi": 1,
  46. "ci": 1,
  47. },
  48. "op3": map[string]interface{}{
  49. "op": "op3",
  50. "oi": 2,
  51. "ci": 1,
  52. },
  53. },
  54. "3": map[string]interface{}{
  55. "op1": map[string]interface{}{
  56. "op": "op1",
  57. "oi": 0,
  58. "ci": 2,
  59. },
  60. "op2": map[string]interface{}{
  61. "op": "op2",
  62. "oi": 1,
  63. "ci": 2,
  64. },
  65. "op3": map[string]interface{}{
  66. "op": "op3",
  67. "oi": 2,
  68. "ci": 2,
  69. },
  70. },
  71. }
  72. rm = map[string]interface{}{
  73. "1": map[string]interface{}{
  74. "op1": map[string]interface{}{
  75. "op": "op1",
  76. "oi": 0,
  77. "ci": 0,
  78. },
  79. "op2": map[string]interface{}{
  80. "op": "op2",
  81. "oi": 1,
  82. "ci": 0,
  83. },
  84. "op3": map[string]interface{}{
  85. "op": "op3",
  86. "oi": 2,
  87. "ci": 0,
  88. },
  89. },
  90. "2": map[string]interface{}{
  91. "op1": map[string]interface{}{
  92. "op": "op1",
  93. "oi": 0,
  94. "ci": 1,
  95. },
  96. "op2": map[string]interface{}{
  97. "op": "op2",
  98. "oi": 1,
  99. "ci": 1,
  100. },
  101. "op3": map[string]interface{}{
  102. "op": "op3",
  103. "oi": 2,
  104. "ci": 1,
  105. },
  106. },
  107. "3": map[string]interface{}{
  108. "op1": map[string]interface{}{
  109. "op": "op1",
  110. "oi": 0,
  111. "ci": 2,
  112. },
  113. "op2": map[string]interface{}{
  114. "op": "op2",
  115. "oi": 1,
  116. "ci": 2,
  117. },
  118. "op3": map[string]interface{}{
  119. "op": "op3",
  120. "oi": 2,
  121. "ci": 2,
  122. },
  123. },
  124. "10000": map[string]interface{}{
  125. "op2": map[string]interface{}{
  126. "op": "op2",
  127. "oi": 1,
  128. "ci": 10000,
  129. },
  130. "op3": map[string]interface{}{
  131. "op": "op3",
  132. "oi": 2,
  133. "ci": 10000,
  134. },
  135. },
  136. }
  137. )
  138. func() {
  139. defer cleanStateData()
  140. store, err := getKVStore(ruleId)
  141. if err != nil {
  142. t.Errorf("Get store for rule %s error: %s", ruleId, err)
  143. return
  144. }
  145. //Save for all checkpoints
  146. for i, cid := range checkpointIds {
  147. for j, opId := range opIds {
  148. err := store.SaveState(cid, opId, map[string]interface{}{
  149. "op": opId,
  150. "oi": j,
  151. "ci": i,
  152. })
  153. if err != nil {
  154. t.Errorf("Save state for rule %s op %s error: %s", ruleId, opId, err)
  155. return
  156. }
  157. }
  158. err := store.SaveCheckpoint(cid)
  159. if err != nil {
  160. t.Errorf("Save checkpoint %d for rule %s error: %s", cid, ruleId, err)
  161. return
  162. }
  163. }
  164. // compare checkpoints
  165. if !reflect.DeepEqual(checkpointIds, store.checkpoints) {
  166. t.Errorf("%d.Save checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, checkpointIds, store.checkpoints)
  167. }
  168. // compare contents
  169. result := mapStoreToMap(store.mapStore)
  170. if !reflect.DeepEqual(r, result) {
  171. t.Errorf("%d.Save checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, r, result)
  172. }
  173. //Save additional state but not serialized in checkpoint
  174. err = store.SaveState(10000, opIds[1], map[string]interface{}{
  175. "op": opIds[1],
  176. "oi": 1,
  177. "ci": 10000,
  178. })
  179. if err != nil {
  180. t.Errorf("Save state for rule %s op %s error: %s", ruleId, opIds[1], err)
  181. return
  182. }
  183. err = store.SaveState(10000, opIds[2], map[string]interface{}{
  184. "op": opIds[2],
  185. "oi": 2,
  186. "ci": 10000,
  187. })
  188. if err != nil {
  189. t.Errorf("Save state for rule %s op %s error: %s", ruleId, opIds[2], err)
  190. return
  191. }
  192. // compare checkpoints
  193. if !reflect.DeepEqual(checkpointIds, store.checkpoints) {
  194. t.Errorf("%d.Save state\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, checkpointIds, store.checkpoints)
  195. }
  196. // compare contents
  197. result = mapStoreToMap(store.mapStore)
  198. if !reflect.DeepEqual(rm, result) {
  199. t.Errorf("%d.Save state\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, r, result)
  200. }
  201. //simulate restore
  202. store = nil
  203. store, err = getKVStore(ruleId)
  204. if err != nil {
  205. t.Errorf("Restore store for rule %s error: %s", ruleId, err)
  206. return
  207. }
  208. // compare checkpoints
  209. if !reflect.DeepEqual(checkpointIds, store.checkpoints) {
  210. t.Errorf("%d.Restore checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, checkpointIds, store.checkpoints)
  211. return
  212. }
  213. // compare contents
  214. result = mapStoreToMap(store.mapStore)
  215. if !reflect.DeepEqual(r, result) {
  216. t.Errorf("%d.Restore checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, r, result)
  217. return
  218. }
  219. ns, err := store.GetOpState(opIds[1])
  220. if err != nil {
  221. t.Errorf("Get op %s state for rule %s error: %s", opIds[1], ruleId, err)
  222. return
  223. }
  224. sm := r[fmt.Sprintf("%v", checkpointIds[len(checkpointIds)-1])].(map[string]interface{})[opIds[1]]
  225. nsm := cast.SyncMapToMap(ns)
  226. if !reflect.DeepEqual(sm, nsm) {
  227. t.Errorf("%d.Restore op state\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, sm, nsm)
  228. return
  229. }
  230. }()
  231. }
  232. func mapStoreToMap(sm *sync.Map) map[string]interface{} {
  233. m := make(map[string]interface{})
  234. sm.Range(func(k interface{}, v interface{}) bool {
  235. switch t := v.(type) {
  236. case *sync.Map:
  237. m[fmt.Sprintf("%v", k)] = mapStoreToMap(t)
  238. default:
  239. m[fmt.Sprintf("%v", k)] = t
  240. }
  241. return true
  242. })
  243. return m
  244. }
  245. func cleanStateData() {
  246. dbDir, err := conf.GetDataLoc()
  247. if err != nil {
  248. log.Panic(err)
  249. }
  250. c := path.Join(dbDir, CheckpointListKey)
  251. err = os.RemoveAll(c)
  252. if err != nil {
  253. conf.Log.Error(err)
  254. }
  255. }