str_func_test.go 8.9 KB


  1. package operator
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/lf-edge/ekuiper/internal/conf"
  6. "github.com/lf-edge/ekuiper/internal/topo/context"
  7. "github.com/lf-edge/ekuiper/internal/xsql"
  8. "github.com/lf-edge/ekuiper/pkg/cast"
  9. "reflect"
  10. "strings"
  11. "testing"
  12. )
  13. func TestStrFunc_Apply1(t *testing.T) {
  14. var tests = []struct {
  15. sql string
  16. data *xsql.Tuple
  17. result []map[string]interface{}
  18. }{
  19. {
  20. sql: "SELECT concat(a, b, c) AS a FROM test",
  21. data: &xsql.Tuple{
  22. Emitter: "test",
  23. Message: xsql.Message{
  24. "a": "mya",
  25. "b": "myb",
  26. "c": "myc",
  27. },
  28. },
  29. result: []map[string]interface{}{{
  30. "a": "myamybmyc",
  31. }},
  32. },
  33. {
  34. sql: "SELECT endswith(a, b) AS a FROM test",
  35. data: &xsql.Tuple{
  36. Emitter: "test",
  37. Message: xsql.Message{
  38. "a": "mya",
  39. "b": "myb",
  40. "c": "myc",
  41. },
  42. },
  43. result: []map[string]interface{}{{
  44. "a": false,
  45. }},
  46. },
  47. {
  48. sql: "SELECT endswith(a, b) AS a FROM test",
  49. data: &xsql.Tuple{
  50. Emitter: "test",
  51. Message: xsql.Message{
  52. "a": "mya",
  53. "b": "ya",
  54. "c": "myc",
  55. },
  56. },
  57. result: []map[string]interface{}{{
  58. "a": true,
  59. }},
  60. },
  61. {
  62. sql: "SELECT format_time(a, \"yyyy-MM-dd T HH:mm:ss\") AS a FROM test",
  63. data: &xsql.Tuple{
  64. Emitter: "test",
  65. Message: xsql.Message{
  66. "a": cast.TimeFromUnixMilli(1568854515000),
  67. "b": "ya",
  68. "c": "myc",
  69. },
  70. },
  71. result: []map[string]interface{}{{
  72. "a": "2019-09-19 T 00:55:15",
  73. }},
  74. },
  75. {
  76. sql: "SELECT format_time(meta(created) * 1000, \"yyyy-MM-dd T HH:mm:ss\") AS time FROM test",
  77. data: &xsql.Tuple{
  78. Emitter: "test",
  79. Message: xsql.Message{
  80. "a": "hello",
  81. "b": "ya",
  82. "c": "myc",
  83. },
  84. Metadata: xsql.Metadata{
  85. "created": 1.62000273e+09,
  86. },
  87. },
  88. result: []map[string]interface{}{{
  89. "time": "2021-05-03 T 00:45:30",
  90. }},
  91. },
  92. {
  93. sql: "SELECT indexof(a, \"a\") AS a FROM test",
  94. data: &xsql.Tuple{
  95. Emitter: "test",
  96. Message: xsql.Message{
  97. "a": "mya",
  98. "b": "ya",
  99. "c": "myc",
  100. },
  101. },
  102. result: []map[string]interface{}{{
  103. "a": float64(2),
  104. }},
  105. },
  106. {
  107. sql: "SELECT length(a) AS a FROM test",
  108. data: &xsql.Tuple{
  109. Emitter: "test",
  110. Message: xsql.Message{
  111. "a": "中国",
  112. "b": "ya",
  113. "c": "myc",
  114. },
  115. },
  116. result: []map[string]interface{}{{
  117. "a": float64(2),
  118. }},
  119. },
  120. {
  121. sql: "SELECT length(c) AS a FROM test",
  122. data: &xsql.Tuple{
  123. Emitter: "test",
  124. Message: xsql.Message{
  125. "a": "中国",
  126. "b": "ya",
  127. "c": "myc",
  128. },
  129. },
  130. result: []map[string]interface{}{{
  131. "a": float64(3),
  132. }},
  133. },
  134. {
  135. sql: "SELECT lower(a) AS a FROM test",
  136. data: &xsql.Tuple{
  137. Emitter: "test",
  138. Message: xsql.Message{
  139. "a": "NYCNicks",
  140. "b": "ya",
  141. "c": "myc",
  142. },
  143. },
  144. result: []map[string]interface{}{{
  145. "a": "nycnicks",
  146. }},
  147. },
  148. {
  149. sql: "SELECT lpad(a, 2) AS a FROM test",
  150. data: &xsql.Tuple{
  151. Emitter: "test",
  152. Message: xsql.Message{
  153. "a": "NYCNicks",
  154. "b": "ya",
  155. "c": "myc",
  156. },
  157. },
  158. result: []map[string]interface{}{{
  159. "a": " NYCNicks",
  160. }},
  161. },
  162. {
  163. sql: "SELECT ltrim(a) AS a FROM test",
  164. data: &xsql.Tuple{
  165. Emitter: "test",
  166. Message: xsql.Message{
  167. "a": " \ttrimme\n ",
  168. "b": "ya",
  169. "c": "myc",
  170. },
  171. },
  172. result: []map[string]interface{}{{
  173. "a": "trimme\n ",
  174. }},
  175. },
  176. {
  177. sql: "SELECT numbytes(a) AS a FROM test",
  178. data: &xsql.Tuple{
  179. Emitter: "test",
  180. Message: xsql.Message{
  181. "a": "中国",
  182. "b": "ya",
  183. "c": "myc",
  184. },
  185. },
  186. result: []map[string]interface{}{{
  187. "a": float64(6),
  188. }},
  189. },
  190. {
  191. sql: "SELECT numbytes(b) AS a FROM test",
  192. data: &xsql.Tuple{
  193. Emitter: "test",
  194. Message: xsql.Message{
  195. "a": "中国",
  196. "b": "ya",
  197. "c": "myc",
  198. },
  199. },
  200. result: []map[string]interface{}{{
  201. "a": float64(2),
  202. }},
  203. },
  204. {
  205. sql: "SELECT regexp_matches(a,\"foo.*\") AS a FROM test",
  206. data: &xsql.Tuple{
  207. Emitter: "test",
  208. Message: xsql.Message{
  209. "a": "seafood",
  210. "b": "ya",
  211. "c": "myc",
  212. },
  213. },
  214. result: []map[string]interface{}{{
  215. "a": true,
  216. }},
  217. },
  218. {
  219. sql: "SELECT regexp_matches(b,\"foo.*\") AS a FROM test",
  220. data: &xsql.Tuple{
  221. Emitter: "test",
  222. Message: xsql.Message{
  223. "a": "seafood",
  224. "b": "ya",
  225. "c": "myc",
  226. },
  227. },
  228. result: []map[string]interface{}{{
  229. "a": false,
  230. }},
  231. },
  232. {
  233. sql: "SELECT regexp_replace(a,\"a(x*)b\", \"REP\") AS a FROM test",
  234. data: &xsql.Tuple{
  235. Emitter: "test",
  236. Message: xsql.Message{
  237. "a": "-ab-axxb-",
  238. "b": "ya",
  239. "c": "myc",
  240. },
  241. },
  242. result: []map[string]interface{}{{
  243. "a": "-REP-REP-",
  244. }},
  245. },
  246. {
  247. sql: "SELECT regexp_substr(a,\"foo.*\") AS a FROM test",
  248. data: &xsql.Tuple{
  249. Emitter: "test",
  250. Message: xsql.Message{
  251. "a": "seafood",
  252. "b": "ya",
  253. "c": "myc",
  254. },
  255. },
  256. result: []map[string]interface{}{{
  257. "a": "food",
  258. }},
  259. },
  260. {
  261. sql: "SELECT rpad(a, 3) AS a FROM test",
  262. data: &xsql.Tuple{
  263. Emitter: "test",
  264. Message: xsql.Message{
  265. "a": "NYCNicks",
  266. "b": "ya",
  267. "c": "myc",
  268. },
  269. },
  270. result: []map[string]interface{}{{
  271. "a": "NYCNicks ",
  272. }},
  273. },
  274. {
  275. sql: "SELECT rtrim(a) AS a FROM test",
  276. data: &xsql.Tuple{
  277. Emitter: "test",
  278. Message: xsql.Message{
  279. "a": " \ttrimme\n ",
  280. "b": "ya",
  281. "c": "myc",
  282. },
  283. },
  284. result: []map[string]interface{}{{
  285. "a": " \ttrimme",
  286. }},
  287. },
  288. {
  289. sql: "SELECT substring(a, 3) AS a FROM test",
  290. data: &xsql.Tuple{
  291. Emitter: "test",
  292. Message: xsql.Message{
  293. "a": "NYCNicks",
  294. "b": "ya",
  295. "c": "myc",
  296. },
  297. },
  298. result: []map[string]interface{}{{
  299. "a": "Nicks",
  300. }},
  301. },
  302. {
  303. sql: "SELECT substring(a, 3, 5) AS a FROM test",
  304. data: &xsql.Tuple{
  305. Emitter: "test",
  306. Message: xsql.Message{
  307. "a": "NYCNicks",
  308. "b": "ya",
  309. "c": "myc",
  310. },
  311. },
  312. result: []map[string]interface{}{{
  313. "a": "Ni",
  314. }},
  315. },
  316. {
  317. sql: "SELECT endswith(a, b) AS a FROM test",
  318. data: &xsql.Tuple{
  319. Emitter: "test",
  320. Message: xsql.Message{
  321. "a": "mya",
  322. "b": "ya",
  323. "c": "myc",
  324. },
  325. },
  326. result: []map[string]interface{}{{
  327. "a": true,
  328. }},
  329. },
  330. {
  331. sql: "SELECT endswith(a, c) AS a FROM test",
  332. data: &xsql.Tuple{
  333. Emitter: "test",
  334. Message: xsql.Message{
  335. "a": "mya",
  336. "b": "ya",
  337. "c": "myc",
  338. },
  339. },
  340. result: []map[string]interface{}{{
  341. "a": false,
  342. }},
  343. },
  344. {
  345. sql: "SELECT trim(a) AS a FROM test",
  346. data: &xsql.Tuple{
  347. Emitter: "test",
  348. Message: xsql.Message{
  349. "a": " \ttrimme\n ",
  350. "b": "ya",
  351. "c": "myc",
  352. },
  353. },
  354. result: []map[string]interface{}{{
  355. "a": "trimme",
  356. }},
  357. },
  358. {
  359. sql: "SELECT upper(a) AS a FROM test",
  360. data: &xsql.Tuple{
  361. Emitter: "test",
  362. Message: xsql.Message{
  363. "a": "NYCNicks",
  364. "b": "ya",
  365. "c": "myc",
  366. },
  367. },
  368. result: []map[string]interface{}{{
  369. "a": "NYCNICKS",
  370. }},
  371. },
  372. {
  373. sql: `SELECT split_value(a,"/",0) AS a FROM test1`,
  374. data: &xsql.Tuple{
  375. Emitter: "test",
  376. Message: xsql.Message{
  377. "a": "test/device001/message",
  378. },
  379. },
  380. result: []map[string]interface{}{{
  381. "a": "test",
  382. }},
  383. },
  384. {
  385. sql: `SELECT split_value(a,"/",1) AS a FROM test1`,
  386. data: &xsql.Tuple{
  387. Emitter: "test",
  388. Message: xsql.Message{
  389. "a": "test/device001/message",
  390. },
  391. },
  392. result: []map[string]interface{}{{
  393. "a": "device001",
  394. }},
  395. },
  396. {
  397. sql: `SELECT split_value(a,"/",2) AS a FROM test1`,
  398. data: &xsql.Tuple{
  399. Emitter: "test",
  400. Message: xsql.Message{
  401. "a": "test/device001/message",
  402. },
  403. },
  404. result: []map[string]interface{}{{
  405. "a": "message",
  406. }},
  407. },
  408. {
  409. sql: `SELECT split_value(a,"/",0) AS a, split_value(a,"/",3) AS b FROM test1`,
  410. data: &xsql.Tuple{
  411. Emitter: "test",
  412. Message: xsql.Message{
  413. "a": "/test/device001/message",
  414. },
  415. },
  416. result: []map[string]interface{}{{
  417. "a": "",
  418. "b": "message",
  419. }},
  420. },
  421. }
  422. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  423. contextLogger := conf.Log.WithField("rule", "TestStrFunc_Apply1")
  424. ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
  425. for i, tt := range tests {
  426. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  427. if err != nil || stmt == nil {
  428. t.Errorf("parse sql %s error %v", tt.sql, err)
  429. }
  430. pp := &ProjectOp{Fields: stmt.Fields}
  431. fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
  432. result := pp.Apply(ctx, tt.data, fv, afv)
  433. var mapRes []map[string]interface{}
  434. if v, ok := result.([]byte); ok {
  435. err := json.Unmarshal(v, &mapRes)
  436. if err != nil {
  437. t.Errorf("Failed to parse the input into map.\n")
  438. continue
  439. }
  440. //fmt.Printf("%t\n", mapRes["kuiper_field_0"])
  441. if !reflect.DeepEqual(tt.result, mapRes) {
  442. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  443. }
  444. } else {
  445. t.Errorf("%d. The returned result is not type of []byte\n", i)
  446. }
  447. }
  448. }