// manager_test.go (9.8 KB)
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package memory
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/gdexlab/go-render/render"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/context"
  21. "github.com/lf-edge/ekuiper/internal/topo/state"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "reflect"
  24. "testing"
  25. "time"
  26. )
  27. func reset() {
  28. pubTopics = make(map[string]*pubConsumers)
  29. subExps = make(map[string]*subChan)
  30. }
  31. func TestSharedInmemoryNode(t *testing.T) {
  32. reset()
  33. id := "test_id"
  34. sinkProps := make(map[string]interface{})
  35. sinkProps[IdProperty] = id
  36. src := GetSource()
  37. snk := GetSink()
  38. contextLogger := conf.Log.WithField("rule", "test")
  39. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  40. consumer := make(chan api.SourceTuple)
  41. errorChannel := make(chan error)
  42. srcProps := make(map[string]interface{})
  43. srcProps["option"] = "value"
  44. err := snk.Configure(sinkProps)
  45. if err != nil {
  46. t.Error(err)
  47. return
  48. }
  49. err = snk.Open(ctx)
  50. if err != nil {
  51. t.Error(err)
  52. return
  53. }
  54. srcProps[IdProperty] = id
  55. err = src.Configure(id, srcProps)
  56. if err != nil {
  57. t.Error(err)
  58. }
  59. go func() {
  60. src.Open(ctx, consumer, errorChannel)
  61. }()
  62. if _, contains := pubTopics[id]; !contains {
  63. t.Errorf("there should be memory node for topic")
  64. }
  65. data := make(map[string]interface{})
  66. data["temperature"] = 33.0
  67. list := make([]map[string]interface{}, 0)
  68. list = append(list, data)
  69. go func() {
  70. var buf []byte
  71. buf, err = asJsonBytes(list)
  72. if err != nil {
  73. t.Error(err)
  74. }
  75. err = snk.Collect(ctx, buf)
  76. if err != nil {
  77. t.Error(err)
  78. }
  79. }()
  80. for {
  81. select {
  82. case res := <-consumer:
  83. expected := api.NewDefaultSourceTuple(data, make(map[string]interface{}))
  84. if !reflect.DeepEqual(expected, res) {
  85. t.Errorf("result %s should be equal to %s", res, expected)
  86. }
  87. return
  88. default:
  89. }
  90. }
  91. }
  92. func TestCreateAndClose(t *testing.T) {
  93. reset()
  94. var (
  95. sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
  96. sinkTopics = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1", "h/d1/c1/s1"}
  97. chans []chan map[string]interface{}
  98. )
  99. for i, topic := range sinkTopics {
  100. createPub(topic)
  101. r, err := getRegexp(sourceTopics[i])
  102. if err != nil {
  103. t.Error(err)
  104. return
  105. }
  106. c := createSub(sourceTopics[i], r, fmt.Sprintf("%d", i))
  107. chans = append(chans, c)
  108. }
  109. expPub := map[string]*pubConsumers{
  110. "h/d1/c1/s1": {
  111. count: 2,
  112. consumers: map[string]chan map[string]interface{}{
  113. "1": chans[1],
  114. "4": chans[4],
  115. },
  116. },
  117. "h/d1/c1/s2": {
  118. count: 1,
  119. consumers: map[string]chan map[string]interface{}{
  120. "0": chans[0],
  121. "3": chans[3],
  122. },
  123. },
  124. "h/d2/c2/s1": {
  125. count: 1,
  126. consumers: map[string]chan map[string]interface{}{
  127. "1": chans[1],
  128. },
  129. },
  130. "h/d3/c3/s1": {
  131. count: 1,
  132. consumers: map[string]chan map[string]interface{}{
  133. "1": chans[1],
  134. "2": chans[2],
  135. },
  136. },
  137. }
  138. if !reflect.DeepEqual(expPub, pubTopics) {
  139. t.Errorf("Error adding: Expect\n\t%v\nbut got\n\t%v", render.AsCode(expPub), render.AsCode(pubTopics))
  140. return
  141. }
  142. i := 0
  143. for i < 3 {
  144. closeSourceConsumerChannel(sourceTopics[i], fmt.Sprintf("%d", i))
  145. closeSink(sinkTopics[i])
  146. i++
  147. }
  148. expPub = map[string]*pubConsumers{
  149. "h/d1/c1/s1": {
  150. count: 1,
  151. consumers: map[string]chan map[string]interface{}{
  152. "4": chans[4],
  153. },
  154. },
  155. "h/d1/c1/s2": {
  156. count: 0,
  157. consumers: map[string]chan map[string]interface{}{
  158. "3": chans[3],
  159. },
  160. },
  161. "h/d3/c3/s1": {
  162. count: 1,
  163. consumers: map[string]chan map[string]interface{}{},
  164. },
  165. }
  166. if !reflect.DeepEqual(expPub, pubTopics) {
  167. t.Errorf("Error closing: Expect\n\t%v\nbut got\n\t %v", render.AsCode(expPub), render.AsCode(pubTopics))
  168. }
  169. }
  170. func TestMultipleTopics(t *testing.T) {
  171. reset()
  172. var (
  173. sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
  174. sinkTopics = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1"}
  175. sinkData = [][]map[string]interface{}{
  176. {
  177. {
  178. "id": 1,
  179. "temp": 23,
  180. },
  181. {
  182. "id": 2,
  183. "temp": 34,
  184. },
  185. {
  186. "id": 3,
  187. "temp": 28,
  188. },
  189. }, {
  190. {
  191. "id": 4,
  192. "color": "red",
  193. },
  194. {
  195. "id": 5,
  196. "color": "red",
  197. },
  198. {
  199. "id": 6,
  200. "color": "green",
  201. },
  202. }, {
  203. {
  204. "id": 7,
  205. "hum": 67.5,
  206. },
  207. {
  208. "id": 8,
  209. "hum": 77.1,
  210. },
  211. {
  212. "id": 9,
  213. "hum": 90.3,
  214. },
  215. }, {
  216. {
  217. "id": 10,
  218. "status": "on",
  219. },
  220. {
  221. "id": 11,
  222. "status": "off",
  223. },
  224. {
  225. "id": 12,
  226. "status": "on",
  227. },
  228. },
  229. }
  230. expected = []api.SourceTuple{
  231. &api.DefaultSourceTuple{
  232. Mess: map[string]interface{}{
  233. "id": 1,
  234. "temp": 23,
  235. },
  236. M: make(map[string]interface{}),
  237. },
  238. &api.DefaultSourceTuple{
  239. Mess: map[string]interface{}{
  240. "id": 1,
  241. "temp": 23,
  242. },
  243. M: make(map[string]interface{}),
  244. },
  245. &api.DefaultSourceTuple{
  246. Mess: map[string]interface{}{
  247. "id": 4,
  248. "color": "red",
  249. },
  250. M: make(map[string]interface{}),
  251. },
  252. &api.DefaultSourceTuple{
  253. Mess: map[string]interface{}{
  254. "id": 4,
  255. "color": "red",
  256. },
  257. M: make(map[string]interface{}),
  258. },
  259. &api.DefaultSourceTuple{
  260. Mess: map[string]interface{}{
  261. "id": 7,
  262. "hum": 67.5,
  263. },
  264. M: make(map[string]interface{}),
  265. },
  266. &api.DefaultSourceTuple{
  267. Mess: map[string]interface{}{
  268. "id": 10,
  269. "status": "on",
  270. },
  271. M: make(map[string]interface{}),
  272. },
  273. &api.DefaultSourceTuple{
  274. Mess: map[string]interface{}{
  275. "id": 10,
  276. "status": "on",
  277. },
  278. M: make(map[string]interface{}),
  279. },
  280. &api.DefaultSourceTuple{
  281. Mess: map[string]interface{}{
  282. "id": 2,
  283. "temp": 34,
  284. },
  285. M: make(map[string]interface{}),
  286. },
  287. &api.DefaultSourceTuple{
  288. Mess: map[string]interface{}{
  289. "id": 2,
  290. "temp": 34,
  291. },
  292. M: make(map[string]interface{}),
  293. },
  294. &api.DefaultSourceTuple{
  295. Mess: map[string]interface{}{
  296. "id": 5,
  297. "color": "red",
  298. },
  299. M: make(map[string]interface{}),
  300. },
  301. &api.DefaultSourceTuple{
  302. Mess: map[string]interface{}{
  303. "id": 5,
  304. "color": "red",
  305. },
  306. M: make(map[string]interface{}),
  307. },
  308. &api.DefaultSourceTuple{
  309. Mess: map[string]interface{}{
  310. "id": 8,
  311. "hum": 77.1,
  312. },
  313. M: make(map[string]interface{}),
  314. },
  315. &api.DefaultSourceTuple{
  316. Mess: map[string]interface{}{
  317. "id": 11,
  318. "status": "off",
  319. },
  320. M: make(map[string]interface{}),
  321. },
  322. &api.DefaultSourceTuple{
  323. Mess: map[string]interface{}{
  324. "id": 11,
  325. "status": "off",
  326. },
  327. M: make(map[string]interface{}),
  328. },
  329. &api.DefaultSourceTuple{
  330. Mess: map[string]interface{}{
  331. "id": 3,
  332. "temp": 28,
  333. },
  334. M: make(map[string]interface{}),
  335. },
  336. &api.DefaultSourceTuple{
  337. Mess: map[string]interface{}{
  338. "id": 3,
  339. "temp": 28,
  340. },
  341. M: make(map[string]interface{}),
  342. },
  343. &api.DefaultSourceTuple{
  344. Mess: map[string]interface{}{
  345. "id": 6,
  346. "color": "green",
  347. },
  348. M: make(map[string]interface{}),
  349. },
  350. &api.DefaultSourceTuple{
  351. Mess: map[string]interface{}{
  352. "id": 6,
  353. "color": "green",
  354. },
  355. M: make(map[string]interface{}),
  356. },
  357. &api.DefaultSourceTuple{
  358. Mess: map[string]interface{}{
  359. "id": 9,
  360. "hum": 90.3,
  361. },
  362. M: make(map[string]interface{}),
  363. },
  364. &api.DefaultSourceTuple{
  365. Mess: map[string]interface{}{
  366. "id": 12,
  367. "status": "on",
  368. },
  369. M: make(map[string]interface{}),
  370. },
  371. &api.DefaultSourceTuple{
  372. Mess: map[string]interface{}{
  373. "id": 12,
  374. "status": "on",
  375. },
  376. M: make(map[string]interface{}),
  377. },
  378. }
  379. )
  380. contextLogger := conf.Log.WithField("rule", "test")
  381. ctx, cancel := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithCancel()
  382. consumer := make(chan api.SourceTuple)
  383. errorChannel := make(chan error)
  384. count := 0
  385. for _, topic := range sinkTopics {
  386. snk := GetSink()
  387. err := snk.Configure(map[string]interface{}{"topic": topic})
  388. if err != nil {
  389. t.Error(err)
  390. return
  391. }
  392. err = snk.Open(ctx)
  393. if err != nil {
  394. t.Error(err)
  395. return
  396. }
  397. src := GetSource()
  398. err = src.Configure(sourceTopics[count], make(map[string]interface{}))
  399. if err != nil {
  400. t.Error(err)
  401. return
  402. }
  403. go func(c int) {
  404. nc := ctx.WithMeta("rule1", fmt.Sprintf("op%d", c), &state.MemoryStore{})
  405. src.Open(nc, consumer, errorChannel)
  406. }(count)
  407. count++
  408. }
  409. for count < len(sourceTopics) {
  410. src := GetSource()
  411. err := src.Configure(sourceTopics[count], make(map[string]interface{}))
  412. if err != nil {
  413. t.Error(err)
  414. return
  415. }
  416. go func(c int) {
  417. nc := ctx.WithMeta("rule1", fmt.Sprintf("op%d", c), &state.MemoryStore{})
  418. src.Open(nc, consumer, errorChannel)
  419. }(count)
  420. count++
  421. }
  422. go func() {
  423. c := 0
  424. for c < 3 {
  425. for i, v := range sinkData {
  426. time.Sleep(10 * time.Millisecond)
  427. produce(ctx, sinkTopics[i], v[c])
  428. }
  429. c++
  430. }
  431. cancel()
  432. time.Sleep(100 * time.Millisecond)
  433. close(consumer)
  434. }()
  435. var results []api.SourceTuple
  436. for res := range consumer {
  437. results = append(results, res)
  438. }
  439. if !reflect.DeepEqual(expected, results) {
  440. t.Errorf("Expect\t %v\n but got\t\t\t %v", render.AsCode(expected), render.AsCode(results))
  441. }
  442. }
  443. func asJsonBytes(m []map[string]interface{}) ([]byte, error) {
  444. return json.Marshal(m)
  445. }