如果你对性能一无所知,把计算相似度的代码写成下面这样
float dot_float(int len, const float* v1, const float* v2) { float sumf = 0; for (int i = 0; i < len; i++) { sumf += v1[i] * v2[i]; } return sumf; }
然后在外层这样调用
for (int pp = 0; pp < person_num; pp++) scoresf[pp] = dot_float(N, valf1, valf2 + pp*N);
与这种最粗糙的写法相比,本文介绍的写法最终可以提速12.6倍左右。
基本思路:
1.float转short, 相似度损失小于万分之五(见最上面的链接),位宽缩小一半,定点比浮点要快。
2.利用SSE加速。
3.批量处理减少函数调用次数。
4. 展开循环
第一部分:数据准备
注意不用SSE时,两short相乘会溢出,因此直接用int存储。如果用short存储,相乘时才转成int,应该会更慢。
float scale = (int)0x7fff; void init() { person_num = total_elements / N; printf("N = %d, person_num = %d\n", N, person_num); valf1 = new float[N]; valf2 = new float[N*person_num]; vali1 = new int[N]; vali2 = new int[N*person_num]; valshort1 = new short[N]; valshort2 = new short[N*person_num]; scoresf = new float[person_num]; scoresi = new int[person_num]; float len1 = 0; for (int i = 0; i < N; i++) { valf1[i] = i; len1 += valf1[i] * valf1[i]; } len1 = sqrt(len1); for (int i = 0; i < N; i++) { valf1[i] /= len1; vali1[i] = __min(scale, valf1[i] * scale + 0.5f); valshort1[i] = vali1[i]; } for (int pp = 0; pp < person_num; pp++) { double len2 = 0; for (int i = 0; i < N; i++) { int tmp = rand() % N; valf2[pp*N + i] = tmp; len2 += tmp*tmp; } len2 = sqrt(len2); for (int i = 0; i < N; i++) { valf2[pp*N + i] /= len2; vali2[pp*N + i] = __min(scale, valf2[pp*N + i] * scale + 0.5f); valshort2[pp*N + i] = vali2[pp*N + i]; } } }
第二部分: 不用SSE时浮点点积和整型点积
float dot_float(int len, const float* v1, const float* v2) { float sumf = 0; for (int i = 0; i < len; i++) { sumf += v1[i] * v2[i]; } return sumf; } int dot_int(int len, const int* v1, const int* v2) { int sumi = 0; for (int i = 0; i < len; i++) { sumi += v1[i] * v2[i]; } return sumi; }
len =128, 256, 512, 1024, 2048时,整型比浮点大概快2.3-2.9倍
第三部分:使用SSE (位宽256,头文件在#include <immintrin.h>),浮点与short
注意地址对齐,否则会慢很多,包括前面申请数组内存的时候也要,我是因为测试的都是整16倍的,所以没有加对齐代码,对齐方式参照float tmp[8 + 8], *q = (float*)(((long long)tmp+8)>>5<<5),请读者自己琢磨。
浮点改SSE加速更明显,浮点改SSE后加速2倍左右,整型改SSE之后加速1.5倍左右。
#ifdef use_aligned #define my_mm256_load_ps _mm256_load_ps #define my_mm256_store_ps _mm256_store_ps #define my_mm256_load_si256 _mm256_load_si256 #define my_mm256_store_si256 _mm256_store_si256 #else #define my_mm256_load_ps _mm256_loadu_ps #define my_mm256_store_ps _mm256_storeu_ps #define my_mm256_load_si256 _mm256_loadu_si256 #define my_mm256_store_si256 _mm256_storeu_si256 #endif float dot_float_SSE(int len, const float* v1, const float* v2) { __m256 result = _mm256_setzero_ps(); __m256 val1, val2; float sumf; #ifdef use_aligned float tmp[8 + 8], *q = (float*)(((long long)tmp+8)>>5<<5); #else float q[8]; #endif int i = 0; for (; i < len; i += 8) { val1 = my_mm256_load_ps(v1 + i); val2 = my_mm256_load_ps(v2 + i); val1 = _mm256_mul_ps(val1, val2); result = _mm256_add_ps(result, val1); } my_mm256_store_ps(q, result); sumf = q[0] + q[1] + q[2] + q[3] + q[4] + q[5] + q[6] + q[7]; for (; i < len; i++) { sumf += v1[i] * v2[i]; } return sumf; } int dot_short_SSE(int len, const short* v1, const short* v2) { int sumi; #ifdef use_aligned int tmp[8 + 8], *q = (int*)(((long long)tmp + 8) >> 5 << 5); #else int q[8]; #endif __m256i result = _mm256_setzero_si256(); __m256i val1, val2; int i = 0; const __m256i* vv1 = (const __m256i*)v1; const __m256i* vv2 = (const __m256i*)v2; for (; i < len; i += 16) { val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); } my_mm256_store_si256((__m256i*)q, result); sumi = q[0] + q[1] + q[2] + q[3] + q[4] + q[5] + q[6] + q[7]; for (; i < len; i++) { sumi += (int)v1[i] * (int)v2[i]; } return sumi; }
第四部分:SSE批量计算与SSE不批量
看下面这个函数意思意思一下,就是尽量少调用函数。
void dot_float_batch(int len, const float* v1, const float* v2, int num, float* scores) { for (int pp = 0; pp < num; pp++) { float sumf = 0; for (int i = 0; i < len; i++) { sumf += v1[i] * v2[pp*N+i]; } scores[pp] = sumf; } }
SSE批量比不批量提速较为明显,维度越低越明显。
第五部分:展开循环
只展开了256维。为什么选256维,不防看看这里SpherefaceNet-04, SpherefaceNet-06 Release · Issue #81 · wy1iu/sphereface
void dot_short_SSE_batch_dim256(const short* v1, const short* v2, int num, int* scores) { int sumi; #ifdef use_aligned int tmp[8 + 8], *q = (int*)(((long long)tmp + 8) >> 5 << 5); #else int q[8]; #endif for (int nn = 0; nn < num; nn++) { __m256i result = _mm256_setzero_si256(); __m256i val1, val2; int i = 0; const __m256i* vv1 = (const __m256i*)v1; const __m256i* vv2 = (const __m256i*)v2; val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); val1 = my_mm256_load_si256(vv1++); val2 = my_mm256_load_si256(vv2++); val1 = _mm256_madd_epi16(val1, val2); result = _mm256_add_epi32(result, val1); my_mm256_store_si256((__m256i*)q, result); sumi = q[0] + q[1] + q[2] + q[3] + q[4] + q[5] + q[6] + q[7]; scores[nn] = sumi; } }
展开之后性能得到意想不到的提升。
实验:维度*人脸数=25.6百万,外层循环10次,每次随机生成向量,依次统计各种方法时间,每次执行100次,相当于维度*人脸数=2560百万。
符号说明:
浮点:不使用SSE的float点积
浮点*:批量float点积
浮点SSE:SSE float点积
浮点SSE*:SSE批量float点积
浮点SSE**:展开循环的SSE批量float点积
整型:不使用SSE的int点积
整型*:批量int点积
整型SSE:SSE short点积
整型SSE*: SSE批量short点积
整型SSE**:展开循环的SSE批量short点积
时间表
维度 人数 浮点 浮点* 浮点SSE 浮点SSE* 浮点SSE** 整型 整型* 整型SSE 整型SSE* 整型SSE**
128 20M 1.927 1.900 1.057 0.752 - 0.824 0.808 0.547 0.346 -
256 10M 2.166 2.082 1.006 0.786 0.231 0.784 0.802 0.507 0.330 0.172
512 5M 2.205 2.174 1.027 0.889 - 0.845 0.786 0.495 0.313 -
1024 2.5M 2.276 2.225 1.085 0.933 - 0.792 0.814 0.483 0.365 -
2048 1.25M 2.272 2.307 1.096 1.049 - 0.815 0.827 0.471 0.391 -
加速表
维度 人数 浮点 浮点* 浮点SSE 浮点SSE* 浮点SSE** 整型 整型* 整型SSE 整型SSE* 整型SSE**
128 20M 1.000 1.014 1.824 2.563 - 2.337 2.385 3.522 5.574 -
256 10M 1.000 1.040 2.153 2.757 9.375 2.762 2.701 4.273 6.563 12.605
512 5M 1.000 1.014 2.146 2.481 - 2.609 2.805 4.455 7.045 -
1024 2.5M 1.000 1.023 2.098 2.440 - 2.875 2.797 4.717 6.237 -
2048 1.25M 1.000 0.985 2.073 2.165 - 2.788 2.746 4.825 5.813 -
可以看到,与最粗糙的写法相比,不展开的时候也能得到5.5-7.0倍加速,展开的时候达到12.6倍加速
结论:
将浮点特征向量转为short型存储,并使用SSE加速,对特定维数展开循环,可以获得一个量级的性能提升。同时,存储空间也减半。其代价只是不到万分之五的精度损失。