RoBERTa Source Code [LLM]
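RoBERTa (Robustly optimized BERT approach) is an improved version of BERT. The authors argue that the original BERT was undertrained, so they kept the model architecture unchanged and revised the pretraining procedure instead: they removed the NSP (next sentence prediction) task and changed how BERT's MLM objective is applied by using a dynamic masking strategy on the training data.
The model is defined mainly in fairseq/models/roberta/model.py. Its core classes and functions are: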
fairseq/models/roberta/model.py
├── imports
├── RobertaEncoder class
├── RobertaModel class
├── RobertaClassificationHead class
└── model architecture registration
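1 Rebuilding the MLM head
RobertaLMHead serves the token-level pretraining task. RoBERTa replaces BERT's static masking with dynamic masking, and the head predicts the masked tokens with a two-layer feed-forward network: a dense layer with activation and layer normalization, followed by a projection back onto the vocabulary.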
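The head's forward pass accepts a masked_tokens argument so that only the rows at masked positions go through the vocabulary projection. A minimal pure-Python sketch of that selection step follows, using nested lists in place of tensors. The helper name select_masked and the toy data are hypothetical and not part of fairseq.

```python
def select_masked(features, masked_tokens):
    """Keep only the feature vectors at masked positions.

    features:      nested list shaped [batch][seq_len][hidden]
    masked_tokens: nested list of bools shaped [batch][seq_len]
    Returns a flat list shaped [num_masked][hidden], the list analogue of
    the tensor expression features[masked_tokens, :].
    """
    return [
        vec
        for seq, seq_mask in zip(features, masked_tokens)
        for vec, is_masked in zip(seq, seq_mask)
        if is_masked
    ]


# Toy batch: 2 sequences, 4 tokens each, hidden size 3.
features = [[[float(b * 10 + t)] * 3 for t in range(4)] for b in range(2)]
masked_tokens = [
    [False, True, False, False],
    [False, False, True, True],
]

selected = select_masked(features, masked_tokens)
print(len(selected), "of", 2 * 4, "positions reach the vocabulary projection")
```

With a masking rate around 15%, roughly that fraction of positions is projected onto the vocabulary, which is where the memory and compute savings come from.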
    import torch
    import torch.nn as nn
    import torch.nn.functional as F

    from fairseq import utils
    from fairseq.modules import LayerNorm


    class RobertaLMHead(nn.Module):
        """Head for masked language modeling."""

        def __init__(self, embed_dim, output_dim, activation_fn, weight=None):
            super().__init__()
            # Fully connected layer
            self.dense = nn.Linear(embed_dim, embed_dim)
            # Look up the activation function named in the config
            self.activation_fn = utils.get_activation_fn(activation_fn)
            # Layer normalization
            self.layer_norm = LayerNorm(embed_dim)

            # Output projection; a weight passed in by the caller is reused as-is
            if weight is None:
                weight = nn.Linear(embed_dim, output_dim, bias=False).weight
            self.weight = weight
            self.bias = nn.Parameter(torch.zeros(output_dim))

        def forward(self, features, masked_tokens=None, **kwargs):
            # Only project the masked tokens while training,
            # saves both memory and computation
            if masked_tokens is not None:
                features = features[masked_tokens, :]

            x = self.dense(features)
            x = self.activation_fn(x)
            x = self.layer_norm(x)
            # project back to size of vocabulary with bias
            x = F.linear(x, self.weight) + self.bias
            return x

2 The classification head
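RobertaClassificationHead handles sequence-level classification tasks. It classifies from the vector of the first token, <s>, which plays the role of BERT's [CLS]: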
    from fairseq.modules.quant_noise import quant_noise as apply_quant_noise_


    class RobertaClassificationHead(nn.Module):
        """Head for sentence-level classification tasks."""

        def __init__(
            self,
            input_dim,
            inner_dim,
            num_classes,
            activation_fn,
            pooler_dropout,
            q_noise=0,  # proportion of quantization noise
            qn_block_size=8,  # block size for quantization noise
            do_spectral_norm=False,  # spectral normalization constrains the spectral norm of the weight matrix
        ):
            super().__init__()
            self.dense = nn.Linear(input_dim, inner_dim)
            self.activation_fn = utils.get_activation_fn(activation_fn)
            self.dropout = nn.Dropout(p=pooler_dropout)
            # Inject simulated quantization noise into the weights during the forward
            # pass so the model adapts to low-precision inference while training
            self.out_proj = apply_quant_noise_(
                nn.Linear(inner_dim, num_classes), q_noise, qn_block_size
            )

        def forward(self, features, **kwargs):
            # features has shape [batch_size, seq_len, hidden_dim];
            # take the first position of every sequence
            x = features[:, 0, :]  # take <s> token (equiv. to [CLS])
            x = self.dropout(x)
            x = self.dense(x)
            x = self.activation_fn(x)
            x = self.dropout(x)
            x = self.out_proj(x)
            return x

3 The encoder
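One detail worth noticing in the encoder is that the LM head reuses embed_tokens.weight as its output projection unless args.untie_weights_roberta is set. The rough sketch below counts what that tying saves. The sizes (a 50,265-entry vocabulary and 768-dimensional embeddings) are assumed for illustration, and lm_head_params is a hypothetical helper, not a fairseq function.

```python
def lm_head_params(embed_dim, output_dim, tied):
    """Count the parameters the LM head adds on top of the embedding table."""
    dense = embed_dim * embed_dim + embed_dim  # nn.Linear(embed_dim, embed_dim)
    layer_norm = 2 * embed_dim                 # scale and shift vectors
    bias = output_dim                          # one bias per vocabulary entry
    # When tied, the projection matrix is the embedding table itself,
    # so it contributes no new parameters.
    projection = 0 if tied else output_dim * embed_dim
    return dense + layer_norm + bias + projection


embed_dim, vocab_size = 768, 50265  # assumed sizes, for illustration only
tied = lm_head_params(embed_dim, vocab_size, tied=True)
untied = lm_head_params(embed_dim, vocab_size, tied=False)
print("tied:", tied, "untied:", untied, "saved:", untied - tied)
```

Under these assumptions the saving is exactly one vocabulary-by-embedding matrix, which is far larger than the rest of the head combined.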
    from fairseq.models import FairseqEncoder
    from fairseq.models.transformer import TransformerEncoder
    from fairseq.modules.transformer_sentence_encoder import init_bert_params


    class RobertaEncoder(FairseqEncoder):
        """RoBERTa encoder."""

        def __init__(self, args, dictionary):
            # Call the FairseqEncoder constructor and hand it the dictionary
            super().__init__(dictionary)

            # Fill in any unspecified arguments from the architecture defaults
            base_architecture(args)
            self.args = args

            # Optionally keep only a subset of encoder layers
            # (for pruning and similar experiments)
            if args.encoder_layers_to_keep:
                args.encoder_layers = len(args.encoder_layers_to_keep.split(","))

            # Build the embedding layer
            embed_tokens = self.build_embedding(
                len(dictionary), args.encoder_embed_dim, dictionary.pad()
            )

            # Build the encoder
            self.sentence_encoder = self.build_encoder(args, dictionary, embed_tokens)

            # Build the LM head; it shares the embedding matrix unless
            # untie_weights_roberta is set
            self.lm_head = self.build_lm_head(
                embed_dim=args.encoder_embed_dim,
                output_dim=len(dictionary),
                activation_fn=args.activation_fn,
                weight=(
                    self.sentence_encoder.embed_tokens.weight
                    if not args.untie_weights_roberta
                    else None
                ),
            )

        def build_embedding(self, vocab_size, embedding_dim, padding_idx):
            return nn.Embedding(vocab_size, embedding_dim, padding_idx)

        # Use the stock TransformerEncoder
        def build_encoder(self, args, dictionary, embed_tokens):
            encoder = TransformerEncoder(args, dictionary, embed_tokens)
            encoder.apply(init_bert_params)
            return encoder

        def build_lm_head(self, embed_dim, output_dim, activation_fn, weight):
            return RobertaLMHead(embed_dim, output_dim, activation_fn, weight)

        def forward(
            self,
            src_tokens,
            features_only=False,  # return hidden states only (skip the LM head)
            return_all_hiddens=False,
            masked_tokens=None,  # masked positions (lets MLM training project only those)
            **unused,
        ):
            # Feature extraction
            x, extra = self.extract_features(
                src_tokens, return_all_hiddens=return_all_hiddens
            )
            if not features_only:
                # Project through RobertaLMHead
                x = self.output_layer(x, masked_tokens=masked_tokens)
            return x, extra

        def extract_features(self, src_tokens, return_all_hiddens=False, **kwargs):
            # The sentence encoder returns features shaped T x B x C
            encoder_out = self.sentence_encoder(
                src_tokens,
                return_all_hiddens=return_all_hiddens,
                token_embeddings=kwargs.get("token_embeddings", None),
            )
            # Convert fairseq's internal layout to batch-first
            # T x B x C -> B x T x C
            features = encoder_out["encoder_out"][0].transpose(0, 1)
            inner_states = encoder_out["encoder_states"] if return_all_hiddens else None
            return features, {"inner_states": inner_states}

        def output_layer(self, features, masked_tokens=None, **unused):
            return self.lm_head(features, masked_tokens)

        def max_positions(self):
            """Maximum output length supported by the encoder."""
            return self.args.max_positions

4 Model definition
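The model class keeps its task heads in an nn.ModuleDict and picks one by name at call time, switching the encoder to features-only mode whenever a head is requested. That control flow can be sketched without PyTorch. TinyModel, its toy encoder, and the add_head method are hypothetical stand-ins, not fairseq's API.

```python
class TinyModel:
    """Stand-in for a model with a shared encoder and named task heads."""

    def __init__(self):
        self.heads = {}  # plays the role of nn.ModuleDict

    def add_head(self, name, head):
        self.heads[name] = head

    def encode(self, tokens, features_only):
        # Toy "features": one number per token.
        features = [float(t) for t in tokens]
        if features_only:
            return features
        # Toy "LM head": stands in for the vocabulary projection.
        return [f * 2 for f in features]

    def forward(self, tokens, features_only=False, head_name=None):
        # A task head consumes raw features, so the LM head is skipped.
        if head_name is not None:
            features_only = True
        x = self.encode(tokens, features_only)
        if head_name is not None:
            x = self.heads[head_name](x)
        return x


model = TinyModel()
# Toy classifier: looks only at the first position, like taking <s>.
model.add_head("sentiment", lambda feats: "pos" if feats[0] > 0 else "neg")

lm_out = model.forward([3, 1, 2])
cls_out = model.forward([3, 1, 2], head_name="sentiment")
print(lm_out, cls_out)
```

Keeping heads in a name-keyed container means one pretrained encoder can serve several downstream tasks, with each head added after pretraining.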
    from fairseq.models import FairseqEncoderModel


    class RobertaModel(FairseqEncoderModel):
        def __init__(self, args, encoder):
            super().__init__(encoder)
            self.args = args

            # We follow BERT's random weight initialization
            self.apply(init_bert_params)
            # Classification heads for downstream tasks are registered here dynamically
            self.classification_heads = nn.ModuleDict()

        @classmethod
        def build_model(cls, args, task):
            """Build a new model instance."""

            # make sure all arguments are present
            base_architecture(args)

            encoder = RobertaEncoder(args, task.source_dictionary)

            return cls(args, encoder)

        def forward(
            self,
            src_tokens,
            features_only=False,
            return_all_hiddens=False,
            classification_head_name=None,
            **kwargs,
        ):
            # A classification head works on encoder features, so skip the LM head
            if classification_head_name is not None:
                features_only = True

            x, extra = self.encoder(src_tokens, features_only, return_all_hiddens, **kwargs)

            # Look up the requested head (a RobertaClassificationHead) in the
            # ModuleDict and apply it to the encoder output
            if classification_head_name is not None:
                x = self.classification_heads[classification_head_name](x)
            return x, extra

5 Dynamic masking
The masking logic lives in fairseq/data/mask_tokens_dataset.py:
    import numpy as np
    import torch

    from fairseq.data import BaseWrapperDataset


    class MaskTokensDataset(BaseWrapperDataset):
        def __init__(
            self,
            # The wrapped torch Dataset
            dataset: torch.utils.data.Dataset,
            # Mask 15% of tokens by default
            mask_prob: float = 0.15,
            mask_multiple_length = 1,
            ...
        ):

            # mask_prob must lie strictly between 0 and 1
            assert 0.0 < mask_prob < 1.0
            assert 0.0 <= random_token_prob <= 1.0
            assert 0.0 <= leave_unmasked_prob <= 1.0
            # random_token_prob + leave_unmasked_prob must not exceed 1;
            # the probability of actually writing [MASK] is
            # 1 - random_token_prob - leave_unmasked_prob
            assert random_token_prob + leave_unmasked_prob <= 1.0
            assert mask_multiple_length >= 1
            assert mask_stdev >= 0.0

            if random_token_prob > 0.0:
                if freq_weighted_replacement:
                    # Weight by token frequency: frequent tokens are drawn more often
                    weights = np.array(self.vocab.count)
                else:
                    # Otherwise sample uniformly
                    weights = np.ones(len(self.vocab))
                weights[: self.vocab.nspecial] = 0  # special tokens are never drawn
                self.weights = weights / weights.sum()  # normalize to a distribution

        def __getitem__(self, index: int):
            return self.__getitem_cached__(self.seed, self.epoch, index)

        def __getitem_cached__(self, seed: int, epoch: int, index: int):
            # Derive a deterministic seed from (seed, epoch, index). Because epoch is
            # part of the key, each sample gets a new mask every epoch; this is what
            # makes the masking dynamic.
            seed = int(hash((seed, epoch, index)) % 1e6)
            # Random number generator
            rng = np.random.default_rng(seed)
            item = self.dataset[index]
            sz = len(item)

            # decide elements to mask
            mask = np.full(sz, False)
            num_mask = int(
                # number of positions to mask;
                # add a random number for probabilistic rounding
                self.mask_prob * sz / float(self.mask_multiple_length)
                + rng.random()
            )

            # Span masking: draw num_mask anchor positions without replacement
            mask_idc = rng.choice(sz, num_mask, replace=False)
            # mask_stdev is the standard deviation of the span length when each
            # anchor is expanded into several consecutive tokens
            if self.mask_stdev > 0.0:
                # Sample each anchor's span length from a normal distribution
                lengths = rng.normal(
                    self.mask_multiple_length, self.mask_stdev, size=num_mask
                )
                # Round and clamp to >= 0
                lengths = [max(0, int(round(x))) for x in lengths]
                # Expand every anchor by its own length
                mask_idc = np.asarray(
                    [
                        mask_idc[j] + offset
                        for j in range(len(mask_idc))
                        for offset in range(lengths[j])
                    ],
                    dtype=np.int64,
                )
            else:
                # Fixed-length spans of mask_multiple_length tokens
                mask_idc = np.concatenate(
                    [mask_idc + i for i in range(self.mask_multiple_length)]
                )
            # Spans may run past the end of the sequence, so clip them
            mask_idc = mask_idc[mask_idc < len(mask)]
            # Mark the selected positions as masked
            mask[mask_idc] = True

            # Target branch: when self.return_masked_tokens is set, return the
            # original tokens at masked positions and pad_idx everywhere else.
            # MaskTokensDataset.apply_mask builds two datasets from one input,
            # supplying the model input (source) and the labels (target).
            if self.return_masked_tokens:
                # exit early if we're just returning the masked tokens
                # (i.e., the targets for masked LM training)
                if self.mask_whole_words is not None:
                    mask = np.repeat(mask, word_lens)
                new_item = np.full(len(mask), self.pad_idx)
                new_item[mask] = item[torch.from_numpy(mask.astype(np.uint8)) == 1]
                return torch.from_numpy(new_item)

            # Replacement strategy
            # With the defaults, rand_or_unmask_prob = 0.1 + 0.1 = 0.2: 20% of the
            # selected tokens do not become [MASK] and are handled separately
            rand_or_unmask_prob = self.random_token_prob + self.leave_unmasked_prob
            if rand_or_unmask_prob > 0.0:
                # Among the selected positions, pick that fraction for special handling
                rand_or_unmask = mask & (rng.random(sz) < rand_or_unmask_prob)
                # No random replacement, e.g. 90% [MASK] / 0% random / 10% unchanged
                if self.random_token_prob == 0.0:
                    unmask = rand_or_unmask
                    rand_mask = None
                # Random replacement only, e.g. 80% [MASK] / 20% random / 0% unchanged
                elif self.leave_unmasked_prob == 0.0:
                    unmask = None
                    rand_mask = rand_or_unmask
                # The default 80% [MASK] / 10% random / 10% unchanged
                else:
                    # 0.1 / 0.2 = 0.5: half of the specially handled tokens keep
                    # the original token, the other half get a random token
                    unmask_prob = self.leave_unmasked_prob / rand_or_unmask_prob
                    decision = rng.random(sz) < unmask_prob
                    unmask = rand_or_unmask & decision
                    rand_mask = rand_or_unmask & (~decision)
            else:
                # Every selected token becomes [MASK]
                unmask = rand_mask = None

            if unmask is not None:
                mask = mask ^ unmask

            if self.mask_whole_words is not None:
                mask = np.repeat(mask, word_lens)

            # Copy the original sequence
            new_item = np.copy(item)
            # Write [MASK]
            new_item[mask] = self.mask_idx
            # Write random tokens
            if rand_mask is not None:
                num_rand = rand_mask.sum()
                if num_rand > 0:
                    if self.mask_whole_words is not None:
                        rand_mask = np.repeat(rand_mask, word_lens)
                        num_rand = rand_mask.sum()

                    new_item[rand_mask] = rng.choice(
                        len(self.vocab),
                        num_rand,
                        p=self.weights,
                    )

            return torch.from_numpy(new_item)
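The core ideas in this dataset (a seed derived from the epoch, probabilistic rounding of the mask count, and the 80/10/10 replacement split) can be reproduced with the standard library alone. The following is a simplified sketch under stated assumptions, not fairseq's implementation: it works on string tokens, leaves out span and whole-word masking, draws the three outcomes per token rather than in vectorized form, and uses a hypothetical function name mask_tokens with a toy vocabulary.

```python
import random

MASK = "<mask>"
VOCAB = ["the", "cat", "sat", "on", "mat", "dog"]  # toy vocabulary (assumed)


def mask_tokens(tokens, seed, epoch, index,
                mask_prob=0.15, random_token_prob=0.1, leave_unmasked_prob=0.1):
    """Return (source, target) for one sample.

    source: tokens with the selected positions corrupted
    target: the original token at selected positions, None elsewhere
    """
    # Same (seed, epoch, index) gives the same mask; a new epoch gives a new one.
    rng = random.Random(hash((seed, epoch, index)) % 10**6)
    sz = len(tokens)

    # Probabilistic rounding: adding U[0, 1) before truncating makes the
    # expected number of selected positions equal to mask_prob * sz.
    num_mask = min(sz, int(mask_prob * sz + rng.random()))
    chosen = rng.sample(range(sz), num_mask)

    source = list(tokens)
    target = [None] * sz
    for i in chosen:
        target[i] = tokens[i]
        r = rng.random()
        if r < leave_unmasked_prob:
            pass                           # keep the original token
        elif r < leave_unmasked_prob + random_token_prob:
            source[i] = rng.choice(VOCAB)  # replace with a random token
        else:
            source[i] = MASK               # replace with the mask token
    return source, target


tokens = "the cat sat on the mat while the dog sat on the cat".split()
for epoch in range(3):
    src, tgt = mask_tokens(tokens, seed=1, epoch=epoch, index=0)
    print(epoch, " ".join(src))
```

Calling the function again with the same seed, epoch, and index reproduces the same corrupted sequence, while advancing the epoch re-derives the seed and therefore redraws the mask for that sample.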