Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add emulator support #5

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions examples/emulator_examples/10_state_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from pybpodapi.protocol import Bpod, StateMachine
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
for i in range(10):
sma.add_state(
state_name='State{}'.format(i),
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'State{}'.format(i + 1)},
output_actions=[])
sma.add_state(
state_name='State10',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
output_actions=[])
my_bpod.send_state_machine(sma)
my_bpod.run_state_machine(sma)
20 changes: 20 additions & 0 deletions examples/emulator_examples/10_state_example_w_global_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from pybpodapi.protocol import Bpod, StateMachine
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
# Set global timer 1 for 3 seconds, following a 1.5 second onset delay after
# trigger. Link to LED of port 2.
sma.set_global_timer(timer_id=1, timer_duration=0, on_set_delay=1.5,
channel=Bpod.OutputChannels.PWM2, on_message=255)
for i in range(10):
sma.add_state(
state_name='State{}'.format(i),
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'State{}'.format(i + 1)},
output_actions=[])
sma.add_state(
state_name='State10',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
output_actions=[])
my_bpod.send_state_machine(sma)
my_bpod.run_state_machine(sma)
10 changes: 10 additions & 0 deletions examples/emulator_examples/1_state_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from pybpodapi.protocol import Bpod, StateMachine
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
sma.add_state(
state_name='myState',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
output_actions=[])
my_bpod.send_state_machine(sma)
my_bpod.run_state_machine(sma)
2 changes: 2 additions & 0 deletions examples/emulator_examples/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# !/usr/bin/python3
# -*- coding: utf-8 -*-
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know where to add this comment about examples/tests in general so I'll add it here.
As far as I can tell, we can categorize tests into 2 categories:

  1. Blocking vs. Non-Blocking tests
  2. Tests that already reuse https://github.com/pybpod/pybpod-api/tree/master/examples/state_machine_examples and other new tests.

Possible improvements are:

  1. For blocking tests, the user should have a warning about it, or a way to terminate it. Also they don't play nicely with CI. I wonder if we should remove them and keep just their manual-override version.
  2. For non-blocking tests that are copied from the state_machine_examples folder, I wonder if we can find a way to include the code from the other modules, rather than rewrite, and inject the emulator before the run. In such way, only one version of the code is maintained and they would never run out of sync. However using the current design, i.e, using Bpod(emulator_mode), would require a workaround.
  3. For non-blocking tests generally, we should check and assert the resulting run events using current_trial or trials. This can also serve as the basis for unit-tests for any future changes in the whole project.

I don't know about the pybpod-api developers, but IMO we shouldn't block this PR to wait for all changes, rather these can be considered (if they are willing) more of a future road-map.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a second thought, I think we should have at least one example in this PR where we check the resulting current_trial output, otherwise there is nothing here that tests that part.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. For blocking tests, the user should have a warning about it, or a way to terminate it. Also they don't play nicely with CI. I wonder if we should remove them and keep just their manual-override version.

Will remove the blocking tests.

  1. For non-blocking tests that are copied from the state_machine_examples folder, I wonder if we can find a way to include the code from the other modules, rather than rewrite, and inject the emulator before the run. In such way, only one version of the code is maintained and they would never run out of sync. However using the current design, i.e, using Bpod(emulator_mode), would require a workaround

Sounds reasonable. Let me return to this one once I've cleaned up the examples.

On a second thought, I think we should have at least one example in this PR where we check the resulting current_trial output, otherwise there is nothing here that tests that part.

Since these were intended to be high-level behavioral tests for the user and not unit tests, I think we should avoid unit-test-like checks (e.g. assert) for these. Unit tests should be created as part of another PR.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these were intended to be high-level behavioral tests for the user and not unit tests, I think we should avoid unit-test-like checks (e.g. assert) for these. Unit tests should be created as part of another PR.

True, however the core value of the emulator is to produce an output raw-events/raw-states as if it works in the real life. In such way, the user can test the update of trial settings according to the animal action (e.g as in updateCustomFields() in Mouse2AFC).
I'm not proposing that we make sweep test for the resulting raw-events/states in this PR, but at least one test should check that we get any sensible output.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will create one.

97 changes: 97 additions & 0 deletions examples/emulator_examples/add_trial_events_2_manual_override.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import random
import time
from pybpodapi.protocol import Bpod, StateMachine
from concurrent.futures import ThreadPoolExecutor
my_bpod = Bpod(emulator_mode=True)
nTrials = 5
graceTime = 5
port_numbers = [1, 2, 3]
trialTypes = [1, 2] # 1 (rewarded left) or 2 (rewarded right)
for i in range(nTrials): # Main loop
print('Trial: ', i + 1)
thisTrialType = random.choice(trialTypes) # Randomly choose trial type
if thisTrialType == 1:
# set stimulus channel for trial type 1
stimulus = Bpod.OutputChannels.PWM1
leftAction = 'Reward'
rightAction = 'Punish'
rewardValve = 1
elif thisTrialType == 2:
# set stimulus channel for trial type 1
stimulus = Bpod.OutputChannels.PWM3
leftAction = 'Punish'
rightAction = 'Reward'
rewardValve = 3
sma = StateMachine(my_bpod)
sma.set_global_timer_legacy(
timer_id=1, timer_duration=graceTime) # Set timeout
sma.add_state(
state_name='WaitForPort2Poke',
state_timer=1,
state_change_conditions={Bpod.Events.Port2In: 'FlashStimulus'},
output_actions=[('PWM2', 255)])
sma.add_state(
state_name='FlashStimulus',
state_timer=0.1,
state_change_conditions={Bpod.Events.Tup: 'WaitForResponse'},
output_actions=[(stimulus, 255),
(Bpod.OutputChannels.GlobalTimerTrig, 1)])
sma.add_state(
state_name='WaitForResponse',
state_timer=1,
state_change_conditions={Bpod.Events.Port1In: leftAction,
Bpod.Events.Port3In: rightAction,
Bpod.Events.Port2In: 'Warning',
Bpod.Events.GlobalTimer1_End: 'MiniPunish'},
output_actions=[])
sma.add_state(
state_name='Warning',
state_timer=0.1,
state_change_conditions={Bpod.Events.Tup: 'WaitForResponse',
Bpod.Events.GlobalTimer1_End: 'MiniPunish'},
output_actions=[(Bpod.OutputChannels.LED, 1),
(Bpod.OutputChannels.LED, 2),
(Bpod.OutputChannels.LED, 3)]) # Reward correct choice
sma.add_state(
state_name='Reward',
state_timer=0.1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
# Reward correct choice
output_actions=[(Bpod.OutputChannels.Valve, rewardValve)])
sma.add_state(
state_name='Punish',
state_timer=3,
state_change_conditions={Bpod.Events.Tup: 'exit'},
# Signal incorrect choice
output_actions=[(Bpod.OutputChannels.LED, 1),
(Bpod.OutputChannels.LED, 2),
(Bpod.OutputChannels.LED, 3)])
sma.add_state(
state_name='MiniPunish',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
# Signal incorrect choice
output_actions=[(Bpod.OutputChannels.LED, 1),
(Bpod.OutputChannels.LED, 2),
(Bpod.OutputChannels.LED, 3)])
# Send state machine description to Bpod device
my_bpod.send_state_machine(sma)
print("Waiting for poke. Reward: ",
'left' if thisTrialType == 1 else 'right')

def mouse(data):
time.sleep(3)
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
channel_number=2,
value=12)
time.sleep(2)
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
channel_number=random.choice(port_numbers),
value=12)

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(mouse, None)

my_bpod.run_state_machine(sma) # Run state machine
print("Current trial info: {0}".format(my_bpod.session.current_trial))
my_bpod.close() # Disconnect Bpod
73 changes: 73 additions & 0 deletions examples/emulator_examples/add_trial_events_manual_override.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import random
import time
from pybpodapi.protocol import Bpod, StateMachine
from concurrent.futures import ThreadPoolExecutor
my_bpod = Bpod(emulator_mode=True)
nTrials = 5
port_numbers = [1, 3]
trialTypes = [1, 2] # 1 (rewarded left) or 2 (rewarded right)
for i in range(nTrials): # Main loop
print('Trial: ', i + 1)
thisTrialType = random.choice(trialTypes) # Randomly choose trial type
if thisTrialType == 1:
# set stimulus channel for trial type 1
stimulus = Bpod.OutputChannels.PWM1
leftAction = 'Reward'
rightAction = 'Punish'
rewardValve = 1
elif thisTrialType == 2:
# set stimulus channel for trial type 1
stimulus = Bpod.OutputChannels.PWM3
leftAction = 'Punish'
rightAction = 'Reward'
rewardValve = 3
sma = StateMachine(my_bpod)
sma.add_state(
state_name='WaitForPort2Poke',
state_timer=1,
state_change_conditions={Bpod.Events.Port2In: 'FlashStimulus'},
output_actions=[(Bpod.OutputChannels.PWM2, 255)])
sma.add_state(
state_name='FlashStimulus',
state_timer=0.1,
state_change_conditions={Bpod.Events.Tup: 'WaitForResponse'},
output_actions=[(stimulus, 255)])
sma.add_state(
state_name='WaitForResponse',
state_timer=1,
state_change_conditions={
Bpod.Events.Port1In: leftAction, Bpod.Events.Port3In: rightAction},
output_actions=[])
sma.add_state(
state_name='Reward',
state_timer=0.1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
# Reward correct choice
output_actions=[(Bpod.OutputChannels.Valve, rewardValve)])
sma.add_state(
state_name='Punish',
state_timer=3,
state_change_conditions={Bpod.Events.Tup: 'exit'},
# Signal incorrect choice
output_actions=[(Bpod.OutputChannels.LED, 1),
(Bpod.OutputChannels.LED, 2),
(Bpod.OutputChannels.LED, 3)])
# Send state machine description to Bpod device
my_bpod.send_state_machine(sma)
print("Waiting for poke. Reward: ",
'left' if thisTrialType == 1 else 'right')

def mouse(data):
time.sleep(3)
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
channel_number=2, value=12)
time.sleep(2)
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
channel_number=random.choice(port_numbers),
value=12)

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(mouse, None)
my_bpod.run_state_machine(sma) # Run state machine
print("Current trial info: {0}".format(my_bpod.session.current_trial))
my_bpod.close() # Disconnect Bpod
9 changes: 9 additions & 0 deletions examples/emulator_examples/bpod_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from pybpodapi.protocol import Bpod
from confapp import conf
my_bpod = Bpod(emulator_mode=True)
my_bpod.close()
print("Target Bpod firmware version: ", conf.TARGET_BPOD_FIRMWARE_VERSION)
print("Firmware version (read from device): ",
my_bpod.hardware.firmware_version)
print("Machine type version (read from device): ",
my_bpod.hardware.machine_type)
25 changes: 25 additions & 0 deletions examples/emulator_examples/condition_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from pybpodapi.protocol import Bpod, StateMachine
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
sma.set_condition(condition_number=1,
condition_channel='Port2', channel_value=1)
sma.add_state(
state_name='Port1Light',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'Port2Light'},
output_actions=[(Bpod.OutputChannels.PWM1, 255)])
sma.add_state(
state_name='Port2Light',
state_timer=1,
state_change_conditions={
Bpod.Events.Tup: 'Port3Light', Bpod.Events.Condition1: 'Port3Light'},
output_actions=[(Bpod.OutputChannels.PWM2, 255)])
sma.add_state(
state_name='Port3Light',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
output_actions=[(Bpod.OutputChannels.PWM3, 255)])
my_bpod.send_state_machine(sma)
my_bpod.run_state_machine(sma)
print("Current trial info: {0}".format(my_bpod.session.current_trial))
my_bpod.close()
38 changes: 38 additions & 0 deletions examples/emulator_examples/condition_example_manual_override.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import time
from pybpodapi.protocol import Bpod, StateMachine
from concurrent.futures import ThreadPoolExecutor
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
sma.set_condition(condition_number=1,
condition_channel='Port2', channel_value=1)
sma.add_state(
state_name='Port1Light',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'Port2Light'},
output_actions=[(Bpod.OutputChannels.PWM1, 255)])
sma.add_state(
state_name='Port2Light',
state_timer=1,
state_change_conditions={
Bpod.Events.Tup: 'Port1Light', Bpod.Events.Condition1: 'Port3Light'},
output_actions=[(Bpod.OutputChannels.PWM2, 255)])
sma.add_state(
state_name='Port3Light',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
output_actions=[(Bpod.OutputChannels.PWM3, 255)])
my_bpod.send_state_machine(sma)


def mouse(data):
time.sleep(5)
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
channel_number=2, value=12)


executor = ThreadPoolExecutor(max_workers=1)
executor.submit(mouse, None)

my_bpod.run_state_machine(sma)
print("Current trial info: {0}".format(my_bpod.session.current_trial))
my_bpod.close()
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import time
from pybpodapi.protocol import Bpod, StateMachine
from concurrent.futures import ThreadPoolExecutor
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
sma.set_global_counter(counter_number=1, target_event='Port1In', threshold=5)
sma.add_state(
state_name='InitialDelay',
state_timer=2,
state_change_conditions={Bpod.Events.Tup: 'ResetGlobalCounter1'},
output_actions=[(Bpod.OutputChannels.PWM2, 255)])
sma.add_state(
state_name='ResetGlobalCounter1',
state_timer=0,
state_change_conditions={Bpod.Events.Tup: 'Port1Lit'},
output_actions=[(Bpod.OutputChannels.GlobalCounterReset, 1)])
sma.add_state(
# Infinite loop (with next state). Only a global counter can save us.
state_name='Port1Lit',
state_timer=.25,
state_change_conditions={
Bpod.Events.Tup: 'Port3Lit', 'GlobalCounter1_End': 'exit'},
output_actions=[(Bpod.OutputChannels.PWM1, 255)])
sma.add_state(
state_name='Port3Lit',
state_timer=.25,
state_change_conditions={
Bpod.Events.Tup: 'Port1Lit', 'GlobalCounter1_End': 'exit'},
output_actions=[(Bpod.OutputChannels.PWM3, 255)])
my_bpod.send_state_machine(sma)


def mouse(data):
time.sleep(1)
for _ in range(5):
time.sleep(1)
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
channel_number=1, value=12)


executor = ThreadPoolExecutor(max_workers=1)
executor.submit(mouse, None)

my_bpod.run_state_machine(sma)
print("Current trial info: {0}".format(my_bpod.session.current_trial))
my_bpod.close()
27 changes: 27 additions & 0 deletions examples/emulator_examples/global_timer_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from pybpodapi.protocol import Bpod, StateMachine
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
# Set global timer 1 for 3 seconds
sma.set_global_timer_legacy(timer_id=1, timer_duration=3)
sma.add_state(
state_name='TimerTrig', # Trigger global timer
state_timer=0,
state_change_conditions={Bpod.Events.Tup: 'Port1Lit'},
output_actions=[(Bpod.OutputChannels.GlobalTimerTrig, 1)])
sma.add_state(
# Infinite loop (with next state). Only a global timer can save us.
state_name='Port1Lit',
state_timer=.25,
state_change_conditions={Bpod.Events.Tup: 'Port3Lit',
Bpod.Events.GlobalTimer1_End: 'exit'},
output_actions=[(Bpod.OutputChannels.PWM1, 255)])
sma.add_state(
state_name='Port3Lit',
state_timer=.25,
state_change_conditions={Bpod.Events.Tup: 'Port1Lit',
Bpod.Events.GlobalTimer1_End: 'exit'},
output_actions=[(Bpod.OutputChannels.PWM3, 255)])
my_bpod.send_state_machine(sma)
my_bpod.run_state_machine(sma)
print("Current trial info: {0}".format(my_bpod.session.current_trial))
my_bpod.close()
Loading