memory_test.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package memory
  15. import (
  16. "fmt"
  17. "reflect"
  18. "testing"
  19. "time"
  20. "github.com/benbjohnson/clock"
  21. "github.com/gdexlab/go-render/render"
  22. "github.com/lf-edge/ekuiper/internal/conf"
  23. "github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
  24. "github.com/lf-edge/ekuiper/internal/topo/context"
  25. "github.com/lf-edge/ekuiper/internal/topo/state"
  26. "github.com/lf-edge/ekuiper/pkg/api"
  27. )
  28. func TestSharedInmemoryNode(t *testing.T) {
  29. pubsub.Reset()
  30. id := "test_id"
  31. sinkProps := make(map[string]interface{})
  32. sinkProps[pubsub.IdProperty] = id
  33. src := GetSource()
  34. snk := GetSink()
  35. contextLogger := conf.Log.WithField("rule", "test")
  36. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  37. consumer := make(chan api.SourceTuple)
  38. errorChannel := make(chan error)
  39. srcProps := make(map[string]interface{})
  40. srcProps["option"] = "value"
  41. err := snk.Configure(sinkProps)
  42. if err != nil {
  43. t.Error(err)
  44. return
  45. }
  46. err = snk.Open(ctx)
  47. if err != nil {
  48. t.Error(err)
  49. return
  50. }
  51. srcProps[pubsub.IdProperty] = id
  52. err = src.Configure(id, srcProps)
  53. if err != nil {
  54. t.Error(err)
  55. }
  56. go func() {
  57. src.Open(ctx, consumer, errorChannel)
  58. }()
  59. //if _, contains := pubTopics[id]; !contains {
  60. // t.Errorf("there should be memory node for topic")
  61. //}
  62. data := make(map[string]interface{})
  63. data["temperature"] = 33.0
  64. list := make([]map[string]interface{}, 0)
  65. list = append(list, data)
  66. go func() {
  67. err = snk.Collect(ctx, list)
  68. if err != nil {
  69. t.Error(err)
  70. }
  71. }()
  72. for {
  73. select {
  74. case res := <-consumer:
  75. mc := conf.Clock.(*clock.Mock)
  76. expected := api.NewDefaultSourceTupleWithTime(data, map[string]interface{}{"topic": "test_id"}, mc.Now())
  77. if !reflect.DeepEqual(expected, res) {
  78. t.Errorf("result %s should be equal to %s", res, expected)
  79. }
  80. return
  81. default:
  82. }
  83. }
  84. }
  85. func TestMultipleTopics(t *testing.T) {
  86. pubsub.Reset()
  87. var (
  88. sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
  89. sinkTopics = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1"}
  90. sinkData = [][]map[string]interface{}{
  91. {
  92. {
  93. "id": 1,
  94. "temp": 23,
  95. },
  96. {
  97. "id": 2,
  98. "temp": 34,
  99. },
  100. {
  101. "id": 3,
  102. "temp": 28,
  103. },
  104. }, {
  105. {
  106. "id": 4,
  107. "color": "red",
  108. },
  109. {
  110. "id": 5,
  111. "color": "red",
  112. },
  113. {
  114. "id": 6,
  115. "color": "green",
  116. },
  117. }, {
  118. {
  119. "id": 7,
  120. "hum": 67.5,
  121. },
  122. {
  123. "id": 8,
  124. "hum": 77.1,
  125. },
  126. {
  127. "id": 9,
  128. "hum": 90.3,
  129. },
  130. }, {
  131. {
  132. "id": 10,
  133. "status": "on",
  134. },
  135. {
  136. "id": 11,
  137. "status": "off",
  138. },
  139. {
  140. "id": 12,
  141. "status": "on",
  142. },
  143. },
  144. }
  145. expected = []api.SourceTuple{
  146. &api.DefaultSourceTuple{
  147. Mess: map[string]interface{}{
  148. "id": 1,
  149. "temp": 23,
  150. },
  151. M: map[string]interface{}{
  152. "topic": "h/d1/c1/s1",
  153. },
  154. },
  155. &api.DefaultSourceTuple{
  156. Mess: map[string]interface{}{
  157. "id": 1,
  158. "temp": 23,
  159. },
  160. M: map[string]interface{}{
  161. "topic": "h/d1/c1/s1",
  162. },
  163. },
  164. &api.DefaultSourceTuple{
  165. Mess: map[string]interface{}{
  166. "id": 4,
  167. "color": "red",
  168. },
  169. M: map[string]interface{}{
  170. "topic": "h/d1/c1/s2",
  171. },
  172. },
  173. &api.DefaultSourceTuple{
  174. Mess: map[string]interface{}{
  175. "id": 4,
  176. "color": "red",
  177. },
  178. M: map[string]interface{}{
  179. "topic": "h/d1/c1/s2",
  180. },
  181. },
  182. &api.DefaultSourceTuple{
  183. Mess: map[string]interface{}{
  184. "id": 7,
  185. "hum": 67.5,
  186. },
  187. M: map[string]interface{}{
  188. "topic": "h/d2/c2/s1",
  189. },
  190. },
  191. &api.DefaultSourceTuple{
  192. Mess: map[string]interface{}{
  193. "id": 10,
  194. "status": "on",
  195. },
  196. M: map[string]interface{}{
  197. "topic": "h/d3/c3/s1",
  198. },
  199. },
  200. &api.DefaultSourceTuple{
  201. Mess: map[string]interface{}{
  202. "id": 10,
  203. "status": "on",
  204. },
  205. M: map[string]interface{}{
  206. "topic": "h/d3/c3/s1",
  207. },
  208. },
  209. &api.DefaultSourceTuple{
  210. Mess: map[string]interface{}{
  211. "id": 2,
  212. "temp": 34,
  213. },
  214. M: map[string]interface{}{
  215. "topic": "h/d1/c1/s1",
  216. },
  217. },
  218. &api.DefaultSourceTuple{
  219. Mess: map[string]interface{}{
  220. "id": 2,
  221. "temp": 34,
  222. },
  223. M: map[string]interface{}{
  224. "topic": "h/d1/c1/s1",
  225. },
  226. },
  227. &api.DefaultSourceTuple{
  228. Mess: map[string]interface{}{
  229. "id": 5,
  230. "color": "red",
  231. },
  232. M: map[string]interface{}{
  233. "topic": "h/d1/c1/s2",
  234. },
  235. },
  236. &api.DefaultSourceTuple{
  237. Mess: map[string]interface{}{
  238. "id": 5,
  239. "color": "red",
  240. },
  241. M: map[string]interface{}{
  242. "topic": "h/d1/c1/s2",
  243. },
  244. },
  245. &api.DefaultSourceTuple{
  246. Mess: map[string]interface{}{
  247. "id": 8,
  248. "hum": 77.1,
  249. },
  250. M: map[string]interface{}{
  251. "topic": "h/d2/c2/s1",
  252. },
  253. },
  254. &api.DefaultSourceTuple{
  255. Mess: map[string]interface{}{
  256. "id": 11,
  257. "status": "off",
  258. },
  259. M: map[string]interface{}{
  260. "topic": "h/d3/c3/s1",
  261. },
  262. },
  263. &api.DefaultSourceTuple{
  264. Mess: map[string]interface{}{
  265. "id": 11,
  266. "status": "off",
  267. },
  268. M: map[string]interface{}{
  269. "topic": "h/d3/c3/s1",
  270. },
  271. },
  272. &api.DefaultSourceTuple{
  273. Mess: map[string]interface{}{
  274. "id": 3,
  275. "temp": 28,
  276. },
  277. M: map[string]interface{}{
  278. "topic": "h/d1/c1/s1",
  279. },
  280. },
  281. &api.DefaultSourceTuple{
  282. Mess: map[string]interface{}{
  283. "id": 3,
  284. "temp": 28,
  285. },
  286. M: map[string]interface{}{
  287. "topic": "h/d1/c1/s1",
  288. },
  289. },
  290. &api.DefaultSourceTuple{
  291. Mess: map[string]interface{}{
  292. "id": 6,
  293. "color": "green",
  294. },
  295. M: map[string]interface{}{
  296. "topic": "h/d1/c1/s2",
  297. },
  298. },
  299. &api.DefaultSourceTuple{
  300. Mess: map[string]interface{}{
  301. "id": 6,
  302. "color": "green",
  303. },
  304. M: map[string]interface{}{
  305. "topic": "h/d1/c1/s2",
  306. },
  307. },
  308. &api.DefaultSourceTuple{
  309. Mess: map[string]interface{}{
  310. "id": 9,
  311. "hum": 90.3,
  312. },
  313. M: map[string]interface{}{
  314. "topic": "h/d2/c2/s1",
  315. },
  316. },
  317. &api.DefaultSourceTuple{
  318. Mess: map[string]interface{}{
  319. "id": 12,
  320. "status": "on",
  321. },
  322. M: map[string]interface{}{
  323. "topic": "h/d3/c3/s1",
  324. },
  325. },
  326. &api.DefaultSourceTuple{
  327. Mess: map[string]interface{}{
  328. "id": 12,
  329. "status": "on",
  330. },
  331. M: map[string]interface{}{
  332. "topic": "h/d3/c3/s1",
  333. },
  334. },
  335. }
  336. )
  337. contextLogger := conf.Log.WithField("rule", "test")
  338. ctx, cancel := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithCancel()
  339. consumer := make(chan api.SourceTuple)
  340. errorChannel := make(chan error)
  341. count := 0
  342. for _, topic := range sinkTopics {
  343. snk := GetSink()
  344. err := snk.Configure(map[string]interface{}{"topic": topic})
  345. if err != nil {
  346. t.Error(err)
  347. return
  348. }
  349. err = snk.Open(ctx)
  350. if err != nil {
  351. t.Error(err)
  352. return
  353. }
  354. src := GetSource()
  355. err = src.Configure(sourceTopics[count], make(map[string]interface{}))
  356. if err != nil {
  357. t.Error(err)
  358. return
  359. }
  360. go func(c int) {
  361. nc := ctx.WithMeta("rule1", fmt.Sprintf("op%d", c), &state.MemoryStore{})
  362. src.Open(nc, consumer, errorChannel)
  363. }(count)
  364. count++
  365. }
  366. for count < len(sourceTopics) {
  367. src := GetSource()
  368. err := src.Configure(sourceTopics[count], make(map[string]interface{}))
  369. if err != nil {
  370. t.Error(err)
  371. return
  372. }
  373. go func(c int) {
  374. nc := ctx.WithMeta("rule1", fmt.Sprintf("op%d", c), &state.MemoryStore{})
  375. src.Open(nc, consumer, errorChannel)
  376. }(count)
  377. count++
  378. }
  379. go func() {
  380. c := 0
  381. for c < 3 {
  382. for i, v := range sinkData {
  383. time.Sleep(10 * time.Millisecond)
  384. pubsub.Produce(ctx, sinkTopics[i], v[c])
  385. }
  386. c++
  387. }
  388. cancel()
  389. time.Sleep(100 * time.Millisecond)
  390. close(consumer)
  391. }()
  392. var results []api.SourceTuple
  393. for res := range consumer {
  394. results = append(results, res)
  395. }
  396. for i, r := range results {
  397. if !reflect.DeepEqual(r.Message(), expected[i].Message()) || !reflect.DeepEqual(r.Meta(), expected[i].Meta()) {
  398. t.Errorf("Expect\t %v\n but got\t\t\t\t %v", render.AsCode(expected[i]), render.AsCode(r))
  399. }
  400. }
  401. }