stream.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package api
  2. import (
  3. "context"
  4. )
  5. //The function to call when data is emitted by the source.
  6. type ConsumeFunc func(message map[string]interface{}, metadata map[string]interface{})
  7. type Logger interface {
  8. Debug(args ...interface{})
  9. Info(args ...interface{})
  10. Warn(args ...interface{})
  11. Error(args ...interface{})
  12. Debugln(args ...interface{})
  13. Infoln(args ...interface{})
  14. Warnln(args ...interface{})
  15. Errorln(args ...interface{})
  16. Debugf(format string, args ...interface{})
  17. Infof(format string, args ...interface{})
  18. Warnf(format string, args ...interface{})
  19. Errorf(format string, args ...interface{})
  20. }
  21. type Closable interface {
  22. Close(ctx StreamContext) error
  23. }
  24. type Source interface {
  25. //Should be sync function for normal case. The container will run it in go func
  26. Open(ctx StreamContext, consume ConsumeFunc) error
  27. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  28. //read from the yaml
  29. Configure(datasource string, props map[string]interface{}) error
  30. Closable
  31. }
  32. type Sink interface {
  33. //Should be sync function for normal case. The container will run it in go func
  34. Open(ctx StreamContext) error
  35. //Called during initialization. Configure the sink with the properties from rule action definition
  36. Configure(props map[string]interface{}) error
  37. //Called when each row of data has transferred to this sink
  38. Collect(ctx StreamContext, data interface{}) error
  39. Closable
  40. }
  41. type Emitter interface {
  42. AddOutput(chan<- interface{}, string) error
  43. }
  44. type Collector interface {
  45. GetInput() (chan<- interface{}, string)
  46. }
  47. type TopNode interface {
  48. GetName() string
  49. }
  50. type Rule struct {
  51. Id string `json:"id"`
  52. Sql string `json:"sql"`
  53. Actions []map[string]interface{} `json:"actions"`
  54. Options map[string]interface{} `json:"options"`
  55. }
  56. type StreamContext interface {
  57. context.Context
  58. GetLogger() Logger
  59. GetRuleId() string
  60. GetOpId() string
  61. GetInstanceId() int
  62. WithMeta(ruleId string, opId string) StreamContext
  63. WithInstance(instanceId int) StreamContext
  64. WithCancel() (StreamContext, context.CancelFunc)
  65. GetError() error
  66. SetError(e error)
  67. }
  68. type Operator interface {
  69. Emitter
  70. Collector
  71. Exec(StreamContext, chan<- error)
  72. GetName() string
  73. GetMetrics() map[string]interface{}
  74. }
  75. type Function interface {
  76. //The argument is a list of xsql.Expr
  77. Validate(args []interface{}) error
  78. //Execute the function, return the result and if execution is successful.
  79. //If execution fails, return the error and false.
  80. Exec(args []interface{}) (interface{}, bool)
  81. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  82. IsAggregate() bool
  83. }