-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodel.py
158 lines (122 loc) · 5.57 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms.functional as TF
class DoubleConv(nn.Module):
def __init__(self, in_channels, out_channels):
super(DoubleConv, self).__init__()
self.conv = nn.Sequential(
nn.Conv2d(in_channels, out_channels, 3, 1, 1, bias=False),
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace=True),
nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False),
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace=True),
)
def forward(self, x):
return self.conv(x)
class SparseAutoencoder(nn.Module):
def __init__(self, in_channels, sparsity_lambda=1e-4, sparsity_target=0.05):
super().__init__()
self.sparsity_lambda = sparsity_lambda
self.sparsity_target = sparsity_target
# Modified encoder/decoder with activation controls
self.encoder = nn.Sequential(
nn.Conv2d(in_channels, in_channels, 1),
nn.ReLU(),
nn.Conv2d(in_channels, in_channels, 1),
nn.Sigmoid() # Add sigmoid to bound values between 0 and 1
)
self.decoder = nn.Sequential(
nn.Conv2d(in_channels, in_channels, 1),
nn.ReLU(),
nn.Conv2d(in_channels, in_channels, 1),
nn.Sigmoid() # Add sigmoid for bounded output
)
# Initialize weights with smaller values
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.xavier_uniform_(m.weight, gain=0.1)
if m.bias is not None:
nn.init.zeros_(m.bias)
def forward(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
sparsity_loss = self.sparsity_penalty(encoded)
return decoded, sparsity_loss
def sparsity_penalty(self, encoded):
# Calculate mean activation across all dimensions except channels
rho_hat = torch.mean(encoded, dim=[0, 2, 3])
rho = torch.tensor(self.sparsity_target, device=encoded.device)
# More aggressive clamping for numerical stability
epsilon = 1e-7
rho_hat = torch.clamp(rho_hat, min=epsilon, max=1-epsilon)
# Modified KL divergence calculation with better numerical stability
kl_div = rho * torch.log((rho + epsilon)/(rho_hat + epsilon)) + \
(1-rho) * torch.log((1-rho + epsilon)/(1-rho_hat + epsilon))
# Clamp the loss to prevent extremely large values
kl_div = torch.clamp(kl_div, max=100.0)
return self.sparsity_lambda * torch.mean(kl_div)
class UNET(nn.Module):
def __init__(self, in_channels=3, out_channels=1, features=[64, 128, 256, 512], sparsity_target=0.05, beta=1e-3):
super(UNET, self).__init__()
self.ups = nn.ModuleList()
self.downs = nn.ModuleList()
self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
self.sparse_autoencoders = nn.ModuleList()
self.beta = beta
# Down part of UNET
for feature in features:
self.downs.append(DoubleConv(in_channels, feature))
self.sparse_autoencoders.append(SparseAutoencoder(feature, sparsity_target=sparsity_target))
in_channels = feature
# Up part of UNET
for feature in reversed(features):
self.ups.append(
nn.ConvTranspose2d(
feature * 2, feature, kernel_size=2, stride=2,
)
)
self.ups.append(DoubleConv(feature * 3, feature)) # Modified to account for concatenated sparse output
self.bottleneck = DoubleConv(features[-1], features[-1] * 2)
self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)
def forward(self, x):
skip_connections = []
sparse_outputs = []
total_sparsity_loss = 0
# Down path
for down, autoencoder in zip(self.downs, self.sparse_autoencoders):
x = down(x)
skip_connections.append(x)
# Apply sparse autoencoder
sparse_out, sparsity_loss = autoencoder(x)
sparse_outputs.append(sparse_out)
total_sparsity_loss += sparsity_loss
x = self.pool(x)
x = self.bottleneck(x)
# Reverse lists for up path
skip_connections = skip_connections[::-1]
sparse_outputs = sparse_outputs[::-1]
# Up path
for idx in range(0, len(self.ups), 2):
x = self.ups[idx](x)
skip_connection = skip_connections[idx // 2]
sparse_output = sparse_outputs[idx // 2]
if x.shape != skip_connection.shape:
x = TF.resize(x, size=skip_connection.shape[2:])
sparse_output = TF.resize(sparse_output, size=skip_connection.shape[2:])
# Concatenate skip connection and sparse output
concat_skip = torch.cat((skip_connection, x, sparse_output), dim=1)
x = self.ups[idx + 1](concat_skip)
return self.final_conv(x), self.beta * total_sparsity_loss
def test():
x = torch.randn((3, 1, 161, 161))
model = UNET(in_channels=1, out_channels=1)
preds, sparsity_loss = model(x)
print("Input shape:", x.shape)
print("Output shape:", preds.shape)
print("Sparsity loss:", sparsity_loss.item())
assert preds.shape == x.shape, f"Shape mismatch: input {x.shape} vs output {preds.shape}"
print("Test passed successfully!")
if __name__ == "__main__":
test()