stream.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package api
  2. import (
  3. "context"
  4. )
  5. type SourceTuple interface {
  6. Message() map[string]interface{}
  7. Meta() map[string]interface{}
  8. }
  9. type DefaultSourceTuple struct {
  10. message map[string]interface{}
  11. meta map[string]interface{}
  12. }
  13. func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
  14. return &DefaultSourceTuple{
  15. message: message,
  16. meta: meta,
  17. }
  18. }
  19. func (t *DefaultSourceTuple) Message() map[string]interface{} {
  20. return t.message
  21. }
  22. func (t *DefaultSourceTuple) Meta() map[string]interface{} {
  23. return t.meta
  24. }
  25. type Logger interface {
  26. Debug(args ...interface{})
  27. Info(args ...interface{})
  28. Warn(args ...interface{})
  29. Error(args ...interface{})
  30. Debugln(args ...interface{})
  31. Infoln(args ...interface{})
  32. Warnln(args ...interface{})
  33. Errorln(args ...interface{})
  34. Debugf(format string, args ...interface{})
  35. Infof(format string, args ...interface{})
  36. Warnf(format string, args ...interface{})
  37. Errorf(format string, args ...interface{})
  38. }
  39. type Closable interface {
  40. Close(ctx StreamContext) error
  41. }
  42. type Source interface {
  43. //Should be sync function for normal case. The container will run it in go func
  44. Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
  45. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  46. //read from the yaml
  47. Configure(datasource string, props map[string]interface{}) error
  48. Closable
  49. }
  50. type Sink interface {
  51. //Should be sync function for normal case. The container will run it in go func
  52. Open(ctx StreamContext) error
  53. //Called during initialization. Configure the sink with the properties from rule action definition
  54. Configure(props map[string]interface{}) error
  55. //Called when each row of data has transferred to this sink
  56. Collect(ctx StreamContext, data interface{}) error
  57. Closable
  58. }
  59. type Emitter interface {
  60. AddOutput(chan<- interface{}, string) error
  61. }
  62. type Collector interface {
  63. GetInput() (chan<- interface{}, string)
  64. }
  65. type TopNode interface {
  66. GetName() string
  67. }
  68. type Rule struct {
  69. Id string `json:"id"`
  70. Sql string `json:"sql"`
  71. Actions []map[string]interface{} `json:"actions"`
  72. Options map[string]interface{} `json:"options"`
  73. }
  74. type StreamContext interface {
  75. context.Context
  76. GetLogger() Logger
  77. GetRuleId() string
  78. GetOpId() string
  79. GetInstanceId() int
  80. WithMeta(ruleId string, opId string) StreamContext
  81. WithInstance(instanceId int) StreamContext
  82. WithCancel() (StreamContext, context.CancelFunc)
  83. SetError(e error)
  84. }
  85. type Operator interface {
  86. Emitter
  87. Collector
  88. Exec(StreamContext, chan<- error)
  89. GetName() string
  90. GetMetrics() [][]interface{}
  91. }
  92. type Function interface {
  93. //The argument is a list of xsql.Expr
  94. Validate(args []interface{}) error
  95. //Execute the function, return the result and if execution is successful.
  96. //If execution fails, return the error and false.
  97. Exec(args []interface{}) (interface{}, bool)
  98. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  99. IsAggregate() bool
  100. }