
Commit 113593d

Switch to example found on geeksforgeeks.org
See https://www.geeksforgeeks.org/multiprocessing-in-python-and-pytorch/
1 parent 2103fb5 commit 113593d

1 file changed: PyTorch/pytorch_multiprocessing.py (+84 -89 lines)
@@ -1,89 +1,84 @@

Removed (the previous version of the file):

import torch
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import logging
import random

SIZE = 500
N = 10  # Number of forward passes

# Initialize logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')

class SimpleNet(nn.Module):
    def __init__(self):
        super(SimpleNet, self).__init__()
        self.fc1 = nn.Linear(SIZE, 100)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(100, 10)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

def worker(rank, barrier):
    logging.info(f"Worker {rank} started")

    try:
        # Synchronize all workers
        barrier.wait()

        # Set a unique seed for each worker
        seed = torch.initial_seed() + rank
        torch.manual_seed(seed)
        random.seed(seed)

        # Create a simple neural network
        net = SimpleNet()

        # Create a random input tensor and a target tensor
        input_tensor = torch.rand((SIZE, SIZE))
        target_tensor = torch.randint(0, 10, (SIZE,))

        # Define a loss function and an optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.SGD(net.parameters(), lr=0.01)

        for _ in range(N):
            # Perform forward pass
            output = net(input_tensor)

            # Compute the loss
            loss = criterion(output, target_tensor)

            # Perform backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            logging.info(f"Worker {rank} - Loss: {loss.item()}")

        logging.info(f"Worker {rank} finished all {N} forward passes.")

    except Exception as e:
        logging.error(f"Worker {rank} failed with error: {e}")

def test():
    num_workers = mp.cpu_count()  # Get the number of CPUs without limiting
    logging.info(f"Running test on {num_workers} CPUs")

    # Create a barrier to synchronize workers
    barrier = mp.Barrier(num_workers)

    # Create a process for each CPU
    processes = []
    for rank in range(num_workers):
        p = mp.Process(target=worker, args=(rank, barrier))
        p.start()
        processes.append(p)

    # Wait for all processes to finish
    for p in processes:
        p.join()

if __name__ == "__main__":
    test()
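
The removed version leaned on mp.Barrier to start all workers at the same moment: each process blocks in barrier.wait() until the expected number of participants has arrived. A minimal sketch of just that primitive, with an illustrative wait_then_go function that is not part of the repository:

import time
import torch.multiprocessing as mp

def wait_then_go(rank, barrier):
    time.sleep(rank * 0.2)   # workers arrive at different times
    barrier.wait()           # each blocks here until all 3 have arrived
    print(f"worker {rank} released")

if __name__ == "__main__":
    barrier = mp.Barrier(3)  # expects exactly 3 participants
    workers = [mp.Process(target=wait_then_go, args=(r, barrier)) for r in range(3)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()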

Added (the new version, based on the geeksforgeeks.org example):

# This example is based on the code given at
# https://www.geeksforgeeks.org/multiprocessing-in-python-and-pytorch/

# Import the necessary libraries
import torch
import torch.nn as nn
import torch.multiprocessing as mp


# Define the training function
def train(model, X, Y):
    # Define the learning rate, number of iterations, and loss function
    learning_rate = 0.01
    n_iters = 100
    loss = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

    # Loop through the specified number of iterations
    for epoch in range(n_iters):
        # Make predictions using the model
        y_predicted = model(X)

        # Calculate the loss
        l = loss(Y, y_predicted)

        # Backpropagate the loss to update the model parameters
        l.backward()
        optimizer.step()
        optimizer.zero_grad()

        # Print the current loss and weights every 10 epochs
        if epoch % 10 == 0:
            [w, b] = model.parameters()
            print(
                f"Rank {mp.current_process().name}: epoch {epoch+1}: w = {w[0][0].item():.3f}, loss = {l:.3f}"
            )


# Main function
if __name__ == "__main__":
    # Set the number of processes and define the input and output data
    num_processes = mp.cpu_count()  # Get the number of CPUs without limiting
    X = torch.tensor([[1], [2], [3], [4]], dtype=torch.float32)
    Y = torch.tensor([[2], [4], [6], [8]], dtype=torch.float32)
    n_samples, n_features = X.shape

    # Print the number of samples and features
    print(f"#samples: {n_samples}, #features: {n_features}")

    # Define the test input and the model input/output sizes
    X_test = torch.tensor([5], dtype=torch.float32)
    input_size = n_features
    output_size = n_features

    # Define the linear model and print its prediction on the test input before training
    model = nn.Linear(input_size, output_size)
    print(f"Prediction before training: f(5) = {model(X_test).item():.3f}")

    # Share the model's memory to allow it to be accessed by multiple processes
    model.share_memory()

    # Create a list of processes and start each process with the train function
    processes = []
    for rank in range(num_processes):
        p = mp.Process(
            target=train,
            args=(model, X, Y),
            name=f"Process-{rank}",
        )
        p.start()
        processes.append(p)
        print(f"Started {p.name}")

    # Wait for all processes to finish
    for p in processes:
        p.join()
        print(f"Finished {p.name}")

    # Print the model's prediction on the test input after training
    print(f"Prediction after training: f(5) = {model(X_test).item():.3f}")
