我有一个Python脚本,我想每天运行,我希望它只需要1-2个小时即可运行。当前已设置为针对给定的URL敲击4个不同的API,捕获结果,然后将数据保存到PostgreSQL数据库中。问题是我要遍历160,000个URL,脚本最终要花很长时间-我进行了一些初步测试,以目前的格式浏览每个URL将花费36个小时以上。因此,我的问题可以归结为:我应该优化脚本以同时运行多个线程吗?还是应该扩展正在使用的服务器数量?显然,第二种方法的成本更高,因此我希望在同一实例上运行多个线程。
我使用的是我创建的库(SocialAnalytics),该库提供了实现不同API端点并解析结果的方法。这是我配置脚本的方式:
import psycopg2
from socialanalytics import pinterest
from socialanalytics import facebook
from socialanalytics import twitter
from socialanalytics import google_plus
from time import strftime, sleep
conn = psycopg2.connect("dbname='***' user='***' host='***' password='***'")
cur = conn.cursor()
# Select all URLs
cur.execute("SELECT * FROM urls;")
urls = cur.fetchall()
for url in urls:
# Pinterest
try:
p = pinterest.getPins(url[2])
except:
p = { 'pin_count': 0 }
# Facebook
try:
f = facebook.getObject(url[2])
except:
f = { 'comment_count': 0, 'like_count': 0, 'share_count': 0 }
# Twitter
try:
t = twitter.getShares(url[2])
except:
t = { 'share_count': 0 }
# Google
try:
g = google_plus.getPlusOnes(url[2])
except:
g = { 'plus_count': 0 }
# Save results
try:
now = strftime("%Y-%m-%d %H:%M:%S")
cur.execute("INSERT INTO social_stats (fetched_at, pinterest_pins, facebook_likes, facebook_shares, facebook_comments, twitter_shares, google_plus_ones) VALUES(%s, %s, %s, %s, %s, %s, %s, %s);", (now, p['pin_count'], f['like_count'], f['share_count'], f['comment_count'], t['share_count'], g['plus_count']))
conn.commit()
except:
conn.rollback()
您可以看到,每次对API的调用都使用Requests library,这是一个同步的阻塞事件。经过一些初步研究,我发现了Treq,它是Twisted之上的API。 Twisted的异步,无阻塞性质似乎是改进我的方法的一个不错的选择,但我从未使用过它,并且我不确定它究竟能(或是否)能帮助我实现目标。
任何指导深表感谢!
最佳答案
首先,您应该测量脚本在每个步骤上花费的时间。可能是您发现了一些有趣的东西:)
其次,您可以将网址拆分为多个块:chunk_size = len(urls)/cpu_core_count; // don't forget about remainder of division
完成这些步骤后,您可以使用multiprocessing并行处理每个块。这是给你的例子:
import multiprocessing as mp
p = mp.Pool(5)
# first solution
for urls_chunk in urls: # urls = [(url1...url6),(url7...url12)...]
res = p.map(get_social_stat, urls_chunk)
for record in res:
save_to_db(record)
# or, simple
res = p.map(get_social_stat, urls)
for record in res:
save_to_db(record)
另外,gevent可以为您提供帮助。因为它可以优化同步阻塞请求的处理序列上的时间花费。