Problem description
In my Python script I'm getting the video ID of my latest video.
This is the code, with playlistId being the ID of my channel's playlist that contains all my videos:
def get_latest_video_id(youtube, playlistId):
    id_request = youtube.playlistItems().list(
        part = 'snippet',
        playlistId = playlistId
    )
    id_response = id_request.execute()
    video_id = id_response['items'][0]['snippet']['resourceId']['videoId']
    return video_id
The problem now is that my live streams also get saved into this playlist. I couldn't find out whether there is a playlist with all my uploads that excludes my saved live streams.
The workaround I thought of is to get a list of all my live streams and compare their IDs to the ID I got from the method above.
My question is: isn't there a better way to do this? Is there by chance an API call that does what I need, without a high quota cost?
Recommended answer
You'll have to iterate your call to the PlaylistItems.list API endpoint (using pagination) and manually filter out the videos that are live streams.
def get_non_livestream_videos(youtube, video_ids):
    assert len(video_ids) <= 50
    if not video_ids: return []

    response = youtube.videos().list(
        fields = 'items(id,liveStreamingDetails)',
        part = 'id,liveStreamingDetails',
        maxResults = len(video_ids),
        id = ','.join(video_ids),
    ).execute()

    items = response.get('items', [])
    assert len(items) <= len(video_ids)

    # a video counts as a live stream if it carries liveStreamingDetails
    not_live = lambda video: \
        not video.get('liveStreamingDetails')
    video_id = lambda video: video['id']

    return map(video_id, filter(not_live, items))
def get_latest_video_id(youtube, playlistId):
    request = youtube.playlistItems().list(
        fields = 'nextPageToken,items/snippet/resourceId',
        playlistId = playlistId,
        maxResults = 50,
        part = 'snippet'
    )

    is_video = lambda item: \
        item['snippet']['resourceId']['kind'] == 'youtube#video'
    video_id = lambda item: \
        item['snippet']['resourceId']['videoId']

    while request:
        response = request.execute()

        items = response.get('items', [])
        assert len(items) <= 50

        videos = map(video_id, filter(is_video, items))
        if videos:
            videos = get_non_livestream_videos(youtube, videos)
            if videos: return videos[0]

        # list_next yields None when there are no more pages
        request = youtube.playlistItems().list_next(
            request, response)

    return None
Note that above I used the fields request parameter to get from the API only the info that's actually needed.
Also note that you may have to elaborate a bit on the function get_non_livestream_videos, since the Videos.list API endpoint, when queried with its id parameter as a comma-separated list of video IDs, may well return the items in an order that differs from the order of the IDs in video_ids.
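The filtering step of get_non_livestream_videos can be tried offline, without spending any quota. The sketch below is only an illustration: sample_response is a hand-written dict shaped like a Videos.list response (not real API output), the video IDs are made up, and non_livestream_ids is a hypothetical helper name.

```python
# Hand-written sample shaped like a Videos.list response (assumed data,
# not real API output). Only the live stream carries liveStreamingDetails.
sample_response = {
    'items': [
        {'id': 'vid_upload_1'},
        {'id': 'vid_stream_1',
         'liveStreamingDetails': {'actualStartTime': '2021-01-01T10:00:00Z'}},
        {'id': 'vid_upload_2'},
    ]
}

def non_livestream_ids(response):
    """Return the IDs of the items that have no liveStreamingDetails."""
    return [
        video['id']
        for video in response.get('items', [])
        if not video.get('liveStreamingDetails')
    ]

print(non_livestream_ids(sample_response))
# prints ['vid_upload_1', 'vid_upload_2']
```

The result follows the order of the items in the response, which is exactly why the order issue mentioned above matters.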
One more important note: if you're running the code above under Python 3 (your question does not mention this), then make sure you have the following configuration code inserted at the top of your script:
import sys

if sys.version_info[0] >= 3:
    from builtins import map as builtin_map
    map = lambda *args: list(builtin_map(*args))
This is needed since, under Python 3, the builtin function map returns an iterator, whereas under Python 2, map returns a list.
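To see why this matters for checks like `if videos:` and `videos[0]`, here is a small sketch that does not touch the API at all. It only uses the builtin map on made-up strings.

```python
import sys

ids = map(str.upper, ['a', 'b'])

if sys.version_info[0] >= 3:
    # On Python 3 a map object is lazy: it is truthy even when it would
    # yield nothing, and it cannot be indexed.
    assert bool(map(str.upper, [])) is True
    try:
        ids[0]
    except TypeError:
        print('map object does not support indexing')
    # Materialize it, which is what the configuration code does for every call.
    ids = list(ids)

print(ids[0])
```

Without the list conversion, the test `if videos:` in get_latest_video_id would always succeed under Python 3, even for an empty page.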
Here is the code that solves the issue I mentioned above, namely that Videos.list may alter the order of the items it returns relative to the order of the IDs given by the argument video_ids of the function get_non_livestream_videos:
import sys

if sys.version_info[0] >= 3:
    from builtins import map as builtin_map
    map = lambda *args: list(builtin_map(*args))

class MergeVideoListsError(Exception): pass

def merge_video_lists(video_ids, video_res):
    pair0 = lambda pair: pair[0]
    pair1 = lambda pair: pair[1]

    # pair each ID with its original position, then sort both lists by ID
    video_ids = sorted(
        enumerate(video_ids), key = pair1)
    video_res.sort(
        key = lambda video: video['id'])

    def error(video_id):
        raise MergeVideoListsError(
            "unexpected video resource of ID '%s'" % video_id)

    def do_merge():
        N = len(video_ids)
        R = len(video_res)
        assert R <= N

        l = []
        i, j = 0, 0
        while i < N and j < R:
            v = video_ids[i]
            r = video_res[j]
            s = v[1]
            d = r['id']
            if s == d:
                l.append((v[0], r))
                i += 1
                j += 1
            elif s < d:
                i += 1
            else:
                error(d)

        if j < R:
            error(video_res[j]['id'])

        return l

    # restore the original order of video_ids
    video_res = do_merge()
    video_res.sort(key = pair0)
    return map(pair1, video_res)

def println(*args):
    for a in args:
        sys.stdout.write(str(a))
    sys.stdout.write('\n')

def test_merge_video_lists(ids, res, val):
    try:
        println("ids: ", ids)
        println("res: ", res)
        r = merge_video_lists(ids, res)
        println("merge: ", r)
    except MergeVideoListsError as e:
        println("error: ", e)
        r = str(e)
    finally:
        println("test: ", "OK" \
            if val == r \
            else "failed")

TESTS = ((
    ['c', 'b', 'a'],
    [{'id': 'c'}, {'id': 'a'}, {'id': 'b'}],
    [{'id': 'c'}, {'id': 'b'}, {'id': 'a'}]
),(
    ['c', 'b', 'a'],
    [{'id': 'b'}, {'id': 'c'}],
    [{'id': 'c'}, {'id': 'b'}]
),(
    ['c', 'b', 'a'],
    [{'id': 'a'}, {'id': 'c'}],
    [{'id': 'c'}, {'id': 'a'}]
),(
    ['c', 'b', 'a'],
    [{'id': 'a'}, {'id': 'b'}],
    [{'id': 'b'}, {'id': 'a'}]
),(
    ['c', 'b', 'a'],
    [{'id': 'z'}, {'id': 'b'}, {'id': 'c'}],
    "unexpected video resource of ID 'z'"
),(
    ['c', 'b', 'a'],
    [{'id': 'a'}, {'id': 'z'}, {'id': 'c'}],
    "unexpected video resource of ID 'z'"
),(
    ['c', 'b', 'a'],
    [{'id': 'a'}, {'id': 'b'}, {'id': 'z'}],
    "unexpected video resource of ID 'z'"
))

def main():
    for i, t in enumerate(TESTS):
        if i: println()
        test_merge_video_lists(*t)

if __name__ == '__main__':
    main()

# $ python merge-video-lists.py
# ids: ['c', 'b', 'a']
# res: [{'id': 'c'}, {'id': 'a'}, {'id': 'b'}]
# merge: [{'id': 'c'}, {'id': 'b'}, {'id': 'a'}]
# test: OK
#
# ids: ['c', 'b', 'a']
# res: [{'id': 'b'}, {'id': 'c'}]
# merge: [{'id': 'c'}, {'id': 'b'}]
# test: OK
#
# ids: ['c', 'b', 'a']
# res: [{'id': 'a'}, {'id': 'c'}]
# merge: [{'id': 'c'}, {'id': 'a'}]
# test: OK
#
# ids: ['c', 'b', 'a']
# res: [{'id': 'a'}, {'id': 'b'}]
# merge: [{'id': 'b'}, {'id': 'a'}]
# test: OK
#
# ids: ['c', 'b', 'a']
# res: [{'id': 'z'}, {'id': 'b'}, {'id': 'c'}]
# error: unexpected video resource of ID 'z'
# test: OK
#
# ids: ['c', 'b', 'a']
# res: [{'id': 'a'}, {'id': 'z'}, {'id': 'c'}]
# error: unexpected video resource of ID 'z'
# test: OK
#
# ids: ['c', 'b', 'a']
# res: [{'id': 'a'}, {'id': 'b'}, {'id': 'z'}]
# error: unexpected video resource of ID 'z'
# test: OK
The code above is a standalone program (running under both Python 2 and Python 3) that implements a merging function merge_video_lists.
You'll have to use this function within the function get_non_livestream_videos by replacing the line:
return map(video_id, filter(not_live, items))
with:
return map(video_id, merge_video_lists(
    video_ids, filter(not_live, items)))
for Python 2. For Python 3, the replacement would be:
return map(video_id, merge_video_lists(
    video_ids, list(filter(not_live, items))))
Alternatively, instead of replacing the return statement, just have that statement preceded by this one:
items = merge_video_lists(video_ids, items)
This latter variant is better, since it also validates the video IDs returned by the API: if there is an ID that is not in video_ids, then merge_video_lists throws a MergeVideoListsError exception indicating the culprit ID.
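If the sort-and-merge approach feels heavy, the same reordering and validation can be sketched with a dict lookup. This is an alternative illustration, not the merge_video_lists function from this answer: reorder_by_ids is a hypothetical name, it raises ValueError instead of MergeVideoListsError, and it assumes that the IDs in video_res are unique.

```python
def reorder_by_ids(video_ids, video_res):
    """Return video_res reordered to follow the order of video_ids.

    IDs in video_ids with no matching resource are skipped; a resource
    whose ID is not in video_ids raises ValueError.
    """
    by_id = {video['id']: video for video in video_res}
    unexpected = set(by_id) - set(video_ids)
    if unexpected:
        raise ValueError(
            'unexpected video resource IDs: %s' % sorted(unexpected))
    return [by_id[i] for i in video_ids if i in by_id]

print(reorder_by_ids(['c', 'b', 'a'], [{'id': 'a'}, {'id': 'c'}]))
# prints [{'id': 'c'}, {'id': 'a'}]
```

Unlike merge_video_lists, this sketch does not sort video_res in place, so it also accepts any iterable of resources.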
For obtaining all videos that are exactly N days old, excluding live streams, use the function below:
def get_days_old_video_ids(youtube, playlistId, days = 7):
    from datetime import date, datetime, timedelta
    n_days = date.today() - timedelta(days = days)

    request = youtube.playlistItems().list(
        fields = 'nextPageToken,items(snippet/resourceId,contentDetails/videoPublishedAt)',
        part = 'snippet,contentDetails',
        playlistId = playlistId,
        maxResults = 50
    )

    def parse_published_at(item):
        details = item['contentDetails']
        details['videoPublishedAt'] = datetime.strptime(
            details['videoPublishedAt'],
            '%Y-%m-%dT%H:%M:%SZ') \
            .date()
        return item

    def find_if(cond, items):
        for item in items:
            if cond(item):
                return True
        return False

    n_days_eq = lambda item: \
        item['contentDetails']['videoPublishedAt'] == n_days
    n_days_lt = lambda item: \
        item['contentDetails']['videoPublishedAt'] < n_days
    is_video = lambda item: \
        item['snippet']['resourceId']['kind'] == 'youtube#video'
    video_id = lambda item: \
        item['snippet']['resourceId']['videoId']

    videos = []

    while request:
        response = request.execute()

        items = response.get('items', [])
        assert len(items) <= 50

        # remove the non-video entries in 'items'
        items = filter(is_video, items)

        # replace each 'videoPublishedAt' with
        # its corresponding parsed date object
        items = map(parse_published_at, items)

        # terminate loop when found a 'videoPublishedAt' < n_days
        done = find_if(n_days_lt, items)

        # retain only the items with 'videoPublishedAt' == n_days
        items = filter(n_days_eq, items)

        # add to 'videos' the IDs of videos in 'items' that are not live streams
        videos.extend(get_non_livestream_videos(youtube, map(video_id, items)))

        if done: break

        request = youtube.playlistItems().list_next(
            request, response)

    return videos
The function get_days_old_video_ids above needs filter and map to return lists, therefore the configuration code above has to be updated to:
import sys

if sys.version_info[0] >= 3:
    from builtins import map as builtin_map
    map = lambda *args: list(builtin_map(*args))
    from builtins import filter as builtin_filter
    filter = lambda *args: list(builtin_filter(*args))
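Note that get_days_old_video_ids relies on the following undocumented property of the result set produced by PlaylistItems.list: for the uploads playlist of a channel, the items returned by PlaylistItems.list are ordered in reverse chronological order (newest first) by contentDetails.videoPublishedAt.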
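Since that ordering is undocumented, it may be worth checking it on each page before relying on the early exit. Here is a minimal sketch of such a check; is_newest_first is a hypothetical helper name and the sample items are made up.

```python
def is_newest_first(items):
    """True if videoPublishedAt never increases from one item to the next.

    Works on the raw timestamp strings (same fixed format, so string
    order matches time order) as well as on parsed date objects.
    """
    stamps = [item['contentDetails']['videoPublishedAt'] for item in items]
    return all(a >= b for a, b in zip(stamps, stamps[1:]))

# made-up playlist items carrying only the field that is checked
page = [
    {'contentDetails': {'videoPublishedAt': '2021-03-09T08:00:00Z'}},
    {'contentDetails': {'videoPublishedAt': '2021-03-03T23:59:59Z'}},
    {'contentDetails': {'videoPublishedAt': '2021-03-01T12:00:00Z'}},
]
print(is_newest_first(page))
# prints True
```

One possible use is an assert on each page inside the while loop, so that a change in the API's behavior fails loudly instead of silently dropping videos.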
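Also note that get_days_old_video_ids returns the IDs of the videos that are exactly days old. If you need the IDs of the videos that are at most days old, then define:
n_days_ge = lambda item: \
    item['contentDetails']['videoPublishedAt'] >= n_days
and use it in place of n_days_eq when filtering items.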
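The difference between the "exactly" and the "at most" comparison is easy to see on a few made-up timestamps. The sketch below fixes today to a constant date so that the result is reproducible (the real function uses date.today()); published_date is a hypothetical helper that mirrors the parsing done in parse_published_at.

```python
from datetime import date, datetime, timedelta

def published_date(timestamp):
    """Parse a 'YYYY-MM-DDTHH:MM:SSZ' timestamp into a date object."""
    return datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ').date()

today = date(2021, 3, 10)             # assumed fixed date, for illustration only
n_days = today - timedelta(days=7)    # 2021-03-03

stamps = [
    '2021-03-09T08:00:00Z',
    '2021-03-03T23:59:59Z',
    '2021-03-01T12:00:00Z',
]
dates = [published_date(s) for s in stamps]

exactly = [d.isoformat() for d in dates if d == n_days]
at_most = [d.isoformat() for d in dates if d >= n_days]

print(exactly)   # prints ['2021-03-03']
print(at_most)   # prints ['2021-03-09', '2021-03-03']
```

Keep in mind that the timestamps are in UTC while date.today() is local time, so videos published close to midnight may land on a neighboring day.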
One more thing to note: at the top of the function get_non_livestream_videos above, I added the following statement:
if not video_ids: return []
so as to avoid processing an empty video_ids list.