log_sink.go 839 B

12345678910111213141516171819202122232425262728293031323334353637
  1. package sinks
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/xstream/api"
  5. "github.com/emqx/kuiper/xstream/collectors"
  6. "sync"
  7. "time"
  8. )
  9. // log action, no properties now
  10. // example: {"log":{}}
  11. func NewLogSink() *collectors.FuncCollector {
  12. return collectors.Func(func(ctx api.StreamContext, data interface{}) error {
  13. log := ctx.GetLogger()
  14. log.Infof("sink result for rule %s: %s", ctx.GetRuleId(), data)
  15. return nil
  16. })
  17. }
  18. type QueryResult struct {
  19. Results []string
  20. LastFetch time.Time
  21. Mux sync.Mutex
  22. }
  23. var QR = &QueryResult{LastFetch: time.Now()}
  24. func NewLogSinkToMemory() *collectors.FuncCollector {
  25. QR.Results = make([]string, 10)
  26. return collectors.Func(func(ctx api.StreamContext, data interface{}) error {
  27. QR.Mux.Lock()
  28. QR.Results = append(QR.Results, fmt.Sprintf("%s", data))
  29. QR.Mux.Unlock()
  30. return nil
  31. })
  32. }