本文介绍了influxdb-java:org.influxdb.InfluxDBIOException:java.net.SocketException:对等重置连接:套接字写入错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在测试InfluxDB以存储传感器时间序列.我正在使用 influxdb-java 客户端库(2.15版),并且正在运行InfluxDB1.7.6用于测试目的.

I'm testing InfluxDB to store sensor time series. I'm using the influxdb-java client library (version 2.15) and I'm running InfluxDB 1.7.6 locally for test purpose.

我所有的点都存储了.csv文件(每个传感器一个),这些文件本身也存储在.zip文件中(每个数据集一个).我的代码遍历每个csv文件的每一行.点以批处理模式编写.

All my points are stored .csv files (one per sensor) which are themselves stored in .zip files (one per dataset). My code run through each line of each csv files. Points are written in batch mode.

/**
 * Get the connection to the database
 */
InfluxDB influxDB = InfluxDBFactory.connect("http://192.168.51.51:8086");
influxDB.query(new Query("CREATE DATABASE theia_in_situ"));
influxDB.setDatabase("theia_in_situ");
influxDB.enableBatch();
influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
/**
 * Create batch point to write each measure of the time serie more efficiently
 */
BatchPoints batchPoints = BatchPoints
        .database("theia_in_situ")
        .build();

对于每个CSV数据文件,执行以下方法:

For each CSV data file the following method is executed:

public static void createAndImportTimeSeriesDocuments(InputStream txtFileIn, String observationId, String producerId,
        InfluxDB influxDB, BatchPoints batchPoints) throws IOException, ParseException {
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
    /**
     * Store the variable name
     */
    String observedProperty = null;
    try (BufferedReader br = new BufferedReader(new InputStreamReader(txtFileIn));) {
        String line = null;
        /**
         * Read the headers
         */
        while ((line = br.readLine()).substring(0, 1).equals("#")) {
            if (line.substring(0, 15).equals("#Variable_name;")) {
                observedProperty = line.split(";")[1];
            }
        }
        /**
         * Read the data
         */
        while ((line = br.readLine()) != null) {
            String[] lineSplitted = line.split(";", -1);
            Point point = Point.measurement(observedProperty)
                    .tag("producerId", producerId)
                    .tag("observationId", observationId)
                    .time(df.parse(lineSplitted[1]).getTime(), TimeUnit.MILLISECONDS)
                    .addField("value", lineSplitted[5])
                    .addField("flag", lineSplitted[6])
                    .build();
            batchPoints.point(point);
        }
        influxDB.write(batchPoints);
    }
}

我可以写一个或几个测量值,但是很快我得到了以下异常:

I can write one or few measurement but soon enough I get the following exception:

我已经禁用了最大并发写入限制,最大入队写入限制,入队写入超时(在/etc/influxdb/influxdb.conf 中将每个值设置为0),如此处.即使在Github页面中将此问题作为FAQ提及,我也找不到任何重现我问题的问题.

I have already disabled max-concurrent-write-limit, max-enqueued-write-limit, enqueued-write-timeout (setting each value to 0 in /etc/influxdb/influxdb.conf) as mentionned here.Even though this issue is mentionned as FAQ in the Github page, I can't find any issue reproducing my problem.

任何帮助将不胜感激.

推荐答案

尝试以批处理方式编写 BatchPoint 时,似乎发生此异常.

This exception seems to occur when trying to write BatchPoint in batch mode.

这是更新的代码.

/**
 * Read the data
 */
while ((line = br.readLine()) != null) {
    String[] lineSplitted = line.split(";", -1);
    Point point = Point.measurement(observedProperty)
            .tag("producerId", producerId)
            .tag("observationId", observationId)
            .time(df.parse(lineSplitted[1]).getTime(), TimeUnit.MILLISECONDS)
            .addField("value", lineSplitted[5])
            .addField("flag", lineSplitted[6])
            .build();
    influxDB.write(point);
  //  batchPoints.point(point);
}
//influxDB.write(batchPoints);

这篇关于influxdb-java:org.influxdb.InfluxDBIOException:java.net.SocketException:对等重置连接:套接字写入错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-01 20:14