manager_test.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package memory
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/gdexlab/go-render/render"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. "github.com/lf-edge/ekuiper/internal/topo/context"
  21. "github.com/lf-edge/ekuiper/internal/topo/state"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "reflect"
  24. "regexp"
  25. "strings"
  26. "testing"
  27. "time"
  28. )
  29. func reset() {
  30. pubTopics = make(map[string]*pubConsumers)
  31. subExps = make(map[string]*subChan)
  32. }
  33. func TestSharedInmemoryNode(t *testing.T) {
  34. reset()
  35. id := "test_id"
  36. sinkProps := make(map[string]interface{})
  37. sinkProps[IdProperty] = id
  38. src := GetSource()
  39. snk := GetSink()
  40. contextLogger := conf.Log.WithField("rule", "test")
  41. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  42. consumer := make(chan api.SourceTuple)
  43. errorChannel := make(chan error)
  44. srcProps := make(map[string]interface{})
  45. srcProps["option"] = "value"
  46. err := snk.Configure(sinkProps)
  47. if err != nil {
  48. t.Error(err)
  49. return
  50. }
  51. err = snk.Open(ctx)
  52. if err != nil {
  53. t.Error(err)
  54. return
  55. }
  56. srcProps[IdProperty] = id
  57. err = src.Configure(id, srcProps)
  58. if err != nil {
  59. t.Error(err)
  60. }
  61. go func() {
  62. src.Open(ctx, consumer, errorChannel)
  63. }()
  64. if _, contains := pubTopics[id]; !contains {
  65. t.Errorf("there should be memory node for topic")
  66. }
  67. data := make(map[string]interface{})
  68. data["temperature"] = 33.0
  69. list := make([]map[string]interface{}, 0)
  70. list = append(list, data)
  71. go func() {
  72. var buf []byte
  73. buf, err = asJsonBytes(list)
  74. if err != nil {
  75. t.Error(err)
  76. }
  77. err = snk.Collect(ctx, buf)
  78. if err != nil {
  79. t.Error(err)
  80. }
  81. }()
  82. for {
  83. select {
  84. case res := <-consumer:
  85. expected := api.NewDefaultSourceTuple(data, make(map[string]interface{}))
  86. if !reflect.DeepEqual(expected, res) {
  87. t.Errorf("result %s should be equal to %s", res, expected)
  88. }
  89. return
  90. default:
  91. }
  92. }
  93. }
  94. func TestCreateAndClose(t *testing.T) {
  95. reset()
  96. var (
  97. sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
  98. sinkTopics = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1", "h/d1/c1/s1"}
  99. chans []chan map[string]interface{}
  100. )
  101. for i, topic := range sinkTopics {
  102. createPub(topic)
  103. var (
  104. r *regexp.Regexp
  105. err error
  106. )
  107. if strings.ContainsAny(sourceTopics[i], "+#") {
  108. r, err = getRegexp(sourceTopics[i])
  109. if err != nil {
  110. t.Error(err)
  111. return
  112. }
  113. }
  114. c := createSub(sourceTopics[i], r, fmt.Sprintf("%d", i))
  115. chans = append(chans, c)
  116. }
  117. expPub := map[string]*pubConsumers{
  118. "h/d1/c1/s1": {
  119. count: 2,
  120. consumers: map[string]chan map[string]interface{}{
  121. "1": chans[1],
  122. "4": chans[4],
  123. },
  124. },
  125. "h/d1/c1/s2": {
  126. count: 1,
  127. consumers: map[string]chan map[string]interface{}{
  128. "0": chans[0],
  129. "3": chans[3],
  130. },
  131. },
  132. "h/d2/c2/s1": {
  133. count: 1,
  134. consumers: map[string]chan map[string]interface{}{
  135. "1": chans[1],
  136. },
  137. },
  138. "h/d3/c3/s1": {
  139. count: 1,
  140. consumers: map[string]chan map[string]interface{}{
  141. "1": chans[1],
  142. "2": chans[2],
  143. },
  144. },
  145. }
  146. if !reflect.DeepEqual(expPub, pubTopics) {
  147. t.Errorf("Error adding: Expect\n\t%v\nbut got\n\t%v", render.AsCode(expPub), render.AsCode(pubTopics))
  148. return
  149. }
  150. i := 0
  151. for i < 3 {
  152. closeSourceConsumerChannel(sourceTopics[i], fmt.Sprintf("%d", i))
  153. closeSink(sinkTopics[i])
  154. i++
  155. }
  156. expPub = map[string]*pubConsumers{
  157. "h/d1/c1/s1": {
  158. count: 1,
  159. consumers: map[string]chan map[string]interface{}{
  160. "4": chans[4],
  161. },
  162. },
  163. "h/d1/c1/s2": {
  164. count: 0,
  165. consumers: map[string]chan map[string]interface{}{
  166. "3": chans[3],
  167. },
  168. },
  169. "h/d3/c3/s1": {
  170. count: 1,
  171. consumers: map[string]chan map[string]interface{}{},
  172. },
  173. }
  174. if !reflect.DeepEqual(expPub, pubTopics) {
  175. t.Errorf("Error closing: Expect\n\t%v\nbut got\n\t %v", render.AsCode(expPub), render.AsCode(pubTopics))
  176. }
  177. }
  178. func TestMultipleTopics(t *testing.T) {
  179. reset()
  180. var (
  181. sourceTopics = []string{"h/d1/c1/s2", "h/+/+/s1", "h/d3/#", "h/d1/c1/s2", "h/+/c1/s1"}
  182. sinkTopics = []string{"h/d1/c1/s1", "h/d1/c1/s2", "h/d2/c2/s1", "h/d3/c3/s1"}
  183. sinkData = [][]map[string]interface{}{
  184. {
  185. {
  186. "id": 1,
  187. "temp": 23,
  188. },
  189. {
  190. "id": 2,
  191. "temp": 34,
  192. },
  193. {
  194. "id": 3,
  195. "temp": 28,
  196. },
  197. }, {
  198. {
  199. "id": 4,
  200. "color": "red",
  201. },
  202. {
  203. "id": 5,
  204. "color": "red",
  205. },
  206. {
  207. "id": 6,
  208. "color": "green",
  209. },
  210. }, {
  211. {
  212. "id": 7,
  213. "hum": 67.5,
  214. },
  215. {
  216. "id": 8,
  217. "hum": 77.1,
  218. },
  219. {
  220. "id": 9,
  221. "hum": 90.3,
  222. },
  223. }, {
  224. {
  225. "id": 10,
  226. "status": "on",
  227. },
  228. {
  229. "id": 11,
  230. "status": "off",
  231. },
  232. {
  233. "id": 12,
  234. "status": "on",
  235. },
  236. },
  237. }
  238. expected = []api.SourceTuple{
  239. &api.DefaultSourceTuple{
  240. Mess: map[string]interface{}{
  241. "id": 1,
  242. "temp": 23,
  243. },
  244. M: make(map[string]interface{}),
  245. },
  246. &api.DefaultSourceTuple{
  247. Mess: map[string]interface{}{
  248. "id": 1,
  249. "temp": 23,
  250. },
  251. M: make(map[string]interface{}),
  252. },
  253. &api.DefaultSourceTuple{
  254. Mess: map[string]interface{}{
  255. "id": 4,
  256. "color": "red",
  257. },
  258. M: make(map[string]interface{}),
  259. },
  260. &api.DefaultSourceTuple{
  261. Mess: map[string]interface{}{
  262. "id": 4,
  263. "color": "red",
  264. },
  265. M: make(map[string]interface{}),
  266. },
  267. &api.DefaultSourceTuple{
  268. Mess: map[string]interface{}{
  269. "id": 7,
  270. "hum": 67.5,
  271. },
  272. M: make(map[string]interface{}),
  273. },
  274. &api.DefaultSourceTuple{
  275. Mess: map[string]interface{}{
  276. "id": 10,
  277. "status": "on",
  278. },
  279. M: make(map[string]interface{}),
  280. },
  281. &api.DefaultSourceTuple{
  282. Mess: map[string]interface{}{
  283. "id": 10,
  284. "status": "on",
  285. },
  286. M: make(map[string]interface{}),
  287. },
  288. &api.DefaultSourceTuple{
  289. Mess: map[string]interface{}{
  290. "id": 2,
  291. "temp": 34,
  292. },
  293. M: make(map[string]interface{}),
  294. },
  295. &api.DefaultSourceTuple{
  296. Mess: map[string]interface{}{
  297. "id": 2,
  298. "temp": 34,
  299. },
  300. M: make(map[string]interface{}),
  301. },
  302. &api.DefaultSourceTuple{
  303. Mess: map[string]interface{}{
  304. "id": 5,
  305. "color": "red",
  306. },
  307. M: make(map[string]interface{}),
  308. },
  309. &api.DefaultSourceTuple{
  310. Mess: map[string]interface{}{
  311. "id": 5,
  312. "color": "red",
  313. },
  314. M: make(map[string]interface{}),
  315. },
  316. &api.DefaultSourceTuple{
  317. Mess: map[string]interface{}{
  318. "id": 8,
  319. "hum": 77.1,
  320. },
  321. M: make(map[string]interface{}),
  322. },
  323. &api.DefaultSourceTuple{
  324. Mess: map[string]interface{}{
  325. "id": 11,
  326. "status": "off",
  327. },
  328. M: make(map[string]interface{}),
  329. },
  330. &api.DefaultSourceTuple{
  331. Mess: map[string]interface{}{
  332. "id": 11,
  333. "status": "off",
  334. },
  335. M: make(map[string]interface{}),
  336. },
  337. &api.DefaultSourceTuple{
  338. Mess: map[string]interface{}{
  339. "id": 3,
  340. "temp": 28,
  341. },
  342. M: make(map[string]interface{}),
  343. },
  344. &api.DefaultSourceTuple{
  345. Mess: map[string]interface{}{
  346. "id": 3,
  347. "temp": 28,
  348. },
  349. M: make(map[string]interface{}),
  350. },
  351. &api.DefaultSourceTuple{
  352. Mess: map[string]interface{}{
  353. "id": 6,
  354. "color": "green",
  355. },
  356. M: make(map[string]interface{}),
  357. },
  358. &api.DefaultSourceTuple{
  359. Mess: map[string]interface{}{
  360. "id": 6,
  361. "color": "green",
  362. },
  363. M: make(map[string]interface{}),
  364. },
  365. &api.DefaultSourceTuple{
  366. Mess: map[string]interface{}{
  367. "id": 9,
  368. "hum": 90.3,
  369. },
  370. M: make(map[string]interface{}),
  371. },
  372. &api.DefaultSourceTuple{
  373. Mess: map[string]interface{}{
  374. "id": 12,
  375. "status": "on",
  376. },
  377. M: make(map[string]interface{}),
  378. },
  379. &api.DefaultSourceTuple{
  380. Mess: map[string]interface{}{
  381. "id": 12,
  382. "status": "on",
  383. },
  384. M: make(map[string]interface{}),
  385. },
  386. }
  387. )
  388. contextLogger := conf.Log.WithField("rule", "test")
  389. ctx, cancel := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithCancel()
  390. consumer := make(chan api.SourceTuple)
  391. errorChannel := make(chan error)
  392. count := 0
  393. for _, topic := range sinkTopics {
  394. snk := GetSink()
  395. err := snk.Configure(map[string]interface{}{"topic": topic})
  396. if err != nil {
  397. t.Error(err)
  398. return
  399. }
  400. err = snk.Open(ctx)
  401. if err != nil {
  402. t.Error(err)
  403. return
  404. }
  405. src := GetSource()
  406. err = src.Configure(sourceTopics[count], make(map[string]interface{}))
  407. if err != nil {
  408. t.Error(err)
  409. return
  410. }
  411. go func(c int) {
  412. nc := ctx.WithMeta("rule1", fmt.Sprintf("op%d", c), &state.MemoryStore{})
  413. src.Open(nc, consumer, errorChannel)
  414. }(count)
  415. count++
  416. }
  417. for count < len(sourceTopics) {
  418. src := GetSource()
  419. err := src.Configure(sourceTopics[count], make(map[string]interface{}))
  420. if err != nil {
  421. t.Error(err)
  422. return
  423. }
  424. go func(c int) {
  425. nc := ctx.WithMeta("rule1", fmt.Sprintf("op%d", c), &state.MemoryStore{})
  426. src.Open(nc, consumer, errorChannel)
  427. }(count)
  428. count++
  429. }
  430. go func() {
  431. c := 0
  432. for c < 3 {
  433. for i, v := range sinkData {
  434. time.Sleep(10 * time.Millisecond)
  435. produce(ctx, sinkTopics[i], v[c])
  436. }
  437. c++
  438. }
  439. cancel()
  440. time.Sleep(100 * time.Millisecond)
  441. close(consumer)
  442. }()
  443. var results []api.SourceTuple
  444. for res := range consumer {
  445. results = append(results, res)
  446. }
  447. if !reflect.DeepEqual(expected, results) {
  448. t.Errorf("Expect\t %v\n but got\t\t\t %v", render.AsCode(expected), render.AsCode(results))
  449. }
  450. }
  451. func asJsonBytes(m []map[string]interface{}) ([]byte, error) {
  452. return json.Marshal(m)
  453. }