str_func_test.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package operator
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/context"
  20. "github.com/lf-edge/ekuiper/internal/xsql"
  21. "github.com/lf-edge/ekuiper/pkg/cast"
  22. "reflect"
  23. "strings"
  24. "testing"
  25. )
  26. func TestStrFunc_Apply1(t *testing.T) {
  27. var tests = []struct {
  28. sql string
  29. data *xsql.Tuple
  30. result []map[string]interface{}
  31. }{
  32. {
  33. sql: "SELECT concat(a, b, c) AS a FROM test",
  34. data: &xsql.Tuple{
  35. Emitter: "test",
  36. Message: xsql.Message{
  37. "a": "mya",
  38. "b": "myb",
  39. "c": "myc",
  40. },
  41. },
  42. result: []map[string]interface{}{{
  43. "a": "myamybmyc",
  44. }},
  45. },
  46. {
  47. sql: "SELECT endswith(a, b) AS a FROM test",
  48. data: &xsql.Tuple{
  49. Emitter: "test",
  50. Message: xsql.Message{
  51. "a": "mya",
  52. "b": "myb",
  53. "c": "myc",
  54. },
  55. },
  56. result: []map[string]interface{}{{
  57. "a": false,
  58. }},
  59. },
  60. {
  61. sql: "SELECT endswith(a, b) AS a FROM test",
  62. data: &xsql.Tuple{
  63. Emitter: "test",
  64. Message: xsql.Message{
  65. "a": "mya",
  66. "b": "ya",
  67. "c": "myc",
  68. },
  69. },
  70. result: []map[string]interface{}{{
  71. "a": true,
  72. }},
  73. },
  74. {
  75. sql: "SELECT format_time(a, \"yyyy-MM-dd T HH:mm:ss\") AS a FROM test",
  76. data: &xsql.Tuple{
  77. Emitter: "test",
  78. Message: xsql.Message{
  79. "a": cast.TimeFromUnixMilli(1568854515000),
  80. "b": "ya",
  81. "c": "myc",
  82. },
  83. },
  84. result: []map[string]interface{}{{
  85. "a": "2019-09-19 T 00:55:15",
  86. }},
  87. },
  88. {
  89. sql: "SELECT format_time(meta(created) * 1000, \"yyyy-MM-dd T HH:mm:ss\") AS time FROM test",
  90. data: &xsql.Tuple{
  91. Emitter: "test",
  92. Message: xsql.Message{
  93. "a": "hello",
  94. "b": "ya",
  95. "c": "myc",
  96. },
  97. Metadata: xsql.Metadata{
  98. "created": 1.62000273e+09,
  99. },
  100. },
  101. result: []map[string]interface{}{{
  102. "time": "2021-05-03 T 00:45:30",
  103. }},
  104. },
  105. {
  106. sql: "SELECT indexof(a, \"a\") AS a FROM test",
  107. data: &xsql.Tuple{
  108. Emitter: "test",
  109. Message: xsql.Message{
  110. "a": "mya",
  111. "b": "ya",
  112. "c": "myc",
  113. },
  114. },
  115. result: []map[string]interface{}{{
  116. "a": float64(2),
  117. }},
  118. },
  119. {
  120. sql: "SELECT length(a) AS a FROM test",
  121. data: &xsql.Tuple{
  122. Emitter: "test",
  123. Message: xsql.Message{
  124. "a": "中国",
  125. "b": "ya",
  126. "c": "myc",
  127. },
  128. },
  129. result: []map[string]interface{}{{
  130. "a": float64(2),
  131. }},
  132. },
  133. {
  134. sql: "SELECT length(c) AS a FROM test",
  135. data: &xsql.Tuple{
  136. Emitter: "test",
  137. Message: xsql.Message{
  138. "a": "中国",
  139. "b": "ya",
  140. "c": "myc",
  141. },
  142. },
  143. result: []map[string]interface{}{{
  144. "a": float64(3),
  145. }},
  146. },
  147. {
  148. sql: "SELECT lower(a) AS a FROM test",
  149. data: &xsql.Tuple{
  150. Emitter: "test",
  151. Message: xsql.Message{
  152. "a": "NYCNicks",
  153. "b": "ya",
  154. "c": "myc",
  155. },
  156. },
  157. result: []map[string]interface{}{{
  158. "a": "nycnicks",
  159. }},
  160. },
  161. {
  162. sql: "SELECT lpad(a, 2) AS a FROM test",
  163. data: &xsql.Tuple{
  164. Emitter: "test",
  165. Message: xsql.Message{
  166. "a": "NYCNicks",
  167. "b": "ya",
  168. "c": "myc",
  169. },
  170. },
  171. result: []map[string]interface{}{{
  172. "a": " NYCNicks",
  173. }},
  174. },
  175. {
  176. sql: "SELECT ltrim(a) AS a FROM test",
  177. data: &xsql.Tuple{
  178. Emitter: "test",
  179. Message: xsql.Message{
  180. "a": " \ttrimme\n ",
  181. "b": "ya",
  182. "c": "myc",
  183. },
  184. },
  185. result: []map[string]interface{}{{
  186. "a": "trimme\n ",
  187. }},
  188. },
  189. {
  190. sql: "SELECT numbytes(a) AS a FROM test",
  191. data: &xsql.Tuple{
  192. Emitter: "test",
  193. Message: xsql.Message{
  194. "a": "中国",
  195. "b": "ya",
  196. "c": "myc",
  197. },
  198. },
  199. result: []map[string]interface{}{{
  200. "a": float64(6),
  201. }},
  202. },
  203. {
  204. sql: "SELECT numbytes(b) AS a FROM test",
  205. data: &xsql.Tuple{
  206. Emitter: "test",
  207. Message: xsql.Message{
  208. "a": "中国",
  209. "b": "ya",
  210. "c": "myc",
  211. },
  212. },
  213. result: []map[string]interface{}{{
  214. "a": float64(2),
  215. }},
  216. },
  217. {
  218. sql: "SELECT regexp_matches(a,\"foo.*\") AS a FROM test",
  219. data: &xsql.Tuple{
  220. Emitter: "test",
  221. Message: xsql.Message{
  222. "a": "seafood",
  223. "b": "ya",
  224. "c": "myc",
  225. },
  226. },
  227. result: []map[string]interface{}{{
  228. "a": true,
  229. }},
  230. },
  231. {
  232. sql: "SELECT regexp_matches(b,\"foo.*\") AS a FROM test",
  233. data: &xsql.Tuple{
  234. Emitter: "test",
  235. Message: xsql.Message{
  236. "a": "seafood",
  237. "b": "ya",
  238. "c": "myc",
  239. },
  240. },
  241. result: []map[string]interface{}{{
  242. "a": false,
  243. }},
  244. },
  245. {
  246. sql: "SELECT regexp_replace(a,\"a(x*)b\", \"REP\") AS a FROM test",
  247. data: &xsql.Tuple{
  248. Emitter: "test",
  249. Message: xsql.Message{
  250. "a": "-ab-axxb-",
  251. "b": "ya",
  252. "c": "myc",
  253. },
  254. },
  255. result: []map[string]interface{}{{
  256. "a": "-REP-REP-",
  257. }},
  258. },
  259. {
  260. sql: "SELECT regexp_substr(a,\"foo.*\") AS a FROM test",
  261. data: &xsql.Tuple{
  262. Emitter: "test",
  263. Message: xsql.Message{
  264. "a": "seafood",
  265. "b": "ya",
  266. "c": "myc",
  267. },
  268. },
  269. result: []map[string]interface{}{{
  270. "a": "food",
  271. }},
  272. },
  273. {
  274. sql: "SELECT rpad(a, 3) AS a FROM test",
  275. data: &xsql.Tuple{
  276. Emitter: "test",
  277. Message: xsql.Message{
  278. "a": "NYCNicks",
  279. "b": "ya",
  280. "c": "myc",
  281. },
  282. },
  283. result: []map[string]interface{}{{
  284. "a": "NYCNicks ",
  285. }},
  286. },
  287. {
  288. sql: "SELECT rtrim(a) AS a FROM test",
  289. data: &xsql.Tuple{
  290. Emitter: "test",
  291. Message: xsql.Message{
  292. "a": " \ttrimme\n ",
  293. "b": "ya",
  294. "c": "myc",
  295. },
  296. },
  297. result: []map[string]interface{}{{
  298. "a": " \ttrimme",
  299. }},
  300. },
  301. {
  302. sql: "SELECT substring(a, 3) AS a FROM test",
  303. data: &xsql.Tuple{
  304. Emitter: "test",
  305. Message: xsql.Message{
  306. "a": "NYCNicks",
  307. "b": "ya",
  308. "c": "myc",
  309. },
  310. },
  311. result: []map[string]interface{}{{
  312. "a": "Nicks",
  313. }},
  314. },
  315. {
  316. sql: "SELECT substring(a, 3, 5) AS a FROM test",
  317. data: &xsql.Tuple{
  318. Emitter: "test",
  319. Message: xsql.Message{
  320. "a": "NYCNicks",
  321. "b": "ya",
  322. "c": "myc",
  323. },
  324. },
  325. result: []map[string]interface{}{{
  326. "a": "Ni",
  327. }},
  328. },
  329. {
  330. sql: "SELECT endswith(a, b) AS a FROM test",
  331. data: &xsql.Tuple{
  332. Emitter: "test",
  333. Message: xsql.Message{
  334. "a": "mya",
  335. "b": "ya",
  336. "c": "myc",
  337. },
  338. },
  339. result: []map[string]interface{}{{
  340. "a": true,
  341. }},
  342. },
  343. {
  344. sql: "SELECT endswith(a, c) AS a FROM test",
  345. data: &xsql.Tuple{
  346. Emitter: "test",
  347. Message: xsql.Message{
  348. "a": "mya",
  349. "b": "ya",
  350. "c": "myc",
  351. },
  352. },
  353. result: []map[string]interface{}{{
  354. "a": false,
  355. }},
  356. },
  357. {
  358. sql: "SELECT trim(a) AS a FROM test",
  359. data: &xsql.Tuple{
  360. Emitter: "test",
  361. Message: xsql.Message{
  362. "a": " \ttrimme\n ",
  363. "b": "ya",
  364. "c": "myc",
  365. },
  366. },
  367. result: []map[string]interface{}{{
  368. "a": "trimme",
  369. }},
  370. },
  371. {
  372. sql: "SELECT upper(a) AS a FROM test",
  373. data: &xsql.Tuple{
  374. Emitter: "test",
  375. Message: xsql.Message{
  376. "a": "NYCNicks",
  377. "b": "ya",
  378. "c": "myc",
  379. },
  380. },
  381. result: []map[string]interface{}{{
  382. "a": "NYCNICKS",
  383. }},
  384. },
  385. {
  386. sql: `SELECT split_value(a,"/",0) AS a FROM test1`,
  387. data: &xsql.Tuple{
  388. Emitter: "test",
  389. Message: xsql.Message{
  390. "a": "test/device001/message",
  391. },
  392. },
  393. result: []map[string]interface{}{{
  394. "a": "test",
  395. }},
  396. },
  397. {
  398. sql: `SELECT split_value(a,"/",1) AS a FROM test1`,
  399. data: &xsql.Tuple{
  400. Emitter: "test",
  401. Message: xsql.Message{
  402. "a": "test/device001/message",
  403. },
  404. },
  405. result: []map[string]interface{}{{
  406. "a": "device001",
  407. }},
  408. },
  409. {
  410. sql: `SELECT split_value(a,"/",2) AS a FROM test1`,
  411. data: &xsql.Tuple{
  412. Emitter: "test",
  413. Message: xsql.Message{
  414. "a": "test/device001/message",
  415. },
  416. },
  417. result: []map[string]interface{}{{
  418. "a": "message",
  419. }},
  420. },
  421. {
  422. sql: `SELECT split_value(a,"/",0) AS a, split_value(a,"/",3) AS b FROM test1`,
  423. data: &xsql.Tuple{
  424. Emitter: "test",
  425. Message: xsql.Message{
  426. "a": "/test/device001/message",
  427. },
  428. },
  429. result: []map[string]interface{}{{
  430. "a": "",
  431. "b": "message",
  432. }},
  433. },
  434. }
  435. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  436. contextLogger := conf.Log.WithField("rule", "TestStrFunc_Apply1")
  437. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  438. for i, tt := range tests {
  439. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  440. if err != nil || stmt == nil {
  441. t.Errorf("parse sql %s error %v", tt.sql, err)
  442. }
  443. pp := &ProjectOp{Fields: stmt.Fields}
  444. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  445. result := pp.Apply(ctx, tt.data, fv, afv)
  446. var mapRes []map[string]interface{}
  447. if v, ok := result.([]byte); ok {
  448. err := json.Unmarshal(v, &mapRes)
  449. if err != nil {
  450. t.Errorf("Failed to parse the input into map.\n")
  451. continue
  452. }
  453. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  454. if !reflect.DeepEqual(tt.result, mapRes) {
  455. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  456. }
  457. } else {
  458. t.Errorf("%d. The returned result is not type of []byte\n", i)
  459. }
  460. }
  461. }