Skip to content

Commit

Permalink
Add emulator support
Browse files Browse the repository at this point in the history
- Add Emulator object that emulates bpod hardware behavior and
reference it in the BpodBase class
- Modify bpod classes to accommodate emulator functionality
- Add emulator examples to showcase its usage
  • Loading branch information
ckaraneen committed Dec 29, 2020
1 parent 4c9ced5 commit 7ed52ec
Show file tree
Hide file tree
Showing 28 changed files with 1,758 additions and 145 deletions.
16 changes: 16 additions & 0 deletions examples/emulator_examples/10_state_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from pybpodapi.protocol import Bpod, StateMachine
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
for i in range(10):
sma.add_state(
state_name='State{}'.format(i),
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'State{}'.format(i + 1)},
output_actions=[])
sma.add_state(
state_name='State10',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
output_actions=[])
my_bpod.send_state_machine(sma)
my_bpod.run_state_machine(sma)
20 changes: 20 additions & 0 deletions examples/emulator_examples/10_state_example_w_global_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from pybpodapi.protocol import Bpod, StateMachine
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
# Set global timer 1 for 3 seconds, following a 1.5 second onset delay after
# trigger. Link to LED of port 2.
sma.set_global_timer(timer_id=1, timer_duration=0, on_set_delay=1.5,
channel=Bpod.OutputChannels.PWM2, on_message=255)
for i in range(10):
sma.add_state(
state_name='State{}'.format(i),
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'State{}'.format(i + 1)},
output_actions=[])
sma.add_state(
state_name='State10',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
output_actions=[])
my_bpod.send_state_machine(sma)
my_bpod.run_state_machine(sma)
10 changes: 10 additions & 0 deletions examples/emulator_examples/1_state_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from pybpodapi.protocol import Bpod, StateMachine
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
sma.add_state(
state_name='myState',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
output_actions=[])
my_bpod.send_state_machine(sma)
my_bpod.run_state_machine(sma)
2 changes: 2 additions & 0 deletions examples/emulator_examples/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# !/usr/bin/python3
# -*- coding: utf-8 -*-
97 changes: 97 additions & 0 deletions examples/emulator_examples/add_trial_events_2_manual_override.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import random
import time
from pybpodapi.protocol import Bpod, StateMachine
from concurrent.futures import ThreadPoolExecutor
my_bpod = Bpod(emulator_mode=True)
nTrials = 5
graceTime = 5
port_numbers = [1, 2, 3]
trialTypes = [1, 2] # 1 (rewarded left) or 2 (rewarded right)
for i in range(nTrials): # Main loop
print('Trial: ', i + 1)
thisTrialType = random.choice(trialTypes) # Randomly choose trial type
if thisTrialType == 1:
# set stimulus channel for trial type 1
stimulus = Bpod.OutputChannels.PWM1
leftAction = 'Reward'
rightAction = 'Punish'
rewardValve = 1
elif thisTrialType == 2:
# set stimulus channel for trial type 1
stimulus = Bpod.OutputChannels.PWM3
leftAction = 'Punish'
rightAction = 'Reward'
rewardValve = 3
sma = StateMachine(my_bpod)
sma.set_global_timer_legacy(
timer_id=1, timer_duration=graceTime) # Set timeout
sma.add_state(
state_name='WaitForPort2Poke',
state_timer=1,
state_change_conditions={Bpod.Events.Port2In: 'FlashStimulus'},
output_actions=[('PWM2', 255)])
sma.add_state(
state_name='FlashStimulus',
state_timer=0.1,
state_change_conditions={Bpod.Events.Tup: 'WaitForResponse'},
output_actions=[(stimulus, 255),
(Bpod.OutputChannels.GlobalTimerTrig, 1)])
sma.add_state(
state_name='WaitForResponse',
state_timer=1,
state_change_conditions={Bpod.Events.Port1In: leftAction,
Bpod.Events.Port3In: rightAction,
Bpod.Events.Port2In: 'Warning',
Bpod.Events.GlobalTimer1_End: 'MiniPunish'},
output_actions=[])
sma.add_state(
state_name='Warning',
state_timer=0.1,
state_change_conditions={Bpod.Events.Tup: 'WaitForResponse',
Bpod.Events.GlobalTimer1_End: 'MiniPunish'},
output_actions=[(Bpod.OutputChannels.LED, 1),
(Bpod.OutputChannels.LED, 2),
(Bpod.OutputChannels.LED, 3)]) # Reward correct choice
sma.add_state(
state_name='Reward',
state_timer=0.1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
# Reward correct choice
output_actions=[(Bpod.OutputChannels.Valve, rewardValve)])
sma.add_state(
state_name='Punish',
state_timer=3,
state_change_conditions={Bpod.Events.Tup: 'exit'},
# Signal incorrect choice
output_actions=[(Bpod.OutputChannels.LED, 1),
(Bpod.OutputChannels.LED, 2),
(Bpod.OutputChannels.LED, 3)])
sma.add_state(
state_name='MiniPunish',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
# Signal incorrect choice
output_actions=[(Bpod.OutputChannels.LED, 1),
(Bpod.OutputChannels.LED, 2),
(Bpod.OutputChannels.LED, 3)])
# Send state machine description to Bpod device
my_bpod.send_state_machine(sma)
print("Waiting for poke. Reward: ",
'left' if thisTrialType == 1 else 'right')

def mouse(data):
time.sleep(3)
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
channel_number=2,
value=12)
time.sleep(2)
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
channel_number=random.choice(port_numbers),
value=12)

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(mouse, None)

my_bpod.run_state_machine(sma) # Run state machine
print("Current trial info: {0}".format(my_bpod.session.current_trial))
my_bpod.close() # Disconnect Bpod
73 changes: 73 additions & 0 deletions examples/emulator_examples/add_trial_events_manual_override.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import random
import time
from pybpodapi.protocol import Bpod, StateMachine
from concurrent.futures import ThreadPoolExecutor
my_bpod = Bpod(emulator_mode=True)
nTrials = 5
port_numbers = [1, 3]
trialTypes = [1, 2] # 1 (rewarded left) or 2 (rewarded right)
for i in range(nTrials): # Main loop
print('Trial: ', i + 1)
thisTrialType = random.choice(trialTypes) # Randomly choose trial type
if thisTrialType == 1:
# set stimulus channel for trial type 1
stimulus = Bpod.OutputChannels.PWM1
leftAction = 'Reward'
rightAction = 'Punish'
rewardValve = 1
elif thisTrialType == 2:
# set stimulus channel for trial type 1
stimulus = Bpod.OutputChannels.PWM3
leftAction = 'Punish'
rightAction = 'Reward'
rewardValve = 3
sma = StateMachine(my_bpod)
sma.add_state(
state_name='WaitForPort2Poke',
state_timer=1,
state_change_conditions={Bpod.Events.Port2In: 'FlashStimulus'},
output_actions=[(Bpod.OutputChannels.PWM2, 255)])
sma.add_state(
state_name='FlashStimulus',
state_timer=0.1,
state_change_conditions={Bpod.Events.Tup: 'WaitForResponse'},
output_actions=[(stimulus, 255)])
sma.add_state(
state_name='WaitForResponse',
state_timer=1,
state_change_conditions={
Bpod.Events.Port1In: leftAction, Bpod.Events.Port3In: rightAction},
output_actions=[])
sma.add_state(
state_name='Reward',
state_timer=0.1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
# Reward correct choice
output_actions=[(Bpod.OutputChannels.Valve, rewardValve)])
sma.add_state(
state_name='Punish',
state_timer=3,
state_change_conditions={Bpod.Events.Tup: 'exit'},
# Signal incorrect choice
output_actions=[(Bpod.OutputChannels.LED, 1),
(Bpod.OutputChannels.LED, 2),
(Bpod.OutputChannels.LED, 3)])
# Send state machine description to Bpod device
my_bpod.send_state_machine(sma)
print("Waiting for poke. Reward: ",
'left' if thisTrialType == 1 else 'right')

def mouse(data):
time.sleep(3)
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
channel_number=2, value=12)
time.sleep(2)
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
channel_number=random.choice(port_numbers),
value=12)

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(mouse, None)
my_bpod.run_state_machine(sma) # Run state machine
print("Current trial info: {0}".format(my_bpod.session.current_trial))
my_bpod.close() # Disconnect Bpod
9 changes: 9 additions & 0 deletions examples/emulator_examples/bpod_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from pybpodapi.protocol import Bpod
from confapp import conf
my_bpod = Bpod(emulator_mode=True)
my_bpod.close()
print("Target Bpod firmware version: ", conf.TARGET_BPOD_FIRMWARE_VERSION)
print("Firmware version (read from device): ",
my_bpod.hardware.firmware_version)
print("Machine type version (read from device): ",
my_bpod.hardware.machine_type)
25 changes: 25 additions & 0 deletions examples/emulator_examples/condition_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from pybpodapi.protocol import Bpod, StateMachine
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
sma.set_condition(condition_number=1,
condition_channel='Port2', channel_value=1)
sma.add_state(
state_name='Port1Light',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'Port2Light'},
output_actions=[(Bpod.OutputChannels.PWM1, 255)])
sma.add_state(
state_name='Port2Light',
state_timer=1,
state_change_conditions={
Bpod.Events.Tup: 'Port3Light', Bpod.Events.Condition1: 'Port3Light'},
output_actions=[(Bpod.OutputChannels.PWM2, 255)])
sma.add_state(
state_name='Port3Light',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
output_actions=[(Bpod.OutputChannels.PWM3, 255)])
my_bpod.send_state_machine(sma)
my_bpod.run_state_machine(sma)
print("Current trial info: {0}".format(my_bpod.session.current_trial))
my_bpod.close()
38 changes: 38 additions & 0 deletions examples/emulator_examples/condition_example_manual_override.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import time
from pybpodapi.protocol import Bpod, StateMachine
from concurrent.futures import ThreadPoolExecutor
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
sma.set_condition(condition_number=1,
condition_channel='Port2', channel_value=1)
sma.add_state(
state_name='Port1Light',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'Port2Light'},
output_actions=[(Bpod.OutputChannels.PWM1, 255)])
sma.add_state(
state_name='Port2Light',
state_timer=1,
state_change_conditions={
Bpod.Events.Tup: 'Port1Light', Bpod.Events.Condition1: 'Port3Light'},
output_actions=[(Bpod.OutputChannels.PWM2, 255)])
sma.add_state(
state_name='Port3Light',
state_timer=1,
state_change_conditions={Bpod.Events.Tup: 'exit'},
output_actions=[(Bpod.OutputChannels.PWM3, 255)])
my_bpod.send_state_machine(sma)


def mouse(data):
time.sleep(5)
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
channel_number=2, value=12)


executor = ThreadPoolExecutor(max_workers=1)
executor.submit(mouse, None)

my_bpod.run_state_machine(sma)
print("Current trial info: {0}".format(my_bpod.session.current_trial))
my_bpod.close()
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import time
from pybpodapi.protocol import Bpod, StateMachine
from concurrent.futures import ThreadPoolExecutor
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
sma.set_global_counter(counter_number=1, target_event='Port1In', threshold=5)
sma.add_state(
state_name='InitialDelay',
state_timer=2,
state_change_conditions={Bpod.Events.Tup: 'ResetGlobalCounter1'},
output_actions=[(Bpod.OutputChannels.PWM2, 255)])
sma.add_state(
state_name='ResetGlobalCounter1',
state_timer=0,
state_change_conditions={Bpod.Events.Tup: 'Port1Lit'},
output_actions=[(Bpod.OutputChannels.GlobalCounterReset, 1)])
sma.add_state(
# Infinite loop (with next state). Only a global counter can save us.
state_name='Port1Lit',
state_timer=.25,
state_change_conditions={
Bpod.Events.Tup: 'Port3Lit', 'GlobalCounter1_End': 'exit'},
output_actions=[(Bpod.OutputChannels.PWM1, 255)])
sma.add_state(
state_name='Port3Lit',
state_timer=.25,
state_change_conditions={
Bpod.Events.Tup: 'Port1Lit', 'GlobalCounter1_End': 'exit'},
output_actions=[(Bpod.OutputChannels.PWM3, 255)])
my_bpod.send_state_machine(sma)


def mouse(data):
time.sleep(1)
for _ in range(5):
time.sleep(1)
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
channel_number=1, value=12)


executor = ThreadPoolExecutor(max_workers=1)
executor.submit(mouse, None)

my_bpod.run_state_machine(sma)
print("Current trial info: {0}".format(my_bpod.session.current_trial))
my_bpod.close()
27 changes: 27 additions & 0 deletions examples/emulator_examples/global_timer_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from pybpodapi.protocol import Bpod, StateMachine
my_bpod = Bpod(emulator_mode=True)
sma = StateMachine(my_bpod)
# Set global timer 1 for 3 seconds
sma.set_global_timer_legacy(timer_id=1, timer_duration=3)
sma.add_state(
state_name='TimerTrig', # Trigger global timer
state_timer=0,
state_change_conditions={Bpod.Events.Tup: 'Port1Lit'},
output_actions=[(Bpod.OutputChannels.GlobalTimerTrig, 1)])
sma.add_state(
# Infinite loop (with next state). Only a global timer can save us.
state_name='Port1Lit',
state_timer=.25,
state_change_conditions={Bpod.Events.Tup: 'Port3Lit',
Bpod.Events.GlobalTimer1_End: 'exit'},
output_actions=[(Bpod.OutputChannels.PWM1, 255)])
sma.add_state(
state_name='Port3Lit',
state_timer=.25,
state_change_conditions={Bpod.Events.Tup: 'Port1Lit',
Bpod.Events.GlobalTimer1_End: 'exit'},
output_actions=[(Bpod.OutputChannels.PWM3, 255)])
my_bpod.send_state_machine(sma)
my_bpod.run_state_machine(sma)
print("Current trial info: {0}".format(my_bpod.session.current_trial))
my_bpod.close()
Loading

0 comments on commit 7ed52ec

Please sign in to comment.