# model.py (forked from liguge/Graph-Dynamic-Autoencoder)
import torch
import torch.nn as nn
import numpy as np
import dgl
import dgl.function as fn

def construct_graph(data, neighbor=5):
    """Build the index-window graph: every node is one sample, connected to
    the 2*neighbor samples around it with Gaussian-kernel edge weights."""
    n, c = data.shape
    # Node features: one node per sample row.
    g = dgl.DGLGraph()
    g.add_nodes(n)
    g.ndata['h'] = torch.Tensor(data)
    # Edge features: weight each window neighbor by a normalized kernel of
    # its Euclidean distance to node i.
    for i in range(n):
        # Clamp the window at the borders so it always holds 2*neighbor nodes.
        if i - neighbor < 0:
            left = 0
            right = left + neighbor * 2
        elif i + neighbor > n - 1:
            right = n - 1
            left = right - neighbor * 2
        else:
            left = i - neighbor
            right = i + neighbor
        neighbor_nodes = [x for x in range(left, right + 1) if x != i]
        di = [np.linalg.norm(data[i, :] - data[j, :]) for j in neighbor_nodes]
        beta = np.mean(di)  # kernel bandwidth: the mean neighbor distance
        di = [np.exp(-x / beta) for x in di]
        s = np.sum(di)      # sum once, then normalize the weights to 1
        di = [x / s for x in di]
        g.add_edges([i] * neighbor * 2, neighbor_nodes)
        g.edges[[i] * neighbor * 2, neighbor_nodes].data['h_e'] = torch.Tensor(di)
    # Self-loops carry weight 1.0 so every node keeps its own feature.
    g = dgl.add_self_loop(g)
    for i in range(n):
        g.edges[i, i].data['h_e'] = torch.Tensor([1.0])
    return g
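
# A minimal usage sketch (assumption: `data` is an (n_samples, n_features)
# float array, which the row-window logic above implies):
#
#   X = np.random.randn(100, 10).astype(np.float32)
#   g = construct_graph(X, neighbor=5)
#   # 100 nodes; 100 * 2*5 window edges + 100 self-loops = 1100 edges
#   print(g.number_of_nodes(), g.number_of_edges())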

def update_edge_feature(data, neighbor=5):
    """Recompute every edge weight from the current node features, in the
    edge order used by construct_graph: window edges first, then the n
    self-loop edges appended by dgl.add_self_loop."""
    n = data.shape[0]
    device = data.device
    # detach() before numpy(): the features require grad after the first layer.
    data = data.detach().cpu().numpy()
    dis = []
    for i in range(n):
        # Same border-clamped window as in construct_graph.
        if i - neighbor < 0:
            left = 0
            right = left + neighbor * 2
        elif i + neighbor > n - 1:
            right = n - 1
            left = right - neighbor * 2
        else:
            left = i - neighbor
            right = i + neighbor
        neighbor_nodes = [x for x in range(left, right + 1) if x != i]
        di = [np.linalg.norm(data[i, :] - data[j, :]) for j in neighbor_nodes]
        beta = np.mean(di)
        di = [np.exp(-x / beta) for x in di]
        s = np.sum(di)
        di = [x / s for x in di]
        dis.extend(di)
    # One weight of 1.0 per self-loop edge.
    for i in range(n):
        dis.append(1.0)
    # Keep the weights on the same device as the input features.
    return torch.Tensor(dis).to(device)
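
# Ordering note (assumption: DGL keeps edges in insertion order and
# dgl.add_self_loop appends the n self-loop edges last; the flat weight
# vector above relies on this to line up with g.edges()):
#
#   h = torch.randn(64, 10)
#   w = update_edge_feature(h, neighbor=5)   # shape: (64 * 10 + 64,)
#   # w[:640] -> normalized window weights, node by node
#   # w[640:] -> the 64 self-loop weights, all 1.0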

class GAElayer(nn.Module):
    def __init__(self, input_dim=None, output_dim=None, SelfTraining=False):
        super(GAElayer, self).__init__()
        if input_dim is None or output_dim is None:
            raise ValueError('input_dim and output_dim must be specified')
        self.in_features = input_dim
        self.out_features = output_dim
        # True while the layer is pre-trained on its own, False when it runs
        # as part of the whole network.
        self.is_training_self = SelfTraining
        # Message passing: scale each source feature by its edge weight,
        # then sum the incoming messages into the node feature.
        self.msg_func = fn.u_mul_e('h', 'h_e', 'm')
        self.reduce_func = fn.sum('m', 'h')
        self.encoder = nn.Sequential(
            nn.Linear(self.in_features, self.out_features, bias=True),
            nn.ReLU()  # ReLU is the activation used throughout the network
        )
        self.decoder = nn.Sequential(
            nn.Linear(self.out_features, self.in_features, bias=True),
            nn.ReLU()
        )

    def forward(self, inputs):
        g, h = inputs
        g.ndata['h'] = h
        # Dynamic graph: the edge weights are recomputed from the current
        # features on every forward pass.
        g.edata['h_e'] = update_edge_feature(h, neighbor=5)
        g.update_all(self.msg_func, self.reduce_func)
        h = g.ndata.pop('h')
        out = self.encoder(h)
        if self.is_training_self:
            # Layer-wise pre-training: return the reconstruction.
            return g, self.decoder(out)
        else:
            return g, out

    def lock_grad(self):
        for param in self.parameters():
            param.requires_grad = False

    def acquire_grad(self):
        for param in self.parameters():
            param.requires_grad = True

    @property
    def input_dim(self):
        return self.in_features

    @property
    def output_dim(self):
        return self.out_features

    @property
    def is_training_layer(self):
        return self.is_training_self

    @is_training_layer.setter
    def is_training_layer(self, other: bool):
        self.is_training_self = other
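
# Layer-wise pre-training sketch (hypothetical loop; the original training
# script is not part of this file). With SelfTraining=True the layer returns
# its own reconstruction, so it can be fitted with a plain MSE objective:
#
#   X = np.random.randn(64, 10).astype(np.float32)
#   g = construct_graph(X, neighbor=5)
#   layer = GAElayer(input_dim=10, output_dim=8, SelfTraining=True)
#   optimizer = torch.optim.Adam(layer.parameters(), lr=1e-3)
#   criterion = nn.MSELoss()
#   h = torch.Tensor(X)
#   for _ in range(50):
#       _, rec = layer([g, h])
#       loss = criterion(rec, h)
#       optimizer.zero_grad()
#       loss.backward()
#       optimizer.step()
#   layer.lock_grad()  # freeze this layer before pre-training the next one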

class GraphStackAE(nn.Module):
    """Construct the whole network from layers_list: two encoder layers
    followed by two decoder layers."""

    def __init__(self, layers_list=None):
        super(GraphStackAE, self).__init__()
        self.layers_list = layers_list
        self.initialize()
        self.encoder_1 = self.layers_list[0]
        self.encoder_2 = self.layers_list[1]
        self.decoder_1 = self.layers_list[2]
        self.decoder_2 = self.layers_list[3]

    def initialize(self):
        # Take every layer out of self-training mode so its forward returns
        # the encoded features rather than a per-layer reconstruction.
        for layer in self.layers_list:
            layer.is_training_layer = False

    def forward(self, inputs):
        g, out = inputs
        g, out = self.encoder_1([g, out])
        g, out = self.encoder_2([g, out])
        self.hidden_feature = out  # bottleneck embedding
        g, out = self.decoder_1([g, out])
        g, out = self.decoder_2([g, out])
        return out
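
if __name__ == '__main__':
    # Smoke test (sketch): the layer dimensions are illustrative and not
    # taken from the original training script.
    X = np.random.randn(64, 10).astype(np.float32)
    g = construct_graph(X, neighbor=5)
    layers = [GAElayer(10, 8), GAElayer(8, 4),   # encoder stack
              GAElayer(4, 8), GAElayer(8, 10)]   # mirrored decoder stack
    model = GraphStackAE(layers)
    recon = model([g, g.ndata['h']])
    print(recon.shape, model.hidden_feature.shape)  # (64, 10) and (64, 4)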