-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfc.py
231 lines (197 loc) · 6.84 KB
/
fc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import random
import numpy as np
from activators import SigmoidActivator, IdentityActivator
from functools import reduce
# 更适合深度学习算法的编程方式:向量化编程。
# 主要有两个原因:一个是我们事实上并不需要真的去定义Node、Connection这样的对象,直接把数学计算实现了就可以了;
# 另一个原因,是底层算法库会针对向量运算做优化(甚至有专用的硬件,比如GPU),程序效率会提升很多。
# 所以,在深度学习的世界里,我们总会想法设法的把计算表达为向量的形式
# 全连接层实现类
# 上面这个类一举取代了原先的Layer、Node、Connection等类,不但代码更加容易理解,而且运行速度也快了几百倍。
class FullConnectedLayer(object):
def __init__(self, input_size, output_size,
activator):
'''
构造函数
input_size: 本层输入向量的维度
output_size: 本层输出向量的维度
activator: 激活函数
'''
self.input_size = input_size
self.output_size = output_size
self.activator = activator
# 权重数组W
self.W = np.random.uniform(-0.1, 0.1, (output_size, input_size))
# 偏置项b
self.b = np.zeros((output_size, 1))
# 输出向量
self.output = np.zeros((output_size, 1))
def forward(self, input_array):
'''
前向计算
input_array: 输入向量,维度必须等于input_size
'''
# 式2
self.input = input_array
self.output = self.activator.forward(
np.dot(self.W, input_array) + self.b)
def backward(self, delta_array):
'''
反向计算W和b的梯度
delta_array: 从上一层传递过来的误差项
'''
# 式8
self.delta = self.activator.backward(self.input) * np.dot(
self.W.T, delta_array)
self.W_grad = np.dot(delta_array, self.input.T)
self.b_grad = delta_array
def update(self, learning_rate):
'''
使用梯度下降算法更新权重
'''
self.W += learning_rate * self.W_grad
self.b += learning_rate * self.b_grad
def dump(self):
print('W: %s\nb:%s' % (self.W, self.b))
# 神经网络类
class Network(object):
def __init__(self, layers):
'''
构造函数
'''
self.layers = []
for i in range(len(layers) - 1):
self.layers.append(
FullConnectedLayer(
layers[i], layers[i+1],
SigmoidActivator()
)
)
def predict(self, sample):
'''
使用神经网络实现预测
sample: 输入样本
'''
output = sample
for layer in self.layers:
layer.forward(output)
output = layer.output
return output
def train(self, labels, data_set, rate, epoch):
'''
训练函数
labels: 样本标签
data_set: 输入样本
rate: 学习速率
epoch: 训练轮数
'''
for i in range(epoch):
for d in range(len(data_set)):
self.train_one_sample(labels[d],
data_set[d], rate)
def train_one_sample(self, label, sample, rate):
self.predict(sample)
self.calc_gradient(label)
self.update_weight(rate)
def calc_gradient(self, label):
delta = self.layers[-1].activator.backward(
self.layers[-1].output
) * (label - self.layers[-1].output)
for layer in self.layers[::-1]:
layer.backward(delta)
delta = layer.delta
return delta
def update_weight(self, rate):
for layer in self.layers:
layer.update(rate)
def dump(self):
for layer in self.layers:
layer.dump()
def loss(self, output, label):
return 0.5 * ((label - output) * (label - output)).sum()
def gradient_check(self, sample_feature, sample_label):
'''
梯度检查
network: 神经网络对象
sample_feature: 样本的特征
sample_label: 样本的标签
'''
# 获取网络在当前样本下每个连接的梯度
self.predict(sample_feature)
self.calc_gradient(sample_label)
# 检查梯度
epsilon = 10e-4
for fc in self.layers:
for i in range(fc.W.shape[0]):
for j in range(fc.W.shape[1]):
fc.W[i,j] += epsilon
output = self.predict(sample_feature)
err1 = self.loss(sample_label, output)
fc.W[i,j] -= 2*epsilon
output = self.predict(sample_feature)
err2 = self.loss(sample_label, output)
expect_grad = (err1 - err2) / (2 * epsilon)
fc.W[i,j] += epsilon
print('weights(%d,%d): expected - actural %.4e - %.4e' % (
i, j, expect_grad, fc.W_grad[i,j]))
from bp import train_data_set
def transpose(args):
return list(map(
lambda arg: list(map(
lambda line: np.array(line).reshape(len(line), 1)
, arg))
, args
))
class Normalizer(object):
def __init__(self):
self.mask = [
0x1, 0x2, 0x4, 0x8, 0x10, 0x20, 0x40, 0x80
]
def norm(self, number):
data = map(lambda m: 0.9 if number & m else 0.1, self.mask)
return np.array(data).reshape(8, 1)
def denorm(self, vec):
binary = map(lambda i: 1 if i > 0.5 else 0, vec[:,0])
for i in range(len(self.mask)):
binary[i] = binary[i] * self.mask[i]
return reduce(lambda x,y: x + y, binary)
def train_data_set():
normalizer = Normalizer()
data_set = []
labels = []
for i in range(0, 256):
n = normalizer.norm(i)
data_set.append(n)
labels.append(n)
return labels, data_set
def correct_ratio(network):
normalizer = Normalizer()
correct = 0.0;
for i in range(256):
if normalizer.denorm(network.predict(normalizer.norm(i))) == i:
correct += 1.0
print('correct_ratio: %.2f%%' % (correct / 256 * 100))
def test():
labels, data_set = transpose(train_data_set())
net = Network([8, 3, 8])
rate = 0.5
mini_batch = 20
epoch = 10
for i in range(epoch):
net.train(labels, data_set, rate, mini_batch)
print('after epoch %d loss: %f' % (
(i + 1),
net.loss(labels[-1], net.predict(data_set[-1]))
))
rate /= 2
correct_ratio(net)
def gradient_check():
'''
梯度检查
'''
labels, data_set = transpose(train_data_set())
net = Network([8, 3, 8])
net.gradient_check(data_set[0], labels[0])
return net