str_func_test.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestStrFunc_Apply1(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data *xsql.Tuple
  16. result []map[string]interface{}
  17. }{
  18. {
  19. sql: "SELECT concat(a, b, c) AS a FROM test",
  20. data: &xsql.Tuple{
  21. Emitter: "test",
  22. Message: xsql.Message{
  23. "a": "mya",
  24. "b": "myb",
  25. "c": "myc",
  26. },
  27. },
  28. result: []map[string]interface{}{{
  29. "a": "myamybmyc",
  30. }},
  31. },
  32. {
  33. sql: "SELECT endswith(a, b) AS a FROM test",
  34. data: &xsql.Tuple{
  35. Emitter: "test",
  36. Message: xsql.Message{
  37. "a": "mya",
  38. "b": "myb",
  39. "c": "myc",
  40. },
  41. },
  42. result: []map[string]interface{}{{
  43. "a": false,
  44. }},
  45. },
  46. {
  47. sql: "SELECT endswith(a, b) AS a FROM test",
  48. data: &xsql.Tuple{
  49. Emitter: "test",
  50. Message: xsql.Message{
  51. "a": "mya",
  52. "b": "ya",
  53. "c": "myc",
  54. },
  55. },
  56. result: []map[string]interface{}{{
  57. "a": true,
  58. }},
  59. },
  60. {
  61. sql: "SELECT format_time(a, \"yyyy-MM-dd T HH:mm:ss\") AS a FROM test",
  62. data: &xsql.Tuple{
  63. Emitter: "test",
  64. Message: xsql.Message{
  65. "a": common.TimeFromUnixMilli(1568854515000),
  66. "b": "ya",
  67. "c": "myc",
  68. },
  69. },
  70. result: []map[string]interface{}{{
  71. "a": "2019-09-19 T 00:55:15",
  72. }},
  73. },
  74. {
  75. sql: "SELECT indexof(a, \"a\") AS a FROM test",
  76. data: &xsql.Tuple{
  77. Emitter: "test",
  78. Message: xsql.Message{
  79. "a": "mya",
  80. "b": "ya",
  81. "c": "myc",
  82. },
  83. },
  84. result: []map[string]interface{}{{
  85. "a": float64(2),
  86. }},
  87. },
  88. {
  89. sql: "SELECT length(a) AS a FROM test",
  90. data: &xsql.Tuple{
  91. Emitter: "test",
  92. Message: xsql.Message{
  93. "a": "中国",
  94. "b": "ya",
  95. "c": "myc",
  96. },
  97. },
  98. result: []map[string]interface{}{{
  99. "a": float64(2),
  100. }},
  101. },
  102. {
  103. sql: "SELECT length(c) AS a FROM test",
  104. data: &xsql.Tuple{
  105. Emitter: "test",
  106. Message: xsql.Message{
  107. "a": "中国",
  108. "b": "ya",
  109. "c": "myc",
  110. },
  111. },
  112. result: []map[string]interface{}{{
  113. "a": float64(3),
  114. }},
  115. },
  116. {
  117. sql: "SELECT lower(a) AS a FROM test",
  118. data: &xsql.Tuple{
  119. Emitter: "test",
  120. Message: xsql.Message{
  121. "a": "NYCNicks",
  122. "b": "ya",
  123. "c": "myc",
  124. },
  125. },
  126. result: []map[string]interface{}{{
  127. "a": "nycnicks",
  128. }},
  129. },
  130. {
  131. sql: "SELECT lpad(a, 2) AS a FROM test",
  132. data: &xsql.Tuple{
  133. Emitter: "test",
  134. Message: xsql.Message{
  135. "a": "NYCNicks",
  136. "b": "ya",
  137. "c": "myc",
  138. },
  139. },
  140. result: []map[string]interface{}{{
  141. "a": " NYCNicks",
  142. }},
  143. },
  144. {
  145. sql: "SELECT ltrim(a) AS a FROM test",
  146. data: &xsql.Tuple{
  147. Emitter: "test",
  148. Message: xsql.Message{
  149. "a": " \ttrimme\n ",
  150. "b": "ya",
  151. "c": "myc",
  152. },
  153. },
  154. result: []map[string]interface{}{{
  155. "a": "trimme\n ",
  156. }},
  157. },
  158. {
  159. sql: "SELECT numbytes(a) AS a FROM test",
  160. data: &xsql.Tuple{
  161. Emitter: "test",
  162. Message: xsql.Message{
  163. "a": "中国",
  164. "b": "ya",
  165. "c": "myc",
  166. },
  167. },
  168. result: []map[string]interface{}{{
  169. "a": float64(6),
  170. }},
  171. },
  172. {
  173. sql: "SELECT numbytes(b) AS a FROM test",
  174. data: &xsql.Tuple{
  175. Emitter: "test",
  176. Message: xsql.Message{
  177. "a": "中国",
  178. "b": "ya",
  179. "c": "myc",
  180. },
  181. },
  182. result: []map[string]interface{}{{
  183. "a": float64(2),
  184. }},
  185. },
  186. {
  187. sql: "SELECT regexp_matches(a,\"foo.*\") AS a FROM test",
  188. data: &xsql.Tuple{
  189. Emitter: "test",
  190. Message: xsql.Message{
  191. "a": "seafood",
  192. "b": "ya",
  193. "c": "myc",
  194. },
  195. },
  196. result: []map[string]interface{}{{
  197. "a": true,
  198. }},
  199. },
  200. {
  201. sql: "SELECT regexp_matches(b,\"foo.*\") AS a FROM test",
  202. data: &xsql.Tuple{
  203. Emitter: "test",
  204. Message: xsql.Message{
  205. "a": "seafood",
  206. "b": "ya",
  207. "c": "myc",
  208. },
  209. },
  210. result: []map[string]interface{}{{
  211. "a": false,
  212. }},
  213. },
  214. {
  215. sql: "SELECT regexp_replace(a,\"a(x*)b\", \"REP\") AS a FROM test",
  216. data: &xsql.Tuple{
  217. Emitter: "test",
  218. Message: xsql.Message{
  219. "a": "-ab-axxb-",
  220. "b": "ya",
  221. "c": "myc",
  222. },
  223. },
  224. result: []map[string]interface{}{{
  225. "a": "-REP-REP-",
  226. }},
  227. },
  228. {
  229. sql: "SELECT regexp_substr(a,\"foo.*\") AS a FROM test",
  230. data: &xsql.Tuple{
  231. Emitter: "test",
  232. Message: xsql.Message{
  233. "a": "seafood",
  234. "b": "ya",
  235. "c": "myc",
  236. },
  237. },
  238. result: []map[string]interface{}{{
  239. "a": "food",
  240. }},
  241. },
  242. {
  243. sql: "SELECT rpad(a, 3) AS a FROM test",
  244. data: &xsql.Tuple{
  245. Emitter: "test",
  246. Message: xsql.Message{
  247. "a": "NYCNicks",
  248. "b": "ya",
  249. "c": "myc",
  250. },
  251. },
  252. result: []map[string]interface{}{{
  253. "a": "NYCNicks ",
  254. }},
  255. },
  256. {
  257. sql: "SELECT rtrim(a) AS a FROM test",
  258. data: &xsql.Tuple{
  259. Emitter: "test",
  260. Message: xsql.Message{
  261. "a": " \ttrimme\n ",
  262. "b": "ya",
  263. "c": "myc",
  264. },
  265. },
  266. result: []map[string]interface{}{{
  267. "a": " \ttrimme",
  268. }},
  269. },
  270. {
  271. sql: "SELECT substring(a, 3) AS a FROM test",
  272. data: &xsql.Tuple{
  273. Emitter: "test",
  274. Message: xsql.Message{
  275. "a": "NYCNicks",
  276. "b": "ya",
  277. "c": "myc",
  278. },
  279. },
  280. result: []map[string]interface{}{{
  281. "a": "Nicks",
  282. }},
  283. },
  284. {
  285. sql: "SELECT substring(a, 3, 5) AS a FROM test",
  286. data: &xsql.Tuple{
  287. Emitter: "test",
  288. Message: xsql.Message{
  289. "a": "NYCNicks",
  290. "b": "ya",
  291. "c": "myc",
  292. },
  293. },
  294. result: []map[string]interface{}{{
  295. "a": "Ni",
  296. }},
  297. },
  298. {
  299. sql: "SELECT endswith(a, b) AS a FROM test",
  300. data: &xsql.Tuple{
  301. Emitter: "test",
  302. Message: xsql.Message{
  303. "a": "mya",
  304. "b": "ya",
  305. "c": "myc",
  306. },
  307. },
  308. result: []map[string]interface{}{{
  309. "a": true,
  310. }},
  311. },
  312. {
  313. sql: "SELECT endswith(a, c) AS a FROM test",
  314. data: &xsql.Tuple{
  315. Emitter: "test",
  316. Message: xsql.Message{
  317. "a": "mya",
  318. "b": "ya",
  319. "c": "myc",
  320. },
  321. },
  322. result: []map[string]interface{}{{
  323. "a": false,
  324. }},
  325. },
  326. {
  327. sql: "SELECT trim(a) AS a FROM test",
  328. data: &xsql.Tuple{
  329. Emitter: "test",
  330. Message: xsql.Message{
  331. "a": " \ttrimme\n ",
  332. "b": "ya",
  333. "c": "myc",
  334. },
  335. },
  336. result: []map[string]interface{}{{
  337. "a": "trimme",
  338. }},
  339. },
  340. {
  341. sql: "SELECT upper(a) AS a FROM test",
  342. data: &xsql.Tuple{
  343. Emitter: "test",
  344. Message: xsql.Message{
  345. "a": "NYCNicks",
  346. "b": "ya",
  347. "c": "myc",
  348. },
  349. },
  350. result: []map[string]interface{}{{
  351. "a": "NYCNICKS",
  352. }},
  353. },
  354. {
  355. sql: `SELECT split_value(a,"/",0) AS a FROM test1`,
  356. data: &xsql.Tuple{
  357. Emitter: "test",
  358. Message: xsql.Message{
  359. "a": "test/device001/message",
  360. },
  361. },
  362. result: []map[string]interface{}{{
  363. "a": "test",
  364. }},
  365. },
  366. {
  367. sql: `SELECT split_value(a,"/",1) AS a FROM test1`,
  368. data: &xsql.Tuple{
  369. Emitter: "test",
  370. Message: xsql.Message{
  371. "a": "test/device001/message",
  372. },
  373. },
  374. result: []map[string]interface{}{{
  375. "a": "device001",
  376. }},
  377. },
  378. {
  379. sql: `SELECT split_value(a,"/",2) AS a FROM test1`,
  380. data: &xsql.Tuple{
  381. Emitter: "test",
  382. Message: xsql.Message{
  383. "a": "test/device001/message",
  384. },
  385. },
  386. result: []map[string]interface{}{{
  387. "a": "message",
  388. }},
  389. },
  390. {
  391. sql: `SELECT split_value(a,"/",0) AS a, split_value(a,"/",3) AS b FROM test1`,
  392. data: &xsql.Tuple{
  393. Emitter: "test",
  394. Message: xsql.Message{
  395. "a": "/test/device001/message",
  396. },
  397. },
  398. result: []map[string]interface{}{{
  399. "a": "",
  400. "b": "message",
  401. }},
  402. },
  403. }
  404. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  405. contextLogger := common.Log.WithField("rule", "TestStrFunc_Apply1")
  406. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  407. for i, tt := range tests {
  408. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  409. if err != nil || stmt == nil {
  410. t.Errorf("parse sql %s error %v", tt.sql, err)
  411. }
  412. pp := &ProjectPlan{Fields: stmt.Fields}
  413. pp.isTest = true
  414. result := pp.Apply(ctx, tt.data)
  415. var mapRes []map[string]interface{}
  416. if v, ok := result.([]byte); ok {
  417. err := json.Unmarshal(v, &mapRes)
  418. if err != nil {
  419. t.Errorf("Failed to parse the input into map.\n")
  420. continue
  421. }
  422. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  423. if !reflect.DeepEqual(tt.result, mapRes) {
  424. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  425. }
  426. } else {
  427. t.Errorf("%d. The returned result is not type of []byte\n", i)
  428. }
  429. }
  430. }