I am writing a Python script to fetch the list of hosts that belong to a particular group_id, using a web service call. The number of hosts can be around 10,000. Then, for each of those hosts, I fetch a value called property from a second web service.

So: group_id ----(ws1)----> ~10,000 hosts ----(ws2)----> a property per host

I am using concurrent code as shown below, but it doesn't feel like a clean design, and I doubt it will scale.

import concurrent.futures


def call_ws_1(group_id):
    # fetch list of hosts for group_id
    pass


def call_ws_2(host):
    # find property for host
    pass


def fetch_hosts(group_ids):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id for group_id in group_ids}
        for future in concurrent.futures.as_completed(future_to_grp_id):
            group_id = future_to_grp_id[future]
            try:
                hosts = future.result()  # this is a list
            except Exception as exp:
                pass  # logging etc.
            else:
                fetch_property(hosts)


def fetch_property(hosts):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_host = {executor.submit(call_ws_2, host): host for host in hosts}
        for future in concurrent.futures.as_completed(future_to_host):
            host = future_to_host[future]
            try:
                host_prop = future.result()  # a string
            except Exception as exp:
                pass  # logging etc.
            else:
                pass  # save host and host_prop to DB
  • Is there any advantage to using a ProcessPoolExecutor here?
  • How can I fetch all the hosts first (there are about 40,000 of them) and only then call the ws for the properties? (A rough, untested sketch of what I mean follows this list.)
  • Any other suggestions to improve this design?
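To illustrate point 2, this is roughly what I have in mind (an untested sketch; fetch_all_hosts, fetch_all_properties and save_to_db are just placeholder names, and call_ws_1/call_ws_2 are the stubs above):

def fetch_all_hosts(group_ids):
    # phase 1: collect the host list of every group before touching ws2
    all_hosts = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for hosts in executor.map(call_ws_1, group_ids):
            all_hosts.extend(hosts)  # flatten into one list (~40,000 hosts)
    return all_hosts


def fetch_all_properties(hosts):
    # phase 2: fetch the property of every host in a single pass
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for host, prop in zip(hosts, executor.map(call_ws_2, hosts)):
            save_to_db(host, prop)  # placeholder for the DB write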
  • Best answer

  • ProcessPoolExecutor has the advantage of being unaffected by the GIL. With a ThreadPoolExecutor, the GIL will prevent more than one thread from running at a time unless you are doing I/O. The good news is that both of your workers appear to be doing almost nothing but I/O, but any processing that happens in each worker before or after the web service call will not truly run concurrently, and that will hurt your performance. A ProcessPoolExecutor does not have that limitation, but it adds the overhead of sending the group_id and host data between processes. If you have tens of thousands of hosts, sending them between processes one at a time will add up to quite a lot of overhead.
  • As for point 2, I don't think that change alone will make much difference to performance, because at the end of the day you are still sending each host off to a thread one at a time to be processed.

  • As for point 3, if your workers really are doing almost nothing but I/O, this approach may work fine. But with threads, any CPU-bound work done in the workers will kill your performance. I took your exact program layout and implemented your two workers like this:
    def call_ws_1(group_id):
        return list(range(20))
    
    def call_ws_2(host):
        sum(range(33000000))  # CPU-bound
        #time.sleep(1)  # I/O-bound
        return "{} property".format(host)
    

    and drove it all with:
    if __name__ == "__main__":
        start = time.time()
        fetch_hosts(['a', 'b', 'c', 'd', 'e'])
        end = time.time()
        print("Total time: {}".format(end-start))
    

    With time.sleep, the output is:
    Fetching hosts for d
    Fetching hosts for a
    Fetching hosts for c
    Fetching hosts for b
    Fetching hosts for e
    Total time: 25.051292896270752
    

    With the sum(range(33000000)) calculation, performance is much worse:
    Fetching hosts for d
    Fetching hosts for a
    Fetching hosts for c
    Fetching hosts for b
    Fetching hosts for e
    Total time: 75.81612730026245
    

    Note that the calculation takes about one second on my laptop:
    >>> timeit.timeit("sum(range(33000000))", number=1)
    1.023313045501709
    >>> timeit.timeit("sum(range(33000000))", number=1)
    1.029937982559204
    

    So each worker takes about a second on its own. But because one of the workers is CPU-bound, and therefore affected by the GIL, the threaded version performs terribly.

    Here is ProcessPoolExecutor with time.sleep:
    Fetching hosts for a
    Fetching hosts for b
    Fetching hosts for c
    Fetching hosts for d
    Fetching hosts for e
    Total time: 25.169482469558716
    

    And now with sum(range(33000000)):
    Fetching hosts for a
    Fetching hosts for b
    Fetching hosts for c
    Fetching hosts for d
    Fetching hosts for e
    Total time: 43.54587936401367
    

    As you can see, while it is still slower than the time.sleep version (probably because the calculation takes slightly longer than a second, and the CPU-bound work has to compete with everything else running on the laptop), it still vastly outperforms the threaded version.

    However, I suspect that as the number of hosts grows, the cost of the IPC will slow you down considerably. Here is how ThreadPoolExecutor does with 10,000 hosts but a worker that does nothing (it just returns):
    Fetching hosts for c
    Fetching hosts for b
    Fetching hosts for d
    Fetching hosts for a
    Fetching hosts for e
    Total time: 9.535644769668579
    

    Compare that to ProcessPoolExecutor:
    Fetching hosts for c
    Fetching hosts for b
    Fetching hosts for a
    Fetching hosts for d
    Fetching hosts for e
    Total time: 36.59257411956787
    

    So ProcessPoolExecutor is four times slower, and that is entirely down to the cost of IPC.

    So, what does all this mean? I think your best possible performance will come from using a ProcessPoolExecutor, but additionally batching your IPC so that you send large chunks of hosts to a child process, rather than one host at a time.

    Something like this (untested, but it should give you the idea):
    import time
    import itertools
    import concurrent.futures
    from concurrent.futures import ProcessPoolExecutor as Pool
    
    def call_ws_1(group_id):
        return list(range(10000))
    
    def call_ws_2(hosts):  # This worker now works on a list of hosts
        host_results = []
        for host in hosts:
            host_results.append((host, "{} property".format(host)))  # returns a list of (host, property) tuples
        return host_results
    
    def chunk_list(l):
        chunksize = len(l) // 16  # Break the list into smaller pieces
        it = [iter(l)] * chunksize
        for item in itertools.zip_longest(*it):
            yield tuple(x for x in item if x is not None)  # drop only the zip_longest padding; a falsy host like 0 should survive
    
    def fetch_property(hosts):
        with Pool(max_workers=4) as executor:
            futs = []
            for chunk in chunk_list(hosts):
                futs.append(executor.submit(call_ws_2, chunk))  # submit on the executor, not the concurrent.futures module
            for future in concurrent.futures.as_completed(futs):
                try:
                    results = future.result()
                except Exception as exp:
                    print("Got %s" % exp)
                else:
                    for host, prop in results:
                        pass  # Save host and prop to DB ("prop" avoids shadowing the built-in property)
    
    def fetch_hosts(group_ids):
        with Pool(max_workers=4) as executor:
            future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id for group_id in group_ids}
            for future in concurrent.futures.as_completed(future_to_grp_id):
                group_id = future_to_grp_id[future]
                try:
                    hosts = future.result()#this is a list
                except Exception as exp:
                    print("Got %s" % exp)
                else:
                    print("Fetching hosts for {}".format(group_id))
                    fetch_property(hosts)
    
    if __name__ == "__main__":
        start = time.time()
        fetch_hosts(['a', 'b', 'c', 'd', 'e'])
        end = time.time()
        print("Total time: {}".format(end-start))
    

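    A possible simplification (an alternative, untested sketch rather than the approach above): since Python 3.5, ProcessPoolExecutor.map() accepts a chunksize argument that does this batching across the process boundary for you, so you could keep the original per-host call_ws_2 and drop the manual chunk_list helper:

    import concurrent.futures

    def call_ws_2(host):
        # the original per-host worker from the question
        return "{} property".format(host)

    def fetch_property(hosts):
        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
            # chunksize groups hosts into batches before pickling them over to the
            # worker processes, which cuts the per-item IPC cost; results come back
            # in input order, so zip pairs each host with its property
            chunk = max(1, len(hosts) // 16)
            for host, prop in zip(hosts, executor.map(call_ws_2, hosts, chunksize=chunk)):
                pass  # Save host and prop to DB

    One trade-off: unlike the as_completed() loop, an exception raised by any call is re-raised while iterating over map()'s results, so per-host error handling is coarser.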
    Original question on Stack Overflow: Python - efficiency concerns in parallel async calls to fetch data from web services, https://stackoverflow.com/questions/24921758/
