---
library_name: stable-baselines3
tags:
- BipedalWalker-v3
- deep-reinforcement-learning
- reinforcement-learning
- stable-baselines3
- Bipedal
- OpenAI
model-index:
- name: PPO
  results:
  - task:
      type: reinforcement-learning
      name: reinforcement-learning
    dataset:
      name: BipedalWalker-v3
      type: BipedalWalker-v3
    metrics:
    - type: mean_reward
      value: '-58.54 +/- 39.24'
      name: mean_reward
      verified: false
---

# **PPO** Agent playing **BipedalWalker-v3**

This is a trained model of a **PPO** agent playing **BipedalWalker-v3** using the [stable-baselines3 library](https://github.com/DLR-RM/stable-baselines3).

## Usage (with Stable-baselines3)

The full training, recording, evaluation, and upload pipeline is reproduced below.

# **1. Setup**

### **Install Packages**

```python
# Install necessary packages
!apt install swig cmake ffmpeg xvfb python3-opengl
!pip install stable-baselines3==2.0.0a5 gymnasium[box2d] huggingface_sb3 pyvirtualdisplay imageio[ffmpeg]
```

The next cell forces the notebook runtime to restart, to make sure all the newly installed libraries are picked up.

```python
import os

os.kill(os.getpid(), 9)
```

### **Start Virtual Display**

```python
from pyvirtualdisplay import Display

virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()
```

### **Setup Environment**

```python
import gymnasium as gym

env = gym.make("BipedalWalker-v3", hardcore=True)
env.reset()
```

### **Observation Space**

The observation space has shape `(24,)`: a vector of size 24, where each value contains different information about the walker:

- **Hull Angle Speed**: the speed at which the main body of the walker is rotating.
- **Angular Velocity**: the rate of change of the angular position of the walker.
- **Horizontal Speed**: the speed at which the walker is moving horizontally.
- **Vertical Speed**: the speed at which the walker is moving vertically.
- **Position of Joints**: the positions (angles) of the walker's joints. Given that the walker has 4 joints, this takes up 4 values.
- **Joints Angular Speed**: the rate of change of the angular position for each joint. Again, this is 4 values for the 4 joints.
- **Legs Contact with Ground**: indicates whether each leg is in contact with the ground. With two legs, this contains 2 values.
- **10 Lidar Rangefinder Measurements**: distance measurements used to detect obstacles or terrain features around the walker. There are 10 of these values.

```python
print("_____OBSERVATION SPACE_____ \n")
print("Observation Space Shape", env.observation_space.shape)
print("Sample observation", env.observation_space.sample())  # Get a random observation
```

### **Action Space**

Actions are motor speed values in the [-1, 1] range for each of the 4 joints at both hips and knees.

```python
print("\n _____ACTION SPACE_____ \n")
print("Action Space Shape", env.action_space.shape)
print("Action Space Sample", env.action_space.sample())  # Take a random action
```

### **Vectorized Environment**

Create a vectorized environment (a method for stacking multiple independent environments into a single environment) of 16 environments, so the agent collects more diverse experience.

```python
from stable_baselines3.common.env_util import make_vec_env

env = make_vec_env('BipedalWalker-v3', n_envs=16)
```
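Before training, it can help to confirm how the vectorized environment batches observations and actions. The sketch below is not part of the original notebook; `check_env` is an illustrative name, and the printed shapes assume the 24-dimensional observation and 4-dimensional action spaces described above.

```python
import numpy as np
from stable_baselines3.common.env_util import make_vec_env

# Hypothetical sanity check: the 16 stacked environments step in lockstep,
# so observations arrive batched as (16, 24) and actions as (16, 4).
check_env = make_vec_env('BipedalWalker-v3', n_envs=16)
obs = check_env.reset()
print("Batched observation shape:", obs.shape)  # (16, 24)

# Sample one random action per sub-environment and step them all at once
random_actions = np.array([check_env.action_space.sample() for _ in range(check_env.num_envs)])
obs, rewards, dones, infos = check_env.step(random_actions)
print("Batched reward shape:", rewards.shape)  # (16,)
check_env.close()
```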
Building the Model**""" from stable_baselines3 import PPO model = PPO( policy = 'MlpPolicy', env = env, n_steps = 2048, batch_size = 128, n_epochs = 6, gamma = 0.999, gae_lambda = 0.98, ent_coef = 0.01, verbose=1) """# 3.**Video Generation**""" from wasabi import Printer import numpy as np from stable_baselines3.common.base_class import BaseAlgorithm from pathlib import Path import tempfile from stable_baselines3.common.monitor import Monitor from stable_baselines3.common.vec_env import ( DummyVecEnv, VecEnv, VecVideoRecorder, ) msg = Printer() def generate_replay( model: BaseAlgorithm, eval_env: VecEnv, video_length: int, is_deterministic: bool, local_path: Path, ): """ Generate a replay video of the agent :param model: trained model :param eval_env: environment used to evaluate the agent :param video_length: length of the video (in timesteps) :param is_deterministic: use deterministic or stochastic actions :param local_path: path of the local repository """ # This is another temporary directory for video outputs # SB3 created a -step-0-to-... meta files as well as other # artifacts which we don't want in the repo. with tempfile.TemporaryDirectory() as tmpdirname: # Step 1: Create the VecVideoRecorder env = VecVideoRecorder( eval_env, tmpdirname, record_video_trigger=lambda x: x == 0, video_length=video_length, name_prefix="", ) obs = env.reset() lstm_states = None episode_starts = np.ones((env.num_envs,), dtype=bool) try: for _ in range(video_length): action, lstm_states = model.predict( obs, state=lstm_states, episode_start=episode_starts, deterministic=is_deterministic, ) obs, _, episode_starts, _ = env.step(action) # Save the video env.close() # Convert the video with x264 codec inp = env.video_recorder.path out = local_path os.system(f"ffmpeg -y -i {inp} -vcodec h264 {out}".format(inp, out)) print(f"Video saved to: {out}") except KeyboardInterrupt: pass except Exception as e: msg.fail(str(e)) # Add a message for video msg.fail( "We are unable to generate a replay of your agent" ) """# **4. Training, Saving and Record the Videos**""" import os #create a directory to save the videos video_dir = "/content/videos" if not os.path.exists(video_dir): os.makedirs(video_dir) env_id = "BipedalWalker-v3" # Train and generate video at every 100000 steps, adjust the timesteps to your liking for i in range(0, 2000000, 100000): model.learn(total_timesteps=100000) # Save the model model_name = "ppo-BipedalWalker-v3" model.save(model_name) video_name = f"replay_{i + 100000}.mp4" generate_replay( model=model, eval_env=DummyVecEnv([lambda: Monitor(gym.make(env_id, hardcore=True, render_mode="rgb_array"))]), video_length=1000, is_deterministic=True, local_path=os.path.join(video_dir, video_name) ) model_name = "ppo-BipedalWalker-v3" model.save(model_name) with open(os.path.join(video_dir, "filelist.txt"), "w") as f: for i in range(0, 2000000, 100000): video_name = f"replay_{i + 100000}.mp4" f.write(f"file '{os.path.join(video_dir, video_name)}'\n") # Concatenate all the videos into one os.system(f"ffmpeg -f concat -safe 0 -i {os.path.join(video_dir, 'filelist.txt')} -c copy {os.path.join(video_dir, 'replay_all.mp4')}") """# **5. Visualize Final Video**""" from IPython.display import HTML from base64 import b64encode mp4 = open('videos/replay_all.mp4','rb').read() data_url = "data:video/mp4;base64," + b64encode(mp4).decode() HTML(""" """ % data_url) """# **6. 
Evaluate the Model**""" from stable_baselines3.common.evaluation import evaluate_policy eval_env = Monitor(gym.make("BipedalWalker-v3")) mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=10, deterministic=True) print(f"mean_reward={mean_reward:.2f} +/- {std_reward}") """# **7. Upload to HuggingFace**""" from huggingface_sb3 import load_from_hub, package_to_hub from huggingface_hub import notebook_login # To log to our Hugging Face account to be able to upload models to the Hub. notebook_login() !git config --global credential.helper store env_id = "BipedalWalker-v3" model_name = "ppo-BipedalWalker-v3" model_architecture = "PPO" repo_id = "Mahanthesh0r/BipedalWalker-RL" # Change with your repo id ## Define the commit message commit_message = "Upload PPO BipedalWalker-v3 trained agent" # Create the evaluation env and set the render_mode="rgb_array" eval_env = DummyVecEnv([lambda: gym.make(env_id, hardcore=True, render_mode="rgb_array")]) package_to_hub(model=model, # trained model model_name=model_name, # The name of our trained model model_architecture=model_architecture, # The model architecture we used: in our case PPO env_id=env_id, # Name of the environment eval_env=eval_env, repo_id=repo_id, commit_message=commit_message) """# **8. Load Models from HuggingFace (Optional)**""" from huggingface_sb3 import load_from_hub repo_id = "Mahanthesh0r/BipedalWalker-RL" # The repo_id filename = "ppo-BipedalWalker-v3.zip" # The model filename.zip checkpoint = load_from_hub(repo_id, filename) model = PPO.load(checkpoint, print_system_info=True) eval_env = Monitor(gym.make("BipedalWalker-v3", hardcore=True)) mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=10, deterministic=True) print(f"mean_reward={mean_reward:.2f} +/- {std_reward}") ... ```