class ReadsFusion: def __init__(self): pass ###输入primer序列和reads序列 ###返回primer在reads上的位置,Front or End or NO def PrimerLoc(self,primer,seq): primer_len=len(primer) seq_primer=seq[:primer_len] seq_rev_comp="".join([self.BASE_DICT[base] for base in seq][::-1]) seq_rev_comp_primer=seq_rev_comp[:primer_len] if primer == seq_primer: return "Front" elif primer == seq_rev_comp_primer: return "End" return "NO" #####输入map info,例如99M51S #####返回{0:'M99',1:'S51'} def GetMapInfo(self,info_str): num_list=[] map_dict={} count=0 if not info_str: return map_dict for item in info_str: if item.isalpha(): num=len(num_list) index=0 for i in range(num): index += int(num_list[i])*10**(num-i-1) map_dict[count]=item+":"+str(index) count +=1 num_list=[] elif item.isdigit(): num_list.append(item) return map_dict ######输入reads序列,map info,目的是矫正map info中的 D or I ###返回矫正后的reads序列和矫正后的map info def AdjInfo(self,seq,map_info): flag = True for key in map_info.keys(): item = map_info[key] if 'D' in item or 'I' in item: flag = False if 'H' in map_info[0]: flag = False #不需要调整 if flag: return seq,map_info else: new_seq_list=[] new_info1={} start=0 old_item='' for key in map_info.keys(): item = map_info[key] temp_len=int(item.split(":")[1]) end = start+temp_len if item.startswith('M') or item.startswith('S'): for index in range(start,end): new_seq_list.append(seq[index]) old_item=item elif item.startswith('D') and old_item.startswith('M'): end = end-temp_len for index in range(temp_len): new_seq_list.append('-') elif item.startswith('H'): end = end-temp_len start = end # num=len(map_info) index =0 new_index= 0 while index+3 <= num: item1=map_info[index] item2=map_info[index+1] item3=map_info[index+2] if item1.startswith("M") and item2.startswith("D") and item3.startswith("M"): pos1=int(item1.split(":")[1]) pos2=int(item2.split(":")[1]) pos3=int(item3.split(":")[1]) new_info1[new_index] = 'M:'+str(pos1+pos2+pos3) new_index +=1 index +=3 elif item1.startswith("M") and item2.startswith("I") and item3.startswith("M"): pos1=int(item1.split(":")[1]) pos3=int(item3.split(":")[1]) new_info1[new_index] = 'M:'+str(pos1+pos3) new_index +=1 index +=3 elif item1.startswith("H") and item2.startswith("M"): new_info1[new_index] = item2 new_index +=1 index +=2 else: new_info1[new_index] = item1 new_index += 1 index += 1 while index < num: new_info1[new_index]=map_info[index] new_index += 1 index += 1 return "".join(new_seq_list),new_info1 ###对于double reads,同一条reads上同时比对到位置,输入同一reads两种比对情况 ###输出比对位置断点坐标 def GetPoints(self,soft_info,hard_info,OVERLAP=10): #不分析有缺省值的 if len(soft_info)<7 or len(hard_info)<7: return False,1 #map_info:{0:'M:90',1:'S:60'} #seq:ATGC..... map_info1=self.GetMapInfo(soft_info[5]) map_info2=self.GetMapInfo(hard_info[5]) seq1=soft_info[6] seq2=hard_info[6] #矫正map info中有D,I的 if 'D' in soft_info[5] or 'I' in soft_info[5]: info1_temp=self.AdjInfo(seq1,map_info1) seq1=info1_temp[0] map_info1=info1_temp[1] if 'D' in hard_info[5] or 'I' in hard_info[5]: info2_temp=self.AdjInfo(seq2,map_info2) seq2=info2_temp[0] map_info2=info2_temp[1] chr1=soft_info[2] pos1=int(soft_info[3]) chr2=hard_info[2] pos2=int(hard_info[3]) #point1 soft_stat="" match_ms=0 point_ms=0 match_sm=0 point_sm=0 match_sms=0 skip_sms_f=0 skip_sms_e=0 point_sms_f=0 point_sms_e=0 if len(map_info1)==2: #{0: 'M:131', 1: 'S:19'} if 'S' in map_info1[1]: soft_stat="MS" match_ms=int(map_info1[0].split(":")[1]) point_ms=pos1+match_ms-1 #{0:'S:19',1:'M:131'} elif 'S' in map_info1[0]: soft_stat="SM" match_sm=int(map_info1[1].split(":")[1]) point_sm=pos1 else: return False,2 elif len(map_info1)==3: #{0: 'S:51', 1: 'M:99', 2:'S:5'} if 'S' in map_info1[0] and 'M' in map_info1[1] and 'S' in map_info1[2]: soft_stat="SMS" match_sms=int(map_info1[1].split(":")[1]) skip_sms_f=int(map_info1[0].split(":")[1]) skip_sms_e=int(map_info1[2].split(":")[1]) point_sms_f=pos1 point_sms_e=pos1+match_sms-1 else: return False,3 else: return False,4 #point2 hard_stat="" match_mh=0 skip_mh=0 point_mh=0 skip_hm=0 point_hm=0 match_hmh=0 skip_hmh_f=0 skip_hmh_e=0 point_hmh_f=0 point_hmh_e=0 if len(map_info2)==2: #{0:'M:31',1:'H:50'} if 'H' in map_info2[1]: hard_stat="MH" match_mh=int(map_info2[0].split(":")[1]) skip_mh=int(map_info2[1].split(":")[1]) point_mh=pos2+match_mh-1 #{0:'H:50',1:'M:31'} elif 'H' in map_info2[0]: hard_stat="HM" skip_hm=int(map_info2[0].split(":")[1]) point_hm=pos2 else: return False,5 elif len(map_info2)==3: #{0:'H:30',1:"M:100",2:"H:15"} if 'H' in map_info2[0] and 'M' in map_info2[1] and 'H' in map_info2[2]: hard_stat="HMH" match_hmh=int(map_info2[1].split(":")[1]) skip_hmh_f=int(map_info2[0].split(":")[1]) skip_hmh_e=int(map_info2[2].split(":")[1]) point_hmh_f=pos2 point_hmh_e=pos2+match_hmh-1 else: return False,6 else: return False,7 seq1_rev="".join([self.BASE_DICT[base] for base in seq1][::-1]) #确定points和fusion_seq point1=0 point1_e=0 point2=0 point2_e=0 overlap=0 fusion_seq="" if soft_stat =="MS": point1=point_ms point1_e=point1-30 fusion_seq=seq1[match_ms-30:match_ms+30] if hard_stat == "MH": if match_ms >= skip_mh: overlap = match_ms - skip_mh else: return False,8 point2=point_mh-overlap point2_e=point2-30 elif hard_stat == "HM": if match_ms >= skip_hm: overlap = match_ms - skip_hm else: return False,9 point2=point_hm+overlap point2_e=point2+30 elif hard_stat == "HMH": seq1_temp=seq1[skip_hmh_f:skip_hmh_f+match_hmh] seq1_rev_temp=seq1_rev[skip_hmh_f:skip_hmh_f+match_hmh] if seq2 == seq1_temp: if match_ms >= skip_hmh_f: overlap = match_ms - skip_hmh_f else: return False,10 point2=point_hmh_f+overlap point2_e=point2+30 elif seq2 == seq1_rev_temp: if match_ms >= skip_hmh_e: overlap = match_ms -skip_hmh_e else: return False,11 point2=point_hmh_e-overlap point2_e=point2-30 else: return False,12 else: return False,13 elif soft_stat == "SM": fusion_seq=seq1[match_sm-30:match_sm+30] point1=point_sm point1_e=point1+30 if hard_stat == "HM": if match_sm >=skip_hm: overlap = match_sm - skip_hm else: return False,14 point2=point_hm+overlap point2_e=point2+30 elif hard_stat == "MH": if match_sm >= skip_mh: overlap = match_sm - skip_mh else: return False,15 point2=point_mh-overlap point2_e=point2-30 elif hard_stat == "HMH": seq1_temp=seq1[skip_hmh_f:skip_hmh_f+match_hmh] seq1_rev_temp=seq1_rev[skip_hmh_f:skip_hmh_f+match_hmh] if seq2 == seq1_temp: if match_sm >= skip_hmh_e: overlap = match_sm - skip_hmh_e else: return False,16 point2=point_hmh_e-overlap point2_e=point2-30 elif seq2 == seq1_rev_temp: if match_sm >= skip_hmh_f: overlap = match_sm - skip_hmh_f else: return False,17 point2=point_hmh_f+overlap point2_e=point2+30 else: return False,18 elif soft_stat == "SMS": if hard_stat == "HM": seq1_temp=seq1[skip_hm:] seq1_rev_temp=seq1_rev[skip_hm:] if seq2 == seq1_temp: point1=point_sms_e point1_e=point1-30 fusion_seq=seq1[skip_sms_f+match_sms-30:skip_sms_f+match_sms+30] if skip_sms_f+match_sms >= skip_hm: overlap = skip_sms_f+match_sms - skip_hm else: return False,19 elif seq2 == seq1_rev_temp: point1=point_sms_f point1_e=point1+30 fusion_seq=seq1[skip_sms_f-30:skip_sms_f+30] if skip_sms_e+match_sms >= skip_hm: overlap = skip_sms_e+match_sms - skip_hm else: return False,20 else: return False,21 point2=point_hm+overlap point2_e=point2+30 elif hard_stat == "MH": seq1_temp=seq1[:match_mh] seq1_rev_temp=seq1_rev[:match_mh] if seq2 == seq1_temp: point1=point_sms_f point1_e=point1+30 fusion_seq=seq1[skip_sms_f-30:skip_sms_f+30] if skip_sms_e+match_sms >= skip_mh: overlap = skip_sms_e+match_sms - skip_mh else: return False,22 elif seq2 == seq1_rev_temp: point1=point_sms_e point1_e=point1-30 fusion_seq=seq1[skip_sms_f+match_sms-30:skip_sms_f+match_sms+30] if skip_sms_f+match_sms >= skip_mh: overlap = skip_sms_f+match_sms - skip_mh else: return False,23 else: return False,24 point2=point_mh-overlap point2_e=point2-30 elif hard_stat == "HMH": if skip_hmh_f < skip_sms_f and skip_sms_f <= skip_hmh_f + match_hmh: seq_temp = seq1[skip_hmh_f:skip_hmh_f + match_hmh] if seq2 == seq_temp: point1 = point_sms_f point1_e = point1 + 30 fusion_seq=seq1[skip_sms_f-30:skip_sms_f+30] overlap = skip_hmh_f + match_hmh - skip_sms_f point2 = point_hmh_e - overlap point2_e = point2 -30 else: return False,25 elif skip_hmh_f >= skip_sms_f and skip_hmh_f <= skip_sms_f + match_sms: seq_temp = seq1[skip_hmh_f:skip_hmh_f + match_hmh] if seq2 == seq_temp: point1 = point_sms_e point1_e = point1 - 30 fusion_seq=seq1[skip_sms_f+match_sms-30:skip_sms_f+match_sms+30] overlap = skip_sms_f + match_sms - skip_hmh_f point2 = point_hmh_f + overlap point2_e = point2 +30 return False,26 elif skip_sms_e > skip_hmh_e and skip_sms_e <= skip_hmh_f+match_hmh: seq_temp = seq1_rev[skip_hmh_f:skip_hmh_f + match_hmh] if seq2 == seq_temp: point1 = point_sms_e point1_e = point1 - 30 fusion_seq=seq1[skip_sms_f+match_sms-30:skip_sms_f+match_sms+30] overlap = skip_hmh_f+match_hmh - skip_sms_e point2 = point_hmh_e - overlap point2_e = point2 - 30 else: return False,27 elif skip_hmh_f > skip_sms_e and skip_hmh_f <= skip_sms_e+match_sms: seq_temp = seq1_rev[skip_hmh_f:skip_hmh_f + match_hmh] if seq2 == seq_temp: point1 = point_sms_f point1_e = point1 + 30 fusion_seq=seq1[skip_sms_f-30:skip_sms_f+30] overlap = skip_sms_e+match_sms - skip_hmh_f point2 = point_hmh_e + overlap point2_e = point2 + 30 else: return False,28 return False,29 else: return False,30 else: return False,31 if abs(overlap) <= OVERLAP and point1 !=0 and point2 !=0: return True,chr1+":"+str(point1),chr2+":"+str(point2),chr1+":"+str(point1_e),fusion_seq,str(overlap),chr1+":"+str(point1_e),chr2+":"+str(point2_e) else: return False,32 ###输入含有soft clip 的reads ###输出比对到的位置坐标 def GetSinglePoints(self,R1_read_info,R2_read_info): ID_info = R1_read_info[0].split("_") ID = R1_read_info[0] R1_seq=R1_read_info[6] breakpoint1=0 breakpoint1_e=0 breakpoint2=0 breakpoint2_e=0 R1_loc="" R1_map_info={} map_str=R1_read_info[5] chr1=R1_read_info[2] R1_mismatch_seq="" R1_match_seq="" #R1 primer存在 if len(ID_info)>=4: R1_primer=ID_info[3] R1_map_info=self.GetMapInfo(R1_read_info[5]) R1_loc=self.PrimerLoc(R1_primer,R1_seq) R1_primer_len=len(R1_primer) #SMS if len(R1_map_info)==3 and R1_map_info[0][0] == "S" and R1_map_info[2][0]=="S": end_mismatch_len=int(R1_map_info[2].split(":")[1]) start_mismatch_len=int(R1_map_info[0].split(":")[1]) match_len=int(R1_map_info[1].split(":")[1]) if R1_loc == "End": #mismatch in end if end_mismatch_len > R1_primer_len: mismatch_len=end_mismatch_len R1_seq="".join([self.BASE_DICT[base] for base in R1_seq][::-1]) R1_mismatch_seq=R1_seq[:mismatch_len] R1_match_seq=R1_seq[mismatch_len:] breakpoint1=R1_read_info[3]+match_len-1 breakpoint1_e=breakpoint1-30 #mismatch in front else: mismatch_len=start_mismatch_len R1_mismatch_seq=R1_seq[:mismatch_len] R1_match_seq=R1_seq[mismatch_len:] breakpoint1=R1_read_info[3] breakpoint1_e=breakpoint1+30 elif R1_loc == "Front": #mismatch in front if start_mismatch_len > R1_primer_len: mismatch_len=start_mismatch_len R1_mismatch_seq=R1_seq[:mismatch_len] R1_match_seq=R1_seq[mismatch_len:] breakpoint1=R1_read_info[3] breakpoint1_e=breakpoint1+30 #mismatch in end else: mismatch_len=end_mismatch_len R1_seq="".join([self.BASE_DICT[base] for base in R1_seq][::-1]) R1_mismatch_seq=R1_seq[:mismatch_len] R1_match_seq=R1_seq[mismatch_len:] breakpoint1=R1_read_info[3]+match_len-1 breakpoint1_e=breakpoint1-30 #SM or MS elif len(R1_map_info)==2: #SM #mismatch in front if R1_map_info[0][0]=="S": mismatch_len=int(R1_map_info[0].split(":")[1]) if R1_loc == "End" or (R1_loc == "Front" and R1_primer_len < mismatch_len): R1_mismatch_seq=R1_seq[:mismatch_len] R1_match_seq=R1_seq[mismatch_len:] breakpoint1=R1_read_info[3] breakpoint1_e=breakpoint1+30 #else: # return [],[] #MS #mismatch in end elif R1_map_info[1][0]=="S": mismatch_len=int(R1_map_info[1].split(":")[1]) if R1_loc == "Front" or (R1_loc=="End" and R1_primer_len < mismatch_len): R1_seq="".join([self.BASE_DICT[base] for base in R1_seq][::-1]) match_len=int(R1_map_info[0].split(":")[1]) R1_mismatch_seq=R1_seq[:mismatch_len] R1_match_seq=R1_seq[mismatch_len:] breakpoint1=R1_read_info[3]+match_len-1 breakpoint1_e=breakpoint1-30 #else: # return [],[] R2_seq=R2_read_info[6] R2_loc="" R2_map_info={} map_str=R2_read_info[5] chr2=R2_read_info[2] R2_mismatch_seq="" R2_match_seq="" #R1,R2都存在primer if len(ID_info)==5: R2_primer=ID_info[4] R2_map_info=self.GetMapInfo(map_str) R2_loc=self.PrimerLoc(R2_primer,R2_seq) R2_primer_len=len(R2_primer) #SMS if len(R2_map_info)==3 and R2_map_info[0][0] == "S" and R2_map_info[2][0]=="S": end_mismatch_len=int(R2_map_info[2].split(":")[1]) start_mismatch_len=int(R2_map_info[0].split(":")[1]) match_len=int(R2_map_info[1].split(":")[1]) if R2_loc == "End": #mismatch in end if end_mismatch_len > R2_primer_len: R2_seq="".join([self.BASE_DICT[base] for base in R2_seq][::-1]) mismatch_len=end_mismatch_len R2_mismatch_seq=R2_seq[:mismatch_len] R2_match_seq=R2_seq[mismatch_len:] breakpoint2=R2_read_info[3]+match_len-1 breakpoint2_e=breakpoint2-30 #mismatch in front else: mismatch_len=start_mismatch_len R2_mismatch_seq=R2_seq[:mismatch_len] R2_match_seq=R2_seq[mismatch_len:] breakpoint2=R2_read_info[3] breakpoint2_e=breakpoint2+30 if R2_loc == "Front": #mismatch in front if start_mismatch_len > R2_primer_len: mismatch_len=start_mismatch_len R2_mismatch_seq=R2_seq[:mismatch_len] R2_match_seq=R2_seq[mismatch_len:] breakpoint2=R2_read_info[3] breakpoint2_e=breakpoint2+30 #mismatch in end else: mismatch_len=end_mismatch_len R2_seq="".join([self.BASE_DICT[base] for base in R2_seq][::-1]) R2_mismatch_seq=R2_seq[:mismatch_len] R2_match_seq=R2_seq[mismatch_len:] breakpoint2=R2_read_info[3]+match_len-1 breakpoint2_e=breakpoint2-30 elif len(R2_map_info)==2: #SM #mismatch in front if R2_map_info[0][0]=="S": mismatch_len=int(R2_map_info[0].split(":")[1]) if R2_loc == "End" or (R2_loc == "Front" and R2_primer_len=10: R1_point_list=[chr1+":"+str(breakpoint1),R1_mismatch_seq,R1_match_seq,chr1+":"+str(breakpoint1_e),ID] if breakpoint2 !=0 and len(R2_mismatch_seq)>=10: R2_point_list=[chr2+":"+str(breakpoint2),R2_mismatch_seq,R2_match_seq,chr2+":"+str(breakpoint2_e),ID] return R1_point_list,R2_point_list class DoubleReadsFusion(ReadsFusion): def __init__(self): self.BASE_DICT={'A':'T','T':'A','G':'C','C':'G','N':'N','-':'-'} def HardClipFusion(self,reads_info): R1_soft_info=[] R1_hard_info=[] R2_soft_info=[] R2_hard_info=[] for reads in reads_info: if reads[1]<128: R1_soft_info=reads elif reads[1]<256: R2_soft_info=reads elif reads[1]<2176: R1_hard_info=reads elif reads[1]<2304: R2_hard_info=reads if len(R1_soft_info)>0 and len(R1_hard_info)>0 and R1_soft_info[4]>=30: results=self.GetPoints(R1_soft_info,R1_hard_info) if results[0]: out_list=[results[1]+"-"+results[2],results[3],results[2],results[4],results[5],results[6],results[7]] return out_list #R2:hard-clip if len(R2_hard_info) > 0 and len(R2_soft_info) > 0 and R2_soft_info[4]>=30: results=self.GetPoints(R2_soft_info,R2_hard_info) if results[0]: out_list=[results[1]+"-"+results[2],results[3],results[2],results[4],results[5],results[6],results[7]] return out_list return [] class SingleReadsFusion(ReadsFusion): def __init__(self): self.BASE_DICT={'A':'T','T':'A','G':'C','C':'G','N':'N','-':'-'} def SoftClipFusion(self,reads_info): if len(reads_info)==1: item = reads_info[0] reads_info.append(item) R1_read_info=[] R2_read_info=[] for reads in reads_info: if reads[1] > 128: R2_read_info=reads else: R1_read_info=reads if len(R1_read_info) == 0: R1_read_info=R2_read_info if len(R2_read_info) == 0: R2_read_info=R1_read_info #比对质量值至少一个MAQ>=60 if int(R1_read_info[4])<60 and int(R2_read_info[4])<60: return [],[] return self.GetSinglePoints(R1_read_info,R2_read_info)