我有一个Python脚本,我想每天运行,我希望它只需要1-2个小时即可运行。当前已设置为针对给定的URL敲击4个不同的API,捕获结果,然后将数据保存到PostgreSQL数据库中。问题是我要遍历160,000个URL,脚本最终要花很长时间-我进行了一些初步测试,以目前的格式浏览每个URL将花费36个小时以上。因此,我的问题可以归结为:我应该优化脚本以同时运行多个线程吗?还是应该扩展正在使用的服务器数量?显然,第二种方法的成本更高,因此我希望在同一实例上运行多个线程。

我使用的是我创建的库(SocialAnalytics),该库提供了实现不同API端点并解析结果的方法。这是我配置脚本的方式:

import psycopg2
from socialanalytics import pinterest
from socialanalytics import facebook
from socialanalytics import twitter
from socialanalytics import google_plus
from time import strftime, sleep

conn = psycopg2.connect("dbname='***' user='***' host='***' password='***'")
cur = conn.cursor()

# Select all URLs
cur.execute("SELECT * FROM urls;")
urls = cur.fetchall()

for url in urls:

    # Pinterest
    try:
        p = pinterest.getPins(url[2])
    except:
        p = { 'pin_count': 0 }
    # Facebook
    try:
        f = facebook.getObject(url[2])
    except:
        f = { 'comment_count': 0, 'like_count': 0, 'share_count': 0 }
    # Twitter
    try:
        t = twitter.getShares(url[2])
    except:
        t = { 'share_count': 0 }
    # Google
    try:
        g = google_plus.getPlusOnes(url[2])
    except:
        g = { 'plus_count': 0 }

    # Save results
    try:
        now = strftime("%Y-%m-%d %H:%M:%S")
        cur.execute("INSERT INTO social_stats (fetched_at, pinterest_pins, facebook_likes, facebook_shares, facebook_comments, twitter_shares, google_plus_ones) VALUES(%s, %s, %s, %s, %s, %s, %s, %s);", (now, p['pin_count'], f['like_count'], f['share_count'], f['comment_count'], t['share_count'], g['plus_count']))
        conn.commit()
    except:
        conn.rollback()


您可以看到,每次对API的调用都使用Requests library,这是一个同步的阻塞事件。经过一些初步研究,我发现了Treq,它是Twisted之上的API。 Twisted的异步,无阻塞性质似乎是改进我的方法的一个不错的选择,但我从未使用过它,并且我不确定它究竟能(或是否)能帮助我实现目标。

任何指导深表感谢!

最佳答案

首先,您应该测量脚本在每个步骤上花费的时间。可能是您发现了一些有趣的东西:)

其次,您可以将网址拆分为多个块:

chunk_size = len(urls)/cpu_core_count; // don't forget about remainder of division

完成这些步骤后,您可以使用multiprocessing并行处理每个块。这是给你的例子:

import multiprocessing as mp

p = mp.Pool(5)

# first solution
for urls_chunk in urls: # urls = [(url1...url6),(url7...url12)...]
    res = p.map(get_social_stat, urls_chunk)
    for record in res:
        save_to_db(record)

# or, simple
res = p.map(get_social_stat, urls)

for record in res:
   save_to_db(record)


另外,gevent可以为您提供帮助。因为它可以优化同步阻塞请求的处理序列上的时间花费。

09-10 14:11
查看更多