
Examples of Specific Applications

Here we show how to implement some common applications with bohml.

The RAD method is used by default; it can also be selected explicitly when constructing the optimizer (see the hyper cleaning example below, which passes 'RAD').


Few-Shot Learning

Few-shot learning requires the Omniglot or MiniImageNet dataset.

Define the LL and UL objectives:

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

import bohml


def ll_objective(data, target, ul_model, ll_model):
    # LL loss: cross-entropy of the linear head (LL model) on features from the UL backbone
    out_f = ll_model(ul_model(data))
    loss_f = F.cross_entropy(out_f, target)
    return loss_f


def ul_objective(data, target, ul_model, ll_model):
    # UL loss: same form, evaluated on each task's query (test) split
    out_F = ll_model(ul_model(data))
    loss_F = F.cross_entropy(out_F, target)
    return loss_F
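
Both objectives share the same form: the UL model (the backbone) extracts features and the LL model (the linear head) classifies them; the LL objective is computed on each task's support split and the UL objective on its query split. A quick sanity check with stand-in modules (the 5-way, 84x84 shapes below are assumptions matching the MiniImageNet setup used later):

dummy_ul = nn.Identity()                                    # stands in for the Res12 backbone
dummy_ll = nn.Sequential(nn.Flatten(), nn.Linear(3 * 84 * 84, 5))
xs = torch.randn(4, 3, 84, 84)
ys = torch.randint(0, 5, (4,))
print(ll_objective(xs, ys, dummy_ul, dummy_ll))             # scalar cross-entropy loss
print(ul_objective(xs, ys, dummy_ul, dummy_ll))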

Define the LL and UL models along with their optimizers. The backbone module offers both Conv and Res12; here we use Res12:

ul_model = bohml.utils.model.backbone.Res12([5, 3, 84, 84], ways, use_head=False)
ll_model = nn.Sequential(
    nn.Flatten(),
    nn.Linear(np.prod(ul_model.output_shape[1:]), ways)
)
ll_opt = torch.optim.SGD(ll_model.parameters(), lr=args.learning_rate)
ul_opt = torch.optim.Adam(ul_model.parameters())
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
ll_model.to(device)
ul_model.to(device)

Define the bi-level optimizer:

optimizer = bohml.optimizer.BOHMLOptimizer(
            "Feature", ll_method="Dynamic", ul_method="Recurrence",
            ll_objective=ll_objective, ul_objective=ul_objective,
            ll_model=ll_model, ul_model=ul_model)
optimizer.build_ll_solver(args.inner_loop, ll_opt)
optimizer.build_ul_solver(ul_opt)

Train and evaluate the model. Here we use the torchmeta package to load the training, validation, and test batches:

from torchmeta.datasets.helpers import miniimagenet
from torchmeta.utils.data import BatchMetaDataLoader

dataset = miniimagenet("data", ways=ways, shots=shots, test_shots=test_shots,
                       meta_train=True, download=True)
test_dataset = miniimagenet("data", ways=ways, shots=shots, test_shots=test_shots,
                            meta_test=True, download=True)
train_dataloader = BatchMetaDataLoader(dataset, batch_size=batch_size, **kwargs)
test_dataloader = BatchMetaDataLoader(test_dataset, batch_size=batch_size, **kwargs)
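
Each element yielded by BatchMetaDataLoader is a dictionary whose "train" and "test" entries hold the support and query tensors stacked over tasks, for example:

batch = next(iter(train_dataloader))
print(batch["train"][0].shape)   # roughly (batch_size, ways * shots, 3, 84, 84)
print(batch["test"][0].shape)    # roughly (batch_size, ways * test_shots, 3, 84, 84)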


from tqdm import tqdm

val_losses = []
with tqdm(train_dataloader, total=60000, desc="Training Phase") as pbar:
    for meta_iter, batch in enumerate(pbar):

        tr_xs, tr_ys = batch["train"][0].to(device), batch["train"][1].to(device)
        tst_xs, tst_ys = batch["test"][0].to(device), batch["test"][1].to(device)

        val_loss, forward_time, backward_time = optimizer.run_iter(
            tr_xs, tr_ys, tst_xs, tst_ys, meta_iter,
            forward_with_whole_batch=False)
        val_losses.append(val_loss)

        if meta_iter % eval_interval == 0:
            test_losses, test_accs = evaluate()
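
The evaluate() helper used above is not part of bohml; a minimal sketch is shown below. It adapts a fresh copy of the linear head on each task's support set (the number of adaptation steps and the learning rate are assumptions) and collects query losses and accuracies:

import copy

def evaluate(num_batches=100, adapt_steps=20, adapt_lr=0.01):
    losses, accs = [], []
    for i, batch in enumerate(test_dataloader):
        if i >= num_batches:
            break
        tr_xs, tr_ys = batch["train"][0].to(device), batch["train"][1].to(device)
        tst_xs, tst_ys = batch["test"][0].to(device), batch["test"][1].to(device)
        for task in range(tr_xs.shape[0]):
            head = copy.deepcopy(ll_model)                   # fresh head per task
            opt = torch.optim.SGD(head.parameters(), lr=adapt_lr)
            feats = ul_model(tr_xs[task]).detach()           # keep the backbone frozen
            for _ in range(adapt_steps):                     # adapt on the support set
                opt.zero_grad()
                F.cross_entropy(head(feats), tr_ys[task]).backward()
                opt.step()
            with torch.no_grad():                            # score on the query set
                logits = head(ul_model(tst_xs[task]))
                losses.append(F.cross_entropy(logits, tst_ys[task]).item())
                accs.append((logits.argmax(dim=1) == tst_ys[task]).float().mean().item())
    return losses, accs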

Hyper Cleaning

Hyper cleaning requires the MNIST or CIFAR10 dataset.

Set up the data and the metric functions:

import copy
import random

import numpy as np
import torch
import torch.nn.functional as F

import bohml


class Dataset:
    def __init__(self, data, target, polluted=False, rho=0.0):
        self.data = data.float() / torch.max(data)
        print(list(target.shape))
        if not polluted:
            self.clean_target = target
            self.dirty_target = None
            self.clean = np.ones(list(target.shape)[0])
        else:
            self.clean_target = None
            self.dirty_target = target
            self.clean = np.zeros(list(target.shape)[0])
        self.polluted = polluted
        self.rho = rho
        self.set = set(target.numpy().tolist())

    def data_polluting(self, rho):
        assert self.polluted == False and self.dirty_target is None
        number = self.data.shape[0]
        number_list = list(range(number))
        random.shuffle(number_list)
        self.dirty_target = copy.deepcopy(self.clean_target)
        for i in number_list[:int(rho * number)]:
            dirty_set = copy.deepcopy(self.set)
            dirty_set.remove(int(self.clean_target[i]))
            self.dirty_target[i] = random.choice(list(dirty_set))
            self.clean[i] = 0
        self.polluted = True
        self.rho = rho

    def data_flatten(self):
        try:
            self.data = self.data.view(self.data.shape[0], self.data.shape[1] * self.data.shape[2])
        except BaseException:
            self.data = self.data.reshape(self.data.shape[0],
                                          self.data.shape[1] * self.data.shape[2] * self.data.shape[3])

    def to_cuda(self):
        self.data = self.data.cuda()
        if self.clean_target is not None:
            self.clean_target = self.clean_target.cuda()
        if self.dirty_target is not None:
            self.dirty_target = self.dirty_target.cuda()

def data_splitting(dataset, tr, val, test):
    assert tr + val + test <= 1.0 or tr > 1

    number = dataset.targets.shape[0]
    number_list = list(range(number))
    random.shuffle(number_list)
    if tr < 1:
        tr_number = tr * number
        val_number = val * number
        test_number = test * number
    else:
        tr_number = tr
        val_number = val
        test_number = test

    train_data = Dataset(dataset.data[number_list[:int(tr_number)], :, :],
                         dataset.targets[number_list[:int(tr_number)]])
    val_data = Dataset(dataset.data[number_list[int(tr_number):int(tr_number + val_number)], :, :],
                       dataset.targets[number_list[int(tr_number):int(tr_number + val_number)]])
    test_data = Dataset(
        dataset.data[number_list[int(tr_number + val_number):int(tr_number + val_number + test_number)], :, :],
        dataset.targets[number_list[int(tr_number + val_number):int(tr_number + val_number + test_number)]])
    return train_data, val_data, test_data

from torchvision.datasets import MNIST
dataset = MNIST(root="data", train=True, download=True)
tr, val, test = data_splitting(dataset, 5000, 5000, 10000)
tr.data_polluting(0.5)
tr.data_flatten()
val.data_flatten()
test.data_flatten()
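
If a GPU is available, move the split tensors onto it as well (the Dataset class above provides to_cuda() for this); otherwise the data and the models defined below would live on different devices:

if torch.cuda.is_available():
    tr.to_cuda()
    val.to_cuda()
    test.to_cuda()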

def accuary(out, target):
    pred = out.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
    acc = pred.eq(target.view_as(pred)).sum().item() / len(target)
    return acc


def Binarization(x):
    x_bi = np.zeros_like(x)
    for i in range(x.shape[0]):
        # print(x[i])
        x_bi[i] = 1 if x[i] >= 0 else 0
    return x_bi

Define the LL and UL objectives:

def val_loss(data, target, upper_model, lower_model):
    # UL objective: plain cross-entropy of the LL classifier on clean validation data
    loss = F.cross_entropy(lower_model(data), target)
    return loss

def train_loss(data, target, upper_model, lower_model):
    # LL objective: per-sample cross-entropy on the (noisy) training data,
    # re-weighted and averaged by the UL model (Net_x below)
    out = upper_model(F.cross_entropy(lower_model(data), target, reduction='none'))
    return out

Define the LL and UL models along with their optimizers:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class Net_x(torch.nn.Module):
    def __init__(self, tr):
        super(Net_x, self).__init__()
        # one learnable weight per training example
        self.x = torch.nn.Parameter(torch.zeros(tr.data.shape[0]).to(device).requires_grad_(True))

    def forward(self, y):
        # re-weight the per-sample losses by sigmoid(x) and average them
        y = torch.sigmoid(self.x) * y
        y = y.mean()
        return y

x = Net_x(tr)
y = torch.nn.Sequential(torch.nn.Linear(28 ** 2, 10)).to(device)
x_opt = torch.optim.Adam(x.parameters(), lr=0.01)
y_opt = torch.optim.SGD(y.parameters(), lr=0.03)
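
Because Net_x holds one weight per training example, train_loss only broadcasts correctly when the entire training set is passed at once, which is why run_iter is called with forward_with_whole_batch=True below. An optional quick check:

print(train_loss(tr.data, tr.dirty_target, x, y))   # scalar: sigmoid-weighted mean cross-entropy
print(val_loss(val.data, val.clean_target, x, y))   # scalar: plain cross-entropy on clean data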

Define the bi-level optimizer:

b_optimizer = bohml.optimizer.BOHMLOptimizer(
            'MetaRepr', 'Feature', 'RAD',
            ll_objective=train_loss, ul_objective=val_loss,
            ll_model=y, ul_model=x, total_iters=3000)
b_optimizer.build_ll_solver(100, y_opt)
b_optimizer.build_ul_solver(x_opt)

Train and evaluate the model:

F1_score_last = 0
for x_itr in range(3000):
    loss, forward_time, backward_time = b_optimizer.run_iter(
        tr.data, tr.dirty_target, val.data, val.clean_target,
        current_iter=x_itr, forward_with_whole_batch=True)

    if x_itr % 10 == 0:
        with torch.no_grad():
            out = y(test.data)
            acc = accuary(out, test.clean_target)
            x_bi = Binarization(x.x.cpu().numpy())
            clean = x_bi * tr.clean
            p = clean.mean() / (x_bi.sum() / x_bi.shape[0] + 1e-8)
            r = clean.mean() / (1. - tr.rho)
            F1_score = 2 * p * r / (p + r + 1e-8)
            dc = 0
            if F1_score_last > F1_score:
                dc = 1
            F1_score_last = F1_score
            valLoss = F.cross_entropy(out, test.clean_target)
            print('x_itr={}, acc={:.3f}, p={:.3f}, r={:.3f}, F1 score={:.3f}, val_loss={:.3f}'.format(
                x_itr, 100 * acc, 100 * p, 100 * r, 100 * F1_score, valLoss))

GAN

Required packages:

import os
import numpy as np
import matplotlib.pyplot as plt

import seaborn
from tqdm import tqdm_notebook
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import scipy.stats

import bohml

Define the data generator:

class data_generator(object):
    def __init__(self,dis='normal',n=8,std=0.02,radius=2):

        # n = 8      #8
        # radius = 2
        # std = 0.02 #0.02
        delta_theta = 2*np.pi / n

        centers_x = []
        centers_y = []
        for i in range(n):
            centers_x.append(radius*np.cos(i*delta_theta))
            centers_y.append(radius*np.sin(i*delta_theta))

        centers_x = np.expand_dims(np.array(centers_x), 1)
        centers_y = np.expand_dims(np.array(centers_y), 1)

        p = [1./n for _ in range(n)]

        self.p = p
        self.size = 2
        self.n = n
        self.dis=dis

        self.centers = np.concatenate([centers_x, centers_y], 1)
        self.std = std

    # switch to random distribution (harder)
    def random_distribution(self, p=None):
        if p is None:
            p = [np.random.uniform() for _ in range(self.n)]
            p = p / np.sum(p)
        self.p = p

    # switch to uniform distribution
    def uniform_distribution(self):
        p = [1./self.n for _ in range(self.n)]
        self.p = p

    def sample(self, N,center_require=False):
        n = self.n
        std = self.std
        centers = self.centers

        ith_center = np.random.choice(n, N,p=self.p)
        sample_centers = centers[ith_center, :]
        if self.dis=='normal':
            sample_points = np.random.normal(loc=sample_centers, scale=std)
        elif self.dis=='laplace':
            sample_points = scipy.stats.laplace.rvs(loc=sample_centers,scale=std)
        if center_require:
            return sample_points.astype('float32'),ith_center
        else:
            return sample_points.astype('float32')

## choose uniform mixture gaussian or weighted mixture gaussian
dset = data_generator()
# dset.random_distribution()
dset.uniform_distribution()
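
Optionally, draw a quick sample to visualize the target distribution, a ring of eight Gaussians of radius 2:

pts = dset.sample(512)
print(pts.shape)                          # (512, 2)
plt.scatter(pts[:, 0], pts[:, 1], s=2)
plt.show()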

Define the plotting function, the KL metric, and other settings:

# plot the samples through iterations
def plot_samples(samples):
    xmax = 5
    cols = len(samples)
    bg_color = seaborn.color_palette('Greens', n_colors=256)[0]
    plt.figure(figsize=(2 * cols, 2))
    for i, samps in enumerate(samples):
        if i == 0:
            ax = plt.subplot(1, cols, 1)
        else:
            plt.subplot(1, cols, i + 1, sharex=ax, sharey=ax)
        ax2 = seaborn.kdeplot(samps[:, 0], samps[:, 1], shaded=True, cmap='Greens', n_levels=20,
                              clip=[[-xmax, xmax]] * 2)
        plt.xticks([])
        plt.yticks([])
        plt.title('step %d' % (i * 300))

    ax.set_ylabel('%d unrolling steps' % 10)
    plt.gcf().tight_layout()
    plt.savefig('target.png')
    plt.show()
    plt.close()

def KL(samples, d=-1):
    n = 8
    radius = 2
    std = 0.02
    delta_theta = 2 * np.pi / n

    centers_x = []
    centers_y = []
    for i in range(n):
        centers_x.append(radius * np.cos(i * delta_theta))
        centers_y.append(radius * np.sin(i * delta_theta))

    centers_x = np.expand_dims(np.array(centers_x), 1)
    centers_y = np.expand_dims(np.array(centers_y), 1)
    centers = np.concatenate([centers_x, centers_y], 1)

    s = samples
    samplesP = np.zeros(s.shape[0])
    for i in range(n):
        samplesP = samplesP + scipy.stats.multivariate_normal.pdf(samples, centers[i], [[std, 0], [0, std]]) / 8

    samplesP=samplesP/sum(samplesP)
    pd =np.log(samplesP / (1/float(s.shape[0]))) * samplesP

    KL = np.sum(pd)
    return KL
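
As a rough check, evaluating KL on samples drawn from the target mixture itself gives a baseline value that a well-trained generator should approach:

print(KL(dset.sample(2000)))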

def setup_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
# set the random seed
setup_seed(30)

if torch.cuda.is_available():
    cuda = True
    os.environ['CUDA_VISIBLE_DEVICES'] = "0"
else:
    cuda = False

plt.style.use('ggplot')

Define the LL and UL objectives (criterion, the binary cross-entropy loss, is defined in the training script below):

def ll_objective(data, target, ul_model, ll_model):
    d_real_data, d_fake_data = data
    d_fake_decision = ll_model(d_fake_data)
    fake_target = torch.zeros_like(d_fake_decision)
    if cuda:
        fake_target = fake_target.cuda()
    d_fake_error = criterion(d_fake_decision, fake_target)  # zeros = fake

    d_real_decision = ll_model(d_real_data)
    real_target = torch.ones_like(d_real_decision)
    if cuda:
        real_target = real_target.cuda()
    d_real_error = criterion(d_real_decision, real_target)  # ones = true

    d_loss = d_real_error + d_fake_error
    return d_loss


def ul_objective(data, target, ul_model, ll_model):
    g_fake_data = ul_model(data)
    dg_fake_decision = ll_model(g_fake_data)
    target = torch.ones_like(dg_fake_decision)
    if cuda:
        target = target.cuda()
    g_error = criterion(dg_fake_decision, target)
    return g_error

Define the LL and UL models along with their optimizers:

class Generator(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Generator, self).__init__()
        self.map1 = nn.Linear(input_size, hidden_size)
        self.map2 = nn.Linear(hidden_size, hidden_size)
        self.map3 = nn.Linear(hidden_size, output_size)
        # self.activation_fn = F.tanh
        self.activation_fn = F.relu

    def forward(self, x):
        x = self.activation_fn(self.map1(x))
        x = self.activation_fn(self.map2(x))
        return self.map3(x)


class Discriminator(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Discriminator, self).__init__()
        self.map1 = nn.Linear(input_size, hidden_size)
        self.map2 = nn.Linear(hidden_size, hidden_size)
        self.map3 = nn.Linear(hidden_size, output_size)
        self.activation_fn = F.relu

    def forward(self, x):
        x = self.activation_fn(self.map1(x))
        x = self.activation_fn(self.map2(x))
        return torch.sigmoid(self.map3(x))

    def load(self, backup):
        for m_from, m_to in zip(backup.modules(), self.modules()):
            if isinstance(m_to, nn.Linear):
                m_to.weight.data = m_from.weight.data.clone()
                if m_to.bias is not None:
                    m_to.bias.data = m_from.bias.data.clone()

ul_model = Generator(input_size=256, hidden_size=128, output_size=dset.size)
ll_model = Discriminator(input_size=dset.size, hidden_size=128, output_size=1)

ll_opt = optim.Adam(ll_model.parameters(), lr=1e-4)
ul_opt = optim.Adam(ul_model.parameters(), lr=1e-4)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
ll_model.to(device)
ul_model.to(device)
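
An optional shape check: the generator maps 256-dimensional noise to 2-D points, and the discriminator maps 2-D points to a single probability:

z = torch.randn(4, 256).to(device)
print(ul_model(z).shape)                # torch.Size([4, 2])
print(ll_model(ul_model(z)).shape)      # torch.Size([4, 1])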

Define the bi-level optimizer:

optimizer = bohml.optimizer.BOHMLOptimizer(
            "Feature", ll_method="Dynamic", ul_method="Recurrence",
            ll_objective=ll_objective, ul_objective=ul_objective,
            ll_model=ll_model, ul_model=ul_model, total_iters=3000)
optimizer.build_ll_solver(10, ll_opt)
optimizer.build_ul_solver(ul_opt)

Train the model:

def noise_sampler(N, z_dim):
    return np.random.normal(size=[N, z_dim]).astype('float32')

criterion = nn.BCELoss()
log_interval = 300  # sample-logging interval (assumed here; matches the 300-step spacing used in plot_samples)

for rhg in range(1):
    def g_sample():
        with torch.no_grad():
            gen_input = torch.from_numpy(noise_sampler(512, 256))
            if cuda:
                gen_input = gen_input.cuda()
            g_fake_data = ul_model(gen_input)
            return g_fake_data.cpu().numpy()
    samples = []
    g_infos = []
    for it in tqdm_notebook(range(3000)):
        gen_input = torch.from_numpy(noise_sampler(512, 256))
        if cuda:
            gen_input = gen_input.cuda()
        #  1A: Train D on real
        d_real_data = torch.from_numpy(dset.sample(512))
        if cuda:
            d_real_data = d_real_data.cuda()
        #  1B: Train D on fake
        d_gen_input = torch.from_numpy(noise_sampler(512, 256))
        if cuda:
            d_gen_input = d_gen_input.cuda()
        with torch.no_grad():
            d_fake_data = ul_model(d_gen_input)

        for i in range(1):
            ll_opt.zero_grad()
            d_loss = ll_objective([d_real_data, d_fake_data], None, ul_model, ll_model)
            d_loss.backward()
            ll_opt.step()
        g_info, forward_time, backward_time = optimizer.run_iter([d_real_data, d_fake_data], None, gen_input, None,
                                                                   it, forward_with_whole_batch=True)

        if it % log_interval == 0:
            g_fake_data = g_sample()
            samples.append(g_fake_data)
            print(g_info, KL(samples[-1]))

    plot_samples(samples)
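
Optionally, plot how the KL estimate of the generated samples evolves over training:

kls = [KL(s) for s in samples]
plt.figure()
plt.plot(np.arange(len(kls)) * log_interval, kls)
plt.xlabel('iteration')
plt.ylabel('KL estimate')
plt.savefig('kl.png')
plt.show()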