package main import ( "context" "github.com/emqx/kuiper/xstream/api" "math/rand" "time" ) //Emit data randomly with only a string field type randomSource struct { interval int seed int pattern map[string]interface{} cancel context.CancelFunc } func (s *randomSource) Configure(topic string, props map[string]interface{}) error { if i, ok := props["interval"].(float64); ok { s.interval = int(i) } else { s.interval = 1000 } if p, ok := props["pattern"].(map[string]interface{}); ok { s.pattern = p } else { s.pattern = make(map[string]interface{}) s.pattern["count"] = 50 } if i, ok := props["seed"].(float64); ok { s.seed = int(i) } else { s.seed = 1 } return nil } func (s *randomSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) { t := time.NewTicker(time.Duration(s.interval) * time.Millisecond) exeCtx, cancel := ctx.WithCancel() s.cancel = cancel go func(exeCtx api.StreamContext) { defer t.Stop() for { select { case <-t.C: consume(randomize(s.pattern, s.seed), nil) case <-exeCtx.Done(): return } } }(exeCtx) return nil } func randomize(p map[string]interface{}, seed int) map[string]interface{} { r := make(map[string]interface{}) for k, v := range p { vi := v.(int) r[k] = vi + rand.Intn(seed) } return r } func (s *randomSource) Close(ctx api.StreamContext) error { if s.cancel != nil { s.cancel() } return nil } var Random randomSource