#!/usr/bin/python # -*- coding:utf-8 -*- import argparse,time gene_info = {} gene_strand = {} DB_DIR="/cgdata/IVDRD/18gene_DB/qaseq_fusion" primer_homo_file=DB_DIR+"/sample_primer_stat.txt" primer_seq_file=DB_DIR+"/select_primer_info_total.txt" ref=DB_DIR+"/hg38.fa" db=DB_DIR+"/hg38" transcript_nm=DB_DIR+"/big_panel_transcript_nm_hg38.xls" PRIMER_INFO={} PRIMER_ID={} PRIMER_HOMO={} BASE_DICT={'A':'T','T':'A','G':'C','C':'G','N':'N'} def get_primer_info(): #PRIMER HOMO #{'primer':['primer1','primer2','primer3',...],} with open(primer_homo_file,'r') as phf: for line in phf: info = line.strip("\n").split("\t") PRIMER_HOMO[info[0]]=info[1].split(",") #PRIMER ID #{'primer':'ATGC','primer1':'ATGCCC'} #PRIMER INFO with open(primer_seq_file,'r') as psf: for line in psf: info = line.strip("\n").split("\t") PRIMER_ID[info[0]]=info[1] PRIMER_INFO[info[0]]=info def get_PrimerPre_Site(PrimerPre_Site_Info_file,reads_Primer_dict): Info_dict={} with open(PrimerPre_Site_Info_file,'r') as PPSI: for line in PPSI: info = line.strip("\n").split("\t") ID=info[-1].split("_")[0] if ID == "": continue primerID=reads_Primer_dict[ID][0] if 'IPC' in primerID: continue mismatch_seq=info[2] match_seq=info[3] if info[1] =="Down": primerID=reads_Primer_dict[ID][1] if 'IPC' in primerID: continue mismatch_seq="".join([BASE_DICT[base] for base in mismatch_seq][::-1]) match_seq="".join([BASE_DICT[base] for base in match_seq][::-1]) Info_dict[info[0]]=[info[0],primerID,mismatch_seq,match_seq,info[-2]] return Info_dict def out_test(dict): for key in dict.keys(): print("TEST:") print(key) print(dict[key]) def OvarLap(seq1,seq2,MISMATCH=1): num = len(seq1) count=0 for index in range(num): if seq1[index] !=seq2[index]: count +=1 if count >MISMATCH: return False return True def check(primer_seq,primer_down_seq,mismatch_seq): mismatch_len=len(mismatch_seq) for index in range(mismatch_len,5,-1): num=min(len(primer_seq),mismatch_len-index) for match_index in range(num,5,-1): primer_temp_seq=primer_seq[-1*match_index:]+primer_down_seq[:index] mismatch_temp_seq=mismatch_seq[-1*index-match_index:] if OvarLap(primer_temp_seq,mismatch_temp_seq): return True,index,match_index+index return False,0,0 def get_primer_Site(primer_list,mismatch_seq,match_seq): primer_candidate_fusion_dict={} for primer in primer_list: primer_temp=PRIMER_INFO[primer] primer_down_seq=primer_temp[-1] primer_up_seq=primer_temp[-2] primer_seq=primer_temp[1] check_results=check(primer_seq,primer_down_seq,mismatch_seq) if check_results[0]: match_common_len=int(check_results[1]) total_match_common_len=int(check_results[2]) strand = primer_temp[5] primer_chr=primer_temp[2] primer_start=int(primer_temp[3]) primer_end=int(primer_temp[4]) breakpoints=primer_end+match_common_len site_e=primer_chr+":"+str(breakpoints-30) if match_common_len>=30: fusion_seq=primer_down_seq[match_common_len-30:match_common_len]+match_seq[:30] else: if match_common_len+len(primer_seq)>=30: fusion_seq=primer_seq[-1*(30-match_common_len):]+primer_down_seq[:match_common_len]+match_seq[:30] elif match_common_len+len(primer_seq)>=15: primer_seq_len=len(primer_seq) fusion_seq=primer_up_seq[-1*(30-match_common_len-primer_seq_len):]+primer_seq+primer_down_seq[:match_common_len]+match_seq[:30] if strand == '-1': breakpoints=primer_start-match_common_len site_e=primer_chr+":"+str(breakpoints+30) site=primer_chr+":"+str(breakpoints) primer_candidate_fusion_dict[primer]=[primer,site,site_e,fusion_seq,str(total_match_common_len)] return primer_candidate_fusion_dict def get_Uniq_UMIs(ID_list): UMIs_dict={} for ID in ID_list: UMI=ID.split("_")[2] UMIs_dict[UMI]=0 return len(UMIs_dict) def get_single_primer_points(PrimerPre_Site_Info,indir): #记录每个readsID 对应site single_points_PrimersPre_readsInfo_file=indir+"/single_points_PrimersPre_readsInfo.txt" single_points_PrimersPre_readsInfo_dict={} with open(single_points_PrimersPre_readsInfo_file,'r') as sppr: for line in sppr: info=line.strip("\n").split("\t") if info[0] not in single_points_PrimersPre_readsInfo_dict.keys(): single_points_PrimersPre_readsInfo_dict[info[0]]=[info[-1]] else: single_points_PrimersPre_readsInfo_dict[info[0]].append(info[-1]) fusion_breakpoints_detail_file=indir+"/single_fusion_breakpoints_detail.txt" fusion_breakpoints_detail=open(fusion_breakpoints_detail_file,'w') fusion_breakpoints_detail.write("\t".join(['KeyPoints','Points','Point1_End','Point2_End','FusionSeq','Primer'])+"\n") fusion_breakpoints_readsID_file=indir+"/single_fusion_breakpoints_readsID.txt" fusion_breakpoints_readsID=open(fusion_breakpoints_readsID_file,'w') fusion_breakpoints_readsID.write("\t".join(['Points','FusionSeq','ReadsID'])+"\n") fusion_breakpoints_stat_file=indir+"/single_fusion_breakpoints_stat.txt" fusion_breakpoints_stat=open(fusion_breakpoints_stat_file,'w') fusion_breakpoints_stat.write("\t".join(['Points','Point1_End','Point2_End',"Overlap",'DoubleReads','SingleReads','UMIkinds','FusionSeq'])+"\n") for key_site in PrimerPre_Site_Info.keys(): temp_info_dict=PrimerPre_Site_Info[key_site] primerID=temp_info_dict[1] key_site_end=temp_info_dict[-1] if primerID in PRIMER_HOMO.keys(): primer_list=PRIMER_HOMO[primerID] else: primer_list=[] primer_list.append(primerID) primer_candidate_fusion_dict=get_primer_Site(primer_list,temp_info_dict[2],temp_info_dict[3]) for item in primer_candidate_fusion_dict.keys(): item_temp=primer_candidate_fusion_dict[item] site=item_temp[1]+"-"+key_site fusion_breakpoints_detail.write("\t".join([key_site,site,item_temp[2],key_site_end,item_temp[3],item])+"\n") ID_list=single_points_PrimersPre_readsInfo_dict[key_site] SingleID=len(ID_list) UMIkinds=get_Uniq_UMIs(ID_list) fusion_breakpoints_stat.write("\t".join([site,item_temp[2],key_site_end,str(0),str(0),str(SingleID),str(UMIkinds),item_temp[3]])+"\n") for ID in ID_list: fusion_breakpoints_readsID.write("\t".join([site,item_temp[-2],ID])+"\n") fusion_breakpoints_detail.close() fusion_breakpoints_readsID.close() fusion_breakpoints_stat.close() def get_reads_primers_info(readsPrimer): reads_Primer_dict={} with open(readsPrimer,'r') as rp: for line in rp: info=line.strip("\n").split("\t") if len(info)==10: reads_Primer_dict[info[0]]=[info[3],info[7]] return reads_Primer_dict def main(indir,readsPrimer): t1=time.time() get_primer_info() #读取reads primer信息 reads_Primer_dict=get_reads_primers_info(readsPrimer) PrimerPre_Site_Info_file=indir+"/single_points_PrimersPre_site.txt" PrimerPre_Site_Info=get_PrimerPre_Site(PrimerPre_Site_Info_file,reads_Primer_dict) get_single_primer_points(PrimerPre_Site_Info,indir) t2=time.time() print("Times: "+str(int(t2-t1))+"s") print("BreakPoints Stat Single Done!") if __name__ == '__main__': parser = argparse.ArgumentParser(description='single breakpoints primers') parser.add_argument('-i', required=True, type=str, help="indir") parser.add_argument('-r', required=True, type=str, help="reads primers file") args = parser.parse_args() main(args.i,args.r)