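I am implementing a KSQL UDF that checks whether a given (latitude, longitude) pair lies inside a geofence (a polygon defined by many latitude/longitude pairs). I took inspiration from this project (https://github.com/gschmutz/various-demos/tree/master/kafka-geofencing) and wrote my own Java class (see below) containing only the limited functionality I need.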

import org.geotools.geometry.jts.JTSFactoryFinder;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.Polygon;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKTReader;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

@UdfDescription(
    name = "geofence",
    description = "Determines if a lat/long is inside or outside the geometry passed as the third parameter as a WKT coded string",
    version = "0.1.0",
    author = "xyz"
)

public class GeoFence {
    private static GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();
    private static WKTReader wktReader = new WKTReader(geometryFactory);

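    /**
     * Determines if a lat/long is inside or outside the geometry passed as the third parameter as a WKT coded string.
     * @param latitude    latitude of the point to test (the Y value)
     * @param longitude   longitude of the point to test (the X value)
     * @param geometryWKT the geofence polygon as a WKT coded string
     * @return true if the point is inside the polygon, false otherwise
     */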
    @Udf(description = "Determines if a lat/long is inside or outside the geometry passed as the third parameter as a WKT coded string")
    public static boolean geofence(final double latitude, final double longitude, String geometryWKT) {

        boolean status = false;

        Polygon polygon = null;
        try {
            polygon = (Polygon) wktReader.read(geometryWKT);

            // However, an important point to note is that the longitude is the X value
            // and the latitude the Y value. So we say "lat/long",
            // but JTS will expect it in the order "long/lat".
            Coordinate coord = new Coordinate(longitude, latitude);
            Point point = geometryFactory.createPoint(coord);

            status = point.within(polygon);

        } catch (ParseException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new RuntimeException(e.getMessage(), e);
        }
        return status;
    }
}


Here is my pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.trivadis.sample.geofencing</groupId>
    <artifactId>geo-utils</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <description>
         Geo-Processing Utilities
    </description>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
        <repository>
            <id>osgeo</id>
            <name>Open Source Geospatial Foundation Repository</name>
            <url>http://download.osgeo.org/webdav/geotools/</url>
        </repository>

        <repository>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
          <id>boundless</id>
          <name>Boundless Maven Repository</name>
          <url>http://repo.boundlessgeo.com/main</url>
        </repository>

        <repository>
            <id>dev-azure-com-se-innovationprojects-barge-tracking</id>
            <url>https://pkgs.dev.azure.com/SE-InnovationProjects/_packaging/barge-tracking/maven/v1</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </pluginRepository>
    </pluginRepositories>

    <distributionManagement>
        <repository>
            <id>dev-azure-com-se-innovationprojects-barge-tracking</id>
            <url>https://pkgs.dev.azure.com/SE-InnovationProjects/_packaging/barge-tracking/maven/v1</url>

        </repository>
    </distributionManagement>

    <!-- Other properties such as kafka.version are derived from parent project(s) such as
         https://github.com/confluentinc/common (see common's pom.xml for kafka.version).
    -->
    <properties>
        <guava.version>24.1.1-jre</guava.version>
        <geotools.version>23-SNAPSHOT</geotools.version>
        <geohash.version>1.3.0</geohash.version>
        <ksql.version>5.4.0</ksql.version>
        <docker.skip-build>false</docker.skip-build>
        <docker.skip-test>false</docker.skip-test>
        <java.version>1.8</java.version>
        <maven.shade.version>3.2.1</maven.shade.version>
        <!-- JUnit 5 requires Surefire version 2.22.1 or higher -->
        <maven.surefire.version>2.22.1</maven.surefire.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>

        <dependency>
            <groupId>com.github.davidmoten</groupId>
            <artifactId>geo</artifactId>
            <version>0.7.1</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-shapefile</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-main</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-epsg-hsql</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksql-udf</artifactId>
            <version>${ksql.version}</version>
        </dependency>
    </dependencies>

    <build>

        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <inherited>true</inherited>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <compilerArgs>
                        <arg>-Xlint:all</arg>
                        <!--TODO: enable this once we have warnings under control<arg>-Werror</arg>-->
                    </compilerArgs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>${maven.shade.version}</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>${maven.surefire.version}</version>
            </plugin>

        </plugins>
    </build>

</project>


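When I build the Maven project, a jar is created successfully. But when I try to restart the Confluent KSQL server with the command "confluent local start ksql-server", the server fails to start. When I check the KSQL server log with the command "confluent log local ksql-server", I see the following message: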

(io.confluent.ksql.rest.server.KsqlServerMain:61)
io.github.lukehutch.fastclasspathscanner.MatchProcessorException: java.lang.NoClassDefFoundError: org/locationtech/jts/io/ParseException
        at io.github.lukehutch.fastclasspathscanner.MatchProcessorException.newInstance(MatchProcessorException.java:81)
        at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:757)
        at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1606)
        at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1678)
        at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1704)
        at io.confluent.ksql.function.UserFunctionLoader.loadFunctions(UserFunctionLoader.java:135)
        at io.confluent.ksql.function.UserFunctionLoader.lambda$load$2(UserFunctionLoader.java:97)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)



I wrote another, simpler UDF, and it loads and works fine in the KSQL server. Can anyone help me figure out what is wrong with this UDF?

Best answer

There were two problems:
1. I was running mvn clean install, whereas running mvn clean package exported the uber-jar correctly.
2. Some of the dependencies listed in pom.xml are signed jars, so when the UDF jar was loaded into ksql-server they raised a Java SecurityException. Following the solutions described at the links below helped me resolve this as well.
What is the maven-shade-plugin used for, and why would you want to relocate Java packages?
http://zhentao-li.blogspot.com/2012/06/maven-shade-plugin-invalid-signature.html
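
The answer does not show the resulting configuration. As a rough sketch of how the signed-jar problem in point 2 is usually handled, the maven-shade-plugin can be told to leave the signature files of the bundled dependencies out of the uber-jar. The fragment below is an assumption about what the fix looked like, not the author's confirmed pom.xml; it extends the shade plugin entry from the question with a `filters` section, reusing the `${maven.shade.version}` property already defined there:

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>${maven.shade.version}</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <filters>
                    <!-- Apply to every bundled artifact (assumed scope; narrow it if needed). -->
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <!-- Signature files from signed dependencies no longer match
                                 the contents of the merged jar, so drop them. -->
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
            </configuration>
        </execution>
    </executions>
</plugin>
```

With a configuration along these lines, the shaded jar should no longer carry the stale signature files that cause the security exception when the jar is loaded.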

09-27 14:54