tuya-cloudcutter

A tool that disconnects Tuya IoT devices from the cloud, allowing them to run completely locally.

import argparse
from datetime import datetime, timedelta
import hmac
import json
import os
import re
import sys
import time
from hashlib import sha256
from traceback import print_exc

import tornado.httpserver
import tornado.ioloop
import tornado.web
from tornado.log import enable_pretty_logging

import tinytuya.tinytuya as tinytuya

from .crypto.pskcontext import PSKContext
from .device import DEFAULT_AUTH_KEY, DeviceConfig
from .exploit import (build_network_config_packet, exploit_device_with_config,
                      create_device_specific_config, send_network_config_datagram)
from .protocol import handlers, mqtt
from .protocol.transformers import ResponseTransformer

payload_trigger_time = None


def __configure_local_device_response_transformers(config):
    return [
        ResponseTransformer({"timestamp", "t", "time"}, lambda _: int(time.time())),
        ResponseTransformer({"devId", "deviceId"}, lambda _: config.get(DeviceConfig.DEVICE_ID)),
        ResponseTransformer({"secKey"}, lambda _: config.get(DeviceConfig.SEC_KEY)),
        ResponseTransformer({"localKey"}, lambda _: config.get(DeviceConfig.LOCAL_KEY)),
        ResponseTransformer({"pskKey", "psk_key"}, lambda _: "")
    ]


def __configure_ssid_on_device(ip: str, config: DeviceConfig, ssid: str, password: str):
    try:
        device_id = config.get(DeviceConfig.DEVICE_ID)
        local_key = config.get(DeviceConfig.LOCAL_KEY)
        device = tinytuya.Device(device_id, ip, local_key)
        device.connection_timeout = 0.5

        payload = {"ssid": ssid}
        if password:
            payload["passwd"] = password

        device.version = 3.3
        parsed_data = device.updatedps() or {}
        device.set_version(3.3)

        trials = 0
        while parsed_data is not None and "Err" not in parsed_data and trials < 5:
            # Once device joins SSID, we get a timeout / connection error, which adds "Err" attribute. We'll wait for that to happen.
            parsed_data = device._send_receive(device.generate_payload_raw(command=0x0f, retcode=0x0, data=payload, skip_header=False), minresponse=0)
            trials += 1
            time.sleep(0.5)

        if trials >= 5:
            print(f"Device Id: {device_id}")
            print(f"Local Key: {local_key}")
            print("Failed to set the WiFi AP creds on the device, latest error:")
            print(parsed_data)
            sys.exit(80)

        print(f"Device should be successfully onboarded on WiFi AP!  Please allow up to 2 minutes for the device to connect to your specified network.")
        print(f"Device MAC address: {handlers.device_mac}")
        print(f"Device Id: {device_id}")
        print(f"Local Key: {local_key}")

        sys.exit(0)
    except Exception:
        print_exc()
        sys.exit(90)


def __trigger_firmware_update(config: DeviceConfig, args):
    device_id = config.get(DeviceConfig.DEVICE_ID)
    local_key = config.get(DeviceConfig.LOCAL_KEY)

    mqtt.trigger_firmware_update(device_id=device_id, local_key=local_key, protocol="2.2", broker="127.0.0.1", verbose_output=args.verbose_output)


def __configure_local_device_or_update_firmware(args, update_firmware: bool = False):
    if not os.path.exists(args.config):
        print(f"Configuration file {args.config} does not exist", file=sys.stderr)
        sys.exit(10)

    if not os.path.isfile(args.profile):
        print(f"Provided device profile JSON {args.profile} does not exist, or is not a file", file=sys.stderr)
        sys.exit(30)

    config = DeviceConfig.read(args.config)
    authkey, uuid, pskkey = config.get_bytes(DeviceConfig.AUTH_KEY, default=DEFAULT_AUTH_KEY), config.get_bytes(DeviceConfig.UUID), config.get_bytes(DeviceConfig.PSK, default="")
    if len(pskkey) == 0:
        pskkey = None
    context = PSKContext(authkey=authkey, uuid=uuid, psk=pskkey)
    device_id, local_key = config.get(DeviceConfig.DEVICE_ID), config.get(DeviceConfig.LOCAL_KEY)
    flash_timeout = 15
    if args.flash_timeout is not None:
        flash_timeout = args.flash_timeout
    mqtt.mqtt_connect(device_id, local_key, tornado.ioloop.IOLoop.current(), graceful_exit_timeout=flash_timeout, verbose_output=args.verbose_output)

    with open(args.profile, "r") as f:
        combined = json.load(f)
        device = combined["device"]

    def trigger_payload_endpoint_hook(handler, *_):
        if update_firmware:
            task_function = __trigger_firmware_update
            task_args = (config, args)
        else:
            task_args = (handler.request.remote_ip, config, args.ssid, args.password)
            task_function = __configure_ssid_on_device

        tornado.ioloop.IOLoop.current().call_later(0, task_function, *task_args)

        return None

    def upgrade_endpoint_hook(handler, *_):
        global payload_trigger_time
        # Don't allow duplicates in a short period of time, but allow re-triggering if a new connection is made.
        if payload_trigger_time is not None and payload_trigger_time + timedelta(minutes=1) > datetime.now():
            print("Discarding duplicate upgrade request to avoid race condition.")
            return { "result": { "success": True, "t": int(time.time()) }}
        
        payload_trigger_time = datetime.now()
        with open(args.firmware, "rb") as fs:
            upgrade_data = fs.read()
        sec_key = config.get_bytes(DeviceConfig.SEC_KEY)
        file_sha = sha256(upgrade_data).hexdigest().upper().encode("utf-8")
        file_hmac = hmac.digest(sec_key, file_sha, sha256).hex().upper()
        firmware_filename = os.path.basename(args.firmware)

        return {
            "result": {
                "url": f"http://{args.ip}:80/files/{firmware_filename}",
                "hmac": file_hmac,
                "version": "9.0.0",
                "size": str(len(upgrade_data)),
                "type": 0,
            },
            "success": True,
            "t": int(time.time())
        }

    def active_endpoint_hook(handler, *_):
        # active should reset payload trigger time, in case the device reconnected and asked to activate.
        global payload_trigger_time
        payload_trigger_time = None
        schema_id, schema = list(device["schemas"].items())[0]
        
        # Trigger the payload after active has fully registered.
        tornado.ioloop.IOLoop.current().call_later(2, trigger_payload_endpoint_hook, *(handler, None))
        
        return {
            "result": {
                "schema": json.dumps(schema, separators=(',', ':')),
                "devId": "DUMMY",
                "resetFactory": False,
                "timeZone": "+02:00",
                "capability": 1025,
                "secKey": "DUMMY",
                "stdTimeZone": "+01:00",
                "schemaId": schema_id,
                "dstIntervals": [],
                "localKey": "DUMMY",
            },
            "success": True,
            "t": int(time.time())
        }

    response_transformers = __configure_local_device_response_transformers(config)
    endpoint_hooks = {
        "tuya.device.active": active_endpoint_hook,
    }

    if update_firmware:
        endpoint_hooks.update({
            "tuya.device.upgrade.get": upgrade_endpoint_hook,
            "tuya.device.upgrade.silent.get": upgrade_endpoint_hook,
        })

    application = tornado.web.Application([
        (r'/v1/url_config', handlers.GetURLHandlerV1, dict(ipaddr=args.ip, verbose_output=args.verbose_output)),
        (r'/v2/url_config', handlers.GetURLHandlerV2, dict(ipaddr=args.ip, verbose_output=args.verbose_output)),
        # 2018 SDK specific endpoint
        (r'/device/url_config', handlers.OldSDKGetURLHandler, dict(ipaddr=args.ip, verbose_output=args.verbose_output)),
        (r'/d.json', handlers.DetachHandler, dict(schema_directory=args.schema, response_transformers=response_transformers, config=config, endpoint_hooks=endpoint_hooks, verbose_output=args.verbose_output)),
        (f'/files/(.*)', handlers.OTAFilesHandler, dict(path="/work/custom-firmware/", graceful_exit_timeout=args.flash_timeout, verbose_output=args.verbose_output)),
    ])

    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(80)

    https_server = tornado.httpserver.HTTPServer(application, ssl_options=context)
    https_server.listen(443)

    # 2018 SDK seems to request that port for some reason
    dns_https_server = tornado.httpserver.HTTPServer(application, ssl_options=context)
    dns_https_server.listen(4433)

    tornado.ioloop.IOLoop.current().start()
    sys.exit(0)


def __update_firmware(args):
    if not os.path.isfile(args.firmware):
        # try as a relative path
        args.firmware = os.path.join(args.firmware_dir, args.firmware)
    if not os.path.isfile(args.firmware):
        print(f"Firmware {args.firmware} does not exist or not a file.", file=sys.stderr)
        sys.exit(50)

    UG_FILE_MAGIC = b"\x55\xAA\x55\xAA"
    FILE_MAGIC_DICT = {
        b"RBL\x00": "RBL",
        b"\x43\x09\xb5\x96": "QIO",
        b"\x2f\x07\xb5\x94": "UA"
    }

    with open(args.firmware, "rb") as fs:
        magic = fs.read(4)
        error_code = 0
        if magic in FILE_MAGIC_DICT:
            print(f"Firmware {args.firmware} is an {FILE_MAGIC_DICT[magic]} file! Please provide a UG file.", file=sys.stderr)
            error_code = 51
        elif magic != UG_FILE_MAGIC:
            print(f"Firmware {args.firmware} is not a UG file.", file=sys.stderr)
            error_code = 52
        else:
            # File is a UG file
            error_code = 0
            pass

        if error_code != 0:
            sys.exit(error_code)

    __configure_local_device_or_update_firmware(args, update_firmware=True)


def __exploit_device(args):
    output_dir = args.output_directory
    if not (os.path.exists(output_dir) and os.path.isdir(output_dir)):
        print(f"Provided output directory {output_dir} does not exist or not a directory", file=sys.stderr)
        sys.exit(60)

    try:
        with open(args.profile, "r") as fs:
            combined = json.load(fs)
    except (OSError, KeyError):
        print(f"Could not load profile {args.profile}. Are you sure the profile file exists and is a valid combined JSON?", file=sys.stderr)
        sys.exit(65)

    device_config = exploit_device_with_config(args, combined)
    device_uuid = device_config.get(DeviceConfig.UUID)

    output_path = os.path.join(output_dir, f"{device_uuid}.deviceconfig")
    device_config.write(output_path)

    print("Exploit run, saved device config too!")

    # To communicate with external scripts
    print(f"output={output_path}")


def __write_deviceconfig(args):
    output_dir = args.output_directory
    if not (os.path.exists(output_dir) and os.path.isdir(output_dir)):
        print(f"Provided output directory {output_dir} does not exist or not a directory", file=sys.stderr)
        sys.exit(60)

    try:
        with open(args.profile, "r") as fs:
            combined = json.load(fs)
    except (OSError, KeyError):
        print(f"Could not load profile {args.profile}. Are you sure the profile file exists and is a valid combined JSON?", file=sys.stderr)
        sys.exit(65)

    device_config = create_device_specific_config(args, combined, args.uuid, args.auth_key, args.psk_key)
    output_path = os.path.join(output_dir, f"{args.uuid}.deviceconfig")
    device_config.write(output_path)

    print("Saved device config.")

    # To communicate with external scripts
    print(f"output={output_path}")


def __configure_wifi(args):
    SSID = args.SSID
    password = args.password

    # Pass the payload through the json module specifically
    # to avoid issues with special chars (e.g. ") in either
    # SSIDs or passwords.
    payload = {"ssid": SSID, "token": "AAAAAAAA"}

    # Configure the password ONLY if it's present
    # Some devices may parse incorrectly otherwise
    if password:
        payload["passwd"] = password

    payload = json.dumps(payload)

    datagram = build_network_config_packet(payload.encode('ascii'))
    # Send the configuration diagram a few times with minor delay
    # May improve reliability in some setups
    for _ in range(5):
        send_network_config_datagram(datagram)
        time.sleep(0.300)
    print(f"Configured device to connect to '{SSID}'")


def __validate_localapicredential_arg(length):
    def check_arg(value):
        if (len(value) == 0):
            return value
        elif (len(value) != length):
            raise argparse.ArgumentTypeError("%s length is invalid, it must be %s characters long" % value, length)
        elif (not re.compile('[a-zA-Z0-9]').match(value)):
            raise argparse.ArgumentTypeError("%s value is invalid, it must contain only letters or numbers" % value)
        return value
    return check_arg


def parse_args():
    parser = argparse.ArgumentParser(
        prog="cloudcutter",
        description="Detach tuya devices from the cloud or install custom firmware on them",
    )

    subparsers = parser.add_subparsers(dest="command", required=True, help="subcommand to execute")

    parser_configure = subparsers.add_parser("configure_local_device", help="Configure detached device with local keys and onboard it on desired WiFi AP")
    parser_configure.add_argument("profile", help="Device profile directory to use for detaching")
    parser_configure.add_argument("schema", help="Endpoint schemas directory to use for detaching")
    parser_configure.add_argument("config", help="Device configuration file")
    parser_configure.add_argument("flash_timeout", help="Not used for cutting mode", type=int)
    parser_configure.add_argument("verbose_output", help="Flag for more verbose output, 'true' for verbose output", type=bool)
    parser_configure.add_argument(
        "--ip",
        dest="ip",
        default="10.42.42.1",
        help="IP address to listen on and respond to the devices with (default: 10.42.42.1)",
    )
    parser_configure.add_argument(
        "--ssid",
        required=True,
        help="SSID that the device will be onboarded on after configuration",
    )
    parser_configure.add_argument(
        "--password",
        required=False,
        default="",
        help="Password of the SSID for device onboarding (default: empty)",
    )
    parser_configure.set_defaults(handler=__configure_local_device_or_update_firmware)

    parser_update_firmware = subparsers.add_parser("update_firmware", help="Update the device's firmware")
    parser_update_firmware.add_argument("profile", help="Device profile JSON file (combined)")
    parser_update_firmware.add_argument("schema", help="Endpoint schemas directory to use for updating")
    parser_update_firmware.add_argument("config", help="Device configuration file")
    parser_update_firmware.add_argument("firmware_dir", help="Directory containing firmware images")
    parser_update_firmware.add_argument("firmware", help="OTA firmware image to update the device to")
    parser_update_firmware.add_argument("flash_timeout", help="Number of seconds to wait before exiting after receiving flash", type=int)
    parser_update_firmware.add_argument("verbose_output", help="Flag for more verbose output, 'true' for verbose output", type=bool)
    parser_update_firmware.add_argument(
        "--ip",
        dest="ip",
        default="10.42.42.1",
        help="IP address to listen on and respond to the devices with (default: 10.42.42.1)",
    )
    parser_update_firmware.set_defaults(handler=__update_firmware)

    parser_exploit_device = subparsers.add_parser(
        "exploit_device",
        help="Exploit a device - requires that the attacking system is on the device's AP"
    )
    parser_exploit_device.add_argument("profile", help="Device profile JSON file (combined)")
    parser_exploit_device.add_argument("verbose_output", help="Flag for more verbose output, 'true' for verbose output", type=bool)
    parser_exploit_device.add_argument(
        "--output-directory",
        dest="output_directory",
        required=False,
        default="/work/configured-devices",
        help="A directory to which the modified device parameters file will be written (default: <workdir>/configured-devices)"
    )
    parser_exploit_device.add_argument(
        "--deviceid",
        dest="device_id",
        required=False,
        default="",
        help="deviceid assigned to the device (default: Random)",
        type=__validate_localapicredential_arg(20),
    )
    parser_exploit_device.add_argument(
        "--localkey",
        dest="local_key",
        required=False,
        default="",
        help="localkey assigned to the device (default: Random)",
        type=__validate_localapicredential_arg(16),
    )
    parser_exploit_device.set_defaults(handler=__exploit_device)

    parser_write_deviceconfig = subparsers.add_parser(
        "write_deviceconfig",
        help="Write the deviceconfig to use to for Tuya API emulation."
    )
    parser_write_deviceconfig.add_argument("profile", help="Device profile JSON file (combined)")
    parser_write_deviceconfig.add_argument("verbose_output", help="Flag for more verbose output, 'true' for verbose output", type=bool)
    parser_write_deviceconfig.add_argument(
        "--output-directory",
        dest="output_directory",
        required=False,
        default="/work/configured-devices",
        help="A directory to which the modified device parameters file will be written (default: <workdir>/configured-devices)"
    )
    parser_write_deviceconfig.add_argument(
        "--deviceid",
        dest="device_id",
        required=False,
        default="",
        help="deviceid assigned to the device (default: Random)",
        type=__validate_localapicredential_arg(20),
    )
    parser_write_deviceconfig.add_argument(
        "--localkey",
        dest="local_key",
        required=False,
        default="",
        help="localkey assigned to the device (default: Random)",
        type=__validate_localapicredential_arg(16),
    )
    parser_write_deviceconfig.add_argument(
        "--authkey",
        dest="auth_key",
        required=True,
        default="",
        help="authkey assigned to the device (default: Random)",
        type=__validate_localapicredential_arg(32),
    )
    parser_write_deviceconfig.add_argument(
        "--uuid",
        dest="uuid",
        required=True,
        default="",
        help="uuid assigned to the device (default: Random)",
        type=__validate_localapicredential_arg(16),
    )
    parser_write_deviceconfig.add_argument(
        "--pskkey",
        dest="psk_key",
        required=True,
        default="",
        help="pskkey assigned to the device (default: Random)",
        type=__validate_localapicredential_arg(37),
    )
    parser_write_deviceconfig.set_defaults(handler=__write_deviceconfig)

    parser_configure_wifi = subparsers.add_parser(
        "configure_wifi",
        help="Makes a device to which you're connected via its AP mode join a given WiFi network"
    )
    parser_configure_wifi.add_argument("SSID", help="WiFi access point name to make the device join")
    parser_configure_wifi.add_argument("password", help="WiFi access point password")
    parser_configure_wifi.add_argument("verbose_output", help="Flag for more verbose output, 'true' for verbose output", type=bool)
    parser_configure_wifi.set_defaults(handler=__configure_wifi)

    return parser.parse_args()


args = parse_args()
args.handler(args)

if args.verbose_output:
    # Enable tornado pretty logging for more verbose output by default
    enable_pretty_logging()