我们已经开发了一个 Camel 捆绑包(部署在Karaf中),有望每24小时从MySQL提取数据并将其推送到S3。但是,如果MySQL在闲置8小时后内部关闭了连接,则在下一次计划执行时,它会引发错误。请参阅下面的代码片段。

特性:

MySqlDriver=com.mysql.jdbc.Driver
MySqlDatabaseURL=jdbc:mysql://x.x.x.x/dbname?autoReconnect=true
MySqlUsername=sm*****
MySqlPassword=*******

激活剂:
public class Activator implements BundleActivator {

    public CamelContext context = null;

    public void start(BundleContext bundleContext) throws Exception {
        DataSource dataSource = UDMSUtils.createDataSource(UDMSUtils.getProperty(UDMSConstants.MYSQL_DATABASE_URL));

        SimpleRegistry simpleRegistry = new SimpleRegistry();
        simpleRegistry.put(UDMSConstants.UDMS_DATA_SOURCE, dataSource);

        context = new OsgiDefaultCamelContext(bundleContext, simpleRegistry);
        context.addRoutes(new CreativeRoutes());
        context.start();
    }

}

建筑数据源:
public static DataSource createDataSource(String connectURI) {
    BasicDataSource ds = new BasicDataSource();
    ds.setDriverClassName(getProperty(UDMSConstants.MYSQL_DRIVER));
    ds.setUsername(getProperty(UDMSConstants.MYSQL_USERNAME));
    ds.setPassword(getProperty(UDMSConstants.MYSQL_PASSWORD));
    ds.setUrl(connectURI);
    ds.setMaxWait(-1);  // Waits indefinately
    return ds;
}

路线:
from("timer://Timer?repeatCount=1").to("direct:record_count").end();

from("direct:record_count")
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setBody(query);
        }
    })
    .routeId("record_count")
    .to("jdbc:" + UDMSConstants.UDMS_DATA_SOURCE)
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            // ...
        }
    );

任何人都可以提出建议,在上述代码中需要进行哪些更改,以便连接在我们需要的时间内保持 Activity 状态。

请注意:我们无权更改mysql.properties,因此我们需要在代码中进行处理。

最佳答案

我前段时间也遇到过类似的问题。 VikingSteve在他建议您做的事情中也很特别。由于我使用的是OSGI蓝图,因此我用XML进行了所有配置,因此我按照以下方法进行了解决。

1)在您的pom中添加一个Apache Commons DBCP依赖项:

<dependency>
    <groupId>commons-dbcp</groupId>
    <artifactId>commons-dbcp</artifactId>
    <version>1.4</version>
</dependency>

2)在 Camel 路径/蓝图文件中声明连接池,如下所示:
<bean id="MydataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" scope="singleton" >
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://DB-001:3306/Customer"/>
    <property name="username" value="sys_ETL"/>
    <property name="password" value="Blah"/>
    <property name="initialSize" value="4"/>
    <property name="maxActive" value="32"/>
    <property name="maxIdle" value="16"/>
    <property name="minIdle" value="8"/>
    <property name="timeBetweenEvictionRunsMillis" value="1800"/>
    <property name="minEvictableIdleTimeMillis" value="1800"/>
    <property name="testOnBorrow" value="true"/>
    <property name="testWhileIdle" value="true"/>
    <property name="testOnReturn" value="true"/>
    <property name="validationQuery" value="SELECT 1"/>
    <property name="maxWait"  value="1000"/>
    <property name="removeAbandoned" value="true"/>
    <property name="logAbandoned" value="true"/>
    <property name="removeAbandonedTimeout" value="30000"/>
</bean>

此步骤将创建一个数据库连接池作为Bean,然后可以在路由中使用它。该bean的名称为Mydatasource,稍后我将使用此信息。还要注意我在配置中为连接池设置的属性。这些属性使我的连接池可以增长和收缩,并且还可以确保即使闲置后连接也不会失效。

3)创建一个POJO以使用此连接池:
public class AccountInformationToDatabase {


private BasicDataSource dataSource;
public BasicDataSource getDataSource() {
    return dataSource;
}
public void setDataSource(BasicDataSource dataSource) {
    this.dataSource = dataSource;
}
@Handler
public void PersistRecord
(
        @Body AccountRecordBindy msgBody
        , @Headers Map hdr
        , Exchange exch
) throws Exception
{

    Connection conn = null;
    PreparedStatement stmt=null;



    try
    {


        conn= dataSource.getConnection();
        stmt =conn.prepareStatement("SOME INSERT STATEMENT");

        stmt.setString(1,msgBody.getAccountNumber().trim());
        stmt.setString(2,msgBody.getRecordType().trim() );
        stmt.setString(3,msgBody.getSequenceNumber().trim());
        stmt.setString(4,msgBody.getTitle().trim());
        stmt.setString(5,msgBody.getCustomerType().trim());
        stmt.setString(6,msgBody.getName().trim());
        stmt.setString(7,msgBody.getAccountAddress1().trim());


        stmt.executeUpdate();






    }
    catch (Exception e)
    {

        throw new Exception(e.getMessage());

    }

    finally
    {
        try
        {
                if (stmt!=null)
                {
                    stmt.close();
                    stmt= null;
                }
                if (conn!=null)
                {
                    conn.close();
                    conn= null;
                }
        }
        catch(SQLException e)
        {

            throw new Exception(e.getMessage());

        }

    }


}

}

该POJO具有一个名为datasource的属性,其类型为org.apache.commons.dbcp.BasicDataSource。现在,我可以将Mydatasource bean注入此POJO中,以便我的类可以访问连接池。

4)将POJO转换为bean并注入连接池:
<bean id="AccountPersist"   class="AccountInformationToDatabase">
    <property name="dataSource" ref="MydataSource"/>
</bean>

如果您正在执行文本文件处理并且想要使用并发插入等,则必须具有此技术。

07-28 02:33
查看更多