str_func_test.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. package operators
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestStrFunc_Apply1(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data *xsql.Tuple
  16. result []map[string]interface{}
  17. }{
  18. {
  19. sql: "SELECT concat(a, b, c) AS a FROM test",
  20. data: &xsql.Tuple{
  21. Emitter: "test",
  22. Message: xsql.Message{
  23. "a": "mya",
  24. "b": "myb",
  25. "c": "myc",
  26. },
  27. },
  28. result: []map[string]interface{}{{
  29. "a": "myamybmyc",
  30. }},
  31. },
  32. {
  33. sql: "SELECT endswith(a, b) AS a FROM test",
  34. data: &xsql.Tuple{
  35. Emitter: "test",
  36. Message: xsql.Message{
  37. "a": "mya",
  38. "b": "myb",
  39. "c": "myc",
  40. },
  41. },
  42. result: []map[string]interface{}{{
  43. "a": false,
  44. }},
  45. },
  46. {
  47. sql: "SELECT endswith(a, b) AS a FROM test",
  48. data: &xsql.Tuple{
  49. Emitter: "test",
  50. Message: xsql.Message{
  51. "a": "mya",
  52. "b": "ya",
  53. "c": "myc",
  54. },
  55. },
  56. result: []map[string]interface{}{{
  57. "a": true,
  58. }},
  59. },
  60. {
  61. sql: "SELECT format_time(a, \"yyyy-MM-dd T HH:mm:ss\") AS a FROM test",
  62. data: &xsql.Tuple{
  63. Emitter: "test",
  64. Message: xsql.Message{
  65. "a": common.TimeFromUnixMilli(1568854515000),
  66. "b": "ya",
  67. "c": "myc",
  68. },
  69. },
  70. result: []map[string]interface{}{{
  71. "a": "2019-09-19 T 00:55:15",
  72. }},
  73. },
  74. {
  75. sql: "SELECT format_time(meta(created) * 1000, \"yyyy-MM-dd T HH:mm:ss\") AS time FROM test",
  76. data: &xsql.Tuple{
  77. Emitter: "test",
  78. Message: xsql.Message{
  79. "a": "hello",
  80. "b": "ya",
  81. "c": "myc",
  82. },
  83. Metadata: xsql.Metadata{
  84. "created": float64(1.62000273e+09),
  85. },
  86. },
  87. result: []map[string]interface{}{{
  88. "time": "2021-05-03 T 00:45:30",
  89. }},
  90. },
  91. {
  92. sql: "SELECT indexof(a, \"a\") AS a FROM test",
  93. data: &xsql.Tuple{
  94. Emitter: "test",
  95. Message: xsql.Message{
  96. "a": "mya",
  97. "b": "ya",
  98. "c": "myc",
  99. },
  100. },
  101. result: []map[string]interface{}{{
  102. "a": float64(2),
  103. }},
  104. },
  105. {
  106. sql: "SELECT length(a) AS a FROM test",
  107. data: &xsql.Tuple{
  108. Emitter: "test",
  109. Message: xsql.Message{
  110. "a": "中国",
  111. "b": "ya",
  112. "c": "myc",
  113. },
  114. },
  115. result: []map[string]interface{}{{
  116. "a": float64(2),
  117. }},
  118. },
  119. {
  120. sql: "SELECT length(c) AS a FROM test",
  121. data: &xsql.Tuple{
  122. Emitter: "test",
  123. Message: xsql.Message{
  124. "a": "中国",
  125. "b": "ya",
  126. "c": "myc",
  127. },
  128. },
  129. result: []map[string]interface{}{{
  130. "a": float64(3),
  131. }},
  132. },
  133. {
  134. sql: "SELECT lower(a) AS a FROM test",
  135. data: &xsql.Tuple{
  136. Emitter: "test",
  137. Message: xsql.Message{
  138. "a": "NYCNicks",
  139. "b": "ya",
  140. "c": "myc",
  141. },
  142. },
  143. result: []map[string]interface{}{{
  144. "a": "nycnicks",
  145. }},
  146. },
  147. {
  148. sql: "SELECT lpad(a, 2) AS a FROM test",
  149. data: &xsql.Tuple{
  150. Emitter: "test",
  151. Message: xsql.Message{
  152. "a": "NYCNicks",
  153. "b": "ya",
  154. "c": "myc",
  155. },
  156. },
  157. result: []map[string]interface{}{{
  158. "a": " NYCNicks",
  159. }},
  160. },
  161. {
  162. sql: "SELECT ltrim(a) AS a FROM test",
  163. data: &xsql.Tuple{
  164. Emitter: "test",
  165. Message: xsql.Message{
  166. "a": " \ttrimme\n ",
  167. "b": "ya",
  168. "c": "myc",
  169. },
  170. },
  171. result: []map[string]interface{}{{
  172. "a": "trimme\n ",
  173. }},
  174. },
  175. {
  176. sql: "SELECT numbytes(a) AS a FROM test",
  177. data: &xsql.Tuple{
  178. Emitter: "test",
  179. Message: xsql.Message{
  180. "a": "中国",
  181. "b": "ya",
  182. "c": "myc",
  183. },
  184. },
  185. result: []map[string]interface{}{{
  186. "a": float64(6),
  187. }},
  188. },
  189. {
  190. sql: "SELECT numbytes(b) AS a FROM test",
  191. data: &xsql.Tuple{
  192. Emitter: "test",
  193. Message: xsql.Message{
  194. "a": "中国",
  195. "b": "ya",
  196. "c": "myc",
  197. },
  198. },
  199. result: []map[string]interface{}{{
  200. "a": float64(2),
  201. }},
  202. },
  203. {
  204. sql: "SELECT regexp_matches(a,\"foo.*\") AS a FROM test",
  205. data: &xsql.Tuple{
  206. Emitter: "test",
  207. Message: xsql.Message{
  208. "a": "seafood",
  209. "b": "ya",
  210. "c": "myc",
  211. },
  212. },
  213. result: []map[string]interface{}{{
  214. "a": true,
  215. }},
  216. },
  217. {
  218. sql: "SELECT regexp_matches(b,\"foo.*\") AS a FROM test",
  219. data: &xsql.Tuple{
  220. Emitter: "test",
  221. Message: xsql.Message{
  222. "a": "seafood",
  223. "b": "ya",
  224. "c": "myc",
  225. },
  226. },
  227. result: []map[string]interface{}{{
  228. "a": false,
  229. }},
  230. },
  231. {
  232. sql: "SELECT regexp_replace(a,\"a(x*)b\", \"REP\") AS a FROM test",
  233. data: &xsql.Tuple{
  234. Emitter: "test",
  235. Message: xsql.Message{
  236. "a": "-ab-axxb-",
  237. "b": "ya",
  238. "c": "myc",
  239. },
  240. },
  241. result: []map[string]interface{}{{
  242. "a": "-REP-REP-",
  243. }},
  244. },
  245. {
  246. sql: "SELECT regexp_substr(a,\"foo.*\") AS a FROM test",
  247. data: &xsql.Tuple{
  248. Emitter: "test",
  249. Message: xsql.Message{
  250. "a": "seafood",
  251. "b": "ya",
  252. "c": "myc",
  253. },
  254. },
  255. result: []map[string]interface{}{{
  256. "a": "food",
  257. }},
  258. },
  259. {
  260. sql: "SELECT rpad(a, 3) AS a FROM test",
  261. data: &xsql.Tuple{
  262. Emitter: "test",
  263. Message: xsql.Message{
  264. "a": "NYCNicks",
  265. "b": "ya",
  266. "c": "myc",
  267. },
  268. },
  269. result: []map[string]interface{}{{
  270. "a": "NYCNicks ",
  271. }},
  272. },
  273. {
  274. sql: "SELECT rtrim(a) AS a FROM test",
  275. data: &xsql.Tuple{
  276. Emitter: "test",
  277. Message: xsql.Message{
  278. "a": " \ttrimme\n ",
  279. "b": "ya",
  280. "c": "myc",
  281. },
  282. },
  283. result: []map[string]interface{}{{
  284. "a": " \ttrimme",
  285. }},
  286. },
  287. {
  288. sql: "SELECT substring(a, 3) AS a FROM test",
  289. data: &xsql.Tuple{
  290. Emitter: "test",
  291. Message: xsql.Message{
  292. "a": "NYCNicks",
  293. "b": "ya",
  294. "c": "myc",
  295. },
  296. },
  297. result: []map[string]interface{}{{
  298. "a": "Nicks",
  299. }},
  300. },
  301. {
  302. sql: "SELECT substring(a, 3, 5) AS a FROM test",
  303. data: &xsql.Tuple{
  304. Emitter: "test",
  305. Message: xsql.Message{
  306. "a": "NYCNicks",
  307. "b": "ya",
  308. "c": "myc",
  309. },
  310. },
  311. result: []map[string]interface{}{{
  312. "a": "Ni",
  313. }},
  314. },
  315. {
  316. sql: "SELECT endswith(a, b) AS a FROM test",
  317. data: &xsql.Tuple{
  318. Emitter: "test",
  319. Message: xsql.Message{
  320. "a": "mya",
  321. "b": "ya",
  322. "c": "myc",
  323. },
  324. },
  325. result: []map[string]interface{}{{
  326. "a": true,
  327. }},
  328. },
  329. {
  330. sql: "SELECT endswith(a, c) AS a FROM test",
  331. data: &xsql.Tuple{
  332. Emitter: "test",
  333. Message: xsql.Message{
  334. "a": "mya",
  335. "b": "ya",
  336. "c": "myc",
  337. },
  338. },
  339. result: []map[string]interface{}{{
  340. "a": false,
  341. }},
  342. },
  343. {
  344. sql: "SELECT trim(a) AS a FROM test",
  345. data: &xsql.Tuple{
  346. Emitter: "test",
  347. Message: xsql.Message{
  348. "a": " \ttrimme\n ",
  349. "b": "ya",
  350. "c": "myc",
  351. },
  352. },
  353. result: []map[string]interface{}{{
  354. "a": "trimme",
  355. }},
  356. },
  357. {
  358. sql: "SELECT upper(a) AS a FROM test",
  359. data: &xsql.Tuple{
  360. Emitter: "test",
  361. Message: xsql.Message{
  362. "a": "NYCNicks",
  363. "b": "ya",
  364. "c": "myc",
  365. },
  366. },
  367. result: []map[string]interface{}{{
  368. "a": "NYCNICKS",
  369. }},
  370. },
  371. {
  372. sql: `SELECT split_value(a,"/",0) AS a FROM test1`,
  373. data: &xsql.Tuple{
  374. Emitter: "test",
  375. Message: xsql.Message{
  376. "a": "test/device001/message",
  377. },
  378. },
  379. result: []map[string]interface{}{{
  380. "a": "test",
  381. }},
  382. },
  383. {
  384. sql: `SELECT split_value(a,"/",1) AS a FROM test1`,
  385. data: &xsql.Tuple{
  386. Emitter: "test",
  387. Message: xsql.Message{
  388. "a": "test/device001/message",
  389. },
  390. },
  391. result: []map[string]interface{}{{
  392. "a": "device001",
  393. }},
  394. },
  395. {
  396. sql: `SELECT split_value(a,"/",2) AS a FROM test1`,
  397. data: &xsql.Tuple{
  398. Emitter: "test",
  399. Message: xsql.Message{
  400. "a": "test/device001/message",
  401. },
  402. },
  403. result: []map[string]interface{}{{
  404. "a": "message",
  405. }},
  406. },
  407. {
  408. sql: `SELECT split_value(a,"/",0) AS a, split_value(a,"/",3) AS b FROM test1`,
  409. data: &xsql.Tuple{
  410. Emitter: "test",
  411. Message: xsql.Message{
  412. "a": "/test/device001/message",
  413. },
  414. },
  415. result: []map[string]interface{}{{
  416. "a": "",
  417. "b": "message",
  418. }},
  419. },
  420. }
  421. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  422. contextLogger := common.Log.WithField("rule", "TestStrFunc_Apply1")
  423. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  424. for i, tt := range tests {
  425. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  426. if err != nil || stmt == nil {
  427. t.Errorf("parse sql %s error %v", tt.sql, err)
  428. }
  429. pp := &ProjectOp{Fields: stmt.Fields}
  430. pp.isTest = true
  431. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  432. result := pp.Apply(ctx, tt.data, fv, afv)
  433. var mapRes []map[string]interface{}
  434. if v, ok := result.([]byte); ok {
  435. err := json.Unmarshal(v, &mapRes)
  436. if err != nil {
  437. t.Errorf("Failed to parse the input into map.\n")
  438. continue
  439. }
  440. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  441. if !reflect.DeepEqual(tt.result, mapRes) {
  442. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  443. }
  444. } else {
  445. t.Errorf("%d. The returned result is not type of []byte\n", i)
  446. }
  447. }
  448. }