Download the source code file "pytorch_test.py" and open it in your favorite IDE such as VS Code to review it. This code demonstrates how to use Ray for distributed training of a simple PyTorch model.
It sets up parallel training tasks where each worker trains a separate instance of the model and then aggregates the trained parameters from all workers. This approach is useful for speeding up training by distributing the computation across multiple processes or machines. Here is a brief description of what this code can help you accomplish.
Trains four independent instances of a simple PyTorch model using Ray’s distributed capabilities, running each model training on a separate Ray worker. Parallel training means that if each worker takes 5 seconds to train, the overall time will be close to 5 seconds rather than 20 seconds.
The trained models’ parameters are averaged to produce a single set of parameters, combining the results of all the workers. This is akin to methods used in federated learning or parameter server models.
This example can scale across multiple machines or GPUs. Since we are using a Ray cluster, you can perform larger-scale machine learning training tasks.
By measuring the time taken for each worker and aggregating the results, users can analyze the performance and efficiency of the distributed training setup.
@ray.remote: Decorates the train_worker function, allowing it to run on a separate Ray worker.
Function Parameters:
- rank: Identifies each worker (useful for logging).
- num_epochs: Specifies the number of epochs for training.
Training Data:
Generates 100 samples with 10 features each for X and corresponding target values y.
DataLoader:
Wraps the data in a DataLoader for batching and shuffling, with a batch size of 16.
Model, Loss, Optimizer:
Initializes a SimpleModel, MSE Loss (mean squared error), and SGD optimizer.
Training Loop:
- Iterates over the data for num_epochs.
- For each batch, computes the model's predictions, calculates the loss, backpropagates the error, and updates the model parameters.
- Tracks and prints the average loss per epoch.
Returns the Trained Model Parameters:
Uses model.state_dict() to return the trained parameters (weights and biases) of the model.
Averages the model parameters across all the workers to create a consensus model. The function iterates over each parameter (e.g., a weight tensor), sums the corresponding parameter values from all workers, and then divides the summed values by the number of models (workers) to compute the average.
Note that this approach is useful in distributed training or federated learning, where each worker trains on a subset of data, and their models are combined to form a global model.
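To make the averaging logic concrete, here is a minimal sketch that uses plain Python floats in place of PyTorch tensors. The actual average_model_params in pytorch_test.py operates on the tensor state_dicts returned by the workers, but the arithmetic is the same; the worker values below are hypothetical.

```python
def average_model_params(model_params_list):
    """Average corresponding parameter values across all workers' models."""
    num_models = len(model_params_list)
    avg_params = {}
    # Iterate over each parameter name (e.g., a layer's weight or bias)
    for key in model_params_list[0]:
        # Sum the corresponding values from all workers...
        total = sum(params[key] for params in model_params_list)
        # ...and divide by the number of models to compute the average
        avg_params[key] = total / num_models
    return avg_params

# Example with two hypothetical workers' trained parameters
workers = [{"linear.weight": 2.0, "linear.bias": 0.5},
           {"linear.weight": 4.0, "linear.bias": 1.5}]
print(average_model_params(workers))  # {'linear.weight': 3.0, 'linear.bias': 1.0}
```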
if __name__ == "__main__":
    # Number of workers
    num_workers = 4

    # Launch distributed training on multiple workers
    futures = [train_worker.remote(rank=i, num_epochs=5) for i in range(num_workers)]

    # Collect the trained model parameters from each worker
    results = ray.get(futures)

    # Aggregate the model parameters from all workers (averaging)
    avg_model_params = average_model_params(results)
    print("Aggregated model parameters from all workers!")

    # Shutdown Ray
    ray.shutdown()
Number of Workers: Defines the number of parallel workers as 4.
Submit Remote Training Tasks:
Creates a list of futures (task references) by calling train_worker.remote() for each worker with different rank values. Each worker trains a model independently.
Retrieve Results:
- Uses ray.get(futures) to wait for the completion of all tasks and retrieve their results.
- results is a list of model parameter dictionaries (one from each worker).
Aggregate Parameters:
Calls average_model_params to average the parameters across all trained models.
Output: Prints a message indicating that the aggregation is complete.
Shutdown Ray: Shuts down Ray, releasing any resources used during the training.
Download the source code file "run.py" and open it in your favorite IDE such as VS Code to review it. As you can see from the code snippet below, we will be using Ray's Job Submission Client to submit a job to the remote Ray endpoint.
from ray.job_submission import JobSubmissionClient
import ray
import urllib3

# Suppress the warning about unverified HTTPS requests
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Ray client
client = JobSubmissionClient(
    "https://<URL for Ray Endpoint>",
    headers={"Authorization": "Basic <Base64 Encoded Credentials>"},
    verify=False  # Disable SSL verification
)

# Submit job
client.submit_job(
    entrypoint="python pytorch_test.py",
    runtime_env={"working_dir": "./"}
)
Now, update the authorization credentials with the base64 encoded credentials for your Ray endpoint. You can use the following command to perform the encoding.
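For example, assuming the hypothetical credentials admin:password (replace these with the actual username and password for your endpoint), the encoding can be done like this:

```shell
# Base64-encode "username:password" for the Authorization header.
# -n prevents echo from appending a trailing newline, which would
# change the encoded value.
echo -n 'admin:password' | base64
# Output: YWRtaW46cGFzc3dvcmQ=
```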
To submit the job to your remote Ray endpoint:
First, in your web browser, open the Ray Dashboard's URL and keep it open. We will monitor the status and progress of the submitted job here.
Next, open a terminal and enter the following command:
python3 ./run.py
This will submit the job to the configured Ray endpoint, and you can review its progress and results on the Ray Dashboard. Once the Ray endpoint receives the job, it will remain in a pending state for a few seconds.
Shown below is an example of the expected output from a typical run. For each worker, the code will print the average loss for each epoch: