Building image classifier with Differential Privacy Using Opacus

Open In Colab

task:
The task is to train a LeNet or AlexNet with Mnist.

Model: LeNet or AlexNet
Dataset: Mnist
Requirements: test accuracy > 90% and epsilon < 2

Reference:
A Library for Training PyTorch models with Differential Privacy: https://opacus.ai/
Paper of DP-SGD: https://arxiv.org/abs/1607.00133

Opacus介绍

Opacus是一个可以让pytorch实现差分隐私的库,可以仅用非常简介的代码训练一个满足差分隐私的模型。opacus

Opacus主要原理

Opacus会修改model,optimizer和dataloader,在反向传播时对于返回的梯度加入Gaussian噪声来实现差分隐私。

使用Opacus

引入基础的库

1
2
3
4
5
6
7
8
9
10
%pip install opacus
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import numpy as np

from opacus import PrivacyEngine
from opacus.utils.batch_memory_manager import BatchMemoryManager
1
2
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

设置超参数

EPSILON:$\varepsilon$
DELTA:$\delta$

1
2
3
4
5
6
7
EPOCH = 30 # 遍历数据集次数
BATCH_SIZE = 512 # 批处理尺寸(batch_size)
LR = 0.01 # 学习率
MAX_GRAD_NORM = 1.2
EPSILON = 2
DELTA = 1e-4
MAX_PHYSICAL_BATCH_SIZE = 256

获取数据

1
2
3
4
5
6
transform = transforms.ToTensor()
trainset = torchvision.datasets.MNIST(root='./dataset',train=True,download=True,transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=BATCH_SIZE,shuffle=True,num_workers=0)

testset = torchvision.datasets.MNIST(root='./dataset',train=False,download=True,transform=transform)
testloader = torch.utils.data.DataLoader(testset,batch_size=BATCH_SIZE,shuffle=False,num_workers=0)

定义网络

AlexNet

nn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class AlexNet(nn.Module):
def __init__(self, width_mult=1):
super(AlexNet, self).__init__()
self.layer1 = nn.Sequential( # 输入1*28*28
nn.Conv2d(1, 32, kernel_size=3, padding=1), # 32*28*28
nn.MaxPool2d(kernel_size=2, stride=2), # 32*14*14
nn.ReLU(inplace=True),
)
self.layer2 = nn.Sequential(
nn.Conv2d(32, 64, kernel_size=3, padding=1), # 64*14*14
nn.MaxPool2d(kernel_size=2, stride=2), # 64*7*7
nn.ReLU(inplace=True),
)
self.layer3 = nn.Sequential(
nn.Conv2d(64, 128, kernel_size=3, padding=1), # 128*7*7
)
self.layer4 = nn.Sequential(
nn.Conv2d(128, 256, kernel_size=3, padding=1), # 256*7*7
)

self.layer5 = nn.Sequential(
nn.Conv2d(256, 256, kernel_size=3, padding=1), # 256*7*7
nn.MaxPool2d(kernel_size=3, stride=2), # 256*3*3
nn.ReLU(inplace=True),
)
self.fc1 = nn.Linear(256*3*3, 1024)
self.fc2 = nn.Linear(1024, 512)
self.fc3 = nn.Linear(512, 10)

def forward(self, x):
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.layer5(x)
x = x.view(-1, 256*3*3)
x = self.fc1(x)
x = self.fc2(x)
x = self.fc3(x)
return x

然后还可以检验一下网络中使用的层Opacus是否支持,存在有些层不支持的情况。将网络进行转化满足支持。

1
2
3
4
5
6
7
8
net = AlexNet()
from opacus.validators import ModuleValidator

errors = ModuleValidator.validate(net, strict=False)
errors[-5:]
net = ModuleValidator.fix(net)
ModuleValidator.validate(net, strict=False)
net = net.to(device)

定义损失函数和优化器

1
2
criterion = nn.CrossEntropyLoss()  # 交叉熵损失函数,通常用于多分类问题上
optimizer = optim.SGD(net.parameters(), lr=LR, momentum=0.9)

通过差分隐私引擎重新生成model,optimizer和dataloader

1
2
3
4
5
6
7
8
9
10
11
12
13
privacy_engine = PrivacyEngine()

net, optimizer, train_loader = privacy_engine.make_private_with_epsilon(
module=net,
optimizer=optimizer,
data_loader=trainloader,
epochs=EPOCH,
target_epsilon=EPSILON,
target_delta=DELTA,
max_grad_norm=MAX_GRAD_NORM,
)

print(f"Using sigma={optimizer.noise_multiplier} and C={MAX_GRAD_NORM}")

训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def train():

for epoch in range(EPOCH):
sum_loss = []
net.train()
with BatchMemoryManager(
data_loader=trainloader,
max_physical_batch_size=MAX_PHYSICAL_BATCH_SIZE,
optimizer=optimizer
) as memory_safe_data_loader:
# 数据读取
for i, data in enumerate(memory_safe_data_loader):
# 梯度清零
optimizer.zero_grad()
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device)


# forward + backward
outputs = net(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()

# 每训练100个batch打印一次平均loss
sum_loss.append(loss.item())

# 每跑完一次epoch测试一下准确率
net.eval()
with torch.no_grad():
correct = 0
total = 0
for data in testloader:
images, labels = data
images, labels = images.to(device), labels.to(device)
outputs = net(images)
# 取得分最高的那个类
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum()
epsilon = privacy_engine.get_epsilon(DELTA)
print('第%d个epoch的识别准确率为:%d%%' % (epoch + 1, (100 * correct / total)),"loss为%f" % (np.mean(sum_loss)),f"(ε = {epsilon:.2f}, δ = {DELTA})")
# 保存模型参数
# torch.save(net.state_dict(), './params.pth')
train()