class StatReadsFusion: def __init__(self): self.BASE_DICT={'A':'T','T':'A','G':'C','C':'G','N':'N','-':'-'} def GetSitePos(self,site): f_chr=site.split("-")[0].split(':')[0] f_pos=site.split("-")[0].split(':')[1] e_chr=site.split("-")[1].split(':')[0] e_pos=site.split("-")[1].split(':')[1] return f_chr,f_pos,e_chr,e_pos def CheckSite(self,key_site,select_site,FLAG=False): key_site_info=self.GetSitePos(key_site) select_site_info=self.GetSitePos(select_site) if key_site_info[0] == select_site_info[0] and abs(int(key_site_info[1])-int(select_site_info[1])) <=5: if FLAG or key_site_info[2] == select_site_info[2] and abs(int(key_site_info[3])-int(select_site_info[3])) <=5: return True elif key_site_info[0] == select_site_info[2] and abs(int(key_site_info[1])-int(select_site_info[3])) <=5: if FLAG or key_site_info[2] == select_site_info[0] and abs(int(key_site_info[3])-int(select_site_info[1])) <=5: return True else: return False def UniqSiteID(self,site_list): site_dict_num={} total_num = len(site_list) for item in site_list: if item not in site_dict_num.keys(): site_dict_num[item]=1 else: site_dict_num[item]+=1 seq_site_sort= sorted(site_dict_num.items(), key=lambda d:d[1], reverse = True) site_num=len(seq_site_sort) key_site =seq_site_sort[0] site_list_new=[key_site] count = int(key_site[1]) for index in range(1,site_num): select_site=seq_site_sort[index] if not self.CheckSite(key_site[0],select_site[0]): site_list_new.append(select_site) else: count += int(select_site[1]) #不超过1半,则不合并 if count*2 >= total_num: return site_list_new else: return seq_site_sort def GetCombineSeq(self,key_seq,check_index,seq_count_sort): total_seq_num=len(seq_count_sort) key_seq_list=[] for index in range(total_seq_num): if index not in check_index: select_seq=seq_count_sort[index][0] if self.IsSameSeq(key_seq,select_seq): key_seq_list.append(select_seq) check_index.append(index) return key_seq_list def IsSameSeq(self,key_seq,select_seq): select_seq_rev_comp="".join([self.BASE_DICT[base] for base in select_seq][::-1]) #flag = True if key_seq == "AGTGGTTTTCTACTCTAAATATAGTAACTAGTATCCTCCTGGCTGATCAGGGGGTGGGGA" and select_seq == "GTCTCTGAGTGGTTTTCTACTCTAAATATATTAAATAGTATCCTCCTGGCTGATCAGGGG" else False flag =False #select seq former select_seq_num = len(select_seq) select_seq_num_half = int(select_seq_num/2) for index in range(select_seq_num_half): select_seq_temp=select_seq[index:] temp_len=len(select_seq_temp) key_seq_temp=key_seq[:temp_len] if self.SeqOverlap(select_seq_temp,key_seq_temp,8): return True #select seq latter key_seq_num = len(key_seq) key_seq_num_half = int(key_seq_num/2) for index in range(key_seq_num_half): key_seq_temp=key_seq[index:] temp_len=len(key_seq_temp) select_seq_temp=select_seq[:temp_len] if self.SeqOverlap(select_seq_temp,key_seq_temp): return True #rev_comp select seq former for index in range(select_seq_num_half): select_seq_rev_comp_temp=select_seq_rev_comp[index:] temp_len=len(select_seq_rev_comp_temp) key_seq_temp=key_seq[:temp_len] if self.SeqOverlap(select_seq_rev_comp_temp,key_seq_temp): return True #rev_comp select seq latter for index in range(key_seq_num_half): key_seq_temp=key_seq[index:] temp_len=len(key_seq_temp) select_seq_rev_comp_temp=select_seq_rev_comp[:temp_len] if self.SeqOverlap(select_seq_rev_comp_temp,key_seq_temp): return True return False def SeqOverlap(self,seq1,seq2,MISMATCH=4): seq_len=min(len(seq1),len(seq2)) count =0 for index in range(seq_len): if seq1[index] != seq2[index]: count +=1 if count >MISMATCH: return False return True def SinglePoint(self,primer_list,mismatch_seq,match_seq): primer_down_seq = primer_list[9] primer_pos_start = primer_list[3] primer_pos_end = primer_list[4] primer_chr = primer_list[2] primer_strand = primer_list[5] primer_seq = primer_list[1] primer_len = len(primer_seq) mismatch_len = len(mismatch_seq) temp_len = mismatch_len - primer_len if temp_len<5: return False,"" for index in range(primer_len,10,-1): for i_index in range(temp_len,5,-1): primer_seq_temp = primer_seq[-1*index:] mismatch_seq_temp = mismatch_seq[-1*(i_index+index):-1*i_index] if self.SeqOverlap(primer_seq_temp,mismatch_seq_temp,2): primer_down_seq_temp=primer_down_seq[:i_index] mismatch_seq_temp1 = mismatch_seq[-1*i_index:] if self.SeqOverlap(primer_down_seq_temp,mismatch_seq_temp1,0): if primer_strand =="-1": point=int(primer_pos_start)-i_index point_e=point+30 else: point=int(primer_pos_end)+i_index point_e=point-30 down_seq = match_seq[:30] return True,primer_seq_temp+mismatch_seq_temp1+down_seq,primer_chr+":"+str(point),primer_chr+":"+str(point_e) return False,"" def SingleCombine(slef,info_dict,seq): seq_len=len(seq) for index in range(seq_len): base = seq[index] if index in info_dict.keys(): info_dict[index][base]+=1 else: BASE_DICT={'A':0,'T':0,'G':0,'C':0,'N':0} BASE_DICT[base] +=1 info_dict[index]=BASE_DICT def SingleCombineSeq(self,info_dict): new_seq="" total_count =0 flag = True for index in info_dict.keys(): temp_dict=info_dict[index] max_count =0 max_base="" for base in temp_dict.keys(): total_count = total_count+temp_dict[base] if flag else total_count if temp_dict[base] > max_count: max_base = base max_count=temp_dict[base] flag = False if max_count > 0.5 * total_count: new_seq=new_seq+max_base else: return new_seq return new_seq def CheckSingleSite(slef,site,double_site): single_info=site.split(":") single_chr = single_info[0] single_pos = int(single_info[1]) double_info_temp = double_site.split("-") double_info1 = double_info_temp[0] double_info2 = double_info_temp[1] double_info1_temp = double_info1.split(":") double_info2_temp = double_info2.split(":") double_info1_chr = double_info1_temp[0] double_info1_pos = int(double_info1_temp[1]) double_info2_chr = double_info2_temp[0] double_info2_pos = int(double_info2_temp[1]) if single_chr == double_info1_chr and abs(double_info1_pos-single_pos) <7: return True if single_chr == double_info2_chr and abs(double_info2_pos-single_pos) < 7: return True return False def CheckSingleFusionSeq(self,new_seq,fusion_seq): new_seq_rev="".join([self.BASE_DICT[base] for base in new_seq])[::-1] new_seq_len=len(new_seq) fusion_seq_len=len(fusion_seq) #new seq---fusion seq for index in range(new_seq_len,35,-1): new_seq_temp = new_seq[-1*index:] new_seq_temp_len = len(new_seq_temp) if new_seq_temp_len > fusion_seq_len: continue fusion_seq_temp = fusion_seq[:new_seq_temp_len] if self.SeqOverlap(fusion_seq_temp,new_seq_temp): return True #new seq rev comp--fusion seq for index in range(new_seq_len,35,-1): new_seq_temp = new_seq_rev[-1*index:] new_seq_temp_len = len(new_seq_temp) if new_seq_temp_len > fusion_seq_len: continue fusion_seq_temp = fusion_seq[:new_seq_temp_len] if self.SeqOverlap(fusion_seq_temp,new_seq_temp): return True #fusion seq -- new seq for index in range(fusion_seq_len,35,-1): fusion_seq_temp = fusion_seq[-1*index:] fusion_seq_temp_len = len(fusion_seq_temp) if fusion_seq_temp_len>new_seq_len: continue new_seq_temp = new_seq[:fusion_seq_temp_len] if self.SeqOverlap(fusion_seq_temp,new_seq_temp): return True #fusion seq -- new seq rev for index in range(fusion_seq_len,35,-1): fusion_seq_temp = fusion_seq[-1*index:] fusion_seq_temp_len = len(fusion_seq_temp) if fusion_seq_temp_len>new_seq_len: continue new_seq_temp = new_seq_rev[:fusion_seq_temp_len] if self.SeqOverlap(fusion_seq_temp,new_seq_temp): return True return False def GetSinglePrimer(self,seq,primer_seq_dict): primer="" if seq in primer_seq_dict.keys(): primer = primer_seq_dict[seq][0] else: for item in primer_seq_dict.keys(): if len(item) != len(seq): continue if self.SeqOverlap(seq,item): primer=primer_seq_dict[item][0] return primer def StatFPPrimer(self,readsIDFile,readsPrimer_dict): with open(readsIDFile,'r') as rf: header=rf.readline().strip("\n").split("\t") ReadsID_index=header.index("ReadsID") FP_index=header.index("FP_name") for line in rf: info = line.strip("\n").split("\t") readsPrimer_dict[info[ReadsID_index]] = info[FP_index] class StatDoubleReadsFusion(StatReadsFusion): def __init__(self): self.BASE_DICT={'A':'T','T':'A','G':'C','C':'G','N':'N','-':'-'} def StatPrimer(self,readsIDFile,readsPrimer_dict): self.StatFPPrimer(readsIDFile,readsPrimer_dict) def StatDouble(self,infile,readsID_dict,site_dict): seq_site_dict={} seq_readsID_dict={} seq_site_info={} with open(infile,'r') as inputF: header=inputF.readline().strip("\n").split("\t") seq_index=header.index("Seq") ReadsID_index=header.index("ReadsID") site_index=header.index("Site") Site1_Down_index=header.index("Site1_Down") Site2_Down_index=header.index("Site2_Down") Overlap_index=header.index("Overlap") for line in inputF: info = line.strip("\n").split("\t") seq = info[seq_index] ID = info[ReadsID_index] site = info[site_index] Site1_Down = info[Site1_Down_index] Site2_Down = info[Site2_Down_index] Overlap = info[Overlap_index] seq_rev_comp="".join([self.BASE_DICT[base] for base in seq][::-1]) if seq == "": continue seq_site_info[site]=[Site1_Down,Site2_Down,Overlap] if seq not in seq_site_dict.keys() and seq_rev_comp not in seq_site_dict.keys(): seq_site_dict[seq]=[site] seq_readsID_dict[seq]=[ID] elif seq_rev_comp in seq_site_dict.keys(): site_temp=site.split("-") site=site_temp[1]+'-'+site_temp[0] seq_site_dict[seq_rev_comp].append(site) seq_readsID_dict[seq_rev_comp].append(ID) elif seq in seq_site_dict.keys(): seq_site_dict[seq].append(site) seq_readsID_dict[seq].append(ID) #sort by count seq_count_dict={} for seq in seq_site_dict.keys(): count = len(seq_site_dict[seq]) seq_count_dict[seq]=count seq_count_sort= sorted(seq_count_dict.items(), key=lambda d:d[1], reverse = True) #combine seq total_seq_num=len(seq_count_sort) check_index=[] site_dict_temp={} for index in range(total_seq_num): if index not in check_index: key_seq=seq_count_sort[index][0] check_index.append(index) combine_seq_list=self.GetCombineSeq(key_seq,check_index,seq_count_sort) site_dict_temp[key_seq]=seq_site_dict[key_seq] seq_ID_list=seq_readsID_dict[key_seq] for seq in combine_seq_list: seq_ID_list.extend(seq_readsID_dict[seq]) site_list=seq_site_dict[seq] for site_item in site_list: site_dict_temp[key_seq].append(site_item) readsID_dict[key_seq]=seq_ID_list #combine site for key_seq in site_dict_temp.keys(): site_list=site_dict_temp[key_seq] temp = self.UniqSiteID(site_list) site=temp[0][0] site_rev_temp=site.split("-") site_rev=site_rev_temp[1]+'-'+site_rev_temp[0] site_temp=seq_site_info[site] if site in seq_site_info else seq_site_info[site_rev] site_dict[key_seq]=[site] for item in site_temp: site_dict[key_seq].append(item) class SingleReadsFusion(StatReadsFusion): def __init__(self,home_dict,info_dict,primer_seq_dict): self.home_dict=home_dict self.info_dict=info_dict self.primer_seq_dict=primer_seq_dict self.BASE_DICT={'A':'T','T':'A','G':'C','C':'G','N':'N','-':'-'} def StatPrimer(self,readsIDFile,readsPrimer_dict): self.StatFPPrimer(readsIDFile,readsPrimer_dict) def CheckSite(self,site,double_site): return self.CheckSingleSite(site,double_site) def CheckFusionSeq(self,new_seq,fusion_seq): return self.CheckSingleFusionSeq(new_seq,fusion_seq) def GetSingleCombine(self,info_list): mismatch_dict={} match_dict={} for item in info_list: mismatch_seq=item[1][::-1] self.SingleCombine(mismatch_dict,mismatch_seq) match_seq=item[2] self.SingleCombine(match_dict,match_seq) mismatch_seq=self.SingleCombineSeq(mismatch_dict)[::-1] match_seq=self.SingleCombineSeq(match_dict) return mismatch_seq,match_seq def GetSinglePoint(self,readsID,mismatch_seq,match_seq): readsID_info = readsID.split("_") if len(readsID_info)<5: return False,[] FP_primer = readsID_info[3] RP_primer = readsID_info[4] FP_primer_rev = "".join([self.BASE_DICT[base] for base in FP_primer])[::-1] #RP_primer_rev = "".join([self.BASE_DICT[base] for base in RP_primer])[::-1] if FP_primer in mismatch_seq or FP_primer_rev in mismatch_seq: primer_seq = FP_primer #elif RP_primer in mismatch_seq or RP_primer_rev in mismatch_seq: # primer_seq = RP_primer else: return False,[] primer_ID = self.GetSinglePrimer(primer_seq,self.primer_seq_dict) if primer_ID == "": return False,[] primer_list = self.info_dict[primer_ID] results=self.SinglePoint(primer_list,mismatch_seq,match_seq) final_point_list=[] if results[0]: temp=list(results)[1:] final_point_list.append(temp) #final_point_list.append(primer_ID) if primer_ID in self.home_dict.keys(): primer_homo_list = self.home_dict[primer_ID] for ID in primer_homo_list: ID_list = self.info_dict[ID] results=self.SinglePoint(ID_list,mismatch_seq,match_seq) if results[0]: temp=list(results)[1:] final_point_list.append(temp) #final_point_list.append(ID) return True,final_point_list