streams.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package xstream
  2. import (
  3. "context"
  4. "engine/common"
  5. "engine/xstream/operators"
  6. )
  7. var log = common.Log
  8. type TopologyNew struct {
  9. sources []Source
  10. sinks []Sink
  11. ctx context.Context
  12. drain chan error
  13. ops []Operator
  14. }
  15. func New() (*TopologyNew) {
  16. tp := &TopologyNew{}
  17. return tp
  18. }
  19. func (tp *TopologyNew) AddSrc(src Source) (*TopologyNew) {
  20. tp.sources = append(tp.sources, src)
  21. return tp
  22. }
  23. func (tp *TopologyNew) AddSink(inputs []Emitter, snk Sink) (*TopologyNew) {
  24. for _, input := range inputs{
  25. input.AddOutput(snk.GetInput())
  26. }
  27. tp.sinks = append(tp.sinks, snk)
  28. return tp
  29. }
  30. func (tp *TopologyNew) AddOperator(inputs []Emitter, operator Operator) (*TopologyNew) {
  31. for _, input := range inputs{
  32. input.AddOutput(operator.GetInput())
  33. }
  34. tp.ops = append(tp.ops, operator)
  35. return tp
  36. }
  37. func Transform(op operators.UnOperation, name string) *operators.UnaryOperator {
  38. operator := operators.New(name)
  39. operator.SetOperation(op)
  40. return operator
  41. }
  42. func (tp *TopologyNew) Map(f interface{}) (*TopologyNew){
  43. op, err := MapFunc(f)
  44. if err != nil {
  45. log.Println(err)
  46. }
  47. return tp.Transform(op)
  48. }
  49. // Filter takes a predicate user-defined func that filters the stream.
  50. // The specified function must be of type:
  51. // func (T) bool
  52. // If the func returns true, current item continues downstream.
  53. func (s *TopologyNew) Filter(f interface{}) *TopologyNew {
  54. op, err := FilterFunc(f)
  55. if err != nil {
  56. s.drainErr(err)
  57. }
  58. return s.Transform(op)
  59. }
  60. // Transform is the base method used to apply transfomrmative
  61. // unary operations to streamed elements (i.e. filter, map, etc)
  62. // It is exposed here for completeness, use the other more specific methods.
  63. func (s *TopologyNew) Transform(op operators.UnOperation) *TopologyNew {
  64. operator := operators.New("default")
  65. operator.SetOperation(op)
  66. s.ops = append(s.ops, operator)
  67. return s
  68. }
  69. // prepareContext setups internal context before
  70. // stream starts execution.
  71. func (s *TopologyNew) prepareContext() {
  72. if s.ctx == nil {
  73. s.ctx = context.TODO()
  74. }
  75. }
  76. func (s *TopologyNew) drainErr(err error) {
  77. go func() { s.drain <- err }()
  78. }
  79. func (s *TopologyNew) Open() <-chan error {
  80. s.prepareContext() // ensure context is set
  81. log.Println("Opening stream")
  82. // open stream
  83. go func() {
  84. // open source, if err bail
  85. for _, src := range s.sources{
  86. if err := src.Open(s.ctx); err != nil {
  87. s.drainErr(err)
  88. return
  89. }
  90. }
  91. //apply operators, if err bail
  92. for _, op := range s.ops {
  93. if err := op.Exec(s.ctx); err != nil {
  94. s.drainErr(err)
  95. return
  96. }
  97. }
  98. // open stream sink, after log sink is ready.
  99. for _, snk := range s.sinks{
  100. select {
  101. case err := <-snk.Open(s.ctx):
  102. log.Println("Closing stream")
  103. s.drain <- err
  104. }
  105. }
  106. }()
  107. return s.drain
  108. }