Skip to content

Example: Control Remotely

Arvydas Juskevicius edited this page Feb 11, 2020 · 7 revisions

This example shows how to connect to BlinkStick.com and link it with a registered device on your account.

Requires additional packages which can be installed with the following command line:

pip install websocket-client
import websocket
import json
import sys
from blinkstick import blinkstick

access_code = "change this to AccessCode from BlinkStick.com"

bstick = blinkstick.find_first()

if bstick is None:
    sys.exit("BlinkStick not found...")


def HTMLColorToRGB(colorstring):
    """ convert #RRGGBB to an (R, G, B) tuple """
    colorstring = colorstring.strip()
    if colorstring[0] == '#': colorstring = colorstring[1:]
    if len(colorstring) != 6:
        raise ValueError("input #%s is not in #RRGGBB format" % colorstring)
    r, g, b = colorstring[:2], colorstring[2:4], colorstring[4:]
    r, g, b = [int(n, 16) for n in (r, g, b)]
    return (r, g, b)


def on_message(ws, message):
    global client_id
    global access_code
    global bstick

    #Uncomment this for debugging purposes
    #print message

    m = json.loads(message)

    if m[0]['channel'] == '/meta/connect':
        ws.send(json.dumps(
            {'channel': '/meta/connect',
             'clientId': client_id,
             'connectionType': 'websocket'}))
        return

    elif m[0]['channel'] == '/meta/handshake':
        client_id = m[0]['clientId']

        print ("Acquired clientId: " + client_id)

        ws.send(json.dumps(
            {'channel': '/meta/subscribe',
             'clientId': client_id,
             'subscription': '/devices/' + access_code}))
        return

    elif m[0]['channel'] == '/devices/' + access_code:
        if 'color' in m[0]["data"]:
            print ("Received color: " + m[0]["data"]["color"])

            (r, g, b) = HTMLColorToRGB(m[0]["data"]["color"])
            for x in range(0, 8):
                bstick.set_color(channel=0, index=x, red=r, green=g, blue=b)
        elif 'status' in m[0]["data"] and m[0]["data"]['status'] == "off":
            print ("Turn off")
            bstick.turn_off()

    elif m[0]['channel'] == '/meta/subscribe':
        if m[0]['successful']:
            print ("Subscribed to device. Waiting for color message...")
        else:
            print ("Subscription to the device failed. Please check the access_code value in the file.")

    #Reconnect again and wait for further messages
    ws.send(json.dumps(
        {'channel': '/meta/connect',
         'clientId': client_id,
         'connectionType': 'websocket'}))


def on_error(ws, error):
    print (error)


def on_close(ws):
    print ("### closed ###")


def on_open(ws):
    ws.send(json.dumps(
        {'channel': '/meta/handshake',
         'version': '1.0',
         'supportedConnectionTypes': ['long-polling', 'websocket']}))


if __name__ == "__main__":
    # Set this to True for debugging purposes
    websocket.enableTrace(False)
    ws = websocket.WebSocketApp("ws://live.blinkstick.com:9292/faye",
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.on_open = on_open

    ws.run_forever()