转载自:哈希分治法 - 统计海量数据中出现次数最多的前10个IP
实现代码如下:
// 在海量数据中找出出现次数最高的前 10 个 IP // 算法思想, 分治法 // IP 记录文件大小 10 GB, 约 10 亿个 IP, 内存限制 64 MB // 将 IP 记录文件中的 IP 对 500 取模, 散列到 500 个小文件中 // 相同的 IP 会被散列到同一个文件中 // 平均每个小文件 20 MB, 在 64 MB 之内 // 再在每个小文件中用二叉树统计每个 IP 的次数 // 遍历二叉树返回最大的 IP 和出现次数 // 因为 IP 有 2^32 个, 要保证能产生重复的 IP 需要生成 2^32 个 IP // 约要 64 GB 的文件, 为了保证 10 GB 的文件中也能产生重复 IP // 将 IP 段设置为 0.0.0.0 - 100.100.100.100 // fopen() 最多只能打开 507 个文件 // 后记: 将主记录文件分解成 500 个小文件后 (分) // 可以将这些小文件通过网络发送给其他计算机计算 (治) // 再将结果返回给主程序, 即类似 google 的 MapReduce 工作原理 #include <stdio.h> #include <stdlib.h> #include <time.h> #include <io.h> // access() #include <string.h> const int num = 1000000; // 10 亿, 手动修改 num unsigned int iton(char *ip); // 点分记法的 IP 地址转为整型 void ntoi(unsigned int num, char *ip); int fileexist(char *path); // 判断文件存在 void fclose_all(FILE **t); // 关闭所有文件 int random_write(char *path); // 随机生成 6.7 亿个 IP, 约 10 GB // 统计 hashfile 中次数最多的 IP void count(char *hashfile, unsigned int *data, unsigned int *num); void sort(unsigned int *max, unsigned int *ip, int n); // 排序 inline unsigned int hash(unsigned int ip) // 哈希函数 { return (ip % 500); } typedef struct node // 二叉树结点 { unsigned int ip; // IP unsigned int n; // 出现次数 node *left; node *right; }node; int main(void) { FILE *in = NULL; FILE *tmpfile[505]; char *path = "c:\\ip_data.dat"; char hashfile[50]; char buf[20]; unsigned int add, data, n; unsigned int ip[10], max[10]; // 记录前十的 IP unsigned int t1, t2, s, e; // 记录时间 int i, j, len, now; // IP 个数 printf("正在生成数据 %s\n\n", path); if (!random_write(path)) return 0; // 随机生成 IP 记录文件 // 判断文件夹是否存在, access() == 0 存在 if (access("c:\\hashfile", 0) == 0) system("rmdir /s /q c:\\hashfile"); system("mkdir c:\\hashfile"); // 建立工作目录 system("attrib +h c:\\hashfile"); // 隐藏目录 in = fopen(path, "rt"); // 打开 IP 记录文件 if (in == NULL) return 0; for (i=0; i<505; i++) tmpfile[i] = NULL; // 将 6.7 亿个 IP 散列到 505 个小文件中 printf("\r正在哈希 %s\n\n", path); e = s = t1 = clock(); now = 0; while (fscanf(in, "%s", buf) != EOF) { data = iton(buf); // IP 数字化 add = hash(data); // 计算哈希地址, 文件地址 sprintf(hashfile, "c:\\hashfile\\hash_%u.~tmp", add); if (tmpfile[add] == NULL) tmpfile[add] = fopen(hashfile, "a"); sprintf(buf, "%u\n", data); len = strlen(buf); // 将该 IP 写入到文件中, 反复读写磁盘慢 fwrite(buf, len, 1, tmpfile[add]); now++; e = clock(); if (e - s > 1000) // 计算进度 { printf("\r处理进度 %0.2f %%\t", (now * 100.0) / num); s = e; } } fclose(in); fclose_all(tmpfile); remove(path); // 在每个小文件中统计重复度最高的 IP, 可以用多台计算机处理 for (i=0; i<10; i++) max[i] = 0; for (i=0; i<500; i++) { sprintf(hashfile, "c:\\hashfile\\hash_%d.~tmp", i); if (fileexist(hashfile)) { printf("\r正在处理 hash_%d.~tmp\t", i); // 统计该小文件中的次数最多的 IP count(hashfile, &data, &n); // 因为只有 10 个元素, 用选择排序的思想记录最大的 10 个 // 如果元素过多, 可以用插入排序的思想找或者用堆 unsigned int min = 0xFFFFFFFF, pos; for (j=0; j<10; j++) { if (max[j] < min) { min = max[j]; pos = j; } } if (n > min) { max[pos] = n; ip[pos] = data; } } } t2 = clock(); sort(max, ip, 10); FILE *log = NULL; // 同时在 C:\ip_result.txt 记录结果 log = fopen("C:\\ip_result.txt", "wt"); fprintf(log, "\n访问次数最高的前 10 个 IP:\n\n"); fprintf(log, " %-15s%s\n", "IP", "访问次数"); printf("\n\n访问次数最高的前 10 个 IP:\n\n"); printf(" %-15s%s\n", "IP", "访问次数"); for (i=0; i<10; i++) { ntoi(ip[i], buf); // 解码 printf(" %-20s%u\n", buf, max[i]); fprintf(log, " %-20s%u\n", buf, max[i]); } fprintf(log, "\n--- 用时 %0.3f 秒\n", (t2 - t1) / 1000.0); printf("\n--- 用时 %0.3f 秒\n\n", (t2 - t1) / 1000.0); fclose(log); system("rmdir /s /q c:\\hashfile"); return 0; } void fclose_all(FILE **t) // 关闭所有文件 { int i; for (i=0; i<500; i++) { if (t[i]) { fclose(t[i]); t[i] = NULL; } } } // 随机生成 6.7 亿个 IP, 约 10 GB int random_write(char *path) { FILE *out = NULL; int i, j, b; char buf[20]; char *cur; unsigned int s, e; out = fopen(path, "wt"); if (out == NULL) return 0; srand(time(NULL)); e = s = clock(); for (i=0; i<num; i++) { e = clock(); if (e - s > 1000) // 计算进度 { printf("\r处理进度 %0.2f %%\t", (i * 100.0) / num); s = e; } for (j=0; j<20; j++) buf[j] = '\0'; cur = buf; for (j=0; j<4; j++) { // 点分记法中应该产生 0-255 的数值 // 这里产生 0-101 的数值 b = rand() % 101; sprintf(cur, "%d.", b); while (*cur != '\0') cur++; } *(cur - 1) = '\n'; fwrite(buf, cur-(char *)buf, 1, out); } fclose(out); // 记得要关闭 return 1; } // 二叉树的插入 void insert(node **tree, unsigned int ip) { if ((*tree) == NULL) { // new_node (*tree) = (node *)malloc(sizeof(node)); (*tree)->ip = ip; (*tree)->n = 1; (*tree)->left = (*tree)->right = NULL; } else if ((*tree)->ip == ip) { (*tree)->n++; return ; } else if (ip < (*tree)->ip) // 左子树 insert(&((*tree)->left), ip); else insert(&((*tree)->right), ip); // 右子树 } unsigned int maxn; // 入口参数 node *max_node; // 出口参数 void max_n(node *tree) // 找出树中 n 最大的结点 { if (tree) { if (tree->n > maxn) { maxn = tree->n; max_node = tree; } max_n(tree->left); max_n(tree->right); } } void destory(node *tree) // 释放树 { if (tree) { destory(tree->left); destory(tree->right); free(tree); } } // 统计 hashfile 中次数最多的 IP void count(char *hashfile, unsigned int *data, unsigned int *n) { FILE *in = NULL; node *tree = NULL; unsigned int ip; in = fopen(hashfile, "rt"); while (fscanf(in, "%d", &ip) != EOF) { insert(&tree, ip); } fclose(in); maxn = 0; max_n(tree); // 结果在 max_node *n = max_node->n; *data = max_node->ip; destory(tree); } // 插入排序 void sort(unsigned int *max, unsigned int *ip, int n) { int i, j; unsigned int tmpm, tmpi; for (i=1; i<n; i++) { if (max[i-1] < max[i]) { tmpm = max[i]; tmpi = ip[i]; for (j=i; j>0; j--) { if (max[j-1] < tmpm) { max[j] = max[j-1]; ip[j] = ip[j-1]; } else break; } max[j] = tmpm; ip[j] = tmpi; } } } // 将字符串的 IP 地址转为整型 unsigned int iton(char *ip) { unsigned int r = 0; unsigned int t; int i, j; for (j=i=0; i<4; i++) { sscanf(ip + j, "%d", &t); if (i < 3) while (ip[j++] != '.'); r = r << 8; r += t; } return r; } // 将整型 num 转为字符型 IP void ntoi(unsigned int num, char *ip) { unsigned int b, f; int i, cur; f = 0x00FFFFFF; cur = 0; for (i=3; i>=0; i--) { b = num >> (i * 8); num = num & f; f = f >> 8; sprintf(ip + cur, "%u.", b); while (ip[cur] != '\0') cur++; } ip[cur - 1] = '\0'; } // 判断文件存在 int fileexist(char *path) { FILE *fp = NULL; fp = fopen(path, "rt"); if (fp) { fclose(fp); return 1; } else return 0; }