stream.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package api
  2. import (
  3. "context"
  4. "sync"
  5. )
  6. type SourceTuple interface {
  7. Message() map[string]interface{}
  8. Meta() map[string]interface{}
  9. }
  10. type DefaultSourceTuple struct {
  11. message map[string]interface{}
  12. meta map[string]interface{}
  13. }
  14. func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
  15. return &DefaultSourceTuple{
  16. message: message,
  17. meta: meta,
  18. }
  19. }
  20. func (t *DefaultSourceTuple) Message() map[string]interface{} {
  21. return t.message
  22. }
  23. func (t *DefaultSourceTuple) Meta() map[string]interface{} {
  24. return t.meta
  25. }
  26. type Logger interface {
  27. Debug(args ...interface{})
  28. Info(args ...interface{})
  29. Warn(args ...interface{})
  30. Error(args ...interface{})
  31. Debugln(args ...interface{})
  32. Infoln(args ...interface{})
  33. Warnln(args ...interface{})
  34. Errorln(args ...interface{})
  35. Debugf(format string, args ...interface{})
  36. Infof(format string, args ...interface{})
  37. Warnf(format string, args ...interface{})
  38. Errorf(format string, args ...interface{})
  39. }
  40. type Store interface {
  41. SaveState(checkpointId int64, opId string, state map[string]interface{}) error
  42. SaveCheckpoint(checkpointId int64) error //Save the whole checkpoint state into storage like badger
  43. GetOpState(opId string) (*sync.Map, error)
  44. }
  45. type Closable interface {
  46. Close(ctx StreamContext) error
  47. }
  48. type Source interface {
  49. //Should be sync function for normal case. The container will run it in go func
  50. Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
  51. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  52. //read from the yaml
  53. Configure(datasource string, props map[string]interface{}) error
  54. Closable
  55. }
  56. type TableSource interface {
  57. // Load the data at batch
  58. Load(ctx StreamContext) ([]SourceTuple, error)
  59. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  60. //read from the yaml
  61. Configure(datasource string, props map[string]interface{}) error
  62. }
  63. type Sink interface {
  64. //Should be sync function for normal case. The container will run it in go func
  65. Open(ctx StreamContext) error
  66. //Called during initialization. Configure the sink with the properties from rule action definition
  67. Configure(props map[string]interface{}) error
  68. //Called when each row of data has transferred to this sink
  69. Collect(ctx StreamContext, data interface{}) error
  70. Closable
  71. }
  72. type Emitter interface {
  73. AddOutput(chan<- interface{}, string) error
  74. }
  75. type Collector interface {
  76. GetInput() (chan<- interface{}, string)
  77. }
  78. type TopNode interface {
  79. GetName() string
  80. }
  81. type Rewindable interface {
  82. GetOffset() (interface{}, error)
  83. Rewind(offset interface{}) error
  84. }
  85. type RuleOption struct {
  86. IsEventTime bool `json:"isEventTime" yaml:"isEventTime"`
  87. LateTol int64 `json:"lateTolerance" yaml:"lateTolerance"`
  88. Concurrency int `json:"concurrency" yaml:"concurrency"`
  89. BufferLength int `json:"bufferLength" yaml:"bufferLength"`
  90. SendMetaToSink bool `json:"sendMetaToSink" yaml:"sendMetaToSink"`
  91. SendError bool `json:"sendError" yaml:"sendError"`
  92. Qos Qos `json:"qos" yaml:"qos"`
  93. CheckpointInterval int `json:"checkpointInterval" yaml:"checkpointInterval"`
  94. }
  95. type Rule struct {
  96. Triggered bool `json:"triggered"`
  97. Id string `json:"id"`
  98. Sql string `json:"sql"`
  99. Actions []map[string]interface{} `json:"actions"`
  100. Options *RuleOption `json:"options"`
  101. }
  102. type StreamContext interface {
  103. context.Context
  104. GetLogger() Logger
  105. GetRuleId() string
  106. GetOpId() string
  107. GetInstanceId() int
  108. GetRootPath() string
  109. WithMeta(ruleId string, opId string, store Store) StreamContext
  110. WithInstance(instanceId int) StreamContext
  111. WithCancel() (StreamContext, context.CancelFunc)
  112. SetError(e error)
  113. //State handling
  114. IncrCounter(key string, amount int) error
  115. GetCounter(key string) (int, error)
  116. PutState(key string, value interface{}) error
  117. GetState(key string) (interface{}, error)
  118. DeleteState(key string) error
  119. }
  120. type Operator interface {
  121. Emitter
  122. Collector
  123. Exec(StreamContext, chan<- error)
  124. GetName() string
  125. GetMetrics() [][]interface{}
  126. }
  127. type FunctionContext interface {
  128. StreamContext
  129. GetFuncId() int
  130. }
  131. type Function interface {
  132. //The argument is a list of xsql.Expr
  133. Validate(args []interface{}) error
  134. //Execute the function, return the result and if execution is successful.
  135. //If execution fails, return the error and false.
  136. Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
  137. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  138. IsAggregate() bool
  139. }
  140. const (
  141. AtMostOnce Qos = iota
  142. AtLeastOnce
  143. ExactlyOnce
  144. )
  145. type Qos int