Skip to content

Commit

Permalink
PulseView/sigrok protocol decoder plugins
Browse files Browse the repository at this point in the history
Modifications from the stock LIN decoder to pass LIN
frames as Python data up to stacked BEKANT decoder
  • Loading branch information
Ivan committed Feb 8, 2021
1 parent 84d94b0 commit 255c289
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 15 deletions.
2 changes: 2 additions & 0 deletions sigrok/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__pycache__/
*.py[cod]
12 changes: 12 additions & 0 deletions sigrok/BEKANT/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
##
##

'''
Decodes control protocol for the IKEA BEKANT adjustable-height
sit/stand desk.
This decoder stacks on top of the 'lin' PD. The underlying uart
decoder should be set to 8n1, 19200 bps, lsb-first.
'''

from .pd import Decoder
195 changes: 195 additions & 0 deletions sigrok/BEKANT/pd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
##
##

## SIGROKDECODE_DIR=~/BEKANT/sigrok/ \
## sigrok-cli \
## -P 'uart:baudrate=19200:rx=RX,linpyout,BEKANT' \
## -i ~/BEKANT/logictrace/oem_init_startup.sr

import sigrokdecode
import struct
from collections import namedtuple

FrameRange = namedtuple('FrameRange', ['ss', 'es', 'frame'])

class Ann:
FRAME = 0
TRANSACTION = 1
ERROR = 2

class BusTransaction:
class Slot:
def __init__(self, frame_id, short_name, long_name):
self.frame_id = frame_id
self.short_name = short_name
self.long_name = long_name

bus_sched = [
Slot(0x11, "Master 0x11", "M 11"),
Slot(0x08, "Slave 0x08", "S 08"),
Slot(0x09, "Slave 0x09", "S 09"),
Slot(0x10, "Slave 0x10", "S 10"),
Slot(0x10, "Slave 0x10", "S 10"),
Slot(0x10, "Slave 0x10", "S 10"),
Slot(0x10, "Slave 0x10", "S 10"),
Slot(0x10, "Slave 0x10", "S 10"),
Slot(0x10, "Slave 0x10", "S 10"),
Slot(0x01, "Slave 0x01", "S 01"),
Slot(0x12, "Master 0x12", "M 12"),
]

def __init__(self):
self.next_sched_pos = 0
self.frames = []

def match_first_frame(self, frame_dict):
slot = self.bus_sched[0]
return 'id' in frame_dict and slot.frame_id == frame_dict['id']

def is_init(self):
return self.next_sched_pos == 0

def match_next_frame(self, frame_dict):
if self.next_sched_pos >= len(self.bus_sched):
return False
next_slot = self.bus_sched[self.next_sched_pos]
return 'id' in frame_dict and next_slot.frame_id == frame_dict['id']

def add_frame(self, ss, es, frame_dict):
self.frames.append(FrameRange(ss=ss, es=es, frame=frame_dict))
self.next_sched_pos += 1

def is_complete(self):
return self.next_sched_pos >= len(self.bus_sched)


class Decoder(sigrokdecode.Decoder):
api_version = 3
id = 'BEKANT'
name = 'BEKANT'
longname = 'BEKANT Control Protocol'
desc = 'IKEA BEKANT height-adjustable desk control protocol'
license = 'gplv2+'
inputs = ['lin']
outputs = []
tags = ['LIN']
annotations = (
('frame', 'LIN frames'),
('transaction', 'Transactions'),
('error', 'Error descriptions'),
)
annotation_rows = (
('frame', 'Frame', (Ann.FRAME,)),
('transaction', 'Transaction', (Ann.TRANSACTION,)),
('error', 'Error', (Ann.ERROR,)),
)

def __init__(self):
self.reset()

def reset(self):
self.txn = BusTransaction()

def start(self):
self.out_ann = self.register(sigrokdecode.OUTPUT_ANN)

def putx(self, data):
# Simplification, most annotations span exactly one SPI byte/packet.
self.put(self.ss, self.es, self.out_ann, data)

def putf(self, data):
self.put(self.ss_field, self.es_field, self.out_ann, data)

def putc(self, data):
self.put(self.ss_cmd, self.es_cmd, self.out_ann, data)

def cmd_ann_list(self):
s, l = self.current_cmd.short_name, self.current_cmd.long_name
return ['Command: %s (%s)' % (l, s), 'Command: %s' % l,
'Cmd: %s' % l, 'Cmd: %s' % s, s]

command_byte = {
7: '???',
0xbc: 'OEM_UP_SLOW?',
0xbd: 'OEM_DOWN_SLOW?',
0xbf: 'OEM_bf???',
0x83: 'OEM_83???',
0x80: 'OEM_80???',
0x81: 'OEM_81???',
0x82: 'OEM_82???',
0xc0: 'OEM_c0???',
0xff: 'OEM_ff???',
0xfc: 'STOP',
0xc4: 'PRE_MOVE',
0x86: 'UP',
0x85: 'DOWN',
0x87: 'DECEL',
0x84: 'PRE_STOP',
}

def decode(self, ss, es, packet):
ptype = packet[0]
pdata = packet[1]

if ptype == 'FRAME':
self.decode_frame(ss, es, pdata)
if self.txn.is_init() and not self.txn.match_first_frame(pdata):
# not tracking a running transaction yet, ignore frame
pass
else:
if self.txn.match_next_frame(pdata):
self.txn.add_frame(ss, es, pdata)
if self.txn.is_complete():
self.decode_transaction(self.txn)
self.reset()
else:
self.put(ss, es, self.out_ann, [Ann.ERROR, ["Error"]])
self.reset()
elif ptype == 'ERROR':
self.put(ss, es, self.out_ann, [Ann.ERROR, ["Error from LIN decoder"]])

def decode_transaction(self, transaction):
frame_08 = transaction.frames[1].frame
frame_09 = transaction.frames[2].frame
frame_cmd = transaction.frames[10].frame
first_ss = transaction.frames[0].ss
last_es = transaction.frames[10].es

data_08 = data_tuples_as_list(frame_08['data'])
data_09 = data_tuples_as_list(frame_09['data'])
data_cmd = data_tuples_as_list(frame_cmd['data'])

encoder_08 = extract_encoder_val(data_08)
encoder_09 = extract_encoder_val(data_09)
encoder_cmd = extract_encoder_val(data_cmd)

cmd_name = self.command_byte[ data_cmd[2] ]
self.put(first_ss, last_es, self.out_ann,
[Ann.TRANSACTION, [
'08:%04x 09:%04x cmd:%04x %s' % (
encoder_08, encoder_09, encoder_cmd, cmd_name),
'%04x %s' % (encoder_cmd, cmd_name),
cmd_name
]])

def decode_frame(self, ss, es, frame_dict):
self.put(ss, es, self.out_ann,
[Ann.FRAME, [
self.frame_as_string_long(frame_dict),
# self.frame_as_string_short(f)
]])


def frame_as_string_long(self, frame_dict):
data_list = data_tuples_as_list(frame_dict['data'])
return "ID:%02x Data:%s" % (
frame_dict['id'],
' '.join(["%02x" % b for b in data_list])
)

def data_tuples_as_list(dtups):
return [t[2] for t in dtups]

def extract_encoder_val(data_list):
(enc,) = struct.unpack("H", bytes(data_list[0:2]))
return enc
89 changes: 74 additions & 15 deletions sigrok/linpyout/pd.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,23 @@

import sigrokdecode as srd

'''
OUTPUT_PYTHON format:
Packet:
[<ptype>, <data>]
<ptype>:
- 'FRAME'
- 'ERROR'
Examples:
['FRAME', {'break': (4992, 5665, 0), 'sync': (5823, 6241, 85),
'pid': (6602, 7020, 8), 'id': 8, 'parity': 0, 'parity_valid': True,
'checksum': (9114, 9532, 64), 'checksum_valid': True,
'data': [(7401, 7819, 81), (7972, 8390, 6), (8543, 8961, 96)]} ]
['ERROR']
'''

class LinFsm:
class State:
WaitForBreak = 'WAIT_FOR_BREAK'
Expand Down Expand Up @@ -57,28 +74,37 @@ def __init__(self):
self.uart_idle_count = 0
self.reset()

class Ann:
DATA = 0
CONTROL = 1
FRAME = 2
INLINE_ERROR = 3
ERROR_DESC = 4

class Decoder(srd.Decoder):
api_version = 3
id = 'lin'
id = 'linpyout'
name = 'LIN'
longname = 'Local Interconnect Network'
desc = 'Local Interconnect Network (LIN) protocol.'
license = 'gplv2+'
inputs = ['uart']
outputs = []
outputs = ['lin']
tags = ['Automotive']
options = (
{'id': 'version', 'desc': 'Protocol version', 'default': 2, 'values': (1, 2)},
)
annotations = (
('data', 'LIN data'),
('control', 'Protocol info'),
('error', 'Error description'),
('frame', 'LIN frame'),
('inline_error', 'Protocol violation or error'),
('error', 'Error description'),
)
annotation_rows = (
('data_vals', 'Data', (0, 1, 3)),
('errors', 'Errors', (2,)),
('data_vals', 'Data', (Ann.DATA, Ann.CONTROL, Ann.INLINE_ERROR)),
('frames', 'Frame', (Ann.FRAME,)),
('errors', 'Errors', (Ann.ERROR_DESC,)),
)

def __init__(self):
Expand All @@ -94,6 +120,7 @@ def reset(self):

def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
self.out_python = self.register(srd.OUTPUT_PYTHON)
self.lin_version = self.options['version']

def putx(self, data):
Expand Down Expand Up @@ -135,7 +162,8 @@ def handle_break(self, value):
self.fsm.reset()
self.fsm.transit(LinFsm.State.Sync)

self.putx([1, ['Break condition', 'Break', 'Brk', 'B']])
self.lin_header.append((self.ss_block, self.es_block, value))
self.putx([Ann.CONTROL, ['Break condition', 'Break', 'Brk', 'B']])

def handle_sync(self, value):
self.fsm.transit(LinFsm.State.Pid)
Expand All @@ -149,17 +177,24 @@ def handle_data(self, value):
self.lin_rsp.append((self.ss_block, self.es_block, value))

def handle_checksum(self):
frame_dict = {}

break_ = self.lin_header.pop(0) if len(self.lin_header) else None
sync = self.lin_header.pop(0) if len(self.lin_header) else None

self.put(sync[0], sync[1], self.out_ann, [0, ['Sync', 'S']])
self.put(sync[0], sync[1], self.out_ann, [Ann.DATA, ['Sync', 'S']])

if sync[2] != 0x55:
self.put(sync[0], sync[1], self.out_ann,
[2, ['Sync is not 0x55', 'Not 0x55', '!= 0x55']])
[Ann.ERROR_DESC, ['Sync is not 0x55', 'Not 0x55', '!= 0x55']])

pid = self.lin_header.pop(0) if len(self.lin_header) else None
checksum = self.lin_rsp.pop() if len(self.lin_rsp) else None

frame_dict['break'] = break_
frame_dict['sync'] = sync
frame_start = break_[0]

if pid:
id_ = pid[2] & 0x3F
parity = pid[2] >> 6
Expand All @@ -168,34 +203,58 @@ def handle_checksum(self):
parity_valid = parity == expected_parity

if not parity_valid:
self.put(pid[0], pid[1], self.out_ann, [2, ['P != %d' % expected_parity]])
self.put(pid[0], pid[1], self.out_ann,
[Ann.ERROR_DESC, ['P != %d' % expected_parity]])

ann_class = 0 if parity_valid else 3
ann_class = Ann.DATA if parity_valid else Ann.INLINE_ERROR
self.put(pid[0], pid[1], self.out_ann, [ann_class, [
'ID: %02X Parity: %d (%s)' % (id_, parity, 'ok' if parity_valid else 'bad'),
'ID: 0x%02X' % id_, 'I: %d' % id_
]])

frame_dict['pid'] = pid
frame_dict['id'] = id_
frame_dict['parity'] = parity
frame_dict['parity_valid'] = parity_valid

if len(self.lin_rsp):
checksum_valid = self.checksum_is_valid(pid[2], self.lin_rsp, checksum[2])

for b in self.lin_rsp:
self.put(b[0], b[1], self.out_ann, [0, ['Data: 0x%02X' % b[2], 'D: 0x%02X' % b[2]]])
self.put(b[0], b[1], self.out_ann,
[Ann.DATA, ['Data: 0x%02X' % b[2], 'D: 0x%02X' % b[2]]])

ann_class = 0 if checksum_valid else 3
ann_class = Ann.DATA if checksum_valid else Ann.INLINE_ERROR
self.put(checksum[0], checksum[1], self.out_ann,
[ann_class, ['Checksum: 0x%02X' % checksum[2], 'Checksum', 'Chk', 'C']])

if not checksum_valid:
self.put(checksum[0], checksum[1], self.out_ann, [2, ['Checksum invalid']])
self.put(checksum[0], checksum[1], self.out_ann,
[Ann.ERROR_DESC, ['Checksum invalid']])

frame_dict['checksum'] = checksum
frame_dict['checksum_valid'] = checksum_valid
frame_end = checksum[1]
else:
pass # No response.
# No response.
frame_end = pid[1]

frame_dict['data'] = list(self.lin_rsp) # copy
self.put_frame_annotation(frame_start, frame_end, frame_dict)
self.put(frame_start, frame_end, self.out_python, ['FRAME', frame_dict])

self.lin_header.clear()
self.lin_rsp.clear()

def put_frame_annotation(self, ss, es, frame_dict):
data_str = ' '.join(['0x%02X' % t[2] for t in frame_dict['data']])
id_str = '0x%02X' % frame_dict['id']
ann_short = id_str + ': ' + data_str
self.put(ss, es, self.out_ann, [Ann.FRAME, [ann_short]])

def handle_error(self, dummy):
self.putx([3, ['Error', 'Err', 'E']])
self.putx([Ann.INLINE_ERROR, ['Error', 'Err', 'E']])
self.put(self.ss_block, self.es_block, self.out_python, ['ERROR'])

def checksum_is_valid(self, pid, data, checksum):
if self.lin_version == 2:
Expand Down

0 comments on commit 255c289

Please sign in to comment.