本文介绍了如何使Genserver以频率值Elixir运行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经看到了许多GenServer实现,我正在尝试创建具有此类规范的实现,但是我不确定其GenServer的用例.

I have seen many GenServer implementations, I am trying to create one with such specifications, But I am not sure its GenServer's use case.

我有一个状态,例如

%{url: "abc.com/jpeg", name: "Camera1", id: :camera_one, frequency: 10}

我有这100种状态,具有不同的值,我的用例包含5个步骤.

I have such 100 states, with different values, my use case contains on 5 steps.

  1. 以Gen身份开始每个州{?}.
  2. 将HTTP请求发送到该URL.
  3. 获取结果.
  4. 发送另一个HTTP请求,其中的数据来自第一个请求.
  5. 将过程置于睡眠状态.如果频率是10,则持续10秒钟,以此类推,然后在10秒钟后,它将再次从1步开始.

现在,当我启动100个这样的工作程序时,将有100 * 2个具有频率的HTTP请求.我不确定我要使用GenServer还是GenStage或Flow甚至是Broadway?

Now when I will start 100 such workers, there are going to be 100 * 2 HTTP requests with frequency. I am not sure about either I am going to use GenServer or GenStage or Flow or even Broadway?

我还担心HTTP请求不会崩溃,例如一个有状态的worker将发送一个请求,如果频率为1秒,则在第一个请求返回之前,另一个请求将被发送,GenServer是否有能力处理这些情况?我认为这叫背压?

I am also concerned the HTTP requests won't collapse such as one worker, with a state, will send a request and if the frequency is 1 Second before the first request comes back, the other request would have been sent, would GenServer is capable enough to handle those cases? which I think are called back pressure?

我一直在询问和研究这个用例,对于我的用例,我也一直被引向RabbitMQ.

I have been asking and looking at this use case of so long, I have been guided towards RabbitMQ as well for my use case.

任何指导都将非常有帮助,或者任何最低限度的示例都将不胜感激.

Any guidance would be so helpful, or any minimal example would be so grateful.

? GenServer/GenStage/GenStateMachine

? GenServer/ GenStage / GenStateMachine

推荐答案

您的问题归结为在给定时间减少并发网络请求的数量.

Your problem comes down to reducing the number of concurrent network requests at a given time.

一种简单的方法是让GenServer跟踪传出请求的数量.然后,对于每个客户(在您的情况下,最多200个),它可以检查是否有未解决的请求,然后采取相应的措施.服务器外观如下:

A simple approach would be to have a GenServer which keeps track of the count of outgoing requests. Then, for each client (Up to 200 in your case), it can check to see if there's an open request, and then act accordingly. Here's what the server could look like:

defmodule Throttler do
  use GenServer

  #server
  @impl true
  def init(max_concurrent: max_concurrent) do
    {:ok, %{count: 0, max_concurrent: max_concurrent}}
  end

  @impl true
  def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count < max_concurrent, do: {:reply, :ok, %{state | count: count + 1}}
  @impl true
  def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count >= max_concurrent, do: {:reply, {:error, :too_many_requests}, state}


  @impl true
  def handle_call(:finished, _from, %{count: count} = state) when count > 0, do: {:reply, :ok, %{state | count: count - 1}}
end

好的,现在我们有了一个可以在其中调用 handle_call(pid,:run)的服务器,它将告诉我们是否超出了计数.任务(获取URL)完成后,我们需要调用 handle_call(pid,:finished),以使服务器知道我们已经完成了任务.

Okay, so now we have a server where we can call handle_call(pid, :run) and it will tell us whether or not we've exceeded the count. Once the task (getting the URL) is complete, we need to call handle_call(pid, :finished) to let the server know we've completed the task.

在客户端,我们可以将其包装在方便的辅助函数中.(请注意,它仍然在Throttler模块中,因此 __ MODULE __ 可以使用)

On the client side, we can wrap that in a convenient helper function. (Note this is still within the Throttler module so __MODULE__ works)

defmodule Throttler do
  #client
  def start_link(max_concurrent: max_concurrent) when max_concurrent > 0 do
    GenServer.start_link(__MODULE__, [max_concurrent: max_concurrent])
  end

  def execute_async(pid, func) do
    GenServer.call(pid, :run)
    |> case do
      :ok ->
        task = Task.async(fn ->
          try do
            func.()
          after
            GenServer.call(pid, :finished)
          end
        end)
        {:ok, task}
      {:error, reason} -> {:error, reason, func}
    end
  end
end

在这里,我们传入一个要在客户端异步执行的函数,并在执行之前在服务器端调用:run和:finished的工作.如果成功,我们将退回任务,否则将失败.

Here we pass in a function that we want to asynchronously execute on the client side, and do the work of calling :run and :finished on the server side before executing. If it succeeds, we get a task back, otherwise we get a failure.

将它们放在一起,您将获得如下代码:

Putting it all together, and you get code that looks like this:

{:ok, pid} = Throttler.start_link(max_concurrent: 3)
results = Enum.map(1..5, fn num ->
  Throttler.execute(pid, fn ->
    IO.puts("Running command #{num}")
    :timer.sleep(:5000)
    IO.puts("Sleep complete for #{num}")
    num * 10
  end)
end)
valid_tasks = Enum.filter(results, &(match?({:ok, _func}, &1))) |> Enum.map(&elem(&1, 1))

现在您有一堆成功或失败的任务,您可以适当地采取行动.

Now you have a bunch of tasks that either succeeded, or failed and you can act appropriately.

失败后您会怎么做?这是反压的有趣部分:)最简单的方法是在您最终将消除下游压力的前提下,进行超时并重试.否则,您可以完全使请求失败,并继续将问题推向上游.

What do you do upon failure? That's the interesting part of backpressure :) The simplest thing would be to have a timeout and retry, under the assumption that you will eventually clear the pressure downstream. Otherwise you can fail out the requests entirely and keep pushing the problem upstream.

这篇关于如何使Genserver以频率值Elixir运行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-12 01:09