tdengine.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "database/sql"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/pkg/api"
  20. "github.com/lf-edge/ekuiper/pkg/cast"
  21. _ "github.com/taosdata/driver-go/v2/taosSql"
  22. "reflect"
  23. "strings"
  24. )
  25. type (
  26. taosConfig struct {
  27. ProvideTs bool `json:"provideTs"`
  28. Port int `json:"port"`
  29. Ip string `json:"ip"` // To be deprecated
  30. Host string `json:"host"`
  31. User string `json:"user"`
  32. Password string `json:"password"`
  33. Database string `json:"database"`
  34. Table string `json:"table"`
  35. TsFieldName string `json:"tsFieldName"`
  36. Fields []string `json:"fields"`
  37. STable string `json:"sTable"`
  38. TagFields []string `json:"tagFields"`
  39. }
  40. taosSink struct {
  41. conf *taosConfig
  42. url string
  43. db *sql.DB
  44. }
  45. )
  46. func (t *taosConfig) delTsField() {
  47. var auxFields []string
  48. for _, v := range t.Fields {
  49. if v != t.TsFieldName {
  50. auxFields = append(auxFields, v)
  51. }
  52. }
  53. t.Fields = auxFields
  54. }
  55. func (t *taosConfig) buildSql(ctx api.StreamContext, mapData map[string]interface{}) (string, error) {
  56. if 0 == len(mapData) {
  57. return "", fmt.Errorf("data is empty.")
  58. }
  59. logger := ctx.GetLogger()
  60. var (
  61. table, sTable string
  62. keys, vals, tags []string
  63. err error
  64. )
  65. table, err = ctx.ParseTemplate(t.Table, mapData)
  66. if err != nil {
  67. logger.Errorf("parse template for table %s error: %v", t.Table, err)
  68. return "", err
  69. }
  70. sTable, err = ctx.ParseTemplate(t.STable, mapData)
  71. if err != nil {
  72. logger.Errorf("parse template for sTable %s error: %v", t.STable, err)
  73. return "", err
  74. }
  75. if t.ProvideTs {
  76. if v, ok := mapData[t.TsFieldName]; !ok {
  77. return "", fmt.Errorf("Timestamp field not found : %s.", t.TsFieldName)
  78. } else {
  79. keys = append(keys, t.TsFieldName)
  80. vals = append(vals, fmt.Sprintf(`"%v"`, v))
  81. }
  82. } else {
  83. vals = append(vals, "now")
  84. keys = append(keys, t.TsFieldName)
  85. }
  86. if len(t.Fields) != 0 {
  87. for _, k := range t.Fields {
  88. if k == t.TsFieldName {
  89. continue
  90. }
  91. if v, ok := mapData[k]; ok {
  92. keys = append(keys, k)
  93. if reflect.String == reflect.TypeOf(v).Kind() {
  94. vals = append(vals, fmt.Sprintf(`"%v"`, v))
  95. } else {
  96. vals = append(vals, fmt.Sprintf(`%v`, v))
  97. }
  98. } else {
  99. logger.Warnln("not found field:", k)
  100. }
  101. }
  102. } else {
  103. for k, v := range mapData {
  104. if k == t.TsFieldName {
  105. continue
  106. }
  107. keys = append(keys, k)
  108. if reflect.String == reflect.TypeOf(v).Kind() {
  109. vals = append(vals, fmt.Sprintf(`"%v"`, v))
  110. } else {
  111. vals = append(vals, fmt.Sprintf(`%v`, v))
  112. }
  113. }
  114. }
  115. if len(t.TagFields) > 0 {
  116. for _, v := range t.TagFields {
  117. switch mapData[v].(type) {
  118. case string:
  119. tags = append(tags, fmt.Sprintf(`"%s"`, mapData[v]))
  120. default:
  121. tags = append(tags, fmt.Sprintf(`%v`, mapData[v]))
  122. }
  123. }
  124. }
  125. sqlStr := fmt.Sprintf("INSERT INTO %s (%s)", table, strings.Join(keys, ","))
  126. if sTable != "" {
  127. sqlStr += " using " + sTable
  128. }
  129. if len(tags) != 0 {
  130. sqlStr += " tags (" + strings.Join(tags, ",") + ")"
  131. }
  132. sqlStr += " values (" + strings.Join(vals, ",") + ");"
  133. return sqlStr, nil
  134. }
  135. func (m *taosSink) Configure(props map[string]interface{}) error {
  136. cfg := &taosConfig{
  137. User: "root",
  138. Password: "taosdata",
  139. }
  140. err := cast.MapToStruct(props, cfg)
  141. if err != nil {
  142. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  143. }
  144. if cfg.Ip != "" {
  145. conf.Log.Warnf("Deprecated: Tdengine sink ip property is deprecated, use host instead.")
  146. if cfg.Host == "" {
  147. cfg.Host = cfg.Ip
  148. }
  149. }
  150. if cfg.Host == "" {
  151. cfg.Host = "localhost"
  152. }
  153. if cfg.User == "" {
  154. return fmt.Errorf("propert user is required.")
  155. }
  156. if cfg.Password == "" {
  157. return fmt.Errorf("propert password is required.")
  158. }
  159. if cfg.Database == "" {
  160. return fmt.Errorf("property database is required")
  161. }
  162. if cfg.Table == "" {
  163. return fmt.Errorf("property table is required")
  164. }
  165. if cfg.TsFieldName == "" {
  166. return fmt.Errorf("property TsFieldName is required")
  167. }
  168. if cfg.STable != "" && len(cfg.TagFields) == 0 {
  169. return fmt.Errorf("property tagFields is required when sTable is set")
  170. }
  171. m.url = fmt.Sprintf(`%s:%s@tcp(%s:%d)/%s`, cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database)
  172. m.conf = cfg
  173. return nil
  174. }
  175. func (m *taosSink) Open(ctx api.StreamContext) (err error) {
  176. ctx.GetLogger().Debug("Opening tdengine sink")
  177. m.db, err = sql.Open("taosSql", m.url)
  178. return err
  179. }
  180. func (m *taosSink) Collect(ctx api.StreamContext, item interface{}) error {
  181. ctx.GetLogger().Debugf("tdengine sink receive %s", item)
  182. switch v := item.(type) {
  183. case []map[string]interface{}:
  184. var err error
  185. for _, mapData := range v {
  186. e := m.writeToDB(ctx, mapData)
  187. if e != nil {
  188. err = e
  189. }
  190. }
  191. return err
  192. case map[string]interface{}:
  193. return m.writeToDB(ctx, v)
  194. default: // never happen
  195. return fmt.Errorf("unsupported type: %T", item)
  196. }
  197. }
  198. func (m *taosSink) writeToDB(ctx api.StreamContext, mapData map[string]interface{}) error {
  199. sql, err := m.conf.buildSql(ctx, mapData)
  200. if nil != err {
  201. return err
  202. }
  203. ctx.GetLogger().Debugf(sql)
  204. rows, err := m.db.Query(sql)
  205. if err != nil {
  206. return err
  207. }
  208. rows.Close()
  209. return nil
  210. }
  211. func (m *taosSink) Close(ctx api.StreamContext) error {
  212. if m.db != nil {
  213. return m.db.Close()
  214. }
  215. return nil
  216. }
  217. func Tdengine() api.Sink {
  218. return &taosSink{}
  219. }