Source code for pymoof.clients.sx3

from enum import Enum

import bleak.backends.client

from pymoof.profiles.sx3 import SX3Profile
from pymoof.util import bleak_utils


[docs]class BellTone(Enum): BOAT = 0x18 PARTY = 0x17 BELL = 0x16
[docs]class LockState(Enum): UNLOCKED = 0x00 LOCKED = 0x01 AWAITING_UNLOCK = 0x02
[docs]class Sound(Enum): SCROLLING_TONE = 0x1 BEEP_NEGATIVE = 0x2 BEEP_POSITIVE = 0x3 UNLOCK_COUNTDOWN = 0x4 PAIRING = 0x5 ENTER_BACKUP_CODE_MODE = 0x6 RESET_COUNTDOWN = 0x7 PAIRING_SUCCESSFUL = 0x8 PAIRING_FAILED = 0x9 HORN_1 = 0xA HORN_URGENT = 0xB LOCK = 0xC UNLOCK = 0xD ALARM_STAGE_ONE = 0xE ALARM_STAGE_TWO = 0xF SYSTEM_STARTUP = 0x10 SYSTEM_SHUTDOWN = 0x11 CHARGING = 0x12 DIAGNOSE = 0x13 FIRMWARE_DOWNLOAD = 0x14 FIRMWARE_FAILED = 0x15 HORN_2 = 0x16 HORN_3 = 0x17 HORN_4 = 0x17 FIRMWARE_SUCCESSFUL = 0x18 NOISE = 0x19 UNPAIRING = 0x1A FM_DISABLE = 0x1B FM_ENABLE = 0x1C FM_NOISE = 0x1D
[docs]class SX3Client: """ A wrapper around a bleak client that allows bluetooth communication with a Vanmoof S3 and X3. You must provide this object with a connected BleakClient and a hexidecimal string formatted key for the bike. :param bleak_client: Connected bleak.backends.client.BaseBleakClient :param key: The encryption key for the bike from Vanmoof servers :param user_key_id: The user key id for the bike from Vanmoof servers """ def __init__( self, bleak_client: bleak.backends.client.BaseBleakClient, key: str, user_key_id: int, ) -> None: self._gatt_client = bleak_client self._bike_profile = SX3Profile(key, user_key_id) async def _get_nonce(self) -> bytes: return await self._read( self._bike_profile.Security.CHALLENGE, needs_decryption=False, ) async def _read(self, characteristic_uuid, needs_decryption: bool = True) -> bytes: result = await bleak_utils.read_from_characteristic( self._gatt_client, characteristic_uuid, ) if needs_decryption: result = self._bike_profile.decrypt_payload(result) return result async def _write(self, characteristic_uuid, data: bytes) -> None: nonce = await self._get_nonce() payload = self._bike_profile.build_encrypted_payload(nonce, data) await bleak_utils.write_to_characteristic( self._gatt_client, characteristic_uuid, payload, )
[docs] async def authenticate(self) -> None: """ Attempts to authenticate with the bike by performing the nonce challenge. .. warning:: This method will not check if you have successfully authenticated and will silently return. """ nonce = await self._get_nonce() payload = self._bike_profile.build_authentication_payload(nonce) await bleak_utils.write_to_characteristic( self._gatt_client, self._bike_profile.Security.KEY_INDEX, payload, )
[docs] async def set_bell_tone(self, bell_tone: BellTone) -> None: """ **Must be authenticated to call** Sets the bell tone for the bike. :param bell_tone: The type of bell tone to use. See ``pymoof.clients.sx3.BellTone`` for a list of valid bell tones. :raises ``bleak.exc.BleakError``: if the client is not authenticated. """ await self._write( self._bike_profile.Sound.BELL_SOUND, [bell_tone.value], )
[docs] async def set_lock_state(self, state: LockState) -> None: """ **Must be authenticated to call** Sets the lock state for the bike. :param state: The lock state to use. See ``pymoof.clients.sx3.LockState`` for a list of lock states. :raises ``bleak.exc.BleakError``: if the client is not authenticated. """ await self._write( self._bike_profile.Defense.LOCK_STATE, [state.value], )
[docs] async def set_power_level(self, level: int) -> None: """ **Must be authenticated to call** Sets the power level for the bike. :param level: An integer between 0 and 5 inclusive. :raises ``bleak.exc.BleakError``: if the client is not authenticated. :raises AssertionError: if level is outside the valid range. """ assert 0 <= level <= 5 await self._write( self._bike_profile.Movement.POWER_LEVEL, [level, 0x1], )
[docs] async def play_sound(self, sound: Sound, count: int = 1): """ **Must be authenticated to call** Plays a sound some number of times. :param sound: The sound to use. See ``pymoof.clients.sx3.Sound`` for a list of valid sounds. :param count: An integer greater than 1. Defaults to 1. :raises ``bleak.exc.BleakError``: if the client is not authenticated. :raises AssertionError: if count is outside the valid range. """ assert 0 < count await self._write( self._bike_profile.Sound.PLAY_SOUND, [sound.value, count], )
[docs] async def get_battery_level(self) -> int: """ **Must be authenticated to call** Gets the battery level of the bike out of 100. :raises ``bleak.exc.BleakError``: if the client is not authenticated. :return: Battery level as an integer between 0 and 100 inclusive. """ result = await self._read( self._bike_profile.BikeInfo.MOTOR_BATTERY_LEVEL, ) return int(result[0])
[docs] async def get_lock_state(self) -> LockState: """ **Must be authenticated to call** Gets the lock state of the bike. :raises ``bleak.exc.BleakError``: if the client is not authenticated. :return: A ``pymoof.clients.sx3.LockState`` of the lock state of the bike. """ result = await self._read( self._bike_profile.Defense.LOCK_STATE, ) return LockState(result[0])
[docs] async def get_distance_travelled(self) -> float: """ **Must be authenticated to call** Gets the distance travelled of the bike in kilometers. :raises ``bleak.exc.BleakError``: if the client is not authenticated. :return: A float that represents the distance travelled in kilometers. """ # Returns kilometers, stored as hectometers result = await self._read( self._bike_profile.Movement.DISTANCE, ) return int.from_bytes(result, "little") / 10
[docs] async def get_power_level(self) -> int: """ **Must be authenticated to call** TODO: Need to figure out what this actually returns :raises ``bleak.exc.BleakError``: if the client is not authenticated. :return: N/A """ result = await self._read( self._bike_profile.Movement.POWER_LEVEL, ) return result
[docs] async def get_frame_number(self) -> str: """ **No authentication needed to call** Returns the frame number of the bike. :raises ``bleak.exc.BleakError``: if the client is not authenticated. :return: A string that represents the frame number. """ result = await self._read( self._bike_profile.BikeInfo.FRAME_NUMBER, needs_decryption=False, ) return result.decode("ascii")
[docs] async def get_sound_volume(self) -> int: """ **Must be authenticated to call** Gets the sound volume. TODO: parse output :raises ``bleak.exc.BleakError``: if the client is not authenticated. :return: N/A """ result = await self._read( self._bike_profile.Sound.SOUND_VOLUME, ) return result
[docs] async def get_speed(self) -> int: """ **Must be authenticated to call** Gets the current speed of the bike in kilometers per hour :raises ``bleak.exc.BleakError``: if the client is not authenticated. :return: An integer that represents the speed of the bike in kilometers per hour. """ result = await self._read( self._bike_profile.Movement.SPEED, ) return int.from_bytes(result, "little")
[docs] async def get_light_mode(self) -> int: """ **Must be authenticated to call** Gets the light mode. TODO: parse output :raises ``bleak.exc.BleakError``: if the client is not authenticated. :return: N/A """ result = await self._read( self._bike_profile.Movement.SPEED, ) return result