转载自:哈希分治法 - 统计海量数据中出现次数最多的前10个IP

实现代码如下:

// 在海量数据中找出出现次数最高的前 10 个 IP
// 算法思想, 分治法
// IP 记录文件大小 10 GB, 约 10 亿个 IP, 内存限制 64 MB
// 将 IP 记录文件中的 IP 对 500 取模, 散列到 500 个小文件中
// 相同的 IP 会被散列到同一个文件中
// 平均每个小文件 20 MB, 在 64 MB 之内
// 再在每个小文件中用二叉树统计每个 IP 的次数

// 遍历二叉树返回最大的 IP 和出现次数
// 因为 IP 有 2^32 个, 要保证能产生重复的 IP 需要生成 2^32 个 IP
// 约要 64 GB 的文件, 为了保证 10 GB 的文件中也能产生重复 IP
// 将 IP 段设置为 0.0.0.0 - 100.100.100.100
// fopen() 最多只能打开 507 个文件

// 后记: 将主记录文件分解成 500 个小文件后 (分)
// 可以将这些小文件通过网络发送给其他计算机计算 (治)
// 再将结果返回给主程序, 即类似 google 的 MapReduce 工作原理

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <io.h>			// access()
#include <string.h>

const int num = 1000000;		// 10 亿, 手动修改 num

unsigned int iton(char *ip);		// 点分记法的 IP 地址转为整型
void ntoi(unsigned int num, char *ip);
int  fileexist(char *path);			// 判断文件存在
void fclose_all(FILE **t);			// 关闭所有文件
int  random_write(char *path);		// 随机生成 6.7 亿个 IP, 约 10 GB
// 统计 hashfile 中次数最多的 IP
void count(char *hashfile, unsigned int *data, unsigned int *num);
void sort(unsigned int *max, unsigned int *ip, int n);		// 排序
inline unsigned int hash(unsigned int ip)				// 哈希函数
{ return (ip % 500); }

typedef struct node		// 二叉树结点
{
	unsigned int ip;	// IP
	unsigned int n;		// 出现次数
	node *left;
	node *right;
}node;

int main(void)
{
	FILE *in = NULL;
	FILE *tmpfile[505];
	char *path = "c:\\ip_data.dat";
	char hashfile[50];
	char buf[20];
	unsigned int add, data, n;
	unsigned int ip[10], max[10];	// 记录前十的 IP
	unsigned int t1, t2, s, e;		// 记录时间
	int i, j, len, now;			// IP 个数

	printf("正在生成数据 %s\n\n", path);
	if (!random_write(path)) return 0;	// 随机生成 IP 记录文件

	// 判断文件夹是否存在, access() == 0 存在
	if (access("c:\\hashfile", 0) == 0)
		system("rmdir /s /q c:\\hashfile");
	system("mkdir c:\\hashfile");		// 建立工作目录
	system("attrib +h c:\\hashfile");	// 隐藏目录

	in = fopen(path, "rt");			// 打开 IP 记录文件
	if (in == NULL) return 0;
	for (i=0; i<505; i++) tmpfile[i] = NULL;

	// 将 6.7 亿个 IP 散列到 505 个小文件中
	printf("\r正在哈希 %s\n\n", path);
	e = s = t1 = clock();
	now = 0;
	while (fscanf(in, "%s", buf) != EOF)
	{
		data = iton(buf);		// IP 数字化
		add = hash(data);		// 计算哈希地址, 文件地址
		sprintf(hashfile, "c:\\hashfile\\hash_%u.~tmp", add);
		if (tmpfile[add] == NULL)
			tmpfile[add] = fopen(hashfile, "a");
		sprintf(buf, "%u\n", data);
		len = strlen(buf);
		// 将该 IP 写入到文件中, 反复读写磁盘慢
		fwrite(buf, len, 1, tmpfile[add]);
		now++;
		e = clock();
		if (e - s > 1000)		// 计算进度
		{
			printf("\r处理进度 %0.2f %%\t", (now * 100.0) / num);
			s = e;
		}
	}
	fclose(in);
	fclose_all(tmpfile);
	remove(path);

	// 在每个小文件中统计重复度最高的 IP, 可以用多台计算机处理
	for (i=0; i<10; i++) max[i] = 0;
	for (i=0; i<500; i++)
	{
		sprintf(hashfile, "c:\\hashfile\\hash_%d.~tmp", i);
		if (fileexist(hashfile))
		{
			printf("\r正在处理 hash_%d.~tmp\t", i);
			// 统计该小文件中的次数最多的 IP
			count(hashfile, &data, &n);
			// 因为只有 10 个元素, 用选择排序的思想记录最大的 10 个
			// 如果元素过多, 可以用插入排序的思想找或者用堆
			unsigned int min = 0xFFFFFFFF, pos;
			for (j=0; j<10; j++)
			{
				if (max[j] < min)
				{
					min = max[j];
					pos = j;
				}
			}
			if (n > min)
			{
				max[pos] = n;
				ip[pos] = data;
			}
		}
	}
	t2 = clock();
	sort(max, ip, 10);

	FILE *log = NULL;		// 同时在 C:\ip_result.txt 记录结果
	log = fopen("C:\\ip_result.txt", "wt");
	fprintf(log, "\n访问次数最高的前 10 个 IP:\n\n");
	fprintf(log, "     %-15s%s\n", "IP", "访问次数");
	printf("\n\n访问次数最高的前 10 个 IP:\n\n");
	printf("     %-15s%s\n", "IP", "访问次数");
	for (i=0; i<10; i++)
	{
		ntoi(ip[i], buf);		// 解码
		printf("   %-20s%u\n", buf, max[i]);
		fprintf(log, "   %-20s%u\n", buf, max[i]);
	}
	fprintf(log, "\n--- 用时 %0.3f 秒\n", (t2 - t1) / 1000.0);
	printf("\n--- 用时 %0.3f 秒\n\n", (t2 - t1) / 1000.0);
	fclose(log);
	system("rmdir /s /q c:\\hashfile");

	return 0;
}

void fclose_all(FILE **t)		// 关闭所有文件
{
	int i;

	for (i=0; i<500; i++)
	{
		if (t[i])
		{
			fclose(t[i]);
			t[i] = NULL;
		}
	}
}

// 随机生成 6.7 亿个 IP, 约 10 GB
int random_write(char *path)
{
	FILE *out = NULL;
	int i, j, b;
	char buf[20];
	char *cur;
	unsigned int s, e;

	out = fopen(path, "wt");
	if (out == NULL) return 0;
	srand(time(NULL));
	e = s = clock();
	for (i=0; i<num; i++)
	{
		e = clock();
		if (e - s > 1000)		// 计算进度
		{
			printf("\r处理进度 %0.2f %%\t", (i * 100.0) / num);
			s = e;
		}
		for (j=0; j<20; j++) buf[j] = '\0';
		cur = buf;
		for (j=0; j<4; j++)
		{
			// 点分记法中应该产生 0-255 的数值
			// 这里产生 0-101 的数值
			b = rand() % 101;
			sprintf(cur, "%d.", b);
			while (*cur != '\0') cur++;
		}
		*(cur - 1) = '\n';
		fwrite(buf, cur-(char *)buf, 1, out);
	}
	fclose(out);		// 记得要关闭
	return 1;
}

// 二叉树的插入
void insert(node **tree, unsigned int ip)
{
	if ((*tree) == NULL)
	{
		// new_node
		(*tree) = (node *)malloc(sizeof(node));
		(*tree)->ip = ip;
		(*tree)->n = 1;
		(*tree)->left = (*tree)->right = NULL;
	}
	else if ((*tree)->ip == ip)
	{
		(*tree)->n++;
		return ;
	}
	else if (ip < (*tree)->ip)		// 左子树
		insert(&((*tree)->left), ip);
	else insert(&((*tree)->right), ip);	// 右子树
}

unsigned int maxn;		// 入口参数
node *max_node;			// 出口参数
void max_n(node *tree)	// 找出树中 n 最大的结点
{
	if (tree)
	{
		if (tree->n > maxn)
		{
			maxn = tree->n;
			max_node = tree;
		}
		max_n(tree->left);
		max_n(tree->right);
	}
}

void destory(node *tree)		// 释放树
{
	if (tree)
	{
		destory(tree->left);
		destory(tree->right);
		free(tree);
	}
}

// 统计 hashfile 中次数最多的 IP
void count(char *hashfile, unsigned int *data, unsigned int *n)
{
	FILE *in = NULL;
	node *tree = NULL;
	unsigned int ip;

	in = fopen(hashfile, "rt");
	while (fscanf(in, "%d", &ip) != EOF)
	{
		insert(&tree, ip);
	}
	fclose(in);
	maxn = 0;
	max_n(tree);	// 结果在 max_node
	*n = max_node->n;
	*data = max_node->ip;
	destory(tree);
}

// 插入排序
void sort(unsigned int *max, unsigned int *ip, int n)
{
	int i, j;
	unsigned int tmpm, tmpi;

	for (i=1; i<n; i++)
	{
		if (max[i-1] < max[i])
		{
			tmpm = max[i];
			tmpi = ip[i];
			for (j=i; j>0; j--)
			{
				if (max[j-1] < tmpm)
				{
					max[j] = max[j-1];
					ip[j]  = ip[j-1];
				}
				else break;
			}
			max[j] = tmpm;
			ip[j]  = tmpi;
		}
	}
}

// 将字符串的 IP 地址转为整型
unsigned int iton(char *ip)
{
	unsigned int r = 0;
	unsigned int t;
	int i, j;

	for (j=i=0; i<4; i++)
	{
		sscanf(ip + j, "%d", &t);
		if (i < 3)
			while (ip[j++] != '.');
		r = r << 8;
		r += t;
	}
	return r;
}

// 将整型 num 转为字符型 IP
void ntoi(unsigned int num, char *ip)
{
	unsigned int b, f;
	int i, cur;

	f = 0x00FFFFFF;
	cur = 0;
	for (i=3; i>=0; i--)
	{
		b = num >> (i * 8);
		num = num & f;
		f = f >> 8;
		sprintf(ip + cur, "%u.", b);
		while (ip[cur] != '\0') cur++;
	}
	ip[cur - 1] = '\0';
}

// 判断文件存在
int fileexist(char *path)
{
	FILE *fp = NULL;

	fp = fopen(path, "rt");
	if (fp)
	{
		fclose(fp);
		return 1;
	}
	else return 0;
}

  

01-20 01:44