- 多模態大模型:算法、應用與微調
- 劉兆峰
- 3632字
- 2024-09-11 17:37:25
1.2.6 實戰:日期轉換
Transformer模型的完整架構如圖1-27所示。從功能角度來看,編碼器的核心作用是從輸入序列中提取特征,解碼器的核心作用則是處理生成任務。從結構上看,編碼器=嵌入層+位置編碼+N×[(多頭自注意力+殘差連接+標準化)+(前饋神經網絡+殘差連接+標準化)],解碼器=嵌入層+位置編碼+N×[(帶掩碼的多頭自注意力+殘差連接+標準化)+(多頭注意力+殘差連接+標準化)+(前饋神經網絡+殘差連接+標準化)]。本節我們將通過Transformer模型再實現一次日期轉換的功能。

圖1-27 Transformer模型的完整架構
1.組件定義
首先定義多頭注意力層。
python
class MultiHeadAttention(nn.Module):
def __init__(self, n_head, model_dim, drop_rate):
super().__init__()
# 每個注意力頭的維度
self.head_dim=model_dim//n_head
# 注意力頭的數量
self.n_head=n_head
# 模型的維度
self.model_dim=model_dim
# 初始化線性變換層,用于生成query、key和value
self.wq=nn.Linear(model_dim, n_head * self.head_dim)
self.wk=nn.Linear(model_dim, n_head * self.head_dim)
self.wv=nn.Linear(model_dim, n_head * self.head_dim)
# 輸出的全連接層
self.output_dense=nn.Linear(model_dim, model_dim)
# Dropout層,用于防止模型過擬合
self.output_drop=nn.Dropout(drop_rate)
# 層標準化,用于穩定神經網絡的訓練
self.layer_norm=nn.LayerNorm(model_dim)
self.attention=None
def forward(self, q, k, v, mask):
# 保存原始輸入q,用于后續的殘差連接
residual=q
# 分別對輸入的q、k、v做線性變換,生成query、key和value
query=self.wq(q)
key=self.wk(k)
value=self.wv(v)
# 對生成的query、key和value進行頭分割,以便進行多頭注意力計算
query=self.split_heads(query)
key=self.split_heads(key)
value=self.split_heads(value)
# 計算上下文向量
context=self.scaled_dot_product_attention(query, key, value, mask)
# 對上下文向量進行線性變換
output=self.output_dense(context)
# 添加dropout
output=self.output_drop(output)
# 添加殘差連接并進行層標準化
output=self.layer_norm(residual+output)
return output
def split_heads(self, x):
# 將輸入x的形狀(shape)變為(n, step, n_head, head_dim),然后重排,得到(n, n_head, step, head_dim)
x=th.reshape(x, (x.shape[0], x.shape[1], self.n_head, self.head_dim))
return x.permute(0, 2, 1, 3)
def scaled_dot_product_attention(self, q, k, v, mask=None):
# 計算縮放因子
dk=th.tensor(k.shape[-1]).type(th.float)
# 計算注意力分數
score=th.matmul(q, k.permute(0, 1, 3, 2))/(th.sqrt(dk)+1e-8)
if mask is not None:
# 如果提供了mask,則將mask位置的分數設置為負無窮,使得這些位置的softmax值接近0
score=score.masked_fill_(mask,-np.inf)
# 應用softmax函數計算得到注意力權重
self.attention=softmax(score,dim=-1)
# 計算上下文向量
context=th.matmul(self.attention,v)
# 重排上下文向量的維度并進行維度合并
context=context.permute(0, 2, 1, 3)
context=context.reshape((context.shape[0], context.shape[1],-1))
return context
重點介紹一下scaled_dot_product_attention函數,此函數實現了“縮放點積注意力機制”的注意力計算過程。這是Transformer模型中的核心部分。以下是函數執行的主要步驟。
1)計算縮放因子。函數計算了縮放因子dk,它等于k(對應key)的最后一個維度。這個縮放因子用于在計算注意力分數時,緩解可能因維度較高而導致的點積梯度消失或梯度爆炸問題。
2)計算注意力分數。函數計算了注意力分數score。注意力分數是通過對q(對應query)和k(對應key)進行點積運算并除以縮放因子dk的平方根來計算的。
3)應用mask。如果提供了mask,那么函數將在計算softmax值之前將mask位置的分數設置為負無窮。這將使得這些位置的softmax值接近0,也就是說,模型不會關注這些位置。
4)計算注意力權重。函數通過對score應用softmax函數,計算得到注意力權重self.attention。
5)計算上下文向量。使用注意力權重和v(對應value)進行矩陣乘法,計算出上下文向量context。
6)重排和合并維度。函數通過重排和合并維度,得到了最終的上下文向量。這個上下文向量將被用作多頭注意力機制的輸出。
注意:這個過程被多次應用于多頭注意力機制中,每個頭都會有自己的query、key和value,它們通過不同的線性變換得到,然后用于計算各自的注意力權重和上下文向量。
然后,我們定義注意力計算后的前饋神經網絡,它在每個Transformer模型的編碼器和解碼器的層中使用,并且獨立地應用于每個位置的輸入。這個網絡包含兩個線性變換層,其中間隔一個ReLU激活函數,并且在輸出之前使用了dropout和層標準化。
python
class PositionWiseFFN(nn.Module):
def __init__(self, model_dim, dropout=0.0):
super().__init__()
# 前饋神經網絡的隱藏層維度,設為模型維度的4倍
ffn_dim=model_dim * 4
# 第一個線性變換層,其輸出維度為前饋神經網絡的隱藏層維度
self.linear1=nn.Linear(model_dim, ffn_dim)
# 第二個線性變換層,其輸出維度為模型的維度
self.linear2=nn.Linear(ffn_dim, model_dim)
# Dropout層,用于防止模型過擬合
self.dropout=nn.Dropout(dropout)
# 層標準化,用于穩定神經網絡的訓練
self.layer_norm=nn.LayerNorm(model_dim)
def forward(self, x):
# 對輸入x進行前饋神經網絡的計算
# 首先,通過第一個線性變換層并使用ReLU作為激活函數
output=relu(self.linear1(x))
# 然后,通過第二個線性變換層
output=self.linear2(output)
# 接著,對上述輸出進行dropout操作
output=self.dropout(output)
# 最后,對輸入x和前饋神經網絡的輸出做殘差連接,然后進行層標準化
output=self.layer_norm(x+output)
return output # 返回結果,其形狀為[n, step, dim]
2.實現編碼器與解碼器
之后我們定義Transformer模型的編碼器。
python
class EncoderLayer(nn.Module):
def __init__(self, n_head, emb_dim, drop_rate):
super().__init__()
# 多頭注意力機制層
self.mha=MultiHeadAttention(n_head, emb_dim, drop_rate)
# 前饋神經網絡層
self.ffn=PositionWiseFFN(emb_dim, drop_rate)
def forward(self, xz, mask):
# xz的形狀為 [n, step, emb_dim]
# 通過多頭注意力機制層處理xz,得到context,其形狀也為 [n, step, emb_dim]
context=self.mha(xz, xz, xz, mask)
# 將context傳入前饋神經網絡層,得到輸出
output=self.ffn(context)
return output
class Encoder(nn.Module):
def __init__(self, n_head, emb_dim, drop_rate, n_layer):
super().__init__()
# 定義n_layer個EncoderLayer,保存在ModuleList中
self.encoder_layers=nn.ModuleList(
[EncoderLayer(n_head, emb_dim, drop_rate) for _ in range(n_layer)]
)
def forward(self, xz, mask):
# 依次通過所有的EncoderLayer
for encoder in self.encoder_layers:
xz=encoder(xz, mask)
return xz # 返回的xz形狀為 [n, step, emb_dim]
再定義Transformer模型的解碼器。
python
class DecoderLayer(nn.Module):
def __init__(self, n_head, model_dim, drop_rate):
super().__init__()
# 定義兩個多頭注意力機制層
self.mha=nn.ModuleList([MultiHeadAttention(n_head, model_dim, drop_rate) for _ in range(2)])
# 定義一個前饋神經網絡層
self.ffn=PositionWiseFFN(model_dim, drop_rate)
def forward(self, yz, xz, yz_look_ahead_mask, xz_pad_mask):
# 執行第一個注意力層的計算,3個輸入均為yz,使用自注意力機制
dec_output=self.mha[0](yz, yz, yz, yz_look_ahead_mask) # [n, step, model_dim]
# 執行第二個注意力層的計算,其中Q來自前一個注意力層的輸出,K和V來自編碼器的輸出
dec_output=self.mha[1](dec_output, xz, xz, xz_pad_mask) # [n, step, model_dim]
# 通過前饋神經網絡層
dec_output=self.ffn(dec_output) # [n, step, model_dim]
return dec_output
class Decoder(nn.Module):
def __init__(self, n_head, model_dim, drop_rate, n_layer):
super().__init__()
# 定義n_layer個DecoderLayer,保存在ModuleList中
self.num_layers=n_layer
self.decoder_layers=nn.ModuleList(
[DecoderLayer(n_head, model_dim, drop_rate) for _ in range(n_layer)]
)
def forward(self, yz, xz, yz_look_ahead_mask, xz_pad_mask):
# 依次通過所有的DecoderLayer
for decoder in self.decoder_layers:
yz=decoder(yz, xz, yz_look_ahead_mask, xz_pad_mask)
return yz # 返回的yz形狀為 [n, step, model_dim]
重點介紹一下解碼器的兩個注意力層。在解碼器的前向傳播過程中,輸入的yz首先會傳入第一個多頭注意力機制層中。這個注意力層是一個自注意力機制層,也就是說其query、key和value都來自yz。并且,這個注意力層使用了一個look ahead mask方法,使得在計算注意力分數時,每個位置只能關注它之前的位置,而不能關注它之后的位置。這是因為在預測時,模型只能看到已經預測出的詞,不能看到還未預測出的詞。這個注意力層的輸出dec_output也是一個序列,其形狀為[n, step, model_dim]。
然后,dec_output和編碼器的輸出xz一起傳入第二個多頭注意力機制層中。這個注意力層的query來自dec_output,而key和value來自xz。并且,這個注意力層使用了一個padding mask方法,使得在計算注意力分數時,模型不會關注xz中的padding位置。這個注意力層的輸出dec_output同樣是一個序列,其形狀為[n, step, model_dim]。
在解碼器中,每個位置的輸出不僅取決于當前位置的輸入,還取決于前面位置的輸入和編碼器所有位置的輸出。這意味著,解碼器可以捕捉到輸入和輸出之間的復雜依賴關系。
3.組裝Transformer
在處理輸入時,還需要定義位置編碼層,用于處理序列數據。這個層的作用是將序列中每個位置的詞編碼為一個固定大小的向量,這個向量包含了詞的信息和它在序列中的位置信息。
python
class PositionEmbedding(nn.Module):
def __init__(self, max_len, emb_dim, n_vocab):
super().__init__()
# 生成位置編碼矩陣
pos=np.expand_dims(np.arange(max_len), 1) # [max_len, 1]
# 使用正弦和余弦函數生成位置編碼
pe=pos/np.power(1000, 2*np.expand_dims(np.arange(emb_dim)//2, 0)/emb_dim)
pe[:, 0::2]=np.sin(pe[:, 0::2])
pe[:, 1::2]=np.cos(pe[:, 1::2])
pe=np.expand_dims(pe, 0) # [1, max_len, emb_dim]
self.pe=th.from_numpy(pe).type(th.float32)
# 定義詞嵌入層
self.embeddings=nn.Embedding(n_vocab, emb_dim)
# 初始化詞嵌入層的權重
self.embeddings.weight.data.normal_(0, 0.1)
def forward(self, x):
# 確保位置編碼在與詞嵌入權重相同的設備上
device=self.embeddings.weight.device
self.pe=self.pe.to(device)
# 計算輸入的詞嵌入權重,并加上位置編碼
x_embed=self.embeddings(x)+self.pe # [n, step, emb_dim]
return x_embed # [n, step, emb_dim]
最后,我們將其組裝成Transformer。
python
class Transformer(nn.Module):
def __init__(self, n_vocab, max_len, n_layer=6, emb_dim=512, n_head=8, drop_rate=0.1, padding_idx=0):
super().__init__()
# 初始化最大長度、填充索引、詞匯表大小
self.max_len=max_len
self.padding_idx=th.tensor(padding_idx)
self.dec_v_emb=n_vocab
# 初始化位置嵌入、編碼器、解碼器和輸出層
self.embed=PositionEmbedding(max_len, emb_dim, n_vocab)
self.encoder=Encoder(n_head, emb_dim, drop_rate, n_layer)
self.decoder=Decoder(n_head, emb_dim, drop_rate, n_layer)
self.output=nn.Linear(emb_dim, n_vocab)
# 初始化優化器
self.opt=th.optim.Adam(self.parameters(), lr=0.002)
def forward(self, x, y):
# 對輸入和目標進行嵌入
x_embed, y_embed=self.embed(x), self.embed(y)
# 創建填充掩碼
pad_mask=self._pad_mask(x)
# 對輸入進行編碼
encoded_z=self.encoder(x_embed, pad_mask)
# 創建前瞻掩碼
yz_look_ahead_mask=self._look_ahead_mask(y)
# 將編碼后的輸入和前瞻掩碼傳入解碼器
decoded_z=self.decoder(
y_embed, encoded_z, yz_look_ahead_mask, pad_mask)
# 通過輸出層得到最終輸出
output=self.output(decoded_z)
return output
def step(self, x, y):
# 清空梯度
self.opt.zero_grad()
# 計算輸出和損失
logits=self(x, y[:, :-1])
loss=cross_entropy(logits.reshape(-1, self.dec_v_emb), y[:, 1:].reshape(-1))
# 進行反向傳播
loss.backward()
# 更新參數
self.opt.step()
return loss.cpu().data.numpy(), logits
def _pad_bool(self, seqs):
# 創建掩碼,標記哪些位置是填充的
return th.eq(seqs, self.padding_idx)
def _pad_mask(self, seqs):
# 將填充掩碼擴展到合適的維度
len_q=seqs.size(1)
mask=self._pad_bool(seqs).unsqueeze(1).expand(-1, len_q,-1)
return mask.unsqueeze(1)
def _look_ahead_mask(self, seqs):
# 創建前瞻掩碼,防止在生成序列時看到未來位置的信息
device=next(self.parameters()).device
_, seq_len=seqs.shape
mask=th.triu(th.ones((seq_len, seq_len), dtype=th.long),
diagonal=1).to(device)
mask=th.where(self._pad_bool(seqs)[:, None, None, :], 1, mask[None, None, :, :]).to(device)
return mask>0
在處理序列數據(如文本或時間序列數據)時,通常會遇到一個問題,即序列的長度不一致。在大多數深度學習框架中,我們需要將一個批量的數據整理成相同的形狀才能進行計算。因此,我們需要一種方法來處理長度不一的序列,這就是“填充”(padding)的用處。通過填充,我們可以將不同長度的序列轉變為相同長度,具體來說,我們會找到批量中最長的序列,然后將其他較短的序列通過添加特殊的“填充值”(如0或特殊的標記)來擴展到相同的長度。
填充之后,我們就可以將序列數據整理成相同的形狀,這樣就可以用來訓練模型了。然而,填充值是沒有實際意義的,我們不希望它們對模型的訓練造成影響。因此,我們通常會創建一個掩碼(mask),用來告訴模型哪些位置是填充值,也就是Transformer模型定義中的_pad_mask和_look_ahead_mask函數。它們會返回一個布爾值矩陣,標記輸入中哪些位置是填充值。
python
def pad_zero(seqs, max_len):
# 初始化一個全是填充標識符PAD_token的二維矩陣,大小為(len(seqs), max_len)
padded=np.full((len(seqs), max_len), fill_value=PAD_token, dtype=np.int32)
for i, seq in enumerate(seqs):
# 將seqs中的每個seq序列的元素填入padded對應的行中,未填滿的部分仍為PAD_token
padded[i, :len(seq)]=seq
return padded
4.訓練與評估
接下來就可以開始訓練了。
python
# 初始化一個Transformer模型,設置詞匯表大小、最大序列長度、層數、嵌入維度、多頭注意力的頭數、dropout比率和填充標記的索引
model=Transformer(n_vocab=dataset.num_word, max_len=MAX_LENGTH, n_layer=3, emb_dim=32, n_head=8, drop_rate=0.1, padding_idx=0)
# 檢測是否有可用的GPU,如果有,則使用GPU進行計算;如果沒有,則使用CPU
device=th.device("cuda" if th.cuda.is_available() else "cpu")
# 將模型移動到相應的設備(CPU或GPU)
model=model.to(device)
# 創建一個數據集,包含1000個樣本
dataset=DateDataset(1000)
# 創建一個數據加載器,設定批量大小為32,每個批量的數據會被打亂
dataloader=DataLoader(dataset, batch_size=32, shuffle=True)
# 執行10個訓練周期
for i in range(10):
# 對于數據加載器中的每批數據,對輸入和目標張量進行零填充,使其長度達到最大,然后將其轉換為PyTorch張量,并移動到相應的設備(CPU或GPU)
for input_tensor, target_tensor, _ in dataloader:
input_tensor=th.from_numpy(
pad_zero(input_tensor, max_len=MAX_LENGTH)).long().to(device)
target_tensor=th.from_numpy(
pad_zero(target_tensor, MAX_LENGTH+1)).long().to(device)
# 使用模型的step方法進行一步訓練,并獲取損失值
loss, _=model.step(input_tensor, target_tensor)
# 打印每個訓練周期后的損失值
print(f"epoch: {i+1}, \tloss: {loss}")
類似于Seq2Seq結構模型的日期轉換,我們可以定義一個評估方法,查看Transformer模型能否正確地進行日期轉換。
python
def evaluate(model, x, y):
model.eval()
x=th.from_numpy(pad_zero([x], max_len=MAX_LENGTH)).long().to(device)
y=th.from_numpy(pad_zero([y], max_len=MAX_LENGTH)).long().to(device)
decoder_outputs=model(x, y)
_, topi=decoder_outputs.topk(1)
decoded_ids=topi.squeeze()
decoded_words=[]
for idx in decoded_ids:
decoded_words.append(dataset.index2word[idx.item()])
return ''.join(decoded_words)
最終模型的輸出如圖1-28所示。

圖1-28 Transformer模型的日期轉換輸出示例