random.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package main
import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/emqx/kuiper/xstream/api"
)
  8. //Emit data randomly with only a string field
  9. type randomSource struct {
  10. interval int
  11. seed int
  12. pattern map[string]interface{}
  13. cancel context.CancelFunc
  14. }
  15. func (s *randomSource) Configure(topic string, props map[string]interface{}) error {
  16. if i, ok := props["interval"].(int); ok {
  17. s.interval = i
  18. } else {
  19. s.interval = 1000
  20. }
  21. if p, ok := props["pattern"].(map[string]interface{}); ok {
  22. s.pattern = p
  23. } else {
  24. s.pattern = make(map[string]interface{})
  25. s.pattern["count"] = 50
  26. }
  27. if i, ok := props["seed"].(int); ok {
  28. s.seed = i
  29. } else {
  30. s.seed = 1
  31. }
  32. return nil
  33. }
  34. func (s *randomSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
  35. t := time.NewTicker(time.Duration(s.interval) * time.Millisecond)
  36. exeCtx, cancel := ctx.WithCancel()
  37. s.cancel = cancel
  38. defer t.Stop()
  39. for {
  40. select {
  41. case <-t.C:
  42. consumer <- api.NewDefaultSourceTuple(randomize(s.pattern, s.seed), nil)
  43. case <-exeCtx.Done():
  44. return
  45. }
  46. }
  47. }
  48. func randomize(p map[string]interface{}, seed int) map[string]interface{} {
  49. r := make(map[string]interface{})
  50. for k, v := range p {
  51. vi := v.(int)
  52. r[k] = vi + rand.Intn(seed)
  53. }
  54. return r
  55. }
  56. func (s *randomSource) Close(ctx api.StreamContext) error {
  57. if s.cancel != nil {
  58. s.cancel()
  59. }
  60. return nil
  61. }
  62. func Random() api.Source {
  63. return &randomSource{}
  64. }