I am running Flink from within the IDE. Storing data in queryable state works,
but when I query it, an exception is thrown.

Exception:

Failure(akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://[email protected]:6123/), Path(/user/jobmanager)])

My code:
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,"localhost")
config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,"6123")

@throws[Throwable]
def recover(failure: Throwable): Future[Array[Byte]] =
  if (failure.isInstanceOf[AssertionError]) Futures.failed(failure)
  else {
    // At startup some failures are expected
    // due to races. Make sure that they don't
    // fail this test.
    Patterns.after(retryDelay, TEST_ACTOR_SYSTEM.scheduler, TEST_ACTOR_SYSTEM.dispatcher, new Callable[Future[Array[Byte]]]() {
      @throws[Exception]
      def call: Future[Array[Byte]] = getKvStateWithRetries(queryName, key, serializedKey)
    })
  }
}

@SuppressWarnings(Array("unchecked"))
private def getKvStateWithRetries(queryName: String,
                                  keyHash: Int,
                                  serializedKey: Array[Byte]): Future[Array[Byte]] = {
  val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey)
  kvState.recoverWith(recover(queryName, keyHash, serializedKey))
}

def onSuccess = new OnSuccess[Array[Byte]]() {
  @throws(classOf[Throwable])
  override def onSuccess(result: Array[Byte]): Unit = {
    println("found record ")
    val value = KvStateRequestSerializer.deserializeValue(result, valueSerializer)
    println(value)
  }
}

override def invoke(query: QueryMetaData): Unit = {
  println("getting inside querystore" + query.record)
  val serializedResult = flinkQuery.getResult(query.record, queryName)
  serializedResult.onSuccess(onSuccess)
}

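According to the documentation, Flink runs on localhost:6123 by default.
Is there a problem with the connection? Do I need to submit the job to a separate cluster?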

Best answer

After a lot of googling, I found a solution.

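I was using LocalStreamEnvironment and getting the same error until I found the thread "RemoteEnv connect failed". The error described there is for a different setup (not local), but the gist included in that thread for testing creates a LocalFlinkMiniCluster with the parameter "useSingleActorSystem" set to false.

Looking at the implementation of LocalStreamEnvironment, the MiniCluster there is created with "useSingleActorSystem" set to true.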

I simply created a class LocalQueryableStreamEnvironment that extends LocalStreamEnvironment and creates the mini cluster with "useSingleActorSystem" set to false, and everything works from within the IDE.
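
A quick way to check whether the change took effect is to test whether anything is listening on the JobManager port before creating the client. The sketch below uses only the Java standard library. It assumes that a mini cluster sharing a single actor system does not open the remote port, which is what the ActorNotFound error suggests. The class name JobManagerPortProbe and the 500 ms timeout are made up for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class JobManagerPortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // 6123 is the JobManager port used in the question.
        boolean reachable = isListening("localhost", 6123, 500);
        System.out.println("JobManager port reachable: " + reachable);
    }
}
```

If this prints false while the job is running in the IDE, the client has nothing to connect to, regardless of how the client itself is configured.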

My code now looks like this:

Configuration:

Configuration config = new Configuration();
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 6);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
config.setInteger(JobManagerOptions.WEB_PORT, JobManagerOptions.WEB_PORT.defaultValue());
config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT,JobManagerOptions.PORT.defaultValue());
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);

Note: QueryableState only works with this configuration when LOCAL_NUMBER_TASK_MANAGER is set to a value greater than 1!

Instantiating and executing the environment:
LocalQueryableStreamEnvironment env = LocalQueryableStreamEnvironment.createLocalEnvironment(3, config);
...
env.addSource(anySource)
   .keyBy(anyAttribute)
   .flatMap(new UpdateMyStateToBeQueriedLaterMapper())
   .addSink(..); //etc
...
env.execute("JobNameHere");

And creating the client:
final Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, JobManagerOptions.PORT.defaultValue());

HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils
    .createHighAvailabilityServices(
                   config,
                   Executors.newSingleThreadScheduledExecutor(),
                   HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION
    );
return new QueryableStateClient(config,highAvailabilityServices);
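
The code in the question retries failed lookups after a delay because some failures are expected right at startup. The same idea can be sketched in plain Java with CompletableFuture, independent of Flink and Akka. The names RetryWithDelay and withRetries are hypothetical. Unlike the question's code, this version caps the number of attempts and does not treat AssertionError specially.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetryWithDelay {

    /**
     * Runs the asynchronous lookup; if it fails, schedules another attempt
     * after delayMillis, until it succeeds or no attempts are left.
     */
    static <T> CompletableFuture<T> withRetries(Supplier<CompletableFuture<T>> lookup,
                                                int attemptsLeft,
                                                long delayMillis,
                                                ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        lookup.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptsLeft <= 1) {
                // Out of attempts: surface the last failure to the caller.
                result.completeExceptionally(error);
            } else {
                scheduler.schedule(() -> {
                    withRetries(lookup, attemptsLeft - 1, delayMillis, scheduler)
                        .whenComplete((v, e) -> {
                            if (e == null) {
                                result.complete(v);
                            } else {
                                result.completeExceptionally(e);
                            }
                        });
                }, delayMillis, TimeUnit.MILLISECONDS);
            }
        });
        return result;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Stand-in for a state lookup: fails on the first two calls, then succeeds.
        Supplier<CompletableFuture<String>> lookup = () -> {
            CompletableFuture<String> attempt = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                attempt.completeExceptionally(new IllegalStateException("state not ready"));
            } else {
                attempt.complete("value");
            }
            return attempt;
        };
        System.out.println(withRetries(lookup, 5, 100, scheduler).get());
        scheduler.shutdown();
    }
}
```

In a real job the supplier would wrap the client's lookup call. The stand-in here only simulates a state that becomes available on the third attempt.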

For more information, see:

Queryable States in ApacheFlink - Implementation

Queryable State Client with 1.3.0-rc0

My dependencies:
compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-cep_2.11', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: '1.3.1'
compile 'org.apache.flink:flink-runtime-web_2.11:1.3.1'

Regarding apache-flink - FlinkQueryableState: configuration issues on a local cluster, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/39200421/
