How can I execute bolts one after another when each bolt takes data from the same spout?

Problem description

I'm taking data from a spout, and each bolt inserts its mapped fields into a different table in my database. But my tables have constraints: in my test schema there are two tables, user_details and My_details, and a foreign-key constraint requires the user row to be inserted first; only after that can the My_details row be inserted. When I run the topology, only the users table gets populated, because when the bolts fire their insert queries the constraint lets only PsqlBolt insert, and PsqlBolt1 throws an exception saying the user id is not found. So I put a Thread.sleep(1000) in PsqlBolt1, and with that the two bolts work. But when I apply the same trick to many bolts (12), the waiting time keeps growing and bolt execution fails with a message that the bolt wait time has been exceeded. How can I make the user fields get inserted first, and only after that have PsqlBolt1 start inserting?

My topology class

public class Topology {

    protected static final String JDBC_CONF = "jdbc.conf";
    protected static final String TABLE_NAME = "users";
    protected static final String SELECT_QUERY =
            "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
            " and user_department.user_id = ?";

    public static void main(String[] args) throws Exception {
        String argument = args[0];
        TopologyBuilder builder = new TopologyBuilder();

        Map map = Maps.newHashMap();
        map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
        map.put("dataSource.url", "jdbc:postgresql://localhost:5432/twitter_analysis?user=postgres");
        ConnectionProvider cp = new MyConnectionProvider(map);

        // First bolt: inserts into user_details.
        List<Column> schemaColumns = Lists.newArrayList(
                new Column("user_id", Types.INTEGER),
                new Column("user_name", Types.VARCHAR),
                new Column("create_date", Types.TIMESTAMP));
        JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);
        PsqlBolt userPersistanceBolt = new PsqlBolt(cp, mapper)
                .withInsertQuery("insert into user_details (id, user_name, created_timestamp) values (?,?,?)");

        builder.setSpout("myspout", new UserSpout(), 1);
        builder.setBolt("Psql_Bolt", userPersistanceBolt, 1).shuffleGrouping("myspout");

        // Second bolt: inserts into My_details, which has a foreign-key constraint on user_details.
        List<Column> schemaColumns1 = Lists.newArrayList(
                new Column("my_id", Types.INTEGER),
                new Column("my_name", Types.VARCHAR));
        JdbcMapper mapper1 = new SimpleJdbcMapper(schemaColumns1);
        PsqlBolt1 userPersistanceBolt1 = new PsqlBolt1(cp, mapper1)
                .withInsertQuery("insert into My_details (my_id, my_name) values (?,?)");

        builder.setBolt("Psql_Bolt1", userPersistanceBolt1, 1).shuffleGrouping("myspout");

        Config conf = new Config();
        conf.put(JDBC_CONF, map);
        conf.setDebug(true);
        conf.setNumWorkers(3);

        if (argument.equalsIgnoreCase("runLocally")) {
            System.out.println("Running topology locally...");
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Twitter Test Storm-postgresql", conf, builder.createTopology());
        } else {
            System.out.println("Running topology on cluster...");
            StormSubmitter.submitTopology("Topology_psql", conf, builder.createTopology());
        }
    }
}
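One workaround worth noting (not in the original post, just a sketch): instead of having both bolts subscribe to the spout and racing them with sleeps, the bolts could be chained, i.e. subscribe the details bolt to the user bolt's output stream (`shuffleGrouping("Psql_Bolt")` instead of `"myspout"`, with PsqlBolt declaring output fields and re-emitting the tuple after its insert succeeds). Storm itself isn't needed to see why this fixes the ordering; this plain-Java model (all names hypothetical) shows that a second stage that consumes only what the first stage has emitted can never see a detail row before its user row exists:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Models chained bolts: stage two consumes only tuples stage one has already emitted. */
public class ChainedStagesSketch {
    public static final List<String> log = new ArrayList<>();
    public static final Deque<Integer> betweenStages = new ArrayDeque<>();

    // Stage one: insert the parent row, then pass the tuple downstream (emit() in real Storm).
    public static void userBolt(int userId) {
        log.add("insert user " + userId);
        betweenStages.add(userId);
    }

    // Stage two: runs only on tuples stage one emitted, so the parent row already exists.
    public static void detailsBolt() {
        while (!betweenStages.isEmpty()) {
            log.add("insert details for user " + betweenStages.poll());
        }
    }

    public static void main(String[] args) {
        for (int id = 1; id <= 3; id++) {
            userBolt(id);
        }
        detailsBolt();
        System.out.println(log);
    }
}
```

The ordering comes from the dataflow itself, so no sleep is needed no matter how many chained bolts there are.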

My bolt: PsqlBolt1

public class PsqlBolt1 extends AbstractJdbcBolt {
    private static final Logger LOG = Logger.getLogger(PsqlBolt1.class);

    private String tableName;
    private String insertQuery;
    private JdbcMapper jdbcMapper;

    public PsqlBolt1(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
        super(connectionProvider);
        this.jdbcMapper = jdbcMapper;
    }

    public PsqlBolt1 withInsertQuery(String insertQuery) {
        this.insertQuery = insertQuery;
        System.out.println("query passed.....");
        return this;
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        super.prepare(map, topologyContext, collector);
        if (StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
            throw new IllegalArgumentException("You must supply either a tableName or an insert query.");
        }
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            // The workaround under discussion: wait so PsqlBolt can insert the user row first.
            Thread.sleep(1000);
            List<Column> columns = jdbcMapper.getColumns(tuple);
            List<List<Column>> columnLists = new ArrayList<List<Column>>();
            columnLists.add(columns);
            if (!StringUtils.isBlank(tableName)) {
                this.jdbcClient.insert(this.tableName, columnLists);
            } else {
                this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
            }
            this.collector.ack(tuple);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}

My bolt: PsqlBolt

public class PsqlBolt extends AbstractJdbcBolt {
    private static final Logger LOG = Logger.getLogger(PsqlBolt.class);

    private String tableName;
    private String insertQuery;
    private JdbcMapper jdbcMapper;

    public PsqlBolt(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
        super(connectionProvider);
        this.jdbcMapper = jdbcMapper;
    }

    public PsqlBolt withTableName(String tableName) {
        this.tableName = tableName;
        return this;
    }

    public PsqlBolt withInsertQuery(String insertQuery) {
        this.insertQuery = insertQuery;
        System.out.println("query passed.....");
        return this;
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        super.prepare(map, topologyContext, collector);
        if (StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
            throw new IllegalArgumentException("You must supply either a tableName or an insert query.");
        }
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            List<Column> columns = jdbcMapper.getColumns(tuple);
            List<List<Column>> columnLists = new ArrayList<List<Column>>();
            columnLists.add(columns);
            if (!StringUtils.isBlank(tableName)) {
                this.jdbcClient.insert(this.tableName, columnLists);
            } else {
                this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
            }
            this.collector.ack(tuple);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}

When I applied the same sleep to many bolts, my topology turned red (wait state) in the Storm UI.

Here are the bolt wait times: the first bolt has no sleep, I kept a 1-second sleep in the second bolt, and all the remaining bolts sleep for 2 seconds.

How can I replace that sleep with something that actually does the work, or would increasing the number of supervisors solve the problem?

Recommended answer

I missed the point that a bolt is meant to perform a distinct function on tuples. I was writing different insert queries in different bolts that all performed the same function: inserting tuples coming from the spout. I realized the bolts did not differ in any meaningful way, so I implemented all the insert queries within one bolt instead of using multiple insert-query bolts; after mapping all the fields I simply wrote the sequence of insert queries I wanted, one by one.
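The fix above boils down to one observation: inside a single bolt's execute() the inserts run sequentially, so the parent insert is guaranteed to finish before the dependent one starts. This plain-Java sketch (class and method names are hypothetical; a list of strings stands in for the JDBC client) models what the combined bolt does per tuple:

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: one combined bolt runs the dependent inserts in order for each tuple. */
public class CombinedInsertSketch {
    // Records executed SQL in order, standing in for a real JDBC client.
    public static final List<String> executed = new ArrayList<>();

    public static void executeInsertQuery(String sql) {
        executed.add(sql);
    }

    /** What the combined bolt's execute(tuple) does after mapping the fields. */
    public static void executeTuple(int userId, String userName, String myName) {
        // 1. Satisfy the foreign-key constraint: insert the user row first.
        executeInsertQuery("insert into user_details (id, user_name) values ("
                + userId + ", '" + userName + "')");
        // 2. Only then insert the dependent row that references user_details.
        executeInsertQuery("insert into My_details (my_id, my_name) values ("
                + userId + ", '" + myName + "')");
    }

    public static void main(String[] args) {
        executeTuple(1, "alice", "alice-details");
        System.out.println(executed);
    }
}
```

Because the ordering is enforced by sequential statements rather than timing, no Thread.sleep is needed and the approach does not degrade as more tables are added.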


