我们能够成功地将drools与spark集成在一起,当我们尝试应用Drools中的规则时,我们能够对HDFS中存在的Batch文件进行处理,但是我们尝试将drools用于流式文件以便我们可以立即做出决策,但是我们不知道该怎么做,下面是我们想要实现的代码片段。
案例1:

    SparkConf conf = new SparkConf().setAppName("sample");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> javaRDD = sc.textFile("/user/root/spark/sample.dat");
    List<String> store = new ArrayList<String>();
    store = javaRDD.collect();

情况2:使用流上下文时的
SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming");
    JavaStreamingContext ssc =
              new JavaStreamingContext(sparkconf, new Duration(1));

    JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx);

在第一种情况下,我们可以将规则应用于变量存储,但是在第二种情况下,我们不能将规则应用于dstream行。

如果有人有想法,如何实现将是很大的帮助。

最佳答案

这是完成它的一种方法。

  • 首先使用业务规则创建知识 session 。
    //Create knowledge and session here
    KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
    KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
    kbuilder.add( ResourceFactory.newFileResource( "rulefile.drl"),
            ResourceType.DRL );
    Collection<KnowledgePackage> pkgs = kbuilder.getKnowledgePackages();
    kbase.addKnowledgePackages( pkgs );
    final StatelessKnowledgeSession ksession = kbase.newStatelessKnowledgeSession();
    
  • 使用StreamingContext创建JavaDStream。
    SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming");
    JavaStreamingContext ssc =
              new JavaStreamingContext(sparkconf, new Duration(1));
    JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx);
    
  • 调用DStream的foreachRDD创建事实并触发您的规则。
    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
         List<String> facts = rdd.collect();
         //Apply rules on facts here
         ksession.execute(facts);
         return null;
      }
    });
    
  • 关于java - Drools在Spark中用于流式传输文件,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/28407742/

    10-12 13:54