stream.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package api
  2. import (
  3. "context"
  4. "sync"
  5. )
  6. type SourceTuple interface {
  7. Message() map[string]interface{}
  8. Meta() map[string]interface{}
  9. }
  10. type DefaultSourceTuple struct {
  11. message map[string]interface{}
  12. meta map[string]interface{}
  13. }
  14. func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
  15. return &DefaultSourceTuple{
  16. message: message,
  17. meta: meta,
  18. }
  19. }
  20. func (t *DefaultSourceTuple) Message() map[string]interface{} {
  21. return t.message
  22. }
  23. func (t *DefaultSourceTuple) Meta() map[string]interface{} {
  24. return t.meta
  25. }
  26. type Logger interface {
  27. Debug(args ...interface{})
  28. Info(args ...interface{})
  29. Warn(args ...interface{})
  30. Error(args ...interface{})
  31. Debugln(args ...interface{})
  32. Infoln(args ...interface{})
  33. Warnln(args ...interface{})
  34. Errorln(args ...interface{})
  35. Debugf(format string, args ...interface{})
  36. Infof(format string, args ...interface{})
  37. Warnf(format string, args ...interface{})
  38. Errorf(format string, args ...interface{})
  39. }
  40. type Store interface {
  41. SaveState(checkpointId int64, opId string, state map[string]interface{}) error
  42. SaveCheckpoint(checkpointId int64) error //Save the whole checkpoint state into storage like badger
  43. GetOpState(opId string) (*sync.Map, error)
  44. }
  45. type Closable interface {
  46. Close(ctx StreamContext) error
  47. }
  48. type Source interface {
  49. //Should be sync function for normal case. The container will run it in go func
  50. Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
  51. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  52. //read from the yaml
  53. Configure(datasource string, props map[string]interface{}) error
  54. Closable
  55. }
  56. type Sink interface {
  57. //Should be sync function for normal case. The container will run it in go func
  58. Open(ctx StreamContext) error
  59. //Called during initialization. Configure the sink with the properties from rule action definition
  60. Configure(props map[string]interface{}) error
  61. //Called when each row of data has transferred to this sink
  62. Collect(ctx StreamContext, data interface{}) error
  63. Closable
  64. }
  65. type Emitter interface {
  66. AddOutput(chan<- interface{}, string) error
  67. }
  68. type Collector interface {
  69. GetInput() (chan<- interface{}, string)
  70. }
  71. type TopNode interface {
  72. GetName() string
  73. }
  74. type Rewindable interface {
  75. GetOffset() (interface{}, error)
  76. Rewind(offset interface{}) error
  77. }
  78. type RuleOption struct {
  79. IsEventTime bool `json:"isEventTime" yaml:"isEventTime"`
  80. LateTol int64 `json:"lateTolerance" yaml:"lateTolerance"`
  81. Concurrency int `json:"concurrency" yaml:"concurrency"`
  82. BufferLength int `json:"bufferLength" yaml:"bufferLength"`
  83. SendMetaToSink bool `json:"sendMetaToSink" yaml:"sendMetaToSink"`
  84. SendError bool `json:"sendError" yaml:"sendError"`
  85. Qos Qos `json:"qos" yaml:"qos"`
  86. CheckpointInterval int `json:"checkpointInterval" yaml:"checkpointInterval"`
  87. }
  88. type Rule struct {
  89. Triggered bool `json:"triggered"`
  90. Id string `json:"id"`
  91. Sql string `json:"sql"`
  92. Actions []map[string]interface{} `json:"actions"`
  93. Options *RuleOption `json:"options"`
  94. }
  95. type StreamContext interface {
  96. context.Context
  97. GetLogger() Logger
  98. GetRuleId() string
  99. GetOpId() string
  100. GetInstanceId() int
  101. WithMeta(ruleId string, opId string, store Store) StreamContext
  102. WithInstance(instanceId int) StreamContext
  103. WithCancel() (StreamContext, context.CancelFunc)
  104. SetError(e error)
  105. //State handling
  106. IncrCounter(key string, amount int) error
  107. GetCounter(key string) (int, error)
  108. PutState(key string, value interface{}) error
  109. GetState(key string) (interface{}, error)
  110. DeleteState(key string) error
  111. }
  112. type Operator interface {
  113. Emitter
  114. Collector
  115. Exec(StreamContext, chan<- error)
  116. GetName() string
  117. GetMetrics() [][]interface{}
  118. }
  119. type FunctionContext interface {
  120. StreamContext
  121. GetFuncId() int
  122. }
  123. type Function interface {
  124. //The argument is a list of xsql.Expr
  125. Validate(args []interface{}) error
  126. //Execute the function, return the result and if execution is successful.
  127. //If execution fails, return the error and false.
  128. Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
  129. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  130. IsAggregate() bool
  131. }
  132. const (
  133. AtMostOnce Qos = iota
  134. AtLeastOnce
  135. ExactlyOnce
  136. )
  137. type Qos int