sink_node.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "fmt"
  17. "github.com/lf-edge/ekuiper/internal/binder/io"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/context"
  20. "github.com/lf-edge/ekuiper/internal/topo/transform"
  21. "github.com/lf-edge/ekuiper/pkg/api"
  22. "github.com/lf-edge/ekuiper/pkg/cast"
  23. "github.com/lf-edge/ekuiper/pkg/errorx"
  24. "github.com/lf-edge/ekuiper/pkg/infra"
  25. "strings"
  26. "sync"
  27. "time"
  28. )
  // SinkConf holds the generic, sink-independent properties read from a sink's
// options map. Open fills the defaults (concurrency 1, retryInterval 1000,
// retryCount 0, cacheLength 1024, cacheSaveInterval 1000) before decoding.
type SinkConf struct {
	// Concurrency is the number of worker instances, each owning its own sink.
	Concurrency int `json:"concurrency"`
	// RunAsync makes every collect call run in its own goroutine.
	RunAsync bool `json:"runAsync"`
	// RetryInterval is the pause in milliseconds between resend attempts.
	RetryInterval int `json:"retryInterval"`
	// RetryCount bounds how many times an IO error is retried.
	RetryCount int `json:"retryCount"`
	// CacheLength is handed to the sink cache constructors.
	CacheLength int `json:"cacheLength"`
	// CacheSaveInterval is handed to the time-based cache (presumably ms — confirm).
	CacheSaveInterval int `json:"cacheSaveInterval"`
	// Omitempty drops empty results instead of sending them.
	Omitempty bool `json:"omitIfEmpty"`
	// SendSingle sends each result map separately instead of the whole slice.
	SendSingle bool `json:"sendSingle"`
	// DataTemplate is compiled by transform.GenTransform.
	DataTemplate string `json:"dataTemplate"`
}
  40. type SinkNode struct {
  41. *defaultSinkNode
  42. //static
  43. sinkType string
  44. mutex sync.RWMutex
  45. //configs (also static for sinks)
  46. options map[string]interface{}
  47. isMock bool
  48. //states varies after restart
  49. sinks []api.Sink
  50. tch chan struct{} //channel to trigger cache saved, will be trigger by checkpoint only
  51. }
  52. func NewSinkNode(name string, sinkType string, props map[string]interface{}) *SinkNode {
  53. bufferLength := 1024
  54. if c, ok := props["bufferLength"]; ok {
  55. if t, err := cast.ToInt(c, cast.STRICT); err != nil || t <= 0 {
  56. //invalid property bufferLength
  57. } else {
  58. bufferLength = t
  59. }
  60. }
  61. return &SinkNode{
  62. defaultSinkNode: &defaultSinkNode{
  63. input: make(chan interface{}, bufferLength),
  64. defaultNode: &defaultNode{
  65. name: name,
  66. concurrency: 1,
  67. ctx: nil,
  68. },
  69. },
  70. sinkType: sinkType,
  71. options: props,
  72. }
  73. }
  74. // NewSinkNodeWithSink Only for mock source, do not use it in production
  75. func NewSinkNodeWithSink(name string, sink api.Sink, props map[string]interface{}) *SinkNode {
  76. return &SinkNode{
  77. defaultSinkNode: &defaultSinkNode{
  78. input: make(chan interface{}, 1024),
  79. defaultNode: &defaultNode{
  80. name: name,
  81. concurrency: 1,
  82. ctx: nil,
  83. },
  84. },
  85. sinks: []api.Sink{sink},
  86. options: props,
  87. isMock: true,
  88. }
  89. }
  90. func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
  91. m.ctx = ctx
  92. logger := ctx.GetLogger()
  93. logger.Debugf("open sink node %s", m.name)
  94. if m.qos >= api.AtLeastOnce {
  95. m.tch = make(chan struct{})
  96. }
  97. go func() {
  98. err := infra.SafeRun(func() error {
  99. sconf := &SinkConf{
  100. Concurrency: 1,
  101. RunAsync: false,
  102. RetryInterval: 1000,
  103. RetryCount: 0,
  104. CacheLength: 1024,
  105. CacheSaveInterval: 1000,
  106. Omitempty: false,
  107. SendSingle: false,
  108. DataTemplate: "",
  109. }
  110. err := cast.MapToStruct(m.options, sconf)
  111. if err != nil {
  112. return fmt.Errorf("read properties %v fail with error: %v", m.options, err)
  113. }
  114. if sconf.Concurrency <= 0 {
  115. logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", sconf.Concurrency)
  116. sconf.Concurrency = 1
  117. }
  118. m.concurrency = sconf.Concurrency
  119. if sconf.RetryInterval <= 0 {
  120. logger.Warnf("invalid type for retryInterval property, should be positive integer but found %t", sconf.RetryInterval)
  121. sconf.RetryInterval = 1000
  122. }
  123. if sconf.RetryCount < 0 {
  124. logger.Warnf("invalid type for retryCount property, should be positive integer but found %t", sconf.RetryCount)
  125. sconf.RetryCount = 3
  126. }
  127. if sconf.CacheLength < 0 {
  128. logger.Warnf("invalid type for cacheLength property, should be positive integer but found %t", sconf.CacheLength)
  129. sconf.CacheLength = 1024
  130. }
  131. if sconf.CacheSaveInterval < 0 {
  132. logger.Warnf("invalid type for cacheSaveInterval property, should be positive integer but found %t", sconf.CacheSaveInterval)
  133. sconf.CacheSaveInterval = 1000
  134. }
  135. tf, err := transform.GenTransform(sconf.DataTemplate)
  136. if err != nil {
  137. msg := fmt.Sprintf("property dataTemplate %v is invalid: %v", sconf.DataTemplate, err)
  138. logger.Warnf(msg)
  139. return fmt.Errorf(msg)
  140. }
  141. ctx = context.WithValue(ctx.(*context.DefaultContext), context.TransKey, tf)
  142. m.reset()
  143. logger.Infof("open sink node %d instances", m.concurrency)
  144. for i := 0; i < m.concurrency; i++ { // workers
  145. go func(instance int) {
  146. panicOrError := infra.SafeRun(func() error {
  147. var (
  148. sink api.Sink
  149. err error
  150. )
  151. if !m.isMock {
  152. logger.Debugf("Trying to get sink for rule %s with options %v\n", ctx.GetRuleId(), m.options)
  153. sink, err = getSink(m.sinkType, m.options)
  154. if err != nil {
  155. return err
  156. }
  157. logger.Debugf("Successfully get the sink %s", m.sinkType)
  158. m.mutex.Lock()
  159. m.sinks = append(m.sinks, sink)
  160. m.mutex.Unlock()
  161. logger.Debugf("Now is to open sink for rule %s.\n", ctx.GetRuleId())
  162. if err := sink.Open(ctx); err != nil {
  163. return err
  164. }
  165. logger.Debugf("Successfully open sink for rule %s.\n", ctx.GetRuleId())
  166. } else {
  167. sink = m.sinks[instance]
  168. }
  169. stats, err := NewStatManager(ctx, "sink")
  170. if err != nil {
  171. return err
  172. }
  173. m.mutex.Lock()
  174. m.statManagers = append(m.statManagers, stats)
  175. m.mutex.Unlock()
  176. if conf.Config.Sink.DisableCache {
  177. for {
  178. select {
  179. case data := <-m.input:
  180. if temp, processed := m.preprocess(data); processed {
  181. break
  182. } else {
  183. data = temp
  184. }
  185. stats.SetBufferLength(int64(len(m.input)))
  186. if sconf.RunAsync {
  187. go func() {
  188. p := infra.SafeRun(func() error {
  189. doCollect(ctx, sink, data, stats, sconf)
  190. return nil
  191. })
  192. if p != nil {
  193. infra.DrainError(ctx, p, result)
  194. }
  195. }()
  196. } else {
  197. doCollect(ctx, sink, data, stats, sconf)
  198. }
  199. case <-ctx.Done():
  200. logger.Infof("sink node %s instance %d done", m.name, instance)
  201. if err := sink.Close(ctx); err != nil {
  202. logger.Warnf("close sink node %s instance %d fails: %v", m.name, instance, err)
  203. }
  204. return nil
  205. case <-m.tch:
  206. logger.Debugf("rule %s sink receive checkpoint, do nothing", ctx.GetRuleId())
  207. }
  208. }
  209. } else {
  210. logger.Infof("Creating sink cache")
  211. var cache *Cache
  212. if m.qos >= api.AtLeastOnce {
  213. cache = NewCheckpointbasedCache(m.input, sconf.CacheLength, m.tch, result, ctx.WithInstance(instance))
  214. } else {
  215. cache = NewTimebasedCache(m.input, sconf.CacheLength, sconf.CacheSaveInterval, result, ctx.WithInstance(instance))
  216. }
  217. for {
  218. select {
  219. case data := <-cache.Out:
  220. if temp, processed := m.preprocess(data.data); processed {
  221. break
  222. } else {
  223. data.data = temp
  224. }
  225. stats.SetBufferLength(int64(len(m.input)))
  226. if sconf.RunAsync {
  227. go func() {
  228. p := infra.SafeRun(func() error {
  229. doCollect(ctx, sink, data.data, stats, sconf)
  230. return nil
  231. })
  232. if p != nil {
  233. infra.DrainError(ctx, p, result)
  234. }
  235. }()
  236. } else {
  237. doCollect(ctx, sink, data.data, stats, sconf)
  238. if cache.Complete != nil {
  239. select {
  240. case cache.Complete <- data.index:
  241. default:
  242. ctx.GetLogger().Warnf("sink cache missing response for %d", data.index)
  243. }
  244. }
  245. }
  246. case <-ctx.Done():
  247. logger.Infof("sink node %s instance %d done", m.name, instance)
  248. if err := sink.Close(ctx); err != nil {
  249. logger.Warnf("close sink node %s instance %d fails: %v", m.name, instance, err)
  250. }
  251. return nil
  252. }
  253. }
  254. }
  255. })
  256. if panicOrError != nil {
  257. infra.DrainError(ctx, panicOrError, result)
  258. }
  259. }(i)
  260. }
  261. return nil
  262. })
  263. if err != nil {
  264. infra.DrainError(ctx, err, result)
  265. }
  266. }()
  267. }
  268. func (m *SinkNode) reset() {
  269. if !m.isMock {
  270. m.sinks = nil
  271. }
  272. m.statManagers = nil
  273. }
  274. func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats StatManager, sconf *SinkConf) {
  275. stats.IncTotalRecordsIn()
  276. stats.ProcessTimeStart()
  277. defer stats.ProcessTimeEnd()
  278. var outs []map[string]interface{}
  279. switch val := item.(type) {
  280. case error:
  281. outs = []map[string]interface{}{
  282. {"error": val.Error()},
  283. }
  284. case []map[string]interface{}:
  285. outs = val
  286. default:
  287. outs = []map[string]interface{}{
  288. {"error": fmt.Sprintf("result is not a map slice but found %#v", val)},
  289. }
  290. }
  291. if sconf.Omitempty && (item == nil || len(outs) == 0) {
  292. ctx.GetLogger().Debugf("receive empty in sink")
  293. return
  294. }
  295. if !sconf.SendSingle {
  296. doCollectData(ctx, sink, outs, stats, sconf)
  297. } else {
  298. for _, d := range outs {
  299. if sconf.Omitempty && (d == nil || len(d) == 0) {
  300. ctx.GetLogger().Debugf("receive empty in sink")
  301. continue
  302. }
  303. doCollectData(ctx, sink, d, stats, sconf)
  304. }
  305. }
  306. }
  307. // doCollectData outData must be map or []map
  308. func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, stats StatManager, sconf *SinkConf) {
  309. retries := sconf.RetryCount
  310. for {
  311. select {
  312. case <-ctx.Done():
  313. ctx.GetLogger().Infof("sink node %s instance %d stops data resending", ctx.GetOpId(), ctx.GetInstanceId())
  314. return
  315. default:
  316. if err := sink.Collect(ctx, outData); err != nil {
  317. stats.IncTotalExceptions()
  318. ctx.GetLogger().Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outData, err)
  319. if sconf.RetryInterval > 0 && retries > 0 && strings.HasPrefix(err.Error(), errorx.IOErr) {
  320. retries--
  321. time.Sleep(time.Duration(sconf.RetryInterval) * time.Millisecond)
  322. ctx.GetLogger().Debugf("try again")
  323. } else {
  324. return
  325. }
  326. } else {
  327. ctx.GetLogger().Debugf("success")
  328. stats.IncTotalRecordsOut()
  329. return
  330. }
  331. }
  332. }
  333. }
  334. func getSink(name string, action map[string]interface{}) (api.Sink, error) {
  335. var (
  336. s api.Sink
  337. err error
  338. )
  339. s, err = io.Sink(name)
  340. if s != nil {
  341. err = s.Configure(action)
  342. if err != nil {
  343. return nil, err
  344. }
  345. return s, nil
  346. } else {
  347. if err != nil {
  348. return nil, err
  349. } else {
  350. return nil, fmt.Errorf("sink %s not found", name)
  351. }
  352. }
  353. }
  354. // AddOutput Override defaultNode
  355. func (m *SinkNode) AddOutput(_ chan<- interface{}, name string) error {
  356. return fmt.Errorf("fail to add output %s, sink %s cannot add output", name, m.name)
  357. }
  358. // Broadcast Override defaultNode
  359. func (m *SinkNode) Broadcast(_ interface{}) error {
  360. return fmt.Errorf("sink %s cannot add broadcast", m.name)
  361. }
  362. // SaveCache Only called when checkpoint enabled
  363. func (m *SinkNode) SaveCache() {
  364. select {
  365. case m.tch <- struct{}{}:
  366. case <-m.ctx.Done():
  367. }
  368. }