我有一个拓扑,其中我尝试计算SimulatorSpout(不是真正的Stream)生成的单词出现次数,然后将其写入MySQL数据库表,该表方案非常简单:

Field  |  Type        |  ...
ID     |  int(11)     |      Auto_icr
word   |  varchar(50) |
count  |  int(11)     |


但是我面临着奇怪的问题(正如我之前提到的)
我已成功将拓扑提交到由4个主管组成的Storm Cluster中,并且可以在Storm Web UI中看到拓扑的流程
(无例外),但是当我检查MySQL表时,令我惊讶的是,该表为空...

任何意见,建议都欢迎...

这是喷口和螺栓:

public class MySQLConnection {

    private static Connection conn = null;
    private static String dbUrl = "jdbc:mysql://192.168.0.2:3306/test?";
    private static String dbClass = "com.mysql.jdbc.Driver";

    public static Connection getConnection() throws SQLException, ClassNotFoundException  {

        Class.forName(dbClass);
        conn = DriverManager.getConnection(dbUrl, "root", "qwe123");
        return conn;
     }
}


============================ SentenceSpout ==================== ===========

public class SentenceSpout extends BaseRichSpout{

private static final long serialVersionUID = 1L;
private boolean _completed = false;

private SpoutOutputCollector _collector;
private String [] sentences = {
        "Obama delivered a powerfull speech against USA",
        "I like cold beverages",
        "RT http://www.turkeyairline.com Turkish Airlines has delayed some flights",
        "don't have a cow man...",
        "i don't think i like fleas"
    };

private int index = 0;


public void open (Map config, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
}

public void nextTuple () {

        _collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;

        Utils.waitForSeconds(1);
    }
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
}

public void ack(Object msgId) {
    System.out.println("OK: " + msgId);
}

public void close() {}

public void fail(Object msgId) {
    System.out.println("FAIL: " + msgId);
    }
}


============================ SplitSentenceBolt ===================== =========

public class SplitSentenceBolt extends BaseRichBolt {

private static final long serialVersionUID = 1L;
private OutputCollector _collector;

public void prepare (Map config, TopologyContext context, OutputCollector collector) {
    _collector = collector;
}

public void execute (Tuple tuple) {

    String sentence = tuple.getStringByField("sentence");
    String httpRegex = "((https?|ftp|telnet|gopher|file)):((//)|(\\\\))+[\\w\\d:#@%/;$()~_?\\+-=\\\\\\.&]*";
    sentence = sentence.replaceAll(httpRegex, "").replaceAll("RT", "").replaceAll("[.|,]", "");
    String[] words = sentence.split(" ");
    for (String word : words) {
        if (!word.isEmpty())
        _collector.emit(new Values(word.trim()));
    }
    _collector.ack(tuple);
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
   }
 }


=========================== WordCountBolt ====================== ===========

public class WordCountBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private HashMap<String , Integer> counts = null;
private OutputCollector _collector;
private ResultSet resSet = null;
private Statement stmt = null;
private Connection _conn = null;

private String path = "/home/hduser/logOfStormTops/logger.txt";
String rLine = null;

public void prepare (Map config, TopologyContext context, OutputCollector collector) {
    counts = new HashMap<String, Integer>();
    _collector = collector;
}

public void execute (Tuple tuple) {

    int insertResult = 0;
    int updateResult = 0;

    String word = tuple.getStringByField("word");
    //----------------------------------------------------
    if (!counts.containsKey(word)) {

        counts.put(word, 1);
        try {
            insertResult = wordInsertIfNoExist(word);
            if (insertResult == 1) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }  else {
    //-----------------------------------------------
        counts.put(word, counts.get(word) + 1);

        try {
            // writing to db
            updateResult = updateCountOfExistingWord(word);
            if (updateResult == 1) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
            // Writing to file
            BufferedWriter buffer = new BufferedWriter(new FileWriter(path));
            buffer.write("[ " + word + " : " + counts.get("word") + " ]");
            buffer.newLine();
            buffer.flush();
            buffer.close();

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("{word-" + word + " : count-" + counts.get(word) + "}");
    }

}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

// *****************************************************

public int wordInsertIfNoExist(String word) throws ClassNotFoundException, SQLException {

    String query = "SELECT word FROM wordcount WHERE word=\"" + word + "\"";
    String insert = "INSERT INTO wordcount (word, count) VALUES (\"" + word +  "\", 1)";
    _conn = MySQLConnection.getConnection();
    stmt = _conn.createStatement();
    resSet = stmt.executeQuery(query);
    int res = 0;
    if (!resSet.next()) {

         res = stmt.executeUpdate(insert);

    } else {
        System.out.println("Yangi qiymatni kirityotrganda nimadir sodir bo'ldi");
    }
    resSet.close();
    stmt.close();
    _conn.close();
    return res;
}

public int updateCountOfExistingWord(String word) throws ClassNotFoundException, SQLException {

    String update = "UPDATE wordcount SET count=count+1 WHERE word=\"" + word + "\"";
    _conn = MySQLConnection.getConnection();
    stmt = _conn.createStatement();
    int result = stmt.executeUpdate(update);

        //System.out.println(word + "'s count has been updated (incremented)");

    resSet.close();
    stmt.close();
    _conn.close();
    return result;
    }
  }


========================= WordCountTopology ======================== ======

public class WordCountTopology {
private static final String SENTENCE_SPOUT_ID = "sentence-spout";
private static final String SPLIT_BOLT_ID = "split-bolt";
private static final String COUNT_BOLT_ID = "count-bolt";
private static final String TOPOLOGY_NAME = "NewWordCountTopology";

@SuppressWarnings("static-access")
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

SentenceSpout spout = new SentenceSpout();
SplitSentenceBolt splitBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);

builder.setBolt(SPLIT_BOLT_ID, splitBolt, 4).shuffleGrouping(SENTENCE_SPOUT_ID);

builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

Config config = new Config();
config.setMaxSpoutPending(100);
config.setDebug(true);

StormSubmitter submitter = new StormSubmitter();

submitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());

   }
}

最佳答案

这是因为在引发异常时未调用_collector.ack(tuple)。当待处理的元组太多时,spout将停止发送新的元组。尝试抛出RuntimeException而不是printStackTrace。

关于java - 我的Storm拓扑既不工作(不生成输出)也不失败(不生成错误或异常),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/31378092/

10-10 23:20