perftrace.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Life's short, Python more.
"""
import re
import os
import sys
import json
import uuid
import signal
import time
import subprocess
from optparse import OptionParser

# Python 2 idiom: force the process-wide default string encoding to UTF-8
reload(sys)
sys.setdefaultencoding('utf8')
##begin cli & help logic
def getOptionParser():
    usage = getUsage()
    parser = OptionParser(usage=usage)
    # rdbms reader and writer
    parser.add_option('-r', '--reader', action='store', dest='reader',
                      help='trace datasource read performance with the specified !json! string')
    parser.add_option('-w', '--writer', action='store', dest='writer',
                      help='trace datasource write performance with the specified !json! string')
    parser.add_option('-c', '--channel', action='store', dest='channel', default='1',
                      help='the number of concurrent sync threads, the default is 1')
    parser.add_option('-f', '--file', action='store',
                      help='existing datax configuration file, including reader and writer params')
    parser.add_option('-t', '--type', action='store', default='reader',
                      help='trace which side\'s performance; cooperates with the -f --file param, must be reader or writer')
    parser.add_option('-d', '--delete', action='store', default='true',
                      help='delete temporary files, the default value is true')
    #parser.add_option('-h', '--help', action='store', default='true', help='print usage information')
    return parser
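# usage sketch (illustrative arguments, not from the original source):
#   parser = getOptionParser()
#   options, args = parser.parse_args(['--channel=10', '--reader={"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/db"}'])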
def getUsage():
    return '''
The following params are available for -r --reader:
    [these params are for the rdbms reader, used to trace rdbms read performance; they match datax's keys]
    *datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as an example: jdbc:mysql://ip:port/database
    *username:       username for the datasource
    *password:       password for the datasource
    *table:          table name to read data from
    column:          columns to be read, the default value is ['*']
    splitPk:         the splitPk column of the rdbms table
    where:           limit the scope of the performance data set
    fetchSize:       how many rows to fetch at each communication
    [these params are for the stream reader, used to trace rdbms write performance]
    reader-sliceRecordCount: how many test records to mock (per channel), the default value is 10000
    reader-column:   columns the stream reader uses to generate test data (supported types: string|long|date|double|bool|bytes; supports constant values and the random function), demo: [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]

The following params are available for -w --writer:
    [these params are for the rdbms writer, used to trace rdbms write performance; they match datax's keys]
    datasourceType:  datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as an example: jdbc:mysql://ip:port/database
    *username:       username for the datasource
    *password:       password for the datasource
    *table:          table name to write data into
    column:          columns to be written, the default value is ['*']
    batchSize:       how many rows to store at each communication, the default value is 512
    preSql:          prepare sql to be executed before writing data, the default value is ''
    postSql:         post sql to be executed after writing data, the default value is ''
    url:             required for ads, pattern is ip:port
    schema:          required for ads, the ads database name
    [these params are for the stream writer, used to trace rdbms read performance]
    writer-print:    true means print the data read from the source datasource, the default value is false

The following params are available for global control:
    -c --channel: the number of concurrent tasks, the default value is 1
    -f --file:    path of an existing, complete dataX configuration file
    -t --type:    test read or write performance for a datasource, could be reader or writer, in collaboration with -f --file
    -h --help:    print help message

some demos:
    perftrace.py --channel=10 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "where":"", "splitPk":"", "writer-print":"false"}'
    perftrace.py --channel=10 --writer='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
    perftrace.py --file=/tmp/datax.job.json --type=reader --reader='{"writer-print": "false"}'
    perftrace.py --file=/tmp/datax.job.json --type=writer --writer='{"reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'

some example jdbc url patterns, may help:
    jdbc:oracle:thin:@ip:port:database
    jdbc:mysql://ip:port/database
    jdbc:sqlserver://ip:port;DatabaseName=database
    jdbc:postgresql://ip:port/database

warn: the ads url pattern is ip:port
warn: testing write performance will write data into your table; you can use a temporary table just for the test.
'''
def printCopyright():
    DATAX_VERSION = 'UNKNOWN_DATAX_VERSION'
    print('''
DataX Util Tools (%s), From Alibaba !
Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.''' % DATAX_VERSION)
    sys.stdout.flush()
def yesNoChoice():
    yes = set(['yes', 'y', 'ye', ''])
    no = set(['no', 'n'])
    choice = raw_input().lower()
    if choice in yes:
        return True
    elif choice in no:
        return False
    else:
        sys.stdout.write("Please respond with 'yes' or 'no'\n")
        return False
##end cli & help logic
##begin process logic
def suicide(signum, e):
    global childProcess
    print >> sys.stderr, "[Error] Received unexpected signal %d, shutting down." % (signum)
    if childProcess:
        childProcess.send_signal(signal.SIGQUIT)
        time.sleep(1)
        childProcess.kill()
    print >> sys.stderr, "DataX process was killed! Was that you?"
    sys.exit(-1)

def registerSignal():
    signal.signal(signal.SIGINT, suicide)   # 2
    signal.signal(signal.SIGQUIT, suicide)  # 3
    signal.signal(signal.SIGTERM, suicide)  # 15

def fork(command, isShell=False):
    global childProcess
    childProcess = subprocess.Popen(command, shell=isShell)
    registerSignal()
    # communicate() blocks until the child process exits
    (stdout, stderr) = childProcess.communicate()
    childProcess.wait()
    return childProcess.returncode
##end process logic
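# usage sketch (hypothetical commands, for illustration only):
#   returncode = fork("echo hello", True)  # shell=True: command is a single string
#   returncode = fork(["ls", "-l"])        # shell=False: command is an argv list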
##begin datax json generate logic
#warn: if not '': -> true; if not None: -> true
def notNone(obj, context):
    if not obj:
        raise Exception("Configuration property [%s] could not be blank!" % (context))

def attributeNotNone(obj, attributes):
    for key in attributes:
        notNone(obj.get(key), key)

def isBlank(value):
    if value is None or len(value.strip()) == 0:
        return True
    return False
def parsePluginName(jdbcUrl, pluginType):
    #warn: drds has no branch here; pass datasourceType explicitly for drds
    name = 'pluginName'
    mysqlRegex = re.compile('jdbc:(mysql)://.*')
    if mysqlRegex.match(jdbcUrl):
        name = 'mysql'
    postgresqlRegex = re.compile('jdbc:(postgresql)://.*')
    if postgresqlRegex.match(jdbcUrl):
        name = 'postgresql'
    oracleRegex = re.compile('jdbc:(oracle):.*')
    if oracleRegex.match(jdbcUrl):
        name = 'oracle'
    sqlserverRegex = re.compile('jdbc:(sqlserver)://.*')
    if sqlserverRegex.match(jdbcUrl):
        name = 'sqlserver'
    db2Regex = re.compile('jdbc:(db2)://.*')
    if db2Regex.match(jdbcUrl):
        name = 'db2'
    return "%s%s" % (name, pluginType)
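# examples, derived from the regexes above:
#   parsePluginName('jdbc:mysql://127.0.0.1:3306/database', 'reader')      -> 'mysqlreader'
#   parsePluginName('jdbc:oracle:thin:@127.0.0.1:1521:database', 'writer') -> 'oraclewriter'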
def renderDataXJson(paramsDict, readerOrWriter='reader', channel=1):
    dataxTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "",
                        "parameter": {
                            "username": "",
                            "password": "",
                            "sliceRecordCount": "10000",
                            "column": [
                                "*"
                            ],
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": []
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "",
                        "parameter": {
                            "print": "false",
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": ''
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }
    dataxTemplate['job']['setting']['speed']['channel'] = channel
    dataxTemplateContent = dataxTemplate['job']['content'][0]
    pluginName = ''
    if paramsDict.get('datasourceType'):
        pluginName = '%s%s' % (paramsDict['datasourceType'], readerOrWriter)
    elif paramsDict.get('jdbcUrl'):
        pluginName = parsePluginName(paramsDict['jdbcUrl'], readerOrWriter)
    elif paramsDict.get('url'):
        pluginName = 'adswriter'
    theOtherSide = 'writer' if readerOrWriter == 'reader' else 'reader'
    dataxPluginParamsContent = dataxTemplateContent.get(readerOrWriter).get('parameter')
    dataxPluginParamsContent.update(paramsDict)
    dataxPluginParamsContentOtherSide = dataxTemplateContent.get(theOtherSide).get('parameter')
    if readerOrWriter == 'reader':
        dataxTemplateContent.get('reader')['name'] = pluginName
        dataxTemplateContent.get('writer')['name'] = 'streamwriter'
        if paramsDict.get('writer-print'):
            dataxPluginParamsContentOtherSide['print'] = paramsDict['writer-print']
            del dataxPluginParamsContent['writer-print']
        del dataxPluginParamsContentOtherSide['connection']
    if readerOrWriter == 'writer':
        dataxTemplateContent.get('reader')['name'] = 'streamreader'
        dataxTemplateContent.get('writer')['name'] = pluginName
        if paramsDict.get('reader-column'):
            dataxPluginParamsContentOtherSide['column'] = paramsDict['reader-column']
            del dataxPluginParamsContent['reader-column']
        if paramsDict.get('reader-sliceRecordCount'):
            dataxPluginParamsContentOtherSide['sliceRecordCount'] = paramsDict['reader-sliceRecordCount']
            del dataxPluginParamsContent['reader-sliceRecordCount']
        del dataxPluginParamsContentOtherSide['connection']
    if paramsDict.get('jdbcUrl'):
        if readerOrWriter == 'reader':
            dataxPluginParamsContent['connection'][0]['jdbcUrl'].append(paramsDict['jdbcUrl'])
        else:
            dataxPluginParamsContent['connection'][0]['jdbcUrl'] = paramsDict['jdbcUrl']
    if paramsDict.get('table'):
        dataxPluginParamsContent['connection'][0]['table'].append(paramsDict['table'])
    traceJobJson = json.dumps(dataxTemplate, indent=4)
    return traceJobJson
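# sketch of the rendered job for a reader trace (illustrative values):
#   renderDataXJson({"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/db",
#                    "username": "u", "password": "p", "table": "t"}, 'reader', 10)
# produces a job whose reader is "mysqlreader" (jdbcUrl/table appended to its
# "connection") and whose writer is "streamwriter" with "print": "false".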
def isUrl(path):
    if not path:
        return False
    if not isinstance(path, str):
        raise Exception('Configuration file path must be a string, you configured: %s' % path)
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False
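# examples:
#   isUrl('http://127.0.0.1/job.json')  -> True
#   isUrl('/tmp/datax.job.json')        -> False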
def readJobJsonFromLocal(jobConfigPath):
    jobConfigContent = None
    jobConfigPath = os.path.abspath(jobConfigPath)
    file = open(jobConfigPath)
    try:
        jobConfigContent = file.read()
    finally:
        file.close()
    if not jobConfigContent:
        raise Exception("Reading your job configuration file returned an empty result, please check that the configuration is valid, path: [%s]\nconfiguration:\n%s" % (jobConfigPath, str(jobConfigContent)))
    return jobConfigContent
def readJobJsonFromRemote(jobConfigPath):
    # Python 2 urllib; in Python 3 this would be urllib.request.urlopen
    import urllib
    conn = urllib.urlopen(jobConfigPath)
    jobJson = conn.read()
    return jobJson
def parseJson(strConfig, context):
    try:
        return json.loads(strConfig)
    except Exception as e:
        import traceback
        traceback.print_exc()
        sys.stdout.flush()
        print >> sys.stderr, '%s %s must conform to json syntax' % (context, strConfig)
        sys.exit(-1)
def convert(options, args):
    traceJobJson = ''
    if options.file:
        if isUrl(options.file):
            traceJobJson = readJobJsonFromRemote(options.file)
        else:
            traceJobJson = readJobJsonFromLocal(options.file)
        traceJobDict = parseJson(traceJobJson, '%s content' % options.file)
        attributeNotNone(traceJobDict, ['job'])
        attributeNotNone(traceJobDict['job'], ['content'])
        attributeNotNone(traceJobDict['job']['content'][0], ['reader', 'writer'])
        attributeNotNone(traceJobDict['job']['content'][0]['reader'], ['name', 'parameter'])
        attributeNotNone(traceJobDict['job']['content'][0]['writer'], ['name', 'parameter'])
        if options.type == 'reader':
            traceJobDict['job']['content'][0]['writer']['name'] = 'streamwriter'
            if options.reader:
                traceReaderDict = parseJson(options.reader, 'reader config')
                if traceReaderDict.get('writer-print') is not None:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = traceReaderDict.get('writer-print')
                else:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
            else:
                traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
        elif options.type == 'writer':
            traceJobDict['job']['content'][0]['reader']['name'] = 'streamreader'
            if options.writer:
                traceWriterDict = parseJson(options.writer, 'writer config')
                if traceWriterDict.get('reader-column'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['column'] = traceWriterDict['reader-column']
                if traceWriterDict.get('reader-sliceRecordCount'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = traceWriterDict['reader-sliceRecordCount']
            else:
                columnSize = len(traceJobDict['job']['content'][0]['writer']['parameter']['column'])
                streamReaderColumn = []
                for i in range(columnSize):
                    streamReaderColumn.append({"type": "long", "random": "2,10"})
                traceJobDict['job']['content'][0]['reader']['parameter']['column'] = streamReaderColumn
                traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = 10000
        else:
            pass  # do nothing
        return json.dumps(traceJobDict, indent=4)
    elif options.reader:
        traceReaderDict = parseJson(options.reader, 'reader config')
        return renderDataXJson(traceReaderDict, 'reader', options.channel)
    elif options.writer:
        traceWriterDict = parseJson(options.writer, 'writer config')
        return renderDataXJson(traceWriterDict, 'writer', options.channel)
    else:
        print(getUsage())
        sys.exit(-1)
    #dataxParams = {}
    #for opt, value in options.__dict__.items():
    #    dataxParams[opt] = value
##end datax json generate logic
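# convert() dispatch summary, as implemented above:
#   --file (+ --type and optional --reader/--writer)  -> patch an existing job json
#   --reader only                                     -> renderDataXJson(..., 'reader', channel)
#   --writer only                                     -> renderDataXJson(..., 'writer', channel)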
if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    #print options, args
    dataxTraceJobJson = convert(options, args)

    # uuid1 is generated from the MAC address, the current timestamp and a random
    # number, so the job file name is globally unique
    dataxJobPath = os.path.join(os.getcwd(), "perftrace-" + str(uuid.uuid1()))
    jobConfigOk = True
    if os.path.exists(dataxJobPath):
        print("file already exists, truncate and rewrite it? %s" % dataxJobPath)
        if yesNoChoice():
            jobConfigOk = True
        else:
            print("exiting with failure because of the file conflict")
            sys.exit(-1)
    fileWriter = open(dataxJobPath, 'w')
    fileWriter.write(dataxTraceJobJson)
    fileWriter.close()

    print("trace environments:")
    print("dataxJobPath:  %s" % dataxJobPath)
    dataxHomePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    print("dataxHomePath: %s" % dataxHomePath)
    dataxCommand = "%s %s" % (os.path.join(dataxHomePath, "bin", "datax.py"), dataxJobPath)
    print("dataxCommand:  %s" % dataxCommand)

    returncode = fork(dataxCommand, True)
    if options.delete == 'true':
        os.remove(dataxJobPath)
    sys.exit(returncode)