为了进行测试,我设法运行了嵌入式独立脉冲星服务器和客户端。我也可以发送和接收消息。但是,我实际上想要(集成)测试功能(实现org.apache.pulsar.functions.api.Function)。如何在嵌入式设置中注册功能?

package kic.data.stream.pulsar

import groovy.util.logging.Log
import org.apache.pulsar.PulsarStandalone
import org.apache.pulsar.PulsarStandaloneBuilder
import org.apache.pulsar.broker.PulsarService
import org.apache.pulsar.broker.ServiceConfiguration
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.ConsumerEventListener
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.SubscriptionType
import spock.lang.Specification

import java.util.concurrent.TimeUnit

@Log
class PulsarEmbeddedTest extends Specification {

    static final String TOPIC = "hello";
    static final int NUM_OF_MESSAGES = 100;
    static PulsarStandalone standalone
    static PulsarService pulsarService

    def setupSpec() {
        def configFile = new File(ClassLoader.getSystemResource("broker.conf").toURI()).getAbsolutePath()
        def conf = new ServiceConfiguration(clusterName: "test-cluster", zookeeperServers: "localhost:2184")
        log.info("${PulsarStandalone.properties}")
        standalone = PulsarStandaloneBuilder.instance()
                                            .withConfig(conf)
                                            .withNoStreamStorage(true)
                                            .build()
        standalone.configFile = configFile
        standalone.start()
        pulsarService = new PulsarService(conf)
    }

    def test() {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsarService.brokerServiceUrl)
                .build()

        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(TOPIC)
                .enableBatching(false)
                .create()

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC)
                //.subscriptionInitialPosition()
                .subscriptionName("test-subs-1")
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .messageListener(Mesa)
                .subscribe()



        for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
            producer.send("Hello_" + i)
        }


        Message<String> message
        for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
            // This calls blocks until a message is available.
            message = consumer.receive(1, TimeUnit.SECONDS)
            //log.info("Message received : ${message.getValue()}")
            println("Message received : ${message.messageId}:${message.value}")

            consumer.acknowledge(message)
        }

        producer.close()
        consumer.close()
        client.close()

        expect:
        1==1

    }

    def cleanupSpec() {
        standalone.close()
    }

}

最佳答案

您应该能够通过Pulsar Admin API创建Pulsar函数,就像处理普通的Pulsar集群一样,例如

PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").build();

FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setName("exclamation");
functionConfig.setInputs(Collections.singleton("input"));
functionConfig.setClassName(ExclamationFunction.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setOutput("output");
functionConfig.setJar("/tmp/my-jar.jar")

pulsarAdmin.functions().createFunction(functionConfig, functionConfig.getJar());


Apache Pulsar项目中还有很多集成测试,用于测试Pulsar功能。有基于docker的真正集成测试,并且有单进程“集成”测试。这是您可以参考的单个流程“集成”测试的示例:

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java

10-06 02:32