# main.py
# Forked from delphiki/AirStatus
import asyncio
from binascii import hexlify
from datetime import datetime
from time import sleep, time_ns
from bleak import discover
# Seconds to wait between two consecutive status refreshes
UPDATE_DURATION = 1
# Weakest signal strength (RSSI) a device may report and still be accepted
MIN_RSSI = -60
# Key looked up in a device's manufacturer data to recognise AirPods
AIRPODS_MANUFACTURER = 76
# Expected length of the hex-encoded manufacturer payload
AIRPODS_DATA_LENGTH = 54
# Beacons older than this many nanoseconds are forgotten (10 seconds)
RECENT_BEACONS_MAX_T_NS = 10 * 1_000_000_000
# Rolling history of seen beacons; each entry is {"time": ..., "device": ...}
recent_beacons = []
def get_best_result(device):
    """Record *device* as a fresh beacon and return the strongest recent one.

    The beacon is appended to the module-level ``recent_beacons`` history,
    entries older than ``RECENT_BEACONS_MAX_T_NS`` nanoseconds are dropped,
    and the device with the highest ``rssi`` among the remaining entries is
    selected.

    Args:
        device: Scan result exposing ``rssi`` and ``address`` attributes.

    Returns:
        The device with the strongest signal in the history. When that
        device has the same address as *device*, the *device* object that
        was just passed in is returned instead of the older entry.
        ``None`` only if the history ends up empty.
    """
    # Read the clock once so every entry is judged against the same instant.
    now = time_ns()
    recent_beacons.append({
        "time": now,
        "device": device
    })

    # Prune via slice assignment so the shared list object itself is kept.
    recent_beacons[:] = [
        beacon for beacon in recent_beacons
        if now - beacon["time"] <= RECENT_BEACONS_MAX_T_NS
    ]

    # max() returns the first of several equally strong candidates, which is
    # the same winner a strict "<" scan from the front would pick.
    strongest_beacon = max(
        (beacon["device"] for beacon in recent_beacons),
        key=lambda candidate: candidate.rssi,
        default=None,
    )

    if strongest_beacon is not None and strongest_beacon.address == device.address:
        strongest_beacon = device
    return strongest_beacon
# Getting data with hex format
async def get_device():
    """Scan for nearby devices and return the AirPods payload as hex.

    Every discovered device is passed through ``get_best_result``; the
    result is accepted when its ``rssi`` is at least ``MIN_RSSI`` and its
    manufacturer data has an ``AIRPODS_MANUFACTURER`` entry whose hex
    encoding is exactly ``AIRPODS_DATA_LENGTH`` characters long.

    Returns:
        The hex-encoded payload (``bytes``) of the first accepted device,
        or ``False`` when no scanned device qualifies.
    """
    # Scanning for devices
    devices = await discover()
    for d in devices:
        # Checking for AirPods
        best = get_best_result(d)
        if best.rssi < MIN_RSSI:
            continue
        manufacturer_data = best.metadata['manufacturer_data']
        if AIRPODS_MANUFACTURER not in manufacturer_data:
            continue
        # Encode once and reuse it for both the length check and the result.
        data_hex = hexlify(bytearray(manufacturer_data[AIRPODS_MANUFACTURER]))
        if len(data_hex) == AIRPODS_DATA_LENGTH:
            return data_hex
    return False
def get_device_data():
    """Run one ``get_device`` scan to completion and return its result."""
    scan = get_device()
    return asyncio.run(scan)
# Getting data from hex string and converting it to dict(json)
# Hex digit at index 7 of the payload -> model name
_AIRPODS_MODELS = {
    'e': "AirPodsPro",
    'f': "AirPods2",
    '2': "AirPods1",
    'a': "AirPodsMax",
}


def _battery_level(raw, position):
    """Decode the hex digit at *position* of *raw* into a charge percentage.

    Digits 0-9 map to 5, 15, ..., 95; digit ``a`` (10) maps to 100; any
    other digit yields -1 (presumably "not available" - confirm).
    """
    level = int(chr(raw[position]), 16)
    if level == 10:
        return 100
    if level < 10:
        return level * 10 + 5
    return -1


def get_data():
    """Scan for AirPods and decode their status into a dict.

    Returns:
        ``dict(status=0, model="AirPods not found")`` when the scan yields
        nothing. Otherwise a dict with ``status=1``, ``charge`` (a dict with
        ``left``, ``right`` and ``case`` percentages), the booleans
        ``charging_left``, ``charging_right`` and ``charging_case``,
        ``model``, ``date`` (local time, ``%Y-%m-%d %H:%M:%S``) and ``raw``
        (the hex payload decoded as UTF-8 text).
    """
    raw = get_device_data()

    # Return blank data if airpods not found
    if not raw:
        return dict(status=0, model="AirPods not found")

    flip: bool = is_flipped(raw)

    # The hex digit at index 7 selects the model; anything else is "unknown"
    model = _AIRPODS_MODELS.get(chr(raw[7]), "unknown")

    # Left and right digits (indexes 12/13) trade places when flipped
    left_status = _battery_level(raw, 12 if flip else 13)
    right_status = _battery_level(raw, 13 if flip else 12)
    # The case digit sits at index 15 regardless of orientation
    case_status = _battery_level(raw, 15)

    # The hex digit at index 14 carries the charging flags; the left/right
    # bits trade places when flipped, the case bit does not
    charging_status = int(chr(raw[14]), 16)
    if flip:
        left_bit, right_bit = 0b00000010, 0b00000001
    else:
        left_bit, right_bit = 0b00000001, 0b00000010
    charging_left: bool = (charging_status & left_bit) != 0
    charging_right: bool = (charging_status & right_bit) != 0
    charging_case: bool = (charging_status & 0b00000100) != 0

    # Return result info in dict format
    return dict(
        status=1,
        charge=dict(
            left=left_status,
            right=right_status,
            case=case_status
        ),
        charging_left=charging_left,
        charging_right=charging_right,
        charging_case=charging_case,
        model=model,
        date=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        raw=raw.decode("utf-8")
    )
# Return if left and right is flipped in the data
def is_flipped(raw):
    """Tell whether left and right are swapped in the payload *raw*.

    Args:
        raw: Hex-encoded payload as a bytes-like object; only the hex digit
            at index 10 is inspected.

    Returns:
        ``True`` when bit ``0x02`` of that digit is clear, ``False`` when
        it is set.
    """
    orientation = int(chr(raw[10]), 16)
    return (orientation & 0x02) == 0
def run():
    """Poll for AirPods status forever, printing each successful reading.

    A reading is printed only when its ``status`` is 1; after every poll
    the loop sleeps for ``UPDATE_DURATION`` seconds.
    """
    while True:
        reading = get_data()
        found = reading["status"] == 1
        if found:
            print(reading)
        sleep(UPDATE_DURATION)
# Start the polling loop only when executed as a script, not on import
if __name__ == '__main__':
    run()