1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- package main
- import (
- "engine/common"
- "engine/xsql"
- "engine/xsql/plans"
- "engine/xstream"
- "engine/xstream/collectors"
- "engine/xstream/extensions"
- "engine/xstream/operators"
- "strings"
- )
- func main() {
- log := common.Log
- demo1Stream, err := xsql.NewParser(strings.NewReader("CREATE STREAM demo (count bigint) WITH (datasource=\"demo\", FORMAT=\"AVRO\", KEY=\"USERID\")")).ParseCreateStreamStmt()
- //demo2Stream, err := xsql.NewParser(strings.NewReader("CREATE STREAM demo2 (abc bigint) WITH (datasource=\"demo2\", FORMAT=\"AVRO\", KEY=\"USERID\")")).ParseCreateStreamStmt()
- //stmt, err := xsql.NewParser(strings.NewReader("SELECT count FROM demo1 where demo1.count > 3")).Parse()
- if err != nil {
- log.Fatal("Failed to parse SQL for %s. \n", err)
- }
- tp := xstream.New()
- mqs1, err := extensions.NewWithName("srv1", "demo", "")
- if err != nil {
- log.Fatalf("Found error %s.\n", err)
- return
- }
- tp.AddSrc(mqs1)
- //mqs2, err := extensions.NewWithName("srv1", "demo2")
- //if err != nil {
- // log.Fatalf("Found error %s.\n", err)
- // return
- //}
- //tp.AddSrc(mqs2)
- preprocessorOp1 := xstream.Transform(&plans.Preprocessor{StreamStmt: demo1Stream}, "preprocessor1")
- tp.AddOperator([]xstream.Emitter{mqs1}, preprocessorOp1)
- //preprocessorOp2 := xstream.Transform(&plans.Preprocessor{StreamStmt: demo2Stream}, "preprocessor2")
- //tp.AddOperator([]xstream.Emitter{mqs2}, preprocessorOp2)
- //filterOp := xstream.Transform(&plans.FilterPlan{Condition: stmt.Condition}, "filter plan")
- //filterOp.SetConcurrency(3)
- //tp.AddOperator([]xstream.Emitter{preprocessorOp1, preprocessorOp2}, filterOp)
- //
- //projectOp := xstream.Transform(&plans.ProjectPlan{Fields: stmt.Fields}, "project plan")
- //tp.AddOperator([]xstream.Emitter{filterOp}, projectOp)
- //windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
- // Type: operators.NO_WINDOW,
- //})
- //windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
- // Type: operators.TUMBLING_WINDOW,
- // Length: 30000,
- //})
- //windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
- // Type: operators.HOPPING_WINDOW,
- // Length: 20000,
- // Interval: 10000,
- //})
- //
- //windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
- // Type: operators.SLIDING_WINDOW,
- // Length: 20000,
- //})
- windowOp := operators.NewWindowOp("windowOp", &operators.WindowConfig{
- Type: operators.SESSION_WINDOW,
- Length: 20000,
- Interval: 6000,
- })
- tp.AddOperator([]xstream.Emitter{preprocessorOp1}, windowOp)
- tp.AddSink([]xstream.Emitter{windowOp}, collectors.Func(func(data interface{}) error {
- log.Println("sink result %s", data)
- return nil
- }))
- if err := <-tp.Open(); err != nil {
- log.Fatal(err)
- return
- }
- }
|