stream.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package api
  2. import (
  3. "context"
  4. "sync"
  5. )
  6. type SourceTuple interface {
  7. Message() map[string]interface{}
  8. Meta() map[string]interface{}
  9. }
  10. type DefaultSourceTuple struct {
  11. message map[string]interface{}
  12. meta map[string]interface{}
  13. }
  14. func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
  15. return &DefaultSourceTuple{
  16. message: message,
  17. meta: meta,
  18. }
  19. }
  20. func (t *DefaultSourceTuple) Message() map[string]interface{} {
  21. return t.message
  22. }
  23. func (t *DefaultSourceTuple) Meta() map[string]interface{} {
  24. return t.meta
  25. }
  26. type Logger interface {
  27. Debug(args ...interface{})
  28. Info(args ...interface{})
  29. Warn(args ...interface{})
  30. Error(args ...interface{})
  31. Debugln(args ...interface{})
  32. Infoln(args ...interface{})
  33. Warnln(args ...interface{})
  34. Errorln(args ...interface{})
  35. Debugf(format string, args ...interface{})
  36. Infof(format string, args ...interface{})
  37. Warnf(format string, args ...interface{})
  38. Errorf(format string, args ...interface{})
  39. }
  40. type Store interface {
  41. SaveState(checkpointId int64, opId string, state map[string]interface{}) error
  42. SaveCheckpoint(checkpointId int64) error //Save the whole checkpoint state into storage like badger
  43. GetOpState(opId string) (*sync.Map, error)
  44. }
  45. type Closable interface {
  46. Close(ctx StreamContext) error
  47. }
  48. type Source interface {
  49. //Should be sync function for normal case. The container will run it in go func
  50. Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
  51. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  52. //read from the yaml
  53. Configure(datasource string, props map[string]interface{}) error
  54. Closable
  55. }
  56. type Sink interface {
  57. //Should be sync function for normal case. The container will run it in go func
  58. Open(ctx StreamContext) error
  59. //Called during initialization. Configure the sink with the properties from rule action definition
  60. Configure(props map[string]interface{}) error
  61. //Called when each row of data has transferred to this sink
  62. Collect(ctx StreamContext, data interface{}) error
  63. Closable
  64. }
  65. type Emitter interface {
  66. AddOutput(chan<- interface{}, string) error
  67. }
  68. type Collector interface {
  69. GetInput() (chan<- interface{}, string)
  70. }
  71. type TopNode interface {
  72. GetName() string
  73. }
  74. type RuleOption struct {
  75. IsEventTime bool `json:"isEventTime"`
  76. LateTol int64 `json:"lateTolerance"`
  77. Concurrency int `json:"concurrency"`
  78. BufferLength int `json:"bufferLength"`
  79. SendMetaToSink bool `json:"sendMetaToSink"`
  80. Qos Qos `json:"qos"`
  81. CheckpointInterval int `json:"checkpointInterval"`
  82. }
  83. type Rule struct {
  84. Triggered bool `json:"triggered"`
  85. Id string `json:"id"`
  86. Sql string `json:"sql"`
  87. Actions []map[string]interface{} `json:"actions"`
  88. Options *RuleOption `json:"options"`
  89. }
  90. type StreamContext interface {
  91. context.Context
  92. GetLogger() Logger
  93. GetRuleId() string
  94. GetOpId() string
  95. GetInstanceId() int
  96. WithMeta(ruleId string, opId string, store Store) StreamContext
  97. WithInstance(instanceId int) StreamContext
  98. WithCancel() (StreamContext, context.CancelFunc)
  99. SetError(e error)
  100. //State handling
  101. IncrCounter(key string, amount int) error
  102. GetCounter(key string) (int, error)
  103. PutState(key string, value interface{}) error
  104. GetState(key string) (interface{}, error)
  105. DeleteState(key string) error
  106. Snapshot() error
  107. SaveState(checkpointId int64) error
  108. }
  109. type Operator interface {
  110. Emitter
  111. Collector
  112. Exec(StreamContext, chan<- error)
  113. GetName() string
  114. GetMetrics() [][]interface{}
  115. }
  116. type FunctionContext interface {
  117. StreamContext
  118. GetFuncId() int
  119. }
  120. type Function interface {
  121. //The argument is a list of xsql.Expr
  122. Validate(args []interface{}) error
  123. //Execute the function, return the result and if execution is successful.
  124. //If execution fails, return the error and false.
  125. Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
  126. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  127. IsAggregate() bool
  128. }
  129. const (
  130. AtMostOnce Qos = iota
  131. AtLeastOnce
  132. ExactlyOnce
  133. )
  134. type Qos int