table_node.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package nodes
import (
	"fmt"

	"github.com/emqx/kuiper/common"
	"github.com/emqx/kuiper/xsql"
	"github.com/emqx/kuiper/xstream/api"
	"github.com/emqx/kuiper/xstream/extensions"
)
  8. // Node for table source
  9. type TableNode struct {
  10. *defaultNode
  11. sourceType string
  12. options map[string]string
  13. }
  14. func NewTableNode(name string, options map[string]string) *TableNode {
  15. t, ok := options["TYPE"]
  16. if !ok {
  17. t = "file"
  18. }
  19. return &TableNode{
  20. sourceType: t,
  21. defaultNode: &defaultNode{
  22. name: name,
  23. outputs: make(map[string]chan<- interface{}),
  24. concurrency: 1,
  25. },
  26. options: options,
  27. }
  28. }
  29. func (m *TableNode) Open(ctx api.StreamContext, errCh chan<- error) {
  30. m.ctx = ctx
  31. logger := ctx.GetLogger()
  32. logger.Infof("open table node %s with option %v", m.name, m.options)
  33. go func() {
  34. props := getSourceConf(ctx, m.sourceType, m.options)
  35. //TODO apply properties like concurrency
  36. source, err := doGetTableSource(m.sourceType)
  37. if err != nil {
  38. m.drainError(errCh, err, ctx)
  39. return
  40. }
  41. err = source.Configure(m.options["DATASOURCE"], props)
  42. if err != nil {
  43. m.drainError(errCh, err, ctx)
  44. return
  45. }
  46. stats, err := NewStatManager("source", ctx)
  47. if err != nil {
  48. m.drainError(errCh, err, ctx)
  49. return
  50. }
  51. m.statManagers = append(m.statManagers, stats)
  52. stats.ProcessTimeStart()
  53. if data, err := source.Load(ctx); err != nil {
  54. stats.IncTotalExceptions()
  55. stats.ProcessTimeEnd()
  56. m.drainError(errCh, err, ctx)
  57. return
  58. } else {
  59. stats.IncTotalRecordsIn()
  60. stats.ProcessTimeEnd()
  61. logger.Debugf("table node %s is sending result", m.name)
  62. result := make([]*xsql.Tuple, len(data))
  63. for i, t := range data {
  64. tuple := &xsql.Tuple{Emitter: m.name, Message: t.Message(), Metadata: t.Meta(), Timestamp: common.GetNowInMilli()}
  65. result[i] = tuple
  66. }
  67. m.doBroadcast(result)
  68. stats.IncTotalRecordsOut()
  69. logger.Debugf("table node %s has consumed all data", m.name)
  70. }
  71. }()
  72. }
  73. func (m *TableNode) drainError(errCh chan<- error, err error, ctx api.StreamContext) {
  74. select {
  75. case errCh <- err:
  76. case <-ctx.Done():
  77. }
  78. return
  79. }
  80. func doGetTableSource(t string) (api.TableSource, error) {
  81. var s api.TableSource
  82. switch t {
  83. case "file":
  84. s = &extensions.FileSource{}
  85. default: //TODO table source plugin
  86. //s, err = plugins.GetTableSource(t)
  87. //if err != nil {
  88. // return nil, err
  89. //}
  90. }
  91. return s, nil
  92. }