日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

使用 Carla 和 Python 的自动驾驶汽车第 4 部分 —— 强化学习代理

發布時間:2024/5/17 python 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用 Carla 和 Python 的自动驾驶汽车第 4 部分 —— 强化学习代理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在我們的自動駕駛汽車的第四部分,Carla, Python, TensorFlow,和強化學習項目,我們將為我們的實際代理編碼。在前一篇教程中,我們研究了環境類,我們的代理將與之交互。

在考慮如何創建代理時,我們還必須考慮模型本身。假設你已經完成了強化學習教程(你最好這樣做,否則事情會讓你感到非常困惑),你知道代理所采取的每一步都伴隨著一個合理的預測(取決于探索/epsilon),而且還伴隨著一個調整!這意味著我們要同時進行訓練和預測,但我們的代理必須獲得盡可能高的FPS(幀/秒)。

為了實現這一點,我們可以使用多處理或線程。線程處理允許我們保持事情相當簡單。以后,我可能至少會在某個時候為這個任務開放一些多處理代碼,但現在,它是線程化的。

import glob import os import sys import random import time import numpy as np import cv2 import mathtry:sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (sys.version_info.major,sys.version_info.minor,'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0]) except IndexError:pass import carlaSHOW_PREVIEW = False IM_WIDTH = 640 IM_HEIGHT = 480 SECONDS_PER_EPISODE = 10class CarEnv:SHOW_CAM = SHOW_PREVIEWSTEER_AMT = 1.0im_width = IM_WIDTHim_height = IM_HEIGHTfront_camera = Nonedef __init__(self):self.client = carla.Client("localhost", 2000)self.client.set_timeout(2.0)self.world = self.client.get_world()self.blueprint_library = self.world.get_blueprint_library()self.model_3 = self.blueprint_library.filter("model3")[0]def reset(self):self.collision_hist = []self.actor_list = []self.transform = random.choice(self.world.get_map().get_spawn_points())self.vehicle = self.world.spawn_actor(self.model_3, self.transform)self.actor_list.append(self.vehicle)self.rgb_cam = self.blueprint_library.find('sensor.camera.rgb')self.rgb_cam.set_attribute("image_size_x", f"{self.im_width}")self.rgb_cam.set_attribute("image_size_y", f"{self.im_height}")self.rgb_cam.set_attribute("fov", f"110")transform = carla.Transform(carla.Location(x=2.5, z=0.7))self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)self.actor_list.append(self.sensor)self.sensor.listen(lambda data: self.process_img(data))self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))time.sleep(4)colsensor = self.blueprint_library.find("sensor.other.collision")self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)self.actor_list.append(self.colsensor)self.colsensor.listen(lambda event: self.collision_data(event))while self.front_camera is None:time.sleep(0.01)self.episode_start = time.time()self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))return self.front_cameradef collision_data(self, event):self.collision_hist.append(event)def process_img(self, image):i = np.array(image.raw_data)#print(i.shape)i2 = i.reshape((self.im_height, self.im_width, 4))i3 = i2[:, :, :3]if self.SHOW_CAM:cv2.imshow("", i3)cv2.waitKey(1)self.front_camera = i3def step(self, action):if action == 0:self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))elif action == 1:self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer= 0))elif action == 2:self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))v = self.vehicle.get_velocity()kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))if len(self.collision_hist) != 0:done = Truereward = -200elif kmh < 50:done = Falsereward = -1else:done = Falsereward = 1if self.episode_start + SECONDS_PER_EPISODE < time.time():done = Truereturn self.front_camera, reward, done, None

現在,我們將創建一個新類,DQNAgent類。我們將從:

class DQNAgent:def __init__(self):self.model = self.create_model()self.target_model = self.create_model()self.target_model.set_weights(self.model.get_weights())

這和強化學習教程中的概念是一樣的,我們有一個不斷進化的主網絡,然后是目標網絡,我們更新每n個東西,其中n是你想要的東西,比如步驟或情節。

當我們訓練時,我們從回放記憶中隨機選擇的數據中進行訓練:

self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)

出于與之前(RL教程)相同的原因,我們將修改TensorBoard:

self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}")

現在,我們可以通過設置一些最終值來包裝init方法:

self.target_update_counter = 0 # will track when it's time to update the target modelself.graph = tf.get_default_graph()self.terminate = False # Should we quit?self.last_logged_episode = 0self.training_initialized = False # waiting for TF to get rolling

我們要用self.training_initialized用于跟蹤TensorFlow何時準備就緒。當一個模型開始的時候,第一次的預測/調整會花費額外的時間,所以我們會先傳遞一些無意義的信息,讓我們的模型能夠真正開始運行。

現在讓我們創建我們的模型:

def create_model(self):base_model = Xception(weights=None, include_top=False, input_shape=(IM_HEIGHT, IM_WIDTH,3))x = base_model.outputx = GlobalAveragePooling2D()(x)predictions = Dense(3, activation="linear")(x)model = Model(inputs=base_model.input, outputs=predictions)model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"])return model

這里,我們將使用預先制作好的Xception模型,但是您也可以制作其他模型,或者導入一個不同的模型。請注意,我們將GlobalAveragePooling添加到我們的輸出層,同時顯然還添加了3個神經元輸出,這是代理要采取的每個可能的動作。讓我們為這個方法做一些必需的導入:

from keras.applications.xception import Xception from keras.layers import Dense, GlobalAveragePooling2D from keras.optimizers import Adam from keras.models import Model

在我們的DQNAgent中,我們需要一個快速的方法來更新重放內存:

def update_replay_memory(self, transition):# transition = (current_state, action, reward, new_state, done)self.replay_memory.append(transition)

只要我們的方法這么簡單,我們可能甚至不需要一個方法來做這個,但我先把它留到現在。

現在來談談訓練方法。首先,我們只想在回放記憶中樣本最少的情況下進行訓練:

def train(self):if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:return

因為我們有很多像這樣的常數,讓我們繼續,把它們一次性塞進去,以免我忘記:

REPLAY_MEMORY_SIZE = 5_000 MIN_REPLAY_MEMORY_SIZE = 1_000 MINIBATCH_SIZE = 16 PREDICTION_BATCH_SIZE = 1 TRAINING_BATCH_SIZE = MINIBATCH_SIZE // 4 UPDATE_TARGET_EVERY = 5 MODEL_NAME = "Xception"MEMORY_FRACTION = 0.8 MIN_REWARD = -200EPISODES = 100DISCOUNT = 0.99 epsilon = 1 EPSILON_DECAY = 0.95 ## 0.9975 99975 MIN_EPSILON = 0.001AGGREGATE_STATS_EVERY = 10

把它們放在腳本的頂部,顯然,在所有類/函數/方法等之外。如果您在任何時候對內容的走向感到困惑,您可以查看本教程的結尾,查看到此為止的代碼。

回到我們的訓練方法:

def train(self):if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:return

如果我們沒有足夠的樣品,我們就會回來,然后結束。如果我們做到了,我們就會開始訓練。首先,我們需要隨機抽取一小批:

minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)

一旦我們有了我們的小批處理,我們想要獲取我們當前和未來的q值:

current_states = np.array([transition[0] for transition in minibatch])/255with self.graph.as_default():current_qs_list = self.model.predict(current_states, PREDICTION_BATCH_SIZE)new_current_states = np.array([transition[3] for transition in minibatch])/255with self.graph.as_default():future_qs_list = self.target_model.predict(new_current_states, PREDICTION_BATCH_SIZE)

回憶一下過渡是: transition = (current_state, action, reward, new_state, done)

現在,我們創建輸入(X)和輸出(y):

X = []y = []for index, (current_state, action, reward, new_state, done) in enumerate(minibatch):if not done:max_future_q = np.max(future_qs_list[index])new_q = reward + DISCOUNT * max_future_qelse:new_q = rewardcurrent_qs = current_qs_list[index]current_qs[action] = new_qX.append(current_state)y.append(current_qs)

這里沒有什么新奇的東西,幾乎與我們的強化學習教程(DQN)相同。

log_this_step = Falseif self.tensorboard.step > self.last_logged_episode:log_this_step = Trueself.last_log_episode = self.tensorboard.step

我們只嘗試記錄每一集,而不是實際的訓練步驟,所以我們將使用上面的方法來跟蹤。

接下來,我們將配合:

with self.graph.as_default():self.model.fit(np.array(X)/255, np.array(y), batch_size=TRAINING_BATCH_SIZE, verbose=0, shuffle=False, callbacks=[self.tensorboard] if log_this_step else None)

注意,只有當log_this_step為真時,我們才會設置tensorboard回調。如果它是假的,我們仍然適合,我們只是不會登錄到TensorBoard。

接下來,我們想繼續跟蹤日志記錄:

if log_this_step:self.target_update_counter += 1

最后,我們來看看是不是該更新我們的target_model:

if self.target_update_counter > UPDATE_TARGET_EVERY:self.target_model.set_weights(self.model.get_weights())self.target_update_counter = 0

現在我們只需要另外2個方法,我們就完成了代理類。首先,我們需要一個方法來獲得q值(基本上是為了進行預測)

def get_qs(self, state):return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]

最后,我們只需要進行培訓:

def train_in_loop(self):X = np.random.uniform(size=(1, IM_HEIGHT, IM_WIDTH, 3)).astype(np.float32)y = np.random.uniform(size=(1, 3)).astype(np.float32)with self.graph.as_default():self.model.fit(X,y, verbose=False, batch_size=1)self.training_initialized = True

首先,我們使用一些隨機數據,如上面的初始化,然后我們開始無限循環:

while True:if self.terminate:returnself.train()time.sleep(0.01)

這是我們的DQNAgent類。完整的代碼到這一點:

import glob import os import sys import random import time import numpy as np import cv2 import math from collections import deque from keras.applications.xception import Xception from keras.layers import Dense, GlobalAveragePooling2D from keras.optimizers import Adam from keras.models import Modeltry:sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (sys.version_info.major,sys.version_info.minor,'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0]) except IndexError:pass import carlaSHOW_PREVIEW = False IM_WIDTH = 640 IM_HEIGHT = 480 SECONDS_PER_EPISODE = 10 REPLAY_MEMORY_SIZE = 5_000 MIN_REPLAY_MEMORY_SIZE = 1_000 MINIBATCH_SIZE = 16 PREDICTION_BATCH_SIZE = 1 TRAINING_BATCH_SIZE = MINIBATCH_SIZE // 4 UPDATE_TARGET_EVERY = 5 MODEL_NAME = "Xception"MEMORY_FRACTION = 0.8 MIN_REWARD = -200EPISODES = 100DISCOUNT = 0.99 epsilon = 1 EPSILON_DECAY = 0.95 ## 0.9975 99975 MIN_EPSILON = 0.001AGGREGATE_STATS_EVERY = 10class CarEnv:SHOW_CAM = SHOW_PREVIEWSTEER_AMT = 1.0im_width = IM_WIDTHim_height = IM_HEIGHTfront_camera = Nonedef __init__(self):self.client = carla.Client("localhost", 2000)self.client.set_timeout(2.0)self.world = self.client.get_world()self.blueprint_library = self.world.get_blueprint_library()self.model_3 = self.blueprint_library.filter("model3")[0]def reset(self):self.collision_hist = []self.actor_list = []self.transform = random.choice(self.world.get_map().get_spawn_points())self.vehicle = self.world.spawn_actor(self.model_3, self.transform)self.actor_list.append(self.vehicle)self.rgb_cam = self.blueprint_library.find('sensor.camera.rgb')self.rgb_cam.set_attribute("image_size_x", f"{self.im_width}")self.rgb_cam.set_attribute("image_size_y", f"{self.im_height}")self.rgb_cam.set_attribute("fov", f"110")transform = carla.Transform(carla.Location(x=2.5, z=0.7))self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)self.actor_list.append(self.sensor)self.sensor.listen(lambda data: self.process_img(data))self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))time.sleep(4)colsensor = self.blueprint_library.find("sensor.other.collision")self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)self.actor_list.append(self.colsensor)self.colsensor.listen(lambda event: self.collision_data(event))while self.front_camera is None:time.sleep(0.01)self.episode_start = time.time()self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))return self.front_cameradef collision_data(self, event):self.collision_hist.append(event)def process_img(self, image):i = np.array(image.raw_data)#print(i.shape)i2 = i.reshape((self.im_height, self.im_width, 4))i3 = i2[:, :, :3]if self.SHOW_CAM:cv2.imshow("", i3)cv2.waitKey(1)self.front_camera = i3def step(self, action):if action == 0:self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))elif action == 1:self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer= 0))elif action == 2:self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))v = self.vehicle.get_velocity()kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))if len(self.collision_hist) != 0:done = Truereward = -200elif kmh < 50:done = Falsereward = -1else:done = Falsereward = 1if self.episode_start + SECONDS_PER_EPISODE < time.time():done = Truereturn self.front_camera, reward, done, Noneclass DQNAgent:def __init__(self):self.model = self.create_model()self.target_model = self.create_model()self.target_model.set_weights(self.model.get_weights())self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}")self.target_update_counter = 0self.graph = tf.get_default_graph()self.terminate = Falseself.last_logged_episode = 0self.training_initialized = Falsedef create_model(self):base_model = Xception(weights=None, include_top=False, input_shape=(IM_HEIGHT, IM_WIDTH,3))x = base_model.outputx = GlobalAveragePooling2D()(x)predictions = Dense(3, activation="linear")(x)model = Model(inputs=base_model.input, outputs=predictions)model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"])return modeldef update_replay_memory(self, transition):# transition = (current_state, action, reward, new_state, done)self.replay_memory.append(transition)def train(self):if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:returnminibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)current_states = np.array([transition[0] for transition in minibatch])/255with self.graph.as_default():current_qs_list = self.model.predict(current_states, PREDICTION_BATCH_SIZE)new_current_states = np.array([transition[3] for transition in minibatch])/255with self.graph.as_default():future_qs_list = self.target_model.predict(new_current_states, PREDICTION_BATCH_SIZE)X = []y = []for index, (current_state, action, reward, new_state, done) in enumerate(minibatch):if not done:max_future_q = np.max(future_qs_list[index])new_q = reward + DISCOUNT * max_future_qelse:new_q = rewardcurrent_qs = current_qs_list[index]current_qs[action] = new_qX.append(current_state)y.append(current_qs)log_this_step = Falseif self.tensorboard.step > self.last_logged_episode:log_this_step = Trueself.last_log_episode = self.tensorboard.stepwith self.graph.as_default():self.model.fit(np.array(X)/255, np.array(y), batch_size=TRAINING_BATCH_SIZE, verbose=0, shuffle=False, callbacks=[self.tensorboard] if log_this_step else None)if log_this_step:self.target_update_counter += 1if self.target_update_counter > UPDATE_TARGET_EVERY:self.target_model.set_weights(self.model.get_weights())self.target_update_counter = 0def get_qs(self, state):return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]def train_in_loop(self):X = np.random.uniform(size=(1, IM_HEIGHT, IM_WIDTH, 3)).astype(np.float32)y = np.random.uniform(size=(1, 3)).astype(np.float32)with self.graph.as_default():self.model.fit(X,y, verbose=False, batch_size=1)self.training_initialized = Truewhile True:if self.terminate:returnself.train()time.sleep(0.01)

現在,在下一個教程中,我們將對事情進行最后的潤色,并實際運行看看我們得到了什么!

總結

以上是生活随笔為你收集整理的使用 Carla 和 Python 的自动驾驶汽车第 4 部分 —— 强化学习代理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产高清一级片 | 成人黄色在线播放 | 国产粉嫩一区二区三区 | 欧美拍拍 | 欧美成人a | 激情小说在线观看 | 国产精品jizz视频 | 先锋影音中文字幕 | 91精品国自产在线观看 | 人民的名义第二部 | 国产美女喷水视频 | 国产美女自拍视频 | 国产精品成人自拍 | 男人的天堂97 | 性猛交富婆╳xxx乱大交天津 | 九色porny视频 | 禁漫天堂免费网站 | 一个色的综合 | 伦理片av | 怡红院一区二区三区 | 超碰人人在线观看 | 日日拍拍| 伊人88 | 国产日韩欧美视频在线 | 肉嫁高柳在线 | 一区国产在线 | 国产精品12p | 久久伊人免费 | 国产农村妇女精品一区二区 | 色噜噜在线播放 | 激情av小说 | 日本久久久久久久久久久 | 中文字幕一区二区三区波野结 | 欧美激情一区二区在线 | 手机av片 | 成年人免费在线视频 | 美女被出白浆 | 欧美日韩免费观看一区=区三区 | www.色就是色| 黄色av网站免费在线观看 | 精品亚洲中文字幕 | 久久久久久久久久久网站 | 久久伊人影视 | 国模叶桐尿喷337p人体 | 国产精品久久婷婷六月丁香 | 福利视频免费观看 | 欧美亚洲色综久久精品国产 | 中文无码av一区二区三区 | 成人性免费视频 | 亚洲色图另类图片 | 欧美性生交大片免费看app麻豆 | 91中文字幕在线播放 | 97超级碰碰人妻中文字幕 | 一二三区中文字幕 | 美女的隐私免费看 | av有码在线 | 狠狠干狠狠艹 | 中文精品无码中文字幕无码专区 | 日本天堂网在线 | 国产精品一二区在线观看 | 日剧再来一次第十集 | 亚洲国产成人一区二区 | 在线不卡一区 | 欧美日本韩国在线 | 中文字幕综合网 | 国内成人免费视频 | 青青国产在线观看 | 玖玖精品 | 亚洲欧美另类在线 | 老太太av| 少妇喷白浆 | 日产精品久久久久久久蜜臀 | 精品一区李梓熙捆绑 | 午夜综合网 | 精品无码人妻一区二区三区品 | 欧美日韩高清在线 | 日日摸日日碰夜夜爽av | 在线视频亚洲欧美 | 天天爽天天操 | 又粗又猛又爽又黄少妇视频网站 | 99精品网 | 亚洲成人a v | 日韩高清在线播放 | www.久久 | 91视频com| www.久久av | 欧美精品一区视频 | 麻豆精品在线 | 深夜天堂 | 亚洲精品2 | 亚洲一区欧洲二区 | 色综合五月 | 激情亚洲网 | 久久国产免费观看 | 九九热这里有精品视频 | 久久伊人影院 | 亚洲人成免费电影 | 日韩一级理论片 | 原创露脸88av |