stream.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package api
  2. import (
  3. "context"
  4. "sync"
  5. )
  // SourceTuple is the element type of the consumer channel passed to
  // Source.Open. It exposes two string-keyed maps: a message and its meta.
  type SourceTuple interface {
  	// Meta returns the tuple's meta map.
  	Meta() map[string]interface{}
  	// Message returns the tuple's message map.
  	Message() map[string]interface{}
  }
  // DefaultSourceTuple is a map-backed tuple. Through its pointer receiver
  // methods Message and Meta it provides the SourceTuple method set.
  type DefaultSourceTuple struct {
  	message, meta map[string]interface{}
  }

  // NewDefaultSourceTuple wraps the given message and meta maps in a new
  // DefaultSourceTuple. The maps are stored as passed in, without copying, so
  // the tuple and the caller share them.
  func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
  	tuple := new(DefaultSourceTuple)
  	tuple.message = message
  	tuple.meta = meta
  	return tuple
  }

  // Message returns the message map held by the tuple.
  func (tuple *DefaultSourceTuple) Message() map[string]interface{} {
  	return tuple.message
  }

  // Meta returns the meta map held by the tuple.
  func (tuple *DefaultSourceTuple) Meta() map[string]interface{} {
  	return tuple.meta
  }
  // Logger is the logging method set returned by StreamContext.GetLogger.
  // Four levels are declared (Debug, Info, Warn, Error), each in three forms:
  // a plain variadic form, an "ln" variadic form, and an "f" form that takes a
  // format string followed by its arguments.
  type Logger interface {
  	// Debug level.
  	Debug(args ...interface{})
  	Debugln(args ...interface{})
  	Debugf(format string, args ...interface{})

  	// Info level.
  	Info(args ...interface{})
  	Infoln(args ...interface{})
  	Infof(format string, args ...interface{})

  	// Warn level.
  	Warn(args ...interface{})
  	Warnln(args ...interface{})
  	Warnf(format string, args ...interface{})

  	// Error level.
  	Error(args ...interface{})
  	Errorln(args ...interface{})
  	Errorf(format string, args ...interface{})
  }
  // Store is the state storage handed to StreamContext.WithMeta. State is
  // addressed by checkpoint id and operator id.
  type Store interface {
  	// SaveState records the state map of operator opId under checkpointId.
  	SaveState(checkpointId int64, opId string, state map[string]interface{}) error
  	// SaveCheckpoint saves the whole checkpoint state into storage like badger.
  	SaveCheckpoint(checkpointId int64) error
  	// GetOpState returns the state of operator opId as a *sync.Map.
  	GetOpState(opId string) (*sync.Map, error)
  }
  45. type Closable interface {
  46. Close(ctx StreamContext) error
  47. }
  48. type Source interface {
  49. //Should be sync function for normal case. The container will run it in go func
  50. Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
  51. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  52. //read from the yaml
  53. Configure(datasource string, props map[string]interface{}) error
  54. Closable
  55. }
  56. type Sink interface {
  57. //Should be sync function for normal case. The container will run it in go func
  58. Open(ctx StreamContext) error
  59. //Called during initialization. Configure the sink with the properties from rule action definition
  60. Configure(props map[string]interface{}) error
  61. //Called when each row of data has transferred to this sink
  62. Collect(ctx StreamContext, data interface{}) error
  63. Closable
  64. }
  // Emitter is the output-registration half of an Operator, which embeds it.
  type Emitter interface {
  	// AddOutput registers a send-only channel together with a string.
  	// NOTE(review): the string is presumably the output's name; confirm
  	// against the implementations.
  	AddOutput(chan<- interface{}, string) error
  }
  // Collector is the input half of an Operator, which embeds it.
  type Collector interface {
  	// GetInput returns a send-only channel together with a string.
  	// NOTE(review): the string is presumably the collector's name; confirm
  	// against the implementations.
  	GetInput() (chan<- interface{}, string)
  }
  // TopNode is anything that can report a name as a string.
  type TopNode interface {
  	// GetName returns the node's name.
  	GetName() string
  }
  // Rewindable pairs reading an offset with rewinding to one. The offset is
  // an opaque interface{} value whose concrete type is chosen by the
  // implementation.
  type Rewindable interface {
  	// Rewind is given an offset and reports any error.
  	Rewind(offset interface{}) error
  	// GetOffset returns an offset value, or an error.
  	GetOffset() (interface{}, error)
  }
  78. type RuleOption struct {
  79. IsEventTime bool `json:"isEventTime"`
  80. LateTol int64 `json:"lateTolerance"`
  81. Concurrency int `json:"concurrency"`
  82. BufferLength int `json:"bufferLength"`
  83. SendMetaToSink bool `json:"sendMetaToSink"`
  84. Qos Qos `json:"qos"`
  85. CheckpointInterval int `json:"checkpointInterval"`
  86. }
  87. type Rule struct {
  88. Triggered bool `json:"triggered"`
  89. Id string `json:"id"`
  90. Sql string `json:"sql"`
  91. Actions []map[string]interface{} `json:"actions"`
  92. Options *RuleOption `json:"options"`
  93. }
  94. type StreamContext interface {
  95. context.Context
  96. GetLogger() Logger
  97. GetRuleId() string
  98. GetOpId() string
  99. GetInstanceId() int
  100. WithMeta(ruleId string, opId string, store Store) StreamContext
  101. WithInstance(instanceId int) StreamContext
  102. WithCancel() (StreamContext, context.CancelFunc)
  103. SetError(e error)
  104. //State handling
  105. IncrCounter(key string, amount int) error
  106. GetCounter(key string) (int, error)
  107. PutState(key string, value interface{}) error
  108. GetState(key string) (interface{}, error)
  109. DeleteState(key string) error
  110. Snapshot() error
  111. SaveState(checkpointId int64) error
  112. }
  113. type Operator interface {
  114. Emitter
  115. Collector
  116. Exec(StreamContext, chan<- error)
  117. GetName() string
  118. GetMetrics() [][]interface{}
  119. }
  120. type FunctionContext interface {
  121. StreamContext
  122. GetFuncId() int
  123. }
  124. type Function interface {
  125. //The argument is a list of xsql.Expr
  126. Validate(args []interface{}) error
  127. //Execute the function, return the result and if execution is successful.
  128. //If execution fails, return the error and false.
  129. Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
  130. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  131. IsAggregate() bool
  132. }
  // Qos is the integer type of the RuleOption.Qos setting.
  type Qos int

  // Values of Qos, numbered from zero in declaration order.
  const (
  	// AtMostOnce is Qos value 0.
  	AtMostOnce Qos = iota
  	// AtLeastOnce is Qos value 1.
  	AtLeastOnce
  	// ExactlyOnce is Qos value 2.
  	ExactlyOnce
  )