// stream.go (2.5 KB)

  1. package api
  2. import (
  3. "context"
  4. )
  5. //The function to call when data is emitted by the source.
  6. type ConsumeFunc func(message map[string]interface{}, metadata map[string]interface{})
  7. type Logger interface{
  8. Debug(args ...interface{})
  9. Info(args ...interface{})
  10. Warn(args ...interface{})
  11. Error(args ...interface{})
  12. Debugln(args ...interface{})
  13. Infoln(args ...interface{})
  14. Warnln(args ...interface{})
  15. Errorln(args ...interface{})
  16. Debugf(format string, args ...interface{})
  17. Infof(format string, args ...interface{})
  18. Warnf(format string, args ...interface{})
  19. Errorf(format string, args ...interface{})
  20. }
  21. type Closable interface {
  22. Close(ctx StreamContext) error
  23. }
  24. type Source interface {
  25. //Should be sync function for normal case. The container will run it in go func
  26. Open(ctx StreamContext, consume ConsumeFunc) error
  27. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  28. //read from the yaml
  29. Configure(datasource string, props map[string]interface{}) error
  30. Closable
  31. }
  32. type Sink interface {
  33. //Should be sync function for normal case. The container will run it in go func
  34. Open(ctx StreamContext) error
  35. //Called during initialization. Configure the sink with the properties from rule action definition
  36. Configure(props map[string]interface{}) error
  37. //Called when each row of data has transferred to this sink
  38. Collect(ctx StreamContext, data interface{}) error
  39. Closable
  40. }
  41. type Emitter interface {
  42. AddOutput(chan<- interface{}, string) error
  43. }
  44. type Collector interface {
  45. GetInput() (chan<- interface{}, string)
  46. }
  47. type TopNode interface {
  48. GetName() string
  49. }
  50. type Rule struct {
  51. Id string `json:"id"`
  52. Sql string `json:"sql"`
  53. Actions []map[string]interface{} `json:"actions"`
  54. Options map[string]interface{} `json:"options"`
  55. }
  56. type StreamContext interface {
  57. context.Context
  58. GetLogger() Logger
  59. GetRuleId() string
  60. GetOpId() string
  61. WithMeta(ruleId string, opId string) StreamContext
  62. WithCancel() (StreamContext, context.CancelFunc)
  63. }
  64. type Operator interface {
  65. Emitter
  66. Collector
  67. Exec(StreamContext, chan<- error)
  68. GetName() string
  69. }
  70. type Function interface {
  71. //The argument is a list of xsql.Expr
  72. Validate(args []interface{}) error
  73. //Execute the function, return the result and if execution is successful.
  74. //If execution fails, return the error and false.
  75. Exec(args []interface{}) (interface{}, bool)
  76. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  77. IsAggregate() bool
  78. }