conf.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package conf
  2. import (
  3. "fmt"
  4. "github.com/emqx/kuiper/pkg/api"
  5. "github.com/lestrrat-go/file-rotatelogs"
  6. "github.com/sirupsen/logrus"
  7. "gopkg.in/yaml.v3"
  8. "io"
  9. "io/ioutil"
  10. "os"
  11. "path"
  12. "time"
  13. )
  14. const StreamConf = "kuiper.yaml"
  15. var (
  16. Config *KuiperConf
  17. IsTesting bool
  18. )
  19. func LoadConf(confName string) ([]byte, error) {
  20. confDir, err := GetConfLoc()
  21. if err != nil {
  22. return nil, err
  23. }
  24. file := path.Join(confDir, confName)
  25. b, err := ioutil.ReadFile(file)
  26. if err != nil {
  27. return nil, err
  28. }
  29. return b, nil
  30. }
  31. type tlsConf struct {
  32. Certfile string `yaml:"certfile"`
  33. Keyfile string `yaml:"keyfile"`
  34. }
  35. type KuiperConf struct {
  36. Basic struct {
  37. Debug bool `yaml:"debug"`
  38. ConsoleLog bool `yaml:"consoleLog"`
  39. FileLog bool `yaml:"fileLog"`
  40. RotateTime int `yaml:"rotateTime"`
  41. MaxAge int `yaml:"maxAge"`
  42. Ip string `yaml:"ip"`
  43. Port int `yaml:"port"`
  44. RestIp string `yaml:"restIp"`
  45. RestPort int `yaml:"restPort"`
  46. RestTls *tlsConf `yaml:"restTls"`
  47. Prometheus bool `yaml:"prometheus"`
  48. PrometheusPort int `yaml:"prometheusPort"`
  49. PluginHosts string `yaml:"pluginHosts"`
  50. }
  51. Rule api.RuleOption
  52. Sink struct {
  53. CacheThreshold int `yaml:"cacheThreshold"`
  54. CacheTriggerCount int `yaml:"cacheTriggerCount"`
  55. DisableCache bool `yaml:"disableCache""`
  56. }
  57. }
  58. func InitConf() {
  59. b, err := LoadConf(StreamConf)
  60. if err != nil {
  61. Log.Fatal(err)
  62. }
  63. kc := KuiperConf{
  64. Rule: api.RuleOption{
  65. LateTol: 1000,
  66. Concurrency: 1,
  67. BufferLength: 1024,
  68. CheckpointInterval: 300000, //5 minutes
  69. SendError: true,
  70. },
  71. }
  72. if err := yaml.Unmarshal(b, &kc); err != nil {
  73. Log.Fatal(err)
  74. } else {
  75. Config = &kc
  76. }
  77. if 0 == len(Config.Basic.Ip) {
  78. Config.Basic.Ip = "0.0.0.0"
  79. }
  80. if 0 == len(Config.Basic.RestIp) {
  81. Config.Basic.RestIp = "0.0.0.0"
  82. }
  83. if Config.Basic.Debug {
  84. Log.SetLevel(logrus.DebugLevel)
  85. }
  86. if Config.Basic.FileLog {
  87. logDir, err := GetLoc(logDir)
  88. if err != nil {
  89. Log.Fatal(err)
  90. }
  91. file := path.Join(logDir, logFileName)
  92. logWriter, err := rotatelogs.New(
  93. file+".%Y-%m-%d_%H-%M-%S",
  94. rotatelogs.WithLinkName(file),
  95. rotatelogs.WithRotationTime(time.Hour*time.Duration(Config.Basic.RotateTime)),
  96. rotatelogs.WithMaxAge(time.Hour*time.Duration(Config.Basic.MaxAge)),
  97. )
  98. if err != nil {
  99. fmt.Println("Failed to init log file settings..." + err.Error())
  100. Log.Infof("Failed to log to file, using default stderr.")
  101. } else if Config.Basic.ConsoleLog {
  102. mw := io.MultiWriter(os.Stdout, logWriter)
  103. Log.SetOutput(mw)
  104. } else if !Config.Basic.ConsoleLog {
  105. Log.SetOutput(logWriter)
  106. }
  107. } else if Config.Basic.ConsoleLog {
  108. Log.SetOutput(os.Stdout)
  109. }
  110. }
  111. func init() {
  112. InitLogger()
  113. InitClock()
  114. }