util.go 8.0 KB


  1. package common
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/benbjohnson/clock"
  7. "github.com/emqx/kuiper/xstream/api"
  8. "github.com/go-yaml/yaml"
  9. "github.com/keepeye/logrus-filename"
  10. "github.com/sirupsen/logrus"
  11. "io"
  12. "io/ioutil"
  13. "os"
  14. "path"
  15. "path/filepath"
  16. //"runtime"
  17. "sort"
  18. "strings"
  19. "sync"
  20. )
  // Sub-directory fragments handed to GetLoc. They keep their leading and
  // trailing slashes because relativePath concatenates them directly onto a
  // base directory.
  const (
  	etc_dir  = "/etc/"
  	data_dir = "/data/"
  	log_dir  = "/log/"
  )

  // File names and well-known keys used across the package.
  const (
  	// logFileName is the file InitConf opens inside the log directory.
  	logFileName = "stream.log"
  	// StreamConf is the configuration file name InitConf loads through LoadConf.
  	StreamConf = "kuiper.yaml"
  	// KuiperBaseKey is the environment variable relativePath reads to override
  	// the base folder.
  	KuiperBaseKey = "KuiperBaseKey"
  	// MetaKey is not referenced in this file.
  	MetaKey = "__meta"
  )
  30. var (
  31. Log *logrus.Logger
  32. Config *KuiperConf
  33. IsTesting bool
  34. Clock clock.Clock
  35. logFile *os.File
  36. LoadFileType = "relative"
  37. )
  38. func LoadConf(confName string) ([]byte, error) {
  39. confDir, err := GetConfLoc()
  40. if err != nil {
  41. return nil, err
  42. }
  43. file := path.Join(confDir, confName)
  44. b, err := ioutil.ReadFile(file)
  45. if err != nil {
  46. return nil, err
  47. }
  48. return b, nil
  49. }
  // tlsConf carries the certificate and key file settings read from the YAML
  // keys "certfile" and "keyfile". It is referenced by KuiperConf.Basic.RestTls.
  type tlsConf struct {
  	// Certfile is populated from the "certfile" key.
  	Certfile string `yaml:"certfile"`
  	// Keyfile is populated from the "keyfile" key.
  	Keyfile string `yaml:"keyfile"`
  }
  54. type KuiperConf struct {
  55. Basic struct {
  56. Debug bool `yaml:"debug"`
  57. ConsoleLog bool `yaml:"consoleLog"`
  58. FileLog bool `yaml:"fileLog"`
  59. Port int `yaml:"port"`
  60. RestPort int `yaml:"restPort"`
  61. RestTls *tlsConf `yaml:"restTls"`
  62. Prometheus bool `yaml:"prometheus"`
  63. PrometheusPort int `yaml:"prometheusPort"`
  64. PluginHosts string `yaml:"pluginHosts"`
  65. }
  66. Rule api.RuleOption
  67. Sink struct {
  68. CacheThreshold int `yaml:"cacheThreshold"`
  69. CacheTriggerCount int `yaml:"cacheTriggerCount"`
  70. DisableCache bool `yaml:"disableCache""`
  71. }
  72. }
  73. func init() {
  74. filenameHook := filename.NewHook()
  75. filenameHook.Field = "file"
  76. Log = logrus.New()
  77. Log.AddHook(filenameHook)
  78. /*
  79. Log.SetReportCaller(true)
  80. Log.SetFormatter(&logrus.TextFormatter{
  81. CallerPrettyfier: func(f *runtime.Frame) (string, string) {
  82. filename := path.Base(f.File)
  83. return "", fmt.Sprintf("%s:%d", filename, f.Line)
  84. },
  85. DisableColors: true,
  86. FullTimestamp: true,
  87. })
  88. */
  89. Log.SetFormatter(&logrus.TextFormatter{
  90. TimestampFormat: "2006-01-02 15:04:05",
  91. DisableColors: true,
  92. FullTimestamp: true,
  93. })
  94. Log.Debugf("init with args %s", os.Args)
  95. for _, arg := range os.Args {
  96. if strings.HasPrefix(arg, "-test.") {
  97. IsTesting = true
  98. break
  99. }
  100. }
  101. if IsTesting {
  102. Log.Debugf("running in testing mode")
  103. Clock = clock.NewMock()
  104. } else {
  105. Clock = clock.New()
  106. }
  107. }
  108. func InitConf() {
  109. b, err := LoadConf(StreamConf)
  110. if err != nil {
  111. Log.Fatal(err)
  112. }
  113. kc := KuiperConf{
  114. Rule: api.RuleOption{
  115. LateTol: 1000,
  116. Concurrency: 1,
  117. BufferLength: 1024,
  118. CheckpointInterval: 300000, //5 minutes
  119. },
  120. }
  121. if err := yaml.Unmarshal(b, &kc); err != nil {
  122. Log.Fatal(err)
  123. } else {
  124. Config = &kc
  125. }
  126. if Config.Basic.Debug {
  127. Log.SetLevel(logrus.DebugLevel)
  128. }
  129. logDir, err := GetLoc(log_dir)
  130. if err != nil {
  131. Log.Fatal(err)
  132. }
  133. file := logDir + logFileName
  134. logFile, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  135. if err == nil {
  136. if Config.Basic.ConsoleLog {
  137. if Config.Basic.FileLog {
  138. mw := io.MultiWriter(os.Stdout, logFile)
  139. Log.SetOutput(mw)
  140. }
  141. } else {
  142. if Config.Basic.FileLog {
  143. Log.SetOutput(logFile)
  144. }
  145. }
  146. } else {
  147. fmt.Println("Failed to init log file settings...")
  148. Log.Infof("Failed to log to file, using default stderr.")
  149. }
  150. }
  // PrintMap appends one "key: value" line per entry of m to buff, with the
  // keys in ascending string order so the output is deterministic.
  func PrintMap(m map[string]string, buff *bytes.Buffer) {
  	keys := make([]string, 0, len(m))
  	for key := range m {
  		keys = append(keys, key)
  	}
  	sort.Strings(keys)

  	for i := range keys {
  		fmt.Fprintf(buff, "%s: %s\n", keys[i], m[keys[i]])
  	}
  }
  161. func CloseLogger() {
  162. if logFile != nil {
  163. logFile.Close()
  164. }
  165. }
  166. func GetConfLoc() (string, error) {
  167. return GetLoc(etc_dir)
  168. }
  169. func GetDataLoc() (string, error) {
  170. if IsTesting {
  171. dataDir, err := GetLoc(data_dir)
  172. if err != nil {
  173. return "", err
  174. }
  175. d := path.Join(path.Dir(dataDir), "test")
  176. if _, err := os.Stat(d); os.IsNotExist(err) {
  177. err = os.MkdirAll(d, 0755)
  178. if err != nil {
  179. return "", err
  180. }
  181. }
  182. return d, nil
  183. }
  184. return GetLoc(data_dir)
  185. }
  // absolutePath maps a sub-directory name to its fixed system location.
  // Leading and trailing slashes on subdir are ignored. The recognised names
  // are "etc", "data", "log" and "plugins"; any other name yields an error
  // that includes the trimmed name.
  func absolutePath(subdir string) (dir string, err error) {
  	name := strings.TrimRight(strings.TrimLeft(subdir, `/`), `/`)
  	switch name {
  	case "etc":
  		return "/etc/kuiper/", nil
  	case "data":
  		return "/var/lib/kuiper/data/", nil
  	case "log":
  		return "/var/log/kuiper/", nil
  	case "plugins":
  		return "/var/lib/kuiper/plugins/", nil
  	default:
  		return "", fmt.Errorf("no find such file : %s", name)
  	}
  }
  208. func GetLoc(subdir string) (string, error) {
  209. if "relative" == LoadFileType {
  210. return relativePath(subdir)
  211. }
  212. if "absolute" == LoadFileType {
  213. return absolutePath(subdir)
  214. }
  215. return "", fmt.Errorf("Unrecognized loading method.")
  216. }
  217. func relativePath(subdir string) (dir string, err error) {
  218. dir, err = os.Getwd()
  219. if err != nil {
  220. return "", err
  221. }
  222. if base := os.Getenv(KuiperBaseKey); base != "" {
  223. Log.Infof("Specified Kuiper base folder at location %s.\n", base)
  224. dir = base
  225. }
  226. confDir := dir + subdir
  227. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  228. lastdir := dir
  229. for len(dir) > 0 {
  230. dir = filepath.Dir(dir)
  231. if lastdir == dir {
  232. break
  233. }
  234. confDir = dir + subdir
  235. if _, err := os.Stat(confDir); os.IsNotExist(err) {
  236. lastdir = dir
  237. continue
  238. } else {
  239. //Log.Printf("Trying to load file from %s", confDir)
  240. return confDir, nil
  241. }
  242. }
  243. } else {
  244. //Log.Printf("Trying to load file from %s", confDir)
  245. return confDir, nil
  246. }
  247. return "", fmt.Errorf("conf dir not found, please set KuiperBaseKey program environment variable correctly.")
  248. }
  // ProcessPath converts p to an absolute path and checks that it exists.
  //
  // It returns the absolute path on success. If the path cannot be made
  // absolute, or os.Stat reports that it does not exist, it returns an empty
  // string together with the underlying error.
  func ProcessPath(p string) (string, error) {
  	abs, err := filepath.Abs(p)
  	if err != nil {
  		// Previously this branch returned a nil error, so callers received an
  		// empty path with no indication that anything had gone wrong.
  		return "", err
  	}
  	if _, err := os.Stat(abs); os.IsNotExist(err) {
  		return "", err
  	}
  	return abs, nil
  }
  259. /*********** Type Cast Utilities *****/
  260. //TODO datetime type
  // ToString renders input using fmt's default formatting.
  func ToString(input interface{}) string {
  	return fmt.Sprint(input)
  }
  // ToInt converts input to an int. Only float64, int64 and int values are
  // accepted; float64 values are converted with a plain int() conversion, which
  // drops the fractional part. Every other dynamic type yields 0 and an error
  // naming the type and value.
  func ToInt(input interface{}) (int, error) {
  	if f, ok := input.(float64); ok {
  		return int(f), nil
  	}
  	if wide, ok := input.(int64); ok {
  		return int(wide), nil
  	}
  	if n, ok := input.(int); ok {
  		return n, nil
  	}
  	return 0, fmt.Errorf("unsupported type %T of %[1]v", input)
  }
  // MapToStruct copies input into output by encoding input as JSON and then
  // decoding that JSON into output. output must be a pointer to the target
  // value; the target's json field tags decide how keys map to fields.
  // The first error from encoding or decoding is returned.
  func MapToStruct(input, output interface{}) error {
  	encoded, marshalErr := json.Marshal(input)
  	if marshalErr != nil {
  		return marshalErr
  	}
  	return json.Unmarshal(encoded, output)
  }
  // ConvertMap returns a copy of s whose keys are the fmt "%v" rendering of the
  // original keys. Nested map[interface{}]interface{} and []interface{} values
  // are converted recursively; all other values are copied over unchanged.
  func ConvertMap(s map[interface{}]interface{}) map[string]interface{} {
  	out := make(map[string]interface{})
  	for key, val := range s {
  		name := fmt.Sprintf("%v", key)
  		switch nested := val.(type) {
  		case map[interface{}]interface{}:
  			out[name] = ConvertMap(nested)
  		case []interface{}:
  			out[name] = ConvertArray(nested)
  		default:
  			out[name] = val
  		}
  	}
  	return out
  }

  // ConvertArray returns a new slice of the same length as s in which nested
  // map[interface{}]interface{} and []interface{} elements have been converted
  // recursively; all other elements are copied over unchanged.
  func ConvertArray(s []interface{}) []interface{} {
  	out := make([]interface{}, len(s))
  	for idx := range s {
  		switch nested := s[idx].(type) {
  		case map[interface{}]interface{}:
  			out[idx] = ConvertMap(nested)
  		case []interface{}:
  			out[idx] = ConvertArray(nested)
  		default:
  			out[idx] = s[idx]
  		}
  	}
  	return out
  }
  // SyncMapToMap copies every entry of sm into a new map, using the fmt "%v"
  // rendering of each key as the map key.
  func SyncMapToMap(sm *sync.Map) map[string]interface{} {
  	out := make(map[string]interface{})
  	collect := func(key, value interface{}) bool {
  		out[fmt.Sprint(key)] = value
  		// Returning true keeps Range going over the remaining entries.
  		return true
  	}
  	sm.Range(collect)
  	return out
  }
  // MapToSyncMap returns a newly allocated sync.Map holding every key/value
  // pair of m.
  func MapToSyncMap(m map[string]interface{}) *sync.Map {
  	result := &sync.Map{}
  	for key := range m {
  		result.Store(key, m[key])
  	}
  	return result
  }
  // ReadJsonUnmarshal reads the file at path and decodes its JSON content into
  // ret, which must be a pointer. It returns the read error or the decode
  // error, whichever occurs first.
  func ReadJsonUnmarshal(path string, ret interface{}) error {
  	content, err := ioutil.ReadFile(path)
  	if err != nil {
  		return err
  	}
  	return json.Unmarshal(content, ret)
  }
  341. func ReadYamlUnmarshal(path string, ret interface{}) error {
  342. sliByte, err := ioutil.ReadFile(path)
  343. if nil != err {
  344. return err
  345. }
  346. err = yaml.Unmarshal(sliByte, ret)
  347. if nil != err {
  348. return err
  349. }
  350. return nil
  351. }
  352. func WriteYamlMarshal(path string, data interface{}) error {
  353. y, err := yaml.Marshal(data)
  354. if nil != err {
  355. return err
  356. }
  357. return ioutil.WriteFile(path, y, 0666)
  358. }