|
- // Copyright 2021-2023 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package conf
- import (
- "errors"
- "fmt"
- "io"
- "os"
- "path"
- "time"
- "github.com/lestrrat-go/file-rotatelogs"
- "github.com/sirupsen/logrus"
- "github.com/lf-edge/ekuiper/pkg/api"
- )
- const ConfFileName = "kuiper.yaml"
- var (
- Config *KuiperConf
- IsTesting bool
- )
- type tlsConf struct {
- Certfile string `yaml:"certfile"`
- Keyfile string `yaml:"keyfile"`
- }
- type SinkConf struct {
- MemoryCacheThreshold int `json:"memoryCacheThreshold" yaml:"memoryCacheThreshold"`
- MaxDiskCache int `json:"maxDiskCache" yaml:"maxDiskCache"`
- BufferPageSize int `json:"bufferPageSize" yaml:"bufferPageSize"`
- EnableCache bool `json:"enableCache" yaml:"enableCache"`
- ResendInterval int `json:"resendInterval" yaml:"resendInterval"`
- CleanCacheAtStop bool `json:"cleanCacheAtStop" yaml:"cleanCacheAtStop"`
- }
- // Validate the configuration and reset to the default value for invalid values.
- func (sc *SinkConf) Validate() error {
- var errs error
- if sc.MemoryCacheThreshold < 0 {
- sc.MemoryCacheThreshold = 1024
- Log.Warnf("memoryCacheThreshold is less than 0, set to 1024")
- errs = errors.Join(errs, errors.New("memoryCacheThreshold:memoryCacheThreshold must be positive"))
- }
- if sc.MaxDiskCache < 0 {
- sc.MaxDiskCache = 1024000
- Log.Warnf("maxDiskCache is less than 0, set to 1024000")
- errs = errors.Join(errs, errors.New("maxDiskCache:maxDiskCache must be positive"))
- }
- if sc.BufferPageSize <= 0 {
- sc.BufferPageSize = 256
- Log.Warnf("bufferPageSize is less than or equal to 0, set to 256")
- errs = errors.Join(errs, errors.New("bufferPageSize:bufferPageSize must be positive"))
- }
- if sc.ResendInterval < 0 {
- sc.ResendInterval = 0
- Log.Warnf("resendInterval is less than 0, set to 0")
- errs = errors.Join(errs, errors.New("resendInterval:resendInterval must be positive"))
- }
- if sc.BufferPageSize > sc.MemoryCacheThreshold {
- sc.MemoryCacheThreshold = sc.BufferPageSize
- Log.Warnf("memoryCacheThreshold is less than bufferPageSize, set to %d", sc.BufferPageSize)
- errs = errors.Join(errs, errors.New("memoryCacheThresholdTooSmall:memoryCacheThreshold must be greater than or equal to bufferPageSize"))
- }
- if sc.MemoryCacheThreshold%sc.BufferPageSize != 0 {
- sc.MemoryCacheThreshold = sc.BufferPageSize * (sc.MemoryCacheThreshold/sc.BufferPageSize + 1)
- Log.Warnf("memoryCacheThreshold is not a multiple of bufferPageSize, set to %d", sc.MemoryCacheThreshold)
- errs = errors.Join(errs, errors.New("memoryCacheThresholdNotMultiple:memoryCacheThreshold must be a multiple of bufferPageSize"))
- }
- if sc.BufferPageSize > sc.MaxDiskCache {
- sc.MaxDiskCache = sc.BufferPageSize
- Log.Warnf("maxDiskCache is less than bufferPageSize, set to %d", sc.BufferPageSize)
- errs = errors.Join(errs, errors.New("maxDiskCacheTooSmall:maxDiskCache must be greater than bufferPageSize"))
- }
- if sc.MaxDiskCache%sc.BufferPageSize != 0 {
- sc.MaxDiskCache = sc.BufferPageSize * (sc.MaxDiskCache/sc.BufferPageSize + 1)
- Log.Warnf("maxDiskCache is not a multiple of bufferPageSize, set to %d", sc.MaxDiskCache)
- errs = errors.Join(errs, errors.New("maxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize"))
- }
- return errs
- }
- type SourceConf struct {
- HttpServerIp string `json:"httpServerIp" yaml:"httpServerIp"`
- HttpServerPort int `json:"httpServerPort" yaml:"httpServerPort"`
- HttpServerTls *tlsConf `json:"httpServerTls" yaml:"httpServerTls"`
- }
- func (sc *SourceConf) Validate() error {
- var errs error
- if sc.HttpServerIp == "" {
- sc.HttpServerIp = "0.0.0.0"
- }
- if sc.HttpServerPort <= 0 || sc.HttpServerPort > 65535 {
- Log.Warnf("invalid source.httpServerPort configuration %d, set to 10081", sc.HttpServerPort)
- errs = errors.Join(errs, errors.New("invalidHttpServerPort:httpServerPort must between 0 and 65535"))
- sc.HttpServerPort = 10081
- }
- return errs
- }
- type SQLConf struct {
- MaxConnections int `yaml:"maxConnections"`
- }
- type KuiperConf struct {
- Basic struct {
- Debug bool `yaml:"debug"`
- ConsoleLog bool `yaml:"consoleLog"`
- FileLog bool `yaml:"fileLog"`
- RotateTime int `yaml:"rotateTime"`
- MaxAge int `yaml:"maxAge"`
- Ip string `yaml:"ip"`
- Port int `yaml:"port"`
- RestIp string `yaml:"restIp"`
- RestPort int `yaml:"restPort"`
- RestTls *tlsConf `yaml:"restTls"`
- Prometheus bool `yaml:"prometheus"`
- PrometheusPort int `yaml:"prometheusPort"`
- PluginHosts string `yaml:"pluginHosts"`
- Authentication bool `yaml:"authentication"`
- IgnoreCase bool `yaml:"ignoreCase"`
- SQLConf *SQLConf `yaml:"sql"`
- }
- Rule api.RuleOption
- Sink *SinkConf
- Source *SourceConf
- Store struct {
- Type string `yaml:"type"`
- ExtStateType string `yaml:"extStateType"`
- Redis struct {
- Host string `yaml:"host"`
- Port int `yaml:"port"`
- Password string `yaml:"password"`
- Timeout int `yaml:"timeout"`
- ConnectionSelector string `yaml:"connectionSelector"`
- }
- Sqlite struct {
- Name string `yaml:"name"`
- }
- }
- Portable struct {
- PythonBin string `yaml:"pythonBin"`
- InitTimeout int `yaml:"initTimeout"`
- }
- }
- func InitConf() {
- cpath, err := GetConfLoc()
- if err != nil {
- panic(err)
- }
- kc := KuiperConf{
- Rule: api.RuleOption{
- LateTol: 1000,
- Concurrency: 1,
- BufferLength: 1024,
- CheckpointInterval: 300000, // 5 minutes
- SendError: true,
- Restart: &api.RestartStrategy{
- Attempts: 0,
- Delay: 1000,
- Multiplier: 2,
- MaxDelay: 30000,
- JitterFactor: 0.1,
- },
- },
- }
- err = LoadConfigFromPath(path.Join(cpath, ConfFileName), &kc)
- if err != nil {
- Log.Fatal(err)
- panic(err)
- }
- Config = &kc
- if 0 == len(Config.Basic.Ip) {
- Config.Basic.Ip = "0.0.0.0"
- }
- if 0 == len(Config.Basic.RestIp) {
- Config.Basic.RestIp = "0.0.0.0"
- }
- if Config.Basic.Debug {
- Log.SetLevel(logrus.DebugLevel)
- }
- if Config.Basic.FileLog {
- logDir, err := GetLoc(logDir)
- if err != nil {
- Log.Fatal(err)
- }
- file := path.Join(logDir, logFileName)
- logWriter, err := rotatelogs.New(
- file+".%Y-%m-%d_%H-%M-%S",
- rotatelogs.WithLinkName(file),
- rotatelogs.WithRotationTime(time.Hour*time.Duration(Config.Basic.RotateTime)),
- rotatelogs.WithMaxAge(time.Hour*time.Duration(Config.Basic.MaxAge)),
- )
- if err != nil {
- fmt.Println("Failed to init log file settings..." + err.Error())
- Log.Infof("Failed to log to file, using default stderr.")
- } else if Config.Basic.ConsoleLog {
- mw := io.MultiWriter(os.Stdout, logWriter)
- Log.SetOutput(mw)
- } else if !Config.Basic.ConsoleLog {
- Log.SetOutput(logWriter)
- }
- } else if Config.Basic.ConsoleLog {
- Log.SetOutput(os.Stdout)
- }
- if Config.Store.Type == "redis" && Config.Store.Redis.ConnectionSelector != "" {
- if err := RedisStorageConSelectorApply(Config.Store.Redis.ConnectionSelector, Config); err != nil {
- Log.Fatal(err)
- }
- }
- if Config.Store.ExtStateType == "" {
- Config.Store.ExtStateType = "sqlite"
- }
- if Config.Portable.PythonBin == "" {
- Config.Portable.PythonBin = "python"
- }
- if Config.Portable.InitTimeout <= 0 {
- Config.Portable.InitTimeout = 5000
- }
- if Config.Source == nil {
- Config.Source = &SourceConf{}
- }
- _ = Config.Source.Validate()
- if Config.Sink == nil {
- Config.Sink = &SinkConf{}
- }
- _ = Config.Sink.Validate()
- _ = ValidateRuleOption(&Config.Rule)
- }
- func ValidateRuleOption(option *api.RuleOption) error {
- var errs error
- if option.CheckpointInterval < 0 {
- option.CheckpointInterval = 0
- Log.Warnf("checkpointInterval is negative, set to 0")
- errs = errors.Join(errs, errors.New("invalidCheckpointInterval:checkpointInterval must be greater than 0"))
- }
- if option.Concurrency < 0 {
- option.Concurrency = 1
- Log.Warnf("concurrency is negative, set to 1")
- errs = errors.Join(errs, errors.New("invalidConcurrency:concurrency must be greater than 0"))
- }
- if option.BufferLength < 0 {
- option.BufferLength = 1024
- Log.Warnf("bufferLength is negative, set to 1024")
- errs = errors.Join(errs, errors.New("invalidBufferLength:bufferLength must be greater than 0"))
- }
- if option.LateTol < 0 {
- option.LateTol = 1000
- Log.Warnf("lateTol is negative, set to 1000")
- errs = errors.Join(errs, errors.New("invalidLateTol:lateTol must be greater than 0"))
- }
- if option.Restart != nil {
- if option.Restart.Multiplier <= 0 {
- option.Restart.Multiplier = 2
- Log.Warnf("restart multiplier is negative, set to 2")
- errs = errors.Join(errs, errors.New("invalidRestartMultiplier:restart multiplier must be greater than 0"))
- }
- if option.Restart.Attempts < 0 {
- option.Restart.Attempts = 0
- Log.Warnf("restart attempts is negative, set to 0")
- errs = errors.Join(errs, errors.New("invalidRestartAttempts:restart attempts must be greater than 0"))
- }
- if option.Restart.Delay <= 0 {
- option.Restart.Delay = 1000
- Log.Warnf("restart delay is negative, set to 1000")
- errs = errors.Join(errs, errors.New("invalidRestartDelay:restart delay must be greater than 0"))
- }
- if option.Restart.MaxDelay <= 0 {
- option.Restart.MaxDelay = option.Restart.Delay
- Log.Warnf("restart maxDelay is negative, set to %d", option.Restart.Delay)
- errs = errors.Join(errs, errors.New("invalidRestartMaxDelay:restart maxDelay must be greater than 0"))
- }
- if option.Restart.JitterFactor <= 0 || option.Restart.JitterFactor >= 1 {
- option.Restart.JitterFactor = 0.1
- Log.Warnf("restart jitterFactor must between 0 and 1, set to 0.1")
- errs = errors.Join(errs, errors.New("invalidRestartJitterFactor:restart jitterFactor must between [0, 1)"))
- }
- }
- return errs
- }
- func init() {
- InitLogger()
- InitClock()
- }
|