stream.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package api
  2. import (
  3. "context"
  4. "github.com/sirupsen/logrus"
  5. )
  6. //The function to call when data is emitted by the source.
  7. type ConsumeFunc func(message map[string]interface{}, metadata map[string]interface{})
  8. type Closable interface {
  9. Close(ctx StreamContext) error
  10. }
  11. type Source interface {
  12. //Should be sync function for normal case. The container will run it in go func
  13. Open(ctx StreamContext, consume ConsumeFunc) error
  14. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  15. //read from the yaml
  16. Configure(datasource string, props map[string]interface{}) error
  17. Closable
  18. }
  19. type Sink interface {
  20. //Should be sync function for normal case. The container will run it in go func
  21. Open(ctx StreamContext) error
  22. //Called during initialization. Configure the sink with the properties from rule action definition
  23. Configure(props map[string]interface{}) error
  24. //Called when each row of data has transferred to this sink
  25. Collect(ctx StreamContext, data interface{}) error
  26. Closable
  27. }
  28. type Emitter interface {
  29. AddOutput(chan<- interface{}, string) error
  30. }
  31. type Collector interface {
  32. GetInput() (chan<- interface{}, string)
  33. }
  34. type TopNode interface {
  35. GetName() string
  36. }
  37. type Rule struct {
  38. Id string `json:"id"`
  39. Sql string `json:"sql"`
  40. Actions []map[string]interface{} `json:"actions"`
  41. Options map[string]interface{} `json:"options"`
  42. }
  43. type StreamContext interface {
  44. context.Context
  45. GetLogger() *logrus.Entry
  46. GetRuleId() string
  47. GetOpId() string
  48. WithMeta(ruleId string, opId string) StreamContext
  49. WithCancel() (StreamContext, context.CancelFunc)
  50. }
  51. type Operator interface {
  52. Emitter
  53. Collector
  54. Exec(StreamContext, chan<- error)
  55. GetName() string
  56. }
  57. type Function interface {
  58. //The argument is a list of xsql.Expr
  59. Validate(args []interface{}) error
  60. //Execute the function, return the result and if execution is successful.
  61. //If execution fails, return the error and false.
  62. Exec(args []interface{}) (interface{}, bool)
  63. }