stream.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. // Copyright 2021-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package api
  15. import (
  16. "context"
  17. "sync"
  18. "time"
  19. )
// SourceTuple is the read-only view of a single tuple: a message map, a meta
// map and a timestamp. It is the element type of the consumer channel passed
// to Source.Open and of the slice returned by LookupSource.Lookup.
type SourceTuple interface {
	// Message returns the message part of the tuple as a key/value map.
	Message() map[string]interface{}
	// Meta returns the metadata part of the tuple as a key/value map.
	Meta() map[string]interface{}
	// Timestamp returns the time attached to the tuple.
	Timestamp() time.Time
}
  25. type DefaultSourceTuple struct {
  26. Mess map[string]interface{} `json:"message"`
  27. M map[string]interface{} `json:"meta"`
  28. Time time.Time `json:"timestamp"`
  29. }
  30. func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
  31. return &DefaultSourceTuple{
  32. Mess: message,
  33. M: meta,
  34. Time: time.Now(),
  35. }
  36. }
  37. func NewDefaultSourceTupleWithTime(message map[string]interface{}, meta map[string]interface{}, timestamp time.Time) *DefaultSourceTuple {
  38. return &DefaultSourceTuple{
  39. Mess: message,
  40. M: meta,
  41. Time: timestamp,
  42. }
  43. }
  44. func (t *DefaultSourceTuple) Message() map[string]interface{} {
  45. return t.Mess
  46. }
  47. func (t *DefaultSourceTuple) Meta() map[string]interface{} {
  48. return t.M
  49. }
  50. func (t *DefaultSourceTuple) Timestamp() time.Time {
  51. return t.Time
  52. }
// Logger is the leveled logging interface returned by StreamContext.GetLogger.
// Each of the four levels (debug, info, warn, error) is offered in three
// forms: a plain variant, an "ln" variant, and an "f" variant that takes a
// format string followed by its arguments.
type Logger interface {
	Debug(args ...interface{})
	Info(args ...interface{})
	Warn(args ...interface{})
	Error(args ...interface{})
	Debugln(args ...interface{})
	Infoln(args ...interface{})
	Warnln(args ...interface{})
	Errorln(args ...interface{})
	Debugf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}
// Store is the state storage that is attached to a StreamContext through
// StreamContext.WithMeta.
type Store interface {
	// SaveState records the state map of the operator opId for the checkpoint checkpointId.
	SaveState(checkpointId int64, opId string, state map[string]interface{}) error
	// SaveCheckpoint saves the whole checkpoint state into storage
	SaveCheckpoint(checkpointId int64) error
	// GetOpState returns the state held for the operator opId as a *sync.Map.
	GetOpState(opId string) (*sync.Map, error)
	// Clean clears the store.
	// NOTE(review): exactly what is removed is up to the implementation — confirm.
	Clean() error
}
// Closable is implemented by components that can be closed. It is embedded by
// Source, LookupSource and Sink.
type Closable interface {
	// Close closes the component; it receives the StreamContext of the caller.
	Close(ctx StreamContext) error
}
// Source is a data source that delivers SourceTuples through a channel.
type Source interface {
	// Open should be a synchronous function in the normal case; the container runs it in its own goroutine.
	// It is given a send-only channel for tuples (consumer) and a send-only channel for errors (errCh).
	Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
	// Configure is called during initialization. It configures the source with the data source
	// (e.g. the topic for mqtt) and the properties read from the yaml.
	Configure(datasource string, props map[string]interface{}) error
	Closable
}
// LookupSource is a source that is queried on demand through Lookup instead of
// pushing tuples into a channel.
type LookupSource interface {
	// Open creates the connection to the external data source
	Open(ctx StreamContext) error
	// Configure is called during initialization. It configures the source with the data source
	// (e.g. the topic for mqtt) and the properties read from the yaml.
	Configure(datasource string, props map[string]interface{}) error
	// Lookup receives the lookup values, constructs the query from them and returns the query results.
	// NOTE(review): keys and values are presumably parallel slices — confirm against callers.
	Lookup(ctx StreamContext, fields []string, keys []string, values []interface{}) ([]SourceTuple, error)
	Closable
}
// Sink is a destination that receives the rows produced by a rule.
type Sink interface {
	// Open should be a synchronous function in the normal case; the container runs it in its own goroutine.
	Open(ctx StreamContext) error
	// Configure is called during initialization. It configures the sink with the properties
	// from the rule action definition.
	Configure(props map[string]interface{}) error
	// Collect is called each time a row of data is transferred to this sink.
	Collect(ctx StreamContext, data interface{}) error
	Closable
}
// ResendSink is a Sink that additionally handles data resent from the sink cache.
type ResendSink interface {
	Sink
	// CollectResend is called when the sink cache resend is triggered.
	CollectResend(ctx StreamContext, data interface{}) error
}
// Emitter is a node to which output channels can be attached.
type Emitter interface {
	// AddOutput registers a send-only channel together with a string.
	// NOTE(review): the string is presumably the name of the receiving node — confirm.
	AddOutput(chan<- interface{}, string) error
}
// Collector is a node that exposes an input channel other nodes can send to.
type Collector interface {
	// GetInput returns a send-only channel together with a string.
	// NOTE(review): the string is presumably the name of this node — confirm.
	GetInput() (chan<- interface{}, string)
}
// TopNode is the minimal interface of a topology node: it only exposes a name.
type TopNode interface {
	// GetName returns the name of the node.
	GetName() string
}
// Rewindable is implemented by components whose position can be read and
// later restored.
type Rewindable interface {
	// GetOffset returns the current offset. Its concrete type is chosen by the implementation.
	GetOffset() (interface{}, error)
	// Rewind moves the component to the given offset.
	// NOTE(review): offset is presumably a value previously returned by GetOffset — confirm.
	Rewind(offset interface{}) error
}
// RuleOption holds the options of a rule (see Rule.Options). Every field can
// be loaded from JSON or YAML under the key given in its struct tags.
//
// Cron, Duration and CronDatetimeRange are the scheduling options inspected by
// Rule.IsScheduleRule and Rule.IsLongRunningScheduleRule.
//
// NOTE(review): the units of LateTol ("lateTolerance") and CheckpointInterval
// are not visible in this file — confirm before relying on them.
type RuleOption struct {
	Debug              bool             `json:"debug" yaml:"debug"`
	LogFilename        string           `json:"logFilename" yaml:"logFilename"`
	IsEventTime        bool             `json:"isEventTime" yaml:"isEventTime"`
	LateTol            int64            `json:"lateTolerance" yaml:"lateTolerance"`
	Concurrency        int              `json:"concurrency" yaml:"concurrency"`
	BufferLength       int              `json:"bufferLength" yaml:"bufferLength"`
	SendMetaToSink     bool             `json:"sendMetaToSink" yaml:"sendMetaToSink"`
	SendError          bool             `json:"sendError" yaml:"sendError"`
	Qos                Qos              `json:"qos" yaml:"qos"`
	CheckpointInterval int              `json:"checkpointInterval" yaml:"checkpointInterval"`
	Restart            *RestartStrategy `json:"restartStrategy" yaml:"restartStrategy"`
	Cron               string           `json:"cron" yaml:"cron"`
	Duration           string           `json:"duration" yaml:"duration"`
	CronDatetimeRange  []DatetimeRange  `json:"cronDatetimeRange" yaml:"cronDatetimeRange"`
}
// DatetimeRange is a begin/end pair used by RuleOption.CronDatetimeRange.
// Both bounds are kept as strings.
// NOTE(review): the expected datetime layout is not visible in this file — confirm.
type DatetimeRange struct {
	Begin string `json:"begin" yaml:"begin"`
	End   string `json:"end" yaml:"end"`
}
// RestartStrategy holds the restart settings of a rule (see RuleOption.Restart).
// Note that JitterFactor is serialized under the key "jitter".
// NOTE(review): the units of Delay and MaxDelay are not visible in this file — confirm.
type RestartStrategy struct {
	Attempts     int     `json:"attempts" yaml:"attempts"`
	Delay        int     `json:"delay" yaml:"delay"`
	Multiplier   float64 `json:"multiplier" yaml:"multiplier"`
	MaxDelay     int     `json:"maxDelay" yaml:"maxDelay"`
	JitterFactor float64 `json:"jitter" yaml:"jitter"`
}
// PrintableTopo is a JSON-serializable description of a topology: a list of
// source names plus an edge map keyed by string.
// NOTE(review): the edge map presumably goes from a node name to its downstream nodes — confirm.
type PrintableTopo struct {
	Sources []string                 `json:"sources"`
	Edges   map[string][]interface{} `json:"edges"`
}
// GraphNode is one node of a RuleGraph. It carries two type strings, a free-form
// property map and a free-form UI map.
type GraphNode struct {
	Type     string                 `json:"type"`
	NodeType string                 `json:"nodeType"`
	Props    map[string]interface{} `json:"props"`
	// UI is a placeholder for ui properties
	UI map[string]interface{} `json:"ui"`
}
// SourceMeta is the metadata of a source node. It describes which existing stream/table the node refers to.
// It is part of the Props in the GraphNode and it is optional.
type SourceMeta struct {
	SourceName string `json:"sourceName"` // the name of the stream or table
	SourceType string `json:"sourceType"` // stream or table
}
// RuleGraph is the graph form of a rule (see Rule.Graph): a set of GraphNodes
// keyed by string plus the topology that connects them.
type RuleGraph struct {
	Nodes map[string]*GraphNode `json:"nodes"`
	Topo  *PrintableTopo        `json:"topo"`
}
// Rule is the definition of the business logic.
// Sql and Graph are mutually exclusive; exactly one of them should be set.
// Every field except Triggered is omitted from the JSON output when empty.
type Rule struct {
	Triggered bool                     `json:"triggered"`
	Id        string                   `json:"id,omitempty"`
	Name      string                   `json:"name,omitempty"` // The display name of a rule
	Sql       string                   `json:"sql,omitempty"`
	Graph     *RuleGraph               `json:"graph,omitempty"`
	Actions   []map[string]interface{} `json:"actions,omitempty"`
	Options   *RuleOption              `json:"options,omitempty"`
}
  181. func (r *Rule) IsLongRunningScheduleRule() bool {
  182. if r.Options == nil {
  183. return false
  184. }
  185. return len(r.Options.Cron) == 0 && len(r.Options.Duration) == 0 && len(r.Options.CronDatetimeRange) > 0
  186. }
  187. func (r *Rule) IsScheduleRule() bool {
  188. if r.Options == nil {
  189. return false
  190. }
  191. return len(r.Options.Cron) > 0 && len(r.Options.Duration) > 0
  192. }
// StreamContext is the context handed to sources, sinks, operators and
// functions. It embeds context.Context and adds identity accessors,
// derivation helpers, keyed state handling, template/JSON-path parsing and
// encoding/decoding helpers.
type StreamContext interface {
	context.Context
	// GetLogger returns the Logger bound to this context.
	GetLogger() Logger
	// GetRuleId returns the rule id of this context.
	GetRuleId() string
	// GetOpId returns the operator id of this context.
	GetOpId() string
	// GetInstanceId returns the instance id of this context.
	GetInstanceId() int
	// GetRootPath returns the root path.
	// NOTE(review): which root (install dir, data dir, ...) is not visible in this file — confirm.
	GetRootPath() string
	// WithMeta returns a StreamContext carrying the given rule id, operator id and store.
	WithMeta(ruleId string, opId string, store Store) StreamContext
	// WithInstance returns a StreamContext carrying the given instance id.
	WithInstance(instanceId int) StreamContext
	// WithCancel returns a StreamContext together with a cancel function.
	WithCancel() (StreamContext, context.CancelFunc)
	// SetError records the error e on the context.
	SetError(e error)

	// State handling: counters and arbitrary values addressed by string key.

	// IncrCounter adds amount to the counter stored under key.
	IncrCounter(key string, amount int) error
	// GetCounter returns the counter stored under key.
	GetCounter(key string) (int, error)
	// PutState stores value under key.
	PutState(key string, value interface{}) error
	// GetState returns the value stored under key.
	GetState(key string) (interface{}, error)
	// DeleteState removes the value stored under key.
	DeleteState(key string) error

	// ParseTemplate parses the template string with the given data
	ParseTemplate(template string, data interface{}) (string, error)
	// ParseJsonPath parses the jsonPath string with the given data
	ParseJsonPath(jsonPath string, data interface{}) (interface{}, error)
	// TransformOutput transforms the output according to the properties dataTemplate, sendSingle and fields.
	// It first transforms data through the dataTemplate property, and then selects data based on the fields property.
	// It is recommended that you do not configure both the dataTemplate property and the fields property.
	// The second return value tells whether the data was transformed or is just returned in its json format.
	TransformOutput(data interface{}) ([]byte, bool, error)
	// Decode is set in the source according to the format.
	// It decodes a byte array into a map.
	Decode(data []byte) (map[string]interface{}, error)
	// DecodeIntoList decodes a byte array into a slice of maps.
	DecodeIntoList(data []byte) ([]map[string]interface{}, error)
}
// Operator is a node that is both an Emitter and a Collector and can be
// executed.
type Operator interface {
	Emitter
	Collector
	// Exec runs the operator with the given context; it is also given a send-only error channel.
	Exec(StreamContext, chan<- error)
	// GetName returns the name of the operator.
	GetName() string
	// GetMetrics returns the operator metrics as a slice of rows.
	// NOTE(review): the row layout is not visible in this file — confirm.
	GetMetrics() [][]interface{}
}
// FunctionContext is the StreamContext passed to Function.Exec; it adds the
// id of the function being executed.
type FunctionContext interface {
	StreamContext
	// GetFuncId returns the function id of this context.
	GetFuncId() int
}
// Function is the interface of a function that can be validated and executed
// against a list of arguments.
type Function interface {
	// Validate checks the arguments. The argument is a list of xsql.Expr.
	Validate(args []interface{}) error
	// Exec executes the function and returns the result together with a flag telling whether
	// the execution was successful. If execution fails, it returns the error and false.
	Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
	// IsAggregate tells whether this function is an aggregate function.
	// Each parameter of an aggregate function will be a slice.
	IsAggregate() bool
}
  244. const (
  245. AtMostOnce Qos = iota
  246. AtLeastOnce
  247. ExactlyOnce
  248. )
  249. type Qos int
// MessageClient is a client that can subscribe to and publish on topics.
type MessageClient interface {
	// Subscribe subscribes with the given TopicChannels. It is also given an error channel
	// (messageErrors) and a free-form parameter map.
	Subscribe(c StreamContext, subChan []TopicChannel, messageErrors chan error, params map[string]interface{}) error
	// Publish publishes the message bytes to topic, with a free-form parameter map.
	Publish(c StreamContext, topic string, message []byte, params map[string]interface{}) error
}
// TopicChannel is the data structure for a subscriber: a topic paired with the
// channel on which matching messages are delivered. It is the element type of
// the subChan argument of MessageClient.Subscribe.
type TopicChannel struct {
	// Topic for subscriber to filter on if any
	Topic string
	// Messages is the returned message channel for the subscriber
	Messages chan<- interface{}
}