|
@@ -76,8 +76,8 @@ import (
|
|
|
)
|
|
|
|
|
|
type mysqlConfig struct {
|
|
|
- url string `json:"url"`
|
|
|
- table string `json:"table"`
|
|
|
+ Url string `json:"url"`
|
|
|
+ Table string `json:"table"`
|
|
|
}
|
|
|
|
|
|
type mysqlSink struct {
|
|
@@ -92,11 +92,11 @@ func (m *mysqlSink) Configure(props map[string]interface{}) error {
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("read properties %v fail with error: %v", props, err)
|
|
|
}
|
|
|
- if cfg.url == ""{
|
|
|
- return fmt.Errorf("property url is required")
|
|
|
+ if cfg.Url == "" {
|
|
|
+ return fmt.Errorf("property Url is required")
|
|
|
}
|
|
|
- if cfg.table == ""{
|
|
|
- return fmt.Errorf("property table is required")
|
|
|
+ if cfg.Table == "" {
|
|
|
+ return fmt.Errorf("property Table is required")
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
@@ -104,7 +104,7 @@ func (m *mysqlSink) Configure(props map[string]interface{}) error {
|
|
|
func (m *mysqlSink) Open(ctx api.StreamContext) (err error) {
|
|
|
logger := ctx.GetLogger()
|
|
|
logger.Debug("Opening mysql sink")
|
|
|
- m.db, err = sql.Open("mysql", m.conf.url)
|
|
|
+ m.db, err = sql.Open("mysql", m.conf.Url)
|
|
|
return
|
|
|
}
|
|
|
|
|
@@ -117,7 +117,7 @@ func (m *mysqlSink) Collect(ctx api.StreamContext, item interface{}) error {
|
|
|
// And it is possible to be any other kind of data if the sink `dataTemplate` is set
|
|
|
logger.Debugf("mysql sink receive %s", item)
|
|
|
//TODO hard coded column here. In production, we'd better get the column/value pair from the item
|
|
|
- sql := fmt.Sprintf("INSERT INTO %s (`name`) VALUES ('%s')", m.conf.table, v)
|
|
|
+ sql := fmt.Sprintf("INSERT INTO %s (`name`) VALUES ('%s')", m.conf.Table, v)
|
|
|
logger.Debugf(sql)
|
|
|
insert, err := m.db.Query(sql)
|
|
|
if err != nil {
|