12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- package api
- import (
- "context"
- "github.com/sirupsen/logrus"
- )
- //The function to call when data is emitted by the source.
- type ConsumeFunc func(message map[string]interface{}, metadata map[string]interface{})
- type Closable interface {
- Close(ctx StreamContext) error
- }
- type Source interface {
- //Should be sync function for normal case. The container will run it in go func
- Open(ctx StreamContext, consume ConsumeFunc) error
- //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
- //read from the yaml
- Configure(datasource string, props map[string]interface{}) error
- Closable
- }
- type Sink interface {
- //Should be sync function for normal case. The container will run it in go func
- Open(ctx StreamContext) error
- //Called during initialization. Configure the sink with the properties from rule action definition
- Configure(props map[string]interface{}) error
- //Called when each row of data has transferred to this sink
- Collect(ctx StreamContext, data interface{}) error
- Closable
- }
- type Emitter interface {
- AddOutput(chan<- interface{}, string) error
- }
- type Collector interface {
- GetInput() (chan<- interface{}, string)
- }
- type TopNode interface {
- GetName() string
- }
- type Rule struct {
- Id string `json:"id"`
- Sql string `json:"sql"`
- Actions []map[string]interface{} `json:"actions"`
- Options map[string]interface{} `json:"options"`
- }
- type StreamContext interface {
- context.Context
- GetLogger() *logrus.Entry
- GetRuleId() string
- GetOpId() string
- WithMeta(ruleId string, opId string) StreamContext
- WithCancel() (StreamContext, context.CancelFunc)
- }
- type Operator interface {
- Emitter
- Collector
- Exec(StreamContext, chan<- error)
- GetName() string
- }
- type Function interface {
- //The argument is a list of xsql.Expr
- Validate(args []interface{}) error
- //Execute the function, return the result and if execution is successful.
- //If execution fails, return the error and false.
- Exec(args []interface{}) (interface{}, bool)
- }
|