extension_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. package processors
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/emqx/kuiper/common"
  7. "github.com/emqx/kuiper/xsql"
  8. "github.com/emqx/kuiper/xstream"
  9. "github.com/emqx/kuiper/xstream/api"
  10. "github.com/emqx/kuiper/xstream/nodes"
  11. "github.com/emqx/kuiper/xstream/test"
  12. "os"
  13. "path"
  14. "reflect"
  15. "strings"
  16. "testing"
  17. "time"
  18. )
// These tests cannot be run on Windows, and the plugins must be built into .so files before running them.
// On Windows, run them in WSL with: go test xsql/processors/extension_test.go xsql/processors/xsql_processor.go
  21. func setup() *RuleProcessor {
  22. log := common.Log
  23. os.Remove(CACHE_FILE)
  24. dbDir, err := common.GetAndCreateDataLoc("test")
  25. if err != nil {
  26. log.Panic(err)
  27. }
  28. log.Infof("db location is %s", dbDir)
  29. p := NewStreamProcessor(path.Join(dbDir, "stream"))
  30. demo := `DROP STREAM ext`
  31. p.ExecStmt(demo)
  32. demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
  33. _, err = p.ExecStmt(demo)
  34. if err != nil {
  35. panic(err)
  36. }
  37. demo = `DROP STREAM ext2`
  38. p.ExecStmt(demo)
  39. demo = "CREATE STREAM ext2 (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"dedup\")"
  40. _, err = p.ExecStmt(demo)
  41. if err != nil {
  42. panic(err)
  43. }
  44. rp := NewRuleProcessor(dbDir)
  45. return rp
  46. }
  47. var CACHE_FILE = "cache"
  48. //Test for source, sink, func and agg func extensions
  49. //The .so files must be in the plugins folder
  50. func TestExtensions(t *testing.T) {
  51. log := common.Log
  52. var tests = []struct {
  53. name string
  54. rj string
  55. minLength int
  56. maxLength int
  57. }{
  58. {
  59. name: `$$test1`,
  60. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  61. minLength: 5,
  62. }, {
  63. name: `$$test2`,
  64. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext2\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  65. maxLength: 2,
  66. },
  67. }
  68. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  69. rp := setup()
  70. done := make(chan struct{})
  71. defer close(done)
  72. for i, tt := range tests {
  73. rp.ExecDrop(tt.name)
  74. rs, err := rp.ExecCreate(tt.name, tt.rj)
  75. if err != nil {
  76. t.Errorf("failed to create rule: %s.", err)
  77. continue
  78. }
  79. os.Create(CACHE_FILE)
  80. tp, err := rp.ExecInitRule(rs)
  81. if err != nil {
  82. t.Errorf("fail to init rule: %v", err)
  83. continue
  84. }
  85. go func() {
  86. select {
  87. case err := <-tp.Open():
  88. log.Println(err)
  89. tp.Cancel()
  90. case <-time.After(900 * time.Millisecond):
  91. tp.Cancel()
  92. }
  93. }()
  94. time.Sleep(1000 * time.Millisecond)
  95. log.Printf("exit main program after a second")
  96. results := getResults()
  97. log.Infof("get results %v", results)
  98. os.Remove(CACHE_FILE)
  99. var maps [][]map[string]interface{}
  100. for _, v := range results {
  101. var mapRes []map[string]interface{}
  102. err := json.Unmarshal([]byte(v), &mapRes)
  103. if err != nil {
  104. t.Errorf("Failed to parse the input into map")
  105. continue
  106. }
  107. maps = append(maps, mapRes)
  108. }
  109. if tt.minLength > 0 {
  110. if len(maps) < tt.minLength {
  111. t.Errorf("%d. %q\n\nresult length is smaller than minlength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  112. break
  113. }
  114. }
  115. if tt.maxLength > 0 {
  116. if len(maps) > tt.maxLength {
  117. t.Errorf("%d. %q\n\nresult length is bigger than maxLength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  118. break
  119. }
  120. }
  121. for _, r := range maps {
  122. if len(r) != 1 {
  123. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  124. break
  125. }
  126. r := r[0]
  127. c := int((r["c"]).(float64))
  128. if c != 1 {
  129. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  130. break
  131. }
  132. e := int((r["e"]).(float64))
  133. if e != 50 && e != 51 {
  134. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  135. break
  136. }
  137. p := int(r["p"].(float64))
  138. if p != 2 {
  139. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  140. break
  141. }
  142. }
  143. }
  144. }
  145. func getResults() []string {
  146. f, err := os.Open(CACHE_FILE)
  147. if err != nil {
  148. panic(err)
  149. }
  150. result := make([]string, 0)
  151. scanner := bufio.NewScanner(f)
  152. for scanner.Scan() {
  153. result = append(result, scanner.Text())
  154. }
  155. if err := scanner.Err(); err != nil {
  156. panic(err)
  157. }
  158. f.Close()
  159. return result
  160. }
  161. func getExtMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
  162. var data []*xsql.Tuple
  163. switch name {
  164. case "text":
  165. data = []*xsql.Tuple{
  166. {
  167. Emitter: name,
  168. Message: map[string]interface{}{
  169. "slogan": "Impossible is nothing",
  170. "brand": "Adidas",
  171. },
  172. },
  173. {
  174. Emitter: name,
  175. Message: map[string]interface{}{
  176. "slogan": "Stronger than dirt",
  177. "brand": "Ajax",
  178. },
  179. },
  180. {
  181. Emitter: name,
  182. Message: map[string]interface{}{
  183. "slogan": "Belong anywhere",
  184. "brand": "Airbnb",
  185. },
  186. },
  187. {
  188. Emitter: name,
  189. Message: map[string]interface{}{
  190. "slogan": "I can't believe I ate the whole thing",
  191. "brand": "Alka Seltzer",
  192. },
  193. },
  194. {
  195. Emitter: name,
  196. Message: map[string]interface{}{
  197. "slogan": "You're in good hands",
  198. "brand": "Allstate",
  199. },
  200. },
  201. {
  202. Emitter: name,
  203. Message: map[string]interface{}{
  204. "slogan": "Don't leave home without it",
  205. "brand": "American Express",
  206. },
  207. },
  208. {
  209. Emitter: name,
  210. Message: map[string]interface{}{
  211. "slogan": "Think different",
  212. "brand": "Apple",
  213. },
  214. },
  215. {
  216. Emitter: name,
  217. Message: map[string]interface{}{
  218. "slogan": "We try harder",
  219. "brand": "Avis",
  220. },
  221. },
  222. }
  223. }
  224. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
  225. "DATASOURCE": name,
  226. })
  227. }
  228. func setup2() *RuleProcessor {
  229. log := common.Log
  230. dbDir, err := common.GetAndCreateDataLoc("test")
  231. if err != nil {
  232. log.Panic(err)
  233. }
  234. log.Infof("db location is %s", dbDir)
  235. p := NewStreamProcessor(path.Join(dbDir, "stream"))
  236. demo := `DROP STREAM text`
  237. p.ExecStmt(demo)
  238. demo = "CREATE STREAM text (slogan string, brand string) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
  239. _, err = p.ExecStmt(demo)
  240. if err != nil {
  241. panic(err)
  242. }
  243. rp := NewRuleProcessor(dbDir)
  244. return rp
  245. }
  246. func TestFuncState(t *testing.T) {
  247. var tests = []struct {
  248. name string
  249. sql string
  250. r [][]map[string]interface{}
  251. s string
  252. m map[string]interface{}
  253. }{
  254. {
  255. name: `rule1`,
  256. sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  257. r: [][]map[string]interface{}{
  258. {{
  259. "wc": float64(3),
  260. }},
  261. {{
  262. "wc": float64(6),
  263. }},
  264. {{
  265. "wc": float64(8),
  266. }},
  267. {{
  268. "wc": float64(16),
  269. }},
  270. {{
  271. "wc": float64(20),
  272. }},
  273. {{
  274. "wc": float64(25),
  275. }},
  276. {{
  277. "wc": float64(27),
  278. }},
  279. {{
  280. "wc": float64(30),
  281. }},
  282. },
  283. m: map[string]interface{}{
  284. "op_preprocessor_text_0_exceptions_total": int64(0),
  285. "op_preprocessor_text_0_process_latency_ms": int64(0),
  286. "op_preprocessor_text_0_records_in_total": int64(8),
  287. "op_preprocessor_text_0_records_out_total": int64(8),
  288. "op_project_0_exceptions_total": int64(0),
  289. "op_project_0_process_latency_ms": int64(0),
  290. "op_project_0_records_in_total": int64(8),
  291. "op_project_0_records_out_total": int64(8),
  292. "sink_mockSink_0_exceptions_total": int64(0),
  293. "sink_mockSink_0_records_in_total": int64(8),
  294. "sink_mockSink_0_records_out_total": int64(8),
  295. "source_text_0_exceptions_total": int64(0),
  296. "source_text_0_records_in_total": int64(8),
  297. "source_text_0_records_out_total": int64(8),
  298. },
  299. s: "sink_mockSink_0_records_out_total",
  300. },
  301. }
  302. p := setup2()
  303. for i, tt := range tests {
  304. p.ExecDrop(tt.name)
  305. parser := xsql.NewParser(strings.NewReader(tt.sql))
  306. var (
  307. sources []*nodes.SourceNode
  308. syncs []chan int
  309. )
  310. if stmt, err := xsql.Language.Parse(parser); err != nil {
  311. t.Errorf("parse sql %s error: %s", tt.sql, err)
  312. } else {
  313. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  314. t.Errorf("sql %s is not a select statement", tt.sql)
  315. } else {
  316. streams := xsql.GetStreams(selectStmt)
  317. for _, stream := range streams {
  318. next := make(chan int)
  319. syncs = append(syncs, next)
  320. source := getExtMockSource(stream, next, 8)
  321. sources = append(sources, source)
  322. }
  323. }
  324. }
  325. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: map[string]interface{}{
  326. "bufferLength": float64(100),
  327. }}, sources)
  328. if err != nil {
  329. t.Error(err)
  330. }
  331. mockSink := test.NewMockSink()
  332. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  333. tp.AddSink(inputs, sink)
  334. errCh := tp.Open()
  335. func() {
  336. for i := 0; i < 8; i++ {
  337. syncs[i%len(syncs)] <- i
  338. select {
  339. case err = <-errCh:
  340. t.Log(err)
  341. tp.Cancel()
  342. return
  343. default:
  344. }
  345. }
  346. for retry := 100; retry > 0; retry-- {
  347. if err := compareMetrics2(tp, tt.m, tt.sql); err == nil {
  348. break
  349. }
  350. time.Sleep(time.Duration(retry) * time.Millisecond)
  351. }
  352. }()
  353. results := mockSink.GetResults()
  354. var maps [][]map[string]interface{}
  355. for _, v := range results {
  356. var mapRes []map[string]interface{}
  357. err := json.Unmarshal(v, &mapRes)
  358. if err != nil {
  359. t.Errorf("Failed to parse the input into map")
  360. continue
  361. }
  362. maps = append(maps, mapRes)
  363. }
  364. if !reflect.DeepEqual(tt.r, maps) {
  365. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  366. continue
  367. }
  368. if err := compareMetrics2(tp, tt.m, tt.sql); err != nil {
  369. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  370. }
  371. tp.Cancel()
  372. }
  373. }
  374. func compareMetrics2(tp *xstream.TopologyNew, m map[string]interface{}, sql string) (err error) {
  375. keys, values := tp.GetMetrics()
  376. //for i, k := range keys {
  377. // log.Printf("%s:%v", k, values[i])
  378. //}
  379. for k, v := range m {
  380. var (
  381. index int
  382. key string
  383. matched bool
  384. )
  385. for index, key = range keys {
  386. if k == key {
  387. if strings.HasSuffix(k, "process_latency_ms") {
  388. if values[index].(int64) >= v.(int64) {
  389. matched = true
  390. continue
  391. } else {
  392. break
  393. }
  394. }
  395. if values[index] == v {
  396. matched = true
  397. }
  398. break
  399. }
  400. }
  401. if matched {
  402. continue
  403. }
  404. //do not find
  405. if index < len(values) {
  406. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
  407. } else {
  408. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
  409. }
  410. }
  411. return nil
  412. }