manager_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package memory
  15. import (
  16. "fmt"
  17. "github.com/gdexlab/go-render/render"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/context"
  20. "github.com/lf-edge/ekuiper/internal/topo/state"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "reflect"
  23. "regexp"
  24. "strings"
  25. "testing"
  26. "time"
  27. )
  28. func reset() {
  29. pubTopics = make(map[string]*pubConsumers)
  30. subExps = make(map[string]*subChan)
  31. }
  32. func TestSharedInmemoryNode(t *testing.T) {
  33. reset()
  34. id := "test_id"
  35. sinkProps := make(map[string]interface{})
  36. sinkProps[IdProperty] = id
  37. src := GetSource()
  38. snk := GetSink()
  39. contextLogger := conf.Log.WithField("rule", "test")
  40. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  41. consumer := make(chan api.SourceTuple)
  42. errorChannel := make(chan error)
  43. srcProps := make(map[string]interface{})
  44. srcProps["option"] = "value"
  45. err := snk.Configure(sinkProps)
  46. if err != nil {
  47. t.Error(err)
  48. return
  49. }
  50. err = snk.Open(ctx)
  51. if err != nil {
  52. t.Error(err)
  53. return
  54. }
  55. srcProps[IdProperty] = id
  56. err = src.Configure(id, srcProps)
  57. if err != nil {
  58. t.Error(err)
  59. }
  60. go func() {
  61. src.Open(ctx, consumer, errorChannel)
  62. }()
  63. if _, contains := pubTopics[id]; !contains {
  64. t.Errorf("there should be memory node for topic")
  65. }
  66. data := make(map[string]interface{})
  67. data["temperature"] = 33.0
  68. list := make([]map[string]interface{}, 0)
  69. list = append(list, data)
  70. go func() {
  71. err = snk.Collect(ctx, list)
  72. if err != nil {
  73. t.Error(err)
  74. }
  75. }()
  76. for {
  77. select {
  78. case res := <-consumer:
  79. expected := api.NewDefaultSourceTuple(data, map[string]interface{}{"topic": "test_id"})
  80. if !reflect.DeepEqual(expected, res) {
  81. t.Errorf("result %s should be equal to %s", res, expected)
  82. }
  83. return
  84. default:
  85. }
  86. }
  87. }
  88. func TestCreateAndClose(t *testing.T) {
  89. reset()
  90. var (
  91. sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
  92. sinkTopics = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1", "h/d1/c1/s1"}
  93. chans []chan api.SourceTuple
  94. )
  95. for i, topic := range sinkTopics {
  96. CreatePub(topic)
  97. var (
  98. r *regexp.Regexp
  99. err error
  100. )
  101. if strings.ContainsAny(sourceTopics[i], "+#") {
  102. r, err = getRegexp(sourceTopics[i])
  103. if err != nil {
  104. t.Error(err)
  105. return
  106. }
  107. }
  108. c := CreateSub(sourceTopics[i], r, fmt.Sprintf("%d", i), 100)
  109. chans = append(chans, c)
  110. }
  111. expPub := map[string]*pubConsumers{
  112. "h/d1/c1/s1": {
  113. count: 2,
  114. consumers: map[string]chan api.SourceTuple{
  115. "1": chans[1],
  116. "4": chans[4],
  117. },
  118. },
  119. "h/d1/c1/s2": {
  120. count: 1,
  121. consumers: map[string]chan api.SourceTuple{
  122. "0": chans[0],
  123. "3": chans[3],
  124. },
  125. },
  126. "h/d2/c2/s1": {
  127. count: 1,
  128. consumers: map[string]chan api.SourceTuple{
  129. "1": chans[1],
  130. },
  131. },
  132. "h/d3/c3/s1": {
  133. count: 1,
  134. consumers: map[string]chan api.SourceTuple{
  135. "1": chans[1],
  136. "2": chans[2],
  137. },
  138. },
  139. }
  140. if !reflect.DeepEqual(expPub, pubTopics) {
  141. t.Errorf("Error adding: Expect\n\t%v\nbut got\n\t%v", render.AsCode(expPub), render.AsCode(pubTopics))
  142. return
  143. }
  144. i := 0
  145. for i < 3 {
  146. CloseSourceConsumerChannel(sourceTopics[i], fmt.Sprintf("%d", i))
  147. RemovePub(sinkTopics[i])
  148. i++
  149. }
  150. expPub = map[string]*pubConsumers{
  151. "h/d1/c1/s1": {
  152. count: 1,
  153. consumers: map[string]chan api.SourceTuple{
  154. "4": chans[4],
  155. },
  156. },
  157. "h/d1/c1/s2": {
  158. count: 0,
  159. consumers: map[string]chan api.SourceTuple{
  160. "3": chans[3],
  161. },
  162. },
  163. "h/d3/c3/s1": {
  164. count: 1,
  165. consumers: map[string]chan api.SourceTuple{},
  166. },
  167. }
  168. if !reflect.DeepEqual(expPub, pubTopics) {
  169. t.Errorf("Error closing: Expect\n\t%v\nbut got\n\t %v", render.AsCode(expPub), render.AsCode(pubTopics))
  170. }
  171. }
  172. func TestMultipleTopics(t *testing.T) {
  173. reset()
  174. var (
  175. sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
  176. sinkTopics = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1"}
  177. sinkData = [][]map[string]interface{}{
  178. {
  179. {
  180. "id": 1,
  181. "temp": 23,
  182. },
  183. {
  184. "id": 2,
  185. "temp": 34,
  186. },
  187. {
  188. "id": 3,
  189. "temp": 28,
  190. },
  191. }, {
  192. {
  193. "id": 4,
  194. "color": "red",
  195. },
  196. {
  197. "id": 5,
  198. "color": "red",
  199. },
  200. {
  201. "id": 6,
  202. "color": "green",
  203. },
  204. }, {
  205. {
  206. "id": 7,
  207. "hum": 67.5,
  208. },
  209. {
  210. "id": 8,
  211. "hum": 77.1,
  212. },
  213. {
  214. "id": 9,
  215. "hum": 90.3,
  216. },
  217. }, {
  218. {
  219. "id": 10,
  220. "status": "on",
  221. },
  222. {
  223. "id": 11,
  224. "status": "off",
  225. },
  226. {
  227. "id": 12,
  228. "status": "on",
  229. },
  230. },
  231. }
  232. expected = []api.SourceTuple{
  233. &api.DefaultSourceTuple{
  234. Mess: map[string]interface{}{
  235. "id": 1,
  236. "temp": 23,
  237. },
  238. M: map[string]interface{}{
  239. "topic": "h/d1/c1/s1",
  240. },
  241. },
  242. &api.DefaultSourceTuple{
  243. Mess: map[string]interface{}{
  244. "id": 1,
  245. "temp": 23,
  246. },
  247. M: map[string]interface{}{
  248. "topic": "h/d1/c1/s1",
  249. },
  250. },
  251. &api.DefaultSourceTuple{
  252. Mess: map[string]interface{}{
  253. "id": 4,
  254. "color": "red",
  255. },
  256. M: map[string]interface{}{
  257. "topic": "h/d1/c1/s2",
  258. },
  259. },
  260. &api.DefaultSourceTuple{
  261. Mess: map[string]interface{}{
  262. "id": 4,
  263. "color": "red",
  264. },
  265. M: map[string]interface{}{
  266. "topic": "h/d1/c1/s2",
  267. },
  268. },
  269. &api.DefaultSourceTuple{
  270. Mess: map[string]interface{}{
  271. "id": 7,
  272. "hum": 67.5,
  273. },
  274. M: map[string]interface{}{
  275. "topic": "h/d2/c2/s1",
  276. },
  277. },
  278. &api.DefaultSourceTuple{
  279. Mess: map[string]interface{}{
  280. "id": 10,
  281. "status": "on",
  282. },
  283. M: map[string]interface{}{
  284. "topic": "h/d3/c3/s1",
  285. },
  286. },
  287. &api.DefaultSourceTuple{
  288. Mess: map[string]interface{}{
  289. "id": 10,
  290. "status": "on",
  291. },
  292. M: map[string]interface{}{
  293. "topic": "h/d3/c3/s1",
  294. },
  295. },
  296. &api.DefaultSourceTuple{
  297. Mess: map[string]interface{}{
  298. "id": 2,
  299. "temp": 34,
  300. },
  301. M: map[string]interface{}{
  302. "topic": "h/d1/c1/s1",
  303. },
  304. },
  305. &api.DefaultSourceTuple{
  306. Mess: map[string]interface{}{
  307. "id": 2,
  308. "temp": 34,
  309. },
  310. M: map[string]interface{}{
  311. "topic": "h/d1/c1/s1",
  312. },
  313. },
  314. &api.DefaultSourceTuple{
  315. Mess: map[string]interface{}{
  316. "id": 5,
  317. "color": "red",
  318. },
  319. M: map[string]interface{}{
  320. "topic": "h/d1/c1/s2",
  321. },
  322. },
  323. &api.DefaultSourceTuple{
  324. Mess: map[string]interface{}{
  325. "id": 5,
  326. "color": "red",
  327. },
  328. M: map[string]interface{}{
  329. "topic": "h/d1/c1/s2",
  330. },
  331. },
  332. &api.DefaultSourceTuple{
  333. Mess: map[string]interface{}{
  334. "id": 8,
  335. "hum": 77.1,
  336. },
  337. M: map[string]interface{}{
  338. "topic": "h/d2/c2/s1",
  339. },
  340. },
  341. &api.DefaultSourceTuple{
  342. Mess: map[string]interface{}{
  343. "id": 11,
  344. "status": "off",
  345. },
  346. M: map[string]interface{}{
  347. "topic": "h/d3/c3/s1",
  348. },
  349. },
  350. &api.DefaultSourceTuple{
  351. Mess: map[string]interface{}{
  352. "id": 11,
  353. "status": "off",
  354. },
  355. M: map[string]interface{}{
  356. "topic": "h/d3/c3/s1",
  357. },
  358. },
  359. &api.DefaultSourceTuple{
  360. Mess: map[string]interface{}{
  361. "id": 3,
  362. "temp": 28,
  363. },
  364. M: map[string]interface{}{
  365. "topic": "h/d1/c1/s1",
  366. },
  367. },
  368. &api.DefaultSourceTuple{
  369. Mess: map[string]interface{}{
  370. "id": 3,
  371. "temp": 28,
  372. },
  373. M: map[string]interface{}{
  374. "topic": "h/d1/c1/s1",
  375. },
  376. },
  377. &api.DefaultSourceTuple{
  378. Mess: map[string]interface{}{
  379. "id": 6,
  380. "color": "green",
  381. },
  382. M: map[string]interface{}{
  383. "topic": "h/d1/c1/s2",
  384. },
  385. },
  386. &api.DefaultSourceTuple{
  387. Mess: map[string]interface{}{
  388. "id": 6,
  389. "color": "green",
  390. },
  391. M: map[string]interface{}{
  392. "topic": "h/d1/c1/s2",
  393. },
  394. },
  395. &api.DefaultSourceTuple{
  396. Mess: map[string]interface{}{
  397. "id": 9,
  398. "hum": 90.3,
  399. },
  400. M: map[string]interface{}{
  401. "topic": "h/d2/c2/s1",
  402. },
  403. },
  404. &api.DefaultSourceTuple{
  405. Mess: map[string]interface{}{
  406. "id": 12,
  407. "status": "on",
  408. },
  409. M: map[string]interface{}{
  410. "topic": "h/d3/c3/s1",
  411. },
  412. },
  413. &api.DefaultSourceTuple{
  414. Mess: map[string]interface{}{
  415. "id": 12,
  416. "status": "on",
  417. },
  418. M: map[string]interface{}{
  419. "topic": "h/d3/c3/s1",
  420. },
  421. },
  422. }
  423. )
  424. contextLogger := conf.Log.WithField("rule", "test")
  425. ctx, cancel := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithCancel()
  426. consumer := make(chan api.SourceTuple)
  427. errorChannel := make(chan error)
  428. count := 0
  429. for _, topic := range sinkTopics {
  430. snk := GetSink()
  431. err := snk.Configure(map[string]interface{}{"topic": topic})
  432. if err != nil {
  433. t.Error(err)
  434. return
  435. }
  436. err = snk.Open(ctx)
  437. if err != nil {
  438. t.Error(err)
  439. return
  440. }
  441. src := GetSource()
  442. err = src.Configure(sourceTopics[count], make(map[string]interface{}))
  443. if err != nil {
  444. t.Error(err)
  445. return
  446. }
  447. go func(c int) {
  448. nc := ctx.WithMeta("rule1", fmt.Sprintf("op%d", c), &state.MemoryStore{})
  449. src.Open(nc, consumer, errorChannel)
  450. }(count)
  451. count++
  452. }
  453. for count < len(sourceTopics) {
  454. src := GetSource()
  455. err := src.Configure(sourceTopics[count], make(map[string]interface{}))
  456. if err != nil {
  457. t.Error(err)
  458. return
  459. }
  460. go func(c int) {
  461. nc := ctx.WithMeta("rule1", fmt.Sprintf("op%d", c), &state.MemoryStore{})
  462. src.Open(nc, consumer, errorChannel)
  463. }(count)
  464. count++
  465. }
  466. go func() {
  467. c := 0
  468. for c < 3 {
  469. for i, v := range sinkData {
  470. time.Sleep(10 * time.Millisecond)
  471. Produce(ctx, sinkTopics[i], v[c])
  472. }
  473. c++
  474. }
  475. cancel()
  476. time.Sleep(100 * time.Millisecond)
  477. close(consumer)
  478. }()
  479. var results []api.SourceTuple
  480. for res := range consumer {
  481. results = append(results, res)
  482. }
  483. if !reflect.DeepEqual(expected, results) {
  484. t.Errorf("Expect\t %v\n but got\t\t\t\t %v", render.AsCode(expected), render.AsCode(results))
  485. }
  486. }