问题描述
我每天从某处获取数据并将其插入到 cassandra
中然后我需要从 cassandra
检索整周的数据并进行一些处理并将结果插入回 cassandra
.
I am getting data from somewhere and inserting it into cassandra
daily basisthen I need to retrieve the data from cassandra
for whole week and do some processing and insert result back onto cassandra
.
我有很多记录,每条记录执行以下大部分操作.
i have lot of records, each record executing most of the below operations.
为此,我编写了一个程序,其工作正常,但我收到警告,并且根据 API 文档不应多次使用 prepare 语句
其降低性能.
To do this I have written a program below its working fine but I get warning and according to API document should not use prepare statement
multiple timeits reducing performance.
请告诉我如何避免这种情况以提高性能或建议我在 Scala 中实现此目标的任何替代方法.
Please tell me how to avoid this to improve the performance OR suggest me any alternative approach to achieve this in scala.
这是我的代码的一部分:
Here is some part of my code:
object CassandraUtils {
println("##########entered cassandrutils")
val selectQuery = "select * from k1.table1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt >= ? and summ_dt <= ?;"
val selectTripQuery = "select * from k1.tale1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt = ? and t_summ_id = ?;"
val insertQuery = "insert into k1.table1 (s_id, a_id, summ_typ, summ_dt, t_summ_id, a_s_no, avg_sp, c_dist, c_epa, c_gal, c_mil, d_id, d_s_no, dist, en_dt, en_lat, en_long, epa, gal, h_dist, h_epa,h_gal, h_mil, id_tm, max_sp, mil, rec_crt_dt, st_lat, st_long, tr_dis, tr_dt, tr_dur,st_addr,en_addr) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,?);"
val updateQuery = "update k1.table1 set tr_dur=?,id_tm=?,max_sp=?,c_dist=?,h_dist=?,dist=?,c_gal=?,c_mil=?,h_gal=?,h_mil=?,c_epa=?,h_epa=?,epa=?,gal=?,rec_crt_dt=?,mil=?,avg_sp=?,tr_dis=?,en_lat=?,en_long=? where s_id= ? and a_id= ? and summ_typ= ? and summ_dt= ? and t_summ_id=?; "
val dashboardSelectQuery: String = "select * from k1.table2 where s_id = ? and a_id = ? and hlth_typ= ? and hlth_s_typ= ?;"
val insertDashBoardQuery = "insert into k1.table2 (s_id, a_id, hlth_typ, hlth_s_typ, dsh_nval_01, rec_crt_dt, lst_rfr_dt, a_s_no) values (? ,?, ?, ?, ?, ?, ?, ?);"
val updateDashBoardQuery = "update k1.table2 set dsh_nval_01= ?, lst_rfr_dt= ? where s_id= ? and a_id= ? and hlth_typ= ? and hlth_s_typ= ?;"
val dInfoSelectQuery = "select d_s_no,d_type,a_id,d_id,s_id from k2.table3 where d_s_no = ?"
def insert(session: Session, data: THData, batch: BatchStatement) {
val insertStatement = session.prepare(insertQuery)
//insertStatement.setConsistencyLevel(ConsistencyLevel.QUORUM)
//println("data.st_addr,data.en_addr: ------------------->>>>>> " + data.st_addr, data.en_addr)
val boundStatement = new BoundStatement(insertStatement)
//session.execute(boundStatement.bind( data.s_id, data.a_id, data.summ_typ, data.summ_dt, data.t_summ_id, data.a_s_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.d_id, data.d_s_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa,data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur,data.st_addr,data.en_addr))
batch.add(boundStatement.bind(data.s_id, data.a_id, data.summ_typ, data.summ_dt, data.t_summ_id, data.a_s_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.d_id, data.d_s_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa, data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur, data.st_addr, data.en_addr))
}
def update(session: Session, data: THData, batch: BatchStatement) {
val updateStatement = session.prepare(updateQuery)
//insertStatement.setConsistencyLevel(ConsistencyLevel.QUORUM)
val boundStatement = new BoundStatement(updateStatement)
//session.execute(boundStatement.bind( data.tr_dur, data.id_tm, data.max_sp, data.c_dist, data.h_dist, data.dist, data.c_gal, data.c_mil, data.h_gal, data.h_mil, data.c_epa, data.h_epa, data.epa, data.gal, data.rec_crt_dt, data.mil, data.avg_sp, data.tr_dis,data.en_lat, data.en_long, data.s_id,data.a_id, data.summ_typ, data.summ_dt, data.t_summ_id ))
batch.add(boundStatement.bind(data.tr_dur, data.id_tm, data.max_sp, data.c_dist, data.h_dist, data.dist, data.c_gal, data.c_mil, data.h_gal, data.h_mil, data.c_epa, data.h_epa, data.epa, data.gal, data.rec_crt_dt, data.mil, data.avg_sp, data.tr_dis, data.en_lat, data.en_long, data.s_id, data.a_id, data.summ_typ, data.summ_dt, data.t_summ_id))
}
def getQueryData(session: Session, im: String): (Long, String, String, String) = {
//println("query---->>>> :" + dInfoSelectQuery)
val selectStatement = session.prepare(dInfoSelectQuery)
val boundStatement = new BoundStatement(selectStatement)
val result: ResultSet = session.execute(boundStatement.bind(im))
val row = result.one()
(row.getLong("s_id"), row.getString("a_id"), row.getString("d_id"), row.getString("d_s_no"))
}
def getDashBoardData(session: Session, Data: THData): AssetDashboardData = {
//println("query---->>>> :" + dashboardSelectQuery)
val selectStatement = session.prepare(dashboardSelectQuery)
val boundStatement = new BoundStatement(selectStatement)
val result: ResultSet = session.execute(boundStatement.bind(Data.s_id, Data.a_id, "odometer", "calculated"))
var assetDashboardData: AssetDashboardData = null
val row = result.one()
if (row != null) {
//doing some processing
}
assetDashboardData
}
def dashBoardInsert(session: Session, data: THData, batch: BatchStatement) {
val insertStatement = session.prepare(insertDashBoardQuery)
//insertStatement.setConsistencyLevel(ConsistencyLevel.QUORUM)
val boundStatement = new BoundStatement(insertStatement)
batch.add(boundStatement.bind(data.s_id, data.a_id, "odometer", "calculated", data.odometer, new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis()), data.a_s_no))
}
def dashBoardUpdate(session: Session, data: THData, batch: BatchStatement) {
val updateStatement = session.prepare(updateDashBoardQuery)
//insertStatement.setConsistencyLevel(ConsistencyLevel.QUORUM)
val boundStatement = new BoundStatement(updateStatement)
batch.add(boundStatement.bind(data.odometer, new Date(System.currentTimeMillis()), data.s_id, data.a_id, "odometer", "calculated"))
}
................
.................
推荐答案
调用 prepare Everytime 不是一个好主意..以避免您可以简单地保留查询字符串与准备好的语句的映射..您可以在以下位置填充缓存仅启动因此准备只会被调用一次......现在在您的 Cassandrautil 方法中,您将从地图中获取准备好的语句并创建绑定语句并执行它.
Calling prepare Everytime is not a good idea..to avoid that you can simply keep a map of query string vs prepared statements..you can fill the cache at startup only hence prepare will be called only once ...now in your Cassandrautil methoda you will get prepared statement from map and create the bound statement and execute it.
这篇关于多次使用准备好的语句,警告 Cassandra 查询降低性能的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!