default_test.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package context
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/conf"
  18. "github.com/lf-edge/ekuiper/internal/pkg/store"
  19. "github.com/lf-edge/ekuiper/internal/topo/state"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "log"
  22. "os"
  23. "path"
  24. "reflect"
  25. "testing"
  26. )
  27. func TestState(t *testing.T) {
  28. err := store.SetupDefault()
  29. if err != nil {
  30. t.Error(err)
  31. }
  32. var (
  33. i = 0
  34. ruleId = "testStateRule"
  35. value1 = 21
  36. value2 = "hello"
  37. value3 = "world"
  38. s = map[string]interface{}{
  39. "key1": 21,
  40. "key3": "world",
  41. }
  42. )
  43. //initialization
  44. cStore, err := state.CreateStore(ruleId, api.AtLeastOnce)
  45. if err != nil {
  46. t.Errorf("Get store for rule %s error: %s", ruleId, err)
  47. return
  48. }
  49. ctx := Background().WithMeta("testStateRule", "op1", cStore).(*DefaultContext)
  50. defer cleanStateData()
  51. // Do state function
  52. _ = ctx.IncrCounter("key1", 20)
  53. _ = ctx.IncrCounter("key1", 1)
  54. v, err := ctx.GetCounter("key1")
  55. if err != nil {
  56. t.Errorf("%d.Get counter error: %s", i, err)
  57. return
  58. }
  59. if !reflect.DeepEqual(value1, v) {
  60. t.Errorf("%d.Get counter\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, value1, v)
  61. }
  62. err = ctx.PutState("key2", value2)
  63. if err != nil {
  64. t.Errorf("%d.Put state key2 error: %s", i, err)
  65. return
  66. }
  67. err = ctx.PutState("key3", value3)
  68. if err != nil {
  69. t.Errorf("%d.Put state key3 error: %s", i, err)
  70. return
  71. }
  72. v2, err := ctx.GetState("key2")
  73. if err != nil {
  74. t.Errorf("%d.Get state key2 error: %s", i, err)
  75. return
  76. }
  77. if !reflect.DeepEqual(value2, v2) {
  78. t.Errorf("%d.Get state\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, value2, v2)
  79. }
  80. err = ctx.DeleteState("key2")
  81. if err != nil {
  82. t.Errorf("%d.Delete state key2 error: %s", i, err)
  83. return
  84. }
  85. err = ctx.Snapshot()
  86. if err != nil {
  87. t.Errorf("%d.Snapshot error: %s", i, err)
  88. return
  89. }
  90. rs := ctx.snapshot
  91. if !reflect.DeepEqual(s, rs) {
  92. t.Errorf("%d.Snapshot\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, s, rs)
  93. }
  94. }
  95. func cleanStateData() {
  96. dbDir, err := conf.GetDataLoc()
  97. if err != nil {
  98. log.Panic(err)
  99. }
  100. c := path.Join(dbDir, state.CheckpointListKey)
  101. err = os.RemoveAll(c)
  102. if err != nil {
  103. conf.Log.Error(err)
  104. }
  105. }
  106. func TestDynamicProp(t *testing.T) {
  107. var tests = []struct {
  108. j string
  109. v []interface{} // values
  110. r []interface{} // parsed results
  111. }{
  112. {
  113. j: "$.a",
  114. v: []interface{}{
  115. map[string]interface{}{
  116. "a": 123,
  117. "b": "dafds",
  118. },
  119. map[string]interface{}{
  120. "a": "single",
  121. "c": 20.2,
  122. },
  123. map[string]interface{}{
  124. "b": "b",
  125. "c": "c",
  126. },
  127. },
  128. r: []interface{}{
  129. 123,
  130. "single",
  131. nil,
  132. },
  133. }, {
  134. j: "$[0].a",
  135. v: []interface{}{
  136. []map[string]interface{}{{
  137. "a": 123,
  138. "b": "dafds",
  139. }},
  140. []map[string]interface{}{},
  141. []map[string]interface{}{
  142. {
  143. "a": "single",
  144. "c": 20.2,
  145. },
  146. {
  147. "b": "b",
  148. "c": "c",
  149. },
  150. },
  151. },
  152. r: []interface{}{
  153. 123,
  154. nil,
  155. "single",
  156. },
  157. }, {
  158. j: "a",
  159. v: []interface{}{
  160. map[string]interface{}{
  161. "a": 123,
  162. "b": "dafds",
  163. },
  164. map[string]interface{}{
  165. "a": "single",
  166. "c": 20.2,
  167. },
  168. map[string]interface{}{
  169. "b": "b",
  170. "c": "c",
  171. },
  172. },
  173. r: []interface{}{
  174. "a",
  175. "a",
  176. "a",
  177. },
  178. },
  179. }
  180. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  181. ctx := Background().WithMeta("testStateRule", "op1", &state.MemoryStore{})
  182. for i, tt := range tests {
  183. var result []interface{}
  184. for _, v := range tt.v {
  185. prop, err := ctx.ParseDynamicProp(tt.j, v)
  186. if err != nil {
  187. fmt.Printf("%d:%s parse %v error\n", i, tt.j, v)
  188. }
  189. result = append(result, prop)
  190. }
  191. if !reflect.DeepEqual(tt.r, result) {
  192. t.Errorf("%d. %s\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.j, tt.r, result)
  193. }
  194. }
  195. }
  196. func TestTransition(t *testing.T) {
  197. mockFunc := func(d interface{}) ([]byte, bool, error) {
  198. return []byte(fmt.Sprintf("%v", d)), true, nil
  199. }
  200. var tests = []struct {
  201. trans *TransConfig
  202. r []byte
  203. }{
  204. {
  205. trans: &TransConfig{
  206. Data: "hello",
  207. TFunc: mockFunc,
  208. },
  209. r: []byte(`hello`),
  210. }, {
  211. trans: &TransConfig{
  212. Data: "world",
  213. TFunc: mockFunc,
  214. },
  215. r: []byte(`world`),
  216. }, {
  217. trans: &TransConfig{
  218. Data: map[string]interface{}{"a": "hello"},
  219. TFunc: mockFunc,
  220. },
  221. r: []byte(`map[a:hello]`),
  222. },
  223. }
  224. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  225. ctx := Background().WithMeta("testTransRule", "op1", &state.MemoryStore{}).(*DefaultContext)
  226. for i, tt := range tests {
  227. nc := WithValue(ctx, TransKey, tt.trans)
  228. r, _, _ := nc.TransformOutput()
  229. if !reflect.DeepEqual(tt.r, r) {
  230. t.Errorf("%d\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, string(tt.r), string(r))
  231. }
  232. }
  233. }