# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This code is refer from:
https://github.com/WenmuZhou/DBNet.pytorch/blob/master/data_loader/modules/iaa_augment.py
"""
import os

# Prevent automatic updates in Albumentations for stability in augmentation
# behavior. This must be set before albumentations is imported.
os.environ['NO_ALBUMENTATIONS_UPDATE'] = '1'

import numpy as np
import albumentations as A
from albumentations.core.transforms_interface import DualTransform
from albumentations.augmentations.geometric import functional as fgeometric
from packaging import version

ALBU_VERSION = version.parse(A.__version__)
# From this version on, ``fgeometric.resize`` is called with a single
# ``(height, width)`` tuple instead of two separate positional arguments.
IS_ALBU_NEW_VERSION = ALBU_VERSION >= version.parse('1.4.15')


class ImgaugLikeResize(DualTransform):
    """Resize an image (and its keypoints) by a random scale factor.

    Mimics the behavior of imgaug's ``Resize`` when given a scale range: one
    factor is drawn uniformly from ``scale_range`` per call and applied to
    both image dimensions and to the x/y coordinates of every keypoint.

    Args:
        scale_range: ``(min_scale, max_scale)`` bounds of the uniform draw.
        interpolation: Interpolation flag forwarded to the resize function.
        p: Probability of applying the transform.
    """

    def __init__(self, scale_range=(0.5, 3.0), interpolation=1, p=1.0):
        # Pass ``p`` by keyword: older Albumentations releases declare
        # ``always_apply`` as the first positional parameter of the base
        # ``__init__``, so a positional ``p`` would be misread as that flag.
        super(ImgaugLikeResize, self).__init__(p=p)
        self.scale_range = scale_range
        self.interpolation = interpolation

    def apply(self, img, scale=1.0, **params):
        """Return ``img`` resized by ``scale`` along both axes."""
        height, width = img.shape[:2]
        # Clamp to one pixel so tiny images or small scales never produce an
        # empty target size, which the resize backend rejects.
        new_height = max(1, int(height * scale))
        new_width = max(1, int(width * scale))

        if IS_ALBU_NEW_VERSION:
            return fgeometric.resize(
                img, (new_height, new_width),
                interpolation=self.interpolation)
        return fgeometric.resize(
            img, new_height, new_width, interpolation=self.interpolation)

    def apply_to_keypoints(self, keypoints, scale=1.0, **params):
        """Scale the x/y of each keypoint, keeping any trailing fields."""
        return np.array([(x * scale, y * scale) + tuple(rest)
                         for x, y, *rest in keypoints])

    def get_params(self):
        """Draw the scale factor shared by the image and its keypoints."""
        scale = np.random.uniform(self.scale_range[0], self.scale_range[1])
        return {'scale': scale}


class AugmenterBuilder(object):
    """Translate imgaug-style augmenter configs into Albumentations objects."""

    def __init__(self):
        # Map common Imgaug transformations to equivalent Albumentations
        # transforms. Names that are not listed are looked up unchanged.
        self.imgaug_to_albu = {
            'Fliplr': 'HorizontalFlip',
            'Flipud': 'VerticalFlip',
            'Affine': 'Affine',
            # Additional mappings can be added here if needed
        }

    def build(self, args, root=True):
        """Recursively build an augmentation pipeline from ``args``.

        Args:
            args: ``None``/empty (no augmentation); a list of augmenter specs
                at the root level; a ``[type, args]`` list at a nested level;
                or a ``{'type': ..., 'args': ...}`` dict.
            root: Whether ``args`` is the top-level list of augmenters.

        Returns:
            ``None`` for empty input, an ``A.Compose`` for a root-level list,
            otherwise a single transform instance.

        Raises:
            RuntimeError: If ``args`` is not ``None``, a list or a dict.
        """
        # ``hasattr`` guard: unsized inputs (e.g. an int) must fall through
        # to the RuntimeError below instead of failing inside ``len``.
        if args is None or (hasattr(args, '__len__') and len(args) == 0):
            return None

        if isinstance(args, list):
            if root:
                sequence = [self.build(value, root=False) for value in args]
                return A.Compose(
                    sequence,
                    keypoint_params=A.KeypointParams(
                        format='xy', remove_invisible=False),
                )
            # Nested list form: [type] or [type, args].
            augmenter_type = args[0]
            augmenter_args = args[1] if len(args) > 1 else {}
            return self._build_single(augmenter_type, augmenter_args)

        if isinstance(args, dict):
            # Dict form: {'type': ..., 'args': {...}}.
            return self._build_single(args['type'], args.get('args', {}))

        raise RuntimeError('Unknown augmenter arg: ' + str(args))

    def _build_single(self, augmenter_type, augmenter_args):
        """Instantiate one transform from its imgaug-style name and args."""
        augmenter_args_mapped = self.map_arguments(augmenter_type,
                                                   augmenter_args)
        augmenter_type_mapped = self.imgaug_to_albu.get(augmenter_type,
                                                        augmenter_type)
        if augmenter_type_mapped == 'Resize':
            return ImgaugLikeResize(**augmenter_args_mapped)

        cls = getattr(A, augmenter_type_mapped)
        return cls(**{
            k: self.to_tuple_if_list(v)
            for k, v in augmenter_args_mapped.items()
        })

    def map_arguments(self, augmenter_type, augmenter_args):
        """Map imgaug-style arguments to the form each transform expects.

        Args:
            augmenter_type: The imgaug-style augmenter name (before mapping).
            augmenter_args: Dict of arguments for that augmenter.

        Returns:
            A new dict of keyword arguments; the input dict is not modified.

        Raises:
            ValueError: If a ``Resize`` ``size`` is not a 2-element
                list/tuple.
        """
        # Avoid modifying the original arguments
        augmenter_args = augmenter_args.copy()

        if augmenter_type == 'Resize':
            # Only ``size`` is read for Resize; it is interpreted as a
            # (min_scale, max_scale) range.
            size = augmenter_args.get('size')
            if size:
                if not isinstance(size, (list, tuple)) or len(size) != 2:
                    raise ValueError(
                        f"'size' must be a list or tuple of two numbers, but got {size}"
                    )
                min_scale, max_scale = size
                return {
                    'scale_range': (min_scale, max_scale),
                    'interpolation': 1,  # Linear interpolation
                    'p': 1.0,
                }
            # No size given: identity scale.
            return {'scale_range': (1.0, 1.0), 'interpolation': 1, 'p': 1.0}

        if augmenter_type == 'Affine':
            # Map rotation to a tuple and ensure p=1.0 to apply transformation
            rotate = augmenter_args.get('rotate', 0)
            if isinstance(rotate, list):
                rotate = tuple(rotate)
            elif isinstance(rotate, (int, float)):
                rotate = (float(rotate), float(rotate))
            augmenter_args['rotate'] = rotate
            augmenter_args['p'] = 1.0
            return augmenter_args

        # For other augmenters, ensure 'p' probability is specified
        augmenter_args['p'] = augmenter_args.get('p', 1.0)
        return augmenter_args

    def to_tuple_if_list(self, obj):
        """Return ``obj`` as a tuple if it is a list, otherwise unchanged."""
        if isinstance(obj, list):
            return tuple(obj)
        return obj


class IaaAugment:
    """Apply imgaug-style augmentation to an image and its polygons.

    Args:
        augmenter_args: List of augmenter specs understood by
            ``AugmenterBuilder.build``. When ``None``, a default pipeline of
            horizontal flip, rotation and random rescale is used.
        **kwargs: Accepted and ignored.
    """

    def __init__(self, augmenter_args=None, **kwargs):
        if augmenter_args is None:
            # Default augmenters if none are specified
            augmenter_args = [
                {
                    'type': 'Fliplr',
                    'args': {
                        'p': 0.5
                    }
                },
                {
                    'type': 'Affine',
                    'args': {
                        'rotate': [-10, 10]
                    }
                },
                {
                    'type': 'Resize',
                    'args': {
                        'size': [0.5, 3]
                    }
                },
            ]
        self.augmenter = AugmenterBuilder().build(augmenter_args)

    def __call__(self, data):
        """Augment ``data['image']`` and ``data['polys']`` in place.

        Args:
            data: Dict with an ``'image'`` entry and a ``'polys'`` entry
                holding an iterable of polygons (each an iterable of points).

        Returns:
            The same ``data`` dict, with ``'image'`` and ``'polys'`` replaced
            by their transformed versions when an augmenter is configured.
        """
        image = data['image']

        if self.augmenter:
            # Flatten polygons to individual keypoints for transformation,
            # remembering each polygon's point count to rebuild it afterwards.
            keypoints = []
            keypoints_lengths = []
            for poly in data['polys']:
                keypoints.extend([tuple(point) for point in poly])
                keypoints_lengths.append(len(poly))

            # Apply the augmentation pipeline to image and keypoints
            transformed = self.augmenter(image=image, keypoints=keypoints)
            data['image'] = transformed['image']
            transformed_keypoints = transformed['keypoints']

            # Reassemble polygons from transformed keypoints
            new_polys = []
            idx = 0
            for length in keypoints_lengths:
                new_poly = transformed_keypoints[idx:idx + length]
                new_polys.append(np.array([kp[:2] for kp in new_poly]))
                idx += length

            if len(set(keypoints_lengths)) <= 1:
                # Uniform point counts stack into one regular array.
                data['polys'] = np.array(new_polys)
            else:
                # Polygons with differing point counts cannot be stacked;
                # recent NumPy raises on implicit ragged arrays, so build an
                # object array of per-polygon arrays explicitly.
                ragged = np.empty(len(new_polys), dtype=object)
                for i, poly in enumerate(new_polys):
                    ragged[i] = poly
                data['polys'] = ragged

        return data