kv_store_test.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. package states
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/common"
  5. "log"
  6. "os"
  7. "path"
  8. "reflect"
  9. "sync"
  10. "testing"
  11. )
  12. func TestLifecycle(t *testing.T) {
  13. var (
  14. i = 0
  15. ruleId = "test1"
  16. checkpointIds = []int64{1, 2, 3}
  17. opIds = []string{"op1", "op2", "op3"}
  18. r = map[string]interface{}{
  19. "1": map[string]interface{}{
  20. "op1": map[string]interface{}{
  21. "op": "op1",
  22. "oi": 0,
  23. "ci": 0,
  24. },
  25. "op2": map[string]interface{}{
  26. "op": "op2",
  27. "oi": 1,
  28. "ci": 0,
  29. },
  30. "op3": map[string]interface{}{
  31. "op": "op3",
  32. "oi": 2,
  33. "ci": 0,
  34. },
  35. },
  36. "2": map[string]interface{}{
  37. "op1": map[string]interface{}{
  38. "op": "op1",
  39. "oi": 0,
  40. "ci": 1,
  41. },
  42. "op2": map[string]interface{}{
  43. "op": "op2",
  44. "oi": 1,
  45. "ci": 1,
  46. },
  47. "op3": map[string]interface{}{
  48. "op": "op3",
  49. "oi": 2,
  50. "ci": 1,
  51. },
  52. },
  53. "3": map[string]interface{}{
  54. "op1": map[string]interface{}{
  55. "op": "op1",
  56. "oi": 0,
  57. "ci": 2,
  58. },
  59. "op2": map[string]interface{}{
  60. "op": "op2",
  61. "oi": 1,
  62. "ci": 2,
  63. },
  64. "op3": map[string]interface{}{
  65. "op": "op3",
  66. "oi": 2,
  67. "ci": 2,
  68. },
  69. },
  70. }
  71. rm = map[string]interface{}{
  72. "1": map[string]interface{}{
  73. "op1": map[string]interface{}{
  74. "op": "op1",
  75. "oi": 0,
  76. "ci": 0,
  77. },
  78. "op2": map[string]interface{}{
  79. "op": "op2",
  80. "oi": 1,
  81. "ci": 0,
  82. },
  83. "op3": map[string]interface{}{
  84. "op": "op3",
  85. "oi": 2,
  86. "ci": 0,
  87. },
  88. },
  89. "2": map[string]interface{}{
  90. "op1": map[string]interface{}{
  91. "op": "op1",
  92. "oi": 0,
  93. "ci": 1,
  94. },
  95. "op2": map[string]interface{}{
  96. "op": "op2",
  97. "oi": 1,
  98. "ci": 1,
  99. },
  100. "op3": map[string]interface{}{
  101. "op": "op3",
  102. "oi": 2,
  103. "ci": 1,
  104. },
  105. },
  106. "3": map[string]interface{}{
  107. "op1": map[string]interface{}{
  108. "op": "op1",
  109. "oi": 0,
  110. "ci": 2,
  111. },
  112. "op2": map[string]interface{}{
  113. "op": "op2",
  114. "oi": 1,
  115. "ci": 2,
  116. },
  117. "op3": map[string]interface{}{
  118. "op": "op3",
  119. "oi": 2,
  120. "ci": 2,
  121. },
  122. },
  123. "10000": map[string]interface{}{
  124. "op2": map[string]interface{}{
  125. "op": "op2",
  126. "oi": 1,
  127. "ci": 10000,
  128. },
  129. "op3": map[string]interface{}{
  130. "op": "op3",
  131. "oi": 2,
  132. "ci": 10000,
  133. },
  134. },
  135. }
  136. )
  137. func() {
  138. defer cleanStateData()
  139. store, err := getKVStore(ruleId)
  140. if err != nil {
  141. t.Errorf("Get store for rule %s error: %s", ruleId, err)
  142. return
  143. }
  144. //Save for all checkpoints
  145. for i, cid := range checkpointIds {
  146. for j, opId := range opIds {
  147. err := store.SaveState(cid, opId, map[string]interface{}{
  148. "op": opId,
  149. "oi": j,
  150. "ci": i,
  151. })
  152. if err != nil {
  153. t.Errorf("Save state for rule %s op %s error: %s", ruleId, opId, err)
  154. return
  155. }
  156. }
  157. err := store.SaveCheckpoint(cid)
  158. if err != nil {
  159. t.Errorf("Save checkpoint %d for rule %s error: %s", cid, ruleId, err)
  160. return
  161. }
  162. }
  163. // compare checkpoints
  164. if !reflect.DeepEqual(checkpointIds, store.checkpoints) {
  165. t.Errorf("%d.Save checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, checkpointIds, store.checkpoints)
  166. }
  167. // compare contents
  168. result := mapStoreToMap(store.mapStore)
  169. if !reflect.DeepEqual(r, result) {
  170. t.Errorf("%d.Save checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, r, result)
  171. }
  172. //Save additional state but not serialized in checkpoint
  173. err = store.SaveState(10000, opIds[1], map[string]interface{}{
  174. "op": opIds[1],
  175. "oi": 1,
  176. "ci": 10000,
  177. })
  178. if err != nil {
  179. t.Errorf("Save state for rule %s op %s error: %s", ruleId, opIds[1], err)
  180. return
  181. }
  182. err = store.SaveState(10000, opIds[2], map[string]interface{}{
  183. "op": opIds[2],
  184. "oi": 2,
  185. "ci": 10000,
  186. })
  187. if err != nil {
  188. t.Errorf("Save state for rule %s op %s error: %s", ruleId, opIds[2], err)
  189. return
  190. }
  191. // compare checkpoints
  192. if !reflect.DeepEqual(checkpointIds, store.checkpoints) {
  193. t.Errorf("%d.Save state\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, checkpointIds, store.checkpoints)
  194. }
  195. // compare contents
  196. result = mapStoreToMap(store.mapStore)
  197. if !reflect.DeepEqual(rm, result) {
  198. t.Errorf("%d.Save state\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, r, result)
  199. }
  200. //simulate restore
  201. store = nil
  202. store, err = getKVStore(ruleId)
  203. if err != nil {
  204. t.Errorf("Restore store for rule %s error: %s", ruleId, err)
  205. return
  206. }
  207. // compare checkpoints
  208. if !reflect.DeepEqual(checkpointIds, store.checkpoints) {
  209. t.Errorf("%d.Restore checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, checkpointIds, store.checkpoints)
  210. return
  211. }
  212. // compare contents
  213. result = mapStoreToMap(store.mapStore)
  214. if !reflect.DeepEqual(r, result) {
  215. t.Errorf("%d.Restore checkpoint\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, r, result)
  216. return
  217. }
  218. ns, err := store.GetOpState(opIds[1])
  219. if err != nil {
  220. t.Errorf("Get op %s state for rule %s error: %s", opIds[1], ruleId, err)
  221. return
  222. }
  223. sm := r[fmt.Sprintf("%v", checkpointIds[len(checkpointIds)-1])].(map[string]interface{})[opIds[1]]
  224. nsm := common.SyncMapToMap(ns)
  225. if !reflect.DeepEqual(sm, nsm) {
  226. t.Errorf("%d.Restore op state\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, sm, nsm)
  227. return
  228. }
  229. }()
  230. }
  // mapStoreToMap copies the entries of sm into a plain map so the result can
  // be compared with reflect.DeepEqual. Each key is rendered with the %v verb.
  // A value that is itself a *sync.Map is converted recursively; any other
  // value is stored unchanged.
  func mapStoreToMap(sm *sync.Map) map[string]interface{} {
  	out := make(map[string]interface{})
  	sm.Range(func(key, value interface{}) bool {
  		name := fmt.Sprintf("%v", key)
  		if nested, ok := value.(*sync.Map); ok {
  			out[name] = mapStoreToMap(nested)
  		} else {
  			out[name] = value
  		}
  		// Returning true keeps Range going over the remaining entries.
  		return true
  	})
  	return out
  }
  244. func cleanStateData() {
  245. dbDir, err := common.GetDataLoc()
  246. if err != nil {
  247. log.Panic(err)
  248. }
  249. c := path.Join(dbDir, "checkpoints")
  250. err = os.RemoveAll(c)
  251. if err != nil {
  252. common.Log.Error(err)
  253. }
  254. }