random.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package main
  2. import (
  3. "context"
  4. "engine/xstream/api"
  5. "time"
  6. )
  7. //Emit data randomly with only a string field
  8. type randomSource struct {
  9. interval int
  10. pattern map[string]interface{}
  11. cancel context.CancelFunc
  12. }
  13. func (s *randomSource) Configure(topic string, props map[string]interface{}) error{
  14. if i, ok := props["interval"].(int); ok{
  15. s.interval = i
  16. }else{
  17. s.interval = 1000
  18. }
  19. if p, ok := props["pattern"].(map[string]interface{}); ok{
  20. s.pattern = p
  21. }else{
  22. s.pattern = make(map[string]interface{})
  23. s.pattern["count"] = 50
  24. }
  25. return nil
  26. }
  27. func (s *randomSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) (err error) {
  28. t := time.NewTicker(time.Duration(s.interval) * time.Millisecond)
  29. exeCtx, cancel := ctx.WithCancel()
  30. s.cancel = cancel
  31. go func(exeCtx api.StreamContext){
  32. defer t.Stop()
  33. for{
  34. select{
  35. case <- t.C:
  36. consume(randomize(s.pattern), nil)
  37. case <- exeCtx.Done():
  38. return
  39. }
  40. }
  41. }(exeCtx)
  42. return nil
  43. }
  44. func randomize(p map[string]interface{}) map[string]interface{}{
  45. r := make(map[string]interface{})
  46. for k, v := range p{
  47. vi := v.(int)
  48. r[k] = vi + 1
  49. }
  50. return r
  51. }
  52. func (s *randomSource) Close(ctx api.StreamContext) error{
  53. if s.cancel != nil{
  54. s.cancel()
  55. }
  56. return nil
  57. }
  58. var Random randomSource