tuya-cloudcutter

A tool that disconnects Tuya IoT devices from the cloud, allowing them to run completely locally.

#!/usr/bin/env python3
"""
This example script configures an RGB downlight that has just been flashed with the
OpenBK7231N_App firmware.

It was used on a Ubuntu 20.04 machine, but should be customized for your own use case.
Search this file for "here>" to find the places that need to be customized.

See scripts/README.md for example usage


You will need to install requests and paho-mqtt:
pip install requests paho-mqtt
"""
import subprocess
import os
import time
import requests
import pathlib

import paho.mqtt.client as mqtt

AP_PREFIX = "OpenBK7231N_"

MQTT_HOST = os.environ["MQTT_HOST"]
SWITCH_TOPIC = os.environ["SWITCH_TOPIC"]


mqtt_client = mqtt.Client()


COLOR_MAP = {
    "1": "#2200000000",
    "2": "#0022000000",
    "3": "#0000220000",
    "4": "#2200220000",
}


def get_wifi_adapter():
    # Get the wifi adapter name
    # Return the adapter name
    p = subprocess.run(
        ["nmcli", "dev", "status"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    for l in p.stdout.splitlines():
        if " wifi " in l:
            return l.split()[0]
    raise ValueError("Failed to find wifi adapter")


def list_aps():
    # List the available APs
    # Return a list of APs

    subprocess.run(["nmcli", "radio", "wifi", "off"])
    time.sleep(1)
    subprocess.run(["nmcli", "radio", "wifi", "on"])
    time.sleep(1)

    p = subprocess.run(
        [
            "nmcli",
            "-t",
            "-f",
            "SSID,SECURITY",
            "dev",
            "wifi",
            "list",
            "--rescan",
            "yes",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    open_ssids = []
    for l in p.stdout.splitlines():
        ssid, security = l.split(":")
        if security == "":
            open_ssids.append(ssid)
    return open_ssids


def find_ap():
    # Ensure the interface is in managed mode
    subprocess.run(["nmcli", "device", "set", get_wifi_adapter(), "managed", "yes"])
    while 1:
        print("Scanning for APs...")
        for ap in list_aps():
            if ap.startswith(AP_PREFIX):
                print(f"Found AP: {ap}")
                return ap
        time.sleep(3)


def connect_ap(ap):
    # Connect to the AP
    # Return True if successful, False otherwise
    p = subprocess.run(
        ["nmcli", "dev", "wifi", "connect", ap, "name", ap],
    )
    if p.returncode != 0:
        raise ValueError("Failed to connect to AP")
    print("Connected to AP")


ID_PATH = pathlib.Path("light-id")


def get_id():
    if not ID_PATH.exists():
        ID_PATH.write_text("1")
    return int(ID_PATH.read_text()) + 1


def write_id(id):
    ID_PATH.write_text(str(id))


def make_req(url):
    print("Making request: ", url)
    RETRIES = 5
    for i in range(RETRIES):
        try:
            r = requests.get(url, timeout=5)
            if not r.ok:
                raise ValueError(f"Failed to make request: {url}")
            print("Done")
            time.sleep(1)
            return
        except Exception as e:
            print(i + 1, "Error making request:", e)
            time.sleep(3)

    raise ValueError(f"Failed to make request: {url} after {RETRIES} retries")


STATE = {"online": False}


def on_message(client, userdata, message):
    print("Message received (IP Address): ", str(message.payload.decode("utf-8")))
    parts = message.topic.split("/")
    msg = message.payload.decode("utf-8")
    if parts[-1] == "ip":
        print("Online")
        STATE["online"] = True


def send_command(command, value):
    mqtt_client.publish(f"cmnd/{device_name}/{command}", value)
    time.sleep(1)


def main():
    global device_name

    print(f"MQTT_HOST: {MQTT_HOST}, SWITCH_TOPIC: {SWITCH_TOPIC}")
    dev_id = get_id()
    device_name = f"downlight{dev_id}"
    print("Device name:", device_name)
    # Find the AP to connect to
    ap = find_ap()
    # Connect to the AP
    connect_ap(ap)

    # Configure the pins
    make_req(
        "http://192.168.4.1/cfg_pins?0=0&r0=0&1=0&r1=0&2=0&r2=0&3=0&r3=0&4=0&r4=0&5=0&r5=0&6=0&r6=0&7=23&r7=0&8=24&r8=0&9=0&r9=0&10=0&r10=0&11=0&r11=0&12=0&r12=0&13=0&r13=0&14=0&r14=0&15=0&r15=0&16=0&r16=0&17=0&r17=0&18=0&r18=0&19=0&r19=0&20=0&r20=0&21=0&r21=0&22=0&r22=0&23=0&r23=0&24=0&r24=0&25=0&r25=0&26=0&r26=0&27=0&r27=0&28=0&0=r28"
    )
    # Configure MQTT
    make_req(
        f"http://192.168.4.1/cfg_mqtt_set?host=<mqtt host here>&port=1883&client={device_name}&group=dining&user=&password="
    )
    # Set default startup color
    make_req(
        "http://192.168.4.1/startup_command?data=backlog+led_basecolor_rgbcw+%230000007777%3B+led_enableAll+1%3B&startup_cmd=1"
    )

    make_req(
        "http://192.168.4.1/cfg_wifi_set?ssid=<your ssid here>&pass=<your wifi password here>"
    )

    mqtt_client.connect(MQTT_HOST)
    mqtt_client.on_message = on_message
    mqtt_client.loop_start()
    mqtt_client.subscribe(f"{device_name}/ip")

    # Turn off the switch
    print("Rebooting device...")
    mqtt_client.publish(SWITCH_TOPIC, "off")
    time.sleep(2)
    mqtt_client.publish(SWITCH_TOPIC, "on")

    # Wait for the device to connect
    print("Waiting for device to connect via MQTT...", end="", flush=True)
    while not STATE["online"]:
        time.sleep(1)
        print(".", end="", flush=True)
    print("Connected")

    output_color = COLOR_MAP.get(SWITCH_TOPIC[-1], "#2200000000")

    # Set the device name
    send_command("ShortName", device_name)
    send_command("FriendlyName", device_name)
    send_command("led_basecolor_rgbcw", output_color)
    send_command("power", "1")

    write_id(dev_id)
    print(f"Flashed device {device_name} successfully")


if __name__ == "__main__":
    main()