ianpan commited on
Commit
1ab2c07
·
verified ·
1 Parent(s): 7e7b268

Upload model

Browse files
Files changed (4) hide show
  1. config.json +4 -0
  2. configuration.py +40 -0
  3. modeling.py +95 -0
  4. unet.py +243 -0
config.json CHANGED
@@ -2,6 +2,10 @@
2
  "architectures": [
3
  "PneumoniaModel"
4
  ],
 
 
 
 
5
  "backbone": "tf_efficientnetv2_s",
6
  "cls_dropout": 0.1,
7
  "cls_num_classes": 1,
 
2
  "architectures": [
3
  "PneumoniaModel"
4
  ],
5
+ "auto_map": {
6
+ "AutoConfig": "configuration.PneumoniaConfig",
7
+ "AutoModel": "modeling.PneumoniaModel"
8
+ },
9
  "backbone": "tf_efficientnetv2_s",
10
  "cls_dropout": 0.1,
11
  "cls_num_classes": 1,
configuration.py ADDED
@@ -0,0 +1,40 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from transformers import PretrainedConfig
2
+ from typing import List, Optional, Tuple
3
+
4
+
5
+ class PneumoniaConfig(PretrainedConfig):
6
+ model_type = "pneumonia"
7
+
8
+ def __init__(
9
+ self,
10
+ backbone: str = "tf_efficientnetv2_s",
11
+ feature_dim: int = 256,
12
+ seg_dropout: float = 0.1,
13
+ cls_dropout: float = 0.1,
14
+ seg_num_classes: int = 1,
15
+ cls_num_classes: int = 1,
16
+ in_chans: int = 1,
17
+ img_size: Tuple[int, int] = (512, 512), # height, width
18
+ decoder_n_blocks: int = 5,
19
+ decoder_channels: List[int] = [256, 128, 64, 32, 16],
20
+ encoder_channels: List[int] = [24, 48, 64, 160, 256],
21
+ decoder_center_block: bool = False,
22
+ decoder_norm_layer: str = "bn",
23
+ decoder_attention_type: Optional[str] = None,
24
+ **kwargs,
25
+ ):
26
+ self.backbone = backbone
27
+ self.feature_dim = feature_dim
28
+ self.seg_dropout = seg_dropout
29
+ self.cls_dropout = cls_dropout
30
+ self.seg_num_classes = seg_num_classes
31
+ self.cls_num_classes = cls_num_classes
32
+ self.in_chans = in_chans
33
+ self.img_size = img_size
34
+ self.decoder_n_blocks = decoder_n_blocks
35
+ self.decoder_channels = decoder_channels
36
+ self.encoder_channels = encoder_channels
37
+ self.decoder_center_block = decoder_center_block
38
+ self.decoder_norm_layer = decoder_norm_layer
39
+ self.decoder_attention_type = decoder_attention_type
40
+ super().__init__(**kwargs)
modeling.py ADDED
@@ -0,0 +1,95 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import albumentations as A
2
+ import torch
3
+ import torch.nn as nn
4
+
5
+ from numpy.typing import NDArray
6
+ from transformers import PreTrainedModel
7
+ from timm import create_model
8
+ from typing import Optional
9
+ from .configuration import PneumoniaConfig
10
+ from .unet import UnetDecoder, SegmentationHead
11
+
12
+ _PYDICOM_AVAILABLE = False
13
+ try:
14
+ from pydicom import dcmread
15
+ from pydicom.pixels import apply_voi_lut
16
+
17
+ _PYDICOM_AVAILABLE = True
18
+ except ModuleNotFoundError:
19
+ pass
20
+
21
+
22
+ class PneumoniaModel(PreTrainedModel):
23
+ config_class = PneumoniaConfig
24
+
25
+ def __init__(self, config):
26
+ super().__init__(config)
27
+ self.encoder = create_model(
28
+ model_name=config.backbone,
29
+ features_only=True,
30
+ pretrained=False,
31
+ in_chans=config.in_chans,
32
+ )
33
+ self.decoder = UnetDecoder(
34
+ decoder_n_blocks=config.decoder_n_blocks,
35
+ decoder_channels=config.decoder_channels,
36
+ encoder_channels=config.encoder_channels,
37
+ decoder_center_block=config.decoder_center_block,
38
+ decoder_norm_layer=config.decoder_norm_layer,
39
+ decoder_attention_type=config.decoder_attention_type,
40
+ )
41
+ self.img_size = config.img_size
42
+ self.segmentation_head = SegmentationHead(
43
+ in_channels=config.decoder_channels[-1],
44
+ out_channels=config.seg_num_classes,
45
+ size=self.img_size,
46
+ )
47
+ self.pooling = nn.AdaptiveAvgPool2d(1)
48
+ self.dropout = nn.Dropout(p=config.cls_dropout)
49
+ self.classifier = nn.Linear(config.feature_dim, config.cls_num_classes)
50
+
51
+ def normalize(self, x: torch.Tensor) -> torch.Tensor:
52
+ # [0, 255] -> [-1, 1]
53
+ mini, maxi = 0.0, 255.0
54
+ x = (x - mini) / (maxi - mini)
55
+ x = (x - 0.5) * 2.0
56
+ return x
57
+
58
+ @staticmethod
59
+ def load_image_from_dicom(path: str) -> Optional[NDArray]:
60
+ if not _PYDICOM_AVAILABLE:
61
+ print("`pydicom` is not installed, returning None ...")
62
+ return None
63
+ dicom = dcmread(path)
64
+ arr = apply_voi_lut(dicom.pixel_array, dicom)
65
+ if dicom.PhotometricInterpretation == "MONOCHROME1":
66
+ # invert image if needed
67
+ arr = arr.max() - arr
68
+
69
+ arr = arr - arr.min()
70
+ arr = arr / arr.max()
71
+ arr = (arr * 255).astype("uint8")
72
+ return arr
73
+
74
+ def preprocess(self, x: NDArray) -> NDArray:
75
+ x = A.Resize(self.img_size[0], self.img_size[1], p=1)(image=x)["image"]
76
+ return x
77
+
78
+ def forward(self, x: torch.Tensor, return_logits: bool = False) -> torch.Tensor:
79
+ x = self.normalize(x)
80
+ features = self.encoder(x)
81
+ decoder_output = self.decoder(features)
82
+ logits = self.segmentation_head(decoder_output[-1])
83
+ b, n = features[-1].shape[:2]
84
+ features = self.pooling(features[-1]).reshape(b, n)
85
+ features = self.dropout(features)
86
+ cls_logits = self.classifier(features)
87
+ out = {
88
+ "mask": logits,
89
+ "cls": cls_logits
90
+ }
91
+ if return_logits:
92
+ return out
93
+ out["mask"] = out["mask"].sigmoid()
94
+ out["cls"] = out["cls"].sigmoid()
95
+ return out
unet.py ADDED
@@ -0,0 +1,243 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import torch
2
+ import torch.nn as nn
3
+ import torch.nn.functional as F
4
+
5
+ from functools import partial
6
+ from typing import List, Optional
7
+
8
+
9
+ class Conv2dAct(nn.Sequential):
10
+ def __init__(
11
+ self,
12
+ in_channels: int,
13
+ out_channels: int,
14
+ kernel_size: int,
15
+ padding: int = 0,
16
+ stride: int = 1,
17
+ norm_layer: str = "bn",
18
+ num_groups: int = 32, # for GroupNorm,
19
+ activation: str = "ReLU",
20
+ inplace: bool = True, # for activation
21
+ ):
22
+ if norm_layer == "bn":
23
+ NormLayer = nn.BatchNorm2d
24
+ elif norm_layer == "gn":
25
+ NormLayer = partial(nn.GroupNorm, num_groups=num_groups)
26
+ else:
27
+ raise Exception(
28
+ f"`norm_layer` must be one of [`bn`, `gn`], got `{norm_layer}`"
29
+ )
30
+ super().__init__()
31
+ self.conv = nn.Conv2d(
32
+ in_channels,
33
+ out_channels,
34
+ kernel_size=kernel_size,
35
+ stride=stride,
36
+ padding=padding,
37
+ bias=False,
38
+ )
39
+ self.norm = NormLayer(out_channels)
40
+ self.act = getattr(nn, activation)(inplace=inplace)
41
+
42
+ def forward(self, x: torch.Tensor) -> torch.Tensor:
43
+ return self.act(self.norm(self.conv(x)))
44
+
45
+
46
+ class SCSEModule(nn.Module):
47
+ def __init__(
48
+ self,
49
+ in_channels: int,
50
+ reduction: int = 16,
51
+ activation: str = "ReLU",
52
+ inplace: bool = False,
53
+ ):
54
+ super().__init__()
55
+ self.cSE = nn.Sequential(
56
+ nn.AdaptiveAvgPool2d(1),
57
+ nn.Conv2d(in_channels, in_channels // reduction, 1),
58
+ getattr(nn, activation)(inplace=inplace),
59
+ nn.Conv2d(in_channels // reduction, in_channels, 1),
60
+ )
61
+ self.sSE = nn.Conv2d(in_channels, 1, 1)
62
+
63
+ def forward(self, x: torch.Tensor) -> torch.Tensor:
64
+ return x * self.cSE(x).sigmoid() + x * self.sSE(x).sigmoid()
65
+
66
+
67
+ class Attention(nn.Module):
68
+ def __init__(self, name: str, **params):
69
+ super().__init__()
70
+
71
+ if name is None:
72
+ self.attention = nn.Identity(**params)
73
+ elif name == "scse":
74
+ self.attention = SCSEModule(**params)
75
+ else:
76
+ raise ValueError("Attention {} is not implemented".format(name))
77
+
78
+ def forward(self, x: torch.Tensor) -> torch.Tensor:
79
+ return self.attention(x)
80
+
81
+
82
+ class DecoderBlock(nn.Module):
83
+ def __init__(
84
+ self,
85
+ in_channels: int,
86
+ skip_channels: int,
87
+ out_channels: int,
88
+ norm_layer: str = "bn",
89
+ activation: str = "ReLU",
90
+ attention_type: Optional[str] = None,
91
+ ):
92
+ super().__init__()
93
+ self.conv1 = Conv2dAct(
94
+ in_channels + skip_channels,
95
+ out_channels,
96
+ kernel_size=3,
97
+ padding=1,
98
+ norm_layer=norm_layer,
99
+ activation=activation,
100
+ )
101
+ self.attention1 = Attention(
102
+ attention_type, in_channels=in_channels + skip_channels
103
+ )
104
+ self.conv2 = Conv2dAct(
105
+ out_channels,
106
+ out_channels,
107
+ kernel_size=3,
108
+ padding=1,
109
+ norm_layer=norm_layer,
110
+ activation=activation,
111
+ )
112
+ self.attention2 = Attention(attention_type, in_channels=out_channels)
113
+
114
+ def forward(
115
+ self, x: torch.Tensor, skip: Optional[torch.Tensor] = None
116
+ ) -> torch.Tensor:
117
+ if skip is not None:
118
+ h, w = skip.shape[2:]
119
+ x = F.interpolate(x, size=(h, w), mode="nearest")
120
+ x = torch.cat([x, skip], dim=1)
121
+ x = self.attention1(x)
122
+ else:
123
+ x = F.interpolate(x, scale_factor=(2, 2), mode="nearest")
124
+ x = self.conv1(x)
125
+ x = self.conv2(x)
126
+ x = self.attention2(x)
127
+ return x
128
+
129
+
130
+ class CenterBlock(nn.Sequential):
131
+ def __init__(
132
+ self,
133
+ in_channels: int,
134
+ out_channels: int,
135
+ norm_layer: str = "bn",
136
+ activation: str = "ReLU",
137
+ ):
138
+ conv1 = Conv2dAct(
139
+ in_channels,
140
+ out_channels,
141
+ kernel_size=3,
142
+ padding=1,
143
+ norm_layer=norm_layer,
144
+ activation=activation,
145
+ )
146
+ conv2 = Conv2dAct(
147
+ out_channels,
148
+ out_channels,
149
+ kernel_size=3,
150
+ padding=1,
151
+ norm_layer=norm_layer,
152
+ activation=activation,
153
+ )
154
+ super().__init__(conv1, conv2)
155
+
156
+
157
+ class UnetDecoder(nn.Module):
158
+ def __init__(
159
+ self,
160
+ decoder_n_blocks: int,
161
+ decoder_channels: List[int],
162
+ encoder_channels: List[int],
163
+ decoder_center_block: bool = False,
164
+ decoder_norm_layer: str = "bn",
165
+ decoder_attention_type: Optional[str] = None,
166
+ ):
167
+ super().__init__()
168
+
169
+ self.decoder_n_blocks = decoder_n_blocks
170
+ self.decoder_channels = decoder_channels
171
+ self.encoder_channels = encoder_channels
172
+ self.decoder_center_block = decoder_center_block
173
+ self.decoder_norm_layer = decoder_norm_layer
174
+ self.decoder_attention_type = decoder_attention_type
175
+
176
+ if self.decoder_n_blocks != len(self.decoder_channels):
177
+ raise ValueError(
178
+ "Model depth is {}, but you provide `decoder_channels` for {} blocks.".format(
179
+ self.decoder_n_blocks, len(self.decoder_channels)
180
+ )
181
+ )
182
+ # reverse channels to start from head of encoder
183
+ encoder_channels = encoder_channels[::-1]
184
+
185
+ # computing blocks input and output channels
186
+ head_channels = encoder_channels[0]
187
+ in_channels = [head_channels] + list(self.decoder_channels[:-1])
188
+ skip_channels = list(encoder_channels[1:]) + [0]
189
+ out_channels = self.decoder_channels
190
+
191
+ if self.decoder_center_block:
192
+ self.center = CenterBlock(
193
+ head_channels, head_channels, norm_layer=self.decoder_norm_layer
194
+ )
195
+ else:
196
+ self.center = nn.Identity()
197
+
198
+ # combine decoder keyword arguments
199
+ kwargs = dict(
200
+ norm_layer=self.decoder_norm_layer,
201
+ attention_type=self.decoder_attention_type,
202
+ )
203
+ blocks = [
204
+ DecoderBlock(in_ch, skip_ch, out_ch, **kwargs)
205
+ for in_ch, skip_ch, out_ch in zip(in_channels, skip_channels, out_channels)
206
+ ]
207
+ self.blocks = nn.ModuleList(blocks)
208
+
209
+ def forward(self, features: List[torch.Tensor]) -> torch.Tensor:
210
+ features = features[::-1] # reverse channels to start from head of encoder
211
+
212
+ head = features[0]
213
+ skips = features[1:]
214
+
215
+ output = [self.center(head)]
216
+ for i, decoder_block in enumerate(self.blocks):
217
+ skip = skips[i] if i < len(skips) else None
218
+ output.append(decoder_block(output[-1], skip))
219
+
220
+ return output
221
+
222
+
223
+ class SegmentationHead(nn.Module):
224
+ def __init__(
225
+ self,
226
+ in_channels: int,
227
+ out_channels: int,
228
+ size: int,
229
+ kernel_size: int = 3,
230
+ dropout: float = 0.0,
231
+ ):
232
+ super().__init__()
233
+ self.drop = nn.Dropout2d(p=dropout)
234
+ self.conv = nn.Conv2d(
235
+ in_channels, out_channels, kernel_size=kernel_size, padding=kernel_size // 2
236
+ )
237
+ if isinstance(size, (tuple, list)):
238
+ self.up = nn.Upsample(size=size, mode="bilinear")
239
+ else:
240
+ self.up = nn.Identity()
241
+
242
+ def forward(self, x: torch.Tensor) -> torch.Tensor:
243
+ return self.up(self.conv(self.drop(x)))