在我的项目中,crawler的maven原型似乎与warc模块配合不好。当前,它仅创建名称为“ crawl-20180802121925-00000.warc.gz”的空0字节文件。我在这里想念什么吗?
我尝试通过创建一个默认项目来启用warc编写,如下所示:
mvn archetype:generate -DarchetypeGroupId=com.digitalpebble.stormcrawler -DarchetypeArtifactId=storm-crawler-archetype -DarchetypeVersion=1.10
然后像这样将依赖项添加到pom.xml中的warc模块中
<dependency>
<groupId>com.digitalpebble.stormcrawler</groupId>
<artifactId>storm-crawler-warc</artifactId>
<version>1.10</version>
</dependency>
然后,在尝试写入本地文件系统目录的同时,将WARCHdfsBolt添加到fetch分组中。
public class CrawlTopology extends ConfigurableTopology {
public static void main(String[] args) throws Exception {
ConfigurableTopology.start(new CrawlTopology(), args);
}
@Override
protected int run(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
String[] testURLs = new String[] { "http://www.lequipe.fr/",
"http://www.lemonde.fr/", "http://www.bbc.co.uk/",
"http://storm.apache.org/", "http://digitalpebble.com/" };
builder.setSpout("spout", new MemorySpout(testURLs));
builder.setBolt("partitioner", new URLPartitionerBolt())
.shuffleGrouping("spout");
builder.setBolt("fetch", new FetcherBolt())
.fieldsGrouping("partitioner", new Fields("key"));
builder.setBolt("warc", getWarcBolt())
.localOrShuffleGrouping("fetch");
builder.setBolt("sitemap", new SiteMapParserBolt())
.localOrShuffleGrouping("fetch");
builder.setBolt("feeds", new FeedParserBolt())
.localOrShuffleGrouping("sitemap");
builder.setBolt("parse", new JSoupParserBolt())
.localOrShuffleGrouping("feeds");
builder.setBolt("index", new StdOutIndexer())
.localOrShuffleGrouping("parse");
Fields furl = new Fields("url");
// can also use MemoryStatusUpdater for simple recursive crawls
builder.setBolt("status", new StdOutStatusUpdater())
.fieldsGrouping("fetch", Constants.StatusStreamName, furl)
.fieldsGrouping("sitemap", Constants.StatusStreamName, furl)
.fieldsGrouping("feeds", Constants.StatusStreamName, furl)
.fieldsGrouping("parse", Constants.StatusStreamName, furl)
.fieldsGrouping("index", Constants.StatusStreamName, furl);
return submit("crawl", conf, builder);
}
private WARCHdfsBolt getWarcBolt() {
String warcFilePath = "/Users/user/Documents/workspace/test/warc";
FileNameFormat fileNameFormat = new WARCFileNameFormat()
.withPath(warcFilePath);
Map<String,String> fields = new HashMap<>();
fields.put("software:", "StormCrawler 1.0 http://stormcrawler.net/");
fields.put("conformsTo:", "http://www.archive.org/documents/WarcFileFormat-1.0.html");
WARCHdfsBolt warcbolt = (WARCHdfsBolt) new WARCHdfsBolt()
.withFileNameFormat(fileNameFormat);
warcbolt.withHeader(fields);
// can specify the filesystem - will use the local FS by default
// String fsURL = "hdfs://localhost:9000";
// warcbolt.withFsUrl(fsURL);
// a custom max length can be specified - 1 GB will be used as a default
FileSizeRotationPolicy rotpol = new FileSizeRotationPolicy(50.0f,
FileSizeRotationPolicy.Units.MB);
warcbolt.withRotationPolicy(rotpol);
return warcbolt;
}
}
无论我是否在本地运行(有或没有助焊剂),似乎都没有什么不同。您可以在此处查看演示仓库:https://github.com/keyboardsamurai/storm-test-warc
最佳答案
感谢您的询问。从理论上讲,内容会在以下情况下写入WARC文件:
同步策略which we have by default at 10 tuples中设置了明确的同步
有一个自动的,通过记号元组every 15 secs by default发生
旋转文件-在您的情况下,应在内容达到50MB时发生
由于您用作起点的拓扑不是递归的,并且不会处理5个以上的URL,因此永远不会满足条件1和3。
您可以通过使用
builder.setBolt("status", new MemoryStatusUpdater())
代替。这样,新的URL将被连续处理。或者,您可以添加
warcbolt.withSyncPolicy(new CountSyncPolicy(1));
到您的代码,以便在每个元组之后触发同步。实际上,您不需要在URL不断出现的真实爬网中执行此操作。
现在,很奇怪的是,无论同步是由条件1还是条件2触发的,我都看不到文件的任何更改,并且它保持为0字节。版本1.8并非如此
<dependency>
<groupId>com.digitalpebble.stormcrawler</groupId>
<artifactId>storm-crawler-warc</artifactId>
<version>1.8</version>
</dependency>
因此可能是由于之后的代码更改。
我知道有些用户一直依赖FileTimeSizeRotationPolicy,它可以基于时间触发上述条件3。
随时在Github上打开一个问题,我会仔细研究一下(下个月我回来的时候)。
编辑:存在一个压缩条目的错误,该错误现已得到修复,将成为下一个SC版本的一部分。
请参阅OP对issue发表的评论。