random.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package main
  2. import (
  3. "context"
  4. "github.com/emqx/kuiper/xstream/api"
  5. "math/rand"
  6. "time"
  7. )
  8. //Emit data randomly with only a string field
  9. type randomSource struct {
  10. interval int
  11. seed int
  12. pattern map[string]interface{}
  13. cancel context.CancelFunc
  14. }
  15. func (s *randomSource) Configure(topic string, props map[string]interface{}) error{
  16. if i, ok := props["interval"].(float64); ok{
  17. s.interval = int(i)
  18. }else{
  19. s.interval = 1000
  20. }
  21. if p, ok := props["pattern"].(map[string]interface{}); ok{
  22. s.pattern = p
  23. }else{
  24. s.pattern = make(map[string]interface{})
  25. s.pattern["count"] = 50
  26. }
  27. if i, ok := props["seed"].(float64); ok{
  28. s.seed = int(i)
  29. }else{
  30. s.seed = 1
  31. }
  32. return nil
  33. }
  34. func (s *randomSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) {
  35. t := time.NewTicker(time.Duration(s.interval) * time.Millisecond)
  36. exeCtx, cancel := ctx.WithCancel()
  37. s.cancel = cancel
  38. go func(exeCtx api.StreamContext){
  39. defer t.Stop()
  40. for{
  41. select{
  42. case <- t.C:
  43. consume(randomize(s.pattern, s.seed), nil)
  44. case <- exeCtx.Done():
  45. return
  46. }
  47. }
  48. }(exeCtx)
  49. return nil
  50. }
  51. func randomize(p map[string]interface{}, seed int) map[string]interface{}{
  52. r := make(map[string]interface{})
  53. for k, v := range p{
  54. vi := v.(int)
  55. r[k] = vi + rand.Intn(seed)
  56. }
  57. return r
  58. }
  59. func (s *randomSource) Close(ctx api.StreamContext) error{
  60. if s.cancel != nil{
  61. s.cancel()
  62. }
  63. return nil
  64. }
  65. var Random randomSource