memory_test.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package memory
  15. import (
  16. "fmt"
  17. "github.com/gdexlab/go-render/render"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/context"
  20. "github.com/lf-edge/ekuiper/internal/topo/memory/pubsub"
  21. "github.com/lf-edge/ekuiper/internal/topo/state"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "reflect"
  24. "testing"
  25. "time"
  26. )
  27. func TestSharedInmemoryNode(t *testing.T) {
  28. pubsub.Reset()
  29. id := "test_id"
  30. sinkProps := make(map[string]interface{})
  31. sinkProps[pubsub.IdProperty] = id
  32. src := GetSource()
  33. snk := GetSink()
  34. contextLogger := conf.Log.WithField("rule", "test")
  35. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  36. consumer := make(chan api.SourceTuple)
  37. errorChannel := make(chan error)
  38. srcProps := make(map[string]interface{})
  39. srcProps["option"] = "value"
  40. err := snk.Configure(sinkProps)
  41. if err != nil {
  42. t.Error(err)
  43. return
  44. }
  45. err = snk.Open(ctx)
  46. if err != nil {
  47. t.Error(err)
  48. return
  49. }
  50. srcProps[pubsub.IdProperty] = id
  51. err = src.Configure(id, srcProps)
  52. if err != nil {
  53. t.Error(err)
  54. }
  55. go func() {
  56. src.Open(ctx, consumer, errorChannel)
  57. }()
  58. //if _, contains := pubTopics[id]; !contains {
  59. // t.Errorf("there should be memory node for topic")
  60. //}
  61. data := make(map[string]interface{})
  62. data["temperature"] = 33.0
  63. list := make([]map[string]interface{}, 0)
  64. list = append(list, data)
  65. go func() {
  66. err = snk.Collect(ctx, list)
  67. if err != nil {
  68. t.Error(err)
  69. }
  70. }()
  71. for {
  72. select {
  73. case res := <-consumer:
  74. expected := api.NewDefaultSourceTuple(data, map[string]interface{}{"topic": "test_id"})
  75. if !reflect.DeepEqual(expected, res) {
  76. t.Errorf("result %s should be equal to %s", res, expected)
  77. }
  78. return
  79. default:
  80. }
  81. }
  82. }
  83. func TestMultipleTopics(t *testing.T) {
  84. pubsub.Reset()
  85. var (
  86. sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
  87. sinkTopics = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1"}
  88. sinkData = [][]map[string]interface{}{
  89. {
  90. {
  91. "id": 1,
  92. "temp": 23,
  93. },
  94. {
  95. "id": 2,
  96. "temp": 34,
  97. },
  98. {
  99. "id": 3,
  100. "temp": 28,
  101. },
  102. }, {
  103. {
  104. "id": 4,
  105. "color": "red",
  106. },
  107. {
  108. "id": 5,
  109. "color": "red",
  110. },
  111. {
  112. "id": 6,
  113. "color": "green",
  114. },
  115. }, {
  116. {
  117. "id": 7,
  118. "hum": 67.5,
  119. },
  120. {
  121. "id": 8,
  122. "hum": 77.1,
  123. },
  124. {
  125. "id": 9,
  126. "hum": 90.3,
  127. },
  128. }, {
  129. {
  130. "id": 10,
  131. "status": "on",
  132. },
  133. {
  134. "id": 11,
  135. "status": "off",
  136. },
  137. {
  138. "id": 12,
  139. "status": "on",
  140. },
  141. },
  142. }
  143. expected = []api.SourceTuple{
  144. &api.DefaultSourceTuple{
  145. Mess: map[string]interface{}{
  146. "id": 1,
  147. "temp": 23,
  148. },
  149. M: map[string]interface{}{
  150. "topic": "h/d1/c1/s1",
  151. },
  152. },
  153. &api.DefaultSourceTuple{
  154. Mess: map[string]interface{}{
  155. "id": 1,
  156. "temp": 23,
  157. },
  158. M: map[string]interface{}{
  159. "topic": "h/d1/c1/s1",
  160. },
  161. },
  162. &api.DefaultSourceTuple{
  163. Mess: map[string]interface{}{
  164. "id": 4,
  165. "color": "red",
  166. },
  167. M: map[string]interface{}{
  168. "topic": "h/d1/c1/s2",
  169. },
  170. },
  171. &api.DefaultSourceTuple{
  172. Mess: map[string]interface{}{
  173. "id": 4,
  174. "color": "red",
  175. },
  176. M: map[string]interface{}{
  177. "topic": "h/d1/c1/s2",
  178. },
  179. },
  180. &api.DefaultSourceTuple{
  181. Mess: map[string]interface{}{
  182. "id": 7,
  183. "hum": 67.5,
  184. },
  185. M: map[string]interface{}{
  186. "topic": "h/d2/c2/s1",
  187. },
  188. },
  189. &api.DefaultSourceTuple{
  190. Mess: map[string]interface{}{
  191. "id": 10,
  192. "status": "on",
  193. },
  194. M: map[string]interface{}{
  195. "topic": "h/d3/c3/s1",
  196. },
  197. },
  198. &api.DefaultSourceTuple{
  199. Mess: map[string]interface{}{
  200. "id": 10,
  201. "status": "on",
  202. },
  203. M: map[string]interface{}{
  204. "topic": "h/d3/c3/s1",
  205. },
  206. },
  207. &api.DefaultSourceTuple{
  208. Mess: map[string]interface{}{
  209. "id": 2,
  210. "temp": 34,
  211. },
  212. M: map[string]interface{}{
  213. "topic": "h/d1/c1/s1",
  214. },
  215. },
  216. &api.DefaultSourceTuple{
  217. Mess: map[string]interface{}{
  218. "id": 2,
  219. "temp": 34,
  220. },
  221. M: map[string]interface{}{
  222. "topic": "h/d1/c1/s1",
  223. },
  224. },
  225. &api.DefaultSourceTuple{
  226. Mess: map[string]interface{}{
  227. "id": 5,
  228. "color": "red",
  229. },
  230. M: map[string]interface{}{
  231. "topic": "h/d1/c1/s2",
  232. },
  233. },
  234. &api.DefaultSourceTuple{
  235. Mess: map[string]interface{}{
  236. "id": 5,
  237. "color": "red",
  238. },
  239. M: map[string]interface{}{
  240. "topic": "h/d1/c1/s2",
  241. },
  242. },
  243. &api.DefaultSourceTuple{
  244. Mess: map[string]interface{}{
  245. "id": 8,
  246. "hum": 77.1,
  247. },
  248. M: map[string]interface{}{
  249. "topic": "h/d2/c2/s1",
  250. },
  251. },
  252. &api.DefaultSourceTuple{
  253. Mess: map[string]interface{}{
  254. "id": 11,
  255. "status": "off",
  256. },
  257. M: map[string]interface{}{
  258. "topic": "h/d3/c3/s1",
  259. },
  260. },
  261. &api.DefaultSourceTuple{
  262. Mess: map[string]interface{}{
  263. "id": 11,
  264. "status": "off",
  265. },
  266. M: map[string]interface{}{
  267. "topic": "h/d3/c3/s1",
  268. },
  269. },
  270. &api.DefaultSourceTuple{
  271. Mess: map[string]interface{}{
  272. "id": 3,
  273. "temp": 28,
  274. },
  275. M: map[string]interface{}{
  276. "topic": "h/d1/c1/s1",
  277. },
  278. },
  279. &api.DefaultSourceTuple{
  280. Mess: map[string]interface{}{
  281. "id": 3,
  282. "temp": 28,
  283. },
  284. M: map[string]interface{}{
  285. "topic": "h/d1/c1/s1",
  286. },
  287. },
  288. &api.DefaultSourceTuple{
  289. Mess: map[string]interface{}{
  290. "id": 6,
  291. "color": "green",
  292. },
  293. M: map[string]interface{}{
  294. "topic": "h/d1/c1/s2",
  295. },
  296. },
  297. &api.DefaultSourceTuple{
  298. Mess: map[string]interface{}{
  299. "id": 6,
  300. "color": "green",
  301. },
  302. M: map[string]interface{}{
  303. "topic": "h/d1/c1/s2",
  304. },
  305. },
  306. &api.DefaultSourceTuple{
  307. Mess: map[string]interface{}{
  308. "id": 9,
  309. "hum": 90.3,
  310. },
  311. M: map[string]interface{}{
  312. "topic": "h/d2/c2/s1",
  313. },
  314. },
  315. &api.DefaultSourceTuple{
  316. Mess: map[string]interface{}{
  317. "id": 12,
  318. "status": "on",
  319. },
  320. M: map[string]interface{}{
  321. "topic": "h/d3/c3/s1",
  322. },
  323. },
  324. &api.DefaultSourceTuple{
  325. Mess: map[string]interface{}{
  326. "id": 12,
  327. "status": "on",
  328. },
  329. M: map[string]interface{}{
  330. "topic": "h/d3/c3/s1",
  331. },
  332. },
  333. }
  334. )
  335. contextLogger := conf.Log.WithField("rule", "test")
  336. ctx, cancel := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithCancel()
  337. consumer := make(chan api.SourceTuple)
  338. errorChannel := make(chan error)
  339. count := 0
  340. for _, topic := range sinkTopics {
  341. snk := GetSink()
  342. err := snk.Configure(map[string]interface{}{"topic": topic})
  343. if err != nil {
  344. t.Error(err)
  345. return
  346. }
  347. err = snk.Open(ctx)
  348. if err != nil {
  349. t.Error(err)
  350. return
  351. }
  352. src := GetSource()
  353. err = src.Configure(sourceTopics[count], make(map[string]interface{}))
  354. if err != nil {
  355. t.Error(err)
  356. return
  357. }
  358. go func(c int) {
  359. nc := ctx.WithMeta("rule1", fmt.Sprintf("op%d", c), &state.MemoryStore{})
  360. src.Open(nc, consumer, errorChannel)
  361. }(count)
  362. count++
  363. }
  364. for count < len(sourceTopics) {
  365. src := GetSource()
  366. err := src.Configure(sourceTopics[count], make(map[string]interface{}))
  367. if err != nil {
  368. t.Error(err)
  369. return
  370. }
  371. go func(c int) {
  372. nc := ctx.WithMeta("rule1", fmt.Sprintf("op%d", c), &state.MemoryStore{})
  373. src.Open(nc, consumer, errorChannel)
  374. }(count)
  375. count++
  376. }
  377. go func() {
  378. c := 0
  379. for c < 3 {
  380. for i, v := range sinkData {
  381. time.Sleep(10 * time.Millisecond)
  382. pubsub.Produce(ctx, sinkTopics[i], v[c])
  383. }
  384. c++
  385. }
  386. cancel()
  387. time.Sleep(100 * time.Millisecond)
  388. close(consumer)
  389. }()
  390. var results []api.SourceTuple
  391. for res := range consumer {
  392. results = append(results, res)
  393. }
  394. if !reflect.DeepEqual(expected, results) {
  395. t.Errorf("Expect\t %v\n but got\t\t\t\t %v", render.AsCode(expected), render.AsCode(results))
  396. }
  397. }