stream.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package api
  2. import (
  3. "context"
  4. )
  // SourceTuple is the unit of data a Source sends on its consumer channel.
  // It exposes two string-keyed maps: the message and its metadata.
  type SourceTuple interface {
  	// Message returns the tuple's message as a string-keyed map.
  	Message() map[string]interface{}
  	// Meta returns the tuple's metadata as a string-keyed map.
  	Meta() map[string]interface{}
  }
  // DefaultSourceTuple is a plain container for a message map and a meta map.
  // Both fields are unexported; they are read through the Message and Meta
  // methods declared on *DefaultSourceTuple.
  type DefaultSourceTuple struct {
  	message map[string]interface{} // value returned by Message
  	meta    map[string]interface{} // value returned by Meta
  }
  13. func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
  14. return &DefaultSourceTuple{
  15. message: message,
  16. meta: meta,
  17. }
  18. }
  19. func (t *DefaultSourceTuple) Message() map[string]interface{} {
  20. return t.message
  21. }
  22. func (t *DefaultSourceTuple) Meta() map[string]interface{} {
  23. return t.meta
  24. }
  // Logger is the logging interface returned by StreamContext.GetLogger.
  // It declares four levels (Debug, Info, Warn, Error), each in three variants
  // whose signatures mirror fmt's Print, Println and Printf.
  type Logger interface {
  	// Plain variants: take a bare argument list.
  	Debug(args ...interface{})
  	Info(args ...interface{})
  	Warn(args ...interface{})
  	Error(args ...interface{})

  	// "ln" variants: take a bare argument list.
  	Debugln(args ...interface{})
  	Infoln(args ...interface{})
  	Warnln(args ...interface{})
  	Errorln(args ...interface{})

  	// "f" variants: take a format string followed by its arguments.
  	Debugf(format string, args ...interface{})
  	Infof(format string, args ...interface{})
  	Warnf(format string, args ...interface{})
  	Errorf(format string, args ...interface{})
  }
  39. type Closable interface {
  40. Close(ctx StreamContext) error
  41. }
  42. type Source interface {
  43. //Should be sync function for normal case. The container will run it in go func
  44. Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
  45. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  46. //read from the yaml
  47. Configure(datasource string, props map[string]interface{}) error
  48. Closable
  49. }
  50. type Sink interface {
  51. //Should be sync function for normal case. The container will run it in go func
  52. Open(ctx StreamContext) error
  53. //Called during initialization. Configure the sink with the properties from rule action definition
  54. Configure(props map[string]interface{}) error
  55. //Called when each row of data has transferred to this sink
  56. Collect(ctx StreamContext, data interface{}) error
  57. Closable
  58. }
  // Emitter is implemented by nodes to which outputs can be attached.
  // Operator embeds it.
  type Emitter interface {
  	// AddOutput takes a send-only channel and a string and returns an
  	// error if the output cannot be added.
  	// NOTE(review): the string is presumably the name of the output —
  	// confirm against the implementations.
  	AddOutput(chan<- interface{}, string) error
  }
  // Collector is implemented by nodes that expose an input channel.
  // Operator embeds it.
  type Collector interface {
  	// GetInput returns a send-only channel and a string.
  	// NOTE(review): presumably the channel feeding this node and the
  	// node's name — confirm against the implementations.
  	GetInput() (chan<- interface{}, string)
  }
  // TopNode is the minimal interface of a named node.
  type TopNode interface {
  	// GetName returns the node's name.
  	GetName() string
  }
  // Rule is the JSON-serializable definition of a rule. Each field is mapped
  // to the lower-case JSON key given in its struct tag.
  type Rule struct {
  	// Triggered is encoded as "triggered".
  	Triggered bool `json:"triggered"`
  	// Id is the rule identifier, encoded as "id".
  	Id string `json:"id"`
  	// Sql is the rule's SQL text, encoded as "sql".
  	Sql string `json:"sql"`
  	// Actions is a list of free-form action definitions, encoded as
  	// "actions".
  	Actions []map[string]interface{} `json:"actions"`
  	// Options is a free-form option map, encoded as "options".
  	Options map[string]interface{} `json:"options"`
  }
  75. type StreamContext interface {
  76. context.Context
  77. GetLogger() Logger
  78. GetRuleId() string
  79. GetOpId() string
  80. GetInstanceId() int
  81. WithMeta(ruleId string, opId string) StreamContext
  82. WithInstance(instanceId int) StreamContext
  83. WithCancel() (StreamContext, context.CancelFunc)
  84. SetError(e error)
  85. //State handling
  86. IncrCounter(key string, amount int) error
  87. GetCounter(key string) (int, error)
  88. PutState(key string, value interface{}) error
  89. GetState(key string) (interface{}, error)
  90. DeleteState(key string) error
  91. SaveState(checkpointId int64) error
  92. }
  93. type Operator interface {
  94. Emitter
  95. Collector
  96. Exec(StreamContext, chan<- error)
  97. GetName() string
  98. GetMetrics() [][]interface{}
  99. }
  100. type FunctionContext interface {
  101. StreamContext
  102. GetFuncId() int
  103. }
  104. type Function interface {
  105. //The argument is a list of xsql.Expr
  106. Validate(args []interface{}) error
  107. //Execute the function, return the result and if execution is successful.
  108. //If execution fails, return the error and false.
  109. Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
  110. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  111. IsAggregate() bool
  112. }