rpc.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "fmt"
  19. "github.com/lf-edge/ekuiper/internal/pkg/model"
  20. "github.com/lf-edge/ekuiper/internal/plugin"
  21. "github.com/lf-edge/ekuiper/internal/plugin/native"
  22. "github.com/lf-edge/ekuiper/internal/plugin/portable"
  23. "github.com/lf-edge/ekuiper/internal/service"
  24. "github.com/lf-edge/ekuiper/internal/topo/sink"
  25. "strings"
  26. "time"
  27. )
  28. const QueryRuleId = "internal-ekuiper_query_rule"
  29. type Server int
  30. func (t *Server) CreateQuery(sql string, reply *string) error {
  31. if _, ok := registry.Load(QueryRuleId); ok {
  32. stopQuery()
  33. }
  34. tp, err := ruleProcessor.ExecQuery(QueryRuleId, sql)
  35. if err != nil {
  36. return err
  37. } else {
  38. rs := &RuleState{Name: QueryRuleId, Topology: tp, Triggered: true}
  39. registry.Store(QueryRuleId, rs)
  40. msg := fmt.Sprintf("Query was submit successfully.")
  41. logger.Println(msg)
  42. *reply = fmt.Sprintf(msg)
  43. }
  44. return nil
  45. }
  46. func stopQuery() {
  47. if rs, ok := registry.Load(QueryRuleId); ok {
  48. logger.Printf("stop the query.")
  49. (*rs.Topology).Cancel()
  50. registry.Delete(QueryRuleId)
  51. }
  52. }
  53. /**
  54. * qid is not currently used.
  55. */
  56. func (t *Server) GetQueryResult(_ string, reply *string) error {
  57. if rs, ok := registry.Load(QueryRuleId); ok {
  58. c := (*rs.Topology).GetContext()
  59. if c != nil && c.Err() != nil {
  60. return c.Err()
  61. }
  62. }
  63. sink.QR.LastFetch = time.Now()
  64. sink.QR.Mux.Lock()
  65. if len(sink.QR.Results) > 0 {
  66. *reply = strings.Join(sink.QR.Results, "")
  67. sink.QR.Results = make([]string, 10)
  68. } else {
  69. *reply = ""
  70. }
  71. sink.QR.Mux.Unlock()
  72. return nil
  73. }
  74. func (t *Server) Stream(stream string, reply *string) error {
  75. content, err := streamProcessor.ExecStmt(stream)
  76. if err != nil {
  77. return fmt.Errorf("Stream command error: %s", err)
  78. } else {
  79. for _, c := range content {
  80. *reply = *reply + fmt.Sprintln(c)
  81. }
  82. }
  83. return nil
  84. }
  85. func (t *Server) CreateRule(rule *model.RPCArgDesc, reply *string) error {
  86. r, err := ruleProcessor.ExecCreate(rule.Name, rule.Json)
  87. if err != nil {
  88. return fmt.Errorf("Create rule error : %s.", err)
  89. } else {
  90. *reply = fmt.Sprintf("Rule %s was created successfully, please use 'bin/kuiper getstatus rule %s' command to get rule status.", rule.Name, rule.Name)
  91. }
  92. //Start the rule
  93. rs, err := createRuleState(r)
  94. if err != nil {
  95. return err
  96. }
  97. err = doStartRule(rs)
  98. if err != nil {
  99. return err
  100. }
  101. return nil
  102. }
  103. func (t *Server) GetStatusRule(name string, reply *string) error {
  104. if r, err := getRuleStatus(name); err != nil {
  105. return err
  106. } else {
  107. *reply = r
  108. }
  109. return nil
  110. }
  111. func (t *Server) GetTopoRule(name string, reply *string) error {
  112. if r, err := getRuleTopo(name); err != nil {
  113. return err
  114. } else {
  115. dst := &bytes.Buffer{}
  116. if err = json.Indent(dst, []byte(r), "", " "); err != nil {
  117. *reply = r
  118. } else {
  119. *reply = dst.String()
  120. }
  121. }
  122. return nil
  123. }
  124. func (t *Server) StartRule(name string, reply *string) error {
  125. if err := startRule(name); err != nil {
  126. return err
  127. } else {
  128. *reply = fmt.Sprintf("Rule %s was started", name)
  129. }
  130. return nil
  131. }
  132. func (t *Server) StopRule(name string, reply *string) error {
  133. *reply = stopRule(name)
  134. return nil
  135. }
  136. func (t *Server) RestartRule(name string, reply *string) error {
  137. err := restartRule(name)
  138. if err != nil {
  139. return err
  140. }
  141. *reply = fmt.Sprintf("Rule %s was restarted.", name)
  142. return nil
  143. }
  144. func (t *Server) DescRule(name string, reply *string) error {
  145. r, err := ruleProcessor.ExecDesc(name)
  146. if err != nil {
  147. return fmt.Errorf("Desc rule error : %s.", err)
  148. } else {
  149. *reply = r
  150. }
  151. return nil
  152. }
  153. func (t *Server) ShowRules(_ int, reply *string) error {
  154. r, err := getAllRulesWithStatus()
  155. if err != nil {
  156. return fmt.Errorf("Show rule error : %s.", err)
  157. }
  158. if len(r) == 0 {
  159. *reply = "No rule definitions are found."
  160. } else {
  161. result, err := json.Marshal(r)
  162. if err != nil {
  163. return fmt.Errorf("Show rule error : %s.", err)
  164. }
  165. dst := &bytes.Buffer{}
  166. if err := json.Indent(dst, result, "", " "); err != nil {
  167. return fmt.Errorf("Show rule error : %s.", err)
  168. }
  169. *reply = dst.String()
  170. }
  171. return nil
  172. }
  173. func (t *Server) DropRule(name string, reply *string) error {
  174. deleteRule(name)
  175. r, err := ruleProcessor.ExecDrop(name)
  176. if err != nil {
  177. return fmt.Errorf("Drop rule error : %s.", err)
  178. } else {
  179. err := t.StopRule(name, reply)
  180. if err != nil {
  181. return err
  182. }
  183. }
  184. *reply = r
  185. return nil
  186. }
  187. func (t *Server) CreatePlugin(arg *model.PluginDesc, reply *string) error {
  188. pt := plugin.PluginType(arg.Type)
  189. p, err := getPluginByJson(arg, pt)
  190. if err != nil {
  191. return fmt.Errorf("Create plugin error: %s", err)
  192. }
  193. if p.GetFile() == "" {
  194. return fmt.Errorf("Create plugin error: Missing plugin file url.")
  195. }
  196. if pt == plugin.PORTABLE {
  197. err = portable.GetManager().Register(p)
  198. } else {
  199. err = native.GetManager().Register(pt, p)
  200. }
  201. if err != nil {
  202. return fmt.Errorf("Create plugin error: %s", err)
  203. } else {
  204. *reply = fmt.Sprintf("Plugin %s is created.", p.GetName())
  205. }
  206. return nil
  207. }
  208. func (t *Server) RegisterPlugin(arg *model.PluginDesc, reply *string) error {
  209. p, err := getPluginByJson(arg, plugin.FUNCTION)
  210. if err != nil {
  211. return fmt.Errorf("Register plugin functions error: %s", err)
  212. }
  213. if len(p.GetSymbols()) == 0 {
  214. return fmt.Errorf("Register plugin functions error: Missing function list.")
  215. }
  216. err = native.GetManager().RegisterFuncs(p.GetName(), p.GetSymbols())
  217. if err != nil {
  218. return fmt.Errorf("Create plugin error: %s", err)
  219. } else {
  220. *reply = fmt.Sprintf("Plugin %s is created.", p.GetName())
  221. }
  222. return nil
  223. }
  224. func (t *Server) DropPlugin(arg *model.PluginDesc, reply *string) error {
  225. pt := plugin.PluginType(arg.Type)
  226. p, err := getPluginByJson(arg, pt)
  227. if err != nil {
  228. return fmt.Errorf("Drop plugin error: %s", err)
  229. }
  230. if pt == plugin.PORTABLE {
  231. err = portable.GetManager().Delete(p.GetName())
  232. if err != nil {
  233. return fmt.Errorf("Drop plugin error: %s", err)
  234. } else {
  235. *reply = fmt.Sprintf("Plugin %s is dropped .", p.GetName())
  236. }
  237. } else {
  238. err = native.GetManager().Delete(pt, p.GetName(), arg.Stop)
  239. if err != nil {
  240. return fmt.Errorf("Drop plugin error: %s", err)
  241. } else {
  242. if arg.Stop {
  243. *reply = fmt.Sprintf("Plugin %s is dropped and Kuiper will be stopped.", p.GetName())
  244. } else {
  245. *reply = fmt.Sprintf("Plugin %s is dropped and Kuiper must restart for the change to take effect.", p.GetName())
  246. }
  247. }
  248. }
  249. return nil
  250. }
  251. func (t *Server) ShowPlugins(arg int, reply *string) error {
  252. pt := plugin.PluginType(arg)
  253. l := native.GetManager().List(pt)
  254. if len(l) == 0 {
  255. l = append(l, "No plugin is found.")
  256. }
  257. *reply = strings.Join(l, "\n")
  258. return nil
  259. }
  260. func (t *Server) ShowUdfs(_ int, reply *string) error {
  261. l := native.GetManager().ListSymbols()
  262. if len(l) == 0 {
  263. l = append(l, "No udf is found.")
  264. }
  265. *reply = strings.Join(l, "\n")
  266. return nil
  267. }
  268. func (t *Server) DescPlugin(arg *model.PluginDesc, reply *string) error {
  269. pt := plugin.PluginType(arg.Type)
  270. p, err := getPluginByJson(arg, pt)
  271. if err != nil {
  272. return fmt.Errorf("Describe plugin error: %s", err)
  273. }
  274. var m interface{}
  275. var ok bool
  276. if pt == plugin.PORTABLE {
  277. m, ok = portable.GetManager().GetPluginInfo(p.GetName())
  278. } else {
  279. m, ok = native.GetManager().GetPluginInfo(pt, p.GetName())
  280. }
  281. if !ok {
  282. return fmt.Errorf("Describe plugin error: not found")
  283. } else {
  284. r, err := marshalDesc(m)
  285. if err != nil {
  286. return fmt.Errorf("Describe plugin error: %v", err)
  287. }
  288. *reply = r
  289. }
  290. return nil
  291. }
  292. func (t *Server) DescUdf(arg string, reply *string) error {
  293. m, ok := native.GetManager().GetPluginBySymbol(plugin.FUNCTION, arg)
  294. if !ok {
  295. return fmt.Errorf("Describe udf error: not found")
  296. } else {
  297. j := map[string]string{
  298. "name": arg,
  299. "plugin": m,
  300. }
  301. r, err := marshalDesc(j)
  302. if err != nil {
  303. return fmt.Errorf("Describe udf error: %v", err)
  304. }
  305. *reply = r
  306. }
  307. return nil
  308. }
  309. func (t *Server) CreateService(arg *model.RPCArgDesc, reply *string) error {
  310. sd := &service.ServiceCreationRequest{}
  311. if arg.Json != "" {
  312. if err := json.Unmarshal([]byte(arg.Json), sd); err != nil {
  313. return fmt.Errorf("Parse service %s error : %s.", arg.Json, err)
  314. }
  315. }
  316. if sd.Name != arg.Name {
  317. return fmt.Errorf("Create service error: name mismatch.")
  318. }
  319. if sd.File == "" {
  320. return fmt.Errorf("Create service error: Missing service file url.")
  321. }
  322. err := service.GetManager().Create(sd)
  323. if err != nil {
  324. return fmt.Errorf("Create service error: %s", err)
  325. } else {
  326. *reply = fmt.Sprintf("Service %s is created.", arg.Name)
  327. }
  328. return nil
  329. }
  330. func (t *Server) DescService(name string, reply *string) error {
  331. s, err := service.GetManager().Get(name)
  332. if err != nil {
  333. return fmt.Errorf("Desc service error : %s.", err)
  334. } else {
  335. r, err := marshalDesc(s)
  336. if err != nil {
  337. return fmt.Errorf("Describe service error: %v", err)
  338. }
  339. *reply = r
  340. }
  341. return nil
  342. }
  343. func (t *Server) DescServiceFunc(name string, reply *string) error {
  344. s, err := service.GetManager().GetFunction(name)
  345. if err != nil {
  346. return fmt.Errorf("Desc service func error : %s.", err)
  347. } else {
  348. r, err := marshalDesc(s)
  349. if err != nil {
  350. return fmt.Errorf("Describe service func error: %v", err)
  351. }
  352. *reply = r
  353. }
  354. return nil
  355. }
  356. func (t *Server) DropService(name string, reply *string) error {
  357. err := service.GetManager().Delete(name)
  358. if err != nil {
  359. return fmt.Errorf("Drop service error : %s.", err)
  360. }
  361. *reply = fmt.Sprintf("Service %s is dropped", name)
  362. return nil
  363. }
  364. func (t *Server) ShowServices(_ int, reply *string) error {
  365. s, err := service.GetManager().List()
  366. if err != nil {
  367. return fmt.Errorf("Show service error: %s.", err)
  368. }
  369. if len(s) == 0 {
  370. *reply = "No service definitions are found."
  371. } else {
  372. r, err := marshalDesc(s)
  373. if err != nil {
  374. return fmt.Errorf("Show service error: %v", err)
  375. }
  376. *reply = r
  377. }
  378. return nil
  379. }
  380. func (t *Server) ShowServiceFuncs(_ int, reply *string) error {
  381. s, err := service.GetManager().ListFunctions()
  382. if err != nil {
  383. return fmt.Errorf("Show service funcs error: %s.", err)
  384. }
  385. if len(s) == 0 {
  386. *reply = "No service definitions are found."
  387. } else {
  388. r, err := marshalDesc(s)
  389. if err != nil {
  390. return fmt.Errorf("Show service funcs error: %v", err)
  391. }
  392. *reply = r
  393. }
  394. return nil
  395. }
  396. func marshalDesc(m interface{}) (string, error) {
  397. s, err := json.Marshal(m)
  398. if err != nil {
  399. return "", fmt.Errorf("invalid json %v", m)
  400. }
  401. dst := &bytes.Buffer{}
  402. if err := json.Indent(dst, s, "", " "); err != nil {
  403. return "", fmt.Errorf("indent json error %v", err)
  404. }
  405. return dst.String(), nil
  406. }
  407. func getPluginByJson(arg *model.PluginDesc, pt plugin.PluginType) (plugin.Plugin, error) {
  408. p := plugin.NewPluginByType(pt)
  409. if arg.Json != "" {
  410. if err := json.Unmarshal([]byte(arg.Json), p); err != nil {
  411. return nil, fmt.Errorf("Parse plugin %s error : %s.", arg.Json, err)
  412. }
  413. }
  414. p.SetName(arg.Name)
  415. return p, nil
  416. }
  417. func init() {
  418. ticker := time.NewTicker(time.Second * 5)
  419. go func() {
  420. for {
  421. <-ticker.C
  422. if registry != nil {
  423. if _, ok := registry.Load(QueryRuleId); !ok {
  424. continue
  425. }
  426. n := time.Now()
  427. w := 10 * time.Second
  428. if v := n.Sub(sink.QR.LastFetch); v >= w {
  429. logger.Printf("The client seems no longer fetch the query result, stop the query now.")
  430. stopQuery()
  431. ticker.Stop()
  432. return
  433. }
  434. }
  435. }
  436. }()
  437. }