
Problem Description

I set up a Cassandra cluster on AWS. What I want to get is increased I/O throughput (number of reads/writes per second) as more nodes are added (as advertised). However, I got exactly the opposite: performance drops as new nodes are added.

Do you know of any typical issues that prevent it from scaling?

Here are some details:

I am inserting a text file (15 MB) into the column family. Each line is a record; there are 150,000 records. With 1 node it takes about 90 seconds to write, but with 2 nodes it takes 120 seconds. I can see the data is spread across both nodes; however, there is no increase in throughput.
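For a quick sanity check, the figures above work out to the following aggregate write rates (simple integer arithmetic on the numbers reported in the question):

```java
public class ThroughputCheck {
    public static void main(String[] args) {
        int records = 150_000;
        // Timings reported above: ~90 s on 1 node, ~120 s on 2 nodes.
        int oneNode = records / 90;    // writes per second on 1 node
        int twoNodes = records / 120;  // writes per second on 2 nodes
        System.out.println("1 node : " + oneNode + " writes/s");
        System.out.println("2 nodes: " + twoNodes + " writes/s");
    }
}
```

So adding the second node reduced the aggregate write rate by roughly a quarter instead of increasing capacity.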

The source code is below:

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class WordGenCAS {
    static final String KEYSPACE = "text_ks";
    static final String COLUMN_FAMILY = "text_table";
    static final String COLUMN_NAME = "text_col";

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Usage: WordGenCAS <input file> <host1,host2,...>");
            System.exit(-1);
        }

        String[] contactPts = args[1].split(",");

        Cluster cluster = Cluster.builder()
                .addContactPoints(contactPts)
                .build();
        Session session = cluster.connect(KEYSPACE);

        InputStream fis = new FileInputStream(args[0]);
        InputStreamReader in = new InputStreamReader(fis, "UTF-8");
        BufferedReader br = new BufferedReader(in);

        String line;
        int lineCount = 0;
        while ((line = br.readLine()) != null) {
            line = line.replaceAll("'", " ");
            line = line.trim();
            if (line.isEmpty())
                continue;
            System.out.println("[" + line + "]");
            String cqlStatement2 = String.format("insert into %s (id, %s) values (%d, '%s');",
                    COLUMN_FAMILY,
                    COLUMN_NAME,
                    lineCount,
                    line);
            session.execute(cqlStatement2);
            lineCount++;
        }

        System.out.println("Total lines written: " + lineCount);
    }
}

The database schema is as follows:

CREATE KEYSPACE text_ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };

USE text_ks;

CREATE TABLE text_table (
    id int,
    text_col text,
    primary key (id)
) WITH COMPACT STORAGE;

Thanks!

Recommended Answer

Even though this is an old post, I think it's worth posting a solution for this (common) kind of problem.

As you've already discovered, loading data with a serial procedure is slow. The suggestion you've received is the right one.

However, issuing a lot of queries without applying some sort of back pressure is asking for trouble, and you will lose data due to excessive load on the server (and, to some extent, on the driver).
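One generic way to implement that back pressure, independent of any particular driver, is to cap the number of in-flight requests with a semaphore. The sketch below is illustrative only: `simulatedAsyncWrite` is a hypothetical stand-in for a real `session.executeAsync(...)` call, and the permit count would need tuning for an actual cluster.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class BackpressureSketch {
    static final int MAX_IN_FLIGHT = 4; // tune for the real cluster

    // Hypothetical stand-in for session.executeAsync(cql):
    // completes asynchronously after a short delay.
    static CompletableFuture<Void> simulatedAsyncWrite(ExecutorService pool, int id) {
        return CompletableFuture.runAsync(() -> {
            try { Thread.sleep(2); } catch (InterruptedException ignored) { }
        }, pool);
    }

    static int run(int records) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(MAX_IN_FLIGHT);
        Semaphore permits = new Semaphore(MAX_IN_FLIGHT);
        int written = 0;
        for (int i = 0; i < records; i++) {
            permits.acquire(); // blocks while MAX_IN_FLIGHT writes are pending
            simulatedAsyncWrite(pool, i).whenComplete((r, e) -> permits.release());
            written++;
        }
        permits.acquire(MAX_IN_FLIGHT); // drain: wait for the last in-flight writes
        pool.shutdown();
        return written;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("written=" + run(100));
    }
}
```

The semaphore never lets more than `MAX_IN_FLIGHT` writes be outstanding, so a slow server throttles the client instead of being flooded.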

This solution loads data with async calls, and applies some back pressure on the client to avoid data loss.

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

public class WordGenCAS {
    static final String KEYSPACE = "text_ks";
    static final String COLUMN_FAMILY = "text_table";
    static final String COLUMN_NAME = "text_col";

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Usage: WordGenCAS <input file> <host1,host2,...>");
            System.exit(-1);
        }

        String[] contactPts = args[1].split(",");

        Cluster cluster = Cluster.builder()
                .addContactPoints(contactPts)
                .build();
        Session session = cluster.connect(KEYSPACE);

        InputStream fis = new FileInputStream(args[0]);
        InputStreamReader in = new InputStreamReader(fis, "UTF-8");
        BufferedReader br = new BufferedReader(in);

        String line;
        int lineCount = 0;

        // This is the futures list of our queries
        List<Future<ResultSet>> futures = new ArrayList<>();

        // Loop
        while ((line = br.readLine()) != null) {
            line = line.replaceAll("'", " ");
            line = line.trim();
            if (line.isEmpty())
                continue;
            System.out.println("[" + line + "]");
            String cqlStatement2 = String.format("insert into %s (id, %s) values (%d, '%s');",
                    COLUMN_FAMILY,
                    COLUMN_NAME,
                    lineCount,
                    line);
            lineCount++;

            // Add the future returned by the async method to the list
            futures.add(session.executeAsync(cqlStatement2));

            // Apply some back pressure if we issued more than X queries.
            // Change X to another value suitable for your cluster.
            while (futures.size() > 1000) {
                Future<ResultSet> future = futures.remove(0);
                try {
                    future.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        System.out.println("Total lines written: " + lineCount);
        System.out.println("Waiting for writes to complete...");

        // Wait until all writes are done.
        while (futures.size() > 0) {
            Future<ResultSet> future = futures.remove(0);
            try {
                future.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        System.out.println("Done!");
    }
}
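The sliding-window drain used above can be exercised without a live cluster. In this self-contained sketch, `fakeInsert` is a hypothetical stand-in for `session.executeAsync(...)`, and the window is shrunk so the back-pressure loop actually kicks in during a small run:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureWindowDemo {
    // Hypothetical stand-in for session.executeAsync(cql).
    static Future<Integer> fakeInsert(ExecutorService pool, int id) {
        return pool.submit(() -> {
            Thread.sleep(1); // pretend the write takes a moment
            return id;
        });
    }

    static int writeAll(int records, int window) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<Future<Integer>> futures = new ArrayList<>();
        int count = 0;
        for (int i = 0; i < records; i++) {
            futures.add(fakeInsert(pool, i));
            count++;
            // Same back-pressure loop as in the answer: block on the oldest
            // future once more than `window` writes are outstanding.
            while (futures.size() > window) {
                futures.remove(0).get();
            }
        }
        // Wait for the remaining in-flight writes.
        for (Future<Integer> f : futures) {
            f.get();
        }
        pool.shutdown();
        return count;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Total lines written: " + writeAll(500, 50));
    }
}
```

Blocking on the oldest future is a coarse but simple policy: it guarantees at most `window` unacknowledged writes without needing any callback machinery.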

