stream.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package api
  2. import (
  3. "context"
  4. "engine/xsql"
  5. "github.com/sirupsen/logrus"
  6. )
  7. //The function to call when data is emitted by the source.
  8. type ConsumeFunc func(message xsql.Message, metadata xsql.Metadata)
  9. type Closable interface {
  10. Close(ctx StreamContext) error
  11. }
  12. type Source interface {
  13. //Should be sync function for normal case. The container will run it in go func
  14. Open(ctx StreamContext, consume ConsumeFunc) error
  15. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  16. //read from the yaml
  17. Configure(datasource string, props map[string]interface{}) error
  18. Closable
  19. }
  20. type Sink interface {
  21. //Should be sync function for normal case. The container will run it in go func
  22. Open(ctx StreamContext) error
  23. //Called during initialization. Configure the sink with the properties from rule action definition
  24. Configure(props map[string]interface{}) error
  25. //Called when each row of data has transferred to this sink
  26. Collect(ctx StreamContext, data interface{}) error
  27. Closable
  28. }
  29. type Emitter interface {
  30. AddOutput(chan<- interface{}, string) error
  31. }
  32. type Collector interface {
  33. GetInput() (chan<- interface{}, string)
  34. }
  35. type TopNode interface {
  36. GetName() string
  37. }
  38. type Rule struct {
  39. Id string `json:"id"`
  40. Sql string `json:"sql"`
  41. Actions []map[string]interface{} `json:"actions"`
  42. Options map[string]interface{} `json:"options"`
  43. }
  44. type StreamContext interface {
  45. context.Context
  46. GetLogger() *logrus.Entry
  47. GetRuleId() string
  48. GetOpId() string
  49. WithMeta(ruleId string, opId string) StreamContext
  50. WithCancel() (StreamContext, context.CancelFunc)
  51. }
  52. type Operator interface {
  53. Emitter
  54. Collector
  55. Exec(StreamContext, chan<- error)
  56. GetName() string
  57. }