Simulación reto OPC UA (Defcon 31)

Recientemente leía un post en X en el que anunciaba la resolución del CTF ICS de la #Defcon33, una impresionante simulación de un aeropuerto:

Es demasiado pronto para adentrarnos en un writeup por lo que me llevó a pensar en retos similares precisamente de Red Alert ICS CTF, como el que se hizo en la Defcon 31, en esa ocasión sobre el estándar de comunicación industrial OPC UA. Lo que veía los participantes como briefing inicial era esta breve descripción:

A PLC that sends strange signals from the airport has been spotted.
Analyzing the signals from the PLC seems to give you a clue on how to control the airport.
Analyze the signals from the PLC to get the 35x35 QR code.

IP : 192.168.50.49 4840

Y si se conectaban a esa IP y puerto, encontraban que no era necesario autenticarse (acceso anónimo), viendo un servidor con dos objetos:

- Runaway Approach Light Control (un distractor).

- Approach Light que contenía los objetos line_00 hasta line_34 (35 valores binarios que cambiaban aproximadamente cada segundo) y un campo ts de timestamp. Estos valores eran la representación de los píxeles del QR que había que reconstruir para obtener la flag.

Divertido, verdad? Tenéis algún writeup como éste, pero ¿y si pudiéramos experimentarlo de forma sencilla con una simulación? Gracias a la IA hoy en día es trivial construirlo.

Primero el servidor:

from opcua import ua, Server
from datetime import datetime
import time
import qrcode
from PIL import Image
import random

FLAG_TEXT = "RACTF{4r3_y0u_Abl3_70_DeC0D3_qr}"  # texto para generar el QR
QR_SIZE = 35  # 35x35
UPDATE_INTERVAL = 1.0  # segundos entre frames (emula ~1Hz)
HOST = "0.0.0.0"
PORT = 4840


def generate_qr_bits(text, size=35):
    """Genera una matriz size x size con valores 0/1 a partir de un QR."""
    qr = qrcode.QRCode(border=0)
    qr.add_data(text)
    qr.make(fit=True)
    img = qr.make_image(fill_color="black", back_color="white").convert("1")

    # Redimensionamos exactamente a (size, size)
    img = img.resize((size, size), Image.NEAREST)
    pixels = img.load()
    bits = [[1 if pixels[x, y] == 0 else 0 for x in range(size)] for y in range(size)]
    return bits


if __name__ == "__main__":
    server = Server()
    endpoint = f"opc.tcp://{HOST}:{PORT}"
    server.set_endpoint(endpoint)
    uri = "http://examples.opcua/approach_light"
    idx = server.register_namespace(uri)

    objects = server.get_objects_node()
    approach = objects.add_object(idx, "Approach Light")

    # Crear variables line_00 .. line_34 (tipo String para simplicidad)
    line_vars = []
    for i in range(QR_SIZE):
        var = approach.add_variable(idx, f"line_{i:02d}", "0" * QR_SIZE)
        var.set_writable()  # permitir que el servidor actualice
        line_vars.append(var)

    # timestamp variable
    ts_var = approach.add_variable(idx, "ts", datetime.utcnow().isoformat())
    ts_var.set_writable()

    # generar la imagen base
    base_bits = generate_qr_bits(FLAG_TEXT, QR_SIZE)

    print(f"[server] Starting OPC-UA server at {endpoint}")
    server.start()
    print("[server] Server started. Approach Light exposed with 35 lines.")

    try:
        frame = 0
        while True:
            # crear frame exacto sin ruido
            noisy = [row.copy() for row in base_bits]

            # actualizar variables como strings '011001...'
            for y, row in enumerate(noisy):
                s = ''.join(str(b) for b in row)
                line_vars[y].set_value(ua.Variant(s, ua.VariantType.String))

            ts = datetime.utcnow().isoformat() + "Z"
            ts_var.set_value(ua.Variant(ts, ua.VariantType.String))

            frame += 1
            print(f"[server] frame {frame} published ts={ts}")
            time.sleep(UPDATE_INTERVAL)

    except KeyboardInterrupt:
        print("[server] shutting down...")
    finally:
        server.stop()
        print("[server] stopped")

Como veis con esto exponemos un servidor OPC-UA en opc.tcp://<IP>:4840 (por defecto localhost:4840 al estar en local). El primer paso es confirmar que el endpoint está vivo y accesible.

Podemos hacer esto con cualquier cliente OPC-UA básico:

o usando un script Python sencillo para listar nodos:

from opcua import Client

ENDPOINT = "opc.tcp://localhost:4840"

def print_node_tree(node, indent=0, max_depth=2):
    if indent > max_depth:
        return
    try:
        name = node.get_display_name().Text
        print("  " * indent + f"- {name}")
        for child in node.get_children():
            print_node_tree(child, indent + 1, max_depth)
    except Exception as e:
        print("  " * indent + f"[error reading node: {e}]")

if __name__ == "__main__":
    print(f"Conectando a {ENDPOINT}...")
    client = Client(ENDPOINT)
    client.connect()
    print("Conectado.")

    root = client.get_root_node()
    objects = root.get_child(["0:Objects"])

    print("Lista de nodos bajo 'Objects':")
    print_node_tree(objects)

    client.disconnect()
    print("Desconectado.")


Con esto confirmamos que  el objeto "Approach Light" con sus variables está expuesto.

Ahora que sabemos que las variables están ahí, vamos a leer el valor de unas cuantas líneas (por ejemplo, line_00, line_01) y el timestamp ts para ver qué nos devuelve el servidor.

from opcua import Client
import time

ENDPOINT = "opc.tcp://localhost:4840"

if __name__ == "__main__":
    print(f"Conectando a {ENDPOINT}...")
    client = Client(ENDPOINT)
    client.connect()
    print("Conectado.")

    root = client.get_root_node()
    objects = root.get_child(["0:Objects"])

    approach = None
    for child in objects.get_children():
        name = child.get_display_name().Text
        if name == "Approach Light":
            approach = child
            break

    if approach is None:
        print("No se encontró el nodo 'Approach Light'")
        client.disconnect()
        exit(1)

    # Leemos 3 líneas y timestamp 3 veces para ver cómo cambian
    for i in range(3):
        line0 = approach.get_child([f"{approach.nodeid.NamespaceIndex}:line_00"]).get_value()
        line1 = approach.get_child([f"{approach.nodeid.NamespaceIndex}:line_01"]).get_value()
        ts = approach.get_child([f"{approach.nodeid.NamespaceIndex}:ts"]).get_value()

        print(f"Frame {i+1} ts={ts}")
        print(f"line_00: {line0}")
        print(f"line_01: {line1}")
        print("-" * 30)
        time.sleep(1)

    client.disconnect()
    print("Desconectado.")

Vemos claramente los valores binarios por línea, y el timestamp que cambia cada segundo.

Ahora que vemos que todo funciona como debería funcionar, nuestro script para solucionar el reto debería hacer lo siguiente:

  • Leer varias frames (ej. 80 frames)

  • Guardar las líneas de bits en una lista

  • Para cada pixel (x,y), hacer majority vote (el bit que más aparece)

  • Armar una imagen binaria con los bits reconstruidos

  • Guardarla y, si tienes pyzbar instalado, decodificar el QR

from opcua import Client
from PIL import Image
import numpy as np

ENDPOINT = "opc.tcp://localhost:4840"
QR_SIZE = 35
SCALE = 10  # Escalar la imagen para verla mejor

def fetch_lines(client, approach, ns):
    lines = []
    for i in range(QR_SIZE):
        node = approach.get_child([f"{ns}:line_{i:02d}"])
        val = node.get_value()
        if len(val) != QR_SIZE:
            print(f"[warn] Línea {i:02d} no tiene longitud {QR_SIZE}: {len(val)}")
        else:
            print(f"line_{i:02d}: {val[:20]}...")  # Mostrar solo primeros 20 bits para no saturar
        lines.append(val)
    return lines

def build_qr_image(lines):
    img_array = np.zeros((QR_SIZE, QR_SIZE), dtype=np.uint8)

    for i, line in enumerate(lines):
        for j, c in enumerate(line):
            img_array[i, j] = 255 if c == '1' else 0

    img = Image.fromarray(img_array, mode='L')
    img = img.resize((QR_SIZE * SCALE, QR_SIZE * SCALE), Image.NEAREST)
    img.save("qr_recon.png")
    print("[*] Imagen QR reconstruida y guardada como 'qr_recon.png' (escalada para mejor visualización).")

def main():
    print(f"[client] Conectando a {ENDPOINT}...")
    client = Client(ENDPOINT)
    client.connect()
    print("[client] Conectado.")

    root = client.get_root_node()
    objects = root.get_child(["0:Objects"])

    approach = None
    for child in objects.get_children():
        if child.get_display_name().Text == "Approach Light":
            approach = child
            break

    if approach is None:
        print("[client] 'Approach Light' no encontrado.")
        client.disconnect()
        return

    children = approach.get_children()
    ns = children[0].get_browse_name().NamespaceIndex
    print(f"[client] Namespace detectado: {ns}")

    lines = fetch_lines(client, approach, ns)
    build_qr_image(lines)

    client.disconnect()
    print("[client] Desconectado.")

if __name__ == "__main__":
    main()
Si lo ejecutamos vemos la magia:

Y tenemos el QR generado:


Lo curioso es que si lo leemos desde consola con zbarimg u otro script con numpy, opencv, etc. fallaremos al obtener la flag, pero si lo leemos con el móvil....

Es bastante normal, a veces los lectores en consola tienen menos tolerancia a pequeñas imperfecciones o ruido en el QR, mientras que las apps móviles suelen usar algoritmos más robustos para decodificar, incluso si la imagen no es perfecta.

Así que esto es todo, espero que os haya gustado y que por unos momentos hayáis podido sentiros como uno de los afortunados participantes en Defcon 31 ;)

Comentarios