planner_graph_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/testx"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. "reflect"
  21. "testing"
  22. )
  23. func TestPlannerGraphValidate(t *testing.T) {
  24. var tests = []struct {
  25. graph string
  26. err string
  27. }{
  28. {
  29. graph: `{
  30. "nodes": {
  31. "abc": {
  32. "type": "source",
  33. "nodeType": "mqtt",
  34. "props": {
  35. "datasource": "demo"
  36. }
  37. },
  38. "myfilter": {
  39. "type": "operator",
  40. "nodeType": "filter",
  41. "props": {
  42. "expr": "temperature > 20"
  43. }
  44. },
  45. "logfunc": {
  46. "type": "operator",
  47. "nodeType": "function",
  48. "props": {
  49. "expr": "log(temperature) as log_temperature"
  50. }
  51. },
  52. "sinfunc": {
  53. "type": "operator",
  54. "nodeType": "function",
  55. "props": {
  56. "expr": "sin(temperature) as sin_temperature"
  57. }
  58. },
  59. "pick": {
  60. "type": "operator",
  61. "nodeType": "pick",
  62. "props": {
  63. "fields": [
  64. "log_temperature",
  65. "humidity"
  66. ]
  67. }
  68. },
  69. "mqttpv": {
  70. "type": "sink",
  71. "nodeType": "mqtt",
  72. "props": {
  73. "server": "tcp://syno.home:1883",
  74. "topic": "result",
  75. "sendSingle": true
  76. }
  77. },
  78. "mqtt2": {
  79. "type": "sink",
  80. "nodeType": "mqtt",
  81. "props": {
  82. "server": "tcp://syno.home:1883",
  83. "topic": "result2",
  84. "sendSingle": true
  85. }
  86. }
  87. },
  88. "topo": {
  89. "sources": [
  90. "abc"
  91. ],
  92. "edges": {
  93. "abc": [
  94. "myfilter",
  95. "sinfunc"
  96. ],
  97. "myfilter": [
  98. "logfunc"
  99. ],
  100. "logfunc": [
  101. "pick"
  102. ],
  103. "pick": [
  104. "mqttpv"
  105. ],
  106. "sinfunc": [
  107. "mqtt2"
  108. ]
  109. }
  110. }
  111. }`,
  112. err: "",
  113. }, {
  114. graph: `{
  115. "nodes": {
  116. "abc": {
  117. "type": "source",
  118. "nodeType": "mqtt",
  119. "props": {
  120. "datasource": "demo"
  121. }
  122. },
  123. "mqtt2": {
  124. "type": "sink",
  125. "nodeType": "mqtt",
  126. "props": {
  127. "server": "tcp://syno.home:1883",
  128. "topic": "result2",
  129. "sendSingle": true
  130. }
  131. }
  132. },
  133. "topo": {
  134. "sources": [
  135. "abc"
  136. ],
  137. "edges": {
  138. "abc": [
  139. "myfilter"
  140. ]
  141. }
  142. }
  143. }`,
  144. err: "node myfilter is not defined",
  145. }, {
  146. graph: `{
  147. "nodes": {
  148. "abc": {
  149. "type": "source",
  150. "nodeType": "mqtt",
  151. "props": {
  152. "datasource": "demo"
  153. }
  154. },
  155. "mqtt2": {
  156. "type": "sink",
  157. "nodeType": "mqtt",
  158. "props": {
  159. "server": "tcp://syno.home:1883",
  160. "topic": "result2",
  161. "sendSingle": true
  162. }
  163. }
  164. },
  165. "topo": {
  166. "sources": [
  167. "abc"
  168. ],
  169. "edges": {
  170. }
  171. }
  172. }`,
  173. err: "no edge defined for source node abc",
  174. }, {
  175. graph: `{
  176. "nodes": {
  177. "abc": {
  178. "type": "source",
  179. "nodeType": "mqtt",
  180. "props": {
  181. "datasource": "demo"
  182. }
  183. },
  184. "aggfunc": {
  185. "type": "operator",
  186. "nodeType": "aggfunc",
  187. "props": {
  188. "expr": "avg(temperature) as avg_temperature"
  189. }
  190. },
  191. "mqtt2": {
  192. "type": "sink",
  193. "nodeType": "mqtt",
  194. "props": {
  195. "server": "tcp://syno.home:1883",
  196. "topic": "result2",
  197. "sendSingle": true
  198. }
  199. }
  200. },
  201. "topo": {
  202. "sources": [
  203. "abc"
  204. ],
  205. "edges": {
  206. "abc": ["aggfunc"],
  207. "aggfunc": ["mqtt2"]
  208. }
  209. }
  210. }`,
  211. err: "node abc output does not match node aggfunc input: input type mismatch, expect collection, got row",
  212. }, {
  213. graph: `{
  214. "nodes": {
  215. "abc": {
  216. "type": "source",
  217. "nodeType": "mqtt",
  218. "props": {
  219. "datasource": "demo"
  220. }
  221. },
  222. "abc2": {
  223. "type": "source",
  224. "nodeType": "mqtt",
  225. "props": {
  226. "datasource": "demo1"
  227. }
  228. },
  229. "joinop": {
  230. "type": "operator",
  231. "nodeType": "join",
  232. "props": {
  233. "from": "abc",
  234. "joins": [
  235. {
  236. "name": "abc2",
  237. "type": "inner",
  238. "on": "abc.id = abc2.id"
  239. }
  240. ]
  241. }
  242. },
  243. "mqtt2": {
  244. "type": "sink",
  245. "nodeType": "mqtt",
  246. "props": {
  247. "server": "tcp://syno.home:1883",
  248. "topic": "result2",
  249. "sendSingle": true
  250. }
  251. }
  252. },
  253. "topo": {
  254. "sources": [
  255. "abc","abc2"
  256. ],
  257. "edges": {
  258. "abc": ["joinop"],
  259. "abc2": ["joinop"],
  260. "joinop": ["mqtt2"]
  261. }
  262. }
  263. }`,
  264. err: "operator joinop of type join does not allow multiple inputs",
  265. }, {
  266. graph: `{
  267. "nodes": {
  268. "abc": {
  269. "type": "source",
  270. "nodeType": "mqtt",
  271. "props": {
  272. "datasource": "demo"
  273. }
  274. },
  275. "abc2": {
  276. "type": "source",
  277. "nodeType": "mqtt",
  278. "props": {
  279. "datasource": "demo1"
  280. }
  281. },
  282. "windowop": {
  283. "type": "operator",
  284. "nodeType": "window",
  285. "props": {
  286. "type": "hoppingwindow",
  287. "unit": "ss",
  288. "size": 10,
  289. "interval": 5
  290. }
  291. },
  292. "joinop": {
  293. "type": "operator",
  294. "nodeType": "join",
  295. "props": {
  296. "from": "abc",
  297. "joins": [
  298. {
  299. "name": "abc2",
  300. "type": "inner",
  301. "on": "abc.id = abc2.id"
  302. }
  303. ]
  304. }
  305. },
  306. "groupop": {
  307. "type": "operator",
  308. "nodeType": "groupby",
  309. "props": {
  310. "dimensions": ["id","userId"]
  311. }
  312. },
  313. "mqtt2": {
  314. "type": "sink",
  315. "nodeType": "mqtt",
  316. "props": {
  317. "server": "tcp://syno.home:1883",
  318. "topic": "result2",
  319. "sendSingle": true
  320. }
  321. }
  322. },
  323. "topo": {
  324. "sources": [
  325. "abc","abc2"
  326. ],
  327. "edges": {
  328. "abc": ["windowop"],
  329. "abc2": ["windowop"],
  330. "windowop": ["joinop"],
  331. "joinop": ["groupop"],
  332. "groupop": ["mqtt2"]
  333. }
  334. }
  335. }`,
  336. err: "",
  337. }, {
  338. graph: `{
  339. "nodes": {
  340. "abc": {
  341. "type": "source",
  342. "nodeType": "mqtt",
  343. "props": {
  344. "datasource": "demo"
  345. }
  346. },
  347. "abc2": {
  348. "type": "source",
  349. "nodeType": "mqtt",
  350. "props": {
  351. "datasource": "demo1"
  352. }
  353. },
  354. "windowop": {
  355. "type": "operator",
  356. "nodeType": "window",
  357. "props": {
  358. "type": "hoppingwindow",
  359. "unit": "ss",
  360. "size": 10,
  361. "interval": 5
  362. }
  363. },
  364. "joinop": {
  365. "type": "operator",
  366. "nodeType": "join",
  367. "props": {
  368. "from": "abc",
  369. "joins": [
  370. {
  371. "name": "abc2",
  372. "type": "inner",
  373. "on": "abc.id = abc2.id"
  374. }
  375. ]
  376. }
  377. },
  378. "groupop": {
  379. "type": "operator",
  380. "nodeType": "groupby",
  381. "props": {
  382. "dimensions": ["id","userId"]
  383. }
  384. },
  385. "mqtt2": {
  386. "type": "sink",
  387. "nodeType": "mqtt",
  388. "props": {
  389. "server": "tcp://syno.home:1883",
  390. "topic": "result2",
  391. "sendSingle": true
  392. }
  393. }
  394. },
  395. "topo": {
  396. "sources": [
  397. "abc","abc2"
  398. ],
  399. "edges": {
  400. "abc": ["windowop"],
  401. "abc2": ["windowop"],
  402. "windowop": ["groupop"],
  403. "joinop": ["mqtt2"],
  404. "groupop": ["joinop"]
  405. }
  406. }
  407. }`,
  408. err: "node groupop output does not match node joinop input: collection type mismatch, expect non-grouped collection, got grouped collection",
  409. }, {
  410. graph: `{
  411. "nodes": {
  412. "abc": {
  413. "type": "source",
  414. "nodeType": "mqtt",
  415. "props": {
  416. "datasource": "demo"
  417. }
  418. },
  419. "abc2": {
  420. "type": "source",
  421. "nodeType": "mqtt",
  422. "props": {
  423. "datasource": "demo1"
  424. }
  425. },
  426. "windowop": {
  427. "type": "operator",
  428. "nodeType": "window",
  429. "props": {
  430. "type": "hoppingwindow",
  431. "unit": "ss",
  432. "size": 10,
  433. "interval": 5
  434. }
  435. },
  436. "joinop": {
  437. "type": "operator",
  438. "nodeType": "join",
  439. "props": {
  440. "from": "abc",
  441. "joins": [
  442. {
  443. "name": "abc2",
  444. "type": "inner",
  445. "on": "abc.id = abc2.id"
  446. }
  447. ]
  448. }
  449. },
  450. "groupop": {
  451. "type": "operator",
  452. "nodeType": "groupby",
  453. "props": {
  454. "dimensions": ["id","userId"]
  455. }
  456. },
  457. "aggfunc": {
  458. "type": "operator",
  459. "nodeType": "aggFunc",
  460. "props": {
  461. "expr": "avg(temperature) as avg_temperature"
  462. }
  463. },
  464. "mqtt2": {
  465. "type": "sink",
  466. "nodeType": "mqtt",
  467. "props": {
  468. "server": "tcp://syno.home:1883",
  469. "topic": "result2",
  470. "sendSingle": true
  471. }
  472. }
  473. },
  474. "topo": {
  475. "sources": [
  476. "abc","abc2"
  477. ],
  478. "edges": {
  479. "abc": ["windowop"],
  480. "abc2": ["windowop"],
  481. "windowop": ["groupop"],
  482. "joinop": ["mqtt2"],
  483. "groupop": ["aggfunc"],
  484. "aggfunc": ["joinop"]
  485. }
  486. }
  487. }`,
  488. err: "node aggfunc output does not match node joinop input: collection type mismatch, expect non-grouped collection, got grouped collection",
  489. },
  490. }
  491. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  492. for i, tt := range tests {
  493. rg := &api.RuleGraph{}
  494. err := json.Unmarshal([]byte(tt.graph), rg)
  495. if err != nil {
  496. t.Error(err)
  497. continue
  498. }
  499. _, err = PlanByGraph(&api.Rule{
  500. Triggered: false,
  501. Id: fmt.Sprintf("rule%d", i),
  502. Name: fmt.Sprintf("rule%d", i),
  503. Graph: rg,
  504. Options: &api.RuleOption{
  505. IsEventTime: false,
  506. LateTol: 1000,
  507. Concurrency: 1,
  508. BufferLength: 1024,
  509. SendMetaToSink: false,
  510. SendError: true,
  511. Qos: api.AtMostOnce,
  512. CheckpointInterval: 300000,
  513. },
  514. })
  515. if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
  516. t.Errorf("%d: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.err, err)
  517. }
  518. }
  519. }