我们已经开发了一个 Camel 捆绑包(部署在Karaf中),有望每24小时从MySQL提取数据并将其推送到S3。但是,如果MySQL在闲置8小时后内部关闭了连接,则在下一次计划执行时,它会引发错误。请参阅下面的代码片段。
特性:
MySqlDriver=com.mysql.jdbc.Driver
MySqlDatabaseURL=jdbc:mysql://x.x.x.x/dbname?autoReconnect=true
MySqlUsername=sm*****
MySqlPassword=*******
激活剂:
public class Activator implements BundleActivator {
public CamelContext context = null;
public void start(BundleContext bundleContext) throws Exception {
DataSource dataSource = UDMSUtils.createDataSource(UDMSUtils.getProperty(UDMSConstants.MYSQL_DATABASE_URL));
SimpleRegistry simpleRegistry = new SimpleRegistry();
simpleRegistry.put(UDMSConstants.UDMS_DATA_SOURCE, dataSource);
context = new OsgiDefaultCamelContext(bundleContext, simpleRegistry);
context.addRoutes(new CreativeRoutes());
context.start();
}
}
建筑数据源:
public static DataSource createDataSource(String connectURI) {
BasicDataSource ds = new BasicDataSource();
ds.setDriverClassName(getProperty(UDMSConstants.MYSQL_DRIVER));
ds.setUsername(getProperty(UDMSConstants.MYSQL_USERNAME));
ds.setPassword(getProperty(UDMSConstants.MYSQL_PASSWORD));
ds.setUrl(connectURI);
ds.setMaxWait(-1); // Waits indefinately
return ds;
}
路线:
from("timer://Timer?repeatCount=1").to("direct:record_count").end();
from("direct:record_count")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody(query);
}
})
.routeId("record_count")
.to("jdbc:" + UDMSConstants.UDMS_DATA_SOURCE)
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// ...
}
);
任何人都可以提出建议,在上述代码中需要进行哪些更改,以便连接在我们需要的时间内保持 Activity 状态。
请注意:我们无权更改
mysql.properties
,因此我们需要在代码中进行处理。 最佳答案
我前段时间也遇到过类似的问题。 VikingSteve在他建议您做的事情中也很特别。由于我使用的是OSGI蓝图,因此我用XML进行了所有配置,因此我按照以下方法进行了解决。
1)在您的pom中添加一个Apache Commons DBCP依赖项:
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.4</version>
</dependency>
2)在 Camel 路径/蓝图文件中声明连接池,如下所示:
<bean id="MydataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" scope="singleton" >
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://DB-001:3306/Customer"/>
<property name="username" value="sys_ETL"/>
<property name="password" value="Blah"/>
<property name="initialSize" value="4"/>
<property name="maxActive" value="32"/>
<property name="maxIdle" value="16"/>
<property name="minIdle" value="8"/>
<property name="timeBetweenEvictionRunsMillis" value="1800"/>
<property name="minEvictableIdleTimeMillis" value="1800"/>
<property name="testOnBorrow" value="true"/>
<property name="testWhileIdle" value="true"/>
<property name="testOnReturn" value="true"/>
<property name="validationQuery" value="SELECT 1"/>
<property name="maxWait" value="1000"/>
<property name="removeAbandoned" value="true"/>
<property name="logAbandoned" value="true"/>
<property name="removeAbandonedTimeout" value="30000"/>
</bean>
此步骤将创建一个数据库连接池作为Bean,然后可以在路由中使用它。该bean的名称为
Mydatasource
,稍后我将使用此信息。还要注意我在配置中为连接池设置的属性。这些属性使我的连接池可以增长和收缩,并且还可以确保即使闲置后连接也不会失效。3)创建一个POJO以使用此连接池:
public class AccountInformationToDatabase {
private BasicDataSource dataSource;
public BasicDataSource getDataSource() {
return dataSource;
}
public void setDataSource(BasicDataSource dataSource) {
this.dataSource = dataSource;
}
@Handler
public void PersistRecord
(
@Body AccountRecordBindy msgBody
, @Headers Map hdr
, Exchange exch
) throws Exception
{
Connection conn = null;
PreparedStatement stmt=null;
try
{
conn= dataSource.getConnection();
stmt =conn.prepareStatement("SOME INSERT STATEMENT");
stmt.setString(1,msgBody.getAccountNumber().trim());
stmt.setString(2,msgBody.getRecordType().trim() );
stmt.setString(3,msgBody.getSequenceNumber().trim());
stmt.setString(4,msgBody.getTitle().trim());
stmt.setString(5,msgBody.getCustomerType().trim());
stmt.setString(6,msgBody.getName().trim());
stmt.setString(7,msgBody.getAccountAddress1().trim());
stmt.executeUpdate();
}
catch (Exception e)
{
throw new Exception(e.getMessage());
}
finally
{
try
{
if (stmt!=null)
{
stmt.close();
stmt= null;
}
if (conn!=null)
{
conn.close();
conn= null;
}
}
catch(SQLException e)
{
throw new Exception(e.getMessage());
}
}
}
}
该POJO具有一个名为datasource的属性,其类型为
org.apache.commons.dbcp.BasicDataSource
。现在,我可以将Mydatasource
bean注入此POJO中,以便我的类可以访问连接池。4)将POJO转换为bean并注入连接池:
<bean id="AccountPersist" class="AccountInformationToDatabase">
<property name="dataSource" ref="MydataSource"/>
</bean>
如果您正在执行文本文件处理并且想要使用并发插入等,则必须具有此技术。