default_test.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package context
  15. import (
  16. "fmt"
  17. "log"
  18. "os"
  19. "path"
  20. "reflect"
  21. "testing"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/pkg/store"
  24. "github.com/lf-edge/ekuiper/internal/topo/state"
  25. "github.com/lf-edge/ekuiper/internal/topo/transform"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. )
  28. func TestState(t *testing.T) {
  29. err := store.SetupDefault()
  30. if err != nil {
  31. t.Error(err)
  32. }
  33. var (
  34. i = 0
  35. ruleId = "testStateRule"
  36. value1 = 21
  37. value2 = "hello"
  38. value3 = "world"
  39. s = map[string]interface{}{
  40. "key1": 21,
  41. "key3": "world",
  42. }
  43. )
  44. // initialization
  45. cStore, err := state.CreateStore(ruleId, api.AtLeastOnce)
  46. if err != nil {
  47. t.Errorf("Get store for rule %s error: %s", ruleId, err)
  48. return
  49. }
  50. ctx := Background().WithMeta("testStateRule", "op1", cStore).(*DefaultContext)
  51. defer cleanStateData()
  52. // Do state function
  53. _ = ctx.IncrCounter("key1", 20)
  54. _ = ctx.IncrCounter("key1", 1)
  55. v, err := ctx.GetCounter("key1")
  56. if err != nil {
  57. t.Errorf("%d.Get counter error: %s", i, err)
  58. return
  59. }
  60. if !reflect.DeepEqual(value1, v) {
  61. t.Errorf("%d.Get counter\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, value1, v)
  62. }
  63. err = ctx.PutState("key2", value2)
  64. if err != nil {
  65. t.Errorf("%d.Put state key2 error: %s", i, err)
  66. return
  67. }
  68. err = ctx.PutState("key3", value3)
  69. if err != nil {
  70. t.Errorf("%d.Put state key3 error: %s", i, err)
  71. return
  72. }
  73. v2, err := ctx.GetState("key2")
  74. if err != nil {
  75. t.Errorf("%d.Get state key2 error: %s", i, err)
  76. return
  77. }
  78. if !reflect.DeepEqual(value2, v2) {
  79. t.Errorf("%d.Get state\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, value2, v2)
  80. }
  81. err = ctx.DeleteState("key2")
  82. if err != nil {
  83. t.Errorf("%d.Delete state key2 error: %s", i, err)
  84. return
  85. }
  86. err = ctx.Snapshot()
  87. if err != nil {
  88. t.Errorf("%d.Snapshot error: %s", i, err)
  89. return
  90. }
  91. rs := ctx.snapshot
  92. if !reflect.DeepEqual(s, rs) {
  93. t.Errorf("%d.Snapshot\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, s, rs)
  94. }
  95. }
  96. func cleanStateData() {
  97. dbDir, err := conf.GetDataLoc()
  98. if err != nil {
  99. log.Panic(err)
  100. }
  101. c := path.Join(dbDir, state.CheckpointListKey)
  102. err = os.RemoveAll(c)
  103. if err != nil {
  104. conf.Log.Error(err)
  105. }
  106. }
  107. func TestParseJsonPath(t *testing.T) {
  108. tests := []struct {
  109. j string
  110. v []interface{} // values
  111. r []interface{} // parsed results
  112. }{
  113. {
  114. j: "$.a",
  115. v: []interface{}{
  116. map[string]interface{}{
  117. "a": 123,
  118. "b": "dafds",
  119. },
  120. map[string]interface{}{
  121. "a": "single",
  122. "c": 20.2,
  123. },
  124. map[string]interface{}{
  125. "b": "b",
  126. "c": "c",
  127. },
  128. },
  129. r: []interface{}{
  130. 123,
  131. "single",
  132. nil,
  133. },
  134. }, {
  135. j: "$[0].a",
  136. v: []interface{}{
  137. []map[string]interface{}{{
  138. "a": 123,
  139. "b": "dafds",
  140. }},
  141. []map[string]interface{}{},
  142. []map[string]interface{}{
  143. {
  144. "a": "single",
  145. "c": 20.2,
  146. },
  147. {
  148. "b": "b",
  149. "c": "c",
  150. },
  151. },
  152. },
  153. r: []interface{}{
  154. 123,
  155. nil,
  156. "single",
  157. },
  158. },
  159. }
  160. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  161. ctx := Background().WithMeta("testStateRule", "op1", &state.MemoryStore{})
  162. for i, tt := range tests {
  163. var result []interface{}
  164. for _, v := range tt.v {
  165. prop, err := ctx.ParseJsonPath(tt.j, v)
  166. if err != nil {
  167. fmt.Printf("%d:%s parse %v error\n", i, tt.j, v)
  168. }
  169. result = append(result, prop)
  170. }
  171. if !reflect.DeepEqual(tt.r, result) {
  172. t.Errorf("%d. %s\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.j, tt.r, result)
  173. }
  174. }
  175. }
  176. func TestParseTemplate(t *testing.T) {
  177. tests := []struct {
  178. j string
  179. v []interface{} // values
  180. r []interface{} // parsed results
  181. }{
  182. {
  183. j: "devices/{{.a}}",
  184. v: []interface{}{
  185. map[string]interface{}{
  186. "a": 123,
  187. "b": "dafds",
  188. },
  189. map[string]interface{}{
  190. "a": "single",
  191. "c": 20.2,
  192. },
  193. map[string]interface{}{
  194. "b": "b",
  195. "c": "c",
  196. },
  197. },
  198. r: []interface{}{
  199. "devices/123",
  200. "devices/single",
  201. "devices/<no value>",
  202. },
  203. },
  204. }
  205. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  206. ctx := Background().WithMeta("testStateRule", "op1", &state.MemoryStore{})
  207. for i, tt := range tests {
  208. var result []interface{}
  209. for _, v := range tt.v {
  210. prop, err := ctx.ParseTemplate(tt.j, v)
  211. if err != nil {
  212. fmt.Printf("%d:%s parse %v error\n", i, tt.j, v)
  213. }
  214. result = append(result, prop)
  215. }
  216. if !reflect.DeepEqual(tt.r, result) {
  217. t.Errorf("%d. %s\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.j, tt.r, result)
  218. }
  219. }
  220. }
  221. func TestTransition(t *testing.T) {
  222. var mockFunc transform.TransFunc = func(d interface{}) ([]byte, bool, error) {
  223. return []byte(fmt.Sprintf("%v", d)), true, nil
  224. }
  225. tests := []struct {
  226. data interface{}
  227. r []byte
  228. }{
  229. {
  230. data: "hello",
  231. r: []byte(`hello`),
  232. }, {
  233. data: "world",
  234. r: []byte(`world`),
  235. }, {
  236. data: map[string]interface{}{"a": "hello"},
  237. r: []byte(`map[a:hello]`),
  238. },
  239. }
  240. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  241. ctx := Background().WithMeta("testTransRule", "op1", &state.MemoryStore{}).(*DefaultContext)
  242. nc := WithValue(ctx, TransKey, mockFunc)
  243. for i, tt := range tests {
  244. r, _, _ := nc.TransformOutput(tt.data)
  245. if !reflect.DeepEqual(tt.r, r) {
  246. t.Errorf("%d\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, string(tt.r), string(r))
  247. }
  248. }
  249. }