I am trying to use org.slf4j.Logger in Spark. If I write the code as follows, I get the error non-static field cannot be referenced from a static context, because the method main is static while logger is an instance field.

import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class simpleApp {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    public static void main(String[] args) {
        String logFile = "/user/beibei/zhaokai/spark_java/a.txt"; // Should be some file on your system
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logData = sc.textFile(logFile).cache();

        // compile error here: non-static field 'logger' cannot be referenced from a static context
        logger.info("loading graph from cache");

        long numAs = logData.filter(new Function<String, Boolean>() {
            public Boolean call(String s) { return s.contains("a"); }
        }).count();

        long numBs = logData.filter(new Function<String, Boolean>() {
            public Boolean call(String s) { return s.contains("t"); }
        }).count();

        System.out.println("Lines with a: " + numAs + ", lines with t: " + numBs);
    }
}


However, if I write it like this instead, I get a different error:

  Exception in thread "main" org.apache.spark.SparkException: Task
  not serializable

because the anonymous Function classes capture the enclosing simpleApp instance, and simpleApp is not serializable.

import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class simpleApp {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    public static void main(String[] args) {
        new simpleApp().start();
    }

    private void start() {
        String logFile = "/path/a.txt"; // Should be some file on your system
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logData = sc.textFile(logFile).cache();

        logger.info("loading graph from cache");

        long numAs = logData.filter(new Function<String, Boolean>() {
            // this anonymous class captures the enclosing simpleApp instance,
            // which is not serializable -> "Task not serializable" at runtime
            public Boolean call(String s) { return s.contains("a"); }
        }).count();

        long numBs = logData.filter(new Function<String, Boolean>() {
            public Boolean call(String s) { return s.contains("t"); }
        }).count();

        System.out.println("Lines with a: " + numAs + ", lines with t: " + numBs);
    }
}


So what should I do?
Will I run into the same problem whenever I want to use other packages such as org.slf4j.Logger?

Best answer

There may be several options available. ... I would suggest the org.apache.spark.internal.Logging trait that Spark itself provides (Spark >= 2.2).

Its doc says:

/**
 * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
 * logging messages at different levels using methods that only evaluate parameters lazily if the
 * log level is enabled.
 */

For example, this is how the trait internally detects which SLF4J binding is on the classpath:

private def isLog4j12(): Boolean = {
  // This distinguishes the log4j 1.2 binding, currently
  // org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
  // org.apache.logging.slf4j.Log4jLoggerFactory
  val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
  "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
}
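
To use it, mix the trait into your driver class or object. A minimal sketch (one caveat, as an assumption on my part: the trait is marked private[spark] in some releases, so if your build does not expose it, mimic it yourself as shown below):

import org.apache.spark.internal.Logging

// Sketch only: assumes org.apache.spark.internal.Logging is accessible
// in your Spark build (it is private[spark] in some releases).
object SimpleApp extends Logging {
  def main(args: Array[String]): Unit = {
    // logInfo takes its message by name, so the string is only built
    // if the INFO level is enabled
    logInfo("loading graph from cache")
  }
}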


If you would rather not depend on the Spark-provided API, you can mimic the same pattern yourself.
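
For example, here is a minimal homemade sketch (the trait name LazyLogging and its method names are my own; the essential points are the @transient lazy logger and the by-name message parameters):

import org.slf4j.{Logger, LoggerFactory}

// A homemade stand-in for Spark's Logging trait.
// The @transient lazy val keeps the non-serializable SLF4J Logger out of
// serialized closures and recreates it on first use on each executor;
// the by-name (msg: => String) parameters mean the message is only
// built when the corresponding level is enabled.
trait LazyLogging extends Serializable {
  @transient private lazy val logger: Logger =
    LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))

  protected def logInfo(msg: => String): Unit =
    if (logger.isInfoEnabled) logger.info(msg)

  protected def logWarning(msg: => String): Unit =
    if (logger.isWarnEnabled) logger.warn(msg)
}

// Usage: the driver mixes in the trait instead of holding a Logger field,
// so closures no longer drag a non-serializable Logger along.
object SimpleApp extends LazyLogging {
  def main(args: Array[String]): Unit = {
    logInfo("loading graph from cache")
  }
}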


  Note: with the above approach, to adjust the logging level use
  sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
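
For instance, on the SparkContext (sc):

// valid levels: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
sc.setLogLevel("WARN")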


Also take a look at: apache-spark-logging-within-scala
