我已经看到了很多有关如何加快python代码或使其更高效的技巧。我在下面的代码上尝试了一些操作,例如:尽可能将全局变量更改为局部变量,使用.format创建字符串而不是添加字符串,并尝试不创建多个变量。但是,此脚本仍需要1h25才能运行。我有两个输入文件:
1)一个床文件,两列(制表符分隔)文件,第一列带有数字或代码,第二列带有数字。它有约20亿行,其中数字的组合是唯一的(它具有基因组中的所有位置;第一列是染色体,第二列是位置):
1 1
1 2
1 3
1 4
...
2)一个复杂的文件,其中前几行(约3000行)是以#开头的标头,然后是前两列中数字/代码+数字的组合的条目。这两列构成与第一个文件的链接(文件1中的1 1与文件2中的1 1相同)。这大约有2200万行。这是前三行的示例:
1 1 . G . 32.9939 . DP=1;MQ0F=0;AF1=0;AC1=0;DP4=1,0,0,0;MQ=60;FQ=-29.9923 GT:PL:GQ 0/1:0:36
1 2 . T . 32.9939 . DP=1;MQ0F=0;AF1=0;AC1=0;DP4=1,0,0,0;MQ=60;FQ=-29.9923 GT:PL:GQ ./.:0:36
1 3 . C . 32.9939 . DP=1;MQ0F=0;AF1=0;AC1=0;DP4=1,0,0,0;MQ=60;FQ=-29.9923 GT:PL:GQ 1/1:0:36
问:我想过滤第一个文件中的行,如果第二个文件中的行在最后一列中具有0 / 0、0 / 1或1/1(第四种可能性是./。),那么我需要解析最后一列,以到达这三个字符)
增加的复杂性在于必须通过管道从另一个程序读取文件#2,因为该文件是以该程序完成的特定方式压缩的(打开该文件需要花费很长时间,但是我无能为力... )
调用:程序视图file2.vcf.gz | my_script.py file1.bed
import sys
import re
import time
start_time = time.time()
def make_vcf_dict(vcf):
mydict={}
for line in (line for line in vcf if not line.startswith("#")):
line=line.strip().split()
genotype=line[-1].split(':')[0]
motif=re.compile('\./\.')
if motif.match(genotype) is None:
mydict.setdefault('{}:{}'.format(line[0],line[1]),'{}:{}'.format(line[0],line[1]))
return mydict
def create_output_bed(bed,data):
print "creating output"
for line in (line for line in data if line.startswith('#CHROM')):
output_name='{}_mask_positions.bed'.format(line.strip().split()[-1])
print output_name
output=open(output_name,'w')
print "making dictionary"
for line in bed:
line=line.strip().split()
#creating the same entry as in dict:
region='{}:{}'.format(line[0], line[1])
if region not in mydict:
output.write('{}\t{}\n'.format(line[0],line[1]))
output.close()
bed.close()
return
print "reading data"
data=sys.stdin.readlines() #.readlines here is key!!
mydict=make_vcf_dict(data)
#read the bed file:
print "Writing output"
create_output_bed(open(sys.argv[1],'r'),data)
print("--- %s seconds ---" % (time.time() - start_time))
我想知道是否会有更有效的方法来完全解决这一问题?不制作字典,拆分文件吗?我有32台核心服务器来处理此问题,并且几乎没有脚本编写经验。
谢谢!
最佳答案
如果第二个文件只有几百万行(第一个文件不是十亿行),那么我希望数据能够容纳在内存中。
我有一个32核心服务器来处理
并行化对您没有多大帮助,因为主要瓶颈是磁盘而不是CPU。除非数据分布在不同磁盘上的许多文件中。
但是,您确实可以进行一些改进:
将正则表达式编译移到循环(motif=re.compile('\./\.')
)之外。
使用set
代替dict
。
避免使用format
,只需使用tuple
。
不要事先阅读所有内容。
避免重复两次stdin。
避免重复两次。
import sys
import re
import time
start_time = time.time()
def make_vcf(vcf_input):
output = set()
motif=re.compile('\./\.')
for line in vcf_input:
line = line.strip().split()
if line[0].startswith('#CHROM'):
output_name = '{}_mask_positions.bed'.format(line[-1])
continue
elif line[0].startswith("#"):
continue
genotype=line[-1].split(':')[0]
if motif.match(genotype) is None:
output.add( (line[0],line[1]) )
return output_name, output
def create_output_bed(output_name, vcf, bed):
print "creating output:", output_name
output = open(output_name,'w')
print "making dictionary"
for line in bed:
line = line.strip().split()
#creating the same entry as in dict:
region = line[0], line[1]
if region not in vcf:
output.write('{}\t{}\n'.format(line[0],line[1]))
output.close()
bed.close()
return
print "reading data"
output_name, vcf = make_vcf(sys.stdin.readlines())
#read the bed file:
print "Writing output"
create_output_bed(output_name, vcf, open(sys.argv[1],'r'))
print("--- %s seconds ---" % (time.time() - start_time))