我在3节点中运行独立的ksql-server,与3节点的Kafka群集通信。从具有15个分区的Stream创建了一个Topic,数据在Stream中进行了一些扩充。以UDF的形式获取一段代码,以查找IP2Location.bin文件,而UDF类如下所示:

import java.io.IOException;
import java.util.Map;

import com.google.gson.Gson;

import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {

    private IP2Location loc = null;
    private Gson gson = null;

    @Udf(description = "fetches the geoloc of the ipaddress.")
    public synchronized String ip2lookup(String ip) {

        String json = null;
        if (loc != null) {
            IP2LocationResult result = null;
            try {
                result = loc.query(ip);
                System.out.println(result);
                json = gson.toJson(result);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return json;
        }
        return ip;
    }

    @Override
    public void configure(Map<String, ?> arg0) {

        try {
            String db_path = null;
            String os = System.getProperty("os.name").toLowerCase();

            db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";

            loc = new IP2Location(db_path);
            gson = new Gson();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


进入TopicStream的数据非常快(每秒可能有一百万条记录)。在方法上使用synchronized时,每个ksql-server节点中的速度为每秒3000条记录/消息。以这种速度,要赶上速度需要花费的时间。没有synchronized方法,我看到损坏的数据,因为单个对象/方法被多个线程使用。

问题1:udf调用究竟如何被KSQL调用/调用?

问题2:我可以使用线程处理udf中的请求吗?

Question3:由于Topic / Stream由15个分区组成,我应该提升ksql-servers的15个节点吗?

谢谢。

最佳答案

问题1:KSQL如何准确地调用udf调用?


不明白你的意思。将您的UDF提供给KSQL使用后(请参见https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#deploying),您可以在KSQL语句中将UDF称为IP2LOOKUP。您也可以在KSQL中运行SHOW FUNCTIONS以确认您的UDF可供使用。

也许您是因为下一个问题而问? KSQL将一次给您的UDF一条消息。


  问题2:我可以使用线程来处理udf中的请求吗?


你为什么想这么做?您是否担心KSQL和您当前的UDF代码无法处理传入的数据量?说到其中,您可能要处理的预期数据量是多少,因为您可能正在尝试进行过早的优化?

另外,在不了解更多详细信息的情况下,我认为您的UDF的多线程设置不会产生任何优势,因为UDF在被调用时仍一次只能处理一条消息(对于每个KSQL服务器,或更准确地说,每个流任务,其中每个KSQL服务器可以有很多;我提到这一点是为了清楚地表明,KSQL中的UDF不会通过在所有服务器上仅处理一条消息来限制您的处理;当然,该处理是分布式的,并且在平行)。


  Question3:由于Topic / Stream由15个分区组成,我应该启动ksql-servers的15个节点吗?


这取决于您的数据量。您可以根据需要旋转任意数量的KSQL服务器。如果数据量很少,则单个KSQL服务器可能就足够了。如果数据量更大,则可以开始启动最多15台服务器的其他KSQL服务器(因为输入主题具有15个分区)。任何其他KSQL服务器都将处于空闲状态。

在15个KSQL服务器不够用的情况下,应将输入主题的分区数量从15个增加到一个更大的数量,然后还可以启动更多个KSQL服务器(从而增加设置的计算能力) )。

10-06 03:03