```python
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
from torchvision import utils as vutils

"""Training"""
# Load the model
model = YOLO("yolov8s-seg.pt")  # load a pretrained model (recommended for training)
# model = YOLO(r"D:\PC_DeepLearing\yolov8\runs\segment\train_resize2560\weights\best.pt")  # resume from a previous best checkpoint
# Use the model
model.train(data="turntable.yaml", cfg="default.yaml")  # train the model
```
The training call steps into
D:\PC_DeepLearing\yolov8\ultralytics\engine\model.py
where we find the following code:
self.trainer.train()
Execution then steps into
D:\PC_DeepLearing\yolov8\ultralytics\engine\trainer.py
where we find:
self._do_train(world_size)
Inside it we can see:
self._setup_train(world_size)  # reads and loads the training data
```python
def _do_train(self, world_size=1):
    """Train completed, evaluate and plot if specified by arguments."""
    if world_size > 1:
        self._setup_ddp(world_size)
    self._setup_train(world_size)  # read and load the training data

    self.epoch_time = None
    self.epoch_time_start = time.time()
    self.train_time_start = time.time()
```
Next, look at how data loading is set up in _setup_train(self, world_size):
```python
def _setup_train(self, world_size):
    """Builds dataloaders and optimizer on correct rank process."""

    # Model
    self.run_callbacks('on_pretrain_routine_start')
    ckpt = self.setup_model()
    self.model = self.model.to(self.device)
    self.set_model_attributes()

    # Freeze layers: loop over the named parameters and freeze the matching ones
    freeze_list = self.args.freeze if isinstance(
        self.args.freeze, list) else range(self.args.freeze) if isinstance(self.args.freeze, int) else []
    always_freeze_names = ['.dfl']  # always freeze these layers
    freeze_layer_names = [f'model.{x}.' for x in freeze_list] + always_freeze_names
    for k, v in self.model.named_parameters():
        # v.register_hook(lambda x: torch.nan_to_num(x))  # NaN to 0 (commented for erratic training results)
        if any(x in k for x in freeze_layer_names):
            LOGGER.info(f"Freezing layer '{k}'")
            v.requires_grad = False
        elif not v.requires_grad:
            LOGGER.info(f"WARNING ⚠️ setting 'requires_grad=True' for frozen layer '{k}'. "
                        'See ultralytics.engine.trainer for customization of frozen layers.')
            v.requires_grad = True

    # Check AMP
    self.amp = torch.tensor(self.args.amp).to(self.device)  # True or False
    if self.amp and RANK in (-1, 0):  # Single-GPU and DDP: check which mode we are training in
        callbacks_backup = callbacks.default_callbacks.copy()  # backup callbacks as check_amp() resets them
        self.amp = torch.tensor(check_amp(self.model), device=self.device)
        callbacks.default_callbacks = callbacks_backup  # restore callbacks
    if RANK > -1 and world_size > 1:  # DDP
        dist.broadcast(self.amp, src=0)  # broadcast the tensor from rank 0 to all other ranks (returns None)
    self.amp = bool(self.amp)  # as boolean
    self.scaler = torch.cuda.amp.GradScaler(enabled=self.amp)
    if world_size > 1:
        self.model = nn.parallel.DistributedDataParallel(self.model, device_ids=[RANK])

    # Check imgsz (image size)
    gs = max(int(self.model.stride.max() if hasattr(self.model, 'stride') else 32), 32)  # grid size (max stride)
    self.args.imgsz = check_imgsz(self.args.imgsz, stride=gs, floor=gs, max_dim=1)

    # Batch size
    if self.batch_size == -1 and RANK == -1:  # single-GPU only, estimate best batch size
        self.args.batch = self.batch_size = check_train_batch_size(self.model, self.args.imgsz, self.amp)

    # Dataloaders
    batch_size = self.batch_size // max(world_size, 1)
    self.train_loader = self.get_dataloader(self.trainset, batch_size=batch_size, rank=RANK, mode='train')
    if RANK in (-1, 0):
        self.test_loader = self.get_dataloader(self.testset, batch_size=batch_size * 2, rank=-1, mode='val')
        self.validator = self.get_validator()
        metric_keys = self.validator.metrics.keys + self.label_loss_items(prefix='val')
        self.metrics = dict(zip(metric_keys, [0] * len(metric_keys)))
        self.ema = ModelEMA(self.model)
        if self.args.plots:
            self.plot_training_labels()

    # Optimizer setup; with optimizer='auto' it estimates the iteration count and picks SGD or AdamW
    self.accumulate = max(round(self.args.nbs / self.batch_size), 1)  # accumulate loss before optimizing
    weight_decay = self.args.weight_decay * self.batch_size * self.accumulate / self.args.nbs  # scale weight_decay
    iterations = math.ceil(len(self.train_loader.dataset) / max(self.batch_size, self.args.nbs)) * self.epochs
    self.optimizer = self.build_optimizer(model=self.model,
                                          name=self.args.optimizer,
                                          lr=self.args.lr0,
                                          momentum=self.args.momentum,
                                          decay=weight_decay,
                                          iterations=iterations)
    # Scheduler: sets how the learning rate evolves over training
    if self.args.cos_lr:
        self.lf = one_cycle(1, self.args.lrf, self.epochs)  # cosine 1->hyp['lrf']
    else:
        self.lf = lambda x: (1 - x / self.epochs) * (1.0 - self.args.lrf) + self.args.lrf  # linear
    self.scheduler = optim.lr_scheduler.LambdaLR(self.optimizer, lr_lambda=self.lf)
    self.stopper, self.stop = EarlyStopping(patience=self.args.patience), False
    self.resume_training(ckpt)
    self.scheduler.last_epoch = self.start_epoch - 1  # do not move
    self.run_callbacks('on_pretrain_routine_end')
```
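To make the scheduler concrete, here is a minimal sketch of the linear decay defined by self.lf above, assuming the usual defaults lr0=0.01 and lrf=0.01 from default.yaml (LambdaLR multiplies the base learning rate by lf(epoch)):

```python
# Minimal sketch of the linear LR schedule above (assumed defaults: lr0=0.01, lrf=0.01).
epochs, lr0, lrf = 100, 0.01, 0.01
lf = lambda x: (1 - x / epochs) * (1.0 - lrf) + lrf  # same lambda as in _setup_train

print(lr0 * lf(0))    # 0.01     at the first epoch
print(lr0 * lf(50))   # 0.00505  halfway through
print(lr0 * lf(100))  # 0.0001   = lr0 * lrf at the end
```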
From
self.get_dataloader(self.trainset, batch_size=batch_size, rank=RANK, mode='train')
we can jump into D:\PC_DeepLearing\yolov8\ultralytics\models\yolo\detect\train.py:
```python
self.train_loader = self.get_dataloader(self.trainset, batch_size=batch_size, rank=RANK, mode='train')
if RANK in (-1, 0):
    self.test_loader = self.get_dataloader(self.testset, batch_size=batch_size * 2, rank=-1, mode='val')
```
Then the get_dataloader method:
```python
def get_dataloader(self, dataset_path, batch_size=16, rank=0, mode='train'):
    """Construct and return dataloader."""
    assert mode in ['train', 'val']
    with torch_distributed_zero_first(rank):  # init dataset *.cache only once if DDP
        dataset = self.build_dataset(dataset_path, mode, batch_size)
    shuffle = mode == 'train'
    if getattr(dataset, 'rect', False) and shuffle:
        LOGGER.warning("WARNING ⚠️ 'rect=True' is incompatible with DataLoader shuffle, setting shuffle=False")
        shuffle = False
    workers = self.args.workers if mode == 'train' else self.args.workers * 2
    return build_dataloader(dataset, batch_size, workers, shuffle, rank)  # return dataloader

def build_dataset(self, img_path, mode='train', batch=None):
    """
    Build YOLO Dataset.

    Args:
        img_path (str): Path to the folder containing images.
        mode (str): `train` mode or `val` mode, users are able to customize different augmentations for each mode.
        batch (int, optional): Size of batches, this is for `rect`. Defaults to None.
    """
    gs = max(int(de_parallel(self.model).stride.max() if self.model else 0), 32)
    return build_yolo_dataset(self.args, img_path, batch, self.data, mode=mode, rect=mode == 'val', stride=gs)

def build_yolo_dataset(cfg, img_path, batch, data, mode='train', rect=False, stride=32):
    """Build YOLO Dataset."""
    return YOLODataset(
        img_path=img_path,
        imgsz=cfg.imgsz,
        batch_size=batch,
        augment=mode == 'train',  # augmentation
        hyp=cfg,  # TODO: probably add a get_hyps_from_cfg function
        rect=cfg.rect or rect,  # rectangular batches
        cache=cfg.cache or None,
        single_cls=cfg.single_cls or False,
        stride=int(stride),
        pad=0.0 if mode == 'train' else 0.5,
        prefix=colorstr(f'{mode}: '),
        use_segments=cfg.task == 'segment',
        use_keypoints=cfg.task == 'pose',
        classes=cfg.classes,
        data=data,
        fraction=cfg.fraction if mode == 'train' else 1.0)
```
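For reference, the batches yielded by the resulting train loader are dicts; a minimal inspection sketch (key names are inferred from the Format transform shown further below, so treat them as assumptions that may vary by task and version):

```python
# Sketch: inspect one training batch; `trainer` is assumed to be an already
# set-up SegmentationTrainer, and the dict keys may differ between versions.
for batch in trainer.train_loader:
    print(batch['img'].shape)     # (B, 3, H, W) image tensor
    print(batch['batch_idx'])     # (N,) image index for every target in the batch
    print(batch['cls'].shape)     # (N, 1) class ids
    print(batch['bboxes'].shape)  # (N, 4) normalized xywh boxes
    if 'masks' in batch:          # present for the segment task
        print(batch['masks'].shape)
    break
```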
We then continue into YOLODataset; note that YOLODataset inherits from BaseDataset, defined in
D:\PC_DeepLearing\yolov8\ultralytics\data\base.py:
```python
class YOLODataset(BaseDataset):
    """
    Dataset class for loading object detection and/or segmentation labels in YOLO format.

    Args:
        data (dict, optional): A dataset YAML dictionary. Defaults to None.
        use_segments (bool, optional): If True, segmentation masks are used as labels. Defaults to False.
        use_keypoints (bool, optional): If True, keypoints are used as labels. Defaults to False.

    Returns:
        (torch.utils.data.Dataset): A PyTorch dataset object that can be used for training an object detection model.
    """

    def __init__(self, *args, data=None, use_segments=False, use_keypoints=False, **kwargs):
        """Initializes the YOLODataset with optional configurations for segments and keypoints."""
        self.use_segments = use_segments
        self.use_keypoints = use_keypoints
        self.data = data
        assert not (self.use_segments and self.use_keypoints), 'Can not use both segments and keypoints.'
        super().__init__(*args, **kwargs)
```
In BaseDataset, the main lines to look at are:
```python
self.im_files = self.get_img_files(self.img_path)
self.labels = self.get_labels()
```

as well as

```python
self.transforms = self.build_transforms(hyp=hyp)
```
Then step into
D:\PC_DeepLearing\yolov8\ultralytics\data\dataset.py:
```python
def build_transforms(self, hyp=None):
    """Builds and appends transforms to the list."""
    if self.augment:
        hyp.mosaic = hyp.mosaic if self.augment and not self.rect else 0.0
        hyp.mixup = hyp.mixup if self.augment and not self.rect else 0.0
        transforms = v8_transforms(self, self.imgsz, hyp)
    else:
        transforms = Compose([LetterBox(new_shape=(self.imgsz, self.imgsz), scaleup=False)])
    transforms.append(
        Format(bbox_format='xywh',
               normalize=True,
               return_mask=self.use_segments,
               return_keypoint=self.use_keypoints,
               batch_idx=True,
               mask_ratio=hyp.mask_ratio,
               mask_overlap=hyp.overlap_mask))
    return transforms
```
From there we usually step into D:\PC_DeepLearing\yolov8\ultralytics\data\augment.py and look at
v8_transforms:
```python
def v8_transforms(dataset, imgsz, hyp, stretch=False):
    """Convert images to a size suitable for YOLOv8 training."""
    pre_transform = Compose([
        Mosaic(dataset, imgsz=imgsz, p=hyp.mosaic),
        CopyPaste(p=hyp.copy_paste),
        RandomPerspective(
            degrees=hyp.degrees,
            translate=hyp.translate,
            scale=hyp.scale,
            shear=hyp.shear,
            perspective=hyp.perspective,
            pre_transform=None if stretch else LetterBox(new_shape=(imgsz, imgsz)),
        )])
    flip_idx = dataset.data.get('flip_idx', [])  # for keypoints augmentation
    if dataset.use_keypoints:
        kpt_shape = dataset.data.get('kpt_shape', None)
        if len(flip_idx) == 0 and hyp.fliplr > 0.0:
            hyp.fliplr = 0.0
            LOGGER.warning("WARNING ⚠️ No 'flip_idx' array defined in data.yaml, setting augmentation 'fliplr=0.0'")
        elif flip_idx and (len(flip_idx) != kpt_shape[0]):
            raise ValueError(f'data.yaml flip_idx={flip_idx} length must be equal to kpt_shape[0]={kpt_shape[0]}')

    return Compose([
        pre_transform,
        MixUp(dataset, pre_transform=pre_transform, p=hyp.mixup),
        Albumentations(p=1.0),
        RandomHSV(hgain=hyp.hsv_h, sgain=hyp.hsv_s, vgain=hyp.hsv_v),
        RandomFlip(direction='vertical', p=hyp.flipud),
        RandomFlip(direction='horizontal', p=hyp.fliplr, flip_idx=flip_idx)])  # transforms
```
D:\PC_DeepLearing\yolov8\ultralytics\data\augment.py
YOLO resizes the image while keeping its aspect ratio and then pads the remainder.
Pay close attention here: deployment-side preprocessing and postprocessing both depend on this step.
```python
class LetterBox:
    """Resize image and padding for detection, instance segmentation, pose."""

    def __init__(self, new_shape=(640, 640), auto=False, scaleFill=False, scaleup=True, center=True, stride=32):
        """Initialize LetterBox object with specific parameters."""
        self.new_shape = new_shape
        self.auto = auto
        self.scaleFill = scaleFill
        self.scaleup = scaleup
        self.stride = stride
        self.center = center  # Put the image in the middle or top-left

    def __call__(self, labels=None, image=None):
        """Return updated labels and image with added border."""
        if labels is None:
            labels = {}
        img = labels.get('img') if image is None else image
        shape = img.shape[:2]  # current shape [height, width]
        new_shape = labels.pop('rect_shape', self.new_shape)
        if isinstance(new_shape, int):
            new_shape = (new_shape, new_shape)

        # Scale ratio (new / old)
        r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
        if not self.scaleup:  # only scale down, do not scale up (for better val mAP)
            r = min(r, 1.0)

        # Compute padding
        ratio = r, r  # width, height ratios
        new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
        dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
        if self.auto:  # minimum rectangle
            dw, dh = np.mod(dw, self.stride), np.mod(dh, self.stride)  # wh padding
        elif self.scaleFill:  # stretch
            dw, dh = 0.0, 0.0
            new_unpad = (new_shape[1], new_shape[0])
            ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios

        if self.center:
            dw /= 2  # divide padding into 2 sides
            dh /= 2

        if shape[::-1] != new_unpad:  # resize
            img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
        top, bottom = int(round(dh - 0.1)) if self.center else 0, int(round(dh + 0.1))
        left, right = int(round(dw - 0.1)) if self.center else 0, int(round(dw + 0.1))
        img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT,
                                 value=(114, 114, 114))  # add border
        if labels.get('ratio_pad'):
            labels['ratio_pad'] = (labels['ratio_pad'], (left, top))  # for evaluation

        if len(labels):
            labels = self._update_labels(labels, ratio, dw, dh)
            labels['img'] = img
            labels['resized_shape'] = new_shape
            return labels
        else:
            return img

    def _update_labels(self, labels, ratio, padw, padh):
        """Update labels."""
        labels['instances'].convert_bbox(format='xyxy')
        labels['instances'].denormalize(*labels['img'].shape[:2][::-1])
        labels['instances'].scale(*ratio)
        labels['instances'].add_padding(padw, padh)
        return labels
```
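A quick way to see the resize-then-pad behavior is to run the class on a dummy image; a minimal sketch (the 1920x1080 input is just an example size):

```python
import numpy as np

lb = LetterBox(new_shape=(640, 640), scaleup=False)
img = np.full((1080, 1920, 3), 255, dtype=np.uint8)  # HWC, BGR dummy frame

out = lb(image=img)
print(out.shape)  # (640, 640, 3)
# r = min(640/1080, 640/1920) = 1/3, so the content is resized to 640x360
# and 140 px of gray (114, 114, 114) padding is added on top and bottom.
```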
- """预测推理"""
- model = YOLO(r"D:\PC_DeepLearing\yolov8\runs\segment\train_resize1280_1201\weights\best.onnx") # 加载预训练模型(建议用于训练)
- results = model.predict(data="turntable.yaml",
- cfg="default.yaml",
- source=r"C:\Users\1\Desktop\231130\C1\料盘定位\OK",
- save=True,
- save_conf=True,
- batch=1,
- imgsz=[1280,1280],
- iou=0.35,
- conf=0.1,
- rect=False
- ) # 在验证集上评估模型性能
First enter D:\PC_DeepLearing\yolov8\ultralytics\engine\model.py:
```python
def predict(self, source=None, stream=False, predictor=None, **kwargs):
    """
    Perform prediction using the YOLO model.

    Args:
        source (str | int | PIL | np.ndarray): The source of the image to make predictions on.
            Accepts all source types accepted by the YOLO model.
        stream (bool): Whether to stream the predictions or not. Defaults to False.
        predictor (BasePredictor): Customized predictor.
        **kwargs : Additional keyword arguments passed to the predictor.
            Check the 'configuration' section in the documentation for all available options.

    Returns:
        (List[ultralytics.engine.results.Results]): The prediction results.
    """
    if source is None:
        source = ASSETS
        LOGGER.warning(f"WARNING ⚠️ 'source' is missing. Using 'source={source}'.")

    is_cli = (sys.argv[0].endswith('yolo') or sys.argv[0].endswith('ultralytics')) and any(
        x in sys.argv for x in ('predict', 'track', 'mode=predict', 'mode=track'))

    custom = {'conf': 0.25, 'save': is_cli}  # method defaults
    args = {**self.overrides, **custom, **kwargs, 'mode': 'predict'}  # highest priority args on the right
    prompts = args.pop('prompts', None)  # for SAM-type models

    if not self.predictor:
        self.predictor = (predictor or self._smart_load('predictor'))(overrides=args, _callbacks=self.callbacks)
        self.predictor.setup_model(model=self.model, verbose=is_cli)
    else:  # only update args if predictor is already setup
        self.predictor.args = get_cfg(self.predictor.args, args)
        if 'project' in args or 'name' in args:
            self.predictor.save_dir = get_save_dir(self.predictor.args)
    if prompts and hasattr(self.predictor, 'set_prompts'):  # for SAM-type models
        self.predictor.set_prompts(prompts)
    return self.predictor.predict_cli(source=source) if is_cli else self.predictor(source=source, stream=stream)
```
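The line commented 'highest priority args on the right' is plain dict merging, where later entries overwrite earlier ones; a standalone illustration with made-up values:

```python
# Later dicts win: kwargs override `custom`, which overrides self.overrides.
overrides = {'conf': 0.4, 'imgsz': 640}  # e.g. self.overrides
custom = {'conf': 0.25, 'save': False}   # method defaults
kwargs = {'conf': 0.1, 'iou': 0.35}      # what the caller passed to predict()
args = {**overrides, **custom, **kwargs, 'mode': 'predict'}
print(args)  # {'conf': 0.1, 'imgsz': 640, 'save': False, 'iou': 0.35, 'mode': 'predict'}
```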
Inference happens in the predictor:
D:\PC_DeepLearing\yolov8\ultralytics\engine\predictor.py
Look at the steps of the inference pipeline:
```python
@smart_inference_mode()
def stream_inference(self, source=None, model=None, *args, **kwargs):
    """Streams real-time inference on camera feed and saves results to file."""
    if self.args.verbose:
        LOGGER.info('')

    # Setup model
    if not self.model:
        self.setup_model(model)

    with self._lock:  # for thread-safe inference
        # Setup source every time predict is called
        self.setup_source(source if source is not None else self.args.source)

        # Check if save_dir/ label file exists
        if self.args.save or self.args.save_txt:
            (self.save_dir / 'labels' if self.args.save_txt else self.save_dir).mkdir(parents=True, exist_ok=True)

        # Warmup model
        if not self.done_warmup:
            self.model.warmup(imgsz=(1 if self.model.pt or self.model.triton else self.dataset.bs, 3, *self.imgsz))
            self.done_warmup = True

        self.seen, self.windows, self.batch, profilers = 0, [], None, (ops.Profile(), ops.Profile(), ops.Profile())
        self.run_callbacks('on_predict_start')

        for batch in self.dataset:
            self.run_callbacks('on_predict_batch_start')
            self.batch = batch
            path, im0s, vid_cap, s = batch

            # Preprocess
            with profilers[0]:
                im = self.preprocess(im0s)

            # Inference
            with profilers[1]:
                preds = self.inference(im, *args, **kwargs)

            # Postprocess
            with profilers[2]:
                self.results = self.postprocess(preds, im, im0s)

            self.run_callbacks('on_predict_postprocess_end')
            # Visualize, save, write results
            n = len(im0s)
            for i in range(n):
                self.seen += 1
                self.results[i].speed = {
                    'preprocess': profilers[0].dt * 1E3 / n,
                    'inference': profilers[1].dt * 1E3 / n,
                    'postprocess': profilers[2].dt * 1E3 / n}
                p, im0 = path[i], None if self.source_type.tensor else im0s[i].copy()
                p = Path(p)

                if self.args.verbose or self.args.save or self.args.save_txt or self.args.show:
                    s += self.write_results(i, self.results, (p, im, im0))
                if self.args.save or self.args.save_txt:
                    self.results[i].save_dir = self.save_dir.__str__()
                if self.args.show and self.plotted_img is not None:
                    self.show(p)
                if self.args.save and self.plotted_img is not None:
                    self.save_preds(vid_cap, i, str(self.save_dir / p.name))

            self.run_callbacks('on_predict_batch_end')
            yield from self.results

            # Print time (inference-only)
            if self.args.verbose:
                LOGGER.info(f'{s}{profilers[1].dt * 1E3:.1f}ms')

    # Release assets
    if isinstance(self.vid_writer[-1], cv2.VideoWriter):
        self.vid_writer[-1].release()  # release final video writer

    # Print results
    if self.args.verbose and self.seen:
        t = tuple(x.t / self.seen * 1E3 for x in profilers)  # speeds per image
        LOGGER.info(f'Speed: %.1fms preprocess, %.1fms inference, %.1fms postprocess per image at shape '
                    f'{(1, 3, *im.shape[2:])}' % t)
    if self.args.save or self.args.save_txt or self.args.save_crop:
        nl = len(list(self.save_dir.glob('labels/*.txt')))  # number of labels
        s = f"\n{nl} label{'s' * (nl > 1)} saved to {self.save_dir / 'labels'}" if self.args.save_txt else ''
        LOGGER.info(f"Results saved to {colorstr('bold', self.save_dir)}{s}")

    self.run_callbacks('on_predict_end')
```
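Since stream_inference is a generator (note the yield from self.results), predictions can also be consumed lazily instead of accumulated in a list; a minimal usage sketch with placeholder paths:

```python
from ultralytics import YOLO

model = YOLO("yolov8s-seg.pt")  # placeholder weights
# stream=True makes predict() return a generator backed by stream_inference,
# processing one batch at a time instead of holding all results in memory.
for result in model.predict(source="images/", stream=True):
    print(result.path, len(result.boxes))
```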
The image first goes through LetterBox and is then normalized during preprocessing; see
D:\PC_DeepLearing\yolov8\ultralytics\engine\predictor.py:
```python
def preprocess(self, im):
    """
    Prepares input image before inference.

    Args:
        im (torch.Tensor | List(np.ndarray)): BCHW for tensor, [(HWC) x B] for list.
    """
    not_tensor = not isinstance(im, torch.Tensor)
    if not_tensor:
        im = np.stack(self.pre_transform(im))
        im = im[..., ::-1].transpose((0, 3, 1, 2))  # BGR to RGB, BHWC to BCHW, (n, 3, h, w)
        im = np.ascontiguousarray(im)  # contiguous
        im = torch.from_numpy(im)

    im = im.to(self.device)
    im = im.half() if self.model.fp16 else im.float()  # uint8 to fp16/32
    if not_tensor:
        im /= 255  # 0 - 255 to 0.0 - 1.0
    return im
```
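When re-implementing this for deployment (e.g. ONNX Runtime), the same steps have to be reproduced exactly; a minimal numpy sketch for a single BGR image that has already been letterboxed to the model input size:

```python
import numpy as np

def preprocess_np(img_letterboxed):
    """Mirror preprocess() above for one letterboxed BGR image (HWC uint8)."""
    im = img_letterboxed[None]                  # add batch dim -> (1, H, W, 3)
    im = im[..., ::-1].transpose((0, 3, 1, 2))  # BGR -> RGB, BHWC -> BCHW
    im = np.ascontiguousarray(im, dtype=np.float32)
    im /= 255.0                                 # 0-255 -> 0.0-1.0
    return im
```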
After preprocessing and inference comes postprocessing; step into
D:\PC_DeepLearing\yolov8\ultralytics\models\yolo\segment\predict.py:
```python
def postprocess(self, preds, img, orig_imgs):
    """Applies non-max suppression and processes detections for each image in an input batch."""
    p = ops.non_max_suppression(preds[0],
                                self.args.conf,
                                self.args.iou,
                                agnostic=self.args.agnostic_nms,
                                max_det=self.args.max_det,
                                nc=len(self.model.names),
                                classes=self.args.classes)

    if not isinstance(orig_imgs, list):  # input images are a torch.Tensor, not a list
        orig_imgs = ops.convert_torch2numpy_batch(orig_imgs)

    results = []
    proto = preds[1][-1] if len(preds[1]) == 3 else preds[1]  # second output is len 3 if pt, but only 1 if exported
    for i, pred in enumerate(p):
        orig_img = orig_imgs[i]
        img_path = self.batch[0][i]
        if not len(pred):  # save empty boxes
            masks = None
        elif self.args.retina_masks:
            pred[:, :4] = ops.scale_boxes(img.shape[2:], pred[:, :4], orig_img.shape)
            masks = ops.process_mask_native(proto[i], pred[:, 6:], pred[:, :4], orig_img.shape[:2])  # HWC
        else:
            masks = ops.process_mask(proto[i], pred[:, 6:], pred[:, :4], img.shape[2:], upsample=True)  # HWC
            pred[:, :4] = ops.scale_boxes(img.shape[2:], pred[:, :4], orig_img.shape)
        results.append(Results(orig_img, path=img_path, names=self.model.names, boxes=pred[:, :6], masks=masks))
    return results
```
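The returned Results objects expose both boxes and masks; a minimal sketch of reading them back out (attribute names as in the ultralytics Results API):

```python
for r in results:              # as returned by postprocess() / model.predict()
    boxes = r.boxes.xyxy       # (n, 4) boxes already rescaled to the original image
    confs = r.boxes.conf       # (n,) confidence scores
    classes = r.boxes.cls      # (n,) class ids
    if r.masks is not None:
        mask_data = r.masks.data  # (n, h, w) binary masks
```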
The two key calls here are:
```python
masks = ops.process_mask(proto[i], pred[:, 6:], pred[:, :4], img.shape[2:], upsample=True)  # HWC
pred[:, :4] = ops.scale_boxes(img.shape[2:], pred[:, :4], orig_img.shape)
```
First, look at the box operation:
```python
def scale_boxes(img1_shape, boxes, img0_shape, ratio_pad=None, padding=True):
    """
    Rescales bounding boxes (in the format of xyxy) from the shape of the image they were originally specified in
    (img1_shape) to the shape of a different image (img0_shape).

    Args:
        img1_shape (tuple): The shape of the image that the bounding boxes are for, in the format of (height, width).
        boxes (torch.Tensor): the bounding boxes of the objects in the image, in the format of (x1, y1, x2, y2)
        img0_shape (tuple): the shape of the target image, in the format of (height, width).
        ratio_pad (tuple): a tuple of (ratio, pad) for scaling the boxes. If not provided, the ratio and pad will be
            calculated based on the size difference between the two images.
        padding (bool): If True, assuming the boxes is based on image augmented by yolo style. If False then do regular
            rescaling.

    Returns:
        boxes (torch.Tensor): The scaled bounding boxes, in the format of (x1, y1, x2, y2)
    """
    if ratio_pad is None:  # calculate from img0_shape
        gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])  # gain = old / new
        pad = round((img1_shape[1] - img0_shape[1] * gain) / 2 - 0.1), round(
            (img1_shape[0] - img0_shape[0] * gain) / 2 - 0.1)  # the wh padding added by LetterBox
    else:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]

    if padding:
        boxes[..., [0, 2]] -= pad[0]  # subtract the x padding from the box coordinates
        boxes[..., [1, 3]] -= pad[1]  # subtract the y padding from the box coordinates
    boxes[..., :4] /= gain  # divide by the scale gain to map back to original-image coordinates
    clip_boxes(boxes, img0_shape)
    return boxes
```
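A worked example, with a 1920x1080 original letterboxed into 640x640 (sizes chosen purely for illustration):

```python
# img1_shape = (640, 640) model input; img0_shape = (1080, 1920) original (h, w)
gain = min(640 / 1080, 640 / 1920)                      # = 1/3, the LetterBox scale
pad = (640 - 1920 * gain) / 2, (640 - 1080 * gain) / 2  # = (0.0, 140.0) x/y padding
# A box (100, 200, 300, 400) in letterboxed coordinates maps back as:
x1, y1 = (100 - pad[0]) / gain, (200 - pad[1]) / gain   # (300.0, 180.0)
x2, y2 = (300 - pad[0]) / gain, (400 - pad[1]) / gain   # (900.0, 780.0)
```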
Now the mask operation:
```python
def process_mask(protos, masks_in, bboxes, shape, upsample=False):
    """
    Apply masks to bounding boxes using the output of the mask head.

    Args:
        protos (torch.Tensor): A tensor of shape [mask_dim, mask_h, mask_w].
        masks_in (torch.Tensor): A tensor of shape [n, mask_dim], where n is the number of masks after NMS.
        bboxes (torch.Tensor): A tensor of shape [n, 4], where n is the number of masks after NMS.
        shape (tuple): A tuple of integers representing the size of the input image in the format (h, w).
        upsample (bool): A flag to indicate whether to upsample the mask to the original image size. Default is False.

    Returns:
        (torch.Tensor): A binary mask tensor of shape [n, h, w], where n is the number of masks after NMS, and h and w
            are the height and width of the input image. The mask is applied to the bounding boxes.
    """
    c, mh, mw = protos.shape  # CHW
    ih, iw = shape
    masks = (masks_in @ protos.float().view(c, -1)).sigmoid().view(-1, mh, mw)  # CHW

    downsampled_bboxes = bboxes.clone()
    downsampled_bboxes[:, 0] *= mw / iw
    downsampled_bboxes[:, 2] *= mw / iw
    downsampled_bboxes[:, 3] *= mh / ih
    downsampled_bboxes[:, 1] *= mh / ih

    masks = crop_mask(masks, downsampled_bboxes)  # CHW
    if upsample:
        masks = F.interpolate(masks[None], shape, mode='bilinear', align_corners=False)[0]  # CHW
    return masks.gt_(0.5)
```
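process_mask depends on crop_mask to zero out everything that falls outside each box; its body is not shown in this walkthrough, but it works roughly like the following sketch (based on ultralytics/utils/ops.py):

```python
import torch

def crop_mask(masks, boxes):
    """Zero mask pixels outside each xyxy box. masks: (n, h, w), boxes: (n, 4)."""
    n, h, w = masks.shape
    x1, y1, x2, y2 = torch.chunk(boxes[:, :, None], 4, 1)  # each (n, 1, 1)
    r = torch.arange(w, device=masks.device, dtype=x1.dtype)[None, None, :]  # column indices
    c = torch.arange(h, device=masks.device, dtype=x1.dtype)[None, :, None]  # row indices
    return masks * ((r >= x1) * (r < x2) * (c >= y1) * (c < y2))
```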
If the input image size differs from the model input size, we also need to look at how the results are written out.
Pay special attention when the image is rect, i.e. its width and height differ.
See D:\PC_DeepLearing\yolov8\ultralytics\engine\predictor.py:
```python
def write_results(self, idx, results, batch):
    """Write inference results to a file or directory."""
    p, im, _ = batch
    log_string = ''
    if len(im.shape) == 3:
        im = im[None]  # expand for batch dim
    if self.source_type.webcam or self.source_type.from_img or self.source_type.tensor:  # batch_size >= 1
        log_string += f'{idx}: '
        frame = self.dataset.count
    else:
        frame = getattr(self.dataset, 'frame', 0)
    self.data_path = p
    self.txt_path = str(self.save_dir / 'labels' / p.stem) + ('' if self.dataset.mode == 'image' else f'_{frame}')
    log_string += '%gx%g ' % im.shape[2:]  # print string
    result = results[idx]
    log_string += result.verbose()

    if self.args.save or self.args.show:  # Add bbox to image
        plot_args = {
            'line_width': self.args.line_width,
            'boxes': self.args.boxes,
            'conf': self.args.show_conf,
            'labels': self.args.show_labels}
        if not self.args.retina_masks:
            plot_args['im_gpu'] = im[idx]
        self.plotted_img = result.plot(**plot_args)
    # Write
    if self.args.save_txt:
        result.save_txt(f'{self.txt_path}.txt', save_conf=self.args.save_conf)
    if self.args.save_crop:
        result.save_crop(save_dir=self.save_dir / 'crops',
                         file_name=self.data_path.stem + ('' if self.dataset.mode == 'image' else f'_{frame}'))

    return log_string
```
In
self.plotted_img = result.plot(**plot_args)
we can see how the output image is drawn and saved. Open D:\PC_DeepLearing\yolov8\ultralytics\engine\results.py, look inside result.plot(**plot_args), and step into:
```python
# Plot Segment results
if pred_masks and show_masks:
    if im_gpu is None:
        img = LetterBox(pred_masks.shape[1:])(image=annotator.result())
        im_gpu = torch.as_tensor(img, dtype=torch.float16, device=pred_masks.data.device).permute(
            2, 0, 1).flip(0).contiguous() / 255
    idx = pred_boxes.cls if pred_boxes else range(len(pred_masks))
    annotator.masks(pred_masks.data, colors=[colors(x, True) for x in idx], im_gpu=im_gpu)

# Plot Detect results
if pred_boxes and show_boxes:
    for d in reversed(pred_boxes):
        c, conf, id = int(d.cls), float(d.conf) if conf else None, None if d.id is None else int(d.id.item())
        name = ('' if id is None else f'id:{id} ') + names[c]
        label = (f'{name} {conf:.2f}' if conf else name) if labels else None
        annotator.box_label(d.xyxy.squeeze(), label, color=colors(c, True))

# Plot Classify results
if pred_probs is not None and show_probs:
    text = ',\n'.join(f'{names[j] if names else j} {pred_probs.data[j]:.2f}' for j in pred_probs.top5)
    x = round(self.orig_shape[0] * 0.03)
    annotator.text([x, x], text, txt_color=(255, 255, 255))  # TODO: allow setting colors
```
Then look in D:\PC_DeepLearing\yolov8\ultralytics\utils\plotting.py:
```python
def masks(self, masks, colors, im_gpu, alpha=0.5, retina_masks=False):
    """
    Plot masks on image.

    Args:
        masks (tensor): Predicted masks on cuda, shape: [n, h, w]
        colors (List[List[Int]]): Colors for predicted masks, [[r, g, b] * n]
        im_gpu (tensor): Image is in cuda, shape: [3, h, w], range: [0, 1]
        alpha (float): Mask transparency: 0.0 fully transparent, 1.0 opaque
        retina_masks (bool): Whether to use high resolution masks or not. Defaults to False.
    """
    if self.pil:
        # Convert to numpy first
        self.im = np.asarray(self.im).copy()
    if len(masks) == 0:
        self.im[:] = im_gpu.permute(1, 2, 0).contiguous().cpu().numpy() * 255
    if im_gpu.device != masks.device:
        im_gpu = im_gpu.to(masks.device)
    colors = torch.tensor(colors, device=masks.device, dtype=torch.float32) / 255.0  # shape(n,3)
    colors = colors[:, None, None]  # shape(n,1,1,3)
    masks = masks.unsqueeze(3)  # shape(n,h,w,1)
    masks_color = masks * (colors * alpha)  # shape(n,h,w,3)

    inv_alpha_masks = (1 - masks * alpha).cumprod(0)  # shape(n,h,w,1)
    mcs = masks_color.max(dim=0).values  # shape(n,h,w,3)

    im_gpu = im_gpu.flip(dims=[0])  # flip channel
    im_gpu = im_gpu.permute(1, 2, 0).contiguous()  # shape(h,w,3)
    im_gpu = im_gpu * inv_alpha_masks[-1] + mcs
    im_mask = (im_gpu * 255)
    im_mask_np = im_mask.byte().cpu().numpy()
    self.im[:] = im_mask_np if retina_masks else ops.scale_image(im_mask_np, self.im.shape)
    if self.pil:
        # Convert im back to PIL and update draw
        self.fromarray(self.im)
```
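Note the final assignment: unless retina_masks is set, the composited image is still at letterboxed resolution, so ops.scale_image must undo the LetterBox step, cropping the padding and resizing back to the original shape. A rough sketch of what it does (based on ultralytics/utils/ops.py; treat the details as approximate):

```python
import cv2

def scale_image_sketch(masks, im0_shape):
    """Crop LetterBox padding from `masks` (HWC, at model-input size) and resize to im0_shape."""
    im1_shape = masks.shape
    gain = min(im1_shape[0] / im0_shape[0], im1_shape[1] / im0_shape[1])  # old / new
    pad = (im1_shape[1] - im0_shape[1] * gain) / 2, (im1_shape[0] - im0_shape[0] * gain) / 2
    top, left = int(pad[1]), int(pad[0])
    bottom, right = int(im1_shape[0] - pad[1]), int(im1_shape[1] - pad[0])
    return cv2.resize(masks[top:bottom, left:right], (im0_shape[1], im0_shape[0]))
```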