Problem description
This is a follow-up to my previous question, Faster way to do multi dimensional matrix addition? After following the advice of @Peter Cordes I vectorized my code and the speed is now up by 50x. Then I ran gprof again and found this function is taking most of the time.
Each sample counts as 0.01 seconds.
  %   cumulative   self              self     total
 time   seconds   seconds    calls  Ts/call  Ts/call  name
69.97      1.53      1.53                             cal_score(int, std::string, int const*, int, double)
double cal_score(int l, string seq, const int *__restrict__ pw, int cluster, double alpha)
{
    const int cols = 4;
    const int *__restrict__ pwcluster = pw + ((long)cluster) * l * cols;
    double score = 0;
    char s;
    string alphabet = "ACGT";

    int count = 0;
    for (int k = 0; k < cols; k++)
        count = count + pwcluster[k];

    for (int i = 0; i < l; i++) {
        long row_offset = cols*i;
        s = seq[i];
        //#pragma omp simd
        for (int k = 0; k < cols; k++) {
            if (s == alphabet[k])
                score = score + log( (pwcluster[row_offset+k] + alpha) / (count + 4*alpha) );
        }
    }
    return score;
}
I am doing code optimization for the first time, so I don't know how to proceed. Is there a better way to write this function so I can get some more speed? The input seq is a sequence of the characters 'ACGT' of length l. pw is a one-dimensional array of size 2*l*4, i.e. [p][q][r], and cluster is p.
Answer
I made some more improvements to Mike's good idea and code.
I also made a vectorized version (requiring SSE4.1). It's more likely to have bugs, but is worth trying because you should get a significant speedup from doing packed multiplies. Porting it to AVX should give another big speedup.
See all the code on godbolt, including a vectorized conversion from ASCII to 0..3 bases (using a pshufb LUT).
My changes:
- Don't translate ahead of time. It should overlap well with the FP loop's work, instead of forcing that to wait for a tiny translation loop to finish before the FP work can start.
- Simplify the counter variables (gcc makes better code: it was actually keeping j in a register, rather than optimizing it away. Or else it was fully unrolling the inner loop into a giant loop.)
- Pull the scaling by (count + 4*alpha) out of the loop entirely: instead of dividing by it (or multiplying by the inverse), subtract the logarithm, since sum_i log((x_i + alpha) / (count + 4*alpha)) = sum_i log(x_i + alpha) - len * log(count + 4*alpha). Since log() grows extremely slowly, we can probably defer this indefinitely without losing too much precision in the final score. An alternative would be to subtract only every N iterations, but then the loop would have to figure out whether it terminated early. At the very least, we could multiply by 1.0 / (count + 4*alpha) instead of dividing. Without -ffast-math, the compiler can't do this for you.
- Have the caller calculate pwcluster for us: it probably calculates it for its own use anyway, and we can remove one of the function args (cluster). (A hypothetical call site is sketched after the SSE version below.)
- row_offset was making slightly worse code compared to just writing i*cols. If you like pointer increments as an alternative to array indexing, gcc makes even better code in the inner loop incrementing pwcluster directly.
- Rename l to len: single-letter variable names are bad style except in very small scopes (like a loop, or a very small function that only does one thing), and even then only if there isn't a good short but meaningful name. e.g. p is no more meaningful than ptr, but len does tell you what it means, not just what it is.
- Storing sequences in translated format throughout your program would be better for this and any other code that wants to use the DNA base as an array index or counter (a sketch of such a lookup table follows this list). You can also vectorize translating nucleotide numbers (0..3) to/from ASCII using SSSE3 pshufb. (See my code on godbolt.)
- Storing your matrix in float instead of int might possibly be better. Since your code spends most of its time in this function now, it would run faster if it didn't have to keep converting from int to double in the inner loop. On Haswell, cvtss2sd (single->double) apparently has better throughput than cvtsi2sd (int->double), but not on Skylake. (ss2sd is slower on SKL than HSW.) Storing your matrix in double format might be faster, but the doubled cache footprint might be killer. Doing this calculation with float instead of double would avoid the conversion cost, too. But you could defer log() for more iterations with double.
- Using multiple product variables (p1, p2, etc.) in a manually unrolled inner loop would expose more parallelism. Multiply them together at the end of the loop. (I ended up making a vectorized version with two vector accumulators; a scalar sketch of the unrolling follows this list.)
- For Skylake or maybe Broadwell, you could vectorize with VPGATHERDD. The vectorized translation from ASCII to 0..3 will be helpful here. Even without using a gather instruction, loading two integers into a vector and using a packed conversion instruction would be good. The packed conversion instructions are faster than the scalar ones. We have a lot of multiplies to do, and can certainly take advantage of doing two or four at once with SIMD vectors. See below.
See the full code on godbolt, link at the top of this answer.
// Needs <cmath> for log() and <algorithm> for std::min.
double cal_score_simple(
    int len                              // one-letter variable names are only good in the smallest scopes, like a loop
    , const unsigned char* seq           // if you want to pass a std::string, do it by const &, not by value
    , const int *__restrict__ pwcluster  // have the caller do the address math for us, since it probably already does it anyway
    , double alpha )
{
    // note that __restrict__ isn't needed because we don't write into any pointers
    const int cols = 4;
    const int logdelay_factor = 4;  // accumulate products for this many iterations before doing a log()

    int count = 0;                  // count the first row of pwcluster
    for (int k = 0; k < cols; k++)
        count += pwcluster[k];

    const double log_c4a = log(count + 4*alpha);

    double score = 0;
    for (int i = 0; i < len;) {
        double product = 1;
        int inner_bound = std::min(len, i + logdelay_factor);
        while (i < inner_bound) {
            unsigned int k = transTable[seq[i]];         // translate on the fly
            product *= (pwcluster[i*cols + k] + alpha);  // * count4alpha_inverse; // scaling deferred
            // TODO: unroll this with two or four product accumulators to allow parallelism
            i++;
        }
        score += log(product);  // - log_c4a * j;
    }
    score -= log_c4a * len;  // might be ok to defer this subtraction indefinitely, since log() accumulates very slowly
    return score;
}
This compiles to quite good asm, with a pretty compact inner loop:
.L6:
    movzx   esi, BYTE PTR [rcx]                   # D.74129, MEM[base: _127, offset: 0B]
    vxorpd  xmm1, xmm1, xmm1                      # D.74130
    add     rcx, 1                                # ivtmp.44,
    movzx   esi, BYTE PTR transTable[rsi]         # k, transTable
    add     esi, eax                              # D.74133, ivtmp.45
    add     eax, 4                                # ivtmp.45,
    vcvtsi2sd xmm1, xmm1, DWORD PTR [r12+rsi*4]   # D.74130, D.74130, *_38
    vaddsd  xmm1, xmm1, xmm2                      # D.74130, D.74130, alpha
    vmulsd  xmm0, xmm0, xmm1                      # product, product, D.74130
    cmp     eax, r8d                              # ivtmp.45, D.74132
    jne     .L6                                   #,
Using a pointer increment instead of indexing with i*cols removes one add from the loop, getting it down to 10 fused-domain uops (vs. 11 in this loop). So it won't matter for frontend throughput from the loop buffer, but it will be fewer uops for the execution ports. Resource stalls can make that matter, even when total uop throughput isn't the immediate bottleneck.
Not tested, and not that carefully written. I could easily have made some mistakes here. If you're running this on computers with AVX, you should definitely make an AVX version of this. Use vextractf128 as a first step in a horizontal product or sum, then the same as what I have here.
With a vectorized log() function to compute two (or four with AVX) log() results in parallel in a vector, you could just do a horizontal sum at the end, instead of more frequent horizontal products before each scalar log(). I'm sure someone's written one, but I'm not going to take the time to search for it right now.
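As a rough sketch of how that would fit: vec_log_pd below is a hypothetical stand-in for whatever packed-log routine you link against (for example SVML's packed log with ICC, or a SLEEF routine). Untested, and the API name is an assumption:

// Sketch only: vec_log_pd is a placeholder for a real packed-log function.
__m128d logsum = _mm_setzero_pd();
// ... inside the outer loop, after p1d *= p2d, instead of the horizontal
// product and scalar log(product):
//     logsum = _mm_add_pd(logsum, vec_log_pd(p1d));   // two logs in parallel
// ... after the outer loop, one horizontal add finishes the sum:
__m128d hi = _mm_unpackhi_pd(logsum, logsum);          // broadcast the high half
double vec_score = _mm_cvtsd_f64(_mm_add_sd(logsum, hi));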
// TODO: AVX version
double cal_score_SSE(
    int len                              // one-letter variable names are only good in the smallest scopes, like a loop
    , const unsigned char* seq           // if you want to pass a std::string, do it by const &, not by value
    , const int *__restrict__ pwcluster  // have the caller do the address math for us, since it probably already does it anyway
    , double alpha
    )
{
    const int cols = 4;
    const int logdelay_factor = 16;  // accumulate products for this many iterations before doing a log()

    int count = 0;  // count the first row of pwcluster
    for (int k = 0; k < cols; k++) count += pwcluster[k];

    //const double count4alpha_inverse = 1.0 / (count + 4*alpha);
    const double log_c4a = log(count + 4*alpha);

#define COUNTER_TYPE int

    //// HELPER FUNCTION: make a vector of two (pwcluster[i*cols + k] + alpha)
    auto lookup_two_doublevec = [&pwcluster, &seq, &alpha](COUNTER_TYPE pos) {
        unsigned int k0 = transTable[seq[pos]];
        unsigned int k1 = transTable[seq[pos+1]];
        __m128i pwvec = _mm_cvtsi32_si128( pwcluster[cols*pos + k0] );
        pwvec = _mm_insert_epi32(pwvec, pwcluster[cols*(pos+1) + k1], 1);
        // for AVX: repeat the previous lines, and _mm_unpack_epi32 into one __m128i,
        // then use _mm256_cvtepi32_pd (__m128i src)
        __m128d alphavec = _mm_set1_pd(alpha);
        return _mm_cvtepi32_pd(pwvec) + alphavec;
        //p1d = _mm_add_pd(p1d, _mm_set1_pd(alpha));
    };

    double score = 0;
    for (COUNTER_TYPE i = 0; i < len;) {
        double product = 1;
        COUNTER_TYPE inner_bound = i + logdelay_factor;
        if (inner_bound >= len) inner_bound = len;

        // possibly do a whole vector of transTable translations; probably doesn't matter
        if (likely(inner_bound < len)) {  // likely() presumably wraps __builtin_expect; it's defined in the full code on godbolt
            // We can do 8 or 16 elements without checking the loop counter
            __m128d p1d = lookup_two_doublevec(i+0);
            __m128d p2d = lookup_two_doublevec(i+2);
            i += 4;  // start with four elements loaded into two vectors, not multiplied by anything
            static_assert(logdelay_factor % 4 == 0, "logdelay_factor must be a multiple of 4 for vectorization");

            while (i < inner_bound) {
                // The *= syntax requires GNU C vector extensions, which is how __m128d is defined in gcc
                p1d *= lookup_two_doublevec(i+0);
                p2d *= lookup_two_doublevec(i+2);
                i += 4;
            }
            // we have two vector accumulators, holding two products each
            p1d *= p2d;  // combine to one vector
            //p2d = _mm_permute_pd(p1d, 1);  // if you have AVX. It's no better than movhlps, though.
            // movhlps p2d, p1d  // extract the high double, using p2d as a temporary
            p2d = _mm_castps_pd( _mm_movehl_ps(_mm_castpd_ps(p2d), _mm_castpd_ps(p1d)) );
            p1d = _mm_mul_sd(p1d, p2d);  // multiply the last two elements, now that we have them extracted to separate vectors
            product = _mm_cvtsd_f64(p1d);
            // TODO: find a vectorized log() function for use here, and do a horizontal add down to a scalar outside the outer loop.
        } else {
            // Scalar for the last unknown number of iterations
            while (i < inner_bound) {
                unsigned int k = transTable[seq[i]];
                product *= (pwcluster[i*cols + k] + alpha);  // * count4alpha_inverse; // scaling deferred
                i++;
            }
        }
        score += log(product);  // - log_c4a * j; // deferred
    }
    score -= log_c4a * len;  // May be ok to defer this subtraction indefinitely, since log() accumulates very slowly.
    // If not, subtract log_c4a * logdelay_factor in the vector part,
    // and (len & 15) * log_c4a out here at the end. (i.e. len % 16)
    return score;
}
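For completeness, a hypothetical call site (the variable names here are invented) showing the caller doing the pwcluster address math, as suggested in the list of changes above:

// Sketch: the caller selects the cluster's plane of the flat [2][len][4] array,
// the same math the original cal_score did internally.
const int cols = 4;
const int *pwcluster = pw + (long)cluster * len * cols;
double s = cal_score_SSE(len, seq, pwcluster, alpha);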
Vectorized ASCII->integer DNA base
Ideally, do the translation once when you read in sequences, and internally store them in 0/1/2/3 arrays instead of A/C/G/T ASCII strings.
It can be manually vectorized with pshufb, if we don't have to check for errors (invalid characters). In Mike's code, where we translate the whole input ahead of the FP loop, this can give a big speedup to that part of the code.
For translating on the fly, we could use a vector to:
- translate a block of 16 input characters in the outer loop,
- store it into a 16-byte buffer,
- then do scalar loads from that buffer in the inner loop.
Since gcc seems to fully unroll the vector loop, this would replace 16 movzx instructions with 6 vector instructions (including the load and store). A sketch of this glue code is shown after the compiled output below.
#include <immintrin.h>

__m128i nucleotide_ASCII_to_number(__m128i input) {
    // map A->0, C->1, G->2, T->3.
    //   low 4 bits aren't unique      low 4 bits after >>1 *are* unique
    /* 'A' = 65 = 0b100 0001    >>1 : 0b10 0000
     * 'C' = 67 = 0b100 0011    >>1 : 0b10 0001
     * 'G' = 71 = 0b100 0111    >>1 : 0b10 0011
     * 'T' = 84 = 0b101 0100    >>1 : 0b10 1010   // same low 4 bits for lower-case
     *
     * We right-shift by one, mask, and use that as indices into a LUT.
     * We can use pshufb as a 4-bit LUT, to map all 16 chars in parallel.
     */
    __m128i LUT = _mm_set_epi8(0xff, 0xff, 0xff, 0xff, 0xff, 3, 0xff, 0xff,
                               0xff, 0xff, 0xff, 0xff, 2, 0xff, 1, 0);
    // Not all "bogus" characters map to 0xFF, but 0xFF in the output only happens on invalid input
    __m128i shifted = _mm_srli_epi32(input, 1);  // and then mask, to emulate srli_epi8
    __m128i masked  = _mm_and_si128(shifted, _mm_set1_epi8(0x0F));
    __m128i nucleotide_codes = _mm_shuffle_epi8(LUT, masked);
    return nucleotide_codes;
}
// compiles to:
    vmovdqa xmm1, XMMWORD PTR .LC2[rip]        # the lookup table
    vpsrld  xmm0, xmm0, 1                      # tmp96, input,
    vpand   xmm0, xmm0, XMMWORD PTR .LC1[rip]  # D.74111, tmp96,
    vpshufb xmm0, xmm1, xmm0                   # tmp100, tmp101, D.74111
    ret
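A minimal sketch of the translate-to-a-buffer pattern described above (assumed glue code, not taken from the godbolt link):

// Sketch: translate 16 ASCII bases at once into a small buffer that the
// scalar FP loop then reads with cheap byte loads. Assumes i + 16 <= len.
alignas(16) unsigned char translated[16];
__m128i ascii = _mm_loadu_si128((const __m128i*)(seq + i));
_mm_store_si128((__m128i*)translated, nucleotide_ASCII_to_number(ascii));
// inner loop then uses: unsigned int k = translated[j];  // instead of transTable[seq[i]]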