为了进行测试,我设法运行了嵌入式独立脉冲星服务器和客户端。我也可以发送和接收消息。但是,我实际上想要(集成)测试功能(实现org.apache.pulsar.functions.api.Function
)。如何在嵌入式设置中注册功能?
package kic.data.stream.pulsar
import groovy.util.logging.Log
import org.apache.pulsar.PulsarStandalone
import org.apache.pulsar.PulsarStandaloneBuilder
import org.apache.pulsar.broker.PulsarService
import org.apache.pulsar.broker.ServiceConfiguration
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.ConsumerEventListener
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.SubscriptionType
import spock.lang.Specification
import java.util.concurrent.TimeUnit
@Log
class PulsarEmbeddedTest extends Specification {
static final String TOPIC = "hello";
static final int NUM_OF_MESSAGES = 100;
static PulsarStandalone standalone
static PulsarService pulsarService
def setupSpec() {
def configFile = new File(ClassLoader.getSystemResource("broker.conf").toURI()).getAbsolutePath()
def conf = new ServiceConfiguration(clusterName: "test-cluster", zookeeperServers: "localhost:2184")
log.info("${PulsarStandalone.properties}")
standalone = PulsarStandaloneBuilder.instance()
.withConfig(conf)
.withNoStreamStorage(true)
.build()
standalone.configFile = configFile
standalone.start()
pulsarService = new PulsarService(conf)
}
def test() {
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarService.brokerServiceUrl)
.build()
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(TOPIC)
.enableBatching(false)
.create()
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(TOPIC)
//.subscriptionInitialPosition()
.subscriptionName("test-subs-1")
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.messageListener(Mesa)
.subscribe()
for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
producer.send("Hello_" + i)
}
Message<String> message
for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
// This calls blocks until a message is available.
message = consumer.receive(1, TimeUnit.SECONDS)
//log.info("Message received : ${message.getValue()}")
println("Message received : ${message.messageId}:${message.value}")
consumer.acknowledge(message)
}
producer.close()
consumer.close()
client.close()
expect:
1==1
}
def cleanupSpec() {
standalone.close()
}
}
最佳答案
您应该能够通过Pulsar Admin API创建Pulsar函数,就像处理普通的Pulsar集群一样,例如
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").build();
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setName("exclamation");
functionConfig.setInputs(Collections.singleton("input"));
functionConfig.setClassName(ExclamationFunction.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setOutput("output");
functionConfig.setJar("/tmp/my-jar.jar")
pulsarAdmin.functions().createFunction(functionConfig, functionConfig.getJar());
Apache Pulsar项目中还有很多集成测试,用于测试Pulsar功能。有基于docker的真正集成测试,并且有单进程“集成”测试。这是您可以参考的单个流程“集成”测试的示例:
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java