stream.go 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package api
  2. import (
  3. "context"
  4. )
  5. //The function to call when data is emitted by the source.
  6. type ConsumeFunc func(message map[string]interface{}, metadata map[string]interface{})
  7. type ErrorFunc func(err error)
  8. type Logger interface {
  9. Debug(args ...interface{})
  10. Info(args ...interface{})
  11. Warn(args ...interface{})
  12. Error(args ...interface{})
  13. Debugln(args ...interface{})
  14. Infoln(args ...interface{})
  15. Warnln(args ...interface{})
  16. Errorln(args ...interface{})
  17. Debugf(format string, args ...interface{})
  18. Infof(format string, args ...interface{})
  19. Warnf(format string, args ...interface{})
  20. Errorf(format string, args ...interface{})
  21. }
  22. type Closable interface {
  23. Close(ctx StreamContext) error
  24. }
  25. type Source interface {
  26. //Should be sync function for normal case. The container will run it in go func
  27. Open(ctx StreamContext, consume ConsumeFunc, onError ErrorFunc)
  28. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  29. //read from the yaml
  30. Configure(datasource string, props map[string]interface{}) error
  31. Closable
  32. }
  33. type Sink interface {
  34. //Should be sync function for normal case. The container will run it in go func
  35. Open(ctx StreamContext) error
  36. //Called during initialization. Configure the sink with the properties from rule action definition
  37. Configure(props map[string]interface{}) error
  38. //Called when each row of data has transferred to this sink
  39. Collect(ctx StreamContext, data interface{}) error
  40. Closable
  41. }
  42. type Emitter interface {
  43. AddOutput(chan<- interface{}, string) error
  44. }
  45. type Collector interface {
  46. GetInput() (chan<- interface{}, string)
  47. }
  48. type TopNode interface {
  49. GetName() string
  50. }
  51. type Rule struct {
  52. Id string `json:"id"`
  53. Sql string `json:"sql"`
  54. Actions []map[string]interface{} `json:"actions"`
  55. Options map[string]interface{} `json:"options"`
  56. }
  57. type StreamContext interface {
  58. context.Context
  59. GetLogger() Logger
  60. GetRuleId() string
  61. GetOpId() string
  62. GetInstanceId() int
  63. WithMeta(ruleId string, opId string) StreamContext
  64. WithInstance(instanceId int) StreamContext
  65. WithCancel() (StreamContext, context.CancelFunc)
  66. SetError(e error)
  67. }
  68. type Operator interface {
  69. Emitter
  70. Collector
  71. Exec(StreamContext, chan<- error)
  72. GetName() string
  73. GetMetrics() [][]interface{}
  74. }
  75. type Function interface {
  76. //The argument is a list of xsql.Expr
  77. Validate(args []interface{}) error
  78. //Execute the function, return the result and if execution is successful.
  79. //If execution fails, return the error and false.
  80. Exec(args []interface{}) (interface{}, bool)
  81. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  82. IsAggregate() bool
  83. }