util.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. package common
  2. import (
  3. "archive/zip"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/benbjohnson/clock"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/go-yaml/yaml"
  9. "github.com/keepeye/logrus-filename"
  10. "github.com/lestrrat-go/file-rotatelogs"
  11. "github.com/sirupsen/logrus"
  12. "io"
  13. "io/ioutil"
  14. "os"
  15. "path"
  16. "path/filepath"
  17. "strings"
  18. "time"
  19. )
const (
	// logFileName is the base name of the log file that InitConf creates
	// inside the log directory when file logging is enabled.
	logFileName = "stream.log"
	// Names of the well-known sub-directories. They are passed to GetLoc and
	// are also the keys of AbsoluteMapping.
	etc_dir     = "etc"
	data_dir    = "data"
	log_dir     = "log"
	plugins_dir = "plugins"
	// StreamConf is the name of the configuration file that InitConf loads
	// from the configuration directory.
	StreamConf = "kuiper.yaml"
	// KuiperBaseKey is the name of the environment variable that
	// relativePath reads to override the base directory.
	KuiperBaseKey = "KuiperBaseKey"
	// KuiperSyslogKey is not referenced in this file.
	// NOTE(review): presumably an environment variable consulted by
	// initSyslog - confirm against its definition.
	KuiperSyslogKey = "KuiperSyslogKey"
	// MetaKey is not referenced in this file.
	// NOTE(review): looks like a reserved key for metadata - confirm
	// against its users.
	MetaKey = "__meta"
)
var (
	// Log is the package-wide logger; it is created in init and its level
	// and output are adjusted by InitConf.
	Log *logrus.Logger
	// Config holds the parsed configuration; it is nil until InitConf has
	// run successfully.
	Config *KuiperConf
	// IsTesting is set by init when any command-line argument starts with
	// "-test.".
	IsTesting bool
	// Clock is the time source chosen in init: a mock clock when IsTesting
	// is true, otherwise a real clock.
	Clock clock.Clock
	// logFile is closed by CloseLogger when non-nil. It is not assigned
	// anywhere in this file.
	logFile *os.File
	// LoadFileType selects how GetLoc resolves directories: "relative" or
	// "absolute". Any other value makes GetLoc return an error.
	LoadFileType = "relative"
	// AbsoluteMapping maps each well-known sub-directory name to the fixed
	// location used by absolutePath.
	AbsoluteMapping = map[string]string{
		etc_dir:     "/etc/kuiper",
		data_dir:    "/var/lib/kuiper/data",
		log_dir:     "/var/log/kuiper",
		plugins_dir: "/var/lib/kuiper/plugins",
	}
)
  45. func LoadConf(confName string) ([]byte, error) {
  46. confDir, err := GetConfLoc()
  47. if err != nil {
  48. return nil, err
  49. }
  50. file := path.Join(confDir, confName)
  51. b, err := ioutil.ReadFile(file)
  52. if err != nil {
  53. return nil, err
  54. }
  55. return b, nil
  56. }
// tlsConf is the TLS section of the configuration; KuiperConf uses it for
// the "restTls" entry.
// NOTE(review): Certfile and Keyfile are presumably paths to the certificate
// and private key files - confirm against the code that consumes them.
type tlsConf struct {
	Certfile string `yaml:"certfile"`
	Keyfile  string `yaml:"keyfile"`
}
  61. type KuiperConf struct {
  62. Basic struct {
  63. Debug bool `yaml:"debug"`
  64. ConsoleLog bool `yaml:"consoleLog"`
  65. FileLog bool `yaml:"fileLog"`
  66. RotateTime int `yaml:"rotateTime"`
  67. MaxAge int `yaml:"maxAge"`
  68. Ip string `yaml:"ip"`
  69. Port int `yaml:"port"`
  70. RestIp string `yaml:"restIp"`
  71. RestPort int `yaml:"restPort"`
  72. RestTls *tlsConf `yaml:"restTls"`
  73. Prometheus bool `yaml:"prometheus"`
  74. PrometheusPort int `yaml:"prometheusPort"`
  75. PluginHosts string `yaml:"pluginHosts"`
  76. }
  77. Rule api.RuleOption
  78. Sink struct {
  79. CacheThreshold int `yaml:"cacheThreshold"`
  80. CacheTriggerCount int `yaml:"cacheTriggerCount"`
  81. DisableCache bool `yaml:"disableCache""`
  82. }
  83. }
  84. func init() {
  85. Log = logrus.New()
  86. initSyslog()
  87. filenameHook := filename.NewHook()
  88. filenameHook.Field = "file"
  89. Log.AddHook(filenameHook)
  90. Log.SetFormatter(&logrus.TextFormatter{
  91. TimestampFormat: "2006-01-02 15:04:05",
  92. DisableColors: true,
  93. FullTimestamp: true,
  94. })
  95. Log.Debugf("init with args %s", os.Args)
  96. for _, arg := range os.Args {
  97. if strings.HasPrefix(arg, "-test.") {
  98. IsTesting = true
  99. break
  100. }
  101. }
  102. if IsTesting {
  103. Log.Debugf("running in testing mode")
  104. Clock = clock.NewMock()
  105. } else {
  106. Clock = clock.New()
  107. }
  108. }
  109. func InitConf() {
  110. b, err := LoadConf(StreamConf)
  111. if err != nil {
  112. Log.Fatal(err)
  113. }
  114. kc := KuiperConf{
  115. Rule: api.RuleOption{
  116. LateTol: 1000,
  117. Concurrency: 1,
  118. BufferLength: 1024,
  119. CheckpointInterval: 300000, //5 minutes
  120. SendError: true,
  121. },
  122. }
  123. if err := yaml.Unmarshal(b, &kc); err != nil {
  124. Log.Fatal(err)
  125. } else {
  126. Config = &kc
  127. }
  128. if 0 == len(Config.Basic.Ip) {
  129. Config.Basic.Ip = "0.0.0.0"
  130. }
  131. if 0 == len(Config.Basic.RestIp) {
  132. Config.Basic.RestIp = "0.0.0.0"
  133. }
  134. if Config.Basic.Debug {
  135. Log.SetLevel(logrus.DebugLevel)
  136. }
  137. if Config.Basic.FileLog {
  138. logDir, err := GetLoc(log_dir)
  139. if err != nil {
  140. Log.Fatal(err)
  141. }
  142. file := path.Join(logDir, logFileName)
  143. logWriter, err := rotatelogs.New(
  144. file+".%Y-%m-%d_%H-%M-%S",
  145. rotatelogs.WithLinkName(file),
  146. rotatelogs.WithRotationTime(time.Hour*time.Duration(Config.Basic.RotateTime)),
  147. rotatelogs.WithMaxAge(time.Hour*time.Duration(Config.Basic.MaxAge)),
  148. )
  149. if err != nil {
  150. fmt.Println("Failed to init log file settings..." + err.Error())
  151. Log.Infof("Failed to log to file, using default stderr.")
  152. } else if Config.Basic.ConsoleLog {
  153. mw := io.MultiWriter(os.Stdout, logWriter)
  154. Log.SetOutput(mw)
  155. } else if !Config.Basic.ConsoleLog {
  156. Log.SetOutput(logWriter)
  157. }
  158. } else if Config.Basic.ConsoleLog {
  159. Log.SetOutput(os.Stdout)
  160. }
  161. }
  162. func CloseLogger() {
  163. if logFile != nil {
  164. logFile.Close()
  165. }
  166. }
// GetConfLoc returns the location of the configuration directory by
// resolving etc_dir through GetLoc; any error from GetLoc is returned as is.
func GetConfLoc() (string, error) {
	return GetLoc(etc_dir)
}
  170. func GetDataLoc() (string, error) {
  171. if IsTesting {
  172. dataDir, err := GetLoc(data_dir)
  173. if err != nil {
  174. return "", err
  175. }
  176. d := path.Join(dataDir, "test")
  177. if _, err := os.Stat(d); os.IsNotExist(err) {
  178. err = os.MkdirAll(d, 0755)
  179. if err != nil {
  180. return "", err
  181. }
  182. }
  183. return d, nil
  184. }
  185. return GetLoc(data_dir)
  186. }
// GetPluginsLoc returns the location of the plugins directory by resolving
// plugins_dir through GetLoc; any error from GetLoc is returned as is.
func GetPluginsLoc() (string, error) {
	return GetLoc(plugins_dir)
}
  190. func absolutePath(loc string) (dir string, err error) {
  191. for relDir, absoluteDir := range AbsoluteMapping {
  192. if strings.HasPrefix(loc, relDir) {
  193. dir = strings.Replace(loc, relDir, absoluteDir, 1)
  194. break
  195. }
  196. }
  197. if 0 == len(dir) {
  198. return "", fmt.Errorf("location %s is not allowed for absolue mode", loc)
  199. }
  200. return dir, nil
  201. }
  202. // GetLoc subdir must be a relative path
  203. func GetLoc(subdir string) (string, error) {
  204. if "relative" == LoadFileType {
  205. return relativePath(subdir)
  206. }
  207. if "absolute" == LoadFileType {
  208. return absolutePath(subdir)
  209. }
  210. return "", fmt.Errorf("Unrecognized loading method.")
  211. }
  212. func relativePath(subdir string) (dir string, err error) {
  213. dir, err = os.Getwd()
  214. if err != nil {
  215. return "", err
  216. }
  217. if base := os.Getenv(KuiperBaseKey); base != "" {
  218. Log.Infof("Specified Kuiper base folder at location %s.\n", base)
  219. dir = base
  220. }
  221. confDir := path.Join(dir, subdir)
  222. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  223. lastdir := dir
  224. for len(dir) > 0 {
  225. dir = filepath.Dir(dir)
  226. if lastdir == dir {
  227. break
  228. }
  229. confDir = path.Join(dir, subdir)
  230. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  231. lastdir = dir
  232. continue
  233. } else {
  234. //Log.Printf("Trying to load file from %s", confDir)
  235. return confDir, nil
  236. }
  237. }
  238. } else {
  239. //Log.Printf("Trying to load file from %s", confDir)
  240. return confDir, nil
  241. }
  242. return "", fmt.Errorf("conf dir not found, please set KuiperBaseKey program environment variable correctly.")
  243. }
  244. func ProcessPath(p string) (string, error) {
  245. if abs, err := filepath.Abs(p); err != nil {
  246. return "", nil
  247. } else {
  248. if _, err := os.Stat(abs); os.IsNotExist(err) {
  249. return "", err
  250. }
  251. return abs, nil
  252. }
  253. }
  254. func ReadJsonUnmarshal(path string, ret interface{}) error {
  255. sliByte, err := ioutil.ReadFile(path)
  256. if nil != err {
  257. return err
  258. }
  259. err = json.Unmarshal(sliByte, ret)
  260. if nil != err {
  261. return err
  262. }
  263. return nil
  264. }
  265. func WriteYamlMarshal(path string, data interface{}) error {
  266. y, err := yaml.Marshal(data)
  267. if nil != err {
  268. return err
  269. }
  270. return ioutil.WriteFile(path, y, 0666)
  271. }
  272. func ReadYamlUnmarshal(path string, ret interface{}) error {
  273. sliByte, err := ioutil.ReadFile(path)
  274. if nil != err {
  275. return err
  276. }
  277. err = yaml.Unmarshal(sliByte, ret)
  278. if nil != err {
  279. return err
  280. }
  281. return nil
  282. }
  283. func UnzipTo(f *zip.File, fpath string) error {
  284. _, err := os.Stat(fpath)
  285. if f.FileInfo().IsDir() {
  286. // Make Folder
  287. if _, err := os.Stat(fpath); os.IsNotExist(err) {
  288. if err := os.MkdirAll(fpath, os.ModePerm); err != nil {
  289. return err
  290. }
  291. }
  292. return nil
  293. }
  294. if err == nil || !os.IsNotExist(err) {
  295. if err = os.RemoveAll(fpath); err != nil {
  296. return fmt.Errorf("failed to delete file %s", fpath)
  297. }
  298. }
  299. if _, err := os.Stat(filepath.Dir(fpath)); os.IsNotExist(err) {
  300. if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
  301. return err
  302. }
  303. }
  304. outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
  305. if err != nil {
  306. return err
  307. }
  308. rc, err := f.Open()
  309. if err != nil {
  310. return err
  311. }
  312. _, err = io.Copy(outFile, rc)
  313. outFile.Close()
  314. rc.Close()
  315. return err
  316. }