misc_func_test.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. package plans
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/emqx/kuiper/common"
  6. "github.com/emqx/kuiper/xsql"
  7. "github.com/emqx/kuiper/xstream/contexts"
  8. "reflect"
  9. "strings"
  10. "testing"
  11. )
  12. func TestMiscFunc_Apply1(t *testing.T) {
  13. var tests = []struct {
  14. sql string
  15. data *xsql.Tuple
  16. result []map[string]interface{}
  17. }{
  18. {
  19. sql: "SELECT md5(a) AS a FROM test",
  20. data: &xsql.Tuple{
  21. Emitter: "test",
  22. Message: xsql.Message{
  23. "a": "The quick brown fox jumps over the lazy dog",
  24. "b": "myb",
  25. "c": "myc",
  26. },
  27. },
  28. result: []map[string]interface{}{{
  29. "a": strings.ToLower("9E107D9D372BB6826BD81D3542A419D6"),
  30. }},
  31. },
  32. {
  33. sql: "SELECT sha1(a) AS a FROM test",
  34. data: &xsql.Tuple{
  35. Emitter: "test",
  36. Message: xsql.Message{
  37. "a": "The quick brown fox jumps over the lazy dog",
  38. "b": "myb",
  39. "c": "myc",
  40. },
  41. },
  42. result: []map[string]interface{}{{
  43. "a": strings.ToLower("2FD4E1C67A2D28FCED849EE1BB76E7391B93EB12"),
  44. }},
  45. },
  46. {
  47. sql: "SELECT sha256(a) AS a FROM test",
  48. data: &xsql.Tuple{
  49. Emitter: "test",
  50. Message: xsql.Message{
  51. "a": "The quick brown fox jumps over the lazy dog",
  52. "b": "myb",
  53. "c": "myc",
  54. },
  55. },
  56. result: []map[string]interface{}{{
  57. "a": strings.ToLower("D7A8FBB307D7809469CA9ABCB0082E4F8D5651E46D3CDB762D02D0BF37C9E592"),
  58. }},
  59. },
  60. {
  61. sql: "SELECT sha384(a) AS a FROM test",
  62. data: &xsql.Tuple{
  63. Emitter: "test",
  64. Message: xsql.Message{
  65. "a": "The quick brown fox jumps over the lazy dog",
  66. "b": "myb",
  67. "c": "myc",
  68. },
  69. },
  70. result: []map[string]interface{}{{
  71. "a": strings.ToLower("CA737F1014A48F4C0B6DD43CB177B0AFD9E5169367544C494011E3317DBF9A509CB1E5DC1E85A941BBEE3D7F2AFBC9B1"),
  72. }},
  73. },
  74. {
  75. sql: "SELECT sha512(a) AS a FROM test",
  76. data: &xsql.Tuple{
  77. Emitter: "test",
  78. Message: xsql.Message{
  79. "a": "The quick brown fox jumps over the lazy dog",
  80. "b": "myb",
  81. "c": "myc",
  82. },
  83. },
  84. result: []map[string]interface{}{{
  85. "a": strings.ToLower("07E547D9586F6A73F73FBAC0435ED76951218FB7D0C8D788A309D785436BBB642E93A252A954F23912547D1E8A3B5ED6E1BFD7097821233FA0538F3DB854FEE6"),
  86. }},
  87. },
  88. {
  89. sql: "SELECT mqtt(topic) AS a FROM test",
  90. data: &xsql.Tuple{
  91. Emitter: "test",
  92. Message: xsql.Message{},
  93. Metadata: xsql.Metadata{
  94. "topic": "devices/device_001/message",
  95. },
  96. },
  97. result: []map[string]interface{}{{
  98. "a": "devices/device_001/message",
  99. }},
  100. },
  101. {
  102. sql: "SELECT mqtt(topic) AS a FROM test",
  103. data: &xsql.Tuple{
  104. Emitter: "test",
  105. Message: xsql.Message{},
  106. Metadata: xsql.Metadata{
  107. "topic": "devices/device_001/message",
  108. },
  109. },
  110. result: []map[string]interface{}{{
  111. "a": "devices/device_001/message",
  112. }},
  113. },
  114. {
  115. sql: "SELECT topic, mqtt(topic) AS a FROM test",
  116. data: &xsql.Tuple{
  117. Emitter: "test",
  118. Message: xsql.Message{
  119. "topic": "fff",
  120. },
  121. Metadata: xsql.Metadata{
  122. "topic": "devices/device_001/message",
  123. },
  124. },
  125. result: []map[string]interface{}{{
  126. "topic": "fff",
  127. "a": "devices/device_001/message",
  128. }},
  129. },
  130. {
  131. sql: "SELECT isNull(arr) as r FROM test",
  132. data: &xsql.Tuple{
  133. Emitter: "test",
  134. Message: xsql.Message{
  135. "temperature": 43.2,
  136. "arr": []int{},
  137. },
  138. },
  139. result: []map[string]interface{}{{
  140. "r": false,
  141. }},
  142. },
  143. {
  144. sql: "SELECT isNull(arr) as r FROM test",
  145. data: &xsql.Tuple{
  146. Emitter: "test",
  147. Message: xsql.Message{
  148. "temperature": 43.2,
  149. "arr": []float64(nil),
  150. },
  151. },
  152. result: []map[string]interface{}{{
  153. "r": true,
  154. }},
  155. }, {
  156. sql: "SELECT isNull(rec) as r FROM test",
  157. data: &xsql.Tuple{
  158. Emitter: "test",
  159. Message: xsql.Message{
  160. "temperature": 43.2,
  161. "rec": map[string]interface{}(nil),
  162. },
  163. },
  164. result: []map[string]interface{}{{
  165. "r": true,
  166. }},
  167. },
  168. }
  169. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  170. contextLogger := common.Log.WithField("rule", "TestMiscFunc_Apply1")
  171. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  172. for i, tt := range tests {
  173. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  174. if err != nil || stmt == nil {
  175. t.Errorf("parse sql %s error %v", tt.sql, err)
  176. }
  177. pp := &ProjectPlan{Fields: stmt.Fields}
  178. pp.isTest = true
  179. fv, afv := xsql.NewAggregateFunctionValuers()
  180. result := pp.Apply(ctx, tt.data, fv, afv)
  181. var mapRes []map[string]interface{}
  182. if v, ok := result.([]byte); ok {
  183. err := json.Unmarshal(v, &mapRes)
  184. if err != nil {
  185. t.Errorf("Failed to parse the input into map.\n")
  186. continue
  187. }
  188. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  189. if !reflect.DeepEqual(tt.result, mapRes) {
  190. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  191. }
  192. } else {
  193. t.Errorf("The returned result is not type of []byte\n")
  194. }
  195. }
  196. }
  197. func TestMqttFunc_Apply2(t *testing.T) {
  198. var tests = []struct {
  199. sql string
  200. data xsql.JoinTupleSets
  201. result []map[string]interface{}
  202. }{
  203. {
  204. sql: "SELECT id1, mqtt(src1.topic) AS a, mqtt(src2.topic) as b FROM src1 LEFT JOIN src2 ON src1.id1 = src2.id1",
  205. data: xsql.JoinTupleSets{
  206. xsql.JoinTuple{
  207. Tuples: []xsql.Tuple{
  208. {Emitter: "src1", Message: xsql.Message{"id1": "1", "f1": "v1"}, Metadata: xsql.Metadata{"topic": "devices/type1/device001"}},
  209. {Emitter: "src2", Message: xsql.Message{"id2": "1", "f2": "w1"}, Metadata: xsql.Metadata{"topic": "devices/type2/device001"}},
  210. },
  211. },
  212. },
  213. result: []map[string]interface{}{{
  214. "id1": "1",
  215. "a": "devices/type1/device001",
  216. "b": "devices/type2/device001",
  217. }},
  218. },
  219. }
  220. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  221. contextLogger := common.Log.WithField("rule", "TestMqttFunc_Apply2")
  222. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  223. for i, tt := range tests {
  224. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  225. if err != nil || stmt == nil {
  226. t.Errorf("parse sql %s error %v", tt.sql, err)
  227. }
  228. pp := &ProjectPlan{Fields: stmt.Fields}
  229. pp.isTest = true
  230. fv, afv := xsql.NewAggregateFunctionValuers()
  231. result := pp.Apply(ctx, tt.data, fv, afv)
  232. var mapRes []map[string]interface{}
  233. if v, ok := result.([]byte); ok {
  234. err := json.Unmarshal(v, &mapRes)
  235. if err != nil {
  236. t.Errorf("Failed to parse the input into map.\n")
  237. continue
  238. }
  239. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  240. if !reflect.DeepEqual(tt.result, mapRes) {
  241. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  242. }
  243. } else {
  244. t.Errorf("The returned result is not type of []byte\n")
  245. }
  246. }
  247. }
  248. func TestMetaFunc_Apply1(t *testing.T) {
  249. var tests = []struct {
  250. sql string
  251. data interface{}
  252. result interface{}
  253. }{
  254. {
  255. sql: "SELECT topic, meta(topic) AS a FROM test",
  256. data: &xsql.Tuple{
  257. Emitter: "test",
  258. Message: xsql.Message{
  259. "topic": "fff",
  260. },
  261. Metadata: xsql.Metadata{
  262. "topic": "devices/device_001/message",
  263. },
  264. },
  265. result: []map[string]interface{}{{
  266. "topic": "fff",
  267. "a": "devices/device_001/message",
  268. }},
  269. },
  270. {
  271. sql: "SELECT meta(device) as d, meta(temperature->device) as r FROM test",
  272. data: &xsql.Tuple{
  273. Emitter: "test",
  274. Message: xsql.Message{
  275. "temperature": 43.2,
  276. },
  277. Metadata: xsql.Metadata{
  278. "temperature": map[string]interface{}{
  279. "id": "dfadfasfas",
  280. "device": "device2",
  281. },
  282. "device": "gateway",
  283. },
  284. },
  285. result: []map[string]interface{}{{
  286. "d": "gateway",
  287. "r": "device2",
  288. }},
  289. },
  290. {
  291. sql: "SELECT meta(*) as r FROM test",
  292. data: &xsql.Tuple{
  293. Emitter: "test",
  294. Message: xsql.Message{
  295. "temperature": 43.2,
  296. },
  297. Metadata: xsql.Metadata{
  298. "temperature": map[string]interface{}{
  299. "id": "dfadfasfas",
  300. "device": "device2",
  301. },
  302. "device": "gateway",
  303. },
  304. },
  305. result: []map[string]interface{}{{
  306. "r": map[string]interface{}{
  307. "temperature": map[string]interface{}{
  308. "id": "dfadfasfas",
  309. "device": "device2",
  310. },
  311. "device": "gateway",
  312. },
  313. }},
  314. },
  315. }
  316. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  317. contextLogger := common.Log.WithField("rule", "TestMetaFunc_Apply1")
  318. ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
  319. for i, tt := range tests {
  320. if i != 2 {
  321. continue
  322. }
  323. stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
  324. if err != nil || stmt == nil {
  325. t.Errorf("parse sql %s error %v", tt.sql, err)
  326. }
  327. pp := &ProjectPlan{Fields: stmt.Fields}
  328. pp.isTest = true
  329. fv, afv := xsql.NewAggregateFunctionValuers()
  330. result := pp.Apply(ctx, tt.data, fv, afv)
  331. var mapRes []map[string]interface{}
  332. if v, ok := result.([]byte); ok {
  333. err := json.Unmarshal(v, &mapRes)
  334. if err != nil {
  335. t.Errorf("Failed to parse the input into map.\n")
  336. continue
  337. }
  338. //fmt.Printf("%t\n", mapRes["rengine_field_0"])
  339. if !reflect.DeepEqual(tt.result, mapRes) {
  340. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
  341. }
  342. } else {
  343. t.Errorf("The returned result is not type of []byte\n")
  344. }
  345. }
  346. }