util.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. package common
  2. import (
  3. "archive/zip"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/benbjohnson/clock"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/go-yaml/yaml"
  9. "github.com/keepeye/logrus-filename"
  10. "github.com/lestrrat-go/file-rotatelogs"
  11. "github.com/sirupsen/logrus"
  12. "io"
  13. "io/ioutil"
  14. "os"
  15. "path"
  16. "path/filepath"
  17. "strings"
  18. "time"
  19. )
  20. const (
  21. logFileName = "stream.log"
  22. etc_dir = "/etc/"
  23. data_dir = "/data/"
  24. log_dir = "/log/"
  25. StreamConf = "kuiper.yaml"
  26. KuiperBaseKey = "KuiperBaseKey"
  27. KuiperSyslogKey = "KuiperSyslogKey"
  28. MetaKey = "__meta"
  29. )
  30. var (
  31. Log *logrus.Logger
  32. Config *KuiperConf
  33. IsTesting bool
  34. Clock clock.Clock
  35. logFile *os.File
  36. LoadFileType = "relative"
  37. )
  38. func LoadConf(confName string) ([]byte, error) {
  39. confDir, err := GetConfLoc()
  40. if err != nil {
  41. return nil, err
  42. }
  43. file := path.Join(confDir, confName)
  44. b, err := ioutil.ReadFile(file)
  45. if err != nil {
  46. return nil, err
  47. }
  48. return b, nil
  49. }
  50. type tlsConf struct {
  51. Certfile string `yaml:"certfile"`
  52. Keyfile string `yaml:"keyfile"`
  53. }
  54. type KuiperConf struct {
  55. Basic struct {
  56. Debug bool `yaml:"debug"`
  57. ConsoleLog bool `yaml:"consoleLog"`
  58. FileLog bool `yaml:"fileLog"`
  59. RotateTime int `yaml:"rotateTime"`
  60. MaxAge int `yaml:"maxAge"`
  61. Ip string `yaml:"ip"`
  62. Port int `yaml:"port"`
  63. RestIp string `yaml:"restIp"`
  64. RestPort int `yaml:"restPort"`
  65. RestTls *tlsConf `yaml:"restTls"`
  66. Prometheus bool `yaml:"prometheus"`
  67. PrometheusPort int `yaml:"prometheusPort"`
  68. PluginHosts string `yaml:"pluginHosts"`
  69. }
  70. Rule api.RuleOption
  71. Sink struct {
  72. CacheThreshold int `yaml:"cacheThreshold"`
  73. CacheTriggerCount int `yaml:"cacheTriggerCount"`
  74. DisableCache bool `yaml:"disableCache""`
  75. }
  76. }
  77. func init() {
  78. Log = logrus.New()
  79. initSyslog()
  80. filenameHook := filename.NewHook()
  81. filenameHook.Field = "file"
  82. Log.AddHook(filenameHook)
  83. Log.SetFormatter(&logrus.TextFormatter{
  84. TimestampFormat: "2006-01-02 15:04:05",
  85. DisableColors: true,
  86. FullTimestamp: true,
  87. })
  88. Log.Debugf("init with args %s", os.Args)
  89. for _, arg := range os.Args {
  90. if strings.HasPrefix(arg, "-test.") {
  91. IsTesting = true
  92. break
  93. }
  94. }
  95. if IsTesting {
  96. Log.Debugf("running in testing mode")
  97. Clock = clock.NewMock()
  98. } else {
  99. Clock = clock.New()
  100. }
  101. }
  102. func InitConf() {
  103. b, err := LoadConf(StreamConf)
  104. if err != nil {
  105. Log.Fatal(err)
  106. }
  107. kc := KuiperConf{
  108. Rule: api.RuleOption{
  109. LateTol: 1000,
  110. Concurrency: 1,
  111. BufferLength: 1024,
  112. CheckpointInterval: 300000, //5 minutes
  113. SendError: true,
  114. },
  115. }
  116. if err := yaml.Unmarshal(b, &kc); err != nil {
  117. Log.Fatal(err)
  118. } else {
  119. Config = &kc
  120. }
  121. if 0 == len(Config.Basic.Ip) {
  122. Config.Basic.Ip = "0.0.0.0"
  123. }
  124. if 0 == len(Config.Basic.RestIp) {
  125. Config.Basic.RestIp = "0.0.0.0"
  126. }
  127. if Config.Basic.Debug {
  128. Log.SetLevel(logrus.DebugLevel)
  129. }
  130. if Config.Basic.FileLog {
  131. logDir, err := GetLoc(log_dir)
  132. if err != nil {
  133. Log.Fatal(err)
  134. }
  135. file := path.Join(logDir, logFileName)
  136. logWriter, err := rotatelogs.New(
  137. file+".%Y-%m-%d_%H-%M-%S",
  138. rotatelogs.WithLinkName(file),
  139. rotatelogs.WithRotationTime(time.Hour*time.Duration(Config.Basic.RotateTime)),
  140. rotatelogs.WithMaxAge(time.Hour*time.Duration(Config.Basic.MaxAge)),
  141. )
  142. if err != nil {
  143. fmt.Println("Failed to init log file settings..." + err.Error())
  144. Log.Infof("Failed to log to file, using default stderr.")
  145. } else if Config.Basic.ConsoleLog {
  146. mw := io.MultiWriter(os.Stdout, logWriter)
  147. Log.SetOutput(mw)
  148. } else if !Config.Basic.ConsoleLog {
  149. Log.SetOutput(logWriter)
  150. }
  151. } else if Config.Basic.ConsoleLog {
  152. Log.SetOutput(os.Stdout)
  153. }
  154. }
  155. func CloseLogger() {
  156. if logFile != nil {
  157. logFile.Close()
  158. }
  159. }
  160. func GetConfLoc() (string, error) {
  161. return GetLoc(etc_dir)
  162. }
  163. func GetDataLoc() (string, error) {
  164. if IsTesting {
  165. dataDir, err := GetLoc(data_dir)
  166. if err != nil {
  167. return "", err
  168. }
  169. d := path.Join(path.Dir(dataDir), "test")
  170. if _, err := os.Stat(d); os.IsNotExist(err) {
  171. err = os.MkdirAll(d, 0755)
  172. if err != nil {
  173. return "", err
  174. }
  175. }
  176. return d, nil
  177. }
  178. return GetLoc(data_dir)
  179. }
  180. func absolutePath(subdir string) (dir string, err error) {
  181. subdir = strings.TrimLeft(subdir, `/`)
  182. subdir = strings.TrimRight(subdir, `/`)
  183. switch subdir {
  184. case "etc":
  185. dir = "/etc/kuiper/"
  186. break
  187. case "data":
  188. dir = "/var/lib/kuiper/data/"
  189. break
  190. case "log":
  191. dir = "/var/log/kuiper/"
  192. break
  193. case "plugins":
  194. dir = "/var/lib/kuiper/plugins/"
  195. break
  196. }
  197. if 0 == len(dir) {
  198. return "", fmt.Errorf("no find such file : %s", subdir)
  199. }
  200. return dir, nil
  201. }
  202. func GetLoc(subdir string) (string, error) {
  203. if "relative" == LoadFileType {
  204. return relativePath(subdir)
  205. }
  206. if "absolute" == LoadFileType {
  207. return absolutePath(subdir)
  208. }
  209. return "", fmt.Errorf("Unrecognized loading method.")
  210. }
  211. func relativePath(subdir string) (dir string, err error) {
  212. dir, err = os.Getwd()
  213. if err != nil {
  214. return "", err
  215. }
  216. if base := os.Getenv(KuiperBaseKey); base != "" {
  217. Log.Infof("Specified Kuiper base folder at location %s.\n", base)
  218. dir = base
  219. }
  220. confDir := dir + subdir
  221. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  222. lastdir := dir
  223. for len(dir) > 0 {
  224. dir = filepath.Dir(dir)
  225. if lastdir == dir {
  226. break
  227. }
  228. confDir = dir + subdir
  229. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  230. lastdir = dir
  231. continue
  232. } else {
  233. //Log.Printf("Trying to load file from %s", confDir)
  234. return confDir, nil
  235. }
  236. }
  237. } else {
  238. //Log.Printf("Trying to load file from %s", confDir)
  239. return confDir, nil
  240. }
  241. return "", fmt.Errorf("conf dir not found, please set KuiperBaseKey program environment variable correctly.")
  242. }
  243. func ProcessPath(p string) (string, error) {
  244. if abs, err := filepath.Abs(p); err != nil {
  245. return "", nil
  246. } else {
  247. if _, err := os.Stat(abs); os.IsNotExist(err) {
  248. return "", err
  249. }
  250. return abs, nil
  251. }
  252. }
  253. func ReadJsonUnmarshal(path string, ret interface{}) error {
  254. sliByte, err := ioutil.ReadFile(path)
  255. if nil != err {
  256. return err
  257. }
  258. err = json.Unmarshal(sliByte, ret)
  259. if nil != err {
  260. return err
  261. }
  262. return nil
  263. }
  264. func WriteYamlMarshal(path string, data interface{}) error {
  265. y, err := yaml.Marshal(data)
  266. if nil != err {
  267. return err
  268. }
  269. return ioutil.WriteFile(path, y, 0666)
  270. }
  271. func ReadYamlUnmarshal(path string, ret interface{}) error {
  272. sliByte, err := ioutil.ReadFile(path)
  273. if nil != err {
  274. return err
  275. }
  276. err = yaml.Unmarshal(sliByte, ret)
  277. if nil != err {
  278. return err
  279. }
  280. return nil
  281. }
  282. func UnzipTo(f *zip.File, fpath string) error {
  283. _, err := os.Stat(fpath)
  284. if f.FileInfo().IsDir() {
  285. // Make Folder
  286. if _, err := os.Stat(fpath); os.IsNotExist(err) {
  287. if err := os.MkdirAll(fpath, os.ModePerm); err != nil {
  288. return err
  289. }
  290. }
  291. return nil
  292. }
  293. if err == nil || !os.IsNotExist(err) {
  294. if err = os.RemoveAll(fpath); err != nil {
  295. return fmt.Errorf("failed to delete file %s", fpath)
  296. }
  297. }
  298. if _, err := os.Stat(filepath.Dir(fpath)); os.IsNotExist(err) {
  299. if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
  300. return err
  301. }
  302. }
  303. outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
  304. if err != nil {
  305. return err
  306. }
  307. rc, err := f.Open()
  308. if err != nil {
  309. return err
  310. }
  311. _, err = io.Copy(outFile, rc)
  312. outFile.Close()
  313. rc.Close()
  314. return err
  315. }