stream.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package api
  2. import (
  3. "context"
  4. "sync"
  5. )
  6. type SourceTuple interface {
  7. Message() map[string]interface{}
  8. Meta() map[string]interface{}
  9. }
  10. type DefaultSourceTuple struct {
  11. message map[string]interface{}
  12. meta map[string]interface{}
  13. }
  14. func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
  15. return &DefaultSourceTuple{
  16. message: message,
  17. meta: meta,
  18. }
  19. }
  20. func (t *DefaultSourceTuple) Message() map[string]interface{} {
  21. return t.message
  22. }
  23. func (t *DefaultSourceTuple) Meta() map[string]interface{} {
  24. return t.meta
  25. }
  26. type Logger interface {
  27. Debug(args ...interface{})
  28. Info(args ...interface{})
  29. Warn(args ...interface{})
  30. Error(args ...interface{})
  31. Debugln(args ...interface{})
  32. Infoln(args ...interface{})
  33. Warnln(args ...interface{})
  34. Errorln(args ...interface{})
  35. Debugf(format string, args ...interface{})
  36. Infof(format string, args ...interface{})
  37. Warnf(format string, args ...interface{})
  38. Errorf(format string, args ...interface{})
  39. }
  40. type Store interface {
  41. SaveState(checkpointId int64, opId string, state map[string]interface{}) error
  42. SaveCheckpoint(checkpointId int64) error //Save the whole checkpoint state into storage
  43. GetOpState(opId string) (*sync.Map, error)
  44. Clean() error
  45. }
  46. type Closable interface {
  47. Close(ctx StreamContext) error
  48. }
  49. type Source interface {
  50. //Should be sync function for normal case. The container will run it in go func
  51. Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
  52. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  53. //read from the yaml
  54. Configure(datasource string, props map[string]interface{}) error
  55. Closable
  56. }
  57. type TableSource interface {
  58. // Load the data at batch
  59. Load(ctx StreamContext) ([]SourceTuple, error)
  60. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  61. //read from the yaml
  62. Configure(datasource string, props map[string]interface{}) error
  63. }
  64. type Sink interface {
  65. //Should be sync function for normal case. The container will run it in go func
  66. Open(ctx StreamContext) error
  67. //Called during initialization. Configure the sink with the properties from rule action definition
  68. Configure(props map[string]interface{}) error
  69. //Called when each row of data has transferred to this sink
  70. Collect(ctx StreamContext, data interface{}) error
  71. Closable
  72. }
  73. type Emitter interface {
  74. AddOutput(chan<- interface{}, string) error
  75. }
  76. type Collector interface {
  77. GetInput() (chan<- interface{}, string)
  78. }
  79. type TopNode interface {
  80. GetName() string
  81. }
  82. type Rewindable interface {
  83. GetOffset() (interface{}, error)
  84. Rewind(offset interface{}) error
  85. }
  86. type RuleOption struct {
  87. IsEventTime bool `json:"isEventTime" yaml:"isEventTime"`
  88. LateTol int64 `json:"lateTolerance" yaml:"lateTolerance"`
  89. Concurrency int `json:"concurrency" yaml:"concurrency"`
  90. BufferLength int `json:"bufferLength" yaml:"bufferLength"`
  91. SendMetaToSink bool `json:"sendMetaToSink" yaml:"sendMetaToSink"`
  92. SendError bool `json:"sendError" yaml:"sendError"`
  93. Qos Qos `json:"qos" yaml:"qos"`
  94. CheckpointInterval int `json:"checkpointInterval" yaml:"checkpointInterval"`
  95. }
  96. type Rule struct {
  97. Triggered bool `json:"triggered"`
  98. Id string `json:"id"`
  99. Sql string `json:"sql"`
  100. Actions []map[string]interface{} `json:"actions"`
  101. Options *RuleOption `json:"options"`
  102. }
  103. type StreamContext interface {
  104. context.Context
  105. GetLogger() Logger
  106. GetRuleId() string
  107. GetOpId() string
  108. GetInstanceId() int
  109. GetRootPath() string
  110. WithMeta(ruleId string, opId string, store Store) StreamContext
  111. WithInstance(instanceId int) StreamContext
  112. WithCancel() (StreamContext, context.CancelFunc)
  113. SetError(e error)
  114. //State handling
  115. IncrCounter(key string, amount int) error
  116. GetCounter(key string) (int, error)
  117. PutState(key string, value interface{}) error
  118. GetState(key string) (interface{}, error)
  119. DeleteState(key string) error
  120. }
  121. type Operator interface {
  122. Emitter
  123. Collector
  124. Exec(StreamContext, chan<- error)
  125. GetName() string
  126. GetMetrics() [][]interface{}
  127. }
  128. type FunctionContext interface {
  129. StreamContext
  130. GetFuncId() int
  131. }
  132. type Function interface {
  133. //The argument is a list of xsql.Expr
  134. Validate(args []interface{}) error
  135. //Execute the function, return the result and if execution is successful.
  136. //If execution fails, return the error and false.
  137. Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
  138. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  139. IsAggregate() bool
  140. }
  141. const (
  142. AtMostOnce Qos = iota
  143. AtLeastOnce
  144. ExactlyOnce
  145. )
  146. type Qos int