stream.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package api
  2. import (
  3. "context"
  4. )
  // SourceTuple is one unit of data handed over by a source: a message plus
  // its metadata, each exposed as a string-keyed map.
  type SourceTuple interface {
  	// Meta returns the metadata map of the tuple.
  	Meta() map[string]interface{}
  	// Message returns the message map of the tuple.
  	Message() map[string]interface{}
  }
  // DefaultSourceTuple is a simple value holder for a message map and a
  // metadata map. Its pointer type provides the Message and Meta accessors.
  type DefaultSourceTuple struct {
  	message map[string]interface{}
  	meta    map[string]interface{}
  }

  // NewDefaultSourceTuple builds a tuple around the given maps. The maps are
  // stored as passed in (no copy is made), so later changes made by the caller
  // are visible through the tuple.
  func NewDefaultSourceTuple(message, meta map[string]interface{}) *DefaultSourceTuple {
  	tuple := new(DefaultSourceTuple)
  	tuple.message = message
  	tuple.meta = meta
  	return tuple
  }

  // Message returns the message map the tuple was created with.
  func (d *DefaultSourceTuple) Message() map[string]interface{} {
  	return d.message
  }

  // Meta returns the metadata map the tuple was created with.
  func (d *DefaultSourceTuple) Meta() map[string]interface{} {
  	return d.meta
  }
  // Logger is the leveled logging contract used by the stream API. Four levels
  // are offered (debug, info, warn, error), each in three forms: a plain
  // variadic form, an "ln" form and an "f" form taking a format string.
  // NOTE(review): the plain/ln/f forms presumably follow the fmt
  // Print/Println/Printf conventions; confirm against the concrete logger.
  type Logger interface {
  	// Debug level.
  	Debug(args ...interface{})
  	Debugln(args ...interface{})
  	Debugf(format string, args ...interface{})

  	// Info level.
  	Info(args ...interface{})
  	Infoln(args ...interface{})
  	Infof(format string, args ...interface{})

  	// Warn level.
  	Warn(args ...interface{})
  	Warnln(args ...interface{})
  	Warnf(format string, args ...interface{})

  	// Error level.
  	Error(args ...interface{})
  	Errorln(args ...interface{})
  	Errorf(format string, args ...interface{})
  }
  39. type Closable interface {
  40. Close(ctx StreamContext) error
  41. }
  42. type Source interface {
  43. //Should be sync function for normal case. The container will run it in go func
  44. Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
  45. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  46. //read from the yaml
  47. Configure(datasource string, props map[string]interface{}) error
  48. Closable
  49. }
  50. type Sink interface {
  51. //Should be sync function for normal case. The container will run it in go func
  52. Open(ctx StreamContext) error
  53. //Called during initialization. Configure the sink with the properties from rule action definition
  54. Configure(props map[string]interface{}) error
  55. //Called when each row of data has transferred to this sink
  56. Collect(ctx StreamContext, data interface{}) error
  57. Closable
  58. }
  // Emitter is implemented by nodes that outputs can be attached to.
  type Emitter interface {
  	// AddOutput registers a send-only channel together with a string and
  	// reports an error on failure.
  	// NOTE(review): the string is presumably the name of the node behind the
  	// channel; confirm against the callers.
  	AddOutput(chan<- interface{}, string) error
  }
  // Collector is implemented by nodes that expose an input channel.
  type Collector interface {
  	// GetInput returns a send-only channel together with a string.
  	// NOTE(review): the string is presumably the name of the node owning the
  	// channel; confirm against the implementations.
  	GetInput() (chan<- interface{}, string)
  }
  // TopNode is implemented by anything that can report a name.
  type TopNode interface {
  	// GetName returns the name as a string.
  	GetName() string
  }
  // Rule is the JSON-mapped definition of a rule. The struct tags fix the JSON
  // keys to "id", "sql", "actions" and "options".
  type Rule struct {
  	// Id is the rule identifier.
  	Id string `json:"id"`
  	// Sql is the SQL text of the rule.
  	Sql string `json:"sql"`
  	// Actions is a list of string-keyed maps, one per action.
  	Actions []map[string]interface{} `json:"actions"`
  	// Options holds additional string-keyed settings of the rule.
  	Options map[string]interface{} `json:"options"`
  }
  74. type StreamContext interface {
  75. context.Context
  76. GetLogger() Logger
  77. GetRuleId() string
  78. GetOpId() string
  79. GetInstanceId() int
  80. WithMeta(ruleId string, opId string) StreamContext
  81. WithInstance(instanceId int) StreamContext
  82. WithCancel() (StreamContext, context.CancelFunc)
  83. SetError(e error)
  84. //State handling
  85. IncrCounter(key string, amount int) error
  86. GetCounter(key string) (int, error)
  87. PutState(key string, value interface{}) error
  88. GetState(key string) (interface{}, error)
  89. DeleteState(key string) error
  90. }
  91. type Operator interface {
  92. Emitter
  93. Collector
  94. Exec(StreamContext, chan<- error)
  95. GetName() string
  96. GetMetrics() [][]interface{}
  97. }
  98. type FunctionContext interface {
  99. StreamContext
  100. GetFuncId() int
  101. }
  102. type Function interface {
  103. //The argument is a list of xsql.Expr
  104. Validate(args []interface{}) error
  105. //Execute the function, return the result and if execution is successful.
  106. //If execution fails, return the error and false.
  107. Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
  108. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  109. IsAggregate() bool
  110. }