stream.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package api
  15. import (
  16. "context"
  17. "sync"
  18. )
  19. type SourceTuple interface {
  20. Message() map[string]interface{}
  21. Meta() map[string]interface{}
  22. }
  23. type DefaultSourceTuple struct {
  24. Mess map[string]interface{} `json:"message"`
  25. M map[string]interface{} `json:"meta"`
  26. }
  27. func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
  28. return &DefaultSourceTuple{
  29. Mess: message,
  30. M: meta,
  31. }
  32. }
  33. func (t *DefaultSourceTuple) Message() map[string]interface{} {
  34. return t.Mess
  35. }
  36. func (t *DefaultSourceTuple) Meta() map[string]interface{} {
  37. return t.M
  38. }
  39. type Logger interface {
  40. Debug(args ...interface{})
  41. Info(args ...interface{})
  42. Warn(args ...interface{})
  43. Error(args ...interface{})
  44. Debugln(args ...interface{})
  45. Infoln(args ...interface{})
  46. Warnln(args ...interface{})
  47. Errorln(args ...interface{})
  48. Debugf(format string, args ...interface{})
  49. Infof(format string, args ...interface{})
  50. Warnf(format string, args ...interface{})
  51. Errorf(format string, args ...interface{})
  52. }
  53. type Store interface {
  54. SaveState(checkpointId int64, opId string, state map[string]interface{}) error
  55. SaveCheckpoint(checkpointId int64) error //Save the whole checkpoint state into storage
  56. GetOpState(opId string) (*sync.Map, error)
  57. Clean() error
  58. }
  59. type Closable interface {
  60. Close(ctx StreamContext) error
  61. }
  62. type Source interface {
  63. // Open Should be sync function for normal case. The container will run it in go func
  64. Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
  65. // Configure Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  66. //read from the yaml
  67. Configure(datasource string, props map[string]interface{}) error
  68. Closable
  69. }
  70. type LookupSource interface {
  71. // Open creates the connection to the external data source
  72. Open(ctx StreamContext) error
  73. // Configure Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  74. //read from the yaml
  75. Configure(datasource string, props map[string]interface{}) error
  76. // Lookup receive lookup values to construct the query and return query results
  77. Lookup(ctx StreamContext, fields []string, keys []string, values []interface{}) ([]SourceTuple, error)
  78. Closable
  79. }
  80. type Sink interface {
  81. // Open Should be sync function for normal case. The container will run it in go func
  82. Open(ctx StreamContext) error
  83. // Configure Called during initialization. Configure the sink with the properties from rule action definition
  84. Configure(props map[string]interface{}) error
  85. // Collect Called when each row of data has transferred to this sink
  86. Collect(ctx StreamContext, data interface{}) error
  87. Closable
  88. }
  89. type Emitter interface {
  90. AddOutput(chan<- interface{}, string) error
  91. }
  92. type Collector interface {
  93. GetInput() (chan<- interface{}, string)
  94. }
  95. type TopNode interface {
  96. GetName() string
  97. }
  98. type Rewindable interface {
  99. GetOffset() (interface{}, error)
  100. Rewind(offset interface{}) error
  101. }
  102. type RuleOption struct {
  103. IsEventTime bool `json:"isEventTime" yaml:"isEventTime"`
  104. LateTol int64 `json:"lateTolerance" yaml:"lateTolerance"`
  105. Concurrency int `json:"concurrency" yaml:"concurrency"`
  106. BufferLength int `json:"bufferLength" yaml:"bufferLength"`
  107. SendMetaToSink bool `json:"sendMetaToSink" yaml:"sendMetaToSink"`
  108. SendError bool `json:"sendError" yaml:"sendError"`
  109. Qos Qos `json:"qos" yaml:"qos"`
  110. CheckpointInterval int `json:"checkpointInterval" yaml:"checkpointInterval"`
  111. Restart *RestartStrategy `json:"restartStrategy" yaml:"restartStrategy"`
  112. }
  113. type RestartStrategy struct {
  114. Attempts int `json:"attempts" yaml:"attempts"`
  115. Delay int `json:"delay" yaml:"delay"`
  116. Multiplier float64 `json:"multiplier" yaml:"multiplier"`
  117. MaxDelay int `json:"maxDelay" yaml:"maxDelay"`
  118. JitterFactor float64 `json:"jitter" yaml:"jitter"`
  119. }
  120. type PrintableTopo struct {
  121. Sources []string `json:"sources"`
  122. Edges map[string][]interface{} `json:"edges"`
  123. }
  124. type GraphNode struct {
  125. Type string `json:"type"`
  126. NodeType string `json:"nodeType"`
  127. Props map[string]interface{} `json:"props"`
  128. UI map[string]interface{} `json:"ui"` //placeholder for ui properties
  129. }
  130. type RuleGraph struct {
  131. Nodes map[string]*GraphNode `json:"nodes"`
  132. Topo *PrintableTopo `json:"topo"`
  133. }
  134. // Rule the definition of the business logic
  135. // Sql and Graph are mutually exclusive, at least one of them should be set
  136. type Rule struct {
  137. Triggered bool `json:"triggered"`
  138. Id string `json:"id,omitempty"`
  139. Name string `json:"name,omitempty"` // The display name of a rule
  140. Sql string `json:"sql,omitempty"`
  141. Graph *RuleGraph `json:"graph,omitempty"`
  142. Actions []map[string]interface{} `json:"actions,omitempty"`
  143. Options *RuleOption `json:"options,omitempty"`
  144. }
  145. type StreamContext interface {
  146. context.Context
  147. GetLogger() Logger
  148. GetRuleId() string
  149. GetOpId() string
  150. GetInstanceId() int
  151. GetRootPath() string
  152. WithMeta(ruleId string, opId string, store Store) StreamContext
  153. WithInstance(instanceId int) StreamContext
  154. WithCancel() (StreamContext, context.CancelFunc)
  155. SetError(e error)
  156. // IncrCounter State handling
  157. IncrCounter(key string, amount int) error
  158. GetCounter(key string) (int, error)
  159. PutState(key string, value interface{}) error
  160. GetState(key string) (interface{}, error)
  161. DeleteState(key string) error
  162. // ParseTemplate parse the template string with the given data
  163. ParseTemplate(template string, data interface{}) (string, error)
  164. // ParseJsonPath parse the jsonPath string with the given data
  165. ParseJsonPath(jsonPath string, data interface{}) (interface{}, error)
  166. // TransformOutput Transform output according to the properties including dataTemplate, sendSingle
  167. // The second parameter is whether the data is transformed or just return as its json format.
  168. TransformOutput(data interface{}) ([]byte, bool, error)
  169. // Decode is set in the source according to the format.
  170. // It decodes byte array into map or map slice.
  171. Decode(data []byte) (map[string]interface{}, error)
  172. }
  173. type Operator interface {
  174. Emitter
  175. Collector
  176. Exec(StreamContext, chan<- error)
  177. GetName() string
  178. GetMetrics() [][]interface{}
  179. }
  180. type FunctionContext interface {
  181. StreamContext
  182. GetFuncId() int
  183. }
  184. type Function interface {
  185. // Validate The argument is a list of xsql.Expr
  186. Validate(args []interface{}) error
  187. // Exec Execute the function, return the result and if execution is successful.
  188. //If execution fails, return the error and false.
  189. Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
  190. // IsAggregate If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  191. IsAggregate() bool
  192. }
  193. const (
  194. AtMostOnce Qos = iota
  195. AtLeastOnce
  196. ExactlyOnce
  197. )
  198. type Qos int
  199. type MessageClient interface {
  200. Subscribe(c StreamContext, subChan []TopicChannel, messageErrors chan error, params map[string]interface{}) error
  201. Publish(c StreamContext, topic string, message []byte, params map[string]interface{}) error
  202. }
  203. // TopicChannel is the data structure for subscriber
  204. type TopicChannel struct {
  205. // Topic for subscriber to filter on if any
  206. Topic string
  207. // Messages is the returned message channel for the subscriber
  208. Messages chan<- interface{}
  209. }