|
@@ -193,25 +193,6 @@ func (m *sqlSink) Collect(ctx api.StreamContext, item interface{}) error {
|
|
|
return m.writeToDB(ctx, &sqlStr)
|
|
|
}
|
|
|
return nil
|
|
|
-
|
|
|
- case interface{}:
|
|
|
- mapData, ok := v.(map[string]interface{})
|
|
|
- if !ok {
|
|
|
- ctx.GetLogger().Errorf("unsupported type: %T", v)
|
|
|
- return fmt.Errorf("unsupported type: %T", v)
|
|
|
- }
|
|
|
-
|
|
|
- keys, vars, err = m.conf.buildSql(ctx, mapData)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- values = append(values, vars)
|
|
|
- if keys != nil {
|
|
|
- sqlStr := fmt.Sprintf("INSERT INTO %s (%s) values ", table, strings.Join(keys, ",")) + strings.Join(values, ",") + ";"
|
|
|
- return m.writeToDB(ctx, &sqlStr)
|
|
|
- }
|
|
|
- return nil
|
|
|
-
|
|
|
default: // never happen
|
|
|
return fmt.Errorf("unsupported type: %T", item)
|
|
|
}
|