![](https://img.tnblog.net/arcimg/hb/21f086c80c5d4afda1bc1029dadd8f3a.png)
>#PyTorch News Classification Task (Study Notes)

[TOC]

## Directory Structure

![](https://img.tnblog.net/arcimg/hb/101911228e54423ab6c42ec21d341b41.png)

### The models folder

tn2>This folder holds the network architectures.
It contains two files, `TextCNN.py` and `TextRNN.py`, implementing a CNN and an RNN respectively; here we use the RNN.

### The THUCNews folder

tn2>This folder contains three subfolders: `data`, `log` and `saved_dict`.

![](https://img.tnblog.net/arcimg/hb/969988b3533b4ce797c2a5a520e916a9.png)

tn2>`data` holds our datasets.
The experiment in this post is classifying news texts into categories.
Let's first look at the contents of the training file `train.txt`.

![](https://img.tnblog.net/arcimg/hb/b7c5728f0abf4b8b9a920438187f7dbb.png)

tn2>Each training sample has the format: text + tab + class label.
There are 10 classes in total, listed in `class.txt`, and the training set has 180,000 lines.
In addition, `dev.txt` and `test.txt` are the validation set and the test set, respectively.

#### How do we make the computer understand the text?

tn2>The usual approach is to clean the text first (for example, dropping meaningless fragments such as `(图)`), then split it into words or characters and map each token to an index id through a vocabulary.

![](https://img.tnblog.net/arcimg/hb/75825c6f86a84883b9502d6cf5efd76b.png)

tn2>But these raw indices mean **nothing** to the model by themselves; at best they are a rigid lookup key.
An `Embedding` table solves this problem: it maps each word or character to a dense vector. Good tables are pre-trained by well-known companies on huge corpora (they cannot be trained well without a lot of data).
The resulting word-vector tensor has shape `batch` (texts per batch) × `max_len` (text length) × `E` (embedding dimension).

tn>The vocabulary used here is the `vocab.pkl` file; for the `embedding` table, both Tencent's and Sogou's pre-trained tables are provided.

## Recurrent Neural Networks (RNN)

tn2>An RNN is a model designed for sequential data such as time series and text, as shown below:

![](https://img.tnblog.net/arcimg/hb/a350452c3fc84c1c9e34681e0efd0cf1.png)

tn2>Notice that it simply reprocesses the features inside the hidden layer of an ordinary network, so each new round of learning mixes the new features with the old ones.
For example, suppose we have data at time t1 and at time t2. An ordinary neural network would feed both into the same input layer, and the temporal relation between them would be lost.
With an RNN, the hidden layer first trains on t1; when t2 arrives, t1's hidden state and t2 are combined to produce new features.

![](https://img.tnblog.net/arcimg/hb/ad1118e4de97428f961b53b815c5fc24.png)

tn2>With many time steps, we generally keep only the final hidden state for the fully connected layer, because it already contains the features of all the preceding steps.
The intermediate results produced along the way are all discarded; only the last one is kept.

## Code Walkthrough

tn2>The main entry point is `run.py`.

```python
import time
import torch
import numpy as np
from train_eval import train, init_network
from importlib import import_module
import argparse
from tensorboardX import SummaryWriter

parser = argparse.ArgumentParser(description='Chinese Text Classification')
parser.add_argument('--model', type=str, required=True,
                    help='choose a model: TextCNN, TextRNN, FastText, TextRCNN, TextRNN_Att, DPCNN, Transformer')
parser.add_argument('--embedding', default='pre_trained', type=str, help='random or pre_trained')
parser.add_argument('--word', default=False, type=bool, help='True for word, False for char')
args = parser.parse_args()


if __name__ == '__main__':
    dataset = 'THUCNews'  # dataset root directory

    # Choose the embedding table
    # Sogou news: embedding_SougouNews.npz, Tencent: embedding_Tencent.npz, random initialization: random
    embedding = 'embedding_SougouNews.npz'
    if args.embedding == 'random':
        embedding = 'random'
    model_name = args.model  # --model selects the model: TextRNN here; TextCNN etc. are also available
    if model_name == 'FastText':
        from utils_fasttext import build_dataset, build_iterator, get_time_dif
        embedding = 'random'
    else:
        # utils provides the helpers for loading the dataset and tokenizing by word or character
        from utils import build_dataset, build_iterator, get_time_dif

    # Dynamically import the model module, e.g. models.TextRNN
    x = import_module('models.' + model_name)
    # Instantiate the module's Config class to set the hyperparameters
    config = x.Config(dataset, embedding)
    # Fix the random seeds so every run draws the same random numbers
    np.random.seed(1)
    torch.manual_seed(1)
    torch.cuda.manual_seed_all(1)
    torch.backends.cudnn.deterministic = True  # make results reproducible

    start_time = time.time()
    print("Loading data...")
    # Build the datasets
    vocab, train_data, dev_data, test_data = build_dataset(config, args.word)
    train_iter = build_iterator(train_data, config)
    dev_iter = build_iterator(dev_data, config)
    test_iter = build_iterator(test_data, config)
    time_dif = get_time_dif(start_time)
    print("Time usage:", time_dif)

    # train
    config.n_vocab = len(vocab)
    # Instantiate the model and move it to the configured device
    model = x.Model(config).to(config.device)
    writer = SummaryWriter(log_dir=config.log_path + '/' + time.strftime('%m-%d_%H.%M', time.localtime()))
    # Initialize the network weights
    if model_name != 'Transformer':
        init_network(model)
    print(model.parameters)
    # Train the model
    train(config, model, train_iter, dev_iter, test_iter, writer)
```
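tn2>The key trick in `run.py` is `import_module('models.' + model_name)`: the `--model` argument becomes a module path under `models/`, so one script can drive every architecture. Below is a minimal sketch of the same pattern, assuming the repository layout above (normally you would launch it as `python run.py --model TextRNN`):

```python
from importlib import import_module

# "--model TextRNN" becomes the module path models.TextRNN (the file models/TextRNN.py).
model_name = 'TextRNN'
x = import_module('models.' + model_name)  # equivalent to: import models.TextRNN as x

# Each model module is expected to expose a Config class and a Model class.
config = x.Config('THUCNews', 'embedding_SougouNews.npz')
model = x.Model(config).to(config.device)
```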
tn2>`TextRNN.py`

```python
# coding: UTF-8
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np


class Config(object):
    """Configuration parameters"""
    def __init__(self, dataset, embedding):
        # dataset: name of the dataset root directory
        # embedding: file name of the embedding table
        self.model_name = 'TextRNN'
        self.train_path = dataset + '/data/train.txt'  # training set
        self.dev_path = dataset + '/data/dev.txt'      # validation set
        self.test_path = dataset + '/data/test.txt'    # test set
        self.class_list = [x.strip() for x in open(
            dataset + '/data/class.txt').readlines()]  # class names
        self.vocab_path = dataset + '/data/vocab.pkl'  # vocabulary
        self.save_path = dataset + '/saved_dict/' + self.model_name + '.ckpt'  # trained model checkpoint
        self.log_path = dataset + '/log/' + self.model_name
        self.embedding_pretrained = torch.tensor(
            np.load(dataset + '/data/' + embedding)["embeddings"].astype('float32'))\
            if embedding != 'random' else None  # pre-trained word vectors
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # device

        self.dropout = 0.5              # dropout: randomly drop 50% of the neurons (tunable)
        self.require_improvement = 1000 # early stopping: quit if no improvement after 1000 batches (tunable)
        self.num_classes = len(self.class_list)  # number of classes
        self.n_vocab = 0                # vocabulary size, assigned at runtime
        self.num_epochs = 10            # number of epochs (tunable)
        self.batch_size = 128           # mini-batch size (tunable)
        self.pad_size = 32              # every sentence is padded or truncated to this length
        self.learning_rate = 1e-3       # learning rate
        self.embed = self.embedding_pretrained.size(1)\
            if self.embedding_pretrained is not None else 300  # embedding dimension; follows the pre-trained vectors if present
        self.hidden_size = 128          # LSTM hidden size
        self.num_layers = 2             # number of LSTM layers


'''Recurrent Neural Network for Text Classification with Multi-Task Learning'''


class Model(nn.Module):
    def __init__(self, config):
        super(Model, self).__init__()
        # Embedding lookup
        if config.embedding_pretrained is not None:
            self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)
        else:
            # config.embed is the dimension each token id is mapped to
            self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1)
        # bidirectional=True: a left-to-right pass plus a right-to-left pass, doubling the output size to 256
        self.lstm = nn.LSTM(config.embed, config.hidden_size, config.num_layers,
                            bidirectional=True, batch_first=True, dropout=config.dropout)
        # Final fully connected layer: 128 * 2 -> number of classes
        self.fc = nn.Linear(config.hidden_size * 2, config.num_classes)

    def forward(self, x):
        x, _ = x
        out = self.embedding(x)       # embedding lookup: [batch_size, seq_len, embedding] = [128, 32, 300]
        out, _ = self.lstm(out)       # run through the LSTM
        out = self.fc(out[:, -1, :])  # hidden state of the last time step goes to the fully connected layer
        return out
```
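tn2>To make the tensor shapes concrete, here is a minimal sketch that pushes a dummy batch through the same layers. The vocabulary size 4762 and the random token ids are stand-ins for the real `vocab.pkl` and embedding table:

```python
import torch
import torch.nn as nn

# Dummy batch: 128 sentences, each padded/truncated to 32 token ids.
x = torch.randint(0, 4762, (128, 32))

embedding = nn.Embedding(4762, 300)  # token id -> 300-dim vector
lstm = nn.LSTM(300, 128, 2, bidirectional=True, batch_first=True, dropout=0.5)
fc = nn.Linear(128 * 2, 10)          # 10 news classes

out = embedding(x)       # [128, 32, 300]
out, _ = lstm(out)       # [128, 32, 256]  (two directions: 128 * 2)
out = fc(out[:, -1, :])  # [128, 10]       keep only the last time step
print(out.shape)         # torch.Size([128, 10])
```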
tn2>`train_eval.py`

```python
# coding: UTF-8
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn import metrics
import time
from utils import get_time_dif
from tensorboardX import SummaryWriter


# Weight initialization, xavier by default
def init_network(model, method='xavier', exclude='embedding', seed=123):
    for name, w in model.named_parameters():
        if exclude not in name:
            if 'weight' in name:
                if method == 'xavier':
                    nn.init.xavier_normal_(w)
                elif method == 'kaiming':
                    nn.init.kaiming_normal_(w)
                else:
                    nn.init.normal_(w)
            elif 'bias' in name:
                nn.init.constant_(w, 0)
            else:
                pass


def train(config, model, train_iter, dev_iter, test_iter, writer):
    start_time = time.time()
    # Switch to training mode
    model.train()
    # Adam optimizer; lr sets the learning rate
    optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)

    # Exponential learning-rate decay: every epoch, lr = gamma * lr
    # scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
    total_batch = 0               # number of batches processed so far
    dev_best_loss = float('inf')
    last_improve = 0              # batch count at the last validation-loss improvement
    flag = False                  # whether training has gone too long without improvement
    # writer = SummaryWriter(log_dir=config.log_path + '/' + time.strftime('%m-%d_%H.%M', time.localtime()))
    for epoch in range(config.num_epochs):
        print('Epoch [{}/{}]'.format(epoch + 1, config.num_epochs))
        # scheduler.step()  # learning-rate decay
        for i, (trains, labels) in enumerate(train_iter):
            # print(trains[0].shape)
            # Forward pass
            outputs = model(trains)
            # Zero the gradients
            model.zero_grad()
            # Loss
            loss = F.cross_entropy(outputs, labels)
            # Backward pass and parameter update
            loss.backward()
            optimizer.step()
            if total_batch % 100 == 0:
                # Every 100 batches, report metrics on the training and validation sets
                # Ground-truth labels
                true = labels.data.cpu()
                # Predictions
                predic = torch.max(outputs.data, 1)[1].cpu()
                # Training accuracy
                train_acc = metrics.accuracy_score(true, predic)
                dev_acc, dev_loss = evaluate(config, model, dev_iter)
                # Save the model whenever the validation loss improves
                if dev_loss < dev_best_loss:
                    dev_best_loss = dev_loss
                    torch.save(model.state_dict(), config.save_path)
                    improve = '*'
                    last_improve = total_batch
                else:
                    improve = ''
                time_dif = get_time_dif(start_time)
                # Print the progress line
                msg = 'Iter: {0:>6},  Train Loss: {1:>5.2},  Train Acc: {2:>6.2%},  Val Loss: {3:>5.2},  Val Acc: {4:>6.2%},  Time: {5} {6}'
                print(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc, time_dif, improve))
                writer.add_scalar("loss/train", loss.item(), total_batch)
                writer.add_scalar("loss/dev", dev_loss, total_batch)
                writer.add_scalar("acc/train", train_acc, total_batch)
                writer.add_scalar("acc/dev", dev_acc, total_batch)
                # Switch back to training mode
                model.train()
            total_batch += 1
            # Stop if the validation loss has not improved within the 1000-batch tolerance
            if total_batch - last_improve > config.require_improvement:
                print("No optimization for a long time, auto-stopping...")
                flag = True
                break
        if flag:
            break
    writer.close()
    # Evaluate the best checkpoint on the test set
    test(config, model, test_iter)


def test(config, model, test_iter):
    # test
    model.load_state_dict(torch.load(config.save_path))
    model.eval()
    start_time = time.time()
    test_acc, test_loss, test_report, test_confusion = evaluate(config, model, test_iter, test=True)
    msg = 'Test Loss: {0:>5.2},  Test Acc: {1:>6.2%}'
    print(msg.format(test_loss, test_acc))
    print("Precision, Recall and F1-Score...")
    print(test_report)
    print("Confusion Matrix...")
    print(test_confusion)
    time_dif = get_time_dif(start_time)
    print("Time usage:", time_dif)


def evaluate(config, model, data_iter, test=False):
    # Switch to evaluation mode
    model.eval()
    loss_total = 0
    predict_all = np.array([], dtype=int)  # predictions
    labels_all = np.array([], dtype=int)   # ground truth
    with torch.no_grad():
        for texts, labels in data_iter:
            outputs = model(texts)
            # Loss
            loss = F.cross_entropy(outputs, labels)
            loss_total += loss
            labels = labels.data.cpu().numpy()
            predic = torch.max(outputs.data, 1)[1].cpu().numpy()
            # Collect ground truth and predictions over the whole set
            labels_all = np.append(labels_all, labels)
            predict_all = np.append(predict_all, predic)

    # Accuracy
    acc = metrics.accuracy_score(labels_all, predict_all)
    # During testing, report extra evaluation metrics
    if test:
        report = metrics.classification_report(labels_all, predict_all, target_names=config.class_list, digits=4)
        confusion = metrics.confusion_matrix(labels_all, predict_all)
        return acc, loss_total / len(data_iter), report, confusion
    return acc, loss_total / len(data_iter)
```
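tn2>The early-stopping bookkeeping is easy to miss inside the training loop, so here is the same logic in isolation. The `fake_dev_losses` list is a made-up stand-in for what `evaluate` would return at each 100-batch checkpoint:

```python
# Stand-in validation losses, one per 100-batch checkpoint.
fake_dev_losses = [0.9, 0.7, 0.65] + [0.66] * 15

require_improvement = 1000  # tolerance: stop after this many batches without improvement
dev_best_loss = float('inf')
last_improve = 0

for i, dev_loss in enumerate(fake_dev_losses):
    total_batch = i * 100         # evaluation happens every 100 batches
    if dev_loss < dev_best_loss:  # improvement: record it (this is where the checkpoint is saved)
        dev_best_loss = dev_loss
        last_improve = total_batch
    if total_batch - last_improve > require_improvement:
        print(f"auto-stopping at batch {total_batch}")  # prints: auto-stopping at batch 1300
        break
```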
tn2>`utils.py`

```python
# coding: UTF-8
import os
import torch
import numpy as np
import pickle as pkl
from tqdm import tqdm
import time
from datetime import timedelta


MAX_VOCAB_SIZE = 10000       # vocabulary size limit
UNK, PAD = '<UNK>', '<PAD>'  # unknown token, padding token


def build_vocab(file_path, tokenizer, max_size, min_freq):
    vocab_dic = {}
    with open(file_path, 'r', encoding='UTF-8') as f:
        for line in tqdm(f):
            lin = line.strip()
            if not lin:
                continue
            content = lin.split('\t')[0]
            for word in tokenizer(content):
                vocab_dic[word] = vocab_dic.get(word, 0) + 1
        vocab_list = sorted([_ for _ in vocab_dic.items() if _[1] >= min_freq],
                            key=lambda x: x[1], reverse=True)[:max_size]
        vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)}
        vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1})
    return vocab_dic


def build_dataset(config, ues_word):
    if ues_word:
        tokenizer = lambda x: x.split(' ')    # word-level: tokens separated by spaces
    else:
        tokenizer = lambda x: [y for y in x]  # char-level
    # Load the vocabulary (build one from the training set if it does not exist)
    if os.path.exists(config.vocab_path):
        vocab = pkl.load(open(config.vocab_path, 'rb'))
    else:
        vocab = build_vocab(config.train_path, tokenizer=tokenizer, max_size=MAX_VOCAB_SIZE, min_freq=1)
        pkl.dump(vocab, open(config.vocab_path, 'wb'))
    print(f"Vocab size: {len(vocab)}")  # 4762 entries

    def load_dataset(path, pad_size=32):
        contents = []
        # Read the text file
        with open(path, 'r', encoding='UTF-8') as f:
            # Iterate over every line
            for line in tqdm(f):
                # Strip the newline
                lin = line.strip()
                if not lin:
                    continue
                # Split into content and label
                content, label = lin.split('\t')
                words_line = []
                # Tokenize into characters
                token = tokenizer(content)
                # Original sequence length
                seq_len = len(token)
                # Pad with <PAD> up to pad_size, or truncate beyond it
                if pad_size:
                    if len(token) < pad_size:
                        token.extend([vocab.get(PAD)] * (pad_size - len(token)))
                    else:
                        token = token[:pad_size]
                        seq_len = pad_size
                # word to id
                for word in token:
                    # Look up each token's vocabulary id; fall back to <UNK> for unknown tokens
                    words_line.append(vocab.get(word, vocab.get(UNK)))
                contents.append((words_line, int(label), seq_len))
        # Return (token ids, label, length) tuples
        return contents  # [([...], 0), ([...], 1), ...]

    # Load the training, validation and test sets
    train = load_dataset(config.train_path, config.pad_size)
    dev = load_dataset(config.dev_path, config.pad_size)
    test = load_dataset(config.test_path, config.pad_size)
    # Return the vocabulary plus the three datasets
    return vocab, train, dev, test


class DatasetIterater(object):
    def __init__(self, batches, batch_size, device):
        self.batch_size = batch_size
        self.batches = batches
        self.n_batches = len(batches) // batch_size
        self.residue = False  # whether a leftover partial batch exists
        if len(batches) % batch_size != 0:
            self.residue = True
        self.index = 0
        # Device to run on
        self.device = device

    def _to_tensor(self, datas):
        x = torch.LongTensor([_[0] for _ in datas]).to(self.device)
        y = torch.LongTensor([_[1] for _ in datas]).to(self.device)

        # Length before padding (capped at pad_size)
        seq_len = torch.LongTensor([_[2] for _ in datas]).to(self.device)
        return (x, seq_len), y

    def __next__(self):
        if self.residue and self.index == self.n_batches:
            batches = self.batches[self.index * self.batch_size: len(self.batches)]
            self.index += 1
            batches = self._to_tensor(batches)
            return batches

        elif self.index > self.n_batches:
            self.index = 0
            raise StopIteration
        else:
            batches = self.batches[self.index * self.batch_size: (self.index + 1) * self.batch_size]
            self.index += 1
            batches = self._to_tensor(batches)
            return batches

    def __iter__(self):
        return self

    def __len__(self):
        if self.residue:
            return self.n_batches + 1
        else:
            return self.n_batches


def build_iterator(dataset, config):
    iter = DatasetIterater(dataset, config.batch_size, config.device)
    return iter


def get_time_dif(start_time):
    """Elapsed time"""
    end_time = time.time()
    time_dif = end_time - start_time
    return timedelta(seconds=int(round(time_dif)))


if __name__ == "__main__":
    '''Extract the pre-trained word vectors'''
    # Adjust the directories and file names below as needed.
    train_dir = "./THUCNews/data/train.txt"
    vocab_dir = "./THUCNews/data/vocab.pkl"
    pretrain_dir = "./THUCNews/data/sgns.sogou.char"
    emb_dim = 300
    filename_trimmed_dir = "./THUCNews/data/embedding_SougouNews"
    if os.path.exists(vocab_dir):
        word_to_id = pkl.load(open(vocab_dir, 'rb'))
    else:
        # tokenizer = lambda x: x.split(' ')  # word-level vocabulary (tokens separated by spaces)
        tokenizer = lambda x: [y for y in x]  # char-level vocabulary
        word_to_id = build_vocab(train_dir, tokenizer=tokenizer, max_size=MAX_VOCAB_SIZE, min_freq=1)
        pkl.dump(word_to_id, open(vocab_dir, 'wb'))

    embeddings = np.random.rand(len(word_to_id), emb_dim)
    f = open(pretrain_dir, "r", encoding='UTF-8')
    for i, line in enumerate(f.readlines()):
        # if i == 0:  # skip the first line if it is a header
        #     continue
        lin = line.strip().split(" ")
        if lin[0] in word_to_id:
            idx = word_to_id[lin[0]]
            emb = [float(x) for x in lin[1:301]]
            embeddings[idx] = np.asarray(emb, dtype='float32')
    f.close()
    np.savez_compressed(filename_trimmed_dir, embeddings=embeddings)
```
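tn2>To see what `load_dataset` actually produces, the snippet below encodes a single sample by hand. The six-entry vocabulary is a toy stand-in for the real `vocab.pkl` (4762 entries):

```python
# Toy vocabulary; the real vocab.pkl maps 4762 characters to ids.
vocab = {'中': 0, '国': 1, '新': 2, '闻': 3, '<UNK>': 4, '<PAD>': 5}

content, label = '中国新闻!', '3'  # one "text<TAB>label" line, already split
token = [c for c in content]        # char-level tokenization
seq_len = len(token)                # length before padding: 5

pad_size = 8
if len(token) < pad_size:
    token.extend(['<PAD>'] * (pad_size - len(token)))  # pad short sentences
else:
    token = token[:pad_size]                           # truncate long ones
    seq_len = pad_size

# Token -> id, falling back to <UNK> for out-of-vocabulary characters ('!' here).
words_line = [vocab.get(w, vocab['<UNK>']) for w in token]
print((words_line, int(label), seq_len))  # ([0, 1, 2, 3, 4, 5, 5, 5], 3, 5)
```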
tn>Note: when running from VS Code, the `model` argument has to be added in `launch.json`.

```json
{
    "name": "Python: Current File",
    "type": "python",
    "request": "launch",
    "program": "${file}",
    "console": "integratedTerminal",
    "justMyCode": true,
    "args": [
        "--model", "TextRNN"
    ]
},
```

![](https://img.tnblog.net/arcimg/hb/89749b16a53743b99581af929cb79f30.png)

tn2><a href="https://download.tnblog.net/resource/index/83298d1dd8e14856b0bf5f1dc3aaaf05">Code download</a>