t5_flan.py
import torch
from transformers import T5ForConditionalGeneration, T5Tokenizer
from torchtext.datasets import IMDB
from torch.utils.data import DataLoader
import time
import psutil
import threading
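
# Benchmark latency and throughput of FLAN-T5 inference on a subset of the IMDb test set,
# reporting elapsed time, process CPU usage, and GPU memory usage.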
DPINK = '\033[38;2;204;0;204m'
RESET = '\033[0m'
find_latency = True
find_throughput = True
process = psutil.Process()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"{DPINK}This workload is running on {device}!{RESET}")
# Load T5-Flan model and tokenizer
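# flan-t5-xl has roughly 3B parameters; the first run downloads a multi-GB checkpoint and
# loading it onto the GPU consumes a sizeable share of device memory.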
model_name = "google/flan-t5-xl"
model = T5ForConditionalGeneration.from_pretrained(model_name).to(device)
tokenizer = T5Tokenizer.from_pretrained(model_name)
# Load IMDb dataset
train_dataset, test_dataset = IMDB(split=('train', 'test'))
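# torchtext's IMDB split yields (label, review text) pairs, which is why example[1] is used below.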
# Choose a subset of the test dataset for benchmarking
benchmark_data = list(test_dataset)[:100]
# Function to perform inference on a single input
def perform_inference(input_text):
    output_text = ''
    # Truncate long reviews so the encoded input stays within the model's input limit
    input_ids = tokenizer(input_text, return_tensors="pt", truncation=True, max_length=512).input_ids.to(device)
    with torch.no_grad():
        output_ids = model.generate(input_ids)
    for txt in output_ids:
        output_text = output_text + tokenizer.decode(txt, skip_special_tokens=True)
    return output_text
# Function to measure latency
def measure_latency(data):
    start_time = time.time()
    for example in data:
        input_text = example[1]  # each IMDb example is a (label, review text) tuple
        output_text = perform_inference(input_text)
    end_time = time.time()
    return end_time - start_time
def worker(thread_id, data):
    print(f"Thread {thread_id} started.")
    for example in data:
        input_text = example[1]  # each IMDb example is a (label, review text) tuple
        output_text = perform_inference(input_text)
    print(f"Thread {thread_id} finished.")
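# Note: all worker threads share the single model instance loaded above. PyTorch releases the
# GIL while CUDA kernels run, so concurrent generate() calls can overlap to some extent, but
# they still contend for the same GPU.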
# Function to measure throughput
def measure_throughput(data, threads):
    start_time = time.time()
    # Start the worker threads; each one runs inference over the full benchmark subset
    thread_list = []
    for i in range(threads):
        thread = threading.Thread(target=worker, args=(i + 1, data))
        thread_list.append(thread)
        thread.start()
    # Wait for all threads to complete
    for thread in thread_list:
        thread.join()
    end_time = time.time()
    # Every thread processes len(data) records, so the total is threads * len(data)
    throughput = threads * len(data) / (end_time - start_time)
    return throughput
# Function printing GPU memory usage
def print_memusage():
    if torch.cuda.is_available():
        memory_used = torch.cuda.memory_allocated() / 1024 / 1024  # in megabytes
        print(f"GPU Memory used: {memory_used:.1f} MB")
    else:
        print("CUDA : %s" % torch.cuda.is_available())
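# Run whichever benchmarks are enabled by the find_latency / find_throughput flags above.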
if find_latency:
    # Latency benchmarking
    print(f"\n\n{DPINK}***Latency***{RESET}\n\n")
    process.cpu_percent(interval=None)  # prime the counter; the first call always returns 0.0
    latency = measure_latency(benchmark_data)
    cpu = process.cpu_percent(interval=None)  # average process CPU % since the priming call
    print(f"Latency for {len(benchmark_data)} examples: {latency:.2f} seconds")
    print(f"CPU usage : {cpu}%")
    print_memusage()
if find_throughput:
    # Throughput benchmarking
    print(f"\n\n{DPINK}***Throughput***{RESET}\n\n")
    num_threads = 3
    print(f"Number of Threads : {num_threads}")
    process.cpu_percent(interval=None)  # prime the counter
    throughput = measure_throughput(benchmark_data, num_threads)
    cpu = process.cpu_percent(interval=None)  # average process CPU % since the priming call
    print(f"Throughput: {throughput:.2f} records per second")
    print(f"CPU usage : {cpu}%")
    print_memusage()