tuya-cloudcutter

A tool that disconnects Tuya IoT devices from the cloud, allowing them to run completely locally.

import base64
import json
import socket
import ssl
import sys
import time
import os

import tornado.httpserver
import tornado.ioloop
import tornado.web

from pskcontext import PSKContext
from tuyacipher import TuyaCipher, TuyaCipherKeyChoice

import hmac
from hashlib import sha256

cipher = None

def object_to_json(obj):
    return json.dumps(obj, separators=(',',':'))

class TuyaServerHandler(tornado.web.RequestHandler):
    def initialize(self, authkey: bytes):
        global cipher
        self.authkey = authkey
        cipher = TuyaCipher(authkey)
        cipher.set_seckey(b'5b4e54679e2d7ce8')

    def reply(self, key_choice, response: dict):
        encrypted = cipher.encrypt(response, key_choice)
        encrypted = base64.b64encode(encrypted).decode("utf-8")

        timestamp = int(time.time())
        response = {"result": encrypted, "t": timestamp}
        signature = cipher.sign_server(response, key_choice)

        response["sign"] = signature
        response = object_to_json(response) + "\n"

        self.finish(response)

    def set_default_headers(self):
        self.set_header("Content-Type", "text/plain; charset=utf-8")
        self.set_header("Connection", "keep-alive")
        self.set_header("Server", "Tuya-Sec")


class GetURLHandler(TuyaServerHandler):
    def post(self):
        response = {"caArr":None,"httpUrl":{"addr":"http://10.42.42.1/d.json","ips":["10.42.42.1"]},"mqttUrl":{"addr":"10.42.42.1:1883","ips":["10.42.42.1"]},"ttl":600}
        response = object_to_json(response)

        self.finish(response)

class FilesHandler(tornado.web.StaticFileHandler):
    def parse_url_path(self, url_path):
        if not url_path or url_path.endswith('/'):
            url_path = url_path + str('index.html')
        return url_path

class ProxyHandler(TuyaServerHandler):
    def initialize(self, authkey: bytes, sslcontext: ssl.SSLContext, host: str, port: int, profiles_dir: os.PathLike):
        super().initialize(authkey)
        self.host = host
        self.port = port
        self.sslcontext = sslcontext
        self.profiles_dir = profiles_dir

    def post(self):
        global cipher
        endpoint = self.get_query_argument("a")
        other_body = None
        key_choice = TuyaCipherKeyChoice.AUTHKEY if ("active" in endpoint) else TuyaCipherKeyChoice.SECKEY
        if not ("upgrade.get" in endpoint or "upgrade.status.update" in endpoint):
            received, received_body, decoded, sent_request = self._outbound_request(self.request, key_choice, other_body)
            print("DEBUG: decoded POST response", decoded)
            try:
                response = json.loads(decoded)
            except Exception as e:
                print("Exception unpacking json", e)
                response = decoded

            if "active" in endpoint:
                try:
                    cipher.set_seckey(response["result"]["secKey"].encode("utf-8"))
                except Exception as e:
                    print("Some error exchanging TuyaCipher", e)
                    pass
        elif "upgrade.status.update" in endpoint:
            self.reply(key_choice, {"success": True})
            return
        else:
            with open("../files/upgrade.bin", "rb") as fs:
                upgdata = fs.read()

            fsha = sha256(upgdata).hexdigest().upper().encode("utf-8")
            filehmac = hmac.digest(cipher.seckey, fsha, sha256).hex().upper()
            print("filehmac", filehmac)
            response = {
                    "result": {
                        "url": "http://10.42.42.1:80/files/upgrade.bin",
                        #"pskUrl": "https://10.42.42.1/files/upgrade.bin",
                        #"httpsUrl": "https://10.42.42.1/files/upgrade.bin",
                        "hmac": filehmac,
                        "version": "2.9.15",
                        "size": str(len(upgdata)),
                        "type": 0,
                    },
                    "success": True,
                    "t": int(time.time()),
            }
            self.reply(key_choice, response)
            return

        self._store_request_response(endpoint, sent_request, response)
        self.finish(received_body)

    def _store_request_response(self, endpoint, sent_request, response):
        if response["success"]:
            with open(os.path.join(self.profiles_dir, f"dump_recv_{endpoint}.json"), "wt") as file:
                file.write(json.dumps(response))
            with open(os.path.join(self.profiles_dir, f"dump_sent_{endpoint}.txt"), "wt") as file:
                file.write(sent_request.decode("utf-8"))

    def _outbound_request(self, request, key_choice, other_body=None):
        request.headers["Host"] = self.host
        headers_formatted = "\r\n".join([f"{k}: {v}" for k, v in request.headers.items()])
        outbound_req = f"{request.method} {request.path}?{request.query} HTTP/1.1\r\n{headers_formatted}\r\n\r\n".encode("utf-8")
        if other_body is None:
            outbound_req += request.body
        else:
            outbound_req += other_body
        print(outbound_req)
        received = bytearray()
        with self.sslcontext.wrap_socket(socket.create_connection((self.host, self.port))) as csocket:
            csocket.settimeout(1.0)
            csocket.send(outbound_req)
            try:
                data = csocket.recv(10000)
                while len(data) > 0:
                    received += data
                    data = csocket.recv(10000)
            except socket.timeout:
                pass
        body_start = received.index(b"\r\n\r\n") + 4
        received_body = received[body_start:]
        response = json.loads(received_body.decode("utf-8").strip())
        print(response)

        decoded = cipher.decrypt(base64.b64decode(response["result"]), key_choice)
        decoded = decoded.decode("utf-8").strip("\n").strip("\r").strip()

        return received, bytes(received_body), decoded, outbound_req


if __name__ == '__main__':
    if len(sys.argv) < 4:
        sys.exit("WRONG, NINCOMPOOP")

    _, uuid, psk, authkey = [x.encode() for x in sys.argv]
    context = PSKContext(authkey=authkey, uuid=uuid, psk=psk)
    profiles_dir = os.getenv("DEVICE_PROFILES_DIR", "")

    handler_config = dict(authkey=authkey)
    proxy_config = dict(sslcontext=context, host="a3.tuyaeu.com", port=443, profiles_dir=profiles_dir, **handler_config)
    files_handler_config = dict(path="../files/")
    application = tornado.web.Application([
        (r'/v1/url_config', GetURLHandler, handler_config),
        (r'/d.json', ProxyHandler, proxy_config),
        (r'/files/(.*)', FilesHandler, files_handler_config),
    ])


    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(80)

    https_server = tornado.httpserver.HTTPServer(application, ssl_options=context)
    https_server.listen(443)

    tornado.ioloop.IOLoop.current().start()