tuya-cloudcutter

A tool that disconnects Tuya IoT devices from the cloud, allowing them to run completely locally.

import json
import sys
from enum import Enum
from glob import glob
from os import listdir, makedirs
from os.path import abspath, basename, isdir, isfile, join

import click
import inquirer
import requests


class FirmwareType(Enum):
    """Outcome of inspecting a candidate firmware file.

    Produced by ``validate_firmware_file_internal``; callers treat only the
    ``VALID_*`` members as usable firmware.
    """

    # unrecognized or unsupported file, or the chip type could not be verified
    INVALID = 0
    # rejected by the header-based check (chip mismatch, or UF2 with no chip given)
    IGNORED_HEADER = 1
    # chip type found in the filename does not match the requested chip
    IGNORED_FILENAME = 2
    # UG file accepted for the requested chip
    VALID_UG = 3
    # UF2 file accepted for the requested chip
    VALID_UF2 = 4


# Suffix of the UG images that extract_uf2() writes; choose_firmware() skips
# files ending with it so generated images are not offered for selection.
UF2_UG_SUFFIX = "-extracted.ug.bin"
# Expected UF2 family ID per supported chip (lowercase chip name as key);
# compared against the family ID decoded from a UF2 file's header block.
UF2_FAMILY_MAP = {
    "bk7231t": 0x675A40B0,
    "bk7231n": 0x7B3EF230,
}


def api_get(path, timeout=30):
    """Fetch and decode a JSON document from the tuya-cloudcutter API.

    Args:
        path: Location of the document relative to the API root,
            e.g. ``devices.json``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the response.

    Prints a message and exits the process with status 1 when the document
    does not exist (HTTP 404), the server answers with any other non-200
    status, or the request cannot be completed at all.
    """
    url = f"https://tuya-cloudcutter.github.io/api/{path}"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException:
        # Connection problems and timeouts raise instead of yielding a
        # status code, so they need the same friendly message.
        print("API request failed. Make sure you have an Internet connection.")
        sys.exit(1)
    with response:
        if response.status_code == 404:
            print("The specified device does not exist in the API.")
            sys.exit(1)
        if response.status_code != 200:
            print("API request failed. Make sure you have an Internet connection.")
            sys.exit(1)
        return response.json()


def ask_options(text, options):
    """Show an interactive single-choice list and return the picked entry.

    Args:
        text: Message displayed above the list.
        options: Choices offered to the user.

    Returns:
        The value stored under the ``result`` key of the prompt's answers.

    Exits the process with status 1 when the prompt returns no answers.
    """
    question = inquirer.List(
        "result",
        message=text,
        choices=options,
        carousel=True,
    )
    theme = inquirer.themes.load_theme_from_dict(
        {"List": {"selection_color": "underline", "selection_cursor": "►"}}
    )
    answers = inquirer.prompt([question], theme=theme)
    if answers is None:
        # Ctrl+C
        exit(1)
    return answers["result"]


def ask_files(text, dir):
    """Let the user pick one regular file from a directory.

    Entries whose name starts with a dot are not offered. The choices are
    sorted case-insensitively.

    Args:
        text: Message displayed above the list.
        dir: Directory whose files are listed.

    Returns:
        Absolute path of the selected file.
    """
    candidates = []
    for entry in listdir(dir):
        if entry.startswith("."):
            continue
        if isfile(join(dir, entry)):
            candidates.append(entry)
    candidates.sort(key=str.casefold)
    chosen = ask_options(text, candidates)
    return abspath(join(dir, chosen))


def ask_dirs(text, dir):
    """Let the user pick one subdirectory of a directory.

    Entries whose name starts with a dot and the entry named ``schema`` are
    not offered. The choices are sorted case-insensitively.

    Args:
        text: Message displayed above the list.
        dir: Directory whose subdirectories are listed.

    Returns:
        Absolute path of the selected subdirectory.
    """
    candidates = []
    for entry in listdir(dir):
        if entry.startswith(".") or entry == "schema":
            continue
        if isdir(join(dir, entry)):
            candidates.append(entry)
    candidates.sort(key=str.casefold)
    chosen = ask_options(text, candidates)
    return abspath(join(dir, chosen))


def ask_device_base(devices):
    """Ask for a brand, then a device name, and return the matching entry.

    Args:
        devices: Device entries, each providing ``manufacturer`` and
            ``name`` keys.

    Returns:
        The first entry in ``devices`` with the chosen manufacturer and name.
    """
    brands = sorted({entry["manufacturer"] for entry in devices})
    manufacturer = ask_options("Select the brand of your device", brands)

    # narrow down to the chosen brand once; input order is preserved
    of_brand = [entry for entry in devices if entry["manufacturer"] == manufacturer]
    names = sorted({entry["name"] for entry in of_brand})
    name = ask_options("Select the article number of your device", names)

    return next(entry for entry in of_brand if entry["name"] == name)


def ask_profile_base(profiles):
    """Ask the user to pick one profile of type ``CLASSIC``.

    Args:
        profiles: Profile entries, each providing ``type``, ``name`` and
            ``sub_name`` keys.

    Returns:
        The entry whose ``"<name> / <sub_name>"`` label was selected. When
        several entries share a label, the last one in ``profiles`` wins.
    """
    by_label = {}
    for entry in profiles:
        if entry["type"] != "CLASSIC":
            continue
        by_label[f"{entry['name']} / {entry['sub_name']}"] = entry
    label = ask_options("Select the firmware version and name", sorted(by_label))
    return by_label[label]


def download_profile(device_slug):
    """Download a device description and the first profile it lists.

    Args:
        device_slug: Slug of the device to look up in the API.

    Returns:
        A ``(device, profile)`` tuple of decoded JSON documents.
    """
    device = api_get(f"devices/{device_slug}.json")
    # only the first listed profile is fetched
    first_profile = device["profiles"][0]
    profile = api_get(f"profiles/{first_profile['slug']}.json")
    return device, profile


def save_profile(profile_dir, device, profile):
    """Write ``device.json`` and ``profile.json`` into a profile directory.

    The directory is created if it does not exist yet. Both files are
    written as tab-indented JSON.

    Args:
        profile_dir: Target directory.
        device: JSON-serializable device data.
        profile: JSON-serializable profile data.
    """
    makedirs(profile_dir, exist_ok=True)
    documents = (
        ("device.json", device),
        ("profile.json", profile),
    )
    for filename, payload in documents:
        with open(join(profile_dir, filename), "w") as out:
            json.dump(payload, out, indent="\t")


def load_profile(profile_dir):
    """Find the device and profile JSON documents in a profile directory.

    Every ``*.json`` file in the directory is parsed. A JSON object with a
    ``profiles`` key is taken as the device; otherwise one with a
    ``firmware`` key is taken as the profile. Files holding anything else
    are skipped. Scanning stops as soon as both documents are found.

    Args:
        profile_dir: Directory to scan.

    Returns:
        A ``(device, profile)`` tuple; either element is ``None`` when no
        matching file exists.

    Prints a message and exits the process with status 53 when a file does
    not contain valid JSON.
    """
    device, profile = None, None
    for file in glob(join(profile_dir, "*.json")):
        try:
            with open(file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except ValueError:
            # ValueError covers both json.JSONDecodeError and undecodable
            # bytes, without swallowing KeyboardInterrupt or SystemExit
            print(
                f"File {file} does not contain valid JSON. "
                "Please update your file and try again."
            )
            sys.exit(53)
        if not isinstance(data, dict):
            # only JSON objects can carry the characteristic keys
            continue
        # match characteristic keys
        if "profiles" in data:
            device = data
        elif "firmware" in data:
            profile = data
        if device is not None and profile is not None:
            break
    return device, profile


def save_combined_profile(profile_dir, device, profile):
    """Write device and profile together as ``combined.json``.

    The directory is created if it does not exist yet. The file holds a
    JSON object with the keys ``slug`` (the last path component of
    ``profile_dir``), ``device`` and ``profile``, indented with tabs.

    Args:
        profile_dir: Target directory.
        device: JSON-serializable device data.
        profile: JSON-serializable profile data.

    Returns:
        Absolute path of the written file.
    """
    makedirs(profile_dir, exist_ok=True)
    target = join(profile_dir, "combined.json")
    payload = dict(
        slug=basename(profile_dir),
        device=device,
        profile=profile,
    )
    with open(target, "w") as out:
        json.dump(payload, out, indent="\t")
    return abspath(target)

def validate_firmware_file_internal(firmware: str, chip: str = None) -> FirmwareType:
    """Classify a firmware file by its header and, if needed, its filename.

    The first 512 bytes of the file are read and the leading four bytes are
    matched against known magic values. Only UG and UF2 files can be valid.

    Args:
        firmware: Path of the file to inspect.
        chip: Chip name to validate against (e.g. ``bk7231n``), compared
            case-insensitively. When omitted, any UG file with a
            recognizable BK7231 chip type is accepted and UF2 files are
            ignored.

    Returns:
        ``VALID_UG`` or ``VALID_UF2`` for a usable file;
        ``IGNORED_HEADER`` when the header names a different chip, or for a
        UF2 file when no chip was given;
        ``IGNORED_FILENAME`` when the filename names a different chip;
        ``INVALID`` otherwise (unknown magic, fewer than 512 bytes,
        recognized but unsupported file type, UG file without any chip
        information, or UF2 file while ltchiptool is not installed).
        Each ``INVALID`` result is explained on stderr.
    """
    FILE_MAGIC_DICT = {
        b"RBL\x00": "RBL",
        b"\x43\x09\xb5\x96": "QIO",
        b"\x2f\x07\xb5\x94": "UA",
        b"\x55\xAA\x55\xAA": "UG",
        b"UF2\x0A": "UF2",
    }

    # The header tag, the filename and UF2_FAMILY_MAP are all matched in
    # lowercase, so normalize here instead of relying on every caller.
    chip = chip.lower() if chip else None

    base = basename(firmware)
    with open(firmware, "rb") as fs:
        header = fs.read(512)

    magic = header[0:4]
    if magic not in FILE_MAGIC_DICT or len(header) < 512:
        print(
            f"!!! Unrecognized file type - '{base}' is not a UG or UF2 file.",
            file=sys.stderr,
        )
        return FirmwareType.INVALID
    file_type = FILE_MAGIC_DICT[magic]

    if file_type not in ["UG", "UF2"]:
        print(
            f"!!! File {base} is a '{file_type}' file! Please provide an UG file.",
            file=sys.stderr,
        )
        return FirmwareType.INVALID

    if file_type == "UG":
        # check LibreTiny UG version tag (chip type)
        rbl_ver = header[32 + 12 + 16 : 32 + 12 + 16 + 24]
        if b"bk7231" in rbl_ver:
            if chip and chip.encode() not in rbl_ver:
                # wrong chip type
                return FirmwareType.IGNORED_HEADER
            # correct chip type
            return FirmwareType.VALID_UG
        # check chip by filename
        lowered_name = base.lower()
        if "bk7231" in lowered_name:
            if chip and chip not in lowered_name:
                # wrong chip type
                return FirmwareType.IGNORED_FILENAME
            # correct chip type
            return FirmwareType.VALID_UG
        print(
            f"!!! Can't verify chip type of UG file '{base}' - "
            "make sure that BK7231T or BK7231N is present in the filename!",
            file=sys.stderr,
        )
        return FirmwareType.INVALID

    # Only UF2 remains at this point.
    if not chip:
        # without a chip there is nothing to compare the family ID against
        return FirmwareType.IGNORED_HEADER
    try:
        # imported lazily so that UG-only usage works without ltchiptool
        from ltchiptool import get_version
        from uf2tool.models import Block
    except ImportError:
        print(
            f"!!! Can't read file '{base}' because ltchiptool is not installed. "
            "Ignoring UF2 file.",
            file=sys.stderr,
        )
        return FirmwareType.INVALID
    # NOTE(review): the result is unused; presumably this call only checks
    # that ltchiptool is functional - confirm before removing.
    get_version()
    block = Block()
    block.decode(header)
    # .get() makes a chip missing from the map a mismatch, not a KeyError
    if UF2_FAMILY_MAP.get(chip) != block.family.id:
        return FirmwareType.IGNORED_HEADER
    return FirmwareType.VALID_UF2
    
def extract_uf2(file_with_path: str, firmware_dir: str, chip: str) -> str:
    """Extract the firmware image from a UF2 package and save it as a UG file.

    The output name is the input path followed by ``-<chip>`` (lowercased)
    and ``UF2_UG_SUFFIX``.

    Args:
        file_with_path: Path of the UF2 package to read.
        firmware_dir: Directory the output path is joined onto. Note that
            ``os.path.join`` discards it when ``file_with_path`` is absolute.
        chip: Chip name; only used (lowercased) in the output filename.

    Returns:
        Absolute path of the UG file that was written.

    Exits the process with status 2 when the package does not yield exactly
    one data chunk.
    """
    target = file_with_path + "-" + chip.lower() + UF2_UG_SUFFIX
    print(f"Extracting UF2 package as '{basename(target)}'")

    # imported here rather than at module level, so the script can be loaded
    # without these packages installed
    from ltchiptool.util.intbin import inttobe32
    from uf2tool import OTAScheme, UploadContext
    from uf2tool.models import UF2

    with open(file_with_path, "rb") as f:
        uf2 = UF2(f)
        uf2.read()
        uctx = UploadContext(uf2)

    # BK7231 is single-OTA
    data = uctx.collect_data(OTAScheme.DEVICE_SINGLE)
    if len(data) != 1:
        # NOTE(review): this branch also triggers when no chunk was collected
        print("!!! Incompatible UF2 package - got too many chunks!")
        exit(2)
    # take the only entry; its key is not needed
    _, io = data.popitem()
    rbl = io.read()

    file_with_path = abspath(join(firmware_dir, target))
    with open(file_with_path, "wb") as f:
        # build Tuya UG header
        # leading magic, then a version string padded with NULs to 12 bytes
        header = b"\x55\xAA\x55\xAA"
        header += b"1.0.0".ljust(12, b"\x00")
        # payload length and the sum of all payload bytes
        # (presumably encoded as 32-bit big-endian by inttobe32 - confirm)
        header += inttobe32(len(rbl))
        header += inttobe32(sum(rbl))
        # sum of the header bytes assembled so far; must stay after the
        # fields above and before the trailing magic
        header += inttobe32(sum(header))
        header += b"\xAA\x55\xAA\x55"
        f.write(header)
        # write RBL data
        f.write(rbl)
    return file_with_path


@click.group()
@click.option("-w", "--workdir", type=click.Path(exists=True, file_okay=False), required=True)
@click.option("-o", "--output", type=click.File(mode="w"), required=True)
@click.pass_context
def cli(ctx, workdir: str, output: click.File):
    # Root command group: stores the settings shared by all subcommands in
    # the context object. The firmware and profile directories are fixed
    # subdirectories of --workdir; each subcommand writes its result to the
    # --output file.
    ctx.ensure_object(dict)
    ctx.obj.update(
        firmware_dir=join(workdir, "custom-firmware"),
        profiles_dir=join(workdir, "device-profiles"),
        output=output,
    )


@cli.command()
@click.argument("slug", type=str)
@click.pass_context
def write_profile(ctx, slug: str):
    # Builds combined.json for the device named by SLUG and writes its
    # absolute path to the output file. Device and profile are taken from
    # the local profile directory when both are present there; otherwise
    # they are downloaded from the API and saved locally first.
    profile_dir = join(ctx.obj["profiles_dir"], slug)

    # try to find device and profile JSON files
    has_local_dir = isdir(profile_dir)
    if has_local_dir:
        device, profile = load_profile(profile_dir)
    else:
        device, profile = None, None

    if device is None or profile is None:
        if has_local_dir:
            print(
                "Custom device or profile is not present, "
                "attempting to download from API."
            )
        device, profile = download_profile(slug)
        save_profile(profile_dir, device, profile)

    # write profile data if found
    combined_path = save_combined_profile(profile_dir, device, profile)
    ctx.obj["output"].write(combined_path)


@cli.command()
@click.option(
    "-f",
    "--flashing",
    is_flag=True,
)
@click.pass_context
def choose_profile(ctx, flashing: bool = False):
    # Interactively selects a device/profile pair, stores it as
    # combined.json and writes that file's absolute path to the output file.
    #
    # The pair can be chosen by device, by firmware profile, or from an
    # existing directory in device-profiles. With --flashing, choosing by
    # firmware profile takes the profile's first device without asking.
    # Pairs fetched from the API are also saved to the device's profile
    # directory.
    #
    # Exits with status 1 when a custom profile directory lacks the device
    # or the profile JSON, and with status 2 on an unknown selection.
    profiles_dir = ctx.obj["profiles_dir"]
    device_slug = None
    opts = [
        "By manufacturer/device name",
        "By firmware version and name",
        "From device-profiles (i.e. custom profile)",
    ]
    mode = ask_options("How do you want to choose the device?", opts)
    if mode == opts[0]:
        device_slug = ask_device_base(api_get("devices.json"))["slug"]
        device = api_get(f"devices/{device_slug}.json")
        profiles = device["profiles"]
        profile_slug = ask_profile_base(profiles)["slug"]
        profile = api_get(f"profiles/{profile_slug}.json")
    elif mode == opts[1]:
        profile_slug = ask_profile_base(api_get("profiles.json"))["slug"]
        profile = api_get(f"profiles/{profile_slug}.json")
        devices = profile["devices"]
        if flashing:
            device_slug = devices[0]["slug"]
        else:
            device_slug = ask_device_base(devices)["slug"]
        device = api_get(f"devices/{device_slug}.json")
    elif mode == opts[2]:
        profile_dir = ask_dirs("Select device profile", profiles_dir)
        device, profile = load_profile(profile_dir)
        if device is None or profile is None:
            # refuse to write a combined.json with null entries
            print(
                f"Directory '{basename(profile_dir)}' does not contain both "
                "a device and a profile JSON file.",
                file=sys.stderr,
            )
            sys.exit(1)
    else:
        sys.exit(2)

    if device_slug is not None:
        # data came from the API - keep a local copy
        profile_dir = join(profiles_dir, device_slug)
        save_profile(profile_dir, device, profile)

    path = save_combined_profile(profile_dir, device, profile)
    ctx.obj["output"].write(path)


@cli.command()
@click.option(
    "-c",
    "--chip",
    type=click.Choice(["bk7231t", "bk7231n"], case_sensitive=False),
    default=None,
)
@click.pass_context
def choose_firmware(ctx, chip: str = None):
    # Interactively selects a firmware file from the custom-firmware
    # directory and writes its base name to the output file.
    #
    # Hidden files, *.md files, previously extracted UG images and anything
    # that is not a regular file are skipped. The remaining files are
    # validated (against --chip when given) and only valid UG/UF2 files are
    # offered. A selected UF2 package is extracted to a UG image first, and
    # the name of that image is written instead.
    #
    # Exits with status 1 when no valid firmware file is found.
    chip = chip and chip.upper()
    firmware_dir = ctx.obj["firmware_dir"]
    options = {}
    invalid_filenames = []
    for file in listdir(firmware_dir):
        if file.startswith(".") or file.endswith((".md", UF2_UG_SUFFIX)):
            continue
        path = join(firmware_dir, file)
        if not isfile(path):
            # a subdirectory cannot be opened as a firmware image
            continue
        fw_type = validate_firmware_file_internal(path, chip and chip.lower())
        if fw_type in (FirmwareType.VALID_UG, FirmwareType.VALID_UF2):
            options[file] = fw_type
        elif fw_type == FirmwareType.INVALID:
            invalid_filenames.append(file)

    if not options:
        print(
            "No valid custom firmware files were found!\n"
            "Add files to the custom-firmware/ directory first.",
            file=sys.stderr,
        )
        sys.exit(1)

    if invalid_filenames:
        print("\nThe following files were ignored because they do not meet naming requirements and the chip type could not be determined:")
        for invalid_filename in invalid_filenames:
            print(invalid_filename)
        print("Please see https://github.com/tuya-cloudcutter/tuya-cloudcutter/tree/main/custom-firmware#naming-rules for more information.\n")

    prompt = "Select your custom firmware file"
    if chip:
        prompt += f" for {chip} chip"

    file = ask_options(prompt, sorted(options, key=str.casefold))
    file_with_path = abspath(join(firmware_dir, file))

    if options[file] == FirmwareType.VALID_UF2:
        file_with_path = extract_uf2(file_with_path, firmware_dir, chip)

    ctx.obj["output"].write(basename(file_with_path))

@cli.command()
@click.argument("filename", type=str)
@click.option(
    "-c",
    "--chip",
    type=click.Choice(["bk7231t", "bk7231n"], case_sensitive=False),
    default=None,
)
@click.pass_context
def validate_firmware_file(ctx, filename: str, chip: str = None):
    # Validates FILENAME from the custom-firmware directory (against --chip
    # when given) and writes the base name of the usable firmware file to
    # the output file. A UF2 package is extracted to a UG image first, and
    # the name of that image is written instead.
    #
    # Exits with status 1 when the file does not exist or is not valid.
    chip = chip and chip.upper()
    firmware_dir = ctx.obj["firmware_dir"]
    firmware_path = join(firmware_dir, filename)

    if not isfile(firmware_path):
        # report a missing file plainly instead of failing inside open()
        print(
            f"The firmware file supplied ({filename}) does not exist "
            "in the custom-firmware directory",
            file=sys.stderr,
        )
        sys.exit(1)

    fw_type = validate_firmware_file_internal(firmware_path, chip and chip.lower())
    if fw_type not in (FirmwareType.VALID_UG, FirmwareType.VALID_UF2):
        print(
            f"The firmware file supplied ({filename}) is not valid for the chosen profile type of {chip}",
            file=sys.stderr,
        )
        sys.exit(1)

    file_with_path = abspath(firmware_path)

    if fw_type == FirmwareType.VALID_UF2:
        file_with_path = extract_uf2(file_with_path, firmware_dir, chip)

    ctx.obj["output"].write(basename(file_with_path))

# Script entry point: run the click group with an empty dict as the context
# object that the subcommands share.
if __name__ == "__main__":
    cli(obj={})