stream.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package api
  2. import (
  3. "context"
  4. "engine/xsql"
  5. "github.com/sirupsen/logrus"
  6. )
  7. type ConsumeFunc func(xsql.Message, xsql.Metadata)
  8. type Closable interface {
  9. Close(StreamContext) error
  10. }
  11. type Source interface {
  12. //Should be sync function for normal case. The container will run it in go func
  13. Open(StreamContext, ConsumeFunc) error
  14. Configure(string, map[string]interface{}) error
  15. Closable
  16. }
  17. type Sink interface {
  18. //Should be sync function for normal case. The container will run it in go func
  19. Open(StreamContext) error
  20. Configure(map[string]interface{}) error
  21. Collect(StreamContext, interface{}) error
  22. Closable
  23. }
  24. type Emitter interface {
  25. AddOutput(chan<- interface{}, string) error
  26. }
  27. type Collector interface {
  28. GetInput() (chan<- interface{}, string)
  29. }
  30. type TopNode interface {
  31. GetName() string
  32. }
  33. type Rule struct {
  34. Id string `json:"id"`
  35. Sql string `json:"sql"`
  36. Actions []map[string]interface{} `json:"actions"`
  37. Options map[string]interface{} `json:"options"`
  38. }
  39. type StreamContext interface {
  40. context.Context
  41. GetLogger() *logrus.Entry
  42. GetRuleId() string
  43. GetOpId() string
  44. WithMeta(ruleId string, opId string) StreamContext
  45. WithCancel() (StreamContext, context.CancelFunc)
  46. }
  47. type Operator interface {
  48. Emitter
  49. Collector
  50. Exec(StreamContext, chan<- error)
  51. GetName() string
  52. }