| import torch |
| import torch.nn as nn |
| import math |
|
|
| import torch |
| import torch.nn as nn |
| import math |
|
|
| class PositionalEncoding(nn.Module): |
| def __init__(self, d_model, max_len=5000): |
| super().__init__() |
| pe = torch.zeros(max_len, d_model) |
| position = torch.arange(0, max_len).unsqueeze(1).float() |
| div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) |
| pe[:, 0::2] = torch.sin(position * div_term) |
| pe[:, 1::2] = torch.cos(position * div_term) |
| pe = pe.unsqueeze(0) |
| self.register_buffer('pe', pe) |
|
|
| def forward(self, x): |
| return x + self.pe[:, :x.size(1)] |
|
|
| class MultiHeadSelfAttention(nn.Module): |
| def __init__(self, embed_dim, num_heads): |
| super().__init__() |
| self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True) |
|
|
| def forward(self, x): |
| attn_output, _ = self.attn(x, x, x) |
| return attn_output |
|
|
| class FeedForward(nn.Module): |
| def __init__(self, embed_dim, ff_dim): |
| super().__init__() |
| self.ff = nn.Sequential( |
| nn.Linear(embed_dim, ff_dim), |
| nn.ReLU(), |
| nn.Linear(ff_dim, embed_dim) |
| ) |
|
|
| def forward(self, x): |
| return self.ff(x) |
|
|
| |
| class Adapter(nn.Module): |
| def __init__(self, dim, bottleneck=32): |
| super().__init__() |
| self.down = nn.Linear(dim, bottleneck) |
| self.relu = nn.ReLU() |
| self.up = nn.Linear(bottleneck, dim) |
| def forward(self, x): |
| return x + self.up(self.relu(self.down(x))) |
|
|
| class TransformerBlock(nn.Module): |
| def __init__(self, embed_dim, num_heads, ff_dim, |
| long_term_adapter_dim=None, session_adapter_dim=None): |
| super().__init__() |
| self.attn = MultiHeadSelfAttention(embed_dim, num_heads) |
| self.norm1 = nn.LayerNorm(embed_dim) |
| self.ff = FeedForward(embed_dim, ff_dim) |
| self.norm2 = nn.LayerNorm(embed_dim) |
| |
| self.long_term_adapter = Adapter(embed_dim, long_term_adapter_dim) if long_term_adapter_dim else None |
| self.session_adapter = Adapter(embed_dim, session_adapter_dim) if session_adapter_dim else None |
|
|
| def forward(self, x): |
| x = self.norm1(x + self.attn(x)) |
| x = self.norm2(x + self.ff(x)) |
| |
| if self.long_term_adapter is not None: |
| x = self.long_term_adapter(x) |
| if self.session_adapter is not None: |
| x = self.session_adapter(x) |
| return x |
|
|
|
|
| class Microformer(nn.Module): |
| def __init__(self, vocab_size, embed_dim, num_heads, ff_dim, num_layers, max_seq_len, |
| long_term_adapter_dim=None, session_adapter_dim=None): |
| super().__init__() |
| self.embedding = nn.Embedding(vocab_size, embed_dim) |
| self.positional_encoding = PositionalEncoding(embed_dim, max_seq_len) |
| self.layers = nn.ModuleList([ |
| TransformerBlock( |
| embed_dim, num_heads, ff_dim, |
| long_term_adapter_dim=long_term_adapter_dim, |
| session_adapter_dim=session_adapter_dim |
| ) |
| for _ in range(num_layers) |
| ]) |
| self.output = nn.Linear(embed_dim, vocab_size) |
|
|
| def forward(self, x): |
| x = self.embedding(x) |
| x = self.positional_encoding(x) |
| for layer in self.layers: |
| x = layer(x) |
| return self.output(x) |
|
|
| def freeze_except_adapters(self, session_only=True, include_output=True): |
| for param in self.parameters(): |
| param.requires_grad = False |
| for layer in self.layers: |
| if getattr(layer, 'session_adapter', None) is not None: |
| for param in layer.session_adapter.parameters(): |
| param.requires_grad = True |
| if not session_only and getattr(layer, 'long_term_adapter', None) is not None: |
| for param in layer.long_term_adapter.parameters(): |
| param.requires_grad = True |
| if include_output: |
| for param in self.output.parameters(): |
| param.requires_grad = True |
|
|
|
|