From d3ddc89f33e4f15ae8f49ee3ee5c9236209a0e84 Mon Sep 17 00:00:00 2001 From: Chris Karageorgiou Kaneen Date: Sat, 19 Sep 2020 23:08:16 +0300 Subject: [PATCH] Add emulator support - Add Emulator object for emulating bpod hardware behavior and communicating outputs to emulator GUI server - Modify bpod classes to accommodate emulator functionality - Add emulator examples to showcase its usage --- .../emulator_examples/10_state_example.py | 16 + .../10_state_example_w_global_timer.py | 20 + examples/emulator_examples/1_state_example.py | 10 + examples/emulator_examples/__init__.py | 2 + .../add_trial_events_2_manual_override.py | 97 +++ .../add_trial_events_manual_override.py | 73 ++ examples/emulator_examples/bpod_info.py | 9 + .../emulator_examples/condition_example.py | 25 + .../condition_example_manual_override.py | 38 + .../global_counter_example_manual_override.py | 46 ++ .../emulator_examples/global_timer_example.py | 27 + .../global_timer_example_digital.py | 29 + .../global_timer_start_and_end_events.py | 42 ++ ...chasing_example_2_pokes_manual_override.py | 49 ++ ...chasing_example_3_pokes_manual_override.py | 65 ++ examples/emulator_examples/manual_override.py | 108 +++ .../simple_global_timer_example.py | 25 + ..._triggered_state_change_manual_override.py | 30 + examples/emulator_settings.py | 23 + pybpodapi/bpod/bpod_base.py | 363 ++++++---- pybpodapi/bpod/bpod_com_protocol.py | 42 +- pybpodapi/bpod/bpod_io.py | 5 +- pybpodapi/bpod/emulator/__init__.py | 1 + pybpodapi/bpod/emulator/constants.py | 97 +++ pybpodapi/bpod/emulator/emulator.py | 675 ++++++++++++++++++ pybpodapi/bpod/emulator/state.py | 45 ++ pybpodapi/bpod/hardware/hardware.py | 14 + pybpodapi/settings.py | 1 + 28 files changed, 1822 insertions(+), 155 deletions(-) create mode 100644 examples/emulator_examples/10_state_example.py create mode 100644 examples/emulator_examples/10_state_example_w_global_timer.py create mode 100644 examples/emulator_examples/1_state_example.py create mode 100644 examples/emulator_examples/__init__.py create mode 100644 examples/emulator_examples/add_trial_events_2_manual_override.py create mode 100644 examples/emulator_examples/add_trial_events_manual_override.py create mode 100644 examples/emulator_examples/bpod_info.py create mode 100644 examples/emulator_examples/condition_example.py create mode 100644 examples/emulator_examples/condition_example_manual_override.py create mode 100644 examples/emulator_examples/global_counter_example_manual_override.py create mode 100644 examples/emulator_examples/global_timer_example.py create mode 100644 examples/emulator_examples/global_timer_example_digital.py create mode 100644 examples/emulator_examples/global_timer_start_and_end_events.py create mode 100644 examples/emulator_examples/light_chasing_example_2_pokes_manual_override.py create mode 100644 examples/emulator_examples/light_chasing_example_3_pokes_manual_override.py create mode 100644 examples/emulator_examples/manual_override.py create mode 100644 examples/emulator_examples/simple_global_timer_example.py create mode 100644 examples/emulator_examples/uart_triggered_state_change_manual_override.py create mode 100644 examples/emulator_settings.py create mode 100644 pybpodapi/bpod/emulator/__init__.py create mode 100644 pybpodapi/bpod/emulator/constants.py create mode 100644 pybpodapi/bpod/emulator/emulator.py create mode 100644 pybpodapi/bpod/emulator/state.py diff --git a/examples/emulator_examples/10_state_example.py b/examples/emulator_examples/10_state_example.py new file mode 100644 index 0000000..324f382 --- /dev/null +++ b/examples/emulator_examples/10_state_example.py @@ -0,0 +1,16 @@ +from pybpodapi.protocol import Bpod, StateMachine +my_bpod = Bpod(emulator_mode=True) +sma = StateMachine(my_bpod) +for i in range(10): + sma.add_state( + state_name='State{}'.format(i), + state_timer=1, + state_change_conditions={Bpod.Events.Tup: 'State{}'.format(i + 1)}, + output_actions=[]) +sma.add_state( + state_name='State10', + state_timer=1, + state_change_conditions={Bpod.Events.Tup: 'exit'}, + output_actions=[]) +my_bpod.send_state_machine(sma) +my_bpod.run_state_machine(sma) diff --git a/examples/emulator_examples/10_state_example_w_global_timer.py b/examples/emulator_examples/10_state_example_w_global_timer.py new file mode 100644 index 0000000..fbd4898 --- /dev/null +++ b/examples/emulator_examples/10_state_example_w_global_timer.py @@ -0,0 +1,20 @@ +from pybpodapi.protocol import Bpod, StateMachine +my_bpod = Bpod(emulator_mode=True) +sma = StateMachine(my_bpod) +# Set global timer 1 for 3 seconds, following a 1.5 second onset delay after +# trigger. Link to LED of port 2. +sma.set_global_timer(timer_id=1, timer_duration=0, on_set_delay=1.5, + channel=Bpod.OutputChannels.PWM2, on_message=255) +for i in range(10): + sma.add_state( + state_name='State{}'.format(i), + state_timer=1, + state_change_conditions={Bpod.Events.Tup: 'State{}'.format(i + 1)}, + output_actions=[]) +sma.add_state( + state_name='State10', + state_timer=1, + state_change_conditions={Bpod.Events.Tup: 'exit'}, + output_actions=[]) +my_bpod.send_state_machine(sma) +my_bpod.run_state_machine(sma) diff --git a/examples/emulator_examples/1_state_example.py b/examples/emulator_examples/1_state_example.py new file mode 100644 index 0000000..74edf12 --- /dev/null +++ b/examples/emulator_examples/1_state_example.py @@ -0,0 +1,10 @@ +from pybpodapi.protocol import Bpod, StateMachine +my_bpod = Bpod(emulator_mode=True) +sma = StateMachine(my_bpod) +sma.add_state( + state_name='myState', + state_timer=1, + state_change_conditions={Bpod.Events.Tup: 'exit'}, + output_actions=[]) +my_bpod.send_state_machine(sma) +my_bpod.run_state_machine(sma) diff --git a/examples/emulator_examples/__init__.py b/examples/emulator_examples/__init__.py new file mode 100644 index 0000000..ec24c39 --- /dev/null +++ b/examples/emulator_examples/__init__.py @@ -0,0 +1,2 @@ +# !/usr/bin/python3 +# -*- coding: utf-8 -*- diff --git a/examples/emulator_examples/add_trial_events_2_manual_override.py b/examples/emulator_examples/add_trial_events_2_manual_override.py new file mode 100644 index 0000000..9b7f732 --- /dev/null +++ b/examples/emulator_examples/add_trial_events_2_manual_override.py @@ -0,0 +1,97 @@ +import random +import time +from pybpodapi.protocol import Bpod, StateMachine +from concurrent.futures import ThreadPoolExecutor +my_bpod = Bpod(emulator_mode=True) +nTrials = 5 +graceTime = 5 +port_numbers = [1, 2, 3] +trialTypes = [1, 2] # 1 (rewarded left) or 2 (rewarded right) +for i in range(nTrials): # Main loop + print('Trial: ', i + 1) + thisTrialType = random.choice(trialTypes) # Randomly choose trial type + if thisTrialType == 1: + # set stimulus channel for trial type 1 + stimulus = Bpod.OutputChannels.PWM1 + leftAction = 'Reward' + rightAction = 'Punish' + rewardValve = 1 + elif thisTrialType == 2: + # set stimulus channel for trial type 1 + stimulus = Bpod.OutputChannels.PWM3 + leftAction = 'Punish' + rightAction = 'Reward' + rewardValve = 3 + sma = StateMachine(my_bpod) + sma.set_global_timer_legacy( + timer_id=1, timer_duration=graceTime) # Set timeout + sma.add_state( + state_name='WaitForPort2Poke', + state_timer=1, + state_change_conditions={Bpod.Events.Port2In: 'FlashStimulus'}, + output_actions=[('PWM2', 255)]) + sma.add_state( + state_name='FlashStimulus', + state_timer=0.1, + state_change_conditions={Bpod.Events.Tup: 'WaitForResponse'}, + output_actions=[(stimulus, 255), + (Bpod.OutputChannels.GlobalTimerTrig, 1)]) + sma.add_state( + state_name='WaitForResponse', + state_timer=1, + state_change_conditions={Bpod.Events.Port1In: leftAction, + Bpod.Events.Port3In: rightAction, + Bpod.Events.Port2In: 'Warning', + Bpod.Events.GlobalTimer1_End: 'MiniPunish'}, + output_actions=[]) + sma.add_state( + state_name='Warning', + state_timer=0.1, + state_change_conditions={Bpod.Events.Tup: 'WaitForResponse', + Bpod.Events.GlobalTimer1_End: 'MiniPunish'}, + output_actions=[(Bpod.OutputChannels.LED, 1), + (Bpod.OutputChannels.LED, 2), + (Bpod.OutputChannels.LED, 3)]) # Reward correct choice + sma.add_state( + state_name='Reward', + state_timer=0.1, + state_change_conditions={Bpod.Events.Tup: 'exit'}, + # Reward correct choice + output_actions=[(Bpod.OutputChannels.Valve, rewardValve)]) + sma.add_state( + state_name='Punish', + state_timer=3, + state_change_conditions={Bpod.Events.Tup: 'exit'}, + # Signal incorrect choice + output_actions=[(Bpod.OutputChannels.LED, 1), + (Bpod.OutputChannels.LED, 2), + (Bpod.OutputChannels.LED, 3)]) + sma.add_state( + state_name='MiniPunish', + state_timer=1, + state_change_conditions={Bpod.Events.Tup: 'exit'}, + # Signal incorrect choice + output_actions=[(Bpod.OutputChannels.LED, 1), + (Bpod.OutputChannels.LED, 2), + (Bpod.OutputChannels.LED, 3)]) + # Send state machine description to Bpod device + my_bpod.send_state_machine(sma) + print("Waiting for poke. Reward: ", + 'left' if thisTrialType == 1 else 'right') + + def mouse(data): + time.sleep(3) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=2, + value=12) + time.sleep(2) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=random.choice(port_numbers), + value=12) + + executor = ThreadPoolExecutor(max_workers=1) + executor.submit(mouse, None) + + my_bpod.run_state_machine(sma) # Run state machine + print("Current trial info: {0}".format(my_bpod.session.current_trial)) +my_bpod.close() # Disconnect Bpod diff --git a/examples/emulator_examples/add_trial_events_manual_override.py b/examples/emulator_examples/add_trial_events_manual_override.py new file mode 100644 index 0000000..be31051 --- /dev/null +++ b/examples/emulator_examples/add_trial_events_manual_override.py @@ -0,0 +1,73 @@ +import random +import time +from pybpodapi.protocol import Bpod, StateMachine +from concurrent.futures import ThreadPoolExecutor +my_bpod = Bpod(emulator_mode=True) +nTrials = 5 +port_numbers = [1, 3] +trialTypes = [1, 2] # 1 (rewarded left) or 2 (rewarded right) +for i in range(nTrials): # Main loop + print('Trial: ', i + 1) + thisTrialType = random.choice(trialTypes) # Randomly choose trial type + if thisTrialType == 1: + # set stimulus channel for trial type 1 + stimulus = Bpod.OutputChannels.PWM1 + leftAction = 'Reward' + rightAction = 'Punish' + rewardValve = 1 + elif thisTrialType == 2: + # set stimulus channel for trial type 1 + stimulus = Bpod.OutputChannels.PWM3 + leftAction = 'Punish' + rightAction = 'Reward' + rewardValve = 3 + sma = StateMachine(my_bpod) + sma.add_state( + state_name='WaitForPort2Poke', + state_timer=1, + state_change_conditions={Bpod.Events.Port2In: 'FlashStimulus'}, + output_actions=[(Bpod.OutputChannels.PWM2, 255)]) + sma.add_state( + state_name='FlashStimulus', + state_timer=0.1, + state_change_conditions={Bpod.Events.Tup: 'WaitForResponse'}, + output_actions=[(stimulus, 255)]) + sma.add_state( + state_name='WaitForResponse', + state_timer=1, + state_change_conditions={ + Bpod.Events.Port1In: leftAction, Bpod.Events.Port3In: rightAction}, + output_actions=[]) + sma.add_state( + state_name='Reward', + state_timer=0.1, + state_change_conditions={Bpod.Events.Tup: 'exit'}, + # Reward correct choice + output_actions=[(Bpod.OutputChannels.Valve, rewardValve)]) + sma.add_state( + state_name='Punish', + state_timer=3, + state_change_conditions={Bpod.Events.Tup: 'exit'}, + # Signal incorrect choice + output_actions=[(Bpod.OutputChannels.LED, 1), + (Bpod.OutputChannels.LED, 2), + (Bpod.OutputChannels.LED, 3)]) + # Send state machine description to Bpod device + my_bpod.send_state_machine(sma) + print("Waiting for poke. Reward: ", + 'left' if thisTrialType == 1 else 'right') + + def mouse(data): + time.sleep(3) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=2, value=12) + time.sleep(2) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=random.choice(port_numbers), + value=12) + + executor = ThreadPoolExecutor(max_workers=1) + executor.submit(mouse, None) + my_bpod.run_state_machine(sma) # Run state machine + print("Current trial info: {0}".format(my_bpod.session.current_trial)) +my_bpod.close() # Disconnect Bpod diff --git a/examples/emulator_examples/bpod_info.py b/examples/emulator_examples/bpod_info.py new file mode 100644 index 0000000..1d4f8a5 --- /dev/null +++ b/examples/emulator_examples/bpod_info.py @@ -0,0 +1,9 @@ +from pybpodapi.protocol import Bpod +from confapp import conf +my_bpod = Bpod(emulator_mode=True) +my_bpod.close() +print("Target Bpod firmware version: ", conf.TARGET_BPOD_FIRMWARE_VERSION) +print("Firmware version (read from device): ", + my_bpod.hardware.firmware_version) +print("Machine type version (read from device): ", + my_bpod.hardware.machine_type) diff --git a/examples/emulator_examples/condition_example.py b/examples/emulator_examples/condition_example.py new file mode 100644 index 0000000..42a450b --- /dev/null +++ b/examples/emulator_examples/condition_example.py @@ -0,0 +1,25 @@ +from pybpodapi.protocol import Bpod, StateMachine +my_bpod = Bpod(emulator_mode=True) +sma = StateMachine(my_bpod) +sma.set_condition(condition_number=1, + condition_channel='Port2', channel_value=1) +sma.add_state( + state_name='Port1Light', + state_timer=1, + state_change_conditions={Bpod.Events.Tup: 'Port2Light'}, + output_actions=[(Bpod.OutputChannels.PWM1, 255)]) +sma.add_state( + state_name='Port2Light', + state_timer=1, + state_change_conditions={ + Bpod.Events.Tup: 'Port3Light', Bpod.Events.Condition1: 'Port3Light'}, + output_actions=[(Bpod.OutputChannels.PWM2, 255)]) +sma.add_state( + state_name='Port3Light', + state_timer=1, + state_change_conditions={Bpod.Events.Tup: 'exit'}, + output_actions=[(Bpod.OutputChannels.PWM3, 255)]) +my_bpod.send_state_machine(sma) +my_bpod.run_state_machine(sma) +print("Current trial info: {0}".format(my_bpod.session.current_trial)) +my_bpod.close() diff --git a/examples/emulator_examples/condition_example_manual_override.py b/examples/emulator_examples/condition_example_manual_override.py new file mode 100644 index 0000000..ad793c0 --- /dev/null +++ b/examples/emulator_examples/condition_example_manual_override.py @@ -0,0 +1,38 @@ +import time +from pybpodapi.protocol import Bpod, StateMachine +from concurrent.futures import ThreadPoolExecutor +my_bpod = Bpod(emulator_mode=True) +sma = StateMachine(my_bpod) +sma.set_condition(condition_number=1, + condition_channel='Port2', channel_value=1) +sma.add_state( + state_name='Port1Light', + state_timer=1, + state_change_conditions={Bpod.Events.Tup: 'Port2Light'}, + output_actions=[(Bpod.OutputChannels.PWM1, 255)]) +sma.add_state( + state_name='Port2Light', + state_timer=1, + state_change_conditions={ + Bpod.Events.Tup: 'Port1Light', Bpod.Events.Condition1: 'Port3Light'}, + output_actions=[(Bpod.OutputChannels.PWM2, 255)]) +sma.add_state( + state_name='Port3Light', + state_timer=1, + state_change_conditions={Bpod.Events.Tup: 'exit'}, + output_actions=[(Bpod.OutputChannels.PWM3, 255)]) +my_bpod.send_state_machine(sma) + + +def mouse(data): + time.sleep(5) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=2, value=12) + + +executor = ThreadPoolExecutor(max_workers=1) +executor.submit(mouse, None) + +my_bpod.run_state_machine(sma) +print("Current trial info: {0}".format(my_bpod.session.current_trial)) +my_bpod.close() diff --git a/examples/emulator_examples/global_counter_example_manual_override.py b/examples/emulator_examples/global_counter_example_manual_override.py new file mode 100644 index 0000000..a193583 --- /dev/null +++ b/examples/emulator_examples/global_counter_example_manual_override.py @@ -0,0 +1,46 @@ +import time +from pybpodapi.protocol import Bpod, StateMachine +from concurrent.futures import ThreadPoolExecutor +my_bpod = Bpod(emulator_mode=True) +sma = StateMachine(my_bpod) +sma.set_global_counter(counter_number=1, target_event='Port1In', threshold=5) +sma.add_state( + state_name='InitialDelay', + state_timer=2, + state_change_conditions={Bpod.Events.Tup: 'ResetGlobalCounter1'}, + output_actions=[(Bpod.OutputChannels.PWM2, 255)]) +sma.add_state( + state_name='ResetGlobalCounter1', + state_timer=0, + state_change_conditions={Bpod.Events.Tup: 'Port1Lit'}, + output_actions=[(Bpod.OutputChannels.GlobalCounterReset, 1)]) +sma.add_state( + # Infinite loop (with next state). Only a global counter can save us. + state_name='Port1Lit', + state_timer=.25, + state_change_conditions={ + Bpod.Events.Tup: 'Port3Lit', 'GlobalCounter1_End': 'exit'}, + output_actions=[(Bpod.OutputChannels.PWM1, 255)]) +sma.add_state( + state_name='Port3Lit', + state_timer=.25, + state_change_conditions={ + Bpod.Events.Tup: 'Port1Lit', 'GlobalCounter1_End': 'exit'}, + output_actions=[(Bpod.OutputChannels.PWM3, 255)]) +my_bpod.send_state_machine(sma) + + +def mouse(data): + time.sleep(1) + for _ in range(5): + time.sleep(1) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=1, value=12) + + +executor = ThreadPoolExecutor(max_workers=1) +executor.submit(mouse, None) + +my_bpod.run_state_machine(sma) +print("Current trial info: {0}".format(my_bpod.session.current_trial)) +my_bpod.close() diff --git a/examples/emulator_examples/global_timer_example.py b/examples/emulator_examples/global_timer_example.py new file mode 100644 index 0000000..369040a --- /dev/null +++ b/examples/emulator_examples/global_timer_example.py @@ -0,0 +1,27 @@ +from pybpodapi.protocol import Bpod, StateMachine +my_bpod = Bpod(emulator_mode=True) +sma = StateMachine(my_bpod) +# Set global timer 1 for 3 seconds +sma.set_global_timer_legacy(timer_id=1, timer_duration=3) +sma.add_state( + state_name='TimerTrig', # Trigger global timer + state_timer=0, + state_change_conditions={Bpod.Events.Tup: 'Port1Lit'}, + output_actions=[(Bpod.OutputChannels.GlobalTimerTrig, 1)]) +sma.add_state( + # Infinite loop (with next state). Only a global timer can save us. + state_name='Port1Lit', + state_timer=.25, + state_change_conditions={Bpod.Events.Tup: 'Port3Lit', + Bpod.Events.GlobalTimer1_End: 'exit'}, + output_actions=[(Bpod.OutputChannels.PWM1, 255)]) +sma.add_state( + state_name='Port3Lit', + state_timer=.25, + state_change_conditions={Bpod.Events.Tup: 'Port1Lit', + Bpod.Events.GlobalTimer1_End: 'exit'}, + output_actions=[(Bpod.OutputChannels.PWM3, 255)]) +my_bpod.send_state_machine(sma) +my_bpod.run_state_machine(sma) +print("Current trial info: {0}".format(my_bpod.session.current_trial)) +my_bpod.close() diff --git a/examples/emulator_examples/global_timer_example_digital.py b/examples/emulator_examples/global_timer_example_digital.py new file mode 100644 index 0000000..953de2d --- /dev/null +++ b/examples/emulator_examples/global_timer_example_digital.py @@ -0,0 +1,29 @@ +from pybpodapi.protocol import Bpod, StateMachine +my_bpod = Bpod(emulator_mode=True) +sma = StateMachine(my_bpod) +# Set global timer 1 for 3 seconds, following a 1.5 second onset delay after +# trigger. Link to channel BNC2. +sma.set_global_timer(timer_id=1, timer_duration=3, + on_set_delay=1.5, channel='BNC2') +sma.add_state( + state_name='TimerTrig', # Trigger global timer + state_timer=0, + state_change_conditions={Bpod.Events.Tup: 'Port1Lit'}, + output_actions=[(Bpod.OutputChannels.GlobalTimerTrig, 1)]) +sma.add_state( + # Infinite loop (with next state). Only a global timer can save us. + state_name='Port1Lit', + state_timer=.25, + state_change_conditions={ + Bpod.Events.Tup: 'Port3Lit', 'GlobalTimer1_End': 'exit'}, + output_actions=[(Bpod.OutputChannels.PWM1, 255)]) +sma.add_state( + state_name='Port3Lit', + state_timer=.25, + state_change_conditions={ + Bpod.Events.Tup: 'Port1Lit', 'GlobalTimer1_End': 'exit'}, + output_actions=[(Bpod.OutputChannels.PWM3, 255)]) +my_bpod.send_state_machine(sma) +my_bpod.run_state_machine(sma) +print("Current trial info: {0}".format(my_bpod.session.current_trial)) +my_bpod.close() diff --git a/examples/emulator_examples/global_timer_start_and_end_events.py b/examples/emulator_examples/global_timer_start_and_end_events.py new file mode 100644 index 0000000..b22047b --- /dev/null +++ b/examples/emulator_examples/global_timer_start_and_end_events.py @@ -0,0 +1,42 @@ +from pybpodapi.protocol import Bpod, StateMachine +my_bpod = Bpod(emulator_mode=True) +sma = StateMachine(my_bpod) +# Set global timer 1 for 3 seconds, following a 1.5 second onset delay after +# trigger. Link to LED of port 2. +sma.set_global_timer(timer_id=1, timer_duration=3, on_set_delay=1.5, + channel=Bpod.OutputChannels.PWM2, on_message=255) +sma.add_state( + state_name='TimerTrig', # Trigger global timer + state_timer=0, + state_change_conditions={Bpod.Events.Tup: 'Port1Lit_Pre'}, + output_actions=[('GlobalTimerTrig', 1)]) +sma.add_state( + state_name='Port1Lit_Pre', + state_timer=.25, + state_change_conditions={Bpod.Events.Tup: 'Port3Lit_Pre', + Bpod.Events.GlobalTimer1_Start: 'Port1Lit_Post'}, + output_actions=[(Bpod.OutputChannels.PWM1, 16)]) +sma.add_state( + state_name='Port3Lit_Pre', + state_timer=.25, + state_change_conditions={Bpod.Events.Tup: 'Port1Lit_Pre', + Bpod.Events.GlobalTimer1_Start: 'Port3Lit_Post'}, + output_actions=[(Bpod.OutputChannels.PWM3, 16)]) +sma.add_state( + state_name='Port1Lit_Post', + state_timer=.25, + state_change_conditions={ + Bpod.Events.Tup: 'Port3Lit_Post', + Bpod.Events.GlobalTimer1_End: 'exit'}, + output_actions=[(Bpod.OutputChannels.PWM1, 255)]) +sma.add_state( + state_name='Port3Lit_Post', + state_timer=.25, + state_change_conditions={ + Bpod.Events.Tup: 'Port1Lit_Post', + Bpod.Events.GlobalTimer1_End: 'exit'}, + output_actions=[(Bpod.OutputChannels.PWM3, 255)]) +my_bpod.send_state_machine(sma) +my_bpod.run_state_machine(sma) +print("Current trial info: {0}".format(my_bpod.session.current_trial)) +my_bpod.close() diff --git a/examples/emulator_examples/light_chasing_example_2_pokes_manual_override.py b/examples/emulator_examples/light_chasing_example_2_pokes_manual_override.py new file mode 100644 index 0000000..2666498 --- /dev/null +++ b/examples/emulator_examples/light_chasing_example_2_pokes_manual_override.py @@ -0,0 +1,49 @@ +import time +from pybpodapi.protocol import Bpod, StateMachine +from concurrent.futures import ThreadPoolExecutor +my_bpod = Bpod(emulator_mode=True) +sma = StateMachine(my_bpod) +sma.add_state( + state_name='Port1Active1', # Add a state + state_timer=0, + state_change_conditions={Bpod.Events.Port1In: 'Port2Active1'}, + output_actions=[(Bpod.OutputChannels.PWM1, 255)]) +sma.add_state( + state_name='Port2Active1', + state_timer=0, + state_change_conditions={Bpod.Events.Port2In: 'Port1Active2'}, + output_actions=[(Bpod.OutputChannels.PWM2, 255)]) +sma.add_state( + state_name='Port1Active2', + state_timer=0, + state_change_conditions={Bpod.Events.Port1In: 'Port2Active2'}, + output_actions=[(Bpod.OutputChannels.PWM1, 255)]) +sma.add_state( + state_name='Port2Active2', + state_timer=0, + state_change_conditions={Bpod.Events.Port2In: 'exit'}, + output_actions=[(Bpod.OutputChannels.PWM2, 255)]) +my_bpod.send_state_machine(sma) + + +def mouse(data): + time.sleep(3) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=1, value=12) + time.sleep(3) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=2, value=12) + time.sleep(3) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=1, value=12) + time.sleep(3) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=2, value=12) + + +executor = ThreadPoolExecutor(max_workers=1) +executor.submit(mouse, None) + +my_bpod.run_state_machine(sma) +print("Current trial info: {0}".format(my_bpod.session.current_trial)) +my_bpod.close() diff --git a/examples/emulator_examples/light_chasing_example_3_pokes_manual_override.py b/examples/emulator_examples/light_chasing_example_3_pokes_manual_override.py new file mode 100644 index 0000000..47bd3c5 --- /dev/null +++ b/examples/emulator_examples/light_chasing_example_3_pokes_manual_override.py @@ -0,0 +1,65 @@ +import time +from pybpodapi.protocol import Bpod, StateMachine +from concurrent.futures import ThreadPoolExecutor +my_bpod = Bpod(emulator_mode=True) +sma = StateMachine(my_bpod) +sma.add_state( + state_name='Port1Active1', # Add a state + state_timer=0, + state_change_conditions={Bpod.Events.Port1In: 'Port2Active1'}, + output_actions=[(Bpod.OutputChannels.PWM1, 255)]) +sma.add_state( + state_name='Port2Active1', + state_timer=0, + state_change_conditions={Bpod.Events.Port2In: 'Port3Active1'}, + output_actions=[(Bpod.OutputChannels.PWM2, 255)]) +sma.add_state( + state_name='Port3Active1', + state_timer=0, + state_change_conditions={Bpod.Events.Port3In: 'Port1Active2'}, + output_actions=[(Bpod.OutputChannels.PWM3, 255)]) +sma.add_state( + state_name='Port1Active2', + state_timer=0, + state_change_conditions={Bpod.Events.Port1In: 'Port2Active2'}, + output_actions=[(Bpod.OutputChannels.PWM1, 255)]) +sma.add_state( + state_name='Port2Active2', + state_timer=0, + state_change_conditions={Bpod.Events.Port2In: 'Port3Active2'}, + output_actions=[(Bpod.OutputChannels.PWM2, 255)]) +sma.add_state( + state_name='Port3Active2', + state_timer=0, + state_change_conditions={Bpod.Events.Port3In: 'exit'}, + output_actions=[(Bpod.OutputChannels.PWM3, 255)]) +my_bpod.send_state_machine(sma) + + +def mouse(data): + time.sleep(3) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=1, value=12) + time.sleep(3) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=2, value=12) + time.sleep(3) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=3, value=12) + time.sleep(3) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=1, value=12) + time.sleep(3) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=2, value=12) + time.sleep(3) + my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=3, value=12) + + +executor = ThreadPoolExecutor(max_workers=1) +executor.submit(mouse, None) + +my_bpod.run_state_machine(sma) +print("Current trial info: {0}".format(my_bpod.session.current_trial)) +my_bpod.close() diff --git a/examples/emulator_examples/manual_override.py b/examples/emulator_examples/manual_override.py new file mode 100644 index 0000000..2dc4b3d --- /dev/null +++ b/examples/emulator_examples/manual_override.py @@ -0,0 +1,108 @@ +import time +from pybpodapi.protocol import Bpod +import examples.emulator_settings as settings +my_bpod = Bpod(emulator_mode=True) +wait_active_time_ms = 2 +# INPUTS - BNC (1, 2) +print("Set BNC1 (Input) to a value") +my_bpod.manual_override(Bpod.ChannelTypes.INPUT, + Bpod.ChannelNames.BNC, channel_number=1, value=12) +time.sleep(wait_active_time_ms) +print("Set BNC2 (Input) to a value") +my_bpod.manual_override(Bpod.ChannelTypes.INPUT, + Bpod.ChannelNames.BNC, channel_number=2, value=15) +time.sleep(wait_active_time_ms) +# INPUTS - PWM (1..4) +print("Set PWM1 (Input) to a value") +my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=1, value=12) +time.sleep(wait_active_time_ms) +print("Set PWM2 (Input) to a value") +my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=2, value=13) +time.sleep(wait_active_time_ms) +print("Set PWM3 (Input) to a value") +my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=3, value=14) +time.sleep(wait_active_time_ms) +print("Set PWM3 (Input) to a value") +my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port', + channel_number=4, value=15) +time.sleep(wait_active_time_ms) +# PORT 1 LED +print("Set LED of port 1 to max intensity") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.PWM, channel_number=1, value=255) +time.sleep(wait_active_time_ms) +print("Set LED of port 1 to lower intensity") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.PWM, channel_number=1, value=8) +time.sleep(wait_active_time_ms) +print("Set LED of port 1 to zero intensity") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.PWM, channel_number=1, value=0) +time.sleep(1) +# PORT 2 LED +print("Set LED of port 2 to max intensity") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.PWM, channel_number=2, value=255) +time.sleep(wait_active_time_ms) +print("Set LED of port 2 to lower intensity") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.PWM, channel_number=2, value=8) +time.sleep(wait_active_time_ms) +print("Set LED of port 2 to zero intensity") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.PWM, channel_number=2, value=0) +time.sleep(1) # Wait 1s +# PORT 1 VALVE +print("Set valve of port 1 to open") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.VALVE, 1, value=1) +time.sleep(wait_active_time_ms) +print("Set valve of port 1 to close") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.VALVE, 1, value=0) +time.sleep(1) # Wait 1s +# PORT 3 VALVE +print("Set valve of port 3 to open") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.VALVE, channel_number=3, value=1) +time.sleep(wait_active_time_ms) # Wait 250ms +print("Set valve of port 3 to close") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.VALVE, channel_number=3, value=0) +time.sleep(1) # Wait 1s +# PORT 2 BNC +print("Set BNC output ch2 to high") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.BNC, channel_number=2, value=1) +time.sleep(0.01) # Wait 10ms +print("Set BNC output ch2 to low") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.BNC, channel_number=2, value=0) +time.sleep(1) # Wait 1s +if settings.EMULATOR_BPOD_MACHINE_TYPE == 1: + # PORT 3 Wire + print("Set Wire output ch3 to high") + my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.WIRE, + channel_number=3, value=1) + time.sleep(0.01) # Wait 10ms + print("Set Wire output ch3 to low") + my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.WIRE, + channel_number=3, value=0) + time.sleep(1) # Wait 1s +# PORT 2 Serial +print("Send byte 65 on UART port 2") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.SERIAL, channel_number=2, value=65) +time.sleep(0.01) # Wait 10ms +print("Send byte 66 on UART port 1") +my_bpod.manual_override(Bpod.ChannelTypes.OUTPUT, + Bpod.ChannelNames.SERIAL, channel_number=1, value=66) +# Stop Bpod +# Sends a termination byte and closes the serial port. PulsePal stores current +# params to its EEPROM. +my_bpod.close() diff --git a/examples/emulator_examples/simple_global_timer_example.py b/examples/emulator_examples/simple_global_timer_example.py new file mode 100644 index 0000000..1af55e2 --- /dev/null +++ b/examples/emulator_examples/simple_global_timer_example.py @@ -0,0 +1,25 @@ +from pybpodapi.protocol import Bpod, StateMachine +my_bpod = Bpod(emulator_mode=True) +sma = StateMachine(my_bpod) +# Set global timer 1 for 5 seconds +sma.set_global_timer(timer_id=1, timer_duration=5) +sma.add_state( + state_name='state1', + state_timer=1, + state_change_conditions={Bpod.Events.Tup: 'state2'}, + output_actions=[(Bpod.OutputChannels.GlobalTimerTrig, 1)]) +for i in range(2, 10): + sma.add_state( + state_name=f'state{i}', + state_timer=0.2, + state_change_conditions={Bpod.Events.Tup: f'state{i+1}'}, + output_actions=[]) +# The next one shouldn't fire except after 15 seconds +sma.add_state( + state_name='state10', + state_timer=0, + state_change_conditions={Bpod.Events.GlobalTimer1_End: 'exit'}, + output_actions=[]) +my_bpod.send_state_machine(sma) +my_bpod.run_state_machine(sma) +my_bpod.close() diff --git a/examples/emulator_examples/uart_triggered_state_change_manual_override.py b/examples/emulator_examples/uart_triggered_state_change_manual_override.py new file mode 100644 index 0000000..0089a9d --- /dev/null +++ b/examples/emulator_examples/uart_triggered_state_change_manual_override.py @@ -0,0 +1,30 @@ +import time +from pybpodapi.protocol import Bpod, StateMachine +from concurrent.futures import ThreadPoolExecutor +my_bpod = Bpod(emulator_mode=True) +sma = StateMachine(my_bpod) +sma.add_state( + state_name='Port1Light', + state_timer=0, + # Go to Port2Light when byte 0x3 arrives on UART port 2 + state_change_conditions={Bpod.Events.Serial2_3: 'Port2Light'}, + output_actions=[(Bpod.OutputChannels.PWM1, 255)]) +sma.add_state( + state_name='Port2Light', + state_timer=0, + state_change_conditions={Bpod.Events.Tup: 'exit'}, + output_actions=[(Bpod.OutputChannels.PWM2, 255)]) +my_bpod.send_state_machine(sma) + + +def worker(data): + time.sleep(5) + my_bpod.trigger_event_by_name(Bpod.Events.Serial2_3) + + +executor = ThreadPoolExecutor(max_workers=1) +executor.submit(worker, None) + +my_bpod.run_state_machine(sma) +print("Current trial info: ", my_bpod.session.current_trial) +my_bpod.close() diff --git a/examples/emulator_settings.py b/examples/emulator_settings.py new file mode 100644 index 0000000..06b146a --- /dev/null +++ b/examples/emulator_settings.py @@ -0,0 +1,23 @@ +# !/usr/bin/python3 +# -*- coding: utf-8 -*- +import logging + +PYBPOD_API_LOG_LEVEL = logging.DEBUG +PYBPOD_API_LOG_FILE = "pybpod-api.log" + +PYBPOD_API_MODULES = [ + # 'pybpod_rotaryencoder_module' +] + +PYBPOD_API_STREAM2STDOUT = False + +PYBPOD_API_ACCEPT_STDIN = False + +TARGET_BPOD_FIRMWARE_VERSION = "22" +EMULATOR_BPOD_MACHINE_TYPE = 3 + +PYBPOD_SERIAL_PORT = None + +BPOD_BNC_PORTS_ENABLED = [True, True] +BPOD_WIRED_PORTS_ENABLED = [True, True] +BPOD_BEHAVIOR_PORTS_ENABLED = [True, True, True, True, True, True, True, True] diff --git a/pybpodapi/bpod/bpod_base.py b/pybpodapi/bpod/bpod_base.py index 6e62262..52020c7 100644 --- a/pybpodapi/bpod/bpod_base.py +++ b/pybpodapi/bpod/bpod_base.py @@ -14,6 +14,8 @@ from pybpodapi.bpod.hardware.events import EventName from pybpodapi.bpod.hardware.output_channels import OutputChannel +from pybpodapi.bpod.emulator import Emulator + from pybpodapi.bpod_modules.bpod_modules import BpodModules from pybpodapi.exceptions.bpod_error import BpodErrorException @@ -59,7 +61,7 @@ class ChannelNames(ChannelName): CHECK_STATE_MACHINE_COUNTER = 0 - def __init__(self, serial_port=None, sync_channel=None, sync_mode=None, net_port=None): + def __init__(self, serial_port=None, sync_channel=None, sync_mode=None, net_port=None, emulator_mode=False): self._session = self.create_session() self.serial_port = serial_port if serial_port is not None else settings.PYBPOD_SERIAL_PORT @@ -68,7 +70,13 @@ def __init__(self, serial_port=None, sync_channel=None, sync_mode=None, net_port self.sync_mode = sync_mode if sync_mode is not None else settings.PYBPOD_SYNC_MODE self.net_port = net_port if net_port is not None else settings.PYBPOD_NET_PORT self._hardware = Hardware() # type: Hardware - self.bpod_modules = None # type: BpodModules + if emulator_mode: + self._emulator = Emulator(self._hardware) + self.__initialize_input_command_handler() + self.bpod_modules = self._emulator.bpod_modules + else: + self._emulator = None + self.bpod_modules = None # type: BpodModules self.bpod_start_timestamp = None self._new_sma_sent = False # type: bool @@ -163,18 +171,7 @@ def open(self): self._hardware.setup(self.bpod_modules) - # initialise the server to handle commands - if self.net_port is not None: - self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.sock.bind(('0.0.0.0', self.net_port)) - self.socketin = NonBlockingSocketReceive(self.sock) - else: - self.sock = None - self.socketin = None - - # initialise the thread that will handle the stdin commands - self.stdin = NonBlockingStreamReader(sys.stdin) if settings.PYBPOD_API_ACCEPT_STDIN else None - ##################################################### + self.__initialize_input_command_handler() return self @@ -188,7 +185,8 @@ def close(self): for varname in settings.PYBPOD_VARSNAMES: self.session += ValueMessage(varname, getattr(settings, varname)) - self._bpodcom_disconnect() + if self._emulator is None: + self._bpodcom_disconnect() del self._session @@ -222,7 +220,7 @@ def send_state_machine(self, sma, run_asap=None): :param pybpodapi.model.state_machine sma: initialized state machine """ - if not self.bpod_com_ready: + if self._emulator is None and not self.bpod_com_ready: raise Exception('Bpod connection is closed') if self._skip_all_trials is True: @@ -232,9 +230,13 @@ def send_state_machine(self, sma, run_asap=None): sma.update_state_numbers() - state_machine_body = sma.build_message() + sma.build_message_global_timer() + sma.build_message_32_bits() + if self._emulator: + self._emulator.set_state_machine(sma) + self._emulator.log_state_machine_info() + else: + state_machine_body = sma.build_message() + sma.build_message_global_timer() + sma.build_message_32_bits() - self._bpodcom_send_state_machine(sma.build_header(run_asap, len(state_machine_body)) + state_machine_body) + self._bpodcom_send_state_machine(sma.build_header(run_asap, len(state_machine_body)) + state_machine_body) self._new_sma_sent = True @@ -259,7 +261,7 @@ def run_state_machine(self, sma): :param (:class:`pybpodapi.state_machine.StateMachine`) sma: initialized state machine """ - if not self.bpod_com_ready: + if self._emulator is None and not self.bpod_com_ready: raise Exception('Bpod connection is closed') if self._skip_all_trials is True: @@ -271,14 +273,20 @@ def run_state_machine(self, sma): self.trial_timestamps = [] # Store the trial timestamps in case bpod is using live_timestamps - self._bpodcom_run_state_machine() - if self._new_sma_sent: - if self._bpodcom_state_machine_installation_status(): - self._new_sma_sent = False - else: - raise BpodErrorException('Error: The last state machine sent was not acknowledged by the Bpod device.', self) - - self.trial_start_timestamp = self._bpodcom_get_trial_timestamp_start() + if self._emulator: + self._emulator.set_state_machine(sma) + self._emulator.initialize() + self._emulator.mirror_state(sma.current_state) + # TODO: Do the BpodSystem.RefreshGUI equivalent + self.trial_start_timestamp = self._emulator.matrix_start_time + else: + self._bpodcom_run_state_machine() + if self._new_sma_sent: + if self._bpodcom_state_machine_installation_status(): + self._new_sma_sent = False + else: + raise BpodErrorException('Error: The last state machine sent was not acknowledged by the Bpod device.', self) + self.trial_start_timestamp = self._bpodcom_get_trial_timestamp_start() if self.bpod_start_timestamp is None: self.bpod_start_timestamp = self.trial_start_timestamp @@ -287,6 +295,7 @@ def run_state_machine(self, sma): # create a list of executed states state_change_indexes = [] + pause_task = False # flags used to stop a trial (or all trials) interrupt_task = False kill_task = False @@ -298,7 +307,7 @@ def run_state_machine(self, sma): if self.stdin is not None: inline = self.stdin.readline() if inline is not None: - interrupt_task, kill_task = self.handle_inline(inline, sma) + pause_task, interrupt_task, kill_task = self.handle_inline(inline, sma) ##################################################### # read commands from a net socket ################### @@ -307,12 +316,20 @@ def run_state_machine(self, sma): if inline is not None: inline = inline.decode().strip() - interrupt_task, kill_task = self.handle_inline(inline, sma) + pause_task, interrupt_task, kill_task = self.handle_inline(inline, sma) ##################################################### + if pause_task: + continue - if self.data_available(): + opcode, data = None, None + if self._emulator: + opcode, data = self._emulator.run() + elif self.data_available(): opcode, data = self._bpodcom_read_opcode_message() - self.__process_opcode(sma, opcode, data, state_change_indexes) + if opcode is not None and data is not None and \ + not self.__process_opcode( + sma, opcode, data, state_change_indexes): + break self.loop_handler() @@ -320,6 +337,9 @@ def run_state_machine(self, sma): self._skip_all_trials = True break + if self._emulator: + self._emulator.mirror_state(None) + self.session += EndTrial('The trial ended') if not interrupt_task: @@ -335,19 +355,31 @@ def run_state_machine(self, sma): return not interrupt_task def handle_inline(self, inline, sma): + pause_task = False interrupt_task = False kill_task = False if inline.startswith('pause-trial'): - self.pause() + if self._emulator: + pause_task = True + else: + self.pause() elif inline.startswith('resume-trial'): - self.resume() + if self._emulator: + pause_task = False + else: + self.resume() elif inline.startswith('stop-trial'): - self.stop_trial() + if self._emulator: + interrupt_task = True + else: + self.stop_trial() elif inline.startswith('close'): - self.stop_trial() + if self._emulator is None: + self.stop_trial() interrupt_task = True elif inline.startswith('kill'): - self.stop_trial() + if self._emulator is None: + self.stop_trial() interrupt_task = kill_task = True elif inline.startswith('SoftCode'): softcode = int(inline[-1]) - 1 @@ -378,7 +410,7 @@ def handle_inline(self, inline, sma): final_msg.append(int(x)) self.load_message(module_index, final_msg) - return interrupt_task, kill_task + return pause_task, interrupt_task, kill_task def load_serial_message(self, serial_channel, message_ID, serial_message): """ @@ -413,17 +445,39 @@ def softcode_handler_function(self, data): def echo_softcode(self, softcode): return self._bpodcom_echo_softcode(softcode) + def trigger_event_by_name(self, event_name): + if self._emulator: + self._emulator.add_manual_override_event(event_name) + def trigger_event(self, event_index, event_data): - return self._bpodcom_manual_override_exec_event(event_index, event_data) + if self._emulator: + self._emulator.add_manual_override_event(event_index) + else: + return self._bpodcom_manual_override_exec_event(event_index, event_data) def trigger_input(self, channel_number, value): - return self._bpodcom_override_input_state(channel_number, value) + if self._emulator: + self._emulator.override_input_state(channel_number, int(value)) + else: + return self._bpodcom_override_input_state(channel_number, value) def trigger_output(self, channel_number, value): - return self._bpodcom_override_digital_hardware_state(channel_number, value) + if self._emulator: + self._emulator.serial_override(channel_number, int(value)) + else: + return self._bpodcom_override_digital_hardware_state(channel_number, value) def trigger_softcode(self, softcode): - return self._bpodcom_send_softcode(softcode) + if self._emulator: + self.softcode_handler_function(softcode) + else: + return self._bpodcom_send_softcode(softcode) + + def trigger_serial(self, channel_number, value): + if self._emulator: + self._emulator.serial_override(channel_number, value) + else: + return self._bpodcom_send_byte_to_hardware_serial(channel_number, value) def load_message(self, module_index, msg): # get module reference @@ -439,6 +493,60 @@ def load_message(self, module_index, msg): def create_session(self): return Session() + def __initialize_input_command_handler(self): + # Initialize the server to handle commands. + if self.net_port is None: + self.sock = None + self.socketin = None + else: + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sock.bind(('0.0.0.0', self.net_port)) + self.socketin = NonBlockingSocketReceive(self.sock) + + # Initialize the thread that will handle the stdin commands. + self.stdin = NonBlockingStreamReader( + sys.stdin) if settings.PYBPOD_API_ACCEPT_STDIN else None + + def __transition_to_new_state(self, sma, event_id, transition_matrix, + current_trial, state_change_indexes, + is_state_timer_matrix=False, + debug_message=None): + new_state_set = False + + def set_sma_current_state(new_state): + previous_state = sma.current_state + if sma.use_255_back_signal and new_state == 255: + sma.current_state = current_trial.states[-2] + else: + sma.current_state = new_state + logger.debug(('Transition occured: ' + f'state {previous_state} -> {sma.current_state}')) + if not math.isnan(sma.current_state): + if debug_message is not None: + logger.debug(debug_message) + current_trial.states.append(sma.current_state) + state_change_indexes.append( + len(current_trial.events_occurrences) - 1) + current_state = sma.current_state + if is_state_timer_matrix: + this_state_timer_state = transition_matrix[current_state] + is_event_id_tup = event_id == sma.hardware.channels.events_positions.Tup + if is_event_id_tup and this_state_timer_state != current_state: + set_sma_current_state(this_state_timer_state) + new_state_set = True + else: + for transition_event_code, transition_state in transition_matrix[ + current_state]: + if transition_event_code == event_id: + set_sma_current_state(transition_state) + new_state_set = True + else: + logger.debug((f'Event {transition_event_code} required ' + f'for transition: state ' + f'{sma.current_state} -> ' + f'{transition_state}')) + return new_state_set + def __process_opcode(self, sma, opcode, data, state_change_indexes): """ Process data from bpod board given an opcode @@ -455,11 +563,16 @@ def __process_opcode(self, sma, opcode, data, state_change_indexes): current_trial = self.session.current_trial if opcode == 1: # Read events - n_current_events = data - current_events = self._bpodcom_read_current_events(n_current_events) + if self._emulator: + current_events = data # virtual current events + logger.debug(f'virtual_current_events: {current_events}') + else: + n_current_events = data + current_events = self._bpodcom_read_current_events(n_current_events) + transition_event_found = False - if self.hardware.live_timestamps: + if self.hardware.live_timestamps and self._emulator is None: event_timestamp = self._bpodcom_read_event_timestamp() else: event_timestamp = None @@ -475,82 +588,60 @@ def __process_opcode(self, sma, opcode, data, state_change_indexes): ) self.trial_timestamps.append(event_timestamp) - # input matrix - if not transition_event_found: - logger.debug("transition event not found") - logger.debug("Current state: %s", sma.current_state) - for transition in sma.input_matrix[sma.current_state]: - logger.debug("Transition: %s", transition) - if transition[0] == event_id: - if sma.use_255_back_signal and transition[1] == 255: - sma.current_state = current_trial.states[-2] - else: - sma.current_state = transition[1] - - if not math.isnan(sma.current_state): - logger.debug("adding states input matrix") - current_trial.states.append(sma.current_state) - state_change_indexes.append(len(current_trial.events_occurrences) - 1) - - transition_event_found = True - - # state timer matrix - if not transition_event_found: - this_state_timer_transition = sma.state_timer_matrix[sma.current_state] - if event_id == sma.hardware.channels.events_positions.Tup: - if not (this_state_timer_transition == sma.current_state): - if sma.use_255_back_signal and this_state_timer_transition == 255: - sma.current_state = current_trial.states[-2] - else: - sma.current_state = this_state_timer_transition - - if not math.isnan(sma.current_state): - logger.debug("adding states state timer matrix") - current_trial.states.append(sma.current_state) - state_change_indexes.append(len(current_trial.events_occurrences) - 1) - transition_event_found = True - - # global timers start matrix - if not transition_event_found: - for transition in sma.global_timers.start_matrix[sma.current_state]: - if transition[0] == event_id: - if sma.use_255_back_signal and transition[1] == 255: - sma.current_state = current_trial.states[-2] - else: - sma.current_state = transition[1] - - if not math.isnan(sma.current_state): - logger.debug("adding states global timers start matrix") - current_trial.states.append(sma.current_state) - state_change_indexes.append(len(current_trial.events_occurrences) - 1) - transition_event_found = True - - # global timers end matrix - if not transition_event_found: - for transition in sma.global_timers.end_matrix[sma.current_state]: - if transition[0] == event_id: - - if sma.use_255_back_signal and transition[1] == 255: - sma.current_state = current_trial.states[-2] - else: - sma.current_state = transition[1] - - if not math.isnan(sma.current_state): - logger.debug("adding states global timers end matrix") - current_trial.states.append(sma.current_state) - state_change_indexes.append(len(current_trial.events_occurrences) - 1) - transition_event_found = True + logger.debug("Current state: %s", sma.current_state) + transition_matrices = { + 'input': sma.input_matrix, + 'state_timer': sma.state_timer_matrix, + 'global_timers_start': sma.global_timers.start_matrix, + 'global_timers_end': sma.global_timers.end_matrix, + 'global_counters': sma.global_counters.matrix, + 'conditions': sma.conditions.matrix, + } + for transition_matrix_name, transition_matrix in \ + transition_matrices.items(): + transition_event_found = \ + self.__transition_to_new_state( + sma, + event_id, + transition_matrix, + current_trial, + state_change_indexes, + is_state_timer_matrix=(transition_matrix_name == + 'state_timer'), + debug_message="Adding {} matrix states".format( + transition_matrix_name)) + if transition_event_found: + break logger.debug("States indexes: %s", current_trial.states) - + if self._emulator: + self._emulator.mirror_events(current_events) if transition_event_found and not math.isnan(sma.current_state): + if self._emulator: + self._emulator.mirror_state(sma.current_state) + self._emulator.set_state_machine(sma) + self._emulator.reset_current_state() + self._emulator.set_state_start_time( + self._emulator.current_time) + self._emulator.set_global_timer_end_time() + self._emulator.cancel_global_timers() + self._emulator.reset_global_counter_counts() + self._emulator.set_softcode( + self.hardware.channels.events_positions.output_USB) state_name = sma.state_names[sma.current_state] self._session += StateTransition(state_name, event_timestamp) + elif math.isnan(sma.current_state): + if self._emulator: + state_change_indexes.append( + len(current_trial.events_occurrences) - 1) + return False elif opcode == 2: # Handle soft code self._session += SoftcodeOccurrence(data) self.softcode_handler_function(data) + return True + def __update_timestamps(self, sma, state_change_indexes): """ Read timestamps from Bpod and update state machine info @@ -559,29 +650,45 @@ def __update_timestamps(self, sma, state_change_indexes): :param list state_change_indexes: """ - current_trial = self.session.current_trial - current_trial.trial_start_timestamp = self.trial_start_timestamp # start timestamp of first trial - current_trial.bpod_start_timestamp = self.bpod_start_timestamp + def add_session_info(trial_start_timestamp, trial_end_timestamp): + self.session += SessionInfo( + self.session.INFO_TRIAL_BPODTIME, + trial_end_timestamp - trial_start_timestamp, + start_time=trial_start_timestamp, + end_time=trial_end_timestamp + ) - trial_end_timestamp, discrepancy = self._bpodcom_read_timestamps() - current_trial.trial_end_timestamp = trial_end_timestamp + current_trial = self.session.current_trial + if self._emulator: + trial_start_timestamp = \ + self._emulator.matrix_start_time + timestamps = self._emulator.timestamps[ + :self._emulator.n_events] + trial_end_timestamp = \ + trial_start_timestamp + \ + timestamps[-1] + current_trial.bpod_start_timestamp = trial_start_timestamp + current_trial.trial_start_timestamp = trial_start_timestamp + current_trial.trial_end_timestamp = trial_end_timestamp + add_session_info(current_trial.trial_start_timestamp, trial_end_timestamp) + else: + current_trial.trial_start_timestamp = self.trial_start_timestamp # start timestamp of first trial + current_trial.bpod_start_timestamp = self.bpod_start_timestamp - self.session += SessionInfo( - self.session.INFO_TRIAL_BPODTIME, - trial_end_timestamp-self.trial_start_timestamp, - start_time=self.trial_start_timestamp, - end_time=trial_end_timestamp - ) + trial_end_timestamp, discrepancy = self._bpodcom_read_timestamps() + current_trial.trial_end_timestamp = trial_end_timestamp - if discrepancy > 1: - self.session += WarningMessage("Bpod missed hardware update deadline(s) on the past trial by ~{milliseconds}ms".format(milliseconds=discrepancy)) + add_session_info(current_trial.trial_start_timestamp, trial_end_timestamp) - if self.hardware.live_timestamps: - timestamps = self.trial_timestamps - else: - timestamps = self._bpodcom_read_alltimestamps() - timestamps = [float(t)*self._hardware.times_scale_factor for t in timestamps] + if discrepancy > 1: + self.session += WarningMessage("Bpod missed hardware update deadline(s) on the past trial by ~{milliseconds}ms".format(milliseconds=discrepancy)) + if self.hardware.live_timestamps: + timestamps = self.trial_timestamps + else: + timestamps = self._bpodcom_read_alltimestamps() + timestamps = [float(t) * self._hardware.times_scale_factor for t in timestamps] + if (self._emulator) or not self.hardware.live_timestamps: # update the timestamps of the events ############################################################# for event, timestamp in zip(current_trial.events_occurrences, timestamps): event.host_timestamp = timestamp diff --git a/pybpodapi/bpod/bpod_com_protocol.py b/pybpodapi/bpod/bpod_com_protocol.py index 6404367..ea96f7a 100644 --- a/pybpodapi/bpod/bpod_com_protocol.py +++ b/pybpodapi/bpod/bpod_com_protocol.py @@ -30,8 +30,8 @@ class BpodCOMProtocol(BpodBase): """ - def __init__(self, serial_port=None, sync_channel=None, sync_mode=None): - super(BpodCOMProtocol, self).__init__(serial_port, sync_channel, sync_mode) + def __init__(self, serial_port=None, sync_channel=None, sync_mode=None, emulator_mode=False): + super(BpodCOMProtocol, self).__init__(serial_port, sync_channel, sync_mode, emulator_mode=emulator_mode) self._arcom = None # type: ArCOM self.bpod_com_ready = False @@ -39,7 +39,7 @@ def __init__(self, serial_port=None, sync_channel=None, sync_mode=None): # used to keep the list of msg ids sent using the load_serial_message function self.msg_id_list = [False for i in range(255)] - if self.serial_port: + if self.serial_port and not emulator_mode: self.open() def open(self): @@ -63,27 +63,32 @@ def manual_override(self, channel_type, channel_name, channel_number, value): """ if channel_type == ChannelType.INPUT: input_channel_name = channel_name + str(channel_number) - channel_number = self.hardware.channels.input_channel_names.index(input_channel_name) try: - self._bpodcom_override_input_state(channel_number, value) + channel_number = self.hardware.channels.input_channel_names.index( + input_channel_name) + self.trigger_input(channel_number, value) except: raise BpodErrorException( 'Error using manual_override: {name} is not a valid channel name.'.format(name=channel_name)) elif channel_type == ChannelType.OUTPUT: - if channel_name == 'Serial': - self._bpodcom_send_byte_to_hardware_serial(channel_number, value) - + if channel_name == 'SoftCode': + self.trigger_softcode(value) + elif channel_name == 'Serial': + self.trigger_serial(channel_number, value) else: + output_channel_name = channel_name + \ + str(channel_number) try: - output_channel_name = channel_name + str(channel_number) - channel_number = self.hardware.channels.output_channel_names.index(output_channel_name) - self._bpodcom_override_digital_hardware_state(channel_number, value) + channel_number = self.hardware.channels.output_channel_names.index( + output_channel_name) + self.trigger_output(channel_number, value) except: raise BpodErrorException('Error using manual_override: {name} is not a valid channel name.'.format( name=output_channel_name)) else: - raise BpodErrorException('Error using manualOverride: first argument must be "Input" or "Output".') + raise BpodErrorException( + 'Error using manualOverride: first argument must be "Input" or "Output".') def _bpodcom_connect(self, serial_port, baudrate=115200, timeout=1): """ @@ -250,18 +255,7 @@ def _bpodcom_enable_ports(self, hardware): :rtype: bool """ - ###### set inputs enabled or disabled ####################################################### - hardware.inputs_enabled = [0] * len(hardware.inputs) - - for j, i in enumerate(hardware.bnc_inputports_indexes): - hardware.inputs_enabled[i] = settings.BPOD_BNC_PORTS_ENABLED[j] - - for j, i in enumerate(hardware.wired_inputports_indexes): - hardware.inputs_enabled[i] = settings.BPOD_WIRED_PORTS_ENABLED[j] - - for j, i in enumerate(hardware.behavior_inputports_indexes): - hardware.inputs_enabled[i] = settings.BPOD_BEHAVIOR_PORTS_ENABLED[j] - ############################################################################################# + hardware.configure_inputs() logger.debug("Requesting ports enabling (%s)", SendMessageHeader.ENABLE_PORTS) logger.debug("Inputs enabled (%s): %s", len(hardware.inputs_enabled), hardware.inputs_enabled) diff --git a/pybpodapi/bpod/bpod_io.py b/pybpodapi/bpod/bpod_io.py index 5447d79..4e96b90 100644 --- a/pybpodapi/bpod/bpod_io.py +++ b/pybpodapi/bpod/bpod_io.py @@ -20,11 +20,10 @@ class BpodIO(BpodCOMProtocolModules): """ Bpod I/O logic. """ - def __init__(self, serial_port=None, workspace_path=None, session_name=None, sync_channel=None, sync_mode=None): + def __init__(self, serial_port=None, workspace_path=None, session_name=None, sync_channel=None, sync_mode=None, emulator_mode=False): self.workspace_path = workspace_path if workspace_path is not None else settings.PYBPOD_SESSION_PATH self.session_name = session_name if session_name is not None else settings.PYBPOD_SESSION - - super(BpodIO, self).__init__(serial_port, sync_channel, sync_mode) + super(BpodIO, self).__init__(serial_port, sync_channel, sync_mode, emulator_mode) self.session += SessionInfo("This is a PYBPOD file. Find more info at http://pybpod.readthedocs.io") self.session += SessionInfo(Session.INFO_BPODAPI_VERSION, pybpodapi.__version__) diff --git a/pybpodapi/bpod/emulator/__init__.py b/pybpodapi/bpod/emulator/__init__.py new file mode 100644 index 0000000..6fd48da --- /dev/null +++ b/pybpodapi/bpod/emulator/__init__.py @@ -0,0 +1 @@ +from .emulator import Emulator diff --git a/pybpodapi/bpod/emulator/constants.py b/pybpodapi/bpod/emulator/constants.py new file mode 100644 index 0000000..e4acd47 --- /dev/null +++ b/pybpodapi/bpod/emulator/constants.py @@ -0,0 +1,97 @@ +""" +The following values must be set according to Bpod firmware specification: +https://github.com/sanworks/Bpod_StateMachine_Firmware/blob/v22/Dev/StateMachineFirmware/StateMachineFirmware.ino + +0 = USB (default), 1 = Ethernet (w/ Bpod Ethernet Module) +IMPORTANT: PC via Ethernet requires State Machine v2.0 or newer. +""" +__author__ = "Chris Karageorgiou Kaneen" + +from confapp import conf as settings + +################# +# from settings # +################# + +FIRMWARE_VERSION = int(settings.TARGET_BPOD_FIRMWARE_VERSION) +MACHINE_TYPE = int(settings.EMULATOR_BPOD_MACHINE_TYPE) + + +class MACHINE_TYPE_ENUM: + ONE = 1 + TWO = 2 + THREE = 3 + + +class SM_FEATURE_PROFILE_ENUM: + ZERO = 0 + ONE = 1 + TWO = 2 + THREE = 3 + + +class ETHERNET_COM_ENUM: + ON = 1 + OFF = 0 + + +ETHERNET_COM = ETHERNET_COM_ENUM.OFF +LIVE_TIMESTAMPS = 0 +TIMER_PERIOD = 100 +SM_FEATURE_PROFILE = { + MACHINE_TYPE_ENUM.ONE: SM_FEATURE_PROFILE_ENUM.ZERO, + MACHINE_TYPE_ENUM.TWO: SM_FEATURE_PROFILE_ENUM.ZERO, + MACHINE_TYPE_ENUM.THREE: SM_FEATURE_PROFILE_ENUM.ONE, +} +INPUT_HW = { + MACHINE_TYPE_ENUM.ONE: 'UUXBBWWWWPPPPPPPP', + MACHINE_TYPE_ENUM.TWO: 'UUUXBBWWPPPPPPPP', + MACHINE_TYPE_ENUM.THREE: { + ETHERNET_COM_ENUM.OFF: 'UUUUUXBBPPPP', + ETHERNET_COM_ENUM.ON: 'UUUUXBBPPPP', + }, +} +OUTPUT_HW = { + MACHINE_TYPE_ENUM.ONE: 'UUXBBWWWWPPPPPPPPVVVVVVVV', + MACHINE_TYPE_ENUM.TWO: 'UUUXBBWWWPPPPPPPPVVVVVVVV', + MACHINE_TYPE_ENUM.THREE: { + ETHERNET_COM_ENUM.OFF: 'UUUUUXBBPPPPVVVV', + ETHERNET_COM_ENUM.ON: 'UUUUXBBPPPPVVVV', + }, +} +HARDWARE_DESCRIPTION = { + 'MAX_STATES': { + MACHINE_TYPE_ENUM.ONE: 128, + MACHINE_TYPE_ENUM.TWO: 256, + MACHINE_TYPE_ENUM.THREE: 256, + }, + 'TIMER_PERIOD': TIMER_PERIOD, + 'MAX_SERIAL_EVENTS': { + MACHINE_TYPE_ENUM.ONE: 30, + MACHINE_TYPE_ENUM.TWO: 60, + MACHINE_TYPE_ENUM.THREE: { + ETHERNET_COM_ENUM.OFF: 90, + ETHERNET_COM_ENUM.ON: 75, + }, + }, + 'MAX_GLOBAL_TIMERS': { + SM_FEATURE_PROFILE_ENUM.ZERO: 5, + SM_FEATURE_PROFILE_ENUM.ONE: 16, + SM_FEATURE_PROFILE_ENUM.TWO: 8, + SM_FEATURE_PROFILE_ENUM.THREE: 20, + }, + 'MAX_GLOBAL_COUNTERS': { + SM_FEATURE_PROFILE_ENUM.ZERO: 5, + SM_FEATURE_PROFILE_ENUM.ONE: 8, + SM_FEATURE_PROFILE_ENUM.TWO: 2, + SM_FEATURE_PROFILE_ENUM.THREE: 2, + }, + 'MAX_CONDITIONS': { + SM_FEATURE_PROFILE_ENUM.ZERO: 5, + SM_FEATURE_PROFILE_ENUM.ONE: 16, + SM_FEATURE_PROFILE_ENUM.TWO: 8, + SM_FEATURE_PROFILE_ENUM.THREE: 20, + }, + 'INPUT_HW': INPUT_HW, + 'OUTPUT_HW': OUTPUT_HW, +} diff --git a/pybpodapi/bpod/emulator/emulator.py b/pybpodapi/bpod/emulator/emulator.py new file mode 100644 index 0000000..66df8a4 --- /dev/null +++ b/pybpodapi/bpod/emulator/emulator.py @@ -0,0 +1,675 @@ +""" + +.. moduleauthor:: Chris Karageorgiou Kaneen + +emulator: An implementation of an emulated Bpod device. + +""" +__author__ = "Chris Karageorgiou Kaneen" + +import logging +import math +import queue +import time + +from . import constants as const +from .state import State +from AnyQt import QtCore +from AnyQt import QtNetwork +from PyQt5 import sip +from pybpodapi.bpod_modules.bpod_modules import BpodModules + +logger = logging.getLogger(__name__) + + +# N_VIRTUAL_CURRENT_EVENTS = 10 +N_TIMESTAMPS = 10000 +UART_CODE = 'U' +USB_CODE = 'X' +BNC_CODE = 'B' +WIRE_CODE = 'W' +PORTS_CODE = 'P' +SERIAL_EVENT_CODE = 'S' +INPUT_CODE = 'I' +GLOBAL_TIMER_CODE = 'T' +GLOBAL_COUNTER_CODE = '+' +CONDITION_CODE = 'C' + +CONDITION_EVENT_NAME_PREFIX = 'Condition' +SOFTCODE_OUTPUT_ACTION = 'SoftCode' +GLOBAL_TIMER_TRIG_OUTPUT_ACTION = 'GlobalTimerTrig' +GLOBAL_TIMER_CANCEL_OUTPUT_ACTION = 'GlobalTimerCancel' +GLOBAL_COUNTER_RESET_OUTPUT_ACTION = 'GlobalCounterReset' +FINAL_GLOBAL_COUNTER_EVENT = 254 +FINAL_EVENT = 255 +FINAL_CHANNEL = 255 + + +class EmulatorError(Exception): + pass + + +def emulator_init_check(func): + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except AttributeError: + message = 'The emulator initialize() has not been called.' + logger.error(message) + raise EmulatorError(message) + return wrapper + + +class Emulator: + """A class for emulating the Bpod device.""" + + GUI_PLUGIN_SERVER_NAME = 'emulator_gui_server' + + _GLOBAL_TIMER_START_OFFSET = None + _GLOBAL_TIMER_END_OFFSET = None + _GLOBAL_COUNTER_OFFSET = None + _CONDITION_OFFSET = None + _JUMP_OFFSET = None + _CONDITION_EVENT_NAME = None + _SOCKET_WAIT_FOR_CONNECTED_TIMEOUT = 500 + + def __init__(self, hardware): + self._init_hardware(hardware) + self._state = State(self.hardware) + self._manual_override_events = queue.Queue() + self._socket = None + + @property + def socket(self): + if self._socket is None or sip.isdeleted(self._socket): + self._socket = QtNetwork.QLocalSocket() + return self._socket + + @property + def hardware(self): + return self._hardware + + @property + @emulator_init_check + def matrix_start_time(self): + return self._matrix_start_time + + @property + @emulator_init_check + def timestamps(self): + return self._timestamps + + @property + @emulator_init_check + def n_events(self): + return self._n_events + + def _init_hardware(self, hardware): + self._hardware = hardware + self._setup_hardware_description() + self._hardware.configure_inputs() + self._setup_hardware_channels() + self._setup_hardware_event_types() + self._set_hardware_constants() + + def _set_hardware_constants(self): + n_event_names = len(self.hardware.channels.event_names) + n_extra_events = self.hardware.n_global_timers * 2 + \ + self.hardware.n_global_counters + self.hardware.n_conditions + # Subtract global timer/counter/condition and tup events + input_matrix_size = n_event_names - (n_extra_events + 1) + self._GLOBAL_TIMER_START_OFFSET = input_matrix_size + self._GLOBAL_TIMER_END_OFFSET = self._GLOBAL_TIMER_START_OFFSET + \ + self.hardware.n_global_timers + self._GLOBAL_COUNTER_OFFSET = self._GLOBAL_TIMER_END_OFFSET + \ + self.hardware.n_global_timers + self._CONDITION_OFFSET = self._GLOBAL_COUNTER_OFFSET + \ + self.hardware.n_global_counters + self._JUMP_OFFSET = self._CONDITION_OFFSET + \ + self.hardware.n_conditions + self._CONDITION_EVENT_NAME = [ + f'{CONDITION_EVENT_NAME_PREFIX}{str(i + 1)}' + for i in range(self.hardware.n_conditions)] + + def _setup_hardware_description(self): + HW_DESCR = const.HARDWARE_DESCRIPTION + machine_type = const.MACHINE_TYPE + self.hardware.firmware_version = const.FIRMWARE_VERSION + self.hardware.machine_type = machine_type + self.hardware.max_states = HW_DESCR['MAX_STATES'][machine_type] + self.hardware.cycle_period = HW_DESCR['TIMER_PERIOD'] + if machine_type == const.MACHINE_TYPE_ENUM.THREE: + self.hardware.max_serial_events = \ + HW_DESCR['MAX_SERIAL_EVENTS'][machine_type][const.ETHERNET_COM] + self.hardware.inputs = \ + HW_DESCR['INPUT_HW'][machine_type][const.ETHERNET_COM] + self.hardware.outputs = \ + HW_DESCR['OUTPUT_HW'][machine_type][const.ETHERNET_COM] + else: + self.hardware.max_serial_events = \ + HW_DESCR['MAX_SERIAL_EVENTS'][machine_type] + self.hardware.inputs = \ + HW_DESCR['INPUT_HW'][machine_type] + self.hardware.outputs = \ + HW_DESCR['OUTPUT_HW'][machine_type] + sm_feature_profile = const.SM_FEATURE_PROFILE[machine_type] + self.hardware.n_global_timers = HW_DESCR[ + 'MAX_GLOBAL_TIMERS'][sm_feature_profile] + self.hardware.n_global_counters = HW_DESCR[ + 'MAX_GLOBAL_COUNTERS'][sm_feature_profile] + self.hardware.n_conditions = HW_DESCR[ + 'MAX_CONDITIONS'][sm_feature_profile] + self.hardware.n_inputs = len(self.hardware.inputs) + self.hardware.n_outputs = len(self.hardware.outputs) + + def _setup_hardware_channels(self): + self.bpod_modules = BpodModules(self) + input_modules = [inp for inp in self.hardware.inputs + if inp == UART_CODE] + n_modules = len(input_modules) + n_serial_events = int( + self.hardware.max_serial_events / (n_modules + 1)) + connected = False + module_name = None + firmware_version = None + events_names = [] + for i in range(n_modules): + self.bpod_modules += BpodModules.create_module( + connected, + module_name, + firmware_version, + events_names, + n_serial_events, + serial_port=(i + 1), + ) + self.hardware.setup(self.bpod_modules) + + def _setup_hardware_event_types(self): + n_bnc_inputs = self.hardware.inputs.count(BNC_CODE) + n_wire_inputs = self.hardware.inputs.count(WIRE_CODE) + n_ports = self.hardware.outputs.count(PORTS_CODE) + n_digital_inputs = n_bnc_inputs + n_wire_inputs + n_ports + self.hardware_event_types = SERIAL_EVENT_CODE * \ + self.hardware.max_serial_events + INPUT_CODE * \ + n_digital_inputs * 2 + GLOBAL_TIMER_CODE * \ + self.hardware.n_global_timers * 2 + GLOBAL_COUNTER_CODE * \ + self.hardware.n_global_counters + CONDITION_CODE * \ + self.hardware.n_conditions + UART_CODE + + def _output_value(self, output_action=None, output_code=None): + if output_action is None and output_code is None: + message = 'Must pass an output action or output_code.' + logger.error(message) + raise EmulatorError(message) + if output_action is not None: + output_code = \ + self.hardware.channels.output_channel_names.index( + output_action) + for output_code_, output_value in self._state_machine.output_matrix[ + self._current_state]: + if output_code_ == output_code: + return output_value + return 0 + + def _assert_state_machine(self): + if self._state_machine is None: + message = 'State machine has not been set.' + logger.error(message) + raise EmulatorError(message) + + def _set_global_timer_channel(self, channel, on): + this_channel = self._state_machine.global_timers.channels[channel] + if this_channel < FINAL_CHANNEL: + self._state.output[this_channel] = on + + def _send_output_to_gui_server(self, channel_number, value): + self.socket.connectToServer( + self.GUI_PLUGIN_SERVER_NAME, QtCore.QIODevice.WriteOnly) + if self.socket.waitForConnected( + self._SOCKET_WAIT_FOR_CONNECTED_TIMEOUT): + output_channel_name = self._hardware.channels.output_channel_names[ + channel_number] + message = f'{output_channel_name}:{value}' + self.socket.write(message.encode('utf-8')) + if not self.socket.waitForBytesWritten(2000): + error_message = \ + f'Could not write to socket: {self.socket.errorString()}' + logger.error(error_message) + raise EmulatorError(error_message) + self.socket.disconnectFromServer() + elif self.socket.error() != \ + QtNetwork.QAbstractSocket.HostNotFoundError: + error_message = \ + f'Could not connect to server: {self.socket.errorString()}' + logger.error(error_message) + raise EmulatorError(error_message) + else: + logger.error('Emulator gui plugin server is down.') + + def set_state_machine(self, state_machine): + self._state_machine = state_machine + logger.debug('State machine set.') + + def set_state_start_time(self, time): + self._state_start_time = time + logger.debug(f'State start time set to: {self._state_start_time}') + + def set_global_timer_end_time(self, set_global_channel_timer=False): + this_global_timer = self._output_value( + GLOBAL_TIMER_TRIG_OUTPUT_ACTION) + if this_global_timer != 0: + timer_idx = this_global_timer - 1 + if self._state_machine.global_timers.on_set_delays[timer_idx] == 0: + self._global_timer_end[timer_idx] = self.current_time + \ + self._state_machine.global_timers.timers[timer_idx] + self._global_timers_active[timer_idx] = True + self._global_timers_triggered[timer_idx] = False + if set_global_channel_timer: + self._set_global_timer_channel(this_global_timer, 1) + else: + self._global_timer_start[timer_idx] = self.current_time + \ + self._state_machine.global_timers.on_set_delays[timer_idx] + self._global_timer_end[timer_idx] = self._global_timer_start[ + timer_idx] + self._state_machine.global_timers.timers[ + timer_idx] + self._global_timers_triggered[timer_idx] = True + logger.debug('Global timer end time set.') + + def set_softcode(self, output_code): + self._softcode = self._output_value( + output_code=output_code) + logger.debug(f'Softcode set to: {self._softcode}') + + def reset_current_state(self): + self._assert_state_machine() + self._current_state = self._state_machine.current_state + logger.debug(f'Current state reset to: {self._current_state}') + + def reset_global_counter_counts(self): + this_global_counter = self._output_value( + GLOBAL_COUNTER_RESET_OUTPUT_ACTION) + if this_global_counter != 0: + self._global_counter_counts[this_global_counter - 1] = 0 + logger.debug('Global counter counts reset.') + + def cancel_global_timers(self): + this_global_timer = self._output_value( + GLOBAL_TIMER_CANCEL_OUTPUT_ACTION) + if this_global_timer != 0: + self._global_timers_active[this_global_timer - 1] = False + logger.debug('Global timers cancelled.') + + def initialize(self): + self._assert_state_machine() + self._n_events = 0 + self._current_state = 0 + self._global_timer_start = [0] * self.hardware.n_global_timers + self._global_timer_end = [0] * self.hardware.n_global_timers + self._global_timers_triggered = [ + False] * self.hardware.n_global_timers + self._global_timers_active = [False] * self.hardware.n_global_timers + self._global_counter_counts = [0] * self.hardware.n_global_counters + self._condition_channels = [0] * self.hardware.n_conditions + self._condition_values = [0] * self.hardware.n_conditions + self._timestamps = [0] * N_TIMESTAMPS + self._meaningful_timer = [] + for tup in list(zip( + self._state_machine.state_timer_matrix, + range(self._state_machine.total_states_added))): + self._meaningful_timer.append(tup[0] != tup[1]) + self.current_time = time.time() + self._matrix_start_time = self.current_time + self._state_start_time = self.current_time + self._softcode = self._output_value(SOFTCODE_OUTPUT_ACTION) + # Set global timer end-time (if triggered in first state) + self.set_global_timer_end_time(set_global_channel_timer=True) + + def run(self): + self._assert_state_machine() + virtual_current_events = [] + # MATLAB EQUIVALENT: + # virtual_current_events = [0] * N_VIRTUAL_CURRENT_EVENTS + opcode, data = None, None + if self._softcode == 0: + self.current_time = time.time() + self._n_current_events = 0 + # Add oldest manual override event to current events + if not self._manual_override_events.empty(): + virtual_current_events.append( + self._manual_override_events.get()) + self._n_current_events += 1 + # Evaluate global timer transitions + for i in range(self.hardware.n_global_timers): + if self._global_timers_active[i]: + if self.current_time > self._global_timer_end[i]: + virtual_current_events.append( + self._GLOBAL_TIMER_END_OFFSET + i) + self._n_current_events += 1 + self._global_timers_active[i] = False + # MATLAB version has this commented out: + # self._set_global_timer_channel(i, 0) + elif self._global_timers_triggered[i] and \ + not self._global_timers_active[i] and \ + self.current_time > self._global_timer_start[i]: + virtual_current_events.append( + self._GLOBAL_TIMER_START_OFFSET + i) + self._n_current_events += 1 + self._global_timers_active[i] = True + self._global_timers_triggered[i] = False + # MATLAB version has this commented out: + # self._set_global_timer_channel(i, 1) + # Evaluate global counter transitions + for i in range(self.hardware.n_global_counters): + sma_global_counters = self._state_machine.global_counters + if sma_global_counters.attached_events[i] != \ + FINAL_GLOBAL_COUNTER_EVENT: + if self._global_counter_counts[i] == \ + sma_global_counters.thresholds[i]: + virtual_current_events.append( + self._GLOBAL_COUNTER_OFFSET + i) + self._n_current_events += 1 + if virtual_current_events and \ + virtual_current_events[0] == \ + sma_global_counters.attached_events[i]: + self._global_counter_counts[i] += 1 + # Evaluate condition transitions + for i in range(self.hardware.n_conditions): + sma_conditions = self._state_machine.conditions + condition_event_code = \ + self.hardware.channels.event_names.index( + self._CONDITION_EVENT_NAME[i]) + target_state = None + for event_code, destination_state in sma_conditions.matrix[ + self._current_state]: + if event_code == condition_event_code: + target_state = destination_state + break + if target_state and target_state != self._current_state: + this_channel = sma_conditions.channels[i] + hw_state = self._state.input[this_channel] + if hw_state == sma_conditions.values[i]: + virtual_current_events.append( + self._CONDITION_OFFSET + i) + self._n_current_events += 1 + # Evaluate state timer transitions + time_in_state = self.current_time - self._state_start_time + state_timer = self._state_machine.state_timers[self._current_state] + if time_in_state > state_timer and \ + self._meaningful_timer[self._current_state]: + hardware_state_timer_position = \ + self.hardware_event_types.index(UART_CODE) + virtual_current_events.append( + hardware_state_timer_position) + self._n_current_events += 1 + if virtual_current_events: + dominant_event = virtual_current_events[0] + if dominant_event > 0: + opcode = 1 + data = virtual_current_events + total_n_events = self._n_events + self._n_current_events + self._timestamps[ + self._n_events:total_n_events + 1] = \ + [self.current_time - self._matrix_start_time] * \ + self._n_current_events + self._n_events = total_n_events + else: + opcode = 2 + data = self._softcode + self._softcode = 0 + return opcode, data + + def mirror_state(self, state): + if state is None: + self._state.clear_input() + self._state.clear_output() + # TODO: Do the BpodSystem.RefreshGUI equivalent + else: + # Add outputs that have not been overridden to output state + for new_output_channel, new_output_value in \ + self._state_machine.output_matrix[state]: + if new_output_channel not in self._state.output: + self._state.output[new_output_channel] = new_output_value + self._send_output_to_gui_server( + new_output_channel, new_output_value) + logger.debug('State mirrored.') + + def mirror_events(self, events): + for event in events: + this_event = event + if this_event != FINAL_EVENT: + event_type = self.hardware_event_types[this_event] + if event_type == INPUT_CODE: + io_event_start_position = self.hardware_event_types.index( + INPUT_CODE) + n_uart_serial_channels = self.hardware.outputs.count( + UART_CODE) + n_usb_channels = self.hardware.outputs.count(USB_CODE) + n_serial_channels = n_uart_serial_channels + n_usb_channels + p = ((this_event - io_event_start_position) / 2) + \ + n_serial_channels + this_channel = math.floor(p) + is_odd = p % 1 + if is_odd == 0: + self._state.input[this_channel] = 1 + else: + self._state.input[this_channel] = 0 + elif event_type == GLOBAL_TIMER_CODE: + global_timer_start_position = \ + self.hardware_event_types.index(GLOBAL_TIMER_CODE) + timer_event = this_event - global_timer_start_position + if timer_event <= self.hardware.n_global_timers: + timer_number = timer_event + event_type = 1 # on + else: + timer_number = timer_event - \ + self.hardware.n_global_timers + event_type = 0 # off + timer_number_idx = timer_number - 1 + if self._state_machine.global_timers.channels[ + timer_number_idx] < FINAL_CHANNEL: + output_channel = \ + self._state_machine.global_timers.channels[ + timer_number_idx] + output_channel_type = self.hardware.outputs[ + output_channel] + if output_channel_type in \ + (BNC_CODE, WIRE_CODE, PORTS_CODE): + self._state.output[ + output_channel] = event_type + logger.debug('Events mirrored.') + + def override_input_state(self, channel_number, channel_value): + self._state.input[channel_number] = channel_value + event_type = self._state.input_type[channel_number] + if event_type not in (UART_CODE, USB_CODE): + event_bnc_position = \ + self._hardware.channels.events_positions.Event_BNC + output_usb_position = \ + self._hardware.channels.events_positions.output_USB + channel_value_offset = 0 + if channel_value > 0: + channel_value_offset = 1 + new_event = event_bnc_position - 1 + \ + 2 * (channel_number - output_usb_position) - \ + channel_value_offset + self._manual_override_events.put(new_event) + + def serial_override(self, channel_number, channel_value): + self._state.output[channel_number] = channel_value + + def add_manual_override_event(self, event): + hardware_channels = self._hardware.channels + if isinstance(event, int): + try: + hardware_channels.get_event_name(event) + except IndexError as e: + raise EmulatorError(str(e)) + else: + event_code = event + else: + if event not in hardware_channels.event_names: + raise EmulatorError('unknown event name') + event_code = hardware_channels.event_names.index(event) + self._manual_override_events.put(event_code) + + def log_state_machine_info(self): + self._assert_state_machine() + sma = self._state_machine + + def get_max_index_used(sma_global): + max_idx = sma_global.get_max_index_used() + return 0 if max_idx is None else max_idx + 1 + highest_used_global_counter = get_max_index_used(sma.global_counters) + highest_used_global_timer = get_max_index_used(sma.global_timers) + highest_used_global_condition = get_max_index_used( + sma.conditions) + # State timer matrix (state timer transitions) + state_timer_matrix = [] + for i in range(sma.total_states_added): + timer = sma.state_timer_matrix[i] + if math.isnan(timer): + timer = sma.total_states_added + state_timer_matrix.append(timer) + logger.debug("STATE TIMER MATRIX: %s", state_timer_matrix) + # Input matrix (event-triggered transitions) + input_matrix = [] + for i in range(sma.total_states_added): + state_transitions = sma.input_matrix[i] + n_transitions = len(state_transitions) + input_matrix += [n_transitions] + for transition in state_transitions: + input_matrix += [transition[0]] + dest_state = transition[1] + if math.isnan(dest_state): + dest_state = sma.total_states_added + input_matrix.append(dest_state) + logger.debug("INPUT MATRIX: %s", input_matrix) + # Output matrix (hardware states) + output_matrix = [] + global_timer_trigger_event_pos = \ + self.hardware.channels.events_positions.globalTimerTrigger + for i in range(sma.total_states_added): + hw_state = sma.output_matrix[i] + pos = global_timer_trigger_event_pos + hw_state = [evt for evt in hw_state if evt[0] < pos] + n_differences = len(hw_state) + output_matrix += [n_differences] + for hw_conf in hw_state: + output_matrix += hw_conf[:2] + logger.debug("OUTPUT MATRIX: %s", output_matrix) + # Global timer start matrix (global timer-start triggered transitions) + global_timer_start_matrix = [] + global_timer_start_event_pos = \ + self.hardware.channels.events_positions.globalTimerStart + for i in range(sma.total_states_added): + state_transitions = sma.global_timers.start_matrix[i] + n_transitions = len(state_transitions) + global_timer_start_matrix += [n_transitions] + for transition in state_transitions: + dest_state = transition[1] + global_timer_start_matrix += [ + transition[0] - global_timer_start_event_pos] + if math.isnan(dest_state): + dest_state = sma.total_states_added + global_timer_start_matrix.append(dest_state) + logger.debug("GLOBAL_TIMER_START_MATRIX: %s", + global_timer_start_matrix) + + # Global timer end matrix (global timer-end triggered transitions) + global_timer_end_matrix = [] + global_timer_end_event_pos = \ + self.hardware.channels.events_positions.globalTimerEnd + for i in range(sma.total_states_added): + state_transitions = sma.global_timers.end_matrix[i] + n_transitions = len(state_transitions) + global_timer_end_matrix += [n_transitions] + for transition in state_transitions: + dest_state = transition[1] + global_timer_end_matrix += [ + transition[0] - global_timer_end_event_pos] + if math.isnan(dest_state): + dest_state = sma.total_states_added + global_timer_end_matrix.append(dest_state) + logger.debug("GLOBAL_TIMER_END_MATRIX: %s", global_timer_end_matrix) + # Global counter matrix (global counter triggered transitions) + global_counter_matrix = [] + global_counter_event_pos = \ + self.hardware.channels.events_positions.globalCounter + for i in range(sma.total_states_added): + state_transitions = sma.global_counters.matrix[i] + n_transitions = len(state_transitions) + global_counter_matrix += [n_transitions] + for transition in state_transitions: + dest_state = transition[1] + global_counter_matrix += [ + transition[0] - global_counter_event_pos] + if math.isnan(dest_state): + dest_state = sma.total_states_added + global_counter_matrix.append(dest_state) + logger.debug("GLOBAL_COUNTER_MATRIX: %s", global_counter_matrix) + + # Condition matrix (condition triggered transitions) + condition_matrix = [] + condition_event_pos = self.hardware.channels.events_positions.condition + for i in range(sma.total_states_added): + state_transitions = sma.conditions.matrix[i] + n_transitions = len(state_transitions) + condition_matrix += [n_transitions] + for transition in state_transitions: + dest_state = transition[1] + condition_matrix += [ + transition[0] - condition_event_pos] + if math.isnan(dest_state): + dest_state = sma.total_states_added + condition_matrix.append(dest_state) + logger.debug("CONDITION_MATRIX: %s", condition_matrix) + # Global timer channels + global_timer_channels = [] + for i in range(highest_used_global_timer): + global_timer_channels += [sma.global_timers.channels[i]] + logger.debug("GLOBAL_TIMER_CHANNELS: %s", global_timer_channels) + # Global timer on messages + global_timer_on_messages = [] + for i in range(highest_used_global_timer): + v = sma.global_timers.on_messages[i] + global_timer_on_messages += [255 if v == 0 else v] + logger.debug("GLOBAL_TIMER_ON_MESSAGES: %s", global_timer_on_messages) + # Global timer off messages + global_timer_off_messages = [] + for i in range(highest_used_global_timer): + v = sma.global_timers.off_messages[i] + global_timer_off_messages += [255 if v == 0 else v] + logger.debug("GLOBAL_ TIMER_OFF_MESSAGES: %s", + global_timer_off_messages) + # Global timer loop mode + global_timer_loop_mode = [] + for i in range(highest_used_global_timer): + global_timer_loop_mode += [sma.global_timers.loop_mode[i]] + logger.debug("GLOBAL_TIMER_LOOP_MODE: %s", global_timer_loop_mode) + # Global timer events + global_timer_events = [] + for i in range(highest_used_global_timer): + global_timer_events += [sma.global_timers.send_events[i]] + logger.debug("GLOBAL_TIMER_EVENTS: %s", global_timer_events) + # Global counter attached events + global_counter_attached_events = [] + for i in range(highest_used_global_counter): + global_counter_attached_events += [ + sma.global_counters.attached_events[i]] + logger.debug("GLOBAL_COUNTER_ATTACHED_EVENTS: %s", + global_counter_attached_events) + # Conditions channels + conditions_channels = [] + for i in range(highest_used_global_condition): + conditions_channels += [sma.conditions.channels[i]] + logger.debug("CONDITIONS_CHANNELS: %s", conditions_channels) + # Conditions values + conditions_values = [] + for i in range(highest_used_global_condition): + conditions_values += [sma.conditions.values[i]] + logger.debug("CONDITIONS VALUES: %s", conditions_values) + # Global counter resets + global_counter_resets = [] + for i in range(sma.total_states_added): + global_counter_resets += [sma.global_counters.reset_matrix[i]] + logger.debug("GLOBAL_COUNTER_RESETS: %s", global_counter_resets) diff --git a/pybpodapi/bpod/emulator/state.py b/pybpodapi/bpod/emulator/state.py new file mode 100644 index 0000000..b5dcb7c --- /dev/null +++ b/pybpodapi/bpod/emulator/state.py @@ -0,0 +1,45 @@ +class StateDict(dict): + def __getitem__(self, value): + try: + return super().__getitem__(value) + except KeyError: + return 0 # default (in/out)put value + + +class State: + + def __init__(self, hardware): + self._reset_input(hardware.inputs) + self._reset_output(hardware.outputs) + + @property + def input(self): + return self._input + + @property + def output(self): + return self._output + + @property + def input_type(self): + return self._input_type + + @property + def output_type(self): + return self._output_type + + def _reset_input(self, inputs): + self._input_type = inputs + self.clear_input() + + def _reset_output(self, outputs): + self._output_type = outputs + self.clear_output() + + def clear_input(self): + self._input = StateDict() # dict(input code -> input value) + # In MATLAB: [0] * hardware.n_inputs + + def clear_output(self): + self._output = StateDict() # dict(output code -> output value) + # In MATLAB: [0] * (hardware.n_outputs + 3) diff --git a/pybpodapi/bpod/hardware/hardware.py b/pybpodapi/bpod/hardware/hardware.py index 081983f..b79242f 100644 --- a/pybpodapi/bpod/hardware/hardware.py +++ b/pybpodapi/bpod/hardware/hardware.py @@ -3,6 +3,7 @@ import logging +from confapp import conf as settings from pybpodapi.bpod.hardware.channels import Channels logger = logging.getLogger(__name__) @@ -65,6 +66,19 @@ def setup(self, modules): logger.debug(str(self)) + def configure_inputs(self): + """ Set inputs enabled or disabled """ + self.inputs_enabled = [0] * len(self.inputs) + + for j, i in enumerate(self.bnc_inputports_indexes): + self.inputs_enabled[i] = settings.BPOD_BNC_PORTS_ENABLED[j] + + for j, i in enumerate(self.wired_inputports_indexes): + self.inputs_enabled[i] = settings.BPOD_WIRED_PORTS_ENABLED[j] + + for j, i in enumerate(self.behavior_inputports_indexes): + self.inputs_enabled[i] = settings.BPOD_BEHAVIOR_PORTS_ENABLED[j] + def __str__(self): return ( "Hardware Configuration\n" diff --git a/pybpodapi/settings.py b/pybpodapi/settings.py index 773f883..8c2b5c7 100644 --- a/pybpodapi/settings.py +++ b/pybpodapi/settings.py @@ -19,6 +19,7 @@ # TARGET_BPOD_FIRMWARE_VERSION = "15" # 0.8 # TARGET_BPOD_FIRMWARE_VERSION = "17" # 0.9 TARGET_BPOD_FIRMWARE_VERSION = "22" +EMULATOR_BPOD_MACHINE_TYPE = 3 PYBPOD_SERIAL_PORT = None PYBPOD_NET_PORT = None