dptr_label_encode.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. import re
  2. from abc import ABC, abstractmethod
  3. from itertools import groupby
  4. from typing import List, Optional, Tuple
  5. import numpy as np
  6. import torch
  7. from torch import Tensor
  8. from torch.nn.utils.rnn import pad_sequence
  9. import unicodedata
  10. from ..modeling.decoders.dptr_parseq_clip_b_decoder import tokenize
  11. class CharsetAdapter:
  12. """Transforms labels according to the target charset."""
  13. def __init__(self, target_charset) -> None:
  14. super().__init__()
  15. self.lowercase_only = target_charset == target_charset.lower()
  16. self.uppercase_only = target_charset == target_charset.upper()
  17. self.unsupported = re.compile(f'[^{re.escape(target_charset)}]')
  18. def __call__(self, label):
  19. if self.lowercase_only:
  20. label = label.lower()
  21. elif self.uppercase_only:
  22. label = label.upper()
  23. # Remove unsupported characters
  24. label = self.unsupported.sub('', label)
  25. return label
  26. class BaseTokenizer(ABC):
  27. # eos=0, a=1, bos=37, pad=38
  28. def __init__(self, charset: str, specials_first: tuple = (), specials_last: tuple = ()) -> None:
  29. self._itos = specials_first + tuple(charset) + specials_last
  30. self._stoi = {s: i for i, s in enumerate(self._itos)}
  31. # print("stoi:", self._stoi)
  32. def __len__(self):
  33. return len(self._itos)
  34. def _tok2ids(self, tokens: str) -> List[int]:
  35. # print("tokens", tokens)
  36. return [self._stoi[s] for s in tokens]
  37. def _ids2tok(self, token_ids: List[int], join: bool = True) -> str:
  38. tokens = [self._itos[i] for i in token_ids]
  39. return ''.join(tokens) if join else tokens
  40. @abstractmethod
  41. def encode(self, labels: List[str], device: Optional[torch.device] = None) -> Tensor:
  42. """Encode a batch of labels to a representation suitable for the model.
  43. Args:
  44. labels: List of labels. Each can be of arbitrary length.
  45. device: Create tensor on this device.
  46. Returns:
  47. Batched tensor representation padded to the max label length. Shape: N, L
  48. """
  49. raise NotImplementedError
  50. @abstractmethod
  51. def _filter(self, probs: Tensor, ids: Tensor) -> Tuple[Tensor, List[int]]:
  52. """Internal method which performs the necessary filtering prior to decoding."""
  53. raise NotImplementedError
  54. def decode(self, token_dists: Tensor, raw: bool = False) -> Tuple[List[str], List[Tensor]]:
  55. """Decode a batch of token distributions.
  56. Args:
  57. token_dists: softmax probabilities over the token distribution. Shape: N, L, C
  58. raw: return unprocessed labels (will return list of list of strings)
  59. Returns:
  60. list of string labels (arbitrary length) and
  61. their corresponding sequence probabilities as a list of Tensors
  62. """
  63. batch_tokens = []
  64. batch_probs = []
  65. for dist in token_dists:
  66. probs, ids = dist.max(-1) # greedy selection
  67. if not raw:
  68. probs, ids = self._filter(probs, ids)
  69. tokens = self._ids2tok(ids, not raw)
  70. batch_tokens.append(tokens)
  71. batch_probs.append(probs)
  72. return batch_tokens, batch_probs
  73. class Tokenizer(BaseTokenizer):
  74. BOS = '[B]'
  75. EOS = '[E]'
  76. PAD = '[P]'
  77. def __init__(self, charset: str) -> None:
  78. specials_first = (self.EOS,)
  79. specials_last = (self.BOS, self.PAD)
  80. super().__init__(charset, specials_first, specials_last)
  81. self.eos_id, self.bos_id, self.pad_id = [self._stoi[s] for s in specials_first + specials_last]
  82. def encode(self, labels: List[str], device: Optional[torch.device] = None) -> Tensor:
  83. batch = [self.bos_id] + self._tok2ids(labels) + [self.eos_id]
  84. return batch
  85. # return pad_sequence(batch, batch_first=True, padding_value=self.pad_id)
  86. def _filter(self, probs: Tensor, ids: Tensor) -> Tuple[Tensor, List[int]]:
  87. ids = ids.tolist()
  88. try:
  89. eos_idx = ids.index(self.eos_id)
  90. except ValueError:
  91. eos_idx = len(ids) # Nothing to truncate.
  92. # Truncate after EOS
  93. ids = ids[:eos_idx]
  94. probs = probs[:eos_idx + 1] # but include prob. for EOS (if it exists)
  95. return probs, ids
  96. class DPTRLabelEncode(Tokenizer):
  97. """Convert between text-label and text-index."""
  98. def __init__(self, max_text_length=25, character_dict_path=None, **kwargs):
  99. self.max_length = max_text_length
  100. charset = get_alpha(character_dict_path)
  101. charset = ''.join(charset)
  102. # print(charset)
  103. super(DPTRLabelEncode, self).__init__(charset)
  104. def __call__(self, data, normalize_unicode=True):
  105. text = data['label']
  106. if normalize_unicode:
  107. text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode()
  108. text = ''.join(text.split())
  109. if len(text) == 0 or len(text) > self.max_length:
  110. return None
  111. text_ids = self.encode(text)
  112. clip_ids = tokenize(f"a photo of a '{text}'")
  113. text_ids = text_ids + [self.pad_id] * (self.max_length + 2 - len(text_ids))
  114. # print(text, len(text_ids), len(clip_ids[0]))
  115. data['clip_label'] = np.array(clip_ids[0])
  116. data['label'] = np.array(text_ids)
  117. return data
  118. def add_special_char(self, dict_character):
  119. dict_character = [self.EOS] + dict_character + [self.BOS, self.PAD]
  120. return dict_character
  121. def get_alpha(alpha_path):
  122. character_str = []
  123. with open(alpha_path, 'rb') as fin:
  124. lines = fin.readlines()
  125. for line in lines:
  126. line = line.decode('utf-8').strip('\n').strip('\r\n')
  127. character_str.append(line)
  128. dict_character = list(character_str)
  129. if 'arabic' in alpha_path:
  130. reverse = True
  131. return dict_character