tuya-cloudcutter

A tool that disconnects Tuya IoT devices from the cloud, allowing them to run completely locally.

import base64
import datetime
import json
import os
import sys
import time
from typing import List

import tornado

from cloudcutter.protocol import mqtt

from ..crypto.tuyacipher import TuyaCipher, TuyaCipherKeyChoice
from ..device import DeviceConfig
from ..utils import object_to_json
from .transformers import ResponseTransformer

device_mac = ""
file_send_finished = False


def log_request(endpoint, request, decrypted_request_body, verbose_output: bool = False):
    clean_request_body: str = ""
    if len(request.body) > 0 or len(decrypted_request_body) > 0:
        if (decrypted_request_body is not None):
            clean_request_body = decrypted_request_body
        else:
            clean_request_body = request.body
        if type(clean_request_body) == bytes:
            clean_request_body = clean_request_body.decode()
        try:
            body_json = json.loads(clean_request_body)
            if body_json['hid'] is not None:
                mac_str = body_json['hid']
                mac_iter = iter(mac_str)
                global device_mac
                device_mac = ':'.join(a+b for a, b in zip(mac_iter, mac_iter))
        except:
            pass

    if verbose_output:
        # print a blank line for easier reading
        print("")
        print(f'[{datetime.datetime.now().time()} Log (Client)] Request: {request}')

        if len(clean_request_body) > 0:
            print(f'[{datetime.datetime.now().time()} LOG (Client)] ==== Request body ===')
            print(clean_request_body)
            print(f'[{datetime.datetime.now().time()} LOG (Client)] ==== End request body ===')
    else:
        print(f"Processing endpoint {endpoint}")


def log_response(response, verbose_output: bool = False):
    if verbose_output:
        print(f'[{datetime.datetime.now().time()} LOG (Server)] Response: ', response)


class TuyaHeadersHandler(tornado.web.RequestHandler):
    def set_default_headers(self):
        self.set_header("Content-Type", "text/plain; charset=utf-8")
        self.set_header("Connection", "keep-alive")
        self.set_header("Server", "Tuya-Sec")


class TuyaServerHandler(TuyaHeadersHandler):
    def initialize(self, config: DeviceConfig):
        self.cipher = TuyaCipher(config.get_bytes(DeviceConfig.AUTH_KEY))
        self.cipher.set_seckey(config.get_bytes(DeviceConfig.SEC_KEY))
        self.config = config

    def reply(self, key_choice, response: dict):
        encrypted = self.cipher.encrypt(response, key_choice)
        encrypted = base64.b64encode(encrypted).decode("utf-8")
        timestamp = int(time.time())
        response = {"result": encrypted, "t": timestamp}
        signature = self.cipher.sign_server(response, key_choice)
        response["sign"] = signature
        response = object_to_json(response) + "\n"
        self.finish(response)


class GetURLHandlerV1(TuyaHeadersHandler):
    def initialize(self, ipaddr: str, verbose_output: bool):
        self.ipaddr = ipaddr
        self.verbose_output = verbose_output

    def post(self):
        log_request(self.request.uri, self.request, self.request.body, self.verbose_output)
        # The following is the full response, but firmware flashing via mqtt fails when the full response is sent.
        # This is just for reference in case there are any future adjustments that might need to know this data.
        #response = {"caArr": None, "httpUrl": {"addr": f"http://{self.ipaddr}/d.json", "ips": [self.ipaddr]}, "mqttUrl": {"addr": f"{self.ipaddr}:1883", "ips": [self.ipaddr]}, "httpsPSKUrl": {"addr": f"https://{self.ipaddr}/d.json", "ips": [self.ipaddr]}, "mqttsPSKUrl": {"addr": f"{self.ipaddr}:8886", "ips": [self.ipaddr]}, "ttl": 600}
        response = {"caArr": None, "httpUrl": {"addr": f"http://{self.ipaddr}/d.json", "ips": [self.ipaddr]}, "mqttUrl": {"addr": f"{self.ipaddr}:1883", "ips": [self.ipaddr]}, "ttl": 600}
        response = object_to_json(response)
        log_response(response, self.verbose_output)
        self.finish(response)


class GetURLHandlerV2(TuyaHeadersHandler):
    def initialize(self, ipaddr: str, verbose_output: bool):
        self.ipaddr = ipaddr
        self.verbose_output = verbose_output

    def post(self):
        log_request(self.request.uri, self.request, self.request.body, self.verbose_output)
        # The following is the full response, but firmware flashing via mqtt fails when the full response is sent.
        # This is just for reference in case there are any future adjustments that might need to know this data.
        #response = {"caArr": None, "httpUrl": {"addr": f"http://{self.ipaddr}/d.json", "ips": [self.ipaddr]}, "mqttUrl": {"addr": f"{self.ipaddr}:1883", "ips": [self.ipaddr]}, "httpsPSKUrl": {"addr": f"https://{self.ipaddr}/d.json", "ips": [self.ipaddr]}, "mqttsPSKUrl": {"addr": f"{self.ipaddr}:8886", "ips": [self.ipaddr]}, "psk_key": "", "ttl": 600}
        response = {"caArr": None, "httpUrl": {"addr": f"http://{self.ipaddr}/d.json", "ips": [self.ipaddr]}, "mqttUrl": {"addr": f"{self.ipaddr}:1883", "ips": [self.ipaddr]}, "ttl": 600}
        response = object_to_json(response)
        log_response(response, self.verbose_output)
        self.finish(response)


class OldSDKGetURLHandler(TuyaHeadersHandler):
    def initialize(self, ipaddr: str, verbose_output: bool):
        self.ipaddr = ipaddr
        self.verbose_output = verbose_output

    def post(self):
        log_request(self.request.uri, self.request, self.request.body, self.verbose_output)
        # The following is the full response, but firmware flashing via mqtt fails when the full response is sent.
        # This is just for reference in case there are any future adjustments that might need to know this data.
        #response = {"caArr": None, "httpUrl": f"http://{self.ipaddr}/d.json", "mqttUrl": f"{self.ipaddr}:1883", "httpPSKUrl": f"https://{self.ipaddr}/d.json", "mqttPSKUrl": f"{self.ipaddr}:8886"}
        response = {"caArr": None, "httpUrl": f"http://{self.ipaddr}/d.json", "mqttUrl": f"{self.ipaddr}:1883"}
        response = object_to_json(response)
        log_response(response, self.verbose_output)
        self.finish(response)


class OTAFilesHandler(tornado.web.StaticFileHandler):
    def initialize(self, path: str, graceful_exit_timeout: int, verbose_output: bool):
        self.root = self.path = self.absolute_path = path
        self.graceful_exit_timeout = graceful_exit_timeout
        self.verbose_output = verbose_output

    def prepare(self):
        log_request(self.request.uri, self.request, self.request.body, self.verbose_output)
        range_value = self.request.headers.get("Range", "bytes=0-0")
        # get_content_size() is not available in prepare without a lot of overriding work
        # total = self.get_content_size()
        log_response(range_value, self.verbose_output)

    def on_finish(self):
        range_value = self.request.headers.get("Range", "bytes=0-0")
        if range_value[:8].startswith('bytes=0-'):
            global file_send_finished
            file_send_finished = True
            # File send will always finish before mqtt sends a status of nearly complete
            # Leave all logic for shutting down in the mqtt progress check
        total = self.get_content_size()
        timestamp = ""
        if self.verbose_output:
            timestamp = str(datetime.datetime.now().time()) + " "
        print(f"[{timestamp}Firmware Upload] {self.request.uri} send complete, request range: {range_value}/{total}")


class DetachHandler(TuyaServerHandler):
    AUTHKEY_ENDPOINTS = ["tuya.device.active", "tuya.device.uuid.pskkey.get"]

    def initialize(self, schema_directory: os.PathLike, config: DeviceConfig, response_transformers: List[ResponseTransformer], endpoint_hooks, verbose_output: bool):
        super().initialize(config=config)
        self.schema_directory = schema_directory
        self.endpoint_hooks = endpoint_hooks
        self.response_transformers = response_transformers
        self.verbose_output = verbose_output

    def post(self):
        endpoint = self.get_query_argument("a")
        key_choice = TuyaCipherKeyChoice.AUTHKEY if endpoint in self.AUTHKEY_ENDPOINTS else TuyaCipherKeyChoice.SECKEY
        request_body = self.__decrypt_request_body(key_choice)
        log_request(endpoint, self.request, request_body, self.verbose_output)
        request_body = json.dumps(request_body)
        response = self.__rework_endpoint_response(endpoint, request_body)
        default_response = {"success": True, "t": int(time.time())}
        if not response:
            response = default_response
        log_response(response, self.verbose_output)
        self.reply(key_choice, response)

    def __rework_endpoint_response(self, endpoint, request_body):
        response = None
        endpoint_hook_response = None
        if self.endpoint_hooks is not None:
            # Check if any endpoint hook has a response and if so, use it as a response
            # while applying transformations anyway.
            # Otherwise, load the response from a file as usual.
            endpoint_hook = self.endpoint_hooks.get(endpoint, None)
            if endpoint_hook is not None:
                endpoint_hook_response = endpoint_hook(self, endpoint, request_body)

        if endpoint_hook_response is not None:
            response = endpoint_hook_response
        else:
            # Default process, read the response from the base schema directory and return None if it doesn't exist
            endpoint_response_path = os.path.join(self.schema_directory, f"{endpoint}.json")
            if os.path.exists(endpoint_response_path):
                with open(endpoint_response_path, "r") as responsefs:
                    response = json.load(responsefs)
            else:
                print(f"!!! Endpoint response not found, using default response - {endpoint} (This is usually okay and safe to ignore unless something isn't working)")

        if response is None:
            return None

        for transformer in self.response_transformers:
            response = transformer.apply(response)

        return response

    def __decrypt_request_body(self, key_choice: TuyaCipherKeyChoice):
        try:
            body = self.get_argument('data')
            body = bytes.fromhex(body)
            decrypted = self.cipher.decrypt(body, key_choice).decode('utf-8')
        except:
            print(f"[!] Unable to decrypt device reponse.  PSKKEY/AUTHKEY do not match device.")
            exit(90)
        return decrypted