templateSqlDialect.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package sqlgen
  15. import (
  16. "bytes"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/pkg/cast"
  19. "github.com/posener/order"
  20. "text/template"
  21. )
  22. type templateSqlQuery struct {
  23. tp *template.Template
  24. *TemplateSqlQueryCfg
  25. }
  26. func NewTemplateSqlQuery(cfg *TemplateSqlQueryCfg) (SqlQueryGenerator, error) {
  27. t := &templateSqlQuery{
  28. tp: nil,
  29. TemplateSqlQueryCfg: cfg,
  30. }
  31. if err := t.init(); err != nil {
  32. return nil, err
  33. } else {
  34. return t, nil
  35. }
  36. }
  37. func (t *templateSqlQuery) init() error {
  38. tp, err := template.New("sql").Parse(t.TemplateSql)
  39. if err != nil {
  40. return err
  41. }
  42. t.tp = tp
  43. return nil
  44. }
  45. func (t *templateSqlQuery) SqlQueryStatement() (string, error) {
  46. var val string
  47. if t.IndexFieldType == DATETIME_TYPE && t.DateTimeFormat != "" {
  48. time, err := cast.InterfaceToTime(t.IndexValue, t.DateTimeFormat)
  49. if err != nil {
  50. err = fmt.Errorf("SqlQueryStatement InterfaceToTime datetime convert got error %v", err)
  51. return "", err
  52. }
  53. val, err = cast.FormatTime(time, t.DateTimeFormat)
  54. if err != nil {
  55. err = fmt.Errorf("SqlQueryStatement FormatTime datetime convert got error %v", err)
  56. return "", err
  57. }
  58. } else {
  59. val = fmt.Sprintf("%v", t.IndexValue)
  60. }
  61. input := map[string]interface{}{
  62. t.IndexField: val,
  63. }
  64. var output bytes.Buffer
  65. err := t.tp.Execute(&output, input)
  66. if err != nil {
  67. return "", err
  68. }
  69. return string(output.Bytes()), nil
  70. }
  71. func (t *templateSqlQuery) UpdateMaxIndexValue(row map[string]interface{}) {
  72. if t.IndexField != "" {
  73. v, found := row[t.IndexField]
  74. if !found {
  75. return
  76. }
  77. if val := order.Is(v); val.Greater(t.IndexValue) {
  78. t.IndexValue = v
  79. }
  80. }
  81. }