analyzer.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package planner
  15. import (
  16. "fmt"
  17. "sort"
  18. "strings"
  19. "github.com/lf-edge/ekuiper/internal/binder/function"
  20. "github.com/lf-edge/ekuiper/internal/schema"
  21. "github.com/lf-edge/ekuiper/internal/xsql"
  22. "github.com/lf-edge/ekuiper/pkg/ast"
  23. "github.com/lf-edge/ekuiper/pkg/kv"
  24. )
  25. type streamInfo struct {
  26. stmt *ast.StreamStmt
  27. schema ast.StreamFields
  28. }
  29. // Analyze the select statement by decorating the info from stream statement.
  30. // Typically, set the correct stream name for fieldRefs
  31. func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*ast.Call, []*ast.Call, error) {
  32. streamsFromStmt := xsql.GetStreams(s)
  33. streamStmts := make([]*streamInfo, len(streamsFromStmt))
  34. isSchemaless := false
  35. for i, s := range streamsFromStmt {
  36. streamStmt, err := xsql.GetDataSource(store, s)
  37. if err != nil {
  38. return nil, nil, nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
  39. }
  40. si, err := convertStreamInfo(streamStmt)
  41. if err != nil {
  42. return nil, nil, nil, err
  43. }
  44. streamStmts[i] = si
  45. if si.schema == nil {
  46. isSchemaless = true
  47. }
  48. }
  49. if checkAliasReferenceCycle(s) {
  50. return nil, nil, nil, fmt.Errorf("select fields have cycled alias")
  51. }
  52. if !isSchemaless {
  53. aliasFieldTopoSort(s, streamStmts)
  54. }
  55. dsn := ast.DefaultStream
  56. if len(streamsFromStmt) == 1 {
  57. dsn = streamStmts[0].stmt.Name
  58. }
  59. // [fieldName][streamsName][*aliasRef] if alias, with special key alias/default. Each key has exactly one value
  60. fieldsMap := newFieldsMap(isSchemaless, dsn)
  61. if !isSchemaless {
  62. for _, streamStmt := range streamStmts {
  63. for _, field := range streamStmt.schema {
  64. fieldsMap.reserve(field.Name, streamStmt.stmt.Name)
  65. }
  66. }
  67. }
  68. var (
  69. walkErr error
  70. aliasFields []*ast.Field
  71. analyticFieldFuncs []*ast.Call
  72. analyticFuncs []*ast.Call
  73. )
  74. // Scan columns fields: bind all field refs, collect alias
  75. for i, f := range s.Fields {
  76. ast.WalkFunc(f.Expr, func(n ast.Node) bool {
  77. switch f := n.(type) {
  78. case *ast.FieldRef:
  79. walkErr = fieldsMap.bind(f)
  80. }
  81. return true
  82. })
  83. if walkErr != nil {
  84. return nil, nil, nil, walkErr
  85. }
  86. if f.AName != "" {
  87. aliasFields = append(aliasFields, &s.Fields[i])
  88. fieldsMap.bindAlias(f.AName)
  89. }
  90. }
  91. // bind alias field expressions
  92. for _, f := range aliasFields {
  93. ar, err := ast.NewAliasRef(f.Expr)
  94. if err != nil {
  95. walkErr = err
  96. } else {
  97. f.Expr = &ast.FieldRef{
  98. StreamName: ast.AliasStream,
  99. Name: f.AName,
  100. AliasRef: ar,
  101. }
  102. walkErr = fieldsMap.save(f.AName, ast.AliasStream, ar)
  103. for _, subF := range s.Fields {
  104. ast.WalkFunc(&subF, func(node ast.Node) bool {
  105. switch fr := node.(type) {
  106. case *ast.FieldRef:
  107. if fr.Name == f.AName {
  108. fr.StreamName = ast.AliasStream
  109. fr.AliasRef = ar
  110. }
  111. return false
  112. }
  113. return true
  114. })
  115. }
  116. }
  117. }
  118. // Bind field ref for alias AND set StreamName for all field ref
  119. ast.WalkFunc(s, func(n ast.Node) bool {
  120. switch f := n.(type) {
  121. case ast.Fields: // do not bind selection fields, should have done above
  122. return false
  123. case *ast.FieldRef:
  124. if f.StreamName != "" && f.StreamName != ast.DefaultStream {
  125. // check if stream exists
  126. found := false
  127. for _, sn := range streamsFromStmt {
  128. if sn == string(f.StreamName) {
  129. found = true
  130. break
  131. }
  132. }
  133. if !found {
  134. walkErr = fmt.Errorf("stream %s not found", f.StreamName)
  135. return true
  136. }
  137. }
  138. walkErr = fieldsMap.bind(f)
  139. }
  140. return true
  141. })
  142. if walkErr != nil {
  143. return nil, nil, nil, walkErr
  144. }
  145. walkErr = validate(s)
  146. // Collect all analytic function calls so that we can let them run firstly
  147. ast.WalkFunc(s, func(n ast.Node) bool {
  148. switch f := n.(type) {
  149. case ast.Fields:
  150. return false
  151. case *ast.Call:
  152. if function.IsAnalyticFunc(f.Name) {
  153. f.CachedField = fmt.Sprintf("%s_%s_%d", function.AnalyticPrefix, f.Name, f.FuncId)
  154. f.Cached = true
  155. analyticFuncs = append(analyticFuncs, &ast.Call{
  156. Name: f.Name,
  157. FuncId: f.FuncId,
  158. FuncType: f.FuncType,
  159. Args: f.Args,
  160. CachedField: f.CachedField,
  161. Partition: f.Partition,
  162. WhenExpr: f.WhenExpr,
  163. })
  164. }
  165. }
  166. return true
  167. })
  168. if walkErr != nil {
  169. return nil, nil, nil, walkErr
  170. }
  171. // walk sources at last to let them run firstly
  172. // because another clause may depend on the alias defined here
  173. for _, field := range s.Fields {
  174. var calls []*ast.Call
  175. ast.WalkFunc(&field, func(n ast.Node) bool {
  176. switch f := n.(type) {
  177. case *ast.Call:
  178. if function.IsAnalyticFunc(f.Name) {
  179. f.CachedField = fmt.Sprintf("%s_%s_%d", function.AnalyticPrefix, f.Name, f.FuncId)
  180. f.Cached = true
  181. calls = append([]*ast.Call{
  182. {
  183. Name: f.Name,
  184. FuncId: f.FuncId,
  185. FuncType: f.FuncType,
  186. Args: f.Args,
  187. CachedField: f.CachedField,
  188. Partition: f.Partition,
  189. WhenExpr: f.WhenExpr,
  190. },
  191. }, calls...)
  192. }
  193. }
  194. return true
  195. })
  196. analyticFieldFuncs = append(analyticFieldFuncs, calls...)
  197. }
  198. if walkErr != nil {
  199. return nil, nil, nil, walkErr
  200. }
  201. return streamStmts, analyticFuncs, analyticFieldFuncs, walkErr
  202. }
  203. type aliasTopoDegree struct {
  204. alias string
  205. degree int
  206. field ast.Field
  207. }
  208. type aliasTopoDegrees []*aliasTopoDegree
  209. func (a aliasTopoDegrees) Len() int {
  210. return len(a)
  211. }
  212. func (a aliasTopoDegrees) Less(i, j int) bool {
  213. if a[i].degree == a[j].degree {
  214. return a[i].alias < a[j].alias
  215. }
  216. return a[i].degree < a[j].degree
  217. }
  218. func (a aliasTopoDegrees) Swap(i, j int) {
  219. a[i], a[j] = a[j], a[i]
  220. }
  221. // checkAliasReferenceCycle checks whether exists select a + 1 as b, b + 1 as a from demo;
  222. func checkAliasReferenceCycle(s *ast.SelectStatement) bool {
  223. aliasRef := make(map[string]map[string]struct{})
  224. for _, field := range s.Fields {
  225. if len(field.AName) > 0 {
  226. aliasRef[field.AName] = make(map[string]struct{})
  227. }
  228. }
  229. if len(aliasRef) < 1 {
  230. return false
  231. }
  232. hasCycleAlias := false
  233. for _, field := range s.Fields {
  234. if len(field.AName) > 0 {
  235. ast.WalkFunc(&field, func(node ast.Node) bool {
  236. switch f := node.(type) {
  237. case *ast.FieldRef:
  238. if len(f.Name) > 0 {
  239. if f.Name == field.AName {
  240. return true
  241. }
  242. _, ok := aliasRef[f.Name]
  243. if ok {
  244. aliasRef[field.AName][f.Name] = struct{}{}
  245. v, ok1 := aliasRef[f.Name]
  246. if ok1 {
  247. _, ok2 := v[field.AName]
  248. if ok2 {
  249. hasCycleAlias = true
  250. return false
  251. }
  252. }
  253. }
  254. }
  255. }
  256. return true
  257. })
  258. if hasCycleAlias {
  259. return true
  260. }
  261. }
  262. }
  263. return false
  264. }
  265. func aliasFieldTopoSort(s *ast.SelectStatement, streamStmts []*streamInfo) {
  266. nonAliasFields := make([]ast.Field, 0)
  267. aliasDegreeMap := make(map[string]*aliasTopoDegree)
  268. for _, field := range s.Fields {
  269. if field.AName != "" {
  270. aliasDegreeMap[field.AName] = &aliasTopoDegree{
  271. alias: field.AName,
  272. degree: -1,
  273. field: field,
  274. }
  275. } else {
  276. nonAliasFields = append(nonAliasFields, field)
  277. }
  278. }
  279. for !isAliasFieldTopoSortFinish(aliasDegreeMap) {
  280. for _, field := range s.Fields {
  281. if field.AName != "" && aliasDegreeMap[field.AName].degree < 0 {
  282. skip := false
  283. degree := 0
  284. ast.WalkFunc(field.Expr, func(node ast.Node) bool {
  285. switch f := node.(type) {
  286. case *ast.FieldRef:
  287. if fDegree, ok := aliasDegreeMap[f.Name]; ok && fDegree.degree >= 0 {
  288. if degree < fDegree.degree+1 {
  289. degree = fDegree.degree + 1
  290. }
  291. return true
  292. }
  293. if !isFieldRefNameExists(f.Name, streamStmts) {
  294. skip = true
  295. return false
  296. }
  297. }
  298. return true
  299. })
  300. if !skip {
  301. aliasDegreeMap[field.AName].degree = degree
  302. }
  303. }
  304. }
  305. }
  306. as := make(aliasTopoDegrees, 0)
  307. for _, degree := range aliasDegreeMap {
  308. as = append(as, degree)
  309. }
  310. sort.Sort(as)
  311. s.Fields = make([]ast.Field, 0)
  312. for _, d := range as {
  313. s.Fields = append(s.Fields, d.field)
  314. }
  315. s.Fields = append(s.Fields, nonAliasFields...)
  316. }
  317. func isFieldRefNameExists(name string, streamStmts []*streamInfo) bool {
  318. for _, streamStmt := range streamStmts {
  319. for _, col := range streamStmt.schema {
  320. if col.Name == name {
  321. return true
  322. }
  323. }
  324. }
  325. return false
  326. }
  327. func isAliasFieldTopoSortFinish(aliasDegrees map[string]*aliasTopoDegree) bool {
  328. for _, aliasDegree := range aliasDegrees {
  329. if aliasDegree.degree < 0 {
  330. return false
  331. }
  332. }
  333. return true
  334. }
  335. func validate(s *ast.SelectStatement) (err error) {
  336. isAggStmt := false
  337. if xsql.IsAggregate(s.Condition) {
  338. return fmt.Errorf("Not allowed to call aggregate functions in WHERE clause.")
  339. }
  340. if !allAggregate(s.Having) {
  341. return fmt.Errorf("Not allowed to call non-aggregate functions in HAVING clause.")
  342. }
  343. for _, d := range s.Dimensions {
  344. isAggStmt = true
  345. if xsql.IsAggregate(d.Expr) {
  346. return fmt.Errorf("Not allowed to call aggregate functions in GROUP BY clause.")
  347. }
  348. }
  349. if s.Joins != nil {
  350. isAggStmt = true
  351. }
  352. ast.WalkFunc(s, func(n ast.Node) bool {
  353. switch f := n.(type) {
  354. case *ast.Call:
  355. // aggregate call should not have any aggregate arg
  356. if function.IsAggFunc(f.Name) {
  357. for _, arg := range f.Args {
  358. tr := xsql.IsAggregate(arg)
  359. if tr {
  360. err = fmt.Errorf("invalid argument for func %s: aggregate argument is not allowed", f.Name)
  361. return false
  362. }
  363. }
  364. }
  365. if isAggStmt && function.NoAggFunc(f.Name) {
  366. err = fmt.Errorf("function %s is not allowed in an aggregate query", f.Name)
  367. return false
  368. }
  369. case *ast.Window:
  370. // agg func check is done in dimensions.
  371. // in window trigger condition, NoAggFunc is allowed unlike normal condition so return false to skip that check
  372. return false
  373. }
  374. return true
  375. })
  376. return
  377. }
  378. // file-private functions below
  379. // allAggregate checks if all expressions of binary expression are aggregate
  380. func allAggregate(expr ast.Expr) (r bool) {
  381. r = true
  382. ast.WalkFunc(expr, func(n ast.Node) bool {
  383. switch f := expr.(type) {
  384. case *ast.BinaryExpr:
  385. switch f.OP {
  386. case ast.SUBSET, ast.ARROW:
  387. // do nothing
  388. default:
  389. r = allAggregate(f.LHS) && allAggregate(f.RHS)
  390. return false
  391. }
  392. case *ast.Call, *ast.FieldRef:
  393. if !xsql.IsAggregate(f) {
  394. r = false
  395. return false
  396. }
  397. }
  398. return true
  399. })
  400. return
  401. }
  402. func convertStreamInfo(streamStmt *ast.StreamStmt) (*streamInfo, error) {
  403. ss := streamStmt.StreamFields
  404. var err error
  405. if streamStmt.Options.SCHEMAID != "" {
  406. ss, err = schema.InferFromSchemaFile(streamStmt.Options.FORMAT, streamStmt.Options.SCHEMAID)
  407. if err != nil {
  408. return nil, err
  409. }
  410. }
  411. return &streamInfo{
  412. stmt: streamStmt,
  413. schema: ss,
  414. }, nil
  415. }
  416. type fieldsMap struct {
  417. content map[string]streamFieldStore
  418. aliasNames map[string]struct{}
  419. isSchemaless bool
  420. defaultStream ast.StreamName
  421. }
  422. func newFieldsMap(isSchemaless bool, defaultStream ast.StreamName) *fieldsMap {
  423. return &fieldsMap{content: make(map[string]streamFieldStore), aliasNames: map[string]struct{}{}, isSchemaless: isSchemaless, defaultStream: defaultStream}
  424. }
  425. func (f *fieldsMap) reserve(fieldName string, streamName ast.StreamName) {
  426. lname := strings.ToLower(fieldName)
  427. if fm, ok := f.content[lname]; ok {
  428. fm.add(streamName)
  429. } else {
  430. fm := newStreamFieldStore(f.isSchemaless, f.defaultStream)
  431. fm.add(streamName)
  432. f.content[lname] = fm
  433. }
  434. }
  435. func (f *fieldsMap) save(fieldName string, streamName ast.StreamName, field *ast.AliasRef) error {
  436. lname := strings.ToLower(fieldName)
  437. fm, ok := f.content[lname]
  438. if !ok {
  439. if streamName == ast.AliasStream || f.isSchemaless {
  440. fm = newStreamFieldStore(f.isSchemaless, f.defaultStream)
  441. f.content[lname] = fm
  442. } else {
  443. return fmt.Errorf("unknown field %s", fieldName)
  444. }
  445. }
  446. err := fm.ref(streamName, field)
  447. if err != nil {
  448. return fmt.Errorf("%s%s", err, fieldName)
  449. }
  450. return nil
  451. }
  452. func (f *fieldsMap) bindAlias(aliasName string) {
  453. f.aliasNames[aliasName] = struct{}{}
  454. }
  455. func (f *fieldsMap) bind(fr *ast.FieldRef) error {
  456. lname := strings.ToLower(fr.Name)
  457. fm, ok1 := f.content[lname]
  458. _, ok2 := f.aliasNames[lname]
  459. if !ok1 && !ok2 {
  460. if f.isSchemaless && fr.Name != "" {
  461. fm = newStreamFieldStore(f.isSchemaless, f.defaultStream)
  462. f.content[lname] = fm
  463. } else {
  464. return fmt.Errorf("unknown field %s", fr.Name)
  465. }
  466. }
  467. if fm != nil {
  468. err := fm.bindRef(fr)
  469. if err != nil {
  470. return fmt.Errorf("%s%s", err, fr.Name)
  471. }
  472. }
  473. return nil
  474. }
  475. type streamFieldStore interface {
  476. add(k ast.StreamName)
  477. ref(k ast.StreamName, v *ast.AliasRef) error
  478. bindRef(f *ast.FieldRef) error
  479. }
  480. func newStreamFieldStore(isSchemaless bool, defaultStream ast.StreamName) streamFieldStore {
  481. if !isSchemaless {
  482. return &streamFieldMap{content: make(map[ast.StreamName]*ast.AliasRef)}
  483. } else {
  484. return &streamFieldMapSchemaless{content: make(map[ast.StreamName]*ast.AliasRef), defaultStream: defaultStream}
  485. }
  486. }
  487. type streamFieldMap struct {
  488. content map[ast.StreamName]*ast.AliasRef
  489. }
  490. // add the stream name must not be default.
  491. // This is used when traversing stream schema
  492. func (s *streamFieldMap) add(k ast.StreamName) {
  493. s.content[k] = nil
  494. }
  495. // bind for schema field, all keys must be created before running bind
  496. // can bind alias & col. For alias, the stream name must be empty; For col, the field must be a col
  497. func (s *streamFieldMap) ref(k ast.StreamName, v *ast.AliasRef) error {
  498. if k == ast.AliasStream { // must not exist, save alias ref for alias
  499. _, ok := s.content[k]
  500. if ok {
  501. return fmt.Errorf("duplicate alias ")
  502. }
  503. s.content[k] = v
  504. } else { // the key must exist after the schema travers, do validation
  505. if k == ast.DefaultStream { // In schema mode, default stream won't be a key
  506. l := len(s.content)
  507. if l == 0 {
  508. return fmt.Errorf("unknow field ")
  509. } else if l == 1 {
  510. // valid, do nothing
  511. } else {
  512. return fmt.Errorf("ambiguous field ")
  513. }
  514. } else {
  515. _, ok := s.content[k]
  516. if !ok {
  517. return fmt.Errorf("unknow field %s.", k)
  518. }
  519. }
  520. }
  521. return nil
  522. }
  523. func (s *streamFieldMap) bindRef(fr *ast.FieldRef) error {
  524. l := len(s.content)
  525. if fr.StreamName == "" {
  526. fr.StreamName = ast.DefaultStream
  527. }
  528. k := fr.StreamName
  529. if k == ast.DefaultStream {
  530. switch l {
  531. case 0:
  532. return fmt.Errorf("unknown field ")
  533. case 1: // if alias, return this
  534. for sk, sv := range s.content {
  535. fr.RefSelection(sv)
  536. fr.StreamName = sk
  537. }
  538. return nil
  539. default:
  540. r, ok := s.content[ast.AliasStream] // if alias exists
  541. if ok {
  542. fr.RefSelection(r)
  543. fr.StreamName = ast.AliasStream
  544. return nil
  545. } else {
  546. return fmt.Errorf("ambiguous field ")
  547. }
  548. }
  549. } else {
  550. r, ok := s.content[k]
  551. if ok {
  552. fr.RefSelection(r)
  553. return nil
  554. } else {
  555. return fmt.Errorf("unknown field %s.", k)
  556. }
  557. }
  558. }
  559. type streamFieldMapSchemaless struct {
  560. content map[ast.StreamName]*ast.AliasRef
  561. defaultStream ast.StreamName
  562. }
  563. // add this should not be called for schemaless
  564. func (s *streamFieldMapSchemaless) add(k ast.StreamName) {
  565. s.content[k] = nil
  566. }
  567. // bind for schemaless field, create column if not exist
  568. // can bind alias & col. For alias, the stream name must be empty; For col, the field must be a col
  569. func (s *streamFieldMapSchemaless) ref(k ast.StreamName, v *ast.AliasRef) error {
  570. if k == ast.AliasStream { // must not exist
  571. _, ok := s.content[k]
  572. if ok {
  573. return fmt.Errorf("duplicate alias ")
  574. }
  575. s.content[k] = v
  576. } else { // the key may or may not exist. But always have only one default stream field.
  577. // Replace with stream name if another stream found. The key can be duplicate
  578. l := len(s.content)
  579. if k == ast.DefaultStream { // In schemaless mode, default stream can only exist when length is 1
  580. if l < 1 {
  581. // valid, do nothing
  582. } else {
  583. return fmt.Errorf("ambiguous field ")
  584. }
  585. } else {
  586. if l == 1 {
  587. for sk := range s.content {
  588. if sk == ast.DefaultStream {
  589. delete(s.content, k)
  590. }
  591. }
  592. }
  593. }
  594. }
  595. return nil
  596. }
  597. func (s *streamFieldMapSchemaless) bindRef(fr *ast.FieldRef) error {
  598. l := len(s.content)
  599. if fr.StreamName == "" || fr.StreamName == ast.DefaultStream {
  600. if l == 1 {
  601. for sk := range s.content {
  602. fr.StreamName = sk
  603. }
  604. }
  605. }
  606. k := fr.StreamName
  607. if k == ast.DefaultStream {
  608. switch l {
  609. case 0: // must be a column because alias are fields and have been traversed
  610. // reserve a hole and do nothing
  611. fr.StreamName = s.defaultStream
  612. s.content[s.defaultStream] = nil
  613. return nil
  614. case 1: // if alias or single col, return this
  615. for sk, sv := range s.content {
  616. fr.RefSelection(sv)
  617. fr.StreamName = sk
  618. }
  619. return nil
  620. default:
  621. r, ok := s.content[ast.AliasStream] // if alias exists
  622. if ok {
  623. fr.RefSelection(r)
  624. fr.StreamName = ast.AliasStream
  625. return nil
  626. } else {
  627. fr.StreamName = s.defaultStream
  628. }
  629. }
  630. }
  631. if fr.StreamName != ast.DefaultStream {
  632. r, ok := s.content[k]
  633. if !ok { // reserver a hole
  634. s.content[k] = nil
  635. } else {
  636. fr.RefSelection(r)
  637. }
  638. return nil
  639. }
  640. return fmt.Errorf("ambiguous field ")
  641. }