template.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package transform
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "fmt"
  19. "text/template"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/converter"
  22. "github.com/lf-edge/ekuiper/internal/converter/delimited"
  23. "github.com/lf-edge/ekuiper/pkg/ast"
  24. "github.com/lf-edge/ekuiper/pkg/message"
  25. )
// TransFunc is the function to transform data.
// It receives one piece of sink data and returns the encoded bytes, a flag and
// an error. For the functions built by GenTransform, the flag is true when a
// data template was applied to the input or when TransItem reported that a
// dataField/fields selection was applied; the bytes are nil whenever the
// function itself reports an error.
type TransFunc func(interface{}) ([]byte, bool, error)
  28. func GenTransform(dt string, format string, schemaId string, delimiter string, dataField string, fields []string) (TransFunc, error) {
  29. var (
  30. tp *template.Template = nil
  31. c message.Converter
  32. err error
  33. )
  34. switch format {
  35. case message.FormatProtobuf, message.FormatCustom:
  36. c, err = converter.GetOrCreateConverter(&ast.Options{FORMAT: format, SCHEMAID: schemaId})
  37. if err != nil {
  38. return nil, err
  39. }
  40. case message.FormatDelimited:
  41. c, err = converter.GetOrCreateConverter(&ast.Options{FORMAT: format, DELIMITER: delimiter})
  42. if err != nil {
  43. return nil, err
  44. }
  45. c.(*delimited.Converter).SetColumns(fields)
  46. case message.FormatJson:
  47. c, err = converter.GetOrCreateConverter(&ast.Options{FORMAT: format})
  48. if err != nil {
  49. return nil, err
  50. }
  51. }
  52. if dt != "" {
  53. temp, err := template.New("sink").Funcs(conf.FuncMap).Parse(dt)
  54. if err != nil {
  55. return nil, err
  56. }
  57. tp = temp
  58. }
  59. return func(d interface{}) ([]byte, bool, error) {
  60. var (
  61. bs []byte
  62. transformed bool
  63. selected bool
  64. m interface{}
  65. e error
  66. )
  67. if tp != nil {
  68. var output bytes.Buffer
  69. err := tp.Execute(&output, d)
  70. if err != nil {
  71. return nil, false, fmt.Errorf("fail to encode data %v with dataTemplate for error %v", d, err)
  72. }
  73. bs = output.Bytes()
  74. transformed = true
  75. }
  76. if transformed {
  77. m, selected, e = TransItem(bs, dataField, fields)
  78. } else {
  79. m, selected, e = TransItem(d, dataField, fields)
  80. }
  81. if e != nil {
  82. return nil, false, fmt.Errorf("fail to TransItem data %v for error %v", d, e)
  83. }
  84. if selected {
  85. d = m
  86. }
  87. switch format {
  88. case message.FormatJson:
  89. if transformed && !selected {
  90. return bs, true, nil
  91. }
  92. outBytes, err := c.Encode(d)
  93. return outBytes, transformed || selected, err
  94. case message.FormatProtobuf, message.FormatCustom, message.FormatDelimited:
  95. if transformed && !selected {
  96. m := make(map[string]interface{})
  97. err := json.Unmarshal(bs, &m)
  98. if err != nil {
  99. return nil, false, fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(bs), err)
  100. }
  101. d = m
  102. }
  103. outBytes, err := c.Encode(d)
  104. return outBytes, transformed || selected, err
  105. default: // should not happen
  106. return nil, false, fmt.Errorf("unsupported format %v", format)
  107. }
  108. }, nil
  109. }
  110. func GenTp(dt string) (*template.Template, error) {
  111. return template.New("sink").Funcs(conf.FuncMap).Parse(dt)
  112. }
  113. // If you do not need to convert data to []byte, you can use this function directly. Otherwise, use TransFunc.
  114. func TransItem(input interface{}, dataField string, fields []string) (interface{}, bool, error) {
  115. if dataField == "" && len(fields) == 0 {
  116. return input, false, nil
  117. }
  118. if _, ok := input.([]byte); ok {
  119. var m interface{}
  120. err := json.Unmarshal(input.([]byte), &m)
  121. if err != nil {
  122. return input, false, fmt.Errorf("fail to decode data %s for error %v", string(input.([]byte)), err)
  123. }
  124. input = m
  125. }
  126. if dataField != "" {
  127. switch input.(type) {
  128. case map[string]interface{}:
  129. input = input.(map[string]interface{})[dataField]
  130. case []interface{}:
  131. if len(input.([]interface{})) == 0 {
  132. return nil, false, nil
  133. }
  134. input = input.([]interface{})[0].(map[string]interface{})[dataField]
  135. case []map[string]interface{}:
  136. if len(input.([]map[string]interface{})) == 0 {
  137. return nil, false, nil
  138. }
  139. input = input.([]map[string]interface{})[0][dataField]
  140. default:
  141. return nil, false, fmt.Errorf("fail to decode data %v", input)
  142. }
  143. }
  144. m, err := selectMap(input, fields)
  145. if err != nil && err.Error() != "fields cannot be empty" {
  146. return nil, false, fmt.Errorf("fail to decode data %v for error %v", input, err)
  147. } else {
  148. return m, true, nil
  149. }
  150. }
  151. // selectMap select fields from input map or array of map.
  152. func selectMap(input interface{}, fields []string) (interface{}, error) {
  153. if len(fields) == 0 {
  154. return input, fmt.Errorf("fields cannot be empty")
  155. }
  156. outputs := make([]map[string]interface{}, 0)
  157. switch input.(type) {
  158. case map[string]interface{}:
  159. output := make(map[string]interface{})
  160. for _, field := range fields {
  161. output[field] = input.(map[string]interface{})[field]
  162. }
  163. return output, nil
  164. case []interface{}:
  165. for _, v := range input.([]interface{}) {
  166. output := make(map[string]interface{})
  167. if out, ok := v.(map[string]interface{}); !ok {
  168. return input, fmt.Errorf("unsupported type %v", input)
  169. } else {
  170. for _, field := range fields {
  171. output[field] = out[field]
  172. }
  173. outputs = append(outputs, output)
  174. }
  175. }
  176. return outputs, nil
  177. case []map[string]interface{}:
  178. for _, v := range input.([]map[string]interface{}) {
  179. output := make(map[string]interface{})
  180. for _, field := range fields {
  181. output[field] = v[field]
  182. }
  183. outputs = append(outputs, output)
  184. }
  185. return outputs, nil
  186. default:
  187. return input, fmt.Errorf("unsupported type %v", input)
  188. }
  189. }