util.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. package common
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/benbjohnson/clock"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/go-yaml/yaml"
  9. "github.com/keepeye/logrus-filename"
  10. "github.com/lestrrat-go/file-rotatelogs"
  11. "github.com/sirupsen/logrus"
  12. "io"
  13. "io/ioutil"
  14. "os"
  15. "path"
  16. "path/filepath"
  17. "sort"
  18. "strings"
  19. "sync"
  20. "time"
  21. )
  22. const (
  23. logFileName = "stream.log"
  24. etc_dir = "/etc/"
  25. data_dir = "/data/"
  26. log_dir = "/log/"
  27. StreamConf = "kuiper.yaml"
  28. KuiperBaseKey = "KuiperBaseKey"
  29. KuiperSyslogKey = "KuiperSyslogKey"
  30. MetaKey = "__meta"
  31. )
  32. var (
  33. Log *logrus.Logger
  34. Config *KuiperConf
  35. IsTesting bool
  36. Clock clock.Clock
  37. logFile *os.File
  38. LoadFileType = "relative"
  39. )
  40. func LoadConf(confName string) ([]byte, error) {
  41. confDir, err := GetConfLoc()
  42. if err != nil {
  43. return nil, err
  44. }
  45. file := path.Join(confDir, confName)
  46. b, err := ioutil.ReadFile(file)
  47. if err != nil {
  48. return nil, err
  49. }
  50. return b, nil
  51. }
  52. type tlsConf struct {
  53. Certfile string `yaml:"certfile"`
  54. Keyfile string `yaml:"keyfile"`
  55. }
  56. type KuiperConf struct {
  57. Basic struct {
  58. Debug bool `yaml:"debug"`
  59. ConsoleLog bool `yaml:"consoleLog"`
  60. FileLog bool `yaml:"fileLog"`
  61. RotateTime int `yaml:"rotateTime"`
  62. MaxAge int `yaml:"maxAge"`
  63. Ip string `yaml:"ip"`
  64. Port int `yaml:"port"`
  65. RestIp string `yaml:"restIp"`
  66. RestPort int `yaml:"restPort"`
  67. RestTls *tlsConf `yaml:"restTls"`
  68. Prometheus bool `yaml:"prometheus"`
  69. PrometheusPort int `yaml:"prometheusPort"`
  70. PluginHosts string `yaml:"pluginHosts"`
  71. }
  72. Rule api.RuleOption
  73. Sink struct {
  74. CacheThreshold int `yaml:"cacheThreshold"`
  75. CacheTriggerCount int `yaml:"cacheTriggerCount"`
  76. DisableCache bool `yaml:"disableCache""`
  77. }
  78. }
  79. func init() {
  80. Log = logrus.New()
  81. initSyslog()
  82. filenameHook := filename.NewHook()
  83. filenameHook.Field = "file"
  84. Log.AddHook(filenameHook)
  85. Log.SetFormatter(&logrus.TextFormatter{
  86. TimestampFormat: "2006-01-02 15:04:05",
  87. DisableColors: true,
  88. FullTimestamp: true,
  89. })
  90. Log.Debugf("init with args %s", os.Args)
  91. for _, arg := range os.Args {
  92. if strings.HasPrefix(arg, "-test.") {
  93. IsTesting = true
  94. break
  95. }
  96. }
  97. if IsTesting {
  98. Log.Debugf("running in testing mode")
  99. Clock = clock.NewMock()
  100. } else {
  101. Clock = clock.New()
  102. }
  103. }
  104. func InitConf() {
  105. b, err := LoadConf(StreamConf)
  106. if err != nil {
  107. Log.Fatal(err)
  108. }
  109. kc := KuiperConf{
  110. Rule: api.RuleOption{
  111. LateTol: 1000,
  112. Concurrency: 1,
  113. BufferLength: 1024,
  114. CheckpointInterval: 300000, //5 minutes
  115. SendError: true,
  116. },
  117. }
  118. if err := yaml.Unmarshal(b, &kc); err != nil {
  119. Log.Fatal(err)
  120. } else {
  121. Config = &kc
  122. }
  123. if 0 == len(Config.Basic.Ip) {
  124. Config.Basic.Ip = "0.0.0.0"
  125. }
  126. if 0 == len(Config.Basic.RestIp) {
  127. Config.Basic.RestIp = "0.0.0.0"
  128. }
  129. if Config.Basic.Debug {
  130. Log.SetLevel(logrus.DebugLevel)
  131. }
  132. if Config.Basic.FileLog {
  133. logDir, err := GetLoc(log_dir)
  134. if err != nil {
  135. Log.Fatal(err)
  136. }
  137. file := path.Join(logDir, logFileName)
  138. logWriter, err := rotatelogs.New(
  139. file+".%Y-%m-%d_%H-%M-%S",
  140. rotatelogs.WithLinkName(file),
  141. rotatelogs.WithRotationTime(time.Hour*time.Duration(Config.Basic.RotateTime)),
  142. rotatelogs.WithMaxAge(time.Hour*time.Duration(Config.Basic.MaxAge)),
  143. )
  144. if err != nil {
  145. fmt.Println("Failed to init log file settings..." + err.Error())
  146. Log.Infof("Failed to log to file, using default stderr.")
  147. } else if Config.Basic.ConsoleLog {
  148. mw := io.MultiWriter(os.Stdout, logWriter)
  149. Log.SetOutput(mw)
  150. } else if !Config.Basic.ConsoleLog {
  151. Log.SetOutput(logWriter)
  152. }
  153. } else if Config.Basic.ConsoleLog {
  154. Log.SetOutput(os.Stdout)
  155. }
  156. }
  157. func PrintMap(m map[string]string, buff *bytes.Buffer) {
  158. si := make([]string, 0, len(m))
  159. for s := range m {
  160. si = append(si, s)
  161. }
  162. sort.Strings(si)
  163. for _, s := range si {
  164. buff.WriteString(fmt.Sprintf("%s: %s\n", s, m[s]))
  165. }
  166. }
  167. func CloseLogger() {
  168. if logFile != nil {
  169. logFile.Close()
  170. }
  171. }
  172. func GetConfLoc() (string, error) {
  173. return GetLoc(etc_dir)
  174. }
  175. func GetDataLoc() (string, error) {
  176. if IsTesting {
  177. dataDir, err := GetLoc(data_dir)
  178. if err != nil {
  179. return "", err
  180. }
  181. d := path.Join(path.Dir(dataDir), "test")
  182. if _, err := os.Stat(d); os.IsNotExist(err) {
  183. err = os.MkdirAll(d, 0755)
  184. if err != nil {
  185. return "", err
  186. }
  187. }
  188. return d, nil
  189. }
  190. return GetLoc(data_dir)
  191. }
  192. func absolutePath(subdir string) (dir string, err error) {
  193. subdir = strings.TrimLeft(subdir, `/`)
  194. subdir = strings.TrimRight(subdir, `/`)
  195. switch subdir {
  196. case "etc":
  197. dir = "/etc/kuiper/"
  198. break
  199. case "data":
  200. dir = "/var/lib/kuiper/data/"
  201. break
  202. case "log":
  203. dir = "/var/log/kuiper/"
  204. break
  205. case "plugins":
  206. dir = "/var/lib/kuiper/plugins/"
  207. break
  208. }
  209. if 0 == len(dir) {
  210. return "", fmt.Errorf("no find such file : %s", subdir)
  211. }
  212. return dir, nil
  213. }
  214. func GetLoc(subdir string) (string, error) {
  215. if "relative" == LoadFileType {
  216. return relativePath(subdir)
  217. }
  218. if "absolute" == LoadFileType {
  219. return absolutePath(subdir)
  220. }
  221. return "", fmt.Errorf("Unrecognized loading method.")
  222. }
  223. func relativePath(subdir string) (dir string, err error) {
  224. dir, err = os.Getwd()
  225. if err != nil {
  226. return "", err
  227. }
  228. if base := os.Getenv(KuiperBaseKey); base != "" {
  229. Log.Infof("Specified Kuiper base folder at location %s.\n", base)
  230. dir = base
  231. }
  232. confDir := dir + subdir
  233. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  234. lastdir := dir
  235. for len(dir) > 0 {
  236. dir = filepath.Dir(dir)
  237. if lastdir == dir {
  238. break
  239. }
  240. confDir = dir + subdir
  241. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  242. lastdir = dir
  243. continue
  244. } else {
  245. //Log.Printf("Trying to load file from %s", confDir)
  246. return confDir, nil
  247. }
  248. }
  249. } else {
  250. //Log.Printf("Trying to load file from %s", confDir)
  251. return confDir, nil
  252. }
  253. return "", fmt.Errorf("conf dir not found, please set KuiperBaseKey program environment variable correctly.")
  254. }
  255. func ProcessPath(p string) (string, error) {
  256. if abs, err := filepath.Abs(p); err != nil {
  257. return "", nil
  258. } else {
  259. if _, err := os.Stat(abs); os.IsNotExist(err) {
  260. return "", err
  261. }
  262. return abs, nil
  263. }
  264. }
  265. /*********** Type Cast Utilities *****/
  266. //TODO datetime type
  267. func ToString(input interface{}) string {
  268. return fmt.Sprintf("%v", input)
  269. }
  270. func ToInt(input interface{}) (int, error) {
  271. switch t := input.(type) {
  272. case float64:
  273. return int(t), nil
  274. case int64:
  275. return int(t), nil
  276. case int:
  277. return t, nil
  278. default:
  279. return 0, fmt.Errorf("unsupported type %T of %[1]v", input)
  280. }
  281. }
  282. /*
  283. * Convert a map into a struct. The output parameter must be a pointer to a struct
  284. * The struct can have the json meta data
  285. */
  286. func MapToStruct(input, output interface{}) error {
  287. // convert map to json
  288. jsonString, err := json.Marshal(input)
  289. if err != nil {
  290. return err
  291. }
  292. // convert json to struct
  293. return json.Unmarshal(jsonString, output)
  294. }
  295. func ConvertMap(s map[interface{}]interface{}) map[string]interface{} {
  296. r := make(map[string]interface{})
  297. for k, v := range s {
  298. switch t := v.(type) {
  299. case map[interface{}]interface{}:
  300. v = ConvertMap(t)
  301. case []interface{}:
  302. v = ConvertArray(t)
  303. }
  304. r[fmt.Sprintf("%v", k)] = v
  305. }
  306. return r
  307. }
  308. func ConvertArray(s []interface{}) []interface{} {
  309. r := make([]interface{}, len(s))
  310. for i, e := range s {
  311. switch t := e.(type) {
  312. case map[interface{}]interface{}:
  313. e = ConvertMap(t)
  314. case []interface{}:
  315. e = ConvertArray(t)
  316. }
  317. r[i] = e
  318. }
  319. return r
  320. }
  321. func SyncMapToMap(sm *sync.Map) map[string]interface{} {
  322. m := make(map[string]interface{})
  323. sm.Range(func(k interface{}, v interface{}) bool {
  324. m[fmt.Sprintf("%v", k)] = v
  325. return true
  326. })
  327. return m
  328. }
  329. func MapToSyncMap(m map[string]interface{}) *sync.Map {
  330. sm := new(sync.Map)
  331. for k, v := range m {
  332. sm.Store(k, v)
  333. }
  334. return sm
  335. }
  336. func ReadJsonUnmarshal(path string, ret interface{}) error {
  337. sliByte, err := ioutil.ReadFile(path)
  338. if nil != err {
  339. return err
  340. }
  341. err = json.Unmarshal(sliByte, ret)
  342. if nil != err {
  343. return err
  344. }
  345. return nil
  346. }
  347. func ReadYamlUnmarshal(path string, ret interface{}) error {
  348. sliByte, err := ioutil.ReadFile(path)
  349. if nil != err {
  350. return err
  351. }
  352. err = yaml.Unmarshal(sliByte, ret)
  353. if nil != err {
  354. return err
  355. }
  356. return nil
  357. }
  358. func WriteYamlMarshal(path string, data interface{}) error {
  359. y, err := yaml.Marshal(data)
  360. if nil != err {
  361. return err
  362. }
  363. return ioutil.WriteFile(path, y, 0666)
  364. }