123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440 |
- class StatReadsFusion:
- def __init__(self):
- self.BASE_DICT={'A':'T','T':'A','G':'C','C':'G','N':'N','-':'-'}
- def GetSitePos(self,site):
-
- f_chr=site.split("-")[0].split(':')[0]
- f_pos=site.split("-")[0].split(':')[1]
- e_chr=site.split("-")[1].split(':')[0]
- e_pos=site.split("-")[1].split(':')[1]
- return f_chr,f_pos,e_chr,e_pos
- def CheckSite(self,key_site,select_site,FLAG=False):
-
- key_site_info=self.GetSitePos(key_site)
- select_site_info=self.GetSitePos(select_site)
- if key_site_info[0] == select_site_info[0] and abs(int(key_site_info[1])-int(select_site_info[1])) <=5:
- if FLAG or key_site_info[2] == select_site_info[2] and abs(int(key_site_info[3])-int(select_site_info[3])) <=5:
- return True
- elif key_site_info[0] == select_site_info[2] and abs(int(key_site_info[1])-int(select_site_info[3])) <=5:
- if FLAG or key_site_info[2] == select_site_info[0] and abs(int(key_site_info[3])-int(select_site_info[1])) <=5:
- return True
- else:
- return False
- def UniqSiteID(self,site_list):
- site_dict_num={}
- total_num = len(site_list)
- for item in site_list:
- if item not in site_dict_num.keys():
- site_dict_num[item]=1
- else:
- site_dict_num[item]+=1
-
- seq_site_sort= sorted(site_dict_num.items(), key=lambda d:d[1], reverse = True)
- site_num=len(seq_site_sort)
- key_site =seq_site_sort[0]
- site_list_new=[key_site]
- count = int(key_site[1])
- for index in range(1,site_num):
- select_site=seq_site_sort[index]
- if not self.CheckSite(key_site[0],select_site[0]):
- site_list_new.append(select_site)
- else:
- count += int(select_site[1])
- #不超过1半,则不合并
- if count*2 >= total_num:
- return site_list_new
- else:
- return seq_site_sort
-
- def GetCombineSeq(self,key_seq,check_index,seq_count_sort):
- total_seq_num=len(seq_count_sort)
- key_seq_list=[]
- for index in range(total_seq_num):
- if index not in check_index:
- select_seq=seq_count_sort[index][0]
- if self.IsSameSeq(key_seq,select_seq):
- key_seq_list.append(select_seq)
- check_index.append(index)
- return key_seq_list
- def IsSameSeq(self,key_seq,select_seq):
- select_seq_rev_comp="".join([self.BASE_DICT[base] for base in select_seq][::-1])
- #flag = True if key_seq == "AGTGGTTTTCTACTCTAAATATAGTAACTAGTATCCTCCTGGCTGATCAGGGGGTGGGGA" and select_seq == "GTCTCTGAGTGGTTTTCTACTCTAAATATATTAAATAGTATCCTCCTGGCTGATCAGGGG" else False
- flag =False
- #select seq former
- select_seq_num = len(select_seq)
- select_seq_num_half = int(select_seq_num/2)
- for index in range(select_seq_num_half):
- select_seq_temp=select_seq[index:]
- temp_len=len(select_seq_temp)
- key_seq_temp=key_seq[:temp_len]
- if self.SeqOverlap(select_seq_temp,key_seq_temp,8):
- return True
-
- #select seq latter
- key_seq_num = len(key_seq)
- key_seq_num_half = int(key_seq_num/2)
- for index in range(key_seq_num_half):
- key_seq_temp=key_seq[index:]
- temp_len=len(key_seq_temp)
- select_seq_temp=select_seq[:temp_len]
- if self.SeqOverlap(select_seq_temp,key_seq_temp):
- return True
-
- #rev_comp select seq former
- for index in range(select_seq_num_half):
- select_seq_rev_comp_temp=select_seq_rev_comp[index:]
- temp_len=len(select_seq_rev_comp_temp)
- key_seq_temp=key_seq[:temp_len]
- if self.SeqOverlap(select_seq_rev_comp_temp,key_seq_temp):
- return True
- #rev_comp select seq latter
- for index in range(key_seq_num_half):
- key_seq_temp=key_seq[index:]
- temp_len=len(key_seq_temp)
- select_seq_rev_comp_temp=select_seq_rev_comp[:temp_len]
- if self.SeqOverlap(select_seq_rev_comp_temp,key_seq_temp):
- return True
-
- return False
-
- def SeqOverlap(self,seq1,seq2,MISMATCH=4):
- seq_len=min(len(seq1),len(seq2))
- count =0
- for index in range(seq_len):
- if seq1[index] != seq2[index]:
- count +=1
- if count >MISMATCH:
- return False
- return True
-
- def SinglePoint(self,primer_list,mismatch_seq,match_seq):
-
- primer_down_seq = primer_list[9]
- primer_pos_start = primer_list[3]
- primer_pos_end = primer_list[4]
- primer_chr = primer_list[2]
- primer_strand = primer_list[5]
- primer_seq = primer_list[1]
- primer_len = len(primer_seq)
- mismatch_len = len(mismatch_seq)
- temp_len = mismatch_len - primer_len
- if temp_len<5:
- return False,""
- for index in range(primer_len,10,-1):
- for i_index in range(temp_len,5,-1):
- primer_seq_temp = primer_seq[-1*index:]
- mismatch_seq_temp = mismatch_seq[-1*(i_index+index):-1*i_index]
- if self.SeqOverlap(primer_seq_temp,mismatch_seq_temp,2):
- primer_down_seq_temp=primer_down_seq[:i_index]
- mismatch_seq_temp1 = mismatch_seq[-1*i_index:]
- if self.SeqOverlap(primer_down_seq_temp,mismatch_seq_temp1,0):
- if primer_strand =="-1":
- point=int(primer_pos_start)-i_index
- point_e=point+30
- else:
- point=int(primer_pos_end)+i_index
- point_e=point-30
- down_seq = match_seq[:30]
- return True,primer_seq_temp+mismatch_seq_temp1+down_seq,primer_chr+":"+str(point),primer_chr+":"+str(point_e)
- return False,""
-
- def SingleCombine(slef,info_dict,seq):
- seq_len=len(seq)
- for index in range(seq_len):
- base = seq[index]
- if index in info_dict.keys():
- info_dict[index][base]+=1
- else:
- BASE_DICT={'A':0,'T':0,'G':0,'C':0,'N':0}
- BASE_DICT[base] +=1
- info_dict[index]=BASE_DICT
-
- def SingleCombineSeq(self,info_dict):
- new_seq=""
- total_count =0
- flag = True
- for index in info_dict.keys():
- temp_dict=info_dict[index]
- max_count =0
- max_base=""
- for base in temp_dict.keys():
- total_count = total_count+temp_dict[base] if flag else total_count
- if temp_dict[base] > max_count:
- max_base = base
- max_count=temp_dict[base]
- flag = False
- if max_count > 0.5 * total_count:
- new_seq=new_seq+max_base
- else:
- return new_seq
- return new_seq
-
- def CheckSingleSite(slef,site,double_site):
- single_info=site.split(":")
- single_chr = single_info[0]
- single_pos = int(single_info[1])
- double_info_temp = double_site.split("-")
- double_info1 = double_info_temp[0]
- double_info2 = double_info_temp[1]
- double_info1_temp = double_info1.split(":")
- double_info2_temp = double_info2.split(":")
- double_info1_chr = double_info1_temp[0]
- double_info1_pos = int(double_info1_temp[1])
- double_info2_chr = double_info2_temp[0]
- double_info2_pos = int(double_info2_temp[1])
- if single_chr == double_info1_chr and abs(double_info1_pos-single_pos) <7:
- return True
- if single_chr == double_info2_chr and abs(double_info2_pos-single_pos) < 7:
- return True
-
- return False
-
- def CheckSingleFusionSeq(self,new_seq,fusion_seq):
- new_seq_rev="".join([self.BASE_DICT[base] for base in new_seq])[::-1]
- new_seq_len=len(new_seq)
- fusion_seq_len=len(fusion_seq)
- #new seq---fusion seq
- for index in range(new_seq_len,35,-1):
- new_seq_temp = new_seq[-1*index:]
- new_seq_temp_len = len(new_seq_temp)
- if new_seq_temp_len > fusion_seq_len:
- continue
- fusion_seq_temp = fusion_seq[:new_seq_temp_len]
- if self.SeqOverlap(fusion_seq_temp,new_seq_temp):
- return True
-
- #new seq rev comp--fusion seq
- for index in range(new_seq_len,35,-1):
- new_seq_temp = new_seq_rev[-1*index:]
- new_seq_temp_len = len(new_seq_temp)
- if new_seq_temp_len > fusion_seq_len:
- continue
- fusion_seq_temp = fusion_seq[:new_seq_temp_len]
- if self.SeqOverlap(fusion_seq_temp,new_seq_temp):
- return True
- #fusion seq -- new seq
- for index in range(fusion_seq_len,35,-1):
- fusion_seq_temp = fusion_seq[-1*index:]
- fusion_seq_temp_len = len(fusion_seq_temp)
- if fusion_seq_temp_len>new_seq_len:
- continue
- new_seq_temp = new_seq[:fusion_seq_temp_len]
- if self.SeqOverlap(fusion_seq_temp,new_seq_temp):
- return True
-
- #fusion seq -- new seq rev
- for index in range(fusion_seq_len,35,-1):
- fusion_seq_temp = fusion_seq[-1*index:]
- fusion_seq_temp_len = len(fusion_seq_temp)
- if fusion_seq_temp_len>new_seq_len:
- continue
- new_seq_temp = new_seq_rev[:fusion_seq_temp_len]
- if self.SeqOverlap(fusion_seq_temp,new_seq_temp):
- return True
-
- return False
-
- def GetSinglePrimer(self,seq,primer_seq_dict):
- primer=""
- if seq in primer_seq_dict.keys():
- primer = primer_seq_dict[seq][0]
- else:
- for item in primer_seq_dict.keys():
- if len(item) != len(seq):
- continue
- if self.SeqOverlap(seq,item):
- primer=primer_seq_dict[item][0]
- return primer
-
- def StatFPPrimer(self,readsIDFile,readsPrimer_dict):
- with open(readsIDFile,'r') as rf:
- header=rf.readline().strip("\n").split("\t")
- ReadsID_index=header.index("ReadsID")
- FP_index=header.index("FP_name")
- for line in rf:
- info = line.strip("\n").split("\t")
- readsPrimer_dict[info[ReadsID_index]] = info[FP_index]
-
- class StatDoubleReadsFusion(StatReadsFusion):
-
- def __init__(self):
- self.BASE_DICT={'A':'T','T':'A','G':'C','C':'G','N':'N','-':'-'}
- def StatPrimer(self,readsIDFile,readsPrimer_dict):
- self.StatFPPrimer(readsIDFile,readsPrimer_dict)
- def StatDouble(self,infile,readsID_dict,site_dict):
- seq_site_dict={}
- seq_readsID_dict={}
- seq_site_info={}
-
- with open(infile,'r') as inputF:
- header=inputF.readline().strip("\n").split("\t")
- seq_index=header.index("Seq")
- ReadsID_index=header.index("ReadsID")
- site_index=header.index("Site")
- Site1_Down_index=header.index("Site1_Down")
- Site2_Down_index=header.index("Site2_Down")
- Overlap_index=header.index("Overlap")
- for line in inputF:
- info = line.strip("\n").split("\t")
- seq = info[seq_index]
- ID = info[ReadsID_index]
- site = info[site_index]
- Site1_Down = info[Site1_Down_index]
- Site2_Down = info[Site2_Down_index]
- Overlap = info[Overlap_index]
- seq_rev_comp="".join([self.BASE_DICT[base] for base in seq][::-1])
- if seq == "":
- continue
- seq_site_info[site]=[Site1_Down,Site2_Down,Overlap]
- if seq not in seq_site_dict.keys() and seq_rev_comp not in seq_site_dict.keys():
- seq_site_dict[seq]=[site]
- seq_readsID_dict[seq]=[ID]
- elif seq_rev_comp in seq_site_dict.keys():
- site_temp=site.split("-")
- site=site_temp[1]+'-'+site_temp[0]
- seq_site_dict[seq_rev_comp].append(site)
- seq_readsID_dict[seq_rev_comp].append(ID)
- elif seq in seq_site_dict.keys():
- seq_site_dict[seq].append(site)
- seq_readsID_dict[seq].append(ID)
-
- #sort by count
- seq_count_dict={}
- for seq in seq_site_dict.keys():
- count = len(seq_site_dict[seq])
- seq_count_dict[seq]=count
- seq_count_sort= sorted(seq_count_dict.items(), key=lambda d:d[1], reverse = True)
- #combine seq
- total_seq_num=len(seq_count_sort)
- check_index=[]
- site_dict_temp={}
- for index in range(total_seq_num):
- if index not in check_index:
- key_seq=seq_count_sort[index][0]
- check_index.append(index)
- combine_seq_list=self.GetCombineSeq(key_seq,check_index,seq_count_sort)
- site_dict_temp[key_seq]=seq_site_dict[key_seq]
- seq_ID_list=seq_readsID_dict[key_seq]
- for seq in combine_seq_list:
- seq_ID_list.extend(seq_readsID_dict[seq])
- site_list=seq_site_dict[seq]
- for site_item in site_list:
- site_dict_temp[key_seq].append(site_item)
- readsID_dict[key_seq]=seq_ID_list
-
- #combine site
- for key_seq in site_dict_temp.keys():
- site_list=site_dict_temp[key_seq]
- temp = self.UniqSiteID(site_list)
- site=temp[0][0]
- site_rev_temp=site.split("-")
- site_rev=site_rev_temp[1]+'-'+site_rev_temp[0]
- site_temp=seq_site_info[site] if site in seq_site_info else seq_site_info[site_rev]
- site_dict[key_seq]=[site]
- for item in site_temp:
- site_dict[key_seq].append(item)
- class SingleReadsFusion(StatReadsFusion):
- def __init__(self,home_dict,info_dict,primer_seq_dict):
- self.home_dict=home_dict
- self.info_dict=info_dict
- self.primer_seq_dict=primer_seq_dict
- self.BASE_DICT={'A':'T','T':'A','G':'C','C':'G','N':'N','-':'-'}
-
- def StatPrimer(self,readsIDFile,readsPrimer_dict):
- self.StatFPPrimer(readsIDFile,readsPrimer_dict)
- def CheckSite(self,site,double_site):
- return self.CheckSingleSite(site,double_site)
- def CheckFusionSeq(self,new_seq,fusion_seq):
- return self.CheckSingleFusionSeq(new_seq,fusion_seq)
-
- def GetSingleCombine(self,info_list):
-
- mismatch_dict={}
- match_dict={}
- for item in info_list:
- mismatch_seq=item[1][::-1]
- self.SingleCombine(mismatch_dict,mismatch_seq)
- match_seq=item[2]
- self.SingleCombine(match_dict,match_seq)
-
- mismatch_seq=self.SingleCombineSeq(mismatch_dict)[::-1]
- match_seq=self.SingleCombineSeq(match_dict)
- return mismatch_seq,match_seq
- def GetSinglePoint(self,readsID,mismatch_seq,match_seq):
-
- readsID_info = readsID.split("_")
-
- if len(readsID_info)<5:
- return False,[]
- FP_primer = readsID_info[3]
- RP_primer = readsID_info[4]
- FP_primer_rev = "".join([self.BASE_DICT[base] for base in FP_primer])[::-1]
- #RP_primer_rev = "".join([self.BASE_DICT[base] for base in RP_primer])[::-1]
- if FP_primer in mismatch_seq or FP_primer_rev in mismatch_seq:
- primer_seq = FP_primer
- #elif RP_primer in mismatch_seq or RP_primer_rev in mismatch_seq:
- # primer_seq = RP_primer
- else:
- return False,[]
- primer_ID = self.GetSinglePrimer(primer_seq,self.primer_seq_dict)
-
- if primer_ID == "":
- return False,[]
- primer_list = self.info_dict[primer_ID]
- results=self.SinglePoint(primer_list,mismatch_seq,match_seq)
- final_point_list=[]
- if results[0]:
- temp=list(results)[1:]
- final_point_list.append(temp)
- #final_point_list.append(primer_ID)
-
- if primer_ID in self.home_dict.keys():
- primer_homo_list = self.home_dict[primer_ID]
- for ID in primer_homo_list:
- ID_list = self.info_dict[ID]
- results=self.SinglePoint(ID_list,mismatch_seq,match_seq)
- if results[0]:
- temp=list(results)[1:]
- final_point_list.append(temp)
- #final_point_list.append(ID)
- return True,final_point_list
|