stream.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package api
  2. import (
  3. "context"
  4. )
  5. //The function to call when data is emitted by the source.
  6. type ConsumeFunc func(message map[string]interface{}, metadata map[string]interface{})
  7. type Logger interface {
  8. Debug(args ...interface{})
  9. Info(args ...interface{})
  10. Warn(args ...interface{})
  11. Error(args ...interface{})
  12. Debugln(args ...interface{})
  13. Infoln(args ...interface{})
  14. Warnln(args ...interface{})
  15. Errorln(args ...interface{})
  16. Debugf(format string, args ...interface{})
  17. Infof(format string, args ...interface{})
  18. Warnf(format string, args ...interface{})
  19. Errorf(format string, args ...interface{})
  20. }
  21. type Closable interface {
  22. Close(ctx StreamContext) error
  23. }
  24. type Source interface {
  25. //Should be sync function for normal case. The container will run it in go func
  26. Open(ctx StreamContext, consume ConsumeFunc) error
  27. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  28. //read from the yaml
  29. Configure(datasource string, props map[string]interface{}) error
  30. Closable
  31. }
  32. type Sink interface {
  33. //Should be sync function for normal case. The container will run it in go func
  34. Open(ctx StreamContext) error
  35. //Called during initialization. Configure the sink with the properties from rule action definition
  36. Configure(props map[string]interface{}) error
  37. //Called when each row of data has transferred to this sink
  38. Collect(ctx StreamContext, data interface{}) error
  39. Closable
  40. }
  41. type Emitter interface {
  42. AddOutput(chan<- interface{}, string) error
  43. }
  44. type Collector interface {
  45. GetInput() (chan<- interface{}, string)
  46. }
  47. type TopNode interface {
  48. GetName() string
  49. }
  50. type Rule struct {
  51. Id string `json:"id"`
  52. Sql string `json:"sql"`
  53. Actions []map[string]interface{} `json:"actions"`
  54. Options map[string]interface{} `json:"options"`
  55. }
  56. type StreamContext interface {
  57. context.Context
  58. GetLogger() Logger
  59. GetRuleId() string
  60. GetOpId() string
  61. GetInstanceId() int
  62. WithMeta(ruleId string, opId string) StreamContext
  63. WithInstance(instanceId int) StreamContext
  64. WithCancel() (StreamContext, context.CancelFunc)
  65. }
  66. type Operator interface {
  67. Emitter
  68. Collector
  69. Exec(StreamContext, chan<- error)
  70. GetName() string
  71. GetMetrics() map[string]interface{}
  72. }
  73. type Function interface {
  74. //The argument is a list of xsql.Expr
  75. Validate(args []interface{}) error
  76. //Execute the function, return the result and if execution is successful.
  77. //If execution fails, return the error and false.
  78. Exec(args []interface{}) (interface{}, bool)
  79. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  80. IsAggregate() bool
  81. }