stream.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package api
  15. import (
  16. "context"
  17. "sync"
  18. )
  19. type SourceTuple interface {
  20. Message() map[string]interface{}
  21. Meta() map[string]interface{}
  22. }
  23. type DefaultSourceTuple struct {
  24. Mess map[string]interface{} `json:"message"`
  25. M map[string]interface{} `json:"meta"`
  26. }
  27. func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
  28. return &DefaultSourceTuple{
  29. Mess: message,
  30. M: meta,
  31. }
  32. }
  33. func (t *DefaultSourceTuple) Message() map[string]interface{} {
  34. return t.Mess
  35. }
  36. func (t *DefaultSourceTuple) Meta() map[string]interface{} {
  37. return t.M
  38. }
  39. type Logger interface {
  40. Debug(args ...interface{})
  41. Info(args ...interface{})
  42. Warn(args ...interface{})
  43. Error(args ...interface{})
  44. Debugln(args ...interface{})
  45. Infoln(args ...interface{})
  46. Warnln(args ...interface{})
  47. Errorln(args ...interface{})
  48. Debugf(format string, args ...interface{})
  49. Infof(format string, args ...interface{})
  50. Warnf(format string, args ...interface{})
  51. Errorf(format string, args ...interface{})
  52. }
  53. type Store interface {
  54. SaveState(checkpointId int64, opId string, state map[string]interface{}) error
  55. SaveCheckpoint(checkpointId int64) error //Save the whole checkpoint state into storage
  56. GetOpState(opId string) (*sync.Map, error)
  57. Clean() error
  58. }
  59. type Closable interface {
  60. Close(ctx StreamContext) error
  61. }
  62. type Source interface {
  63. //Should be sync function for normal case. The container will run it in go func
  64. Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
  65. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  66. //read from the yaml
  67. Configure(datasource string, props map[string]interface{}) error
  68. Closable
  69. }
  70. type TableSource interface {
  71. // Load the data at batch
  72. Load(ctx StreamContext) ([]SourceTuple, error)
  73. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  74. //read from the yaml
  75. Configure(datasource string, props map[string]interface{}) error
  76. }
  77. type Sink interface {
  78. //Should be sync function for normal case. The container will run it in go func
  79. Open(ctx StreamContext) error
  80. //Called during initialization. Configure the sink with the properties from rule action definition
  81. Configure(props map[string]interface{}) error
  82. //Called when each row of data has transferred to this sink
  83. Collect(ctx StreamContext, data interface{}) error
  84. Closable
  85. }
  86. type Emitter interface {
  87. AddOutput(chan<- interface{}, string) error
  88. }
  89. type Collector interface {
  90. GetInput() (chan<- interface{}, string)
  91. }
  92. type TopNode interface {
  93. GetName() string
  94. }
  95. type Rewindable interface {
  96. GetOffset() (interface{}, error)
  97. Rewind(offset interface{}) error
  98. }
  99. type RuleOption struct {
  100. IsEventTime bool `json:"isEventTime" yaml:"isEventTime"`
  101. LateTol int64 `json:"lateTolerance" yaml:"lateTolerance"`
  102. Concurrency int `json:"concurrency" yaml:"concurrency"`
  103. BufferLength int `json:"bufferLength" yaml:"bufferLength"`
  104. SendMetaToSink bool `json:"sendMetaToSink" yaml:"sendMetaToSink"`
  105. SendError bool `json:"sendError" yaml:"sendError"`
  106. Qos Qos `json:"qos" yaml:"qos"`
  107. CheckpointInterval int `json:"checkpointInterval" yaml:"checkpointInterval"`
  108. }
  109. type Rule struct {
  110. Triggered bool `json:"triggered"`
  111. Id string `json:"id"`
  112. Sql string `json:"sql"`
  113. Actions []map[string]interface{} `json:"actions"`
  114. Options *RuleOption `json:"options"`
  115. }
  116. type StreamContext interface {
  117. context.Context
  118. GetLogger() Logger
  119. GetRuleId() string
  120. GetOpId() string
  121. GetInstanceId() int
  122. GetRootPath() string
  123. WithMeta(ruleId string, opId string, store Store) StreamContext
  124. WithInstance(instanceId int) StreamContext
  125. WithCancel() (StreamContext, context.CancelFunc)
  126. SetError(e error)
  127. // State handling
  128. IncrCounter(key string, amount int) error
  129. GetCounter(key string) (int, error)
  130. PutState(key string, value interface{}) error
  131. GetState(key string) (interface{}, error)
  132. DeleteState(key string) error
  133. // Connection related methods
  134. GetConnection(connectSelector string) (interface{}, error)
  135. ReleaseConnection(connectSelector string)
  136. // Properties processing, prop is a json path
  137. ParseDynamicProp(prop string, data interface{}) (interface{}, error)
  138. // Transform output according to the properties like syntax
  139. TransformOutput() ([]byte, bool, error)
  140. }
  141. type Operator interface {
  142. Emitter
  143. Collector
  144. Exec(StreamContext, chan<- error)
  145. GetName() string
  146. GetMetrics() [][]interface{}
  147. }
  148. type FunctionContext interface {
  149. StreamContext
  150. GetFuncId() int
  151. }
  152. type Function interface {
  153. //The argument is a list of xsql.Expr
  154. Validate(args []interface{}) error
  155. //Execute the function, return the result and if execution is successful.
  156. //If execution fails, return the error and false.
  157. Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
  158. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  159. IsAggregate() bool
  160. }
  161. const (
  162. AtMostOnce Qos = iota
  163. AtLeastOnce
  164. ExactlyOnce
  165. )
  166. type Qos int