123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- #!/usr/bin/python
- # -*- coding:utf-8 -*-
- import argparse,os,re,gzip,time,itertools,sys
- primer_dict_seq={}
- primer_dict_seq_check={}
- primer_dict1={}
- primer_dict2={}
- primer_dict3={}
-
- def get_primer_dict(primerFile):
- global primer_dict_seq
- global primer_dict1
- global primer_dict2
- global primer_dict3
- with open(primerFile,'r') as pf:
- header= pf.readline()
- for line in pf:
- seq=line.strip("\n").split("\t")[2].upper()
- primer_name = line.strip("\n").split("\t")[0]
- seq_check=line.strip("\n").split("\t")[1]
- seq_len = len(seq)
- primer_dict_seq[primer_name] = seq
- primer_dict_seq_check[primer_name] = seq_check[-1*(seq_len+8):]
- for i in [0,4,8]:
- tmp_seq = seq[i:i+4]
- if i %4 == 0:
- if tmp_seq not in primer_dict1.keys():
- primer_dict1[tmp_seq]={}
- primer_dict1[tmp_seq][primer_name]=seq
- else:
- if primer_name not in primer_dict1[tmp_seq].keys():
- primer_dict1[tmp_seq][primer_name]=seq
- elif i % 4 == 1:
- if tmp_seq not in primer_dict2.keys():
- previous_seq=seq[i-3:i]
- primer_dict2[tmp_seq]={}
- primer_dict2[tmp_seq][primer_name]=seq
- else:
- if primer_name not in primer_dict2[tmp_seq].keys():
- primer_dict2[tmp_seq][primer_name]=seq
- elif i %4 ==2:
- if tmp_seq not in primer_dict3.keys():
- primer_dict3[tmp_seq]={}
- primer_dict3[tmp_seq][primer_name]=seq
- else:
- if primer_name not in primer_dict3[tmp_seq].keys():
- primer_dict3[tmp_seq][primer_name]=seq
- def mis_match(seq1,seq2,num=0):
- count = 0
- len1=len(seq1)
- len2=len(seq2)
- if len1 != len2:
- return num+1
- for i in range(len1):
- if seq1[i] != seq2[i] and seq1[i] != "H":
- count +=1
- if count > num:
- return count
- return count
- def get_location(sequence):
- #4-base seq
- seq1=sequence[:4]
- seq2=sequence[4:8]
- seq3=sequence[8:12]
- primer_dict1_tmp={}
- if seq1 in primer_dict1.keys():
- primer_dict1_tmp=primer_dict1[seq1]
- primer_dict2_tmp={}
- if seq2 in primer_dict2.keys():
- primer_dict2_tmp=primer_dict2[seq2]
- primer_dict3_tmp={}
- for seq3 in primer_dict3.keys():
- primer_dict3_tmp=primer_dict3[seq3]
- max_len=0
- final_primer=""
- mismatch=0
- #seq1 correct
- for primer in primer_dict1_tmp.keys():
- primer_seq=primer_dict1_tmp[primer]
- candidate_len=len(primer_seq)
- if max_len <= candidate_len:
- sequence_seq=sequence[:candidate_len]
- mis_num=mis_match(sequence_seq,primer_seq,2)
- if mis_num <=2:
- if max_len < candidate_len:
- final_primer=primer
- max_len = candidate_len
- mismatch = mis_num
- elif max_len == candidate_len:
- if mismatch>mis_num:
- final_primer=primer
- max_len = candidate_len
- mismatch = mis_num
- #seq1 incorrect,seq2 correct
- primer_dict2_key=[item for item in primer_dict2_tmp.keys() if item not in primer_dict2_tmp.keys()]
- for primer in primer_dict2_key:
- primer_seq=primer_dict2_tmp[primer]
- candidate_len=len(primer_seq)
- if max_len <= candidate_len:
- sequence_seq=sequence[:candidate_len]
- mis_num=mis_match(sequence_seq,primer_seq,2)
- if mis_num <=2:
- if max_len < candidate_len:
- final_primer=primer
- max_len = candidate_len
- mismatch = mis_num
- elif max_len == candidate_len:
- if mismatch>mis_num:
- final_primer=primer
- max_len = candidate_len
- mismatch = mis_num
-
- #seq1 incorrect,seq2 incorrect,seq3 correct
- primer_dict3_key=[item for item in primer_dict3_tmp.keys() if (item not in primer_dict2_tmp.keys() and item not in primer_dict1_tmp.keys())]
- for primer in primer_dict3_key:
- primer_seq=primer_dict3_tmp[primer]
- candidate_len=len(primer_seq)
- if max_len <= candidate_len:
- sequence_seq=sequence[:candidate_len]
- mis_num=mis_match(sequence_seq,primer_seq,2)
- if mis_num <=2:
- if max_len < candidate_len:
- final_primer=primer
- max_len = candidate_len
- mismatch = mis_num
- elif max_len == candidate_len:
- if mismatch>mis_num:
- final_primer=primer
- max_len = candidate_len
- mismatch = mis_num
- return max_len,final_primer,sequence[:max_len]
- def reverse(seq):
- base_dict={'A':'T','G':'C','T':'A','C':'G','N':'N'}
- new_seq=[base_dict[base] for base in seq][::-1]
- return "".join(new_seq)
- def get_Insert_Size(primer,sequence):
- primer_length = len(primer)
- sequence_length = len(sequence)
- if primer_length == 0:
- return 0
- for i in range(sequence_length-primer_length):
- tmp=sequence[i:i+primer_length]
- if mis_match(tmp,primer,1)<=1:
- return i
- for i in range(primer_length-3,primer_length):
- tmp=sequence[:i]
- primer_tmp=primer[-1*i:]
- if mis_match(tmp,primer,1)<=1:
- return 0
- return sequence_length
- def main(primerFile,FileR1,FileR2,outfile):
- get_primer_dict(primerFile)
- outf=open(outfile,'w')
- outf.write("ReadsID\tRP_Start\tRP_End\tRP_Name\tRP_Seq\tFP_Start\tFP_End\tFP_name\tFP_seq\tDimer\n")
-
- R1outf_insert=open(outfile.replace('.txt','_primer_check_')+"R1.fastq",'w')
- R2outf_insert=open(outfile.replace('.txt','_primer_check_')+"R2.fastq",'w')
- with gzip.open(FileR1,'r') as R1, gzip.open(FileR2,'r') as R2:
- while True:
- try:
- linesR1 = itertools.islice(R1,4)
- IDR1 = next(linesR1).decode('utf-8').strip("\n")
- IDR1tmp=IDR1.split(" ")
- ID=IDR1tmp[0][1:]
-
- sequenceR1 = next(linesR1).decode('utf-8').strip("\n")
- tmp1R1=next(linesR1).decode('utf-8').strip("\n")
- tmp2R1=next(linesR1).decode('utf-8').strip("\n")
- linesR2 = itertools.islice(R2,4)
- IDR2 = next(linesR2).decode('utf-8').strip("\n")
- IDR2tmp=IDR2.split(" ")
- sequenceR2 = next(linesR2).decode('utf-8').strip("\n")
- tmp1R2=next(linesR2).decode('utf-8').strip("\n")
- tmp2R2=next(linesR2).decode('utf-8').strip("\n")
- UMI = sequenceR2[4:19]
- RP_location=get_location(sequenceR1)
- FP_location=get_location(sequenceR2[19:])
- FP_start=20
- RP_start=1
-
- #check R2 primer
- if FP_location[1] != "":
- sequence_seq=sequenceR2[FP_start-9:FP_start-1+FP_location[0]]
- primer_seq=primer_dict_seq_check[FP_location[1]]
- if mis_match(primer_seq,sequence_seq,2)>2:
- FP_location=0,"",""
- out_str = ID+"\t"+str(RP_start)+"\t"+str(RP_start-1+int(RP_location[0]))+"\t"+RP_location[1]+"\t"+RP_location[2]+"\t"+str(FP_start)+"\t"+str(FP_start-1+int(FP_location[0]))+"\t"+FP_location[1]+"\t"+FP_location[2]
- if "G" not in UMI:
- trim_R1="@"+ID+"_UMI_"+UMI+" "+IDR1tmp[-1]+"\n"+sequenceR1[RP_start-1:]+"\n"+tmp1R1+"\n"+tmp2R1[RP_start-1:]
- trim_R2="@"+ID+"_UMI_"+UMI+" "+IDR2tmp[-1]+"\n"+sequenceR2[FP_start-1:]+"\n"+tmp1R2+"\n"+tmp2R2[FP_start-1:]
- else:
- trim_R1="@"+ID+" "+IDR1tmp[-1]+"\n"+sequenceR1[RP_start-1:]+"\n"+tmp1R1+"\n"+tmp2R1[RP_start-1:]
- trim_R2="@"+ID+" "+IDR2tmp[-1]+"\n"+sequenceR2[FP_start-1:]+"\n"+tmp1R2+"\n"+tmp2R2[FP_start-1:]
- #no primer
- if RP_location[1] == "" and FP_location[1] =="":
- R1outf_insert.write(trim_R1+"\n")
- R2outf_insert.write(trim_R2+"\n")
- outf.write(out_str+"\tno_primer"+"\n")
- else:
- #two primer
- if RP_location[1] !="" and FP_location[1] !="":
- rev_seq1=reverse(FP_location[2])
- Insert_Size = get_Insert_Size(rev_seq1,sequenceR1)
- if Insert_Size > int(RP_location[0]):
- RP_primer=RP_location[-1]
- FP_primer=FP_location[-1]
- trim_R1="@"+ID+"_UMI_"+UMI+"_"+RP_primer+"_"+FP_primer+" "+IDR1tmp[-1]+"\n"+sequenceR1[RP_start-1:]+"\n"+tmp1R1+"\n"+tmp2R1[RP_start-1:]
- trim_R2="@"+ID+"_UMI_"+UMI+"_"+RP_primer+"_"+FP_primer+" "+IDR2tmp[-1]+"\n"+sequenceR2[FP_start-1:]+"\n"+tmp1R2+"\n"+tmp2R2[FP_start-1:]
- R1outf_insert.write(trim_R1+"\n")
- R2outf_insert.write(trim_R2+"\n")
- outf.write(out_str+"\tothers"+"\n")
- else:
- outf.write(out_str+"\tdimer"+"\n")
- #only one primer
- else:
- R1outf_insert.write(trim_R1+"\n")
- R2outf_insert.write(trim_R2+"\n")
- outf.write(out_str+"\tone_primer"+"\n")
- except:
- err = sys.exc_info()
- for e in err:
- print(e)
- break
- outf.close()
- R1outf_insert.close()
- R2outf_insert.close()
- if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='get primer location')
- parser.add_argument('-p', required=True, type=str, help="primer file")
- parser.add_argument('-R1', required=True, type=str, help="FileR1")
- parser.add_argument('-R2', required=True, type=str, help="FileR2")
- parser.add_argument('-o', required=True, type=str, help="outfile")
- args = parser.parse_args()
- main(args.p, args.R1, args.R2, args.o)
|