我正在编写一个与Lichess交互的应用程序。根据文档,使用API获取游戏状态时必须使用以下写法:
for event in client.bots.stream_game_state(game_id):
实际上,这正是他们在示例中使用的代码。但是,运行时出现以下错误:
HTTPError: 400 Client Error: Bad Request for url: https://lichess.org/api/bot/game/stream/8Kjolz4k
这是我的代码:
# Type "!pip install berserk" in Console to get berserk module
import berserk
import time
import threading
# Initiate a session with my personal API token
token = "***************"
session = berserk.TokenSession(token)
client = berserk.Client(session)

# Stream whats happening and continue when we are challenged
in_game = False
while not in_game:
    time.sleep(0.5)
    for event in client.bots.stream_incoming_events():
        if event['type'] == 'gameStart':
            game_id = event['game']['id']
            in_game = True
            break
        elif event['type'] == 'challenge':
            # Accept the challenge and keep reading the stream; the
            # following 'gameStart' event leaves the loop.
            game_id = event['challenge']['id']
            client.bots.accept_challenge(game_id)
            in_game = True
print("The game has started!")

# Stream the events of the game and respond accordingly
playing = True
for event in client.bots.stream_game_state(game_id):
    if event['type'] == 'gameFull':
        # Compare id with id: the username keeps its capitalisation
        # (e.g. 'WolfgangFahl') while the player id reported in the
        # 'gameFull' event is lower case (e.g. 'wolfgangfahl'), so comparing
        # the username against the id misses accounts with capital letters.
        my_id = client.account.get()['id']
        # .get() because a player entry without an 'id' would otherwise
        # raise a KeyError
        if my_id == event['white'].get('id'):
            client.bots.post_message(game_id, "I got first, nerd!")
        else:
            client.bots.post_message(game_id, "You got first, nerd!")
# The event stream has ended. This is treated as the end of the game instead
# of re-opening the stream in an endless loop.
# NOTE(review): assumes Lichess closes the stream when the game is over -
# confirm against the Bot API documentation.
playing = False
最佳答案
我不知道为什么您的代码当时失败了。可以这样捕获400响应(abort call的示例):
def abort(self):
    """
    Try to abort the game with the id self.game_id.

    Returns:
        True if the abort call went through, False if it raised a
        requests HTTPError (the error is printed when self.debug is set).
    """
    aborted = False
    try:
        self.client.bots.abort_game(self.game_id)
    except requests.exceptions.HTTPError as http_error:
        if self.debug:
            print(http_error)
    else:
        aborted = True
    return aborted
请在下面找到一个稍作修改的版本,该版本在Play-Chess-With-A-WebCam(我是提交者)的环境下对我有用。您需要在 ~/.pcwawc/config.yaml 处放置一个包含令牌的config.yaml文件:
# Play Chess With a Webcam configuration file
# WF 2019-12-21
token: YOUR_TOKEN_HERE
测试结果为:
gameStart {'type': 'gameStart', 'game': {'id': '20XfiEMn'}}
The game 20XfiEMn has started!
gameFull {'id': '20XfiEMn', 'variant': {'key': 'standard', 'name': 'Standard', 'short': 'Std'}, 'clock': None, 'speed': 'correspondence', 'perf': {'name': 'Correspondence'}, 'rated': False, 'createdAt': datetime.datetime(2019, 12, 23, 19, 54, 20, 470000, tzinfo=datetime.timezone.utc), 'white': {'id': 'seppl2019', 'name': 'seppl2019', 'title': 'BOT', 'rating': 1500, 'provisional': True}, 'black': {'id': 'wolfgangfahl', 'name': 'WolfgangFahl', 'title': None, 'rating': 1662, 'provisional': True}, 'initialFen': 'startpos', 'type': 'gameFull', 'state': {'type': 'gameState', 'moves': 'f2f3 d7d5', 'wtime': 2147483647, 'btime': 2147483647, 'winc': 0, 'binc': 0, 'bdraw': False, 'wdraw': False}}
chatLine {'type': 'chatLine', 'room': 'player', 'username': 'seppl2019', 'text': 'I got first, nerd!'}
test_lichess
'''
Created on 2019-12-21
@author: wf
'''
from pcwawc.lichess import Lichess
def test_stream():
    """
    Wait until a game is available and stream the events of that game.

    Creates a Lichess adapter in debug mode, blocks in waitForChallenge()
    until it returns a game id and then hands that id to streamGame().
    """
    # the game id is always supplied by waitForChallenge(), so no
    # hard-coded id is needed here
    lichess = Lichess(debug=True)
    gameid = lichess.waitForChallenge()
    lichess.streamGame(gameid)


test_stream()
lichess.py
'''
Created on 2019-12-21
@author: wf
'''
# https://github.com/rhgrant10/berserk
# https://berserk.readthedocs.io/en/master/
import berserk
import lichess.api
import os
import yaml
import requests
import time
class Account():
    """ Lichess account wrapper """

    def __init__(self, adict):
        """
        Wrap the given account dictionary.

        Args:
            adict: account dictionary; the keys "id" and "username" are
                read, a missing key raises a KeyError
        """
        self.adict = adict
        self.id = adict["id"]
        self.username = adict["username"]

    def __str__(self):
        """Return the account in the form "username - (id)"."""
        return "%s - (%s)" % (self.username, self.id)
class Lichess():
    """
    Lichess adapter

    Wraps a berserk client that is created from the API token found in the
    config file .pcwawc/config.yaml below the user's home directory. When no
    token is available both ``session`` and ``client`` are None.
    """

    def __init__(self, debug=False):
        """
        Create the adapter and - if a token is configured - the berserk client.

        Args:
            debug: if True the streamed events are printed
        """
        self.debug = debug
        # make sure both attributes always exist, even without a token
        self.session = None
        self.client = None
        token = self.getToken()
        if token is not None:
            self.session = berserk.TokenSession(token)
            self.client = berserk.Client(self.session)

    def getAccount(self):
        """Return the account of the client wrapped as an Account."""
        account = Account(self.client.account.get())
        return account

    def getToken(self):
        """
        Read the API token from the config file.

        Returns:
            the value of the "token" entry, or None (after printing a hint)
            if the config file is missing or has no "token" entry
        """
        # expanduser also works when the HOME environment variable is unset
        home = os.path.expanduser("~")
        configPath = home + "/.pcwawc/config.yaml"
        if not os.path.isfile(configPath):
            print("%s is missing please create it" % (configPath))
            return None
        # the with block closes the file handle again
        with open(configPath) as configFile:
            # NOTE(review): FullLoader is fine for the user's own config
            # file; do not point this at untrusted YAML
            config = yaml.load(configFile, Loader=yaml.FullLoader)
        # an empty file is loaded as None - treat it like a missing token
        if not isinstance(config, dict) or "token" not in config:
            print("no token found in %s please add it" % (configPath))
            return None
        return config["token"]

    def pgnImport(self, pgn):
        """
        Post the given PGN to https://lichess.org/import with analysis
        switched on and print the url of the response.

        Args:
            pgn: the PGN text to import
        """
        payload = {
            'pgn': pgn,
            'analyse': 'on'
        }
        res = requests.post('https://lichess.org/import', data=payload)
        print(res.url)

    def game(self, gameid):
        """Return the result of lichess.api.game for the given game id."""
        game = lichess.api.game(gameid)
        return game

    def waitForChallenge(self):
        """
        Stream the incoming events until a game is available.

        A 'challenge' event is accepted, a 'gameStart' event ends the
        waiting.

        Returns:
            the id of the game
        """
        in_game = False
        client = self.client
        while not in_game:
            time.sleep(0.5)
            for event in client.bots.stream_incoming_events():
                eventtype = event['type']
                if self.debug:
                    print(eventtype, event)
                if eventtype == 'gameStart':
                    game_id = event['game']['id']
                    in_game = True
                    break
                elif eventtype == 'challenge':
                    # accept and keep reading; the following 'gameStart'
                    # event leaves the loop
                    game_id = event['challenge']['id']
                    client.bots.accept_challenge(game_id)
                    in_game = True
        if self.debug:
            print("The game %s has started!" % (game_id))
        return game_id

    def streamGame(self, game_id):
        """
        Stream the events of the game and respond accordingly.

        On a 'gameFull' event a chat message is posted that tells whether
        this account plays white. The method returns when the event stream
        ends instead of re-opening the stream in an endless loop.
        NOTE(review): assumes Lichess closes the stream when the game is
        over - confirm against the Bot API documentation.

        Args:
            game_id: the id of the game to stream
        """
        client = self.client
        for event in client.bots.stream_game_state(game_id):
            eventtype = event['type']
            if self.debug:
                print(eventtype, event)
            if eventtype == 'gameFull':
                # Compare id with id: the username keeps its capitalisation
                # while the player id in the event is lower case, so the
                # username must not be compared against the id.
                my_id = client.account.get()['id']
                # .get() because a player entry without an 'id' would
                # otherwise raise a KeyError
                if my_id == event['white'].get('id'):
                    client.bots.post_message(game_id, "I got first, nerd!")
                else:
                    client.bots.post_message(game_id, "You got first, nerd!")
Requirements.txt的摘录:
# lichess APIs
# https://github.com/rhgrant10/berserk
berserk
# https://github.com/cyanfish/python-lichess
python-lichess
关于python - Lichess API-尝试流式游戏状态时出现HTTPError 400,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56269705/