Use Selenium to drive the browser, so exporting your own articles doesn't mean clicking through and saving them one by one.

Download a matching ChromeDriver from:
https://googlechromelabs.github.io/chrome-for-testing/

Requires Python 3.10 or later; install Selenium with:

```bash
pip install selenium
```

Run the script below, set a breakpoint on the last statement (`driver.quit()`), and log in to the blog management page while execution is paused.
```python
# encoding: utf-8
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import json

# Create the ChromeOptions object
chrome_options = Options()
# chrome_options.add_argument("--headless")  # enable headless mode
chrome_options.add_argument("--disable-gpu")

# Create the Service object
service = Service(executable_path='chromedriver.exe')

# Create the WebDriver object and start the browser
driver = webdriver.Chrome(options=chrome_options, service=service)

# Open the article management page
driver.get("https://mp.csdn.net/mp_blog/manage/article?spm=3001.5298")

# Close the browser (set your breakpoint here)
driver.quit()
```
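If you'd rather not attach a debugger, a simple alternative (my suggestion, not part of the original workflow) is to block on `input()` before quitting, which keeps the browser open while you log in and poke around:

```python
# Sketch: pause without a debugger. Log in to CSDN in the opened window,
# run the scraping snippets from a REPL or paste them above this line,
# then press Enter to let the script finish.
input("Press Enter after you are done with the browser session...")
driver.quit()
```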
While paused (at the breakpoint or the `input()` prompt), run this snippet manually to collect the name and link of every post listed under the current tab:
```python
all_blogs = []
pagex = 5  # number of list pages to walk through
for page in range(1, pagex + 1):
    # Count the posts on the current page (the last page may have fewer than 20).
    # Note: "view-containe" is copied verbatim from the page's element id.
    blogs_on_page = driver.find_elements(By.XPATH,
        '//*[@id="view-containe"]/div/div/div[3]/div[2]/div/div[1]/div[1]/p[1]/a')
    for i, blog in enumerate(blogs_on_page, start=1):
        try:
            blog_xpath = f'//*[@id="view-containe"]/div/div/div[3]/div[2]/div[{i}]/div[1]/div[1]/p[1]/a'
            blog_name = driver.find_element(By.XPATH, blog_xpath).text
            blog_link = driver.find_element(By.XPATH, blog_xpath).get_attribute('href')
            all_blogs.append({"name": blog_name, "link": blog_link})
        except Exception as e:
            print(f"Error extracting blog on page {page}, index {i}: {e}")
    if page < pagex:
        try:
            driver.find_element(By.CLASS_NAME, "btn-next").click()
            time.sleep(2)
        except Exception as e:
            print(f"Error navigating to page {page + 1}: {e}")
            break

with open('caogaoxiang.json', 'w', encoding='utf-8') as f:
    json.dump(all_blogs, f, ensure_ascii=False, indent=4)
```
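The fixed `time.sleep(2)` works but is fragile on a slow connection. A sturdier variant, as a sketch (assuming the same `btn-next` class used above), waits until the pager button is actually clickable:

```python
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Sketch: replace the fixed sleep with an explicit wait. Adjust the class
# name if CSDN changes its pager markup.
next_btn = WebDriverWait(driver, 10).until(
    EC.element_to_be_clickable((By.CLASS_NAME, "btn-next"))
)
next_btn.click()
```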
Once the metadata JSON is saved (the file name varies by run; the draft list above went to caogaoxiang.json, while this snippet reads the full list from all_blogs.json), this code fetches the raw content of each post:
```python
import os

with open('all_blogs.json', 'r', encoding='utf-8') as f:
    blogs = json.load(f)

os.makedirs("output", exist_ok=True)
for index, blog in enumerate(blogs, start=1):
    file_name = f"output/{index:03}.md"
    if os.path.exists(file_name):
        continue  # already exported, skip
    try:
        driver.get(blog['link'])
        time.sleep(1)
        markdown_element = driver.find_element(By.XPATH, '/html/body/div[1]/div[1]/div[2]/div/div[2]/div[1]')
        markdown_content = markdown_element.text
        with open(file_name, 'w', encoding='utf-8') as f:
            f.write(markdown_content)
        print(f"Saved {blog['name']} to {file_name}")
    except Exception as e:
        print(f"Error processing {blog['name']}: {e}")
```
The following script downloads every image referenced by the exported posts and rewrites the links to point at the local copies:
```python
import os
import re
import requests
import markdown
from bs4 import BeautifulSoup
from urllib.parse import urlparse

# Parse one Markdown file, download its images, and rewrite the links
def process_markdown_file(md_file, output_dir, img_cntx, dst_md):
    with open(md_file, 'r', encoding='utf-8') as f:
        markdown_content = f.read()

    # Render the Markdown to HTML
    html = markdown.markdown(markdown_content)

    # Parse the HTML with BeautifulSoup to find the image tags
    soup = BeautifulSoup(html, 'html.parser')
    images = soup.find_all('img')

    # Map old image URLs to their new local paths
    img_url_mapping = {}

    # Custom request headers to look like a regular browser
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'
    }

    # Walk over all image tags
    for img in images:
        img_url = img['src']
        try:
            # Work out the image file extension
            parsed_url = urlparse(img_url)
            img_extension = os.path.splitext(parsed_url.path)[-1]
            # TODO: handle extensions other than png/jpg/jpeg/gif/webp
            # Use a sequential counter as the image file name
            img_name = f'{img_cntx:05d}{img_extension}'
            img_cntx += 1
            img_path = os.path.join(output_dir, img_name)
            if os.path.exists(img_path):
                img_url_mapping[img_url] = img_path
            else:
                # Download the image
                img_data = requests.get(img_url, headers=headers).content
                # Save it locally
                with open(img_path, 'wb') as img_file:
                    img_file.write(img_data)
                # Record the mapping
                img_url_mapping[img_url] = img_path
                # print(f"Downloaded {img_url} and saved it as {img_path}")
        except Exception as e:
            print(f"Error downloading image {img_url}: {e}")

    # Read a regular expression from a file (only needed by the
    # commented-out pattern-based replacement below)
    with open('pat.txt', 'r', encoding='utf-8') as file:
        pattern_str = file.read()
    pattern = re.compile(pattern_str)

    # Replace the image URLs inside the Markdown
    for old_url, new_path in img_url_mapping.items():
        # Normalize Windows backslashes to forward slashes
        corrected_path = new_path.replace("\\", "/")
        # markdown_content = pattern.sub(r'![image](' + corrected_path + r')', markdown_content)
        corrected_path = "/" + corrected_path
        markdown_content = markdown_content.replace(old_url, corrected_path)

    # Write the updated Markdown back out
    with open(dst_md, 'w', encoding='utf-8') as f:
        f.write(markdown_content)
    # print(f"Updated Markdown saved as {dst_md}")
    return img_cntx

if __name__ == '__main__':
    img_cnt = 1
    srcp = "output"
    files = os.listdir(srcp)
    dst = "output_mdwebsite"
    os.makedirs(dst, exist_ok=True)
    output_dir = 'static/img/images'  # change to wherever you want the images saved
    os.makedirs(output_dir, exist_ok=True)
    for md_file in files:
        try:
            dst_md = os.path.join(dst, md_file)
            md_file = os.path.join(srcp, md_file)
            img_cnt = process_markdown_file(md_file, output_dir, img_cnt, dst_md)
        except Exception as e:
            print(f"Error processing {md_file}: {e}")
```
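The half-finished comment above ("if img_extension is not png/jpg/jpeg/gif/webp...") hints at normalizing odd extensions. One way to finish that thought, as a sketch (the whitelist and the `.png` fallback are my assumptions):

```python
import os
from urllib.parse import urlparse

# Hypothetical helper: fall back to a default extension for unknown types.
ALLOWED_EXTS = {'.png', '.jpg', '.jpeg', '.gif', '.webp'}

def safe_extension(img_url, default='.png'):
    """Return the URL's image extension if it is a known type, else a default."""
    ext = os.path.splitext(urlparse(img_url).path)[-1].lower()
    return ext if ext in ALLOWED_EXTS else default
```

Inside `process_markdown_file` you would then build `img_name` from `safe_extension(img_url)` instead of the raw `img_extension`.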
Use this code to extract each post's categories, tags, and publish time:
```python
# encoding: utf-8
import time
import json
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Create the ChromeOptions object
chrome_options = Options()
# chrome_options.add_argument("--headless")  # enable headless mode
chrome_options.add_argument("--disable-gpu")

# Create the Service object
service = Service(executable_path='chromedriver.exe')

# Create the WebDriver object and start the browser
driver = webdriver.Chrome(options=chrome_options, service=service)

# Open the article management page
driver.get("https://mp.csdn.net/mp_blog/manage/article?spm=3001.5298")

# Load the data from all_blogs2.json
with open('all_blogs2.json', 'r', encoding='utf-8') as f:
    all_blogs = json.load(f)

# Wait up to 10 seconds for elements to appear
wait = WebDriverWait(driver, 10)

for blogdata in all_blogs:
    name = blogdata['name']
    link = blogdata['link']
    # Turn the editor link into the public article URL via the article ID
    get_qianzhui = "https://qq742971636.blog.csdn.net/article/details/"
    articleId = link.replace("https://editor.csdn.net/md/?articleId=", "")
    link = get_qianzhui + articleId
    blogdata['link'] = link
    # Wait for the tag box to load
    try:
        driver.get(link)
        time.sleep(1)
        cls_name = 'blog-tags-box'
        # Wait until the element is visible
        element = wait.until(EC.visibility_of_element_located((By.CLASS_NAME, cls_name)))
        elements = driver.find_elements(By.CLASS_NAME, cls_name)
        html_contents = [el.get_attribute('outerHTML') for el in elements]
        html_content = ''.join(html_contents)
        soup = BeautifulSoup(html_content, 'lxml')
        category_links = soup.select('.tags-box .label + a')
        categories = [a.text.strip() for a in category_links]
        tag_links = soup.select('.tags-box a[rel="nofollow"]')
        tags = [a.text.strip() for a in tag_links]
        blogdata['categories'] = categories
        blogdata['tags'] = tags
    except Exception:
        print(f"Error extracting categories for blog: {name}")
    try:
        cls_name = 'time'
        time_element = driver.find_element(By.CLASS_NAME, cls_name)
        time_text = time_element.text
        blogdata['time'] = time_text
    except Exception:
        print(f"Error extracting time for blog: {name}")

# Save everything to one JSON file
with open('all_blogs3.json', 'w', encoding='utf-8') as f:
    json.dump(all_blogs, f, ensure_ascii=False, indent=4)

# Close the browser
driver.quit()
```
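For reference, each record in `all_blogs3.json` should now look roughly like this (values are illustrative; the raw `time` string may still carry Chinese wrapper text, which is why the upload script below strips CJK characters from it):

```python
# Illustrative shape of one all_blogs3.json record (made-up values):
{
    "name": "Some post title",
    "link": "https://qq742971636.blog.csdn.net/article/details/139426685",
    "categories": ["Linux运维"],
    "tags": ["python", "selenium"],
    "time": "于 2024-06-01 12:34:56 发布"
}
```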
Finally, this script cleans up the Markdown and pushes each post to your own server's API:
```python
import json
import os
import re
import time
from datetime import datetime, timezone, timedelta
import requests

def adjust_markdown_line_breaks(input_file, output_file):
    with open(input_file, 'r', encoding='utf-8') as file:
        content = file.read()
    # Turn single newlines outside code fences into double newlines
    adjusted_content = re.sub(r'(?<!\n)\n(?!\n|```)', '\n\n', content)
    with open(output_file, 'w', encoding='utf-8') as file:
        file.write(adjusted_content)

# Pick the category batch to process
# chuli_wenzhang = './danpianji.json'
# categoriesx = '单片机ban'
# chuli_wenzhang = './shenduxuexi.json'
# categoriesx = '深度学习ban'
# chuli_wenzhang = './linuxyunwei.json'
# categoriesx = 'Linux运维'
chuli_wenzhang = './taobaozuopin.json'
categoriesx = '售卖作品'

jinwokejian_wenzhang = 'jinwokejian.json'   # "visible only to me" posts
caogaoxiang_wenzhang = 'caogaoxiang.json'   # drafts
all_wenzhang_xinxi = "all_blogs3.json"
src = r"C:\Users\Administrator\Downloads\csdn-blog-export-main"
src_md_path = r"C:\Users\Administrator\Downloads\csdn-blog-export-main\output_mdwebsite"
dst_md = r"C:\Users\Administrator\Downloads\csdn-blog-export-main\md"
os.makedirs(dst_md, exist_ok=True)

with open(chuli_wenzhang, 'r', encoding='utf-8') as f:
    linuxyunwei_data = json.load(f)
with open(jinwokejian_wenzhang, 'r', encoding='utf-8') as f:
    jinwokejian_data = json.load(f)
with open(caogaoxiang_wenzhang, 'r', encoding='utf-8') as f:
    caogaoxiang_data = json.load(f)
with open(all_wenzhang_xinxi, 'r', encoding='utf-8') as f:
    all_blogs = json.load(f)

# Index all_blogs by article ID
all_blogs_dict = {}
for index, blog_dict in enumerate(all_blogs):
    blog_dict['index'] = index + 1
    id = blog_dict['link'].replace('https://qq742971636.blog.csdn.net/article/details/', '')
    if id in all_blogs_dict:
        print('duplicate id', id)
    all_blogs_dict[id] = blog_dict

# "Visible only to me" posts must be dropped from the batch; index everything by ID first
chuli_wenzhang_dict = {}
for blog_dict in linuxyunwei_data:
    id = blog_dict['link'].replace('https://editor.csdn.net/md/?articleId=', '')
    chuli_wenzhang_dict[id] = blog_dict

# jinwokejian_wenzhang
jinwokejian_wenzhang_dict = {}
for blog_dict in jinwokejian_data:
    id = blog_dict['link'].replace('https://editor.csdn.net/md/?articleId=', '')
    jinwokejian_wenzhang_dict[id] = blog_dict

# caogaoxiang
caogaoxiang_wenzhang_dict = {}
for blog_dict in caogaoxiang_data:
    id = blog_dict['link'].replace('https://editor.csdn.net/md/?articleId=', '')
    caogaoxiang_wenzhang_dict[id] = blog_dict

# Drop the "visible only to me" posts from the batch
for id in jinwokejian_wenzhang_dict:
    if id in chuli_wenzhang_dict:
        del chuli_wenzhang_dict[id]
        print('removed', id)

# Drop the drafts from the batch
for id in caogaoxiang_wenzhang_dict:
    if id in chuli_wenzhang_dict:
        del chuli_wenzhang_dict[id]
        print('removed', id)

# Read each post's md file by ID and write the adjusted copy to dst_md
for id in chuli_wenzhang_dict:
    blog_dict = all_blogs_dict[id]
    name = blog_dict['name']
    index = blog_dict['index']
    src_md_file = os.path.join(src_md_path, f'{index:03}.md')
    dst_md_file = os.path.join(dst_md, f'{index:03}.md')
    adjust_markdown_line_breaks(src_md_file, dst_md_file)

def remove_first_newline(directory):
    # Walk every md file in the directory
    for filename in os.listdir(directory):
        if filename.endswith(".md"):
            file_path = os.path.join(directory, filename)
            with open(file_path, 'r', encoding='utf-8') as file:
                lines = file.readlines()
            # If the first line is blank, remove it
            if lines and lines[0].strip() == '':
                lines.pop(0)
                print(f'Removed first newline in {file_path}')
                # Write the file back
                with open(file_path, 'w', encoding='utf-8') as file:
                    file.writelines(lines)

# Run several passes to strip multiple leading blank lines
for i in range(6):
    remove_first_newline(dst_md)

def add_prefix_if_codeblock(directory):
    # Walk every md file in the directory
    for filename in os.listdir(directory):
        if filename.endswith(".md"):
            file_path = os.path.join(directory, filename)
            with open(file_path, 'r', encoding='utf-8') as file:
                lines = file.readlines()
            # Does the file start with a ``` code fence?
            if lines and lines[0].strip().startswith("```"):
                # Insert a first line (here, the source JSON name) so the
                # file does not open with a fence
                lines.insert(0, chuli_wenzhang + "\n")
                print(f'Added prefix in {file_path}')
                # Write the file back
                with open(file_path, 'w', encoding='utf-8') as file:
                    file.writelines(lines)

add_prefix_if_codeblock(dst_md)

def add_more_marker(directory):
    # Walk every md file in the directory
    for filename in os.listdir(directory):
        if filename.endswith(".md"):
            file_path = os.path.join(directory, filename)
            with open(file_path, 'r', encoding='utf-8') as file:
                lines = file.readlines()
            # Does the file already contain <!-- more -->?
            if "<!-- more -->" not in "".join(lines):
                # If not, insert <!-- more --> after the first line
                lines.insert(1, "\n\n<!-- more -->\n\n")
                # Write the file back
                with open(file_path, 'w', encoding='utf-8') as file:
                    file.writelines(lines)

add_more_marker(dst_md)

def remove_blank_lines_in_code_blocks(md_content):
    # Regex matching fenced code blocks (``` ... ```)
    code_block_pattern = r'(```[\s\S]*?```)'

    def remove_blank_lines(match):
        # The matched code block
        code_block = match.group(0)
        # Strip blank lines inside it
        code_block = re.sub(r'\n\s*\n', '\n', code_block)
        return code_block

    # Apply the replacement to every code block
    modified_content = re.sub(code_block_pattern, remove_blank_lines, md_content)
    return modified_content

def process_md_files_in_directory(directory):
    # Walk every md file in the directory
    for filename in os.listdir(directory):
        if filename.endswith('.md'):
            filepath = os.path.join(directory, filename)
            # Read the Markdown content
            with open(filepath, 'r', encoding='utf-8') as file:
                content = file.read()
            # Strip blank lines inside code blocks
            modified_content = remove_blank_lines_in_code_blocks(content)
            # Write the modified content back
            with open(filepath, 'w', encoding='utf-8') as file:
                file.write(modified_content)
            print(f"Processed file: {filename}")

process_md_files_in_directory(dst_md)

# Upload oldest first
chuli_wenzhang_dict_rev = list(chuli_wenzhang_dict)[::-1]

# Resume support: skip everything until this article ID is reached
jiezhe = "139426685"
lueguo = True

# Push chuli_wenzhang_dict to the API
for id in chuli_wenzhang_dict_rev:
    if jiezhe == id:
        lueguo = False
    if lueguo is True:
        continue
    print("=======================================")
    print(id)
    blog_dict = all_blogs_dict[id]
    name = blog_dict['name']
    link = blog_dict['link']
    categories = blog_dict['categories']
    tags = blog_dict['tags']
    original_time_str = blog_dict['time']
    index = blog_dict['index']
    categories = 'OpenCV'  # unused below; the request sends categoriesx instead
    tags = list(set(tags))
    # Strip the Chinese characters around the timestamp
    original_time_str = re.sub(r'[\u4e00-\u9fff]', '', original_time_str)
    # Parse the string as China time (UTC+8)
    dt = datetime.strptime(original_time_str.strip(), '%Y-%m-%d %H:%M:%S')
    dt = dt.replace(tzinfo=timezone(timedelta(hours=8)))
    # Convert to UTC
    utc_dt = dt.astimezone(timezone.utc)
    # Format as ISO 8601 with milliseconds and a trailing Z
    iso_time_str = utc_dt.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
    # Read the prepared md file
    md_filename = f"{index:03d}.md"
    md_filepath = os.path.join(dst_md, md_filename)
    content = open(md_filepath, 'r', encoding='utf-8').read()
    # API endpoint
    url = "http://19.31.11.10/api/admin/article"
    # Request headers
    headers = {
        "accept": "application/json",
        "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyIsImlhdCI6MTcyNTE1NzI1MCwiZXhwIjo0ODc4NzU3MjUwfQ.",
        "Content-Type": "application/json"
    }
    # Request body
    data = {
        "title": name,
        # "hidden": False,
        # "private": False,
        "content": content,
        "tags": tags,
        "category": categoriesx,
        # "updatedAt": iso_time_str,
        # "createdAt": iso_time_str,
    }
    max_attempts = 3  # maximum number of attempts
    attempt = 0
    while attempt < max_attempts:
        attempt += 1
        response = requests.post(url, headers=headers, json=data)
        if response.status_code == 201:
            print("Article published successfully.")
            break  # success, leave the retry loop
        else:
            print(f"Attempt {attempt}: Failed to publish article. Status code: {response.status_code}")
            if attempt < max_attempts:
                print("Retrying...")
                time.sleep(1)  # wait one second before retrying (adjust as needed)
            else:
                print("Failed to publish article after 3 attempts.")

print(len(chuli_wenzhang_dict))
```
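Before letting the loop run over everything, it may be worth a one-off smoke test of the API with a throwaway post, reusing the same `url` and `headers` from the script (the title and content below are placeholders):

```python
import requests

# Sketch: verify the endpoint and token once before the batch upload.
test_data = {
    "title": "API smoke test",
    "content": "delete me",
    "tags": ["test"],
    "category": categoriesx,
}
response = requests.post(url, headers=headers, json=test_data)
print(response.status_code, response.text[:200])  # expect 201 on success
```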