123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """Bioinfo pipeline result uploader.
- Make a tar.gz of the bioinfo pipeline results and upload the tar to Nextcloud and the online system.
- __author__ = Yifan Wang
- __copyright__ = Copyright 2021
- __version__ = 1.5
- __maintainer__ = Yifan Wang
- __email__ = yifan.wang@nuprobe.com
- __status__ = Prod
- __date__ = 2021-09-13
- """
- # How to use
- # First install the required packages with "pip install webdavclient3 requests"
- # Then run this script like:
- # python3 result_uploader.py /ngs/G_Counseling/Data/V4335-Lane487
- import argparse
- import os
- import re
- import tarfile
- from shutil import copyfile, rmtree, SameFileError
- from webdav3.client import Client
- from webdav3.exceptions import ResponseErrorCode
- import requests
- from pprint import pprint
# File-name suffixes of the pipeline outputs that are packed into the result
# tar (checked with str.endswith by _is_needed_file).
NEEDED_FILE_SUFFIX = [
    '.azfcall.xls',
    '.CNV.xls',
    '.CR_VAF.png',
    '.GT.xls',
    '.NO.DEL.png',
    '.ratio.png',
    '.vardict.hg19_multianno.xlsx',
    '.gatk.hg19_multianno.xlsx',
    '_FullGenome.png',
    '_M_QC_plot.pdf',
    '.AZF.result.xls',
    'DEL.png',
    'QC.xls',
    '.cnv_anno.xls',
]

# NOTE(security): the password and the API token below are secrets that are
# hard-coded in the source. Each can be overridden through an environment
# variable so deployments do not have to edit this file; the literals remain
# only as backward-compatible defaults and should be rotated and removed.

# Connection options handed to webdav3.client.Client.
WEBDAV_OPTIONS = {
    'webdav_hostname': "https://nextcloud.nuprobe.com.cn",
    'webdav_login': "carrier",
    'webdav_password': os.environ.get(
        'RESULT_UPLOADER_WEBDAV_PASSWORD', "carriergene"),
}

# Remote folder that the result tar is uploaded into.
WEBDAV_ROOT = '/remote.php/dav/files/carrier/订单系统开发共享/08-生信分析输出'

# Upload endpoints of the online system (production / development).
REQ_URL_PRD = 'http://dingdan.nuprobe.com.cn/carriergene/api/third/BioAnalysisFilesUpload'
REQ_URL_DEV = 'http://dingdan-dev.nuprobe.com.cn/carriergene/api/third/BioAnalysisFilesUpload'

# Value sent in the 'authorization' header of the upload request.
REQ_TOKEN = os.environ.get(
    'RESULT_UPLOADER_REQ_TOKEN', 'E56149D8D96EF74B90A84229E255A803')

# When True, uploads go to REQ_URL_DEV instead of REQ_URL_PRD.
DEV_MODE = False
def _is_needed_file(file):
    """Tell whether a file name belongs in the result tar.

    Args:
        file: File name to check. ``None`` and the empty string are rejected.

    Returns:
        ``True`` when the name does not contain ``'TEST'`` and ends with one
        of the suffixes in ``NEEDED_FILE_SUFFIX``; ``False`` otherwise.
    """
    # Covers both None and '' in one test.
    if not file:
        return False
    if 'TEST' in file:
        return False
    # str.endswith accepts a tuple and returns True if any suffix matches.
    return file.endswith(tuple(NEEDED_FILE_SUFFIX))
def _get_needed_files(dir_path, rm_libs=None):
    """Collect the paths of all result files found below *dir_path*.

    The tree is walked with ``os.walk``. Directories whose path contains
    ``'TEST'`` contribute no files (this includes *dir_path* itself, since
    the check is done on the full walked path).

    Args:
        dir_path: Root directory to search.
        rm_libs: Optional iterable of substrings; a file whose name contains
            any of them is left out. ``None`` (or an empty collection)
            disables the exclusion.

    Returns:
        List of paths (``os.path.join(root, file)``) of the files accepted by
        ``_is_needed_file`` and not excluded through *rm_libs*.
    """
    excluded_libs = rm_libs or ()
    needed_files = []
    for root, _dirs, files in os.walk(dir_path):
        if 'TEST' in root:
            continue
        for file in files:
            if not _is_needed_file(file):
                continue
            # Plain membership test instead of raising/catching an exception
            # to break out of the inner loop.
            if any(lib in file for lib in excluded_libs):
                continue
            needed_files.append(os.path.join(root, file))
    return needed_files
def _get_temp_tar_path(dir_path, to4digit=True, v2=False):
    """Derive the temp-dir path and tar path for a lane result directory.

    The last component of *dir_path* must look like ``<prefix>-Lane<number>``
    (e.g. ``V4335-Lane487``). Both returned paths live inside *dir_path* and
    are named ``CG<number>-<prefix>``.

    Args:
        dir_path: Lane result directory; a trailing path separator is
            tolerated.
        to4digit: When true, a three-digit lane number is left-padded with
            one ``'0'``. Lane numbers of any other length are kept as-is.
        v2: When true, the tar name ends in ``-V2.tar.gz`` instead of
            ``.tar.gz``.

    Returns:
        Tuple ``(temp_dir, tar_path)`` of absolute paths.

    Raises:
        ValueError: If the directory name does not match the expected
            pattern.
    """
    # normpath drops a trailing separator, which would otherwise make
    # basename() return '' and the match below fail.
    upper_dir = os.path.basename(os.path.normpath(dir_path))
    matched = re.match(r'^(\w+)-Lane(\d+)$', upper_dir)
    if not matched:
        raise ValueError('Fail to get lane number from dirname.')
    prefix, lane_number = matched.groups()
    if to4digit and len(lane_number) == 3:
        lane_number = '0' + lane_number
    temp_dir = os.path.abspath(
        os.path.join(dir_path, f'CG{lane_number}-{prefix}'))
    tar_suffix = '-V2.tar.gz' if v2 else '.tar.gz'
    # temp_dir is already absolute and normalised, so appending is enough.
    return temp_dir, temp_dir + tar_suffix
def _copy_file(file, dest, to4digit=True):
    """Copy *file* into the directory *dest*, optionally renaming it.

    Args:
        file: Path of the file to copy.
        dest: Existing destination directory.
        to4digit: When true, the file name must look like ``CG<digits>V...``;
            a three-digit number is left-padded with one ``'0'`` in the
            copied file's name. When false, the name is kept unchanged.

    Raises:
        ValueError: If *to4digit* is true and the file name does not match
            the ``CG<digits>V...`` pattern.
    """
    filename = os.path.basename(file)
    if to4digit:
        matched = re.match(r'^CG(\d+)(V.+)$', filename)
        if not matched:
            raise ValueError(
                f'Fail to get lane number from file: {filename}')
        lane_number, remainder = matched.groups()
        if len(lane_number) == 3:
            lane_number = '0' + lane_number
        filename = f'CG{lane_number}{remainder}'
    try:
        copyfile(file, os.path.join(dest, filename))
    except SameFileError:
        # Source and destination are the same file: nothing to copy.
        pass
def _make_tar(temp_dir, tar_path):
    """Pack the regular files directly inside *temp_dir* into a gzip tar.

    Sub-directories are skipped and members are stored under their bare file
    name. Members are added in sorted name order so the archive layout does
    not depend on the order ``os.listdir`` happens to return.

    Args:
        temp_dir: Directory whose files are archived.
        tar_path: Path of the ``.tar.gz`` file to create.
    """
    tar_abspath = os.path.abspath(tar_path)
    with tarfile.open(tar_path, 'w:gz') as tar:
        for file in sorted(os.listdir(temp_dir)):
            file_path = os.path.join(temp_dir, file)
            if not os.path.isfile(file_path):
                continue
            # The archive already exists on disk at this point; never add it
            # to itself when it was created inside temp_dir.
            if os.path.abspath(file_path) == tar_abspath:
                continue
            tar.add(file_path, arcname=file)
def _make_tar_from_files(needed_files, tar_path, to4digit=True):
    """Create a gzip tar at *tar_path* from an explicit list of files.

    Every member is stored under its bare file name. All names are validated
    before the archive is opened, so an invalid name does not leave a
    truncated tar behind.

    Args:
        needed_files: Iterable of file paths to archive. Paths that are not
            regular files are skipped.
        tar_path: Path of the ``.tar.gz`` file to create.
        to4digit: When true, every file name must look like
            ``CG<digits>V...``; a three-digit number is left-padded with one
            ``'0'`` in the archived name. When false, names are kept as-is.

    Raises:
        ValueError: If *to4digit* is true and a file name does not match the
            ``CG<digits>V...`` pattern.
    """
    members = []
    for file in needed_files:
        filename = os.path.basename(file)
        if to4digit:
            matched = re.match(r'^CG(\d+)(V.+)$', filename)
            if not matched:
                raise ValueError(
                    f'Fail to get lane number from file: {filename}')
            lane_number, remainder = matched.groups()
            if len(lane_number) == 3:
                lane_number = '0' + lane_number
            filename = f'CG{lane_number}{remainder}'
        members.append((file, filename))

    with tarfile.open(tar_path, 'w:gz') as tar:
        for file, arcname in members:
            if not os.path.isfile(file):
                continue
            tar.add(file, arcname=arcname)
def _rm_temp_dir(temp_dir):
    """Delete the directory *temp_dir* and everything below it.

    Args:
        temp_dir: Path of the directory tree to remove (via ``shutil.rmtree``).
    """
    rmtree(temp_dir)
def _upload_tar_by_webdav(tar_path):
    """Upload the tar to the ``WEBDAV_ROOT`` folder over WebDAV (best effort).

    A failing upload does not abort the script: the error is reported on
    stdout and the function returns normally.

    Args:
        tar_path: Local path of the tar file; it is stored remotely under the
            same base name.

    Returns:
        ``True`` if the upload call completed without raising, ``False`` if
        an exception was caught.
    """
    client = Client(WEBDAV_OPTIONS)
    tar_name = os.path.basename(tar_path)
    # Remote paths always use forward slashes, whatever os.path.join produced.
    remote_path = os.path.join(WEBDAV_ROOT, tar_name).replace('\\', '/')
    try:
        client.upload_sync(remote_path=remote_path,
                           local_path=tar_path)
    except Exception as e:
        # Deliberately broad: the upload is optional, but say what went wrong
        # instead of discarding the exception.
        print('Exception while uploading to NextCloud.')
        print(repr(e))
        return False
    return True
def _rm_local_tar(tar_path):
    """Delete the local tar file.

    Args:
        tar_path: Path of the file to remove (via ``os.remove``).
    """
    os.remove(tar_path)
def _upload_tar_by_post(tar_path):
    """POST the tar to the online system and interpret the JSON reply.

    The request goes to ``REQ_URL_DEV`` when ``DEV_MODE`` is set, otherwise
    to ``REQ_URL_PRD``, with ``REQ_TOKEN`` in the ``authorization`` header.
    The reply is expected to be JSON with an integer ``code`` field.

    Args:
        tar_path: Local path of the tar file to upload.

    Returns:
        ``None`` when the reply code is 0. When the reply code is 15, the
        list of library IDs (``Lib`` + at least four digits + optional
        ``B``/``b``) found in the reply's ``detail`` text.

    Raises:
        RuntimeError: For code 15 without any library ID in ``detail``, for
            code -1 (non-authorized), for code 12 (token could not be
            parsed) and for any other non-zero code.
    """
    req_url = REQ_URL_DEV if DEV_MODE else REQ_URL_PRD
    # Close the file handle once the request is done; a handle left open can
    # also block the later removal of the tar on some platforms.
    with open(tar_path, 'rb') as tar_file:
        response = requests.post(
            req_url,
            files={'file': tar_file},
            headers={'authorization': REQ_TOKEN},
        )
    payload = response.json()  # parse the body once and reuse it
    responded_code = payload['code']
    if responded_code == 0:
        return None
    print(payload)
    if responded_code == 15:
        libs = re.findall(r'(Lib\d{4,}[Bb]?)', payload['detail'])
        print(libs)
        if libs:
            return libs
        raise RuntimeError('Failed to load file.')
    if responded_code == -1:
        raise RuntimeError('Non-authorized request.')
    if responded_code == 12:
        raise RuntimeError('Wrong request header: failed to parse token.')
    raise RuntimeError('Unknown exception while uploading to online system.')
def main():
    """Pack a lane result directory and upload the tar.

    Steps:
      1. Collect the needed files below the directory given on the command
         line and pack them into ``CG<lane>-<prefix>.tar.gz``.
      2. Upload the tar over WebDAV, then POST it to the online system.
      3. If the online system answers with a list of library IDs, rebuild
         the tar as ``...-V2.tar.gz`` without the files of those libraries
         and upload again, until the upload is accepted.
      4. Remove the local tar.

    Raises:
        NotADirectoryError: If the command-line argument is not a directory.
        RuntimeError: If the online system keeps reporting only libraries
            that were already excluded (retrying could never succeed).
    """
    parser = argparse.ArgumentParser(description='MBY2 result uploader.')
    parser.add_argument('dir_path', metavar='DIR', type=str,
                        help='Lane data result dir.')
    args = parser.parse_args()
    dir_path = args.dir_path.rstrip(r'\/')
    if not os.path.isdir(dir_path):
        raise NotADirectoryError(f'The input is not a dir: {dir_path}')

    _, tar_path = _get_temp_tar_path(dir_path)
    _make_tar_from_files(_get_needed_files(dir_path), tar_path)
    print('Result tar created.')
    _upload_tar_by_webdav(tar_path)
    print('Result tar uploaded to NextCloud.')
    rm_libs = _upload_tar_by_post(tar_path)
    if rm_libs is None:
        print('Result tar uploaded to online system.')
        _rm_local_tar(tar_path)
        print('Result tar removed.')
        return

    # dict.fromkeys de-duplicates while keeping the reported order.
    all_rm_libs = list(dict.fromkeys(rm_libs))
    tar_path_v2 = tar_path
    while rm_libs is not None:
        # Drop the previously rejected tar (V1 on the first pass, V2 after).
        _rm_local_tar(tar_path_v2)
        _, tar_path_v2 = _get_temp_tar_path(dir_path, v2=True)
        needed_files_v2 = _get_needed_files(dir_path, rm_libs=all_rm_libs)
        print('Found libs with no job ticket: ')
        print(all_rm_libs)
        _make_tar_from_files(needed_files_v2, tar_path_v2)
        print('Result tar V2 created.')
        _upload_tar_by_webdav(tar_path_v2)
        print('Result tar V2 uploaded to NextCloud.')
        rm_libs = _upload_tar_by_post(tar_path_v2)
        if rm_libs is not None:
            new_libs = [lib for lib in dict.fromkeys(rm_libs)
                        if lib not in all_rm_libs]
            if not new_libs:
                # Every reported lib is already excluded: another round would
                # build the same tar and loop forever.
                raise RuntimeError(
                    'Online system still rejects already excluded libs: '
                    f'{rm_libs}')
            all_rm_libs.extend(new_libs)
    print('Result tar V2 uploaded to online system.')
    _rm_local_tar(tar_path_v2)
    print('Result tar removed.')


if __name__ == '__main__':
    main()
|