123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- import pandas as pd
- import os
- import xlrd
- import numpy as np
- import math
- import re
- import argparse
- #计算四分位数
- # 其中data为数据组,n为第几个四分位数
- def quantile_exc(TMBdata, n):
- if n<1 or n>3:
- return false
- TMBdata.sort()
- TMBdata=TMBdata[::-1]
- position = (len(TMBdata) + 1)*n/4
- pos_integer = int(math.modf(position)[1])
- pos_decimal = position - pos_integer
- quartile = TMBdata[pos_integer - 1] + (TMBdata[pos_integer] - TMBdata[pos_integer - 1])*pos_decimal
- return quartile
- #####修改2,不过滤indel,首先过滤的是vaf大于等于5%,如果为0,捞回来1个
- #修改3,当结果为空或者为 'No_somatic'时,将tmb设置为0
- def TMB_filter(sampleid,tumortype,svdata,Size_panel=1.39):
- TCGA_TMB_file = '/cgdata/liuxiangqiong/work62pancancer/pipeline/v0/refdata/TCGA_wes_panel_TMB_overlap_20221107.txt'
- #Occur_cosmic = 100
- if (len(svdata)!=0) & (svdata.loc[0,'Transid_uniq'] != 'No_somatic') & (pd.notna(svdata.loc[0,'Transid_uniq'] )):
- print('step7---select the vaf>=5%')
- # 提取VAF>=5%
- filterdata8_4 = svdata[svdata['VAF_tumor'] >= 0.05]
- # 如果结果为空,就默认设置为1
- if len(filterdata8_4) == 0:
- TMB_counts = 1
- else:
- TMB_counts = len(filterdata8_4)
- TMB = round(TMB_counts / Size_panel,2)
- # compare with TCGA
- elif (svdata.loc[0,'Transid_uniq'] == 'No_somatic') | (pd.isna(svdata.loc[0,'Transid_uniq'])) :
- TMB_counts = 0
- TMB = 0
- else:
- TMB_counts = 1
- #修改tmb保留小数点后面两位,四舍五入
- TMB = round(TMB_counts / Size_panel,2)
- TCGA_TMBdata = pd.read_table(TCGA_TMB_file, sep='\t')
- if tumortype!='unknown':
- tumordata = TCGA_TMBdata[TCGA_TMBdata['Tumor_type'] == tumortype]
- #如果对应的肿瘤没有数据,那么以整个数据集为背景
- if len(tumordata)==0:
- tumordata = TCGA_TMBdata
- tumordata.sort_values(by=["TMB_panel"], ascending=False, inplace=True)
- tumordata.reset_index(drop=True, inplace=True)
- elif tumortype=='unknown':
- tumordata=TCGA_TMBdata
- TMB_1quar = quantile_exc(tumordata['TMB_panel'].values, 1)
- TMB_2quar = quantile_exc(tumordata['TMB_panel'].values, 2)
- TMB_3quar = quantile_exc(tumordata['TMB_panel'].values, 3)
- if TMB >= TMB_1quar:
- TMB_label = 'TMB-H'
- else:
- TMB_label = 'TMB-L'
- TMB_result = pd.DataFrame(
- columns=['sampleid', 'mut_count', 'TMB', 'TMB_label', '25%TCGA', '50%TCGA', '75%TCGA', 'tumor'])
- TMB_result.loc[0, 'sampleid'] = sampleid
- TMB_result.loc[0, 'mut_count'] = TMB_counts
- TMB_result.loc[0, 'TMB'] = round(TMB,2)
- TMB_result.loc[0, 'TMB_label'] = TMB_label
- TMB_result.loc[0, '25%TCGA'] = round(TMB_1quar,2)
- TMB_result.loc[0, '50%TCGA'] = round(TMB_2quar,2)
- TMB_result.loc[0, '75%TCGA'] = round(TMB_3quar,2)
- TMB_result.loc[0, 'tumor'] = tumortype
- return TMB_result
- ###TMB
- def tmbsample_run_test(inputpath,laneid,sampleid):
- '''
- caculate the tmb based on the snv data
- :param laneid: the lib id
- :param sampleid: the sample id
- :param tumortype: the tumor type
- :return:
- '''
- #tumortype = 'unknown'
- resultfile=os.path.join(inputpath,'resultfile')
- sampledir=os.path.join(resultfile,sampleid)
- tumortypedata = os.path.join(sampledir,laneid + '-' + sampleid + '.tumortype.xlsx')
- tumortype_result = pd.read_excel(tumortypedata)
- tumortype = tumortype_result.loc[0, 'tumortype']
- snvdata = os.path.join(sampledir,laneid + '-' + sampleid + '.snv.xlsx')
- somatic_result = pd.read_excel(snvdata,sheet_name='Somatic_Mut')
- #将vaf_tumor为空的填0
- somatic_result['VAF_tumor'] =somatic_result['VAF_tumor'].fillna(0).astype('str')+'%'
- somatic_result['VAF_tumor'] = somatic_result['VAF_tumor'].str.strip('%').astype('float') / 100
- tmb=TMB_filter(sampleid,tumortype,somatic_result,Size_panel=1.39)
- # output the TMB
- TMB_result = tmb[['sampleid', 'TMB', 'TMB_label']]
- print(TMB_result)
- #return TMB_result
- outputfile =os.path.join(sampledir,laneid + '-' + sampleid + '.tmb1.xlsx')
- writer = pd.ExcelWriter(outputfile)
- TMB_result.to_excel(writer, sheet_name='tmb', index=False)
- writer.save()
- writer.close()
- ##修,当人工改了vaf常规变为百分比的时候
- ###TMB
- def tmbsample_run(inputpath,laneid,sampleid):
- '''
- caculate the tmb based on the snv data
- :param laneid: the lib id
- :param sampleid: the sample id
- :param tumortype: the tumor type
- :return:
- '''
- #tumortype = 'unknown'
- resultfile=os.path.join(inputpath,'resultfile')
- sampledir=os.path.join(resultfile,sampleid)
- tumortypedata = os.path.join(sampledir,laneid + '-' + sampleid + '.tumortype.xlsx')
- tumortype_result = pd.read_excel(tumortypedata)
- tumortype = tumortype_result.loc[0, 'tumortype']
- snvdata = os.path.join(sampledir,laneid + '-' + sampleid + '.snv.xlsx')
- somatic_result = pd.read_excel(snvdata,sheet_name='Somatic_Mut')
- for i in range(len(somatic_result)):
- print(i)
- vaf = str(somatic_result.loc[i, 'VAF_tumor'])
- print(vaf)
- if pd.isna(somatic_result.loc[i, 'VAF_tumor']):
- somatic_result.loc[i, 'VAF_tumor'] = 0
- elif (pd.notna(somatic_result.loc[i, 'VAF_tumor'])) & ('%' in vaf):
- somatic_result.loc[i, 'VAF_tumor'] = float(somatic_result.loc[i, 'VAF_tumor'].strip('%')) / 100
- tmb=TMB_filter(sampleid,tumortype,somatic_result,Size_panel=1.39)
- # output the TMB
- TMB_result = tmb[['sampleid', 'TMB', 'TMB_label']]
- print(TMB_result)
- #return TMB_result
- outputfile =os.path.join(sampledir,laneid + '-' + sampleid + '.tmb1.xlsx')
- writer = pd.ExcelWriter(outputfile)
- TMB_result.to_excel(writer, sheet_name='tmb', index=False)
- writer.save()
- writer.close()
- #####读入每个样本
- def main(inputpath,laneid):
- samplelist = os.listdir(inputpath)
- for i in range(len(samplelist)):
- sampleid = samplelist[i]
- tmbsample_run(inputpath, laneid, sampleid)
- if __name__=='__main__':
- parser = argparse.ArgumentParser(description='TMB')
- parser.add_argument('-i', '--inputpath', type=str, help='the path of lane')
- parser.add_argument('-l', '--laneid', type=str, help='laneid')
- parser.add_argument('-s', '--sampleid', type=str, help='sampleid')
- args = parser.parse_args()
- Inputpath = args.inputpath
- Laneid=args.laneid
- Sampleid = args.sampleid
- tmbsample_run(Inputpath,Laneid,Sampleid)
|