extension_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. // +build !windows
  2. package processors
  3. import (
  4. "bufio"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/emqx/kuiper/common"
  8. "github.com/emqx/kuiper/xsql"
  9. "github.com/emqx/kuiper/xstream"
  10. "github.com/emqx/kuiper/xstream/api"
  11. "github.com/emqx/kuiper/xstream/nodes"
  12. "github.com/emqx/kuiper/xstream/test"
  13. "os"
  14. "path"
  15. "reflect"
  16. "strings"
  17. "testing"
  18. "time"
  19. )
//This cannot be run on Windows. The plugins must be built into .so files before running this.
//On Windows, run it in WSL with go test xsql/processors/extension_test.go xsql/processors/xsql_processor.go
  22. func setup() *RuleProcessor {
  23. log := common.Log
  24. os.Remove(CACHE_FILE)
  25. dbDir, err := common.GetDataLoc()
  26. if err != nil {
  27. log.Panic(err)
  28. }
  29. log.Infof("db location is %s", dbDir)
  30. p := NewStreamProcessor(path.Join(dbDir, "stream"))
  31. demo := `DROP STREAM ext`
  32. p.ExecStmt(demo)
  33. demo = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
  34. _, err = p.ExecStmt(demo)
  35. if err != nil {
  36. panic(err)
  37. }
  38. demo = `DROP STREAM ext2`
  39. p.ExecStmt(demo)
  40. demo = "CREATE STREAM ext2 (count bigint) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"dedup\")"
  41. _, err = p.ExecStmt(demo)
  42. if err != nil {
  43. panic(err)
  44. }
  45. rp := NewRuleProcessor(dbDir)
  46. return rp
  47. }
// CACHE_FILE is the path of the file used as the rule sink in TestExtensions:
// setup removes it, the test creates and removes it around each case, and
// getResults reads the rule output back from it.
var CACHE_FILE = "cache"
  49. //Test for source, sink, func and agg func extensions
  50. //The .so files must be in the plugins folder
  51. func TestExtensions(t *testing.T) {
  52. log := common.Log
  53. var tests = []struct {
  54. name string
  55. rj string
  56. minLength int
  57. maxLength int
  58. }{
  59. {
  60. name: `$$test1`,
  61. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext where count > 49\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  62. minLength: 5,
  63. }, {
  64. name: `$$test2`,
  65. rj: "{\"sql\": \"SELECT count(echo(count)) as c, echo(count) as e, countPlusOne(count) as p FROM ext2\",\"actions\": [{\"file\": {\"path\":\"" + CACHE_FILE + "\"}}]}",
  66. maxLength: 2,
  67. },
  68. }
  69. fmt.Printf("The test bucket size is %d.\n\n", len(tests))
  70. rp := setup()
  71. done := make(chan struct{})
  72. defer close(done)
  73. for i, tt := range tests {
  74. rp.ExecDrop(tt.name)
  75. rs, err := rp.ExecCreate(tt.name, tt.rj)
  76. if err != nil {
  77. t.Errorf("failed to create rule: %s.", err)
  78. continue
  79. }
  80. os.Create(CACHE_FILE)
  81. tp, err := rp.ExecInitRule(rs)
  82. if err != nil {
  83. t.Errorf("fail to init rule: %v", err)
  84. continue
  85. }
  86. go func() {
  87. select {
  88. case err := <-tp.Open():
  89. log.Println(err)
  90. tp.Cancel()
  91. case <-time.After(900 * time.Millisecond):
  92. tp.Cancel()
  93. }
  94. }()
  95. time.Sleep(1000 * time.Millisecond)
  96. log.Printf("exit main program after a second")
  97. results := getResults()
  98. log.Infof("get results %v", results)
  99. os.Remove(CACHE_FILE)
  100. var maps [][]map[string]interface{}
  101. for _, v := range results {
  102. var mapRes []map[string]interface{}
  103. err := json.Unmarshal([]byte(v), &mapRes)
  104. if err != nil {
  105. t.Errorf("Failed to parse the input into map")
  106. continue
  107. }
  108. maps = append(maps, mapRes)
  109. }
  110. if tt.minLength > 0 {
  111. if len(maps) < tt.minLength {
  112. t.Errorf("%d. %q\n\nresult length is smaller than minlength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  113. break
  114. }
  115. }
  116. if tt.maxLength > 0 {
  117. if len(maps) > tt.maxLength {
  118. t.Errorf("%d. %q\n\nresult length is bigger than maxLength:\n\ngot=%#v\n\n", i, tt.rj, maps)
  119. break
  120. }
  121. }
  122. for _, r := range maps {
  123. if len(r) != 1 {
  124. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  125. break
  126. }
  127. r := r[0]
  128. c := int((r["c"]).(float64))
  129. if c != 1 {
  130. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  131. break
  132. }
  133. e := int((r["e"]).(float64))
  134. if e != 50 && e != 51 {
  135. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  136. break
  137. }
  138. p := int(r["p"].(float64))
  139. if p != 2 {
  140. t.Errorf("%d. %q\n\nresult mismatch:\n\ngot=%#v\n\n", i, tt.rj, maps)
  141. break
  142. }
  143. }
  144. }
  145. }
  146. func getResults() []string {
  147. f, err := os.Open(CACHE_FILE)
  148. if err != nil {
  149. panic(err)
  150. }
  151. result := make([]string, 0)
  152. scanner := bufio.NewScanner(f)
  153. for scanner.Scan() {
  154. result = append(result, scanner.Text())
  155. }
  156. if err := scanner.Err(); err != nil {
  157. panic(err)
  158. }
  159. f.Close()
  160. return result
  161. }
  162. func getExtMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
  163. var data []*xsql.Tuple
  164. switch name {
  165. case "text":
  166. data = []*xsql.Tuple{
  167. {
  168. Emitter: name,
  169. Message: map[string]interface{}{
  170. "slogan": "Impossible is nothing",
  171. "brand": "Adidas",
  172. },
  173. },
  174. {
  175. Emitter: name,
  176. Message: map[string]interface{}{
  177. "slogan": "Stronger than dirt",
  178. "brand": "Ajax",
  179. },
  180. },
  181. {
  182. Emitter: name,
  183. Message: map[string]interface{}{
  184. "slogan": "Belong anywhere",
  185. "brand": "Airbnb",
  186. },
  187. },
  188. {
  189. Emitter: name,
  190. Message: map[string]interface{}{
  191. "slogan": "I can't believe I ate the whole thing",
  192. "brand": "Alka Seltzer",
  193. },
  194. },
  195. {
  196. Emitter: name,
  197. Message: map[string]interface{}{
  198. "slogan": "You're in good hands",
  199. "brand": "Allstate",
  200. },
  201. },
  202. {
  203. Emitter: name,
  204. Message: map[string]interface{}{
  205. "slogan": "Don't leave home without it",
  206. "brand": "American Express",
  207. },
  208. },
  209. {
  210. Emitter: name,
  211. Message: map[string]interface{}{
  212. "slogan": "Think different",
  213. "brand": "Apple",
  214. },
  215. },
  216. {
  217. Emitter: name,
  218. Message: map[string]interface{}{
  219. "slogan": "We try harder",
  220. "brand": "Avis",
  221. },
  222. },
  223. }
  224. }
  225. return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
  226. "DATASOURCE": name,
  227. })
  228. }
  229. func setup2() *RuleProcessor {
  230. log := common.Log
  231. dbDir, err := common.GetDataLoc()
  232. if err != nil {
  233. log.Panic(err)
  234. }
  235. log.Infof("db location is %s", dbDir)
  236. p := NewStreamProcessor(path.Join(dbDir, "stream"))
  237. demo := `DROP STREAM text`
  238. p.ExecStmt(demo)
  239. demo = "CREATE STREAM text (slogan string, brand string) WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"
  240. _, err = p.ExecStmt(demo)
  241. if err != nil {
  242. panic(err)
  243. }
  244. rp := NewRuleProcessor(dbDir)
  245. return rp
  246. }
  247. func TestFuncState(t *testing.T) {
  248. var tests = []struct {
  249. name string
  250. sql string
  251. r [][]map[string]interface{}
  252. s string
  253. m map[string]interface{}
  254. }{
  255. {
  256. name: `rule1`,
  257. sql: `SELECT accumulateWordCount(slogan, " ") as wc FROM text`,
  258. r: [][]map[string]interface{}{
  259. {{
  260. "wc": float64(3),
  261. }},
  262. {{
  263. "wc": float64(6),
  264. }},
  265. {{
  266. "wc": float64(8),
  267. }},
  268. {{
  269. "wc": float64(16),
  270. }},
  271. {{
  272. "wc": float64(20),
  273. }},
  274. {{
  275. "wc": float64(25),
  276. }},
  277. {{
  278. "wc": float64(27),
  279. }},
  280. {{
  281. "wc": float64(30),
  282. }},
  283. },
  284. m: map[string]interface{}{
  285. "op_preprocessor_text_0_exceptions_total": int64(0),
  286. "op_preprocessor_text_0_process_latency_ms": int64(0),
  287. "op_preprocessor_text_0_records_in_total": int64(8),
  288. "op_preprocessor_text_0_records_out_total": int64(8),
  289. "op_project_0_exceptions_total": int64(0),
  290. "op_project_0_process_latency_ms": int64(0),
  291. "op_project_0_records_in_total": int64(8),
  292. "op_project_0_records_out_total": int64(8),
  293. "sink_mockSink_0_exceptions_total": int64(0),
  294. "sink_mockSink_0_records_in_total": int64(8),
  295. "sink_mockSink_0_records_out_total": int64(8),
  296. "source_text_0_exceptions_total": int64(0),
  297. "source_text_0_records_in_total": int64(8),
  298. "source_text_0_records_out_total": int64(8),
  299. },
  300. s: "sink_mockSink_0_records_out_total",
  301. },
  302. }
  303. p := setup2()
  304. for i, tt := range tests {
  305. p.ExecDrop(tt.name)
  306. parser := xsql.NewParser(strings.NewReader(tt.sql))
  307. var (
  308. sources []*nodes.SourceNode
  309. syncs []chan int
  310. )
  311. if stmt, err := xsql.Language.Parse(parser); err != nil {
  312. t.Errorf("parse sql %s error: %s", tt.sql, err)
  313. } else {
  314. if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
  315. t.Errorf("sql %s is not a select statement", tt.sql)
  316. } else {
  317. streams := xsql.GetStreams(selectStmt)
  318. for _, stream := range streams {
  319. next := make(chan int)
  320. syncs = append(syncs, next)
  321. source := getExtMockSource(stream, next, 8)
  322. sources = append(sources, source)
  323. }
  324. }
  325. }
  326. tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: &api.RuleOption{
  327. BufferLength: 100,
  328. }}, sources)
  329. if err != nil {
  330. t.Error(err)
  331. }
  332. mockSink := test.NewMockSink()
  333. sink := nodes.NewSinkNodeWithSink("mockSink", mockSink, nil)
  334. tp.AddSink(inputs, sink)
  335. errCh := tp.Open()
  336. func() {
  337. for i := 0; i < 8; i++ {
  338. syncs[i%len(syncs)] <- i
  339. select {
  340. case err = <-errCh:
  341. t.Log(err)
  342. tp.Cancel()
  343. return
  344. default:
  345. }
  346. }
  347. for retry := 100; retry > 0; retry-- {
  348. if err := compareMetrics2(tp, tt.m, tt.sql); err == nil {
  349. break
  350. }
  351. time.Sleep(time.Duration(retry) * time.Millisecond)
  352. }
  353. }()
  354. results := mockSink.GetResults()
  355. var maps [][]map[string]interface{}
  356. for _, v := range results {
  357. var mapRes []map[string]interface{}
  358. err := json.Unmarshal(v, &mapRes)
  359. if err != nil {
  360. t.Errorf("Failed to parse the input into map")
  361. continue
  362. }
  363. maps = append(maps, mapRes)
  364. }
  365. if !reflect.DeepEqual(tt.r, maps) {
  366. t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
  367. continue
  368. }
  369. if err := compareMetrics2(tp, tt.m, tt.sql); err != nil {
  370. t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
  371. }
  372. tp.Cancel()
  373. }
  374. }
  375. func compareMetrics2(tp *xstream.TopologyNew, m map[string]interface{}, sql string) (err error) {
  376. keys, values := tp.GetMetrics()
  377. //for i, k := range keys {
  378. // log.Printf("%s:%v", k, values[i])
  379. //}
  380. for k, v := range m {
  381. var (
  382. index int
  383. key string
  384. matched bool
  385. )
  386. for index, key = range keys {
  387. if k == key {
  388. if strings.HasSuffix(k, "process_latency_ms") {
  389. if values[index].(int64) >= v.(int64) {
  390. matched = true
  391. continue
  392. } else {
  393. break
  394. }
  395. }
  396. if values[index] == v {
  397. matched = true
  398. }
  399. break
  400. }
  401. }
  402. if matched {
  403. continue
  404. }
  405. //do not find
  406. if index < len(values) {
  407. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
  408. } else {
  409. return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v\n\ngot=nil\n\n", k, v)
  410. }
  411. }
  412. return nil
  413. }