variants.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. """
  2. Methods for manipulating genetic variants.
  3. """
  4. from __future__ import absolute_import
  5. from __future__ import unicode_literals
  6. import sys
  7. sys.path.append(r"/cgdata/liuxiangqiong/work62pancancer/Client/v0/script/20220705/pyhgvs")
  8. from models import Position
  9. _COMP = dict(A='T', C='G', G='C', T='A', N='N',
  10. a='t', c='g', g='c', t='a', n='n')
  11. def revcomp(seq):
  12. """Reverse complement."""
  13. return ''.join(_COMP[base] for base in reversed(seq))
  14. def get_sequence(genome, chrom, start, end, is_forward_strand=True):
  15. """Return a sequence for the genomic region.
  16. Coordinates are 0-based, end-exclusive.
  17. """
  18. # Prevent fetching negative coordinates.
  19. start = max(start, 0)
  20. if start >= end:
  21. return ''
  22. else:
  23. seq = genome[str(chrom)][start:end]
  24. if not is_forward_strand:
  25. seq = -seq
  26. return str(seq).upper()
  27. def get_sequence_from_position(genome, position):
  28. """Return a sequence for the genomic region
  29. Position is 0-based, end-exclusive.
  30. """
  31. return get_sequence(genome, position.chrom,
  32. position.chrom_start, position.chrom_stop,
  33. position.is_forward_strand)
  34. def justify_indel(start, end, indel, seq, justify):
  35. """
  36. Justify an indel to the left or right along a sequence 'seq'.
  37. start, end: 0-based, end-exclusive coordinates of 'indel' within the
  38. sequence 'seq'. Inserts denote the insertion point using start=end
  39. and deletions indicate the deleted region with (start,end).
  40. indel: indel sequence, can be insertion or deletion.
  41. seq: a larger sequence containing the indel. Can be a fragment from the
  42. genome.
  43. justify: Which direction to justify the indel ('left', 'right').
  44. """
  45. # No justification needed for empty indel.
  46. if len(indel) == 0:
  47. return start, end, indel
  48. if justify == 'left':
  49. while start > 0 and seq[start - 1] == indel[-1]:
  50. seq_added = seq[start - 1]
  51. indel = seq_added + indel[:-1]
  52. start -= 1
  53. end -= 1
  54. elif justify == 'right':
  55. while end < len(seq) and seq[end] == indel[0]:
  56. seq_added = seq[end]
  57. indel = indel[1:] + seq_added
  58. start += 1
  59. end += 1
  60. else:
  61. raise ValueError('unknown justify "%s"' % justify)
  62. return start, end, indel
  63. def justify_genomic_indel(genome, chrom, start, end, indel, justify,
  64. flank_length=20):
  65. """
  66. start, end: 0-based, end-exclusive coordinates of 'indel'.
  67. """
  68. ref_len = end - start
  69. while True:
  70. seq_start = max(start - flank_length, 0)
  71. indel_len = len(indel)
  72. fetch_len = indel_len + 2 * flank_length
  73. seq = get_sequence(
  74. genome, chrom, seq_start, seq_start + fetch_len)
  75. seq_end = seq_start + len(seq)
  76. if seq_end <= end and justify == 'right':
  77. # Indel is at end of chromosome, cannot justify right any further.
  78. return start, end, indel
  79. chrom_end = seq_end if seq_end < seq_start + fetch_len else 1e100
  80. # Get coordinates of indel within seq.
  81. indel_start = flank_length
  82. indel_end = flank_length + indel_len
  83. indel_start, indel_end, indel = justify_indel(
  84. indel_start, indel_end, indel, seq, justify)
  85. # Get indel coordinates with chrom.
  86. start = seq_start + indel_start
  87. end = start + ref_len
  88. if ((indel_start > 0 or seq_start == 0) and
  89. (indel_end < len(seq) or seq_end == chrom_end)):
  90. return start, end, indel
  91. # Since indel was justified to edge of seq, see if more justification
  92. # can be done.
  93. def normalize_variant(chrom, offset, ref_sequence, alt_sequences, genome,
  94. justify='left', flank_length=30):
  95. """
  96. Normalize variant according to the GATK/VCF standard.
  97. chrom: chromsome containing variant.
  98. offset: 1-based coordinate of reference allele in the genome.
  99. ref_sequence: reference allele.
  100. alt_sequences: list of all alternate sequences.
  101. genome: pygr-compatiable genome object.
  102. """
  103. start = offset - 1
  104. end = start + len(ref_sequence)
  105. position = Position(
  106. chrom=chrom,
  107. chrom_start=start,
  108. chrom_stop=end,
  109. is_forward_strand=True)
  110. return NormalizedVariant(position, ref_sequence, alt_sequences,
  111. genome=genome, justify=justify)
  112. class NormalizedVariant(object):
  113. """
  114. Normalizes variant representation to match GATK/VCF.
  115. """
  116. def __init__(self, position, ref_allele, alt_alleles,
  117. seq_5p='', seq_3p='', genome=None, justify='left'):
  118. """
  119. position: a 0-index genomic Position.
  120. ref_allele: the reference allele sequence.
  121. alt_alleles: a list of alternate allele sequences.
  122. seq_5p: 5 prime flanking sequence of variant.
  123. seq_3p: 3 prime flanking sequence of variant.
  124. genome: a pygr compatible genome object (optional).
  125. """
  126. self.position = position
  127. self.alleles = [ref_allele] + list(alt_alleles)
  128. self.seq_5p = seq_5p
  129. self.seq_3p = seq_3p
  130. self.genome = genome
  131. self.log = []
  132. self._on_forward_strand()
  133. self._trim_common_prefix()
  134. self._trim_common_suffix()
  135. self._align(justify)
  136. self._1bp_pad()
  137. self._set_1based_position()
  138. def _on_forward_strand(self):
  139. """
  140. Ensure variant is on forward strand.
  141. """
  142. if not self.position.is_forward_strand:
  143. self.log.append('flip strand')
  144. seq_5p = self.seq_5p
  145. seq_3p = self.seq_3p
  146. self.seq_5p = revcomp(seq_3p)
  147. self.seq_3p = revcomp(seq_5p)
  148. self.alleles = [revcomp(allele) for allele in self.alleles]
  149. def _trim_common_prefix(self):
  150. """
  151. Trim the common prefix amongst all alleles.
  152. """
  153. minlength = min(map(len, self.alleles))
  154. common_prefix = 0
  155. for i in range(minlength):
  156. if len(set(allele[i] for allele in self.alleles)) > 1:
  157. # Not all alleles match at this site, so common prefix ends.
  158. break
  159. common_prefix = i + 1
  160. # Remove common prefix from all alleles.
  161. if common_prefix:
  162. self.log.append('trim common prefix')
  163. self.position.chrom_start += common_prefix
  164. self.seq_5p += self.alleles[0][:common_prefix]
  165. for i, allele in enumerate(self.alleles):
  166. self.alleles[i] = allele[common_prefix:]
  167. def _trim_common_suffix(self):
  168. """
  169. Trim the common suffix amongst all alleles.
  170. """
  171. minlength = min(map(len, self.alleles))
  172. common_suffix = 0
  173. for i in range(1, minlength+1):
  174. if len(set(allele[-i] for allele in self.alleles)) > 1:
  175. # Not all alleles match at this site, so common suffix ends.
  176. break
  177. common_suffix = i
  178. # Remove common prefix from all alleles.
  179. if common_suffix:
  180. self.log.append('trim common suffix')
  181. self.position.chrom_stop -= common_suffix
  182. self.seq_3p = self.alleles[0][-common_suffix:] + self.seq_3p
  183. for i, allele in enumerate(self.alleles):
  184. self.alleles[i] = allele[:-common_suffix]
  185. def _align(self, justify):
  186. """
  187. Align variant as far to the left or right as possible.
  188. """
  189. # Aligning only makes sense for INDELs.
  190. if self.molecular_class != "INDEL":
  191. return
  192. # Identify the inserted or deleted sequence.
  193. alleles_with_seq = [i for i, allele in enumerate(self.alleles)
  194. if allele]
  195. # Can only left-align biallelic, non ins-plus-del indels.
  196. if len(alleles_with_seq) == 1:
  197. i = alleles_with_seq[0]
  198. allele = self.alleles[i]
  199. if self.genome:
  200. start, end, allele = justify_genomic_indel(
  201. self.genome, self.position.chrom,
  202. self.position.chrom_start, self.position.chrom_stop,
  203. allele, justify)
  204. # if right-aligning an insertion, insert at the end
  205. if justify == 'right' and i != 0:
  206. start += len(allele)
  207. end += len(allele)
  208. self.position.chrom_start = start
  209. self.position.chrom_stop = end
  210. flank_length = 30
  211. self.seq_5p = get_sequence(self.genome, self.position.chrom,
  212. start - flank_length, start)
  213. self.seq_3p = get_sequence(self.genome, self.position.chrom,
  214. end, end + flank_length)
  215. self.alleles[i] = allele
  216. else:
  217. offset = len(self.seq_5p)
  218. offset2, _, allele = justify_indel(
  219. offset, offset, allele, self.seq_5p, justify)
  220. delta = offset - offset2
  221. if delta > 0:
  222. self.position.chrom_start -= delta
  223. self.position.chrom_stop -= delta
  224. self.seq_5p = self.seq_5p[:-delta]
  225. seq = self.ref_allele + self.seq_3p
  226. self.seq_3p = seq[:delta] + self.seq_3p
  227. self.alleles[i] = allele
  228. def _1bp_pad(self):
  229. """
  230. Ensure no alleles are the empty string by padding to the left 1bp.
  231. """
  232. # Padding is only required for INDELs.
  233. if self.molecular_class != "INDEL":
  234. return
  235. # Pad sequences with one 5-prime base before the mutation event.
  236. empty_seq = any(not allele for allele in self.alleles)
  237. uniq_starts = set(allele[0] for allele in self.alleles if allele)
  238. if empty_seq or len(uniq_starts) > 1:
  239. # Fetch more 5p flanking sequence if needed.
  240. if self.genome and self.seq_5p == '':
  241. start = self.position.chrom_start
  242. self.seq_5p = get_sequence(
  243. self.genome, self.position.chrom, start - 5, start)
  244. self.log.append('1bp pad')
  245. if self.seq_5p:
  246. for i, allele in enumerate(self.alleles):
  247. self.alleles[i] = self.seq_5p[-1] + self.alleles[i]
  248. self.seq_5p = self.seq_5p[:-1]
  249. self.position.chrom_start -= 1
  250. else:
  251. # According to VCF standard, if there is no 5prime sequence,
  252. # use 3prime sequence instead.
  253. assert self.seq_3p
  254. for i, allele in enumerate(self.alleles):
  255. self.alleles[i] = self.alleles[i] + self.seq_3p[0]
  256. self.seq_3p = self.seq_3p[1:]
  257. self.position.chrom_stop += 1
  258. if len(set(a[0] for a in self.alleles)) != 1:
  259. raise AssertionError(
  260. "All INDEL alleles should start with same base.")
  261. def _set_1based_position(self):
  262. """
  263. Convert to 1-based end-inclusive coordinates.
  264. """
  265. self.position.chrom_start += 1
  266. @property
  267. def molecular_class(self):
  268. for allele in self.alleles:
  269. if len(allele) != 1:
  270. return 'INDEL'
  271. return 'SNP'
  272. @property
  273. def ref_allele(self):
  274. return self.alleles[0]
  275. @property
  276. def alt_alleles(self):
  277. return sorted(self.alleles[1:])
  278. @property
  279. def variant(self):
  280. return (self.position.chrom, self.position.chrom_start,
  281. self.ref_allele, self.alt_alleles)