我在3节点中运行独立的ksql-server
,与3节点的Kafka
群集通信。从具有15个分区的Stream
创建了一个Topic
,数据在Stream中进行了一些扩充。以UDF
的形式获取一段代码,以查找IP2Location.bin文件,而UDF
类如下所示:
import java.io.IOException;
import java.util.Map;
import com.google.gson.Gson;
import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {
private IP2Location loc = null;
private Gson gson = null;
@Udf(description = "fetches the geoloc of the ipaddress.")
public synchronized String ip2lookup(String ip) {
String json = null;
if (loc != null) {
IP2LocationResult result = null;
try {
result = loc.query(ip);
System.out.println(result);
json = gson.toJson(result);
} catch (IOException e) {
e.printStackTrace();
}
return json;
}
return ip;
}
@Override
public void configure(Map<String, ?> arg0) {
try {
String db_path = null;
String os = System.getProperty("os.name").toLowerCase();
db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";
loc = new IP2Location(db_path);
gson = new Gson();
} catch (IOException e) {
e.printStackTrace();
}
}
}
进入
Topic
和Stream
的数据非常快(每秒可能有一百万条记录)。在方法上使用synchronized
时,每个ksql-server
节点中的速度为每秒3000条记录/消息。以这种速度,要赶上速度需要花费的时间。没有synchronized
方法,我看到损坏的数据,因为单个对象/方法被多个线程使用。问题1:
udf
调用究竟如何被KSQL调用/调用?问题2:我可以使用线程处理
udf
中的请求吗?Question3:由于Topic / Stream由15个分区组成,我应该提升
ksql-servers
的15个节点吗?谢谢。
最佳答案
问题1:KSQL如何准确地调用udf调用?
不明白你的意思。将您的UDF提供给KSQL使用后(请参见https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#deploying),您可以在KSQL语句中将UDF称为IP2LOOKUP
。您也可以在KSQL中运行SHOW FUNCTIONS
以确认您的UDF可供使用。
也许您是因为下一个问题而问? KSQL将一次给您的UDF一条消息。
问题2:我可以使用线程来处理udf中的请求吗?
你为什么想这么做?您是否担心KSQL和您当前的UDF代码无法处理传入的数据量?说到其中,您可能要处理的预期数据量是多少,因为您可能正在尝试进行过早的优化?
另外,在不了解更多详细信息的情况下,我认为您的UDF的多线程设置不会产生任何优势,因为UDF在被调用时仍一次只能处理一条消息(对于每个KSQL服务器,或更准确地说,每个流任务,其中每个KSQL服务器可以有很多;我提到这一点是为了清楚地表明,KSQL中的UDF不会通过在所有服务器上仅处理一条消息来限制您的处理;当然,该处理是分布式的,并且在平行)。
Question3:由于Topic / Stream由15个分区组成,我应该启动ksql-servers的15个节点吗?
这取决于您的数据量。您可以根据需要旋转任意数量的KSQL服务器。如果数据量很少,则单个KSQL服务器可能就足够了。如果数据量更大,则可以开始启动最多15台服务器的其他KSQL服务器(因为输入主题具有15个分区)。任何其他KSQL服务器都将处于空闲状态。
在15个KSQL服务器不够用的情况下,应将输入主题的分区数量从15个增加到一个更大的数量,然后还可以启动更多个KSQL服务器(从而增加设置的计算能力) )。