123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
#!/usr/bin/python
# -*- coding:utf-8 -*-
import argparse,time

# NOTE(review): gene_info / gene_strand are never read in this file --
# presumably populated elsewhere or leftover; confirm before removing.
gene_info = {}
gene_strand = {}

# Reference/database locations for the fusion-detection pipeline.
DB_DIR="/cgdata/IVDRD/18gene_DB/qaseq_fusion"
primer_homo_file=DB_DIR+"/sample_primer_stat.txt"        # primer ID -> homologous primer IDs
primer_seq_file=DB_DIR+"/select_primer_info_total.txt"   # per-primer annotation table
ref=DB_DIR+"/hg38.fa"                                    # reference FASTA (not read in this file)
db=DB_DIR+"/hg38"                                        # index prefix (not read in this file)
transcript_nm=DB_DIR+"/big_panel_transcript_nm_hg38.xls" # transcript table (not read in this file)

# Caches filled once by get_primer_info().
PRIMER_INFO={}   # primer ID -> full annotation row (list of columns)
PRIMER_ID={}     # primer ID -> primer sequence
PRIMER_HOMO={}   # primer ID -> list of homologous primer IDs
# Base-complement table used for reverse-complementing sequences.
BASE_DICT={'A':'T','T':'A','G':'C','C':'G','N':'N'}
def get_primer_info():
    """Populate the module-level primer caches from the database files.

    Fills PRIMER_HOMO (primer -> list of homologous primer IDs),
    PRIMER_ID (primer -> sequence) and PRIMER_INFO (primer -> full
    annotation row).  Called once at startup by main().
    """
    # Homology table: col0 = primer ID, col1 = comma-joined homolog IDs.
    with open(primer_homo_file, 'r') as homo_fh:
        for record in homo_fh:
            fields = record.strip("\n").split("\t")
            PRIMER_HOMO[fields[0]] = fields[1].split(",")
    # Annotation table: col0 = primer ID, col1 = primer sequence,
    # remaining columns kept verbatim in PRIMER_INFO.
    with open(primer_seq_file, 'r') as seq_fh:
        for record in seq_fh:
            fields = record.strip("\n").split("\t")
            PRIMER_ID[fields[0]] = fields[1]
            PRIMER_INFO[fields[0]] = fields
def get_PrimerPre_Site(PrimerPre_Site_Info_file,reads_Primer_dict):
    """Parse the per-site primer-prediction file.

    Returns {site_key: [site_key, primer_id, mismatch_seq, match_seq, end]}.
    Records tied to an internal positive control ('IPC' in the primer ID)
    are skipped.  'Down'-orientation records switch to the mate primer and
    reverse-complement both sequence columns.
    """
    def _revcomp(seq):
        # Reverse complement via the module-level BASE_DICT table.
        return "".join(BASE_DICT[base] for base in reversed(seq))

    site_records = {}
    with open(PrimerPre_Site_Info_file, 'r') as handle:
        for raw in handle:
            cols = raw.strip("\n").split("\t")
            read_id = cols[-1].split("_")[0]
            if not read_id:
                continue
            primer_id = reads_Primer_dict[read_id][0]
            if 'IPC' in primer_id:
                continue
            mismatch_seq, match_seq = cols[2], cols[3]
            if cols[1] == "Down":
                # Downstream orientation: use the mate primer, flip strand.
                primer_id = reads_Primer_dict[read_id][1]
                if 'IPC' in primer_id:
                    continue
                mismatch_seq = _revcomp(mismatch_seq)
                match_seq = _revcomp(match_seq)
            site_records[cols[0]] = [cols[0], primer_id, mismatch_seq, match_seq, cols[-2]]
    return site_records
def out_test(mapping):
    """Debug helper: print every key and value of *mapping* to stdout.

    Parameter renamed from ``dict`` so it no longer shadows the builtin.
    Callers use it positionally, so the interface is unchanged.
    """
    for key, value in mapping.items():
        print("TEST:")
        print(key)
        print(value)
def OvarLap(seq1,seq2,MISMATCH=1):
    """Return True if seq1 and seq2 differ at no more than MISMATCH positions.

    Comparison runs over the length of seq1.  Fix: the original indexed
    seq2 by seq1's positions and raised IndexError whenever seq2 was
    shorter (possible when check() slices at sequence edges); a shorter
    seq2 cannot fully overlap, so we now return False instead of crashing.
    """
    if len(seq2) < len(seq1):
        return False
    count = 0
    for a, b in zip(seq1, seq2):
        if a != b:
            count += 1
            if count > MISMATCH:
                return False
    return True
def check(primer_seq,primer_down_seq,mismatch_seq):
    """Search for an overlap between the primer region and the mismatch tail.

    Candidate overlaps are built from the tail of primer_seq plus a prefix
    of primer_down_seq, and compared (via OvarLap, <=1 mismatch allowed)
    against an equally long tail of mismatch_seq.  Lengths are tried from
    longest to shortest, stopping at 6 bases on each side.

    Returns (True, index, total) for the first accepted candidate, where
    `index` is the number of primer_down_seq bases used and `total` the
    full overlap length; (False, 0, 0) when no candidate passes.
    """
    mismatch_len=len(mismatch_seq)
    # index = number of primer_down_seq bases in the candidate (longest first,
    # down to 6).  First iteration (index == mismatch_len) makes num == 0, so
    # the inner loop is empty there -- effectively index starts contributing
    # once some mismatch bases remain for the primer side.
    for index in range(mismatch_len,5,-1):
        # Primer-side contribution is capped by the primer length and by the
        # mismatch bases not already consumed by `index`.
        num=min(len(primer_seq),mismatch_len-index)
        for match_index in range(num,5,-1):
            # Candidate: primer tail + downstream prefix ...
            primer_temp_seq=primer_seq[-1*match_index:]+primer_down_seq[:index]
            # ... compared against the same-length tail of mismatch_seq.
            # NOTE(review): if mismatch_seq is shorter than index+match_index
            # this slice is shorter than primer_temp_seq -- confirm OvarLap
            # handles unequal lengths.
            mismatch_temp_seq=mismatch_seq[-1*index-match_index:]
            if OvarLap(primer_temp_seq,mismatch_temp_seq):
                return True,index,match_index+index
    return False,0,0
def get_primer_Site(primer_list,mismatch_seq,match_seq):
    """Locate candidate fusion breakpoints for each primer in primer_list.

    For every primer whose annotated sequence overlaps mismatch_seq (via
    check()), compute the genomic breakpoint, a +/-30 bp companion site and
    a 60 bp fusion sequence spanning the junction.

    Returns {primer: [primer, site, site_e, fusion_seq, total_overlap_len]}.

    Fix: the original never initialized ``fusion_seq`` before the branch
    ladder, so when match_common_len < 30 and
    match_common_len + len(primer_seq) < 15 it either raised NameError or,
    worse, silently reused the fusion_seq of a *previous* primer in the
    loop.  It is now reset to "" on every iteration.
    """
    primer_candidate_fusion_dict={}
    for primer in primer_list:
        primer_temp=PRIMER_INFO[primer]
        primer_down_seq=primer_temp[-1]   # genomic sequence downstream of the primer
        primer_up_seq=primer_temp[-2]     # genomic sequence upstream of the primer
        primer_seq=primer_temp[1]
        check_results=check(primer_seq,primer_down_seq,mismatch_seq)
        if check_results[0]:
            match_common_len=int(check_results[1])        # bases taken from primer_down_seq
            total_match_common_len=int(check_results[2])  # full overlap length
            strand = primer_temp[5]
            primer_chr=primer_temp[2]
            primer_start=int(primer_temp[3])
            primer_end=int(primer_temp[4])
            breakpoints=primer_end+match_common_len
            site_e=primer_chr+":"+str(breakpoints-30)
            # Build a 60 bp junction sequence: 30 bp from the primer side
            # (down/primer/up sequence as needed) + 30 bp of match_seq.
            fusion_seq=""  # fix: reset per primer so no stale/unbound value leaks
            if match_common_len>=30:
                fusion_seq=primer_down_seq[match_common_len-30:match_common_len]+match_seq[:30]
            elif match_common_len+len(primer_seq)>=30:
                fusion_seq=primer_seq[-1*(30-match_common_len):]+primer_down_seq[:match_common_len]+match_seq[:30]
            elif match_common_len+len(primer_seq)>=15:
                primer_seq_len=len(primer_seq)
                fusion_seq=primer_up_seq[-1*(30-match_common_len-primer_seq_len):]+primer_seq+primer_down_seq[:match_common_len]+match_seq[:30]
            if strand == '-1':
                # Minus strand: breakpoint extends left of the primer start.
                breakpoints=primer_start-match_common_len
                site_e=primer_chr+":"+str(breakpoints+30)
            site=primer_chr+":"+str(breakpoints)
            primer_candidate_fusion_dict[primer]=[primer,site,site_e,fusion_seq,str(total_match_common_len)]
    return primer_candidate_fusion_dict
def get_Uniq_UMIs(ID_list):
    """Count the distinct UMIs among the read IDs.

    Each read ID is underscore-delimited with the UMI in the third field
    (index 2).  The original abused a dict as a set; a set comprehension
    says the same thing directly.
    """
    return len({read_id.split("_")[2] for read_id in ID_list})
def get_single_primer_points(PrimerPre_Site_Info,indir):
    """Write the single-fusion breakpoint reports for one sample directory.

    Reads <indir>/single_points_PrimersPre_readsInfo.txt (site -> read IDs)
    and, for every site in PrimerPre_Site_Info, looks up candidate fusion
    breakpoints via get_primer_Site(), expanding a primer to its homolog
    family when PRIMER_HOMO knows one.  Emits three TSV reports:
    ..._detail.txt, ..._readsID.txt and ..._stat.txt.

    Fixes vs. original: all four file handles are now managed with ``with``
    so they are closed even if an exception escapes mid-run, and dict
    membership no longer goes through ``.keys()``.
    """
    # Record the supporting read IDs for every site key.
    reads_by_site={}
    readsinfo_path=indir+"/single_points_PrimersPre_readsInfo.txt"
    with open(readsinfo_path,'r') as sppr:
        for line in sppr:
            info=line.strip("\n").split("\t")
            reads_by_site.setdefault(info[0],[]).append(info[-1])

    detail_path=indir+"/single_fusion_breakpoints_detail.txt"
    readsid_path=indir+"/single_fusion_breakpoints_readsID.txt"
    stat_path=indir+"/single_fusion_breakpoints_stat.txt"
    with open(detail_path,'w') as detail_fh, \
         open(readsid_path,'w') as readsid_fh, \
         open(stat_path,'w') as stat_fh:
        detail_fh.write("\t".join(['KeyPoints','Points','Point1_End','Point2_End','FusionSeq','Primer'])+"\n")
        readsid_fh.write("\t".join(['Points','FusionSeq','ReadsID'])+"\n")
        stat_fh.write("\t".join(['Points','Point1_End','Point2_End',"Overlap",'DoubleReads','SingleReads','UMIkinds','FusionSeq'])+"\n")
        for key_site in PrimerPre_Site_Info:
            temp_info=PrimerPre_Site_Info[key_site]
            primerID=temp_info[1]
            key_site_end=temp_info[-1]
            # Expand to the homolog family when one is known, else just the primer.
            if primerID in PRIMER_HOMO:
                primer_list=PRIMER_HOMO[primerID]
            else:
                primer_list=[primerID]

            candidates=get_primer_Site(primer_list,temp_info[2],temp_info[3])
            for primer in candidates:
                cand=candidates[primer]
                site=cand[1]+"-"+key_site
                detail_fh.write("\t".join([key_site,site,cand[2],key_site_end,cand[3],primer])+"\n")

                ID_list=reads_by_site[key_site]
                UMIkinds=get_Uniq_UMIs(ID_list)
                stat_fh.write("\t".join([site,cand[2],key_site_end,str(0),str(0),str(len(ID_list)),str(UMIkinds),cand[3]])+"\n")
                for read_id in ID_list:
                    readsid_fh.write("\t".join([site,cand[3],read_id])+"\n")
def get_reads_primers_info(readsPrimer):
    """Load the read-to-primer assignment file.

    Only complete 10-column rows are kept.  Returns
    {read_id: [upstream_primer_id, downstream_primer_id]}
    (columns 0, 3 and 7 of the input).
    """
    mapping = {}
    with open(readsPrimer, 'r') as handle:
        for raw in handle:
            fields = raw.strip("\n").split("\t")
            if len(fields) == 10:
                mapping[fields[0]] = [fields[3], fields[7]]
    return mapping
def main(indir,readsPrimer):
    """Run the single-breakpoint primer analysis for one sample directory.

    indir       -- sample directory containing the *_PrimersPre_* inputs
                   and receiving the report files.
    readsPrimer -- read-to-primer assignment file.
    """
    start = time.time()
    get_primer_info()  # populate the module-level primer caches
    # Per-read primer assignments.
    reads_primer = get_reads_primers_info(readsPrimer)
    site_file = indir + "/single_points_PrimersPre_site.txt"
    site_info = get_PrimerPre_Site(site_file, reads_primer)
    get_single_primer_points(site_info, indir)
    elapsed = int(time.time() - start)
    print("Times: " + str(elapsed) + "s")
    print("BreakPoints Stat Single Done!")
if __name__ == '__main__':
    # CLI entry point: -i sample directory, -r read-to-primer mapping file.
    parser = argparse.ArgumentParser(description='single breakpoints primers')
    parser.add_argument('-i', required=True, type=str, help="indir")
    parser.add_argument('-r', required=True, type=str, help="reads primers file")
    args = parser.parse_args()
    main(args.i,args.r)
|