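Deploying a Cassandra cluster and the Presto query engine on four servers, with code examples:

Many details are still missing. If anything is unclear or looks wrong, let's discuss it together!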

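1. Create the user
    Log in to the application server as root and run:
        adduser cassandra
        passwd cassandra
    Log in as the cassandra user and verify that Python 2 and JDK 8 are installed (I use JDK 8 because in this project Cassandra is paired with the Presto query engine, and Presto requires JDK 8):
  Python: most Linux machines ship with Python; install it yourself if it is missing.

  JDK:
    Perform all of the following steps on each of the four Linux servers:

  As root, upload jdk-8u92-linux-x64.tar.gz to the /usr/local directory.

  Extract jdk-8u92-linux-x64.tar.gz:

  cd /usr/local
  tar -zxvf jdk-8u92-linux-x64.tar.gz

  Edit the /etc/profile configuration file:

  vi /etc/profile

 Append the following at the end of the file:
        #set java environment
        JAVA_HOME=/usr/local/jdk1.8.0_92/
        PATH=$JAVA_HOME/bin:$PATH
        CLASSPATH=$JAVA_HOME/jre/lib:$JAVA_HOME/lib
        export  JAVA_HOME  PATH  CLASSPATH

 Reload the file so the changes take effect:
        source /etc/profile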

2. Create the upgrade directory
  mkdir -p /home/cassandra/product
  mkdir -p /home/cassandra/install/`date +%C%y%m%d`/
  Upload the upgrade files apache-cassandra-2.1.16-bin.tar.gz, presto-server-0.165.tar.gz, etc.tar.gz and presto-cli-0.165-executable.jar to the upgrade directory on the application servers of the target environment: /home/cassandra/install/`date +%C%y%m%d`/
Extract the upgrade package:
cd  /home/cassandra/install/`date +%C%y%m%d`/
tar  -zxvf  etc.tar.gz
Server roles at a glance (UAT environment):
*.*.178.131 (Cassandra seed node) (Presto coordinator node)
*.*.178.132 (Cassandra seed node) (Presto worker node)
*.*.178.133 (Cassandra regular node) (Presto worker node)
*.*.178.134 (Cassandra regular node) (Presto worker node)

3. Install Cassandra:
Log in to the application servers (all four machines) as the cassandra user and run:
cd  /home/cassandra/install/`date +%C%y%m%d`/
tar -zvxf  apache-cassandra-2.1.16-bin.tar.gz  -C /home/cassandra/product/
cd /home/cassandra/product/apache-cassandra-2.1.16/conf
sed -i 's/seeds: "127.0.0.1"/seeds: "seed-node-IP1,seed-node-IP2"/g' cassandra.yaml

(Replace seed-node-IP1,seed-node-IP2 with the seed node IP addresses of the target environment; choose stable servers as seed nodes.)
sed -i 's/listen_address: localhost/listen_address: this-server-IP/g' cassandra.yaml
(Replace this-server-IP with the IP address of the server you are configuring.)
sed -i 's/rpc_address: localhost/rpc_address: this-server-IP/g' cassandra.yaml
(Replace this-server-IP with the IP address of the server you are configuring.)
sed -i 's/authenticator: AllowAllAuthenticator/authenticator: PasswordAuthenticator/g' cassandra.yaml
 //The heap size matters: Cassandra caches the data it is processing here. Keep the maximum at or below 16G.
sed -i 's/#MAX_HEAP_SIZE="4G"/MAX_HEAP_SIZE="8G"/g' cassandra-env.sh
sed -i 's/#HEAP_NEWSIZE="800M"/HEAP_NEWSIZE="1600M"/g' cassandra-env.sh
cd /home/cassandra/product/apache-cassandra-2.1.16/bin
sed -i 's/DEFAULT_REQUEST_TIMEOUT_SECONDS = 10/DEFAULT_REQUEST_TIMEOUT_SECONDS = 3600/g' cqlsh
(This raises the default cqlsh request timeout from 10 seconds to 3600 seconds.)

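Start the servers:
 On servers *.*.178.131 and *.*.178.132 (the seed nodes), run:
 cd /home/cassandra/product/apache-cassandra-2.1.16/bin
 ./cassandra -p cassandra.pid
 The log line "INFO  06:22:29  Node /<machine IP> state jump to NORMAL" indicates a successful start.
 Then run the same commands on servers *.*.178.133 and *.*.178.134, one server at a time.
  cd /home/cassandra/product/apache-cassandra-2.1.16/bin
 ./cassandra -p cassandra.pid
  Once "INFO  06:22:29 Node /<machine IP> state jump to NORMAL" appears, the node has started; press Enter to return to the shell.

After all four Cassandra servers are up, check the cluster status. UN in the first column means the node is Up and Normal: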

cd /home/cassandra/product/apache-cassandra-2.1.16/bin
[cassandra@hadoop01 bin]$ ./nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens  Owns (effective)  Host ID                               Rack
UN  *.*.178.131  159.37 KB  256     64.1%             6056ee11-f932-4a7c-a13e-632e143e2cbf  rack1
UN  *.*.178.132  141.05 KB  256     66.7%             d385b5d5-638d-4d9a-ae76-30b15846864d  rack1
UN  *.*.178.133  159.72 KB  256     69.2%             262c33b7-bf6e-4dfc-b2d0-3a90cbe66622  rack1
UN  *.*.178.134  159.72 KB  256     67.1%             262c33b7-bf6e-4dfc-b2d0-3a90cbe66622  rack1
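
The UN check can also be automated. The following is a minimal Java sketch, not part of the original setup, that scans text shaped like the nodetool status output and lists every node whose status code is not UN. The class name NodetoolStatusCheck and the sample addresses are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Scans `nodetool status` style output for nodes that are not Up/Normal (UN). */
final class NodetoolStatusCheck {

    /** Returns the address column of every node row whose status code is not "UN". */
    static List<String> unhealthyNodes(String nodetoolOutput) {
        List<String> unhealthy = new ArrayList<>();
        for (String line : nodetoolOutput.split("\\R")) {
            String[] cols = line.trim().split("\\s+");
            // Node rows start with a two-letter code: U or D, then N, L, J or M.
            // Header and separator lines do not match and are skipped.
            if (cols.length >= 2 && cols[0].matches("[UD][NLJM]") && !cols[0].equals("UN")) {
                unhealthy.add(cols[1]);
            }
        }
        return unhealthy;
    }

    public static void main(String[] args) {
        // Hypothetical sample output with one node down.
        String sample = "--  Address   Load       Tokens\n"
                + "UN  10.0.0.1  159.37 KB  256\n"
                + "DN  10.0.0.2  141.05 KB  256\n";
        System.out.println("Nodes not in UN state: " + unhealthyNodes(sample));
    }
}
```

An empty result means every listed node reported UN.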

4. Install Presto:
Log in to the application servers (all four servers) as the cassandra user and run:
cd  /home/cassandra/install/`date +%C%y%m%d`
cp presto-cli-0.165-executable.jar /home/cassandra/product/
cd /home/cassandra/product
mv presto-cli-*.jar  presto-cli
chmod 777 presto-cli
cd  /home/cassandra/install/`date +%C%y%m%d`
tar -zvxf  presto-server-0.165.tar.gz  -C /home/cassandra/product/
cp -r etc  /home/cassandra/product/presto-server-0.165/
mkdir /home/cassandra/product/presto-server-0.165/prestodata

① Edit the catalog properties:
cd /home/cassandra/product/presto-server-0.165/etc/catalog
sed -i 's/cassandra.contact-points=200.31.156.206,200.31.147.207/cassandra.contact-points=*.*.178.131,*.*.178.132/g'  cassandra.properties
(Replace *.*.178.131,*.*.178.132 with the seed node IP addresses of the target environment.)
② Edit the config properties:
cd /home/cassandra/product/presto-server-0.165/etc
#On the three Presto worker servers (UAT: *.*.178.132, *.*.178.133 and *.*.178.134), run:
sed -i 's/coordinator=true/coordinator=false/g' config.properties
sed -i '/node-scheduler.include-coordinator/'d config.properties
sed -i '/discovery-server.enabled/'d config.properties

#Log in to all four application servers as the cassandra user (UAT: *.*.178.131, *.*.178.132, *.*.178.133, *.*.178.134) and run:
sed -i 's/discovery.uri=http:\/\/200.31.147.10:8089/discovery.uri=http:\/\/*.*.178.131:8089/g' config.properties
(Replace *.*.178.131 with the address of the coordinator node; in UAT it is *.*.178.131.)
sed -i 's/query.max-memory=30GB/query.max-memory=30GB/g' config.properties
(Change the right-hand 30GB to 60% of physical memory. Check physical memory with free -g.)
sed -i 's/query.max-memory-per-node=20GB/query.max-memory-per-node=20GB/g' config.properties
(Change the right-hand 20GB to 40% of physical memory. Check physical memory with free -g.)

③ Edit the node properties:
sed -i 's/node.id=ffffffff-ffff-ffff-ffff-fffffffffff1/node.id=ffffffff-ffff-ffff-ffff-fffffffffff2/g' node.properties
(Run this only on the three worker nodes, using fffffffffff2, fffffffffff3 and fffffffffff4 respectively.)
node.id: (the coordinator node is ffffffff-ffff-ffff-ffff-fffffffffff1; the three worker nodes are ffffffff-ffff-ffff-ffff-fffffffffff2, ffffffff-ffff-ffff-ffff-fffffffffff3 and ffffffff-ffff-ffff-ffff-fffffffffff4)
④ Edit the JVM settings:
sed -i 's/-Xmx40G/-Xmx20G/g' jvm.config
(Change 20G to 70% of physical memory. Check physical memory with free -g.)
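
The three memory rules above (60%, 40% and 70% of physical memory) are simple arithmetic. As a rough illustration only, the Java sketch below derives the three settings from a machine's physical memory in GB, as reported by free -g. The class name PrestoMemorySizing and the 64 GB sample machine are hypothetical; the percentages are the ones given in this walkthrough, not an official Presto recommendation.

```java
/** Derives the Presto memory settings used in this walkthrough from physical memory. */
final class PrestoMemorySizing {

    /** Integer percentage of the physical memory, rounded down to whole GB. */
    static int percentOf(int physicalGb, int percent) {
        return physicalGb * percent / 100;
    }

    /** config.properties: query.max-memory, 60% of physical memory. */
    static String queryMaxMemory(int physicalGb) {
        return "query.max-memory=" + percentOf(physicalGb, 60) + "GB";
    }

    /** config.properties: query.max-memory-per-node, 40% of physical memory. */
    static String queryMaxMemoryPerNode(int physicalGb) {
        return "query.max-memory-per-node=" + percentOf(physicalGb, 40) + "GB";
    }

    /** jvm.config: maximum heap, 70% of physical memory. */
    static String jvmMaxHeap(int physicalGb) {
        return "-Xmx" + percentOf(physicalGb, 70) + "G";
    }

    public static void main(String[] args) {
        int physicalGb = 64; // hypothetical machine size
        System.out.println(queryMaxMemory(physicalGb));
        System.out.println(queryMaxMemoryPerNode(physicalGb));
        System.out.println(jvmMaxHeap(physicalGb));
    }
}
```

For a 64 GB machine this gives 38GB, 25GB and -Xmx44G. Note that the per-node query limit stays below the JVM heap, which it has to.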
Start the servers:
Log in to the application servers as the cassandra user (UAT: *.*.178.131, *.*.178.132, *.*.178.133, *.*.178.134) and run:
cd /home/cassandra/product/presto-server-0.165/bin
./launcher start

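5. You can now log in to Presto or Cassandra and run SQL queries

Connect to Cassandra:
        cd /home/cassandra/product/apache-cassandra-2.1.16/bin
        ./cqlsh --request-timeout=3600 -u cassandra -p cassandra  *.*.178.131
        (*.*.178.131 is the coordinator node IP)
    Connect to Cassandra through Presto:
        cd /home/cassandra/product
        ./presto-cli --server IP:8089 --catalog cassandra --schema dep_ntpi
        (dep_ntpi is the name of the database, that is, the Cassandra keyspace)
    Presto monitoring UI:     172.17.193.13:8089, which shows the duration, memory usage and other status of Presto queries

6. Code samples. Connecting to Cassandra from Java requires the cassandra-driver-core-2.0.9.2.jar driver. The code is as follows:
    a. Database connection utility class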

package com.cfets.util;

import org.apache.log4j.Logger;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.Session;

public class CassandraHandle {
    private static final Logger logger = Logger.getLogger(CassandraHandle.class);
    private Cluster cluster;
    private Session session;
    private String[] hosts;
    private final static CassandraHandle cassandraHandle = new CassandraHandle();

    public static CassandraHandle getInstance() {
        return cassandraHandle;
    }

    /**
     * Initialize cassandra hosts.
     */
    private CassandraHandle() {
        hosts = new String[]{ReadDbConfigFile.getProperty("cassandra.Ip"),
                ReadDbConfigFile.getProperty("cassandra.Ip2"),
                ReadDbConfigFile.getProperty("cassandra.Ip3"),
                ReadDbConfigFile.getProperty("cassandra.Ip4")};
        getConnectPool(hosts);
    }

    /**
     * Get cassandra ConnectionPool
     * @author houyao
     * @date 2018-01-04 15:44:47
     * @param hosts contact points of the Cassandra cluster
     */
    private void getConnectPool(String[] hosts) {
        try {
            QueryOptions qo = new QueryOptions();
            // Consistency level
            qo.setConsistencyLevel(ConsistencyLevel.QUORUM);
            PoolingOptions poolingOptions = new PoolingOptions();
            poolingOptions.setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, 32);
            poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, 2);
            poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, 4);
            cluster = Cluster.builder().addContactPoints(hosts)
                    .withQueryOptions(qo)
                    .withCredentials(ReadDbConfigFile.getProperty("cassandra.name"), ReadDbConfigFile.getProperty("cassandra.password"))
                    .withPoolingOptions(poolingOptions)
                    .build();
            this.session = cluster.connect();
        } catch (Exception e) {
            logger.error("CassandraHandle.getConnectPool() connection error: " + e.getMessage(), e);
            e.printStackTrace();
        }
    }

    public Session getSession() {
        return session;
    }

    public void closeConn() {
        cluster.close();
    }
}

b. Performing inserts into Cassandra

package com.cfets.parser;

import java.util.Date;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.log4j.Logger;

import cn.com.cfets.data.MetaObject;
import cn.com.cfets.data.coretransaction.fxoption.Quote;

import com.cfets.imt.DEPService;
import com.cfets.imt.common.AbstractParser;
import com.cfets.util.CassandraHandle;
import com.cfets.util.DateUtils;
import com.cfets.util.FileUtils;
import com.cfets.util.ReadConfigFile;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

public class DataNTPI0021C0001 extends AbstractParser {

    private static final Logger logger = Logger.getLogger(DataNTPI0021C0001.class);

    // Insert statement; prepared once per session and reused for every row.
    private static final String INSERT_CQL = " insert into DEP_NTPI.fx_optn_prc_dtl (ord_num,prdct_cd, prd, impld_vltlty, qt_cd, qt_tm, qt_st, dt_cnfrm, cn_shrt_nm, en_shrt_nm, qtng_instn_cd, qt_instn_trdr_cd, ccy_pair_cd, trdng_md_cd, qt_instn_lgl_grp_nm, qt_instn_cn_full_nm, qt_dir, qt_instn_cfets_instn_cd, qt_instn_en_full_nm, qt_vrty, trdng_mthd_cd) values (?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);";

    private ConcurrentLinkedQueue<Quote> datas = new ConcurrentLinkedQueue<Quote>();
    private static int oddOreven = Integer.parseInt(ReadConfigFile.getProperty("timestamp"));

    public DataNTPI0021C0001() {
        new DealQThread().start();
    }

    // Inner class: once per second, swaps out the queue and writes its messages to Cassandra in batches.
    protected class DealQThread extends Thread {

        private ConcurrentLinkedQueue<Quote> queue;

        public void run() {
            Session session = null;
            BatchStatement bs = null;
            PreparedStatement statement = null;
            try {
                session = CassandraHandle.getInstance().getSession();
                statement = session.prepare(INSERT_CQL);
                while (true) {
                    try {
                        // Take the filled queue and leave a fresh one for the producers.
                        synchronized (DataNTPI0021C0001.class) {
                            queue = datas;
                            datas = new ConcurrentLinkedQueue<Quote>();
                        }
                        if (session.isClosed()) {
                            session = CassandraHandle.getInstance().getSession();
                            statement = session.prepare(INSERT_CQL);
                        }
                        bs = new BatchStatement();
                        boolean flag = false; // set to true once the queue has yielded a message, so the batch gets executed
                        int timeStamp = oddOreven; // ord_num = currentTimeMillis * 1000000 + timeStamp; timeStamp decides whether ord_num is odd or even
                        long count; // holds currentTimeMillis * 1000000
                        while (!queue.isEmpty()) {
                            flag = true;
                            Quote message = queue.poll();
                            logger.info("DATA_NTPI_0021_C_0001 JSON:" + FileUtils.toJSONString(message));
                            count = System.currentTimeMillis() * 1000000;
                            timeStamp = timeStamp + 2; // starting from -1 and adding 2 each time keeps the value odd
                            bs.add(statement.bind(
                                    // count + timeStamp replaces DB.getSeqence("SEQ_FX_OPTN_DPTH_QT_DATA.NEXTVAL").
                                    // Attention: in the active-active (AA) deployment the first server
                                    // produces odd numbers and the second server even numbers.
                                    // This server produces odd numbers.
                                    count + timeStamp,
                                    //DB.getSeqence("SEQ_FX_OPTN_PRC_DTL.NEXTVAL"),
                                    message.isSetProductCode() ? message.getProductCode() : "",
                                    message.isSetPeriod() ? message.getPeriod() : "",
                                    message.isSetImpliedVolatility() ? message.getImpliedVolatility() + "" : "",
                                    message.isSetQuoteCode() ? message.getQuoteCode() : "",
                                    message.isSetQuoteTime() ? DateUtils.dateToString(message.getQuoteTime(), "yyyy-MM-dd HH:mm:ss") : "",
                                    message.isSetQuoteStatus() ? message.getQuoteStatus() : "",
                                    message.isSetDateConfirmed() ? DateUtils.dateToString(message.getDateConfirmed(), "yyyy/MM/dd") : "",
                                    message.isSetChineseShortName() ? message.getChineseShortName() : "",
                                    message.isSetEnglishShortName() ? message.getEnglishShortName() : "",
                                    message.isSetQuotingInstitutionCode() ? message.getQuotingInstitutionCode() : "",
                                    message.isSetQuoteInstitutionTraderCode() ? message.getQuoteInstitutionTraderCode() : "",
                                    message.isSetCurrencyPairCode() ? message.getCurrencyPairCode() : "",
                                    message.isSetTradingModeCode() ? message.getTradingModeCode() : "",
                                    message.isSetQuoteInstitutionLegalGroupName() ? message.getQuoteInstitutionLegalGroupName() : "",
                                    message.isSetQuoteInstitutionChineseFullName() ? message.getQuoteInstitutionChineseFullName() : "",
                                    message.isSetQuoteDirection() ? message.getQuoteDirection() : "",
                                    message.isSetQuoteInstitutionCfetsInstitutionCode() ? message.getQuoteInstitutionCfetsInstitutionCode() : "",
                                    message.isSetQuoteInstitutionEnglishFullName() ? message.getQuoteInstitutionEnglishFullName() : "",
                                    message.isSetQuoteVariety() ? message.getQuoteVariety() : "",
                                    message.isSetTradingMethodCode() ? message.getTradingMethodCode() : ""));
                            // Flush every 5000 rows and restart the offset.
                            if (bs.size() >= 5000) {
                                session.execute(bs);
                                bs.clear();
                                timeStamp = oddOreven;
                            }
                        }
                        if (flag) {
                            session.execute(bs);
                            logger.info("DATA_NTPI_0021_C_0001 data inserted successfully!");
                        }
                    } catch (Exception e1) {
                        logger.error("DATA_NTPI_0021_C_0001 data insert failed! " + e1);
                        e1.printStackTrace();
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            } catch (Exception e2) {
                logger.error("DATA_NTPI_0021_C_0001 failed to get a cassandra connection: " + e2.getMessage(), e2);
            } finally {
                try {
                    session.close();
                } catch (Exception ex) {
                    logger.error("DATA_NTPI_0021_C_0001 error while closing the cassandra session: " + ex);
                }
            }
        }
    }
    // The parser method that adds incoming Quote messages to datas is not shown in the original listing.
}
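
The ord_num scheme used in the insert code (currentTimeMillis * 1000000 plus an offset that grows by 2, with one server producing odd values and the other even values) is easier to see in isolation. The sketch below is a simplified stand-in, not the project's code: the class name OrdNumSequence is hypothetical, and the seed values -1 (odd server) and 0 (even server) are an assumption based on the comments in the listing.

```java
/** Simplified stand-in for the ord_num scheme: millis * 1,000,000 + parity-preserving offset. */
final class OrdNumSequence {

    private final int seed;   // assumed: -1 for the odd server, 0 for the even server
    private int offset;

    OrdNumSequence(int seed) {
        this.seed = seed;
        this.offset = seed;
    }

    /** Returns the next id for the given clock reading; adding 2 keeps the offset's parity. */
    long next(long currentTimeMillis) {
        offset += 2;
        return currentTimeMillis * 1_000_000L + offset;
    }

    /** Restarts the offset, as the listing does after each flushed batch. */
    void reset() {
        offset = seed;
    }

    public static void main(String[] args) {
        OrdNumSequence oddServer = new OrdNumSequence(-1);
        OrdNumSequence evenServer = new OrdNumSequence(0);
        long now = System.currentTimeMillis();
        System.out.println("odd server:  " + oddServer.next(now) + ", " + oddServer.next(now));
        System.out.println("even server: " + evenServer.next(now) + ", " + evenServer.next(now));
    }
}
```

Because millis * 1000000 is always even, the parity of the id comes entirely from the offset, so the two servers can never produce the same value. Within one server, uniqueness relies on the offset staying below 1000000 between resets and on the clock not repeating a millisecond with the same offset.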

Querying through Presto requires the presto-jdbc-0.100.jar driver.

c. Utility class for connecting to Cassandra through Presto

package com.cfets.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class PrestoUtils {

    static {
        try {
            Class.forName("com.facebook.presto.jdbc.PrestoDriver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public static Connection getConnection() {
        Connection conn = null;
        try {
            conn = DriverManager.getConnection(ReadDbConfigFile.getProperty("presto.url"), ReadDbConfigFile.getProperty("cassandra.name"), ReadDbConfigFile.getProperty("cassandra.password"));
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }

    public static void release(Connection conn, Statement ps, ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            rs = null;
        }
        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            ps = null;
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            conn = null;
        }
    }
}

d. Presto query demo

package com.cfets.market;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cfets.util.DB;
import com.cfets.util.PrestoUtils;
import com.cfets.vo.FxSptfwdswpMrktOrdrDpth;

/**
 * Transfers FX spot/forward/swap order-depth market data (ODM) to the Oracle database.
 * @author wangzl
 */
public class FxSptfwdswpMrktOrdrDpthOrcl {
    private final Logger logger = LoggerFactory.getLogger(FxSptfwdswpMrktOrdrDpthOrcl.class);

    /**
     * Queries the data from the Cassandra database.
     * @param targetDate the DT_CNFRM date to load
     */
    public void loadDataFromPrestoFile(String targetDate) {
        long start = System.currentTimeMillis();
        Connection conn = null;
        Statement stmt = null;
        ResultSet rs = null;
        try {
            conn = PrestoUtils.getConnection();
            stmt = conn.createStatement();
            long num = 0;
            logger.info("DATA-TSS-0012 FX spot/forward/swap order-depth market data (ODM): starting transfer to the oracle database!");
            while (true) {
                // Fetch the next page: rows with ORD_NUM greater than the last one handled.
                String sql = "SELECT ORD_NUM,PRDCT_CD,DL_MKT_DT_TP,BID_RATE,ASK_RATE,PRD,MKT_DATA_UPD_TM,DT_CNFRM,CCY_PAIR_CD,TRDNG_MD_CD,CNTRCT_NM,TRDNG_MD,MKT_DPTH_INDCTR,CFETS_UNFD_CD,BID_QT_TM,ASK_QT_TM,BID_QT_AMNT,ASK_QT_AMNT FROM DEP_NTPI.FX_SWAP_MKT_ORDER_DEPTH WHERE ORD_NUM > "+num+" AND DT_CNFRM = '"+targetDate+"' LIMIT 1000000";
                logger.info("DATA-TSS-0012 FX spot/forward/swap order-depth market data (ODM): running query SQL:" + sql);
                rs = stmt.executeQuery(sql);
                if (rs.next()) {
                    FxSptfwdswpMrktOrdrDpth fsmod = new FxSptfwdswpMrktOrdrDpth();
                    fsmod.setOrdNum(rs.getLong("ORD_NUM"));
                    fsmod.setPrdctCd(rs.getString("PRDCT_CD"));
                    fsmod.setDlMktDtTp(rs.getString("DL_MKT_DT_TP"));
                    fsmod.setBidRate(rs.getString("BID_RATE"));
                    fsmod.setAskRate(rs.getString("ASK_RATE"));
                    fsmod.setPrd(rs.getString("PRD"));
                    fsmod.setMktDataUpdTm(rs.getString("MKT_DATA_UPD_TM"));
                    fsmod.setDtCnfrm(rs.getString("DT_CNFRM"));
                    fsmod.setCcyPairCd(rs.getString("CCY_PAIR_CD"));
                    fsmod.setTrdngMdCd(rs.getString("TRDNG_MD_CD"));
                    fsmod.setCntrctNm(rs.getString("CNTRCT_NM"));
                    fsmod.setTrdngMd(rs.getString("TRDNG_MD"));
                    fsmod.setMktDpthIndctr(rs.getString("MKT_DPTH_INDCTR"));
                    fsmod.setCfetsUnfdCd(rs.getString("CFETS_UNFD_CD"));
                    fsmod.setBidQtTm(rs.getString("BID_QT_TM"));
                    fsmod.setAskQtTm(rs.getString("ASK_QT_TM"));
                    fsmod.setBidQtAmnt(rs.getString("BID_QT_AMNT"));
                    fsmod.setAskQtAmnt(rs.getString("ASK_QT_AMNT"));
                    long firstOrdNum = rs.getLong("ORD_NUM");
                    num = toInsertDB(fsmod, rs);
                    if (num == 0) { // guard against an infinite loop
                        num = firstOrdNum;
                    }
                } else {
                    break;
                }
            }
            logger.info("DATA-TSS-0012 FX spot/forward/swap order-depth market data (ODM): transfer to the oracle database finished normally! Elapsed time: " + (System.currentTimeMillis() - start) + " ms!");
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("DATA-TSS-0012 FX spot/forward/swap order-depth market data (ODM): transfer to the oracle database failed: " + e.getMessage(), e);
        } finally {
            PrestoUtils.release(conn, stmt, rs);
        }
    }
    // toInsertDB(fsmod, rs) is not shown in the original listing; judging from its use above,
    // it writes the rows to Oracle and returns the ORD_NUM to continue from.
}
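
The loop in the demo pages through the table by repeatedly asking for rows with ORD_NUM greater than the last value handled. The sketch below shows that paging pattern on an in-memory list so it can be run without a cluster. It is an illustration under assumptions: KeysetPagingSketch and fetchPage are hypothetical names, and fetchPage stands in for the SQL query. It also assumes each page comes back in ascending ORD_NUM order. The demo's query has no ORDER BY, so if Presto returns an arbitrary subset under LIMIT, continuing from the largest value seen could skip rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Paging by "id greater than the last one seen", shown on an in-memory list. */
final class KeysetPagingSketch {

    /** Stand-in for the SQL query: up to `limit` ids greater than `after`, ascending. */
    static List<Long> fetchPage(List<Long> sortedIds, long after, int limit) {
        List<Long> page = new ArrayList<>();
        for (long id : sortedIds) {
            if (id > after && page.size() < limit) {
                page.add(id);
            }
        }
        return page;
    }

    /** Reads every id page by page, moving the cursor to the last id of each page. */
    static List<Long> readAll(List<Long> sortedIds, int limit) {
        List<Long> seen = new ArrayList<>();
        long cursor = 0; // same starting point as `num = 0` in the demo
        while (true) {
            List<Long> page = fetchPage(sortedIds, cursor, limit);
            if (page.isEmpty()) {
                break; // no more rows, like the `else { break; }` branch
            }
            seen.addAll(page);
            cursor = page.get(page.size() - 1);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(3L, 7L, 9L, 12L, 20L);
        System.out.println(readAll(ids, 2));
    }
}
```

Since the cursor always moves to a strictly larger id, the loop terminates once the last page has been read.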