stream.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package api
  15. import (
  16. "context"
  17. "sync"
  18. "time"
  19. )
  20. type SourceTuple interface {
  21. Message() map[string]interface{}
  22. Meta() map[string]interface{}
  23. Timestamp() time.Time
  24. }
  25. type DefaultSourceTuple struct {
  26. Mess map[string]interface{} `json:"message"`
  27. M map[string]interface{} `json:"meta"`
  28. Time time.Time `json:"timestamp"`
  29. }
  30. func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
  31. return &DefaultSourceTuple{
  32. Mess: message,
  33. M: meta,
  34. Time: time.Now(),
  35. }
  36. }
  37. func NewDefaultSourceTupleWithTime(message map[string]interface{}, meta map[string]interface{}, timestamp time.Time) *DefaultSourceTuple {
  38. return &DefaultSourceTuple{
  39. Mess: message,
  40. M: meta,
  41. Time: timestamp,
  42. }
  43. }
  44. func (t *DefaultSourceTuple) Message() map[string]interface{} {
  45. return t.Mess
  46. }
  47. func (t *DefaultSourceTuple) Meta() map[string]interface{} {
  48. return t.M
  49. }
  50. func (t *DefaultSourceTuple) Timestamp() time.Time {
  51. return t.Time
  52. }
  53. type Logger interface {
  54. Debug(args ...interface{})
  55. Info(args ...interface{})
  56. Warn(args ...interface{})
  57. Error(args ...interface{})
  58. Debugln(args ...interface{})
  59. Infoln(args ...interface{})
  60. Warnln(args ...interface{})
  61. Errorln(args ...interface{})
  62. Debugf(format string, args ...interface{})
  63. Infof(format string, args ...interface{})
  64. Warnf(format string, args ...interface{})
  65. Errorf(format string, args ...interface{})
  66. }
  67. type Store interface {
  68. SaveState(checkpointId int64, opId string, state map[string]interface{}) error
  69. SaveCheckpoint(checkpointId int64) error //Save the whole checkpoint state into storage
  70. GetOpState(opId string) (*sync.Map, error)
  71. Clean() error
  72. }
  73. type Closable interface {
  74. Close(ctx StreamContext) error
  75. }
  76. type Source interface {
  77. // Open Should be sync function for normal case. The container will run it in go func
  78. Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
  79. // Configure Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  80. //read from the yaml
  81. Configure(datasource string, props map[string]interface{}) error
  82. Closable
  83. }
  84. type LookupSource interface {
  85. // Open creates the connection to the external data source
  86. Open(ctx StreamContext) error
  87. // Configure Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  88. //read from the yaml
  89. Configure(datasource string, props map[string]interface{}) error
  90. // Lookup receive lookup values to construct the query and return query results
  91. Lookup(ctx StreamContext, fields []string, keys []string, values []interface{}) ([]SourceTuple, error)
  92. Closable
  93. }
  94. type Sink interface {
  95. // Open Should be sync function for normal case. The container will run it in go func
  96. Open(ctx StreamContext) error
  97. // Configure Called during initialization. Configure the sink with the properties from rule action definition
  98. Configure(props map[string]interface{}) error
  99. // Collect Called when each row of data has transferred to this sink
  100. Collect(ctx StreamContext, data interface{}) error
  101. Closable
  102. }
  103. type Emitter interface {
  104. AddOutput(chan<- interface{}, string) error
  105. }
  106. type Collector interface {
  107. GetInput() (chan<- interface{}, string)
  108. }
  109. type TopNode interface {
  110. GetName() string
  111. }
  112. type Rewindable interface {
  113. GetOffset() (interface{}, error)
  114. Rewind(offset interface{}) error
  115. }
  116. type RuleOption struct {
  117. IsEventTime bool `json:"isEventTime" yaml:"isEventTime"`
  118. LateTol int64 `json:"lateTolerance" yaml:"lateTolerance"`
  119. Concurrency int `json:"concurrency" yaml:"concurrency"`
  120. BufferLength int `json:"bufferLength" yaml:"bufferLength"`
  121. SendMetaToSink bool `json:"sendMetaToSink" yaml:"sendMetaToSink"`
  122. SendError bool `json:"sendError" yaml:"sendError"`
  123. Qos Qos `json:"qos" yaml:"qos"`
  124. CheckpointInterval int `json:"checkpointInterval" yaml:"checkpointInterval"`
  125. Restart *RestartStrategy `json:"restartStrategy" yaml:"restartStrategy"`
  126. }
  127. type RestartStrategy struct {
  128. Attempts int `json:"attempts" yaml:"attempts"`
  129. Delay int `json:"delay" yaml:"delay"`
  130. Multiplier float64 `json:"multiplier" yaml:"multiplier"`
  131. MaxDelay int `json:"maxDelay" yaml:"maxDelay"`
  132. JitterFactor float64 `json:"jitter" yaml:"jitter"`
  133. }
  134. type PrintableTopo struct {
  135. Sources []string `json:"sources"`
  136. Edges map[string][]interface{} `json:"edges"`
  137. }
  138. type GraphNode struct {
  139. Type string `json:"type"`
  140. NodeType string `json:"nodeType"`
  141. Props map[string]interface{} `json:"props"`
  142. UI map[string]interface{} `json:"ui"` //placeholder for ui properties
  143. }
  144. type RuleGraph struct {
  145. Nodes map[string]*GraphNode `json:"nodes"`
  146. Topo *PrintableTopo `json:"topo"`
  147. }
  148. // Rule the definition of the business logic
  149. // Sql and Graph are mutually exclusive, at least one of them should be set
  150. type Rule struct {
  151. Triggered bool `json:"triggered"`
  152. Id string `json:"id,omitempty"`
  153. Name string `json:"name,omitempty"` // The display name of a rule
  154. Sql string `json:"sql,omitempty"`
  155. Graph *RuleGraph `json:"graph,omitempty"`
  156. Actions []map[string]interface{} `json:"actions,omitempty"`
  157. Options *RuleOption `json:"options,omitempty"`
  158. }
  159. type StreamContext interface {
  160. context.Context
  161. GetLogger() Logger
  162. GetRuleId() string
  163. GetOpId() string
  164. GetInstanceId() int
  165. GetRootPath() string
  166. WithMeta(ruleId string, opId string, store Store) StreamContext
  167. WithInstance(instanceId int) StreamContext
  168. WithCancel() (StreamContext, context.CancelFunc)
  169. SetError(e error)
  170. // IncrCounter State handling
  171. IncrCounter(key string, amount int) error
  172. GetCounter(key string) (int, error)
  173. PutState(key string, value interface{}) error
  174. GetState(key string) (interface{}, error)
  175. DeleteState(key string) error
  176. // ParseTemplate parse the template string with the given data
  177. ParseTemplate(template string, data interface{}) (string, error)
  178. // ParseJsonPath parse the jsonPath string with the given data
  179. ParseJsonPath(jsonPath string, data interface{}) (interface{}, error)
  180. // TransformOutput Transform output according to the properties including dataTemplate, sendSingle
  181. // The second parameter is whether the data is transformed or just return as its json format.
  182. TransformOutput(data interface{}) ([]byte, bool, error)
  183. // Decode is set in the source according to the format.
  184. // It decodes byte array into map or map slice.
  185. Decode(data []byte) (map[string]interface{}, error)
  186. DecodeIntoList(data []byte) ([]map[string]interface{}, error)
  187. }
  188. type Operator interface {
  189. Emitter
  190. Collector
  191. Exec(StreamContext, chan<- error)
  192. GetName() string
  193. GetMetrics() [][]interface{}
  194. }
  195. type FunctionContext interface {
  196. StreamContext
  197. GetFuncId() int
  198. }
  199. type Function interface {
  200. // Validate The argument is a list of xsql.Expr
  201. Validate(args []interface{}) error
  202. // Exec Execute the function, return the result and if execution is successful.
  203. //If execution fails, return the error and false.
  204. Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
  205. // IsAggregate If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  206. IsAggregate() bool
  207. }
  208. const (
  209. AtMostOnce Qos = iota
  210. AtLeastOnce
  211. ExactlyOnce
  212. )
  213. type Qos int
  214. type MessageClient interface {
  215. Subscribe(c StreamContext, subChan []TopicChannel, messageErrors chan error, params map[string]interface{}) error
  216. Publish(c StreamContext, topic string, message []byte, params map[string]interface{}) error
  217. }
  218. // TopicChannel is the data structure for subscriber
  219. type TopicChannel struct {
  220. // Topic for subscriber to filter on if any
  221. Topic string
  222. // Messages is the returned message channel for the subscriber
  223. Messages chan<- interface{}
  224. }