123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- package main
import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/emqx/kuiper/xstream/api"
)
// randomSource is a stream source that periodically emits a map built from a
// configured pattern, adding a random offset to every pattern value.
type randomSource struct {
	// interval is the period between two emitted messages, in milliseconds.
	interval int
	// seed is passed to rand.Intn as the exclusive upper bound of the offset
	// added to each pattern value.
	seed int
	// pattern maps each emitted field name to its base value.
	pattern map[string]interface{}
	// cancel stops the emitting goroutine; it is nil until Open has run.
	cancel context.CancelFunc
}
- func (s *randomSource) Configure(topic string, props map[string]interface{}) error{
- if i, ok := props["interval"].(float64); ok{
- s.interval = int(i)
- }else{
- s.interval = 1000
- }
- if p, ok := props["pattern"].(map[string]interface{}); ok{
- s.pattern = p
- }else{
- s.pattern = make(map[string]interface{})
- s.pattern["count"] = 50
- }
- if i, ok := props["seed"].(float64); ok{
- s.seed = int(i)
- }else{
- s.seed = 1
- }
- return nil
- }
- func (s *randomSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) {
- t := time.NewTicker(time.Duration(s.interval) * time.Millisecond)
- exeCtx, cancel := ctx.WithCancel()
- s.cancel = cancel
- go func(exeCtx api.StreamContext){
- defer t.Stop()
- for{
- select{
- case <- t.C:
- consume(randomize(s.pattern, s.seed), nil)
- case <- exeCtx.Done():
- return
- }
- }
- }(exeCtx)
- return nil
- }
// randomize returns a new map with the same keys as p. Every numeric value
// (int, int64 or float64) is converted to int and increased by a random offset
// in [0, seed). Values of any other type are copied unchanged instead of
// causing a panic. p itself is not modified.
//
// A seed that is not positive yields an offset of 0, because rand.Intn panics
// when its argument is <= 0.
func randomize(p map[string]interface{}, seed int) map[string]interface{} {
	r := make(map[string]interface{}, len(p))
	for k, v := range p {
		var base int
		switch n := v.(type) {
		case int:
			base = n
		case int64:
			base = int(n)
		case float64:
			// Numbers decoded by encoding/json arrive as float64.
			base = int(n)
		default:
			r[k] = v
			continue
		}

		offset := 0
		if seed > 0 {
			offset = rand.Intn(seed)
		}
		r[k] = base + offset
	}
	return r
}
- func (s *randomSource) Close(ctx api.StreamContext) error{
- if s.cancel != nil{
- s.cancel()
- }
- return nil
- }
- var Random randomSource
|