random.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package main
  2. import (
  3. "context"
  4. "github.com/emqx/kuiper/xstream/api"
  5. "math/rand"
  6. "time"
  7. )
  8. //Emit data randomly with only a string field
  9. type randomSource struct {
  10. interval int
  11. seed int
  12. pattern map[string]interface{}
  13. cancel context.CancelFunc
  14. }
  15. func (s *randomSource) Configure(topic string, props map[string]interface{}) error {
  16. if i, ok := props["interval"].(float64); ok {
  17. s.interval = int(i)
  18. } else {
  19. s.interval = 1000
  20. }
  21. if p, ok := props["pattern"].(map[string]interface{}); ok {
  22. s.pattern = p
  23. } else {
  24. s.pattern = make(map[string]interface{})
  25. s.pattern["count"] = 50
  26. }
  27. if i, ok := props["seed"].(float64); ok {
  28. s.seed = int(i)
  29. } else {
  30. s.seed = 1
  31. }
  32. return nil
  33. }
  34. func (s *randomSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) {
  35. t := time.NewTicker(time.Duration(s.interval) * time.Millisecond)
  36. exeCtx, cancel := ctx.WithCancel()
  37. s.cancel = cancel
  38. go func(exeCtx api.StreamContext) {
  39. defer t.Stop()
  40. for {
  41. select {
  42. case <-t.C:
  43. consume(randomize(s.pattern, s.seed), nil)
  44. case <-exeCtx.Done():
  45. return
  46. }
  47. }
  48. }(exeCtx)
  49. return nil
  50. }
  51. func randomize(p map[string]interface{}, seed int) map[string]interface{} {
  52. r := make(map[string]interface{})
  53. for k, v := range p {
  54. vi := v.(int)
  55. r[k] = vi + rand.Intn(seed)
  56. }
  57. return r
  58. }
  59. func (s *randomSource) Close(ctx api.StreamContext) error {
  60. if s.cancel != nil {
  61. s.cancel()
  62. }
  63. return nil
  64. }
  65. var Random randomSource