import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer}
import java.nio.charset.Charset
import BigDecimal.RoundingMode._
/**
 * Compact integer / NUL-terminated string codec layered on a Netty `ChannelBuffer`.
 *
 * A compressed integer is one control byte followed by 0-7 data bytes:
 *
 *  - bit 7     sign flag (set when the input was negative)
 *  - bits 6-4  number of data bytes that follow; together with the low nibble
 *              they hold the absolute value of the input
 *  - bits 3-0  leading 4 bits of the absolute value when the most significant
 *              non-zero byte is below 16, so 0-15 need no data byte at all
 *
 * Data bytes are emitted most-significant byte first.
 *
 * Range truncation: the control byte can describe at most 4 bits + 7 bytes
 * (60 bits) of magnitude; anything above that is masked off before encoding.
 *
 * The author's original note says little-endian byte order is used. Within this
 * class that can only concern the raw `<--` / `readXxx` calls, which delegate to
 * `buf` and therefore follow whatever byte order `buf` was created with.
 *
 * @param buf buffer that every write appends to and every read consumes from
 */
class BitCompress(buf: ChannelBuffer) {

  /**
   * Appends `in` in the control-byte + data-bytes format described on the class.
   *
   * Only the low 60 bits of the absolute value are kept. As a consequence
   * `Long.MinValue` (whose negation overflows) is encoded as a negative zero.
   */
  def compressLong(in: Long): Unit = {
    val MaxAbsValue = 0x0fffffffffffffffL // 60 bits
    val signBit = if (in < 0) 0x80 else 0x00
    val abs = (if (in < 0) -in else in) & MaxAbsValue

    // Bytes needed to hold abs once leading zero bytes are dropped (0 when abs == 0).
    val significantBytes =
      (java.lang.Long.SIZE - java.lang.Long.numberOfLeadingZeros(abs) + 7) / 8
    val topByte =
      if (significantBytes == 0) 0
      else ((abs >>> ((significantBytes - 1) * 8)) & 0xff).toInt

    // Odd number of significant nibbles: the leading nibble rides in the control
    // byte and one data byte is saved. Otherwise the control nibble stays 0 and
    // every significant byte is emitted as data.
    val leadingNibbleOnly = (topByte & 0xf0) == 0
    val nibble = if (leadingNibbleOnly) topByte else 0
    val dataBytes =
      if (significantBytes == 0) 0
      else if (leadingNibbleOnly) significantBytes - 1
      else significantBytes

    buf.writeByte(signBit | (dataBytes << 4) | nibble)
    // Empty range when dataBytes == 0, so nothing is written after the control byte.
    for (shift <- (dataBytes - 1) * 8 to 0 by -8) {
      buf.writeByte(((abs >>> shift) & 0xff).toInt)
    }
  }

  // Compressed writes: every integral overload widens to Long and delegates.
  def <-<(in: Byte): Unit = compressLong(in.toLong)
  def <-<(in: Short): Unit = compressLong(in.toLong)
  def <-<(in: Int): Unit = compressLong(in.toLong)
  def <-<(in: Long): Unit = compressLong(in)

  /** Compresses a BigDecimal after rounding it HALF_UP to an integer. */
  def <-<(in: BigDecimal): Unit = compressLong(in.setScale(0, HALF_UP).toLong)

  /** Writes `bytes` followed by a single 0 byte acting as the end-of-string marker. */
  def compressString(bytes: Array[Byte]): Unit = {
    buf.writeBytes(bytes)
    buf.writeByte(0)
  }

  def <-<(bytes: Array[Byte]): Unit = compressString(bytes)

  // Raw (uncompressed) writes, delegated straight to the underlying buffer.
  def <--(in: Byte): Unit = buf.writeByte(in)
  def <--(in: Short): Unit = buf.writeShort(in)
  def <--(in: Int): Unit = buf.writeInt(in)
  def <--(in: Long): Unit = buf.writeLong(in)
  def <--(in: Float): Unit = buf.writeFloat(in)
  def <--(in: Double): Unit = buf.writeDouble(in)

  /**
   * Reads one value written by [[compressLong]].
   *
   * @return the decoded value; the magnitude is the control byte's low nibble
   *         followed by the data bytes, negated when the sign bit is set
   */
  def decompressLong(): Long = {
    val control = buf.readByte()
    val negative = (control & 0x80) == 0x80
    val dataBytes = (control & 0x70) >>> 4

    var result: Long = control.toLong & 0x0f
    for (_ <- 1 to dataBytes) {
      // Mask to 0..255: widening a negative Byte to Long would otherwise sign-extend.
      result = (result << 8) | (buf.readByte().toLong & 0xffL)
    }
    if (negative) -result else result
  }

  /**
   * Reads bytes up to (not including) the next 0 byte and decodes them.
   *
   * The terminator is consumed when present. If the buffer ends before any
   * 0 byte is seen, the remaining bytes are decoded and nothing is skipped
   * (previously this case threw from `skipBytes` after consuming the data).
   *
   * @param charsetName name of the charset used for decoding; defaults to "GBK"
   */
  def decompressString(charsetName: String = "GBK"): String = {
    // Scan on a duplicate so the real reader index is untouched while measuring.
    val scan = buf.duplicate()
    var length = 0
    var terminated = false
    while (scan.readable() && !terminated) {
      if (scan.readByte() != 0) length += 1
      else terminated = true
    }

    val result = buf.readBytes(length).toString(Charset.forName(charsetName))
    if (terminated) buf.skipBytes(1)
    result
  }

  // Raw reads, delegated straight to the underlying buffer.
  def readByte(): Byte = buf.readByte()
  def readShort(): Short = buf.readShort()
  def readInt(): Int = buf.readInt()
  def readLong(): Long = buf.readLong()
  def readFloat(): Float = buf.readFloat()
  def readDouble(): Double = buf.readDouble()
}
行情二进制写
import java.nio.ByteOrder
import java.util
import org.jboss.netty.buffer.ChannelBuffers

/** Serializes a `Snapshot` into a byte array using [[BitCompress]]. */
class SnapshotBuffer() {

  /** Decoding is not implemented here: this always returns `null`. */
  def read(bytes: Array[Byte]): Snapshot = {
    null
  }

  /**
   * Encodes `snap`: a header (source name, three sequence numbers, date, time,
   * three enum ids, item count) followed by one record per snapshot item.
   *
   * @return a copy of the written bytes, exactly `writerIndex` long
   */
  def write(snap: Snapshot): Array[Byte] = {
    val initialCapacity = 1024 * 1024
    val out = ChannelBuffers.dynamicBuffer(ByteOrder.LITTLE_ENDIAN, initialCapacity)
    val bc = new BitCompress(out)
    // Prices are multiplied by this factor before being compressed as integers.
    val priceScale = 10000

    // ---- header ----
    bc.compressString(snap.from.getBytes("UTF-8"))
    bc <-- snap.totalSn
    bc <-- snap.exchangeSn
    bc <-- snap.varietySn

    // Date packed as yyyyMMdd and time of day packed as HHmmss.
    val packedDate =
      snap.dateTime.getYear * 10000 +
        snap.dateTime.getMonthOfYear * 100 +
        snap.dateTime.getDayOfMonth
    val packedTime =
      snap.dateTime.getHourOfDay * 10000 +
        snap.dateTime.getMinuteOfHour * 100 +
        snap.dateTime.getSecondOfMinute

    bc.compressLong(packedDate)
    bc.compressLong(packedTime)
    bc.compressLong(snap.exchangeType.id)
    bc.compressLong(snap.varietyType.id)
    bc.compressLong(snap.marketStatus.id)
    bc.compressLong(snap.snapshots.size)

    // ---- one record per item ----
    for (item <- snap.snapshots) {
      bc <-< 4

      // Written raw (not compressed): epoch seconds and market id.
      bc <-- (item.dateTime.getMillis / 1000).toInt
      bc <-- item.marketId.id.toShort

      // Strings are terminated by a 0 byte.
      bc.compressString(item.symbol.getBytes("UTF-8"))
      bc.compressString(item.name.getBytes("UTF-8"))

      // Previous close is absolute; open/high/low/close are offsets from it.
      val reference = item.prevClose
      bc <-< reference * priceScale
      bc <-< (item.open - reference) * priceScale
      bc <-< (item.high - reference) * priceScale
      bc <-< (item.low - reference) * priceScale
      bc <-< (item.close - reference) * priceScale

      // Written raw (not compressed).
      bc <-- item.volume.toFloat
      bc <-- item.amount.toFloat

      // Compressed keeping two decimal places.
      bc <-< item.pe * 100

      // Pad both sides of the book to exactly five levels with zeroed entries.
      val bids = Array.fill(5)(new OrderBook(0, 0))
      val asks = Array.fill(5)(new OrderBook(0, 0))
      item.bids.toArray.copyToArray(bids)
      item.asks.toArray.copyToArray(asks)

      // First bid price is absolute; every other price is an offset from it.
      val bestBid = bids(0).price
      bc <-< bestBid * priceScale
      bids.drop(1).foreach { level => bc <-< (level.price - bestBid) * priceScale }
      bids.foreach { level => bc <-< level.lots }
      asks.foreach { level => bc <-< (level.price - bestBid) * priceScale }
      asks.foreach { level => bc <-< level.lots }

      bc <-< item.tradeLots
      bc <-< (if (item.suspended) 1 else 0)
      bc <-< item.holds
      bc <-< 0
    }

    // Copy only the written prefix, leaving the unused capacity behind.
    util.Arrays.copyOf(out.array, out.writerIndex)
  }
}
其中 util.Arrays.copyOf(buffer.array, buffer.writerIndex) 只把底层数组中已写入的前 writerIndex 个字节复制到一个新数组,从而去掉动态缓冲区预先多分配、尚未使用的那部分空间。
行情二进制的抽取
object SnapshotWrapper {

  val defaultTimeZone: DateTimeZone = DateTimeZone.forID("Asia/Shanghai")

  // Built once instead of on every unapply call.
  private val timestampFormat =
    DateTimeFormat.forPattern("yyyyMMddHHmmssZ").withOffsetParsed()

  /**
   * Decodes a binary snapshot.
   *
   * Reads, in order: source name (UTF-8), a raw long sequence number, packed
   * date and time, market id, market status, item count, then that many items.
   *
   * @param binary encoded snapshot; wrapped as a little-endian buffer
   * @return always `Some(snapshot)` when decoding completes; failures raised by
   *         the underlying reads are not caught here
   */
  def unapply(binary: Array[Byte]): Option[Snapshot] = {
    val c = new BitCompress(ChannelBuffers.wrappedBuffer(ByteOrder.LITTLE_ENDIAN, binary))

    val from = c.decompressString("UTF-8")
    val sn = c.readLong()

    val dateTime = {
      val date = c.decompressLong().toInt
      val time = c.decompressLong().toInt
      // The offset is hard-coded to +0800 and kept on the result (withOffsetParsed).
      timestampFormat.parseDateTime(
        f"""$date%08d$time%06d+0800"""
      )
    }

    val marketId = MarketId(c.decompressLong().toInt)

    // Between 9:25 and 9:30 the market flag already says continuous trading while
    // the market is still preparing for it, so any continuous-trading status
    // stamped before 9:30 is reported as the auction status instead.
    val marketStatus = {
      val cur = MarketStatus(c.decompressLong().toInt)
      val before930 = dateTime.getHourOfDay == 9 && dateTime.getMinuteOfHour < 30
      val before9 = dateTime.getHourOfDay < 9
      import MarketStatus._
      if (cur == ContinueTrade && (before930 || before9)) Auction else cur
    }

    val count = c.decompressLong().toInt
    // List.fill evaluates readItem once per element, in order; empty for count <= 0.
    val items = List.fill(count)(readItem(c))

    Some(Snapshot(
      from,
      sn,
      dateTime,
      marketId, marketStatus,
      items
    ))
  }

  /** Reads one snapshot item from the current position of `c`. */
  private def readItem(c: BitCompress): SnapshotItem = {
    // Price divisor: a leading flag of 4 selects 10000, anything else 1000.
    val divider = if (c.decompressLong() == 4) 10000 else 1000
    // Stored as epoch seconds.
    val time = new DateTime(c.readInt() * 1000L, defaultTimeZone)
    val marketId = MarketId(c.readShort())

    // NOTE(review): these use decompressString's default charset (GBK) while the
    // source name above is read as UTF-8 - confirm against the producer's encoding.
    val symbol = c.decompressString()
    val cnName = c.decompressString()

    // Previous close is absolute; open/high/low/close are offsets from it.
    val prevClose = BigDecimal(c.decompressLong()) / divider
    val open = (BigDecimal(c.decompressLong()) / divider) + prevClose
    val high = (BigDecimal(c.decompressLong()) / divider) + prevClose
    val low = (BigDecimal(c.decompressLong()) / divider) + prevClose
    val close = (BigDecimal(c.decompressLong()) / divider) + prevClose

    val volume = BigDecimal(c.readFloat())
    val amount = BigDecimal(c.readFloat())

    val pe = BigDecimal(c.decompressLong()) / 100

    val (bids, asks) = orderBooks(c, divider)

    val tradeLots = c.decompressLong() // number of trades
    val suspended = c.decompressLong() != 0 // trading halted
    val holds = c.decompressLong() // open interest
    c.decompressLong() // unused trailing field, read only to advance the buffer

    SnapshotItem(
      time,
      marketId, symbol, cnName,
      prevClose, open, high, low, close, volume, amount,
      pe, bids, asks,
      tradeLots, suspended, holds
    )
  }

  /**
   * Reads the five-level order book.
   *
   * Wire layout:
   *   bids: price1 ... price5 lots1 ... lots5
   *   asks: price1 ... price5 lots1 ... lots5
   *
   * The first bid price is absolute; every other price is an offset from it.
   * Levels whose reconstructed price is 0 are dropped.
   *
   * @return (bids, asks), each in wire order
   */
  @inline
  private def orderBooks(c: BitCompress, divider: Int): (List[OrderBook], List[OrderBook]) = {
    val levels = 5
    val base = c.decompressLong()

    // The four Array.fill calls below must stay in this order: it is the wire order.
    val bidsPrice = base +: Array.fill(levels - 1)(c.decompressLong() + base)
    val bidsLots = Array.fill(levels)(c.decompressLong())
    val asksPrice = Array.fill(levels)(c.decompressLong() + base)
    val asksLots = Array.fill(levels)(c.decompressLong())

    def nonEmptyLevels(prices: Array[Long], lots: Array[Long]): List[OrderBook] =
      prices.zip(lots).toList.collect {
        case (price, lot) if price != 0 =>
          OrderBook(BigDecimal(price) / divider, BigDecimal(lot))
      }

    (nonEmptyLevels(bidsPrice, bidsLots), nonEmptyLevels(asksPrice, asksLots))
  }
}