Examples of Specific Applications
Here we show how to implement some common applications with bohml. The RAD method is used by default.
Few-Shot Learning
Few-shot learning requires the Omniglot or MiniImageNet dataset. Required packages:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
from torchmeta.datasets.helpers import miniimagenet
from torchmeta.utils.data import BatchMetaDataLoader
import bohml
Define the LL and UL objectives:
def ll_objective(data, target, ul_model, ll_model):
    out_f = ll_model(ul_model(data))
    loss_f = F.cross_entropy(out_f, target)
    return loss_f

def ul_objective(data, target, ul_model, ll_model):
    out_F = ll_model(ul_model(data))
    loss_F = F.cross_entropy(out_F, target)
    return loss_F
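These play the roles of the lower-level (LL) and upper-level (UL) objectives in the bilevel program that bohml solves; in standard notation (ours, not taken verbatim from bohml's documentation),
\[
\min_{x}\; F\bigl(x, y^{*}(x)\bigr)
\quad \text{s.t.} \quad
y^{*}(x) \in \operatorname*{arg\,min}_{y} f(x, y),
\]
where x denotes the UL (backbone) parameters, y the LL (classifier) parameters, f is ll_objective evaluated on the support ("train") split of each task, and F is ul_objective evaluated on the query ("test") split.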
Define the LL and UL models along with their optimizers. Both Conv and Res12 backbones are provided; here we use Res12:
ul_model = bohml.utils.model.backbone.Res12([5, 3, 84, 84], ways, use_head=False)
ll_model = nn.Sequential(
    nn.Flatten(),
    nn.Linear(np.prod(ul_model.output_shape[1:]), ways)
)
ll_opt = torch.optim.SGD(ll_model.parameters(), lr=args.learning_rate)
ul_opt = torch.optim.Adam(ul_model.parameters())
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
ll_model.to(device)
ul_model.to(device)
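As a quick sanity check (a minimal sketch; the dummy batch shape 3x84x84 follows the input shape passed to Res12 above), the backbone features should project to `ways` logits:
# hypothetical dummy batch of 4 images, only to verify shapes
dummy = torch.randn(4, 3, 84, 84, device=device)
with torch.no_grad():
    logits = ll_model(ul_model(dummy))
print(logits.shape)  # expected: (4, ways)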
Define the bi-level optimizer:
optimizer = bohml.optimizer.BOHMLOptimizer(
    "Feature", ll_method="Dynamic", ul_method="Recurrence",
    ll_objective=ll_objective, ul_objective=ul_objective,
    ll_model=ll_model, ul_model=ul_model)
optimizer.build_ll_solver(args.inner_loop, ll_opt)
optimizer.build_ul_solver(ul_opt)
Train and evaluate the model. Here we use the torchmeta package to load the training, validation, and test data batches:
dataset = miniimagenet("data", ways=ways, shots=shots, test_shots=test_shots,
                       meta_train=True, download=True)
test_dataset = miniimagenet("data", ways=ways, shots=shots, test_shots=test_shots,
                            meta_test=True, download=True)
train_dataloader = BatchMetaDataLoader(dataset, batch_size=batch_size, **kwargs)
test_dataloader = BatchMetaDataLoader(test_dataset, batch_size=batch_size, **kwargs)

val_losses = []
with tqdm(train_dataloader, total=60000, desc="Training Phase") as pbar:
    for meta_iter, batch in enumerate(pbar):
        tr_xs, tr_ys = batch["train"][0].to(device), batch["train"][1].to(device)
        tst_xs, tst_ys = batch["test"][0].to(device), batch["test"][1].to(device)
        val_loss, forward_time, backward_time = \
            optimizer.run_iter(tr_xs, tr_ys,
                               tst_xs, tst_ys, meta_iter,
                               forward_with_whole_batch=False)
        val_losses.append(val_loss)
        if meta_iter % eval_interval == 0:
            test_losses, test_accs = evaluate()
Hyper-Cleaning
Hyper-cleaning requires the MNIST or CIFAR10 dataset. Required packages:
import copy
import random
import numpy as np
import torch
import torch.nn.functional as F
import bohml
Set up the data and the metric functions:
class Dataset:
    def __init__(self, data, target, polluted=False, rho=0.0):
        self.data = data.float() / torch.max(data)
        if not polluted:
            self.clean_target = target
            self.dirty_target = None
            self.clean = np.ones(list(target.shape)[0])
        else:
            self.clean_target = None
            self.dirty_target = target
            self.clean = np.zeros(list(target.shape)[0])
        self.polluted = polluted
        self.rho = rho
        self.set = set(target.numpy().tolist())

    def data_polluting(self, rho):
        assert self.polluted == False and self.dirty_target is None
        number = self.data.shape[0]
        number_list = list(range(number))
        random.shuffle(number_list)
        self.dirty_target = copy.deepcopy(self.clean_target)
        for i in number_list[:int(rho * number)]:
            dirty_set = copy.deepcopy(self.set)
            dirty_set.remove(int(self.clean_target[i]))
            # replace the clean label with a randomly chosen wrong label
            self.dirty_target[i] = random.choice(list(dirty_set))
            self.clean[i] = 0
        self.polluted = True
        self.rho = rho

    def data_flatten(self):
        try:
            self.data = self.data.view(self.data.shape[0], self.data.shape[1] * self.data.shape[2])
        except BaseException:
            self.data = self.data.reshape(self.data.shape[0],
                                          self.data.shape[1] * self.data.shape[2] * self.data.shape[3])

    def to_cuda(self):
        self.data = self.data.cuda()
        if self.clean_target is not None:
            self.clean_target = self.clean_target.cuda()
        if self.dirty_target is not None:
            self.dirty_target = self.dirty_target.cuda()
def data_splitting(dataset, tr, val, test):
    assert tr + val + test <= 1.0 or tr > 1
    number = dataset.targets.shape[0]
    number_list = list(range(number))
    random.shuffle(number_list)
    if tr < 1:
        tr_number = tr * number
        val_number = val * number
        test_number = test * number
    else:
        tr_number = tr
        val_number = val
        test_number = test
    train_data = Dataset(dataset.data[number_list[:int(tr_number)], :, :],
                         dataset.targets[number_list[:int(tr_number)]])
    val_data = Dataset(dataset.data[number_list[int(tr_number):int(tr_number + val_number)], :, :],
                       dataset.targets[number_list[int(tr_number):int(tr_number + val_number)]])
    test_data = Dataset(
        dataset.data[number_list[int(tr_number + val_number):int(tr_number + val_number + test_number)], :, :],
        dataset.targets[number_list[int(tr_number + val_number):int(tr_number + val_number + test_number)]])
    return train_data, val_data, test_data
from torchvision.datasets import MNIST
dataset = MNIST(root="data", train=True, download=True)
tr, val, test = data_splitting(dataset, 5000, 5000, 10000)
tr.data_polluting(0.5)
tr.data_flatten()
val.data_flatten()
test.data_flatten()
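As a quick check (a minimal sketch), about half of the training labels should now be corrupted and the images flattened, since data_polluting was called with rho = 0.5 and data_flatten reshapes each 28x28 image into a 784-dimensional vector:
print("clean fraction:", tr.clean.mean())         # roughly 1 - rho = 0.5
print("train data shape:", tuple(tr.data.shape))  # (5000, 784)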
def accuracy(out, target):
    pred = out.argmax(dim=1, keepdim=True)  # index of the max log-probability
    acc = pred.eq(target.view_as(pred)).sum().item() / len(target)
    return acc

def Binarization(x):
    x_bi = np.zeros_like(x)
    for i in range(x.shape[0]):
        x_bi[i] = 1 if x[i] >= 0 else 0
    return x_bi
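For example (a small illustration of the helper above), Binarization thresholds each entry of the learned weight vector at zero, i.e. it marks an example as "kept" when its sigmoid weight is at least 0.5:
print(Binarization(np.array([-0.3, 0.0, 1.7])))  # prints [0. 1. 1.]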
Define the LL and UL objectives:
def val_loss(data, target, upper_model, lower_model):
    loss = F.cross_entropy(lower_model(data), target)
    return loss

def train_loss(data, target, upper_model, lower_model):
    # per-example training losses are re-weighted by the upper model before averaging
    out = upper_model(F.cross_entropy(lower_model(data), target, reduction='none'))
    return out
Define the LL and UL models along with their optimizers:
class Net_x(torch.nn.Module):
    def __init__(self, tr):
        super(Net_x, self).__init__()
        # one learnable weight per training example
        self.x = torch.nn.Parameter(torch.zeros(tr.data.shape[0]).to(device).requires_grad_(True))

    def forward(self, y):
        # y holds the per-example training losses; weight them with sigmoid(x) and average
        y = torch.sigmoid(self.x) * y
        y = y.mean()
        return y

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if torch.cuda.is_available():
    # keep the data on the same device as the models
    tr.to_cuda()
    val.to_cuda()
    test.to_cuda()
x = Net_x(tr)
y = torch.nn.Sequential(torch.nn.Linear(28 ** 2, 10)).to(device)
x_opt = torch.optim.Adam(x.parameters(), lr=0.01)
y_opt = torch.optim.SGD(y.parameters(), lr=0.03)
Define the bi-level optimizer:
b_optimizer = bohml.optimizer.BOHMLOptimizer(
    'MetaRepr', 'Feature', 'RAD',
    ll_objective=train_loss, ul_objective=val_loss,
    ll_model=y, ul_model=x, total_iters=3000)
b_optimizer.build_ll_solver(100, y_opt)
b_optimizer.build_ul_solver(x_opt)
Train and evaluate the model:
F1_score_last = 0
for x_itr in range(3000):
    loss, forward_time, backward_time = \
        b_optimizer.run_iter(tr.data, tr.dirty_target,
                             val.data, val.clean_target,
                             current_iter=x_itr, forward_with_whole_batch=True)
    if x_itr % 10 == 0:
        with torch.no_grad():
            out = y(test.data)
            acc = accuracy(out, test.clean_target)
            x_bi = Binarization(x.x.cpu().numpy())
            clean = x_bi * tr.clean
            p = clean.mean() / (x_bi.sum() / x_bi.shape[0] + 1e-8)
            r = clean.mean() / (1. - tr.rho)
            F1_score = 2 * p * r / (p + r + 1e-8)
            dc = 0
            if F1_score_last > F1_score:
                dc = 1
            F1_score_last = F1_score
            valLoss = F.cross_entropy(out, test.clean_target)
            print('x_itr={}, acc={:.3f}, p={:.3f}, r={:.3f}, F1 score={:.3f}, val_loss={:.3f}'.format(
                x_itr, 100 * acc, 100 * p, 100 * r, 100 * F1_score, valLoss))
GAN
Required packages:
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn
from tqdm import tqdm_notebook
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import scipy.stats
import bohml
Define the data generator:
class data_generator(object):
    def __init__(self, dis='normal', n=8, std=0.02, radius=2):
        delta_theta = 2 * np.pi / n
        centers_x = []
        centers_y = []
        for i in range(n):
            centers_x.append(radius * np.cos(i * delta_theta))
            centers_y.append(radius * np.sin(i * delta_theta))
        centers_x = np.expand_dims(np.array(centers_x), 1)
        centers_y = np.expand_dims(np.array(centers_y), 1)
        p = [1. / n for _ in range(n)]
        self.p = p
        self.size = 2
        self.n = n
        self.dis = dis
        self.centers = np.concatenate([centers_x, centers_y], 1)
        self.std = std

    # switch to a random mixture distribution (harder)
    def random_distribution(self, p=None):
        if p is None:
            p = [np.random.uniform() for _ in range(self.n)]
        p = p / np.sum(p)
        self.p = p

    # switch to the uniform mixture distribution
    def uniform_distribution(self):
        p = [1. / self.n for _ in range(self.n)]
        self.p = p

    def sample(self, N, center_require=False):
        n = self.n
        std = self.std
        centers = self.centers
        ith_center = np.random.choice(n, N, p=self.p)
        sample_centers = centers[ith_center, :]
        if self.dis == 'normal':
            sample_points = np.random.normal(loc=sample_centers, scale=std)
        elif self.dis == 'laplace':
            sample_points = scipy.stats.laplace.rvs(loc=sample_centers, scale=std)
        if center_require:
            return sample_points.astype('float32'), ith_center
        else:
            return sample_points.astype('float32')
# choose the uniform or a weighted mixture of Gaussians
dset = data_generator()
# dset.random_distribution()
dset.uniform_distribution()
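A quick usage example (a minimal sketch): sample returns float32 points of shape (N, 2), optionally together with the index of the mixture component each point was drawn from:
points = dset.sample(512)
print(points.shape)  # (512, 2)
points, modes = dset.sample(16, center_require=True)  # modes[i] is the index of the Gaussian that produced points[i]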
Define the plotting function, the KL metric, and other settings:
# plot the generated samples through iterations
def plot_samples(samples):
    xmax = 5
    cols = len(samples)
    bg_color = seaborn.color_palette('Greens', n_colors=256)[0]
    plt.figure(figsize=(2 * cols, 2))
    for i, samps in enumerate(samples):
        if i == 0:
            ax = plt.subplot(1, cols, 1)
        else:
            plt.subplot(1, cols, i + 1, sharex=ax, sharey=ax)
        ax2 = seaborn.kdeplot(samps[:, 0], samps[:, 1], shade=True, cmap='Greens', n_levels=20,
                              clip=[[-xmax, xmax]] * 2)
        plt.xticks([])
        plt.yticks([])
        plt.title('step %d' % (i * 300))
    ax.set_ylabel('%d unrolling steps' % 10)
    plt.gcf().tight_layout()
    plt.savefig('target.png')
    plt.show()
    plt.close()
def KL(samples, d=-1):
    # KL divergence between the empirical samples and the ground-truth mixture of 8 Gaussians
    n = 8
    radius = 2
    std = 0.02
    delta_theta = 2 * np.pi / n
    centers_x = []
    centers_y = []
    for i in range(n):
        centers_x.append(radius * np.cos(i * delta_theta))
        centers_y.append(radius * np.sin(i * delta_theta))
    centers_x = np.expand_dims(np.array(centers_x), 1)
    centers_y = np.expand_dims(np.array(centers_y), 1)
    centers = np.concatenate([centers_x, centers_y], 1)
    s = samples
    samplesP = np.zeros(s.shape[0])
    for i in range(n):
        samplesP = samplesP + scipy.stats.multivariate_normal.pdf(samples, centers[i], [[std, 0], [0, std]]) / 8
    samplesP = samplesP / sum(samplesP)
    pd = np.log(samplesP / (1 / float(s.shape[0]))) * samplesP
    KL = np.sum(pd)
    return KL
def setup_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True

# set the random seed
setup_seed(30)
if torch.cuda.is_available():
    cuda = True
    os.environ['CUDA_VISIBLE_DEVICES'] = "0"
else:
    cuda = False
plt.style.use('ggplot')
Define the LL and UL objectives:
def ll_objective(data, target, ul_model, ll_model):
    d_real_data, d_fake_data = data
    d_fake_decision = ll_model(d_fake_data)
    fake_target = torch.zeros_like(d_fake_decision)
    if cuda:
        fake_target = fake_target.cuda()
    d_fake_error = criterion(d_fake_decision, fake_target)  # zeros = fake
    d_real_decision = ll_model(d_real_data)
    real_target = torch.ones_like(d_real_decision)
    if cuda:
        real_target = real_target.cuda()
    d_real_error = criterion(d_real_decision, real_target)  # ones = real
    d_loss = d_real_error + d_fake_error
    return d_loss

def ul_objective(data, target, ul_model, ll_model):
    g_fake_data = ul_model(data)
    dg_fake_decision = ll_model(g_fake_data)
    target = torch.ones_like(dg_fake_decision)
    if cuda:
        target = target.cuda()
    g_error = criterion(dg_fake_decision, target)
    return g_error
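Up to expectations over the data and the noise, these two functions are the standard GAN losses written as a bilevel problem (our notation, not taken from bohml's documentation): the discriminator D is the LL model and the generator G the UL model,
\[
\min_{D}\; -\mathbb{E}_{x}\bigl[\log D(x)\bigr] - \mathbb{E}_{z}\bigl[\log\bigl(1 - D(G(z))\bigr)\bigr],
\qquad
\min_{G}\; -\mathbb{E}_{z}\bigl[\log D(G(z))\bigr],
\]
where the generator loss is the non-saturating form, matching the BCE calls above.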
Define the LL and UL models along with their optimizers:
class Generator(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Generator, self).__init__()
        self.map1 = nn.Linear(input_size, hidden_size)
        self.map2 = nn.Linear(hidden_size, hidden_size)
        self.map3 = nn.Linear(hidden_size, output_size)
        self.activation_fn = F.relu

    def forward(self, x):
        x = self.activation_fn(self.map1(x))
        x = self.activation_fn(self.map2(x))
        return self.map3(x)

class Discriminator(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Discriminator, self).__init__()
        self.map1 = nn.Linear(input_size, hidden_size)
        self.map2 = nn.Linear(hidden_size, hidden_size)
        self.map3 = nn.Linear(hidden_size, output_size)
        self.activation_fn = F.relu

    def forward(self, x):
        x = self.activation_fn(self.map1(x))
        x = self.activation_fn(self.map2(x))
        return torch.sigmoid(self.map3(x))

    def load(self, backup):
        for m_from, m_to in zip(backup.modules(), self.modules()):
            if isinstance(m_to, nn.Linear):
                m_to.weight.data = m_from.weight.data.clone()
                if m_to.bias is not None:
                    m_to.bias.data = m_from.bias.data.clone()
ul_model = Generator(input_size=256, hidden_size=128, output_size=dset.size)
ll_model = Discriminator(input_size=dset.size, hidden_size=128, output_size=1)
ll_opt = optim.Adam(ll_model.parameters(), lr=1e-4)
ul_opt = optim.Adam(ul_model.parameters(), lr=1e-4)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
ll_model.to(device)
ul_model.to(device)
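A quick shape check (a minimal sketch): the generator maps 256-dimensional noise to 2-D points and the discriminator maps points to a probability in (0, 1):
z = torch.randn(16, 256, device=device)  # noise batch
fake = ul_model(z)                       # shape (16, 2), i.e. (batch, dset.size)
score = ll_model(fake)                   # shape (16, 1), sigmoid output
print(fake.shape, score.shape)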
Define the bi-level optimizer:
optimizer = bohml.optimizer.BOHMLOptimizer(
    "Feature", ll_method="Dynamic", ul_method="Recurrence",
    ll_objective=ll_objective, ul_objective=ul_objective,
    ll_model=ll_model, ul_model=ul_model, total_iters=3000)
optimizer.build_ll_solver(10, ll_opt)
optimizer.build_ul_solver(ul_opt)
Train the model:
def noise_sampler(N, z_dim):
    return np.random.normal(size=[N, z_dim]).astype('float32')

criterion = nn.BCELoss()
log_interval = 300  # matches the 'step %d' spacing used in plot_samples
for rhg in range(1):
    def g_sample():
        with torch.no_grad():
            gen_input = torch.from_numpy(noise_sampler(512, 256))
            if cuda:
                gen_input = gen_input.cuda()
            g_fake_data = ul_model(gen_input)
            return g_fake_data.cpu().numpy()

    samples = []
    g_infos = []
    for it in tqdm_notebook(range(3000)):
        gen_input = torch.from_numpy(noise_sampler(512, 256))
        if cuda:
            gen_input = gen_input.cuda()
        # 1A: train D on real data
        d_real_data = torch.from_numpy(dset.sample(512))
        if cuda:
            d_real_data = d_real_data.cuda()
        # 1B: train D on fake data
        d_gen_input = torch.from_numpy(noise_sampler(512, 256))
        if cuda:
            d_gen_input = d_gen_input.cuda()
        with torch.no_grad():
            d_fake_data = ul_model(d_gen_input)
        for i in range(1):
            ll_opt.zero_grad()
            d_loss = ll_objective([d_real_data, d_fake_data], None, ul_model, ll_model)
            d_loss.backward()
            ll_opt.step()
        g_info, forward_time, backward_time = optimizer.run_iter(
            [d_real_data, d_fake_data], None, gen_input, None,
            it, forward_with_whole_batch=True)
        if it % log_interval == 0:
            g_fake_data = g_sample()
            samples.append(g_fake_data)
            print(g_info, KL(samples[-1]))
plot_samples(samples)