#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Bioinfo pipeline result uploader (result_uploader.py).

Make a tar.gz of the bioinfo pipeline results and upload the tar to
Nextcloud and the online system.

__author__ = Yifan Wang
__copyright__ = Copyright 2021
__version__ = 1.5
__maintainer__ = Yifan Wang
__email__ = yifan.wang@nuprobe.com
__status__ = Prod
__date__ = 2021-09-13
"""
# How to use:
# First install the webdav3 package with "pip install webdavclient3".
# Then run this script like:
#     python3 result_uploader.py /ngs/G_Counseling/Data/V4335-Lane487
import argparse
import os
import re
import sys
import tarfile
from pprint import pprint
from shutil import copyfile, rmtree, SameFileError

import requests
from webdav3.client import Client
from webdav3.exceptions import ResponseErrorCode
# File-name suffixes that select which pipeline outputs go into the result
# tar. _is_needed_file() tests each entry with str.endswith(), so an entry
# without a leading dot (e.g. 'DEL.png', 'QC.xls') also matches every longer
# name that ends with it (e.g. '.NO.DEL.png').
NEEDED_FILE_SUFFIX = [
    '.azfcall.xls',
    '.CNV.xls',
    '.CR_VAF.png',
    '.GT.xls',
    '.NO.DEL.png',
    '.ratio.png',
    '.vardict.hg19_multianno.xlsx',
    '.gatk.hg19_multianno.xlsx',
    '_FullGenome.png',
    '_M_QC_plot.pdf',
    '.AZF.result.xls',
    'DEL.png',
    'QC.xls',
    '.cnv_anno.xls',
]
  42. WEBDAV_OPTIONS = {
  43. 'webdav_hostname': "https://nextcloud.nuprobe.com.cn",
  44. 'webdav_login': "carrier",
  45. 'webdav_password': "carriergene"
  46. }
  47. WEBDAV_ROOT = '/remote.php/dav/files/carrier/订单系统开发共享/08-生信分析输出'
  48. REQ_URL_PRD = 'http://dingdan.nuprobe.com.cn/carriergene/api/third/BioAnalysisFilesUpload'
  49. REQ_URL_DEV = 'http://dingdan-dev.nuprobe.com.cn/carriergene/api/third/BioAnalysisFilesUpload'
  50. REQ_TOKEN = 'E56149D8D96EF74B90A84229E255A803'
  51. DEV_MODE = False
  52. def _is_needed_file(file):
  53. if not file or file is None:
  54. return False
  55. if 'TEST' in file:
  56. return False
  57. for suffix in NEEDED_FILE_SUFFIX:
  58. if file.endswith(suffix):
  59. return True
  60. return False
  61. def _get_needed_files(dir_path, rm_libs=None):
  62. needed_files = []
  63. for root, dirs, files in os.walk(dir_path):
  64. if 'TEST' in root:
  65. continue
  66. if rm_libs is None:
  67. for file in files:
  68. if _is_needed_file(file):
  69. needed_files.append(os.path.join(root, file))
  70. continue
  71. for file in files:
  72. if _is_needed_file(file):
  73. try:
  74. for lib in rm_libs:
  75. if lib in file:
  76. raise Exception()
  77. except Exception:
  78. continue
  79. needed_files.append(os.path.join(root, file))
  80. return needed_files
  81. def _get_temp_tar_path(dir_path, to4digit=True, v2=False):
  82. upper_dir = os.path.basename(dir_path)
  83. matched = re.match(r'^(\w+)-Lane(\d+)$', upper_dir)
  84. if not matched:
  85. raise Exception('Fail to get lane number from dirname.')
  86. prefix = matched.group(1)
  87. lane_number = matched.group(2)
  88. if to4digit and len(lane_number) == 3:
  89. lane_number = '0' + lane_number
  90. temp_dir = os.path.abspath(os.path.join(
  91. dir_path, f'CG{lane_number}-{prefix}'))
  92. tar_path = os.path.abspath(temp_dir + '.tar.gz')
  93. if v2:
  94. tar_path = os.path.abspath(temp_dir + '-V2.tar.gz')
  95. return temp_dir, tar_path
  96. def _copy_file(file, dest, to4digit=True):
  97. filename = os.path.basename(file)
  98. if to4digit:
  99. matched = re.match(r'^CG(\d+)(V.+)$', filename)
  100. if not matched:
  101. raise Exception(f'Fail to get lane number from file: {filename}')
  102. lane_number = matched.group(1)
  103. remainder = matched.group(2)
  104. if len(lane_number) == 3:
  105. lane_number = '0' + lane_number
  106. filename = f'CG{lane_number}{remainder}'
  107. old_path = file
  108. new_path = os.path.join(dest, filename)
  109. try:
  110. copyfile(old_path, new_path)
  111. except SameFileError:
  112. pass
  113. def _make_tar(temp_dir, tar_path):
  114. with tarfile.open(tar_path, 'w:gz') as tar:
  115. for file in os.listdir(temp_dir):
  116. file_path = os.path.join(temp_dir, file)
  117. if not os.path.isfile(file_path):
  118. continue
  119. tar.add(file_path, arcname=file)
  120. def _make_tar_from_files(needed_files, tar_path, to4digit=True):
  121. with tarfile.open(tar_path, 'w:gz') as tar:
  122. for file in needed_files:
  123. filename = os.path.basename(file)
  124. if to4digit:
  125. matched = re.match(r'^CG(\d+)(V.+)$', filename)
  126. if not matched:
  127. raise Exception(
  128. f'Fail to get lane number from file: {filename}')
  129. lane_number = matched.group(1)
  130. remainder = matched.group(2)
  131. if len(lane_number) == 3:
  132. lane_number = '0' + lane_number
  133. filename = f'CG{lane_number}{remainder}'
  134. if not os.path.isfile(file):
  135. continue
  136. tar.add(file, arcname=filename)
  137. def _rm_temp_dir(temp_dir):
  138. rmtree(temp_dir)
  139. def _upload_tar_by_webdav(tar_path):
  140. client = Client(WEBDAV_OPTIONS)
  141. tar_name = os.path.basename(tar_path)
  142. remote_path = os.path.join(WEBDAV_ROOT, tar_name).replace('\\', '/')
  143. try:
  144. client.upload_sync(remote_path=remote_path,
  145. local_path=tar_path)
  146. except Exception as e:
  147. print('Exception while uploading to NextCloud.')
  148. def _rm_local_tar(tar_path):
  149. os.remove(tar_path)
  150. def _upload_tar_by_post(tar_path):
  151. req_url = REQ_URL_DEV if DEV_MODE else REQ_URL_PRD
  152. response = requests.post(
  153. req_url,
  154. files={'file': open(tar_path, 'rb')},
  155. headers={'authorization': REQ_TOKEN}
  156. )
  157. responded_code = response.json()['code']
  158. if responded_code == 0:
  159. return None
  160. print(response.json())
  161. if responded_code == 15:
  162. responded_detail = response.json()['detail']
  163. lib_pattern = r'(Lib\d{4,}[Bb]?)'
  164. libs = re.findall(lib_pattern, responded_detail)
  165. print(libs)
  166. if libs:
  167. return libs
  168. raise Exception('Failed to load file.')
  169. if responded_code == -1:
  170. raise Exception('Non-authorized request.')
  171. if responded_code == 12:
  172. raise Exception('Wrong request header: failed to parse token.')
  173. raise Exception('Unknown exception while uploading to online system.')
  174. if __name__ == '__main__':
  175. parser = argparse.ArgumentParser(description='MBY2 result uploader.')
  176. parser.add_argument('dir_path', metavar='DIR', type=str,
  177. help='Lane data result dir.')
  178. args = parser.parse_args()
  179. dir_path = args.dir_path.rstrip(r'\/')
  180. if not os.path.isdir(dir_path):
  181. raise Exception(f'The input is not a dir: {dir_path}')
  182. needed_files = _get_needed_files(dir_path)
  183. needed_filenames = {f: os.path.basename(f) for f in needed_files}
  184. sort_orders = sorted(needed_filenames.items(),
  185. key=lambda x: x[1], reverse=True)
  186. temp_dir, tar_path = _get_temp_tar_path(dir_path)
  187. _make_tar_from_files(needed_files, tar_path)
  188. print('Result tar created.')
  189. _upload_tar_by_webdav(tar_path)
  190. print('Result tar uploaded to NextCloud.')
  191. rm_libs = _upload_tar_by_post(tar_path)
  192. if rm_libs is None:
  193. print('Result tar uploaded to online system.')
  194. _rm_local_tar(tar_path)
  195. print('Result tar removed.')
  196. exit()
  197. all_rm_libs = rm_libs
  198. tar_path_v2 = tar_path
  199. while rm_libs is not None:
  200. _rm_local_tar(tar_path_v2)
  201. temp_dir, tar_path_v2 = _get_temp_tar_path(dir_path, v2=True)
  202. needed_files_v2 = _get_needed_files(dir_path, rm_libs=all_rm_libs)
  203. print(f'Found libs with no job ticket: ')
  204. print(all_rm_libs)
  205. _make_tar_from_files(needed_files_v2, tar_path_v2)
  206. print('Result tar V2 created.')
  207. _upload_tar_by_webdav(tar_path_v2)
  208. print('Result tar V2 uploaded to NextCloud.')
  209. rm_libs = _upload_tar_by_post(tar_path_v2)
  210. if rm_libs is not None:
  211. all_rm_libs = all_rm_libs + rm_libs
  212. print('Result tar V2 uploaded to online system.')
  213. _rm_local_tar(tar_path_v2)
  214. print('Result tar removed.')