#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Bioinfo pipeline result uploader.

Make a tar.gz of the bioinfo pipeline results and upload the tar to
Nextcloud and to the online system.

__author__ = Yifan Wang
__copyright__ = Copyright 2021
__version__ = 1.5
__maintainer__ = Yifan Wang
__email__ = yifan.wang@nuprobe.com
__status__ = Prod
__date__ = 2021-09-13
"""

# How to use
# First install package webdav3 with "pip install webdavclient3"
# Then run this script like:
# python3 result_uploader.py /ngs/G_Counseling/Data/V4335-Lane487

import argparse
import os
import re
import tarfile
from pprint import pprint
from shutil import copyfile, rmtree, SameFileError

import requests
from webdav3.client import Client
from webdav3.exceptions import ResponseErrorCode

# Filename suffixes of the result files that go into the tar.
NEEDED_FILE_SUFFIX = [
    '.azfcall.xls',
    '.CNV.xls',
    '.CR_VAF.png',
    '.GT.xls',
    '.NO.DEL.png',
    '.ratio.png',
    '.vardict.hg19_multianno.xlsx',
    '.gatk.hg19_multianno.xlsx',
    '_FullGenome.png',
    '_M_QC_plot.pdf',
    '.AZF.result.xls',
    'DEL.png',
    'QC.xls',
    '.cnv_anno.xls',
]

# NOTE(review): the WebDAV credentials and the request token below are
# hard-coded in source; consider loading them from the environment or a
# config file that is not checked in.
WEBDAV_OPTIONS = {
    'webdav_hostname': "https://nextcloud.nuprobe.com.cn",
    'webdav_login': "carrier",
    'webdav_password': "carriergene"
}
WEBDAV_ROOT = '/remote.php/dav/files/carrier/订单系统开发共享/08-生信分析输出'

REQ_URL_PRD = 'http://dingdan.nuprobe.com.cn/carriergene/api/third/BioAnalysisFilesUpload'
REQ_URL_DEV = 'http://dingdan-dev.nuprobe.com.cn/carriergene/api/third/BioAnalysisFilesUpload'
REQ_TOKEN = 'E56149D8D96EF74B90A84229E255A803'

# When True, the POST upload targets REQ_URL_DEV instead of REQ_URL_PRD.
DEV_MODE = False

# Patterns compiled once; they are applied to every directory / file name.
_LANE_DIR_PATTERN = re.compile(r'^(\w+)-Lane(\d+)$')
_RESULT_FILE_PATTERN = re.compile(r'^CG(\d+)(V.+)$')
_LIB_PATTERN = re.compile(r'(Lib\d{4,}[Bb]?)')


def _pad_lane_number(lane_number):
    """Return *lane_number* with a leading '0' if it has exactly 3 digits."""
    if len(lane_number) == 3:
        return '0' + lane_number
    return lane_number


def _to4digit_filename(filename):
    """Return *filename* with its 3-digit ``CG`` lane number padded to 4.

    Raises:
        Exception: if *filename* does not look like ``CG<digits>V...``.
    """
    matched = _RESULT_FILE_PATTERN.match(filename)
    if not matched:
        raise Exception(f'Fail to get lane number from file: {filename}')
    lane_number = _pad_lane_number(matched.group(1))
    remainder = matched.group(2)
    return f'CG{lane_number}{remainder}'


def _is_needed_file(file):
    """Return True if *file* is a result file that should be uploaded.

    A file is needed when its name is non-empty, does not contain 'TEST'
    and ends with one of NEEDED_FILE_SUFFIX.
    """
    if not file or 'TEST' in file:
        return False
    # str.endswith accepts a tuple of candidates, so one call covers all.
    return file.endswith(tuple(NEEDED_FILE_SUFFIX))


def _get_needed_files(dir_path, rm_libs=None):
    """Collect the paths of all needed result files below *dir_path*.

    Args:
        dir_path: Directory that is walked recursively.
        rm_libs: Optional iterable of lib names; any file whose name
            contains one of them is left out.

    Returns:
        List of file paths (each is ``os.path.join(root, filename)``).
    """
    needed_files = []
    for root, _dirs, files in os.walk(dir_path):
        # Skip every directory whose path contains 'TEST'.
        if 'TEST' in root:
            continue
        for file in files:
            if not _is_needed_file(file):
                continue
            if rm_libs and any(lib in file for lib in rm_libs):
                continue
            needed_files.append(os.path.join(root, file))
    return needed_files


def _get_temp_tar_path(dir_path, to4digit=True, v2=False):
    """Derive the temp dir and tar path from a ``<prefix>-Lane<N>`` dir.

    Args:
        dir_path: Lane result directory, e.g. ``.../V4335-Lane487``.
        to4digit: Pad a 3-digit lane number with a leading '0'.
        v2: Use the ``-V2.tar.gz`` suffix instead of ``.tar.gz``.

    Returns:
        Tuple ``(temp_dir, tar_path)`` of absolute paths located inside
        *dir_path* and named ``CG<lane>-<prefix>``.

    Raises:
        Exception: if the directory name does not match
            ``<prefix>-Lane<digits>``.
    """
    # abspath normalises trailing separators and relative paths such as
    # '.', for which a plain basename() would return '' or '.'.
    upper_dir = os.path.basename(os.path.abspath(dir_path))
    matched = _LANE_DIR_PATTERN.match(upper_dir)
    if not matched:
        raise Exception('Fail to get lane number from dirname.')
    prefix = matched.group(1)
    lane_number = matched.group(2)
    if to4digit:
        lane_number = _pad_lane_number(lane_number)
    temp_dir = os.path.abspath(os.path.join(
        dir_path, f'CG{lane_number}-{prefix}'))
    suffix = '-V2.tar.gz' if v2 else '.tar.gz'
    tar_path = os.path.abspath(temp_dir + suffix)
    return temp_dir, tar_path


def _copy_file(file, dest, to4digit=True):
    """Copy *file* into directory *dest*, optionally renaming to 4 digits.

    Copying a file onto itself is silently ignored.

    Raises:
        Exception: if *to4digit* is set and the file name does not match
            ``CG<digits>V...``.
    """
    filename = os.path.basename(file)
    if to4digit:
        filename = _to4digit_filename(filename)
    new_path = os.path.join(dest, filename)
    try:
        copyfile(file, new_path)
    except SameFileError:
        pass


def _make_tar(temp_dir, tar_path):
    """Write every regular file directly inside *temp_dir* to *tar_path*."""
    with tarfile.open(tar_path, 'w:gz') as tar:
        for file in os.listdir(temp_dir):
            file_path = os.path.join(temp_dir, file)
            if not os.path.isfile(file_path):
                continue
            tar.add(file_path, arcname=file)


def _make_tar_from_files(needed_files, tar_path, to4digit=True):
    """Write *needed_files* into a gzip tar at *tar_path* (flat layout).

    Each file is stored under its base name, with a 3-digit lane number
    padded to 4 digits when *to4digit* is set. Paths that are not regular
    files are skipped.

    Raises:
        Exception: if *to4digit* is set and a file name does not match
            ``CG<digits>V...``.
    """
    with tarfile.open(tar_path, 'w:gz') as tar:
        for file in needed_files:
            filename = os.path.basename(file)
            if to4digit:
                filename = _to4digit_filename(filename)
            if not os.path.isfile(file):
                continue
            tar.add(file, arcname=filename)


def _rm_temp_dir(temp_dir):
    """Remove *temp_dir* and everything below it."""
    rmtree(temp_dir)


def _upload_tar_by_webdav(tar_path):
    """Upload *tar_path* to WEBDAV_ROOT on the WebDAV server.

    The upload is best-effort: a failure is reported on stdout and does
    not abort the script.

    Returns:
        True if the upload call completed, False if it raised.
    """
    client = Client(WEBDAV_OPTIONS)
    tar_name = os.path.basename(tar_path)
    # The remote path always uses forward slashes, whatever os.sep is.
    remote_path = os.path.join(WEBDAV_ROOT, tar_name).replace('\\', '/')
    try:
        client.upload_sync(remote_path=remote_path, local_path=tar_path)
    except Exception as e:
        # Include the error so a failed upload can be diagnosed.
        print(f'Exception while uploading to NextCloud: {e}')
        return False
    return True


def _rm_local_tar(tar_path):
    """Delete the local tar file."""
    os.remove(tar_path)


def _upload_tar_by_post(tar_path):
    """POST *tar_path* to the online system and interpret the reply.

    Returns:
        None when the reply code is 0. When the reply code is 15 and lib
        names (``Lib`` + at least 4 digits + optional B/b) are found in
        the reply's ``detail`` text, the list of those names.

    Raises:
        Exception: for code 15 without lib names, code -1, code 12, or
            any other code.
    """
    req_url = REQ_URL_DEV if DEV_MODE else REQ_URL_PRD
    # Context manager so the tar file handle is closed after the request.
    with open(tar_path, 'rb') as tar_file:
        response = requests.post(
            req_url,
            files={'file': tar_file},
            headers={'authorization': REQ_TOKEN}
        )
    payload = response.json()
    responded_code = payload['code']
    if responded_code == 0:
        return None
    print(payload)
    if responded_code == 15:
        libs = _LIB_PATTERN.findall(payload['detail'])
        print(libs)
        if libs:
            return libs
        raise Exception('Failed to load file.')
    if responded_code == -1:
        raise Exception('Non-authorized request.')
    if responded_code == 12:
        raise Exception('Wrong request header: failed to parse token.')
    raise Exception('Unknown exception while uploading to online system.')


def main():
    """Tar the needed files of a lane dir and upload them.

    If the online system answers with a list of libs, a "V2" tar without
    the files of those libs is built and uploaded instead; this repeats
    while the online system keeps reporting further libs.
    """
    parser = argparse.ArgumentParser(description='MBY2 result uploader.')
    parser.add_argument('dir_path', metavar='DIR', type=str,
                        help='Lane data result dir.')
    args = parser.parse_args()
    dir_path = args.dir_path.rstrip(r'\/')
    if not os.path.isdir(dir_path):
        raise Exception(f'The input is not a dir: {dir_path}')

    needed_files = _get_needed_files(dir_path)
    _temp_dir, tar_path = _get_temp_tar_path(dir_path)
    _make_tar_from_files(needed_files, tar_path)
    print('Result tar created.')
    if _upload_tar_by_webdav(tar_path):
        print('Result tar uploaded to NextCloud.')
    rm_libs = _upload_tar_by_post(tar_path)
    if rm_libs is None:
        print('Result tar uploaded to online system.')
        _rm_local_tar(tar_path)
        print('Result tar removed.')
        return

    all_rm_libs = list(rm_libs)
    tar_path_v2 = tar_path
    while rm_libs is not None:
        # Drop the previous tar (the first one, then each earlier V2).
        _rm_local_tar(tar_path_v2)
        _temp_dir, tar_path_v2 = _get_temp_tar_path(dir_path, v2=True)
        needed_files_v2 = _get_needed_files(dir_path, rm_libs=all_rm_libs)
        print('Found libs with no job ticket: ')
        print(all_rm_libs)
        _make_tar_from_files(needed_files_v2, tar_path_v2)
        print('Result tar V2 created.')
        if _upload_tar_by_webdav(tar_path_v2):
            print('Result tar V2 uploaded to NextCloud.')
        rm_libs = _upload_tar_by_post(tar_path_v2)
        if rm_libs is not None:
            new_libs = [lib for lib in rm_libs if lib not in all_rm_libs]
            if not new_libs:
                # Every reported lib is already excluded, so the next tar
                # would be identical and the loop would never terminate.
                raise Exception(
                    'Online system still rejects libs that were already '
                    f'excluded: {rm_libs}')
            all_rm_libs = all_rm_libs + new_libs
    print('Result tar V2 uploaded to online system.')
    _rm_local_tar(tar_path_v2)
    print('Result tar removed.')


if __name__ == '__main__':
    main()