stream.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package api
  2. import (
  3. "context"
  4. "github.com/sirupsen/logrus"
  5. )
  6. type ConsumeFunc func(data interface{})
  7. type Closable interface {
  8. Close(StreamContext) error
  9. }
  10. type Source interface {
  11. //Should be sync function for normal case. The container will run it in go func
  12. Open(StreamContext, ConsumeFunc) error
  13. Closable
  14. }
  15. type Sink interface {
  16. //Should be sync function for normal case. The container will run it in go func
  17. Open(StreamContext) error
  18. Collect(StreamContext, interface{}) error
  19. Closable
  20. }
  21. type Emitter interface {
  22. AddOutput(chan<- interface{}, string) error
  23. }
  24. type Collector interface {
  25. GetInput() (chan<- interface{}, string)
  26. }
  27. type TopNode interface {
  28. GetName() string
  29. }
  30. type Rule struct {
  31. Id string `json:"id"`
  32. Sql string `json:"sql"`
  33. Actions []map[string]interface{} `json:"actions"`
  34. Options map[string]interface{} `json:"options"`
  35. }
  36. type StreamContext interface {
  37. context.Context
  38. GetLogger() *logrus.Entry
  39. GetRuleId() string
  40. GetOpId() string
  41. WithMeta(ruleId string, opId string) StreamContext
  42. WithCancel() (StreamContext, context.CancelFunc)
  43. }
  44. type Operator interface {
  45. Emitter
  46. Collector
  47. Exec(StreamContext, chan<- error)
  48. GetName() string
  49. }