package xsql import ( "context" "crypto/md5" "crypto/sha1" "crypto/sha256" "crypto/sha512" b64 "encoding/base64" "fmt" "github.com/PaesslerAG/gval" "github.com/PaesslerAG/jsonpath" "github.com/emqx/kuiper/common" "github.com/google/uuid" "hash" "io" "math" "reflect" "strconv" "strings" ) func convCall(name string, args []interface{}) (interface{}, bool) { switch name { case "cast": if v, ok := args[1].(string); ok { v = strings.ToLower(v) switch v { case "bigint": if v1, ok1 := args[0].(int); ok1 { return v1, true } else if v1, ok1 := args[0].(float64); ok1 { return int(v1), true } else if v1, ok1 := args[0].(string); ok1 { if temp, err := strconv.Atoi(v1); err == nil { return temp, true } else { return err, false } } else if v1, ok1 := args[0].(bool); ok1 { if v1 { return 1, true } else { return 0, true } } else { return fmt.Errorf("Not supported type conversion."), false } case "float": if v1, ok1 := args[0].(int); ok1 { return float64(v1), true } else if v1, ok1 := args[0].(float64); ok1 { return v1, true } else if v1, ok1 := args[0].(string); ok1 { if temp, err := strconv.ParseFloat(v1, 64); err == nil { return temp, true } else { return err, false } } else if v1, ok1 := args[0].(bool); ok1 { if v1 { return 1.0, true } else { return 0.0, true } } else { return fmt.Errorf("Not supported type conversion."), false } case "string": if v1, ok1 := args[0].(int); ok1 { return fmt.Sprintf("%d", v1), true } else if v1, ok1 := args[0].(float64); ok1 { return fmt.Sprintf("%g", v1), true } else if v1, ok1 := args[0].(string); ok1 { return v1, true } else if v1, ok1 := args[0].(bool); ok1 { if v1 { return "true", true } else { return "false", true } } else { return fmt.Errorf("Not supported type conversion."), false } case "boolean": if v1, ok1 := args[0].(int); ok1 { if v1 == 0 { return false, true } else { return true, true } } else if v1, ok1 := args[0].(float64); ok1 { if v1 == 0.0 { return false, true } else { return true, true } } else if v1, ok1 := args[0].(string); ok1 { if temp, err := strconv.ParseBool(v1); err == nil { return temp, true } else { return err, false } } else if v1, ok1 := args[0].(bool); ok1 { return v1, true } else { return fmt.Errorf("Not supported type conversion."), false } case "datetime": return fmt.Errorf("Not supported type conversion."), false default: return fmt.Errorf("Unknow type, only support bigint, float, string, boolean and datetime."), false } } else { return fmt.Errorf("Expect string type for the 2nd parameter."), false } case "chr": if v, ok := args[0].(int); ok { return rune(v), true } else if v, ok := args[0].(float64); ok { temp := int(v) return rune(temp), true } else if v, ok := args[0].(string); ok { if len(v) > 1 { return fmt.Errorf("Parameter length cannot larger than 1."), false } r := []rune(v) return r[0], true } else { return fmt.Errorf("Only bigint, float and string type can be convert to char type."), false } case "encode": if v, ok := args[1].(string); ok { v = strings.ToLower(v) if v == "base64" { if v1, ok1 := args[0].(string); ok1 { return b64.StdEncoding.EncodeToString([]byte(v1)), true } else { return fmt.Errorf("Only string type can be encoded."), false } } else { return fmt.Errorf("Only base64 encoding is supported."), false } } case "trunc": var v0 float64 if v1, ok := args[0].(int); ok { v0 = float64(v1) } else if v1, ok := args[0].(float64); ok { v0 = v1 } else { return fmt.Errorf("Only int and float type can be truncated."), false } if v2, ok := args[1].(int); ok { return toFixed(v0, v2), true } else { return fmt.Errorf("The 2nd parameter must be int value."), false } default: return fmt.Errorf("Not supported function name %s", name), false } return nil, false } func round(num float64) int { return int(num + math.Copysign(0.5, num)) } func toFixed(num float64, precision int) float64 { output := math.Pow(10, float64(precision)) return float64(round(num*output)) / output } func hashCall(name string, args []interface{}) (interface{}, bool) { arg0 := common.ToString(args[0]) var h hash.Hash switch name { case "md5": h = md5.New() case "sha1": h = sha1.New() case "sha256": h = sha256.New() case "sha384": h = sha512.New384() case "sha512": h = sha512.New() default: return fmt.Errorf("unknown hash function name %s", name), false } io.WriteString(h, arg0) return fmt.Sprintf("%x", h.Sum(nil)), true } func otherCall(name string, args []interface{}) (interface{}, bool) { switch name { case "isnull": if args[0] == nil { return true, true } else { v := reflect.ValueOf(args[0]) switch v.Kind() { case reflect.Slice, reflect.Map: return v.IsNil(), true default: return false, true } } return false, true case "newuuid": if uuid, err := uuid.NewUUID(); err != nil { return err, false } else { return uuid.String(), true } case "tstamp": return common.GetNowInMilli(), true case "mqtt": if v, ok := args[0].(string); ok { return v, true } return nil, false case "meta": return args[0], true case "cardinality": val := reflect.ValueOf(args[0]) if val.Kind() == reflect.Slice { return val.Len(), true } return 0, true default: return fmt.Errorf("unknown function name %s", name), false } } func jsonCall(name string, args []interface{}) (interface{}, bool) { var input interface{} switch reflect.TypeOf(args[0]).Kind() { case reflect.Map: input = convertToInterfaceArr(args[0].(map[string]interface{})) case reflect.Slice: input = convertSlice(args[0]) default: return fmt.Errorf("%s function error: the first argument must be a map but got %v", name, args[0]), false } builder := gval.Full(jsonpath.PlaceholderExtension()) path, err := builder.NewEvaluable(args[1].(string)) if err != nil { return fmt.Errorf("%s function error: %s", name, err), false } result, err := path(context.Background(), input) if err != nil { if name == "json_path_exists" { return false, true } return fmt.Errorf("%s function error: %s", name, err), false } switch name { case "json_path_query": return result, true case "json_path_query_first": if arr, ok := result.([]interface{}); ok { return arr[0], true } else { return fmt.Errorf("%s function error: query result (%v) is not an array", name, result), false } case "json_path_exists": e := true switch reflect.TypeOf(result).Kind() { case reflect.Slice, reflect.Array: e = reflect.ValueOf(result).Len() > 0 default: e = result != nil } return e, true } return fmt.Errorf("invalid function name: %s", name), false } func convertToInterfaceArr(orig map[string]interface{}) map[string]interface{} { result := make(map[string]interface{}) for k, v := range orig { switch reflect.TypeOf(v).Kind() { case reflect.Slice: result[k] = convertSlice(v) case reflect.Map: result[k] = convertToInterfaceArr(v.(map[string]interface{})) default: result[k] = v } } return result } func convertSlice(v interface{}) []interface{} { value := reflect.ValueOf(v) tempArr := make([]interface{}, value.Len()) for i := 0; i < value.Len(); i++ { item := value.Index(i) if item.Kind() == reflect.Map { tempArr[i] = convertToInterfaceArr(item.Interface().(map[string]interface{})) } else if item.Kind() == reflect.Slice { tempArr[i] = convertSlice(item.Interface()) } else { tempArr[i] = item.Interface() } } return tempArr }