Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add code files from Thread Safety tutorial #580

Merged
merged 3 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions thread-safety-locks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Python Thread Safety: Using a Lock and Other Techniques

This folder contains code examples from the Real Python tutorial [Python Thread Safety: Using a Lock and Other Techniques](https://realpython.com/python-thread-lock/).

## About the Author

Adarsh Divakaran - Website: https://adarshd.dev/

## License

Distributed under the MIT license. See ``LICENSE`` for more information.
27 changes: 27 additions & 0 deletions thread-safety-locks/bank_barrier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

teller_barrier = threading.Barrier(3)


def prepare_for_work(name):
print(f"{int(time.time())}: {name} is preparing their counter.")

# Simulate the delay to prepare the counter
time.sleep(random.randint(1, 3))
print(f"{int(time.time())}: {name} has finished preparing.")

# Wait for all tellers to finish preparing
teller_barrier.wait()
print(f"{int(time.time())}: {name} is now ready to serve customers.")


tellers = ["Teller 1", "Teller 2", "Teller 3"]

with ThreadPoolExecutor(max_workers=3) as executor:
for teller_name in tellers:
executor.submit(prepare_for_work, teller_name)

print(f"{int(time.time())}: All tellers are ready to serve customers.")
53 changes: 53 additions & 0 deletions thread-safety-locks/bank_condition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

customer_available_condition = threading.Condition()

# Customers waiting to be served by the Teller
customer_queue = []


def serve_customers():
while True:
with customer_available_condition:
# Wait for a customer to arrive
while not customer_queue:
print(f"{int(time.time())}: Teller is waiting for a customer.")
customer_available_condition.wait()

# Serve the customer
customer = customer_queue.pop(0)
print(f"{int(time.time())}: Teller is serving {customer}.")

# Simulate the time taken to serve the customer
time.sleep(random.randint(1, 3))
print(f"{int(time.time())}: Teller has finished serving {customer}.")


def add_customer_to_queue(name):
with customer_available_condition:
print(f"{int(time.time())}: {name} has arrived at the bank.")
customer_queue.append(name)

customer_available_condition.notify()


customer_names = [
"Customer 1",
"Customer 2",
"Customer 3",
"Customer 4",
"Customer 5",
]

with ThreadPoolExecutor(max_workers=6) as executor:

teller_thread = executor.submit(serve_customers)

for name in customer_names:
# Simulate customers arriving at random intervals
time.sleep(random.randint(2, 5))

executor.submit(add_customer_to_queue, name)
51 changes: 51 additions & 0 deletions thread-safety-locks/bank_deadlock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class BankAccount:
def __init__(self):
self.balance = 0
self.lock = threading.Lock()

def deposit(self, amount):
print(
f"Thread {threading.current_thread().name} waiting "
f"to acquire lock for deposit()"
)
with self.lock:
print(
f"Thread {threading.current_thread().name} "
"acquired lock for deposit()"
)
time.sleep(0.1)
self._update_balance(amount)

def _update_balance(self, amount):
print(
f"Thread {threading.current_thread().name} waiting to acquire "
f"lock for _update_balance()"
)
with self.lock: # This will cause a deadlock
print(
f"Thread {threading.current_thread().name} "
"acquired lock for _update_balance()"
)
self.balance += amount


account = BankAccount()


def make_deposit():
account.deposit(100)


with ThreadPoolExecutor(
max_workers=3, thread_name_prefix="Worker"
) as executor:
for _ in range(3):
executor.submit(make_deposit)


print(f"Final balance: {account.balance}")
50 changes: 50 additions & 0 deletions thread-safety-locks/bank_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import threading
import time
from concurrent.futures import ThreadPoolExecutor

bank_open = threading.Event()
transactions_open = threading.Event()


def serve_customer(customer_data):
print(f"{customer_data['name']} is waiting for the bank to open.")

bank_open.wait()
print(f"{customer_data['name']} entered the bank")
if customer_data["type"] == "WITHDRAW_MONEY":
print(f"{customer_data['name']} is waiting for transactions to open.")
transactions_open.wait()
print(f"{customer_data['name']} is starting their transaction.")

# Simulate the time taken for performing the transaction
time.sleep(2)

print(f"{customer_data['name']} completed transaction and exited bank")
else:
# Simulate the time taken for banking
time.sleep(2)
print(f"{customer_data['name']} has exited bank")


customers = [
{"name": "Customer 1", "type": "WITHDRAW_MONEY"},
{"name": "Customer 2", "type": "CHECK_BALANCE"},
{"name": "Customer 3", "type": "WITHDRAW_MONEY"},
{"name": "Customer 4", "type": "WITHDRAW_MONEY"},
]

with ThreadPoolExecutor(max_workers=4) as executor:
for customer_data in customers:
executor.submit(serve_customer, customer_data)

print("Bank manager is preparing to open the bank.")
time.sleep(2)
print("Bank is now open!")
bank_open.set() # Signal that the bank is open

time.sleep(3)
print("Transactions are now open!")
transactions_open.set()


print("All customers have completed their transactions.")
24 changes: 24 additions & 0 deletions thread-safety-locks/bank_multithreaded_withdrawal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import time
from concurrent.futures import ThreadPoolExecutor


class BankAccount:
def __init__(self):
self.balance = 1000

def withdraw(self, amount):
if self.balance >= amount:
new_balance = self.balance - amount
time.sleep(0.1) # Simulate a delay
self.balance = new_balance
else:
raise Exception("Insufficient balance")


account = BankAccount()

with ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(account.withdraw, 500)
executor.submit(account.withdraw, 700)

print(f"Final account balance: {account.balance}")
51 changes: 51 additions & 0 deletions thread-safety-locks/bank_rlock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class BankAccount:
def __init__(self):
self.balance = 0
self.lock = threading.RLock()

def deposit(self, amount):
print(
f"Thread {threading.current_thread().name} "
"waiting to acquire lock for .deposit()"
)
with self.lock:
print(
f"Thread {threading.current_thread().name} "
"acquired lock for .deposit()"
)
time.sleep(0.1)
self._update_balance(amount)

def _update_balance(self, amount):
print(
f"Thread {threading.current_thread().name} "
"waiting to acquire lock for ._update_balance()"
)
with self.lock:
print(
f"Thread {threading.current_thread().name} "
"acquired lock for ._update_balance()"
)
self.balance += amount


account = BankAccount()


def make_deposit():
account.deposit(100)


with ThreadPoolExecutor(
max_workers=3, thread_name_prefix="Worker"
) as executor:
for _ in range(3):
executor.submit(make_deposit)


print(f"Final balance: {account.balance}")
32 changes: 32 additions & 0 deletions thread-safety-locks/bank_semaphore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Semaphore with a maximum of 2 resources (tellers)
teller_semaphore = threading.Semaphore(2)


def serve_customer(name):
print(f"{int(time.time())}: {name} is waiting for a teller.")
with teller_semaphore:
print(f"{int(time.time())}: {name} is being served by a teller.")
# Simulate the time taken for the teller to serve the customer
time.sleep(random.randint(1, 3))
print(f"{int(time.time())}: {name} is done being served.")


customers = [
"Customer 1",
"Customer 2",
"Customer 3",
"Customer 4",
"Customer 5",
]

with ThreadPoolExecutor(max_workers=5) as executor:
for customer_name in customers:
thread = executor.submit(serve_customer, customer_name)


print("All customers have been served.")
38 changes: 38 additions & 0 deletions thread-safety-locks/bank_thread_safe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class BankAccount:
def __init__(self, balance=0):
self.balance = balance
self.account_lock = threading.Lock()

def withdraw(self, amount):
with self.account_lock:
if self.balance >= amount:
new_balance = self.balance - amount
print(f"Withdrawing {amount}...")
time.sleep(0.1) # Simulate a delay
self.balance = new_balance
else:
raise Exception("Insufficient balance")

def deposit(self, amount):
with self.account_lock:
new_balance = self.balance + amount
print(f"Depositing {amount}...")
time.sleep(0.1) # Simulate a delay
self.balance = new_balance


account = BankAccount(1000)

with ThreadPoolExecutor(max_workers=3) as executor:

executor.submit(account.withdraw, 700)
executor.submit(account.deposit, 1000)
executor.submit(account.withdraw, 300)


print(f"Final account balance: {account.balance}")
16 changes: 16 additions & 0 deletions thread-safety-locks/threading_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def threaded_function():
for number in range(3):
print(f"Printing from {threading.current_thread().name}. {number=}")
time.sleep(0.1)


with ThreadPoolExecutor(
max_workers=4, thread_name_prefix="Worker"
) as executor:
for _ in range(4):
executor.submit(threaded_function)
Loading