多租户数据库的缓冲区共享和预分配方案设计
简介
云计算技术使企业能够受益于经济实惠、可扩展、安全且具有高可用性和可靠性的托管数据库服务。在云数据库中,多个租户可以共享一个数据库实例。多租户数据库方案为他们提供了相互隔离的数据库环境。这样,每个租户都可以在自己的数据库环境中管理数据,而不会影响其他租户。
数据库使用缓冲池技术来加快数据访问速度。当租户需要访问某个数据页时,数据库首先会检查该数据页是否在缓冲池中。如果不在,数据库就会从磁盘读取并存储到缓冲池中。当缓冲池中的数据页数量达到上限时,驱逐算法会驱逐一些数据页,以便从磁盘加载新的数据页。
租户的业务高峰可能有规律可循,我们可以为其预先分配缓冲空间。例如,电子商务服务的访问量可能会在促销期间突然增加;一些服务的访问高峰通常出现在白天,而另一些服务的访问高峰则出现在晚上。
我们可以把对数据库的访问抽象为 M M M 个指令的集合 S = { I 1 , I 2 , . . . , I M } S=\{I_{1}, I_{2}, ... , I_{M}\} S={I1,I2,...,IM} ,访问是并发的,单位时间内可能会有许多指令进入。我们假设租户的数量是 N N N 。在多租户环境中,不同的指令可能来自不同的租户。因此,每一个集合里面的指令 I i I_{i} Ii 都可以看作是租户 T i T_{i} Ti 和数据页 P i P_{i} Pi 的元组,例如: I i = ( T i , P i ) I_{i} = (T_{i}, P_{i}) Ii=(Ti,Pi) ,租户的数据页之间是相互独立的,并且会被分别标记。例如:指令 ( 1 , 1 ) (1, 1) (1,1) 以及指令 ( 2 , 1 ) (2, 1) (2,1) 指向的是不同的数据页。如果指令 I i I_{i} Ii 所指向的数据页 P i P_{i} Pi 已经存在于缓冲池中了,我们称这种情况为命中;反之,称之为数据页错误。当数据页错误发生的时候,驱逐算法会选择一个缓冲区中的一个数据页进行驱逐(移除缓冲区),并将该空间用来存储新的数据页 P i P_{i} Pi 。
在多租户环境中,所有的租户共享一个大小为 Q Q Q 的大型缓冲池, Q Q Q 表示在缓冲池中可以存储的数据页的最大数目,并且,每个租户 t t t 会占用其中的 Q t Q_{t} Qt 。我们应该能够应对租户突然的请求变化,动态调整每个租户使用的缓冲区数量,并在业务高峰出现之前为他们分配足够的缓冲区空间。当发生页面故障时,缓冲区中的页面将被逐出,由被访问的页面取代。在缓冲区替换过程中,还需要确保每个租户实际使用的缓冲区大小不小于其最小配额,即, Q t m i n ≤ Q t Q_{t}^{min} \le Q_{t} Qtmin≤Qt 。如果租户 t t t 的 Q t Q_{t} Qt 小于等于了 Q t m i n Q_{t}^{min} Qtmin ,那么其他用户就不允许再占用属于该用户的缓冲区了。 最初,缓冲区中的每个页面都不属于任何租户。请注意,驱逐这些初始页面仍然算作驱逐其他租户。
设计一种调度算法,以实现缓冲区共享和预分配机制。在有限的缓冲空间内,最大限度地提高多租户的整体使用体验。多租户的整体使用体验可视为每个租户的页面故障数与其基本期望值之间的差距。我们优先确保优先级较高的租户的用户体验。
初始化输入
初始的输入有三行,第一行是 N N N 、 Q Q Q 、 K K K , N N N 表示租户的数目, Q Q Q 表示缓冲区的总大小, K K K 表示单位时间内预先分配的缓冲区位置的最大数量。
第二行是 N N N 个整型数字 { L 1 , L 2 , L 3 , . . . , L N } \left \{ L_{1}, L_{2}, L_{3}, ... , L_{N} \right \} {L1,L2,L3,...,LN} ,其中, L t L_{t} Lt 代表租户 t t t 的优先级。
第三行包含有 N N N 个元组,每一个元素包含有两个整型数字, ( Q t m i n , Q t e x p ) \left ( Q_{t}^{min}, Q_{t}^{exp} \right ) (Qtmin,Qtexp) 。其中 Q t m i n Q_{t}^{min} Qtmin 表示租户 t t t 的最小缓冲区大小, Q t e x p Q_{t}^{exp} Qtexp 表示为了满足理想的用户体验所需要的缓冲区的大小。 Q t e x p Q_{t}^{exp} Qtexp 有关的部分将会在 评分 里面介绍其含义和作用。此外, ∑ t = 1 N Q t m i n ≤ Q \sum _{t=1}^{N} Q_{t}^{min} \le Q ∑t=1NQtmin≤Q 。
交互
在完成初始化输入之后,还会继续输入 M M M 行数据,每次输入一行。并且每一行的第一个整型数字表示旗子( F l a g Flag Flag ),不同的 F l a g Flag Flag 会表示不同的时间和状态:
-
F l a g ≥ 0 Flag \ge 0 Flag≥0 :在这种情况之下, F l a g Flag Flag 表示当前的时间点,并且,在 F l a g Flag Flag 之后紧跟着会输入两个整型数字,分别表示对应的租户以及需要读取的数据页,使用指令来表示, I i = ( T i , P i ) I_{i} = \left( T_{i}, P_{i} \right ) Ii=(Ti,Pi) 。对于每一个指令,都需要输出一个整型数字 C i ( 1 ≤ C i ≤ Q ) C_{i}(1 \le C_{i} \le Q) Ci(1≤Ci≤Q) ,表示将租户 T i T_{i} Ti 的 P i P_{i} Pi 这个数据页放置在缓冲区中的 C i C_{i} Ci 的位置上,此缓冲区位置既可以是提前预置的,也可以是直接分配的。时间会逐次增加,此外每个单位时间内可以输入的指令的数目不会超过带宽 B B B ,但是具体的指令数目不会给出来。
-
F l a g = 0 Flag = 0 Flag=0 :这表示两个时间点之间的间隙,在这个情况下,可以进行缓冲区的预置的操作,当然可以选择是否进行预置,如果进行预置,最多可以预置 K K K 个缓冲区域,预置的方法与初始化的预置一样,参见 输入部分的输出 。
-
F l a g = − 1 Flag = -1 Flag=−1 :交互部分结束。
调度算法必须是在线的,这意味着,解决方案必须在获取到第 i + 1 i + 1 i+1 条指令之前,输出第 i i i 条指令的输出结果。第一条指令发生在初始化的预置(预分配)之后。
输出
输入部分的输出
在程序读取了初始的三行输入之后,就需要考虑预分配的问题,如果不进行预分配就输出 0 0 0 ,如果需要进行预分配,那么就输出 n b 1 t 1 b 2 t 2 . . . b n t n nb_{1}t_{1}b_{2}t_{2}...b_{n}t_{n} nb1t1b2t2...bntn ,其中 n n n 表示预分配的缓冲区位置的数量,元组 ( b i , t i ) \left ( b_{i}, t_{i} \right ) (bi,ti) ,表示的是将 b i b_{i} bi 这个缓冲区位置分配给租户 t i t_{i} ti ,每个整型数字之间使用空格分隔开来。
交互部分的输出
参见 交互 这个部分。
评分
我们的目标是评估租户的用户体验,重点是跟踪数据页错误以及验证预先所分配缓冲区位置的可用性。我们引入 E t E_{t} Et 来量化表示租户 t t t 的用户体验,更精确的来讲, E t E_{t} Et 表示用户体验的损失。并且 E t E_{t} Et 越小越好。
对于租户 t t t ,如果发生了数据页错误,需要更新 E t E_{t} Et :
E t = E t + 1 + γ p E_{t} = E_{t} + 1 + \gamma p Et=Et+1+γp
其中, γ \gamma γ 是预置缓冲区的损失因子,而 p 则是你在没有预先分配的情况下临时决定替换哪一页的惩罚。具体来说:
p = max { 0 , τ − ( t i m e 2 − t i m e 1 ) } p = \max \{0, \tau - (time_{2} - time_{1}) \} p=max{0,τ−(time2−time1)}
假设分配缓冲区的损失为 τ \tau τ 单位时间。当缓冲区的某一个位置被预分配给某一个租户的时候,我们会在这个位置上记录时间 t i m e 1 time_{1} time1 ,当这个位置被使用了,我们会标记时间 t i m e 2 time_{2} time2。如果一个租户没有进行预置,而是直接替换了他自己或者其他租户的位置,这将需要 τ \tau τ 单位时间来完成这个操作,因此在这种情况下,我们可以认为 t i m e 1 = t i m e 2 time_{1} = time_{2} time1=time2 ,从而可以得出: p = τ p = \tau p=τ ,初始预分配的缓冲区可以认为是在负无穷的时间点进行与分配的。
在本题目中,会有 S ( 1 ≤ S ≤ 100 ) S(1\le S \le 100) S(1≤S≤100) 个测试。并且设定 τ = 3 \tau = 3 τ=3 , γ = 0.25 \gamma = 0.25 γ=0.25 , E t E_{t} Et 则从 0 0 0 开始计算,对于每一个测试均是如此。在每一个测试中,我们会计算使用你的算法所得到的 E t a c t u a l E_{t}^{actual} Etactual 和所期望的用户体验 E t e x p E_{t}^{exp} Etexp 之间的剪切率:
ρ t = max { E t a c t u a l E t e x p − 1 , 0 } \rho _{t} = \max \left \{ \frac{E_{t}^{actual}}{E_{t}^{exp}} - 1 , 0 \right \} ρt=max{EtexpEtactual−1,0}
其中, E t e x p E_{t}^{exp} Etexp 表示租户所期望的用户体验。它是在每个租户 t t t 的缓冲队列长度为 Q t e x p Q_{t}^{exp} Qtexp 的时候使用LRU算法所得到的由于数据页错误而造成的损失函数(用户体验)。
LRU算法参考下述资料:
https://en.wikipedia.org/wiki/Cache_replacement_policies#LRU
于是,我们可以计算出来第 j j j 次测试的损失函数为:
C o s t j = ∑ t = 1 N ρ t 2 L t w h e r e L t m e a n s p r i o r i t y Cost_{j} = \sum _{t=1}^{N} \rho _{t} ^{2} L_{t}\\ where \quad L_{t} \quad means \quad priority Costj=t=1∑Nρt2LtwhereLtmeanspriority
其中的平方函数是对用户体验损失超过基线两倍的调度进行严厉惩罚。
对于每一个测试,都应该尽可能地减小 C o s t Cost Cost ,并且最终会与基线损失 C o s t b a s e Cost^{base} Costbase 进行对比,这是一个正数,使用事先制定的基准解决方案计算得出。
最终,总得分地计算公式如下所示:
S c o r e = ∑ j = 1 S σ ( C o s t j C o s t j b a s e ) w h e r e σ ( x ) = 100 max { 0 , 5 − x } Score = \sum _{j=1}^{S} \sigma \left ( \frac{Cost_{j}}{Cost_{j}^{base}} \right)\\ where \quad \sigma (x) = 100 \max \left \{ 0, 5-x \right \} Score=j=1∑Sσ(CostjbaseCostj)whereσ(x)=100max{0,5−x}
这个 σ \sigma σ 函数给每一个测试设定了 500 500 500 分,所得到的损失函数的数值应该小于基准线的 5 5 5 倍,否则将会不得分。
注意点
可以多次提交,但每分钟最多提交一次,每天最多提交 20 次。
本问题有两套测试。在比赛期间,每份提交的材料都要经过初赛测试。比赛结束后,对于每位参赛者:
评审团选取在初赛中得分不为零的最新作品;
对该作品进行最终测试;
最终测试得分即为参赛者的最终得分。
然后根据选手的最终得分进行排名。决赛测试与初赛测试相似,但不完全相同。最终测试的次数可能会更多,但其分布与初试的分布相似。
不建议使用多种算法来选择最优解,因为这可能会导致超时。也不建议对每个测试集使用特定的解决方案,因为这可能会在最终排名中被淘汰。
语言要求
Only support C++ language
C++ compiler version: g++ 7.3.0
The default C++ language dialect option is -std=c++17
需要使用的模块
C++
的标准库 vector
以及 tuple
, vector
用来作为列表, tuple
用来存储租户的信息,例如一个简单的应用:
#include <iostream>
#include <vector>
#include <tuple>
int main() {
// 创建一个 vector,元素类型为 tuple<int, int, int>
std::vector<std::tuple<int, int, int>> vec;
// 创建一个 tuple 对象
std::tuple<int, int, int> tpl = std::make_tuple(1, 2, 3);
// 将 tuple 对象添加到 vector 中
vec.push_back(tpl);
// 输出 vector 中的元素
for (const auto& element : vec) {
std::cout<<std::get<0>(element)<<" "<<std::get<1>(element)<<" "<<std::get<2>(element)<<std::endl;
}
return 0;
}
系统框架图
方案基本确定之后再行绘制,主要是为了便于理解。
方案设计
初始化阶段
1、初始化阶段的缓冲区预置一定要做,由于该预置过程不需要考虑所需要的时间,也就是说可以预置数目不限的缓冲区位置,因此可以尽可能多的预置一些区域;但是预置空间又不能过多,这是因为可能存在有某几个租户查询数据页非常少的情况,这就会导致给这些租户分配的预置空间被浪费,从而会影响整体的用户体验。我们知道, ∑ t = 1 N Q t m i n ≤ Q \sum _{t=1}^{N} Q_{t}^{min} \le Q ∑t=1NQtmin≤Q ,并且在每一个离散时间节点都可以进行缓冲区的预置(每个时间节点最多可以预置 K K K 个缓冲空间),因此应该综合考虑这两个因素来进行初始化的预置。
譬如:我们对于初始化的预置,可以考虑给每一个租户都预分配一定的缓冲区,并且缓冲区的个数为:
Q t m i n 10 − K N \frac{Q_{t}^{min}}{10} - \frac{K}{N} 10Qtmin−NK
这种方式中数字 10 10 10 是待定的,可以修改,并且给每个租户分配的缓冲区空间数目根据不同租户的 Q t m i n Q_{t}^{min} Qtmin 而定,较为合理。
分段、log
2、每个租户的预置缓冲区的位置是否连续,这可能会影响到运行的时间,例如我们假定属于某一个租户的缓冲区一定是连续的,那么在遍历这个租户的缓冲区的时候就不需要循环所有的缓冲区位置,但是在存储数据的时候可能又会操作起来比较麻烦,因此也需要折中。
交互阶段
1、首先应该有两个循环,由于不知道带宽等信息,因此需要先写成死循环,然后根据条件跳出循环,最外面的循环是要不断地读取输入的信息,内层的循环是对这些信息的处理,也就是将相应的数据页放到哪一个缓冲区位置以及输出预置信息等,而外层的循环也需要进行流程的控制。
2、对于将数据页放置到哪一个缓冲区位置,需要区分多种情况:如果该租户的缓冲区中已经存在有该数据页,那么就不需要任何操作;如果该租户的缓冲区中没有对应的数据页,但是该租户有尚未使用的缓冲区,那么,就将新读取的数据页放在某一个尚未使用的缓冲区;如果该租户缓冲区中没有对应的数据页,并且该租户的所有缓冲区都存放有数据,那么有需要考虑多种情况:
2.1、有不属于任何一个租户的缓冲区,将新读取的数据页放置在某一个不属于任何一个租户的缓冲区位置即可;
2.2、所有的缓冲区都有对应的租户,此时应该综合考虑用户优先级、用户最小缓冲区大小、用户理想缓冲区大小、用户实时实际占用的缓冲区大小等多个因素。有待商榷。
譬如:我们假设,租户 t j t_{j} tj 需要考虑是否占用其他用户的缓冲区空间:
-
假如 Q t j > Q t j e x p + K N Q_{t_{j}} > Q_{t_{j}}^{exp} + \frac{K}{N} Qtj>Qtjexp+NK ,那么租户 t j t_{j} tj 不能占用属于其他租户的缓冲区空间;此时,就需要将属于租户 t j t_{j} tj 的缓冲区中的第一个位置进行 p o p pop pop 的操作,然后把新的数据页 p u s h push push 进缓冲区中去(对于每一个租户采用数组
vector
的方式存储属于这个租户的缓冲区位置以及其上的数据页)。 -
假如 Q t j ≤ Q t j e x p + K N Q_{t_{j}} \le Q_{t_{j}}^{exp} + \frac{K}{N} Qtj≤Qtjexp+NK ,那么就可以占用属于其他租户的缓冲区,此时需要根据优先级进行判断,我们假设与租户 t j t_{j} tj 进行优先级比较的为租户 t i t_{i} ti ,并假设租户 t j t_{j} tj 的优先级为 L j L_{j} Lj ,租户 t i t_{i} ti 的优先级为 L i L_{i} Li ,当然,由于需要优先考虑高优先级租户的用户体验,因此从最低优先级的租户开始比较。具体的比较如下:
-
假如 L j > L i L_{j} > L_{i} Lj>Li 并且 Q t i > Q t i m i n + Q t i e x p 2 Q_{t_{i}} > \frac{Q_{t_{i}}^{min} + Q_{t_{i}}^{exp}}{2} Qti>2Qtimin+Qtiexp 的时候,可以让租户 t j t_{j} tj 占用租户 t i t_{i} ti 的缓冲区空间,反之则不能。为什么是大于号,因为: Q t i ≤ Q t i m i n Q_{t_{i}} \le Q_{t_{i}}^{min} Qti≤Qtimin 的时候,不可以被占用。
-
假如 L j ≤ L i L_{j} \le L_{i} Lj≤Li 并且 Q t i > Q t i e x p Q_{t_{i}} > Q_{t_{i}}^{exp} Qti>Qtiexp ,那么,可以让租户 t j t_{j} tj 占用租户 t i t_{i} ti 的缓冲区空间,反之则不能。
-
-
假如 Q t j < Q t j m i n Q_{t_{j}} < Q_{t_{j}}^{min} Qtj<Qtjmin ,从低优先级到高优先级出现的第一个 Q t i > Q t i m i n Q_{t_{i}} > Q_{t_{i}}^{min} Qti>Qtimin 的情况,可以让租户 t j t_{j} tj 占用租户 t i t_{i} ti 的缓冲区空间。
3、对于预置缓冲区,此时应该考虑每一个租户的缓冲区使用情况,包括:用户最小缓冲区大小、用户理想缓冲区大小、用户实时实际占用的缓冲区大小,并且依据租户优先级对缓冲区进行预置,当然前提条件是仍然有不属于任何租户的缓冲区,否则一定不需要进行预置的操作。具体的预置方案有待商榷。
flag=0
暂不考虑
修改
- 优先级分组,[1,2,3,4] 为一组(L,Low),[5,6,7] 为一组(M,Middle),[8,9,10] 为一组(H,High)。只要在一轮测试中存在有 M 或者 H 的分组,优先级为 L 的分组只能使用 Qmin 。M 分组不能占用 H 分组,H 分组在超过 Qexp 之后不再占用其他租户的空间,除非存在有其他租户超过了 Qexp。
- flag = 0 ,在 Qc 不为空的前提下,从高优先级到低优先级分配,L 分组不进行分配,分配的时候 H 分组中的每个租户分到的数目是 M 分组中每个租户的 2 倍,并且总的分配数目不大于 K ,还要保证 H 分组在分配之后不会超过 Qexp 。
- 在1.中的占用目前的想法是沿用之前的占用方法。
进度规划
最终代码
#include <iostream>
#include <vector>
#include <tuple>
#include <cstdint>
#include <cmath>
#include <algorithm>
struct seek //对于每一个用户,基于二叉排序树结构的缓存分配结构体,取代原有元组,减小查找速度
{
uint32_t buffer; //缓冲区序号
uint32_t page; //存放的数据页号
struct seek* left;
struct seek* right;
};
class Tenant{
public:
Tenant(uint16_t t);
uint16_t GetT();
void SetL(uint16_t L);
uint16_t GetL();
void SetQs(std::tuple<uint32_t, uint32_t> Qsn);
uint32_t GetQmin();
uint32_t GetQexp();
uint32_t GetQt();
std::vector<uint32_t> GetBuffer();
std::vector<std::tuple<uint32_t, uint32_t>> GetQvec();
void AddBuffer(uint32_t c);
uint32_t RemoveBuffer();
void AddPage(uint32_t qb, uint32_t qp);
uint32_t RemovePage();
void LRURemovePage(uint32_t qp);
bool EnoughBuffer(); // 新增:检查是否有足够的缓冲区
bool MoreThanQmin(); // 新增:检查Qt是否大于Qmin
struct seek* GetRoot();
void Insert(struct seek* &R, struct seek* s); //将一个seek类型的节点插入到树中
void sortTree(uint32_t buffer1, uint32_t page1); //构建该用户的二叉排序树
struct seek* Search(struct seek* R, uint32_t page1); //查找函数
uint32_t Delete(struct seek* &R);
void Release(struct seek* &R);
//析构函数
~Tenant(){Release(Root);};
private:
uint16_t t;
uint16_t L;
uint32_t Qmin;
uint32_t Qexp;
uint32_t Qt;
// 属于租户的存放数据页的缓冲区,第一个数字是缓冲区的位置,第二个缓冲区是数据页的号码
// 对应操作是 AddPage() 和 RemovePage()
std::vector<std::tuple<uint32_t, uint32_t>> Qvec;
// 属于租户的缓冲区,Buffer里面放的是空的缓冲区
// 对应操作是 AddBuffer(uint32_t c) 和 RemoveBuffer()
std::vector<uint32_t> Buffer;
struct seek* Root;
};
void PreAllocateBuffer(uint16_t N, uint16_t K, std::vector<Tenant>& Ts, std::vector<uint32_t>& Qc){
uint32_t FQmin[N];
uint32_t sum = 0;
for(uint16_t n=0; n<N; n++){
FQmin[n] = Ts[n].GetQmin();
sum += FQmin[n];
}
std::cout<<sum;
for(uint16_t n=0; n<N; n++){
while(FQmin[n]>0){
uint32_t ind = Qc.back();
Qc.pop_back();
Ts[n].AddBuffer(ind);
FQmin[n] -= 1;
std::cout<<" "<<ind<<" "<<Ts[n].GetT();
}
}
std::cout<<std::endl;
}
// flag = 0 的时候的预分配
void AllocateBuffer(std::vector<uint32_t>& Qc, std::vector<Tenant>& Ts, uint16_t K){
if(Qc.empty()){
std::cout<<0;
return;
}else{
if(Qc.size() >= K){
std::cout<<K;
uint16_t k = 0;
while(k < K){
for(uint16_t i=0;i<Ts.size();i++){
if(Ts[i].GetL() >= 5 && Ts[i].GetL() <= 7){
if(Qc.empty() || k >= K){
return;
}
uint32_t ind = Qc.back();
Qc.pop_back();
Ts[i].AddBuffer(ind);
std::cout<<" "<<ind<<" "<<Ts[i].GetT();
k ++;
}else if(Ts[i].GetL() >= 8 && Ts[i].GetL() <= 10){
if(Qc.empty() || k >= K){
return;
}
uint32_t ind = Qc.back();
Qc.pop_back();
Ts[i].AddBuffer(ind);
std::cout<<" "<<ind<<" "<<Ts[i].GetT();
k ++;
if(Qc.empty() || k >= K){
return;
}
ind = Qc.back();
Qc.pop_back();
Ts[i].AddBuffer(ind);
std::cout<<" "<<ind<<" "<<Ts[i].GetT();
k ++;
}else{
continue;
}
}
}
return;
}else{
uint32_t QK = Qc.size();
std::cout<<QK;
uint16_t k = 0;
while(k < QK){
for(uint16_t i=0;i<Ts.size();i++){
if(Ts[i].GetL() >= 5 && Ts[i].GetL() <= 7){
if(Qc.empty() || k >= QK){
return;
}
uint32_t ind = Qc.back();
Qc.pop_back();
Ts[i].AddBuffer(ind);
std::cout<<" "<<ind<<" "<<Ts[i].GetT();
k ++;
}else if(Ts[i].GetL() >= 8 && Ts[i].GetL() <= 10){
if(Qc.empty() || k >= QK){
return;
}
uint32_t ind = Qc.back();
Qc.pop_back();
Ts[i].AddBuffer(ind);
std::cout<<" "<<ind<<" "<<Ts[i].GetT();
k ++;
if(Qc.empty() || k >= QK){
return;
}
ind = Qc.back();
Qc.pop_back();
Ts[i].AddBuffer(ind);
std::cout<<" "<<ind<<" "<<Ts[i].GetT();
k ++;
}else{
continue;
}
}
}
return;
}
}
}
/* 实现租户优先级分组功能
参数:所有租户列表
*/
int num_p[3][11]; //全局变量:一个二维数组[优先级组L:0,M:1,H:2][按顺序]:在vector中的位置
void Categorize(std::vector<Tenant>& Ts){
int l = 1, m = 1, h = 1;
num_p[0][0] = 0; //第一位记录这一类租户数量
num_p[1][0] = 0;
num_p[2][0] = 0;
for(uint16_t i=0; i<Ts.size(); i++){
if(Ts[i].GetL() <= 4){
num_p[0][l++] = i; //低优先级的租户
num_p[0][0]++;
}
else if(Ts[i].GetL() >= 8){
num_p[2][h++] = i; //高优先级的租户
num_p[2][0]++;
}
else {
num_p[1][m++] = i; //中优先级的租户
num_p[1][0]++;
}
}
}
/* 从Qti<Qtmin开始的替换函数
参数:当前操作的租户,所有租户列表
返回:-1为错误,0为自身缓存区,其它数字为替换其他租户
*/
int16_t Replace_s(Tenant& theTenant, std::vector<Tenant>& Ts){
uint16_t MemoryL = 11;
uint16_t MemoryT = 11;
// 比较Qt与Qmin
if(theTenant.MoreThanQmin()){
// 超过Qmin
for(uint16_t i=0; i<Ts.size(); i++){
if(theTenant.GetL() > Ts[i].GetL()){
// theTenant L 大
if(Ts[i].GetQt() > (std::floor((Ts[i].GetQmin()+Ts[i].GetQexp())/2) + 1)){
MemoryT = Ts[i].GetT();
return MemoryT;
}else
continue;
}else{
// theTenant L 小 <=
if(theTenant.GetT() != Ts[i].GetT()){
// 不能自己和自己比较
if(Ts[i].GetQt() > Ts[i].GetQexp()){
MemoryT = Ts[i].GetT();
return MemoryT;
}else
continue;
}else
continue;
}
}
if(MemoryT > Ts.size())
return -1;
else
return MemoryT;
}else{
// 不足最小 Qmin
for(uint16_t i=0; i<Ts.size(); i++){
if(Ts[i].MoreThanQmin() && Ts[i].GetL() < MemoryL){
MemoryL = Ts[i].GetL();
MemoryT = Ts[i].GetT();
}else
continue;
}
if(MemoryT > Ts.size())
return -1;
else
return MemoryT;
}
}
/* 实现选择替换哪位租户功能
参数:当前操作的租户,所有租户列表
返回:-1为错误,0为自身缓存区,其它数字为替换其他租户
*/
int16_t Replace(Tenant& theTenant, std::vector<Tenant>& Ts){
uint16_t MemoryL = theTenant.GetL();
uint16_t MemoryT = theTenant.GetT();
// 检查该租户是否有足够的缓冲区
if(theTenant.EnoughBuffer()){
return 0;
}else{
if(MemoryL < 5){ //L组优先级
if(num_p[1][0] > 0 || num_p[2][0] > 0) //有M/H
if(theTenant.MoreThanQmin()) //大于等于Qmin
return 0;
else{ //小于Qmin
for(uint16_t i=1; i<=num_p[0][0]; i++){ //所有处于L组的租户与存储的最小L比较(一开始为该租户本身)
if(MemoryL > Ts[num_p[0][i]].GetL()){
MemoryL = Ts[num_p[0][i]].GetL();
MemoryT = Ts[num_p[0][i]].GetT();
}
}
return MemoryT;
}
else //没有M&H,运行原有函数
return(Replace_s(theTenant, Ts));
}
if(MemoryL > 7){ //H组优先级
if(theTenant.GetQt() > theTenant.GetQexp()){ //超过Qexp
for(uint16_t i=0; i<=Ts.size(); i++){ //检查所有组的Qexp
if(Ts[i].GetQt() > Ts[i].GetQexp() && theTenant.GetL() > Ts[i].GetL())
return(Ts[i].GetT()); //有大于Qexp的L小于该租户的
}
return 0; //不存在则返回自身
}
else //不超过则运行原函数
return(Replace_s(theTenant, Ts));
}
else{ //M组优先级
// M组专用的replace
// 比较Qt与Qmin
MemoryL = 11;
MemoryT = 11;
if(theTenant.MoreThanQmin()){
// 超过Qmin
for(uint16_t i=0; i<Ts.size(); i++){
if(theTenant.GetL() > Ts[i].GetL()){
// theTenant L 大
if(Ts[i].GetQt() > (std::floor((Ts[i].GetQmin()+Ts[i].GetQexp())/2) + 1)){
MemoryT = Ts[i].GetT();
return MemoryT;
}else
continue;
}else{
// theTenant L 小 <=
if(theTenant.GetT() != Ts[i].GetT()){
// 不能自己和自己比较
if(Ts[i].GetQt() > Ts[i].GetQexp() && Ts[i].GetL() < 8){ //加入非H组限定条件
MemoryT = Ts[i].GetT();
return MemoryT;
}else
continue;
}else
continue;
}
}
if(MemoryT > Ts.size())
return -1;
else
return MemoryT;
}else{
// 不足最小 Qmin
for(uint16_t i=0; i<Ts.size(); i++){
if(Ts[i].MoreThanQmin() && Ts[i].GetL() < MemoryL && Ts[i].GetL() < 8){ //加入非H组限定条件
MemoryL = Ts[i].GetL();
MemoryT = Ts[i].GetT();
}else
continue;
}
if(MemoryT > Ts.size())
return -1;
else
return MemoryT;
}
}
}
}
int main(){
std::vector<Tenant> Ts;
// 输入 N Q K
uint16_t N;
uint32_t Q;
uint16_t K;
std::cin>>N>>Q>>K;
// 初始化租户对象
for(uint16_t n=1; n<=N; n++){
Ts.push_back(Tenant(n));
}
// 输入优先级
std::vector<uint16_t> Ls;
for(uint16_t n=0; n<N; n++){
uint16_t L;
std::cin>>L;
Ls.push_back(L);
}
for(uint16_t n=0; n<N; n++){
Ts[n].SetL(Ls[n]);
}
// 输入 Qmin Qexp
std::vector<std::tuple<uint32_t, uint32_t>> Qs;
for(uint16_t n=0; n<N; n++){
uint16_t Qmin;
uint16_t Qexp;
std::cin>>Qmin>>Qexp;
std::tuple<uint32_t, uint32_t> Qsn = {Qmin, Qexp};
Qs.push_back(Qsn);
}
for(uint16_t n=0; n<N; n++){
Ts[n].SetQs(Qs[n]);
}
// 初始化不属于任何租户的缓冲区,后续缓冲区分配给租户之后也需要从这里取出去
std::vector<uint32_t> Qc;
for(uint32_t c=1; c<=Q; c++){
Qc.push_back(c);
}
// 初始化的预分配
PreAllocateBuffer(N, K, Ts, Qc);
// std::cout<<0<<std::endl;
Categorize(Ts); //租户分类
// 交互阶段
while(true){
int flag;
std::cin>>flag;
if(flag == 0){
// 目前没有 flag = 0 的预分配,后续可能会添加
AllocateBuffer(Qc, Ts, K);
std::cout<<std::endl;
}else if(flag > 0){
uint16_t T;
uint32_t P;
std::cin>>T>>P;
// index -> T - 1
// std::vector<std::tuple<uint32_t, uint32_t>> Qvec = Ts[T - 1].GetQvec();
// 判断数据页是否已经存在于租户的缓冲区中
// uint16_t In_Or_Not = 0;
// for(const auto &element: Qvec){
// uint32_t qb;
// uint32_t qp;
// std::tie(qb, qp) = element;
// if(P == qp){
// Ts[T - 1].LRURemovePage(qp);
// Ts[T - 1].AddPage(qb, qp);
// In_Or_Not = 1;
// std::cout<<qb<<std::endl;
// break;
// }else{
// continue;
// }
// }
// if(In_Or_Not == 1){
// continue;
// }else{
// // pass
// }
// struct seek* r = Ts[T - 1].GetRoot();
struct seek* r = Ts[T - 1].Search(Ts[T - 1].GetRoot(), P);
if(r == NULL){
// pass (nothing to do)
}else{
// if(r->page == P){
std::cout<<r->buffer<<std::endl;
// continue;
continue;
// }else{
// pass (nothing to do)
// }
}
// 添加数据页
if(Ts[T - 1].GetBuffer().empty()){
if(Qc.empty()){
// 要么自己踢出去一个页面,要么让别的租户踢出去一个页面,Replace 函数进行判断
int16_t res = Replace(Ts[T - 1], Ts);
if(res == -1){
uint32_t buffer = Ts[T - 1].RemovePage();
Ts[T - 1].AddPage(buffer, P);
std::cout<<buffer<<std::endl;
}else if(res == 0){
uint32_t buffer = Ts[T - 1].RemovePage();
Ts[T - 1].AddPage(buffer, P);
std::cout<<buffer<<std::endl;
}else{
// 增加了一个判断,目的是避免出现 Error! Index cannot be replaced because Q[original user] - 1 < its Qmin !
if(Ts[res - 1].MoreThanQmin()){
// index -> res - 1
if(Ts[res - 1].GetBuffer().empty()){
uint32_t buffer = Ts[res - 1].RemovePage();
// res -> T
Ts[T - 1].AddPage(buffer, P);
std::cout<<buffer<<std::endl;
}else{
uint32_t buffer = Ts[res - 1].RemoveBuffer();
Ts[T - 1].AddPage(buffer, P);
std::cout<<buffer<<std::endl;
}
}else{
uint32_t buffer = Ts[T - 1].RemovePage();
Ts[T - 1].AddPage(buffer, P);
std::cout<<buffer<<std::endl;
}
}
}else{
uint32_t buffer = Qc.back();
Qc.pop_back();
Ts[T - 1].AddPage(buffer, P);
std::cout<<buffer<<std::endl;
continue;
}
}else{
// 有空余的未使用的缓冲区
uint32_t buffer = Ts[T - 1].RemoveBuffer();
Ts[T - 1].AddPage(buffer, P);
std::cout<<buffer<<std::endl;
continue;
}
}else if(flag == -1){
break;
}else{}
}
return 0;
}
Tenant::Tenant(uint16_t t):t(t){
// 初始化
this->Qt = 0;
this->Root = NULL;
}
// 获取租户的编号
uint16_t Tenant::GetT(){
return this->t;
}
// 设置租户优先级
void Tenant::SetL(uint16_t L){
this->L = L;
}
// 获取租户优先级
uint16_t Tenant::GetL(){
return this->L;
}
void Tenant::SetQs(std::tuple<uint32_t, uint32_t> Qsn){
this->Qmin = std::get<0>(Qsn);
this->Qexp = std::get<1>(Qsn);
}
// 获取租户的 Qmin
uint32_t Tenant::GetQmin(){
return this->Qmin;
}
// 获取租户的 Qexp
uint32_t Tenant::GetQexp(){
return this->Qexp;
}
// 获取租户的 Qt
uint32_t Tenant::GetQt(){
return this->Qt;
// Index 错误应该不是这里的问题,因为修改之后没有变化
// return this->Qvec.size();
}
// 获取给租户分配的尚未使用的缓冲区
std::vector<uint32_t> Tenant::GetBuffer(){
return this->Buffer;
}
// 获取租户的 Qvec
std::vector<std::tuple<uint32_t, uint32_t>> Tenant::GetQvec(){
return this->Qvec;
}
// 预分配的时候使用该函数给租户分配缓冲区,此时缓冲区中没有数据页
void Tenant::AddBuffer(uint32_t c){
this->Buffer.push_back(c);
this->Qt += 1;
}
// 给属于某一个租户的缓冲区中添加数据页,或者加空白的缓冲区踢出去给了别的租户,都需要使用这个函数,方便统计租户的Qt
uint32_t Tenant::RemoveBuffer(){
uint32_t front = this->Buffer.front();
this->Buffer.erase(this->Buffer.begin());
this->Qt -= 1;
return front;
}
// 给租户添加数据页以及数据页存放的位置
void Tenant::AddPage(uint32_t qb, uint32_t qp){
// this->Qvec.push_back(std::make_tuple(qb, qp));
Tenant::sortTree(qb, qp);
this->Qt += 1;
}
// 移除租户的数据页
uint32_t Tenant::RemovePage(){
// uint32_t qb;
// uint32_t qp;
// std::tie(qb, qp) = this->Qvec.front();
// this->Qvec.erase(this->Qvec.begin());
uint32_t qb = Tenant::Delete(this->Root);
this->Qt -= 1;
return qb;
}
// LRU 算法
void Tenant::LRURemovePage(uint32_t qp){
// 使用 remove_if 和 erase 删除元组中第一个元素等于特定值的元素
this->Qvec.erase(std::remove_if(
this->Qvec.begin(), this->Qvec.end(),
[qp](const auto &element) {
return std::get<1>(element) == qp;
}
),
this->Qvec.end()
);
this->Qt -= 1;
}
// 新增:检查是租户否有足够的缓冲区
bool Tenant::EnoughBuffer(){
// 由 Buffer.size() 改为 Qt 再改为 Qvec.size()
if(this->Qt > this->Qexp){
return true;
}else{
return false;
}
}
// 新增:检查Qt是否大于Qmin
bool Tenant::MoreThanQmin(){
// GetQt() -> Qvec.size()
if(this->Qt > this->Qmin){
return true;
}else{
return false;
}
}
// 获取 Root
struct seek* Tenant::GetRoot(){
return this->Root;
}
//插入函数
void Tenant::Insert(struct seek* &R, struct seek* s) { //R通常为根结点,s是待插入结点
if (R == NULL) //如果是空树就直接插入
R = s;
else if (s->page < R->page) //比较待插入结点的值与当前结点的值,依据排序二叉树的规则,
Tenant::Insert(R->left, s); //s的值小于当前结点的值,s继续与当前结点的左孩子比较,反之与右孩子比较
else if (s->page > R->page)
Tenant::Insert(R->right, s); //本质上就是递归调用,不断比较,直到能当作叶子结点插入时跳出递归
}
//将新的指令所占缓存区插入到该用户树中。
void Tenant::sortTree(uint32_t buffer1, uint32_t page1) {
struct seek* s = new seek; //创建新结点
s->page = page1;
s->buffer = buffer1;
s->left = NULL; //把新结点的数据域赋值后,要把左右孩子指针置空,防止其左右孩子无法正常插入
s->right = NULL;
Tenant::Insert(this->Root, s); //调用插入函数
}
//返回所查找元素位置的指针
struct seek* Tenant::Search(struct seek* R, uint32_t page1){
if(R == NULL || R->page == page1){
return R;
}else if(R->page < page1){ //利用二叉排序树的性质,值比根结点的值小则去其左子树中查找,反之去右子树
return Tenant::Search(R->right, page1);
}else{
return Tenant::Search(R->left, page1);
}
}
//删除一个节点
uint32_t Tenant::Delete(struct seek* &R){
uint32_t r;
struct seek* q;
struct seek* s; //q是s的父结点
if (R->left == NULL){ //只有右子树
q = R;
R = R->right;
r = q->buffer;
delete q;
return r;
}
else if (R->right == NULL) { //只有左子树
q = R;
R = R->left;
r = q->buffer;
delete q;
return r;
}
else { //有左子树和右子树
q = R;
s = R->left; //把待删除结点的值用它的左子树中最右侧的结点的值代替,这个值的大小仅次于待删除值
while (s->right != NULL) {
q = s;
s = s->right;
}
// R->page = s->page;
// R->buffer = s->buffer;
if (q != R) //分q与R的关系来分类
q->right = s->left;
else
R->left = s->left;
r = s->buffer;
delete s;
return r;
}
}
//依次删除节点,直至树空。
void Tenant::Release(struct seek* &R){
if (R != NULL) { //只有这样才能传进结点与结点之间的关系(左右孩子、父结点)
Tenant::Release(R->left);
Tenant::Release(R->right); //先释放左孩子,再释放右孩子,最后释放根结点
delete R;
}
}