程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Python實現飛翔的小鳥

編輯:Python

我手動最多打出10分,模型能打30多……

問題分析
時間不連續,最小單位為“幀”
狀態status是連續的浮點數值
動作action只有2種,即“升”和“不升”,無論采取什麼動作,都作用於玩家加速度而非直接改變位置
除了剛剛通過門時采取的動作外,動作的結果全是必然沒有隨機性
與玩家決策相關的量很多比如門(腔縫)的高度和寬度,飛機本身尺寸等等,具體要什麼不要什麼:

玩家左側與門右側水平距離占當前兩門間距的比例
玩家中點與門中點垂直距離占總高度的比例
玩家y向速度與“最大速度”的比例
玩家中點與屏幕水平中線的距離占屏幕高度的一半的比例
以上4個候選量中,我們最後只用前3個量作為模型入參(第4個量未被采用),所以模型輸入為3通道,輸出為2通道(對應“升”和“不升”2種動作)。只要玩家存活就得到正獎勵。

環境搭建
安裝CUDA
CUDA版本:nvcc --version
nvcc: NVIDIA Cuda compiler driver
Copyright 2005-2021 NVIDIA Corporation
Built on Sun_Feb_14_21:12:58_PST_2021
Cuda compilation tools, release 11.2, V11.2.152
Build cuda_11.2.r11.2/compiler.29618528_0
1
2
3
4
5
Python版本:python --version
Python 3.9.12
1
系統版本:cat /proc/version
Linux version 5.4.0-109-generic ([email protected]) (gcc version 9.4.0 (Ubuntu 9.4.0-1ubuntu1~20.04.1)) #123-Ubuntu SMP Fri Apr 8 09:10:54 UTC 2022
1
安裝依賴
pip install pygame autopep8 numpy
pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113
1
2
目錄結構
assets
textures
door.gif
player_age0.gif
player_age1.gif
main.py
game.py
util.py
代碼
main.py

# -*- coding: utf-8 -*-

"""Training and prediction.
"""
import random
import sys
from collections import OrderedDict

import numpy as np
import pygame
import torch
from torch import nn, optim

from game import Game
from util import print_bar

class Model(nn.Module):
    """Small dueling-style Q network.

    A shared trunk (``Linear(3, 12)`` + sigmoid) feeds an advantage head with
    two outputs and a value head with one output.  The two heads are summed
    (the value is broadcast over both actions) and passed through a ReLU.

    NOTE(review): unlike the textbook dueling DQN, the mean advantage is not
    subtracted before the heads are combined; this mirrors the original code.
    """

    def __init__(self):
        super().__init__()
        # `CUDA` is the module-level torch.device the layers are created on.
        self.layers = nn.ModuleDict({
            'c': nn.Sequential(nn.Linear(3, 12, device=CUDA), nn.Sigmoid()),
            'a': nn.Linear(12, 2, device=CUDA),
            'v': nn.Linear(12, 1, device=CUDA),
            'o': nn.ReLU(),
        })

    def forward(self, arg: torch.Tensor) -> torch.Tensor:
        """Run the forward pass.

        Parameters
        ----------
        arg : torch.Tensor
            Input whose last dimension has size 3 (required by the first
            linear layer).

        Returns
        -------
        torch.Tensor
            Non-negative scores, one per action (last dimension of size 2).
        """
        hidden = self.layers['c'](arg)
        adv = self.layers['a'](hidden)
        val = self.layers['v'](hidden)
        return self.layers['o'](adv + val)

    def load_params(self, model: 'Model', rate: float = 1):
        """Copy or blend another model's parameters into this one.

        Parameters
        ----------
        model : Model
            Source model whose parameters are copied into the current model.
        rate : float, optional
            Blend factor. ``rate >= 1`` copies the source parameters
            verbatim; otherwise each parameter becomes
            ``local * (1 - rate) + source * rate``. By default 1.
        """
        for name, layer in self.layers.items():
            foreign = model.layers[name].state_dict()
            if rate >= 1.:
                layer.load_state_dict(foreign)
                continue
            local = layer.state_dict()
            # A distinct loop variable is used here so the outer layer name
            # is not shadowed.
            mix = OrderedDict(
                (param, local[param] * (1 - rate) + foreign[param] * rate)
                for param in local
            )
            layer.load_state_dict(mix)

def _greedy_action(model, state) -> int:
    """Return the index of the highest-scoring action for `state` as an int."""
    values = model(torch.tensor(state, device=CUDA))
    return int(values.argmax(-1))


def simulate(model: Model, batch_size: int, epslion: float = .1, eval_step: int = None, env_args: dict = None) -> 'tuple[list,float,int]':
    """Play the game to collect transitions, then evaluate the greedy policy.

    Parameters
    ----------
    model : Model
        Network used to choose actions.
    batch_size : int
        Number of transitions to collect.
    epslion : float, optional
        Probability of taking a uniformly random action instead of the
        greedy one during collection, by default .1
    eval_step : int, optional
        Maximum number of frames in the exploration-free evaluation run,
        by default `batch_size`
    env_args : dict, optional
        Keyword arguments forwarded to `Game`, by default None (no extra
        arguments).

    Returns
    -------
    tuple[list,float,int]
        The collected ``(state, action_index, next_state, reward)`` tuples;
        the mean number of frames survived per finished episode divided by
        `batch_size`; and the number of frames survived in the greedy
        evaluation run.
    """
    env_args = env_args or {}
    cache = []
    livetimes = []
    livetime = 0
    env = Game(**env_args, without_screen=True)
    # Only inference happens here, so no autograd graph is needed.
    with torch.no_grad():
        for _ in range(batch_size):
            state = env.shot()
            if random.random() <= epslion:
                action_index = random.randint(0, len(ACTIONS) - 1)
            else:
                action_index = _greedy_action(model, state)
            env.step(ACTIONS[action_index])
            next_state = env.shot()
            # Reward is 1.0 while the player is alive, 0.0 on the fatal frame.
            reward = float(env.playing)
            cache.append((state, action_index, next_state, reward))
            if env.playing:
                livetime += 1
            else:
                livetimes.append(livetime)
                # Restart the per-episode counter; without the reset every
                # recorded lifetime would be cumulative over the whole batch.
                livetime = 0
                env = Game(**env_args, without_screen=True)

        # Evaluation: frames survived without any random exploration.
        env = Game(**env_args, without_screen=True)
        max_step = eval_step or batch_size
        livetime = 0
        for _ in range(max_step):
            env.step(ACTIONS[_greedy_action(model, env.shot())])
            if not env.playing:
                break
            livetime += 1
    return cache, sum(livetimes) / max(1, len(livetimes)) / batch_size, livetime

def train(policy_net: Model, opt: optim.Optimizer, loss_func: 'nn._Loss', epochs: int, batch_size: int, cache_size: int, epslion: float = .1, gamma: float = .5, update_ratio: float = .5, eval_step: int = None, target_accuracy=.99, env_args: dict = None) -> 'tuple[Model,list[float],list[float],list[int]]':
    """Train the policy network with a replay buffer and a target network.

    Parameters
    ----------
    policy_net : Model
        Network being optimised.
    opt : optim.Optimizer
        Optimiser stepping `policy_net`'s parameters.
    loss_func : nn._Loss
        Loss applied to (predicted value, target value).
    epochs : int
        Maximum number of training iterations.
    batch_size : int
        Transitions collected per iteration and sampled per gradient step.
    cache_size : int
        Maximum number of most-recent transitions kept in the replay buffer.
    epslion : float, optional
        Exploration probability passed to `simulate`, by default .1
    gamma : float, optional
        Weight of the bootstrapped future value; `0` considers only the
        immediate reward, by default .5
    update_ratio : float, optional
        Soft-update rate used to blend `policy_net` into the target network
        at the start of every iteration, by default .5
    eval_step : int, optional
        Maximum number of frames in the greedy evaluation run, by default
        `batch_size`
    target_accuracy : float, optional
        Training stops once ``livetime / (eval_step or batch_size)`` reaches
        this value, by default .99
    env_args : dict, optional
        Keyword arguments forwarded to the game environment, by default None

    Returns
    -------
    tuple[Model,list[float],list[float],list[int]]
        Target network, per-iteration losses, per-iteration mean survival
        ratios, per-iteration greedy survival times.
    """
    env_args = env_args or {}
    target_net = Model()
    target_net.load_params(policy_net)
    policy_net.train(mode=True)
    target_net.train(mode=False)
    loss_vals, accuracies, livetimes, cache = [], [], [], []
    for epoch in range(epochs):
        target_net.load_params(policy_net, update_ratio)
        # Collect fresh experience with the target network.
        batch, accuracy, livetime = simulate(model=target_net, batch_size=batch_size, epslion=epslion, eval_step=eval_step, env_args=env_args)
        accuracies.append(accuracy)
        livetimes.append(livetime)
        if livetime / (eval_step or batch_size) >= target_accuracy:
            # The greedy policy already meets the target; stop training.
            break
        # Push into the replay buffer, keeping only the newest entries.
        cache.extend(batch)
        cache = cache[-cache_size:]
        # Sample a minibatch; never ask for more than the buffer holds.
        sample = random.sample(cache, min(batch_size, len(cache)))
        states, actions, nexts, rewards = zip(*sample)
        states = torch.tensor(states, device=CUDA)
        # Actions may be stored as ints or 0-d tensors; normalise to ints.
        actions = torch.tensor([int(a) for a in actions], device=CUDA).unsqueeze(-1)
        rewards = torch.tensor(rewards, device=CUDA)
        nexts = torch.tensor(nexts, device=CUDA)
        # Bootstrapped target: gamma * max_a Q_target(s') + (1 - gamma) * r.
        v_target = target_net.forward(nexts).detach()
        y_target = v_target.max(dim=-1).values * gamma
        y_target += rewards * (1 - gamma)
        v_eval = policy_net.forward(states)
        # gather() yields shape (B, 1); squeeze so it matches y_target's (B,).
        # Otherwise the loss silently broadcasts the pair to (B, B).
        y_eval = v_eval.gather(index=actions, dim=-1).squeeze(-1)
        loss = loss_func(y_eval, y_target)
        opt.zero_grad()
        loss.backward()
        opt.step()
        loss = loss.item()
        loss_vals.append(loss)
        print_bar(epoch + 1, epochs, ("%.10f" % loss, '%.10f' % accuracy, livetime))
    return target_net, loss_vals, accuracies, livetimes

np.set_printoptions(suppress=True)
# Device used for every tensor and layer; falls back to the CPU when no CUDA
# device is available so the script still runs on machines without a GPU.
CUDA = torch.device("cuda" if torch.cuda.is_available() else "cpu")
MODEL = Model()
OPT = optim.Adam(MODEL.parameters(), lr=.01)
LOSS_FUNCTION = nn.MSELoss()
# Index 0 means "jump", index 1 means "do not jump".
ACTIONS = (True, False)
SCREEN_SIZE = (800, 600)
FPS = 20
# Keyword arguments passed to `Game(...)` for both training and playback.
GAME_CONFIG = {
    'screen_size': SCREEN_SIZE,
    'door_size': (80, 180),
    'speed': 10,
    'jump_force': 3,
    'g': 2,
    'door_distance': 60,
}
if __name__ == "__main__":
    pygame.init()
    model, loss_vals, accuracies, livetimes = train(
        policy_net=MODEL,
        opt=OPT,
        loss_func=LOSS_FUNCTION,
        epochs=20000,
        batch_size=192,
        cache_size=2000,
        epslion=.3,
        gamma=.9,
        update_ratio=.1,
        target_accuracy=.95,
        env_args=GAME_CONFIG,
        eval_step=1200,
    )

    # Let the trained model play and show the result on screen.
    print('\n\n')
    model = model.to('cpu')
    model.train(mode=False)
    SCREEN = pygame.display.set_mode(SCREEN_SIZE)
    fcclock = pygame.time.Clock()
    game = Game(**GAME_CONFIG)
    while True:
        # Loop until the window is closed or ESC is pressed.
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                sys.exit()
        keys = pygame.key.get_pressed()
        if keys[pygame.K_ESCAPE]:
            pygame.quit()
            sys.exit()
        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            values = model(torch.tensor(game.shot()))
        # Convert to a plain int before indexing the ACTIONS tuple.
        jump = ACTIONS[int(values.argmax(-1))]
        game.step(jump)
        pygame.display.set_caption(f'SCORE: {game.score}')
        game.draw(SCREEN)
        fcclock.tick(FPS)
        pygame.display.update()
        if not game.playing:
            # Automatically start a new round.
            game = Game(**GAME_CONFIG)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
game.py

# -*- coding: utf-8 -*-

"""Game environment.
"""
import random
import sys
import pygame

class Box:
    """Box with a centre position, a size, a speed and an acceleration.

    ``x``/``y`` are the centre of the box; ``left``/``right``/``top``/
    ``bottom`` are derived from the centre and half the width/height.
    """
    # Class-level placeholders; every instance overwrites them in __init__.
    __position = None
    __size = None
    __speed = None
    __acceleration = None

    def __init__(self, cx: int, cy: int, w: int, h: int, sx: int = 0, sy: int = 0, ax: int = 0, ay: int = 0):
        self.__position = [cx, cy]
        self.__size = [w, h]
        # `or 0` turns None (or any other falsy value) into 0.
        self.__speed = [sx or 0, sy or 0]
        self.__acceleration = [ax or 0, ay or 0]

    @property
    def width(self):
        return self.__size[0]

    @property
    def height(self):
        return self.__size[-1]

    @property
    def size(self):
        return self.__size

    @property
    def x(self):
        return self.__position[0]

    @property
    def y(self):
        return self.__position[-1]

    @property
    def position(self):
        return self.__position

    @property
    def speed_x(self):
        return self.__speed[0]

    @speed_x.setter
    def speed_x(self, v):
        self.__speed[0] = v

    @property
    def speed_y(self):
        return self.__speed[-1]

    @speed_y.setter
    def speed_y(self, v):
        self.__speed[-1] = v

    @property
    def speed(self):
        return self.__speed

    @speed.setter
    def speed(self, v: 'tuple[int,int]'):
        self.__speed[0] = v[0]
        self.__speed[-1] = v[-1]

    @property
    def acceleration_x(self):
        return self.__acceleration[0]

    @acceleration_x.setter
    def acceleration_x(self, v: int):
        self.__acceleration[0] = v

    @property
    def acceleration_y(self):
        return self.__acceleration[-1]

    @acceleration_y.setter
    def acceleration_y(self, v: int):
        self.__acceleration[-1] = v

    @property
    def acceleration(self):
        return self.__acceleration

    @acceleration.setter
    def acceleration(self, v: 'tuple[int,int]'):
        self.__acceleration[0] = v[0]
        self.__acceleration[-1] = v[-1]

    @property
    def left(self):
        return self.x - self.width / 2

    @property
    def right(self):
        return self.x + self.width / 2

    @property
    def top(self):
        return self.y - self.height / 2

    @property
    def bottom(self):
        return self.y + self.height / 2

    def move(self, force_x: int = None, force_y: int = None):
        """Apply a force to the box and advance it by one step.

        The force replaces the current acceleration, the acceleration is
        added to the speed, and the speed is added to the position.

        Parameters
        ----------
        force_x : int, optional
            Horizontal component; None is treated as 0. By default None
        force_y : int, optional
            Vertical component; None is treated as 0. By default None
        """
        self.acceleration_x = force_x or 0
        self.acceleration_y = force_y or 0
        self.speed_x += self.acceleration_x
        self.speed_y += self.acceleration_y
        self.__position[0] += self.speed_x
        self.__position[-1] += self.speed_y

def is_intersect(player: 'Box', door: 'Box') -> bool:
    """Tell whether the player collides with a door.

    `door` is treated as the opening the player must fit through: a
    collision is reported when the two boxes overlap horizontally and the
    player sticks out above the opening's top or below its bottom.

    Parameters
    ----------
    player : Box
        The player's box.
    door : Box
        The box describing the opening.

    Returns
    -------
    bool
        True when the player hits the door.
    """
    outside_gap = door.top > player.top or player.bottom > door.bottom
    apart_horizontally = player.left >= door.right or door.left >= player.right
    return outside_gap and not apart_horizontally

class GameObject(Box):
    """Box with a list of images and an alive flag.

    Parameters
    ----------
    imgs : list
        Images (animation frames) of the object; copied on construction.
    img_cd : int, optional
        Modulus applied to the frame index by `img_grow`; a falsy value is
        replaced by -1, which pins the index to 0. By default 1
    *args, **kwargs
        Forwarded to `Box.__init__`.
    """

    def __init__(self, imgs: list, img_cd: int = 1, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__imgs = list(imgs)
        self.__img_cd = img_cd or -1
        self.living = True
        # -1 selects the last image until `img_grow` is called the first time.
        self.img_index = -1

    def img_grow(self):
        """Advance to the next frame, wrapping at `img_cd`."""
        # NOTE(review): nothing checks that img_cd <= len(imgs); a larger
        # modulus would make `img` raise IndexError - confirm with callers.
        self.img_index = (self.img_index + 1) % self.__img_cd

    @property
    def img(self):
        """Image at the current frame index."""
        return self.__imgs[self.img_index]

class Game:
    """Flappy-bird style environment: one player and doors moving leftwards.

    Each door is a `GameObject` describing the opening the player has to fly
    through; the obstacle is drawn above and below that opening.
    """
    # Class-level defaults; __init__ sets the per-instance values.
    door_size = None
    player = None
    jump_force = 0
    g = 1
    door_distance = 0
    doors = None
    time = 1
    score = 0

    def __init__(self, screen_size=(800, 600), player_size=(160, 80), door_size=(80, 160), speed=5, jump_force=1.3, g=0.4, door_distance=100, max_falling_speed: int = 100, without_screen=False, **_):
        """Create the player and the first door.

        Parameters
        ----------
        screen_size : tuple, optional
            Screen (width, height), by default (800, 600)
        player_size : tuple, optional
            Player (width, height), by default (160, 80)
        door_size : tuple, optional
            Opening (width, height), by default (80, 160)
        speed : int, optional
            Leftward speed of the doors per step, by default 5
        jump_force : float, optional
            Upward force applied on a step with a jump, by default 1.3
        g : float, optional
            Downward force applied on a step without a jump, by default 0.4
        door_distance : int, optional
            Number of steps between two new doors, by default 100
        max_falling_speed : int, optional
            Divisor used to scale the vertical speed in `shot`; the speed
            itself is not clamped. By default 100
        without_screen : bool, optional
            When True no images are loaded, so the game can run without a
            display. By default False
        **_
            Extra keyword arguments are ignored.
        """
        self.player = GameObject(
            cx=screen_size[0] / 4,
            cy=screen_size[1] / 2,
            w=player_size[0],
            h=player_size[1],
            sx=0, sy=0,
            ax=0, ay=g,
            imgs=[None, ] if without_screen else [
                pygame.image.load('./assets/textures/player_age0.gif').convert_alpha(),
                pygame.image.load('./assets/textures/player_age1.gif').convert_alpha(),
            ],
            img_cd=2
        )
        self.without_screen = without_screen
        self.screen_size = screen_size
        self.door_size = door_size
        self.speed = speed
        self.jump_force = jump_force
        self.g = g
        self.door_distance = door_distance
        self.max_falling_speed = max_falling_speed
        self.time = 1
        self.score = 0
        self.doors = [self.create_door()]

    @property
    def playing(self) -> bool:
        """Whether the player is still alive."""
        return self.player.living

    @property
    def door(self) -> 'GameObject|None':
        """First door whose right edge is not left of the player's left edge.

        Returns None when there is no such door.
        """
        for door in self.doors:
            if door.right >= self.player.left:
                return door
        return None

    def create_door(self) -> GameObject:
        """Create a door just outside the right edge at a random height.

        Returns
        -------
        GameObject
            Door whose opening lies fully inside the screen vertically.
        """
        # random.randint requires integers (floats raise TypeError on recent
        # Python versions), so compute the bounds with integer arithmetic.
        half_height = int(self.door_size[1] // 2)
        door = GameObject(
            cx=self.screen_size[0] + self.door_size[0] / 2,
            cy=random.randint(half_height, int(self.screen_size[1]) - half_height),
            w=self.door_size[0],
            h=self.door_size[1],
            sx=-self.speed,
            imgs=[None, ] if self.without_screen else [pygame.image.load('./assets/textures/door.gif').convert_alpha(), ],
            img_cd=2
        )
        return door

    def draw(self, surface: 'pygame.Surface'):
        """Draw one frame; does nothing once the player is dead.

        Parameters
        ----------
        surface : pygame.Surface
            Surface to draw on.
        """
        if not self.player.living:
            return
        surface.fill([86, 92, 66])
        self.player.img_grow()
        surface.blit(pygame.transform.scale(self.player.img, (self.player.width, self.player.height)), (self.player.left, self.player.top))
        for door in self.doors:
            # The obstacle is drawn above and below the opening.
            surface.blit(pygame.transform.scale(door.img, (door.width, door.top)), (door.left, 0))
            surface.blit(pygame.transform.scale(door.img, (door.width, self.screen_size[1] - door.bottom)), (door.left, door.bottom))

    @staticmethod
    def __shot(door: 'Box', player: 'Box', screen_size: 'tuple[int,int]', speed_scale: int) -> 'list[float]':
        return [(door.right - player.left) / screen_size[0], (player.y - door.y) / screen_size[-1], player.speed_y / speed_scale, ]

    def shot(self) -> 'list[float]':
        """Assemble the current state for the model.

        Returns
        -------
        list[float]
            Three values: horizontal distance from the player's left edge to
            the next door's right edge divided by ``door_distance * speed``;
            vertical offset between player and door centres divided by the
            screen height; vertical speed divided by `max_falling_speed`.

        Notes
        -----
        Requires `self.door` to be a door (not None).
        """
        return Game.__shot(self.door, self.player, [self.door_distance * self.speed, self.screen_size[-1]], self.max_falling_speed, )

    def step(self, jump: 'bool|int|float' = False):
        """Advance the game by one frame.

        Parameters
        ----------
        jump : bool, optional
            Whether the player jumps on this frame, by default False
        """
        # Nothing happens once the player is dead.
        if not self.player.living:
            return
        if self.time % self.door_distance == 0 or not self.doors:
            # Spawn a door at a fixed interval and restart the timer.
            self.doors.append(self.create_door())
            self.time = 1
        else:
            self.time += 1
        # Drop doors that have left the screen; guard against an empty list.
        while self.doors and self.doors[0].right < 0:
            del self.doors[0]
        for door in self.doors:
            door.move()
        door = self.door
        # The collision check uses the player's position before this frame's
        # move. With no door ahead there is nothing to collide with.
        hit = door is not None and is_intersect(self.player, door)
        living = 0 < self.player.y < self.screen_size[1] and not hit
        self.player.move(None, -self.jump_force if jump else self.g)
        if jump:
            # A jump cancels any downward speed.
            self.player.speed_y = min(0, self.player.speed_y)
        self.player.living = living
        # Score every door the player has fully passed. Checking all doors,
        # not only `self.door`, means a pass is still counted when the door's
        # right edge never lands exactly on the player's left edge.
        for passed in self.doors:
            if passed.living and self.player.left >= passed.right:
                passed.living = False
                self.score += 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
util.py

# -*- coding: utf-8 -*-

"""Console output helpers.
"""

def print_bar(epoch, epochs, etc=None, bar_size=50):
    """Print a one-line progress bar.

    Whenever `epoch` is non-zero the line starts with an ANSI "cursor up"
    sequence so that it overwrites the previously printed line.

    Parameters
    ----------
    epoch : int
        Current progress.
    epochs : int
        Total progress. When 0 the bar is drawn full instead of dividing
        by zero.
    etc : Any, optional
        Suffix printed after the bar (via ``str``), by default None
    bar_size : int, optional
        Width of the bar in characters, by default 50
    """
    import math

    # Number of filled cells, rounded up and clamped to the bar width so an
    # out-of-range `epoch` cannot overflow the bar.
    filled = math.ceil(bar_size * epoch / epochs) if epochs else bar_size
    filled = max(0, min(bar_size, filled))
    parts = [
        f"Epoch {epoch}/{epochs}",
        f" |\033[1;30;47m{' ' * filled}\033[0m{' ' * (bar_size - filled)}| ",
    ]
    if etc is not None:
        parts.append(str(etc))
    if epoch:
        parts.insert(0, "\033[A")
    print("".join(parts) + " ")

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
door.gif

player_age0.gif

player_age1.gif


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved