random.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package main
  2. import (
  3. "context"
  4. "github.com/emqx/kuiper/xstream/api"
  5. "math/rand"
  6. "time"
  7. )
  8. //Emit data randomly with only a string field
  9. type randomSource struct {
  10. interval int
  11. seed int
  12. pattern map[string]interface{}
  13. cancel context.CancelFunc
  14. }
  15. func (s *randomSource) Configure(topic string, props map[string]interface{}) error {
  16. if i, ok := props["interval"].(float64); ok {
  17. s.interval = int(i)
  18. } else {
  19. s.interval = 1000
  20. }
  21. if p, ok := props["pattern"].(map[string]interface{}); ok {
  22. s.pattern = p
  23. } else {
  24. s.pattern = make(map[string]interface{})
  25. s.pattern["count"] = 50
  26. }
  27. if i, ok := props["seed"].(float64); ok {
  28. s.seed = int(i)
  29. } else {
  30. s.seed = 1
  31. }
  32. return nil
  33. }
  34. func (s *randomSource) Open(ctx api.StreamContext, consume api.ConsumeFunc, onError api.ErrorFunc) {
  35. t := time.NewTicker(time.Duration(s.interval) * time.Millisecond)
  36. exeCtx, cancel := ctx.WithCancel()
  37. s.cancel = cancel
  38. go func(exeCtx api.StreamContext) {
  39. defer t.Stop()
  40. for {
  41. select {
  42. case <-t.C:
  43. consume(randomize(s.pattern, s.seed), nil)
  44. case <-exeCtx.Done():
  45. return
  46. }
  47. }
  48. }(exeCtx)
  49. }
  50. func randomize(p map[string]interface{}, seed int) map[string]interface{} {
  51. r := make(map[string]interface{})
  52. for k, v := range p {
  53. vi := v.(int)
  54. r[k] = vi + rand.Intn(seed)
  55. }
  56. return r
  57. }
  58. func (s *randomSource) Close(ctx api.StreamContext) error {
  59. if s.cancel != nil {
  60. s.cancel()
  61. }
  62. return nil
  63. }
  64. var Random randomSource