sink.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package conf
  15. import (
  16. "github.com/lf-edge/ekuiper/internal/conf"
  17. )
  18. const ResourceID = "resourceId"
  19. func GetSinkConf(sinkType string, action map[string]interface{}) map[string]interface{} {
  20. resourceId, ok := action[ResourceID].(string)
  21. if !ok {
  22. return action
  23. }
  24. delete(action, ResourceID)
  25. yamlOps, err := conf.NewConfigOperatorFromSinkYaml(sinkType)
  26. if err != nil {
  27. conf.Log.Warnf("fail to parse yaml for sink %s. Return error %v", sinkType, err)
  28. return action
  29. }
  30. var props map[string]interface{}
  31. cfg := yamlOps.CopyConfContent()
  32. if len(cfg) == 0 {
  33. conf.Log.Warnf("fail to parse yaml for sink %s. Return an empty configuration", sinkType)
  34. return action
  35. } else {
  36. def, ok := cfg[resourceId]
  37. if !ok {
  38. conf.Log.Warnf("resource id %s is not found", resourceId)
  39. return action
  40. } else {
  41. props = def
  42. for k, v := range action {
  43. props[k] = v
  44. }
  45. }
  46. }
  47. conf.Log.Debugf("get conf for %s with resource id %s: %v", sinkType, resourceId, printable(props))
  48. return props
  49. }