stream.go 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package api
  2. import (
  3. "context"
  4. )
  5. //The function to call when data is emitted by the source.
  6. type ConsumeFunc func(message map[string]interface{}, metadata map[string]interface{})
  7. type Logger interface {
  8. Debug(args ...interface{})
  9. Info(args ...interface{})
  10. Warn(args ...interface{})
  11. Error(args ...interface{})
  12. Debugln(args ...interface{})
  13. Infoln(args ...interface{})
  14. Warnln(args ...interface{})
  15. Errorln(args ...interface{})
  16. Debugf(format string, args ...interface{})
  17. Infof(format string, args ...interface{})
  18. Warnf(format string, args ...interface{})
  19. Errorf(format string, args ...interface{})
  20. }
  21. type Closable interface {
  22. Close(ctx StreamContext) error
  23. }
  24. type Source interface {
  25. //Should be sync function for normal case. The container will run it in go func
  26. Open(ctx StreamContext, consume ConsumeFunc) error
  27. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  28. //read from the yaml
  29. Configure(datasource string, props map[string]interface{}) error
  30. Closable
  31. }
  32. type Sink interface {
  33. //Should be sync function for normal case. The container will run it in go func
  34. Open(ctx StreamContext) error
  35. //Called during initialization. Configure the sink with the properties from rule action definition
  36. Configure(props map[string]interface{}) error
  37. //Called when each row of data has transferred to this sink
  38. Collect(ctx StreamContext, data interface{}) error
  39. Closable
  40. }
  41. type Emitter interface {
  42. AddOutput(chan<- interface{}, string) error
  43. }
  44. type Collector interface {
  45. GetInput() (chan<- interface{}, string)
  46. }
  47. type TopNode interface {
  48. GetName() string
  49. }
  50. type Rule struct {
  51. Id string `json:"id"`
  52. Sql string `json:"sql"`
  53. Actions []map[string]interface{} `json:"actions"`
  54. Options map[string]interface{} `json:"options"`
  55. }
  56. type StreamContext interface {
  57. context.Context
  58. GetLogger() Logger
  59. GetRuleId() string
  60. GetOpId() string
  61. GetInstanceId() int
  62. WithMeta(ruleId string, opId string) StreamContext
  63. WithInstance(instanceId int) StreamContext
  64. WithCancel() (StreamContext, context.CancelFunc)
  65. SetError(e error)
  66. }
  67. type Operator interface {
  68. Emitter
  69. Collector
  70. Exec(StreamContext, chan<- error)
  71. GetName() string
  72. GetMetrics() map[string]interface{}
  73. }
  74. type Function interface {
  75. //The argument is a list of xsql.Expr
  76. Validate(args []interface{}) error
  77. //Execute the function, return the result and if execution is successful.
  78. //If execution fails, return the error and false.
  79. Exec(args []interface{}) (interface{}, bool)
  80. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  81. IsAggregate() bool
  82. }