# Импорт библиотек 

In [None]:
from pathlib import Path

import torch
from torch import optim
from torchvision.io import read_image
from torchvision.ops import box_convert
from torchvision.utils import draw_keypoints, save_image
from torchvision.datasets import CocoDetection
from torchvision.models.detection import keypointrcnn_resnet50_fpn, KeypointRCNN_ResNet50_FPN_Weights

from tqdm.notebook import tqdm

# Конфигурация

In [None]:
# Устройство, на котором будут проводиться вычисления
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

# Путь до обучающего датасета carfusion
train_carfusion_dataset_path = "D:/ProjectsData/Car Key Point/datasets/carfusion/train"
# Путь до обучающей аннотации carfusion
train_carfusion_annotation_path = "D:/ProjectsData/Car Key Point/datasets/carfusion/annotations/car_keypoints_train.json"

# Путь до тестового датасета carfusion
test_carfusion_dataset_path = "D:/ProjectsData/Car Key Point/datasets/carfusion/test"
# Путь до тестовой аннотации carfusion
test_carfusion_annotation_path = "D:/ProjectsData/Car Key Point/datasets/carfusion/annotations/car_keypoints_test.json"

# Предобработчик для KeypointRCNN ResNet50 FPN
transforms = KeypointRCNN_ResNet50_FPN_Weights.DEFAULT.transforms()
# Количество обучающих эпох
n_epoch = 1

# Путь до сохранения модели
save_path = "D:/ProjectsData/Car Key Point/models/keypointrcnn_resnet50_fpn.pt"
# 
load_checkpoint = False
# 
checkpoint_path = "D:/ProjectsData/Car Key Point/models/keypointrcnn_resnet50_fpn.pt"

# Путь до тестового изображения
test_image_path = "10_0361.jpg"
# Путь до сохранения различных уровней при тестировании (вместо {} будет указан уровень)
save_img_path = "out/out_{}.jpg"

# Объявление функции предобработки целевой переменной

In [None]:
def target_transform(target, device):
    """
    Функция предобработки целевой переменной

    :param target: dict целевой переменной
    :param device: устройство, на котором будут проводиться вычисления
    :return: предобработанная целевая переменная
    """
    labels = []
    if device == 'cuda':
        for el in target:
            labels.append(
                {
                    "boxes": box_convert(torch.as_tensor(el["bbox"]).float(), in_fmt="xywh",
                                         out_fmt="xyxy").cuda(),
                    "keypoints": torch.as_tensor(el["keypoints"]).float().cuda(),
                    "labels": torch.as_tensor(el["category_id"]).cuda()
                }
            )
    else:
        for el in target:
            labels.append(
                {
                    "boxes": box_convert(torch.as_tensor(el["bbox"]).float(), in_fmt="xywh", out_fmt="xyxy"),
                    "keypoints": torch.as_tensor(el["keypoints"]).float(),
                    "labels": torch.as_tensor(el["category_id"])
                }
            )

    return labels

# Подготовка датасета

In [None]:
# Чтение датасета для обучения
dataset_train = CocoDetection(root=train_carfusion_dataset_path,
                              annFile=train_carfusion_annotation_path,
                              transform=transforms,
                              target_transform=lambda x: target_transform(x, device))
# Чтение датасета для тестирования
dataset_test = CocoDetection(root=test_carfusion_dataset_path,
                             annFile=test_carfusion_annotation_path,
                             transform=transforms,
                             target_transform=lambda x: target_transform(x, device))
# Вывод рамеров обучающей и тестовой выборок
print(f"Размер обучающей выборки: {len(dataset_train)}\nРазмер тестовой выборки: {len(dataset_test)}")

# Инициализация DataLoader-ов
# dataloader_train = torch.utils.data.DataLoader(dataset_train, batch_size=batch_size, shuffle=True, generator=torch.Generator(device=device))
# dataloader_test = torch.utils.data.DataLoader(dataset_test, batch_size=1, shuffle=False, generator=torch.Generator(device=device))
dataloader_train = torch.utils.data.DataLoader(dataset_train, batch_size=1, shuffle=True)
dataloader_test = torch.utils.data.DataLoader(dataset_test, batch_size=1, shuffle=False)

# Инициализация модели, оптимизатора и контроллера скорости оубчения

In [None]:
# Инициализация модели
model = keypointrcnn_resnet50_fpn(weights=None, num_classes=2)
model.to(device)

# Инициализация оптимизатора
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=0.5)
# Инициализация контроллера скорости оубчения
lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=list(range(0, n_epoch, 3)), gamma=0.1)

# Обучение модели

In [None]:
# Переменная для хранения лучшего значения функции потерь (меньшще - лучше)
best_loss = float('inf')
for epoch in range(n_epoch):
    print(f"Эпоха: {epoch + 1}/{n_epoch}")
    # Перевод модели в режим обучения
    model.train()

    # Переменная для подсчета потерь эпохи при обучении
    loss_train = 0
    # Переменная для подсчета количества данных при обучении (может возникнуть ситуация, когда в батче будет хоть один "boxes" == [0, 0, 0, 0], что сломает процесс обучения)
    count_train = 0
    # Переменная для подсчета потерь эпохи при тестировании
    loss_val = 0
    # Переменная для подсчета количества данных при тестировании (может возникнуть ситуация, когда в батче будет хоть один "boxes" == [0, 0, 0, 0], что сломает процесс валидации)
    count_val = 0
    
    for images, labels in tqdm(dataloader_train):
        flag = False
        # Проверка на пригодность labels для обучения
        for el in labels:
            # Массив для проверки
            check_arr = [ten.item() for ten in el["boxes"][0]]
            # Условие для проверки
            if check_arr == [.0, .0, .0, .0]:
                flag = True
                break
        if flag:
            continue
                
        # Конвертация данных
        images = list(image.to(device) for image in images)
        labels = [{k: v.to(device) for k, v in t.items()} for t in labels]

        # Обнуление градиента
        optimizer.zero_grad()
        
        try:
            # Предсказание
            loss_dict = model(images, labels)
        except Exception as e:
            print(e)
            continue

        # Нужное значение из предсказания
        loss_keypoint = loss_dict['loss_keypoint']

        # Дифференцирование с учетом параметров
        loss_keypoint.backward()
        # Шаг оптимизации
        optimizer.step()

        # Суммирование потерь
        loss_train += loss_keypoint
        # Подсчет количества данных
        count_train += 1

    # Уменьшение скорости обучения
    lr_scheduler.step()

    # Режим валидации модели
    model.eval()
    with torch.no_grad():
        for images, labels in tqdm(dataloader_test):
            flag = False
            # Проверка на пригодность labels для обучения
            for el in labels:
                # Массив для проверки
                check_arr = [ten.item() for ten in el["boxes"][0]]
                # Условие для проверки
                if check_arr == [.0, .0, .0, .0]:
                    flag = True
                    break
            if flag:
                continue
                    
            # Конвертация данных
            images = list(image.to(device) for image in images)
            labels = [{k: v.to(device) for k, v in t.items()} for t in labels]

            try:
                # Предсказание
                loss_dict = model(images, labels)
            except Exception as e:
                print(e)
                continue
    
            # Нужное значение из предсказания
            loss_keypoint = loss_dict['loss_keypoint']
    
            # Дифференцирование с учетом параметров
            loss_keypoint.backward()
    
            # Суммирование потерь
            loss_val += loss_keypoint
            # Подсчет количества данных
            count_val += 1
    
    
    print(f"Потери при обучении: {loss_train / count_train:.6f}\nПотери при валидации: {loss_val / count_val:.6f}\nСкорость обучения: {optimizer.param_groups[0]['lr']}")
    # Сохранение модели
    if loss_val < best_loss:
        best_loss = loss_val
        torch.save({
            'model': model.state_dict(),
            'epoch': epoch,
            'loss': loss_val
        }, save_path)

# Тестирование модели

In [None]:
# Чтение изображения
image = read_image(str(Path(test_image_path)))
# Препроцессинг изображения
image_preproc = transforms(image)

# Загрузка лучших весов модели
model.load_state_dict(torch.load(save_path)["model"])
# Режим валидации модели
model.eval()

# Предсказание
outputs = model([image_preproc.to(device)])

# Выделение ключивых точек
kpts = outputs[0]['keypoints']
# Выделение степени уверенности
scores = outputs[0]['scores']
# Сохранение изображения с различными ключевыми точками
for i in range(10):
    detect_threshold = i / 10
    idx = torch.where(scores > detect_threshold)
    keypoints = kpts[idx]

    res = draw_keypoints(image, keypoints, colors="blue", radius=3)
    save_image(res / 255, save_img_path.format(detect_threshold))