本文是使用 golang 实现 redis 系列的第九篇,主要介绍如何使用 GeoHash 实现搜索附近的人。
搜索附近的POI是一个非常常见的功能,它的技术难点在于地理位置是二维的(经纬度)而我们常用的索引(无论是B树、红黑树还是跳表)都是一维的。GeoHash 算法的本质就是将二维的经纬度转换为一维的表示。
本文核心实现代码可以在Godis:lib/geohash中找到。也可以下载Godis来亲自体验。
GeoHash 原理
我们知道经度的取值范围是[-180,180]
, 纬度取值范围是[-90,90]
。我们将经纬度分别二等分,那么可以将地球表面分为4个部分:
纬度[-90, 0]
用0表示,[0, 90]
用1表示;经度[-180, 0]
用0表示,[0, 180]
用1表示。经度在前纬度在后,这样四个部分都有了一个二进制编码。
我们对四个小矩形继续二等分:
两次等分后的矩形需要用 4bit 来表示。前两位是第一次等分后所在大矩形的编码,后两位表示第二次分割出的小矩形在大矩形中的位置。
对这些矩形进行进一步分割,分割次数越多矩形越小经度越高,最终我们可以得到最够小的矩形来表示一个点。这个小矩形的编码可以代替这个点的坐标,矩形的边长就是GeoHash的误差。
这种分割方式让我们可以用Z型曲线涂满整个地图,这个Z型曲线叫做Peano空间填充曲线。在大多数情况下空间上相邻的点编码也非常相似。少数情况下编码会发生突变,导致编码相似的点空间距离却很大(比如上图中的0111与1000)。
如图所示除Peano空间填充曲线外还有很多空间填充曲线,其中效果公认较好是Hilbert空间填充曲线。相较于Peano曲线而言,Hilbert曲线没有较大的突变。但是由于Peano曲线实现更加简单,所以 Geohash 算法采用的是Peano空间填充曲线。
实现 GeoHash 编解码
看过上文的介绍,我们可以很快的写出GeoHash的编码过程:
// 返回二进制编码和对应矩形的经纬度范围
func encode0(latitude, longitude float64, bitSize uint) ([]byte, [2][2]float64) {
box := [2][2]float64{ // Geohash的矩形
{-180, 180}, // 经度
{-90, 90}, // 纬度
}
pos := [2]float64{longitude, latitude}
bits := make([]bit, 0)
var precision uint = 0
for precision < bitSize { // 循环直到精度足够
for direction, val := range pos { // 轮流处理经纬度,p.s. 看到这个循环了吗?你可以很方便的把GeoHash推广到N维空间
mid := (box[direction][0] + box[direction][1]) / 2 // 计算分割点
if val < mid {
// 经(纬)度小于中点,编码填0,把一下次二分的上界设为当前区间的中点
box[direction][1] = mid
bits = append(bits, 0)
} else {
// 经(纬)度大于中点,编码填1,把一下次二分的下界设为当前区间的中点
box[direction][0] = mid
bits = append(bits, 1)
}
bit++
precision++
if precision == bitSize {
break
}
}
}
return []byte(bits), box
}
代码非常简单,类似于二分查找。遗憾的是和大多数语言一样 golang 操作二进制数据的最小单位是 byte 而非 bit,所以我们需要额外做一些工作来实现按bit编码:
// 这才是真正的实现,请关注与上一节代码的不同
var bits = []uint8{128, 64, 32, 16, 8, 4, 2, 1}
func encode0(latitude, longitude float64, bitSize uint) ([]byte, [2][2]float64) {
box := [2][2]float64{
{-180, 180}, // lng
{-90, 90}, // lat
}
pos := [2]float64{longitude, latitude}
hash := &bytes.Buffer{}
bit := 0
var precision uint = 0
code := uint8(0)
for precision < bitSize {
for direction, val := range pos {
mid := (box[direction][0] + box[direction][1]) / 2
if val < mid {
box[direction][1] = mid
// 编码默认为0,不需要操作
} else {
box[direction][0] = mid
code |= bits[bit]
// 通过位或操作写入1,比如要在字节的第3位写入1应该 code |= 32
}
bit++
if bit == 8 { // 计算完一个字节的编码,将其写入buffer
hash.WriteByte(code)
bit = 0
code = 0
}
precision++
if precision == bitSize {
break
}
}
}
// precision 可能无法被 8 整除,此时剩下的二进制编码写到最后
if code > 0 {
hash.WriteByte(code)
}
return hash.Bytes(), box
}
为了方便传输GeoHash定义了一种文本格式的编码,它是将二进制编码进行Base32变换后得到的:
// GeoHash的映射表和标准Base32映射表有些不同
var enc = base32.NewEncoding("0123456789bcdefghjkmnpqrstuvwxyz").WithPadding(base32.NoPadding)
func ToString(buf []byte) string {
return enc.EncodeToString(buf)
}
写完代码后可以到www.geohash.cn上测试一下结果是否正确。
跟随二进制编码的指示进行二分既可完成解码的过程:
func decode0(hash []byte) [][]float64 {
box := [][]float64{
{-180, 180},
{-90, 90},
}
direction := 0
for i := 0; i < len(hash); i++ {
code := hash[i]
for j := 0; j < len(bits); j++ {
mid := (box[direction][0] + box[direction][1]) / 2
mask := bits[j] // 使用掩码取出指定位
if mask&code > 0 {
// 经(纬)度大于mid
box[direction][0] = mid
} else {
// 经(纬)度小于mid
box[direction][1] = mid
}
direction = (direction + 1) % 2
}
}
return box
}
解码过程不能得到精确的结果只能得到对应矩形的经纬度范围,我们通常使用矩形的中心点来作为编码对应的坐标。
搜索附近
因为位于同一个矩形中的点它们的GeoHash编码拥有相同的前缀,所以我们可以非常容易的找到某个矩形中的所有POI。
理论上我们在使用GeoHash来搜索附近POI时只需要找到一个合适的矩形然后找出其中所有POI即可,实际上我们面临两个问题:
- 即上文提到的GeoHash值突变会导致编码相近两个点空间距离很大。
- 若我们的位置在矩形的边缘, 那么隔壁矩形里的POI可能会更近。
若我们位于红点位置,北面的绿点虽然与我们不在同一个矩形内但显然更近。
为了解决这些问题,我们除了搜索定位点所在的矩形外,还搜索周围8个区域内的POI。
这样搜索附近需要分为两步来实现:
- 计算所有POI的GeoHash值,并使用跳表或B+树等便于进行范围查询的数据结构建立索引
- 计算“附近区域”对应的 GeoHash 编码,找出这些区域内的所有POI
建立空间索引
我们将GeoHash的精度设置为64位,这样我们就可以将GeoHash编码转换成uint64类型存入 SortedSet 数据结构中:
func ToInt(buf []byte) uint64 {
if len(buf) < 8 { // 用0填充不足的位数
buf2 := make([]byte, 8)
copy(buf2, buf)
return binary.BigEndian.Uint64(buf2)
}
return binary.BigEndian.Uint64(buf)
}
因为位于同一矩形的二进制编码拥有相同的前缀所以我们需要将二进制编码的低端作为uint64的高位(即使用大端序),这样位于同一矩形的uint64编码都会处于同一个数字区间内。比如0110矩形内所有点的uint64编码都会处于[0x6000000000000000, 0x7000000000000000)
区间内, 结合 SortedSet 的范围查询能力我们可以非常迅速地(时间复杂度O(logN))找到所有位于0110区域内的POI。
使用SortedSet 进行索引的代码可以在 Godis:db/geo.go 中找到。
找到附近的区域
我们知道地球半径约为 6371km 那么第一次分割后我们得到了四个东西宽 6371π km 南北高 3186π km 的矩形,以此递推在分割10次后我们可以得到宽约40km高约20km的矩形。也就是说 20bit 的 GeoHash 编码东西方向上的误差为 40km, 南北方向误差为 20 km。
我们在wikipedia 上查到 geohash 的误差范围:
表格中的geohash length 是 base32 编码后的字符串长度,1个字符可以表示5位(bit)。
我们也可以用代码估算指定的距离需要的geohash length:
func estimatePrecisionByRadius(radiusMeters float64, latitude float64) uint {
if radiusMeters == 0 {
return defaultBitSize - 1
}
var precision uint = 1
for radiusMeters < MercatorMax {
radiusMeters *= 2
precision++
}
precision -= 2
if latitude > 66 || latitude < -66 {
precision--
if latitude > 80 || latitude < -80 {
precision--
}
}
if precision < 1 {
precision = 1
}
if precision > 32 {
precision = 32
}
return precision*2 - 1
}
这段代码中需要注意两点:
- 因为地球是球型在绘制地图时采用墨卡托投影法(Mercator Projection)在靠近南北两极的地方投影面积比较大(在世界地图上靠近北极的格陵兰岛看上去非常大),所以在高纬度区域需要减少精度
- 经度的取值范围是
[-180,180]
纬度的取值范围是[-90,90]
, 所以在经纬度等分相同次数后我们得到的矩形总是东西长南北短。为了解决这个问题我们返回的精度总是奇数(precision*2 - 1), 这样经度比纬度多分割一次就可以得到长宽基本相等的矩形了。
接下来我们计算矩形区域对应的uint64编码的上下界:
func ToRange(scope []byte, precision uint) [2]uint64 {
lower := ToInt(scope)
radius := uint64(1 << (64 - precision))
upper := lower + radius
return [2]uint64{lower, upper}
}
比如位于0110矩形内的点它们的uint64编码会处于[0110..., 0111...)
范围内,在二进制编码中找到合适的位置+1就可以了。
下一步是计算九宫格的GeoHash编码,我们采用了计算各个矩形中心点经纬度然后重新编码的实现方式:
func GetNeighbours(latitude, longitude, radiusMeters float64) [][2]uint64 {
precision := estimatePrecisionByRadius(radiusMeters, latitude)
center, box := encode0(latitude, longitude, precision)
height := box[0][1] - box[0][0]
width := box[1][1] - box[1][0]
centerLng := (box[0][1] + box[0][0]) / 2
centerLat := (box[1][1] + box[1][0]) / 2
maxLat := ensureValidLat(centerLat + height)
minLat := ensureValidLat(centerLat - height)
maxLng := ensureValidLng(centerLng + width)
minLng := ensureValidLng(centerLng - width)
var result [10][2]uint64
leftUpper, _ := encode0(maxLat, minLng, precision)
result[1] = ToRange(leftUpper, precision)
upper, _ := encode0(maxLat, centerLng, precision)
result[2] = ToRange(upper, precision)
rightUpper, _ := encode0(maxLat, maxLng, precision)
result[3] = ToRange(rightUpper, precision)
left, _ := encode0(centerLat, minLng, precision)
result[4] = ToRange(left, precision)
result[5] = ToRange(center, precision)
right, _ := encode0(centerLat, maxLng, precision)
result[6] = ToRange(right, precision)
leftDown, _ := encode0(minLat, minLng, precision)
result[7] = ToRange(leftDown, precision)
down, _ := encode0(minLat, centerLng, precision)
result[8] = ToRange(down, precision)
rightDown, _ := encode0(minLat, maxLng, precision)
result[9] = ToRange(rightDown, precision)
return result[1:]
}
也可以通过某个矩形的编码快速推断出附近8个矩形的编码, 这种方式实现难度较高可以参考 Redis 中的实现:
- 入口: geohashNeighbors
- 东西向平移: geohash_move_x
- 南北向平移: geohash_move_y
最后在 SortedSet 中找到 POI:
func geoRadius0(sortedSet *sortedset.SortedSet, lat float64, lng float64, radius float64) redis.Reply {
areas := geohash.GetNeighbours(lat, lng, radius)
members := make([][]byte, 0)
for _, area := range areas {
lower := &sortedset.ScoreBorder{Value: float64(area[0])}
upper := &sortedset.ScoreBorder{Value: float64(area[1])}
elements := sortedSet.RangeByScore(lower, upper, 0, -1, true)
for _, elem := range elements {
members = append(members, []byte(elem.Member))
}
}
return reply.MakeMultiBulkReply(members)
}
OK, 大功告成。