-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcontrastive_loss.py
94 lines (81 loc) · 3.18 KB
/
contrastive_loss.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import torch
import torch.nn as nn
import math
class InstanceLoss(nn.Module):
    """Instance-level contrastive loss over two views of the same batch.

    The two inputs are stacked into one matrix of ``2 * B`` rows, pairwise
    similarities are computed as dot products scaled by ``1 / temperature``,
    and each row is scored with cross-entropy where the "correct class" is
    the row's counterpart in the other view and the remaining ``2 * B - 2``
    non-self rows act as negatives.

    NOTE(review): similarity is a raw dot product, so the inputs are
    presumably expected to be L2-normalised already — this is not enforced
    here; confirm against the caller.

    Args:
        batch_size: Expected number of samples per view. Used to pre-build
            the negatives mask; batches of a different size are still
            accepted by ``forward``.
        temperature: Divisor applied to the similarity matrix.
        device: Stored on the instance for callers; not used by the loss
            computation itself, which follows the device of the inputs.
    """

    def __init__(self, batch_size, temperature, device):
        super(InstanceLoss, self).__init__()
        self.batch_size = batch_size
        self.temperature = temperature
        self.device = device
        # Pre-built mask for the expected batch size (kept as a plain
        # attribute so existing checkpoints / state_dicts are unaffected).
        self.mask = self.mask_correlated_samples(batch_size)
        self.criterion = nn.CrossEntropyLoss(reduction="sum")

    def mask_correlated_samples(self, batch_size):
        """Return a ``(2B, 2B)`` bool mask that is True only at negatives.

        False entries are the main diagonal (self-similarity) and the two
        diagonals offset by ``batch_size`` (the positive pairs).
        """
        N = 2 * batch_size
        mask = ~torch.eye(N, dtype=torch.bool)
        idx = torch.arange(batch_size)
        mask[idx, idx + batch_size] = False
        mask[idx + batch_size, idx] = False
        return mask

    def _mask_for(self, batch_size, device):
        """Return the negatives mask for ``batch_size`` on ``device``."""
        if self.mask.shape[0] == 2 * batch_size:
            mask = self.mask
        else:
            # E.g. the last, incomplete batch of an epoch.
            mask = self.mask_correlated_samples(batch_size)
        return mask.to(device)

    def forward(self, z_i, z_j):
        """Compute the loss for two views ``z_i`` and ``z_j``.

        Args:
            z_i: Tensor whose first dimension indexes samples of view 1.
            z_j: Tensor with the same number of rows as ``z_i``; row ``k``
                is treated as the positive partner of ``z_i[k]``.
                (Both are assumed to be 2-D ``(batch, dim)`` — required by
                the ``z @ z.T`` product below.)

        Returns:
            Scalar tensor: summed cross-entropy divided by ``2 * batch``.

        Raises:
            ValueError: If the two views have different numbers of rows.
        """
        batch_size = z_i.shape[0]
        if z_j.shape[0] != batch_size:
            raise ValueError(
                "z_i and z_j must have the same number of rows, got "
                f"{batch_size} and {z_j.shape[0]}"
            )
        N = 2 * batch_size

        z = torch.cat((z_i, z_j), dim=0)
        sim = torch.matmul(z, z.T) / self.temperature

        # Offset diagonals hold sim(z_i[k], z_j[k]) and sim(z_j[k], z_i[k]).
        sim_i_j = torch.diag(sim, batch_size)
        sim_j_i = torch.diag(sim, -batch_size)
        positive_samples = torch.cat((sim_i_j, sim_j_i), dim=0).reshape(N, 1)

        # Every row has exactly N - 2 negatives; spelling the width out
        # (instead of -1) keeps the reshape valid when there are none.
        mask = self._mask_for(batch_size, sim.device)
        negative_samples = sim[mask].reshape(N, N - 2)

        # The positive logit sits in column 0, so the target class is 0.
        logits = torch.cat((positive_samples, negative_samples), dim=1)
        labels = torch.zeros(N, dtype=torch.long, device=logits.device)
        loss = self.criterion(logits, labels)
        loss /= N
        return loss
# class ClusterLoss(nn.Module):
# def __init__(self, class_num, temperature, device):
# super(ClusterLoss, self).__init__()
# self.class_num = class_num
# self.temperature = temperature
# self.device = device
#
# self.mask = self.mask_correlated_clusters(class_num)
# self.criterion = nn.CrossEntropyLoss(reduction="sum")
# self.similarity_f = nn.CosineSimilarity(dim=2)
#
# def mask_correlated_clusters(self, class_num):
# N = 2 * class_num
# mask = torch.ones((N, N))
# mask = mask.fill_diagonal_(0)
# for i in range(class_num):
# mask[i, class_num + i] = 0
# mask[class_num + i, i] = 0
# mask = mask.bool()
# return mask
#
# def forward(self, c_i, c_j):
# p_i = c_i.sum(0).view(-1)
# p_i /= p_i.sum()
# ne_i = math.log(p_i.size(0)) + (p_i * torch.log(p_i)).sum()
# p_j = c_j.sum(0).view(-1)
# p_j /= p_j.sum()
# ne_j = math.log(p_j.size(0)) + (p_j * torch.log(p_j)).sum()
# ne_loss = ne_i + ne_j
#
# c_i = c_i.t()
# c_j = c_j.t()
# N = 2 * self.class_num
# c = torch.cat((c_i, c_j), dim=0)
#
# sim = self.similarity_f(c.unsqueeze(1), c.unsqueeze(0)) / self.temperature
# sim_i_j = torch.diag(sim, self.class_num)
# sim_j_i = torch.diag(sim, -self.class_num)
#
# positive_clusters = torch.cat((sim_i_j, sim_j_i), dim=0).reshape(N, 1)
# negative_clusters = sim[self.mask].reshape(N, -1)
#
# labels = torch.zeros(N).to(positive_clusters.device).long()
# logits = torch.cat((positive_clusters, negative_clusters), dim=1)
# loss = self.criterion(logits, labels)
# loss /= N
#
# return loss + ne_loss