stream.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package api
  2. import (
  3. "context"
  4. )
  5. type SourceTuple interface {
  6. Message() map[string]interface{}
  7. Meta() map[string]interface{}
  8. OriginalKeys() map[string]interface{}
  9. }
  10. type DefaultSourceTuple struct {
  11. message map[string]interface{}
  12. meta map[string]interface{}
  13. origkey map[string]interface{}
  14. }
  15. func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple {
  16. origkey := make(map[string]interface{})
  17. return &DefaultSourceTuple{
  18. message: message,
  19. meta: meta,
  20. origkey: origkey,
  21. }
  22. }
  23. func NewDefaultSourceTupleWithOrigKey(message map[string]interface{}, meta map[string]interface{}, origkeys map[string]interface{}) *DefaultSourceTuple {
  24. return &DefaultSourceTuple{
  25. message: message,
  26. meta: meta,
  27. origkey: origkeys,
  28. }
  29. }
  30. func (t *DefaultSourceTuple) Message() map[string]interface{} {
  31. return t.message
  32. }
  33. func (t *DefaultSourceTuple) Meta() map[string]interface{} {
  34. return t.meta
  35. }
  36. func (t *DefaultSourceTuple) OriginalKeys() map[string]interface{} {
  37. return t.origkey
  38. }
  39. type Logger interface {
  40. Debug(args ...interface{})
  41. Info(args ...interface{})
  42. Warn(args ...interface{})
  43. Error(args ...interface{})
  44. Debugln(args ...interface{})
  45. Infoln(args ...interface{})
  46. Warnln(args ...interface{})
  47. Errorln(args ...interface{})
  48. Debugf(format string, args ...interface{})
  49. Infof(format string, args ...interface{})
  50. Warnf(format string, args ...interface{})
  51. Errorf(format string, args ...interface{})
  52. }
  53. type Closable interface {
  54. Close(ctx StreamContext) error
  55. }
  56. type Source interface {
  57. //Should be sync function for normal case. The container will run it in go func
  58. Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
  59. //Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
  60. //read from the yaml
  61. Configure(datasource string, props map[string]interface{}) error
  62. Closable
  63. }
  64. type Sink interface {
  65. //Should be sync function for normal case. The container will run it in go func
  66. Open(ctx StreamContext) error
  67. //Called during initialization. Configure the sink with the properties from rule action definition
  68. Configure(props map[string]interface{}) error
  69. //Called when each row of data has transferred to this sink
  70. Collect(ctx StreamContext, data interface{}) error
  71. Closable
  72. }
  73. type Emitter interface {
  74. AddOutput(chan<- interface{}, string) error
  75. }
  76. type Collector interface {
  77. GetInput() (chan<- interface{}, string)
  78. }
  79. type TopNode interface {
  80. GetName() string
  81. }
  82. type Rule struct {
  83. Id string `json:"id"`
  84. Sql string `json:"sql"`
  85. Actions []map[string]interface{} `json:"actions"`
  86. Options map[string]interface{} `json:"options"`
  87. }
  88. type StreamContext interface {
  89. context.Context
  90. GetLogger() Logger
  91. GetRuleId() string
  92. GetOpId() string
  93. GetInstanceId() int
  94. WithMeta(ruleId string, opId string) StreamContext
  95. WithInstance(instanceId int) StreamContext
  96. WithCancel() (StreamContext, context.CancelFunc)
  97. SetError(e error)
  98. }
  99. type Operator interface {
  100. Emitter
  101. Collector
  102. Exec(StreamContext, chan<- error)
  103. GetName() string
  104. GetMetrics() [][]interface{}
  105. }
  106. type Function interface {
  107. //The argument is a list of xsql.Expr
  108. Validate(args []interface{}) error
  109. //Execute the function, return the result and if execution is successful.
  110. //If execution fails, return the error and false.
  111. Exec(args []interface{}) (interface{}, bool)
  112. //If this function is an aggregate function. Each parameter of an aggregate function will be a slice
  113. IsAggregate() bool
  114. }